阅读 234

FLINK-connectors-Kafka读写

FLINK-connectors-Kafka读写

1.flink读取kafka

复制代码

import java.util.Propertiesimport org.apache.flink.api.java.utils.ParameterToolimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport org.apache.kafka.clients.consumer.ConsumerRecord

object finktest {

  val sourceTopic = "test_source"
  val DEFAULT_PARALLELISM = 3
  def main(args: Array[String]): Unit = {    //----------------------------读取参数------------------------------------------------
    val params = ParameterTool.fromArgs(args)
    val parallelism = params.getInt("parallelism",DEFAULT_PARALLELISM)    //----------------------------配置执行环境--------------------------------------------
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(parallelism)    //----------------------------创建数据源----------------------------------------------
    val consumerProps = new Properties()
    consumerProps.setProperty("bootstrap.servers", brokersIp)
    consumerProps.setProperty("group.id", "test")
    val consumer = new FlinkKafkaConsumer[ConsumerRecord[String,String]](sourceTopic,new KafkaDeserializationSchema,consumerProps)

    val source = env.addSource(consumer)    //----------------------------执行作业------------------------------------------------    source.print()
    env.execute()
  }
}

复制代码

2.flink写入kafka

复制代码

import java.util.Propertiesimport org.apache.flink.api.java.utils.ParameterToolimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}import org.apache.kafka.clients.consumer.ConsumerRecord

object finktest {

  val sourceTopic = "test_source"
  val sinkTopic = "test_sink"
  val DEFAULT_PARALLELISM = 3
  def main(args: Array[String]): Unit = {    //----------------------------读取参数------------------------------------------------
    val params = ParameterTool.fromArgs(args)
    val parallelism = params.getInt("parallelism",DEFAULT_PARALLELISM)    //----------------------------配置执行环境--------------------------------------------
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(parallelism)    //----------------------------创建数据源----------------------------------------------
    val consumerProps = new Properties()
    consumerProps.setProperty("bootstrap.servers", brokersIp)
    consumerProps.setProperty("group.id", "test")
    val consumer = new FlinkKafkaConsumer[ConsumerRecord[String,String]](sourceTopic,new KafkaDeserializationSchema,consumerProps)

    val source = env.addSource(consumer).map(_.value())    
    //----------------------------将数据输出到kafka---------------------------------------
    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", brokersIp)
    val sink = new FlinkKafkaProducer[String](sinkTopic,new KafkaStringSerializer(sinkTopic),producerProps,
    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
    source.addSink(sink)    //----------------------------执行作业------------------------------------------------    //source.print()    env.execute()
  }
}

复制代码

3.flink+kafka exactly once

env.enableCheckpointing(6000) // 启动检查点并设置保存检查点,时间间隔1分/次

4.flink kafkaSink自定义序列化类(实现按需流向不同topic

复制代码

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemacase class SensorReading(name:String,time:Long,tempature:Double)

object kafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment    //消费kafka流进行处理
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")    //FlinkKafkaConsumer011(topic: String, valueDeserializer: DeserializationSchema[T], props: Properties) 
    val dataStream = env.addSource(new FlinkKafkaConsumer011[String]("test_lir",new SimpleStringSchema(),properties))
    val senseStream = dataStream.map(f=>{
      val arr = f.split(",")
      SensorReading(arr(0).trim,arr(1).trim.toLong,arr(2).trim.toDouble)
    })    /*
   dataStream.addSink( new FlinkKafkaProducer011[String]("localhost:9092","test_result",new SimpleStringSchema()))
    FlinkKafkaProducer011构造器参数:(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema)
    SimpleStringSchema为默认的序列化    */

    senseStream.addSink( new FlinkKafkaProducer011[SensorReading]("localhost:9092","test_result",new MySchema))
    env.execute("kafka sink test")
  }  class MySchema extends KeyedSerializationSchema[SensorReading]{
    override def serializeKey(t: SensorReading): Array[Byte] = t.name.getBytes()    //此方法才是实际底层produce的topic,FlinkKafkaProducer011中的topic_name级别不如此级别,重写该方法逻辑可以实现按需流向不同topic
    override def getTargetTopic(t: SensorReading): String = "test_result2"

    override def serializeValue(t: SensorReading): Array[Byte] = t.toString.getBytes
  }

}

复制代码

注意:getTargetTopic中的返回字符串为实际写入的topic名,级别比FlinkKafkaProducer中的topic要高,从FlinkKafkaProducer源码可知,只有targetTopic为空的时候才会使用FlinkKafkaProducer中的topic。


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