본문으로 바로가기
반응형

photo by unsplash

coroutine을 정리하면서 같이 정리했던 flow 부분에 유용한 변화들이 발생하여 추가된 flow들에 대해서 추가적인 정리를 하려고 합니다. 기존 글을 작성했을 당시 (2019.11) coroutine이 v1.3.2 정도였는데, 벌써 v1.6.0-RC가 나온 상태입니다. 곧  정식 1.6.0이 나오겠지만, 그사이 추가된 coroutine 관련 api들을 시간 날 때마다 추가하여 정리하도록 하겠습니다.

예제 생성

callbackFlow는 Flow라기 보단 flow builder라는 표현이 더 맞을것 같습니다. 이름에서 보이듯이 callback을 flow로 변경합니다. 기존에는 비동기 처리를 위해 callback 구조를 많이 사용하기도 했고, 여타 라이브러리들나 SDK들이 callback으로 응답을 주는 경우가 많기 때문에 이를 중간에서 flow로 converting 하기 위해 사용됩니다.

먼저 Callback으로 결과를 return해 주는 예제 함수를 하나 만들겠습니다.

// 네트워크 요청 결과 callback
interface NetworkResult {
    fun success()
    fun fail()
}

// 네트워크 요청
private suspend fun requestNetworkData(resultCallback: NetworkResult) {
    repeat(5) {
        delay(500)
        Log.d(TAG, "requestNetworkData() - ${Thread.currentThread().name}")
        resultCallback.success(it)
    }
}

// 네트워크 처리 완료시 close 처리
private fun releaseNetwork() {
    Log.i(TAG, "releaseNetwork() - Network released")
}

NetworkResult inteface

  • network 요청 결과에 대한 success(), fail() 함수를 호출해 줍니다.

requestNetworkData()

  • 임의로 500ms 간격으로 success()를 호출하도록 되어 있습니다.

releaseNetwrok()

  • 사용이 완료되면 network을 닫는 작업(resource 해제 작업)을 해주게 됩니다.

requestNetworkData() 함수는 응답을 callback 주기 때문에 이를 flow로 쉽게 전환하기 위해서 callbackFlow를 사용하도록 합니다.

그전에 먼저 callbackFlow의 정의를 보면 아래와 같습니다.

