阅读 601

DataStream的设计与实现(datastream使用教程)

DataStream的设计与实现

一. DataStream的含义以及成员构成

1. DataStream的含义

由源码翻译可知,DataStream是一个集合,包含了相同类型元素的数据流,一个DataStream可以通过transformation(如Map,filter操作)变成另外一种DataStream.同时,DataStream主要用于表达业务逻辑,实际上并没有存储真实数据

2.DataStream的继承关系

image.png

图一  DataStream的成员变量以及子类

1.由图一可知,DataStream主要是由StreamExecutionEnvironment 以及transformation构成.其中SingleOutputStreamOperator、keyedStream继承了DataStream抽象类.

  • transformation是当前的DataStream对应的上一次的转换操作,即上个阶段的流完成这个transformation操作然后得到了当前流。

  • StreamExecutionEnvironment会将DataStream之间的转换操作存储至StreamExecutionEnvironment的List中.然后基于这些转换操作构建左右Pipeline拓扑,用于描述整个作业的计算逻辑

图一中也标识了DataStreamSink,主要是用来从数据流拓扑结构生成元素,同时和其他DataStream进行一个区分

3. DataStream的方法

image.png

图二  DataStream的方法介绍

DataStream有大量的transform(转换)操作方法可以调用,主要是将DataStream流进行操作变换成为另外一种流,其底层是调用了Transform方法.

二、DataStream API的应用实例

 1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 2. DataStream<String> inputStream = env.readTextFile("xxxx.csv"); 3. SingleOutputStreamOperator<Role> dataStream = inputStream.map(new MapFunction<String, Role>() {             @Override             public Role map(String value) throws Exception {                 String[] arrs = value.split(",");                 return new Role(Long.parseLong(arrs[0]), Long.parseLong(arrs[1]), Integer.parseInt(arrs[2]), arrs[3], Long.parseLong(arrs[4]));             }         }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Role>() {             @Override             public long extractAscendingTimestamp(Role element) {                 return element.timestamp * 1000L;             }         }); 复制代码

  1. 第一句首先构建了一个StreamExecutionEnvironment对象env,env.readText会生成DataStreamSource

  2. 第二句env.readTextFile方法中构建出了DataStream对象,DataStreamSource继承了DataStream

  3. 第三句DataStream对象调用了map方法,生成了SingleOutputStreamOperator对象,因此由当前的dataStream转换成了另外一个dataStream

三、 DataStream的map方法调用流程

image.png

图三  DataStream的map方法调用流程

如图三所示,是DataStream的map方法的调用流程,由图得知,所有调用的方法都在dataStream类中进行调用跳转.最后调用了doTransform方法来生成DataStream,返回SingleOutputStreamOperator.
值得注意的是

SingleOutputStreamOperator是继承了DataStream类,属于特殊的DtataStream
SingleOutputStreamOperator是继承了DataStream类,属于特殊的DtataStream
SingleOutputStreamOperator是继承了DataStream类,属于特殊的DtataStream

因此transformation操作本质上就是一种datastream变成另外一种datastream

doTransform方法实现逻辑

 伪原创工具 SEO网站优化  https://www.237it.com/ 

  • 如图三所示, 在调用doTransform方法的时候,主要是把之前map操作中的mapfunction封装成operator.同时,将当前datastream的transformation作为参数,一起构建成新的transformation

  • 将新的tranformation加入至executionenvironment中,用于构建streamGraph

  • 将excutionenvironment和tranformation作为参数,构建新的DataStream

DataStream.doTranform()方法定义

 protected <R> SingleOutputStreamOperator<R> doTransform( String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) { // 获取上一次tranformation的输出类型 transformation.getOutputType();              //将本次tranformation作为输入,生成一个新的tranformation OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operatorFactory, outTypeInfo, environment.getParallelism());         //将新的environment和新的tranformation作为参数,构建成新的datastream(SingleOutputStreamOperator) @SuppressWarnings({"unchecked", "rawtypes"}) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);         //将当前新生成的tranformation加入到executionenvironment中,用于生成streamGraph getExecutionEnvironment().addOperator(resultTransform); return returnStream; }


作者:大虾饺
链接:https://juejin.cn/post/7035485165929365535


文章分类
代码人生
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