阅读 116

Flink - 只使用 Table&Sql 实现电商用户行为实时分析

Flink 对 DDL 的支持是在 1.9 版本之后的 1.10,1.11 投入了大量的精力做的一个内容,可以通过 SQL DDL 的方式来联通外部系统,对 DDL 的支持的程序决定了与外部系统的联通性,而这时计算引擎一个非常重要的内容。 正 Flink 因为强大的 DDL 的支持,让不会 java scala 语言的人也可以通过规范的 sql 的方式来使用 Flink.

接下来本文将基于伍翀老师的讲课内容使用 Kafka, MySQL, Elasticsearch, Kibana,通过Flink SQL 构建一个电商用户行为的实时分析应用。在 github 中 docker-compose 中使用到的镜像有的已经没有办法再 docker.hub 中找到,所以需要自己来打镜像。并且原有的使用的时一些旧版本的,这里讲使用 1.13.2 的 flink 的最新的稳定版。而打这些新的镜像遇到了一些问题,都会在本文做简单的记录。

操作步骤

原始的 docker-compose 文件内容:

version: '2.1' services:   sql-client:     image: jark/demo-sql-client:0.2     depends_on:       - kafka       - jobmanager       - elasticsearch     environment:       FLINK_JOBMANAGER_HOST: jobmanager       ZOOKEEPER_CONNECT: zookeeper       KAFKA_BOOTSTRAP: kafka       MYSQL_HOST: mysql       ES_HOST: elasticsearch   jobmanager:     image: flink:1.11.0-scala_2.11     ports:       - "8081:8081"     command: jobmanager     environment:       - |         FLINK_PROPERTIES=         jobmanager.rpc.address: jobmanager   taskmanager:     image: flink:1.11.0-scala_2.11     depends_on:       - jobmanager     command: taskmanager     environment:       - |         FLINK_PROPERTIES=         jobmanager.rpc.address: jobmanager         taskmanager.numberOfTaskSlots: 10   datagen:     image: jark/datagen:0.2     command: "java -classpath /opt/datagen/flink-sql-demo.jar myflink.SourceGenerator --input /opt/datagen/user_behavior.log --output kafka kafka:9094 --speedup 2000"     depends_on:       - kafka     environment:       ZOOKEEPER_CONNECT: zookeeper       KAFKA_BOOTSTRAP: kafka   mysql:     image: jark/mysql-example:0.2     ports:       - "3306:3306"     environment:       - MYSQL_ROOT_PASSWORD=123456   zookeeper:     image: wurstmeister/zookeeper:3.4.6     ports:       - "2181:2181"   kafka:     image: wurstmeister/kafka:2.12-2.2.1     ports:       - "9092:9092"       - "9094:9094"     depends_on:       - zookeeper     environment:       - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092       - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092       - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT       - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE       - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181       - KAFKA_CREATE_TOPICS="user_behavior:1:1"     volumes:       - /var/run/docker.sock:/var/run/docker.sock   elasticsearch:     image: docker.elastic.co/elasticsearch/elasticsearch:7.6.0     environment:       - cluster.name=docker-cluster       - bootstrap.memory_lock=true       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"       - discovery.type=single-node     ports:       - "9200:9200"       - "9300:9300"     ulimits:       memlock:         soft: -1         hard: -1       nofile:         soft: 65536         hard: 65536   kibana:     image: docker.elastic.co/kibana/kibana:7.6.0     ports:       - "5601:5601" 复制代码

软件版本备注
flinkflink:1.13.2-scala_2.11
mysql5.6
kafka2.8.1
elasticsearch7.8.0
kibana7.8.0
zookeeper3.6.3
sql-clientflink:1.13.2-scala_2.11需要自己添加 jar 包
datagen
根据源码和新的 flink 版本,调整依赖包后使用

sql-client 镜像构建

主要在 sql-client 打包的时候,进加入伍翀老师依赖的 lib 包,是无法正常连接 kafka 的,会报错:

