阅读 145

Flink 消费kafka写入ElasticSearch

目标

通过Flink 自带的 ElasticSearch Connector,将 Kafka 中的数据经过 Flink 处理后然后存储到 ElasticSearch。

pom

es的版本根据自己实际使用的版本进行配置,我这里线上使用的是es7;

kafka根据自己使用的scala版本进行配置;

<dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-sql-connector-kafka_2.11</artifactId>     <version>${flink.version}</version> </dependency> <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-connector-elasticsearch7_2.11</artifactId>     <version>${flink.version}</version> </dependency> 复制代码

ES 配置

地址解析

可以自己封装一个地址解析的方法

public static HttpHost[] loadHostArray(String nodes) {     if (httpHostArray == null) {         String[] split = nodes.split(",");         httpHostArray = new HttpHost[split.length];         for(int i = 0; i < split.length; ++i) {             String item = split[i];             httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");         }     }     return httpHostArray; } 复制代码

如果只是测试,可如下:

List<HttpHost> elsearchHosts = new ArrayList<>(); elsearchHosts.add(new HttpHost("127.0.0.1", 9200, "http")); 复制代码

创建 ElasticsearchSink.Builder

配置详解:

  • esSinkBuilder.setBulkFlushInterval(3000);//批量写入的时间间隔,如果设置,则忽略下面两个批量写入配置

  • esSinkBuilder.setBulkFlushMaxSizeMb(10);//批量写入时的最大数据量

  • esSinkBuilder.setBulkFlushMaxActions(1);//批量写入时的最大写入条数

  • esSinkBuilder.setBulkFlushBackoff(true);//是否开启重试机制

  • esSinkBuilder.setBulkFlushBackoffRetries(2);//失败重试次数

失败策略配置一个即可:

  • esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());//默认失败重试

  • esSinkBuilder.setFailureHandler;//重写失败策略,在es磁盘满的情况下还可以写入

ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(elsearchHosts, new ElasticsearchSinkFunction<JSONObject>() {     @Override     public void process(JSONObject jsonObject, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {         new ElasticsearchSinkFunction<JSONObject>() {             private String INDEX = "test";             @Override             public void process(JSONObject jsonObject, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {                 //如果尚未存在,则表明必须将部分文档用作upsert文档                 UpdateRequest updateRequest = new UpdateRequest(INDEX, jsonObject.getString("id"));                 updateRequest.upsert(jsonObject, XContentType.JSON);                 updateRequest.doc(jsonObject, XContentType.JSON);                 //添加请求                 requestIndexer.add(updateRequest);             }         };     } }); esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {     @Override     public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {         if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {             // full queue; re-add document for indexing             requestIndexer.add(actionRequest);         } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {             // malformed document; simply drop request without failing sink         } else {             // for all other failures, fail the sink             // here the failure is simply rethrown, but users can also choose to throw custom exceptions             //throw failure             //logger.error(throwable.getMessage());         }     } }); esSinkBuilder.setBulkFlushInterval(3000); esSinkBuilder.setBulkFlushMaxSizeMb(10); esSinkBuilder.setBulkFlushBackoff(true); esSinkBuilder.setBulkFlushBackoffRetries(2); esSinkBuilder.setBulkFlushMaxActions(1); esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler()); 复制代码

sink

mapStream.addSink(esSinkBuilder.build()).name("sink"); 复制代码

kafka配置

参数

Properties properties=new Properties(); properties.setProperty("bootstrap.servers","127.0.0.1:9092"); properties.setProperty("group.id","test"); properties.setProperty("auto.offset.reset","latest"); properties.setProperty("flink.partition-discovery.interval-millis","5000"); properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("enable.auto.commit","true"); properties.setProperty("auto.commit.interval.ms","5000"); 复制代码

数据读取

自定义Schema

BinLogPojo可替换为自己的实体类

public class BinlogSchema implements DeserializationSchema<BinLogPojo> {     @Override     public BinLogPojo deserialize(byte[] bytes) {         String log = new String(bytes, StandardCharsets.UTF_8);         return JSON.parseObject(log, BinLogPojo.class);     }     @Override     public boolean isEndOfStream(BinLogPojo binLogPojo) {         return false;     }     @Override     public TypeInformation<BinLogPojo> getProducedType() {         return TypeInformation.of(new TypeHint<BinLogPojo>(){             @Override             public TypeInformation<BinLogPojo> getTypeInfo() {                 return super.getTypeInfo();             }         });     } } 复制代码

消费kafka

这里有一个注意点,就是不能直接返回一个JSONObject流,这样sink的时候就会报java.lang.UnsupportedOperationException错误,所以建议用String或者自己封装一个序列化的实体类

FlinkKafkaConsumerBase<BinLogPojo> eventConsumer = new FlinkKafkaConsumer<>(         TOPIC, new BinlogSchema(), properties)         .setStartFromLatest(); SingleOutputStreamOperator<JSONObject> mapStream = env.addSource(eventConsumer)         .map(new MapFunction<BinLogPojo, JSONObject>() {             @Override             public JSONObject map(BinLogPojo binLog) throws Exception {                 LinkedHashMap<String, String> binLogData = binLog.getData();                 JSONObject jsonObject = new JSONObject();                 jsonObject.put("status", 0);                 return jsonObject;             }         });


作者:杨慕晚
链接:https://juejin.cn/post/7029240773849661471


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