Kotlin Flow 三 StateFlow 和 SharedFlow
StateFlow
StateFlow 和 LiveData 差不多,都是可观察的数据容器。在 StateFlow
中任何数据的发送,它的每一个接收器都能接收到。在 StateFlow 和 SharedFlow 中收集器也可以被称为订阅者,不过这个订阅者会挂起当前协程,而且永远不会结束。
private val state = MutableStateFlow(1)suspend fun simpleStateFlow() { coroutineScope { launch { delay(1000) state.collect { println("before state value $it") } } launch { for (i in 1..100) { state.emit(i) delay(100) } } launch { state.collect { println("state value $it") } } }}
需要注意的是 collect
是一个挂起函数,所以一旦调用 collect
协程就会被挂起,所以上述的例子中在一个协程中发送数据,在两个协程中接收数据。
和 LiveData
不同的在于, LiveData
不需要初始值,但 StateFlow
需要。
LiveData
会与 Activity 绑定,当 View 进入 STOPED
状态时, LiveData.observer()
会自动取消注册,而从 StateFlow
或任意其他数据流收集数据的操作并不会停止。如需实现相同的行为,需要从 Lifecycle.repeatOnLifecycle
块收集数据流。
StateFlow
是热流,并不是冷流。并且 StateFlow
的 collect
收不到调用之前发射的数据。
val state = MutableStateFlow(1)coroutineScope { launch { for (i in 0..10) { state.emit(i) delay(1000) } } launch { delay(2000) state.collect { println("receive state $it") } }}
可以看到最终的结果是:
receive state 2receive state 3receive state 4receive state 5receive state 6receive state 7receive state 8receive state 9receive state 10
因为在接受之前 delay 了 2s,所以最后是从 2 开始接收的。
把普通的 Flow 转化成 StateFlow
。
val flow = flow { for (i in 0..4) { emit(i) delay(100) }}coroutineScope { val stateFlow = flow.stateIn(this) launch { stateFlow.collect { println("receive flow.stateIn value $it") } }}
我们同样可以像 LiveData
一样直接获取它的值。
stateFlow.value
StateFlow
分为 StateFlow
和 MutableStateFlow
。就像 LiveData
和 MutableLiveData
一样。 StateFlow
只能接收数据,不能发送数据,而 MutableStateFlow
即可以发送也可以接收。
private suspend fun simpleStateFlowAndMutableStateFlow() { val mutableStateFlow = MutableStateFlow(1) coroutineScope { launch { collectData(mutableStateFlow.asStateFlow()) } launch { (1..10).forEach { delay(100) mutableStateFlow.emit(it) } } }}
如上代码所述,可以将 MutableStateFlow
通过 asStateFlow
转换成 StateFlow
。
StateFlow
中给我们提供了一个协程安全的并发修改 StateFlow
中的值的方法 compareAndSet
。该方法能够保证原子的修改 StateFlow
的值。该方法是通过 CAS 来修改值。
public fun compareAndSet(expect: T, update: T): Boolean
将当前的值和期待的值进行比较,如果相等则更新当前的值,并返回 true,如果不相等则返回 false。这里的比较并修改是原子性的。
SharedFlow
SharedFlow
和 StateFlow
相比,他有缓冲区区,并可以定义缓冲区的溢出规则,已经可以定义给一个新的接收器发送多少数据的缓存值。
SharedFlow
同样有与之对应的 MutableSharedFlow
。 MutableSharedFlow
的参数如下:
replay
给一个新的订阅者发送的缓冲区的数量。extraBufferCapacity
除了 replay 的数量之外的缓冲区的大小。onBufferOverflow
缓冲区溢出规则SUSPEND
挂起DROP_OLDEST
移除旧的值DROP_LATEST
移除新的值
SharedFlow
的缓冲区大于是 replay + extraBufferCapacity 。
注意相比于 MutableStateFlow
, MutableSharedFlow
不需要初始值。
suspend fun simpleSharedFlow() { val sharedFlow = MutableSharedFlow<Int>( replay = 5, extraBufferCapacity = 3, ) coroutineScope { launch { sharedFlow.collect { println("collect1 received shared flow $it") } } launch { (1..10).forEach { sharedFlow.emit(it) delay(100) } } // wait a minute delay(1000) launch { sharedFlow.collect { println("collect2 received shared flow $it") } } }}
同样的,我们可以把普通的 Flow 转换成 SharedFlow。
suspend fun simpleConvertToSharedFlow(started: SharingStarted) { var start = 0L // create normal flow val flow = (1..10).asFlow() .onStart { start = currTime() } .onEach { println("Emit $it ${currTime() - start}ms") delay(100) } // convert to shared flow // need coroutine scope coroutineScope { val sharedFlow = flow.shareIn(this, started, replay = 2) delay(400) launch { println("current time ") sharedFlow.collect { println("received convert shared flow $it at ${currTime() - start}ms") } } }}
这里的转换有些复杂,可以看到我们通过 shareIn
可以将普通的 flow 转换成 SharedFlow
。可以看到 sharedIn
有三个参数:
CoroutineScope - sharing 的协程的作用域。
SharingStarted
- 启动模式stopTimeoutMillis - 配置最后一个订阅者消失后 sharing flow 停止的延时。
replayExpirationMillis - 配置 sharing flow 协程的停止和重置缓冲区之间的间隔,单位是毫秒,默认值为 Long.MAX_VALUE 缓存永远都不重置,0 表示立即重置缓存。比较难懂可以看看下面的例子。
Eagerly
迫切的,渴望的,在转换完成后立即开始 sharing 数据,当上游的数据超过 replay 的时候,前面的数据就会被丢弃,相当于DROP_OLDEST
。Lazily
当有第一个订阅者(调用 collect)的时候开始发射数据。WhileSubscribed
当第一个订阅者出现的时候立即开始,当最后一个订阅者消失的时立即停止(默认情况下),replay 数量的缓存值将永远保留(默认情况下)。这是一个函数,可以通过参数来控制当最后一个订阅者消失时的行为,以及缓存的有效期。replay 当订阅的时候回复的数量。
如果上面的函数中传递的是 Eagerly
,其输出如下:
Emit 1 2ms Emit 2 109ms Emit 3 213ms Emit 4 313ms current time received convert shared flow 2 at 412ms received convert shared flow 3 at 412ms Emit 5 413ms received convert shared flow 4 at 414ms Emit 6 518ms received convert shared flow 5 at 519ms Emit 7 619ms received convert shared flow 6 at 619ms Emit 8 720ms received convert shared flow 7 at 720ms Emit 9 822ms received convert shared flow 8 at 823ms Emit 10 926ms received convert shared flow 9 at 926ms received convert shared flow 10 at 1027ms
如果传入的是 Lazily
,其输入如下:
current time Emit 1 2ms Emit 2 105ms received convert shared flow 1 at 106ms Emit 3 209ms received convert shared flow 2 at 209ms Emit 4 313ms received convert shared flow 3 at 313ms Emit 5 415ms received convert shared flow 4 at 415ms Emit 6 518ms received convert shared flow 5 at 518ms Emit 7 622ms received convert shared flow 6 at 622ms Emit 8 725ms received convert shared flow 7 at 725ms Emit 9 826ms received convert shared flow 8 at 826ms Emit 10 932ms received convert shared flow 9 at 932ms received convert shared flow 10 at 1032ms
很明显能够看出两者的区别。
下面看看 WhileSubscribed
,这种方式非常灵活。
fun currTime() = System.currentTimeMillis()suspend fun simpleConvertToSharedFlow(started: SharingStarted) { var start = 0L // create normal flow val flow = (1..10).asFlow() .onStart { start = currTime() } .onEach { println("Emit $it ${currTime() - start}ms") delay(100) } // convert to shared flow // need coroutine scope coroutineScope { val sharedFlow = flow.shareIn(this, started, replay = 2) val job = launch { println("current time ") sharedFlow.collect { println("received convert shared flow $it at ${currTime() - start}ms") } } launch { delay(1000L) job.cancel() delay(110L) sharedFlow.collect { println("received again shared flow $it") } println("shared flow has stop") } }}@OptIn(ExperimentalTime::class)suspend fun main() {// simpleSharedFlow() simpleConvertToSharedFlow( SharingStarted.WhileSubscribed( stopTimeout = 100L.toDuration(DurationUnit.MILLISECONDS), replayExpiration = 200L.toDuration(DurationUnit.MILLISECONDS) ) )}
这里配置当最后一个订阅者消失时 delay
100ms 后停止 sharing flow,在 sharing flow 停止后 200ms 后让缓存失效。这里可以通过调整 job.cancel
后的 delay
函数的时长来看看效果。当时间为 110ms 时,会重新接受到缓存 9 和 10,并重新开始 sharing flow,如果参数调整为 320ms 时,缓存会失效,会直接重新开始 sharing flow。
110ms 的结果:
current time Emit 1 1ms Emit 2 107ms received convert shared flow 1 at 108ms Emit 3 211ms received convert shared flow 2 at 211ms Emit 4 315ms received convert shared flow 3 at 315ms Emit 5 417ms received convert shared flow 4 at 417ms Emit 6 521ms received convert shared flow 5 at 521ms Emit 7 623ms received convert shared flow 6 at 624ms Emit 8 727ms received convert shared flow 7 at 727ms Emit 9 829ms received convert shared flow 8 at 829ms Emit 10 933ms received convert shared flow 9 at 933ms received again shared flow 9received again shared flow 10Emit 1 0ms Emit 2 105ms received again shared flow 1Emit 3 210ms received again shared flow 2Emit 4 314ms received again shared flow 3Emit 5 415ms received again shared flow 4Emit 6 519ms received again shared flow 5Emit 7 620ms received again shared flow 6Emit 8 721ms received again shared flow 7Emit 9 826ms received again shared flow 8Emit 10 927ms received again shared flow 9received again shared flow 10
320ms 的结果:
current time Emit 1 1ms Emit 2 106ms received convert shared flow 1 at 106ms Emit 3 210ms received convert shared flow 2 at 210ms Emit 4 314ms received convert shared flow 3 at 314ms Emit 5 414ms received convert shared flow 4 at 414ms Emit 6 517ms received convert shared flow 5 at 517ms Emit 7 623ms received convert shared flow 6 at 623ms Emit 8 726ms received convert shared flow 7 at 727ms Emit 9 827ms received convert shared flow 8 at 827ms Emit 10 931ms received convert shared flow 9 at 931ms Emit 1 0ms Emit 2 105ms received again shared flow 1Emit 3 209ms received again shared flow 2Emit 4 315ms received again shared flow 3Emit 5 418ms received again shared flow 4Emit 6 523ms received again shared flow 5Emit 7 627ms received again shared flow 6Emit 8 732ms received again shared flow 7Emit 9 833ms received again shared flow 8Emit 10 937ms received again shared flow 9received again shared flow 10
我们看下面这段源码就会很快明白:
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = subscriptionCount .transformLatest { count -> if (count > 0) { emit(SharingCommand.START) } else { delay(stopTimeout) if (replayExpiration > 0) { emit(SharingCommand.STOP) delay(replayExpiration) } emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE) } } .dropWhile { it != SharingCommand.START } // don't emit any STOP/RESET_BUFFER to start with, only START .distinctUntilChanged() // just in case somebody forgets it, don't lea
作者:星流星
链接:https://www.jianshu.com/p/29585473ff65
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。