본문으로 바로가기
반응형

생산자를 병합하는 Operator에 대해서 설명합니다.

기본적인 구현이나 내용은 Coroutine flow의 연산자와 동일합니다.

필요에 따라서 flow의 내용을 언급하며, 상세한 내용은 하기 링크에서 확인할 수 있습니다.

2019/11/16 - [개발이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#11- Asynchronous Flow(2/2)



startWith

이전에 이미 언급한 연산자로 Observable의 앞쪽에 데이터를 끼워 넣습니다.
이전 예제에서는 단일값을 넣는 형태 였지만 다른 Observable(flowable)을 앞쪽에 끼워 넣을수도 있습니다.

만약 list를 전달한다면, 내부적으로는 Observable로 변경되어 처리됩니다.
아마도 .toObservable()을 사용하겠죠?

fun main(args: Array<String>) {
    val ob = Observable.range(5, 5)

    ob.startWith(listOf(1, 2, 3, 4))
            .subscribe { println("Received $it") }

    println("--------------------")

    ob.startWith(Observable.just(1, 2, 3, 4))//(2)
            .subscribe { println("Received $it") }
}


Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
--------------------
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9

zip | zipWith

zip 연산자는 두개 이상의 Observable(flowable)을 병합 합니다.
각각의 Observable에서 방출하는 값에 대해서 pair를 맞춰서 방출하기 때문에 Observable의 방출 개수가 맞지 않으면, 가장 작은 개수의 Observable에 출력이 맞춰 집니다.
즉! 짝이 없으면 버려 집니다.
fun main(args: Array<String>){
    val ob1 = (1..3).toObservable()
    val ob2 = Observable.just("one", "two", "three", "four")

    println ("------zip()------")
    Observable.zip(ob1, ob2, BiFunction {a: Int, b:String -> "$a: $b" })
    .subscribe { println(it) }

    println ("-----zipWith()-----")
    ob1.zipWith(ob2, BiFunction {a: Int, b:String -> "$a: $b" })
            .subscribe { println(it) }
}


예제에서 ob2의 "four"는 짝이 없으므로 출력되지 못합니다.

------zip()------

1: one

2: two

3: three

-----zipWith()-----

1: one

2: two

3: three



zip은 Observable(Flowable)에서 지원하는 static method로 최대 9개까지 병합할 수 있습니다.
zipWith의 경우 Observable instance에서 제공하는 연산자로 다른 하나와 만 병합이 가능합니다.

추가적으로 zip은 pair를 맞추어 출력되기 때문에 한쪽의 출력이 느리다면 느린쪽에 맞춰 집니다.
zipWithcoroutine flow의 zip 함수와 동일하게 동작합니다.

combineLatest

두개 이상의 Observable(Flowable)을 병합하여 출력합니다.
Observable중 하나라도 방출되면 나머지 Observable의 최신값을 가지고 병합하며, 서로 방출 속도가 달라 다른 observable이 아직 방출된 값이 없는 상태에서 방출하면, 해당 방출은 버려집니다.

fun main(args: Array) = runBlocking {
    val ob1 = Observable.interval(100, TimeUnit.MILLISECONDS)
    val ob2 = Observable.interval(250, TimeUnit.MILLISECONDS)

    val time = System.currentTimeMillis()
    ob1.subscribe { println("ob1 emit: $it") }
    ob2.subscribe { println("ob2 emit: $it") }

    Observable.combineLatest(ob1, ob2, BiFunction { a: Long, b: Long 
                                                   -> "Time:${System.currentTimeMillis() - time} ob1:$a ob2:$b" })
            .subscribe { println(it) }

    delay(500)
}


아래의 결과를 보면 ob1의 방출이 먼저 시작되나, 0과 1을 방출할 동안 ob2의 방출값이 없기 때문에 ob1의 해당 방출은 버려집니다.

그리고 나서 ob2의 첫번째 방출이 시작되면서 combineLatest로 인한 병합이 시작됩니다.

그 이후로는 ob1, ob2의 방출이 될때마다 각 observable의 최신값으로 병합됩니다.

ob1 emit: 0

ob1 emit: 1

ob2 emit: 0

Time:294 ob1:1 ob2:0

ob1 emit: 2

Time:347 ob1:2 ob2:0

ob1 emit: 3

Time:443 ob1:3 ob2:0

ob1 emit: 4

ob2 emit: 1

Time:543 ob1:4 ob2:0

Time:543 ob1:4 ob2:1

아쉽게도 RxJava의 함수이기 때문에 병합을 나타내는 부분을 kotlin의 람다 표현식 만으로는 사용할 수 가 없네요.

두개를 병합했으니 BiFunction, 3개 이상 병합시 Function3, 4개면 Function4... 를 사용하면 됩니다.

참고로 combineLatest는 coroutine flow의 combineLatest과도 동일한 동작입니다.


merge | mergeWith | mergeArray

combineLatestzip은 여러개의 Observable이 방출하는 값을 가공하여 출력하지만 merge를 사용하면 가공 없이 단순히 다수의 Observable을 합쳐서 출력합니다.

  • merge: Observable의 static 함수로 최대 4개까지 병합
  • mergeArray: Observable의 static 함수로 args로 인자를 받아 다수의 Observable 병합 가능
  • mergeWith: Observable의 instance 객체를 이용하여 다른 Observable과 병합

fun main(args: Array) = runBlocking {
    val ob1 = Observable.just(1,2)
    val ob2 = Observable.just(3,4)

    println("----- merge() ------")
    Observable.merge(ob1, ob2)
            .subscribe{ println(it) }

    println("----- mergeWith() ------")
    ob1.mergeWith(ob2)
            .subscribe{ println(it) }
println("----- mergeArray() ------") val ob3 = Observable.just(5,6) val ob4 = Observable.just(7,8) val ob5 = Observable.just(9,10) Observable.mergeArray(ob1, ob2, ob3, ob4, ob5) .subscribe{ println(it) }
}


단순히 병합만 하는 연산자로 순서는 각 observable의 속도에 따라 방출됩니다. (병합 순서와는 상관이 없습니다.)

(아래는 병합 순서대로 나온것 같지만 실제로는 각 observable의 속도에 따라 방출 됩니다.)

----- merge() ------
1
2
3
4
----- mergeWith() ------
1
2
3
4
----- mergeArray() ------
1
2
3
4
5
6
7
8
9
10


concat | concatWith | concatArray

merge와 유사하게 Observable을 병합하는 작없을 하지만 concat은 선언된 순서를 보장하여 각 Observable을 이어 붙입니다.
따라서 앞선 Observable이 끝나야 다음 Observable이 방출을 할 수 있습니다.

아래 예제처럼 먼저 출력하는 ob1이 느리게 방출되더라도, ob2는 ob1의 방출이 다 끝난 다음에야 방출을 시작합니다.
또한 onComplete는 모든 방출이 끝난 맨 마지막에 한번만 호출됩니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    val ob1 = Observable.just(1, 2)
            .map {
                runBlocking { delay(100) }
                it
            }
    val ob2 = Observable.just(3, 4)

    println("----- concat() ------")
    Observable.concat(ob1, ob2)
            .subscrube(
                    onNext = { println(it) },
                    onComplete = { println("completed") }
            )

    println("----- concatWith() ------")
    ob1.concatWith(ob2)
            .subscrube { println(it) }
println("----- concatArray() ------") val ob3 = Observable.just(5, 6) val ob4 = Observable.just(7, 8) val ob5 = Observable.just(9, 10) Observable.concatArray(ob1, ob2, ob3, ob4, ob5) .subscrube { println(it) }
}


