阅读 136

Flink State、CheckPoint与Savepoint、Flink的Exactly Once

State

State简述

State分类:

  • Keyed state
  • Operator state

State两种形态:

  • Raw State(原始状态)
    只需要继承RichFunction系列而不需要额外继承其他接口,因此从getRunntime中获取State
    以字节流的形式写入进 checkpoint
  • Managed State(托管状态)
    托管状态可以使用 Flink runtime 提供的数据结构来表示,例如内部哈希表或者 RocksDB。具体有 ValueState,ListState 等。Flink runtime 会对这些状态进行编码然后将它们写入到 checkpoint 中。需要继承实现 CheckpointedFunction 或者 ListCheckpointed 接口。这两个接口实现的方法中都可以通过context去获取state。

推荐使用托管状态,因为如果使用托管状态,当并行度发生改变时,Flink 可以自动的帮你重分配 state,同时还可以更好的管理内存。

分配策略:

  • Event Split
  • Union redistribution

下面代码片对CheckpointedFunction使用案例:

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                //将数据发到外部系统
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

ListCheckpointed
是一种受限的 CheckpointedFunction,只支持 List 风格的状态和 even-spit 的重分配策略。


State状态存储

  • MemoryStateBackend
    将状态保存在JobManager,因此这种情况会对JobManager分配到的内存带来压力
    支持异步和同步
  • FsStateBackend
    工作状态仍然是存储在 Task Manager 中的内存中,虽然在 Checkpoint 的时候会存在文件中,所以还是得注意这个状态要保证不超过 Task Manager 的内存
  • RocksDBStateBackend
    相对于上述两种状态保存的方案,该中方式不会将State保存在内存中,而是保存在HDFS或者文件中,当恢复的时候再去读取恢复接口,因此相比于状态直接保存在内存中,效率会有所降低。但这种方式是Flink推荐的。
    RocksDB支持增量保存,其原理如下:
    当使用 RocksDBStateBackend 时,增量 Checkpoint 是如何实现的呢?RocksDB 是一个基于 LSM 实现的 KV 数据库。LSM 全称 Log Structured Merge Trees,LSM 树本质是将大量的磁盘随机写操作转换成磁盘的批量写操作来极大地提升磁盘数据写入效率。一般 LSM Tree 实现上都会有一个基于内存的 MemTable 介质,所有的增删改操作都是写入到 MemTable 中,当 MemTable 足够大以后,将 MemTable 中的数据 flush 到磁盘中生成不可变且内部有序的 ssTable(Sorted String Table)文件,全量数据保存在磁盘的多个 ssTable 文件中。HBase 也是基于 LSM Tree 实现的,HBase 磁盘上的 HFile 就相当于这里的 ssTable 文件,每次生成的 HFile 都是不可变的而且内部有序的文件。基于 ssTable 不可变的特性,才实现了增量 Checkpoint,具体流程如下所示:


    image.png

第一次 Checkpoint 时生成的状态快照信息包含了两个 sstable 文件:sstable1 和 sstable2 及 Checkpoint1 的元数据文件 MANIFEST-chk1,所以第一次 Checkpoint 时需要将 sstable1、sstable2 和 MANIFEST-chk1 上传到外部持久化存储中。第二次 Checkpoint 时生成的快照信息为 sstable1、sstable2、sstable3 及元数据文件 MANIFEST-chk2,由于 sstable 文件的不可变特性,所以状态快照信息的 sstable1、sstable2 这两个文件并没有发生变化,sstable1、sstable2 这两个文件不需要重复上传到外部持久化存储中,因此第二次 Checkpoint 时,只需要将 sstable3 和 MANIFEST-chk2 文件上传到外部持久化存储中即可。这里只将新增的文件上传到外部持久化存储,也就是所谓的增量 Checkpoint。

基于 LSM Tree 实现的数据库为了提高查询效率,都需要定期对磁盘上多个 sstable 文件进行合并操作,合并时会将删除的、过期的以及旧版本的数据进行清理,从而降低 sstable 文件的总大小。图中可以看到第三次 Checkpoint 时生成的快照信息为sstable3、sstable4、sstable5 及元数据文件 MANIFEST-chk3, 其中新增了 sstable4 文件且 sstable1 和 sstable2 文件合并成 sstable5 文件,因此第三次 Checkpoint 时只需要向外部持久化存储上传 sstable4、sstable5 及元数据文件 MANIFEST-chk3。

基于 RocksDB 的增量 Checkpoint 从本质上来讲每次 Checkpoint 时只将本次 Checkpoint 新增的快照信息上传到外部的持久化存储中,依靠的是 LSM Tree 中 sstable 文件不可变的特性。对 LSM Tree 感兴趣的同学可以深入研究 RocksDB 或 HBase 相关原理及实现。


CheckPoint

CheckPoint需要的先决条件:

  • Source需要支持数据的短时间重放功能
  • 需要一个能保存状态的持久化存储介质

