이 글은 아래 링크의 내용을 기반으로 하여 설명합니다.
https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
이 블로그 포스팅 당시의(2019.11.04) coroutine 버전은 1.3.2 입니다.
첫번째 글에 이어 포스팅 합니다.
Flow 첫번째 정리 포스팅: https://tourspace.tistory.com/258
Flow context
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking {
foo().collect { value -> log("Collected $value") }
}
collect()를 호출한 corouitne의 context는 main thread를 사용하므로 flow의 body 영역의 코드 역시 main thread에서 처리됩니다.
결과.
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
Wrong emission withContext
하지만 CPU를 많이 사용하는 flow 연산이라면 background thread에서 수행하고, 이 결과를 받아 UI를 업데이트 하는 작업은 main thread에서 처리하도록 해야 하는경우가 발생할 수 있습니다.
아마도 많이 발생하는 상황입니다.
보통 coroutine에서는 context switching을 withContext를 이용하여 쉽게 전환할수 있습니다.
하지만 위에서 언급한 context preservation속성으로 인하여 emit을 하는 context와 수신하는 context를 다르게 하지 못하도록 되어 있습니다.
fun foo(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking {
foo().collect { value -> println(value) }
}
이 코드를 수행하면 아래와 같은 에러가 발생합니다.
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@5232da77, BlockingEventLoop@5bf30999],
but emission happened in [DispatchedCoroutine{Active}@6daf0b39, DefaultDispatcher].
Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:96)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:30)
at com.skt.coroutine.HelloKt$foo2$1$1.invokeSuspend(Hello.kt:32)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)
flowOn operator
이런 경우exception에 언급된것 처럼 withContext가 아니라 flowOn operater를 이용하여 emission하는 부분의 context를 바꿔줄 수 있습니다.
fun main(args: Array<String>) = runBlocking {
println("main start!")
foo2().collect { log("Collected $it") }
println("main end!")
}
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")결과
결과를 찍어보면 원하는대로 호출부분과 실제 flow를 처리하는 부분이 다른 thread로 바껴 있음을 확인할 수 있습니다.
main start!
[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3
main end!
flowOn을 사용함으로써 기본적으로 하나의 coroutine이 emission과 collection을 순차적으로 처리하는 flow를 변경시킵니다.
이 말은 emission과 collection이 각각의 coroutine에서 동시에 진행하게 된다는 의미 입니다.
(물론 예제에서는 Dispatch.Default로 옵션을 주었으므로 각각의 coroutine을 수행하는 주체도 다른 thread가 됩니다. )
결과에는 찍히지 않았지만, collection은 coroutine#1, emission은 coroutin#2에서 수행됩니다.
※ 예제를 직접 수행해 보니 정상동작을 하기는 하지만 flowOn()은 현재 코루틴 버전에서 (v1.3.2) experimental annotation이 붙어 있습니다.
버전이 올라가면 동작기능 변경없이 experimental이 풀리지 않을까 싶네요.
Buffering
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking {
val time = measureTimeMillis {
foo().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
값을 emit 하는데 100ms, 처리하는데 300ms이 걸리니 원소 하나당 총 400ms 시간이 필요합니다.
따라서 단순 계산으로 400ms x 3개원소 = 1200ms이 걸립니다.
1
2
3
Collected in 1220 ms
하지만 값을 생산하는쪽과 소비하는쪽을 분리해서 처리한다면 전체 processing 시간을 감소시킬수 있습니다.
fun foo(): Flow<Int> = flow {
...
}
fun main() = runBlocking {
val time = measureTimeMillis {
foo()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
Collecting 하는 부분에 buffer()를 달아서 processing pipeline을 만들면 좀더 효율적으로 동작합니다.
1
2
3
Collected in 1071 ms
처음 데이터가 발생되는데 100ms + 출력하는데 300ms * 3개원소 = 1000ms의 시간이 소요 되었습니다.
즉 emit() 하는 부분에 buffer를 만들고 emit()과 collect()의 동작을 순차적인 처리가 아닌 pipelining을 통해 동시에 동작하도록 하여 시간을 감소 시켰습니다.
※ flowOn operator역시 동일한 buffer mechanism을 사용합니다. 다만 context를 바꿔서 사용하기 때문에 buffer의 사용이 필수적이겠죠? (예제에서는 buffer의 역할을 보여주기 위해 context변경 없이 사용했습니다.)
Conflation
flow는 연속적으로 값을 처리하여 emit(방출) 합니다.
만약 flow에서 내놓는 값이 어떤 연산의 중간값이거나, 상태값의 업데이트라면, 마지막 최신값만 의미있는 값이라고 볼수 있습니다.
collector의 동작이 매우 느리다면 conflate operator를 사용하여 중간값은 skip하도록 구현할 수 있습니다.
collector가 값을 처리하는 시점에서 emit되어 쌓여있는 값을 하나씩 처리하는게 아니라 쌓여있는 중간 값은 모두 버리고 마지막 값만 취하는거죠.
fun main(args: Array) = runBlocking {
val time = measureTimeMillis {
foo().conflate()
.collect { value ->
try {
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
} catch (ce: CancellationException) {
println("Cancelled $value")
}
}
}
println("Collected in $time ms")
}
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
println("emit $i")
}
}
결과는 아래와 같습니다.
emit 1
emit 2
emit 3
Done 1
Done 3
Collected in 739 ms
총 3개를 방출했지만 2번은 생략되었습니다.
문득 드는 생각은 progress update를 할때 사용하면 좋겠다는 생각이 들더군요.
다운로드같이 상태를 업데이트 하는경우 background에서 다운로드를 하고 UI thread로 progress를 넘겨줄때 사용하면 유용해 보입니다.
또한 다수의 검색을 진행할때 중간중간 결과를 UI로 넘겨주는 형태를 만들때도 유용해 보입니다.
하지만 현재 conflate 역시 experimental 입니다.
하지만, API의 큰 변화없이 추후 버전에서 experimental이 없어지지 않을까 싶네요.
Processing the latest value
conflation은 emitter와 collector 둘다 느린경우 emit된 값을 drop 시키는 방법으로 processing 속도를 높이는데 사용합니다.
다른 방법으로 느린 collector가 이를 처리하기 전에 다른 값을 전달 받으면 이전 collector를 취소하고 새로 전달받은 값을 처리하도록 재시작하도록 할 수 있습니다.
collectLatest() operator를 사용하면 collector 동작중 새로운 값이 emit되어 전달받으면 기존 collect() 동작을 취소하고 새로운 값을 위한 collector를 재시작 시킵니다.
xxxLatest()처럼 xxx~로 시작하는 operator들은 동일하게 기존동작을 취소하고 최근값을 처리합니다.
println("main start!")
val time = measureTimeMillis {
foo().collectLatest { value ->
try {
println("collect $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
} catch (ce: CancellationException) {
println("Cancelled $value")
}
}
}
println("Collected in $time ms")
println("main end!")
}
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
collect 동작중에 새로운 작업이 들어오면 기존 collect를 취소하고 재시작하므로 내부적으로는 CancellationException이 발생합니다.
결과는 아래와 같습니다.
main start!
collect 1
Cancelled 1
collect 2
Cancelled 2
collect 3
Done 3
Collected in 657 ms
main end!
시간은 conflate보다 더 줄었습니다.
하지만 실제 collect에서 처리된 값은 3 하나 입니다. (conflate의 경우 1과 3 두개가 collect로 처리됨)
conflate와 xxxLatest의 차이를 잘 구분하여 상황에 따라 맞는 operator를 사용하면 됩니다.
collectLatest는 현재버전(v1.3.2)에서는 experimental api 입니다.
Composing multiple flows
flow에서 zip operator는 Kotlin 기본 라이브러리인 Sequence.zip extension function과 동일하게 두개의 flow를 병합하는 작업을 제공합니다.
fun main() = runBlocking {
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
}
결과는 아래와 같습니다.
1 -> one
2 -> two
3 -> three
zip의 경우 두개의 flow가 개수가 다르다면 적은 개수에 맞춰서 출력됩니다.
즉, 두개의 flow중 하나는 원소가 4개, 하나는 원소가 10개라면 출력값은 총 네개 입니다.
앞쪽 원소부터 하나씩 꺼내서 병합합니다.
zip은 현재버전(v1.3.2)에서는 experimental api 입니다.
Combine
flow가 conflation처럼 최신값이나 최종 연산값만을 사용하는 형태라면, 현재 flow에서 최신값만을 기준으로 연산하는 작업을 수행하도록 할 수 도 있습니다.
즉 두개의 flow에서 값을 emit 하고 서로 다른 타이밍으로 방출될때, 최신값만을 기준으로 두개의 방출값을 연산하도록 할수 있습니다.
말이 너무 어렵네요..
예제를 통하면 쉽게 이해할 수 있습니다.
먼저 zip을 이용해서 예제를 만들어 봅니다.
fun main(args: Array<String>) = runBlocking {
println("main start!")
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
println("main end!")
}
두개의 flow가 각각 300ms / 400ms 으로 값을 방출합니다.zip을 사용할 경우 아래 결과 처럼 두개의 최대 공배수인 400ms 단위로 값을 병합하여 출력합니다.
결국, 느린놈에 맞추는거죠.
main start!
1 -> one at 426 ms from start
2 -> two at 831 ms from start
3 -> three at 1234 ms from start
main end!
결과를 보면 약 400ms 간격으로 로그가 출력되는걸 알수 있습니다.
※ 위 예제에서는 onEach()를 사용하여 보다 명확하고 짧게 표현했습니다.
만약 같은 코드에 combine을 사용하면 어떻게 될까요?
fun main() = runBlocking {
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
똑같은 코드로 zip 대신 combine을 사용했습니다.
결과는 아래와 같습니다.
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
일단 출력된 결과의 개수가 다릅니다.
이는 각각의 flow가 본인이 방출하는 시점에 맞춰 최신값으로 merge하기 때문입니다.
먼저 nums가 300ms 이후에 "1"을 emit하였으나, strs에는 아지 emit된 값이 없기 때문에 combine 되지 못합니다.
그리고 400ms을 기다렸다가 str이 1을 emit하는 시점에 맞춰 "1 -> one"을 combine 합니다.
그리고 nums 기준으로 300ms 이후에 "2"를 emit하고 strs의 최신값인 one과 병합하여 "2 -> one" 을 combine 합니다.
그 다음 strs 기준으로 800ms가 지나면서 "two"가 emit 되고 nums의 최신값인 "2"와 병합하여 "2->two"가 combine 됩니다.
combine의 동작을 정리하면 아래와 같습니다.
- 두개의 flow을 병합한다.
- 병합은 두개의 값이 모두 존재해야만 가능하다. (둘중 하나라도 값이 없는 상태에서 combine이 수행되는 경우 해당 원소는 병합되지 못하고 생략된다.)
- 두개의 flow가 각각 emit하는 시점에 각 flow의 최신 값으로 병합된다.
- flow 두개의 출력의 타이밍에 따라 병합된 개수가 결정되므로 각 flow의 원소 개수와 combine의 출력 개수와는 pair가 맞지 않는다.
좀더 명확한 이해를 돕기위해 하나의 예제를 더 추가합니다.
명확하게 combine을 이해했다면 아래의 예제의 결과 또한 충분히 예측 가능합니다.
fun main(args: Array) = runBlocking {
println("main start!")
val nums = (1..3).asFlow().onEach { delay(100) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three", "four").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
println("main end!")
}
결과...
main start!
3 -> one at 431 ms from start
3 -> two at 834 ms from start
3 -> three at 1234 ms from start
3 -> four at 1637 ms from start
main end!
combine은 현재버전(v1.3.2)에서는 experimental api 입니다.
Flattening flows
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
(1..3).asFlow().map { requestFlow(it) } // map의 반환값은 Flow<Flow<String>> 임.
기본적으로 collection이나 sequence에서 중첩된 상태를 flat하기 만들기위해 flatten이나 flatMap 함수를 지원합니다.
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
예제에서 100초 간격으로 1~3까지 flow가 emit 됩니다.
이때 requestFlow()를 또 수행하므로 출력형태는 Flow<Flow<String>>이 되어 flatMapConcat으로 한겹을 벗겨 줍니다.
그리고 나서 collect를 통해 emit된 값을 받아서 출력합니다.
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
만약 flatMapContact 대신 map을 사용했다면 어떻게 될까요?
kotlinx.coroutines.flow.SafeFlow@7e0b37bc at 112 ms from start
kotlinx.coroutines.flow.SafeFlow@3b95a09c at 217 ms from start
kotlinx.coroutines.flow.SafeFlow@6ae40994 at 320 ms from start
이런게 외부 flow만 emit되고 Flow<String> 내부의 코드는 수행되지 못합니다. (Flow<String>의 객체 hashcode가 찍힙니다.)
flatMapContact는 현재버전(v1.3.2)에서는 experimental api 입니다.
flatMapMerge
또다른 flattening mode로는 동시에 emit 가능한 값들을 emit 시키고 들어오는 모든 값들을 하나의 flow로 병합하여 collect할 수 도 있습니다.
flatMapMerge와 flattenMerge operator가 이러한 역할을 수행하며 parameter로 concurrency 값을 넘겨줄 수 있습니다.
(concurrency 값은 동시에 받을 수 있는 flow의 개수를 제한하는 값으로 기본값은 DEFAULT_CONCURRENCY 입니다.
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
위와 동일한 예제로 flatMapConcat 대신에 flatMapMerge를 사용했습니다.
결과는 아래와 같습니다
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
외부 flow는 외부 flow대로 수행되고, 내부적으로 발생하는 flow역시 동시에 수행되면서 하나의 flow로 병합되어 collect 됩니다.
따라서 flatMapContact처럼 순서를 보장하지 않고 외부, 내부의 flow가 각각 수행되는 형태를 취합니다.
flatMapMerge는 현재버전(v1.3.2)에서는 experimental api 입니다.
flatMapLatest
flatMapLatest는 collectLatest와 유사하게 flow에서 emit 발생시 이전에 대기중이거나 동작중인 flow는 cancel 시킵니다.
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
다른 예제와 동일하게 flatMapLatest부분만 변경했습니다.
결과는 아래와 같습니다.
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
requestFlow() 함수 내부의 Second는 500ms 대기중에 외부 flow의 emit으로 인하여 cancel되며, 외부 flow의 emit이 완료되고 난뒤 마지막 second만 출력됩니다.
그럼 내부의 second가 emit되면 외부의 flow를 cancel 시킬까요?
만약 그렇다면 문제가 되겠죠?
하지만 의심이 생길수 있으니 아래와 requestFlow()의 delay를 50ms로 변경한후 돌려 봅니다.
fun main(args: Array) = runBlocking {
...
}
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(50) // wait 50 ms
emit("$i: Second")
}
결과는 아래와 같습니다.
1: First at 136 ms from start
1: Second at 186 ms from start
2: First at 237 ms from start
2: Second at 290 ms from start
3: First at 338 ms from start
3: Second at 389 ms from start
외부의 flow동작보다 내부 flow의 동작이 빠르기 때문에 순서대로 전부 emit 되어 출력됩니다.
또한 xxxLatest operator의 경우 위에서 언급한것 처럼 새로운 flow가 emit되면 이전 flow는 cancel 시킵니다.
이때 cancel되는 부분은 { requestFlow(it) } 입니다. ( {...} 블럭 내부를 전부 cancel 시킵니다.
따라서 아래와 같이 try catch로 묶는다면 cancel에 대한 처리도 진행할 수 있습니다.
fun main(args: Array) = runBlocking {
...
}
fun requestFlow(i: Int): Flow<String> = flow {
try {
emit("$i: First")
delay(50) // wait 500 ms
emit("$i: Second")
} catch (ce: CancellationException) {
println("cancelled!!")
}
}
위에서 본것과 같은 예제로, 두번째 "Second"는 맨 마지막번째를 제외하고는 전부 취소처리 되므로 아래와 같은 결과가 나옵니다.
1: First at 137 ms from start
cancelled!!
2: First at 245 ms from start
cancelled!!
3: First at 347 ms from start
3: Second at 851 ms from start
보통 내부의 flow 동작은 그 자체로도 매우 빠르기며, 내부적으로 suspending code가 없는 경우 cancel을 할수 없습니다.
하지만 예제에서는 requestFlow() 내부에 delay를 명시적으로 주었기 때문에 cancel됨을 명시적으로 표현했습니다.
flatMapLatest는 현재버전(v1.3.2)에서는 experimental api 입니다.
Flow exceptions
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking {
try {
foo().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
가장 간단하게 collect를 외부에서 try-catch로 감싸면서 exception을 처리할 수 있습니다.
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
everything is caught
위 예제에서 try-catch문은 collect를 감싸고 있기 때문에 emiiter에서 발생한 exception 뿐만 아니라 중간 operator에서 발생한 exception, terminal operator에서 발생한 exception 모두를 처리합니다.
아래 예제는 emit된 값을 한번더 string으로 mapping하는 코드로 변경하고 여전히 exception을 발생 시키도록 합니다.
fun foo(): Flow<Int> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking {
try {
foo().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
위 코드 역시 collect를 감싼 try-catch문에서 처리되면 collection은 stop 됩니다.
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
Exception transparency
- throw를 이용하여 reThrown 할 수 있습니다.
- catch의 body 내부에 emit 함수를 이용하여 값을 emission 할 수 있습니다.
- 무시하거나, 로그로 남기거나 다른 처리를 하는등의 코드가 삽입될 수 있습니다.
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking {
foo()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
}
결과는 위 예제와 동일 합니다.fun main(args: Array<String>) = runBlocking {
foo().catch { e ->
when (e) {
is IllegalStateException -> emit("Illeagl Ex. Caught $e")
is NullPointerException -> emit("NPE Ex. Caught $e")
else -> emit("Unknown exception")
} // emit on exception
}.collect { value -> println(value) }
}
fun foo(): Flow<String> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}.map { value ->
// check(value <= 1) { "Crashed on $value" }
if (value == 2) {
throw java.lang.NullPointerException("Make NPE")
}
"string $value"
}
강제로 NPE를 발생 시켰으며, catch{...} body에 따라 NPE 관련 문구가 emit되어 처리 됩니다.
Transparent catch
catch는 downstream에서 발생한 에러만 처리 가능합니다.
따라서 아래와 같이 collect에서 발생한 에러는 catch 할 수 없습니다.
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking {
foo()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
collect에 check문이 있으니, 2를 collect 할때 exception이 발생하면서 코드가 죽습니다.
하지만 catch에 구현에 놓은 "Caught..." 문구는 출력되지 않습니다.
Catching declaratively
위와 같이 collect 내부의 코드는 catch로 exception을 처리할 수 없습니다.
따라서 모든 에러를 catch로 handling 하기 위해서는 collect의 body를 onEach로 옴기고 그 이후에 catch를 연결하면 전체 구문의 외부에 try-catch를 하지 않아도 동일한 처리를 할 수 있습니다.
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking {
foo()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
}
onEach에 collect의 body를 옴겨서 처리구문을 emitter의 일부로 변경했습니다.
그리고 나서 catch를 처리하면 "Caught..." 가 출력되면서 정상적으로 동작함을 알 수 있습니다.
또한 Flow을 시작 시키기 위하여 param 없는 collect()를 호출합니다.
catch는 현재버전(v1.3.2)에서는 experimental api 입니다.
Flow completion
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking {
try {
foo().collect { value -> println(value) }
} finally {
println("Done")
}
}
결과는 아래와 같습니다.
1
2
3
Done
사실 try-finally 구문 없이 collect 다음에 println("Donn") 을 해도 똑같은 값이 출력됩니다.
이전 #1번 글에서 언급 했듯이 flow가 complete 할때까지는 collect라인을 넘어가지 못하기 때문에 flow의 처리가 완료되어야 다음 라인을 실행합니다.
(하지만 collect 라인에 걸려있는 동안 thread 자체는 block하지는 않는다고도 언급했었죠?)
Declarative handling
명시적으로 flow 종료후 작업을 선언하려면 intermediate operator인 onCompletion을 추가하면 됩니다.
이전 예제를 onCompletion을 사용하여 바꿔 보겠습니다.
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking {
foo()
.onCompletion { println("Done") }
.collect { value -> println(value) }
}
결과는 동일합니다.사실 onCompetion의 가장 큰 장점은 람다식에서 nullable한 Throwable을 param으로 넘겨주기 때문에 collect의 완료가 정상적으로 되었는지, exception 발생 때문이었는지를 판단할수 있도록 해줍니다.
fun main(args: Array<String>) = runBlocking {
foo()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally: $cause") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
println("done")
}
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
emit(1)
throw RuntimeException()
}
}
결과는 아래와 같습니다.onCompletion은 exception을 handling 하지 않고, 아래 방향으로 그대로 전달합니다.
따라서 그 이후의 catch에서 exception을 처리하게 됩니다.
Upstream exceptions only
catch operator와 동일하게 onCompletion 역시 위에서 내려오는(upstream) exception은 확인할 수 있으나 아래쪽(downstream)의 exception은 확인하지 않습니다.
fun foo(): Flow = (1..3).asFlow()
fun main() = runBlocking {
foo()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
결과는 아래와 같습니다.
1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2
Imperative versus declarative
Launching flow
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
결과는 아래와 같습니다.collect() 함수를 만나면 모든 작업이 끝날때 까지 대기합니다.
따라서 Done이 마지막에 출력됩니다.
하지만 우리가 eventListener를 등록하는 이유는 코드는 계속 진행하되, 특정 이벤트가 발생시에만 등록된 코드를 동작시키기 위합니다.
이때 collect 대신 launchIn을 사용하여 분리된 coroutine으로 시작하고 이후 코드는 바로 실행되도록 할 수 있습니다.
// Imitate a flow of events
fun events(): Flow = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
결과는 아래와 같습니다.Done이 바로 찍히고 flow는 새로운 launch에서 수행됩니다.
이때 launchIn의 parameter로 coroutineScope을 명시적으로 넣어줘야 합니다.
주어진 scope에서 flow가 launch 되며, 예제에서는 runBlocking builder의 this가 들어 갔으니, coroutine의 structured concurrency에 의해서 flow가 완료되어야만 runBlocking의 블럭이 종료됩니다.
보통 어떤 lifetime을 갖는 entity로 부터 scope을 받아서 사용하며, lifetime이 종료되면 scope이 cancel되면서 이 scope에 속해있던 flow의 동작도 cancelling 됩니다.
onEach{..}.launchIn(this) 구문은 마치 addEventListener 처럼 동작하지만 coroutine의 structured concurrency에 의해서 따로 removeEventListener를 불러줄 필요가 없다는 장점을 가집니다.
추가로 launchIn은 job을 return 합니다.
따라서 필요에 따라 scope의 취소 없이 flow만을 취소할 수도 있고, join을 이용하여 완료될때 까지 특정 시점에 대기 시킬수도 있습니다.
(아마도 launchIn의 내부 구현은 this.launch { flow {...}..}의 형태로 되어 있을꺼라고 예상되네요~)
Flow and Reactive streams
- Reactive Streams: kotlinx-coroutines-reactive
- Project Reactor: kotlinx-coroutines-reactor
- RxJava2: kotlinx-coroutines-rx2
'개발이야기 > Kotlin' 카테고리의 다른 글
[RxKotlin] Reactive 코틀린 #2 - Observable, ConnectableObservable (0) | 2019.12.04 |
---|---|
[RxKotlin] Reactive 코틀린 #1 - 개념 및 설치 (0) | 2019.12.04 |
[Kotlin] 코틀린 - 코루틴#10- Asynchronous Flow(1/2) (0) | 2019.11.04 |
IntelliJ에서 maven kotlin 프로젝트 만들기 (0) | 2019.09.18 |
[Kotlin] 코틀린 let을 null check으로 쓰지 마세요~ (좀더 스마트한 let 사용법) (8) | 2019.07.19 |