
Flink Basics (126): FLINK-SQL Syntax (20) DQL (12) OPERATIONS (9) Joins (3) Temporal Joins

1 Event Time Temporal Join

Temporal joins allow joining against a versioned table. This means a table can be enriched with changing metadata and retrieve its value at a certain point in time.

Temporal joins take an arbitrary table (left input/probe side) and correlate each row with the corresponding row's relevant version in the versioned table (right input/build side). Flink performs this operation with the FOR SYSTEM_TIME AS OF syntax from the SQL:2011 standard. The syntax of a temporal join is as follows:

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1

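Note: Currently only INNER JOIN and LEFT JOIN are supported. The join must use FOR SYSTEM_TIME AS OF, where table1.proctime refers to table1's processing-time attribute (a computed column). Writing FOR SYSTEM_TIME AS OF table1.proctime means that when a row of the left table is joined with the right-hand dimension table, it only matches the snapshot of the dimension table as of the current processing time.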

With an event-time attribute (i.e., a rowtime attribute), it is possible to retrieve the value of a key as it was at some point in the past. This allows the two tables to be joined at a common point in time. The versioned table stores all versions, each identified by time, since the last watermark.

For example, suppose we have a table of orders, each with prices in different currencies. To properly normalize this table to a single currency, such as USD, each order needs to be joined with the proper currency conversion rate from the point-in-time when the order was placed.


-- Create a table of orders. This is a standard
-- append-only dynamic table.
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- Define a versioned table of currency rates.
-- This could come from a change-data-capture source
-- such as Debezium, a compacted Kafka topic, or any other
-- way of defining a versioned table.
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
   'connector' = 'upsert-kafka',
   /* ... */
);

SELECT
     order_id,
     price,
     orders.currency,
     conversion_rate,
     order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  ==========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
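To make the "as of" lookup concrete, here is a minimal plain-Python model of the LEFT JOIN above. It is not Flink's implementation. The version timestamps for `currency_rates` (11:50:00 and 12:05:00) are hypothetical and were chosen only so the output agrees with the table above. The extra order `o_000` is also invented, to show what happens when no version exists yet. The helper names `rate_as_of` and `temporal_join` are illustrative.

```python
from bisect import bisect_right
from decimal import Decimal

# Hypothetical version history of currency_rates, keyed by its primary key.
# Each list holds (update_time, conversion_rate) pairs sorted by update_time.
# Zero-padded "HH:MM:SS" strings on a single day compare correctly as strings.
CURRENCY_RATES = {
    "EUR": [("11:50:00", Decimal("1.14")), ("12:05:00", Decimal("1.10"))],
}


def rate_as_of(versions, ts):
    """Return the rate valid at ts: the latest version with update_time <= ts."""
    times = [t for t, _ in versions]
    i = bisect_right(times, ts)  # number of versions with update_time <= ts
    return versions[i - 1][1] if i > 0 else None


def temporal_join(orders, rates, left=True):
    """Model of: orders [LEFT] JOIN rates FOR SYSTEM_TIME AS OF order_time."""
    result = []
    for order_id, price, currency, order_time in orders:
        rate = rate_as_of(rates.get(currency, []), order_time)
        if rate is None and not left:
            continue  # INNER JOIN drops orders without a matching version
        result.append((order_id, price, currency, rate, order_time))
    return result


orders = [
    ("o_000", Decimal("9.99"), "EUR", "11:00:00"),  # hypothetical: before any EUR version
    ("o_001", Decimal("11.11"), "EUR", "12:00:00"),
    ("o_002", Decimal("12.51"), "EUR", "12:06:00"),
]

for row in temporal_join(orders, CURRENCY_RATES):
    print(row)  # o_000 gets None (LEFT JOIN), o_001 gets 1.14, o_002 gets 1.10
```

The binary search reflects the idea that each version is valid from its `update_time` until the next version of the same key. Passing `left=False` models the INNER JOIN variant.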

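Note: An event-time temporal join is triggered by the watermarks of both the left and the right input; make sure both sides of the join have their watermarks set correctly.

Note: An event-time temporal join requires the versioned table's primary key to be included in the equality condition of the join.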
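The watermark requirement can be illustrated with a toy, single-process model. This is a sketch under simplifying assumptions, not Flink's operator code. Left rows are buffered, and a row is joined only once the combined watermark has reached its event time. The combined watermark is the minimum of the left and right watermarks; until it passes, a late right-side version could still change the answer. Versions are stored per primary key, which is why the key has to appear in the equality condition. The class name and the integer timestamps (e.g. 1150 for 11:50) are hypothetical.

```python
import heapq


class EventTimeTemporalJoin:
    """Toy model of watermark-driven emission in an event-time temporal join."""

    def __init__(self):
        self.pending = []    # min-heap of (event_time, seq, key, payload) for left rows
        self.versions = {}   # primary key -> [(update_time, value), ...] sorted by time
        self.left_wm = float("-inf")
        self.right_wm = float("-inf")
        self.seq = 0         # tie-breaker so the heap never compares payloads

    def on_left(self, ts, key, payload):
        heapq.heappush(self.pending, (ts, self.seq, key, payload))
        self.seq += 1

    def on_right(self, ts, key, value):
        self.versions.setdefault(key, []).append((ts, value))
        self.versions[key].sort(key=lambda v: v[0])

    def on_watermark(self, side, wm):
        """Advance one input's watermark and return the rows that can now be emitted."""
        if side == "left":
            self.left_wm = max(self.left_wm, wm)
        else:
            self.right_wm = max(self.right_wm, wm)
        combined = min(self.left_wm, self.right_wm)
        out = []
        while self.pending and self.pending[0][0] <= combined:
            ts, _, key, payload = heapq.heappop(self.pending)
            valid = [v for t, v in self.versions.get(key, []) if t <= ts]
            out.append((payload, valid[-1] if valid else None))
        return out


join = EventTimeTemporalJoin()
join.on_right(1150, "EUR", "1.14")
join.on_left(1200, "EUR", "o_001")
print(join.on_watermark("left", 1200))   # [] because the right watermark has not advanced
print(join.on_watermark("right", 1200))  # [('o_001', '1.14')]
```

In this model, if the right watermark never advances, left rows stay buffered and nothing is emitted. That is the practical symptom the first note above warns about.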

2 Processing Time Temporal Join

A processing time temporal table join uses a processing-time attribute to correlate rows to the latest version of a key in an external versioned table.

By definition, with a processing-time attribute, the join always returns the most up-to-date value for a given key. One can think of a lookup table as a simple HashMap that stores all the records from the build side. The strength of this join is that it allows Flink to work directly against external systems when it is not feasible to materialize the table as a dynamic table within Flink.

The following processing-time temporal table join example shows an append-only table Orders that is joined with the table LatestRates. LatestRates is a dimension table (e.g., an HBase table) that is materialized with the latest rate. At times 10:15, 10:30 and 10:52, the content of LatestRates looks as follows:

10:15> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:52> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1

The content of LatestRates is the same at 10:15 and 10:30. The Euro rate changed from 114 to 116 at 10:52.

Orders is an append-only table representing payments for the given amount and the given currency. For example, at 10:15 there was an order for an amount of 2 Euro.

SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== arrived at time 10:15
     1 US Dollar        <== arrived at time 10:30
     2 Euro             <== arrived at time 10:52

Given these tables, we would like to calculate all Orders converted to a common currency.

amount currency     rate   amount*rate
====== ========= ======= ============
     2 Euro          114          228    <== arrived at time 10:15
     1 US Dollar     102          102    <== arrived at time 10:30
     2 Euro          116          232    <== arrived at time 10:52

With the help of temporal table join, we can express such a query in SQL as:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

Each record from the probe side will be joined with the current version of the build side table. In our example, the query uses the processing-time notion, so a newly appended order would always be joined with the most recent version of LatestRates when executing the operation.

With processing time, the result is not deterministic: it depends on whether an order is processed before or after a change on the build side. The processing-time temporal join is most often used to enrich a stream with an external table (i.e., a dimension table).
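The LatestRates walk-through can be replayed with a small plain-Python model. In it, LatestRates is just a dict holding the newest rate per currency, which matches the HashMap picture used earlier in this section. This is a sketch of the semantics only: `processing_time_join` and the event encoding are hypothetical, and list order stands in for processing time. Swapping the last two events shows why the result is not deterministic. In the swapped run, the 10:52 order is processed just before the rate update.

```python
def processing_time_join(events):
    """Model of: Orders JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime.

    Events are processed in list order, which stands in for processing time:
      ("rate", currency, rate)    an upsert into LatestRates
      ("order", amount, currency) an order arriving on the probe side
    """
    latest_rates = {}  # only the newest rate per currency is kept
    results = []
    for kind, a, b in events:
        if kind == "rate":
            latest_rates[a] = b
        else:
            amount, currency = a, b
            rate = latest_rates.get(currency)
            if rate is not None:  # inner join: orders without a rate are dropped
                results.append((amount, currency, rate, amount * rate))
    return results


initial = [("rate", "US Dollar", 102), ("rate", "Euro", 114), ("rate", "Yen", 1)]
events = initial + [
    ("order", 2, "Euro"),       # 10:15
    ("order", 1, "US Dollar"),  # 10:30
    ("rate", "Euro", 116),      # 10:52, Euro rate changes
    ("order", 2, "Euro"),       # 10:52
]
print([r[3] for r in processing_time_join(events)])  # [228, 102, 232]

# Same input, but the 10:52 order happens to be processed before the rate update.
reordered = events[:-2] + [events[-1], events[-2]]
print([r[3] for r in processing_time_join(reordered)])  # [228, 102, 228]
```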

In contrast to regular joins, previously emitted temporal join results are not affected by later changes on the build side. Compared to interval joins, temporal table joins do not define a time window within which records join, i.e., old rows are not stored in state.
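The difference from a regular join shows up in the final results. Assume LatestRates were consumed as a changelog stream and joined with a regular join. That join would retract the earlier Euro row and re-emit it with the new rate, so its materialized result always reflects the latest rates. The temporal join instead keeps the value that was current when each order was processed. The sketch below uses a hypothetical helper, `regular_join_view`, to compute the regular-join view. It then compares that view with the temporal-join results from the table above.

```python
def regular_join_view(orders, rate_updates):
    """Final materialized result of a regular Orders JOIN LatestRates.

    Earlier join results are retracted and re-emitted whenever a rate changes,
    so in the end every order is paired with the last rate for its currency.
    """
    final_rates = {}
    for currency, rate in rate_updates:
        final_rates[currency] = rate  # later updates overwrite earlier ones
    return [(amount, currency, final_rates[currency], amount * final_rates[currency])
            for amount, currency in orders if currency in final_rates]


orders = [(2, "Euro"), (1, "US Dollar"), (2, "Euro")]
rate_updates = [("US Dollar", 102), ("Euro", 114), ("Yen", 1), ("Euro", 116)]

# amount*rate column of the processing-time temporal join, copied from the table above
temporal_totals = [228, 102, 232]
regular_totals = [row[3] for row in regular_join_view(orders, rate_updates)]
print(temporal_totals)  # [228, 102, 232]: the 10:15 order keeps rate 114
print(regular_totals)   # [232, 102, 232]: the 10:15 order is updated to rate 116
```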

 



Original: https://www.cnblogs.com/qiu-hua/p/15195698.html
