본문으로 바로가기
반응형

이 글은 아래 링크의 내용을 기반으로 하여 설명합니다.

https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md

또한 예제에서 로그 print시 println과 안드로이드의 Log.e()를 혼용합니다.

Shared mutable state and concurrency

코루틴은 Dispathcer.Default같은 dispatcher를 이용해서 multi-thread에서 동작이 가능합니다.
따라서 공유된 변경가능한 상태에 접근하는 경우 multi-thread가 접근하는 경우 문제가 발생합니다.
이를 해결하기 위한 방법은 일반적으로 multi-thread에서 공유 객체에 접근할때와 같은것들도 있고, 코루틴만의 특수한 방법들이 있습니다.

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")    
}

var counter = 0

fun main() = runBlocking {
    GlobalScope.massiveRun {
        counter++
    }
    println("Counter = $counter")
}

위 예제에서 100개의 coroutine을 띄우고 각 코루틴은 전달받은 action을 1000번 수행합니다.

그리고 main() 함수에서 GlobalScope에서 수행시켰으니, massiveRun의 내부 launch들은 scope의 context를 상속받아  Dispatcher.Default로 수행됩니다.

(이해가 안간다면 Dispatcher 관련 글을 먼저 읽고 오세요. https://tourspace.tistory.com/153)


결과는 100* 1000인 100,000이 나와야 하지만 예상한대로 아예 다른값이 나옵니다.

coutner 변수는 공통으로 접근할수 있는 변수 이면서 동기화 처리를 안했기 때문입니다.


참고로 코어가 두개 미만인 옛날 cpu를 사용하는 경우에는 정확한 결과가 나옵니다.

Dispather.Default가 ForJoinPool을 이용하기 때문에 실제로 thread는 하나만 뜨기 때문이죠.

이건 논외의 상황이니 무시하도록 합니다.


Volatiles are of no help

동기화 방법중의 하나인 volatile을 이용하는 방법도 있습니다만, 이것 역시 결과인 100,000을 보장하지 않습니다.
(기존 자바 동기화에서 일어나는 똑같은 문제 입니다.)

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
... // 위와 동일
}

@Volatile // 코틀린에서는 annotation으로 volatile을 표기함.
var counter = 0

fun main() = runBlocking {
... //위와 동일
}


결과도 틀리지만 연산속도도 느려졌습니다.

여기서 잠깐 자바의 동기화 문제에 대해 집고 넘어가야 할 필요가 있기에 잠깐 volatile에 대해 설명합니다.


Volatile의 동기화

Class의 멤버변수는 heap 메모리에 존재하기 때문에 thread가 공유하여 접근할 수 있습니다.
이때 각 thread는 속도향상을 위해 main memory에 접근해서 값을 가져가는게 아니라, cache에서 변수값을 읽어 갑니다.
쓰기 또한 cache값을 이용하며, 어떤 시점에 변경된 cache값이 main memory에 업데이트 됩니다.

따라서 multi thread가 해당 변수의 값을 읽거나 쓰면 각각 자신의 cache에서 값을 읽고 쓰고, 어떤 시점에 변경된 값을 main memory에 update하기 떄문에 실제 원하는값과 달라지게 됩니다.

만약 Thread 1이 해당 변수값을 바꿨는데, Thread 2는 변경전 값을 cache하여 이 값을 기준으로 연산하여 변수의 값을 변경하게 되면 추후 실제 main memory에는 엉뚱한 값이 써있게 되는겁니다.

이런 문제를 막기위한 키워드가 volatile 입니다.
volatile 키워드의 경우 접근가능한 변수의 값을 cache를 통해 사용하지 않고 thread가 직접 main memory에 접근하여 읽고 씁니다.
하지만 volatile로도 완벽한 동기화를 할수 없으며, 아래와 같은 문제점도 동시에 안고 있습니다.

  • Cache를 이용하지 않고 main memory에 직접 access 하기 때문에 더 비싼 cost를 지불해야 한다.
  • volatile 변수는 읽기 쓰기가 JVM에 의해 reordering 되지 않는다.
    • volatile 읽기/쓰기 이후의 연산들은 반드시 읽기/쓰기 이후에 이루어 진다
    • 따라서 필요에 따라 성능상의 이유로 JVM의 instruction reorder 동작을 못하도록 막기 때문에 성능면에서 손해를 본다.
  • volatile 변수는 read시 항상 최신값을 반환한다. 단 여러 쓰레드가 동시 읽기, 쓰기를 하면 쓰기 시점을 알수없기 때문에 여전히 동기화 문제가 일어난다.
