본문으로 바로가기
반응형

RxKotlin에도 Collection과 유사한 연산자들이 존재합니다.

가장 친숙한 연산자인 filtermap을 비롯하여 다양한 연산자를 제공합니다.


Kotlin이나 java stream에서 Collection 관련 연산자를 많이 써봤다면, RxKotlin의 연산자 역시 쉽게 이해할수 있습니다.

(사실 역할은 똑같습니다.)

다만 입력이 Observable 또는 flowable이고, 연산자를 거쳐서 나오는 출력또한 Observable 또는 flowable입니다.


이미 Collection에서 많이 사용하는 연산자는 예제없이 간단하게 언급만하고 넘어가며, collection에 있더라도 잘 쓰지 않았던 연산자나, RxKotlin용 연산자 위주로 예제와 함께 설명합니다.


filter

기존 collection에서 사용하는 형태와 사용법이 동일 합니다.


debounce

앞서 debounce 연산자는 throttling 부분에서 동작을 설명했었습니다.
빠르게 데이터가 생산되는 경우 특정 시간이상 데이터 생산 간격이 벌어지는 경우에만 값을 출력합니다. 

예를 들어 네이버 검색창에서 원하는 문구를 타이핑하면 타이핑 하는 동안 추천검색어를 하단에 보여줍니다.
이때 "고구마"를 검색한다고 가정하면, "ㄱ""ㅗ"는 따로 타이핑 되지만 두개의 추천검색어를 각각 보여줄 필요는 없습니다.
아마도 빠르게 타이핑해서 지나가기 때문이죠.
따라서 입력 간격이 특정 시간 이상 벌어지는 경우에만 값을 배출하도록 하는게 debounce 연산자의 역할 입니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.create {
        runBlocking {
            it.onNext("고")
            it.onNext("고구")
            it.onNext("고구마")
            delay(300)
            it.onNext("고구마 맛")
            it.onNext("고구마 맛있")
            it.onNext("고구마 맛있게")
            delay(300)
            it.onNext("고구마 맛있게 먹")
            it.onNext("고구마 맛있게 먹는")
            it.onNext("고구마 맛있게 먹는법")
            it.onComplete()
        }
    }

    ob.debounce(200, TimeUnit.MILLISECONDS)
            .subscribe { println(it) }
}
결과
고구마
고구마 맛있게
고구마 맛있게 먹는법

예제에서는 onNext() 자체에서 값을 누적하여 넘겨주도록 했습니다. debounce operator 자체가 값을 스스로 누적하지 않으므로 이점 주의해서 써야 합니다.

distict | distictUntilChanged

distict 연산자는 DB SQL에서 중복 데이터를 걸러낼때 사용하는것처럼 출력하는 모든 연산자를 기억하고 있다가 이전 출력값과 동일한 값이 배출되면 아이템을 걸러냅니다.

반대로 distictUntilChanged 연산자는 바로 직전 연산자와 동일한지만 판단하여 걸러냅니다.
즉, 연속되는 중복값만 걸러집니다.
fun distictTest() {
    println("distict")
    listOf(1,2,3,4,5,1,2,3,4,5,6).toObservable()
            .distinct()
            .subscribe { print("$it, ") }
    
    println("")
    println("distinctUntilChanged")
    
    listOf(1,1,2,3,4,5,5,5,6,1,2,3 ).toObservable()
            .distinctUntilChanged()
            .subscribe { print("$it, ") }
}


결과

distict

1, 2, 3, 4, 5, 6, 

distinctUntilChanged

1, 2, 3, 4, 5, 6, 1, 2, 3,


elementAt | ignoreElements

elementsAt은 배열 연산자와 동일하게 특정 순서에 배출되는 데이터를 획득할수 있습니다.

단, 앞단의 데이터가 모두 배출되고 나서 지정한 순서가 올때까지 기다려야 합니다.


