阅读 118

Elasticsearch(简称ES)实现日报表、月报表、年报表统计,没数据补0

前言:

这是1张相对简单些报表示例,用sql语句也能实现,但主要是为了便于大家理解ES聚合实现报表的使用方法。

之前写过"java实现日报表、月报表统计,没数据补0"文章,https://blog.csdn.net/JRocks/article/details/113841913,方法是用sql语句查询数据库得到结果,然后通过java代码实现日期无数据自动补0,这种方法是非常简单方便。

但也有弊端,如有些数据展示用sql语句关联查询非常麻烦并且效率低下、或者说是sql语句实现不了、又或者sql+java代码实现也很麻烦、又或者是随着业务数据迅速增长,复杂的sql关联查询导致页面响应速度慢等等情况,所以考虑使用ES来做处理,一是可以提高查询效率,二是ES可以自动补0。

产品需求:

在这里插入图片描述

需求描述:

  • 首先大家可以理解为这是1张订单表,表中有“购买周期、购买来源”字典字段;
  • 这是1张年月日报表,同时筛选条件允许跨年、跨月、跨日,我们需要做的就是根据查询条件将数据从“合计、购买周期、购买来源”3个维度进行汇总,特别需要说明一点就是"人数"汇总字段,相同用户需要去重
  • 项目已经上线,所以既需要对历史数据进行推送ES处理,又需要对单笔订单购买进行数据推送ES处理;

思路步骤:很重要

  1. kibana中新建ES数据索引;
  2. 新建批量添加ES索引数据的方法;
  3. 历史数据初始化推送ES对应索引中;
  4. kibana中新建ES查询模板;
  5. kibana中新建ES汇总查询模板
    (注:也就是产品需求中表格下的最后一列合计相关字段内容,是显示当前查询条件下的所有汇总,不只是显示当前分页数据结果);
  6. 对ES查询模板的数据结果进行汇总返回前端JSON数据;
  7. 单笔订单购买进行数据推送ES处理;

注明:这是微服务项目,文章中提到的是2个module,其中关于ES相关的代码单独是1个module,称为dg-search,在代码实战中我会列出。

代码实战:按照思路步骤逐步展开

一:kibana中新建ES数据索引;
首先得搭建好ES,搭建过程这里不展开,kibana界面如下

在这里插入图片描述

# 1、新建保证金购买统计表索引
PUT dg_financial_order_report
{
 "mappings": {
   "properties": {
     "orderTime":{
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
       "fields": {
         "keyword": {
           "type": "keyword",
           "ignore_above": 20
         }
       }
     },
     "totalType":{
       "type": "integer"
     },
     "deadline":{
       "type": "integer"
     },
     "orderType":{
       "type": "integer"
     },
     "qty":{
       "type": "long"
     },
     "orderMoney":{
       "type": "float"
     },
     "people":{
       "type": "long"
     }
   }
 }
}

二:新建批量添加ES索引数据的方法(dg-search项目)

// ------------------------------------------------------------------------------------------- 
// Feign接口
package com.dg.mall.search.feign.feign;

@FeignClient(name = "dg-search", path = "/dg-search/api/bzjFinanceReportEs")
public interface BzjFinanceReportFeignService {
    /**
     * 批量新增保证金购买表数据
     */
    @PostMapping("/batchAddOrUpdateDepositOrder")
    void batchAddOrUpdateDepositOrderEsMapping(@RequestBody List<DepositOrderMappingReq> list);

    /**
     * 保证金购买表数据查询
     * @param depositBucketsReq
     * @return
     */
    @GetMapping("/listDepositOrder")
    List<DepositOrderBucketsRes> listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);

    /**
     * 保证金购买表--统计金额
     * @param depositBucketsReq
     * @return
     */
    @GetMapping("/getDepositOrderTotal")
    DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);
}

