Daily, Monthly, and Yearly Report Statistics with Elasticsearch (ES), Zero-Filling Dates That Have No Data
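Preface:
This is a relatively simple report that could also be built with plain SQL. It is used here mainly to show how ES aggregations can drive a report.
I previously wrote "java实现日报表、月报表统计,没数据补0" (daily and monthly report statistics in Java, zero-filling missing data), https://blog.csdn.net/JRocks/article/details/113841913. That approach queries the database with SQL and then fills in zeros for dates without data in Java code, which is simple and convenient.
It has drawbacks, though. Some views need SQL joins that are cumbersome and slow, some cannot be expressed in SQL at all, some remain awkward even with SQL plus Java, and as business data grows quickly, complex joins make pages respond slowly. So we turned to ES: first, it can speed up the queries; second, ES can fill in the zeros itself, because a date_histogram aggregation returns empty buckets for the gaps between dates that have data.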
Product requirement:
[Figure: mock-up of the report table]
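Requirement description:
First, think of this as an order table with two dictionary fields: purchase term and purchase source.
It is a combined yearly/monthly/daily report, and the filter may span years, months, or days. Given the query conditions, the data is aggregated along three dimensions: total, purchase term, and purchase source. Note in particular that the "number of buyers" column must count each user only once.
The project is already live, so both the historical data and every newly placed order have to be pushed to ES.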
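Approach, step by step (important):
1. Create the ES index in Kibana.
2. Add a method that bulk-indexes data into ES.
3. Push the historical data into that ES index.
4. Create the ES search template in Kibana.
5. Create the ES totals template in Kibana. (Note: this is the totals shown at the bottom of the table in the product requirement. It covers everything that matches the current query conditions, not just the rows on the current page.)
6. Aggregate the results of the search template and return them to the front end as JSON.
7. Push each single order purchase to ES.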
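Note: this is a microservice project and the article touches two modules. All ES-related code lives in its own module, dg-search, which is pointed out in the code walkthrough.
Code walkthrough: following the steps above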
Step 1: Create the ES index in Kibana
Set up ES first; installation is not covered here. The index is created from the Kibana console:
# 1. Create the index for the deposit purchase statistics report
PUT dg_financial_order_report
{
  "mappings": {
    "properties": {
      "orderTime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
        "fields": {
          "keyword": { "type": "keyword", "ignore_above": 20 }
        }
      },
      "totalType":  { "type": "integer" },
      "deadline":   { "type": "integer" },
      "orderType":  { "type": "integer" },
      "qty":        { "type": "long" },
      "orderMoney": { "type": "float" },
      "people":     { "type": "long" }
    }
  }
}
Step 2: Add a method that bulk-indexes data into ES (dg-search project)
// -------------------------------------------------------------------------------------------
// Feign interface
package com.dg.mall.search.feign.feign;

@FeignClient(name = "dg-search", path = "/dg-search/api/bzjFinanceReportEs")
public interface BzjFinanceReportFeignService {

    /** Bulk-index deposit purchase report rows. */
    @PostMapping("/batchAddOrUpdateDepositOrder")
    void batchAddOrUpdateDepositOrderEsMapping(@RequestBody List<DepositOrderMappingReq> list);

    /** Query the deposit purchase report, one row per date bucket. */
    @GetMapping("/listDepositOrder")
    List<DepositOrderBucketsRes> listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);

    /** Deposit purchase report: totals over the whole query range. */
    @GetMapping("/getDepositOrderTotal")
    DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);
}

// -------------------------------------------------------------------------------------------
// Feign interface implementation
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

@RestController
@RequestMapping("/api/bzjFinanceReportEs")
public class BzjFinanceReportFeignProvider implements BzjFinanceReportFeignService {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /** Bulk-index deposit purchase report rows. */
    @Override
    public void batchAddOrUpdateDepositOrderEsMapping(List<DepositOrderMappingReq> list) {
        try {
            if (CollectionUtils.isEmpty(list)) {
                return;
            }
            BulkRequest bulkRequest = new BulkRequest();
            for (DepositOrderMappingReq req : list) {
                // The constructor argument is the index name
                IndexRequest request = new IndexRequest("dg_financial_order_report");
                // Same id means overwrite, so re-pushing a row is an update
                request.id(req.getId());
                request.source(JSON.toJSONString(req), XContentType.JSON);
                bulkRequest.add(request);
            }
            // bulkListener is an ActionListener<BulkResponse> field of this class (not shown in the article).
            // The call is asynchronous, so indexing failures reach the listener, not the catch block below.
            restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkListener);
        } catch (Exception e) {
            LogUtil.error("Failed to bulk-add deposit order data to ES! message:{} --", e, e.getMessage());
            throw new ServiceException(SearchExceptionEnum.BATCH_ADD_BZJ_ORDER_ES_FAIL);
        }
    }
}
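Entity classes
/**
 * Deposit purchase statistics report: request entity for bulk-indexing ES data.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DepositOrderMappingReq {
    /** Document ID */
    private String id;
    /** Purchase date */
    private String orderTime;
    /** Total dimension marker */
    private Integer totalType;
    /** Purchase term */
    private Integer deadline;
    /** Purchase source */
    private Integer orderType;
    // @Builder ignores plain field initializers; @Builder.Default keeps the
    // defaults below when the object is created through builder().
    /** Number of orders */
    @Builder.Default
    private Long qty = 0L;
    /** Amount */
    @Builder.Default
    private BigDecimal orderMoney = BigDecimal.ZERO;
    /** Number of buyers (already de-duplicated) */
    @Builder.Default
    private Long people = 0L;
}

/**
 * Deposit purchase statistics report: request entity for ES queries.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DepositBucketsReq {
    /** Start time */
    private String gteTime;
    /** End time */
    private String lteTime;
    /** Bucket interval (year, month, or day) */
    private String interval;
    /** Date format of the bucket keys */
    private String format;
}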
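Step 3: Push the historical data into the ES index
Note: this is implemented as a scheduled job that is left disabled, which makes the initialization easy to operate. After going live, the job is triggered manually once.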
package com.dg.mall.financial.jobhandle;

/**
 * Scheduled job that pushes data to ES.
 */
@Service
public class SyncDepositOrderToEsHandle extends IJobHandler {

    @Resource
    private DepositOrderService orderService;
    @Resource
    private BzjFinanceReportFeignService bzjFinanceReportFeignService;
    @Resource
    private DepositRollReportFeignService depositRollReportFeignService;
    @Resource
    private FinancialProducer financialProducer;

    @XxlJob("syncDepositOrderToEsHandle")
    @Override
    public ReturnT<String> execute(String param) throws Exception {
        // Run the sync on worker threads so the scheduler gets its result right away.
        // param travels through the constructor: a ThreadLocal set in this thread
        // would read as null inside the new thread.
        new Thread(new SynExecute(param)).start();
        // SynExpiredExecute is a second worker of this class that the article does not show.
        new Thread(new SynExpiredExecute()).start();
        return ReturnT.SUCCESS;
    }

    class SynExecute implements Runnable {

        /** Optional date filter (yyyy-MM-dd); empty means all orders. */
        private final String param;

        SynExecute(String param) {
            this.param = param;
        }

        @Override
        public void run() {
            // The data set may be large, so push it page by page, 100 rows at a time
            int current = 1, size = 100;
            for (; ; ) {
                PageVO pageVO = new PageVO<>(current, size);
                pageVO.setSearchCount(false);
                // Load the orders split into total, purchase source, and purchase term rows (see the mapper XML below)
                PageVO<DepositOrderMappingRes> res = orderService.getSyncPushOrderDataToEs(pageVO, param);
                if (ObjectUtil.isEmpty(res.getRecords())) {
                    break;
                }
                final List<DepositOrderMappingReq> orderMappingReqs = Lists.newArrayList();
                for (DepositOrderMappingRes mappingRes : res.getRecords()) {
                    orderMappingReqs.add(orderService.batchAddOrderData(mappingRes));
                }
                // Call the bulk-index method in dg-search
                bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(orderMappingReqs);
                current++;
            }
        }
    }
}
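When a job hands its parameter to a worker thread, as the handler above does, it helps to remember that a value stored in a ThreadLocal by one thread is not visible from a thread started afterwards. The sketch below uses only the JDK to show the difference between reading a ThreadLocal from a new thread and capturing the value in the Runnable. The class and method names are hypothetical and not part of the project.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the project.
final class ThreadHandOffDemo {
    private static final ThreadLocal<String> LOCAL = new ThreadLocal<>();

    /** Sets a ThreadLocal in the caller, then reads it from a newly started thread. */
    static String readViaThreadLocal(String param) {
        LOCAL.set(param);
        AtomicReference<String> seen = new AtomicReference<>();
        runAndWait(() -> seen.set(LOCAL.get()));
        LOCAL.remove();
        return seen.get(); // the worker thread has its own, empty slot
    }

    /** Captures the value in the Runnable, the way a constructor argument would. */
    static String readViaCapturedValue(String param) {
        AtomicReference<String> seen = new AtomicReference<>();
        runAndWait(() -> seen.set(param));
        return seen.get();
    }

    private static void runAndWait(Runnable task) {
        Thread worker = new Thread(task);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ThreadLocal in worker: " + readViaThreadLocal("2021-02-02"));
        System.out.println("captured in worker:    " + readViaCapturedValue("2021-02-02"));
    }
}
```

In a job like this one, an empty parameter means "sync everything", so a silently lost filter would re-push all orders instead of a single day.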
The mapper XML behind orderService.getSyncPushOrderDataToEs():
<select id="getSyncPushOrderDataToEs" resultType="com.dg.mall.financial.vo.res.deposit.DepositOrderMappingRes">
    -- 1. Total
    SELECT
        o.id AS id,
        DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,
        0 AS totalType,
        NULL AS deadline,
        NULL AS orderType,
        count( * ) AS qty,
        sum( o.order_money ) AS orderMoney,
        o.user_id AS userId
    FROM dg_deposit_order o
    JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id
    WHERE <![CDATA[ o.order_status < 4 ]]>
    <if test="param != null and param !=''">
        <![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]>
    </if>
    GROUP BY orderTime, userId
    UNION ALL
    -- 2. Purchase term
    SELECT
        o.id AS id,
        DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,
        NULL AS totalType,
        dos.deadline AS deadline,
        NULL AS orderType,
        count( * ) AS qty,
        sum( o.order_money ) AS orderMoney,
        o.user_id AS userId
    FROM dg_deposit_order o
    JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id
    WHERE <![CDATA[ o.order_status < 4 ]]>
    <if test="param != null and param !=''">
        <![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]>
    </if>
    GROUP BY orderTime, userId, deadline
    UNION ALL
    -- 3. Purchase source
    SELECT
        o.id AS id,
        DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,
        NULL AS totalType,
        NULL AS deadline,
        o.order_type AS orderType,
        count( * ) AS qty,
        sum( o.order_money ) AS orderMoney,
        o.user_id AS userId
    FROM dg_deposit_order o
    JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id
    WHERE <![CDATA[ o.order_status < 4 ]]>
    <if test="param != null and param !=''">
        <![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]>
    </if>
    GROUP BY orderTime, userId, orderType
</select>
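Step 4: Create the ES search template in Kibana
Some readers may ask: why use a search template, and what does it buy you?
First, I find it easier to get started with, which builds confidence. A "template" is by definition something you fill in and reuse, so it feels simpler.
Second, it is more direct and cuts down on Java code, so you can concentrate on writing the ES aggregation itself. If you have ever hand-built a complex ES aggregation in Java, you will find templates noticeably more convenient.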
# Deposit purchase template (create it first)
POST _scripts/search_order_collect_template
{
  "script": {
    "lang": "mustache",
    "source": {
      "size": 0,
      "query": {
        "range": {
          "orderTime": {
            "gte": "{{gteTime}}",
            "lte": "{{lteTime}}",
            "format": "yyyy-MM-dd"
          }
        }
      },
      "aggs": {
        "group_date_histogram_data": {
          "date_histogram": {
            "field": "orderTime",
            "calendar_interval": "{{interval}}",
            "format": "{{format}}"
          },
          "aggs": {
            "group_totalType_data": {
              "terms": { "field": "totalType" },
              "aggs": {
                "qty":        { "sum": { "field": "qty" } },
                "orderMoney": { "sum": { "field": "orderMoney" } },
                "people":     { "sum": { "field": "people" } }
              }
            },
            "group_orderType": {
              "terms": { "field": "orderType" },
              "aggs": {
                "qty":        { "sum": { "field": "qty" } },
                "orderMoney": { "sum": { "field": "orderMoney" } },
                "people":     { "sum": { "field": "people" } }
              }
            },
            "group_deadline": {
              "terms": { "field": "deadline" },
              "aggs": {
                "qty":        { "sum": { "field": "qty" } },
                "orderMoney": { "sum": { "field": "orderMoney" } },
                "people":     { "sum": { "field": "people" } }
              }
            }
          }
        }
      }
    }
  }
}

# Query through the deposit purchase template
GET dg_financial_order_report/_search/template
{
  "id": "search_order_collect_template",
  "params": {
    "gteTime": "2021-02-02",
    "lteTime": "2021-02-02",
    "interval": "day",
    "format": "yyyy-MM-dd"
  }
}
Result of the template query:
[Figure: Kibana response for the template query]
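A note on the zero-filling itself: a date_histogram returns empty buckets for the gaps between the first and last bucket that contain data, but unless `extended_bounds` is set on the aggregation it does not create buckets before the first or after the last document in the queried range. If leading or trailing empty days also have to appear in the report, they can be padded on the Java side. The following is a minimal sketch with java.time only. The class and method names are hypothetical, and the `counts` map stands in for the parsed buckets.

```java
import java.time.LocalDate;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the project.
final class ZeroFillDemo {

    /** Returns one entry per day in [from, to]; days missing from counts get 0. */
    static Map<String, Long> fillDays(LocalDate from, LocalDate to, Map<String, Long> counts) {
        Map<String, Long> filled = new LinkedHashMap<>(); // keeps chronological order
        for (LocalDate day = from; !day.isAfter(to); day = day.plusDays(1)) {
            String key = day.toString(); // ISO format, yyyy-MM-dd
            filled.put(key, counts.getOrDefault(key, 0L));
        }
        return filled;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = Collections.singletonMap("2021-02-02", 3L);
        fillDays(LocalDate.of(2021, 2, 1), LocalDate.of(2021, 2, 4), counts)
                .forEach((day, qty) -> System.out.println(day + " -> " + qty));
    }
}
```

The same loop works for monthly or yearly buckets by stepping with `plusMonths(1)` or `plusYears(1)` and formatting the key to match the bucket format.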
Java code (dg-search project)
// Feign interface
/** Query the deposit purchase report. */
@GetMapping("/listDepositOrder")
List<DepositOrderBucketsRes> listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);

// Implementation
@Override
public List<DepositOrderBucketsRes> listDepositOrder(DepositBucketsReq depositBucketsReq) {
    final Map<String, Object> params = (Map<String, Object>) JSON.toJSON(depositBucketsReq);
    String templateName = elasticsearchConfig.bzjConfig.getSearch_order_collect_template();
    LogUtil.info("Template name: {} request params: {}", templateName, params.toString());
    SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(
            elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName, params, restHighLevelClient);
    Aggregations aggregations = searchResponse.getAggregations();
    // This is where it differs from the totals template: results are grouped by date first
    ParsedDateHistogram dateHistogram = aggregations.get("group_date_histogram_data");
    if (dateHistogram.getBuckets().isEmpty()) {
        return Collections.emptyList();
    }
    return getDepositOrderCollect(dateHistogram.getBuckets());
}

private List<DepositOrderBucketsRes> getDepositOrderCollect(List<? extends Histogram.Bucket> buckets) {
    List<DepositOrderBucketsRes> res = Lists.newLinkedList();
    buckets.forEach(dateBucket -> {
        DepositOrderBucketsRes build = DepositOrderBucketsRes.builder()
                .orderTime(dateBucket.getKeyAsString())
                .build();
        Aggregations aggregations = dateBucket.getAggregations();
        // Total dimension
        getOrderSumAggsData(aggregations, build);
        // Purchase term dimension
        getOrderDeadlineAggsData(build, aggregations);
        // Purchase source dimension
        getOrderSourceAggsData(aggregations, build);
        res.add(build);
    });
    return res;
}

/** Sum value as an int (ES returns sums as doubles, e.g. "3.0"). */
private static int toInt(ParsedSum sum) {
    return new BigDecimal(sum.getValueAsString()).intValue();
}

/**
 * Sum value as an amount with 2 decimal places. Assuming NumberUtil is Hutool's,
 * the second argument of round(BigDecimal, int) is the scale, not a rounding mode.
 */
private static String toMoney(ParsedSum sum) {
    return NumberUtil.round(new BigDecimal(sum.getValueAsString()), 2).toString();
}

/** Read the total dimension. */
public void getOrderSumAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {
    ParsedTerms totalTypeData = aggregations.get("group_totalType_data");
    totalTypeData.getBuckets().forEach(bucket -> {
        Aggregations agg = bucket.getAggregations();
        build.setSumQty(toInt(agg.get("qty")));
        build.setSumMoney(toMoney(agg.get("orderMoney")));
        build.setSumPeople(toInt(agg.get("people")));
    });
}

/** Read the purchase term dimension. */
private void getOrderDeadlineAggsData(DepositOrderBucketsRes build, Aggregations aggregations) {
    ParsedTerms deadlineData = aggregations.get("group_deadline");
    deadlineData.getBuckets().forEach(bucket -> {
        Integer deadline = new BigDecimal(bucket.getKey().toString()).intValue();
        Aggregations agg = bucket.getAggregations();
        ParsedSum qty = agg.get("qty");
        ParsedSum money = agg.get("orderMoney");
        ParsedSum people = agg.get("people");
        DepositOrderDeadlineEnum deadlineEnum = DepositOrderDeadlineEnum.getDeadlineEnum(deadline);
        if (ObjectUtil.isEmpty(deadlineEnum)) {
            return;
        }
        switch (deadlineEnum) {
            case THIRTY_DAYS:
                build.setThirtyDaysQty(toInt(qty));
                build.setThirtyDaysMoney(toMoney(money));
                build.setThirtyDaysPeople(toInt(people));
                break;
            case NINETY_DAYS:
                build.setNinetyDaysQty(toInt(qty));
                build.setNinetyDaysMoney(toMoney(money));
                build.setNinetyDaysPeople(toInt(people));
                break;
            case THREE_HUNDRED_SIXTY_DAYS:
                build.setThreeHundredSixtyDaysQty(toInt(qty));
                build.setThreeHundredSixtyDaysMoney(toMoney(money));
                build.setThreeHundredSixtyDaysPeople(toInt(people));
                break;
            default:
                break;
        }
    });
}

/** Read the purchase source dimension. */
public void getOrderSourceAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {
    ParsedTerms sourceData = aggregations.get("group_orderType");
    sourceData.getBuckets().forEach(bucket -> {
        Integer orderType = new BigDecimal(bucket.getKey().toString()).intValue();
        Aggregations agg = bucket.getAggregations();
        ParsedSum qty = agg.get("qty");
        ParsedSum money = agg.get("orderMoney");
        DepositOrderSourceEnum orderSourceEnum = DepositOrderSourceEnum.getOrderSourceEnum(orderType);
        if (ObjectUtil.isEmpty(orderSourceEnum)) {
            return;
        }
        switch (orderSourceEnum) {
            case DEPOSIT:
                build.setDepositQty(toInt(qty));
                build.setDepositMoney(toMoney(money));
                break;
            // These sources are all reported together as "store onboarding"
            case CHANNEL_STORE:
            case FIRM:
            case CONSIGNMENT_STORE:
            case IDLE_STORE:
                // Accumulate onto the running store totals; this relies on storeQty and
                // storeMoney having non-null defaults in DepositOrderBucketsRes (not shown)
                BigDecimal newQty = new BigDecimal(qty.getValueAsString());
                BigDecimal oldQty = new BigDecimal(build.getStoreQty());
                BigDecimal newMoney = new BigDecimal(money.getValueAsString());
                BigDecimal oldMoney = new BigDecimal(build.getStoreMoney());
                build.setStoreQty(oldQty.add(newQty).intValue());
                build.setStoreMoney(NumberUtil.round(oldMoney.add(newMoney), 2).toString());
                break;
            default:
                break;
        }
    });
}
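The purchase-source branch folds several source types into one "store" figure by adding each bucket to a running value. The sketch below shows that accumulation on its own, without the ES classes. The class name, the `STORE_TYPES` codes, and the input shape are assumptions for illustration; the real codes live in `DepositOrderSourceEnum`, which the article does not show. The total starts at zero, so a missing bucket cannot cause a null, and the amount is rounded to two decimals once at the end.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical demo class, not part of the project.
final class StoreTotalDemo {
    // Assumed order-type codes that are reported together as "store"
    static final Set<Integer> STORE_TYPES = new HashSet<>(Arrays.asList(2, 3, 4, 5));

    /** Sums the amounts of all store-type buckets and rounds to 2 decimal places. */
    static String storeMoney(Map<Integer, String> moneyByOrderType) {
        BigDecimal total = BigDecimal.ZERO;
        for (Map.Entry<Integer, String> bucket : moneyByOrderType.entrySet()) {
            if (STORE_TYPES.contains(bucket.getKey())) {
                total = total.add(new BigDecimal(bucket.getValue()));
            }
        }
        return total.setScale(2, RoundingMode.HALF_UP).toString();
    }

    public static void main(String[] args) {
        Map<Integer, String> buckets = new LinkedHashMap<>();
        buckets.put(1, "100.0");  // not a store type, ignored
        buckets.put(2, "10.005");
        buckets.put(3, "20.1");
        System.out.println(storeMoney(buckets));
    }
}
```

Rounding once after the sum avoids compounding the rounding error of each bucket, which matters more as the number of merged sources grows.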
Step 5: Create the ES totals template in Kibana
# Deposit report totals template (create it first)
POST _scripts/search_order_total_template
{
  "script": {
    "lang": "mustache",
    "source": {
      "size": 0,
      "query": {
        "range": {
          "orderTime": {
            "gte": "{{gteTime}}",
            "lte": "{{lteTime}}",
            "format": "yyyy-MM-dd"
          }
        }
      },
      "aggs": {
        "group_totalType_data": {
          "terms": { "field": "totalType" },
          "aggs": {
            "qty":        { "sum": { "field": "qty" } },
            "orderMoney": { "sum": { "field": "orderMoney" } },
            "people":     { "sum": { "field": "people" } }
          }
        },
        "group_orderType": {
          "terms": { "field": "orderType" },
          "aggs": {
            "qty":        { "sum": { "field": "qty" } },
            "orderMoney": { "sum": { "field": "orderMoney" } },
            "people":     { "sum": { "field": "people" } }
          }
        },
        "group_deadline": {
          "terms": { "field": "deadline" },
          "aggs": {
            "qty":        { "sum": { "field": "qty" } },
            "orderMoney": { "sum": { "field": "orderMoney" } },
            "people":     { "sum": { "field": "people" } }
          }
        }
      }
    }
  }
}

# Query through the totals template (interval and format are not used by this template)
GET dg_financial_order_report/_search/template
{
  "id": "search_order_total_template",
  "params": {
    "gteTime": "2008-01-01",
    "lteTime": "2021-12-31",
    "interval": "day",
    "format": "yyyy-MM-dd"
  }
}
Result of the template query:
[Figure: Kibana response for the totals template query]
Java code (dg-search project)
// Feign interface
/** Deposit purchase report: totals over the whole query range. */
@GetMapping("/getDepositOrderTotal")
DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);

// Implementation
@Override
public DepositOrderBucketsRes getDepositOrderTotal(DepositBucketsReq depositBucketsReq) {
    final Map<String, Object> params = (Map<String, Object>) JSON.toJSON(depositBucketsReq);
    String templateName = elasticsearchConfig.bzjConfig.getSearch_order_total_template();
    LogUtil.info("Template name: {} request params: {}", templateName, params.toString());
    SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(
            elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName, params, restHighLevelClient);
    Aggregations aggregations = searchResponse.getAggregations();
    DepositOrderBucketsRes build = DepositOrderBucketsRes.builder().build();
    // Total dimension (same code as above)
    getOrderSumAggsData(aggregations, build);
    // Purchase term dimension (same code as above)
    getOrderDeadlineAggsData(build, aggregations);
    // Purchase source dimension (same code as above)
    getOrderSourceAggsData(aggregations, build);
    return build;
}
Step 6: Aggregate the template query results and return JSON to the front end
Controller:
/** Deposit purchase statistics report: paged query. */
@GetMapping("/order/list")
public ResponseData listPageDepositOrderByReq(@Valid DepositCollectReq depositCollectReq) {
    return ResponseData.success(financeReportService.listPageDepositOrderByReq(depositCollectReq));
}

/** Deposit purchase statistics report: totals. */
@GetMapping("/order/total")
public ResponseData getDepositOrderTotal(@Valid DepositCollectReq depositCollectReq) {
    return ResponseData.success(financeReportService.getDepositOrderTotal(depositCollectReq));
}

/** Deposit purchase statistics report: Excel export. */
@GetMapping("/order/export")
public ResponseData getDepositOrderExportFile(@Valid DepositCollectReq depositCollectReq) {
    return ResponseData.success(financeReportService.getDepositOrderExportFile(depositCollectReq));
}
Front-end request entity:
package com.dg.mall.financial.vo.req.report.collect;

import com.dg.mall.core.page.PageVO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

/**
 * Request entity.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DepositCollectReq extends PageVO {
    /** Report type (0: daily, 1: monthly, 2: yearly) */
    @NotNull(message = "Report type must not be empty!")
    private Integer reportType;

    /** Start time */
    @NotBlank(message = "Start date must not be empty!")
    private String gteTime;

    /** End time */
    @NotBlank(message = "End date must not be empty!")
    private String lteTime;
}
Service:
/** Deposit purchase statistics report: paged query. */
PageVO<DepositOrderBucketsRes> listPageDepositOrderByReq(DepositCollectReq depositCollectReq);

/** Deposit purchase statistics report: Excel export. */
String getDepositOrderExportFile(DepositCollectReq depositCollectReq);

/** Deposit purchase statistics report: totals. */
DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq);
ServiceImpl:
// Paged query
@Override
public PageVO<DepositOrderBucketsRes> listPageDepositOrderByReq(DepositCollectReq depositCollectReq) {
    PageVO<DepositOrderBucketsRes> pageVO = new PageVO<>();
    // Convert the filter into the ES request entity and add default parameters
    DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);
    List<DepositOrderBucketsRes> resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);
    if (CollectionUtils.isEmpty(resList)) {
        return pageVO;
    }
    // Newest date first
    List<DepositOrderBucketsRes> collect = resList.stream()
            .sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed())
            .collect(Collectors.toList());
    final PageUtils pageUtils = new PageUtils(
            Math.toIntExact(depositCollectReq.getCurrent()),
            Math.toIntExact(depositCollectReq.getSize()),
            collect);
    pageVO.setRecords(pageUtils.getCurrentList());
    pageVO.setTotal(pageUtils.getAllList().size());
    pageVO.setSize(depositCollectReq.getSize());
    pageVO.setCurrent(depositCollectReq.getCurrent());
    return pageVO;
}

// Totals
@Override
public DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq) {
    DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);
    return bzjFinanceReportFeignService.getDepositOrderTotal(depositBucketsReq);
}

// Excel export (easyPoi)
@Override
public String getDepositOrderExportFile(DepositCollectReq depositCollectReq) {
    DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);
    List<DepositOrderBucketsRes> resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);
    if (CollectionUtils.isEmpty(resList)) {
        return null;
    }
    List<DepositOrderBucketsRes> collect = resList.stream()
            .sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed())
            .collect(Collectors.toList());
    final Map<String, Object> map = Maps.newHashMap();
    // Excel template on the classpath; the file name is kept as in the project
    TemplateExportParams params = new TemplateExportParams("template/excel/导出保证金购买报表模板.xlsx");
    map.put("mapList", collect);
    Workbook workbook = ExcelExportUtil.exportExcel(params, map);
    // The second argument is the name of the exported file
    return uploadUtils.uploadFile(workbook, "保证金购买报表");
}

// Convert the filter into the ES request entity and add default parameters
@Override
public DepositBucketsReq getDepositBucketsReq(DepositCollectReq depositCollectReq) {
    String gteTime = depositCollectReq.getGteTime();
    String lteTime = depositCollectReq.getLteTime();
    Integer type = depositCollectReq.getReportType();
    ReportDateEnum reportDateEnum = ReportDateEnum.getReportDateEnum(type);
    switch (reportDateEnum) {
        case MONTH_FORMAT:
            // Widen to whole months: first day of the start month to last day of the end month
            gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();
            lteTime = DateUtil.endOfMonth(TimeUtils.getDateTime(lteTime, type)).toDateStr();
            break;
        case YEAR_FORMAT:
            // Widen to whole years
            gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();
            lteTime = DateUtil.endOfYear(TimeUtils.getDateTime(lteTime, type)).toDateStr();
            break;
        default:
            break;
    }
    return DepositBucketsReq.builder()
            .gteTime(gteTime)
            .lteTime(lteTime)
            .format(reportDateEnum.getFormat())
            .interval(reportDateEnum.getInterval())
            .build();
}
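The conversion above widens a monthly or yearly filter to whole calendar periods and picks the matching histogram interval and key format. The sketch below does the same with java.time only. `ReportDateEnum` and `TimeUtils` are not shown in the article, so the input formats assumed here ("yyyy-MM" for monthly filters, "yyyy" for yearly ones) and all names in the sketch are hypothetical.

```java
import java.time.LocalDate;
import java.time.Year;
import java.time.YearMonth;
import java.util.Arrays;

// Hypothetical demo class, not part of the project.
final class ReportRangeDemo {

    /**
     * Returns {gteTime, lteTime, interval, format} for a report type
     * (0 = daily, 1 = monthly, 2 = yearly), with dates as yyyy-MM-dd.
     */
    static String[] toRange(int reportType, String gte, String lte) {
        switch (reportType) {
            case 1: // monthly: first day of the start month to last day of the end month
                return new String[] {
                    YearMonth.parse(gte).atDay(1).toString(),
                    YearMonth.parse(lte).atEndOfMonth().toString(),
                    "month", "yyyy-MM" };
            case 2: // yearly: January 1 of the start year to December 31 of the end year
                return new String[] {
                    Year.parse(gte).atDay(1).toString(),
                    Year.parse(lte).atMonth(12).atEndOfMonth().toString(),
                    "year", "yyyy" };
            default: // daily: dates are used as given
                return new String[] {
                    LocalDate.parse(gte).toString(),
                    LocalDate.parse(lte).toString(),
                    "day", "yyyy-MM-dd" };
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toRange(1, "2021-02", "2021-02")));
        System.out.println(Arrays.toString(toRange(2, "2020", "2021")));
    }
}
```

The last two array elements correspond to the `interval` and `format` parameters that the search template passes to `calendar_interval` and to the bucket key format.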
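Logical paging: ES's from/size parameters page through search hits, not aggregation buckets, and we did not implement bucket paging inside the search template (it may be possible, but we did not do it). So the buckets are paged in memory with a small helper class.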
package com.dg.mall.financial.utils;

import com.google.common.collect.Lists;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
 * In-memory (logical) paging helper.
 */
@Data
public class PageUtils implements Serializable {
    /** Current page, 1-based */
    private Integer current;
    /** Page size */
    private Integer size;
    /** Rows on the current page */
    private List currentList;
    /** All rows */
    private List allList;

    public PageUtils(Integer current, Integer size, List allList) {
        this.current = current;
        this.size = size;
        this.allList = allList;
        final List arrayList = Lists.newArrayList();
        // A page number below 1 is treated as page 1; a page past the end yields an empty list
        int start = (Math.max(current, 1) - 1) * size;
        for (int i = 0; i < size && start + i < allList.size(); i++) {
            arrayList.add(allList.get(start + i));
        }
        this.currentList = arrayList;
    }
}
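For comparison, the same in-memory paging can be written with `List.subList` and clamped bounds, so that a page outside the data yields an empty list rather than an exception or a repeat of the first page. This is a minimal generic sketch; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class, not part of the project.
final class PageSliceDemo {

    /** Returns the rows of the given 1-based page, or an empty list when out of range. */
    static <T> List<T> page(List<T> all, int current, int size) {
        if (all == null || size <= 0 || current < 1) {
            return Collections.emptyList();
        }
        long start = (long) (current - 1) * size; // long avoids int overflow on huge page numbers
        if (start >= all.size()) {
            return Collections.emptyList();
        }
        int end = (int) Math.min(start + size, all.size());
        // Copy the view so the result does not depend on later changes to the source list
        return new ArrayList<>(all.subList((int) start, end));
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(page(rows, 1, 2));
        System.out.println(page(rows, 3, 2));
        System.out.println(page(rows, 4, 2));
    }
}
```

Because the report has at most one row per day, month, or year in the selected range, paging the full bucket list in memory stays cheap for typical ranges.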
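Step 7: Push each single order purchase to ES
Two things to note:
Remember to de-duplicate when counting buyers; Redis is used for that here.
req.setId(): make sure this ID is unique. It becomes the ES document id, and indexing a document whose id already exists overwrites the existing one.
Note: single records are pushed through a message queue here. You can do the same, or call the bulk-insert method directly.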
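On the de-duplication note: checking whether a key exists and then writing it are two separate steps, so two orders from the same user that arrive at almost the same time could both be counted as a first purchase. An atomic set-if-absent closes that gap; in Spring Data Redis, `opsForValue().setIfAbsent(key, value, timeout, unit)` is one way to get it. The sketch below shows the idea with an in-memory `ConcurrentHashMap` standing in for Redis (so there is no expiry). The class name and key layout are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class; the map stands in for Redis and never expires keys.
final class BuyerDedupDemo {
    private final ConcurrentMap<String, Boolean> seen = new ConcurrentHashMap<>();

    /**
     * Returns 1 for the first order of a user on a date within a dimension, otherwise 0.
     * putIfAbsent checks and writes in one atomic step.
     */
    long peopleIncrement(String date, String dimension, String userId) {
        String key = date + "_" + dimension + "_" + userId;
        return seen.putIfAbsent(key, Boolean.TRUE) == null ? 1L : 0L;
    }

    public static void main(String[] args) {
        BuyerDedupDemo dedup = new BuyerDedupDemo();
        System.out.println(dedup.peopleIncrement("2021-02-02", "sum", "u1"));
        System.out.println(dedup.peopleIncrement("2021-02-02", "sum", "u1"));
        System.out.println(dedup.peopleIncrement("2021-02-03", "sum", "u1"));
    }
}
```

Note that a per-day key de-duplicates buyers within a day only: summing the `people` field over a month or a year counts a user once for every day on which they bought.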
/**
 * Push one order to ES, once per dimension (total, purchase term, purchase source).
 */
public void sendOrderToEs(DepositOrder depositOrder) {
    // Total; the de-duplication key is date_type_userId
    addOrderByTotalTypeData(depositOrder);
    // Purchase term; stop here when the order has no subsidy record
    if (addOrderByDeadlineData(depositOrder)) {
        return;
    }
    // Purchase source
    addOrderBySourceData(depositOrder);
}

/** Buyer de-duplication: true only for the first order that claims this key. */
private boolean isFirstBuyer(String key) {
    // setIfAbsent checks and writes in one atomic step, unlike hasKey followed by set
    return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, key, 1, TimeUnit.DAYS));
}

/** Wrap the row in a ReportReq and send it to the report queue. */
private void publish(DepositOrderMappingReq req) {
    ReportReq reportReq = ReportReq.builder()
            .shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)
            .data(JSON.toJSONString(req)).build();
    financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));
}

/** Total dimension, with the buyer counted once per day. */
private void addOrderByTotalTypeData(DepositOrder depositOrder) {
    String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");
    String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(SUM_TYPE.getType())
            .concat("_").concat(depositOrder.getUserId());
    publish(DepositOrderMappingReq.builder()
            .id(String.valueOf(depositOrder.getId()).concat("_").concat(SUM_TYPE.getType()))
            .orderTime(formatDate)
            .totalType(SUM_TYPE.getCode())
            .qty(1L)
            .orderMoney(depositOrder.getOrderMoney())
            .people(isFirstBuyer(key) ? 1L : 0L)
            .build());
}

/**
 * Purchase term dimension.
 *
 * @return true when the order has no subsidy record and nothing was sent
 */
private boolean addOrderByDeadlineData(DepositOrder depositOrder) {
    String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");
    DepositOrderSubsidy orderSubsidy = depositOrderSubsidyService.getOne(new LambdaQueryWrapper<DepositOrderSubsidy>()
            .eq(DepositOrderSubsidy::getOrderId, depositOrder.getOrderId()));
    if (ObjectUtil.isEmpty(orderSubsidy)) {
        return true;
    }
    // The de-duplication key also contains the term: date_type_deadline_userId
    String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(DEADLINE_TYPE.getType())
            .concat("_").concat(String.valueOf(orderSubsidy.getDeadline()))
            .concat("_").concat(depositOrder.getUserId());
    publish(DepositOrderMappingReq.builder()
            .id(String.valueOf(depositOrder.getId()).concat("_").concat(DEADLINE_TYPE.getType()))
            .orderTime(formatDate)
            .deadline(orderSubsidy.getDeadline())
            .qty(1L)
            .orderMoney(depositOrder.getOrderMoney())
            .people(isFirstBuyer(key) ? 1L : 0L)
            .build());
    return false;
}

/** Purchase source dimension (no buyer count). */
private void addOrderBySourceData(DepositOrder depositOrder) {
    publish(DepositOrderMappingReq.builder()
            .id(String.valueOf(depositOrder.getId()).concat("_").concat(ORDER_SOURCE_TYPE.getType()))
            .orderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"))
            .orderType(depositOrder.getOrderType())
            .qty(1L)
            .orderMoney(depositOrder.getOrderMoney())
            .build());
}

/**
 * Send a message to the queue (FinancialProducer).
 */
public void sendMessage(String exchange, String routingKey, Object content) {
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    CorrelationData correlationData = new CorrelationData(String.valueOf(IdWorker.getId()));
    rabbitTemplate.convertAndSend(exchange, routingKey, getMessage(content), message -> {
        message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
    }, correlationData);
    LogUtil.info("Message sent to the queue: " + content);
}
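Each order is written to ES three times, once per dimension, and the document id is the order id plus a dimension suffix. Re-sending the same order therefore overwrites its three documents instead of duplicating them. The sketch below illustrates that id scheme with a map standing in for the index. The suffix strings are assumptions, since the enum behind `SUM_TYPE`, `DEADLINE_TYPE`, and `ORDER_SOURCE_TYPE` is not shown in the article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class, not part of the project.
final class DocIdDemo {
    // Assumed dimension suffixes; the real values come from an enum the article does not show
    static final String[] DIMENSIONS = {"sum", "deadline", "source"};

    /** Builds one document id per dimension for an order. */
    static List<String> docIds(long orderId) {
        List<String> ids = new ArrayList<>();
        for (String dimension : DIMENSIONS) {
            ids.add(orderId + "_" + dimension);
        }
        return ids;
    }

    /** Indexes the same order twice into a map-backed "index" and returns the document count. */
    static int indexTwice(long orderId) {
        Map<String, String> index = new HashMap<>();
        for (int attempt = 1; attempt <= 2; attempt++) {
            for (String id : docIds(orderId)) {
                index.put(id, "attempt " + attempt); // same id: the later write replaces the earlier one
            }
        }
        return index.size();
    }

    public static void main(String[] args) {
        System.out.println(docIds(42L));
        System.out.println(indexTwice(42L));
    }
}
```

This idempotence matters with a message queue, because a redelivered message then leaves the report unchanged instead of doubling the order count.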
Message queue consumer: pushes single records to ES
package com.dg.mall.financial.rabbitmq.consumer;

/**
 * Consumer for the financial report queue.
 */
@Service
public class DepositFinancelReportConsumer {

    @Autowired
    private BzjFinanceReportFeignService bzjFinanceReportFeignService;
    @Resource
    private DepositRenewalReportFeignService depositRenewalReportFeignService;
    @Resource
    private DepositRollReportFeignService depositRollReportFeignService;
    @Resource
    private DepositAwardEsFeignService depositAwardEsFeignService;
    @Resource
    private DepositFinanceReportService financeReportService;
    @Resource
    private SubsidyAwardService subsidyAwardService;
    @Resource
    private RedisTemplate redisTemplate;

    @RabbitListener(queues = "deposit-financel-report-queue")
    @RabbitHandler
    @Transactional(rollbackFor = Exception.class)
    public void financelReportQueue(Message msg, Channel channel) throws Exception {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        String resMsg = new String(msg.getBody(), StandardCharsets.UTF_8);
        try {
            // Dispatch on the report type
            ReportReq req = JSON.parseObject(resMsg, new TypeReference<ReportReq>() {});
            String shortName = req.getShortName();
            switch (shortName) {
                case ReportTypeConstants.FINANCIEL_BZJGMTJ:
                    // Deposit purchase statistics report: index the row in ES
                    batchAddOrUpdateDepositOrderEsMapping(req);
                    break;
                // ……
                default:
                    throw new IllegalStateException("Unexpected value: " + shortName);
            }
        } catch (Exception e) {
            // Requeue the message and keep the cause. Note that a message that can
            // never be processed will be redelivered again and again.
            channel.basicNack(deliveryTag, false, true);
            throw new RuntimeException(e);
        }
        // Acknowledge exactly once, and only this delivery
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Index one deposit order row (split by purchase term and purchase source).
     * Acknowledging the message is left to the caller.
     */
    public void batchAddOrUpdateDepositOrderEsMapping(ReportReq req) {
        DepositOrderMappingReq orderMappingReq = JSON.parseObject(req.getData(), new TypeReference<DepositOrderMappingReq>() {});
        // Nothing to index without a date
        if (ObjectUtil.isEmpty(orderMappingReq.getOrderTime())) {
            return;
        }
        ArrayList<DepositOrderMappingReq> list = new ArrayList<>();
        list.add(orderMappingReq);
        bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(list);
    }
}
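Summary:
That completes the feature. For me, the key points were designing the structure of the ES index data and using aggregations in the search templates.
Author: Jrocks
Link: https://www.jianshu.com/p/2944501da071
Source: Jianshu (简书)
Copyright belongs to the author. For commercial reuse, contact the author for authorization; for non-commercial reuse, credit the source.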