본문으로 바로가기
반응형

Observable은 Consumer가 소비하는 값을 생성하는 역할을 합니다.

Consumer들은 Observable에 구독을 신청하고, Observable은 값을 생성한 후에 Consumer들에게 push 방식으로 값을 전달합니다.

 

Observable의 주요 이벤트

Observable은 주요하게 아래 event를 발생시킵니다.

  • onNext(item T): 값을 전달할때 호출하여 값을 넘겨줌
  • onError(e: Throwable): 에러가 발생하면 호출함
  • onSubscribe(d: Disposable): 구독을 신청하면 호출해줌
    • 이때 넘어오는 Disposable 객체는 Observer가 구독을 해제할때 사용함.
  • onComplete(): 가지고 있는 값을 모두 전달하면 호출함.
 
따라서 위 callback 함수들을 구현하여 Observable에 등록하면 Observable이 전달하는 이벤트를 받을 수 있습니다.

이 이벤트는 Observer interface에 정의되어 있으므로 Observer interface를 구현한 객체를 Observerable에 구독신청할 수 있습니다.

 

val observer: Observer<String> = object : Observer<String> { 
    override fun onComplete() { println("onComplete()")}    
    override fun onNext(item: String) { println("onNext() - $item") }  
    override fun onError(e: Throwable) { println("onError() - $") }   
    override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d ") }
} 

 

Observable의 생성

Observable을 생성하는 방법에는 여러가지가 존재합니다.
 

create

Observable.create를 이용하여 아래와 같이 생성할 수 있습니다.

fun main(args: Array<String>) {
    val observable1 = Observable.create {
        it.onNext(1) 
        it.onNext(2)    
        it.onComplete() 
    }
    
    val observable2 = Observable.create {
        it.onNext(1)
        it.onNext(2)
        it.onError(Exception("Wow!! exception"))
    }
    
    val observer: Observer = object : Observer { 
        override fun onComplete() = println("onComplete()") 
        override fun onNext(item: Int) = println("onNext() - $item") 
        override fun onError(e: Throwable) = println("onError() - $")
        override fun onSubscribe(d: Disposable) = println("onSubscribe() - $d ") 
    }     
    
    observable1.subscribe(observer)
    observable2.subscribe(observer) 
}
결과는 아래와 같습니다.
onSubscribe() - null 
onNext() - 1
onNext() - 2
onComplete()
onSubscribe() - null 
onNext() - 1
onNext() - 2
onError() - Wow!! exception

 

fromxxx

from을 이용하면 기존 구조체로 부터 Observable을 생성할 수 있습니다.

 

fun main(args: Array<String>) {
    val list = listOf(1, 2, 3)
    val listOb = Observable.fromIterable(list)
    val call = Callable<Int> { 4 }
    val callOb = Observable.fromCallable(call)
    val future = object : Future<Int> {
        override fun get() = 5 
        override fun get(timeout: Long, unit: TimeUnit) = 6 
        override fun isDone() = true     
        override fun cancel(mayInterruptIfRunning: Boolean) = false  
        override fun isCancelled() = false     
    } 
    val futureOb = Observable.fromFuture(future)   
    val observer: Observer<Int> = object : Observer<Int> { 
        override fun onComplete() = println("onComplete()")    
        override fun onNext(item: Int) = println("onNext() - $item")  
        override fun onError(e: Throwable) = println("onError() - $")   
        override fun onSubscribe(d: Disposable) = println("onSubscribe() - $d ")  
    }  
    listOb.subscribe(observer)
    callOb.subscribe(observer)   
    futureOb.subscribe(observer)
}

결과

 

onSubscribe() - io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable@6bf2d08e 
onNext() - 1
onNext() - 2
onNext() - 3
onComplete()
onSubscribe() - 0 
onNext() - 4
onComplete()
onSubscribe() - 0 
onNext() - 5
onComplete()
  • fromIterable(): list 같이 iterable을 지원하는 instance를 Observable 형태로 변경합니다.
    • 각 개별 아이템이 하나씩 전달됩니다.
  • fromCallable(): Callable 객체를 OBservable형태로 변경합니다.
    • call() 함수의 return값이 전달 됩니다.
  • fromFuture(): Future 객체를 Observable 형태로 변경합니다.
    • get() 함수의 return값이 전달됩니다.

