Observable은 Consumer가 소비하는 값을 생성하는 역할을 합니다.
Consumer들은 Observable에 구독을 신청하고, Observable은 값을 생성한 후에 Consumer들에게 push 방식으로 값을 전달합니다.
Observable의 주요 이벤트
Observable은 주요하게 아래 event를 발생시킵니다.
- onNext(item T): 값을 전달할때 호출하여 값을 넘겨줌
- onError(e: Throwable): 에러가 발생하면 호출함
- onSubscribe(d: Disposable): 구독을 신청하면 호출해줌
- 이때 넘어오는 Disposable 객체는 Observer가 구독을 해제할때 사용함.
- onComplete(): 가지고 있는 값을 모두 전달하면 호출함.
이 이벤트는 Observer interface에 정의되어 있으므로 Observer interface를 구현한 객체를 Observerable에 구독신청할 수 있습니다.
val observer: Observer<String> = object : Observer<String> {
override fun onComplete() { println("onComplete()")}
override fun onNext(item: String) { println("onNext() - $item") }
override fun onError(e: Throwable) { println("onError() - $") }
override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d ") }
}
Observable의 생성
create
Observable.create를 이용하여 아래와 같이 생성할 수 있습니다.
fun main(args: Array<String>) {
val observable1 = Observable.create {
it.onNext(1)
it.onNext(2)
it.onComplete()
}
val observable2 = Observable.create {
it.onNext(1)
it.onNext(2)
it.onError(Exception("Wow!! exception"))
}
val observer: Observer = object : Observer {
override fun onComplete() = println("onComplete()")
override fun onNext(item: Int) = println("onNext() - $item")
override fun onError(e: Throwable) = println("onError() - $")
override fun onSubscribe(d: Disposable) = println("onSubscribe() - $d ")
}
observable1.subscribe(observer)
observable2.subscribe(observer)
}
결과는 아래와 같습니다.onSubscribe() - nullonNext() - 1onNext() - 2onComplete()onSubscribe() - nullonNext() - 1onNext() - 2onError() - Wow!! exception
fromxxx
from을 이용하면 기존 구조체로 부터 Observable을 생성할 수 있습니다.
fun main(args: Array<String>) {
val list = listOf(1, 2, 3)
val listOb = Observable.fromIterable(list)
val call = Callable<Int> { 4 }
val callOb = Observable.fromCallable(call)
val future = object : Future<Int> {
override fun get() = 5
override fun get(timeout: Long, unit: TimeUnit) = 6
override fun isDone() = true
override fun cancel(mayInterruptIfRunning: Boolean) = false
override fun isCancelled() = false
}
val futureOb = Observable.fromFuture(future)
val observer: Observer<Int> = object : Observer<Int> {
override fun onComplete() = println("onComplete()")
override fun onNext(item: Int) = println("onNext() - $item")
override fun onError(e: Throwable) = println("onError() - $")
override fun onSubscribe(d: Disposable) = println("onSubscribe() - $d ")
}
listOb.subscribe(observer)
callOb.subscribe(observer)
futureOb.subscribe(observer)
}
결과
onSubscribe() - io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable@6bf2d08eonNext() - 1onNext() - 2onNext() - 3onComplete()onSubscribe() - 0onNext() - 4onComplete()onSubscribe() - 0onNext() - 5onComplete()
- fromIterable(): list 같이 iterable을 지원하는 instance를 Observable 형태로 변경합니다.
- 각 개별 아이템이 하나씩 전달됩니다.
- fromCallable(): Callable 객체를 OBservable형태로 변경합니다.
- call() 함수의 return값이 전달 됩니다.
- fromFuture(): Future 객체를 Observable 형태로 변경합니다.
- get() 함수의 return값이 전달됩니다.
iterable의 경우 아래처럼 코틀린 확장함수인 toObservable()을 이용하여 Observable을 만들수도 있습니다.
(내부적으로는 fromIterable()을 이용합니다.
val listOb = listOf(1,2,3).toObservable()
just
just는 받은 인자를 그대로 전달합니다.
따라서 list를 받든 map을 받는 객체 자체를 전달하며, 여러개를 전달하려면 각각의 인자로 넣어서 호출해야 합니다.
fun main(args: Array<String>) {
val list = listOf(1, 2, 3)
val num = 3
val str = "wow!"
val map = mapOf(1 to "one", 2 to "two")
val justOb = Observable.just(list, num, str, map)
val observer: Observer<Any> = object : Observer<Any> {
override fun onComplete() = println("onComplete()")
override fun onNext(item: Any) = println("onNext() - $item")
override fun onError(e: Throwable) = println("onError() - $")
override fun onSubscribe(d: Disposable) = println("onSubscribe() - $d ")
}
justOb.subscribe(observer)
}
결과
onSubscribe() - io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@15615099onNext() - [1, 2, 3]onNext() - 3onNext() - wow!onNext() -onComplete()
넘긴 인자만큼 onNext()를 호출해서 해당 item을 전달하며, 받은 객체 그대로를 전달하는걸 알수 있습니다.
만약 list의 각각의 원소를 전달해야 한다면 위에서 언급한 fromIterable()을 사용해야 겠죠?
또한 just는 모든 item의 전달이 완료되면 onComplete()을 호출해 줍니다.
range
특정 범위만큼 수를 생성하여 전달합니다.
val observer = object : Observer<Int> {
override fun onComplete() = println("onComplete()")
override fun onNext(item: Int) = println("onNext() - $item")
override fun onError(e: Throwable) = println("onError() - $")
override fun onSubscribe(d: Disposable) = println("onSubscribe() - $d ")
}
Observable.range(1,3).subscribe(observer)
결과
onSubscribe() - 0onNext() - 1onNext() - 2onNext() - 3onComplete()
empty
아무값을 전달하지는 않지만 onComplete()를 호출해 줍니다.
val observer = object : Observer {... }
Observable.empty().subscribe(observer)
결과
onSubscribe() - INSTANCEonComplete()
interval
특정 시간 간격으로 0부터 숫자를 증가시키면서 반환합니다.
fun main(args: Array) {
val observer = object : Observer<Long> { ... }
Thread() .start() / /0.3초간 main thread를 대기시킨다.
val th1 = Thread() { Thread.sleep(300)}
th1.start()
th1.join()
}
결과
main 함수이기 때문에 thread sleep를 하지 않으면 process가 죽기 때문에 observer의 수행은 background에서 하고 main thread는 0.3초 block 시킵니다.
onSubscribe() - null
onNext() - 0
onNext() - 1
timer
주어진 시간에 한번만 값을 전달합니다.
Thread() .start() // 0.3초간 대기 한다.
val th2 = Thread() { Thread.sleep(300)}
th2.start() th2.join()
결과
이 함수 역시 그냥 main함수에서 수행하면 process가 종료되어 버리므로, background에서 처리하고 main thread는 0.3초 대기 시키도록 합니다.
onSubscribe() - nullonNext() - 0onComplete()
Subscribe
- subscribe(): Disposable
- subscribe(onNext: Consumer): Disposable
- subscribe(onNext: Consumer, onError: Consumer): Disposable
- subscribe(onNext: Consumer, onError: Consumer, onComplete: Action): Disposable
- subscribe(onNext: Consumer, onError: Consumer, onComplete: Action, onSubscribe: Consumer): Disposable
따라서 observer의 instance를 등록해도 되지만 필요한 callback만 따로 등록해서 사용할 수 도 있습니다.
fun main(args: Array<String>) {
val observable = Observable.range(1,3)
observable.subscribe(
{ item -> println("onNext - $item") },
{ err -> println("onError - $err") },
{ println("onComplete()") }
)
}
결과는 아래와 같습니다.
onNext - 1onNext - 2onNext - 3onComplete()
구독 해지
Observer를 subscribe를 이용해 등록하면 onSubscribe(d: Disposable) = {...} 으로 Disposable의 instance를 전달 받습니다.
Disposable은 interface로 아래와 같습니다.
public interface Disposable {
/** * Dispose the resource, the operation should be idempotent. */
void dispose();
/** * Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
즉 구독이 해지 되었는지를 확인하는 함수와 구독 해지를 요청하는 함수로 구성됩니다.
fun main(args: Array) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val observer = object : Observer<Long> {
lateinit var disposable: Disposable
override fun onComplete() = println("onComplete()")
override fun onNext(item: Long) {
println("onNext() - $item")
if (item >= 5 && disposable.isDisposed == false) {
disposable.dispose()
}
}
override fun onError(e: Throwable) = println("onError() - $")
override fun onSubscribe(d: Disposable) {
println("onSubscribe() - $d ")
disposable = d
}
}
observable.subscribe(observer) // 0.5초 대기후 dispose() 호출
Thread() {
Thread.sleep(1000)
}.apply {
start()
join()
}
}
fun main(args: Array) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val disposable = observable.subscribe( ,
{ err -> println("onError - $err")},
{ println("onComplete()")}
) // 0.5초 대기후 dispose() 호출
Thread() {
Thread.sleep(500)
disposable.dispose()
Thread.sleep(100)
}.apply {
start()
join()
}
}
Hot & Cold Observable
반대의 의미인 Hot Observable은 배출하는 시점이 구독시점이 아니며, 이에 따라 뒤늦게 구독하는 Observer는 이전 데이터를 받지 못하고, 구독 신청 이후의 데이터 부터 받게 됩니다.
Cold Observable
- subscribe가 호출되면 데이터를 배출하기 시작한다. (Observable이 배출하는 동작을 시작한다.)
- 처음부터 모든 데이터가 순서대로 배출된다.
- 구독할때마다 동일한 데이터가 동일한 순서로 배출된다. (데이터가 배출되었다고 소모되지 않는다.)
- subscribe와 상관없이 데이터를 배출한다.
- 구독시점부터 데이터를 전달 받으며, 구독신청전의 데이터는 받을 수 없다.
- Event를 전달받는 형태로 사용함.
ConnectableObservable
Hot observable중에 하나로 connect를 호출하면 배출을 시작합니다.
따라서 observer는 subscribe와 상관없이 구독 신청 시점부터 데이터를 전달받습니다.
fun main(args: Array<String>) {
val connectableObservable = (1..10).toObservable().publish()
// 1번 구독자 등록
connectableObservable.subscribe{ println("first subscriber: $it") }
println("Add first subscriber.")
// 2번 구독자 등록
connectableObservable.map { "second subscriber: $it" }
.subscribe{ println(it) }
println("Add second subscriber.")
// observable connect()
connectableObservable.connect()
// 3번 구독자 등록
connectableObservable.subscribe{ println("Subscription 3: $it") }
}
결과
Add first subscriber.Add second subscriber.first subscriber: 1second subscriber: 1first subscriber: 2second subscriber: 2first subscriber: 3second subscriber: 3first subscriber: 4second subscriber: 4first subscriber: 5second subscriber: 5first subscriber: 6second subscriber: 6first subscriber: 7second subscriber: 7first subscriber: 8second subscriber: 8first subscriber: 9second subscriber: 9first subscriber: 10second subscriber: 10
publish()를 통하여 hot observable로 변경합니다.
subscribe()를 호출하였으나, onNext()가 호출되지 않습니다.
1번과 2번 구독자를 등록후 Observable의 connect()를 호출하면 그때서야 데이터가 배출됩니다.
또한 배출이 완료된 이후에 등록된 3번은 데이터를 하나도 전달받지 못합니다.
아래에서는 동일한 코드에 Observable을 interval로 바꾸어 항상 출력 가능한 상태로 만들고, connect 이전과 이후에 구독 신청에 따라 어떤게 데이터를 수신받는지 확인해 보겠습니다.
fun main(args: Array<String>) {
val connectableObservable = Observable.interval(100, TimeUnit.MILLISECONDS).publish()
// 1번 구독자 등록
connectableObservable.subscribe { println("1st subscriber: $it") }
// 2번 구독자 등록
connectableObservable.map { "2nd subscriber: $it" }
.subscribe { println(it) }
// observable connect()
connectableObservable.connect()
runBlocking { delay(300) }
// 3번 구독자 등록
connectableObservable.map { "3rd subscriber: $it" }
.subscribe { println(it) }
runBlocking { delay(300)}
}
결과
1st subscriber: 02nd subscriber: 01st subscriber: 12nd subscriber: 11st subscriber: 22nd subscriber: 21st subscriber: 32nd subscriber: 33rd subscriber: 31st subscriber: 42nd subscriber: 43rd subscriber: 41st subscriber: 52nd subscriber: 53rd subscriber: 5
connect() 이후에 300ms을 대기하므로 1번과 2번 구독자는 각각 세개씩 배출된 데이터를 받습니다.
그 이후 3번 구독자가 등록되면 3번은 등록 이후 데이터 부터 전달받습니다.
위에서 쓰인 runBlocking은 코틀린의 coroutine으로 0.3초동안 delay를 주며, 해당 라인에서 대기합니다. (단 해당 Thread가 block 되는건 아닙니다.)
thread.join으로 계속 써오다가 불편하여 코루틴으로 변경하여 예제를 작성했습니다.
'개발이야기 > Kotlin' 카테고리의 다른 글
[RxKotlin] Reactive 코틀린 #4 - Flowables 과 Backpressure (0) | 2019.12.12 |
---|---|
[RxKotlin] Reactive 코틀린 #3 - Subject (0) | 2019.12.06 |
[RxKotlin] Reactive 코틀린 #1 - 개념 및 설치 (0) | 2019.12.04 |
[Kotlin] 코틀린 - 코루틴#11- Asynchronous Flow(2/2) (0) | 2019.11.16 |
[Kotlin] 코틀린 - 코루틴#10- Asynchronous Flow(1/2) (0) | 2019.11.04 |