본문으로 바로가기
반응형

photo by unsplash

Coroutine 1.5.0 버전이 출시되었습니다. 메이저하게 패러다임이 바뀌지는 않았지만 세세하게 변경된 부분이 있어 해당 부분을 정리하려 합니다. [1]

오히려 큰 틀이 바뀌지 않았기 때문에 쉽게 이해하고 넘어가리라 생각이 드네요.

Overview

1. GlobalSocpe은 delicate API로 마킹되었습니다.

버전을 1.5.0으로 올리고 나면 IDE가 GlobalScope 사용 부분에 녹색 줄을 그어 주는 걸 볼 수 있습니다.

GlobalScope을 쓰는 부분에는 @OptIn을 사용하여 명시적으로 내가 GlobalScope을 의도하고 쓰는 것이라는 걸 표기해 줘야 합니다.

2. @CorouitneTimeout 이 Junit5에 추가되었습니다.

Junit4에도 있었지만 통합시켰다고 합니다. 추가된 Annotation으로 테스트의 timeout을 설정할 수 있습니다.

3. Channel API의 재 정의

기존에 사용하던 offer, poll 함수가 trySend, tryReceive로 대체됩니다.

4. Reactive Stream type과 coroutine flow 간 converting 함수가 추가되고 안정화되었습니다.

Observable, flux 등의 reactive stream과 coroutine의 flow 간 안정적인 변환을 위한 함수들이 추가되었습니다.

GlobalScope marked as a delicate API

GlobalScope은 process와 생명주기를 같이 합니다. 하지만 GlobalScope이 유지된다고 해서 process가 유지되지는 않죠. 따라서 Daemon thread와 같다고 생각할 수 있습니다.

따라서 Daemon thread가 갖는 문제를 동일하게 가질 수 있습니다. 말 그대로 제대 종료되지 않아 resource를 낭비하면서 전체적인 성능 저하를 가져올 수도, 어떠한 클래스의 멤버를 물고 있을 경우 memory leak을 유도할 수 도 있습니다.

이는 GlobalScope이 structured concurrency를 준수하지 못함인데 이는 coroutine의 장점을 반감시키는 요소이기도합니다.

따라서 1.5.0부터는 GlobalScope이 @DelcatedCouritneApi로 marking 되었습니다. 실제 코드를 따라가 보면 아래와 같이 나와 있습니다.

@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

GlobalScope의 대체 방법

GlobalScope을 쓰지 말아야 할?? 이유라기보다는 조금은 경각심을 가지고 써야 하는 이유에 대해서 언급했으니 어떻게 대체할 수 있는지 알아보겠습니다.

fun loadConfiguration() {
    GlobalScope.launch {
        val config = fetchConfigFromServer() // network request
        updateConfiguration(config)
    }
}

loadConfiguration() 함수는 내부에서 비동기 처리를 위하여 GlobalScope을 사용하도록 되어있습니다. 만약 네트워크가 느려서 지연이 생기는 경우 취소 동작에 대한 정의나 timeout등에 대한 처리를 하지 않았으니 문제가 생길 수 있습니다. (그냥 질질 늘어지고 있는 상태에서 반복적으로 호출하면 계속 누적되면서 resource를 낭비시키겠죠?)

주저리주저리 이유를 적긴 했지만 쓰지 말라는 데에는 이유가 있으니 위 코드를 아래와 같이 바꿔 봅니다.

suspend fun loadConfiguration() {
    val config = fetchConfigFromServer() // network request
    updateConfiguration(config)
}

만약 이 작업과 다른 비동기 작업을 동시에 수행해야 하는 경우에는 couritneScope을 통해서 아래와 같이 묶어줄 수도 있습니다.

// concurrently load configuration and data
suspend fun loadConfigurationAndData() {
    coroutineScope {
        launch { loadConfiguration() }
        launch { loadData() }
    }
}

 위 세 개의 code snippet은 가이드에 나와있는 예제와 대체 코드에 대한 내용입니다.

하지만 GlobalScope을 쓰게 되는 필수불가결한  경우라면, coroutine의 시작점을 찾지 못하는 경우일 가능성이 큽니다. (자바에서 시작한 코드라던가ㅠ.ㅠ 자체 coroutine scope을 가지 않는 어떤 manager나 controller에서 호출을 시작한다던가..)

따라서 위와 같은 예제 코드로의 대체는 한정적일 수밖에 없습니다. (GlobalScope에 대한 이해가 부족하여 남발한 경우에만 대체 가능)

