본문으로 바로가기
반응형


이 글은 아래 링크의 내용을 기반으로 하여 설명합니다.

https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md

또한 예제에서 로그 print시 println과 안드로이드의 Log.e()를 혼용합니다.


Select expression (experimental)

여러개의 suspending function을 동시에 대기하고 사용 가능한 상태인 경우에는 선택이 되도록 할 수 있는 API 입니다.

하지만 Experimental 기능으로 추후 대거 변경될 가능성이 있으니 개념만 이해하시면 됩니다.


(나중에 api명 자체가 바뀔수도 있으니까요..)


Selecting from channels

channel의 값을 읽으려면 receive를 사용합니다.

만약 여러개의 채널을 전달받고 receive 가능한 채널의 값을 뽑아내야 한다면 기존 receive() api만을 사용해서는 구현하기가 어렵습니다.

channel의 특성상 send-receive는 pair이기 때문에 여러개의 채널중 어떤 채널이 먼저 receive가 가능해 질지를 예상하기 어렵기 때문입니다.


따라서 여러개의 channel의 값을 수신하면서 먼저 receive 가능한  채널의 값을 뽑아낼때 select를 사용하고 채널의 값을 얻는건 onReceive()를 이용합니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

fun CoroutineScope.fizz() = produce<String> {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}

fun CoroutineScope.buzz() = produce<String> {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { //  means that this select expression does not produce any result 
        fizz.onReceive { value ->  // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}

fun main() = runBlocking {
    val fizz = fizz()
    val buzz = buzz()
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
    coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}


fizz -> 'Fizz'

buzz -> 'Buzz!'

fizz -> 'Fizz'

fizz -> 'Fizz'

buzz -> 'Buzz!'

fizz -> 'Fizz'

buzz -> 'Buzz!'


select문 내부에서는 onReceive()를 사용할 수 있습니다.

위 예제에서 fizz()는 0.3초 간격으로 buzz()는 0.5초 간격으로 채널에 값을 보냅니다.

selectFizzBuzz()에서는 두개의 channel을 받아 select 구문으로 처리합니다.


따라서 두 채널의 값을 기다리고 있다가 receive 가능 상태가 되면 해당 값을 읽어 처리합니다.


Selecting on close

produce는 내부 블럭 {...}의 수행이 (send가)끝나고, channel의 값을 전부 receive로 받아가면 채널을 close 합니다.

close된 채널에 onReceive를 호출하면 exception이 발생합니다.

따라서 close된 채널에 접속하는 상황이 벌어지는 경우 onReceiveOrNull을 사용해야 합니다.

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'a' is closed" 
            else 
                "a -> '$value'"
        }
        b.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'b' is closed"
            else    
                "b -> '$value'"
        }
    }
    
fun main() = runBlocking {
    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String> {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
    coroutineContext.cancelChildren() 
}    


a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed

위 코드에서 a,b에서 각각 채널에 네개씩을 보내고, selectAorB()를 8번 수행하여 값을 받아 옵니다.
결과는 8개지만 로그가 좀 이상합니다.

위 코드를 이해하려며 select의 내부적인 정책에 대해서 이해하고 있어야 합니다.
  • 여러개의 channel을 select로 처리하는 경우 동시에 값이 send되어 동시에 onReceive(또는 onReceiveOrNull)가 호출될 수 있다. 이런 경우 먼저 정의된 구문만 실행된다.
  • Channel이 close된 경우 onReceiveOrNull을 사용하면 null이 즉시 반환된다.
위 코드에서 a channel에 값 네개를 먼저 읽었고, 중간중간 b channel의 값을 꺼내와 읽었습니다.
다만 produce 구문은 send를 네번 하고 나면 종료되기 때문에 블럭이 종료되면서 자신의 ReceiveChannel을 close 시킵니다.

두번째 정의에 따라서 close된 channel에서 onReceiveOrNull을 호출하면 null을 즉시 반환합니다.
따라서 b 채널에 남은 두개의 값이 send 되겠지만 b channel에서 값을 읽어갈 수 있는 상태이면서 a 구문도 값을 읽어갈 수 있는 상태가 됩니다.
두개의 채널이 경합하는 경우 a 구문이 select문에 먼저 명시 되어 있기 때문에 a값을 먼저 읽어 갑니다.

그래서 위 로직은 selectAorB()의 횟수를 늘리더라도 먼저 종료되는 한쪽만 자신의 로그를 다찍고, 다른 채널은 항상 자신의 로그를 다 못찍는 상황이 발생 합니다.

추가적으로 위 코드에서 onReceiveOrNull 대신에 onReceive를 호출하면 exception이 발생하면서 process가 죽습니다.
이는 close된 채널에 onReceive를 하면 exception을 발생시키기 때문입니다.


Selecting to send

위에서는 select문 내부에서 onReceive를 이용하여 available한 channel을 선택적으로 처리하는 방법을 설명했습니다.

이와는 반대로 available한 channel로 send를 할 수 있는 onSend()에 대해서 알아 봅니다.

물론 onSend()는 select문 내부에서 사용이 가능합니다.

fun CoroutineScope.produceNumbers(secondCh: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            secondCh.onSend(num) {} // or to the side channel     
        }
    }
}