예)
1. volatile 변수에 0을 저장
2. Thread1에서 0을 읽어감(최신값)
3. Thread2에서 0을 읽어감(역시 최신값)
4. Thread1이 변수를 하나 증가 (0 -> 1)
5. Thread2역시 변수를 하나 증가( 0 ->1)
6. main memory에 기록된 값은 1. -> 실제 두번의 증가로 2가 되었어야 함.

Thread-safe data structures

Multi-thread의 연산이든, multi-thread를 이용한 코루틴의 연산이든 Thread-safe 한 data 구조를 갖으면 됩니다.
말 그대로 코루틴과 상관없이 그냥 multi-thread에서 동기화 문제를 해결하는 방법을 사용하면 됩니다.
uspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
 ... //위와 동일
}

var counter = AtomicInteger()

fun main() = runBlocking {
    GlobalScope.massiveRun {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}


위 예제는 counter 변수를 AtomicInteger()를 이용해서 변경하고, incrementAndGet() 함수를 통해 증가 시켰습니다.

Atomic class를 사용하면 문제를 쉽게 해결할 수 있습니다.

하지만 보다 복잡한 상태를 갖거나 복잡한 연산을 갖는경우에는 Atomic가지고는 충분하지 않습니다.


Thread confinement fine-gained

UI 작업 및 상태관리는 이를 전담하는 UI thread를 통해 이루어 집니다.
이처럼 공유 자원에 접근하는 모든 부분의 thread를 single thread로 제한하여 동시성 문제를 해결 할 수 있으며, 코루틴에서는 이를 쉽게 적용 가능합니다.
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
   ... // 위와 동일
}

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    GlobalScope.massiveRun { // run each coroutine with DefaultDispathcer
        withContext(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")    
}

singleThread의 context를 만들고 GlobalScope안에서 withContext를 이용하여 공유자원에 접근하는 부분을 single thread를 사용하는 context로 전환 시켰습니다.

출력값은 정상이지만 이 연산은 매우 느립니다.

코루틴은 Dispatcher.Default로 multi-thread에서 수행되지만, 연산할 때는 single thread를 갖는 context로 switching 하도록 fined-gained 하게 thread를 제한했기 때문에 속도 저하가 발생합니다.


Thread confinement coarse-gained

상태를 업데이트 하는 작업은 single thread에서 진행해야 합니다.

정확히 업데이트 하는 작업만을 분리해 내는게 아니라 updating 하는 작업을 포함하는 작업을 큰 덩어리로 나누고 이 덩어리 자체를 single thread로 제한하도록 해봅니다.

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
   ... // 위와 동일
}

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    CoroutineScope(counterContext).massiveRun { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")    
}

massiveRun()의 작업 자체를 지정된 context에서 모두 수행하도록 합니다.

게다가 GlobalScope을 사용하지 않았으므로 multi-thread로 coroutine을 띄우지 않으며, 해당 작업은 주어진 single thread context에서 전부 동작합니다.


이렇게 큰 덩어리 자체를 특정 thread에 제한하도록 하면 좀더 빠른 속도로 원하는(동기화된) 결과물을 얻을 수 있습니다.


Mutual exclusion

Mutual exclusion은 공유자원의 변경이 일어나는 critical section의 동시접근을 block하여 사용하는 기법을 말합니다.
자바에서는 SynchronizedReentrantLock을 이용하여 critical section을 block 시킵니다.

coroutine에서는 lockunlock 메서드를 갖는 Mutex를 이용합니다.

당연히 Mutex.lock()는 suspend function입니다.

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
... // 위와동일
}

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    GlobalScope.massiveRun {
        mutex.withLock {
            counter++        
        }
    }
    println("Counter = $counter")
}

Mutext().withLock{...}은 lock과 unlock을 편리하게 사용하기 위한 extension function으로 아래와 동일한 코드 입니다.

mutex.lock()
try {
    ...
} finally {
    mutex.unlock()
}    


위 예제에서는 critical section을 특정 thread로 제한하지는 않았지만 fine-gained한 코드 입니다.

따라서 어느정도의 cost는 발생하지만 필요에 따라서 사용 가능한 코드 입니다.


Actors

