阅读 117 SEO

flink整合spring boot

Flink框架:Flink整合springboot

首先说一下, 为什么flink 需要集成flink, spring boot给我们带来了更好的框架整合, 同时使用spring的DI和IOC,能更好的使用bean,当然直接使用spring 整合也是一样。

实现原理

实现原理, spring 的启动 一般使用 AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(AppConfig.class); 即可启动spring 容器, 对么spring boot 呢, 看过源码的人或许知道

SpringApplication.run(arge); 只需要在启动flink之前启动sping boot 即可。

代码

flink 整合spring boot 以及redission, 并将事件的id放入redis 中, 代码库 https://gitee.com/imomoda/flink-sprint-boot

  • spring boot 启动工具类

    @SpringBootApplication(scanBasePackages = {"io.github.jeesk.flink"})
    @Import(SpringUtil.class)
    @Slf4j
    @EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
    public class SpringBootApplicationUtil {
    
    
        static SpringApplication springBootApplication = null;
        static SpringApplicationBuilder springApplicationBuilder = null;
    
        public static synchronized void run(String[] arge) {
            if (springBootApplication == null) {
                StandardEnvironment standardEnvironment = new StandardEnvironment();
                MutablePropertySources propertySources = standardEnvironment.getPropertySources();
                propertySources.addFirst(new SimpleCommandLinePropertySource(arge));
                String startJarPath = SpringBootApplicationUtil.class.getResource("/").getPath().split("!")[0];
                String[] activeProfiles = standardEnvironment.getActiveProfiles();
                propertySources.addLast(new MapPropertySource("systemProperties", standardEnvironment.getSystemProperties()));
                propertySources.addLast(new SystemEnvironmentPropertySource("systemEnvironment", standardEnvironment.getSystemEnvironment()));
                if (springBootApplication == null) {
                    springApplicationBuilder = new SpringApplicationBuilder(SpringBootApplicationUtil.class);
                    // 这里可以通过命令行传入
                    springApplicationBuilder.profiles("dev");
                    springApplicationBuilder.sources(SpringBootApplicationUtil.class).web(WebApplicationType.NONE);
                }
                springBootApplication = springApplicationBuilder.build();
                springBootApplication.run(arge);
            }
        }
    
    
    }
    
  • flink job

    package io.github.jeesk.flink;
    
    import cn.hutool.extra.spring.SpringUtil;
    import io.github.jeesk.flink.config.SpringBootApplicationUtil;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.walkthrough.common.entity.Alert;
    import org.apache.flink.walkthrough.common.entity.Transaction;
    import org.apache.flink.walkthrough.common.sink.AlertSink;
    import org.apache.flink.walkthrough.common.source.TransactionSource;
    import org.springframework.data.redis.core.StringRedisTemplate;
    
    public class FraudDetectionJob {
        public static void main(String[] args) throws Exception {
    
            Configuration configuration = new Configuration();
            if (args != null) {
                configuration.setString("args", String.join(" ", args));
            }
            SpringBootApplicationUtil.run(args);
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<Transaction> transactions = env
                    .addSource(new TransactionSource())
                    .name("transactions");
    
            DataStream<Alert> alerts = transactions
                    .keyBy(Transaction::getAccountId)
                    .process(new FraudDetector())
                    .name("fraud-detector");
    
            alerts
                    .addSink(new AlertSink())
                    .name("send-alerts");
    
            env.execute("Fraud Detection");
        }
    
        static public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
    
            private StringRedisTemplate redisTemplate = null;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                // 初始化bean
                super.open(parameters);
                SpringBootApplicationUtil.run(parameters.getString("arge", "").split(" "));
                redisTemplate = SpringUtil.getBean(StringRedisTemplate.class);
    
            }
    
            @Override
            public void processElement(
                    Transaction transaction,
                    Context context,
                    Collector<Alert> collector) throws Exception {
    
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                // 将id 放入redis 中
                redisTemplate.opsForSet().add("tmpKey", String.valueOf(alert.getId()));
                collector.collect(alert);
            }
        }
    }
    
    
  • flink 使用logback 还是log4j, 本demo 使用的是Logback , 需要做以下的处理

    1. 服务器端处理: flink 的安装目录下面放入logback 的包,log4j-over-slf4j-1.7.15.jar,logback-classic-1.2.3.jar,logback-core-1.2.3.jar ,
    2. 然后删除lib下面关于log4j的包 log4j-1.2.17.jar及slf4j-log4j12-1.7.15.jar), 如果不懂这些包的作用可以仔细阅读: JAVA 常见日志依赖处理细节
    3. 在代码的pom文件里面排除log4j的包
        <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-java</artifactId>
              <version>1.13.1</version>
              <!--排除log4j-->
              <exclusions>
                  <exclusion>
                      <groupId>log4j</groupId>
                      <artifactId>*</artifactId>
                  </exclusion>
                  <exclusion>
                      <groupId>org.slf4j</groupId>
                      <artifactId>slf4j-log4j12</artifactId>
                  </exclusion>
              </exclusions>
              <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-streaming-java_2.12</artifactId>
              <version>1.13.1</version>
              <!--排除log4j-->
              <exclusions>
                  <exclusion>
                      <groupId>log4j</groupId>
                      <artifactId>*</artifactId>
                  </exclusion>
                  <exclusion>
                      <groupId>org.slf4j</groupId>
                      <artifactId>slf4j-log4j12</artifactId>
                  </exclusion>
              </exclusions>
              <!--<scope>provided</scope>-->
    </dependency>
    
    
  1. 如果想修改flink 的logback的日志文件 , 可以在flink的conf目录下面修改下面的三个文件

      logback-console.xml
      logback-session.xml
      logback.xml
    

参考内容

作者:山间草夫

原文链接:https://www.jianshu.com/p/1ac7671008ae

文章分类
后端
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 gxwowoo@163.com 举报,一经查实,本站将立刻删除。
相关推荐