// ------------------------------------------------------------------------------------------- 
// Feign接口实现类
@RestController
@RequestMapping("/api/bzjFinanceReportEs")
public class BzjFinanceReportFeignProvider implements BzjFinanceReportFeignService {

@Autowired
private RestHighLevelClient restHighLevelClient;
    
/**
* 批量新增保证金购买表数据
*/
@Override
    public void batchAddOrUpdateDepositOrderEsMapping(List<DepositOrderMappingReq> list) {
        try {
            if (CollectionUtils.isEmpty(list)) {
                return;
            }

            BulkRequest bulkRequest = new BulkRequest();
            IndexRequest request = null;
            for (DepositOrderMappingReq req : list) {
                request = new IndexRequest("POST");
                request.index( "索引名"); // dg_financial_order_report
                request.id(req.getId());
                request.source(JSON.toJSONString(req), XContentType.JSON);
                bulkRequest.add(request);
            }
            restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkListener);
        } catch (Exception e) {
            LogUtil.error("批量添加保证金存款表es数据失败! message:{} --", e, e.getMessage());
            throw new ServiceException(SearchExceptionEnum.BATCH_ADD_BZJ_ORDER_ES_FAIL);
        }
    }
}

实体类

/**
 * <p>
 *  保证金购买统计表--批量新增ES数据,请求实体
 * <p>
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DepositOrderMappingReq {
    /**
     * 存款ID
     */
    private String id;

    /**
     * 购买日期
     */
    private String orderTime;

    /**
     * 合计
     */
    private Integer totalType = null;

    /**
     * 购买周期
     */
    private Integer deadline = null;

    /**
     * 购买来源
     */
    private Integer orderType = null;

    /**
     * 笔数
     */
    private Long qty = 0L;

    /**
     * 金额
     */
    private BigDecimal orderMoney = BigDecimal.ZERO;

    /**
     * 人数(已去重)
     */
    private Long people = 0L;
}


/**
 * <p>
 * 保证金购买统计表--ES数据查询请求实体
 * <p>
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DepositBucketsReq {

    /**
     * 开始时间
     */
    private String gteTime;

    /**
     * 结束时间
     */
    private String lteTime;

    /**
     * 统计类型(年月日)
     */
    private String interval;

    /**
     * 时间格式化
     */
    private String format;
}

三:历史数据初始化推送ES索引
说明:这里采用任务调度的方式执行(没有启用),便于操作数据初始化,只是上线之后手动执行一次该定时任务。

在这里插入图片描述

package com.dg.mall.financial.jobhandle;

/**
 * <p>
 * 推送Es数据定时任务
 * <p>
 */
@Service
public class SyncDepositOrderToEsHandle extends IJobHandler {
    @Resource
    private DepositOrderService orderService;

    @Resource
    private BzjFinanceReportFeignService bzjFinanceReportFeignService;

    @Resource
    private DepositRollReportFeignService depositRollReportFeignService;

    @Resource
    private FinancialProducer financialProducer;

    private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    @XxlJob("syncDepositOrderToEsHandle")
    @Override
    public ReturnT<String> execute(String param) throws Exception {

        // 这里采用这种写法的目的是:执行任务调度后可以马上返回结果提示
        threadLocal.set(param);

        Thread thread = new Thread(new SyncDepositOrderToEsHandle.SynExecute());
        thread.start();

        Thread expiredThread = new Thread(new SyncDepositOrderToEsHandle.SynExpiredExecute());
        expiredThread.start();

        return ReturnT.SUCCESS;
    }

    class SynExecute implements Runnable {
        @Override
        public void run() {
            //考虑数据量过大,进行分页推送数据,每次推送100条
            int current = 1, size = 100;
            for (; ; ) {
                PageVO pageVO = new PageVO<>(current, size);
                pageVO.setSearchCount(false);
                // 查询所有订单数据,按合计、购买来源、购买周期进行区分,见下面的mapping.xml
                PageVO<DepositOrderMappingRes> res = orderService.getSyncPushOrderDataToEs(pageVO, threadLocal.get());
                if (ObjectUtil.isEmpty(res.getRecords()) || res.getRecords().size() < 1) {
                    break;
                }

                final List<DepositOrderMappingReq> orderMappingReqs = Lists.newArrayList();
                for (DepositOrderMappingRes mappingRes : res.getRecords()) {
                    DepositOrderMappingReq reportReq = orderService.batchAddOrderData(mappingRes);
                    orderMappingReqs.add(reportReq);
                }

                if (CollectionUtils.isNotEmpty(orderMappingReqs)) {
                    // 调用dg-search批量新增的ES的方法
                    bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(orderMappingReqs);
                }
                current++;
            }
        }
    }
}

orderService.getSyncPushOrderDataToEs() 方法的mapping.xml

