본문으로 바로가기
반응형

이 글은 아래 링크의 내용을 기반으로 하여 설명합니다.

https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md

이 블로그 포스팅 당시의(2019.11.04) coroutine 버전은 1.3.2 입니다.


첫번째 글에 이어 포스팅 합니다.

Flow 첫번째 정리 포스팅: https://tourspace.tistory.com/258


Flow context

flow로 만들어진 collection은 이를 호출한 caller의 coroutine context에서 수행되며, 이를 context preservation(context 보존) 이라고 부릅니다.
따라서 collect() 함수 또는 기타 terminal api를 호출하는 coroutine에서 flow block을 처리합니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking {
    foo().collect { value -> log("Collected $value") } 
}            


collect()를 호출한 corouitne의 context는 main thread를 사용하므로 flow의 body 영역의 코드 역시 main thread에서 처리됩니다.

결과.

[main @coroutine#1] Started foo flow

[main @coroutine#1] Collected 1

[main @coroutine#1] Collected 2

[main @coroutine#1] Collected 3



Wrong emission withContext

하지만 CPU를 많이 사용하는 flow 연산이라면 background thread에서 수행하고, 이 결과를 받아 UI를 업데이트 하는 작업은 main thread에서 처리하도록 해야 하는경우가 발생할 수 있습니다.

아마도 많이 발생하는 상황입니다.

보통 coroutine에서는 context switching을 withContext를 이용하여 쉽게 전환할수 있습니다.

하지만 위에서 언급한 context preservation속성으로 인하여 emit을 하는 context와 수신하는 context를 다르게 하지 못하도록 되어 있습니다.

fun foo(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking {
    foo().collect { value -> println(value) } 
}

이 코드를 수행하면 아래와 같은 에러가 발생합니다.

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:

Flow was collected in [BlockingCoroutine{Active}@5232da77, BlockingEventLoop@5bf30999],

but emission happened in [DispatchedCoroutine{Active}@6daf0b39, DefaultDispatcher].

Please refer to 'flow' documentation or use 'flowOn' instead

at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:96)

at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:30)

at com.skt.coroutine.HelloKt$foo2$1$1.invokeSuspend(Hello.kt:32)

at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)

at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)

at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)

at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)

at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)


flowOn operator

이런 경우exception에 언급된것 처럼 withContext가 아니라 flowOn operater를 이용하여 emission하는 부분의 context를 바꿔줄 수 있습니다.

fun main(args: Array<String>) = runBlocking {
    println("main start!")
    foo2().collect { log("Collected $it")  }
    println("main end!")
}

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")결과

결과를 찍어보면 원하는대로 호출부분과 실제 flow를 처리하는 부분이 다른 thread로 바껴 있음을 확인할 수 있습니다.

