본문으로 바로가기
반응형

Observable의 경우 생산자 역할을 하고, Observer는 소비자 역할을 합니다.

Observerable에서는 생산과, 소비가 각각 따로 놀기 때문에 만약 Observer의 데이터 처리속도가 느린경우 생산은 대량으로 진행되고, 소비는 천천히 일어나는 상황을 마딱드릴 수 있습니다.

따라서 소비자의 상황에 따라 생산되는 데이터를 조절하기 위해 사용하는 채널을 backpressure라고 하며, Flowable은 backpressure를 지원합니다.

즉 생산속도를 조절할 수 있는거죠~


각 operator(API)의 동작을 설명하기 위한 marvel diagram은 하기 사이트에서 발췌하였습니다.

https://github.com/ReactiveX/RxJava/wiki/Backpressure


각 예제에서 생산, 소비의 속도를 조절하기 위하여 kotlin coroutine의 runBlocking을 사용합니다.

이는 Thread.sleep() 대신 suspend function인 delay()를 사용하기 위함인며, delay()는 thread blocking 없이 delay를 줄 수 있습니다.

좀더 자세한 사용법을 알고 싶다면 코루틴 관련글을 참고 하시기 바랍니다.

https://tourspace.tistory.com/150?category=797357


공통 사용 예제 format (runBlocking)

fun main(args:Array<String>) = runBlocking<Unit> {
    ...
    runBlocking { delay(xxx) }
    ...
}
  • runBlocking{..}은 Structured concurrency에 의해서 내부 코드가 전부 종료될때 까지 기다립니다.
  • dealy()는 suspend 함수로 해당 시간만큼 해당 line에서 머무릅니다. 단 thread 자체가 block 되지는 않습니다.


Flowable

Flowable은 Observable에서 backpressure를 지원하는 버전이라고 볼수 있습니다.
Flowable은 내부적으로 buffer(최대 128개)지원하여, 생산 속도를 제어할 수 있습니다.

Flowable은 기본적으로 Observable의 생성과 동일한 API를 사용합니다.
이전 포스팅에서 사용했던 range(), interval(), just(), generate()등의 함수를 그대로 사용하여 생성가능 합니다.

또한 기존 Observable에 toFlowable()을 이용하여 간단하게 toFlowable() 함수로 Backpressure를 지원하도록 변경할 수 있습니다.
 val observable = Observable.just(1, 2, 3, 4, 5)
 val flowable = observable.toFlowable(BackpressureStrategy.BUFFER)


인자값으로 사용된 BackpressureStarategy.BUFFER는 아래 부분에서 상세하게 설명합니다.

여기서는 toFlowable()로 간단하게 변환이 가능하다는것만 확인하면 됩니다. 



먼저 Observable을 사용하여 데이터를 100ms 단위로 생성하고 소비에는 200ms의 delay가 걸리는 예제를 만듭니다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val observable = Observable.range(1,10)
    observable
            .map {
                val str = "Mapping item $it"
                runBlocking { delay(100) }
                println("$str - ${Thread.currentThread().name}")
                str
            }
            .observeOn(Schedulers.computation())
            .subscribe({
                println("Received $it - ${Thread.currentThread().name}")
                runBlocking { delay(200) }
            })

    delay(1000)
}


위에서 observeOn을 이용하여 subscriber가 실행되는 thread를 worker thread로 변경했습니다.

만약 observeOn을 주석처리 하면 생산 및 소비를 동일 thread (여기서는 main)이 진행하므로 생산->소비->생산->소비...가 순서대로 진행됩니다.

하지만 이럴경우 로직이 수행되는데 총 300ms 씩 10번 = 3초가 소요되서 너무 느려집니다.

따라서 observeOn을 이용하여 생산과 소비가 각각 다른 thread에서 수행하도록 (동시에 수행되도록) 코드를 작성합니다.

Mapping item 1 - main

Received Mapping item 1 - RxComputationThreadPool-1

Mapping item 2 - main

Received Mapping item 2 - RxComputationThreadPool-1

Mapping item 3 - main

Mapping item 4 - main

Mapping item 5 - main

Received Mapping item 3 - RxComputationThreadPool-1

Mapping item 6 - main

Received Mapping item 4 - RxComputationThreadPool-1

Mapping item 7 - main

Mapping item 8 - main

Received Mapping item 5 - RxComputationThreadPool-1

Mapping item 9 - main

Mapping item 10 - main

Received Mapping item 6 - RxComputationThreadPool-1

Received Mapping item 7 - RxComputationThreadPool-1

Received Mapping item 8 - RxComputationThreadPool-1

Received Mapping item 9 - RxComputationThreadPool-1