fun main() = runBlocking<Unit> {
    val secondCh = Channel<Int>() // allocate side channel
    launch { // this is a very fast consumer for the side channel
        secondCh.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(side).consumeEach { 
        println("Consuming $it")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
    coroutineContext.cancelChildren()  
}


먼저 produceNumbers(side: SendChannel)이라는 함수를 정의 합니다.

이 함수는 produce 내부에서 select 문을 이용하여 값을 send합니다.

단 이때 produce 자체 channel과 param으로 넘겨받은 second channel에 onSend로 값을 보냅니다.

onSend를 사용하면 select내에 정의된 channel중 send가 가능한(available) channel에 값을 보내게 됩니다.


결과는 아래와 같습니다.

Consuming 1

Side channel has 2

Side channel has 3

Consuming 4

Side channel has 5

Side channel has 6

Consuming 7

Side channel has 8

Side channel has 9

Consuming 10

Done consuming


결과로 미루어 보건데 available한 channel이 여러개일 경우 onSend() 역시 select문에 먼저 선언된 onSend()가 우선순위가 높습니다.


Selecting deferred values

Deferred value를 갖는 여러개의 코루틴이 존재하는 경우 select문 내부에서 onAwait()를 통하여 가장 먼저 값이 반환되는 하나를 선택할 수 있습니다.

fun CoroutineScope.asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}

fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}

fun main() = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}

asyncString() 함수는 넘겨받은 시간만큼 delay를 하고 string을 반환합니다.

또한 asyncStringList() 함수는 랜덤으로 delay를 만들고 List에 asyncString() 함수를 (수행하고) 넣습니다.


main() 함수에서 list를 iteration하면서 전부 onAwait()를 걸어줍니다.

이때 가장 짧은 delay를 가진 asyncString() 함수가 하나만 선택되어 값을 반환하고 select문의 벗어 납니다.

Deferred 4 produced answer 'Waited for 128 ms'

11 coroutines are still active


또한 이때 남은 coroutine들은 아직 acitive하므로 나중에 onAwait()를 이용하여 값을 받아오거나, cancel 시키면 됩니다.

이건 마치 java의 completionService와 비슷하네요.


추가적으로 select문 안에서 정해진 형식이 아니라 임의의 코드를 만들어 넣었습니다.

(onAwait()를 지정된 형식 아닌, list의 forEach를 사용하여 호출함.)

이는 select문이 kotlin의 DSL로 만들어졌기 때문입니다.


Switch over a channel of deferred values

select 문 내부에 같은 api만 사용해야 하는건 아닙니다.
위에선 언급했던 api들을 혼용해서 사용해도 되는데, 이를 테스트 하기 위한 예제를 만들어 봅니다.

produce 함수를 만들고 deferred를 전달받는 채널을 param으로 넘겨 받습니다.
채널에서 수신된 deferred 값은 채널이 닫히거나 channel에 다음 값을 전달받기 전까지만 유효합니다.

따라서 작업이 오래 걸리는 deferred를 대기중에 다른값이 채널로 send되면 이전값은 처리는 버려집니다.
물론 deferred 대기중에 채널이 닫혀도 값은 버려집니다.

fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}

fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    str
}

fun main() = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for test
    launch { // launch printing coroutine
        for (s in switchMapDeferreds(chan)) 
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce slow
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ... 
    delay(500) // and wait some time to let it finish
}


BEGIN
Replace
END
Channel was closed

Slow는 처리가 되기전에 다음값이 channel로 전달되어 처리되지 않았습니다.


정리

이로써 코틀린의 코루틴에 대한 포스팅을 마칩니다.
Experimental인 항목들은 추후 api 이름이나 사용 형식에 대한 변경이 있을수는 있다 하더라도, 개념자체가 크게 변경될것 같지는 않습니다.
물론 기존 코드로 구현할 수 없지는 않으나 select나 channel을 사용하면 좀더 간단하게 원하는 코드를 작성할 수 있기 때문에 굳이 많들어진 기능을 버전이 올라간다고 없어지진 않을것 같네요.


반응형