actor는 coroutine의 복합체로 구성되어 있으며, 상태(state)는 사용된 코루틴 안에서만 유효하고 다른 coroutines과는 channel을 통해서 통신 합니다.
Actor는 간단하게 사용될때는 function으로 사용될수도 있지만 복잡한 state를 갖는 경우에는 class로 사용하는것이 더 적합 합니다.


actor 코루틴 빌더를 사용하면 actor scope 내부에서 mailbox channel이 연결되어 수신된 메시지를 처리할 수 있고, actor의 결과값(return값)에 send 채널이 연결되어 하나의 actor reference만 가지고도 channel의 send / receive를 처리할 수 있습니다.

Actor는 우리가 흔히 사용하는 Thread-Handler 패턴과 유사하다고 생각하면 이해하기 쉽습니다.
시간이 걸리는 연산작업은 Background로 처리하고, 완료되면 결과를 Handler로 던지는거죠.
Handler에 던진 message는 queuing 되어 sequential하게 처리되는데, 이 Handler의 역할이 Actor라고 생각하면 됩니다.

동기화 이슈가 있는 State 값들은 Action가 가지고 있으면서 외부에서 작업을 처리하고 해당 state 변경이 필요하면 actor에게로 state를 변경하라는 요청을 하는거죠.
단 actor의 channel을 이용하기 때문에 sequence를 보장하면서 동시성으로 인한 오류도 방지할 수 있습니다. 

Actor를 사용하기 위해서는 먼저 actor가 처리할 동작을 정의하는 message를 나타내는 클래스를 정의 합니다.
이럴때 sealed classes를 이용하면 좀더 명확하게 이런 메시지들을 정의할 수 있습니다.

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")   
}

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main() = runBlocking {
    val counter = counterActor() // create the actor
    GlobalScope.massiveRun {
        counter.send(IncCounter)
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}


위 예제에서는 CounterMsgsealed class를 정의하고 이를 상속받는 각각의 message를 정의했습니다.
IncCounter는 값을 증가 시키고, GetCounter는 값을 반환합니다.
이때 GetCounter에서는 인자값으로 CompletableDeffered<Int>를 사용합니다.
CompletableDeffered는 한개의 값을 반환하면서 연산이 완료되어 complete를 호출할 때까지 코드가 block 됩니다.

위 예제는 참고 문서에 나온 예제이나 그다지 좋은 예제로 보이지는 않습니다.
GlobalScope.massiveRun{...}을 수행하면 내부적으로는 multi-thread를 사용하지만 해당 block이 끝나야만 다음 라인으로 넘어갈수 있습니다.

즉 이미 100,000 이라는 결과 연산이 완료된 이후에 GetCounter로 send를 보내기 때문에 await() 호출시 바로 값이 반환됩니다.
만약 massiveRun을 launch로 띄우고 GetCounter를 수행했다면 엉뚱한 값이 나오겠죠? (0 또는 매우 작은값)
코드가 blocking되어 흘러가는지 non-blocking되어 흘러가는지를 유의하고 있어야 합니다.

Actor는 어떤 context가 자신을 수행하는지와 상관없이 동기화 문제에서 자유로울 수 있습니다.
마치 자바에서 연산은 background에서 하고 결과처리는 UI thread의 handler로 보내서 하는것과 같은 방식이라고 보면 됩니다.

단 예제에서는 actor안에 mutable state를 넣고, message를 통해서만 수정되도록 해 놓았습니다.
(actor가 handler를 가지고 메시지를 처리한다고 생각할 수 있습니다.)
actor 역시 coroutine이며, coroutine은 순차적으로 수행됩니다.
더군다나, channel을 통해서 action을 전달하므로, lock 없이도 sequence를 보장할 수 있습니다.

무엇보다 위 예제에서 actor의 가장 큰 장점은 context의 switching이 없기 때문에 thread confinement한 방법 보다 좀더 빠르고 효율적입니다.


추가적으로 actor의 API 설명을 보면 아래와 같은 내용들이 있습니다.
  • Actor의 scope은 ActorScope이며, 이는 CoroutineScope과 ReceiveChannel이 implements 하고 있다.
  • context는 부모 CoroutineScope을 상속한다. 하지만 context 변수로 element를 추가할 수 있다.
  • Dispatcher가 지정되지 않으면 기본은 Dispatchers.Default가 사용된다.
  • Channel capacity도 지정할 수 있고, lazy하게 동작시킬수 있다.


반응형