Received Mapping item 10 - RxComputationThreadPool-1


생산이 소비보다 두배 빠르므로 점점 소비 작업이 계속 뒤로 밀립니다.

만약 생산하는 양이 많다면 생산이 먼저 수행되고 소비는 나중에 몰려서 수행 되므로 OutOfMemory가 발생할 수 있습니다.


이제 Flowable로 변경합니다.

Flowable은 기본 128개를 저장할 수있는 buffer를 제공합니다. 

따라서 Buffer의 동작을 보기 위해서 총 150개의 숫자를 delay 없이 생산하고, 처리는 10ms가 걸리도록 수정합니다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val flowable = Flowable.range(1, 150)
    flowable
            .map {
                println("Mapping item $it")
                it
            }
            .observeOn(Schedulers.computation())
            .subscribe({
                println("Received item $it")
                runBlocking { delay(10) }
            })

    delay(10000)
}


결과는 아래와 같습니다.

Mapping item 1
...
Mapping item 5
Received item 1
Mapping item 6
Mapping item 7
...
Mapping item 126
Mapping item 127
Mapping item 128
Received item 2
Received item 3
Received item 4
...
Received item 94
Received item 95
Received item 96
Mapping item 129
Mapping item 130
...
Mapping item 149
Mapping item 150
Received item 97
Received item 98
...
Received item 148
Received item 149
Received item 150
생산이 128개가 되면 일단 생산을 멈춥니다.

그리고 96개가 버퍼에서 소비되는 순간 다시 생산이 시작되면서 생산 속도를 적절히 조정되는걸 확인할 수 있습니다.

(내부적으로 128-96 = 32개가 다시 생산이 가능하도록 하는 버퍼의 threshold값인듯 하네요)


간단하게 정리해 본다면 Flowable은 생산속도를 조절함으로써 memory 에러를 방지할 수 있지만 속도가 그만큼 느려집니다.

따라서 Flowable은 생산의 갯수가 수천, 수많개 정도로 많을때 사용하고 그렇지 않은경우 observable을 사용하면 됩니다.


Subscriber

Flowable은 observer 대신에 backpressure를 지원하는 Subscriber를 사용합니다.
Override 해야하는 method는 Observer와 동일합니다만 request() 함수를 이용하여 수신 할 데이터의 개수를 요청해야 합니다.

fun main(args: Array<String>) = runBlocking<Unit> {
    Flowable.range(1, 150)
            .map {
                println("Mapping item $it - ${Thread.currentThread().name}")
                it
            }
            .observeOn(Schedulers.computation())
            .subscribe(MySubscriber())
    delay(5000)
}

class MySubscriber : Subscriber<Int> {
    override fun onSubscribe(s: Subscription) {
//        s.request(5)
        s.request(Long.MAX_VALUE)
    }

    override fun onComplete() = println("onComplete()")

    override fun onNext(t: Int) {
        runBlocking { delay(10) }
        println("onNext(): $t - ${Thread.currentThread().name}")

    }

    override fun onError(e: Throwable) = e.printStackTrace()
}

1부터 150까지 숫자를 방출합니다.

Flowable이므로 128개의 buffer가 사용됩니다.


Mapping item 1 - main

Mapping item 2 - main

...

Mapping item 127 - main

Mapping item 128 - main

onNext(): 1 - RxComputationThreadPool-1

onNext(): 2 - RxComputationThreadPool-1

...

onNext(): 95 - RxComputationThreadPool-1

onNext(): 96 - RxComputationThreadPool-1

Mapping item 129 - RxComputationThreadPool-1

Mapping item 130 - RxComputationThreadPool-1

...

Mapping item 149 - RxComputationThreadPool-1

Mapping item 150 - RxComputationThreadPool-1

onNext(): 97 - RxComputationThreadPool-1

onNext(): 98 - RxComputationThreadPool-1

...

onNext(): 149 - RxComputationThreadPool-1

onNext(): 150 - RxComputationThreadPool-1

onComplete()


request()를 호출하지 않는다면 생산만 되고 실제 소비는 되지 않습니다.

만약 위 소스에서 request(Long.MAX_VALUE)를 주석으로 막고 request(5)를 사용할 경우 아래와 같이 출력됩니다.

Mapping item 1 - main

Mapping item 2 - main

...

Mapping item 127 - main

Mapping item 128 - main

onNext(): 1 - RxComputationThreadPool-1

onNext(): 2 - RxComputationThreadPool-1

onNext(): 3 - RxComputationThreadPool-1

onNext(): 4 - RxComputationThreadPool-1

onNext(): 5 - RxComputationThreadPool-1

하지만 전체 개수를 받아서 처리하지 않았기 때문에 onComplete()는 호출되지 않습니다.

