이 글은 아래 링크의 내용을 기반으로 하여 설명합니다.
https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
또한 예제에서 로그 print시 println과 안드로이드의 Log.e()를 혼용합니다.
Shared mutable state and concurrency
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
var counter = 0
fun main() = runBlocking {
GlobalScope.massiveRun {
counter++
}
println("Counter = $counter")
}
위 예제에서 100개의 coroutine을 띄우고 각 코루틴은 전달받은 action을 1000번 수행합니다.
그리고 main() 함수에서 GlobalScope에서 수행시켰으니, massiveRun의 내부 launch들은 scope의 context를 상속받아 Dispatcher.Default로 수행됩니다.
(이해가 안간다면 Dispatcher 관련 글을 먼저 읽고 오세요. https://tourspace.tistory.com/153)
결과는 100* 1000인 100,000이 나와야 하지만 예상한대로 아예 다른값이 나옵니다.
coutner 변수는 공통으로 접근할수 있는 변수 이면서 동기화 처리를 안했기 때문입니다.
참고로 코어가 두개 미만인 옛날 cpu를 사용하는 경우에는 정확한 결과가 나옵니다.
Dispather.Default가 ForJoinPool을 이용하기 때문에 실제로 thread는 하나만 뜨기 때문이죠.
이건 논외의 상황이니 무시하도록 합니다.
Volatiles are of no help
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
... // 위와 동일
}
@Volatile // 코틀린에서는 annotation으로 volatile을 표기함.
var counter = 0
fun main() = runBlocking {
... //위와 동일
}
결과도 틀리지만 연산속도도 느려졌습니다.
여기서 잠깐 자바의 동기화 문제에 대해 집고 넘어가야 할 필요가 있기에 잠깐 volatile에 대해 설명합니다.
Volatile의 동기화
- Cache를 이용하지 않고 main memory에 직접 access 하기 때문에 더 비싼 cost를 지불해야 한다.
- volatile 변수는 읽기 쓰기가 JVM에 의해 reordering 되지 않는다.
- volatile 읽기/쓰기 이후의 연산들은 반드시 읽기/쓰기 이후에 이루어 진다
- 따라서 필요에 따라 성능상의 이유로 JVM의 instruction reorder 동작을 못하도록 막기 때문에 성능면에서 손해를 본다.
- volatile 변수는 read시 항상 최신값을 반환한다. 단 여러 쓰레드가 동시 읽기, 쓰기를 하면 쓰기 시점을 알수없기 때문에 여전히 동기화 문제가 일어난다.
Thread-safe data structures
uspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
... //위와 동일
}
var counter = AtomicInteger()
fun main() = runBlocking {
GlobalScope.massiveRun {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
위 예제는 counter 변수를 AtomicInteger()를 이용해서 변경하고, incrementAndGet() 함수를 통해 증가 시켰습니다.
즉 Atomic class를 사용하면 문제를 쉽게 해결할 수 있습니다.
Thread confinement fine-gained
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
... // 위와 동일
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
GlobalScope.massiveRun { // run each coroutine with DefaultDispathcer
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}
singleThread의 context를 만들고 GlobalScope안에서 withContext를 이용하여 공유자원에 접근하는 부분을 single thread를 사용하는 context로 전환 시켰습니다.
출력값은 정상이지만 이 연산은 매우 느립니다.
코루틴은 Dispatcher.Default로 multi-thread에서 수행되지만, 연산할 때는 single thread를 갖는 context로 switching 하도록 fined-gained 하게 thread를 제한했기 때문에 속도 저하가 발생합니다.
Thread confinement coarse-gained
상태를 업데이트 하는 작업은 single thread에서 진행해야 합니다.
정확히 업데이트 하는 작업만을 분리해 내는게 아니라 updating 하는 작업을 포함하는 작업을 큰 덩어리로 나누고 이 덩어리 자체를 single thread로 제한하도록 해봅니다.
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
... // 위와 동일
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
CoroutineScope(counterContext).massiveRun { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
}
massiveRun()의 작업 자체를 지정된 context에서 모두 수행하도록 합니다.
게다가 GlobalScope을 사용하지 않았으므로 multi-thread로 coroutine을 띄우지 않으며, 해당 작업은 주어진 single thread context에서 전부 동작합니다.
이렇게 큰 덩어리 자체를 특정 thread에 제한하도록 하면 좀더 빠른 속도로 원하는(동기화된) 결과물을 얻을 수 있습니다.
Mutual exclusion
coroutine에서는 lock과 unlock 메서드를 갖는 Mutex를 이용합니다.
당연히 Mutex.lock()는 suspend function입니다.
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
... // 위와동일
}
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
GlobalScope.massiveRun {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
Mutext().withLock{...}은 lock과 unlock을 편리하게 사용하기 위한 extension function으로 아래와 동일한 코드 입니다.
mutex.lock()
try {
...
} finally {
mutex.unlock()
}
위 예제에서는 critical section을 특정 thread로 제한하지는 않았지만 fine-gained한 코드 입니다.
따라서 어느정도의 cost는 발생하지만 필요에 따라서 사용 가능한 코드 입니다.
Actors
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking {
val counter = counterActor() // create the actor
GlobalScope.massiveRun {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
- Actor의 scope은 ActorScope이며, 이는 CoroutineScope과 ReceiveChannel이 implements 하고 있다.
- context는 부모 CoroutineScope을 상속한다. 하지만 context 변수로 element를 추가할 수 있다.
- Dispatcher가 지정되지 않으면 기본은 Dispatchers.Default가 사용된다.
- Channel capacity도 지정할 수 있고, lazy하게 동작시킬수 있다.
'개발이야기 > Kotlin' 카테고리의 다른 글
[Kotlin] 코루틴 Exception 추가 예제 (0) | 2019.01.23 |
---|---|
[Kotlin] 코틀린 - 코루틴#9 Select (Experimental) (0) | 2019.01.06 |
[Kotlin] 코틀린 - 코루틴#7 - Channels (0) | 2018.12.21 |
[Kotlin] 코틀린 - 코루틴#6 - supervision (0) | 2018.12.12 |
[Kotlin] 코틀린 - 코루틴#5 - exception (0) | 2018.12.12 |