이 글은 아래 링크의 내용을 기반으로 하여 설명합니다.
https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
또한 예제에서 로그 print시 println과 안드로이드의 Log.e()를 혼용합니다.
Channels
- BlockingQueue의 put -> Channel의 send
- BlockingQueue의 take -> Channel의 receive
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
1
4
9
16
25
Done!
Closing and iteration over channels
queue와 다르게 channel은 더 이상 사용하지 않을때 close 시킬수가 있습니다.
channel의 반환값은 stream이기 때문에 이렇게 close를 시킬경우 반환값을 받아 사용하는 for-loop에서 채널 사용이 끝났음을 인지 할 수 있습니다.
close는 특별한 token의 형태로 이 값 역시 channel에 들어갑니다.
따라서 iterator들은 이 값을 만나면(receive) iteration을 멈춥니다.
이런 형태이기 때문에 close를 호출하더라도 그 이전에 넣어놓은 값들은 받아올 수 있음을 보장합니다.
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
Building channel producers
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
produce를 CoroutienScope의 확장함수로 만들어 빼냈습니다.
만약 확장 함수로 만들고 싶지 않다면 아래와 같이 수정해야 합니다.
fun produceSquares2(coroutineScope: CoroutineScope): ReceiveChannel<Int> =
coroutineScope.produce(Dispatchers.Default) {
for(x in 1..5) send(x*x)
}
또한 Procude를 다른 thread에서 수행하고 싶다면 아래와 같이 Dispatcher를 지정해 줄수도 있습니다.
fun CoroutineScope.produceSquares(): ReceiveChannel = produce(Dispatchers.Default) {
printThreadName()
for(x in 1..5) send(x*x)
}
Pipelines
infinite한 sequence를 만들어 내고 동시에 이를 소비하는 패턴을 pipelining으로 만들어낼 수 있습니다.
fun main() = runBlocking {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce<Int> {
for (x in numbers) send(x * x)
}
produceNumbers()에서 무한한 수를 만들어내고, square()에서는 넘겨받은 channel에서 값을 뽑아내어 제곱합니다.
produce를 사용하기 때문에 함수 실행이 끝날때까지 block되는게 아니라 결과값이 생성될때마다 async하게 channel로 보내지며 이런 값들이 for문에서 5개를 receive()할때까지 대기합니다.
여기서 produceNumbers()와 square()을 CoroutineScope의 확장함수로 만들면, 함수로 분리 하면서도 하나의 scope안에서 실행되는 동시성을 같는 코드로 관리될수 있습니다.
(나중에 scope을 취소시켜서 한번에 취소하거나, 중지하기에 편리하겠죠?)
만약에 extension function으로 만드는게 부담스럽다면 아래와 같이 scope을 인자로 받아와서 수행하도록 합니다.
suspend fun produceSquares(scope: CoroutineScope): ReceiveChannel<Int> = scope.produce<Int> {
for (x in 1..5) send(x * x)
}
Prime numbers with pipeline
코루틴을 이용해서 무한히 수를 증가시키고 그 안에서 소수를 찾는걸 pipelining으로 구성해 봅니다.
fun main() = runBlocking {
var cur = numbersFrom(2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
3
5
7
11
13
17
19
23
29
생성된 숫자들을 filtering하여 prime number를 골라 냅니다.
위 함수는 main thread에서 수행되기 때문에 cancelChildren을 통해서 10개만 골라내고 중지 합니다.
예제의 이해가 쉽지 않기 때문에 아래와 같이 로그를 찍도록 변경해 봅니다.
fun main(arge:Array) = runBlocking {
var cur = numbersFrom(2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime, i)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
fun CoroutineScope.numbersFrom(start: Int) = produce {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel, prime: Int, sequence: Int): ReceiveChannel = produce {
for (x in numbers) {
println("Sequence#$sequence, number: $x, prime:$prime")
if (x % prime != 0) {
send(x)
}
}
}
또한 위 구현은 coroutine이 아닌 아래 일반 함수로 대체하여 구현할 수도 있습니다.
- produce -> buildIterator
- send -> yield
- receive -> next
- receiveChannel -> Iterator
(개인적인 생각으로는 이 구현은 range 함수로 숫자를 생성하고 sequence를 써도 비슷하게 가능할것 같네요.)
Fan-out
fun main() = runBlocking {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
fun test3_9() = runBlocking {
... //위 코드와 동일
}
fun CoroutineScope.produceNumbers() = produce<Int> {
...//위 코드와 동일
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
try {
channel.consumeEach {
if (id == 3) {
throw IllegalArgumentException()
}
Log.e(TAG, "Processor #$id received $it")
}
} catch (e: Exception) {
Log.e(TAG, "3 exception")
}
}
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
3 exception
위 코드는 consumeEach를 사용하는 경우 id가 3인 coroutine에 강제로 exception을 발생시켰습니다.
이럴경우 중간에 channel이 취소되면서 다른 코루틴도 중지 됩니다.
만약 for-loop중에 exception이 발생했다면 해당 coroutien에서 처리중이였던 channel의 값만 처리가 불가되면서 종료되고, 다른 coroutine들이 channel의 남은 부분을 처리해 줍니다.
Fan-in
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
foo
foo
BAR!
foo
foo
BAR!
Buffered channels
fun main() = runBlocking {
val channel = Channel<Int>(4) // create buffered channel
val sender = launch { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}
Channels are fair
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
"ping"과 "pong"이 공통 채널인 table에 send를 호출합니다.
ping이 먼저 launch 되므로 palyer를 호출하고, pong 역시 다음에 launch 됩니다.
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
하지만 명시적으로 위와같이 delay를 주지 않는다면 순서가 엉킬수도 있습니다.
(두개의 launch가 모두 Main thread에서 수행되므로 명시적인 delay가 없다면 cpu 스케줄링에 따라 일정시간동안 한쪽만 불릴 가능성이 있습니다.)
이슈내용: https://github.com/Kotlin/kotlinx.coroutines/issues/111
Ticker channels
Ticker 채널은 특별한 랑데뷰 채널로, 주어진 시간(delay)마다 Unit을 channel로 send 합니다.
사실 이 채널은 혼자서 쓰일 일은 거의 없습니다만 특정 시간 단위로 produce를 해야하는 pipeline이나 operations을 동작하는데 응용 될 수 있습니다.
fun main() = runBlocking {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
위 코드에서 dealy가 100인 주기 ticker를 만듭니다.
이 ticker는 채널에 100ms 단위로 Unit을 send 합니다.
따라서 withTimeoutOrNull로 코드에 blocking을 걸면서 channel에 있는 값을 receive 해보면 100ms 단위로 Unit이 보내져 있는걸 알 수 있습니다.
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
또한 main code 중간에 delay가 있더라도 ticker는 상관없이 100ms 마다 Unit을 채널에 send 합니다.
코드 중간에 150ms delay가 있는 경우 receive한 시간 간격과 상관없이 Unit이 반환되는걸 알수 있습니다.
또한 mode parameter로 아래 두가지 type을 optional하게 줄수 있습니다.
- TickerMode.FIXED_PERIOD
- TickerMode.FIXED_DELAY
@ObsoleteCoroutinesApi
public fun ticker(
delayMillis: Long,
initialDelayMillis: Long = delayMillis,
context: CoroutineContext = EmptyCoroutineContext,
mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel {
require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
return GlobalScope.produce(Dispatchers.Unconfined + context, capacity = 0) {
when (mode) {
TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delayMillis, initialDelayMillis, channel)
TickerMode.FIXED_DELAY -> fixedDelayTicker(delayMillis, initialDelayMillis, channel)
}
}
}
'개발이야기 > Kotlin' 카테고리의 다른 글
[Kotlin] 코틀린 - 코루틴#9 Select (Experimental) (0) | 2019.01.06 |
---|---|
[Kotlin] 코틀린 - 코루틴#8 - 동기화 제어 (0) | 2018.12.26 |
[Kotlin] 코틀린 - 코루틴#6 - supervision (0) | 2018.12.12 |
[Kotlin] 코틀린 - 코루틴#5 - exception (0) | 2018.12.12 |
[Kotlin] 코틀린 - 코루틴#4 - context와 dispatchers (0) | 2018.12.09 |