결과는 아래와 같습니다.

----- concat() ------

1

2

3

4

completed

----- concatWith() ------

1

2

3

4

----- concatArray() ------

1

2

3

4

5

6

7

8

9

10



amb | ambArray

여러개의 Observable중 가장 빠르게 시작하는 Observable만 사용하고 나머지 Observable의 방출은 버립니다.
동일한 소스가 여러 서버에 퍼져 있을때 동시에 호출하고 가장 빨리 응답하는 서버의 응답만을 처리하거나, Android에서 여러가지 위치 측정 방법중 가장 빨리 응답하는 결과를 사용할때 유용 하겠네요.

fun main(args: Array<String>) {
    val ob1 = Observable.just(1, 2)
            .map {
                runBlocking { delay(100) }
                it
            }
    val ob2 = Observable.just(3, 4)

    Observable.amb(listOf(ob1, ob2)).blockingSubscribe { println("amb: $it") }
    Observable.ambArray(ob1, ob2).blockingSubscribe { println("ambArray: $it") }
}


amb의 인자로는 Observable의 collection이 사용하며, 직접넣을 경우 args 인자를 받는 ambArray를 사용하면 됩니다.

위 예제 결과로는 ob1의 결과인 1, 2 만 출력됩니다.


groupby

앞선 포스팅에서 toMultiMap으로 grouping된 map을 만들었습니다.

하지만 observable을 Map이나 Collection으로 만들어 반환하는건 바람직하지 않아 보입니다.

그럴꺼면 그냥 iterator를 썼겠죠?

따라서 Groupping하고, 결과로도 Observable을 반환하는 groupby 연산자를 제공합니다.

fun main(args: Array) = runBlocking {
    val ob1 = Observable.range(1, 10)

    ob1.groupBy { it % 2 == 0 }
            .subscribe {
                val key = it.key
                it.subscribe { println("key: $key value: $it") }
            }
}


짝수와 홀수를 구분하다록 처리한 예제이며 groupby의 결과로 Observable<GroupedObservable<K, V>>을 반환하므로 subscribe 내부에서 다시 subscribe를 이용하여 값을 수신합니다.

key: false value: 1

key: true value: 2

key: false value: 3

key: true value: 4

key: false value: 5

key: true value: 6

key: false value: 7

key: true value: 8

key: false value: 9

key: true value: 10


추가로 숫자를 group로 분리하는 예제 입니다.

