Flink Basics (126): FLINK-SQL Syntax (20) DQL (12) OPERATIONS (9) Joins (3) Temporal Joins
1 Event Time Temporal Join
Temporal joins allow joining against a versioned table. This means a table can be enriched with changing metadata, and the value of that metadata can be retrieved as of a certain point in time.
Temporal joins take an arbitrary table (left input/probe side) and correlate each row to the relevant version of the corresponding row in the versioned table (right input/build side). Flink uses the FOR SYSTEM_TIME AS OF syntax from the SQL:2011 standard to perform this operation. The syntax of a temporal join is as follows:
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
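Note: Currently only INNER JOIN and LEFT JOIN are supported. The join must use FOR SYSTEM_TIME AS OF, where table1.proctime is the processing-time attribute (a computed column) of table1. Using FOR SYSTEM_TIME AS OF table1.proctime means that when a record from the left table is joined with the dimension table on the right, it is matched only against the snapshot of the dimension table at the current processing time.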
With an event-time attribute (i.e., a rowtime attribute), it is possible to retrieve the value of a key as it was at some point in the past. This allows for joining the two tables at a common point in time. The versioned table will store all versions - identified by time - since the last watermark.
For example, suppose we have a table of orders, each with prices in different currencies. To properly normalize this table to a single currency, such as USD, each order needs to be joined with the proper currency conversion rate from the point-in-time when the order was placed.
-- Create a table of orders. This is a standard
-- append-only dynamic table.
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- Define a versioned table of currency rates.
-- This could be from a change-data-capture
-- such as Debezium, a compacted Kafka topic, or any other
-- way of defining a versioned table.
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    -- A versioned table needs a primary key; the join condition must include it.
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    /* ... */
);

SELECT
    order_id,
    price,
    currency,
    conversion_rate,
    order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  ==========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
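Note: The event-time temporal join is triggered by the watermarks of both the left and the right side; make sure watermarks are set correctly on both sides of the join.
Note: The event-time temporal join requires that the equality condition of the join include the primary key of the versioned table.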
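The lookup rule behind the event-time join (for each order, take the latest rate version whose update time is not later than the order time) can be sketched in a few lines of Python. This is only an illustration of the semantics, not how Flink implements the operator: the `VersionedTable` class and the rate update times (11:58:00 and 12:05:00) are hypothetical, and watermark handling and state cleanup are not modelled.

```python
from bisect import bisect_right


class VersionedTable:
    """Toy versioned table: for each key, a time-ordered list of versions."""

    def __init__(self):
        self._times = {}   # key -> list of update times (ascending)
        self._values = {}  # key -> list of values, parallel to _times

    def upsert(self, key, update_time, value):
        # Assumption: updates for a key arrive in increasing update_time order.
        self._times.setdefault(key, []).append(update_time)
        self._values.setdefault(key, []).append(value)

    def as_of(self, key, t):
        """Return the value that was valid at time t, or None if none existed."""
        times = self._times.get(key, [])
        i = bisect_right(times, t)  # number of versions with update_time <= t
        return self._values[key][i - 1] if i else None


# Hypothetical rate history; fixed-width "HH:MM:SS" strings compare correctly.
currency_rates = VersionedTable()
currency_rates.upsert("EUR", "11:58:00", 1.14)
currency_rates.upsert("EUR", "12:05:00", 1.10)

orders = [
    ("o_001", 11.11, "EUR", "12:00:00"),
    ("o_002", 12.51, "EUR", "12:06:00"),
]

# LEFT JOIN semantics: an order without a matching version keeps a None rate.
joined = [
    (order_id, price, currency, currency_rates.as_of(currency, order_time), order_time)
    for order_id, price, currency, order_time in orders
]

for row in joined:
    print(row)
```

With this hypothetical history, o_001 picks up the 1.14 version and o_002 the 1.10 version, matching the result table in the example.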
2 Processing Time Temporal Join
A processing time temporal table join uses a processing-time attribute to correlate rows to the latest version of a key in an external versioned table.
By definition, with a processing-time attribute, the join will always return the most up-to-date value for a given key. One can think of a lookup table as a simple HashMap<K, V> that stores all the records from the build side.
The following processing-time temporal table join example shows an append-only table Orders that should be joined with the table LatestRates. LatestRates is a dimension table (e.g. an HBase table) that is materialized with the latest rate. At times 10:15, 10:30, and 10:52, the content of LatestRates looks as follows:
10:15> SELECT * FROM LatestRates;

currency   rate
========= ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
========= ======
US Dollar   102
Euro        114
Yen           1

10:52> SELECT * FROM LatestRates;

currency   rate
========= ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1
The content of LatestRates is the same at 10:15 and 10:30. The Euro rate has changed from 114 to 116 by 10:52.
Orders is an append-only table representing payments for the given amount and the given currency. For example, at 10:15 there was an order for an amount of 2 Euro.
SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== arrived at time 10:15
     1 US Dollar        <== arrived at time 10:30
     2 Euro             <== arrived at time 10:52
Given these tables, we would like to calculate all Orders converted to a common currency.
amount currency     rate   amount*rate
====== ========= ======= ============
     2 Euro          114          228    <== arrived at time 10:15
     1 US Dollar     102          102    <== arrived at time 10:30
     2 Euro          116          232    <== arrived at time 10:52
With the help of temporal table join, we can express such a query in SQL as:
SELECT
    o.amount, o.currency, r.rate, o.amount * r.rate
FROM
    Orders AS o
    JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
    ON r.currency = o.currency
Each record from the probe side will be joined with the current version of the build side table. In our example, the query uses the processing-time notion, so a newly appended order would always be joined with the most recent version of LatestRates when executing the operation.
The result is not deterministic for processing-time. The processing-time temporal join is most often used to enrich the stream with an external table (i.e., dimension table).
In contrast to regular joins, the previous temporal table results will not be affected despite the changes on the build side. Compared to interval joins, temporal table joins do not define a time window within which the records join, i.e., old rows are not stored in state.
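The HashMap analogy can be made concrete with a small Python sketch that replays the Orders and LatestRates example. It only illustrates the semantics under simplifying assumptions: the dictionary stands in for the external dimension table, the hypothetical helper `on_order` plays the role of the join operator, and the wall-clock times appear only in comments.

```python
# The build side, modelled as a plain dictionary that holds the latest rate per key.
latest_rates = {"US Dollar": 102, "Euro": 114, "Yen": 1}

# Rows emitted by the join so far: (amount, currency, rate, amount * rate).
results = []


def on_order(amount, currency):
    """Join one arriving order against the current content of latest_rates."""
    rate = latest_rates.get(currency)
    if rate is not None:  # inner join: orders without a matching rate are dropped
        results.append((amount, currency, rate, amount * rate))


on_order(2, "Euro")          # arrives at 10:15, sees Euro = 114
on_order(1, "US Dollar")     # arrives at 10:30, sees US Dollar = 102
latest_rates["Euro"] = 116   # dimension table is updated before 10:52
on_order(2, "Euro")          # arrives at 10:52, sees Euro = 116

for row in results:
    print(row)
```

The first Euro order keeps its rate of 114 after the update to 116: rows that were already emitted are not revised, and no orders are kept around to be re-joined later.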
Reference:
https://xie.infoq.cn/article/bf67b2eac8650e57df1ae154c
Original: https://www.cnblogs.com/qiu-hua/p/15195698.html