Flink Part 2: Consuming Kafka Data
Preface:
Flink's DataSource module defines the data input operations of the DataStream API. Flink divides data sources into two groups: built-in and third-party. The built-in sources cover files, socket connections, and in-memory collections, and are mainly used for testing. For third-party systems, Flink ships many source connectors built on SourceFunction, through which data is read and written. This post uses Flink's Kafka connector to consume messages from Kafka.
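For contrast, a built-in source needs no connector at all. Below is a minimal sketch of the collection and socket sources mentioned above (the class name BuiltInSourceDemo is ours, for illustration only):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BuiltInSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Collection source: convenient for local testing without any external system
        env.fromElements("a", "b", "c").print();
        // Socket source (uncomment and feed it with `nc -lk 9999`):
        // env.socketTextStream("localhost", 9999).print();
        env.execute("built-in source demo");
    }
}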
Consuming Kafka with Flink:
Add the dependencies (note that the _2.11 suffix on the Flink artifacts is the Scala binary version; all Flink modules must use the same suffix and version):
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.8.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.73</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
Custom message deserialization schema:
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MessageDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> record) {
        // Keep consuming indefinitely; the stream has no end marker
        return false;
    }

    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        // Decode the key and value as strings and return a new ConsumerRecord.
        // Guard against null keys/values, which Kafka allows and which would
        // otherwise throw a NullPointerException here.
        return new ConsumerRecord<>(
                consumerRecord.topic(),
                consumerRecord.partition(),
                consumerRecord.offset(),
                consumerRecord.key() == null ? null : new String(consumerRecord.key()),
                consumerRecord.value() == null ? null : new String(consumerRecord.value())
        );
    }

    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});
    }
}
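A custom schema is only needed because we want the record metadata (topic, partition, offset, key). If only the message value matters, the connector's built-in SimpleStringSchema works out of the box. A minimal sketch, reusing the broker address and consumer group from the consumer program that follows (the class name ValueOnlyConsumer is ours, for illustration only):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ValueOnlyConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.9.239:9092");
        properties.setProperty("group.id", "flink_consumer");
        // Each record's value is deserialized to a plain String; keys and metadata are dropped
        env.addSource(new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), properties))
           .print();
        env.execute("value-only consumer");
    }
}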
Consumer program:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Properties;

public class FlinkConsumer {
    public static void main(String[] args) throws Exception {
        // Build the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint every 5 seconds
        env.enableCheckpointing(5000);
        // EXACTLY_ONCE is the default CheckpointingMode; AT_LEAST_ONCE can also be set
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.9.239:9092");
        // Consumer group
        properties.setProperty("group.id", "flink_consumer");
        // auto.offset.reset has three settings (the default is latest):
        // earliest: if a partition has a committed offset, resume from it; otherwise read the partition from the beginning
        // latest: if a partition has a committed offset, resume from it; otherwise read only newly produced records
        // none: resume from committed offsets only if every partition has one; if any partition lacks one, throw an exception
        properties.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<ConsumerRecord<String, String>> kafkaConsumer =
                new FlinkKafkaConsumer<>("flink_kafka", new MessageDeSerializationSchema(), properties);
        DataStreamSource<ConsumerRecord<String, String>> stream = env
                .addSource(kafkaConsumer).setParallelism(2);

        // To consume several topics at once:
        // List<String> topics = new ArrayList<>();
        // topics.add("topic1");
        // topics.add("topic2");
        // DataStream<String> stream = env
        //         .addSource(new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties));

        stream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, Object>() {
            @Override
            public void flatMap(ConsumerRecord<String, String> record, Collector<Object> collector) throws Exception {
                collector.collect("offset:" + record.offset());
                collector.collect("partition:" + record.partition());
                collector.collect("topic:" + record.topic());
                collector.collect("key:" + record.key());
                collector.collect("value:" + record.value());
            }
        }).print();

        env.execute("consumer start");
    }
}
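Besides the auto.offset.reset property, the FlinkKafkaConsumer exposes its own start-position methods: setStartFromEarliest and setStartFromLatest ignore committed offsets entirely, while the default, setStartFromGroupOffsets, respects them and falls back to auto.offset.reset. A short sketch, applied to the kafkaConsumer built above before addSource is called:

// Start-position methods on the consumer (call before env.addSource):
kafkaConsumer.setStartFromGroupOffsets(); // default: resume from committed group offsets, falling back to auto.offset.reset
// kafkaConsumer.setStartFromEarliest();  // ignore committed offsets; read every partition from the beginning
// kafkaConsumer.setStartFromLatest();    // ignore committed offsets; read only newly produced records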
Message producer program:
import com.alibaba.fastjson.JSON;
import com.tt.flinkdemo.domain.MessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.stereotype.Component;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Slf4j
@Component
public class MessageProducer {

    private static Properties getProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.9.239:9092");
        props.put("acks", "all");                  // wait for acknowledgement from all in-sync replicas
        props.put("retries", Integer.MAX_VALUE);   // retry count
        props.put("batch.size", 16384);            // batch size in bytes
        props.put("buffer.memory", 102400);        // producer buffer size; tune to available memory
        props.put("linger.ms", 1000);              // a batch is sent once either batch.size or linger.ms is reached
        props.put("client.id", "producer-syn-1");  // client id, useful for monitoring
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) throws Exception {
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
        for (int i = 20; i <= 30; i++) {
            MessageEntity message = new MessageEntity();
            message.setMessage("message #" + i);
            message.setTotalDate(DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
            message.setMessageId(System.currentTimeMillis());
            log.info(JSON.toJSONString(message));

            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "flink_kafka", System.currentTimeMillis() + "", JSON.toJSONString(message));
            Future<RecordMetadata> metadataFuture = producer.send(record);
            try {
                RecordMetadata recordMetadata = metadataFuture.get();
                log.info("send succeeded!");
                log.info("topic:" + recordMetadata.topic());
                log.info("partition:" + recordMetadata.partition());
                log.info("offset:" + recordMetadata.offset());
            } catch (InterruptedException | ExecutionException e) {
                log.error("send failed!", e);
            }
            Thread.sleep(100);
        }
    }
}
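The producer references a MessageEntity POJO that the post never shows. Here is a minimal sketch inferred from the setters used above; the field names and types are assumptions:

package com.tt.flinkdemo.domain;

import lombok.Data;

// Minimal sketch of the MessageEntity referenced by the producer; the original
// class is not shown in the post, so fields are inferred from the setters used.
@Data
public class MessageEntity {
    private Long messageId;   // set from System.currentTimeMillis()
    private String message;
    private String totalDate; // formatted as yyyy-MM-dd HH:mm:ss
}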
Test results:
Start the consumer, then run the producer: the consumer's flatMap prints each record's offset, partition, topic, key, and value to stdout.
Author: 画画的贝贝
Link: https://juejin.cn/post/7023213022768267278