Flink: Reading Data from Kafka and Writing It to Hive
This article targets the database CDC (change data capture) scenario and explores the feasibility of writing to Hive in real time with the latest Flink 1.12 release; a local IDEA demo program is given below for reference.

Database CDC (change data capture) is a technique that captures change data from a database in real time and, after processing it (processing may not even be needed), applies the changes to a target system. To capture changes in real time, this article introduces Debezium as the database connector. Debezium provides connectors for MongoDB, MySQL, PostgreSQL, SQL Server, Oracle, Db2, Cassandra, and Vitess (the Oracle, Db2, Cassandra, and Vitess connectors are still incubating), and publishes each database's initial snapshot, existing data, and incremental changes to Kafka topics. On the processing side, you only need to create a Kafka consumer and subscribe to the corresponding topic to receive the data. Flink itself also offers Flink SQL CDC support (contributed by Jark Wu (云邪) and others on Alibaba's technical team; see his blog), but it is not part of the Flink 1.12 release; MySQL and PostgreSQL support is expected in 1.13.

When Debezium starts for the first time, it scans the full tables, emits their schema information and a complete snapshot of the data, and then captures change data (increments) in real time so that source and target stay consistent. Before the snapshot is sent, the schema of the databases and tables is published to a history topic, and one topic is created per table, named <connector name>.<database name>.<table name>, where connector name is the connector's name, which the developer can choose. That per-table topic carries the snapshot data, the existing data, and the incremental data.

[Figure: table schema captured by Debezium]
[Figure: row data captured by Debezium]
[Animation: registering the hive catalog used to read and write Hive]
[Animation: inserting the Kafka stream table into Hive]

8. Testing

[Figure: test run output]
[Figure: data synchronized to the Hive target]

Since Hive is a data warehouse, built for analytics, it does not support update or delete operations; but in a CDC scenario there is no guarantee that every operation on the source database is an insert, so the following approach can be used (see the architecture and practice of Meituan's data warehouse platform):

[Figure: merge-based synchronization architecture; image from the web, will be removed on request]

Still to be done: automatic parsing of the DDL statements in the schema messages (including resolving data-type and keyword differences between source and target).
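For reference, a change event on the per-table topic looks roughly like the following. This is a trimmed sketch, not a verbatim message: the field names (`before`, `after`, `source`, `op`, `ts_ms`) follow the standard Debezium envelope, but the `schema` block and the `source` metadata are heavily abbreviated, and the column names match the demo table used later in this article.

```json
{
  "schema": { "comment": "field type descriptions for the payload (abbreviated)" },
  "payload": {
    "before": null,
    "after": { "id": "1", "log": "hello" },
    "source": { "connector": "mysql", "db": "test", "table": "t1" },
    "op": "c",
    "ts_ms": 1609459200000
  }
}
```

`op` distinguishes the operation: `c` for insert/create, `u` for update, `d` for delete, `r` for rows read during the initial snapshot.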
1. Overall Approach
2. Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sequence-file</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.22</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-metastore</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libfb303</artifactId>
        <version>0.9.3</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
3. Create the Execution Environment
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(1);
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
streamEnv.enableCheckpointing(60000);

EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
tableEnv.getConfig().getConfiguration()
        .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration()
        .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
4. Register the Hive Catalog
String name = "hiveCatalog";      // catalog name, a unique identifier
String defaultDatabase = "test";  // default database
String hiveConfDir = "D:\\";      // directory containing hive-site.xml
String version = "3.1.2";         // Hive version

HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(name, hiveCatalog);
tableEnv.useCatalog(name);
5. Connect to the Kafka Source
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// create the Kafka consumer
FlinkKafkaConsumer<ObjectNode> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
        "topic", new JSONKeyValueDeserializationSchema(true), properties);
flinkKafkaConsumer.setStartFromEarliest(); // start from the earliest record possible

// add the Kafka consumer as a data source
DataStream<ObjectNode> stream = streamEnv.addSource(flinkKafkaConsumer);
6. Business Logic
String[] fieldNames = {"id", "log", "op"};
TypeInformation[] types = {Types.STRING, Types.STRING, Types.STRING};

// business-specific transformation (details omitted): extract the change
// fields from each Debezium message
SingleOutputStreamOperator<Row> mappedStream = stream.map(
        new GetIncDataMap(), new RowTypeInfo(types, fieldNames));

// register the Flink stream as a table
tableEnv.createTemporaryView("kafkaRow", mappedStream);
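The `GetIncDataMap` function above is business code whose implementation the original does not show. Conceptually it unwraps the Debezium envelope delivered by `JSONKeyValueDeserializationSchema` and emits a three-field row. As a dependency-free illustration of that unwrapping (the class and helper names here are hypothetical; a real Flink job would navigate the Jackson `ObjectNode` with `node.get("value").get("after").get("id")` and so on rather than scan strings):

```java
// Illustrative, dependency-free sketch of the field extraction a
// GetIncDataMap-style function performs on a Debezium message.
public class DebeziumEnvelopeSketch {

    /** Extract the string value of a "key":"value" pair from a JSON string. */
    static String field(String json, String key) {
        String marker = "\"" + key + "\":\"";
        int start = json.indexOf(marker);
        if (start < 0) {
            return null; // key absent (e.g. "before" is null on inserts)
        }
        start += marker.length();
        int end = json.indexOf('"', start);
        return json.substring(start, end);
    }

    public static void main(String[] args) {
        // Simplified Debezium "value" payload: after-image plus operation flag
        // ("c" = insert, "u" = update, "d" = delete).
        String payload = "{\"after\":{\"id\":\"1\",\"log\":\"hello\"},\"op\":\"c\"}";
        // These three values are what the job packs into a Row(id, log, op).
        System.out.println(field(payload, "id") + " | "
                + field(payload, "log") + " | "
                + field(payload, "op")); // prints: 1 | hello | c
    }
}
```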
7. Execute the SQL
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("DROP TABLE IF EXISTS hivelogtab").print();
tableEnv.executeSql("CREATE TABLE hivelogtab ( id STRING, log STRING, op STRING )").print();
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

// CloseableIterator<Row> result = tableEnv.sqlQuery("SELECT id,log,op FROM kafkaRow").execute().collect();
// while (result.hasNext()) {
//     System.out.println(result.next());
// }

// Note: executeSql submits the INSERT job by itself; streamEnv.execute is only
// needed if the pipeline has additional DataStream sinks.
TableResult tableResult = tableEnv.executeSql(
        "INSERT INTO hiveCatalog.test.hivelogtab SELECT id, log, op FROM kafkaRow");
streamEnv.execute("job");
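Since Hive itself cannot apply updates or deletes in place (as discussed in the introduction), a common workaround is to append every change row together with its `op` flag and resolve the latest state at query time. A hedged sketch of such a query, assuming the table additionally carries Debezium's `ts_ms` timestamp (not part of the three-column table created above) so that changes per key can be ordered:

```sql
-- Latest state per key: keep the newest change row for each id,
-- then drop keys whose newest change is a delete.
SELECT id, log
FROM (
    SELECT id, log, op,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts_ms DESC) AS rn
    FROM hivelogtab
) t
WHERE rn = 1
  AND op <> 'd';
```

Periodically materializing this query into a compacted table (the merge step in the Meituan-style architecture referenced above) keeps read-side queries cheap.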
[Figure: MySQL source data]

TO DO: see the outstanding items noted in the introduction (automatic DDL parsing, update/delete handling).
Author: 另存為
Link: https://www.jianshu.com/p/1efefc3d7a60
Source: 简书 (Jianshu)
The copyright belongs to the author. For commercial reproduction, please contact the author for authorization; for non-commercial reproduction, please credit the source.