만약 데이터의 배출에는 관심없고, 배출이 완료 되었는지에 대한 판단만 필요하다면 ignoreElements 연산자를 이용할수 있습니다.

ignoreElements 연산자는 onComplete()가 호출될때 값을 전달합니다.

fun main(args: Array) = runBlocking {
    val ob = Observable.range(1, 50)

    ob.elementAt(3)
            .subscribe{println("element 3rd: $it")}

    ob.elementAt(100)
            .subscribe{println("element 100th: $it")}

    ob.ignoreElements()
            .subscribe{println("emission completed")}
}


element 3rd: 4

emission completed

위 예제에서 Observable은 50개의 값을 방출하며, elementAt(100)으로 잘못된 값(100번째 추출 요청) 요청시 해당 요청은 무시됩니다.


first | last

이 연산자 역시 collection에서 사용하는 연산자와 동일하게 첫번째 값이나, 마지막 값을 반환합니다
단 param으로 기본값을 넣어야 하며, firstlast 값이 없을경우 param으로 전달받은 기본값을 반환합니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.range(1,100)
    ob.first(2) //first 값이 없으면 기본값 2 출력
            .subscribeBy { println("first item: $it") }

    ob.last(2) //last 값이 없으면 기본값 2 출력
            .subscribeBy { println("last item: $it") }

    Observable.empty().first(-1) // 빈 Observable이므로 first가 없음 -> 기본값인 -1 출력 
            .subscribeBy { println("default item: $it") }
}


결과

first item: 1

last item: 100

default item: -1



take | cast

take는 앞에서 부터 특정 개수만큼만 수신하고 나머지는 무시합니다.
또한 cast는 배출된 타입의 클래스를 다른 타입 클래스로 casting할때 사용합니다.
(물론 cast 가능한 타입이어야 겠죠?)
fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.range(1,100)

    ob.cast(Number::class.java)
            .take(5)
            .subscribe { println(it) }
}


결과

1

2

3

4

5


take로 5개만 뽑아내도록 제한하였고, Int 타입을 부모 클래스인 Number 타입으로 casting해서 출력했습니다.


flatMap

flatMap 역시 collection과 동일한 기능을 제공합니다.
다만 collection에서는 List<List<Int>> 처럼 중복된 colletion을 flatten 시킨다면, 여기서는 Observable<Observable<Int>>Flowable<Flowable<Int>> 처럼 중첩된 생산자를 flatten 시킵니다.

어차피 collection의 flatmap 연산자와 개념은 똑같습니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.range(1, 3)

    ob.flatMap { makeObserver(it) }
            .subscribeBy(
                    onNext = {
                        println(" item: $it")
                    },
                    onComplete = {
                        println("complete")
                    }
            )
}

fun makeObserver(seed: Int) = Observable.create<Int> {
    for (i in 0 until seed) {
        it.onNext(seed)
    }
    it.onComplete()
}


위 예제처럼 Observable을 수신하면서 방출된 결과를 다른 Observable로 넘겨서 또다른 Observable을 만들어 내도록 코드를 작성했습니다.

사실 예제는 간단하지만 실무에서 복잡한 코드를 작성하게 되면 observable 내부에서 observable이 생성되는 일이 간혹 발생할 수 있습니다.

(아마도 collection에서 flatmap 쓰는 정도..인데, 제 경우엔 한두번 써본것 같네요..)

이럴 경우 중간 배출값이 Observable<Observable<Int>> 가 되므로 flatMap으로 flatten 시켜야 하며, 이때 onCompelte()는 모든 Observable의 방출이 끝난 이후에 한번만 호출됩니다.

 item: 1

 item: 2

 item: 2

 item: 3

 item: 3

 item: 3

complete


defaultIfEmpty | switchIfEmpty

