Data Pipelines & ETL
Flink一个最常见的use case就是做ETL。
1. Stateless Transformation
无状态的转换最基础的操作就是map和flatMap.
map操作执行的是一对一的转换,即对于每个stream中的元素都会输出一个转换后的元素。
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
@Override
public EnrichedRide map(TaxiRide taxiRide) throws Exception {
return new EnrichedRide(taxiRide);
}
}
而flatMap通过一个Collector接收输出,所以输出的元素数量可以与输入的不一致。
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
@Override
public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
if (valid.filter(taxiRide)) {
out.collect(new EnrichedRide(taxiRide));
}
}
}
2. Keyed Streams
- KeyBy()
根据元素的某个属性进行分区,就像group by一样,通常这会导致昂贵的网络交换,序列化以及反序列化 - Keys are computed
也可以将多个属性的计算结果作为key, 但为了在需要的时候重新计算key要保证每次计算的结果都是相同的
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat));
- Aggregations on Keyed Streams
import org.joda.time.Interval;
DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
@Override
public void flatMap(EnrichedRide ride,
Collector<Tuple2<Integer, Minutes>> out) throws Exception {
if (!ride.isStart) {
Interval rideInterval = new Interval(ride.startTime, ride.endTime);
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
}
}
});
minutesByStartCell
.keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
.maxBy(1) // duration
.print();
先将stream按照startCell分组,再对每组partition做聚合运算。上例会实时更新每个startCell的max duration
- Implicit State
在上例中程序维护了一个implicit的state, 即每个key的max duration.在这个例子中state很简单,但在实际生产中,我们最好一个时间窗口内保存state,而非在整个stream中。以避免state过大。
3. Stateful Transformations
Rich Functions
rich functions, 如RichFlatMapFunction,包含了额外的方法,如:
open(Configuration c): 只在operator初始化时调用一次,可以用来加载静态数据,或建立与外部服务的连接
close():
getRuntimeContext(): 可以创建或获取由Flink管理的stateKeyed State的例子
private static class Event {
public final String key;
public final long timestamp;
...
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicator())
.print();
env.execute();
}
public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}
Flink支持多种类型的keyed state, 本例中使用的是最简单的valueState. 对于每一个key, flink会维护一个对象。程序刚启动时,调用open()方法,还没有event, 也就没有key. 后面event出现调用flatMap时,可以获取到key,就可以用于在flink的state后端做判断。
部署到分布式集群上时,会有很多个Deduplicator 实例,每一个都只对整个keyspace上互不相关的state负责,因此当你看见一个valueState时,要明白这不止代表一个Boolean对象,而是一个分布式的共享的key-value store.
- Clearing State
如果例子中的key是无界的,我们就需要手动清理state, 这通过clear()方法实现
keyHasBeenSeen.clear();
你可以指定一个Timer来执行这个操作,或者指定valueState的Time-To-Live参数
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
- Non-keyed State
有时候我们也会需要管理non-keyed的state, 这通常称作operator state
4. Connected Stream
一个operator可以有两个及以上的source, 其中一个是data, 另一个可以是rules, thresholds或者其他参数等。也可以用作Streaming joins.
要注意的是两个连接在一起的stream必须要有兼容的key.
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> control = env
.fromElements("DROP", "IGNORE")
.keyBy(x -> x);
DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE")
.keyBy(x -> x);
control
.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();
env.execute();
}
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;
@Override
public void open(Configuration config) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}
@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}
对element执行flatMap1还是flatMap2是根据两个stream connect的顺序决定的。本例中control.connect(streamOfWords) 所以connect走flatMap1,dataStream走flatMap2. 但是你是没有办法控制flatMap1和flatMap2执行的顺序的,因为两个stream是竞争的关系,完全由Flink运行时决定的。所以如果顺序或者执行时间很重要的情境下,最好先将events缓存在flink state中,或者通过InputSelectable 接口指定执行的顺序。
作者:啵啵也爱吃香菜
原文链接:https://www.jianshu.com/p/3be1ac6741ce