<select id="getSyncPushOrderDataToEs"
            resultType="com.dg.mall.financial.vo.res.deposit.DepositOrderMappingRes">
        -- 1、合计
        SELECT
        o.id AS id,
        DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,
        0 AS totalType,
        NULL AS deadline,
        NULL AS orderType,
        count( * ) AS qty,
        sum( o.order_money ) AS orderMoney,
        o.user_id AS userId
        FROM
        dg_deposit_order o
        JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id
        WHERE
        <![CDATA[ o.order_status < 4 ]]>
        <if test="param != null and param !=''">
            <![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]>
        </if>

        GROUP BY
        orderTime,
        userId

        UNION ALL

        -- 2、购买周期
        SELECT
        o.id AS id,
        DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,
        NULL AS totalType,
        dos.deadline AS deadline,
        NULL AS orderType,
        count( * ) AS qty,
        sum( o.order_money ) AS orderMoney,
        o.user_id AS userId
        FROM
        dg_deposit_order o
        JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id
        WHERE
        <![CDATA[ o.order_status < 4 ]]>
        <if test="param != null and param !=''">
            <![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]>
        </if>

        GROUP BY
        orderTime,
        userId,
        deadline

        UNION ALL

        -- 3、购买来源
        SELECT
        o.id AS id,
        DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,
        NULL AS totalType,
        NULL AS deadline,
        o.order_type AS orderType,
        count( * ) AS qty,
        sum( o.order_money ) AS orderMoney,
        o.user_id AS userId
        FROM
        dg_deposit_order o
        JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id
        WHERE
        <![CDATA[ o.order_status < 4 ]]>
        <if test="param != null and param !=''">
            <![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]>
        </if>

        GROUP BY
        orderTime,
        userId,
        orderType
    </select>

四:kibana中新建ES查询模板
有小伙伴会问:为什么使用查询模板,它有什么好处呢?

  • 首先:我认为更好上手,可以提升你的成功自信心,“模板”从字面上讲就是给人一种套用,更简单这种感觉;
  • 第二:更直观、可以减少java代码量,让你更专心关注ES的聚合写法,如果你手动用java代码写过ES的复杂聚合,你会发现使用模板确实更方便好用;
# 保证金购买模板查询
GET /_search/template
{
  "id":"search_order_collect_template",
  "params":{
    "gteTime":"2021-02-02",
    "lteTime":"2021-02-02",
    "interval":"day",
    "format":"yyyy-MM-dd"
  }
}

