Flink: Consuming from Kafka and Writing to Elasticsearch
Goal
Use Flink's built-in Elasticsearch connector to take data from Kafka, process it in Flink, and store the results in Elasticsearch.
pom
Configure the ES dependency to match the version you actually run; my production cluster is on ES 7.
Configure the Kafka dependency to match the Scala version you use.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
ES configuration
Host parsing
You can wrap host parsing in a small helper of your own:
// cached host array, parsed from a comma-separated "host:port" list
private static HttpHost[] httpHostArray;

public static HttpHost[] loadHostArray(String nodes) {
    if (httpHostArray == null) {
        String[] split = nodes.split(",");
        httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; ++i) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
    }
    return httpHostArray;
}
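Usage is then a one-liner; the node addresses here are placeholders:

HttpHost[] hosts = loadHostArray("10.0.0.1:9200,10.0.0.2:9200");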
For a quick test, hard-coded hosts are enough:

List<HttpHost> elsearchHosts = new ArrayList<>();
elsearchHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
Create the ElasticsearchSink.Builder
Configuration options:
esSinkBuilder.setBulkFlushInterval(3000);    // flush interval in ms; works alongside the two thresholds below (a flush fires when any trigger is hit)
esSinkBuilder.setBulkFlushMaxSizeMb(10);     // maximum buffered data per bulk request, in MB
esSinkBuilder.setBulkFlushMaxActions(1);     // maximum number of buffered actions per bulk request (1 effectively disables batching)
esSinkBuilder.setBulkFlushBackoff(true);     // enable backoff retries on temporary failures
esSinkBuilder.setBulkFlushBackoffRetries(2); // number of backoff retries
Configure only one failure handler; a later setFailureHandler call replaces the earlier one:

esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler()); // built-in handler that retries actions rejected by a full ES queue
// or supply your own ActionRequestFailureHandler (shown below) to override the failure strategy,
// e.g. so the job can keep writing when ES starts rejecting requests because its disks are full
ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(
        elsearchHosts,
        new ElasticsearchSinkFunction<JSONObject>() {
            private final String INDEX = "test";

            @Override
            public void process(JSONObject jsonObject, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // if the document does not exist yet, the partial document is used as the upsert document
                UpdateRequest updateRequest = new UpdateRequest(INDEX, jsonObject.getString("id"));
                updateRequest.upsert(jsonObject, XContentType.JSON);
                updateRequest.doc(jsonObject, XContentType.JSON);
                // hand the request to the indexer
                requestIndexer.add(updateRequest);
            }
        });

esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest actionRequest, Throwable throwable, int restStatusCode, RequestIndexer requestIndexer) throws Throwable {
        if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
            // full queue; re-add the document for indexing
            requestIndexer.add(actionRequest);
        } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
            // malformed document; simply drop the request without failing the sink
        } else {
            // for all other failures, fail the sink:
            // rethrow the failure here, or throw a custom exception instead
            // throw throwable;
        }
    }
});

esSinkBuilder.setBulkFlushInterval(3000);
esSinkBuilder.setBulkFlushMaxSizeMb(10);
esSinkBuilder.setBulkFlushMaxActions(1);
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setBulkFlushBackoffRetries(2);

// note: calling setFailureHandler again would replace the custom handler above; keep only one
// esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
sink
mapStream.addSink(esSinkBuilder.build()).name("sink");
Kafka configuration
Parameters
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("flink.partition-discovery.interval-millis", "5000");
// note: FlinkKafkaConsumer deserializes via its own DeserializationSchema and overrides these two settings
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "5000");
Reading the data
Custom schema
Replace BinLogPojo with your own entity class:
public class BinlogSchema implements DeserializationSchema<BinLogPojo> {

    @Override
    public BinLogPojo deserialize(byte[] bytes) {
        String log = new String(bytes, StandardCharsets.UTF_8);
        return JSON.parseObject(log, BinLogPojo.class);
    }

    @Override
    public boolean isEndOfStream(BinLogPojo binLogPojo) {
        return false;
    }

    @Override
    public TypeInformation<BinLogPojo> getProducedType() {
        return TypeInformation.of(new TypeHint<BinLogPojo>() {});
    }
}
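The schema above assumes BinLogPojo is a plain bean that fastjson can bind to. A minimal sketch follows; every field name other than data is my own guess, since only getData() is actually used later:

import java.util.LinkedHashMap;

public class BinLogPojo {
    private String table;                        // source table name (assumed field)
    private LinkedHashMap<String, String> data;  // column name -> value; accessed later via binLog.getData()

    public String getTable() { return table; }
    public void setTable(String table) { this.table = table; }
    public LinkedHashMap<String, String> getData() { return data; }
    public void setData(LinkedHashMap<String, String> data) { this.data = data; }
}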
Consuming from Kafka
One thing to watch out for: the source must not return a JSONObject stream directly, or the sink will fail with java.lang.UnsupportedOperationException. Deserialize into a String or into a serializable entity class of your own instead.
FlinkKafkaConsumerBase<BinLogPojo> eventConsumer = new FlinkKafkaConsumer<>(
        TOPIC,
        new BinlogSchema(),
        properties)
        .setStartFromLatest();

SingleOutputStreamOperator<JSONObject> mapStream = env.addSource(eventConsumer)
        .map(new MapFunction<BinLogPojo, JSONObject>() {
            @Override
            public JSONObject map(BinLogPojo binLog) throws Exception {
                LinkedHashMap<String, String> binLogData = binLog.getData();
                JSONObject jsonObject = new JSONObject();
                // populate the document from binLogData as needed;
                // the ES sink above expects an "id" field to key its upserts on
                jsonObject.put("status", 0);
                return jsonObject;
            }
        });
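Putting the pieces together, here is a minimal job skeleton. It is a sketch rather than a drop-in program: the class name KafkaToEsJob, the topic name "test-topic", the job name "kafka-to-es", and the assumption that the binlog data map carries an "id" column are all mine, and the bulk-flush and failure-handler settings shown earlier are omitted for brevity.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;

public class KafkaToEsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source; see the Kafka section above for the full property set
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer<BinLogPojo> eventConsumer =
                new FlinkKafkaConsumer<>("test-topic", new BinlogSchema(), properties);
        eventConsumer.setStartFromLatest();

        // ES sink with the same upsert logic as above, trimmed to the essentials
        List<HttpHost> elsearchHosts = new ArrayList<>();
        elsearchHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
        ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(
                elsearchHosts,
                new ElasticsearchSinkFunction<JSONObject>() {
                    @Override
                    public void process(JSONObject json, RuntimeContext ctx, RequestIndexer indexer) {
                        UpdateRequest updateRequest = new UpdateRequest("test", json.getString("id"));
                        updateRequest.upsert(json, XContentType.JSON);
                        updateRequest.doc(json, XContentType.JSON);
                        indexer.add(updateRequest);
                    }
                });

        SingleOutputStreamOperator<JSONObject> mapStream = env.addSource(eventConsumer)
                .map(new MapFunction<BinLogPojo, JSONObject>() {
                    @Override
                    public JSONObject map(BinLogPojo binLog) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("id", binLog.getData().get("id")); // the sink keys upserts on "id"
                        jsonObject.put("status", 0);
                        return jsonObject;
                    }
                });

        mapStream.addSink(esSinkBuilder.build()).name("sink");
        env.execute("kafka-to-es");
    }
}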
Author: 杨慕晚
Link: https://juejin.cn/post/7029240773849661471