suspend 함수로 만들어 상위 coroutine scope이 갖는 context의 job에 종속시켜 structured concurrency를 유지시키는 방법이 가장 좋습니다만, suspend 지옥에 빠지거나, 타고 타고 시작점까지 올라가다 보니 "어.. corouitne scope이 없네??"라는 난감한 상황을 맞닥드릴 수 있습니다.

"정말 제대로 이해하고 쓰는 거 맞지?"

위와 같이 대체 불가한 상황이 온다면 GlobalScope을 써야 합니다. 부제와 같이 "응!! 내가 의도해서 GlobalScope을 쓰는 거야"라는 합법적인 사용으로 명시를 위해 사용 부분에 @OptIn(DelicateCoroutinesApi::class)를 붙여 줍니다.

// A global coroutine to log statistics every second, 
// must be always active
@OptIn(DelicateCoroutinesApi::class)
val globalScopeReporter = GlobalScope.launch {
    while (true) {
        delay(1000)
        logStatistics()
    }
}

특이하게도 해당 annotation을 변수에 붙였습니다. 하지만 함수에, 생성자에, class에 모두 붙일 수 있습니다. 이는 해당 annotation의 정의가 아래와 같기 때문입니다.

/**
 * Allows to use the API denoted by the given markers in the annotated file, declaration, or expression.
 * If a declaration is annotated with [OptIn], its usages are **not** required to opt in to that API.
 *
 * This class requires opt-in itself and can only be used with the compiler argument `-Xopt-in=kotlin.RequiresOptIn`.
 */
@Target(
    CLASS, PROPERTY, LOCAL_VARIABLE, VALUE_PARAMETER, CONSTRUCTOR,
    FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER, EXPRESSION, FILE, TYPEALIAS
)
@Retention(SOURCE)
@SinceKotlin("1.3")
@RequireKotlin("1.3.70", versionKind = RequireKotlinVersionKind.COMPILER_VERSION)
public annotation class OptIn(
    vararg val markerClass: KClass<out Annotation>
)

Extensions for JUnit5

JUnit5에서 추가된  (정확하게는 JUnit4에서도 사용은 가능했지만 JUnit5에서 통합된) @CroutinesTimeout를 사용하면 테스트 코드를 분리된 thread에서 시작시키고 동작 시간을 한정하여 timeout이 발생 시 해당 thread를 interrupt 시킬 수 있습니다.

import kotlinx.coroutines.debug.junit5.CoroutinesTimeout
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.Test

@CoroutinesTimeout(100)
class CoroutinesTimeoutSimpleTest {

     @CoroutinesTimeout(300)
     @Test
     fun firstTest() {
         runBlocking {
             delay(200)  // succeeds
         }
     }

     @Test
     fun secondTest() {
         runBlocking {
             delay(200)  // fails
         }
     }
 }

firstTest()는 수행하는데 200ms이 걸리나 timeout을 300으로 개별 설정해 놓았기 때문에 성공으로 처리됩니다.

seconTest()는 함수 레벨에서 따로 Timeout을 정의하지 않았기 때문에 class 레벨에서 정의해 놓은 100ms를 적용받아 실패하게 됩니다. (사용법이 참 쉽죠?)

실제 @CoroutinesTimeout annotation의 정의를 보면 아래와 같습니다.

package kotlinx.coroutines.debug.junit5

public annotation class CoroutinesTimeout(
    val testTimeoutMs: Long,
    val cancelOnTimeout: Boolean = false
)

testTimeoutMs은 timeout 시간을 ms으로 지정하기 위함이며, cancelOnTimeout은 동작되고 있는 모든 coroutine을 취소시킬지의 여부를 설정합니다.

추가적으로 해당 annotation을 사용하면 coroutines debugger가 활성화되며, timeout 발생 시 coroutine 생성과 관련된 stack traces에 대한 dump를 남깁니다. 빠른 테스트 동작을 위해 coroutine creattion stack trace를 disable 시키고 싶다면 CorouitnesTimeoutExtenstion [2]을 이용하여 설정을 변경할 수 있습니다. 

Channel API refinement

Channel은 coroutine 간에 데이터를 주고받는 (말 그대로 채널 역할을 하는) 역할을 하는 중요한 매체입니다. 1.5.0 버전에서는 기존에 사용하던 offer / poll이란 함수를 deprecate 시키고 새로운 naming rule에 따라 함수명을 변경 한다고 합니다.

여기서 naming rule이란 suspending과 non-suspending을 구분하는 형태를 말합니다. 앞으로도 계속 이런 기조를 유지하면서 함수를 naming 할 거라고 합니다.

