阅读 106

Flink 1.12 Hive分区时态表批流一体

需求背景

我们生产常有将实时与Hive维表join来丰富数据的需求、Hive表是分区表、上周Flink 1.12发布了、刚好支撑了这种业务场景、我也将1.12版本部署了、做了一个线上需求并上线、分区时态表提升了很多开发效率、结合一些之前生产的例子、做一些小的分享

分享主题
  • 没有分区时态表实现的一个批流join例子
  • 1.12版本依赖以及注意项
  • Flink 1.12 : Hive分区时态表与kafka的批流一体带来的便捷
  • 分区时态表代码Demo
  • Flink SQL开发的小技巧
  • ORC格式BUG
没有分区时态表实现的一个批流join例子

在分区时态表出来之前、为了定期关联出最新的分区数据、通常要写kafkaStream程序、然后在map算子join返回Tuple2类型数据、再将该数据转换成Table、从而进行SQL查询

    StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(blinkStreamEnv, blinkStreamSettings);
    Configuration configuration = blinkStreamTableEnv.getConfig().getConfiguration();
    configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
    configuration.setString("table.exec.mini-batch.allow-latency", "60 s");
    configuration.setString("table.exec.mini-batch.size", "5000");
    configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
    DataStream<Tuple2<MasterBean, HiveDayIndexBean>> indexBeanStream = masterDataStream.map(new IndexOrderJoin());

mapJoin的实现类: 将T-2的维度数据与实时数据join、并返回Tuple2数据、因为离线数仓出数一般在凌晨3点、有时候由于集群资源不稳定、导致出数慢、所以用的是T-2

public class IndexOrderJoin extends RichMapFunction<MasterBean, Tuple2<MasterBean, HiveDimBean>> {

    private Map<String, Map<String, HiveDimBean>> map = null;
    Logger logger;

    @Override
    public void open(Configuration parameters) throws Exception {
        logger = LoggerFactory.getLogger(Class.forName("com.hll.util.IndexOrderJoin"));
        map = new HashMap<>();
    }

    @Override
    public Tuple2<MasterBean, HiveDimBean> map(MasterBean masterBean) {
        if (map.get(masterBean.getReportDate()) == null) {
            //如果map里没有T-2的维表数据、则查询一次Hive、并将结果存入该map、map属于线程级别、所以保证Task维表数据是全的
            logger.info("initial hive data : {}", masterBean.getReportDate());
            map.put(masterBean.getReportDate(), ScalaHiveUtil.getHiveDayIndex(masterBean.getReportDate()));
        }
        //将传入过来的kafka数据、与hive join完后、返回Tuple2数据
        return new Tuple2<>(masterBean, map.get(masterBean.getReportDate()).get(masterBean.getGroupID().concat(masterBean.getShopID())));
    }
}
基于join好的indexBeanStream流转视图、然后做SQL查询
    blinkStreamTableEnv.createTemporaryView("index_order_master", indexBeanStream);
    blinkStreamTableEnv.sqlUpdate("select group_id, sum(amt) from index_order_master group by group_id");
    blinkStreamTableEnv.execute("rt_aggr_master_flink");

可以看出、没有Hive分区时态表的时候、简单的一个join便涉及到kafkaStream、map算子join、若分区数据过大、还要用上async IO防止阻塞、程序的代码量和维护成本会增加

1.12版本依赖以及注意项

  • 1.12中 flink-connector-kafka_2.11 替换了之前的 flink-connector-kafka-0.10[0.11]_2.11、无需注明artifactId里的kafka版本信息
  • 在kafka DDL中、连接时无需指定版本信息
    1.11版本: 'connector' = 'kafka-0.10'
    1.12版本: 'connector' = 'kafka'
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.8</scala.version>
        <mysql.version>5.1.39</mysql.version>
        <hive.version>3.1.0</hive.version>
        <hadoop.version>2.7.1</hadoop.version>
        <fastjosn.version>1.2.22</fastjosn.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clents</artifactId>
                </exclusion>
            </exclusions>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjosn.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!--Flink catlog Hive-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

Flink 1.12 : Hive分区时态表与kafka的批流一体带来的便捷

1.12的分区时态表特性、可以直接通过sql的方式、一体化实现实时与Hive分区表的join、并且会自动监听最新的hive分区数据、读取到TaskManager中、始终维护最新的一份快照数据、之前的数据会废弃、从而保证TaskManager内存不会无限增长、无需用户编写stream程序即可完成批流join

此图摘自阿里雪尽2020FFA PPT
Hive分区时态表
参数解释

streaming-source.enable 开启流式读取hive数据
streaming-source.partition.include 1.latest属性: 只读取最新分区数据、2. all: 读取全量分区数据 ,默认值为all,表示读所有分区,latest只能用在temporal join中,用于读取最新分区作为维表,不能直接读取最新分区数据
streaming-source.monitor-interval 监听新分区生成的时间、不宜过短、最短是1个小时,因为目前的实现是每个task都会查询metastore,高频的查可能会对metastore产生过大的压力
streaming-source.partition-order 分区策略: 主要有以下3种、其中最为推荐的是partition-name
1.partition-name 使用默认分区名称顺序加载最新分区
2.create-time 使用分区文件创建时间顺序
3.partition-time 使用分区时间顺序

注意事项

使用Tempmoral table之前 需要将table.dynamic-table-options.enabled属性设置为true、以此来打开SQL Hint属性、来使用/* option */ 设置table属性
例如

SELECT * 
FROM hive_table 
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

