본문으로 바로가기
반응형

photo by unsplash

coroutine는 callback을 사용하지 않고 비동기 처리를 가능해주는 장점을 가지고 있습니다. 따라서 비동기 처리를 코드의 순서대로 실행시키며 가독성을 높이고, 보다 심플한 코드를 작성할 수 있도록 도와줍니다. 즉 callback을 이용하여 비동기 작업을 처리하고 이 처리 결과에 따라 다음 동작으로 이어지는 형태의 코드를 작성할 때 callback 지옥에서 빠져나올 수 있게 해주는 아주 고마운 친구입니다.

단, 기존의 라이브러리나, 안드로이드에서는 무수히 많은 callback으로 비동기 처리를 하고 있습니다. 우리가 안드로이드를 시작할 때 보게 되는 onCreate()onResume()도 콜백형태 이므로, 안드로이드 시작부터가 callback입니다. (android는 framework 이므로 당연한 얘기입니다만..) 기타 네트워크 처리, 소켓통신을 제공하는 라이브러리 역시 대부분 요청 결과를 받기 위해서는 콜백을 interface로 제공합니다. 기타 sdk도 마찬가지지요.

따라서 callback을 피하기 위해서 coroutine을 잘 사용하다가도 이런 callback으로 결과를 넘겨주는 api를 호출한다면 호출 함수 내부에서 await()나 유사한 동작을 하는 coroutine api를 사용하여 callback의 종료 시점을 기다리도록 불편하게 코드를 작성해야 합니다.

corouitne에서는 이런 동작을 좀 더 간편하게 사용할 수 있도록 suspendCoroutine을 제공합니다. callbackFlow도 유사하지만 여기서는 suspendCoroutine에 대해서 알아봅니다. [1][2]

suspendCoroutine vs callbackFlow

두 개의 api는 모드 callback을 coroutine형태로 변환하는 기능을 제공합니다. "어떠한 경우에 이걸 써야 한다!"라는 명확한 사용 용도를 제한하지는 않지만 아래와 같은 케이스에 적합합니다.

  • suspendCoroutine - 일회성 callback - return값이 단일 값임
    • e.g. 네트워크 연결에 대한 성공/실패 callback, andorid view의 layout 완료 콜백 등.
  • callbackFlow - 지속적으로 메시지가 전달되어 오는 callback - return값이 flow임.
    • e.g. webSocket 연결 후 전달되는 지속적인 message에 대한 callback, android의 TextWatcher, broadcastReceiver 등

사실 return값으로 단일값을 전달받느냐, stream형태로 받느냐의 차이기 때문에 어떤 걸 쓰더라도 무방합니다. 하지만 callback의 결과가 한번 오고 끝날지, 여러 개의 결과가 반환되는지에 따라서 구분하여 API를 선택하면 좀 더 간결한 코드를 작성할 수 있습니다.

suspendCoroutine or suspendCancellableCoroutine

두 개의 동작은 유사하나 취소가 가능한지 아닌지의 차이만 있습니다. 대부분의 경우 suspendCancellableCoroutine을 사용하는 게 좀 더 안정적입니다. 

lifecycleScope.launch {
    withTimeoutOrNull(3000) {        
        suspendCoroutine<Unit> { continuation ->
            //3초 이후에 취소 안됨
            //do something
        }
    }
}

lifecycleScope.launch {
    withTimeoutOrNull(3000) {
        suspendCancellableCoroutine<Unit> { conrinuation ->
            //3초 이후에 취소됨.
            //do something
        }
    }
}

위 코드처럼 3초 안에 block의 수행이 끝나면 취소되도록 withTimeoutOrNull로 감싸는 경우 suspendCoroutine은 취소가 불가하며, suspendCancellableCoroutine은 취소가 진행됩니다. 대부분의 coroutine은 부모 coroutine의 동작이 정지되면 특수한 경우를 제외하고는 생명주기를 같이하기를 기대하기 때문에 (structured concurrency) GlobalScope의 동작을 유도하는 코드가 아니라면 suspendCancellableCoroutine을 사용하는 것이 좋습니다. 이후로는 suspendCancellableCorouitne으로 예제를 설명합니다.

suspendCancellableCoroutine

기본 정의는 아래와 같습니다. [2]

inline suspend fun <T> suspendCancellableCoroutine(
                   crossinline block: (CancellableContinuation<T>) -> Unit): T

param을 전달되는 block의 형태가 CancellableContinuation입니다. 따라서 취소가 가능한 block이며 참고로 suspendCoroutine의 경우 param이 Continuation block입니다.

테스트를 위해 sample code를 아래와 같이 준비합니다.

class TestViewModel : ViewModel() {
    ...

    // 네트워크 요청시 받는 Callback 형태
    interface NetworkResult {
        fun success(resultCode: Int)
        fun fail(cause: Throwable)
    }

    private var networkJob: Job? = null
    
    // 네트워크 접속 요청시 결과를 callback으로 넘겨주는 함수 (Dummy)
    private fun requestNetwork(resultCallback: NetworkResult) {
        networkJob = viewModelScope.launch {
            delay(500) //임의로 5초를 대기후 결과를 반환해 준다.
            resultCallback.success(200)
        }
    }
    ...
}