main start!
[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3
main end!

flowOn을 사용함으로써 기본적으로 하나의 coroutine이 emission과 collection을 순차적으로 처리하는 flow를 변경시킵니다.

이 말은 emission과 collection이 각각의 coroutine에서 동시에 진행하게 된다는 의미 입니다.

(물론 예제에서는 Dispatch.Default로 옵션을 주었으므로 각각의 coroutine을 수행하는 주체도 다른 thread가 됩니다. )

결과에는 찍히지 않았지만, collection은 coroutine#1, emission은 coroutin#2에서 수행됩니다.


※ 예제를 직접 수행해 보니 정상동작을 하기는 하지만 flowOn()은 현재 코루틴 버전에서 (v1.3.2) experimental annotation이 붙어 있습니다.

버전이 올라가면 동작기능 변경없이 experimental이 풀리지 않을까 싶네요.


Buffering

기본적으로 Flow는 channel과 마찬가지로 produce and consumer pattern 입니다. 
만약 값을 생산하는 쪽과 소비하는쪽중에 한쪽 또는 양쪽 모두 느리다면 두 부분을 분리해서 처리하는 코루틴은 전체 처리 시간을 감소시키는 효과를 낼 수 있습니다.

아래 예제는 값을 생성하는데 100ms, 소비하는데 300ms가 걸리는 코드 입니다.
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking { 
    val time = measureTimeMillis {
        foo().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

값을 emit 하는데 100ms, 처리하는데 300ms이 걸리니 원소 하나당 총 400ms 시간이 필요합니다.

따라서 단순 계산으로 400ms x 3개원소 = 1200ms이 걸립니다.

1

2

3

Collected in 1220 ms

하지만 값을 생산하는쪽과 소비하는쪽을 분리해서 처리한다면 전체 processing 시간을 감소시킬수 있습니다.

fun foo(): Flow<Int> = flow {
...
}

fun main() = runBlocking { 
    val time = measureTimeMillis {
        foo()
            .buffer() // buffer emissions, don't wait
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

Collecting 하는 부분에 buffer()를 달아서 processing pipeline을 만들면 좀더 효율적으로 동작합니다.

1

2

3

Collected in 1071 ms

처음 데이터가 발생되는데 100ms + 출력하는데 300ms * 3개원소 = 1000ms의 시간이 소요 되었습니다.

emit() 하는 부분에 buffer를 만들고 emit()과 collect()의 동작을 순차적인 처리가 아닌 pipelining을 통해 동시에 동작하도록 하여 시간을 감소 시켰습니다.

flowOn operator역시 동일한 buffer mechanism을 사용합니다. 다만 context를 바꿔서 사용하기 때문에 buffer의 사용이 필수적이겠죠? (예제에서는 buffer의 역할을 보여주기 위해 context변경 없이 사용했습니다.) 



Conflation

flow는 연속적으로 값을 처리하여 emit(방출) 합니다.

만약 flow에서 내놓는 값이 어떤 연산의 중간값이거나, 상태값의 업데이트라면, 마지막 최신값만 의미있는 값이라고 볼수 있습니다.

collector의 동작이 매우 느리다면 conflate operator를 사용하여 중간값은 skip하도록 구현할 수 있습니다.

collector가 값을 처리하는 시점에서 emit되어 쌓여있는 값을 하나씩 처리하는게 아니라 쌓여있는 중간 값은 모두 버리고 마지막 값만 취하는거죠.

fun main(args: Array) = runBlocking {
    val time = measureTimeMillis {
        foo().conflate()
                .collect { value ->
                    try {
                        delay(300) // pretend we are processing it for 300 ms
                        println("Done $value")
                    } catch (ce: CancellationException) {
                        println("Cancelled $value")
                    }
                }
    }
    println("Collected in $time ms")
}

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
        println("emit $i")
    }
}


결과는 아래와 같습니다.

emit 1

emit 2

emit 3

Done 1

Done 3

Collected in 739 ms

총 3개를 방출했지만 2번은 생략되었습니다.

문득 드는 생각은 progress update를 할때 사용하면 좋겠다는 생각이 들더군요.

다운로드같이 상태를 업데이트 하는경우 background에서 다운로드를 하고 UI thread로 progress를 넘겨줄때 사용하면 유용해 보입니다.

또한 다수의 검색을 진행할때 중간중간 결과를 UI로 넘겨주는 형태를 만들때도 유용해 보입니다.


하지만 현재 conflate 역시 experimental 입니다.

하지만, API의 큰 변화없이 추후 버전에서 experimental이 없어지지 않을까 싶네요.


Processing the latest value

conflation은 emitter와 collector 둘다 느린경우 emit된 값을 drop 시키는 방법으로 processing 속도를 높이는데 사용합니다.

다른 방법으로 느린 collector가 이를 처리하기 전에 다른 값을 전달 받으면 이전 collector를 취소하고 새로 전달받은 값을 처리하도록 재시작하도록 할 수 있습니다.

collectLatest() operator를 사용하면 collector 동작중 새로운 값이 emit되어 전달받으면 기존 collect() 동작을 취소하고 새로운 값을 위한 collector를 재시작 시킵니다.

xxxLatest()처럼 xxx~로 시작하는 operator들은 동일하게 기존동작을 취소하고 최근값을 처리합니다.

println("main start!")
    val time = measureTimeMillis {
        foo().collectLatest { value ->
                    try {
                        println("collect $value")
                        delay(300) // pretend we are processing it for 300 ms
                        println("Done $value")
                    } catch (ce: CancellationException) {
                        println("Cancelled $value")
                    }
                }
    }
    println("Collected in $time ms")
    println("main end!")
}

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}


collect 동작중에 새로운 작업이 들어오면 기존 collect를 취소하고 재시작하므로 내부적으로는 CancellationException이 발생합니다.

결과는 아래와 같습니다.

main start!

collect 1

Cancelled 1

collect 2

Cancelled 2

collect 3

Done 3

Collected in 657 ms

main end!


시간은 conflate보다 더 줄었습니다.

하지만 실제 collect에서 처리된 값은 3 하나 입니다. (conflate의 경우 1과 3 두개가 collect로 처리됨)

conflatexxxLatest의 차이를 잘 구분하여 상황에 따라 맞는 operator를 사용하면 됩니다.

collectLatest는 현재버전(v1.3.2)에서는 experimental api 입니다.


Composing multiple flows

여러개의 flow를 병합하는 다양한 방법을 소개합니다.

Zip

flow에서 zip operator는 Kotlin 기본 라이브러리인 Sequence.zip extension function과 동일하게 두개의 flow를 병합하는 작업을 제공합니다.


fun main() = runBlocking {                           
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
}


결과는 아래와 같습니다.

1 -> one

2 -> two

3 -> three


zip의 경우 두개의 flow가 개수가 다르다면 적은 개수에 맞춰서 출력됩니다.

즉, 두개의 flow중 하나는 원소가 4개, 하나는 원소가 10개라면 출력값은 총 네개 입니다.

앞쪽 원소부터 하나씩 꺼내서 병합합니다.

zip은 현재버전(v1.3.2)에서는 experimental api 입니다.


Combine

flow가 conflation처럼 최신값이나 최종 연산값만을 사용하는 형태라면, 현재 flow에서 최신값만을 기준으로 연산하는 작업을 수행하도록 할 수 도 있습니다.

즉 두개의 flow에서 값을 emit 하고 서로 다른 타이밍으로 방출될때, 최신값만을 기준으로 두개의 방출값을 연산하도록 할수 있습니다.

말이 너무 어렵네요..


예제를 통하면 쉽게 이해할 수 있습니다.

먼저 zip을 이용해서 예제를 만들어 봅니다.

fun main(args: Array<String>) = runBlocking {
    println("main start!")
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
            .collect { value -> // collect and print
                println("$value at ${System.currentTimeMillis() - startTime} ms from start")
            }
    println("main end!")
}
두개의 flow가 각각 300ms / 400ms 으로 값을 방출합니다.

zip을 사용할 경우 아래 결과 처럼 두개의 최대 공배수인 400ms 단위로 값을 병합하여 출력합니다.

결국, 느린놈에 맞추는거죠.

main start!

1 -> one at 426 ms from start

2 -> two at 831 ms from start

3 -> three at 1234 ms from start

main end!

결과를 보면 약 400ms 간격으로 로그가 출력되는걸 알수 있습니다.

※ 위 예제에서는 onEach()를 사용하여 보다 명확하고 짧게 표현했습니다.


만약 같은 코드에 combine을 사용하면 어떻게 될까요?

fun main() = runBlocking {                                     
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
    val startTime = System.currentTimeMillis() // remember the start time 
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

똑같은 코드로 zip 대신 combine을 사용했습니다.

결과는 아래와 같습니다.

1 -> one at 452 ms from start

2 -> one at 651 ms from start

2 -> two at 854 ms from start

3 -> two at 952 ms from start

3 -> three at 1256 ms from start

일단 출력된 결과의 개수가 다릅니다.

이는 각각의 flow가 본인이 방출하는 시점에 맞춰 최신값으로 merge하기 때문입니다.

먼저 nums가 300ms 이후에 "1"을 emit하였으나, strs에는 아지 emit된 값이 없기 때문에 combine 되지 못합니다.

그리고 400ms을 기다렸다가 str이 1을 emit하는 시점에 맞춰 "1 -> one"을 combine 합니다.

그리고 nums 기준으로 300ms 이후에 "2"를 emit하고 strs의 최신값인 one과 병합하여 "2 -> one" 을 combine 합니다.

그 다음 strs 기준으로 800ms가 지나면서 "two"가 emit 되고 nums의 최신값인 "2"와 병합하여 "2->two"가 combine 됩니다.

combine의 동작을 정리하면 아래와 같습니다.

  • 두개의 flow을 병합한다.
  • 병합은 두개의 값이 모두 존재해야만 가능하다. (둘중 하나라도 값이 없는 상태에서 combine이 수행되는 경우 해당 원소는 병합되지 못하고 생략된다.)
  • 두개의 flow가 각각 emit하는 시점에 각 flow의 최신 값으로 병합된다.
  • flow 두개의 출력의 타이밍에 따라 병합된 개수가 결정되므로 각 flow의 원소 개수와 combine의 출력 개수와는 pair가 맞지 않는다.


좀더 명확한 이해를 돕기위해 하나의 예제를 더 추가합니다.

명확하게 combine을 이해했다면 아래의 예제의 결과 또한 충분히 예측 가능합니다. 

fun main(args: Array) = runBlocking {
    println("main start!")
    val nums = (1..3).asFlow().onEach { delay(100) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three", "four").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
            .collect { value -> // collect and print
                println("$value at ${System.currentTimeMillis() - startTime} ms from start")
            }
    println("main end!")
}

결과...

main start!

3 -> one at 431 ms from start

3 -> two at 834 ms from start

3 -> three at 1234 ms from start

3 -> four at 1637 ms from start

main end!

combine은 현재버전(v1.3.2)에서는 experimental api 입니다.


Flattening flows

Flow는 비동기로 sequence의 값을 전달받는 형태로 동작하기 때문에 각 원소가 또다시 다른 sequence를 요청하는 상황을 만나기가 쉽습니다.
간단하게 말해 자바나 코틀린에서 list를 처리할때 각각의 원소(stream)가 list로 나오게 되면 반환값이 List<List<String>> 같은 형태를 되므로, 이를 처리하기 위해서 flatmap() 같은 operator를 사용합니다.

예를 들어 아래와 같이 Flow<Flow<String>> 형태를 반환하는 상황을 만날 수 있습니다.
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

(1..3).asFlow().map { requestFlow(it) } // map의 반환값은 Flow<Flow<String>> 임.


기본적으로 collection이나 sequence에서 중첩된 상태를 flat하기 만들기위해 flatten이나 flatMap 함수를 지원합니다.

flow 역시 유사하게 비동기로 동작하는 특성을 지원하기 위한 flatten관련 operator를 지원합니다.

flatMapConcat

flatMapConcat 또는 flattenConcat은 sequence의 특성과 가장 유사하게 flow를 연결하는 operator입니다.
이 operator는 내부 flow가 완료되어야만 다음 외부 collect를 수행합니다.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapConcat { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

예제에서 100초 간격으로 1~3까지 flow가 emit 됩니다.

이때 requestFlow()를 또 수행하므로 출력형태는 Flow<Flow<String>>이 되어 flatMapConcat으로 한겹을 벗겨 줍니다.

그리고 나서 collect를 통해 emit된 값을 받아서 출력합니다.

1: First at 121 ms from start

1: Second at 622 ms from start

2: First at 727 ms from start

2: Second at 1227 ms from start

3: First at 1328 ms from start

3: Second at 1829 ms from start


외부의 flow가 차례로 시작되고, 내부의 flow역시 순서대로 시작되어 내부 flow가 완료 되어야만 다음 외부 flow의 collect가 진행됨을 알수 있습니다.

만약 flatMapContact 대신 map을 사용했다면 어떻게 될까요? 


kotlinx.coroutines.flow.SafeFlow@7e0b37bc at 112 ms from start

kotlinx.coroutines.flow.SafeFlow@3b95a09c at 217 ms from start

kotlinx.coroutines.flow.SafeFlow@6ae40994 at 320 ms from start

이런게 외부 flow만 emit되고 Flow<String> 내부의 코드는 수행되지 못합니다. (Flow<String>의 객체 hashcode가 찍힙니다.)

flatMapContact는 현재버전(v1.3.2)에서는 experimental api 입니다.


flatMapMerge

또다른 flattening mode로는 동시에 emit 가능한 값들을 emit 시키고 들어오는 모든 값들을 하나의 flow로 병합하여 collect할 수 도 있습니다.

flatMapMergeflattenMerge operator가 이러한 역할을 수행하며 parameter로 concurrency 값을 넘겨줄 수 있습니다.

(concurrency 값은 동시에 받을 수 있는 flow의 개수를 제한하는 값으로 기본값은 DEFAULT_CONCURRENCY 입니다. 

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        }
}

위와 동일한 예제로 flatMapConcat 대신에 flatMapMerge를 사용했습니다.

결과는 아래와 같습니다

1: First at 136 ms from start

2: First at 231 ms from start

3: First at 333 ms from start

1: Second at 639 ms from start

2: Second at 732 ms from start

3: Second at 833 ms from start

외부 flow는 외부 flow대로 수행되고, 내부적으로 발생하는 flow역시 동시에 수행되면서 하나의 flow로 병합되어 collect 됩니다.

따라서 flatMapContact처럼 순서를 보장하지 않고 외부, 내부의 flow가 각각 수행되는 형태를 취합니다.

flatMapMerge는 현재버전(v1.3.2)에서는 experimental api 입니다.


flatMapLatest

flatMapLatestcollectLatest와 유사하게 flow에서 emit 발생시 이전에 대기중이거나 동작중인 flow는 cancel 시킵니다.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        }
}

다른 예제와 동일하게 flatMapLatest부분만 변경했습니다.

결과는 아래와 같습니다.

1: First at 142 ms from start

2: First at 322 ms from start

3: First at 425 ms from start

3: Second at 931 ms from start

requestFlow() 함수 내부의 Second는 500ms 대기중에 외부 flow의 emit으로 인하여 cancel되며, 외부 flow의 emit이 완료되고 난뒤 마지막 second만 출력됩니다.

그럼 내부의 second가 emit되면 외부의 flow를 cancel 시킬까요?

만약 그렇다면 문제가 되겠죠?

하지만 의심이 생길수 있으니 아래와 requestFlow()의 delay를 50ms로 변경한후 돌려 봅니다.

fun main(args: Array) = runBlocking {
   ...
}

fun requestFlow(i: Int): Flow<String> = flow {
     emit("$i: First")
     delay(50) // wait 50 ms
     emit("$i: Second")    
}

결과는 아래와 같습니다.

1: First at 136 ms from start

1: Second at 186 ms from start

2: First at 237 ms from start

2: Second at 290 ms from start

3: First at 338 ms from start

3: Second at 389 ms from start

외부의 flow동작보다 내부 flow의 동작이 빠르기 때문에 순서대로 전부 emit 되어 출력됩니다.

또한 xxxLatest operator의 경우 위에서 언급한것 처럼 새로운 flow가 emit되면 이전 flow는 cancel 시킵니다.

이때 cancel되는 부분은 { requestFlow(it) } 입니다. ( {...} 블럭 내부를 전부 cancel 시킵니다.

 따라서 아래와 같이 try catch로 묶는다면 cancel에 대한 처리도 진행할 수 있습니다.

fun main(args: Array) = runBlocking {
 ...
}

fun requestFlow(i: Int): Flow<String> = flow {
    try {
        emit("$i: First")
        delay(50) // wait 500 ms
        emit("$i: Second")
    } catch (ce: CancellationException) {
        println("cancelled!!")
    }
}

위에서 본것과 같은 예제로, 두번째 "Second"는 맨 마지막번째를 제외하고는 전부 취소처리 되므로 아래와 같은 결과가 나옵니다.

1: First at 137 ms from start

cancelled!!

2: First at 245 ms from start

cancelled!!

3: First at 347 ms from start

3: Second at 851 ms from start

보통 내부의 flow 동작은 그 자체로도 매우 빠르기며, 내부적으로 suspending code가 없는 경우 cancel을 할수 없습니다.

하지만  예제에서는 requestFlow() 내부에 delay를 명시적으로 주었기 때문에 cancel됨을 명시적으로 표현했습니다.

flatMapLatest는 현재버전(v1.3.2)에서는 experimental api 입니다.


Flow exceptions

collection은 emitter나 다른 operator에서 발생하는 exception으로 완료 처리될 수 있습니다.
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking {
    try {
        foo().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}       

가장 간단하게 collect를 외부에서 try-catch로 감싸면서 exception을 처리할 수 있습니다.

Emitting 1

1

Emitting 2

2

Caught java.lang.IllegalStateException: Collected 2

everything is caught

위 예제에서 try-catch문은 collect를 감싸고 있기 때문에 emiiter에서 발생한 exception 뿐만 아니라 중간 operator에서 발생한 exception, terminal operator에서 발생한 exception 모두를 처리합니다.

아래 예제는 emit된 값을 한번더 string으로 mapping하는 코드로 변경하고 여전히 exception을 발생 시키도록 합니다.

fun foo(): Flow<Int> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking {
    try {
        foo().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}           

위 코드 역시 collect를 감싼 try-catch문에서 처리되면 collection은 stop 됩니다.

Emitting 1

string 1

Emitting 2

Caught java.lang.IllegalStateException: Crashed on 2


Exception transparency

emitter의 코드가 자신에 의해서 발생한 exception의 handling을 캡슐화 하려면 어떻게 해야 할까요?

Flow문은 exception에 투명해야 합니다.
하지만 try-catch문 내부에서 flow{...} builder가 값을 emit 하는 것은 Exception transparency에 어긋납니다.
위 예제에서 보듯이 emitter에서 error가 발생하였으나, collector를 수행하면서 collector를 통해서 exception이 catch되기 때문입니다.
반면에 전체를 감싸고 있기 때문에 collector가 발생시키는 에러는 모두 catch 할수 있음이 보장 되기도 합니다.

따라서 emiiter에서는 catch라는 operator를 이용하여 exception의 투명성을 보장하고 exception handling을 encapsulation 할수 있습니다.
catch operator의 body 내부에서 exception을 분석하고 이에 따라 아래와 같은 다른 처리를 할수 있습니다.

  • throw를 이용하여 reThrown 할 수 있습니다.
  • catch의 body 내부에 emit 함수를 이용하여 값을 emission 할 수 있습니다.
  • 무시하거나, 로그로 남기거나 다른 처리를 하는등의 코드가 삽입될 수 있습니다.

아래 예제에서는 exception 발생시 error 관련 문구를 emit 하도록 처리하고 있습니다.
fun foo(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking {
    foo()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
}
결과는 위 예제와 동일 합니다.

만약 exception에 따라서 다른 처리가 필요하다면 아래와 같이 사용할수 있겠죠?
fun main(args: Array<String>) = runBlocking {
    foo().catch { e ->
        when (e) {
            is IllegalStateException -> emit("Illeagl Ex. Caught $e")
            is NullPointerException -> emit("NPE Ex. Caught $e")
            else -> emit("Unknown exception")
        } // emit on exception
    }.collect { value -> println(value) }
}


fun foo(): Flow<String> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}.map { value ->
    //    check(value <= 1) { "Crashed on $value" }
    if (value == 2) {
        throw java.lang.NullPointerException("Make NPE")
    }
    "string $value"
}

강제로 NPE를 발생 시켰으며, catch{...} body에 따라 NPE 관련 문구가 emit되어 처리 됩니다.

Transparent catch

catch는 downstream에서 발생한 에러만 처리 가능합니다.

따라서 아래와 같이 collect에서 발생한 에러는 catch 할 수 없습니다.

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    foo()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

collect에 check문이 있으니, 2를 collect 할때 exception이 발생하면서 코드가 죽습니다.

하지만 catch에 구현에 놓은 "Caught..." 문구는 출력되지 않습니다.


Catching declaratively

위와 같이 collect 내부의 코드는 catch로 exception을 처리할 수 없습니다.

따라서 모든 에러를 catch로 handling 하기 위해서는 collect의 body를 onEach로 옴기고 그 이후에 catch를 연결하면 전체 구문의 외부에 try-catch를 하지 않아도 동일한 처리를 할 수 있습니다.

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    foo()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
        .catch { e -> println("Caught $e") }
        .collect()
}          

onEach에 collect의 body를 옴겨서 처리구문을 emitter의 일부로 변경했습니다.

그리고 나서 catch를 처리하면 "Caught..." 가 출력되면서 정상적으로 동작함을 알 수 있습니다.

또한 Flow을 시작 시키기 위하여 param 없는 collect()를 호출합니다.


catch는 현재버전(v1.3.2)에서는 experimental api 입니다.


Flow completion

flow는 정상적으로 emit한 값을 전부 collect 하거나, 중간에 exception이 발생한 경우나 모두 완료로 처리합니다.
보통 collection이 완료되면 특정한 작업을 진행하도록 할 수 있으며, 명시적으로, 암시적으로 이 작업을 지정해 줄수 있습니다.

Imperative finally block
try-catch의 finally block을 사용하면 collect 완료 이후의 작업을 선언해 줄수 있습니다.

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking {
    try {
        foo().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}

결과는 아래와 같습니다.

1

2

3

Done

사실 try-finally 구문 없이 collect 다음에 println("Donn") 을 해도 똑같은 값이 출력됩니다.

이전 #1번 글에서 언급 했듯이 flow가 complete 할때까지는 collect라인을 넘어가지 못하기 때문에 flow의 처리가 완료되어야 다음 라인을 실행합니다.

(하지만 collect 라인에 걸려있는 동안 thread 자체는 block하지는 않는다고도 언급했었죠?)


Declarative handling

명시적으로 flow 종료후 작업을 선언하려면 intermediate operator인 onCompletion을 추가하면 됩니다.

이전 예제를 onCompletion을 사용하여 바꿔 보겠습니다.

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking {
    foo()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
}
결과는 동일합니다.

사실 onCompetion의 가장 큰 장점은 람다식에서 nullable한 Throwable을 param으로 넘겨주기 때문에 collect의 완료가 정상적으로 되었는지, exception 발생 때문이었는지를 판단할수 있도록 해줍니다.

fun main(args: Array<String>) = runBlocking {
    foo()
            .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally: $cause") }
            .catch { cause -> println("Caught exception") }
            .collect { value -> println(value) }
    println("done")
}


fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        emit(1)
        throw RuntimeException()
    }
}
결과는 아래와 같습니다.
1
Flow completed exceptionally: java.lang.RuntimeException
Caught exception
done

onCompletion은 exception을 handling 하지 않고, 아래 방향으로 그대로 전달합니다.

따라서 그 이후의 catch에서 exception을 처리하게 됩니다.


Upstream exceptions only

catch operator와 동일하게 onCompletion 역시 위에서 내려오는(upstream) exception은 확인할 수 있으나 아래쪽(downstream)의 exception은 확인하지 않습니다.

fun foo(): Flow = (1..3).asFlow()

fun main() = runBlocking {
    foo()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

결과는 아래와 같습니다.


1

Flow completed with null

Exception in thread "main" java.lang.IllegalStateException: Collected 2

따라서 이를 방지하려면 하나 위 section에서 언급했던것 처럼 onEach를 이용해서 collect문을 emitter로 포함시키면 됩니다.

Completion은 현재버전(v1.3.2)에서는 experimental api 입니다.


Imperative versus declarative

앞서서 Flow를 collect 하는 방법과, completion를 handling 하는 방법, exception을 암시적으로, 명시적으로 handling 하는 방법에 대해 설명했습니다.
이쯤에서 명시적이든 암시적이든 어떤 방법이 더 좋은가에 대한 의문이 생길 수 있습니다.
flow는 library로서, 특정한 방법이 더 좋다고 옹호하지 않습니다.
본인의 기호나, 코딩 스타일에 따라서 선택해서 사용하면 되며, 두 방법 모두 유효합니다.

Launching flow

어떤 source로 부터 전달되는 비동기적인 event를 flow를 이용하여 쉽게 표현 할 수 있습니다.
이런 경우 수신쪽에서 event가 발생할 때마다 특정 동작을 처리하도록 하고 그 이후의 다른 코드는 계속 진행되도록 하기 위해서 addEventListener 같은 기능이 필요할수도 있습니다. 

onEach operator가 바로 그런 역할을 제공하는데, onEach operator는 intermediate operator이므로 자체적으로 collect를 수행하지 못합니다.
따라서 onEach로 event 발생시 동작을 정의하고, 그 다음으로 아무 parameter가 없는 collect()를 이용하여 flow를 구동시키도록 하여 eventListener와 유사하게 구현할 수 있습니다.

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
결과는 아래와 같습니다.
Event: 1
Event: 2
Event: 3
Done

collect() 함수를 만나면 모든 작업이 끝날때 까지 대기합니다.

따라서 Done이 마지막에 출력됩니다.


하지만 우리가 eventListener를 등록하는 이유는 코드는 계속 진행하되, 특정 이벤트가 발생시에만 등록된 코드를 동작시키기 위합니다.

이때 collect 대신 launchIn을 사용하여 분리된 coroutine으로 시작하고 이후 코드는 바로 실행되도록 할 수 있습니다.

// Imitate a flow of events
fun events(): Flow = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
결과는 아래와 같습니다.

Done
Event: 1
Event: 2
Event: 3

Done이 바로 찍히고 flow는 새로운 launch에서 수행됩니다.

이때 launchIn의 parameter로 coroutineScope을 명시적으로 넣어줘야 합니다.

주어진  scope에서 flow가 launch 되며, 예제에서는 runBlocking builder의 this가 들어 갔으니, coroutine의 structured concurrency에 의해서 flow가 완료되어야만 runBlocking의 블럭이 종료됩니다.

보통 어떤 lifetime을 갖는 entity로 부터 scope을 받아서 사용하며, lifetime이 종료되면 scope이 cancel되면서 이 scope에 속해있던 flow의 동작도 cancelling 됩니다.

onEach{..}.launchIn(this) 구문은 마치 addEventListener 처럼 동작하지만 coroutine의 structured concurrency에 의해서 따로 removeEventListener를 불러줄 필요가 없다는 장점을 가집니다.

추가로 launchIn은 job을 return 합니다.

따라서 필요에 따라 scope의 취소 없이 flow만을 취소할 수도 있고, join을 이용하여 완료될때 까지 특정 시점에 대기 시킬수도 있습니다.

(아마도 launchIn의 내부 구현은 this.launch { flow {...}..}의 형태로 되어 있을꺼라고 예상되네요~)


Flow and Reactive streams

Reactive streams이나 RxJava, Project reactor같은 reactive framework에 친숙한 사람들은 Flow의 design이 이것들과 매우 비슷하다고 생각할 수 있습니다.
Flow는 실제로 Reactive streams과 reactive stream의 다양한 구현에서 영감을 받았습니다.
하지만 flow는 가능한 simple하면서 코틀린스럽고, suspend에 친숙하며 structured concurrency를 유지하도록 설계되었습니다.
이러한 목적을 달성하기 까지는 reactive 선구자들의 엄청난 노력이 없었다면 불가능했습니다.

컨셉상으로는 다르지만 Flow는 reactive stream으로써 다른 reactive Publisher와 상호간에 변환 할 수 있습니다.
이러한 converter는 kotlinx.coroutines외부에서 제공되며, 해당 reactive module들에서 찾을 수 있습니다.

reactive modules
  • Reactive Streams: kotlinx-coroutines-reactive
  • Project Reactor: kotlinx-coroutines-reactor
  • RxJava2: kotlinx-coroutines-rx2


반응형