阅读 315

Flink 使用之 SQL 和 DataStream 交互

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

前言

本篇为大家带来 Flink SQL 和 DataStream API 结合使用的使用方法。Flink DataStream API是Flink数据流处理标准API,SQL是Flink后期版本提供的新的数据处理操作接口。SQL的引入为提高了Flink使用的灵活性。读者可以认为Flink SQL是一种通过字符串来定义数据流处理逻辑的描述语言。

引入依赖

参见Flink 使用之 SQL 连接 Kafka。本篇不使用Kafka,它的相关依赖可忽略。

一个完整的例子

在分布讲解SQL使用之前,先给大家一个完整的例子。该例子使用Scala代码编写。

package com.paultech.sql

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object SqlStreamEnvDemo {
    def main(args: Array[String]): Unit = {
        val environment = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = StreamTableEnvironment.create(environment)

        val dataStream = environment.readTextFile( "/path/to/student.csv")

        val mapped = dataStream.map(line => {
            val strings = line.split(",")
            Student(strings(0).toInt, strings(1), strings(2).toInt)
        })

        val table = tEnv.fromDataStream(mapped)

        tEnv.createTemporaryView("student", table)

        val result = tEnv.sqlQuery("select name, score from student where score > 60")

        result.execute().print()
    }
}

case class Student(var id: Int, var name: String, var score: Int)

该例子包含的执行步骤如下:

  • 从CSV文件,按行读取文本到DataStream
  • 解析CSV内容,映射为Student
  • 查询score大于60的所有student信息

下面开始分析Flink SQL使用的基本套路。

创建 Environment

Flink提供了批和流的Environment,还有通用的TableEnvironment。这里推荐使用StreamTableEnvironment,只有它能够和DataStream API交互。创建它的方法如下:

val environment = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnvironment = StreamTableEnvironment.create(environment)

当然,我们也可以使用通用的TableEnvironment

val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnvironment = TableEnvironment.create(bsSettings)

加载数据源

StreamTableEnvironment支持从DataStream转换为Table类型。这样DataStream也可以作为数据源来使用了。

// 转换DataStream为Table
val table = streamTableEnvironment.fromDataStream(dataStream)

// 创建一个视图对应这个table,后面的SQL语句可操作这个table
streamTableEnvironment.createTemporaryView("table_name", table)

Flink官网为我们总结了StreamTableEnvironment的转换操作:

fromDataStream(DataStream): Interprets a stream of insert-only changes and arbitrary type as a table. Event-time and watermarks are not propagated by default.

将DataStream转换为Table。DataStream只支持数据插入。默认情况event time和watermark不会向下游传播。

fromDataStream(DataStream, Schema): Interprets a stream of insert-only changes and arbitrary type as a table. The optional schema allows to enrich column data types and add time attributes, watermarks strategies, other computed columns, or primary keys.

除了可以专门指定table的 schema外,其余的和上一个方法基本相同。

createTemporaryView(String, DataStream): Registers the stream under a name to access it in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream)).

createTemporaryView(String, fromDataStream(DataStream))的简写。

createTemporaryView(String, DataStream, Schema): Registers the stream under a name to access it in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream, Schema)).

createTemporaryView(String, fromDataStream(DataStream, Schema))的简写。

toDataStream(table): Converts a table into a stream of insert-only changes. The default stream record type is org.apache.flink.types.Row. A single rowtime attribute column is written back into the DataStream API’s record. Watermarks are propagated as well.

将table转换为DataStream,默认转换出的数据类型为org.apache.flink.types.Row。包含了event time和watermark。

toDataStream(table, AbstractDataType): Converts a table into a stream of insert-only changes. This method accepts a data type to express the desired stream record type. The planner might insert implicit casts and reorders columns to map columns to fields of the (possibly nested) data type.

toDataStream(table, Class): A shortcut for toDataStream(DataStream, DataTypes.of(Class)) to quickly create the desired data type reflectively.

这两个重载方法可以指定转换出的数据类型。

指定 Table Schema

说到表就不可能不谈到Schema。那么Flink 从数据流转换为table的时候是如何得知table schema的?

通过 Java Bean 或者 Scala Case Class获取

如果DataStream中的数据是Java Bean或者是Scala的case class,则无需额外指定Schema,class的字段名就是表的列名。字段类型就是class成员变量的类型。

// 定义一个Case Class
// 传统Java的Java Bean和Scala的case class都可以被Flink支持
case class Student(var id: Int, var name: String, var score: Int)
// ...
val mapped = dataStream.map(line => {
    val strings = line.split(",")
    Student(strings(0).toInt, strings(1), strings(2).toInt)
})

通过 as 指定

通过as方法可以方便的指定列名,但无法指定数据类型。

val table = streamTableEnvironment.fromDataStream(dataStream).as("col1", "col2", "col3")

使用 Schema 定义

Flink还提供了Schema定义方式。它使用了Builder设计模式,不仅能够指定字段名和字段类型,还可以定义watermark,primary key等。是功能最为全面的Schema配置方式。

下面是一个来自官网的例子:

val table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
            .build())

查看 Schema

在Flink调试时候查看Table的Schema可以调用它的printSchema方法,打印Schema到控制台。

table.printSchema()

执行 SQL

使用sqlQuery方法可执行select查询语句,返回一个Table类型结果,可继续用于下一步数据加工处理。

val result = streamTableEnvironment.sqlQuery("select * from ...")

当然,我们还可以使用executeSql方法。需要注意的是,它返回TableResult类型,而不是Table类型。TableResult将能用于查看结果。

val tableResult = streamTableEnvironment.executeSql("...")

获取结果

对于Table类型,我们可调用它的execute方法,执行SQL并返回TableResult,然后调用print,打印结果到控制台。

result.execute().print()

还可以将数据转换为DataStream。这样我们就可以继续使用DataStream API来处理数据流了。

val resultStream = streamTableEnvironment.toChangelogStream(resultTable)
resultStream.print()

Change log stream类似于CDC(Change data capture),它反映出的是数据的变化过程。对应数据的新增和修改。

我们还可以将table转换为普通的DataStream

streamTableEnvironment.toDataStream(table).print()

作者:AlienPaul

原文链接:https://www.jianshu.com/p/3a00923c929a

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