阅读 61

Flink 的状态管理实践

我们之前写的 wordcount 的例子,没有包含状态管理。如果一个 task 在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink 引入了 state 和 checkpoint。

因此可以说 flink 因为引入了 state 和 checkpoint 所以才支持的 exactly once

首先区分一下两个概念:

state

state 一般指一个具体的 task/operator 的状态:

  • state 数据默认保存在 java 的堆内存中,TaskManage 节点的内存中。

  • operator 表示一些算子在运行的过程中会产生的一些中间结果。

checkpoint

checkpoint 可以理解为 checkpoint 是把 state 数据定时持久化存储了,则表示了一个 Flink Job 在一个特定时刻的一份全局状态快照,即包含了所有 task/operator 的状态。

注意:task(subTask)是 Flink 中执行的基本单位。operator 指算子(transformation)

State 可以被记录,在失败的情况下数据还可以恢复。

Flink 中有两种基本类型的 State:

  • Keyed State

  • Operator State

Keyed State 和 Operator State,可以以两种形式存在:

  • 原始状态(raw state)

  • 托管状态(managed state)

托管状态是由 Flink 框架管理的状态。

我们说 operator 算子保存了数据的中间结果,中间结果保存在什么类型中,如果我们这里是托管状态,则由 flink 框架自行管理

原始状态由用户自行管理状态具体的数据结构,框架在做 checkpoint 的时候,使用 byte[]来读写状态内容,对其内部数据结构一无所知。

通常在 DataStream 上的状态推荐使用托管的状态,当实现一个用户自定义的 operator 时,会使用到原始状态。

1. State-Keyed State

基于 KeyedStream 上的状态。这个状态是跟特定的 key 绑定的,对 KeyedStream 流上的每一个 key,都对应一个 state,比如:stream.keyBy(…)。KeyBy 之后的 Operator State,可以理解为分区过的 Operator State。

保存 state 的数据结构:

ValueState:即类型为 T 的单值状态。这个状态与对应的 key 绑定,是最简单的状态了。它可以通过 update 方法更新状态值,通过 value()方法获取状态值。

ListState:即 key 上的状态值为一个列表。可以通过 add 方法往列表中附加值;也可以通过 get()方法返回一个 Iterable来遍历状态值。

ReducingState:这种状态通过用户传入的 reduceFunction,每次调用 add 方法添加值的时候,会调用 reduceFunction,最后合并到一个单一的状态值。

MapState<UK, UV>:即状态值为一个 map。用户通过 put 或 putAll 方法添加元素。

需要注意的是,以上所述的 State 对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。

1. ValueState

使用 ValueState 保存中间结果对下面数据进行分组求和。

开发步骤:

  1. 获取流处理执行环境   2. 加载数据源   3. 数据分组   4. 数据转换,定义ValueState,保存中间结果   5. 数据打印   6. 触发执行 复制代码

ValueState:测试数据源:

 List(    (1L, 4L),    (2L, 3L),    (3L, 1L),    (1L, 2L),    (3L, 2L),    (1L, 2L),    (2L, 2L),    (2L, 9L) ) 复制代码

示例代码:

