Flink 一 环境搭建,输出wordCount
1. Flink环境搭建
1.1 Flink版本列表:
archive.apache.org/dist/flink/
1.2 选择最新的1.12.2版本进行安装
wget https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz
1.3 解压安装
tar -xzf flink-1.12.2-bin-scala_2.12.tgz
cd flink-1.12.2
./bin/start-cluster.sh
检查是否安装成功:jps -l|grep flink
web UI页面地址:http://192.168.9.226:8081/#/overview
2. wordCount例子
2.1 springboot项目目录结构:
2.2 添加maven依赖:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Flink artifacts: version and Scala suffix match the installed distribution
         (flink-1.12.2-bin-scala_2.12) so that jobs built here run on that cluster. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.2</version>
    </dependency>
    <!-- Since Flink 1.11 flink-clients must be declared explicitly; without it,
         running a job locally (e.g. from the IDE) fails to find an executor. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.2</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Compile for Java 8. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <!-- Unit tests are skipped during the build. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.19</version>
            <configuration>
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>
2.3 示例1:批处理wordCount
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; //批处理wordCount public class WordCountBatch { public static void main(String[] args) throws Exception{ //创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //从文件中读取数据 String filePath = "F:\\ttWork\\flink-demo\\src\\main\\resources\\hello.txt"; DataSet<String> inputDateSet = env.readTextFile(filePath); //分词统计 DataSet<Tuple2<String, Integer>> sum = inputDateSet.flatMap(new MyFlatMap()) .groupBy(0) //第一个位置分组 .sum(1);//第二个位置汇总 sum.print(); } //实现flatMap操作 public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>>{ @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { //按空格分词 String[] words = s.split(" "); //遍历输出二元组 for (String word : words){ collector.collect(new Tuple2<>(word, 1)); } } } }复制代码
2.4 示例2:流处理wordCount
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; //流处理wordCount public class WordCountStream { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(4); //从文件中读取数据 String filePath = "F:\\ttWork\\flink-demo\\src\\main\\resources\\hello.txt"; DataStream<String> inputDataStream = env.readTextFile(filePath); //数据流转换操作 SingleOutputStreamOperator<Tuple2<String, Integer>> sum = inputDataStream.flatMap(new MyFlatMap()) .keyBy(0) .sum(1); sum.print(); //启动任务 env.execute(); } //实现flatMap操作 public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { //按空格分词 String[] words = s.split(" "); //遍历输出二元组 for (String word : words){ collector.collect(new Tuple2<>(word, 1)); } } } }复制代码
2.5 示例3:socket流处理wordCount
在socket端口输入测试数据(例如先在目标主机上执行 nc -lk <端口> 监听该端口,再逐行输入以空格分隔的单词):
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; //socket流处理wordCount public class WordCountSocketStream { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 // env.setParallelism(4); ParameterTool parameterTool = ParameterTool.fromArgs(args); String host = parameterTool.get("host"); int port = parameterTool.getInt("port"); //从socket文本流读取数据 DataStream<String> inputDataStream = env.socketTextStream(host, port); //数据流转换操作 SingleOutputStreamOperator<Tuple2<String, Integer>> sum = inputDataStream.flatMap(new MyFlatMap()) .keyBy(0) .sum(1); sum.print(); //启动任务 env.execute(); } //实现flatMap操作 public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { //按空格分词 String[] words = s.split(" "); //遍历输出二元组 for (String word : words){ collector.collect(new Tuple2<>(word, 1)); } } } }复制代码
2.6 通过 Flink Web UI 将示例3提交到 Flink 服务器上运行
输入要运行的入口类(WordCountSocketStream),并在程序参数中传入socket的地址以及端口,格式为 --host <地址> --port <端口>:
在running jobs页上查看flink程序的执行情况,如各个算子的并行度、接收数据的条数和大小等
在task managers页上查看flink的日志输出:
作者:画画的贝贝
链接:https://juejin.cn/post/7023210394894204936