Flink SQL: Writing to Kafka/ES/MySQL, Examples in Java
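I. Background
Flink's API is wrapped in four layers. The top two, the Table API and SQL, have relatively simple syntax and are easy to write, so small requirements can be handled quickly. Drawing on the official documentation and some online tutorials, this article collects source-side and sink-side code: it reads from a socket, Kafka, and a text file as sources, and writes the resulting stream to a text file, Kafka, ES, and MySQL, for later reference.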
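II. Code
Note: two styles are shown here, connect and DDL. The connect style works on Flink 1.10 and earlier. The official documentation now presents only the DDL style, so DDL is recommended for versions after 1.10 because it is more general.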
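1. Source side
1.1 Set up the base environment (parallelism 1 and no checkpointing, to keep the demo simple)
// import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Create the stream environment with parallelism 1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// Create the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);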
1.2 Read data from a socket port and query it with both the Table API and SQL
// import static org.apache.flink.table.api.Expressions.$;

// Read data from port 9999 on the server and convert each line into a WaterSensor JavaBean
SingleOutputStreamOperator<WaterSensor> mapDS = env.socketTextStream("hadoop102", 9999)
        .map(value -> {
            String[] split = value.split(",");
            return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
        });
// Create a table: convert the stream into a dynamic table
Table table = tableEnv.fromDataStream(mapDS);
// Query the dynamic table with the Table API
Table selectResult = table.where($("id").isEqual("ws_001")).select($("id"), $("ts"), $("vc"));
// Query the dynamic table with SQL (table not registered)
Table sqlResult = tableEnv.sqlQuery("select * from " + table);
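The snippet above relies on a WaterSensor JavaBean that the article does not show. A minimal sketch of what it could look like follows, assuming only the three fields id, ts and vc that appear in the table schema. The static parse helper is a hypothetical addition that mirrors the map() lambda and adds a basic length check. The pom.xml below pulls in Lombok, so the original class may well have used annotations instead of hand-written accessors.

```java
/**
 * Hypothetical POJO matching the (id, ts, vc) columns used in this article.
 * For use with Flink, declare it public in its own file; Flink POJOs need
 * a public no-argument constructor and getters/setters for private fields.
 */
class WaterSensor {
    private String id;   // sensor id, e.g. "ws_001"
    private Long ts;     // timestamp
    private Integer vc;  // third numeric column ("vc" in the schema)

    public WaterSensor() {}

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }

    /** Parses one comma-separated line of the form "id,ts,vc". */
    public static WaterSensor parse(String line) {
        String[] split = line.split(",");
        if (split.length != 3) {
            throw new IllegalArgumentException("expected id,ts,vc but got: " + line);
        }
        return new WaterSensor(
                split[0].trim(),
                Long.parseLong(split[1].trim()),
                Integer.parseInt(split[2].trim()));
    }

    @Override
    public String toString() {
        return "WaterSensor(id=" + id + ", ts=" + ts + ", vc=" + vc + ")";
    }
}
```

With such a class in place, the lambda could be shortened to .map(WaterSensor::parse). Malformed lines would then fail with an IllegalArgumentException rather than an ArrayIndexOutOfBoundsException.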
1.3 Read a text file (FileSystem) and query it with the Table API
// Flink 1.10 style using connect: read a txt file and create a temporary table
tableEnv.connect(new FileSystem().path("input/sensor.txt"))
        .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n"))
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .createTemporaryTable("sensor");
// Convert to a Table object and query it. For the SQL style, see the socket section above
Table table = tableEnv.from("sensor");
Table selectResult = table.groupBy($("id"))
        .aggregate($("id").count().as("id_count"))
        .select($("id"), $("id_count"));
1.4 Consume Kafka data and query it with the Table API, in both the connect and DDL styles
// Flink 1.10 style using connect: consume the Kafka topic and create a temporary table
tableEnv.connect(new Kafka().version("universal")
                .topic("sensor")
                .startFromLatest()
                .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
                .property(ConsumerConfig.GROUP_ID_CONFIG, "BD")) // consumer group
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .withFormat(new Csv())
        .createTemporaryTable("sensor");

// DDL style, for versions after Flink 1.10
tableEnv.executeSql("CREATE TABLE sensor ("
        + " `id` STRING,"
        + " `ts` BIGINT,"
        + " `vc` INT"
        + ") WITH ("
        + " 'connector' = 'kafka',"
        + " 'topic' = 'sensor',"
        + " 'properties.bootstrap.servers' = 'hadoop102:9092',"
        + " 'properties.group.id' = 'BD',"
        + " 'scan.startup.mode' = 'latest-offset',"
        + " 'format' = 'csv'"
        + ")");

// Convert to a Table object and query it. For the SQL style, see the socket section above
Table table = tableEnv.from("sensor");
Table selectResult = table.groupBy($("id"))
        .aggregate($("id").count().as("id_count"))
        .select($("id"), $("id_count"));
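The DDL statement above is a column list followed by a WITH clause of quoted key/value options. Hand-concatenated strings are easy to get wrong, so here is a small sketch of assembling the same statement from maps. DdlBuilder and its methods are hypothetical helpers written for this illustration, not part of Flink, and the sketch does no escaping, so it assumes names and values contain no quotes or backticks.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that assembles a Flink SQL CREATE TABLE statement. */
final class DdlBuilder {

    /** Builds "CREATE TABLE name (`col` TYPE, ...) WITH ('key' = 'value', ...)". */
    static String createTable(String name,
                              Map<String, String> columns,
                              Map<String, String> options) {
        StringJoiner cols = new StringJoiner(", ");
        for (Map.Entry<String, String> c : columns.entrySet()) {
            cols.add("`" + c.getKey() + "` " + c.getValue());
        }
        StringJoiner opts = new StringJoiner(", ");
        for (Map.Entry<String, String> o : options.entrySet()) {
            opts.add("'" + o.getKey() + "' = '" + o.getValue() + "'");
        }
        return "CREATE TABLE " + name + " (" + cols + ") WITH (" + opts + ")";
    }

    /** Reproduces the Kafka source table from the article. */
    static String kafkaSensorSource() {
        // LinkedHashMap keeps insertion order, so columns stay in schema order
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "STRING");
        columns.put("ts", "BIGINT");
        columns.put("vc", "INT");

        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "sensor");
        options.put("properties.bootstrap.servers", "hadoop102:9092");
        options.put("properties.group.id", "BD");
        options.put("scan.startup.mode", "latest-offset");
        options.put("format", "csv");

        return createTable("sensor", columns, options);
    }
}
```

The resulting string would be passed to tableEnv.executeSql(...) exactly like the literal above. Swapping the options map is then enough to retarget the same schema at another connector.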
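2. Sink side
2.1 Write to a text file
// Create the output table, connect style
tableEnv.connect(new FileSystem().path("out/sensor.txt"))
        .withFormat(new Csv())
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .createTemporaryTable("sensor");
// Inserting into the output table is what performs the sink write.
// selectResult is the result table queried on the source side above; its columns
// must match the sink schema (id, ts, vc), as in the query from section 1.2
selectResult.executeInsert("sensor");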
2.2 Write to Kafka
// connect style
tableEnv.connect(new Kafka().version("universal")
                .topic("sensor")
                .sinkPartitionerRoundRobin() // round-robin across partitions
                .property(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"))
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .withFormat(new Json())
        .createTemporaryTable("sensor");

// DDL style
tableEnv.executeSql("CREATE TABLE sensor ("
        + " `id` STRING,"
        + " `ts` BIGINT,"
        + " `vc` INT"
        + ") WITH ("
        + " 'connector' = 'kafka',"
        + " 'topic' = 'sensor',"
        + " 'properties.bootstrap.servers' = 'hadoop102:9092',"
        + " 'format' = 'json'"
        + ")");

// Inserting into the output table is what performs the sink write.
// selectResult is the result table queried on the source side above
selectResult.executeInsert("sensor");
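2.3 Write to MySQL (via JDBC; mysql-connector-java-5.1.9.jar was imported manually here)
// DDL style. The 'jdbc' connector also needs the flink-connector-jdbc dependency on the classpath
tableEnv.executeSql("CREATE TABLE sink_sensor ("
        + " id STRING,"
        + " ts BIGINT,"
        + " vc INT,"
        + " PRIMARY KEY (id) NOT ENFORCED"
        + ") WITH ("
        + " 'connector' = 'jdbc',"
        + " 'url' = 'jdbc:mysql://hadoop102:3306/test?useSSL=false',"
        + " 'table-name' = 'sink_test',"
        + " 'username' = 'root',"
        + " 'password' = '123456'"
        + ")");

// Inserting into the output table is what performs the sink write.
// selectResult is the result table queried on the source side above.
// The target must be the table registered above, sink_sensor
selectResult.executeInsert("sink_sensor");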
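The PRIMARY KEY (id) NOT ENFORCED clause in the DDL above is not decoration. When a primary key is declared, the Flink JDBC connector writes in upsert mode; without one it writes in append mode. The plain-Java sketch below only simulates that difference with in-memory collections. UpsertDemo and its methods are hypothetical names for illustration, not connector APIs, and rows are modelled as String arrays whose first element is the key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of append vs. upsert sink behaviour. */
final class UpsertDemo {

    /** Append mode: every incoming row is stored as a new row. */
    static List<String[]> append(List<String[]> rows) {
        return new ArrayList<>(rows);
    }

    /** Upsert mode: a row replaces any earlier row with the same key (column 0). */
    static Map<String, String[]> upsert(List<String[]> rows) {
        Map<String, String[]> table = new LinkedHashMap<>();
        for (String[] row : rows) {
            table.put(row[0], row); // insert if absent, overwrite if present
        }
        return table;
    }
}
```

Feeding both methods the rows (ws_001, 1, 10), (ws_002, 2, 20), (ws_001, 3, 30) leaves three rows in the append result but only two in the upsert result, where ws_001 holds the latest values. This is why an updating result, such as the grouped count from the source section, needs a keyed sink.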
2.4 Write to ES
// connect style
tableEnv.connect(new Elasticsearch()
                .index("sensor")
                .documentType("_doc")
                .version("7")
                .host("localhost", 9200, "http")
                // Set to 1 so every row is written immediately, which makes the output easy to
                // inspect from a client. Do not use this in production
                .bulkFlushMaxActions(1))
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .withFormat(new Json())
        .inAppendMode()
        .createTemporaryTable("sensor");

// DDL style (index name set to 'sensor' to match the connect example)
tableEnv.executeSql("CREATE TABLE sensor ("
        + " id STRING,"
        + " ts BIGINT,"
        + " vc INT,"
        + " PRIMARY KEY (id) NOT ENFORCED"
        + ") WITH ("
        + " 'connector' = 'elasticsearch-7',"
        + " 'hosts' = 'http://localhost:9200',"
        + " 'index' = 'sensor',"
        + " 'sink.bulk-flush.max-actions' = '1'"
        + ")");

// Inserting into the output table is what performs the sink write.
// selectResult is the result table queried on the source side above
selectResult.executeInsert("sensor");
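To see why a bulk-flush threshold of 1 is convenient for a demo but costly in production, the sketch below models a buffer that sends one bulk request whenever the number of pending actions reaches the threshold. BulkBuffer is a hypothetical class written for this illustration only. The real connector has further flush triggers, such as sink.bulk-flush.max-size and sink.bulk-flush.interval, which are not modelled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of count-based bulk flushing. */
final class BulkBuffer {
    private final int maxActions;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    BulkBuffer(int maxActions) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be >= 1");
        }
        this.maxActions = maxActions;
    }

    /** Buffers one document and flushes once the threshold is reached. */
    void add(String doc) {
        pending.add(doc);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    /** Sends everything that is pending as one simulated bulk request. */
    void flush() {
        if (!pending.isEmpty()) {
            flushed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    /** Number of simulated bulk requests sent so far. */
    int requestCount() { return flushed.size(); }

    /** Number of documents still waiting in the buffer. */
    int pendingCount() { return pending.size(); }
}
```

With a threshold of 1, five documents produce five requests and nothing is left pending, so each row shows up in ES right away. With a threshold of 3, the same five documents produce one request and leave two documents buffered until the next flush.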
III. Additional notes
Dependencies (pom.xml)
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- Elasticsearch client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- Elasticsearch depends on log4j 2.x -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
This post is meant for learning and discussion. If you spot any problems, please point them out in the comments.
Source: https://www.cnblogs.com/rango-lhl/p/14900235.html