filterdebounce등 특정 조건에 맞는 값을 출력하는경우 조건을 맞추지 못하면 출력값이 하나도 없는경우도 발생합니다.
이때 기본값을 설정하는 defaultIfEmpty와 기본 Observable을 설정할 수 있도록 하는 switchIfEmpty 연산자를 제공합니다.
private fun createSlowObservable(startNum: Int): Observable {
    val endNum = startNum + 10
    return Observable.create {
        for (i in startNum until endNum) {
            it.onNext(i)
            runBlocking { delay(100) }
        }
        it.onComplete()
    }
}

fun main(args: Array) = runBlocking {
    // 1부터 10까지 100ms 간격으로 출력
    val ob = createSlowObservable(1)

    val elapsedTime1 = measureTimeMillis {

        ob.filter { it == 20 }
                .defaultIfEmpty(-1)
                .subscribe { println("received: $it") }
    }

    println("elapsed time1:$elapsedTime1")

    // 11부터 20까지 100ms 간격으로 출력
    val ob2 = createSlowObservable(11)
    val elapsedTime2 = measureTimeMillis {
        ob.filter { it == 20 }
            .switchIfEmpty(ob2)
            .filter { it == 20 }
            .defaultIfEmpty(-1)
            .subscribe { println("received: $it") }
    }

    println("elapsed time2:$elapsedTime2")

}


먼저 100ms 단위로 10개의 숫자를 생성하는 createSlowObservable(startNum: Int) 함수를 생성합니다.

이 함수는 넘겨받은 시작값으로 부터 순서대로 10개의 숫자를 생성하여 100ms 단위로 방출 하는 Observable 반환합니다.


첫번째 코드의 경우 1~10까지의 숫자를 생성하나 filter에서 모두 걸러져 방출 값이 없게 됩니다.

이럴경우 기본값으로 -1을 방출합니다.


두번째 코드의 경우 동일하게 1~10까지 숫자를 생성하여 filter으로 인하여 방출값이 없으므로 11~20을 방출하는 observable로 대체시킵니다.

그리고 나서 다시 filter { it == 20 }를 적용하고, 그래도 없을경우 -1을 찍도록 합니다.

received: -1

elapsed time1:1040

received: 20

elapsed time2:2073


결과를 보면 의도한 대로 기본값이 찍히거나, 대체 Observable로 전환되었으나, 걸린 시간은 모든 값을 방출하고 난 후 동작 했음을 알 수 있습니다.


startWith

생산자의 맨 앞에 값을 추가하는 역할을 합니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.range(1, 5)

    ob.startWith(0)
            .subscribe { println("received: $it") }
}


received: 0

received: 1

received: 2

received: 3

received: 4

received: 5


Sorted

정렬을 위해 sorted 연산자를 사용할 수 있습니다.
다만 이 연산자는 모든 방출을 저장했다가, 방출이 완료되면 정렬하기 때문에 시간, 성능, 메모리의 단점이 존재합니다.
너무 느린 방출인 경우 정렬을 위해 방출의 마지막까지 기다려야 하며, 대규모의 방출인 경우 OOM 발생 여지가 있습니다.

fun main(args: Array) = runBlocking {
    val ob = Observable.just(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)

    println("sort()")
    ob.startWith(0)
            .sorted()
            .subscribe { print("$it, ") }
    println("")

    println("comaprator sort()")
    ob.startWith(0)
            .sorted { num1, num2 -> if (num1 >= num2) -1 else 1 }
            .subscribe { print("$it, ") }
}


sorted()를 이용하여 정렬할수도 있으며, 필요한 경우 comparator를 직접 구현하여 넘겨줄수 있습니다.

sort()

0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 

comaprator sort()

10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0,


scan | reduce

scan은 방출시점에 방출된 값과 직전 이전값을 같이 넘겨 받습니다.
따라서 방출되는 값을 누적시켜서 적용하는 용도로 사용하기 유리하며, reduce의 경우 방출되는 값과 이전값을 누적연산하여 마지막 방출때 하나의 값만을 전달 받습니다.
(reduce는 collection의 reduce와 쓰임과 활용이 동일합니다.)