iterable의 경우 아래처럼 코틀린 확장함수인 toObservable()을 이용하여 Observable을 만들수도 있습니다.

(내부적으로는 fromIterable()을 이용합니다.

 

val listOb = listOf(1,2,3).toObservable()

 

 

 

just

just는 받은 인자를 그대로 전달합니다.

따라서 list를 받든 map을 받는 객체 자체를 전달하며, 여러개를 전달하려면 각각의 인자로 넣어서 호출해야 합니다.

 

fun main(args: Array<String>) {
    val list = listOf(1, 2, 3)  
    val num = 3   
    val str = "wow!"  
    val map = mapOf(1 to "one", 2 to "two")  
    val justOb = Observable.just(list, num, str, map)  
    val observer: Observer<Any> = object : Observer<Any> {     
        override fun onComplete() = println("onComplete()")     
        override fun onNext(item: Any) = println("onNext() - $item") 
        override fun onError(e: Throwable) = println("onError() - $")   
        override fun onSubscribe(d: Disposable) = println("onSubscribe() - $d ") 
    }  
    justOb.subscribe(observer)
}

결과

onSubscribe() - io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@15615099 
onNext() - [1, 2, 3]
onNext() - 3
onNext() - wow!
onNext() -
onComplete()

 

넘긴 인자만큼 onNext()를 호출해서 해당 item을 전달하며, 받은 객체 그대로를 전달하는걸 알수 있습니다.

만약 list의 각각의 원소를 전달해야 한다면 위에서 언급한 fromIterable()을 사용해야 겠죠?

 

또한 just는 모든 item의 전달이 완료되면 onComplete()을 호출해 줍니다.

 


 

range

특정 범위만큼 수를 생성하여 전달합니다.

 

val observer = object : Observer<Int> {
    override fun onComplete() = println("onComplete()")  
    override fun onNext(item: Int) = println("onNext() - $item")   
    override fun onError(e: Throwable) = println("onError() - $")    
    override fun onSubscribe(d: Disposable) = println("onSubscribe() - $d ")
} 

Observable.range(1,3).subscribe(observer)

결과

onSubscribe() - 0 
onNext() - 1
onNext() - 2
onNext() - 3
onComplete()

 

empty

아무값을 전달하지는 않지만 onComplete()를 호출해 줍니다.

 

val observer = object : Observer {... } 
Observable.empty().subscribe(observer)

결과

 

onSubscribe() - INSTANCE 
onComplete()

 

 

 

interval

특정 시간 간격으로 0부터 숫자를 증가시키면서 반환합니다.

 

fun main(args: Array) { 
    val observer = object : Observer<Long> {        ...     } 
    Thread() .start()      / /0.3초간 main thread를 대기시킨다. 
    val th1 = Thread() { Thread.sleep(300)}  
    th1.start()  
    th1.join() 
}

결과

main 함수이기 때문에 thread sleep를 하지 않으면 process가 죽기 때문에 observer의 수행은 background에서 하고 main thread는 0.3초 block 시킵니다.

onSubscribe() - null 

onNext() - 0

onNext() - 1

 

timer

주어진 시간에 한번만 값을 전달합니다.

 

Thread() .start() // 0.3초간 대기 한다.
val th2 = Thread() { Thread.sleep(300)} 
th2.start() th2.join()

결과

이 함수 역시 그냥 main함수에서 수행하면 process가 종료되어 버리므로, background에서 처리하고 main thread는 0.3초 대기 시키도록 합니다.

onSubscribe() - null 
onNext() - 0
onComplete()

 

Subscribe

Observable에서 방출하는 값을 받기 위해서는 subscribe() 함수를 이용해 등록해야 합니다.
위 예제에서는 Observer instance를 등록했으나, 각각의 메서드를 필요한것만 따로 등록할 수 도 있습니다.
 
Observable의 class를 보면 subscribe에 대해 param에 따라 다양한 정의가 되어 있습니다.
  • subscribe(): Disposable
  • subscribe(onNext: Consumer): Disposable
  • subscribe(onNext: Consumer, onError: Consumer): Disposable
  • subscribe(onNext: Consumer, onError: Consumer, onComplete: Action): Disposable
  • subscribe(onNext: Consumer, onError: Consumer, onComplete: Action, onSubscribe: Consumer): Disposable

따라서 observer의 instance를 등록해도 되지만 필요한 callback만 따로 등록해서 사용할 수 도 있습니다.

 

fun main(args: Array<String>) {
    val observable = Observable.range(1,3)  
    observable.subscribe(       
        { item -> println("onNext - $item") },         
        { err -> println("onError - $err") },   
        { println("onComplete()") }
    )
}

결과는 아래와 같습니다.

 

onNext - 1
onNext - 2
onNext - 3
onComplete()

 

구독 해지

Observer를 subscribe를 이용해 등록하면 onSubscribe(d: Disposable) = {...} 으로 Disposable의 instance를 전달 받습니다.

Disposable은 interface로 아래와 같습니다.

 

public interface Disposable {
/**      * Dispose the resource, the operation should be idempotent.      */  
void dispose();
/**      * Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed(); 
}

즉 구독이 해지 되었는지를 확인하는 함수와 구독 해지를 요청하는 함수로 구성됩니다.

 

 

 

fun main(args: Array) { 
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val observer = object : Observer<Long> {   
        lateinit var disposable: Disposable    
        override fun onComplete() = println("onComplete()")   
        override fun onNext(item: Long) {        
        println("onNext() - $item")      
            if (item >= 5 && disposable.isDisposed == false) {   
                disposable.dispose()          
            }      
        }
    
        override fun onError(e: Throwable) = println("onError() - $")    
        override fun onSubscribe(d: Disposable) {     
            println("onSubscribe() - $d ")   
            disposable = d     
        }   
    }    
    observable.subscribe(observer)      // 0.5초 대기후 dispose() 호출
    Thread() {   
        Thread.sleep(1000)  
    }.apply {  
        start()
        join()   
    }
} 
만약 subscribe()함수에 따로 따로 각각의 함수를 구현해 넣었다면 return 값으로 Disposable instance를 전달받을 수 있습니다.
따라서 이렇게 전달받은 함수로 dispose()를 호출해 주면 구독이 해지 됩니다.
fun main(args: Array) { 
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val disposable = observable.subscribe( ,    
        { err -> println("onError - $err")},    
        { println("onComplete()")}     
    )      // 0.5초 대기후 dispose() 호출
    Thread() {      
        Thread.sleep(500)   
        disposable.dispose()   
        Thread.sleep(100)   
    }.apply {   
        start()     
        join()  
    }
}

 

Hot & Cold Observable

앞서 언급된 observable들은 subscribe를 신청하면 가지고 있는 데이터를 순서에 맞춰 전부 내보내 줍니다.
여러번 subscribe를 하더라도 처음부터 순서에 맞춰서 동일한 데이터를 내어줍니다.
즉, Observable의 데이터는 subscribe해서 소모되는것이 아니라, 계속 저장되어 있다가 구독자가 추가될때마다 데이터 전부를 내어주도록 되어 있습니다.
이를 Cold Observable이라고 합니다.

 

반대의 의미인 Hot Observable은 배출하는 시점이 구독시점이 아니며, 이에 따라 뒤늦게 구독하는 Observer는 이전 데이터를 받지 못하고, 구독 신청 이후의 데이터 부터 받게 됩니다.

Cold Observable

  • subscribe가 호출되면 데이터를 배출하기 시작한다. (Observable이 배출하는 동작을 시작한다.)
  • 처음부터 모든 데이터가 순서대로 배출된다.
  • 구독할때마다 동일한 데이터가 동일한 순서로 배출된다. (데이터가 배출되었다고 소모되지 않는다.)
Hot Observable
  • subscribe와 상관없이 데이터를 배출한다.
  • 구독시점부터 데이터를 전달 받으며, 구독신청전의 데이터는 받을 수 없다.
  • Event를 전달받는 형태로 사용함.
 

ConnectableObservable

Hot observable중에 하나로 connect를 호출하면 배출을 시작합니다.

따라서 observer는 subscribe와 상관없이 구독 신청 시점부터 데이터를 전달받습니다.

 

fun main(args: Array<String>) {  
    val connectableObservable = (1..10).toObservable().publish()
    // 1번 구독자 등록   
    connectableObservable.subscribe{ println("first subscriber: $it") }  
    println("Add first subscriber.")  
    
    // 2번 구독자 등록  
    connectableObservable.map { "second subscriber: $it" }   
    .subscribe{ println(it) }  
    println("Add second subscriber.") 
    
    // observable connect()  
    connectableObservable.connect()  
    
    // 3번 구독자 등록  
    connectableObservable.subscribe{ println("Subscription 3: $it") } 
}

결과

Add first subscriber.
Add second subscriber.
first subscriber: 1
second subscriber: 1
first subscriber: 2
second subscriber: 2
first subscriber: 3
second subscriber: 3
first subscriber: 4
second subscriber: 4
first subscriber: 5
second subscriber: 5
first subscriber: 6
second subscriber: 6
first subscriber: 7
second subscriber: 7
first subscriber: 8
second subscriber: 8
first subscriber: 9
second subscriber: 9
first subscriber: 10
second subscriber: 10

 

publish()를 통하여 hot observable로 변경합니다.

subscribe()를 호출하였으나, onNext()가 호출되지 않습니다.

1번과 2번 구독자를 등록후 Observable의 connect()를 호출하면 그때서야 데이터가 배출됩니다.

또한 배출이 완료된 이후에 등록된 3번은 데이터를 하나도 전달받지 못합니다.

 

아래에서는 동일한 코드에 Observable을 interval로 바꾸어 항상 출력 가능한 상태로 만들고, connect 이전과 이후에 구독 신청에 따라 어떤게 데이터를 수신받는지 확인해 보겠습니다.

 

fun main(args: Array<String>) { 
    val connectableObservable = Observable.interval(100, TimeUnit.MILLISECONDS).publish() 
    // 1번 구독자 등록   
    connectableObservable.subscribe { println("1st subscriber: $it") }  

    // 2번 구독자 등록   
    connectableObservable.map { "2nd subscriber: $it" }     
    .subscribe { println(it) }   

    // observable connect()  
    connectableObservable.connect() 
    runBlocking { delay(300) }  

    // 3번 구독자 등록 
    connectableObservable.map { "3rd subscriber: $it" }  
    .subscribe { println(it) } 
    runBlocking { delay(300)}
}

결과

1st subscriber: 0
2nd subscriber: 0
1st subscriber: 1
2nd subscriber: 1
1st subscriber: 2
2nd subscriber: 2
1st subscriber: 3
2nd subscriber: 3
3rd subscriber: 3
1st subscriber: 4
2nd subscriber: 4
3rd subscriber: 4
1st subscriber: 5
2nd subscriber: 5
3rd subscriber: 5

connect() 이후에 300ms을 대기하므로 1번과 2번 구독자는 각각 세개씩 배출된 데이터를 받습니다.

그 이후 3번 구독자가 등록되면 3번은 등록 이후 데이터 부터 전달받습니다.

 

위에서 쓰인 runBlocking은 코틀린의 coroutine으로 0.3초동안 delay를 주며, 해당 라인에서 대기합니다. (단 해당 Thread가 block 되는건 아닙니다.)

thread.join으로 계속 써오다가 불편하여 코루틴으로 변경하여 예제를 작성했습니다.

 

반응형