본문으로 바로가기

[Kotlin] 코틀린 - 코루틴#7 - Channels

category 개발이야기/Kotlin 2018. 12. 21. 11:56
반응형


이 글은 아래 링크의 내용을 기반으로 하여 설명합니다.

https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md

또한 예제에서 로그 print시 println과 안드로이드의 Log.e()를 혼용합니다.

Channels

Channels은 현재 (2018.12.17) experimental 기능이기 때문에 추후 API의 대대적인 변화가 있을수도 있습니다.
따라서 직접 사용하기 보다는 API와 컨셉을 확인하는 용도를 추천드립니다.
괜히 코드에 넣어다가 kotlin 버전올라가고 다 바꿔야 하는 낭패를 볼 수도 있습니다....

2019.10.28 현재 Experimental이 제거되었습니다. (정식사용 가능함)
정확하게 언제버전부터인지는 확인하지 않았으나 현재 최신 버전인 1.3.2에서는 사용이 가능합니다.


Deffered는 하나의 값을 반환합니다만 Channelstream을 반환합니다.

Channel은 BlockingQueue와 유사하게 동작합니다.
  • BlockingQueue의 put -> Channel의 send
  • BlockingQueue의 take -> Channel의 receive
따라서 동시성이 필요한 여러 coroutine에서 순서를 보장 받으면서 공유하여 사용할 수 있습니다.
fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

1

4

9

16

25

Done!


Closing and iteration over channels

queue와 다르게 channel은 더 이상 사용하지 않을때 close 시킬수가 있습니다.

channel의 반환값은 stream이기 때문에 이렇게 close를 시킬경우 반환값을 받아 사용하는 for-loop에서 채널 사용이 끝났음을 인지 할 수 있습니다.


close는 특별한 token의 형태로 이 값 역시 channel에 들어갑니다.

따라서 iterator들은 이 값을 만나면(receive) iteration을 멈춥니다.

이런 형태이기 때문에 close를 호출하더라도 그 이전에 넣어놓은 값들은 받아올 수 있음을 보장합니다.

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}


Building channel producers

coroutine으로 흔하게 사용하는 형태중 하나가 sequential한 데이터를 생산하는쪽과 소비하는쪽을 구현하는 producer-consumer 패턴 입니다.
생산하는 형태를 쉽게 구현하도록 제공하는 coroutine builder로 produce가 존재하고 이를 소비하는쪽에서 사용하는 extension function의 형태로 consumeEach가 있습니다.(consumeEach는 for loop으로 대체 가능합니다.)

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

produce를 CoroutienScope의 확장함수로 만들어 빼냈습니다.


만약 확장 함수로 만들고 싶지 않다면 아래와 같이 수정해야 합니다.

fun produceSquares2(coroutineScope: CoroutineScope): ReceiveChannel<Int> = 
coroutineScope.produce(Dispatchers.Default) {
    for(x in 1..5) send(x*x)
}


또한 Procude를 다른 thread에서 수행하고 싶다면 아래와 같이 Dispatcher를 지정해 줄수도 있습니다.

fun CoroutineScope.produceSquares(): ReceiveChannel = produce(Dispatchers.Default) {
    printThreadName()
    for(x in 1..5) send(x*x)
}


Pipelines

infinite한 sequence를 만들어 내고 동시에 이를 소비하는 패턴을 pipelining으로 만들어낼 수 있습니다.