이 친구들이 밝힌 naming rule은 아래와 같습니다.

  • 일반적인 suspending 함수는 기존과 같이 유지합니다. e.g., send, receive
  • non-suspending 이면서 error를 encapsulation 하고 있다면 prefix로 "try"를 붙입니다. e.g., trySend (offer 대체), tryReceive (poll 대체)
  • 추후에 새롭게 error encapsulation 되는 함수들은 suffix로 "Catching"이 붙습니다.

Suspending functions vs non-Suspending functions for Channel

channel에서 제공하는 suspending과 non-suspeding function은 send / offer, receive / poll로 짝지을 수 있습니다. 채널에는 기본적으로 연속적인 send를 할 수 없고, 연속적으로 receive 할 수 없습니다.

따라서 send -> receive가 순차적으로 일어나야 하며, channel에 데이터가 있는 상태에서 send를 요청하면 send 함수는 suspending 되고, channel이 비어 있는 상태에서 receive를 하게 되면 receive 함수가 suspending 됩니다. (이건 기본적인 channel의 동작 내용입니다.)

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        // Suspends until the element can be sent
        println("Sending...")
        channel.send("Element")
     }
     // Suspends until the element can be received
     println("Receiving...")
     println(channel.receive())
}

따라서 위와 같이 코드를 작성한 경우 출력 결과는 아래와 같습니다.

  1. "Receiving..." 출력
  2. 수신 대기
  3. 비동기로 띄운 "Sending..." 출력
  4. "Element"를 채널에 send
  5. receive() 함수가 resume 되면서 "Element" 출력

이렇게 suspending 되는 동작을 원치 않는다면 non-suspending function인 trySendtryReceive를 을 사용할 수 있습니다. 그전에 offerpoll이 왜 위 함수로 대체되었는지에 대한 이유를 생각해 보겠습니다.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        println("Sending...")
        // Doesn't suspend
        // Returns 'false' if the channel is full
        // Or throws an exception if it's closed
        channel.offer("Element")
    }
    println("Receiving...")
    // Doesn't suspend
    // Returns 'null' if the channel is empty
    println(channel.poll())
//  channel.close()
}

위 코드의 실행 순서는 아래와 같습니다.

  1. "Receiving..." 출력
  2. channel.poll()을 수행하나, 해당 channel은 비어있는 상태이므로 null 출력
  3. launch 내부의"Sending..." 출력
  4. offer를 하였으나 false를 반환함 (buffer가 0인 rendezvous channel이므로)

위와 같은 동작은 원했던 동작이 아닙니다만 호출 순서가 잘못되었기 때문에 의도와는 다르게 동작합니다. (원래는 채널에 정상적으로 넣고/빼는 걸 원했던 거겠죠?)

사실 offer의 결과로는 false를 기대할 수 있으나, 위 코드에서 마지막 라인의 close()를 주석 해제하면 offer 호출 시 exception이 발생합니다. (이미 닫힌 채널에 offer를 시도했으므로)

이런 동작은 "매번 offer / poll 마다 try-catch를 하라는 건가?"라는 생각이 들 정도로 상태에 따라서 동작이 달라져 에러를 유발하는 코드가 됩니다. (그래서 항의 좀 많이 받았다고 하네요... [1])

따라서 이 function들이 trySend, tryReceive로 수정되어 아래와 같이 사용됩니다.

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        println("Sending...")
        // Doesn't suspend
        // Returns 'Failed' if the channel is full
        // Or 'Channel was closed' result if it's closed
        val result = channel.trySend("Element")
        println(result)

        //We can verify the result
        if(result.isClosed){
            println("Sending failed. The channel is closed.")
        }
    }
    println("Receiving...")
    println(channel.tryReceive())
//  channel.close()
}

이 function들은 ChannelResultvalue calss [3] (예전 용어로는 inline class)를 반환합니다. 이는 아래와 같이 세 가지의 상태를 갖습니다.

@JvmInline
public value class ChannelResult<out T>
@PublishedApi internal constructor(@PublishedApi internal val holder: Any?) {
  ...
    public val isSuccess: Boolean get() = holder !is Failed
  ...
   
    public val isFailure: Boolean get() = holder is Failed
  ...
   
    public val isClosed: Boolean get() = holder is Closed
  ...
}

실패인지, 성공인지, 채널이 닫혔는지에 대한 상태를 세분화하여 반환해 줍니다. 이제 try catch로 exception handling을 해줘야 할지 말아야 할지 더 이상의 고민이 없어졌네요~