image.png 添加了匹配版本的依赖之后,仍然会报错:

image.png

这个错误明显是缺少什么依赖,但是通过查询网上的资料后,说是 lib 包没有成功加载上,经过重启等操作都不能解决这个问题。因为在之前的通过网页版上传任务时,遇到过一个问题就是使用 maven 打的任务的 jar 包,需要在 shade 的情况下打出完整的依赖。所以猜想需要找到一个 shade 版的flink-connector-kafka_2.11-1.13.2 jar 包,但是始终没有查找到,最后决定添加 kafka-clients-2.8.1.jar这个基础的 kafka 客户端依赖包,结果真的成功了。

image.png

lib
flink-connector-jdbc_2.11-1.13.2.jar
flink-connector-kafka_2.11-1.13.2.jar
flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
kafka-clients-2.8.1.jar
mysql-connector-java-8.0.18.jar

以上几个包时必须添加的,可以直接在 maven 仓库进行下载!

构建 sql-client 的 Dockerfile 文件为:

 FROM maven:3.6.3-openjdk-11-slim AS builder ############################################################################### # Build SQL Playground Image ############################################################################### FROM flink:1.13.2-scala_2.11 # Copy sql-client script COPY bin/* /opt/sql-client/ RUN chmod a+x /opt/sql-client/sql-client.sh # connector libraries RUN mkdir -p /opt/sql-client/lib COPY lib/* /opt/sql-client/lib/ # Copy configuration COPY conf/* /opt/flink/conf/ WORKDIR /opt/sql-client ENV SQL_CLIENT_HOME /opt/sql-client COPY docker-entrypoint.sh / ENTRYPOINT ["/docker-entrypoint.sh"] 复制代码

datagen 数据生成器镜像构建

先从下载的flink-sql-demo-1.11-EN 源码中找到 pom.xml,根据选择的 flink 版本修改依赖的版本:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>     <groupId>org.example</groupId>     <artifactId>flink-sql-demo</artifactId>     <version>1.0.0</version>     <properties>         <flink.version>1.13.2</flink.version>         <kafka.version>2.8.1</kafka.version>         <jmh.version>1.17.5</jmh.version>     </properties>     <dependencies>         <!-- Flink modules -->         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-core</artifactId>             <version>${flink.version}</version>         </dependency>         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-table-common</artifactId>             <version>${flink.version}</version>         </dependency>         <dependency>             <groupId>org.apache.kafka</groupId>             <artifactId>kafka-clients</artifactId>             <version>${kafka.version}</version>         </dependency>         <!-- Logging dependencies -->         <dependency>             <groupId>org.slf4j</groupId>             <artifactId>slf4j-log4j12</artifactId>             <version>1.7.7</version>         </dependency>         <dependency>             <groupId>org.apache.logging.log4j</groupId>             <artifactId>log4j-core</artifactId>             <version>2.12.1</version>         </dependency>         <!-- JMH-->         <dependency>             <groupId>org.openjdk.jmh</groupId>             <artifactId>jmh-core</artifactId>             <version>${jmh.version}</version>         </dependency>         <dependency>             <groupId>org.openjdk.jmh</groupId>             <artifactId>jmh-generator-annprocess</artifactId>             <version>${jmh.version}</version>             <scope>provided</scope>         </dependency>         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-sql-connector-hbase-1.4_2.11</artifactId>             <version>${flink.version}</version>         </dependency>         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-sql-connector-elasticsearch7_2.11</artifactId>             <version>${flink.version}</version>         </dependency>         <!-- Tests -->         <dependency>             <groupId>junit</groupId>             <artifactId>junit</artifactId>             <version>4.12</version>             <scope>test</scope>         </dependency>     </dependencies>     <build>         <plugins>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-compiler-plugin</artifactId>                 <version>3.8.0</version>                 <configuration>                     <source>1.8</source>                     <target>1.8</target>                     <!-- The semantics of this option are reversed, see MCOMPILER-209. -->                     <useIncrementalCompilation>false</useIncrementalCompilation>                     <compilerArgs>                         <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->                         <arg>-Xpkginfo:always</arg>                     </compilerArgs>                 </configuration>             </plugin>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-shade-plugin</artifactId>                 <executions>                     <execution>                         <id>flink-sql-demo</id>                         <phase>package</phase>                         <goals>                             <goal>shade</goal>                         </goals>                         <configuration>                             <finalName>flink-sql-demo</finalName>                         </configuration>                     </execution>                 </executions>             </plugin>         </plugins>     </build> </project> 复制代码

  • 通过 maven 打出完成的 jar 包flink-sql-demo.jar

  • 下载 user_behavior.log 数据,可以在数据集-阿里云天池 (aliyun.com)下载用户行为数据。

  • 修改 Dockerfile 文件,并构建镜像

 FROM maven:3.6.3-openjdk-11-slim AS builder FROM openjdk:11-jdk-slim AS jdkImage RUN mkdir -p /opt/datagen; COPY user_behavior.log /opt/datagen COPY flink-sql-demo.jar /opt/datagen WORKDIR /opt/datagen ENV DATEGEN_HOME /opt/datagen 复制代码