requestNetowork()은 네트워크를 연결하는 함수로 성공 실패 여부를 param으로 넘어온 NetworkResult interface로 전달해 줍니다. 여기서는 임의로 0.5초를 기다린 후 callback의 sucess를 호출해 주도록 합니다.

이 callback 방식의 api를 아래와 같이 suspendCancellableCorouitne으로 구성합니다.

class TestViewModel : ViewModel() {

    interface NetworkResult {
        ...
    }

    private fun requestNetwork(resultCallback: NetworkResult) {
        ...
    }

    suspend fun connectNetwork(): Int {
        Log.i(TAG, "connectNetwork() START!")
        
        val result = suspendCancellableCoroutine<Int> { continuation ->
            Log.i(TAG, "suspendCancellableCoroutine START!")
            // 결과를 전달받을 Callback 생성
            val callbackImpl = object : NetworkResult {
                override fun success(resultCode: Int) {
                    Log.d(TAG, "Network request success - $resultCode")
                    // suspend된 suspendCancellableCorouitne을 -> resume 상태로 변경
                    continuation.resume(resultCode)
                }

                override fun fail(cause: Throwable) {
                   ...
                }
            }

            requestNetwork(callbackImpl)
            ...
            Log.i(TAG, "suspendCancellableCoroutine END!")
            // suspendCancellableCorouitne block이 수행되면 code가 suspend 된다.
        }
        
        Log.i(TAG, "connectNetwork() END!")
        return result
    }
}

callback을 구현한 후 suspendCancellableCorouitne 내부에서 호출해 줍니다. 이때 callback 내부에서 continuation.resume(result)를 호출해주도록 구성합니다.

connectNetwork()을 호출하면 suspendCancellableCoroutine {..} 코드 블록 내부의 모든 코드가 수행됩니다. 단 블록 내부의 코드가 전부 수행된 후 suspend 되어 다음 라인이 실행되지 않습니다. (코드가 blocking 됩니다.) 이 block을 벗어나려면 continuation의 resume이 호출되어야 하며, 위에서는 sucess 콜백을 받은 후 resume을 호출하므로 이때 코드의 blocking이 풀리면서 마지막 로그 ("connectNetwork() END!")가 출력되면 함수가 종료됩니다.

이를 호출하는 부분의 코드와 찍히는 로그의 순서는 아래와 같습니다.

class MainActivity : ComponentActivity() {
    ...    
    private val testViewModel: TestViewModel by viewModels()

    @ExperimentalCoroutinesApi
    override fun onCreate(savedInstanceState: Bundle?) {
      ...
        MainScope().launch {            
            val result = testViewModel.connectNetwork()            
        }
    }
}

resumeWithException

connectNetwork() 부분에서 실패가 발생할 경우 아래와 같이 fail() callback을 구성하고 requestNetwork()에서 임의로 Exception을 넘겨줍니다.

class TestViewModel : ViewModel() {

    interface NetworkResult {
        ...
    }

    private fun requestNetwork(resultCallback: NetworkResult) {
         viewModelScope.launch {
            delay(500)
            // resultCallback.success(200)
            resultCallback.fail(Exception("Network access failed!!"))
        }
    }

    suspend fun connectNetwork(): Int {
        Log.i(TAG, "connectNetwork() START!")
        
        val result = suspendCancellableCoroutine<Int> { continuation ->           
                ...

                override fun fail(cause: Throwable) {
                    Log.e(TAG, "Network request failed!!")
                    continuation.resumeWithException(cause)
                }
            }

            requestNetwork(callbackImpl)
            ...
            Log.i(TAG, "suspendCancellableCoroutine END!")
        }
        
        Log.i(TAG, "connectNetwork() END!")
        return result
    }
}

resumeWithExceptionresume 처리하여 코드 blocking을 해제시키고 exception을 외부로 throw 해주는 역할을 합니다. 따라서 아래와 같이 호출 부분에서 try-catch를 추가하여야 하며, 수행 시 로그는 아래와 같이 출력됩니다.

class MainActivity : ComponentActivity() {
   ...
    override fun onCreate(savedInstanceState: Bundle?) {
   ...

        MainScope().launch {
            try {
                val result = testViewModel.connectNetwork()      
            } catch (t: Throwable) {
                Log.e(TAG, "Throwable: $t")
            }
        }
    }
}

Prompt cancellation guarantee

suspendCancellableCoroutine은 이를 수행시킨 coroutine job이 cancel 되면 즉각적으로 취소됩니다. blocking된 상태를 해제 시키는 resume 또는 resumeWithException 같은 코드들은 suspend 된 상태를 resume으로 만들기 위해 해당 Dispatcher에 요청하게 되고 dispatcher의 스케줄에 따라서 실행 가능한 순간에 수행되게 됩니다. 따라서 job의 cancel이 resume이 호출되기전이나 동시에 일어날 수 있으며 이 경우 cancel이 즉각적으로 처리되고 code의 resume은 성공하지 못한 것으로 처리됩니다.