추가적으로 위에서 언급했던 naming rule에 따라 1.5.0 이후부터 xxxCatching으로 된 function은 내부적으로 exception을 capsulation 해줍니다.

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    channel.close()
    
    // 채널이 닫혔지만 정상적으로 종료됨.
    println(channel.receiveCatching())
}

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    channel.close()
    
    // ClosedReceiveChannelException 발생
    println(channel.receive())    
}

 

기존 코드의 migration

IDE에서 offer / poll은 이제 deprecate가 표시됩니다. 따라서 똑똑한 IDE에서 추천해 주는 다음 코드로 간단하게 변경이 가능합니다.

  • offer("AAA) -> trySend("AAA").isSuccess
  • poll() -> tryReceive.getOrNull()

 만약 exception에 따른 어떤 추가 로직이 동작하도록 이미 정의되어 있어 "close가 된 상태에서 접근 시 exception이 발생해야 돼!!!"라고 하면 좀 지저분하겠지만 아래와 같이 수정이 가능합니다.

//offer에 대한 migration
channel
  .trySend("Element")
  .onClosed { throw it ?: ClosedSendChannelException("Channel was closed") }
  .isSuccess
  
//poll에 대한 migration  
channel.tryReceive()
  .onClosed { if (it != null) throw it }
  .getOrNull()

Reactive integrations on the road to stability

1.5에서 reactive framework과 통합시키는 대부분의 function들이 statble로 승격되었습니다. 따라서 JVM 환경에서 동작하는 비동기 stream들 (Reactive Streams의 표준을 따르는)을 kotlin의 flow로 전환시킬 수 있습니다. 물론 그 반대도 가능합니다.

다만 이런 converting function들은 kotlinx.coroutines에 포함되어 있지는 않으며, 연관되는 reactive module에 따라 분리되었습니다.

dependencies {          
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

예를 들면 위처럼 모듈별로 따로 import 해서 사용해야 합니다.

이렇게 dependency를 설정해 놓으며 flow를 Reactive Sreams type인 publisher로 converting 하거나, Project Reactor type인 flux로 converting 할 수 있습니다.

// acquire a Flow instance
val flow: Flow<event> = flow { … }

// Convert Flow to Publisher
val publisher = flow.asPublisher()

// Convert Flow to Reactor's Flux
val flux = flow.asFlux()

//Convert back to Flow 
val anotherFlow = flux.asFlow()

이렇게 기타 Reactive streams들을 converting 하는 많은 api들이 @ExperimentalCorouinesApi 이름표를 떼고!! 정식 api로 승격되었습니다. 변경된 list들은 references link [4] 번에서 확인할 수 있습니다.

예를 들어 이제 RxJava의 Observable은 아래와 같이 stable 한 API를 이용하여 변경할 수 있습니다.

fun legacyFunThatHaveToReturnObservable(): Observable<int> {
  return flow<int> {
    // Use the power of flow!
  }
  // various flow operations
  .asObservable()
}

 

New convenience functions

Reactive type인 Mono를 couroutine에서 사용할 때 thread blocking 없이 값을 반환하도록 지원하는 여러 개의 function들이 존재합니다.

1.5부터는 Mono와 Many를 위해서 존재했던 awaitSingleOr*로 시작하는 functions들은 deprecate 되고 await*로 대체됩니다.

추가적으로 Mono는 결괏값을 하나만 생성하기 때문에 first, last값이 동일합니다.

따라서 Mono.awaitFirst(), Mono.awaitLast()Mono.awaitSingle()로 대체됩니다.

Start using Kotlinx.coroutines 1.5.0

build.gradle.kts file에 아래와 같이 추가합니다.

plugins {
   kotlin("jvm") version "1.5.0"
}

dependency에 필요에 따라서 아래와 같이 추가합니다.

val coroutinesVersion = "1.5.0"

dependencies { 
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:$coroutinesVersion")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$coroutinesVersion")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$coroutinesVersion")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx3:$coroutinesVersion")
  ...
}

 

References

1. https://blog.jetbrains.com/kotlin/2021/05/kotlin-coroutines-1-5-0-released/#use-coroutines-1-5-0

2. https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-debug/src/junit/junit5/CoroutinesTimeoutExtension.kt

3. https://kotlinlang.org/docs/inline-classes.html#members

4. https://github.com/Kotlin/kotlinx.coroutines/commit/47a063c0987177551bdbdf09a458998a30571ac2

 

반응형