Flink: Real-Time E-Commerce User Behavior Analysis Using Only Table & SQL
DDL support is something Flink invested heavily in after the 1.9 release, throughout 1.10 and 1.11. SQL DDL is the mechanism by which Flink connects to external systems, and the degree of DDL support determines that connectivity, which is a very important quality of a compute engine. Precisely because of Flink's strong DDL support, people who don't know Java or Scala can still use Flink through standard SQL.
Based on Jark Wu's (伍翀) tutorial, this article builds a real-time e-commerce user behavior analysis application with Flink SQL, using Kafka, MySQL, Elasticsearch, and Kibana. Some of the images referenced by the docker-compose file on GitHub can no longer be found on Docker Hub, so we have to build them ourselves. The originals also used older component versions; here we use Flink 1.13.2, the latest stable release at the time of writing. Building these new images ran into a few problems, all of which are briefly documented in this article.
Setup Steps
The original docker-compose file:
```yaml
version: '2.1'
services:
  sql-client:
    image: jark/demo-sql-client:0.2
    depends_on:
      - kafka
      - jobmanager
      - elasticsearch
    environment:
      FLINK_JOBMANAGER_HOST: jobmanager
      ZOOKEEPER_CONNECT: zookeeper
      KAFKA_BOOTSTRAP: kafka
      MYSQL_HOST: mysql
      ES_HOST: elasticsearch
  jobmanager:
    image: flink:1.11.0-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.11.0-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
  datagen:
    image: jark/datagen:0.2
    command: "java -classpath /opt/datagen/flink-sql-demo.jar myflink.SourceGenerator --input /opt/datagen/user_behavior.log --output kafka kafka:9094 --speedup 2000"
    depends_on:
      - kafka
    environment:
      ZOOKEEPER_CONNECT: zookeeper
      KAFKA_BOOTSTRAP: kafka
  mysql:
    image: jark/mysql-example:0.2
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
      - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS="user_behavior:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.0
    ports:
      - "5601:5601"
```
| Software | Version | Notes |
|---|---|---|
| flink | flink:1.13.2-scala_2.11 | |
| mysql | 5.6 | |
| kafka | 2.8.1 | |
| elasticsearch | 7.8.0 | |
| kibana | 7.8.0 | |
| zookeeper | 3.6.3 | |
| sql-client | flink:1.13.2-scala_2.11 | Jar packages must be added manually |
| datagen | | Rebuilt from the source code with dependencies adjusted for the new Flink version |
Building the sql-client image
The main issue is that if you only add the lib jars Jark Wu's original demo depends on when building the sql-client image, it cannot connect to Kafka and reports an error:
Even after adding dependencies with matching versions, it still fails:
The error clearly indicates a missing dependency, but online resources suggested the lib jars had not been loaded successfully, and restarting and similar operations did not fix it. Earlier, when submitting jobs through the web UI, I had run into a problem where a job jar built with Maven had to be shaded so it contained its complete dependencies. So I guessed I needed a shaded version of `flink-connector-kafka_2.11-1.13.2.jar`, but I never found one. In the end I added the base Kafka client dependency, `kafka-clients-2.8.1.jar`, and that actually worked.
| lib |
|---|
| flink-connector-jdbc_2.11-1.13.2.jar |
| flink-connector-kafka_2.11-1.13.2.jar |
| flink-sql-connector-elasticsearch7_2.11-1.13.2.jar |
| kafka-clients-2.8.1.jar |
| mysql-connector-java-8.0.18.jar |
All of the jars above are required; they can be downloaded directly from the Maven repository.
The Dockerfile for building sql-client:
```dockerfile
FROM maven:3.6.3-openjdk-11-slim AS builder

###############################################################################
# Build SQL Playground Image
###############################################################################

FROM flink:1.13.2-scala_2.11

# Copy sql-client script
COPY bin/* /opt/sql-client/
RUN chmod a+x /opt/sql-client/sql-client.sh

# Connector libraries
RUN mkdir -p /opt/sql-client/lib
COPY lib/* /opt/sql-client/lib/

# Copy configuration
COPY conf/* /opt/flink/conf/

WORKDIR /opt/sql-client
ENV SQL_CLIENT_HOME /opt/sql-client

COPY docker-entrypoint.sh /
ENTRYPOINT ["/docker-entrypoint.sh"]
```
Building the datagen data-generator image
First, find pom.xml in the downloaded `flink-sql-demo-1.11-EN` source code and update the dependency versions to match the chosen Flink version:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-sql-demo</artifactId>
    <version>1.0.0</version>

    <properties>
        <flink.version>1.13.2</flink.version>
        <kafka.version>2.8.1</kafka.version>
        <jmh.version>1.17.5</jmh.version>
    </properties>

    <dependencies>
        <!-- Flink modules -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.12.1</version>
        </dependency>

        <!-- JMH -->
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>${jmh.version}</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>${jmh.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hbase-1.4_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-elasticsearch7_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Tests -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!-- The semantics of this option are reversed, see MCOMPILER-209. -->
                    <useIncrementalCompilation>false</useIncrementalCompilation>
                    <compilerArgs>
                        <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
                        <arg>-Xpkginfo:always</arg>
                    </compilerArgs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>flink-sql-demo</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>flink-sql-demo</finalName>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
Build the complete `flink-sql-demo.jar` with Maven. The user_behavior.log data can be downloaded from the Tianchi dataset page (aliyun.com).
Modify the Dockerfile and build the image:
```dockerfile
FROM maven:3.6.3-openjdk-11-slim AS builder
FROM openjdk:11-jdk-slim AS jdkImage

RUN mkdir -p /opt/datagen
COPY user_behavior.log /opt/datagen
COPY flink-sql-demo.jar /opt/datagen

WORKDIR /opt/datagen
ENV DATEGEN_HOME /opt/datagen
```
Then update the docker-compose file to use the chosen versions and start everything. Once startup completes, you can verify that all of these containers are running.
From here you can follow Jark Wu's GitHub steps one by one!
Running the Analysis
Defining the Kafka data source with DDL
```sql
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),   -- generates processing-time attribute using computed column
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- defines watermark on ts column, marks ts as event-time attribute
) WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'kafka:9094',  -- kafka broker address
    'format' = 'json'  -- the data format is json
);
```
This DDL creates a table connected to the topic in Kafka. Executing the statement does not submit a job to Flink; the CREATE TABLE step only defines the source. After the Kafka table is created successfully in the SQL CLI, you can run `show tables;` and `describe user_behavior;` to list the registered tables and inspect the table's details. You can also run `SELECT * FROM user_behavior;` directly in the SQL CLI to preview the data (press `q` to quit).
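The Kafka topic carries JSON messages matching the schema of the DDL above. As a minimal illustrative sketch (not part of the original demo; the field values here are made up), building one such record in Python looks like this:

```python
import json

def make_user_behavior_event(user_id, item_id, category_id, behavior, ts):
    """Build one JSON record matching the user_behavior table schema
    ('format' = 'json' in the DDL above)."""
    return json.dumps({
        "user_id": user_id,
        "item_id": item_id,
        "category_id": category_id,
        "behavior": behavior,   # e.g. pv, buy, cart, fav in the Taobao dataset
        "ts": ts,               # parsed as TIMESTAMP(3), e.g. "2017-11-26 01:00:00"
    })

event = make_user_behavior_event(543462, 1715, 1464116, "pv", "2017-11-26 01:00:00")
print(event)
```

A producer such as the demo's datagen tool writes records of this shape to the `user_behavior` topic, and Flink's JSON format deserializes them into the table's columns by field name.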
Connector Options
Only a few commonly used options are described below; for the complete list see Kafka | Apache Flink.
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, for Kafka use 'kafka' . |
topic | required for sink | (none) | String | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2' . Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks. |
topic-pattern | optional | (none) | String | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources. |
properties.bootstrap.servers | required | (none) | String | Comma separated list of Kafka brokers. |
properties.group.id | required by source | (none) | String | The id of the consumer group for Kafka source, optional for Kafka sink. |
properties.* | optional | (none) | String | This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in Kafka Configuration documentation. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false' . But there are some configurations that do not support to set, because Flink will override them, e.g. 'key.deserializer' and 'value.deserializer' . |
format | required | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: Either this option or the 'value.format' option are required. |
key.format | optional | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: If a key format is defined, the 'key.fields' option is required as well. Otherwise the Kafka records will have an empty key. |
key.fields | optional | [] | List | Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like 'field1;field2' . |
key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that 'value.fields-include' must be set to 'EXCEPT_KEY' . |
value.format | required | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: Either this option or the 'format' option are required. |
scan.startup.mode | optional | group-offsets | String | Startup mode for Kafka consumer, valid values are 'earliest-offset' , 'latest-offset' , 'group-offsets' , 'timestamp' and 'specific-offsets' . See the following Start Reading Position for more details. |
Counting transactions per hour
Create an Elasticsearch table with DDL
```sql
CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector' = 'elasticsearch-7', -- using elasticsearch connector
    'hosts' = 'http://elasticsearch:9200',  -- elasticsearch address
    'index' = 'buy_cnt_per_hour'  -- elasticsearch index name, similar to database table name
);
```
There is no need to create the `buy_cnt_per_hour` index in Elasticsearch in advance; the Flink job creates it automatically.
Submit a query to count transactions per hour
```sql
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
```
`TUMBLE` defines a tumbling window; for how to use it in SQL, see: Windowing TVF | Apache Flink.
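To make the semantics concrete, here is a small Python sketch (illustrative only, not demo code) that buckets events by hour and counts buys, mirroring what `GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)` with `WHERE behavior = 'buy'` computes on a bounded sample:

```python
from collections import Counter
from datetime import datetime

def buy_cnt_per_hour(events):
    """events: iterable of (ts_str, behavior) pairs.
    Returns {hour_of_day: buy_count}, mimicking the hourly tumbling-window
    COUNT(*) over rows where behavior = 'buy'."""
    counts = Counter()
    for ts_str, behavior in events:
        if behavior == "buy":
            hour = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S").hour
            counts[hour] += 1
    return dict(counts)

sample = [
    ("2017-11-26 01:10:00", "buy"),
    ("2017-11-26 01:40:00", "pv"),    # not a purchase, excluded
    ("2017-11-26 01:59:59", "buy"),
    ("2017-11-26 02:05:00", "buy"),
]
print(buy_cnt_per_hour(sample))  # {1: 2, 2: 1}
```

The crucial difference in Flink is that the stream is unbounded: the tumbling window emits each hour's count as watermarks pass the window boundary, rather than after reading all input.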
Once the query is submitted, the running job appears on the Flink JobManager web UI.
This is a streaming job, so it runs continuously. Once the job is running, it actually creates the index in Elasticsearch; you can then manage the index in Kibana and configure charts to visualize it.
Visualizing the result in Kibana
Cumulative unique visitors per 10 minutes over a day
Create an Elasticsearch table with DDL
```sql
CREATE TABLE cumulative_uv (
    date_str STRING,
    time_str STRING,
    uv BIGINT,
    PRIMARY KEY (date_str, time_str) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://elasticsearch:9200',
    'index' = 'cumulative_uv'
);
```
Submit a query for the cumulative number of unique visitors per 10 minutes over a day
First extract the date and time fields: we use `DATE_FORMAT` to get the basic date and time, then use `SUBSTR` and the string concatenation function `||` to round the time down to 10-minute granularity, e.g. `12:10`, `12:20`. In the outer query, we group by date and compute the current maximum time together with the UV, writing both into the Elasticsearch index. The UV is computed with the built-in `COUNT(DISTINCT user_id)`; Flink SQL heavily optimizes COUNT DISTINCT internally, so it can be used with confidence.
```sql
INSERT INTO cumulative_uv
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv
FROM (
    SELECT
        DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
        SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0' as time_str,
        user_id
    FROM user_behavior)
GROUP BY date_str;
```
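The 10-minute rounding trick and the outer aggregation can be sketched in plain Python (illustrative only; function names are mine, not Flink's):

```python
def to_10min_bucket(hhmm: str) -> str:
    """Round 'HH:mm' down to its 10-minute bucket, mirroring
    SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0': keep 'HH:m' and append '0'."""
    return hhmm[:4] + "0"

def cumulative_uv(rows):
    """rows: iterable of (date_str, hhmm, user_id).
    Per date: the latest 10-minute bucket seen and the distinct-user count,
    like MAX(time_str), COUNT(DISTINCT user_id) GROUP BY date_str."""
    state = {}
    for date_str, hhmm, user_id in rows:
        bucket = state.setdefault(date_str, {"time_str": "", "users": set()})
        bucket["time_str"] = max(bucket["time_str"], to_10min_bucket(hhmm))
        bucket["users"].add(user_id)
    return {d: (b["time_str"], len(b["users"])) for d, b in state.items()}

print(to_10min_bucket("12:34"))  # 12:30
print(to_10min_bucket("09:05"))  # 09:00
```

Because `cumulative_uv` is keyed on the Elasticsearch primary key `(date_str, time_str)`, each new 10-minute bucket produces a new document, while updates within a bucket keep overwriting the same document, yielding the cumulative curve.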
Once the query is submitted, the running job appears on the Flink JobManager web UI.
Visualizing the result in Kibana
Top category ranking
Create a MySQL table for later use as a dimension (lookup) table
```sql
CREATE TABLE category_dim (
    sub_category_id BIGINT,
    parent_category_name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/flink',
    'table-name' = 'category',
    'username' = 'root',
    'password' = 'root',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '10min'
);
```
Then create an Elasticsearch table to store the category statistics.
```sql
CREATE TABLE top_category (
    category_name STRING PRIMARY KEY NOT ENFORCED,
    buy_cnt BIGINT
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://elasticsearch:9200',
    'index' = 'top_category'
);
```
Join against the dimension table to fill in the category name:
```sql
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U
LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
```
This is a temporal (lookup) join; for details see: Joins | Apache Flink.
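Conceptually, a processing-time temporal join looks up the dimension table at the moment each row is processed, with results cached for a while. A rough Python analogy (hypothetical helper names and sample data, not Flink's implementation) of the `lookup.cache.max-rows` / `lookup.cache.ttl` behavior:

```python
import time

class CachedLookup:
    """Tiny dimension-table lookup with TTL caching, loosely analogous to the
    JDBC connector's lookup cache options (max-rows eviction omitted for brevity)."""
    def __init__(self, fetch, ttl_seconds=600):
        self.fetch = fetch   # key -> value; stands in for a MySQL point query
        self.ttl = ttl_seconds
        self.cache = {}      # key -> (value, expiry)

    def get(self, key):
        value, expiry = self.cache.get(key, (None, 0.0))
        if time.monotonic() >= expiry:            # miss or expired: re-query
            value = self.fetch(key)
            self.cache[key] = (value, time.monotonic() + self.ttl)
        return value

# stand-in for the MySQL `category` dimension table (made-up row)
category_dim = {1464116: "Electronics"}
lookup = CachedLookup(lambda k: category_dim.get(k))

def enrich(event):
    """In the spirit of LEFT JOIN category_dim FOR SYSTEM_TIME AS OF proctime:
    unknown categories yield None, matching LEFT JOIN's NULL."""
    event["category_name"] = lookup.get(event["category_id"])
    return event

print(enrich({"user_id": 1, "category_id": 1464116, "behavior": "buy"}))
```

The caching is why a row processed shortly after a MySQL update may still see the old category name: freshness is bounded by `lookup.cache.ttl`.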
Finally, group by category name, count the `buy` events, and write the result to Elasticsearch.
```sql
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
```
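On a bounded sample, this final aggregation is just a group-and-count; a Python sketch (with made-up category names, purely for illustration):

```python
from collections import Counter

def top_category(rich_events):
    """Count 'buy' events per category_name and rank them,
    mirroring the final INSERT INTO top_category query."""
    counts = Counter(e["category_name"]
                     for e in rich_events if e["behavior"] == "buy")
    return counts.most_common()

events = [
    {"category_name": "Clothing", "behavior": "buy"},
    {"category_name": "Clothing", "behavior": "pv"},   # excluded: not a buy
    {"category_name": "Books", "behavior": "buy"},
    {"category_name": "Clothing", "behavior": "buy"},
]
print(top_category(events))  # [('Clothing', 2), ('Books', 1)]
```

In the streaming job, the counts are continuously updated and upserted into Elasticsearch keyed by `category_name`, so the Kibana ranking stays current as new purchases arrive.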
Viewing the running job in Flink
Visualizing the result in Kibana
The final result:
Author: 张小舟
Link: https://juejin.cn/post/7018460286747148296