# 保证金购买模板
POST _scripts/search_order_collect_template
{
  "script": {
    "lang": "mustache",
    "source": {
      "size": 0,
      "query": {
        "range": {
          "orderTime": {
            "gte": "{{gteTime}}",
            "lte": "{{lteTime}}",
            "format": "yyyy-MM-dd"
          }
        }
      },
      "aggs": {
        "group_date_histogram_data": {
          "date_histogram": {
            "field": "orderTime",
            "calendar_interval": "{{interval}}",
            "format": "{{format}}"
          },
          "aggs": {
            "group_totalType_data": {
              "terms": {
                "field": "totalType"
              },
              "aggs": {
                "qty": {
                  "sum": {
                    "field": "qty"
                  }
                },
                "orderMoney": {
                  "sum": {
                    "field": "orderMoney"
                  }
                },
                "people": {
                  "sum": {
                    "field": "people"
                  }
                }
              }
            },
            "group_orderType": {
              "terms": {
                "field": "orderType"
              },
              "aggs": {
                "qty": {
                  "sum": {
                    "field": "qty"
                  }
                },
                "orderMoney": {
                  "sum": {
                    "field": "orderMoney"
                  }
                },
                "people": {
                  "sum": {
                    "field": "people"
                  }
                }
              }
            },
            "group_deadline": {
              "terms": {
                "field": "deadline"
              },
              "aggs": {
                "qty": {
                  "sum": {
                    "field": "qty"
                  }
                },
                "orderMoney": {
                  "sum": {
                    "field": "orderMoney"
                  }
                },
                "people": {
                  "sum": {
                    "field": "people"
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

模板查询显示效果

在这里插入图片描述

java代码(dg-search项目)

    //feign接口
    /**
     * 保证金购买表数据查询
     * @param depositBucketsReq
     * @return
     */
    @GetMapping("/listDepositOrder")
    List<DepositOrderBucketsRes> listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);

    // 实现类
    @Override
    public List<DepositOrderBucketsRes> listDepositOrder(DepositBucketsReq depositBucketsReq) {
        final Map<String, Object> params = (Map<String, Object>) JSON.toJSON(depositBucketsReq);
        String templateName = elasticsearchConfig.bzjConfig.getSearch_order_collect_template();
        LogUtil.info("模板名称:{}  请求参数为:{}", templateName, params.toString());

        SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName,
                params, restHighLevelClient);

        Aggregations aggregations = searchResponse.getAggregations();
        // 和统计模板的不同之处
        ParsedDateHistogram dateHistogram = aggregations.get("group_date_histogram_data");
        if (dateHistogram.getBuckets().size() < 1) {
            return null;
        }

        return getDepositOrderCollect(dateHistogram.getBuckets());
    }

    private List<DepositOrderBucketsRes> getDepositOrderCollect(List<? extends Histogram.Bucket> buckets){
        List<DepositOrderBucketsRes> res = Lists.newLinkedList();
        buckets.stream().forEach(dateBucket -> {
            String dateString = dateBucket.getKeyAsString();
            DepositOrderBucketsRes build = DepositOrderBucketsRes.builder()
                    .orderTime(dateString)
                    .build();
            Aggregations aggregations = dateBucket.getAggregations();

            // 获取合计聚合数据
            getOrderSumAggsData(aggregations, build);

            // 获取购买周期数据
            getOrderDeadlineAggsData(build, aggregations);

            // 获取购买来源数据
            getOrderSourceAggsData(aggregations, build);
            res.add(build);
        });
        return res;
    }

    /**
     * 获取合计聚合数据
     * @param aggregations
     * @param build
     */
    public void getOrderSumAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {
        ParsedTerms totalTypeData = aggregations.get("group_totalType_data");
        List<? extends Terms.Bucket> totalTypeDataBuckets = totalTypeData.getBuckets();
        totalTypeDataBuckets.stream().forEach(bucket -> {
            Aggregations agg = bucket.getAggregations();
            ParsedSum sumQty = agg.get("qty");
            ParsedSum sumMoney = agg.get("orderMoney");
            ParsedSum sumPeople = agg.get("people");
            build.setSumQty(new BigDecimal(sumQty.getValueAsString()).intValue());
            build.setSumMoney(NumberUtil.round(new BigDecimal(sumMoney.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
            build.setSumPeople(new BigDecimal(sumPeople.getValueAsString()).intValue());
        });
    }

    /**
     * 获取购买周期数据
     * @param build
     * @param aggregations
     */
    private void getOrderDeadlineAggsData(DepositOrderBucketsRes build, Aggregations aggregations) {
        ParsedTerms deadlineData = aggregations.get("group_deadline");
        List<? extends Terms.Bucket> deadlineDataBuckets = deadlineData.getBuckets();
        deadlineDataBuckets.stream().forEach(bucket -> {
            Integer deadline = new BigDecimal(bucket.getKey().toString()).intValue();
            Aggregations agg = bucket.getAggregations();
            ParsedSum qty = agg.get("qty");
            ParsedSum money = agg.get("orderMoney");
            ParsedSum people = agg.get("people");
            DepositOrderDeadlineEnum deadlineEnum = DepositOrderDeadlineEnum.getDeadlineEnum(deadline);
            if (ObjectUtil.isEmpty(deadlineEnum)) {
                return;
            }
            switch (deadlineEnum) {
                case THIRTY_DAYS:
                    build.setThirtyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());
                    build.setThirtyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
                    build.setThirtyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());
                    break;
                case NINETY_DAYS:
                    build.setNinetyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());
                    build.setNinetyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
                    build.setNinetyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());
                    break;
                case THREE_HUNDRED_SIXTY_DAYS:
                    build.setThreeHundredSixtyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());
                    build.setThreeHundredSixtyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
                    build.setThreeHundredSixtyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());
                    break;
                default:
                    break;
            }
        });
    }

    /**
     * 获取购买来源数据
     * @param aggregations
     * @param build
     */
    public void getOrderSourceAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {
        ParsedTerms sourceData = aggregations.get("group_orderType");
        List<? extends Terms.Bucket> sourceDataBuckets = sourceData.getBuckets();
        sourceDataBuckets.stream().forEach(bucket -> {
            Integer orderType = new BigDecimal(bucket.getKey().toString()).intValue();
            Aggregations agg = bucket.getAggregations();
            ParsedSum qty = agg.get("qty");
            ParsedSum money = agg.get("orderMoney");
            ParsedSum people = agg.get("people");
            DepositOrderSourceEnum orderSourceEnum = DepositOrderSourceEnum.getOrderSourceEnum(orderType);
            if (ObjectUtil.isEmpty(orderSourceEnum)) {
                return;
            }
            switch (orderSourceEnum) {
                case DEPOSIT:
                    build.setDepositQty(new BigDecimal(qty.getValueAsString()).intValue());
                    build.setDepositMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
                    break;
                // 这几种来源,统称为商店入驻
                case CHANNEL_STORE:
                case FIRM:
                case CONSIGNMENT_STORE:
                case IDLE_STORE:
                    BigDecimal newQty = new BigDecimal(qty.getValueAsString());
                    BigDecimal oldQty = new BigDecimal(build.getStoreQty());
                    BigDecimal newMoney = new BigDecimal(money.getValueAsString());
                    BigDecimal oldMoney = new BigDecimal(build.getStoreMoney());
                    build.setStoreQty((oldQty.add(newQty)).intValue());
                    build.setStoreMoney(NumberUtil.round(oldMoney.add(newMoney), BigDecimal.ROUND_CEILING).toString());
                    break;
                default:
                    break;
            }
        });
    }

五:kibana中新建ES汇总查询模板

# 保证金存款表统计模板查询
GET /_search/template
{
  "id":"search_order_total_template",
  "params":{
    "gteTime":"2008-01-01",
    "lteTime":"2021-12-31",
    "interval":"day",
    "format":"yyyy-MM-dd"
  }
}

# 保证金存款表统计模板
POST _scripts/search_order_total_template
{
  "script": {
    "lang": "mustache",
    "source": {
      "size": 0,
      "query": {
        "range": {
          "orderTime": {
            "gte": "{{gteTime}}",
            "lte": "{{lteTime}}",
            "format": "yyyy-MM-dd"
          }
        }
      },
      "aggs": {
        "group_totalType_data": {
          "terms": {
            "field": "totalType"
          },
          "aggs": {
            "qty": {
              "sum": {
                "field": "qty"
              }
            },
            "orderMoney": {
              "sum": {
                "field": "orderMoney"
              }
            },
            "people": {
              "sum": {
                "field": "people"
              }
            }
          }
        },
        "group_orderType": {
          "terms": {
            "field": "orderType"
          },
          "aggs": {
            "qty": {
              "sum": {
                "field": "qty"
              }
            },
            "orderMoney": {
              "sum": {
                "field": "orderMoney"
              }
            },
            "people": {
              "sum": {
                "field": "people"
              }
            }
          }
        },
        "group_deadline": {
          "terms": {
            "field": "deadline"
          },
          "aggs": {
            "qty": {
              "sum": {
                "field": "qty"
              }
            },
            "orderMoney": {
              "sum": {
                "field": "orderMoney"
              }
            },
            "people": {
              "sum": {
                "field": "people"
              }
            }
          }
        }
      }
    }
  }
}

模板查询显示效果

在这里插入图片描述

java代码(dg-search项目)

// feign接口
/**
     * 保证金购买表--统计金额
     * @param depositBucketsReq
     * @return
     */
    @GetMapping("/getDepositOrderTotal")
    DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);

    // 实现类
    @Override
    public DepositOrderBucketsRes getDepositOrderTotal(DepositBucketsReq depositBucketsReq) {
        final Map<String, Object> params = (Map<String, Object>) JSON.toJSON(depositBucketsReq);
        String templateName = elasticsearchConfig.bzjConfig.getSearch_order_total_template();
        LogUtil.info("模板名称:{}  请求参数为:{}", templateName, params.toString());

        SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName,
                params, restHighLevelClient);
        Aggregations aggregations = searchResponse.getAggregations();

        DepositOrderBucketsRes build = DepositOrderBucketsRes.builder().build();

        // 获取合计聚合数据(代码同上)
        getOrderSumAggsData(aggregations, build);

        // 获取购买周期数据(代码同上)
        getOrderDeadlineAggsData(build, aggregations);

        // 获取购买来源数据(代码同上)
        getOrderSourceAggsData(aggregations, build);
        return build;
    }

六:对ES查询模板的数据结果进行汇总返回前端JSON数据

Controller:

/**
     * 保证金购买统计表--查询
     */
    @GetMapping("/order/list")
    public ResponseData listPageDepositOrderByReq(@Valid DepositCollectReq depositCollectReq){
        return ResponseData.success(financeReportService.listPageDepositOrderByReq(depositCollectReq));
    }

    /**
     * 保证金购买统计表--统计金额
     */
    @GetMapping("/order/total")
    public ResponseData getDepositOrderTotal(@Valid DepositCollectReq depositCollectReq){
        return ResponseData.success(financeReportService.getDepositOrderTotal(depositCollectReq));
    }

    /**
     * 保证金购买统计表--导出excel
     */
    @GetMapping("/order/export")
    public ResponseData getDepositOrderExportFile(@Valid DepositCollectReq depositCollectReq){
        return ResponseData.success(financeReportService.getDepositOrderExportFile(depositCollectReq));
    }

前端请求实体:

package com.dg.mall.financial.vo.req.report.collect;

import com.dg.mall.core.page.PageVO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

/**
 * <p>
 * 请求实体
 * <p>
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DepositCollectReq extends PageVO {

    /**
     * 报表类型(0:日,1:月,2:年)
     */
    @NotNull(message = "报表类型不能为空!")
    private Integer reportType;

    /**
     * 开始时间
     */
    @NotBlank(message = "开始日期不能为空!")
    private String gteTime;

    /**
     * 结束时间
     */
    @NotBlank(message = "结束日期不能为空!")
    private String lteTime;
}

Service:

/**
     * 保证金购买统计表--分页查询
     * @param depositCollectReq
     * @return
     */
    PageVO<DepositOrderBucketsRes> listPageDepositOrderByReq(DepositCollectReq depositCollectReq);

    /**
     * 保证金购买统计表--导出excel
     * @param depositCollectReq
     * @return
     */
    String getDepositOrderExportFile(DepositCollectReq depositCollectReq);

    /**
     * 保证金购买统计表--统计金额
     * @param depositCollectReq
     * @return
     */
    DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq);

ServiceImpl:

    // 查询
    @Override
    public PageVO<DepositOrderBucketsRes> listPageDepositOrderByReq(DepositCollectReq depositCollectReq) {
        PageVO<DepositOrderBucketsRes> pageVO = new PageVO<>();
        // 查询条件转换成ES的请求实体类并添加默认参数
        DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);
        List<DepositOrderBucketsRes> resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);
        if (CollectionUtils.isEmpty(resList)){
            return pageVO;
        }
        List<DepositOrderBucketsRes> collect = resList.stream().sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed()).collect(Collectors.toList());
        final PageUtils pageUtils = new PageUtils(Integer.valueOf(depositCollectReq.getCurrent() + ""), Integer.valueOf(depositCollectReq.getSize() + ""), collect);
        pageVO.setRecords(pageUtils.getCurrentList());
        pageVO.setTotal(pageUtils.getAllList().size());
        pageVO.setSize(depositCollectReq.getSize());
        pageVO.setCurrent(depositCollectReq.getCurrent());
        return pageVO;
    }

    //统计
    @Override
    public DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq) {
        DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);
        return bzjFinanceReportFeignService.getDepositOrderTotal(depositBucketsReq);
    }

    // 导出excel(easyPoi)
    @Override
    public String getDepositOrderExportFile(DepositCollectReq depositCollectReq) {
        DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);
        List<DepositOrderBucketsRes> resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);
        if (CollectionUtils.isEmpty(resList)){
            return null;
        }
        List<DepositOrderBucketsRes> collect = resList.stream().sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed()).collect(Collectors.toList());
        final Map<String, Object> map = Maps.newHashMap();
        TemplateExportParams params = new TemplateExportParams("template/excel/导出保证金购买报表模板.xlsx");
        map.put("mapList", collect);

        Workbook workbook = ExcelExportUtil.exportExcel(params, map);
        return uploadUtils.uploadFile(workbook, "保证金购买报表");
    }
    

    //查询条件转换成ES的请求实体类并添加默认参数
    @Override
    public DepositBucketsReq getDepositBucketsReq(DepositCollectReq depositCollectReq) {
        String gteTime = depositCollectReq.getGteTime();
        String lteTime = depositCollectReq.getLteTime();
        Integer type = depositCollectReq.getReportType();
        ReportDateEnum reportDateEnum = ReportDateEnum.getReportDateEnum(type);
        switch (reportDateEnum) {
            case MONTH_FORMAT: {
                gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();
                lteTime = DateUtil.endOfMonth(TimeUtils.getDateTime(lteTime, type)).toDateStr();
                break;
            }

            case YEAR_FORMAT: {
                gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();
                lteTime = DateUtil.endOfYear(TimeUtils.getDateTime(lteTime, type)).toDateStr();
                break;
            }
            default:
                break;
        }

        return DepositBucketsReq.builder()
                .gteTime(gteTime)
                .lteTime(lteTime)
                .format(reportDateEnum.getFormat())
                .interval(reportDateEnum.getInterval())
                .build();
    }