然后修改完成的 docker-compose 文件,选择对应的版本就可以进行整体的启动了,启动完成之后,可以查看到这几个 container 已经成功运行了。

image.png

然后就可以一步步的按照伍翀老师 github 步骤操作下去了!

执行分析

DDL 定义kafaka 数据源

CREATE TABLE user_behavior (     user_id BIGINT,     item_id BIGINT,     category_id BIGINT,     behavior STRING,     ts TIMESTAMP(3),     proctime AS PROCTIME(),   -- generates processing-time attribute using computed column     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- defines watermark on ts column, marks ts as event-time attribute ) WITH (     'connector' = 'kafka',  -- using kafka connector     'topic' = 'user_behavior',  -- kafka topic     'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning     'properties.bootstrap.servers' = 'kafka:9094',  -- kafka broker address     'format' = 'json'  -- the data format is json ); 复制代码

用 DDL 去创建并连接这个 Kafka 中的 topic.
这里的语句执行后,并不会在 flink 上提交任务,create table 的过程其实是定义 source 的过程。 在 SQL CLI 中成功创建 Kafka 表后,可以通过 show tables; 和 describe user_behavior; 来查看目前已注册的表,以及表的详细信息。我们也可以直接在 SQL CLI 中运行 SELECT * FROM user_behavior; 预览下数据(按q退出)。

Connector Options
下面仅介绍几个常用的,完整的内容参考:Kafka | Apache Flink

OptionRequiredDefaultTypeDescription
connectorrequired(none)StringSpecify what connector to use, for Kafka use 'kafka'.
topicrequired for sink(none)StringTopic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.
topic-patternoptional(none)StringThe regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources.
properties.bootstrap.serversrequired(none)StringComma separated list of Kafka brokers.
properties.group.idrequired by source(none)StringThe id of the consumer group for Kafka source, optional for Kafka sink.
properties.*optional(none)StringThis can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in Kafka Configuration documentation. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. But there are some configurations that do not support to set, because Flink will override them, e.g. 'key.deserializer' and 'value.deserializer'.
formatrequired(none)StringThe format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: Either this option or the 'value.format' option are required.
key.formatoptional(none)StringThe format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: If a key format is defined, the 'key.fields' option is required as well. Otherwise the Kafka records will have an empty key.
key.fieldsoptional[]ListDefines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like 'field1;field2'.
key.fields-prefixoptional(none)StringDefines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that 'value.fields-include' must be set to 'EXCEPT_KEY'.
value.formatrequired(none)StringThe format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: Either this option or the 'format' option are required.





scan.startup.modeoptionalgroup-offsetsStringStartup mode for Kafka consumer, valid values are 'earliest-offset''latest-offset''group-offsets''timestamp' and 'specific-offsets'. See the following Start Reading Position for more details.

统计每小时的成交量

使用 DDL 创建 Elasticsearch 表

CREATE TABLE buy_cnt_per_hour (     hour_of_day BIGINT,     buy_cnt BIGINT ) WITH (     'connector' = 'elasticsearch-7', -- using elasticsearch connector     'hosts' = 'http://elasticsearch:9200',  -- elasticsearch address     'index' = 'buy_cnt_per_hour'  -- elasticsearch index name, similar to database table name ); 复制代码

不需要在 Elasticsearch 中事先创建 buy_cnt_per_hour 索引,Flink Job 会自动创建该索引。

提交 Query 统计每小时的成交量

INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior = 'buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR); 复制代码

TUMBLE的含义就是滚动窗口,具体在 sql 中的使用方法可以参考:Windowing TVF | Apache Flink  和  Windowing TVF | Apache Flink

image.png 以上提交 query 执行后,就可以在 flink 的 jobmanager 的 web 页面看到执行中的 job 了。

image.png 该任务是一个流式任务,因此会一直运行。 该 job 运行后,就会真正的去 Elasticsearch 中创建索引了。接下来就可以在 kibana 中管理索引,并配置相应的图表来展示了。

Kibana 可视化结果

image.png

统计一天每10分钟累计独立用户数

使用 DDL 创建 Elasticsearch 表

CREATE TABLE cumulative_uv (     date_str STRING,     time_str STRING,     uv BIGINT,     PRIMARY KEY (date_str, time_str) NOT ENFORCED ) WITH (     'connector' = 'elasticsearch-7',     'hosts' = 'http://elasticsearch:9200',     'index' = 'cumulative_uv' ); 复制代码

提交 Query 统计一天每10分钟累计独立用户数

先抽取出日期和时间字段,我们使用 DATE_FORMAT 抽取出基本的日期与时间,再用 SUBSTR 和 字符串连接函数 || 将时间修正到10分钟级别,如: 12:1012:20。其次,我们在外层查询上基于日期分组,求当前最大的时间,和 UV,写入到 Elasticsearch 的索引中。UV 的统计我们通过内置的 COUNT(DISTINCT user_id)来完成,Flink SQL 内部对 COUNT DISTINCT 做了非常多的优化,因此可以放心使用。

INSERT INTO cumulative_uv SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv FROM (   SELECT     DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,     SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str,     user_id   FROM user_behavior) GROUP BY date_str; 复制代码

以上提交 query 执行后,就可以在 flink 的 jobmanager 的 web 页面看到执行中的 job 了。

image.png

Kibana 可视化结果

image.png

顶级类目排行榜

创建 MySQL 表,后续用作维表查询

CREATE TABLE category_dim (     sub_category_id BIGINT,     parent_category_name STRING ) WITH (     'connector' = 'jdbc',     'url' = 'jdbc:mysql://mysql:3306/flink',     'table-name' = 'category',     'username' = 'root',     'password' = 'root',     'lookup.cache.max-rows' = '5000',     'lookup.cache.ttl' = '10min' ); 复制代码

再创建一个 Elasticsearch 表,用于存储类目统计结果。

CREATE TABLE top_category (     category_name STRING PRIMARY KEY NOT ENFORCED,     buy_cnt BIGINT ) WITH (     'connector' = 'elasticsearch-7',     'hosts' = 'http://elasticsearch:9200',     'index' = 'top_category' ); 复制代码

过维表关联,补全类目名称:

CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.sub_category_id; 复制代码

这是一种临时连接的写法,可以参考地址:Joins | Apache Flink

image.png 最后根据 类目名称分组,统计出 buy 的事件数,并写入 Elasticsearch 中。

INSERT INTO top_category SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior = 'buy' GROUP BY category_name; 复制代码

flink 查看运行 job image.png

Kibana 可视化结果

image.png

最终效果图:

image.png


作者:张小舟
链接:https://juejin.cn/post/7018460286747148296


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