본문으로 바로가기
반응형

기존 포스팅에서는 RxKotlin에서 제공하는 operators에 대해서 설명했습니다.

물론 기본적으로 RxJava의 내용이지만 Collection.toObservable() 같은 kotlin 확장 함수도 같이 설명되었습니다.

추가적으로 Observable이나 Flowable에 Extension function을 추가하지 않고도 커스텀 연산자를 선언하여 사용할 수 있습니다.


얼마나 자주 사용할지는, 개개인의 성향과 Custom operator가 얼마나 자주 사용되는지 빈도에 따라 결정되겠지만 일단은 지원한다는데 더 초점을 맞춰서 설명해 보도록 하지요. ㅎㅎ


lift && ObservableOperator<Downstream, Upstream>

Observer로 선언된 부분을 custom하게 변경 하기 위하여 lift 함수를 제공하며, 인자로 ObservableOperator를 전달 받습니다. 
ObservableOperator는 RxJava에서 제공하는 interface로 아래와 같이 apply 함수를 구현하도록 되어 있습니다.

apply 함수는 인자로 자식 observer를 전달받고, apply 함수 내부에서 가공하여 새로운 upstream Observer를 return해 줘야 합니다.

fun main(args: Array<String>) {
    Observable.range(1, 10)
            .lift(SquareNumber())
            .map { "($it)" }
            .subscribe { println("result: $it") }
}

class SquareNumber : ObservableOperator<String, T> {

    override fun apply(observer: Observer<in String>): Observer<in T> {
        return object : Observer<T> {
            override fun onComplete() = observer.onComplete()

            override fun onSubscribe(d: Disposable)= observer.onSubscribe(d)

            override fun onError(e: Throwable) = observer.onError(e)

            override fun onNext(t: T) {
                if (t is Number) {
                    observer.onNext("Input value:$t square:${t.toInt() * t.toInt()}")
                } else {
                    observer.onNext("Input value:$t it is not a number")
                }
            }
        }
    }
}


먼저 ObservableOperator를 구현하는 SquareNumber 클래스를 구현합니다.

apply에서 전달받은 자식 Observer는 한번씩 다 호출해 주나 onNext에서 제곱수를 구하여 출력하도록 바꿉니다.


SqureNumber 클래스를 사용하기 위해 lift 연산자를 이용합니다.

그리고 나서 map으로 출력값을 괄호로 묶어 줍니다.

result: (Input value:1 square:1)

result: (Input value:2 square:4)

result: (Input value:3 square:9)

result: (Input value:4 square:16)

result: (Input value:5 square:25)

completed!!!


출력값은 위와 같습니다.


compose && ObservableTransformer<Upstream, Downstream>

위에서는 custom 연산자로 자식 observer을 받아 parent observer를 반환하는 함수에 대해서 설명했습니다.

이와는 좀 다르게 composeObservableTransformer를 인자로 전달받아 여러 operator를 통합 할 수 있는 역할을 합니다.


ObservableTransformer 인터페이스는 ObservableOperator와 동일하게 apply 함수를 구현하지만 인자로 upstream observable을 받고 downstream observable을 반환하도록 정의되어 있습니다.

(Flowable인 경우 FlowableTransformer를 사용하면 됩니다.)

fun main(args: Array<String>) {
    Observable.range(1, 10)
        .compose(SquareNumber2())
        .map { "square: $it" }
        .blockingSubscribe { println(it) }
}

class SquareNumber2 : ObservableTransformer<Int, Int> {

    override fun apply(upstream: Observable<Int>): ObservableSource<Int> {
        return upstream
            .map { it * it }
            .subscribeOn(Schedulers.computation())
            .observeOn(Schedulers.io())
    }
}

compose 함수를 이용하여 ObservableTransformer 를 구현한 함수를 Observable chain에 추가합니다.

compose의 반환값 역시 Observable입니다.

SqureNumber2 클래스는 넘겨받은 upstream에 값을 square로 만들고 계산은 computation thread에서 실행시키고, subscribe는 io thread에서 하도록 thread를 조정해 줍니다.

결과는 아래와 같습니다.

square: 1

square: 4

square: 9

square: 16

square: 25

square: 36

square: 49

square: 64

square: 81

square: 100


이는 중간 연산자가 복잡하게 사용 되거나, 같은 형태로 여러곳에서 사용할때 만들어 놓고 사용하면 편리할것 같습니다.

하지만 중간 연산자를 복잡하게 사용하거나, 동일한 연산자 chain을 여러곳에서 사용하는일은 많지 않을것 같습니다.

경우에 따라서는 가독성을 해치는 형태가 될수도 있으니 꼭 필요한곳에 사용하면 좋겠네요.

반응형