이 글은 아래 링크의 내용을 기반으로 하여 설명합니다.
https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
또한 예제에서 로그 print시 println과 안드로이드의 Log.e()를 혼용합니다.
Select expression (experimental)
여러개의 suspending function을 동시에 대기하고 사용 가능한 상태인 경우에는 선택이 되도록 할 수 있는 API 입니다.
하지만 Experimental 기능으로 추후 대거 변경될 가능성이 있으니 개념만 이해하시면 됩니다.
(나중에 api명 자체가 바뀔수도 있으니까요..)
Selecting from channels
channel의 값을 읽으려면 receive를 사용합니다.
만약 여러개의 채널을 전달받고 receive 가능한 채널의 값을 뽑아내야 한다면 기존 receive() api만을 사용해서는 구현하기가 어렵습니다.
channel의 특성상 send-receive는 pair이기 때문에 여러개의 채널중 어떤 채널이 먼저 receive가 가능해 질지를 예상하기 어렵기 때문입니다.
따라서 여러개의 channel의 값을 수신하면서 먼저 receive 가능한 채널의 값을 뽑아낼때 select를 사용하고 채널의 값을 얻는건 onReceive()를 이용합니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
fun CoroutineScope.fizz() = produce<String> {
while (true) { // sends "Fizz" every 300 ms
delay(300)
send("Fizz")
}
}
fun CoroutineScope.buzz() = produce<String> {
while (true) { // sends "Buzz!" every 500 ms
delay(500)
send("Buzz!")
}
}
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // means that this select expression does not produce any result
fizz.onReceive { value -> // this is the first select clause
println("fizz -> '$value'")
}
buzz.onReceive { value -> // this is the second select clause
println("buzz -> '$value'")
}
}
}
fun main() = runBlocking {
val fizz = fizz()
val buzz = buzz()
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
select문 내부에서는 onReceive()를 사용할 수 있습니다.
위 예제에서 fizz()는 0.3초 간격으로 buzz()는 0.5초 간격으로 채널에 값을 보냅니다.
selectFizzBuzz()에서는 두개의 channel을 받아 select 구문으로 처리합니다.
따라서 두 채널의 값을 기다리고 있다가 receive 가능 상태가 되면 해당 값을 읽어 처리합니다.
Selecting on close
produce는 내부 블럭 {...}의 수행이 (send가)끝나고, channel의 값을 전부 receive로 받아가면 채널을 close 합니다.
close된 채널에 onReceive를 호출하면 exception이 발생합니다.
따라서 close된 채널에 접속하는 상황이 벌어지는 경우 onReceiveOrNull을 사용해야 합니다.
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}
fun main() = runBlocking {
val a = produce<String> {
repeat(4) { send("Hello $it") }
}
val b = produce<String> {
repeat(4) { send("World $it") }
}
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
- 여러개의 channel을 select로 처리하는 경우 동시에 값이 send되어 동시에 onReceive(또는 onReceiveOrNull)가 호출될 수 있다. 이런 경우 먼저 정의된 구문만 실행된다.
- Channel이 close된 경우 onReceiveOrNull을 사용하면 null이 즉시 반환된다.
Selecting to send
위에서는 select문 내부에서 onReceive를 이용하여 available한 channel을 선택적으로 처리하는 방법을 설명했습니다.
이와는 반대로 available한 channel로 send를 할 수 있는 onSend()에 대해서 알아 봅니다.
물론 onSend()는 select문 내부에서 사용이 가능합니다.
fun CoroutineScope.produceNumbers(secondCh: SendChannel<Int>) = produce<Int> {
for (num in 1..10) { // produce 10 numbers from 1 to 10
delay(100) // every 100 ms
select<Unit> {
onSend(num) {} // Send to the primary channel
secondCh.onSend(num) {} // or to the side channel
}
}
}
fun main() = runBlocking<Unit> {
val secondCh = Channel<Int>() // allocate side channel
launch { // this is a very fast consumer for the side channel
secondCh.consumeEach { println("Side channel has $it") }
}
produceNumbers(side).consumeEach {
println("Consuming $it")
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
coroutineContext.cancelChildren()
}
먼저 produceNumbers(side: SendChannel)이라는 함수를 정의 합니다.
이 함수는 produce 내부에서 select 문을 이용하여 값을 send합니다.
단 이때 produce 자체 channel과 param으로 넘겨받은 second channel에 onSend로 값을 보냅니다.
onSend를 사용하면 select내에 정의된 channel중 send가 가능한(available) channel에 값을 보내게 됩니다.
결과는 아래와 같습니다.
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
결과로 미루어 보건데 available한 channel이 여러개일 경우 onSend() 역시 select문에 먼저 선언된 onSend()가 우선순위가 높습니다.
Selecting deferred values
fun CoroutineScope.asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
}
fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}
fun main() = runBlocking<Unit> {
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
}
asyncString() 함수는 넘겨받은 시간만큼 delay를 하고 string을 반환합니다.
또한 asyncStringList() 함수는 랜덤으로 delay를 만들고 List에 asyncString() 함수를 (수행하고) 넣습니다.
main() 함수에서 list를 iteration하면서 전부 onAwait()를 걸어줍니다.
이때 가장 짧은 delay를 가진 asyncString() 함수가 하나만 선택되어 값을 반환하고 select문의 벗어 납니다.
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
또한 이때 남은 coroutine들은 아직 acitive하므로 나중에 onAwait()를 이용하여 값을 받아오거나, cancel 시키면 됩니다.
이건 마치 java의 completionService와 비슷하네요.
추가적으로 select문 안에서 정해진 형식이 아니라 임의의 코드를 만들어 넣었습니다.
(onAwait()를 지정된 형식 아닌, list의 forEach를 사용하여 호출함.)
이는 select문이 kotlin의 DSL로 만들어졌기 때문입니다.
Switch over a channel of deferred values
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
val next = select<Deferred<String>> { // return next deferred value from this select or null
input.onReceiveOrNull { update ->
update // replaces next value to wait
}
current.onAwait { value ->
send(value) // send value that current deferred has produced
input.receiveOrNull() // and use the next deferred from the input channel
}
}
if (next == null) {
println("Channel was closed")
break // out of loop
} else {
current = next
}
}
}
fun CoroutineScope.asyncString(str: String, time: Long) = async {
delay(time)
str
}
fun main() = runBlocking<Unit> {
val chan = Channel<Deferred<String>>() // the channel for test
launch { // launch printing coroutine
for (s in switchMapDeferreds(chan))
println(s) // print each received string
}
chan.send(asyncString("BEGIN", 100))
delay(200) // enough time for "BEGIN" to be produced
chan.send(asyncString("Slow", 500))
delay(100) // not enough time to produce slow
chan.send(asyncString("Replace", 100))
delay(500) // give it time before the last one
chan.send(asyncString("END", 500))
delay(1000) // give it time to process
chan.close() // close the channel ...
delay(500) // and wait some time to let it finish
}
정리
'개발이야기 > Kotlin' 카테고리의 다른 글
[Kotlin] Coroutine의 배신 (5) | 2019.05.08 |
---|---|
[Kotlin] 코루틴 Exception 추가 예제 (0) | 2019.01.23 |
[Kotlin] 코틀린 - 코루틴#8 - 동기화 제어 (0) | 2018.12.26 |
[Kotlin] 코틀린 - 코루틴#7 - Channels (0) | 2018.12.21 |
[Kotlin] 코틀린 - 코루틴#6 - supervision (0) | 2018.12.12 |