Returning resuorces from a suspended coroutine

만약 위 예제에서 network의 연결 후 성공 / 실패와 상관없이 닫아야 하는 resource가 있다고 가정합니다.  따라서 이런 resource를 해제(close) 하기 위해서 성공/실패 쪽에 resource를 release 하는 코드를 넣을 수도 있지만 즉각적인 cancel일 발생하여 resume이 수행되지 못하는 경우에도 대비해야 합니다.

물론 호출 부분에서 CancellationException을 catch해도 되지만 continuation의 invokeOnCancellation을 이용하여 coroutine의 cancel이 발생하는 경우 명시적으로 호출할 수 있는 코드를 정의할 수 있습니다.

class TestViewModel : ViewModel() {
   ...    

    private fun cancelNetwork() {
        networkJob?.cancel() //진행중이던 network 작업 취소.
        Log.i(TAG, "cancelNetwork() - Network cancelled")
    }

    suspend fun connectNetwork(): Int {
        Log.i(TAG, "connectNetwork() START!")

        val result = suspendCancellableCoroutine<Int> { continuation ->
            Log.i(TAG, "suspendCancellableCoroutine START!")
            val callbackImpl = object : NetworkResult {
               ...
            }

            requestNetwork(callbackImpl)

            // coroutine scope이 cancel 될때 호출된다.
            continuation.invokeOnCancellation {
                // Thread-safe한 함수들만 호출되어야 한다.
                Log.i(TAG, "Cancel network!!")
                cancelNetwork()
            }
            Log.i(TAG, "suspendCancellableCoroutine END!")
        }

        Log.i(TAG, "connectNetwork() END!")
        return result
    }
}

호출 부분에 아래와 같이 100ms 안에 끝나지 않으면 취소되도록 유도하는 경우 이 함수를 수행한 coroutine의 job이 cancel 되면서 invokeOnCancelation에 의하여 releaseNetwork() 코드가 호출됨을 확인할 수 있습니다. (requestNetwork()에서 임의로 500ms이 걸리도록 만들어 놓았기 때문에 100ms 안에 완료되지 못하고 timeout이 발생합니다.

- 호출 부분 코드

...
    override fun onCreate(savedInstanceState: Bundle?) {
        MainScope().launch {
            try {
                withTimeoutOrNull(100) {
                    val result = testViewModel.connectNetwork()
                }
            } catch (t: Throwable) {
                Log.e(TAG, "Throwable: $t")
            }
        }
    }

또 다른 방법으로는 cancel에 대한 callback을 받을 수 있는 resume을 사용하는 방법입니다.

    //CancellableContinuation.kt
    
    @ExperimentalCoroutinesApi // since 1.2.0
    public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)

따라서 아래와 같이 코드를 수정할 수 있습니다.

class TestViewModel : ViewModel() {
   ...    
  
    suspend fun connectNetwork(): Int {
        Log.i(TAG, "connectNetwork() START!")

        val result = suspendCancellableCoroutine<Int> { continuation ->
            Log.i(TAG, "suspendCancellableCoroutine START!")
            val callbackImpl = object : NetworkResult {
                override fun success(resultCode: Int) {
                    Log.d(TAG, "Network request success - $resultCode")
                    continuation.resume(resultCode) {
                        // job cancel시 호출해야 하는 코드 명시
                        Log.i(TAG, "resume with cancel request!")
                        cancelNetwork()
                    }
                }
               ...
            }

            requestNetwork(callbackImpl)

            // coroutine scope이 cancel 될때 호출된다.
//          continuation.invokeOnCancellation {
                // Thread-safe한 함수들만 호출 되어야 한다.
//              Log.i(TAG, "Cancel network!!")
//              cancelNetwork()
//         }
            Log.i(TAG, "suspendCancellableCoroutine END!")
        }

      ...
    }
}

만약 resume에도 cancel시 처리할 동작을 명시하고 invokeOnCancellation도 등록해 놓는다면 둘 다 호출되므로 주의해야 합니다.

두 가지 방식 중 어떤 방식으로 cancel의 동작을 정의하든 cancel 동작은 빠르고 thread-safe 해야 합니다. cancel블록을 실행하는 dispatcher가 Main이 아니라면 (IO나 Default라면) 어떤 thread에서도 호출될 수 있기 때문입니다.

추가적으로 job이 cancel되지 않는한 해당 코드는 호출되지 않습니다. 즉 정상적으로 resume 된다면 cancellation block에 정의된 코드는 처리되지 않습니다. 예제에서는 releaseNetwork()이 network 요청을 취소하는 역할이므로 취소시점에만 호출 되어야 합니다. 만약 resume되든(exception이 발생하든), cancel되든 무조건 해제해야 하는 resource가 있다면 호출부분에서 처리해야 누락을 방지할 수 있습니다. 여기서는 connectNetwork()를 호출하는 MainActivity가 되겠죠?

 

References

[1] 2021.11.29 - [개발이야기/Kotlin] - [Coroutine] Callback을 Flow로 변환 - callbackFlow

[2] https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/suspend-cancellable-coroutine.html

반응형