
Flink: Reading Kafka Data and Writing It to Hive

Preface

This article targets the database CDC (change data capture) scenario and explores the technical feasibility of writing to Hive in real time with the latest Flink 1.12 release. A local IDEA demo program is provided below for reference.

1. Overall Approach

Database CDC (change data capture) is a technique that captures changed data from a database in real time and, after processing it (or sometimes without any processing), applies it to a target system. To capture changes in real time, this article introduces Debezium as the database connector. Debezium provides connectors for MongoDB, MySQL, PostgreSQL, SQL Server, Oracle, Db2, Cassandra, and Vitess (the Oracle, Db2, Cassandra, and Vitess connectors are still incubating), and publishes each database's full, existing, and incremental data to Kafka topics. On the processing side, all that is needed is a Kafka consumer subscribed to the corresponding topics. Flink itself also provides Flink SQL CDC support (contributed by Wu Chong (云邪) of the Alibaba team and others; see his blog for details), but it is not shipped in the Flink 1.12 release; support for MySQL, PostgreSQL, and other databases is expected in 1.13.

When Debezium starts for the first time, it scans the whole table and sends the schema information together with a full snapshot of the data, and then keeps capturing change events (incremental data) in real time so that source and target stay consistent. Before sending the full snapshot, it first publishes the schema of the databases and tables to a history topic, and it creates one topic per table, named <connector name>.<database name>.<table name>, where the connector name is chosen by the developer. For example, a connector named mysql01 (an arbitrary name here) capturing table user in database inventory would write to the topic mysql01.inventory.user. This per-table topic carries the full snapshot, the existing data, and the incremental changes.

[Figure: table schema information captured by Debezium]

[Figure: change data captured by Debezium]

2. Adding Dependencies

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.12.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sequence-file</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.12.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.11</artifactId>
            <version>1.12.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.12.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.12.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>0.9.3</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

3. Create the Execution Environment

// streaming environment: single parallelism, event time, checkpoint every 60 s
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(1);
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
streamEnv.enableCheckpointing(60000);

// table environment on the Blink planner; the checkpointing settings are mirrored here
// so the SQL pipeline also runs with exactly-once checkpoints every minute
EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));

4. Register the Hive Catalog

Register a Hive catalog so that Flink can read from and write to Hive.

String name = "hiveCatalog";      // catalog name, a unique identifier
String defaultDatabase = "test";  // default database
String hiveConfDir = "D:\\";      // directory containing hive-site.xml
String version = "3.1.2";         // Hive version
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(name, hiveCatalog);
tableEnv.useCatalog(name);

5. Connect the Kafka Source

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// create the Kafka consumer; JSONKeyValueDeserializationSchema turns each record into an ObjectNode
FlinkKafkaConsumer<ObjectNode> flinkKafkaConsumer = new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(true), properties);
flinkKafkaConsumer.setStartFromEarliest();     // start from the earliest record possible
// add the Kafka consumer as a data source
DataStream<ObjectNode> dataStreamSource = streamEnv.addSource(flinkKafkaConsumer);

6. Business Logic

String[] fieldNames = {"id", "log", "op"};
TypeInformation[] types = {Types.STRING, Types.STRING, Types.STRING};
// business-specific mapping from the Debezium JSON to a Row; not covered in detail here (see the sketch below)
SingleOutputStreamOperator<Row> mappedStream = dataStreamSource.map(new GetIncDataMap(), new RowTypeInfo(types, fieldNames));
// register the stream as a table
tableEnv.createTemporaryView("kafkaRow", mappedStream);
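
The original post does not expand on GetIncDataMap. Below is a minimal sketch of what such a map function might look like; it assumes the default Debezium JSON envelope (schema + payload, with before/after/op inside payload) and the illustrative source columns id and log. It is a hypothetical illustration, not the author's actual class.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Row;
// depending on the Flink version, ObjectNode/JsonNode may come from the flink-shaded Jackson package instead
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

// hypothetical sketch of the mapping function, not the original implementation
public class GetIncDataMap implements MapFunction<ObjectNode, Row> {
    @Override
    public Row map(ObjectNode node) throws Exception {
        // JSONKeyValueDeserializationSchema wraps the Kafka value under "value";
        // Debezium (with schemas enabled) nests the change data under "payload"
        JsonNode payload = node.get("value").get("payload");
        String op = payload.get("op").asText();   // c = create, u = update, d = delete, r = snapshot read
        // for deletes the row image is in "before", otherwise in "after"
        JsonNode image = "d".equals(op) ? payload.get("before") : payload.get("after");
        return Row.of(image.get("id").asText(), image.get("log").asText(), op);
    }
}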

7. Execute the SQL

Insert the Kafka stream table into Hive.

// use the Hive dialect for the DDL so the table is created as a native Hive table
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("DROP TABLE IF EXISTS hivelogtab").print();
tableEnv.executeSql("CREATE TABLE hivelogtab ( id STRING, log STRING, op STRING )").print();
// switch back to the default dialect for the INSERT
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
//        CloseableIterator<Row> result = tableEnv.sqlQuery("SELECT id,log,op FROM kafkaRow").execute().collect();
//        while(result.hasNext()){
//            System.out.println(result.next());
//        }
TableResult tableResult = tableEnv.executeSql("INSERT INTO hiveCatalog.test.hivelogtab SELECT id, log, op FROM kafkaRow");
streamEnv.execute("job");
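
As an optional spot check (not part of the original post), the data written to Hive can also be read back through the same catalog, for example from a separate verification program once some data has been committed:

// hedged sketch: read the Hive table back via the registered catalog and print it
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("SELECT id, log, op FROM hiveCatalog.test.hivelogtab").print();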

8. Test

[Figure: source data in MySQL]

[Figure: data synchronized to the Hive target]

Note:

Hive is a data warehouse built for analytics, so it does not support update or delete operations. In a CDC scenario, however, the operations on the source database are by no means guaranteed to be inserts only, so the change log has to be merged downstream, along the lines of the architecture and practice of Meituan's data warehouse platform (see the figure and the sketch below):

[Figure: reference architecture (image from the web)]
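
One way to make that idea concrete (a sketch only, not the approach from the referenced architecture): if the change-log table carried an extra event-time column, say ts (hypothetical, it is not part of the table created above), the latest state of each id could be rebuilt at query time by keeping only the newest record per key and dropping deletes:

// hypothetical merge/compaction query over the change log; assumes an added ts column
tableEnv.executeSql(
    "SELECT id, log FROM (" +
    "  SELECT id, log, op, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS rn" +
    "  FROM hiveCatalog.test.hivelogtab" +
    ") t WHERE rn = 1 AND op <> 'd'").print();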

TO DO:

Automatic parsing of the DDL statements in the schema messages (including resolving data-type and keyword differences between source and target).

Follow the WeChat official account HEY DATA for more discussion.

Author: 另存為

Original post: https://www.jianshu.com/p/1efefc3d7a60