public fun <T> callbackFlow(
      @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
                                                     CallbackFlowBuilder(block)
                                                     
...

넘겨받은 block이 CallbackFlowBuilder()의 param으로 들어갑니다. 여기서 param의 정의만으로 생성된 block은 ProducerScope이라는 걸 알 수 있습니다. 즉, 내부적으로 channel을 생성한다는 얘긴데 좀 더 들어 보면 아래와 같습니다.

private class CallbackFlowBuilder<T>(
    private val block: suspend ProducerScope<T>.() -> Unit,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowBuilder<T>(block, context, capacity, onBufferOverflow) {
...

Channel을 기본적으로 생성해주는 Procuder (coroutine Builder)는 추가적으로 두개의 인자를 받습니다. 여기서는 기본값으로 고정되어 세팅되어 있으므로 그 의미는 아래와 같습니다.

buffer는 기본값을 사용하여 64개를 사용합니다. 그리고 BufferOverflow.SUSPEND 옵션에 따라서 channel에 쌓여진 data가 64개가 넘어가면 suspending 되어 처리가 send가 block 됩니다. [3]

기본 구성

requestNetworkData()의 결과를 flow로 변환하기 위해서 아래와 같이 함수를 구성합니다.

@ExperimentalCoroutinesApi    
    fun getNetworkResultFlow(): Flow<String> = callbackFlow {
        val callbackImpl = object : NetworkResult {
            override fun success(result: Int) {
                Log.d(TAG, "Network request success - $result - ${Thread.currentThread().name}")
                trySend("SUCCESS")
            }

            override fun fail() {
                Log.e(TAG, "Network request failed - ${Thread.currentThread().name}")
                trySend("ERROR")
                // 실패시 channel을 닫는다.
                close()
            }
        }

        requestNetworkData(callbackImpl)

        // coroutine scope이 cancel 또는 close될때 호출된다.
        awaitClose {
            Log.i(TAG, "Release request! - ${Thread.currentThread().name}")
            releaseNetwork()
        }
    }

callbackFlow로 생성되는 coroutine scope은 ProducerScope이므로 기본적인 Channel api를 사용하여 데이터의 방출을 처리합니다. 예제에서는 trySend()를 이용하여 성공과 실패일때 각각 데이터를 전달하도록 처리해 놨습니다. trySend는 coroutine v1.5.0 이후부터 offer를 대신하는 api로 buffer가 꽉 찬 상태에서는 false를 반환합니다. 자세한 내용은 하기 링크를 확인하시기 바랍니다. [5]

awaitClose {..}는 ProducerScope block의 코드의 실행이 완료되고 나서 바로 종료되는 걸 막는 코드입니다. 위 예제에서는 콜백이 한 번만 올 수 있으나, addListener(또는 addObserver)로 특정 이벤트를 관찰하는 경우에는 callback이 호출되는 걸 지속적으로 관찰해야 하기 때문에 이 api를 써서 지속적으로 callback을 전달받을 수 있도록 flow를 유지합니다.

awaitClose는 flow가 cancel 되거나 close 될 때(channel.close()가 명시적으로 호출될 때) 해당 블록을 호출시킵니다. 따라서 해당 block안에서는 resource를 해체하는 코드가 들어가야 합니다.  만약 callback을 addObserver()로 등록하여 전달받고 있었다면 removeOvserver()가 들어가야 하겠죠?

또한 awaitClose를 명시하지 않을 경우 observer leak이 발생할 수 있다고 판단하기 때문에 반드시 block의 맨 마지막에 명시해야 하며, 그렇지 않을 경우 exception이 발생하므로 주의해야 합니다.

호출 부분의 구성

호출 부분은 아래와 같이 구성합니다.

// MainActivity.kt

override fun onCreate() {
...
    MainScope().launch {
        testViewModel.getNetworkResultFlow().collect {
            Log.i(TAG, "Network result#1: $it - ${Thread.currentThread().name}")
        }
        Log.d(TAG,"Collect End #1 - ${Thread.currentThread().name}")
   }
...
}

collect는 suspend function이므로 coroutine scope에서 호출되어야 합니다. 따라서 MainScope()으로 (SuperviorJob + Dispatchers.Main) coroutine scope을 만들어 호출했습니다. 수행 결과는 아래와 같습니다.

네트워크 호출 함수와 callback모두 main thread에서 호출되었으며, 500ms 간격으로 5번이 순서대로 잘 호출되었습니다. 이 flow는 mainActivity가 destroy 되어 flow가 cancel 되거나, network 요청으로 fail()이 호출되면 fail callback 내부에 선언한 명시적인 close()로 인하여 flow가 종료됩니다. 따라서 위 MainScope {..} 내부에 collect() 이후에 선언한 "Collect End #1" 로그는 출력되지 않았습니다. 만약 아래와 같이 network 결과로 fail()을 호출하도록 코드를 변경하면, collect가 종료됨을 확인할 수 있습니다.

//TestViewModel.kt

private suspend fun requestNetworkData(resultCallback: NetworkResult) {
      repeat(5) {
         delay(500)
         Log.d(TAG, "requestNetworkData() - ${Thread.currentThread().name}")
//       resultCallback.success(it)
         resultCallback.fail()
      }
}

 

callbackFlow is Cold stream

callbackFlow는 Cold stream입니다. Cold stream이기 때문에 만약 두 곳에서 collect를 한다면 동일한 결과를 두 번 반환합니다. 즉 callbackFlow로 구성한 block이 각각 호출됩니다.

// MainActivity.kt

override fun onCreate() {
...
    MainScope().launch {
        testViewModel.getNetworkResultFlow().collect {
            Log.i(TAG, "Network result#1: $it - ${Thread.currentThread().name}")
        }
        Log.d(TAG,"Collect End #1 - ${Thread.currentThread().name}")
   }
   
    MainScope().launch {
        testViewModel.getNetworkResultFlow().collect {
            Log.i(TAG, "Network result#2: $it - ${Thread.currentThread().name}")
        }
        Log.d(TAG,"Collect End #2 - ${Thread.currentThread().name}")
   }
...
}

 

Buffer의 변경

callbackFlow는 기본적으로 기본 buffer 크기를 사용하도록 고정되어 있습니다. 생성자 param으로 넘겨줄 수 없기 때문에 buffer의 크기를 변경하려면 buffer() operator를 사용할 수 있습니다.

// TestViewModel.kt

    @ExperimentalCoroutinesApi
    fun getNetworkResultFlow(): Flow<String> = callbackFlow {
        ...
    }.buffer(1)

위 코드처럼 buffer()를 사용하여 buffer를 1로 세팅할 수 있습니다.

References

[1] https://developer.android.com/kotlin/flow

[2] https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/callback-flow.html

[3] 2018.12.21 - [개발이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#7 - Channels

[4] 2021.06.07 - [개발이야기/Kotlin] - Kotlin Coroutines 1.5 - GlobalScope Marked as Delicate, Refined Channel API, and More

반응형