阅读 84

Data Pipelines & ETL

Flink一个最常见的use case就是做ETL。

1. Stateless Transformation

无状态的转换最基础的操作就是map和flatMap.
map操作执行的是一对一的转换,即对于每个stream中的元素都会输出一个转换后的元素。

public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}

而flatMap通过一个Collector接收输出,所以输出的元素数量可以与输入的不一致。

public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
        if (valid.filter(taxiRide)) {
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}

2. Keyed Streams

  • KeyBy()
    根据元素的某个属性进行分区,就像group by一样,通常这会导致昂贵的网络交换,序列化以及反序列化
  • Keys are computed
    也可以将多个属性的计算结果作为key, 但为了在需要的时候重新计算key要保证每次计算的结果都是相同的
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat));
  • Aggregations on Keyed Streams
import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

        @Override
        public void flatMap(EnrichedRide ride,
                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
            if (!ride.isStart) {
                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                Minutes duration = rideInterval.toDuration().toStandardMinutes();
                out.collect(new Tuple2<>(ride.startCell, duration));
            }
        }
    });

minutesByStartCell
  .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
  .maxBy(1) // duration
  .print();

先将stream按照startCell分组,再对每组partition做聚合运算。上例会实时更新每个startCell的max duration

  • Implicit State
    在上例中程序维护了一个implicit的state, 即每个key的max duration.在这个例子中state很简单,但在实际生产中,我们最好一个时间窗口内保存state,而非在整个stream中。以避免state过大。

3. Stateful Transformations

  • Rich Functions
    rich functions, 如RichFlatMapFunction,包含了额外的方法,如:
    open(Configuration c): 只在operator初始化时调用一次,可以用来加载静态数据,或建立与外部服务的连接
    close():
    getRuntimeContext(): 可以创建或获取由Flink管理的state

  • Keyed State的例子

private static class Event {
    public final String key;
    public final long timestamp;
    ...
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  
    env.addSource(new EventSource())
        .keyBy(e -> e.key)
        .flatMap(new Deduplicator())
        .print();
  
    env.execute();
}

public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
    ValueState<Boolean> keyHasBeenSeen;

    @Override
    public void open(Configuration conf) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
        keyHasBeenSeen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (keyHasBeenSeen.value() == null) {
            out.collect(event);
            keyHasBeenSeen.update(true);
        }
    }
}

Flink支持多种类型的keyed state, 本例中使用的是最简单的valueState. 对于每一个key, flink会维护一个对象。程序刚启动时,调用open()方法,还没有event, 也就没有key. 后面event出现调用flatMap时,可以获取到key,就可以用于在flink的state后端做判断。
部署到分布式集群上时,会有很多个Deduplicator 实例,每一个都只对整个keyspace上互不相关的state负责,因此当你看见一个valueState时,要明白这不止代表一个Boolean对象,而是一个分布式的共享的key-value store.

  • Clearing State
    如果例子中的key是无界的,我们就需要手动清理state, 这通过clear()方法实现
keyHasBeenSeen.clear();

你可以指定一个Timer来执行这个操作,或者指定valueState的Time-To-Live参数

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
  • Non-keyed State
    有时候我们也会需要管理non-keyed的state, 这通常称作operator state

4. Connected Stream

一个operator可以有两个及以上的source, 其中一个是data, 另一个可以是rules, thresholds或者其他参数等。也可以用作Streaming joins.



要注意的是两个连接在一起的stream必须要有兼容的key.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env
        .fromElements("DROP", "IGNORE")
        .keyBy(x -> x);

    DataStream<String> streamOfWords = env
        .fromElements("Apache", "DROP", "Flink", "IGNORE")
        .keyBy(x -> x);
  
    control
        .connect(streamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;
      
    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }
      
    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }
      
    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}

对element执行flatMap1还是flatMap2是根据两个stream connect的顺序决定的。本例中control.connect(streamOfWords) 所以connect走flatMap1,dataStream走flatMap2. 但是你是没有办法控制flatMap1和flatMap2执行的顺序的,因为两个stream是竞争的关系,完全由Flink运行时决定的。所以如果顺序或者执行时间很重要的情境下,最好先将events缓存在flink state中,或者通过InputSelectable 接口指定执行的顺序。

作者:啵啵也爱吃香菜

原文链接:https://www.jianshu.com/p/3be1ac6741ce

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