DataStream的设计与实现(datastream使用教程)
DataStream的设计与实现
一. DataStream的含义以及成员构成
1. DataStream的含义
由源码翻译可知,DataStream是一个集合,包含了相同类型
元素的数据流,一个DataStream可以通过transformation(如Map,filter操作)变成另外一种DataStream.同时,DataStream主要用于表达业务逻辑,实际上并没有存储真实数据
2.DataStream的继承关系
图一 DataStream的成员变量以及子类
1.由图一可知,DataStream主要是由StreamExecutionEnvironment
以及transformation
构成.其中SingleOutputStreamOperator、keyedStream继承了DataStream抽象类.
transformation是当前的DataStream对应的上一次的转换操作,即上个阶段的流完成这个transformation操作然后得到了当前流。
StreamExecutionEnvironment会将DataStream之间的转换操作存储至StreamExecutionEnvironment的List中.然后基于这些转换操作构建左右Pipeline拓扑,用于描述整个作业的计算逻辑
图一中也标识了
DataStreamSink,主要是用来从数据流拓扑结构生成元素,同时和其他DataStream进行一个区分
3. DataStream的方法
图二 DataStream的方法介绍
DataStream有大量的transform(转换)操作方法可以调用,主要是将DataStream流进行操作变换成为另外一种流,其底层是调用了Transform方法.
二、DataStream API的应用实例
1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 2. DataStream<String> inputStream = env.readTextFile("xxxx.csv"); 3. SingleOutputStreamOperator<Role> dataStream = inputStream.map(new MapFunction<String, Role>() { @Override public Role map(String value) throws Exception { String[] arrs = value.split(","); return new Role(Long.parseLong(arrs[0]), Long.parseLong(arrs[1]), Integer.parseInt(arrs[2]), arrs[3], Long.parseLong(arrs[4])); } }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Role>() { @Override public long extractAscendingTimestamp(Role element) { return element.timestamp * 1000L; } }); 复制代码
第一句首先构建了一个StreamExecutionEnvironment对象env,env.readText会生成DataStreamSource
第二句env.readTextFile方法中构建出了DataStream对象,DataStreamSource继承了DataStream
第三句DataStream对象调用了map方法,生成了SingleOutputStreamOperator对象,因此由当前的dataStream转换成了另外一个dataStream
三、 DataStream的map方法调用流程
图三 DataStream的map方法调用流程
如图三所示,是DataStream的map方法的调用流程,由图得知,所有调用的方法都在dataStream类中进行调用跳转.最后调用了doTransform方法来生成DataStream,返回SingleOutputStreamOperator.
值得注意的是
SingleOutputStreamOperator是继承了DataStream类,属于特殊的DtataStream
SingleOutputStreamOperator是继承了DataStream类,属于特殊的DtataStream
SingleOutputStreamOperator是继承了DataStream类,属于特殊的DtataStream
因此transformation操作本质上就是一种datastream变成另外一种datastream
doTransform方法实现逻辑
伪原创工具 SEO网站优化 https://www.237it.com/
如图三所示, 在调用doTransform方法的时候,主要是把之前map操作中的mapfunction封装成operator.同时,将当前datastream的transformation作为参数,一起构建成新的transformation
将新的tranformation加入至executionenvironment中,用于构建streamGraph
将excutionenvironment和tranformation作为参数,构建新的DataStream
DataStream.doTranform()方法定义
protected <R> SingleOutputStreamOperator<R> doTransform( String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) { // 获取上一次tranformation的输出类型 transformation.getOutputType(); //将本次tranformation作为输入,生成一个新的tranformation OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operatorFactory, outTypeInfo, environment.getParallelism()); //将新的environment和新的tranformation作为参数,构建成新的datastream(SingleOutputStreamOperator) @SuppressWarnings({"unchecked", "rawtypes"}) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); //将当前新生成的tranformation加入到executionenvironment中,用于生成streamGraph getExecutionEnvironment().addOperator(resultTransform); return returnStream; }
作者:大虾饺
链接:https://juejin.cn/post/7035485165929365535