또한 buffer내부의 일정량을 소비하지 못했기 때문에(다시 생산을 진행할수 있는 buffer의 빈공간이 32개 이상 되어야 함.) 추가적인 생산도 진행되지 않습니다.


BackpressureStrategy

flowable은 observable과 유사하거나 동일한 operator로 생성이 가능합니다.
create를 사용하여 flowable을 생성할 경우 backpressure에 대한 처리 옵션을 설정할 수 있습니다.


BackpressureStrategy enum class

backpressure에 대한 처리 옵션은 아래와 같습니다.

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers all onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}



BackpressureStrategy.BUFFER

무제한 버퍼를 제공합니다. (기본값인 128개가 아닙니다.)

따라서 생산하는 데이터 개수가 많고, 수신하여 처리하는쪽이 너무 느릴경우 OutOfMemory가 발생할 수 있습니다.

생산속도는 10ms, 수신 속도는 100ms로 설정한 예제 코드를 생성합니다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val flowable = Flowable.create ({
        for(i in 1..200) {
            println("send item $i")
            it.onNext(i)
            runBlocking { delay(10) }
        }
        it.onComplete()
    }, BackpressureStrategy.BUFFER)

    // runBlocking 블럭을 유지하기 위해 observing이 끝날때 까지 대기하기 위해서 사용한다.
    val waitingJob = launch {
        delay(Long.MAX_VALUE)
    }

    flowable.observeOn(Schedulers.computation())
        .subscribe(MySubscriber(waitingJob))
}

class MySubscriber(val job: Job) : Subscriber<Int> {
    override fun onSubscribe(s: Subscription) {
        s.request(200)
    }

    override fun onComplete() {
        println("onComplete()")
        // 완료되면 대기를 멈춘다.
        job.cancel()
    }

