Flink Connectors: Reading and Writing Kafka
1. Reading from Kafka with Flink
import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord

// KafkaDeserializationSchema is an interface and cannot be instantiated directly
// (the original `new KafkaDeserializationSchema` would not compile); this minimal
// implementation turns the raw byte records into ConsumerRecord[String, String].
class StringConsumerRecordSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {
  override def isEndOfStream(nextElement: ConsumerRecord[String, String]): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
    val key = if (record.key() == null) null else new String(record.key(), StandardCharsets.UTF_8)
    val value = if (record.value() == null) null else new String(record.value(), StandardCharsets.UTF_8)
    new ConsumerRecord[String, String](record.topic(), record.partition(), record.offset(), key, value)
  }

  override def getProducedType: TypeInformation[ConsumerRecord[String, String]] =
    TypeInformation.of(classOf[ConsumerRecord[String, String]])
}

object finktest {
  val sourceTopic = "test_source"
  val DEFAULT_PARALLELISM = 3
  val brokersIp = "localhost:9092" // assumed broker address (left undefined in the original)

  def main(args: Array[String]): Unit = {
    // ---------------------------- read arguments ----------------------------------------
    val params = ParameterTool.fromArgs(args)
    val parallelism = params.getInt("parallelism", DEFAULT_PARALLELISM)

    // ---------------------------- set up the execution environment ----------------------
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(parallelism)

    // ---------------------------- create the source -------------------------------------
    val consumerProps = new Properties()
    consumerProps.setProperty("bootstrap.servers", brokersIp)
    consumerProps.setProperty("group.id", "test")
    val consumer = new FlinkKafkaConsumer[ConsumerRecord[String, String]](
      sourceTopic, new StringConsumerRecordSchema, consumerProps)
    val source = env.addSource(consumer)

    // ---------------------------- run the job -------------------------------------------
    source.print()
    env.execute()
  }
}
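Because the parallelism is read through ParameterTool.fromArgs, the default of 3 can be overridden as a program argument at submission time, for example (the jar name is a placeholder):

flink run -c finktest flink-kafka-demo.jar --parallelism 6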
2. Writing to Kafka with Flink
import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord

// Minimal serialization schema for the sink (the original referenced a
// KafkaStringSerializer class without defining it): each String becomes the record value.
class KafkaStringSerializer(topic: String) extends KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
}

object finktest {
  val sourceTopic = "test_source"
  val sinkTopic = "test_sink"
  val DEFAULT_PARALLELISM = 3
  val brokersIp = "localhost:9092" // assumed broker address (left undefined in the original)

  def main(args: Array[String]): Unit = {
    // ---------------------------- read arguments ----------------------------------------
    val params = ParameterTool.fromArgs(args)
    val parallelism = params.getInt("parallelism", DEFAULT_PARALLELISM)

    // ---------------------------- set up the execution environment ----------------------
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(parallelism)

    // ---------------------------- create the source -------------------------------------
    val consumerProps = new Properties()
    consumerProps.setProperty("bootstrap.servers", brokersIp)
    consumerProps.setProperty("group.id", "test")
    // StringConsumerRecordSchema is the deserialization schema defined in section 1
    val consumer = new FlinkKafkaConsumer[ConsumerRecord[String, String]](
      sourceTopic, new StringConsumerRecordSchema, consumerProps)
    val source = env.addSource(consumer).map(_.value())

    // ---------------------------- write the data out to Kafka ---------------------------
    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", brokersIp)
    val sink = new FlinkKafkaProducer[String](
      sinkTopic,
      new KafkaStringSerializer(sinkTopic),
      producerProps,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
    source.addSink(sink)

    // ---------------------------- run the job -------------------------------------------
    // source.print()
    env.execute()
  }
}
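To smoke-test the job, produce a few lines into test_source and watch them reappear on test_sink with the console tools that ship with Kafka (assuming a local broker at localhost:9092):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_source
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_sink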
3. Flink + Kafka exactly-once
env.enableCheckpointing(60000) // enable checkpointing; the interval is in milliseconds (here one checkpoint per minute)
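Enabling checkpointing makes the consumer commit its offsets as part of each completed checkpoint, but on its own that only upgrades the pipeline to at-least-once on the sink side. For end-to-end exactly-once the producer must also write through Kafka transactions. A minimal sketch, reusing brokersIp, sinkTopic, and the KafkaStringSerializer from section 2; the transaction.timeout.ms value is an assumption and must not exceed the broker's transaction.max.timeout.ms:

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", brokersIp)
// Flink's default transaction timeout (1 hour) exceeds the Kafka broker default
// transaction.max.timeout.ms (15 minutes), so it is usually lowered explicitly.
producerProps.setProperty("transaction.timeout.ms", "600000")

val exactlyOnceSink = new FlinkKafkaProducer[String](
  sinkTopic,
  new KafkaStringSerializer(sinkTopic),
  producerProps,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // records become visible only when the checkpoint completes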
4. Custom serialization schema for the Flink Kafka sink (routing records to different topics on demand)
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

case class SensorReading(name: String, time: Long, temperature: Double)

object kafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consume the Kafka stream and parse it
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // FlinkKafkaConsumer011(topic: String, valueDeserializer: DeserializationSchema[T], props: Properties)
    val dataStream = env.addSource(new FlinkKafkaConsumer011[String]("test_lir", new SimpleStringSchema(), properties))
    val senseStream = dataStream.map(f => {
      val arr = f.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    /*
      dataStream.addSink(
        new FlinkKafkaProducer011[String]("localhost:9092", "test_result", new SimpleStringSchema()))
      FlinkKafkaProducer011 constructor parameters:
        (String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema)
      SimpleStringSchema is the default serializer
    */
    senseStream.addSink(
      new FlinkKafkaProducer011[SensorReading]("localhost:9092", "test_result", new MySchema))
    env.execute("kafka sink test")
  }

  class MySchema extends KeyedSerializationSchema[SensorReading] {
    override def serializeKey(t: SensorReading): Array[Byte] = t.name.getBytes()

    // This method decides the topic the record is actually produced to; it takes
    // precedence over the topic name passed to FlinkKafkaProducer011. Overriding its
    // logic is what routes records to different topics on demand.
    override def getTargetTopic(t: SensorReading): String = "test_result2"

    override def serializeValue(t: SensorReading): Array[Byte] = t.toString.getBytes
  }
}
Note: the string returned by getTargetTopic is the name of the topic actually written to, and it takes precedence over the topic passed to FlinkKafkaProducer. The FlinkKafkaProducer source shows that the producer falls back to its constructor topic only when getTargetTopic returns null.
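The MySchema above still returns a fixed topic name; the following sketch shows the per-record routing the section title refers to. The topic names sensor_alerts/sensor_normal and the 40.0 threshold are hypothetical values for illustration:

class RoutingSchema extends KeyedSerializationSchema[SensorReading] {
  override def serializeKey(t: SensorReading): Array[Byte] = t.name.getBytes()

  // Route each reading by its content: hot readings go to an alert topic,
  // everything else to the normal topic (topic names and threshold are made up).
  override def getTargetTopic(t: SensorReading): String =
    if (t.temperature > 40.0) "sensor_alerts" else "sensor_normal"

  override def serializeValue(t: SensorReading): Array[Byte] = t.toString.getBytes
}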