逻辑分页:因为ES查询模板我们没有实现使用ES的from、size这种分页语法(可能可以,但我们没有实现),所以封装了一个逻辑分页类

package com.dg.mall.financial.utils;

import com.google.common.collect.Lists;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
 * <p>
 * 逻辑分页工具
 * <p>
 */
@Data
public class PageUtils implements Serializable {

    /**
     * 当前页
     */
    private Integer current;

    /**
     * 每页数据
     */
    private Integer size;

    /**
     * 当前页数据
     */
    private List currentList;

    /**
     * 所有数据
     */
    private List allList;

    public PageUtils(Integer current, Integer size, List allList) {
        final List arrayList = Lists.newArrayList();
        this.current = current;
        this.size = size;
        this.allList = allList;

        if (allList.size() <= size) {
            this.currentList = allList;
            return;
        }

        int start = (current - 1) * size;
        for (int i = 0; i < size; i++) {
            if (start + i >= allList.size()) {
                break;
            }

            arrayList.add(allList.get(start + i));
        }

        this.currentList = arrayList;
    }
}

七:单笔订单购买进行数据推送ES处理

注意2点:

  • 记得做人数统计的去重,这里使用的是redis;
  • req.setId(),这个Id记得做到唯一,因为这个id会存入ES索引数据中,而ES索引id相同,会做数据覆盖;
    注:我这里单条数据推送使用了消息队列,大家可以参照、也可以直接调用批量插入ES的方法;
    /**
     * 购买表数据推送es(区分购买周期和购买来源)
     *
     * @param depositOrder
     */
    public void sendOrderToEs(DepositOrder depositOrder) {
        // 合计 日期_类型_userid
        addOrderByTotalTypeData(depositOrder);

        // 购买周期
        if (addOrderByDeadlineData(depositOrder)) return;

        // 购买来源
        addOrderBySourceData(depositOrder);

    }

    /**
     * 添加订单数据,合计、并对人数进行区分
     *
     * @param depositOrder
     */
    private void addOrderByTotalTypeData(DepositOrder depositOrder) {
        DepositOrderMappingReq req = DepositOrderMappingReq.builder().build();
        String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");
        // 人数去重
        String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(SUM_TYPE.getType()).concat("_").concat(depositOrder.getUserId());
        if (!redisTemplate.hasKey(key)) {
            req.setPeople(1L);
            redisTemplate.opsForValue().set(key, key, 1, TimeUnit.DAYS);
        }
        req.setId(String.valueOf(depositOrder.getId()).concat("_").concat(SUM_TYPE.getType()));
        req.setOrderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"));
        req.setTotalType(SUM_TYPE.getCode());
        req.setQty(1L);
        req.setOrderMoney(depositOrder.getOrderMoney());
        ReportReq reportReq = ReportReq.builder()
                .shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)
                .data(JSON.toJSONString(req)).build();
        financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));
    }

    /**
     * 添加订单数据,根据购买周期区分
     *
     * @param depositOrder
     * @return
     */
    private boolean addOrderByDeadlineData(DepositOrder depositOrder) {
        DepositOrderMappingReq req = DepositOrderMappingReq.builder().build();
        String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");
        DepositOrderSubsidy orderSubsidy = depositOrderSubsidyService.getOne(new LambdaQueryWrapper<DepositOrderSubsidy>()
                .eq(DepositOrderSubsidy::getOrderId, depositOrder.getOrderId()));
        if (ObjectUtil.isEmpty(orderSubsidy)) {
            return true;
        }

        // 人数去重
        String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(DEADLINE_TYPE.getType()).concat("_").concat(String.valueOf(orderSubsidy.getDeadline())).concat("_").concat(depositOrder.getUserId());
        if (!redisTemplate.hasKey(key)) {
            req.setPeople(1L);
            redisTemplate.opsForValue().set(key, key, 1, TimeUnit.DAYS);
        }
        req.setId(String.valueOf(depositOrder.getId()).concat("_").concat(DEADLINE_TYPE.getType()));
        req.setOrderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"));
        req.setDeadline(orderSubsidy.getDeadline());
        req.setQty(1L);
        req.setOrderMoney(depositOrder.getOrderMoney());
        ReportReq reportReq = ReportReq.builder()
                .shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)
                .data(JSON.toJSONString(req)).build();
        financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));
        return false;
    }

    /**
     * 添加订单数据,根据购买来源区分
     *
     * @param depositOrder
     */
    private void addOrderBySourceData(DepositOrder depositOrder) {
        DepositOrderMappingReq req = DepositOrderMappingReq.builder()
                .id(String.valueOf(depositOrder.getId()).concat("_").concat(ORDER_SOURCE_TYPE.getType()))
                .orderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"))
                .orderType(depositOrder.getOrderType())
                .qty(1L)
                .orderMoney(depositOrder.getOrderMoney())
                .build();
        ReportReq reportReq = ReportReq.builder()
                .shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)
                .data(JSON.toJSONString(req)).build();
        financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));
    }