import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.util.Collector object TestKeyedState {   class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {     /**      * ValueState状态句柄. 第一个值为count,第二个值为sum。      */     private var sum: ValueState[(Long, Long)] = _     override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {       // 获取当前状态值       val tmpCurrentSum: (Long, Long) = sum.value       // 状态默认值       val currentSum = if (tmpCurrentSum != null) {         tmpCurrentSum       } else {         (0L, 0L)       }       // 更新       val newSum = (currentSum._1 + 1, currentSum._2 + input._2)       // 更新状态值       sum.update(newSum)       // 如果count >=3 清空状态值,重新计算       if (newSum._1 >= 3) {         out.collect((input._1, newSum._2 / newSum._1))         sum.clear()       }     }     override def open(parameters: Configuration): Unit = {       sum = getRuntimeContext.getState(         new ValueStateDescriptor[(Long, Long)]("average", // 状态名称           TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 状态类型       )     }   }   def main(args: Array[String]): Unit = {     //初始化执行环境     val env = StreamExecutionEnvironment.getExecutionEnvironment     //构建数据源     val inputStream: DataStream[(Long, Long)] = env.fromCollection(       List(         (1L, 4L),         (2L, 3L),         (3L, 1L),         (1L, 2L),         (3L, 2L),         (1L, 2L),         (2L, 2L),         (2L, 9L))     )     //执行数据处理     inputStream.keyBy(0)       .flatMap(new CountWithKeyedState)       .setParallelism(1)       .print     //运行任务     env.execute   } } 复制代码

2. MapState

使用 MapState 保存中间结果对下面数据进行分组求和:

  1. 获取流处理执行环境   2. 加载数据源   3. 数据分组   4. 数据转换,定义MapState,保存中间结果   5. 数据打印   6. 触发执行 复制代码

MapState:测试数据源:

List(    ("java", 1),    ("python", 3),    ("java", 2),    ("scala", 2),    ("python", 1),    ("java", 1),    ("scala", 2) ) 复制代码

示例代码:

object MapState {   def main(args: Array[String]): Unit = {     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1)     /**       * 使用MapState保存中间结果对下面数据进行分组求和       * 1.获取流处理执行环境       * 2.加载数据源       * 3.数据分组       * 4.数据转换,定义MapState,保存中间结果       * 5.数据打印       * 6.触发执行       */     val source: DataStream[(String, Int)] = env.fromCollection(List(       ("java", 1),       ("python", 3),       ("java", 2),       ("scala", 2),       ("python", 1),       ("java", 1),       ("scala", 2)))     source.keyBy(0)       .map(new RichMapFunction[(String, Int), (String, Int)] {         var mste: MapState[String, Int] = _         override def open(parameters: Configuration): Unit = {           val msState = new MapStateDescriptor[String, Int]("ms",             TypeInformation.of(new TypeHint[(String)] {}),             TypeInformation.of(new TypeHint[(Int)] {}))           mste = getRuntimeContext.getMapState(msState)         }         override def map(value: (String, Int)): (String, Int) = {           val i: Int = mste.get(value._1)           mste.put(value._1, value._2 + i)           (value._1, value._2 + i)         }       }).print()     env.execute()   } } 复制代码

2. State-Operator State

与 Key 无关的 State,与 Operator 绑定的 state,整个 operator 只对应一个 state

保存 state 的数据结构:

ListState

举例来说,Flink 中的 Kafka Connector,就使用了 operator state。它会在每个 connector 实例中,保存该实例中消费 topic 的所有(partition, offset)映射。

步骤:

  1. 获取执行环境

  2. 设置检查点机制:路径,重启策略

  3. 自定义数据源

    • 需要继承并行数据源和 CheckpointedFunction

    • 设置 listState,通过上下文对象 context 获取

    • 数据处理,保留 offset

    • 制作快照

  4. 数据打印

  5. 触发执行

示例代码:

import java.util import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.api.common.time.Time import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction import org.apache.flink.streaming.api.environment.CheckpointConfig import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ object ListOperate {   def main(args: Array[String]): Unit = {     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1)     env.enableCheckpointing(5000)     env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)     env.getCheckpointConfig.setCheckpointTimeout(60000)     env.getCheckpointConfig.setFailOnCheckpointingErrors(false)     env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)     //重启策略     env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))     //模拟kakfa偏移量     env.addSource(new MyRichParrelSourceFun)       .print()     env.execute()   } } class MyRichParrelSourceFun extends RichParallelSourceFunction[String]   with CheckpointedFunction {   var listState: ListState[Long] = _   var offset: Long = 0L   //任务运行   override def run(ctx: SourceFunction.SourceContext[String]): Unit = {     val iterState: util.Iterator[Long] = listState.get().iterator()     while (iterState.hasNext) {       offset = iterState.next()     }     while (true) {       offset += 1       ctx.collect("offset:"+offset)       Thread.sleep(1000)       if(offset > 10){         1/0       }     }   }   //取消任务   override def cancel(): Unit = ???   //制作快照   override def snapshotState(context: FunctionSnapshotContext): Unit = {     listState.clear()     listState.add(offset)   }   //初始化状态   override def initializeState(context: FunctionInitializationContext): Unit = {     listState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long](       "listState", TypeInformation.of(new TypeHint[Long] {})     ))   } } 复制代码

3. Broadcast State

Broadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。

1) API 介绍

通常,我们首先会创建一个 Keyed 或 Non-Keyed 的 Data Stream,然后再创建一个 Broadcasted Stream,最后通过 Data Stream 来连接(调用 connect 方法)到 Broadcasted Stream 上,这样实现将 Broadcast State 广播到 Data Stream 下游的每个 Task 中。

如果 Data Stream 是 Keyed Stream,则连接到 Broadcasted Stream 后,添加处理 ProcessFunction 时需要使用 KeyedBroadcastProcessFunction 来实现,下面是 KeyedBroadcastProcessFunction 的 API,代码如下所示:

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {     public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;     public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception; } 复制代码

上面泛型中的各个参数的含义,说明如下:

  • KS:表示 Flink 程序从最上游的 Source Operator 开始构建 Stream,当调用 keyBy 时所依赖的 Key 的类型;

  • IN1:表示非 Broadcast 的 Data Stream 中的数据记录的类型;

  • IN2:表示 Broadcast Stream 中的数据记录的类型;

  • OUT:表示经过 KeyedBroadcastProcessFunction 的 processElement()和 processBroadcastElement()方法处理后输出结果数据记录的类型。

如果 Data Stream 是 Non-Keyed Stream,则连接到 Broadcasted Stream 后,添加处理 ProcessFunction 时需要使用 BroadcastProcessFunction 来实现,下面是 BroadcastProcessFunction 的 API,代码如下所示:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction { public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception; public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;     } 复制代码

上面泛型中的各个参数的含义,与前面 KeyedBroadcastProcessFunction 的泛型类型中的后 3 个含义相同,只是没有调用 keyBy 操作对原始 Stream 进行分区操作,就不需要 KS 泛型参数。

注意事项:

  1. Broadcast State 是 Map 类型,即 K-V 类型。

  2. Broadcast State 只有在广播一侧的方法中 processBroadcastElement 可以修改;在非广播一侧方法中 processElement 只读。

  3. Broadcast State 在运行时保存在内存中。

2) 场景举例

  1. 动态更新计算规则: 如事件流需要根据最新的规则进行计算,则可将规则作为广播状态广播到下游 Task 中。

  2. 实时增加额外字段: 如事件流需要实时增加用户的基础信息,则可将用户的基础信息作为广播状态广播到下游 Task 中。


作者:五分钟学大数据
链接:https://juejin.cn/post/7026162503776206856


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