본문으로 바로가기
반응형

coroutine 1.6.0-RC에서 limitedParalleism이란 API가 추가되었습니다. 이는 특정 목적에 따라서 하나의 실행 flow를(isolated serializable execution flow) 가져가려고 할때를 위한 api 입니다.[1]

이 API는 이전에 제공하던 newFiexedThreadPoolContextnewSingleThreadContext를 대체하기 위함입니다. 

newFixedThreadPoolContext는 아래와 같은 경우에 사용했었습니다.

// 최대 10개까지 DB에 동시 접속하도록 Pool 생성
al DB = newFixedThreadPoolContext(10, "DB")

//사용
withContext(DB) {...}

이렇게 최대 사용 가능한 개수를 제한하기 위해 thread pool을 만들어 놓고 사용하거나, 아니면 Background thread를 써야 하지만 순서대로의 처리가 필요할때 newSingleThreadContext를 사용했었습니다.

문제는 Thread의 추가 생성 및 관리, switching의 비용 발생, 그리고 사용완료 후 명확하게 close해주지 않으면 thead leak 발생등, coroutine의 장점을 이용할 수 없는게 문제이기 때문에 coroutine에서는 권장하는 방법은 아니였습니다. 물론 권장하지는 않아도 대체재가 없기 때문에 사용했었지만요.

따라서 여러 개발자들이 이러 저러한 논의와 나름의 자구책들이 있었고, 결론적으로 coroutine에서 이를 공식적으로 제공하는 기능을 추가했습니다.

Design overview

limitedparallelism() api는 dispatcher의 view를 생성하도록 해줍니다. 이 view는 추가적인 thread나 resource를 사용하지 않고 기존 dispatcher를 이용하여(얹혀서) 동작하고 요청되는 task나 coroutines을 제한(limit)하는 역할을 합니다. 따라서 이 API는 Dispatchers.Default나 Deispatchers.IO에서 사용되면 굉장히 유용해 집니다.

/** 기존 사용 방법 **/
// At most 10 connections to the database
private val myDatabaseDispatcher = newFixedThreadPool(10, "DB")

// Prevent more than 200% of CPU consumption on images
private val myCpuHeavyImageProcessingDispatcher 
                               = newFixedThreadPool(2, "Image rescaling")


/** 신규 API 사용 방법 **/
// At most 10 connections to the database
private val myDatabaseDispatcher = Dispatchers.IO.limitedParallelism(10)

// Prevent more than 200% of CPU consumption on images
private val myCpuHeavyImageProcessingDispatcher
                               = Dispatchers.Default.limitedParallelism(2)

즉 기존에 제공되던 Dispatcher를 사용하여 추가적인 Thread 생성 없이 사용할 개수를 한정할 수 있습니다.

API changes

limitedParralelism은 하기와 같이 CoroutienDispatcher class에 정의되었습니다.

public abstract class CoroutineDispatcher {

    public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher
   
    ...the rest of the methods...
}

따라서 개념상 이 api는 CoroutineDispatcher를 상속받은 모든 Dispatcher에서 사용이 가능합니다. 이는 single-threaded dispatcher인 Dispatchers.Main에서도 호출 가능하다는 의미인데, 사실 Dispatchers.Main에서 이 api를 호출하더라도 return this를 반환하도록 되어 있어 그냥 Dispacthers.Main 가 반환됩니다.

 //MaincoroutineDispatcher.kt
 
 public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
 
     ...
 
     override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
         parallelism.checkParallelism()
         // MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it
         return this
     }
     ...

 

CoroutineDispatcher.kt 파일에 limitedParalledism() 함수의 주석으로 아래와 같은 내용이 적혀 있습니다.

정리하면

  • 현재 Dispatcher에 View를 생성하여 param 값으로 받은 개수로 병렬처리를 제한한다. 주어진 dispatcher를 사용하며, param 개수 이상으로 동시 실행되지 않도록 보장한다.
  • param으로 받은 개수만큼 view를 생성하고  이는 독립적으로 병렬처리를 제어하지만 해당 dispatcher에서 제공하는 개수 이상의 view 요청하더라도 당연히 개수 이상의 view개 생성되지 않는다.
  • 4개의 thread를 갖는 newFixedThreadPoolContext(4, "App Background)를 생성하는 경우 parallelism 값으로 1,2,3,4 등을 줄수 있다. 이때 1인 경우 IO thread중에 한개만 사용하기 때문에 singleThread 처럼 동작하고, 만약 6같은 수를 넣도라도 동시에 처리되는 coroutine은 4개이다.
  • 실제로 Dispatchers.IO, Dispatchers.Default에서 사용하게 될꺼다.
    /**
     * Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
     * The resulting view uses the original dispatcher for execution, but with the guarantee that
     * no more than [parallelism] coroutines are executed at the same time.
     *
     * This method does not impose restrictions on the number of views or the total sum of parallelism values,
     * each view controls its own parallelism independently with the guarantee that the effective parallelism
     * of all views cannot exceed the actual parallelism of the original dispatcher.
     *
     * ### Limitations
     *
     * The default implementation of `limitedParallelism` does not support direct dispatchers,
     * such as executing the given runnable in place during [dispatch] calls.
     * Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
     * For direct dispatchers, it is recommended to override this method
     * and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
     *
     * ### Example of usage
     * ```
     * private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background")
     * // At most 2 threads will be processing images as it is really slow and CPU-intensive
     * private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2)
     * // At most 3 threads will be processing JSON to avoid image processing starvation
     * private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(3)
     * // At most 1 thread will be doing IO
     * private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1)
     * ```
     * is 6. Yet at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism.
     *
     * Note that this example was structured in such a way that it illustrates the parallelism guarantees.
     * In practice, it is usually better to use [Dispatchers.IO] or [Dispatchers.Default] instead of creating a
     * `backgroundDispatcher`. It is both possible and advised to call `limitedParallelism` on them.
     */
    @ExperimentalCoroutinesApi
    public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return LimitedDispatcher(this, parallelism)
    }

 

References

[1] https://github.com/Kotlin/kotlinx.coroutines/issues/2919

[2] https://github.com/Kotlin/kotlinx.coroutines/issues/261

반응형