CheckPoint如何配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 开启 Checkpoint,每 1000毫秒进行一次 Checkpoint
env.enableCheckpointing(1000);

// Checkpoint 语义设置为 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// CheckPoint 的超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一时间,只允许 有 1 个 Checkpoint 在发生
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 两次 Checkpoint 之间的最小时间间隔为 500 毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// 当 Flink 任务取消时,保留外部保存的 CheckPoint 信息
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

// 作业最多允许 Checkpoint 失败 1 次(flink 1.9 开始支持)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

// Checkpoint 失败后,整个 Flink 任务也会失败(flink 1.9 之前)
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(true)

Checkpoint与SavePoint的区别

image.png

Operate UID的重要性
如何为算子指定UID

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

CheckPoint流程

  1. JobManager中CheckPointCoordinator 会定期向所有 SourceTask 发送 CheckPointTrigger,Source Task 会在数据流中安插 Checkpoint barrier


    image.png

2.当 task 收到上游所有实例的 barrier 后,向自己的下游继续传递 barrier,然后自身同步进行快照,并将自己的状态异步写入到持久化存储中
   - 如果是增量 Checkpoint,则只是把最新的一部分更新写入到外部持久化存储中;
   - 为了下游尽快进行 Checkpoint,所以 task 会先发送 barrier 到下游,自身再同步进行快照;


image.png
  1. 当 task 将状态信息完成备份后,会将备份数据的地址(state handle)通知给 JobManager 的CheckPointCoordinator,如果 Checkpoint 的持续时长超过了 Checkpoint 设定的超时时间CheckPointCoordinator 还没有收集完所有的 State Handle,CheckPointCoordinator 就会认为本次 Checkpoint 失败,会把这次 Checkpoint 产生的所有状态数据全部删除

  2. 如果 CheckPointCoordinator 收集完所有算子的 State Handle,CheckPointCoordinator 会把整个 StateHandle 封装成 completed Checkpoint Meta,写入到外部存储中,Checkpoint 结束


    image.png

Flink Exactly Once

在Flink中的Exactly Once有两种层面,一种是Flink CheckPoint处,还有一处是研究端到端的,因此读者有必要在学习之前先熟悉它的两个层面。

CheckPoint处的Exactly Once
在上述文章中,我们知道了CheckPoint的流程,但此时需要了解Barrier机制。当Exactly Once中Task在收到来自Source的Barrier时,如果停下手中的数据处理任务并等待所有Source的Barrier,与此同时先把数据保存在缓存中,待所有Barrier收集完毕之后再去做CheckPoint,那么这种方式就称呼为Barrier对齐。如果不停下手中的任务(不去等待收集到所有Source实例的Barrier),等到收集到所有的Source Barrier再去做CheckPoint,那么这种方式就被称呼为Barrier不对齐,就会产生整个系统的At least once。在配置上来看是否配置的ExactlyOnce就是配置了Barrier是否对齐。

端到端的ExactlyOnce
仔细思考CheckPoint的机制就会发现,CheckPoint的ExactlyOnce并不能保证数据端到端的ExactlyOnce,比如说Sink在CHK100 ~ CHK101之间挂了,但是最近的一次成功CheckPoint为CHK100,在这期间以及写入的Sink数据无法被删除,那么端到端的就会退化成At least once。
那么如何解决端到端的Exactly Once呢,有两种方案:

  • 假如我们使用的存储介质支持按照全局主键去重
  • Sink数据时,与CheckPoint的时机做一次强绑定,Sink成功CheckPoint才可能成功,否则CheckPoint失败,但这样做有一个前提就是需要Sink端支持事务。
    第一种方式比较好理解,这里我们主要讨论第二种方式:
    首先需要先去了解一下2PC的实现方式,Flink保证端到端的Exactly Once其实就是使用了这种思想,将JobManager作为协调者,所有的Sink不会直接提交已经处理好的数据,而是先放在缓存中,等到JobManager下发CHeckPoint之后,在snapshot方法中对Sink进行数据precommit(只是flush操作但是不能commit事务),然后snapshot继续做CheckPont的工作,等JobManager收到所有算子的CheckPoint成功通知后,会调用CheckPointLinstener中的complete Hook通知所有的算子CheckPoint已经成功,这个时候各个算子才能真正的提交事务。
    因此这种方式需要考虑CheckPoint的间隔时间,因为数据的真正写入Sink时机完全是和CheckPoint同步的。

补充:
在FlinkKafkaConsumer中分区分配原则是使用assign的方式实现;

在assign中使用的分区分配其实是使用的是round-robin的策略;

当并行度改变时,FlinkKafka使用的是UnionSplit的策略方式,因为使用event-split可能获取到的State和分区得到的Partition并不匹配;

在恢复时,若发现Partition不在内存中的TopPartition --> Offset映射关系中,则让当前的Partition从EARLIEST处开始消费。

作者:当个笑话

原文链接:https://www.jianshu.com/p/80bf277aae14

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