Flow를 사용하면서 유용하게 사용할 수 있는 state flow와 shared flow가 다른 점과 각각 어떤 상황에서 적합한지를 알기 위하여 두 개의 특성을 비교하려고 합니다.
Flow builder로 생성한 flow들은 기본적으로 Cold stream 입니다.[2] 하지만 StateFlow와 SharedFlow는 둘다 Hot stream입니다. 이해를 돕기 위해 두 stream을 간단히 비교하면 아래와 같습니다.
Cold stream
- collect() (또는 이를 subscribe 할 때)를 호출할 때마다 flow block이 재실행 된다. 즉 1~10까지 emit 하는 flow가 있다면 collect 할때마다 1~10을 전달 받는다. 여러곳에서 collect를 호출하면 각각의 collect에서 1~10을 전달받는다.
Hot stream
- collect (또는 이를 subscribe 할때)를 호출하더라도 flow block이 호출되지 않는다. collect() 시점 이후에 emit 된 데이터를 전달받는다.
State flow 기본
Stateflow는 문자 그대로 현재 상태를 표현하기 적합한 flow입니다. 먼저 생성 방법은 아래와 같이 MutableStateFlow를 사용합니다.
private val _stateFlow = MutableStateFlow(99) //초기값을 99로 설정
val stateFlow = _stateFlow
StateFlow는 초기값이 필요합니다. 따라서 생성자에 반드시 초기값을 명시해야 합니다.
// value 속성을 사용
_stateFlow.value = 1
// flow의 기본 함수인 emit()을 사용
_stateFlow.emit(1)
stateFlow에 값을 전달할 때는 flow의 기본 함수인 emit()을 이용해도 되지만 value 속성을 사용할 수 있습니다. (LiveData와 유사하죠?)
아래와 같이 testViewModel.kt에 간단한 sample code를 만들어 보겠습니다.
private val _stateFlow = MutableStateFlow(99)
val stateFlow = _stateFlow
// state flow에 데이터 전달
suspend fun startSendDataToStateFlow() {
repeat(10) {
_stateFlow.value = it
delay(500)
}
}
startSendDataToStateFlow()를 호출하면 0~9까지의 숫자를 500ms 간격으로 emit 하게 되는 코드입니다.
이를 collect 하는 코드를 MainActivity.kt에 아래와 같이 포함시킵니다.
override fun onCreate(savedInstanceState: Bundle?) {
...
MainScope().launch {
testViewModel.stateFlow.collect {
Log.i(TAG, "stateFlow #1: $it - ${Thread.currentThread().name}")
}
Log.d(TAG,"State Collect End #1 - ${Thread.currentThread().name}")
}
// emit 시작
MainScope().launch { testViewModel.startSendDataToStateFlow() }
}
collect가 먼저 시작된 후 startSendDataToStateFlow()를 호출하였으므로 collect 시점에서는 아직 방출된 데이터가 없기 때문에 기본값을 받아갑니다. 따라서 이 코드에서는 99부터 수신되며 그 이후 방출된 0~9까지 총 10개의 데이터가 collect를 통해 수신됨을 확인할 수 있습니다.
override fun onCreate(savedInstanceState: Bundle?) {
...
// emit 시작
MainScope().launch { testViewModel.startSendDataToStateFlow() }
MainScope().launch {
testViewModel.stateFlow.collect {
Log.i(TAG, "stateFlow #1: $it - ${Thread.currentThread().name}")
}
Log.d(TAG,"State Collect End #1 - ${Thread.currentThread().name}")
}
}
만약 emit 코드를 먼저 호출한 이후 collect를 수행된다면 예상한 것처럼 초기값(99)이 아닌 0부터 전달받게 됩니다. 이는 이미 collect 하는 시점에는 stateFlow에 담겨있는 값이 0이 전달되어 있기 때문입니다.
정말 hot stream인지 조금 더 확실한 확인을 위해서 시작 부분에 1.1초의 delay를 주고 collect를 하도록 수정한 코드입니다.
MainScope().launch {
delay(1100) //1.1초 대기 후 collect 시작
testViewModel.stateFlow.collect {
Log.i(TAG, "stateFlow #1: $it - ${Thread.currentThread().name}")
}
Log.d(TAG,"State Collect End #1 - ${Thread.currentThread().name}")
}
이때는 1.1초 이후 state flow가 담고 있었던 99 -> 0 -> 1 데이터는 collect 되지 않고, 2부터 수신됩니다.
즉 현재 collect가 호출되면 현재 데이터부터 수신되기 시작하며, 여러 곳에서 collect를 호출하면 collect를 시작한 시점에서 각각의 데이터를 전달받기 시작합니다. 따라서 아래와 같이 동일한 stateFlow를 두 곳에서 collect 한다면 두곳 모두 동일한 데이터를 전달받습니다.
MainScope().launch {
testViewModel.stateFlow.collect {
Log.i(TAG, "stateFlow #1: $it - ${Thread.currentThread().name}")
}
Log.d(TAG,"State Collect End #1 - ${Thread.currentThread().name}")
}
MainScope().launch {
testViewModel.stateFlow.collect {
Log.i(TAG, "stateFlow #2: $it - ${Thread.currentThread().name}")
}
Log.d(TAG,"State Collect End #2 - ${Thread.currentThread().name}")
}
// emit 시작
MainScope().launch { testViewModel.startSendDataToStateFlow() }
Shared flow의 기본
Sharedflow는 stateFlow와 마찬가지로 MutableSharedFlow를 통해서 생성할 수 있습니다.
private val _sharedFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val sharedFlow = _sharedFlow
단! Sharedflow 생성 시 좀 더 detail 한 설정값들을 생성자에 넘겨줄 수 있습니다.
replay
collect시 전달받을 이전 데이터의 개수를 지정합니다. replay가 0이라면 collect 시점에 담겨있던 데이터부터 전달받을 수 있습니다. 만약 1이라면 collect 시점 직전의 데이터부터 전달받으며 시작합니다. 만약 2라면 현재 데이터 이전 두개의 데이터 부터 전달받으면서 시작하게 됩니다.
예를 들어 0~9까지 emit 되는 state flow에서 3이 emit 되어 시점에 collect가 시작된다면
ex) 0->1->2->3-> collect 시작->4->5->6->7->8->9
- replay = 0 일 때: 4부터 수신 시작
- replay = 1 일때: 3부터 수신 시작
- replay = 4 이상 일때: 0부터 수신 시작
extraBufferCapacity
buffer 개수 설정을 설정합니다. flow의 emit이 빠르고 collect가 느릴 때 지정된 개수만큼 buffer에 저장되며 저장된 개수가 넘어가면 onBufferOverflow에 설정된 정책에 따라 동작하게 됩니다.
onBufferOverflow
Buffer가 다 찼을 때의 동작을 정의합니다. 이는 channel에서 사용하는 buffer의 정의와 동일합니다.
- BufferOverflow.SUSPEND : buffer가 꽉 찼을 때 emit을 수행하면 emit 코드가 blocking 됩니다. 즉, buffer의 빈자리가 생겨야 emit 코드 이후의 코드가 수행될 수 있습니다.
- BufferOverflow.DROP_OLDEST: buffer가 꽉 찼을 때 emit을 수행하면 오래된 데이터 부터 삭제하면서 새로운 데이터를 넣습니다.
- BufferOverflow.DROP_LATEST: buffer가 꽉찼을때 emit을 수행하면 최근 데이터를 삭제하고 새로운 데이터를 넣습니다.
실제 내부 정의를 보면 아래와 같이 api가 설정되어 있습니다.
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
require(replay >= 0) { "replay cannot be negative, but was $replay" }
require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
}
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
replay의 기본값은 0이므로 collect 시점 이전에 전달된 데이터는 받지 않습니다. 또한 buffer size도 0이므로 buffer를 사용하지 않으며, buffer 사용 정책이 BufferOverflow.SUSPEND 이므로 collect가 emit보다 늦게 처리된다면 emit 부분이 block 되어 지연이 발생될 수 있습니다.
기본값은 위와 같이 정의되어 있지만 실제로 내부 코드에서 replay + extraBufferCapacity 한 값이 bufferSize로 사용됩니다.
이번에는 sharedFlow의 Sample 코드를 stateFlow와 유사하게 아래와 같이 작성해 보도록 하겠습니다.
//TestViewModel.kt
private val _sharedFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val sharedFlow = _sharedFlow
// SharedFlow에 데이터 전달
suspend fun startSendDataToSharedFlow() {
repeat(10) {
_sharedFlow.emit(it)
delay(500)
}
}
SharedFlow는 value로 값을 할당할 수 없습니다(지원하지 않기에..) 따라서 emit()을 이용하여 500ms단위로 데이터를 전달하도록 구성합니다.
override fun onCreate(savedInstanceState: Bundle?) {
...
MainScope().launch { testViewModel.startSendDataToSharedFlow() }
MainScope().launch {
testViewModel.sharedFlow.collect {
Log.i(TAG, "sharedFlow #1: $it - ${Thread.currentThread().name}")
}
Log.d(TAG,"Shared Collect End #1 - ${Thread.currentThread().name}")
}
...
emit()은 0~9까지 이뤄지지만 위 샘플 코드에서는 emit 이후 collect를 수행하므로 collect 이후 수신되는 값인 1부터 수신받게 됩니다. collect 시점에 이미 0이 존재하는 상태이지만 replay=0으로 설정되어 있기 때문에 기존에 담겨있는 값은 가져가지 않게 됩니다. 만약 replay = 1로 설정해 놓았다면 stateFlow와 동일하게 동작이 가능합니다. (다른 점이 있으나 그건 아래에서 추가적으로 비교하여 설명하도록 하겠습니다.)
SharedFlow도 여러 곳에서 collect시 각각 collect 부분에서 동일한 데이터를 전달받을 수 있습니다. (stateFlow와 동일합니다.)
MainScope().launch {
delay(1000)
testViewModel.sharedFlow.collect {
Log.i(TAG, "sharedFlow #1: $it - ${Thread.currentThread().name}")
}
Log.d(TAG,"Shared Collect End #1 - ${Thread.currentThread().name}")
}
MainScope().launch {
testViewModel.sharedFlow.collect {
Log.i(TAG, "sharedFlow #2: $it - ${Thread.currentThread().name}")
}
Log.d(TAG,"Shared Collect End #2 - ${Thread.currentThread().name}")
}
이 처럼 collect를 여러 곳에서 진행하여 동일한 값을 전달받는지에 대한 테스트를 두 번씩 한 이유는 Channel의 pan out과 flow는 동작이 다르다는 걸 확인하기 위함입니다. [3]
SharedFlow vs StateFlow
두 개 모두 hot stream이지만 SharedFlow가 좀 더 일반화된 버전이라면 StateFlow는 SharedFlow의 param을 특정 값으로 고정시켜 놓은 형태(replay = 1)로 볼 수 있습니다. 사실 이외에 더 큰 차이가 있는데, 아래의 예제를 통해서 확인해 보겠습니다.
// StateFlow 생성
private val _stateFlow = MutableStateFlow(99)
val stateFlow = _stateFlow
// SharedFlow 생성
private val _sharedFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val sharedFlow = _sharedFlow
// 양쪽 flow에 "100" 을 다섯번 반복 전송
suspend fun repeatSameDataToEachFlow() {
repeat(5) {
Log.d(TAG, "sendData #$it")
_sharedFlow.emit(100)
_stateFlow.value = 100
delay(500)
}
}
500ms 간격으로 sharedFlow와 stateFlow양쪽에 "100" 이란 값을 다섯 번 반복해서 전송합니다.
이를 collect 하는 MainActivity.kt 코드는 아래와 같습니다.
override fun onCreate(savedInstanceState: Bundle?) {
...
// sharedFlow collect
MainScope().launch {
testViewModel.sharedFlow.collect {
Log.i(TAG, "sharedFlow: $it")
}
}
// stateFlow collect
MainScope().launch {
testViewModel.stateFlow.collect {
Log.i(TAG, "stateFlow: $it")
}
}
// 양쪽 flow에 emit 시작
MainScope().launch {
testViewModel.repeatSameDataToEachFlow()
}
}
- stateFlow: 99(초기값) -> 100 출력 (두번만 출력함)
- sharedFlow: 다섯번 emit 한 100이 그대로 출력
즉 동일한 값이 입력되면 stateFlow의 경우 내부적으로 skip 하여 collect자체가 호출되지 않습니다. 이는 stateFlow를 따라가다 보면 아래와 같은 부분에서 자체적으로 conflate작업을 하고 있기 때문입니다.
//StateFlow.kt
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
try {
...
collectorJob?.ensureActive()
// Conflate value emissions using equality
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
oldState = newState
}
...
}
즉 이전 값과 비교하여 동일한 값이 들어오지 않는 경우에만 emit 호출하도록 되어 있습니다. 따라서 sharedFlow와 stateFlow 사용 시 사용 목적에 맞는 flow를 선택하여 사용해야 합니다.
stateIn vs shareIn
위에서 소개한 flow들은 hot stream입니다. 기존에 존재하는 cold stream을 hot stream으로 변환하기 위해서 Flow의 extension function으로 stateIn과 sharedIn이 존재합니다. 각각의 함수 정의는 아래와 같습니다. [4][5]
// cold stream flow -> SharedFlow로 변경
fun <T> Flow<T>.shareIn(scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0): SharedFlow<T>
// cold stream flow -> StateFlow로 변경
fun <T> Flow<T>.stateIn(scope: CoroutineScope,
started: SharingStarted,
initialValue: T): StateFlow<T>
이 두 개의 api는 하나의 flow에서 (single instance upstream) 방출되는 결과를 여러 곳에서 동일하게 수신하는 경우 용이하게 사용될 수 있습니다. 특히 cold flow의 생성 자체에 무거운 작업이 포함되어 있거나 유지하는 작업 자체의 비용이 많이 드는 경우 여러 곳에서 collect를 할때 매번 생성하여 사용하지 않고 하나의 flow만 생성하고, collect는 여러곳에서 수신받도록 구현합니다.
즉 방송국 처럼 송출은 한군데서 하지만 이를 여러곳에서 수신받는 형태 입니다.
예를 들어 비싼 비용을 지불해야 하는 network connection을 맺은 후 backend에서 오는 메시지를 flow로 전달받는 형태나, connection을 맺는 비용 자체가 비싼 아래와 같은 경우에 적합합니다.
// cold stream
val backendMessages: Flow<Message> = flow {
connectToBackend() // takes a lot of time
try {
while (true) {
emit(receiveMessageFromBackend())
}
} finally {
disconnectFromBackend()
}
}
// SharedFlow로 변경
val messages: SharedFlow<Message> =
backendMessages.shareIn(scope, SharingStarted.Eagerly)
coroutineScope과 SharingStarted param은 stateIn과 sharedIn에서 동일하게 사용됩니다. coroutineScope은 hot stream을 수행하는 coroutine을 생성할 scope을 넘겨주고, SharingStarted는 flow를 실행하는 방법을 세부적으로 조정하기 위해 사용됩니다.
SharingStarted.Eagerly
subscriber가 존재하지 않더라도 upstream flow(sharing)는 바로 시작되며 중간에 중지되지 않습니다. [6] 이때 누적되는 값은 replay 개수만큼이며, replay 개수보다 많은 값이 들어오면 바로 버려집니다.(BufferOverflow.DROP_OLDEST로 동작)
실제 api source code는 아래와 같습니다. (command() 가 호출되면서 flow가 시작됨)
//SharingStarted.kt
private class StartedEagerly : SharingStarted {
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
flowOf(SharingCommand.START)
override fun toString(): String = "SharingStarted.Eagerly"
}
SharingStarted.Lazily
첫 번째 subscriber가 등록(collect)한 이후부터 upstream flow가 동작을(sharing) 시작하며 중간에 중지되지 않습니다. [7] 이때 첫 번째 subscriber는 그동안 emit 된 모든 값들을 얻어가며, 이후의 subscriber는 replay값에 설정된 개수만큼만 얻어가면서 collect를 시작합니다. 구독자가 모두 없어지더라도 upstream flow는 동작을 유지합니다. 다만 이때는 replay 개수만큼만 cache 합니다.
api source code는 아래와 같습니다.
//SharingStarted.kt
private class StartedLazily : SharingStarted {
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
var started = false
subscriptionCount.collect { count ->
if (count > 0 && !started) {
started = true
emit(SharingCommand.START)
}
}
}
override fun toString(): String = "SharingStarted.Lazily"
}
SharingStarted.WhileSubscribed
구독자가 등록되면 sharing을 시작하며 구독자가 전부 없어지면 바로 중지됩니다. 또한 replay 개수만큼 cache합니다. 이는 아래와 같은 추가적인 정의로 좀더 세분화 하여 조정할 수 있습니다.
fun WhileSubscribed(stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE): SharingStarted
- stopTimeoutMillis: 구독자가 모두 사라진 이후에 정지할 delay를 넣습니다. 0이면 구독자가 모두 사라지는 순간 바로 sharing이 중지 됩니다.
- replayExpriationMilis: replay를 위해 cache 한 값을 유지할 시간을 지정합니다. 구독자가 모두 사라지는 순간 이후로 정해진 시간만큼 대기하다가 cache를 reset 시킵니다. 기본값은 Long의 Max이므로 영원이 cache 되나, 1000ms 또는 5000ms 등 특정 시간을 지정할 경우 해당 시간 이후로 구독자가 생기지 않으면 repaly를 위해 저장했던 값을 모두 초기화시킵니다. stateIn의 경우 초기값이 설정되며, sharedIn의 경우 emtpy 상태가 됩니다. [8] 만약 구독자가 모두 사라지고 cache 삭제를 대기 중에 구독자가 다시 들어온다면 cache가 그대로 유지됩니다.(아래 코드 확인)
만약 이 두 개의 param에 마이너스 값을 넣으면 IllegalArgumentException이 발생합니다. api source code는 아래와 같습니다.
private class StartedWhileSubscribed(
private val stopTimeout: Long,
private val replayExpiration: Long
) : SharingStarted {
...
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = subscriptionCount
.transformLatest { count ->
if (count > 0) {
emit(SharingCommand.START)
} else {
delay(stopTimeout) //구독자가 다 사라지면 stopTimeout만큼 대기
if (replayExpiration > 0) {
emit(SharingCommand.STOP) //sharing 중지
delay(replayExpiration) // replay reset 대기
}
emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
}
}
...
SharingStarted Test
좀 더 세세한 동작을 확인하기 위하여 아래와 같이 test code를 생성하여 각 case별 동작을 추가적으로 확인해 보겠습니다.
//TestViewModel.kt
private val _connectionFlow = flow {
initHeavyLogic() //Expensive logic
var i = 0
while (true) {
delay(500) //500ms 간격으로 0부터 순증값을 emit
emit(i++)
}
}
val connectionFlow = _connectionFlow
private suspend fun initHeavyLogic() {
delay(1000)
Log.d(TAG, "initHeavyLogic()")
}
//MainActivity.kt
MainScope().launch {
testViewModel.connectionFlow.collect {
Log.i(TAG, "connectionFlow: $it")
}
}
위 코드는 기본적으로 0부터 500ms 간격으로 로그를 출력합니다. 여기에 sharedIn 연산자와 아래와 같이 SharingStarted.Eagerly를 사용하여 hot stream flow를 생성합니다.
//TestViewModel.kt
private val _connectionFlow = flow {
...
}
val connectionFlow = _connectionFlow
// Eagerly 사용 및 replay = 1로 설정
val connectionSharedFlow =
_connectionFlow.shareIn(viewModelScope, SharingStarted.Eagerly, 1)
//MainAcitivty.kt
MainScope().launch {
testViewModel.connectionFlow.collect {
Log.i(TAG, "connectionFlow: $it")
}
}
MainScope().launch {
delay(5100) //5초 정도 delay 후에 수신
testViewModel.connectionSharedFlow.collect {
Log.i(TAG, "connectionSharedFlow #2: $it")
}
}
기본적인 flow을 구동시키고 약 5초 후에 sharedFlow의 collect를 시작하면(8번을 수신받은 시점) replay = 1에 따라서 직전 값인 7부터 수신을 시작합니다.
다만 아래 코드처럼 기존 cold stream의 collect가 시작되지 않는다면 collect 자체가 시작을 하지 않기 때문에 shareIn으로 만든 sharedFlow 역시 시작하지 못합니다. 따라서 connectionSharedFlow 단독으로만 collect를 5초 뒤에 설정하더라도 0부터 시작하게 됩니다.
//MainAcitivty.kt
//MainScope().launch { //기본 flow를 동작시키지 않음
// testViewModel.connectionFlow.collect {
// Log.i(TAG, "connectionFlow: $it")
// }
//}
MainScope().launch {
delay(5100) //5초 정도 delay 후에 수신
testViewModel.connectionSharedFlow.collect {
Log.i(TAG, "connectionSharedFlow #2: $it")
}
}
이번에는 SharingStarted.Lazily를 이용하여 동일하게 flow를 만들어 수행시킵니다. 또한 기본 cold flow를 시작시키고 5초 이후에 첫 번째 sharedFlow를 collect, 10초 이후에 동일한 sharedFlow를 collect 시킵니다.
//TestViewModel.kt
...
// Laziliy 사용 및 replay = 1로 설정
val connectionSharedFlow =
_connectionFlow.shareIn(viewModelScope, SharingStarted.Laziliy, 1)
//MainAcitivty.kt
MainScope().launch {
testViewModel.connectionFlow.collect {
Log.i(TAG, "connectionFlow: $it")
}
}
MainScope().launch {
delay(5100)
testViewModel.connectionSharedFlow.collect {
Log.i(TAG, "connectionSharedFlow #1: $it")
}
}
MainScope().launch {
delay(10100)
testViewModel.connectionSharedFlow.collect {
Log.i(TAG, "connectionSharedFlow #2: $it")
}
}
원본 cold stream flow가 10개가 수신되는 시점에(5초 대기후) 첫번째 sharedFlow가 시작됩니다. 이때 Lazily 동작에 언급한 대로 처음 emit 된 값인 0번부터 값을 수신받습니다. 이후 (10초 대기후) 두 번째 sharedFlow가 시작되면 이는 replay = 1에 따라서 6번부터 값을 수신하기 시작합니다.
여기서 "initHeavyLogic()" 이란 로그는 flow가 시작되면서 찍는 로그입니다. 총 세 개의 flow가 collect 되고 있지만 원본 cold stream flow가 시작되면서 하나가 출력되고, 첫 번째 sharedFlow가 생성되면서도 출력됩니다. 또한 두 번째 sharedFlow는 기존에 생성된 flow를 공유하게 되므로 추가적으로 생성되지 않습니다. 이는 위에서 설명했던 single instance of flow가 생성되고 여러 개의 구독자가 추가 생성 없이 결과를 동일하게 전달받음을 확인할 수 있습니다.
※ sharedIn 또는 stateIn으로 생성된 hot stream은 원본 cold stream와 다른 새로운 coroutine을 생성하여 시작됩니다.
마지막으로 SharingStarted.WhileSubscribed로 아래와 같이 설정해 보겠습니다. 위에서 설명했던 것처럼 구독자가 있을 때만 stream이 동작했다가, 구독자가 없어지면 바로 중지되어야 합니다.
//TestViewModel.kt
private val _connectionFlow = flow {
initHeavyLogic() //Expensive logic -> flow 시작시 출력
...
}
val connectionFlow = _connectionFlow
// Eagerly 사용 및 replay = 1로 설정
val connectionSharedFlow =
_connectionFlow.shareIn(viewModelScope, SharingStarted.WhileSubscribed(), 1)
//MainAcitivty.kt
//cold stream 구독
MainScope().launch {
testViewModel.connectionFlow.collect {
//cold stream 시작
}
}
// hot stream을 두번에 나눠서 구독
MainScope().launch {
// 3초 동안만 collect
withTimeoutOrNull(3000) {
testViewModel.connectionSharedFlow.collect {
Log.i(TAG, "connectionSharedFlow #1: $it")
}
}
}
MainScope().launch {
delay (5000)
// 다시 collect 시작
testViewModel.connectionSharedFlow.collect {
Log.i(TAG, "connectionSharedFlow #2: $it")
}
}
구독하는 부분에서 먼저 3초 동안 값을 전달받습니다. 그 이후 2초 동안 대기후에 다시 collect를 수행하여 값을 전달받도록 합니다.
첫 번째 collect는 3초동안 수신후 중지되었고, 그 다음 2초 후에 시작되는 두번째 collect는 첫번째 collect가 중지된 시점의 2를 relay = 1 정책에 따라 전달받고 시작합니다. 다만 0부터 다시 수신받고 있습니다. 실제로 flow가 중지되었는지는 "initHeavyLogic()" 로그로 확인할 수 있습니다. 즉 로그에 출력되어 있는 세 개의 로그는 아래의 의미를 가집니다.
- 첫 번째 initHeavyLogic(): 원본이 되는 cold stream flow 시작됨을 표시
- 두 번째 initHeavyLogic(): shareIn으로 cold stream flow을 SharedFlow로 변환하여 독립된 coroutine으로 hot stream flow가 시작됨
- 세 번째initHeavyLogic(): shareIn으로 cold stream flow을 SharedFlow로 변환하여 독립된 coroutine으로 hot stream flow가 재시작됨 (먼저 시작한 SharedFlow는 중지되었음을 의미)
※ 테스트 코드는 모두 shareIn을 사용하여 SharedFlow로 구성되었지만 stateIn으로 생성된 StateFlow 역시 동일하게 동작합니다.
Upstream completion and error handling
upstream flow가 완료되더라도 기본적으로 구독자들에게 미치는 영향은 없습니다. 만약 upstream flow가 완료되었을 때 어떤 동작을 하고 싶다면 아래와 같이 flow operator인 onCompletion을 수행할 수 있습니다. 또한 catch나 retry로 추가 작업도 가능합니다. operator에 대한 내용은 이번 주제의 범위에서 벗어나므로 해당 글을 참고하시기 바랍니다. [9]
fun main(args: Array<String>) = runBlocking {
foo().onCompletion { cause ->
if (cause != null) println("Flow completed exceptionally: $cause")
}
.catch { cause -> println("Caught exception") }
.sharedIn(scope, SharingStarted.Eagerly)
또한 flow 시작 시 특정 값을 전달받고 싶다면 아래와 같이 onStart() 같은 operator를 지정할 수 있습니다.
val connectionSharedFlow = _connectionFlow
.onStart {emit(-99)}
.shareIn(viewModelScope, SharingStarted.Eagerly)
Buffering and conflation
stateFlow의 경우 buffer를 조정할 수 없으며, conflation 역시 강제적으로 내부 코드에서 적용되어 있음을 확인했습니다. 따라서 buffer와 conflate operator는 shareIn(SharedFlow)에서만 사용 가능합니다. MutableSharedFlow는 생성자로 buffer의 크기를 지정할 수 있으나 shareIn을 사용할 때에는 생성자에 따로 넣어줄 수 없습니다. 따라서 아래와 같은 형태로 shareIn 앞쪽에 buffer, conflate 연산자를 붙여서 해당 기능을 추가해야 합니다.
buffer(0).shareIn(scope, started, 0)
-> SharedFlow의 기본 buffer size(64)를 0으로 변경하여 생성 (buffer 없음)
buffer(b).shareIn(scope, started, r)
-> extraBufferCapacity = b, replay = r인 SharedFlow 생성
conflate().shareIn(scope, started, r)
-> onBufferOverflow = DROP_OLDEST, replay = r인 SharedFlow 생성 (이는 extraBufferCapacitiy = 1 replay = 0과 동일)
Operator fusion
아래의 operator들은 동작하지 않습니다. (has no effect)
SharedFlow
flowOn, buffer with RENDEZVOUS, cancellable
StateFlow
flowOn, buffer with (RENDEZVOUS or CONFLATE), cancellable, distinctUntilChanged
Flow lifecycle for Android
Flow는 LiveData처럼 android에서 lifecycle에 따라 자동으로 멈추거나 재시작되지 않습니다. 따라서 위에서 사용했던 예제의 경우 home키로 화면을 나가더라도 종료되지 않고 계속해서 emit 되는 값을 collect 합니다.
override fun onCreate(savedInstanceState: Bundle?) {
...
MainScope().launch {
testViewModel.connectionFlow.collect {
Log.i(TAG, "connectionFlow: $it")
}
}
...
}
Livedata처럼 android의 lifecycle에 맞춰 동작하도록 구성하려면 repeatOnLifecycle을 사용합니다. 이를 사용하기 위해서는 먼저 아래 라이브러리를 dependency에 추가하도록 합니다.
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.4.0"
그리고 아래와 같이 lifecycle에 의하여 제어되도록 코드를 변경합니다.
override fun onCreate(savedInstanceState: Bundle?) {
...
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
testViewModel.connectionFlow.collect {
Log.i(TAG, "connectionFlow: $it")
}
}
}
...
}
Lifecycle.State.STARTED의 정의는 아래와 같습니다.
//LifeCycle.java
public enum State {
...
/**
* Started state for a LifecycleOwner. For an {@link android.app.Activity},
* this state is reached in two cases:
* - after onStart call;
* - right before onPause call.
*/
STARTED,
...
따라서 activity가 onstart 되는 시점이나 onPause 되는 시점에 새로운 coroutine으로 시작되며, onStop시점에 cancel 됩니다.
References
[1] https://developer.android.com/kotlin/flow/stateflow-and-sharedflow
[2] 2019.11.04 - [개발 이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#10- Asynchronous Flow(1/2)
[3] 2018.12.21 - [개발이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#7 - Channels
[9] 2019.11.16 - [개발이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#11- Asynchronous Flow(2/2)
'개발이야기 > Kotlin' 카테고리의 다른 글
[Compose] Desktop application 개발 (1) | 2022.02.18 |
---|---|
[Coroutine] suspendCancellableCoroutine - Callback을 coroutine으로 변경 (4) | 2021.12.20 |
[Coroutine] Callback을 Flow로 변환 - callbackFlow (0) | 2021.11.29 |
[Kotlin] Coroutine - CoroutineDispatcher.limitedParallelism (0) | 2021.11.24 |
Kotlin Coroutines 1.5 - GlobalScope Marked as Delicate, Refined Channel API, and More (0) | 2021.06.07 |