SQL CLINET
set  table.dynamic-table-options.enabled= true;
程序代码
 tblEnv.getConfig().getConfiguration().setString("table.dynamic-table-options.enabled", "true");
官网示例
Hive Tempmoral Demo

Hive Tempmoral Demo

分区时态表代码Demo

HiveCatalog注册
Flink Sql Clinet
vim conf/sql-client-defaults.yaml
catalogs:
  - name: hive_catalog
    type: hive
    hive-conf-dir: /disk0/soft/hive-conf/   该目录需要包含hive-site.xml文件
KAFKA TABLE DDL
CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (
    master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>,
    foodLst ARRAY<ROW(reportDate String, orderKey String, groupID String, shopID String, shopName String, foodName String, isSetFood int, isSFDetail int, foodNumber double, foodCancelNumber double, foodSendNumber double, foodCategoryName String, foodRealAmount double, foodPriceAmount double, unit String, foodCode String)>,
    proctime as PROCTIME()  -- PROCTIME用来和Hive时态表关联
) WITH (
 'connector' = 'kafka',
 'topic' = 'topic_name',
 'format' = 'json',
 'properties.bootstrap.servers' = 'host:9092',
 'properties.group.id' = 'flinkTestGroup',
 'scan.startup.mode' = 'timestamp',
 'scan.startup.timestamp-millis' = '1607844694000'
);

Hive最新分区数据与Flink数据Join
CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as
SELECT * FROM
 (select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id, 
     ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn
    from hive_catalog.flink_db.kfk_fact_bill_master_12 t1
       JOIN hive_catalog.flink_db.dim_extend_shop_info 
     /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '1 h','streaming-source.partition-order' = 'partition-name') */ FOR SYSTEM_TIME AS OF t1.proctime AS t2 --时态表
ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id
    where groupID in (202042)) t  where t.rn = 1

将结果数据SInk到Mysql
CREATE TABLE hive_catalog.flink_db_sink.rt_aggr_bill_food_unit_rollup_flk (
      report_date String,
      group_id int,
      group_name String,
      shop_id int,
      shop_name String,
      brand_id BIGINT,
      brand_name String,
      province_name String,
      city_name String,
      foodcategory_name String,
      food_name String,
      food_code String,
      unit String,
      rt_food_unit_cnt double,
      rt_food_unit_amt double,
      rt_food_unit_real_amt double,
    PRIMARY KEY (report_date, group_id, shop_id, brand_id, foodcategory_name, food_name, food_code, unit) NOT ENFORCED) WITH (
    'connector' = 'jdbc', 
    'url' = 'jdbc:mysql://host:4400/db_name?autoReconnect=true&useSSL=false',
    'table-name' = 'table-name', 
    'username' = 'username', 
    'password' = 'password'
)

insert into hive_catalog.flink_db_sink.rt_aggr_bill_food_unit_rollup_flk
select reportDate, group_id, group_name, shop_id, shop_name, brand_id, brand_name, province_name, city_name, foodCategoryName, foodName, foodCode, unit
   , SUM(foodNumber)  rt_food_unit_cnt
   , sum(foodPriceAmount)  rt_food_unit_amt
   , sum(foodRealAmount)  rt_food_unit_real_amt
   from  hive_catalog.flink_db.view_fact_bill_master
   group by reportDate, group_id, group_name, shop_id, shop_name, brand_id, brand_name, province_name, city_name;

Flink 程序、结合HiveCatalog、利用Sql Clinet已经创建好的source、sink、程序只需要关心逻辑代码、无需关注source、sink表的创建: insert into mysql_sink from kafka table join hive、提高代码复用性以及可读性、减少维护成本
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings);
        FlinkEnvUtils.initTblEnv(tblEnv);
        tblEnv.getConfig().getConfiguration().setString("table.dynamic-table-options.enabled", "true");

        //创建hive catalog
        tblEnv.executeSql("CREATE CATALOG hive_catalog WITH (\n" +
                "  'type'='hive',\n" +
                "  'hive-conf-dir'='/disk0/soft/hive-conf/'" +  //该目录需要包含hive-site.xml文件
                ")");
        tblEnv.createTemporaryFunction("boomFunction", ExplodeArray.class);

        tblEnv.executeSql("insert into hive_catalog.flink_db_sink.rt_aggr_bill_food_unit_rollup_flk\n" +
                "select reportDate, group_id, group_name, shop_id, shop_name, brand_id, brand_name, province_name, city_name, foodCategoryName, foodName, foodCode, unit\n" +
                "   , SUM(foodNumber)  rt_food_unit_cnt\n" +
                "   , sum(foodPriceAmount)  rt_food_unit_amt\n" +
                "   , sum(foodRealAmount)  rt_food_unit_real_amt\n" +
                "   from  hive_catalog.flink_db.view_fact_bill_master\n" +
                "   group by reportDate, group_id, group_name, shop_id, shop_name, brand_id, brand_name, province_name, city_name");

ORC读取BUG

分区时态表在读取ORC格式的时候、总是无法读取数据、我也向社区提了一个Jira: https://issues.apache.org/jira/browse/FLINK-20576?filter=-2
读取parquet和text都是正常的

Flink Sql开发小技巧

  • 结合Hive catalog、持久化source与sink、提高程序复用性、并使代码只需关注逻辑SQL
  • 结合Flink视图、抽象逻辑代码、提高复用性
  • 利用SQL Clinet调试SQL、程序没问题后、再打包上线、而不是以程序的形式、提交到集群做测试

作者:HideOnBushKi

原文链接:https://www.jianshu.com/p/ee839d496baf

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