Android平台的Kotlin协程-Flow和Channel的那些事
Android平台的Kotlin协程-Flow和Channel的那些事
Flow
简介
Flow
是在Kotlin Coroutines 1.2.0 alpha
之后新增的一套API,也叫做异步流,是Kotlin
协程与响应式编程模型结合的产物。
Flow
是Kotlin
版本的RxJava
。
虽然,Flow
的很多功能RxJava
都可以替代。但是,相比RxJava
,Flow
提供了更简介的API。
而且,Google已经在Compose
、Pagging
、ViewModel
等许多地方用了Flow
。所以,学习Flow
是很有必要的,否则,以后你就连Google的源码都看不懂了。
Flow
的定义
类似RxJava
。Flow
表示一个数据流。既然是数据流,就应该有发射和接收两个基本的API。
在Kotlin中的Flow
只是一个接口:
public interface Flow<out T> { @InternalCoroutinesApi public suspend fun collect(collector: FlowCollector<T>) } 复制代码
它只有一个方法,就是collect
,接收一个FlowCollector
。
public interface FlowCollector<in T> { public suspend fun emit(value: T) } 复制代码
FlowCollector
也很简单,只有一个方法,就是发射一个value
。
类似RxJava
,Flow
提供了大量的操作符,这些操作符都是以扩展函数的方式出现的。如
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value -> return@transform emit(transform(value)) } 复制代码
创建一个Flow
最简单的是通过顶层函数flow
来创建:
flow { (1..3).forEach { emit(it) } } 复制代码
也可以使用顶层函数flowOf
来创建:
flowOf(1, 2, 3) 复制代码
或者,我们可以把一个集合转化为flow
(1..3).asFlow() 复制代码
除此之外官方还提供了许多其他的顶层函数。这里就不一一举例了。 我们来看flow
的定义:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) 复制代码
值得一提的是:
这里的
block
参数标注了suspend
关键词。这意味其内部可以调用挂起函数。flow
的emit
是线程不安全的,所以不要在内部使用withContext
之类的方式修改协程的上下文。如果需要修改,请使用flowOn
之类的方法,下文会详细说明。默认情况下,Flow
的数据发射和接收在同一个线程中。Flow
是不需要背压策略的。默认情况下,数据的发射和接收在一个协程的上下文中。发射一个数据才会接收一个数据。但是,在使用flowOn
之后情况会发生改变,flowOn
会得到一个ChannelFlowOperatorImpl
,可以参考下文ChannelFlow
部分。Flow
是冷流。也就是说,除非调用collect
之类的末端操作符,否则,Flow
不会开始发射数据。
末端操作符
如上文所说,flow
只有在调用末端操作符后,才会开始发射数据。常见的末端操作符除了collect
外,还有:
集合转换类型。如
toList
、toSet
获取数据流特定元素。如
first
、last
。末端的运算符(累加器)。如
reduce
、fold
。只处理最新数据的
collectLatest
。如果在接受到最新数据的时候,对于先前数据的还未完成的处理,将被取消。
下面的代码把每个发射的数字相加,最终得到6。
flowOf(1, 2, 3) .reduce { accumulator, value -> accumulator + value } 复制代码
fold
与reduce
几乎一样,只是可以设置一个初始值。
launchIn
。与collect
相似,但是指定收集的代码运行在特定的协程作用域。但是,launchIn
会忽略末端的数据流,所以常见的做法是与onEach
一起配合使用。
val scope = CoroutineScope(Dispatchers.IO) flowOf(1, 2, 3) .onEach { print(it) } // print 1, 2, 3 .launchIn(scope) 复制代码
中间操作符
transform
中间操作符的作用是接收来自上游发射的数据,进行拦截或转换后发送给下游。中间操作符不会触发数据的收集。
public inline fun <T, R> Flow<T>.transform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R> = flow { collect { value -> return@collect transform(value) } } 复制代码
transform的实现里只做了转换。实际应用中的大部分情况下,还需要emit才能让下游接收到数据。 其他数据流变换操作符都是在transform
基础上拓展而来。
一个小细节,为什么
collect
方法里要return
?特地goole了一下,和Kotlin编译器的一个bug有关。如果这里不return
,就无法进行TCE(Tail Call Elimination,尾部调用消除)的优化。更多信息可以参考这个issue。
变换
map
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value -> return@transform emit(transform(value)) } 复制代码
map操作符用来把上游发射的数据,通过指定的transform方法转换为新的数据后,继续发送给下游。举例:
flowOf(1, 2, 3) .map { it * 2 } .collect { println(it) } // print 2, 4, 6 复制代码
铺平变换
flatMapConcat
、flatMapMerge
和flatMapLatest
flatMap
有着和map
一样的功能。但是它还有另一个作用,就是将上游的数据转换为新的数据流,依次发射每个数据流。
flatMapConcat
的数据流的发送是同步的。当一个数据流发送完成后,才会继续发送下一个数据流。 而flatMapMerge
则允许并发操作,同时发送多个数据流。它们之间的区别参考官网的一个例子就明白了。
首先我们定义一个方法,接收Int
并转化为flow<Int>
。
private fun requestFlow(i: Int): Flow<String> = flow { emit("$i: First") delay(500) // 等待 500 毫秒 emit("$i: Second") } 复制代码
创建数据流并使用flatMap
进行转化。
val startTime = System.currentTimeMillis() flowOf(1, 2, 3) .onEach { delay(100) } // 延时100ms .flatMapConcat { requestFlow(it) } .collect { Log.d("denny", "at ${System.currentTimeMillis() - startTime} ${Thread.currentThread()} $it") } 复制代码
注意看下面每条日志的时间戳:
D/denny: at 103 Thread[main @coroutine#5,5,main] 1: First D/denny: at 604 Thread[main @coroutine#5,5,main] 1: Second D/denny: at 704 Thread[main @coroutine#5,5,main] 2: First D/denny: at 1205 Thread[main @coroutine#5,5,main] 2: Second D/denny: at 1306 Thread[main @coroutine#5,5,main] 3: First D/denny: at 1807 Thread[main @coroutine#5,5,main] 3: Second 复制代码
如果我们把上面的例子换成flatMapMerge
,那么数据的发射将是并发的:
D/denny: at 107 Thread[main @coroutine#5,5,main] 1: First D/denny: at 207 Thread[main @coroutine#5,5,main] 2: First D/denny: at 308 Thread[main @coroutine#5,5,main] 3: First D/denny: at 608 Thread[main @coroutine#5,5,main] 1: Second D/denny: at 708 Thread[main @coroutine#5,5,main] 2: Second D/denny: at 809 Thread[main @coroutine#5,5,main] 3: Second 复制代码
flatMapMerge
接收一个参数concurrency
,表示最大的并发数量。默认是16。当concurrency
等于1的时候,就和flatMapMerge
没有区别了。
@FlowPreview public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> { require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" } return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency) } 复制代码
继续把上面的方法换成flatMapLatest
,打印:
D/denny: at 107 Thread[main @coroutine#5,5,main] 1: First D/denny: at 229 Thread[main @coroutine#5,5,main] 2: First D/denny: at 332 Thread[main @coroutine#5,5,main] 3: First D/denny: at 832 Thread[main @coroutine#5,5,main] 3: Second 复制代码
flatMapLatest
只接收最新的数据。新数据到来时,会取消之前未完成的数据流的收集。
上下文切换
flowOn
还记得上文说的flow
的emit
是线程不安全的。对于允许发送数据的操作符来说,不允许内部调用诸如withContext
修改协程上下文的挂起函数,仅允许在数据流中使用flowOn
操作符修改协程上下文。
flowOn
可以多次调用。作用范围是从当前flowOn
操作符到前一个flowOn
(或数据源)。 数据收集通常在创建flow
时的上下文执行,可以使用launchIn
改变。
val scope = CoroutineScope(Dispatchers.Main) flow { emit(1) } // emit on IO .flowOn(Dispatchers.IO) .onEach { println(it) } // collect on Main .launchIn(scope) 复制代码
事件监听
onEach
、onStart
、onCompletion
和onEmpty
flow { emit(1) } .onStart { emit(2) } // 在上游数据发射前调用,会先发射2,然后才是1 .onEach { println("onEach $it") } // 在上游数据流的每个值发送前执行操作,包括onStart中发射的数据 .onEmpty { println("onEmpty") } // 在上游数据流完成时,没有任何值传递消费,则触发执行操作,可发送额外的元素。上游数据流的数据也包括诸如onStart、onCompletion等操作符发送的数据 .onCompletion { println("onCompletion") } // 在上游数据流完成、取消、异常时,执行的操作 .collect { println("collect $it") } 复制代码
异常处理机制
Flow
有多种异常处理机制。最直观的,可以直接使用Kotlin
的try、catch来捕捉异常。
try { flowOf(1, 2, 3) .onEach { check(it < 2) } .collect { println(it) } } catch (e: Throwable) { println(Log.getStackTraceString(e)) } 复制代码
上面的代码会发射1,接着在发射2的时候抛出异常,然后被catch的代码块捕捉到。在这之后,flow
不再发射任何值。
一些时候,我们会提供一个flow
给外部监听,这个时候就不适合使用try-catch的代码块了。但是,我们仍然希望异常对于外部是透明的,可以考虑使用catch
操作符。
flowOf(1, 2, 3) .onEach { check(it != 2) } .catch { println(Log.getStackTraceString(it)) } .onCompletion { cause -> println(Log.getStackTraceString(cause)) } .collect { println(it) } 复制代码
注意,上面的例子中,onCompletion
操作符同样可以监听到异常,并且可以根据cause
是否为null判断是否成功完成。但是与catch
不同的是,onCompletion
只接受异常,但并不处理异常。
另外,kotlin还提供了retry
和retryWhen
,可以在发生错误的时候进行重试。这里就不再举例了。
合并
conflate
conflate
的作用是只处理最新值,而不是对每个值都进行处理。
flowOf(1, 2, 3) .onEach { delay(100) } .conflate() .collect { println("collect start $it") delay(500) println("collect end $it") } 复制代码
输出:
D/denny: Thread[main @coroutine#5,5,main] collect start 1 D/denny: Thread[main @coroutine#5,5,main] collect end 1 D/denny: Thread[main @coroutine#5,5,main] collect start 3 D/denny: Thread[main @coroutine#5,5,main] collect end 3 复制代码
与之比较类似的之前提到的collectLatest
。collectLatest
也是只处理最新的值,但是在新的数据到达的时候,还未完成的处理上个数据的代码块会被取消。
flowOf(1, 2, 3) .onEach { delay(100) } .collectLatest { println("collect start $it") delay(500) println("collect end $it") } 复制代码
这段代码打印的数据是:
D/denny: Thread[main @coroutine#7,5,main] collect start 1 D/denny: Thread[main @coroutine#8,5,main] collect start 2 D/denny: Thread[main @coroutine#9,5,main] collect start 3 D/denny: Thread[main @coroutine#9,5,main] collect end 3 复制代码
组合
combine
和zip
combine
的作用是把两个数据源的最新的数据合并,发送给下游。 而zip
是把两个数据源的值一一对应后,发送给下游。 举个例子: 使用combine
组合两个发射速度不同的flow
:
val flow1 = flowOf(1, 2, 3) .onEach { delay(50) } val flow2 = flowOf("one", "two", "three") .onEach { delay(100) } flow1.combine(flow2) { arg1, arg2 -> "$arg1 $arg2" }.collect { println(it) } 复制代码
输出:
D/denny: Thread[main @coroutine#5,5,main] 1 one D/denny: Thread[main @coroutine#5,5,main] 2 one D/denny: Thread[main @coroutine#5,5,main] 3 one D/denny: Thread[main @coroutine#5,5,main] 3 two D/denny: Thread[main @coroutine#5,5,main] 3 three 复制代码
如果改用zip
进行组合,则输出:
D/denny: Thread[main @coroutine#5,5,main] 1 one D/denny: Thread[main @coroutine#5,5,main] 2 two D/denny: Thread[main @coroutine#5,5,main] 3 three 复制代码
可以看到,即使发射速率不同,数据依然是一一对应的。
缓存
buffer
flow
默认是没有buffer的。使用这个操作符,可以为flow
添加缓存。 添加缓存后,即使下游来不及消费数据,只要缓存未满,上游依然会发射数据。
val flow = flow { (1..5).forEach { println("emit start $it") emit(it) println("emit end $it") } }.buffer(1) flow.collect { println("collect $it") delay(200) } 复制代码
上面这段代码中,上游发射数据的速率明显要大于下游,但是我们为flow
添加了一个容量为1的缓存。 输出:
D/denny: Thread[main @coroutine#6,5,main] emit start 1 D/denny: Thread[main @coroutine#6,5,main] emit end 1 D/denny: Thread[main @coroutine#6,5,main] emit start 2 D/denny: Thread[main @coroutine#6,5,main] emit end 2 D/denny: Thread[main @coroutine#6,5,main] emit start 3 D/denny: Thread[main @coroutine#5,5,main] collect 1 D/denny: Thread[main @coroutine#5,5,main] collect 2 D/denny: Thread[main @coroutine#6,5,main] emit end 3 D/denny: Thread[main @coroutine#6,5,main] emit start 4 D/denny: Thread[main @coroutine#5,5,main] collect 3 D/denny: Thread[main @coroutine#6,5,main] emit end 4 D/denny: Thread[main @coroutine#6,5,main] emit start 5 D/denny: Thread[main @coroutine#5,5,main] collect 4 D/denny: Thread[main @coroutine#6,5,main] emit end 5 D/denny: Thread[main @coroutine#5,5,main] collect 5 复制代码
细心的朋友可能已经发现,我们添加的是一个buffer为1的缓存。那为什么一开始发射了两个数据呢? 前文已经提到过,buffer
操作符本质上返回的是一个channelFlow
。所以这个疑问,要留到后面的channel
部分再来解答了。
其他常用操作符
filter 中间操作符,过滤上游数据流的值,仅允许满足条件的值继续传递。
take 中间操作符,只从上游数据流获取指定个数的元素,传递到下游,后续元素抛弃
debounce 中间操作符,在指定时间内,只允许最新的值传递到下游,多用于过滤上游高频率的生产者数据。
distinctUntilChanged 中间操作符,过滤上游数据流中的重复元素,等价于
RxJava
内的distinct
操作符。
流取消检测
流的构建器会在内部调用ensureActive
来检测流是否已经取消。如果已经取消,就会抛出CancellationException
的异常。 如下面这段代码,尝试发射第4个数字时,就会抛出异常。
flow { (1..5).forEach { emit(it) } } .collect { if (it == 3) cancel() } 复制代码
但是,使用IntRange.asFlow
之类的扩展函数创建的flow
,会处于性能原因取消内部的检测。所以它们会发射完所有的数字,在从runBlocking
返回的时候才抛出异常。
背压
默认情况下,flow
的数据和发送都在同一个协程上下文中,且发送和接收的方法都是支持挂起的,只有接收端准备好的时候才会发送数据。 但是,在使用buffer
或者flowOn
操作符后,flow
会转化为channelFlow
,此时也就支持了背压。 具体的背压策略,我们还是放在channelFlow
部分再介绍。
Channel
简介
Channel
出现在Flow
之前,最初被设计用来进行协程间通信。但在Flow
出现之后,Channel
就逐渐退居幕后了。在Flow
的源码中还是可以看到Channel
的身影,其本身的职责越发单一,仅作为协程间通信的并发安全的缓冲队列而存在。
Channel
的设计和Java中的BlockQueue
相似。区别在于,Channel
并不阻塞线程,而是提供了挂起函数send
和 receive
。
当需要的时候, 多个协程可以向同一个channel
发送数据, 一个channel
的数据也可以被多个协程接收.
当多个协程从同一个channel
接收数据的时候, 每个元素仅被其中一个consumer消费一次. 处理元素会自动将其从channel
里删除.
冷Flow
,热Channel
与Flow
不同的是,即使没有接受者,Channel
的发送端依旧会发射数据。
定义
Channel
本质上是一个接口:
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 复制代码
Channel
不可以直接实例化。但是可以通过官方提供的函数进行创建:
public fun <E> Channel( capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null ): Channel<E> 复制代码
参数介绍
capacity
capacity
是channel
的容量,kotlin为我们定义了几种常量:
RENDEZVOUS
只有消费端调用时,才会发送数据,否则挂起发送操作。这也是
channel
的默认类型。CONFLATED
capacity
是这个参数时,onBufferOverflow
参数只能为BufferOverflow.SUSPEND
。缓冲区满时,永远用最新元素替代,之前的元素将被废弃。可以理解为是
onBufferOverflow
等于DROP_OLDEST
的快捷创建版。UNLIMITED
无限制容量,缓冲队列满后,会直接扩容,直到OOM。
BUFFERED
默认创建64位容量的缓冲队列,当缓存队列满后,会挂起发送数据,直到队列有空余。
我们也可以直接传递一个数值,来创建指定缓冲大小的
channel
。
onBufferOverflow
指定当缓冲区满的时候的背压策略。有3种选择:
SUSPEND 挂起
DROP_OLDEST 丢弃最旧的元素
DROP_LATEST 丢弃最新的元素
onUndeliveredElement
指定数据发送但是接收者没有收到的时候的回调。
Channel
的基本用法
除了使用send
和receive
来接收发送和接收元素,还可以直接遍历channel
中的每个元素:
val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x * x) channel.close() // 我们结束发送 } // 这里我们使用 `for` 循环来打印所有被接收到的元素(直到通道被关闭) for (y in channel) println(y) println("Done!") 复制代码
Channel
是公平的
Channel
的发送和接收操作是公平的。如果在多个协程中执行发送和接受,它们遵守先进先出原则。
一个小测验帮你理解Channel
:
fun main() = runBlocking<Unit> { val channel = Channel<String>() launch { // 协程体A channel.send("A1") channel.send("A2") log("A done") } launch { // 协程体B channel.send("B1") log("B done") } launch { // 协程体C repeat(3) { val x = channel.receive() log(x) } } } fun log(message: Any?) { println("[${Thread.currentThread().name}] $message") } 复制代码
上面的代码中启动了两个发送的协程,总共发送3个数据,之后接受3个数据并打印。 那么上面的结果会是什么呢?
答案揭晓:
[main @coroutine#4] A1 [main @coroutine#4] B1 [main @coroutine#2] A done [main @coroutine#3] B done [main @coroutine#4] A2 复制代码
要理解上面的结果,首先要理解两点:
默认的协程类型是RENDEZVOUS,没有buffer。因此,send和receive要一一对应才会成功,否则会挂起。
channel
是公平的。
上面的代码的执行顺序是:
协程体A尝试发送数据,但是因为没有接受者所以挂起。
协程体B尝试发送数据,但是因为没有接受者所以挂起。
协程体依次接受来自协程体A、B发送的数据并打印,在接受第三个数据的时候因为没有发送者所以挂起。
协程体A发送数据
A2
,这个时候已经有接受者了,所以直接发送。之后打印A done
,结束。协程体B打印
B done
,结束。协程体C接受最后一个数据
A2
并打印,之后结束。
明白了这个例子之后,相信你也已经明白前面flow
的buffer
部分,为什么我们设置的buffer
为1,却一开始就发送了两个数据。
Select
表达式
定义
Select
是Channel
中一个特殊的机制。他允许等待多个挂起函数,并且返回其中最先返回结果的那个挂起函数的结果,作为select
的结果。
Select
可以适用于这样一个场景:我们需要同时从网络和本地缓存中拉取数据,并使用最先返回的结果作为展示。
Select
的定义如下:
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R { return suspendCoroutineUninterceptedOrReturn { uCont -> val scope = SelectBuilderImpl(uCont) try { builder(scope) } catch (e: Throwable) { scope.handleBuilderException(e) } scope.getResult() } } 复制代码
Select
接收一个SelectBuilder
作为参数。 我们再看一下SelectBuilder
的定义:
public interface SelectBuilder<in R> { public operator fun SelectClause0.invoke(block: suspend () -> R) public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) } 复制代码
SelectBuilder
为我们定义了一系列的扩展函数,这些扩展函数都是基于SelectClauseN
。所以,我们要使用返回值为SelectClauseN
系列类型的函数作为子语句。
官方为我们定义了一系列的选择表达式:
Deferred
接口:
public interface Deferred<out T> : Job { public val onAwait: SelectClause1<T> } 复制代码
ReceiveChannel
接口
public interface ReceiveChannel<out E> { public val onReceive: SelectClause1<E> public val onReceiveCatching: SelectClause1<ChannelResult<E>> } 复制代码
SendChannel
接口
public interface SendChannel<in E> { public val onReceive: SelectClause1<E> public val onReceiveCatching: SelectClause1<ChannelResult<E>> } 复制代码
举个例子吧
这里有一个官方Kotlin教程中的例子:
fun CoroutineScope.fizz() = produce<String> { while (true) { // 每 300 毫秒发送一个 "Fizz" delay(300) send("Fizz") } } fun CoroutineScope.buzz() = produce<String> { while (true) { // 每 500 毫秒发送一个"Buzz!" delay(500) send("Buzz!") } } suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) { select<Unit> { // <Unit> 意味着该 select 表达式不返回任何结果 fizz.onReceive { value -> // 这是第一个 select 子句 println("fizz -> '$value'") } buzz.onReceive { value -> // 这是第二个 select 子句 println("buzz -> '$value'") } } } 复制代码
使用:
val fizz = fizz() val buzz = buzz() repeat(7) { selectFizzBuzz(fizz, buzz) } coroutineContext.cancelChildren() // 取消 fizz 和 buzz 协程 复制代码
这段代码的执行结果如下:
fizz -> 'Fizz' buzz -> 'Buzz!' fizz -> 'Fizz' fizz -> 'Fizz' buzz -> 'Buzz!' fizz -> 'Fizz' buzz -> 'Buzz!' 复制代码
需要注意的是,Select
会按照顺序执行SelectBuilder
中的表达式。如果第一项无法执行,才会选择下一项,优先级依次类推。 如果需要完全公平的选择表达式,则使用selectUnbiased
。
后续TODO
接下来如果有时间,我会再写一下ChannelFlow
、SharedFlow
和StateFlow
等用法与源码分析。
作者:dennyz
链接:https://juejin.cn/post/7048919023580348430