fun main(args: Array) = runBlocking {
    val ob = scanTest()

    println("----- scan() -----")
    ob.scan { prev, next -> "$prev$next" }
            .subscribe { println(it) }

    println("")
    println("----- reduce() -----")
    ob.reduce { prev, next -> "$prev$next" }
            .subscribe { println(it) }

    println("")
    println("----- scan() with debounce() -----")
    ob.scan { prev, next -> "$prev$next" }
            .debounce (200, TimeUnit.MILLISECONDS)
            .subscribe { println(it) }
}

fun scanTest(): Observable {
    return Observable.create {
        runBlocking { // 중간 중간 delay 함수를 쓰기위해 runBlocking을 사용합니다.
            it.onNext("고")
            it.onNext("구")
            it.onNext("마")
            it.onNext(" ")
            delay(300)
            it.onNext("맛")
            it.onNext("있")
            it.onNext("게")
            it.onNext(" ")
            delay(300)
            it.onNext("먹")
            it.onNext("는")
            it.onNext("법")
            it.onComplete()
        }
    }
}

결과

----- scan() -----

고구

고구마

고구마 

고구마 맛

고구마 맛있

고구마 맛있게

고구마 맛있게 

고구마 맛있게 먹

고구마 맛있게 먹는

고구마 맛있게 먹는법


----- reduce() -----

고구마 맛있게 먹는법


----- scan() with debounce() -----

고구마 

고구마 맛있게 

고구마 맛있게 먹는법


scan은 방출할때마다 누적값을 찍고, reduceonComplete()가 호출되면 누적된 최종값을 출력합니다.

마지막 예제처럼  debounce()와 같이 사용하여 응용할 수도 있습니다.


참고로 scan의 경우 scan {..} 블럭의 최종 출력값이 다음 방출시 이전값으로 들어 옵니다.

위 예제처럼 '고'가 들어왔을 때는 이전값이 없으므로 subscribe에서는 '고'만 출력됩니다.

하지만 '구'가 방출될때는 scan param으로 prev 값은 '고' , next 값은 '구'가 됩니다.

다음 출력 '마'가 방출 될때는 이전 scan에 의해서 두값을 합친 string인 '고구'가 prev 값으로 들어오고 next 값은 '마' 가 됩니다.

Emit 

prev 

next

 scan 출력값 ($prev$next)

 고

 null

 고

 고

 구

 고

 구

 고구

 마

 고구

 마

 고구마

 

 고구마

 (공백)

 고구마(공백)

 맛

 고구마(공백)

 맛

 고구마 맛

 ...  ...

 ...


toMap | toMultimap

배출된 정보를 모두 모아 map으로 출력할수 있습니다.
이때 toMap을 사용하면, key와 value를 지정해야하고 동일key에 여러 값을 set하게 되면 마지막값만 저장됩니다.
또한 toMultimap을 사용하면 특정 key로 grouping 할 수 있습니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.range(1, 10)

    ob.toMap({ key -> key }, { value -> value * value })
            .subscribeBy { println(it) }

    ob.toMultimap {
        if (it % 2 == 0) {
            "even"
        } else {
            "odd"
        }
    }.subscribeBy { println(it) }
}


{1=1, 2=4, 3=9, 4=16, 5=25, 6=36, 7=49, 8=64, 9=81, 10=100}

{even=[2, 4, 6, 8, 10], odd=[1, 3, 5, 7, 9]}


toMap을 사용하여 key, value값을 지정해 주면, 모든 방출이 끝날때 map을 onSuccess()로 넘겨 줍니다.
또한 toMultiMap을 사용하면 value를 list로 만들어 넘겨주므로 grouping 기능을 수행할 수 있습니다.


toList | toSortedList

위와 유사하게 방출된 정보를 list 또는 정렬된 list로 만들어 List 객체를 반환합니다.



반응형