예제에서는 생산의 속도, 소비의 임의 속도 조절을 위해 kotlin의 corouitne을 사용합니다.
runBloking을 사용하여 structured concurrency를 보장하도록 하며, suspend function인 delay()를 사용하여 thread block 없이 코드를 지연시키도록 합니다.
여기서 다루는 내용은 아니지만 coroutine에 대한 자세한 내용은 하기 링크를 참고 바랍니다.
https://tourspace.tistory.com/283
추가적으로 Operator에 관한 내용은 하기 링크를 참고하였습니다.
ConnectableFlowable
이전에 사용했던 flowable은 cold observable로써 subscriber 등록되는 순간 데이터를 배출 합니다.
Observable에서 hot observable을 ConnectableObservable로 제공하듯이 동일한 방법으로 hot flowable을 만들 수 있습니다.
fun main(args: Array<String>) = runBlocking<Unit> {
val connectableFlowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
.publish()
connectableFlowable
.subscribeOn(Schedulers.computation())
.subscribe { println("Subscription 1: $it") }
connectableFlowable.connect()
delay(500)
connectableFlowable
.subscribeOn(Schedulers.io())
.subscribe { println("Subscription 2 $it") }
delay(500)
}
100ms 단위당 하나씩 데이터를 배출하는 connectableFlowable을 만들고 총 1초간 출력을 진행합니다.
이중 하나는 처음부터 등록하고, 하나는 500ms 이후에 등록합니다.
Subscription 1: 0
Subscription 1: 1
Subscription 1: 2
Subscription 1: 3
Subscription 1: 4
Subscription 1: 5
Subscription 2 5
Subscription 1: 6
Subscription 2 6
Subscription 1: 7
Subscription 2 7
Subscription 1: 8
Subscription 2 8
Subscription 1: 9
Subscription 2 9
Observable과 동일하게 중간에 등록된 Subscription 2는 중간에 등록되어 중간부터 데이터를 수신받습니다.
Processor
fun main(args: Array<String>) = runBlocking<Unit> {
val connectableFlowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
val publishProcessor = PublishProcessor.create()
publishProcessor.subscribe{println("processor 1 - $it")}
connectableFlowable.subscribe(publishProcessor)
delay(300)
publishProcessor.subscribe{println("processor 2 - $it")}
delay(300)
}
결과Throttling
fun main(args: Array<String>) = runBlocking<Unit> {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val time = System.currentTimeMillis()
observable.sample(200, TimeUnit.MILLISECONDS)
.subscribe {
println("Time: ${System.currentTimeMillis()-time} : $it")
}
delay(1000)
}
결과Buffers and Windows
fun main(args: Array<String>) = runBlocking<Unit> {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val intervalTime = Observable.interval(200, TimeUnit.MILLISECONDS)
val time = System.currentTimeMillis()
observable.buffer(intervalTime)
.subscribe {
println("Time: ${System.currentTimeMillis()-time} : $it")
}
delay(1000)
}
결과
Time: 310 : [0, 1, 2]
Time: 606 : [3, 4, 5]
Time: 907 : [6, 7, 8]
사실 9번도 배출되긴 했지만 3개를 모으지 못했기 때문에 배출되지 않습니다.
또한 여기서 배출되는 데이터 타입은 MutableList<Long> 입니다.
window
buffer와 매우 유사하나, 출력값이 collection이 아닌 Observable로 반환됩니다.
즉, 특정 단위로 묶어 sub-Observable를 배출합니다.
만약 Flowable 이었다면 출력은 flowable이 됩니다.
예제는 Flowable로 작성해 보겠습니다.
Time interval
특정 시간 단위로 묶어서 flowable로 배출합니다.
출처: https://github.com/ReactiveX/RxJava/wiki/Backpressure
fun main(args: Array<String>) = runBlocking<Unit> {
val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
flowable.window(300, TimeUnit.MILLISECONDS)
.subscribe { longFlowable ->
longFlowable.subscribe {
print("$it ")
}
println("")
}
delay(1000)
}
결과
0 1 2
3 4 5
6 7 8
9
count
특정 개수로 묶어서 배출합니다.
출처: https://github.com/ReactiveX/RxJava/wiki/Backpressure
fun main(args: Array<String>) = runBlocking<Unit> {
val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
flowable.window(5)
.subscribe { longFlowable ->
longFlowable.subscribe {
print("$it ")
}
println("")
}
delay(1000)
}
'개발이야기 > Kotlin' 카테고리의 다른 글
[RxKotlin] Reactive 코틀린 #7 - Composing operators (2) | 2019.12.18 |
---|---|
[RxKotlin] Reactive 코틀린 #6 - Basic operators (0) | 2019.12.16 |
[RxKotlin] Reactive 코틀린 #4 - Flowables 과 Backpressure (0) | 2019.12.12 |
[RxKotlin] Reactive 코틀린 #3 - Subject (0) | 2019.12.06 |
[RxKotlin] Reactive 코틀린 #2 - Observable, ConnectableObservable (0) | 2019.12.04 |