fun main() = runBlocking {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce<Int> {
    for (x in numbers) send(x * x)
}

produceNumbers()에서 무한한 수를 만들어내고, square()에서는 넘겨받은 channel에서 값을 뽑아내어 제곱합니다.

produce를 사용하기 때문에 함수 실행이 끝날때까지 block되는게 아니라 결과값이 생성될때마다 async하게 channel로 보내지며 이런 값들이 for문에서 5개를 receive()할때까지 대기합니다.


여기서 produceNumbers()와 square()을 CoroutineScope의 확장함수로 만들면, 함수로 분리 하면서도 하나의 scope안에서 실행되는 동시성을 같는 코드로 관리될수 있습니다.

(나중에 scope을 취소시켜서 한번에 취소하거나, 중지하기에 편리하겠죠?)


만약에 extension function으로 만드는게 부담스럽다면 아래와 같이 scope을 인자로 받아와서 수행하도록 합니다.

suspend fun produceSquares(scope: CoroutineScope): ReceiveChannel<Int> = scope.produce<Int> {
        for (x in 1..5) send(x * x)
    }

Prime numbers with pipeline

코루틴을 이용해서 무한히 수를 증가시키고 그 안에서 소수를 찾는걸 pipelining으로 구성해 봅니다.

fun main() = runBlocking {
    var cur = numbersFrom(2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}



2

3

5

7

11

13

17

19

23

29


생성된 숫자들을 filtering하여 prime number를 골라 냅니다.

위 함수는 main thread에서 수행되기 때문에 cancelChildren을 통해서 10개만 골라내고 중지 합니다.


예제의 이해가 쉽지 않기 때문에 아래와 같이 로그를 찍도록 변경해 봅니다.

fun main(arge:Array) = runBlocking {
    var cur = numbersFrom(2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime, i)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

fun CoroutineScope.numbersFrom(start: Int) = produce {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel, prime: Int, sequence: Int): ReceiveChannel = produce {
    for (x in numbers) {
        println("Sequence#$sequence, number: $x, prime:$prime")
        if (x % prime != 0) {
            send(x)
        }
    }
}




또한 위 구현은 coroutine이 아닌 아래 일반 함수로 대체하여 구현할 수도 있습니다.

  • produce -> buildIterator
  • send -> yield
  • receive -> next
  • receiveChannel -> Iterator
buildIterator를 사용할 경우 runBlocking을 사용할 필요가 없습니다
다만 coroutine으로 사용할 경우 Dispatcher.Defalut를 이용하면 multiple CPU core를 이용해서 좀더 효율적으로 실행할 수 있습니다.


(개인적인 생각으로는 이 구현은 range 함수로 숫자를 생성하고 sequence를 써도 비슷하게 가능할것 같네요.)

Fan-out

여러개의 coroutine이 하나의 channel에의 값을 receive 할 수 있습니다.
이러면 일을 여러개의 coroutine에 나눠서 시킬 수 있습니다.
fun main() = runBlocking {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}


Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

producer를 취소하면 해당 channel은 닫힙니다.
따라서 launch 시켰던 for문은 종료 됩니다.

추가적으로 여러개의 코루틴이 채널의 data를 소비하는 경우 for-loop을 쓰는것이 안전합니다.
for loop을 사용할 경우 코루틴중 하나가 실패하더라도 나머지 코루틴이 channel의 데이터를 처리 합니다.
하지만 comsumeEach를 사용한다면 코루틴중 하나라도 실패하는 경우 channel이 닫히면서 다른 코루틴까지 수행이 끝납니다.
fun test3_9() = runBlocking {
       ... //위 코드와 동일
    }

    fun CoroutineScope.produceNumbers() = produce<Int> {
        ...//위 코드와 동일
    }

    fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
        try {
            channel.consumeEach {
                if (id == 3) {
                    throw IllegalArgumentException()
                }
                Log.e(TAG, "Processor #$id received $it")
            }
        } catch (e: Exception) {
            Log.e(TAG, "3 exception")
        }
    }

Processor #0 received 1 

Processor #0 received 2 

Processor #1 received 3 

Processor #2 received 4 

3 exception 


위 코드는 consumeEach를 사용하는 경우 id가 3인 coroutine에 강제로 exception을 발생시켰습니다.

이럴경우 중간에 channel이 취소되면서 다른 코루틴도 중지 됩니다.

만약 for-loop중에 exception이 발생했다면 해당 coroutien에서 처리중이였던 channel의 값만 처리가 불가되면서 종료되고, 다른 coroutine들이 channel의 남은 부분을 처리해 줍니다.


Fan-in

fan-out과 반대로 하나의 channel에 여러개의 coroutine이 send를 할수도 있습니다.
fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}


foo

foo

BAR!

foo

foo

BAR!


Buffered channels

Channel은 버퍼가 없습니다. 따라서 channel에 send와 receive는 항상 짝을 이뤄야 합니다.
send가 먼저 발생하면 channel은 receive가 발생될때까지 suspend 됩니다.

반대로 receive가 먼저 발생하면 channel은 send가 올때까지 suspend 합니다.

Channel()과 produce 모두 buffer size를 지정할수 있는 capacity parameter는 optional로 제공합니다.
fun main() = runBlocking {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
}


Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

0~3까지는 buffer에 쌓이고 4번을 send했을때 blocking 됩니다.
만약 버퍼없이 채널을 생성했다면 sending 0, 1 까지만 찍혔겠죠?


Channels are fair

여러 coroutine에서 하나의 채널에 send와 receive를 하는경우, 이를 요청한 순서는 보장됩니다.
Channel은 FIFO로 동작하므로 먼저 receive를 요청한 코루틴이 해당 원소를 받아갑니다.

이건 예제를 보면서 이해하는게 빠릅니다.
data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}


