생산자를 병합하는 Operator에 대해서 설명합니다.
기본적인 구현이나 내용은 Coroutine flow의 연산자와 동일합니다.
필요에 따라서 flow의 내용을 언급하며, 상세한 내용은 하기 링크에서 확인할 수 있습니다.
2019/11/16 - [개발이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#11- Asynchronous Flow(2/2)
startWith
fun main(args: Array<String>) {
val ob = Observable.range(5, 5)
ob.startWith(listOf(1, 2, 3, 4))
.subscribe { println("Received $it") }
println("--------------------")
ob.startWith(Observable.just(1, 2, 3, 4))//(2)
.subscribe { println("Received $it") }
}
zip | zipWith
fun main(args: Array<String>){
val ob1 = (1..3).toObservable()
val ob2 = Observable.just("one", "two", "three", "four")
println ("------zip()------")
Observable.zip(ob1, ob2, BiFunction {a: Int, b:String -> "$a: $b" })
.subscribe { println(it) }
println ("-----zipWith()-----")
ob1.zipWith(ob2, BiFunction {a: Int, b:String -> "$a: $b" })
.subscribe { println(it) }
}
예제에서 ob2의 "four"는 짝이 없으므로 출력되지 못합니다.
------zip()------
1: one
2: two
3: three
-----zipWith()-----
1: one
2: two
3: three
combineLatest
fun main(args: Array) = runBlocking {
val ob1 = Observable.interval(100, TimeUnit.MILLISECONDS)
val ob2 = Observable.interval(250, TimeUnit.MILLISECONDS)
val time = System.currentTimeMillis()
ob1.subscribe { println("ob1 emit: $it") }
ob2.subscribe { println("ob2 emit: $it") }
Observable.combineLatest(ob1, ob2, BiFunction { a: Long, b: Long
-> "Time:${System.currentTimeMillis() - time} ob1:$a ob2:$b" })
.subscribe { println(it) }
delay(500)
}
아래의 결과를 보면 ob1의 방출이 먼저 시작되나, 0과 1을 방출할 동안 ob2의 방출값이 없기 때문에 ob1의 해당 방출은 버려집니다.
그리고 나서 ob2의 첫번째 방출이 시작되면서 combineLatest로 인한 병합이 시작됩니다.
그 이후로는 ob1, ob2의 방출이 될때마다 각 observable의 최신값으로 병합됩니다.
ob1 emit: 0
ob1 emit: 1
ob2 emit: 0
Time:294 ob1:1 ob2:0
ob1 emit: 2
Time:347 ob1:2 ob2:0
ob1 emit: 3
Time:443 ob1:3 ob2:0
ob1 emit: 4
ob2 emit: 1
Time:543 ob1:4 ob2:0
Time:543 ob1:4 ob2:1
아쉽게도 RxJava의 함수이기 때문에 병합을 나타내는 부분을 kotlin의 람다 표현식 만으로는 사용할 수 가 없네요.
두개를 병합했으니 BiFunction, 3개 이상 병합시 Function3, 4개면 Function4... 를 사용하면 됩니다.
참고로 combineLatest는 coroutine flow의 combineLatest과도 동일한 동작입니다.
merge | mergeWith | mergeArray
- merge: Observable의 static 함수로 최대 4개까지 병합
- mergeArray: Observable의 static 함수로 args로 인자를 받아 다수의 Observable 병합 가능
- mergeWith: Observable의 instance 객체를 이용하여 다른 Observable과 병합
fun main(args: Array) = runBlocking {
val ob1 = Observable.just(1,2)
val ob2 = Observable.just(3,4)
println("----- merge() ------")
Observable.merge(ob1, ob2)
.subscribe{ println(it) }
println("----- mergeWith() ------")
ob1.mergeWith(ob2)
.subscribe{ println(it) }
println("----- mergeArray() ------")
val ob3 = Observable.just(5,6)
val ob4 = Observable.just(7,8)
val ob5 = Observable.just(9,10)
Observable.mergeArray(ob1, ob2, ob3, ob4, ob5)
.subscribe{ println(it) }
}
단순히 병합만 하는 연산자로 순서는 각 observable의 속도에 따라 방출됩니다. (병합 순서와는 상관이 없습니다.)
(아래는 병합 순서대로 나온것 같지만 실제로는 각 observable의 속도에 따라 방출 됩니다.)
concat | concatWith | concatArray
fun main(args: Array<String>) = runBlocking<Unit> {
val ob1 = Observable.just(1, 2)
.map {
runBlocking { delay(100) }
it
}
val ob2 = Observable.just(3, 4)
println("----- concat() ------")
Observable.concat(ob1, ob2)
.subscrube(
onNext = { println(it) },
onComplete = { println("completed") }
)
println("----- concatWith() ------")
ob1.concatWith(ob2)
.subscrube { println(it) }
println("----- concatArray() ------")
val ob3 = Observable.just(5, 6)
val ob4 = Observable.just(7, 8)
val ob5 = Observable.just(9, 10)
Observable.concatArray(ob1, ob2, ob3, ob4, ob5)
.subscrube { println(it) }
}
결과는 아래와 같습니다.
----- concat() ------
1
2
3
4
completed
----- concatWith() ------
1
2
3
4
----- concatArray() ------
1
2
3
4
5
6
7
8
9
10
amb | ambArray
fun main(args: Array<String>) {
val ob1 = Observable.just(1, 2)
.map {
runBlocking { delay(100) }
it
}
val ob2 = Observable.just(3, 4)
Observable.amb(listOf(ob1, ob2)).blockingSubscribe { println("amb: $it") }
Observable.ambArray(ob1, ob2).blockingSubscribe { println("ambArray: $it") }
}
amb의 인자로는 Observable의 collection이 사용하며, 직접넣을 경우 args 인자를 받는 ambArray를 사용하면 됩니다.
위 예제 결과로는 ob1의 결과인 1, 2 만 출력됩니다.
groupby
앞선 포스팅에서 toMultiMap으로 grouping된 map을 만들었습니다.
하지만 observable을 Map이나 Collection으로 만들어 반환하는건 바람직하지 않아 보입니다.
그럴꺼면 그냥 iterator를 썼겠죠?
따라서 Groupping하고, 결과로도 Observable을 반환하는 groupby 연산자를 제공합니다.
fun main(args: Array) = runBlocking {
val ob1 = Observable.range(1, 10)
ob1.groupBy { it % 2 == 0 }
.subscribe {
val key = it.key
it.subscribe { println("key: $key value: $it") }
}
}
짝수와 홀수를 구분하다록 처리한 예제이며 groupby의 결과로 Observable<GroupedObservable<K, V>>을 반환하므로 subscribe 내부에서 다시 subscribe를 이용하여 값을 수신합니다.
key: false value: 1
key: true value: 2
key: false value: 3
key: true value: 4
key: false value: 5
key: true value: 6
key: false value: 7
key: true value: 8
key: false value: 9
key: true value: 10
추가로 숫자를 group로 분리하는 예제 입니다.
fun main(args: Array) = runBlocking {
val ob1 = Observable.range(1, 30)
val groupedObservable = ob1.groupBy { getKey(it) }
groupedObservable.subscribe { groupedObservable ->
groupedObservable.filter { groupedObservable.key == "small number" }
.subscribe { println("${groupedObservable.key}: $it") }
}
groupedObservable.subscribe { groupedObservable ->
groupedObservable.filter { groupedObservable.key == "middle number" }
.subscribe { println("${groupedObservable.key}: $it") }
}
groupedObservable.subscribe { groupedObservable ->
groupedObservable.filter { groupedObservable.key == "big number" }
.subscribe { println("${groupedObservable.key}: $it") }
}
}
fun getKey(num: Int): String {
return when {
num < 10 -> "small number"
num < 20 -> "middle number"
else -> "big number"
}
}
결과는 아래와 같습니다.
small number: 1
small number: 2
small number: 3
small number: 4
small number: 5
small number: 6
small number: 7
small number: 8
small number: 9
middle number: 10
middle number: 11
middle number: 12
middle number: 13
middle number: 14
middle number: 15
middle number: 16
middle number: 17
middle number: 18
middle number: 19
big number: 20
big number: 21
big number: 22
big number: 23
big number: 24
big number: 25
big number: 26
big number: 27
big number: 28
big number: 29
big number: 30
flatMap, concatMap
이 두개의 operator는 Observable이 Observable을 방출할때 flatten 해주는 작업을 합니다.
flatMap과 concatMap은 내부적으로 merge와 concat을 사용하기 때문에 flatMap은 각각 Observable의 속도에 따라 방출되고, concatMap은 순서를 보장하며 방출됩니다.
이 둘은 coroutine Flow의 flatMapMerge 와 flatMapConcat과 동일하게 동작합니다.
fun main(args: Array) = runBlocking {
val ob1 = Observable.range(1, 5)
println("------ flatMap() ------")
val elapsedTime1 = measureTimeMillis {
ob1.flatMap { getDelayedObservable(it) }
.blockingSubscribe { println(it) }
}
println("Elapsed Time:$elapsedTime1")
println("------ concatMap() ------")
val elapsedTime2 = measureTimeMillis {
ob1.concatMap { getDelayedObservable(it) }
.blockingSubscribe { println(it) }
}
println("Elapsed Time:$elapsedTime2")
}
//랜덤 delay를 갖는 observable을 반환
fun getDelayedObservable(value: Int): Observable {
val delay = Random().nextInt(100)
return Observable.just(value)
.map { "Delay:$delay - $it" }
.delay(delay.toLong(), TimeUnit.MILLISECONDS)
}
결과는 아래와 같습니다.
------ flatMap() ------
Delay:32 - 5
Delay:44 - 4
Delay:68 - 1
Delay:75 - 2
Delay:84 - 3
Elapsed Time:136
------ concatMap() ------
Delay:70 - 1
Delay:45 - 2
Delay:87 - 3
Delay:92 - 4
Delay:61 - 5
Elapsed Time:362
switchMap
fun main(args: Array<String>) {
Observable.range(1,10)
.switchMap {
if (it % 2 == 0) {
Observable.just(it)
.delay(100, TimeUnit.MILLISECONDS)
} else {
Observable.just(it)
}
}
.subscribe { println(it) }
}
위 예제에서는 짝수의 경우 100의 delay를 방출하고, 홀수는 바로 방출하도록 합니다.
이런경우 홀수가 출력 될때 방출을 대기하고 있던 이전값 (짝수)는 버려집니다.
1
3
5
7
9
'개발이야기 > Kotlin' 카테고리의 다른 글
[RxKotlin] Reactive 코틀린 #9 - Error handling operators (0) | 2019.12.19 |
---|---|
[RxKotlin] Reactive 코틀린 #8 - skip, take operators (0) | 2019.12.19 |
[RxKotlin] Reactive 코틀린 #6 - Basic operators (0) | 2019.12.16 |
[RxKotlin] Reactive 코틀린 #5 - Processor, sample, throttle, window, buffer (0) | 2019.12.16 |
[RxKotlin] Reactive 코틀린 #4 - Flowables 과 Backpressure (0) | 2019.12.12 |