/**
     * 发送消息队列
     *
     * @param exchange
     * @param routingKey
     * @param content
     */
    public void sendMessage(String exchange, String routingKey, Object content) {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData = new CorrelationData(String.valueOf(IdWorker.getId()));
        rabbitTemplate.convertAndSend(exchange, routingKey, getMessage(content),
                message -> {
                    message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                }, correlationData);
        LogUtil.info("消息队列发送完成,消息为:" + content);
    }

消息队列--消费者:单条数据推送ES

package com.dg.mall.financial.rabbitmq.consumer;

/**
 * <p>
 * 财务报表--消费
 * <p>
 */
@Service
public class DepositFinancelReportConsumer {

    @Autowired
    private BzjFinanceReportFeignService bzjFinanceReportFeignService;

    @Resource
    private DepositRenewalReportFeignService depositRenewalReportFeignService;

    @Resource
    private DepositRollReportFeignService depositRollReportFeignService;

    @Resource
    private DepositAwardEsFeignService depositAwardEsFeignService;

    @Resource
    private DepositFinanceReportService financeReportService;

    @Resource
    private SubsidyAwardService subsidyAwardService;

    @Resource
    private RedisTemplate redisTemplate;

    @RabbitListener(queues = "deposit-financel-report-queue")
    @RabbitHandler
    @Transactional(rollbackFor = Exception.class)
    public void financelReportQueue(Message msg, Channel channel) throws Exception {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        String resMsg = new String(msg.getBody(), "utf-8");
        try {
            // 根据报表类型区分
            ReportReq req = JSON.parseObject(resMsg, new TypeReference<ReportReq>() {});
            String shortName = req.getShortName();
            switch (shortName) {
                case ReportTypeConstants.FINANCIEL_BZJGMTJ:
                    // 批量添加--保证金购买统计表es数据
                    batchAddOrUpdateDepositOrderEsMapping(channel, deliveryTag, req);
                    break;
                // ……
                default:
                    throw new IllegalStateException("Unexpected value: " + shortName);
            }
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, true);
            throw new RuntimeException();
        }
        channel.basicAck(deliveryTag, true);
    }

    /**
     * 批量新增保证金存款数据(按"购买周期、购买来源"区分)
     *
     * @param channel
     * @param deliveryTag
     * @param req
     * @throws IOException
     */
    public void batchAddOrUpdateDepositOrderEsMapping(Channel channel, long deliveryTag, ReportReq req) throws IOException {
        DepositOrderMappingReq orderMappingReq = JSON.parseObject(req.getData(), new TypeReference<DepositOrderMappingReq>() {
        });
        if (ObjectUtil.isEmpty(orderMappingReq.getOrderTime())) {
            channel.basicAck(deliveryTag, true);
            return;
        }

        ArrayList<DepositOrderMappingReq> list = new ArrayList<>();
        list.add(orderMappingReq);
        bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(list);
        return;
    }
}

总结:
至此为止,功能就算全部完成了,对于我来说,重点就在于如何设计ES索引数据结构和查询模板的聚合使用。

作者:Jrocks

原文链接:https://www.jianshu.com/p/2944501da071

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