본문으로 바로가기
반응형

예제에서는 생산의 속도, 소비의 임의 속도 조절을 위해 kotlin의 corouitne을 사용합니다.

runBloking을 사용하여 structured concurrency를 보장하도록 하며, suspend function인 delay()를 사용하여 thread block 없이 코드를 지연시키도록 합니다.

여기서 다루는 내용은 아니지만 coroutine에 대한 자세한 내용은 하기 링크를 참고 바랍니다.

https://tourspace.tistory.com/283


추가적으로 Operator에 관한 내용은 하기 링크를 참고하였습니다.

https://github.com/ReactiveX/RxJava/wiki/Backpressure


ConnectableFlowable

이전에 사용했던 flowable은 cold observable로써 subscriber 등록되는 순간 데이터를 배출 합니다.

Observable에서 hot observable을 ConnectableObservable로 제공하듯이 동일한 방법으로 hot flowable을 만들 수 있습니다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val connectableFlowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
            .publish()
    connectableFlowable
            .subscribeOn(Schedulers.computation())
            .subscribe { println("Subscription 1: $it") }


    connectableFlowable.connect()
    delay(500)

    connectableFlowable
            .subscribeOn(Schedulers.io())
            .subscribe { println("Subscription 2 $it") }

    delay(500)
}

100ms 단위당 하나씩 데이터를 배출하는 connectableFlowable을 만들고 총 1초간 출력을 진행합니다.

이중 하나는 처음부터 등록하고, 하나는 500ms 이후에 등록합니다.

Subscription 1: 0

Subscription 1: 1

Subscription 1: 2

Subscription 1: 3

Subscription 1: 4

Subscription 1: 5

Subscription 2 5

Subscription 1: 6

Subscription 2 6

Subscription 1: 7

Subscription 2 7

Subscription 1: 8

Subscription 2 8

Subscription 1: 9

Subscription 2 9

Observable과 동일하게 중간에 등록된 Subscription 2는 중간에 등록되어 중간부터 데이터를 수신받습니다. 


Processor

Processor는 backpressure를 지원하는 Subject입니다.
기본적으로 Subject의 모든 타입에 대해 Processor가 존재합니다.
Subject는 하기 링크에서 확인 가능합니다.

예를들어 가장 심플하게 동작하는 PublishProcessor는 아래와 같습니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    val connectableFlowable = Flowable.interval(100, TimeUnit.MILLISECONDS)

    val publishProcessor = PublishProcessor.create()

    publishProcessor.subscribe{println("processor 1 - $it")}
    connectableFlowable.subscribe(publishProcessor)

    delay(300)

    publishProcessor.subscribe{println("processor 2 - $it")}

    delay(300)
}
결과
processor 1 - 0
processor 1 - 1
processor 1 - 2
processor 1 - 3
processor 2 - 3
processor 1 - 4
processor 2 - 4
processor 1 - 5
processor 2 - 5


interval 처럼 지속적으로 데이터가 생산되어 배출되는 경우 backpressure로 생산을 지연시키기 어렵습니다.
따라서 이런 경우 생산을 지연시키기 보다는 소비쪽에서 모아서 처리하거나 버리거나 하는 작업을 수행할 수 있습니다.

Throttling

연속적인 emission을 수신측에서 다 처리할 수 없는 경우 특정 배출만 획득하고 나머지는 버리도록 하는 작업입니다.

sample or throttleLast
정해진 단위 시간내에 배출된 마지막 값을 단위시간이 끝날때 마다 주기적으로 획득합니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val time = System.currentTimeMillis()
    observable.sample(200, TimeUnit.MILLISECONDS)
            .subscribe {
               println("Time: ${System.currentTimeMillis()-time} : $it") 
            }
    delay(1000)
}
결과
Time: 209 : 1
Time: 407 : 2
Time: 608 : 4
Time: 806 : 6
Time: 1009 : 8
 

throttleFirst
정해진 단위 시간내에 배출된 첫번째 값을 단위시간이 시작할때 주기적으로 획득합니다.

동일 예제에서 sample() -> throttleFirst()로 변경하면 아래와 같은 결과가 나옵니다.
Time: 107 : 0
Time: 409 : 3
Time: 708 : 6
Time: 1008 : 9

debounce or throttleWithTimeout
정해진 단위시간 보다 더 긴 간격으로 배출되는 아이템만을 획득합니다.


따라서 위 예제의 경우에는 단순히 debounce()로만 operator를 변경시 아무것도 출력되지 않습니다.
debounce 간격은 200ms 이나, 데이터는 100ms 간격으로 배출되기 때문에 배출간격이 200ms 이상인 아이템이 없기 때문입니다.
만약 debounce의 간격을 200ms -> 90ms으로 변경한다면 모든 데이터가 배출됩니다.
-> 100ms 간격으로 데이터를 배출하므로, 두 데이터간 배출 간격이(debounce) 90ms 이상이므로 모두 조건을 만족하기 때문입니다.


Buffers and Windows


throttling은 연속적인 데이터의 일부를 버리고 규칙에 따라 특정 데이터만 취득하는 반면, bufferwindow는 단위시간동안 해당 데이터를 모아서 수신받는 형태로, 데이터의 손실 없이 배출 간격을 좀더 늦출 수 있습니다.
느리게 동작하는 소비자는 이렇게 모아진 데이터중에서 하나를 골라서 처리하거나, 모아서 병합하거나등의 작업을 따로 구현할 수 있습니다.

buffers
buffer는 조건에 따라서 배출되는 데이터를 묶어 collection 형태로 반환합니다.
인자로 묶을 개수를 넣어줍니다.

Time interval
또는 특정 시간 간격으로 묶는것도 가능합니다.



fun main(args: Array<String>) = runBlocking<Unit> {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val intervalTime = Observable.interval(200, TimeUnit.MILLISECONDS)

    val time = System.currentTimeMillis()
    observable.buffer(intervalTime)
            .subscribe {
                println("Time: ${System.currentTimeMillis()-time} : $it")
            }
    delay(1000)
}


결과

Time: 310 : [0, 1, 2]

Time: 606 : [3, 4, 5]

Time: 907 : [6, 7, 8]

사실 9번도 배출되긴 했지만 3개를 모으지 못했기 때문에 배출되지 않습니다.

또한 여기서 배출되는 데이터 타입은 MutableList<Long> 입니다.


window

buffer와 매우 유사하나, 출력값이 collection이 아닌 Observable로 반환됩니다.

즉, 특정 단위로 묶어 sub-Observable를 배출합니다.

만약 Flowable 이었다면 출력은 flowable이 됩니다.

예제는 Flowable로 작성해 보겠습니다.


Time interval

특정 시간 단위로 묶어서 flowable로 배출합니다.

출처: https://github.com/ReactiveX/RxJava/wiki/Backpressure

fun main(args: Array<String>) = runBlocking<Unit> {
    val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)

    flowable.window(300, TimeUnit.MILLISECONDS)
            .subscribe { longFlowable ->
                longFlowable.subscribe {
                    print("$it ")
                }
                println("")
            }
    delay(1000)
}

결과

0 1 2 

3 4 5 

6 7 8 

9


count

특정 개수로 묶어서 배출합니다.

출처: https://github.com/ReactiveX/RxJava/wiki/Backpressure

fun main(args: Array<String>) = runBlocking<Unit> {
    val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)

    flowable.window(5)
            .subscribe { longFlowable ->
                longFlowable.subscribe {
                    print("$it ")
                }
                println("")
            }
    delay(1000)
}



반응형