fun main(args: Array) = runBlocking {
    val ob1 = Observable.range(1, 30)

    val groupedObservable = ob1.groupBy { getKey(it) }

    groupedObservable.subscribe { groupedObservable ->
        groupedObservable.filter { groupedObservable.key == "small number" }
                .subscribe { println("${groupedObservable.key}: $it") }
    }

    groupedObservable.subscribe { groupedObservable ->
        groupedObservable.filter { groupedObservable.key == "middle number" }
                .subscribe { println("${groupedObservable.key}: $it") }
    }

    groupedObservable.subscribe { groupedObservable ->
        groupedObservable.filter { groupedObservable.key == "big number" }
                .subscribe { println("${groupedObservable.key}: $it") }
    }
}

fun getKey(num: Int): String {
    return when {
        num < 10 -> "small number"
        num < 20 -> "middle number"
        else -> "big number"
    }
}

결과는 아래와 같습니다.

small number: 1

small number: 2

small number: 3

small number: 4

small number: 5

small number: 6

small number: 7

small number: 8

small number: 9

middle number: 10

middle number: 11

middle number: 12

middle number: 13

middle number: 14

middle number: 15

middle number: 16

middle number: 17

middle number: 18

middle number: 19

big number: 20

big number: 21

big number: 22

big number: 23

big number: 24

big number: 25

big number: 26

big number: 27

big number: 28

big number: 29

big number: 30


flatMap, concatMap

이 두개의 operator는 Observable이 Observable을 방출할때 flatten 해주는 작업을 합니다.

flatMapconcatMap은 내부적으로 mergeconcat을 사용하기 때문에 flatMap은 각각 Observable의 속도에 따라 방출되고, concatMap은 순서를 보장하며 방출됩니다.

이 둘은 coroutine Flow의 flatMapMergeflatMapConcat과 동일하게 동작합니다.

fun main(args: Array) = runBlocking {
    val ob1 = Observable.range(1, 5)

    println("------ flatMap() ------")
    val elapsedTime1 = measureTimeMillis {
        ob1.flatMap { getDelayedObservable(it) }
                .blockingSubscribe { println(it) }
    }
    println("Elapsed Time:$elapsedTime1")

    println("------ concatMap() ------")
    val elapsedTime2 = measureTimeMillis {
        ob1.concatMap { getDelayedObservable(it) }
                .blockingSubscribe { println(it) }
    }
    println("Elapsed Time:$elapsedTime2")
}

//랜덤 delay를 갖는 observable을 반환
fun getDelayedObservable(value: Int): Observable {
    val delay = Random().nextInt(100)
    return Observable.just(value)
            .map { "Delay:$delay - $it" }
            .delay(delay.toLong(), TimeUnit.MILLISECONDS)
}


결과는 아래와 같습니다.

------ flatMap() ------

Delay:32 - 5

Delay:44 - 4

Delay:68 - 1

Delay:75 - 2

Delay:84 - 3

Elapsed Time:136

------ concatMap() ------

Delay:70 - 1

Delay:45 - 2

Delay:87 - 3

Delay:92 - 4

Delay:61 - 5

Elapsed Time:362


flatMapflatMap 블럭의 Observable을 병합하여 한번에 방출하기 때문에 각각의 dealy에 따라 방출됩니다.
단, concatMap은 순서를 보장하면서 방출하기 때문에 순서대로 delay 만큼 대기한 후에 다음값을 방출합니다.
따라서 방출하는데 걸리는 총 시간은 각각의 delay의 합 이상의 시간이 걸립니다.

예제에서는 subscribe 대신 blockingSubscribe를 사용했습니다.
보통 동일 thread에서 생산과 소비가 진행되는경우 subscribe 블럭이 모든 값을 수신해야만 해당 block을 벗어날 수 있습니다.
하지만 interval이나 timer로 생산된 Observable이나, delay operator로 시작 시간을 조정한 경우 subscribe block은 생산된 결과가 모두 수신될 때 까지 block 되지 않습니다.
(그렇지 않다면 interval의 경우 보통 끊없이 방출되므로 subscribe 블럭을 벗어날 수 없게죠?)

예제에서는 contactMapflatMap의 로그를 보기 쉽게 구분하여 순서대로 출력하기 위해서 사용했습니다.


switchMap

switchMap 역시 flatten하는 역할을 합니다.
flatMap처럼 비동기로 방출하나, Observable이 시간이 오래 걸려 다음 방출보다 값을 늦게 방출한다면 이전 방출은 버려집니다.

fun main(args: Array<String>) {
    Observable.range(1,10)
        .switchMap {
            if (it % 2 == 0) {
                Observable.just(it)
                    .delay(100, TimeUnit.MILLISECONDS)
            } else {
                Observable.just(it)
            }
        }
        .subscribe { println(it) }
}


위 예제에서는 짝수의 경우 100의 delay를 방출하고, 홀수는 바로 방출하도록 합니다.

이런경우 홀수가 출력 될때 방출을 대기하고 있던 이전값 (짝수)는 버려집니다.

1
3
5
7
9



반응형