"ping"과 "pong"이 공통 채널인 table에 send를 호출합니다.

ping이 먼저 launch 되므로 palyer를 호출하고, pong 역시 다음에 launch 됩니다.

ping Ball(hits=1)

pong Ball(hits=2)

ping Ball(hits=3)

pong Ball(hits=4)

하지만 명시적으로 위와같이 delay를 주지 않는다면 순서가 엉킬수도 있습니다.

(두개의 launch가 모두 Main thread에서 수행되므로 명시적인 delay가 없다면 cpu 스케줄링에 따라 일정시간동안 한쪽만 불릴 가능성이 있습니다.)

이슈내용: https://github.com/Kotlin/kotlinx.coroutines/issues/111

Ticker channels

Ticker 채널은 특별한 랑데뷰 채널로, 주어진 시간(delay)마다 Unit을 channel로 send 합니다.

사실 이 채널은 혼자서 쓰일 일은 거의 없습니다만 특정 시간 단위로 produce를 해야하는 pipeline이나 operations을 동작하는데 응용 될 수 있습니다.

fun main() = runBlocking {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}


위 코드에서 dealy가 100인 주기 ticker를 만듭니다.

이 ticker는 채널에 100ms 단위로 Unit을 send 합니다.

따라서 withTimeoutOrNull로 코드에 blocking을 걸면서 channel에 있는 값을 receive 해보면 100ms 단위로 Unit이 보내져 있는걸 알 수 있습니다.


Initial element is available immediately: kotlin.Unit

Next element is not ready in 50 ms: null

Next element is ready in 100 ms: kotlin.Unit

Consumer pauses for 150ms

Next element is available immediately after large consumer delay: kotlin.Unit

Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit


또한 main code 중간에 delay가 있더라도 ticker는 상관없이 100ms 마다 Unit을 채널에 send 합니다.

코드 중간에 150ms delay가 있는 경우 receive한 시간 간격과 상관없이 Unit이 반환되는걸 알수 있습니다.

또한 mode parameter로 아래 두가지 type을 optional하게 줄수 있습니다.

  • TickerMode.FIXED_PERIOD
  • TickerMode.FIXED_DELAY

ticker는 Dispatchers.unconfined 를 사용합니다.
자세한건 ticker api를 따라가보면 알 수 있습니다.
@ObsoleteCoroutinesApi
public fun ticker(
    delayMillis: Long,
    initialDelayMillis: Long = delayMillis,
    context: CoroutineContext = EmptyCoroutineContext,
    mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel {
    require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
    require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
    return GlobalScope.produce(Dispatchers.Unconfined + context, capacity = 0) {
        when (mode) {
            TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delayMillis, initialDelayMillis, channel)
            TickerMode.FIXED_DELAY -> fixedDelayTicker(delayMillis, initialDelayMillis, channel)
        }
    }
}


반응형