    override fun onNext(t: Int) {
        runBlocking { delay(100) }
        println("onNext(): $t - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        println("onError: $e")
        // 완료되면 대기를 멈춘다.
        job.cancel()
    }
}


BackpressureStrategy.BUFFER를 사용하여 200개를 한꺼번에 생산하고, 동시에 수신처리를 합니다.

send item 1

send item 2

...

send item 190

onNext(): 21 - RxComputationThreadPool-1

send item 191

...

send item 199

onNext(): 22 - RxComputationThreadPool-1

send item 200

onNext(): 23 - RxComputationThreadPool-1

onNext(): 24 - RxComputationThreadPool-1

...

onNext(): 199 - RxComputationThreadPool-1

onNext(): 200 - RxComputationThreadPool-1

onComplete()


결과를 보면 제한 없이 버퍼에 계속 쌓습니다.

또한 중간중간 데이터를 소비하다가(소비 속도인 100ms 단위로) 데이터 생산이 끝나면 버퍼에 쌓인 데이터를 전부 일괄 처리하고 끝납니다.

결론적으로 Flowable은 자신의 속도대로 모두 방출하고 수신자 역시 자신의 처리 속도에 따라 모든 데이터를 처리 합니다.

(이는 개념 설명을 위해 생산과 소비를 두개의 thread를 분리하여 각각 처리하도록 했기 때문이기도 합니다.)


BackpressureStrategy.ERROR

소비쪽에서 생산을 따라잡을 수 없는경우 error를 발생시킵니다.

단 기본 버퍼인 128개는 제공합니다. 

fun main(args: Array<String>) = runBlocking<Unit> {
    val flowable: Flowable = Flowable.create ({
        ...
    }, BackpressureStrategy.ERROR)

    flowable.observeOn(Schedulers.computation())
            .subscribe(MySubscriber())
    delay(1000)
}

class MySubscriber : Subscriber<Int> {
    ...
    override fun onError(e: Throwable) = e.printStackTrace()
}

동일한 코드에 create의 인자값만 BackpressureStrategy.ERROR로 변경했습니다.

send item 1

send item 2

...

send item 9

onNext(): 1 - RxComputationThreadPool-1

send item 10

...

send item 18

onNext(): 2 - RxComputationThreadPool-1

send item 19

send item 20

...

send item 128

send item 129

onNext(): 14 - RxComputationThreadPool-1

onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

send item 130

send item 131

...

send item 199

send item 200

기본 buffer 크기인 128개 까지는 데이터 방출이후에 처리가 따라가지 못하므로 onError()을 호출 하고 MissingBackpressureException을 발생시킵니다.

BackpressureStrategy.DROP

수신자가 처리중에 생산자로 부터 전달을 받으면 해당 데이터는 버립니다.

개념상 10개를 생성하더라도 1개를 수신중이라면 나머지 9개는 버려지는 개념이나, 여기도 기본 버퍼(128개)가 존재합니다.

따라서 기본 버퍼개수 128개까지는 버퍼에 쌓여있어 순차적으로 수신자가 처리하나, 버퍼에 담기지 못하는 129개째 부터 수신중에 생산된 데이터 이므로 버려집니다.

수신자의 동작 기준으로 데이터를 처리하고, 처리중에 생산된 데이터는 버려지며, 수신 처리가 끝나면 그 다음으로 수신하는 데이터를 처리합니다.

send item 1

send item 2

...

send item 195

send item 196

onNext(): 22 - RxComputationThreadPool-1

send item 197

send item 198

send item 199

send item 200

onNext(): 23 - RxComputationThreadPool-1

onNext(): 24 - RxComputationThreadPool-1

...

onNext(): 127 - RxComputationThreadPool-1

onNext(): 128 - RxComputationThreadPool-1

onComplete()

데이터 생산은 200개 모두 되었으나, 200개를 다 받을때 까지도 수신단에서 아직 처리를 못했으므로, 버퍼 크기인 128개까지만 저장되고 그 이후부터는 버려집니다.

따라서 수신측에서는 버퍼에 저장된 128개를 처리한 이후에 들어오는 데이터가 없기 때문에 (이미 방출이 끝난상태) 추가 처리 없이 종료 됩니다.


BackpressureStrategy.LATEST

Drop과 유사하게 수신 처리중에 받은 데이터는 무시됩니다. 다만 무시할때 마지막 배출된 값을 저장하여 최종값을 유지하고 있습니다.

수신쪽의 처리가 완료되면 무시된 값들중 최종값 하나를 전달한 후 이후에 배출되는 값을 수신 받습니다.

동일한 코드에 create의 인자값만 BackpressureStrategy.LATEST로 변경했습니다.

send item 1

send item 2

...

send item 199

send item 200

onNext(): 23 - RxComputationThreadPool-1

onNext(): 24 - RxComputationThreadPool-1

...

onNext(): 127 - RxComputationThreadPool-1

onNext(): 128 - RxComputationThreadPool-1

onNext(): 200 - RxComputationThreadPool-1

onComplete()


DROP과 유사하게 동작하나 마지막 값인 200을 전달 받아 처리합니다.

이 동작 역시 수신쪽에서 모든 버퍼의 데이터를 처리한 시점에서 이미 생산자는 데이터 방출을 끝난 상태이기 때문에 추가 처리없이 종료됩니다.


BackpressureStrategy.MISSING

Missing값은 기본적으로 Flowable에서 제공하는 backpressure를 사용하지 않겠다는 의미 입니다.

다만 위에서 언급한 strategy와 동일하게 동작시키기 위해 추가적인 operator를 제공합니다.

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

onBackpressureBuffer

fun main(args: Array<String>) = runBlocking<Unit> {
    val flowable = Flowable.create({
      ...
    }, BackpressureStrategy.MISSING)

   ...

    flowable.onBackpressureBuffer()
            .observeOn(Schedulers.computation())
            .subscribe(MySubscriber(waitingJob))
}

class MySubscriber(val job: Job) : Subscriber {
   ...
}

위와 같이 BackpressureStrategy.MISSING을 사용하고, onBackpressureBuffer()사용하면 BackpressureStrategy.BUFFER를 사용한 것과 동일하게 동작 합니다.

다만 onBackpressureBuffer()는 인자값으로 버퍼 크기를 한정할 수 있습니다.


따라서 버퍼 크기를 한정하도록 해당라인을 아래와 같이 바꾸면, 처리 도중에 buffer 사이즈가 부족하다는 error를 발생시킵니다.

...
    flowable.onBackpressureBuffer(20)
                 .observeOn(Schedulers.computation())
                 .subscribe(MySubscriber(waitingJob))
....

결과

...

send item 44

send item 45

onNext(): 5 - RxComputationThreadPool-1

onError: io.reactivex.exceptions.MissingBackpressureException: Buffer is full

send item 46

send item 47

...


onBackpressureDrop
이 동작 역시 BackpressureStrategy.DROP과 동일하지만 drop되는 정보를 넘겨받을 수 있습니다.
...
flowable.onBackpressureDrop{println("Drop item $it")}
...
결과
send item 125
send item 126
send item 127
send item 128
onNext(): 14 - RxComputationThreadPool-1
send item 129
Drop item 129
send item 130
Drop item 130
send item 131
Drop item 131
send item 132
Drop item 132


기본 버퍼를 다 채우고 난뒤 이후 방출되는 데이터는 drop됨을 알수 있습니다.


onBackpressureLatest

이 동작은 BackpressureStrategy.LATEST와 동일합니다.


반응형