본문으로 바로가기

[RxKotlin] Reactive 코틀린 #3 - Subject

category 개발이야기/Kotlin 2019. 12. 6. 17:57
반응형

Hot observable을 구현시 publish() 이외에 subject를 이용할 수 도 있습니다.

Subject는 Observer 역할을 하기 때문에 여러 Observable에 구독을 신청할 수 있고, Overvable의 역할도 하기 때문에 받은 item을 재배출 하거나, 새로운 값을 배출할 수 있습니다.

Observable이면서 Observer인거죠.


subject를 설명하기 위한 마블 다이어그램은 하기 링크에서 작성된 자료를 사용했습니다.

http://reactivex.io/documentation/ko/subject.html


편의를 위해 delay를 kotlin coroutine의 runBlocking을 이용합니다.

(runBlocking으로 thread를 정지시키지는 않지만 delay가 있는 라인에서 해당 delay만큼 라인은 block 됩니다.)


PublishSubject

PublishSubject를 등록할 경우 등록 시점부터 이후 데이터를 전달 받습니다.

출처: http://reactivex.io/documentation/ko/subject.html

fun main(args: Array<String>) {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val subject = PublishSubject.create<Long>()
    observable.subscribe(subject)
    runBlocking { delay(300) }
    subject.subscribe { println("1st: $it") }
    runBlocking { delay(300) }
    subject.subscribe { println("2nd: $it") }
    runBlocking { delay(300) }
}
결과
1st: 3
1st: 4
1st: 5
1st: 6
2nd: 6
1st: 7
2nd: 7
1st: 8
2nd: 8

첫번째 subject가 등록되면서 Observable이 동작을 시작합니다.

이때 300ms 이후에 첫번째 subcriber를 subject에 등록하면, 3번째 부터 데이터를 받기 시작합니다.

이후 300ms 이후에 두번째 subcriber를 subject에 등록하면, 두번째 역시 등록한 시점부터 데이터를 받습니다.

실제 Observable은 cold Observer로 생성했으나, Subject를 구독하고, 이를 통해서 observer를 등록하는 경우 hot observable 처럼 동작합니다.


BehaviorSubject

BehaviorSubject의 경우 등록 시점에 이전에 배출된 직전값 하나를 전달받고 시작합니다.

위와 동일한 예제에서 subject만 교체합니다.

출처: http://reactivex.io/documentation/ko/subject.html

<code>fun main(args: Array<string>) {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val subject = BehaviorSubject.create<long>()
    observable.subscribe(subject)
    runBlocking { delay(300) }
    subject.subscribe { println("1st: $it") }
    runBlocking { delay(300) }
    subject.subscribe { println("2nd: $it") }
    runBlocking { delay(300) }
}
결과

1st: 2

1st: 3

1st: 4

1st: 5

2nd: 5

1st: 6

2nd: 6

1st: 7

2nd: 7

1st: 8

2nd: 8


첫번째 구독자는 직전 값이 2를 전달받고 이후 배출되는 값을 전달 받습니다.

두번째 구독자 또한 직전값인 5를 전달받고 이후 배출되는 값을 전달 받습니다. 


AsyncSubject

AsyncSubject는 Observable의 마지막값을 한번만 배출합니다.
위와 동일한 예제를 사용할 경우 Observable을 interval로 생성하여, 마지막 값이 존재 하지 않습니다. (시간단위로 무한으로 아이템을 생성하기 때문입니다.)
따라서 interval 대신 just를 이용하여 지정된 배출 개수를 갖는 Observable을 생성하여 동작을 확인합니다.
fun main(args: Array<String>) {
    val observable = Observable.just(1,2,3,4,5,6,7,8,9,10)
    val subject = AsyncSubject.create<Int>()
    observable.subscribe(subject)
    subject.subscribe { println("1st: $it") }
    subject.subscribe { println("2nd: $it") }
}

결과는 두 구독자 모두 마지막 값인 10을 return 합니다.

1st: 10

2nd: 10


ReplaySubject

ReplaySubject는 cold observable과 유사하게 등록 시점 이전값을 모두 전달받은후 새로 배출되는 값을 전달 받습니다.

출처: http://reactivex.io/documentation/ko/subject.html

fun main(args: Array) {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val subject = ReplaySubject.create()
    observable.subscribe(subject)
    runBlocking { delay(300) }
    subject.subscribe { println("1st: $it") }
    runBlocking { delay(300) }
    subject.subscribe { println("2nd: $it") }
    runBlocking { delay(300) }
}

결과는 아래와 같습니다.

2nd: 0

2nd: 1

2nd: 2

2nd: 3

2nd: 4

2nd: 5

1st: 6

2nd: 6

1st: 7

2nd: 7

1st: 8

2nd: 8


두개의 구독자 모두 구독 직전의 값을 모두 전달받은뒤 시작합니다.


Subject를 Observable로 활용

위 예제의 경우 Observable에 subscriber로 subject를 등록하여 사용합니다.

하지만 Observable없이 단독으로 subject만으로 Observable역할을 수행하도록 할 수 있습니다.

fun main(args: Array) {
    val observer = object : Observer<String> {
        override fun onComplete(){}
        override fun onNext(item: String) = println("onNext() - $item")
        override fun onError(e: Throwable){}
        override fun onSubscribe(d: Disposable) {}
    }

    val publicSubject = PublishSubject.create()
    publicSubject.subscribe(observer)

    val AsyncSubject = AsyncSubject.create()
    AsyncSubject.subscribe(observer)

    val behaviorSubject = BehaviorSubject.create()
    behaviorSubject.subscribe(observer)

    val replaySubject = ReplaySubject.create()
    replaySubject.subscribe(observer)

    (1..3).forEach {
        publicSubject.onNext("public: $it")
        AsyncSubject.onNext("async: $it")
        behaviorSubject.onNext("behavior: $it")
        replaySubject.onNext("replay: $it")
    }

    AsyncSubject.onComplete()
}

Subject를 생성해서 onNext()나 기타 다른 함수를 직접 호출하여 값을 넘겨줄 수 있습니다.

"replaySubject의 경우 cold observable과 동일하게 동작하는데 왜 필요할까?" 란 의문을 가질 수 있습니다.

Observable없이 subject객체만으로 사용할때 사용하면 되겠죠?


또한 AsyncSubject는 마지막 객체만을 배출하기 때문에 Subject를 사용한다면 명시적으로 onComplete()를 호출해야 마지막값을 전달받을 수 있습니다.


Observerable에 subcriber로 subject를 등록한 경우 데이터를 전달받은 subject는 내부적으로 onNext()를 호출해 줍니다.

그리고 위 예제에서 보듯이 필요에 따라 onNext()를 외부에서도 호출하여 등록된 subscriber에게 특정 데이터 전달이 가능합니다.


추가적으로 Subject의 subscribe() 함수 역시 Observable의 subscribe() 함수와 동일하게 onNext(), onError()..등등을 따로 람다로 등록해줄 수 있습니다.


반응형