본문으로 바로가기
반응형

Observable은 기본적으로 subscribe하는 thread에서 동작됩니다.

interval이나, timer로 Observable을 생성하거나, delay operator를 사용하는 경우가 아니고서는 subscribe가 완료될때까지 thread가 block됩니다.

즉 subscribe{..} 블럭에서 모든 데이터를 수신받고 처리해야하만 해당 블럭을 벗어날 수 있는거죠.


그럼 일단 간단하게 test해 봅니다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.just(1, 2, 3)
        .map {
            println("map: $it - ${Thread.currentThread().name}")
            it
        }

    ob.subscribe { println("First: $it - ${Thread.currentThread().name}") }

    println("------------------")

    Thread {ob.subscribe { println("Second: $it - ${Thread.currentThread().name}") }}.start()

    delay(1000)
}


observable에서 mapping을 하면서 thread 이름을 찍고, subscribe에서도 출력하면서 thread 이름을 찍습니다.

그리고 새로운 thead를 하나 띄워서 같은 observable을 다시 subscribe 합니다.

map: 1 - main

First: 1 - main

map: 2 - main

First: 2 - main

map: 3 - main

First: 3 - main

------------------

map: 1 - Thread-0

Second: 1 - Thread-0

map: 2 - Thread-0

Second: 2 - Thread-0

map: 3 - Thread-0

Second: 3 - Thread-0


위 결과로 판단할수 있는것들은 아래와 같습니다.

  • Observable과 Subscribe는 subscribe를 수행한 실행한 thread에서 수행된다.
  • 모든 데이터의 수신이 완료 되어야만 (subscribe {..} 블럭이 끝나야만) 다음 코드 라인이 수행된다.
    • 즉 subscribe {..} 블럭이 끝날때까지 thread는 blocking 된다.
Thread로 띄우긴 했지만 첫번째 subscribe가 끝날때 까지 대기 했다가 그 이후에 thread가 수행되었습니다.

하지만 subscribe를 호출하는 thread가 아닌 다른 thread에서 이런 작업을 수행하기 위해 rxjava에서는 Scheduler를 제공합니다.


Schedulers

  • Scheduler.io()
    • 파일 / network IO 작업을 할때 사용하는 용도입니다.
    • 내부적으로 cachedPool을 사용하기 때문에 thread가 동시에 계속 늘어나면서 생성될수 있으며, 유휴 thread가 있을 경우 재활용됩니다. 
  • Scheduler.computation()
    • cpu 의존적인 계산을 수행을 위한 thread pool을 사용합니다.
    • 코어개수만큼 thread pool을 만들어 사용합니다. (내부적으로 ForkJoinPool을 쓰는게 아닐지 예상해 봅니다..)
  • Scheduler.newThread()
    • new Thread() 처럼 새로운 Thread를 하나 만들어 사용합니다.
  • Scheduler.single()
    • singleThreadPool을 사용하므로, 해당 Scheduler로 여러 작업 수행시 Queuing 되어 순서가 보장됩니다.
  • Scheduler.trampoline()
    • 호출을 수행한 thread를 이용하여 수행합니다.
    • 호출한 스레드 역시 단일 thread 이므로 여러 작업 요청시 Queuing 되어 순서가 보장됩니다.
    • 단, 호출한 스레드를 사용하기 때문에 queuing된 모든 작업이 끝나야만 다음 코드라인이 수행될 수 있습니다.
  • Scheduler.from()
    • Executor를 전달하여 새로운 Scheduler를 생성할 수 있습니다.
  • AndroidSchedulers.mainThread()
    • RxAndroid 사용시 mainThread()에서 수행하기 위한 Scheduler 입니다.


fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.just(1)

    ob.subscribeOn(Schedulers.io())
        .subscribe { println("Schedulers.io() - ${Thread.currentThread().name}") }
    ob.subscribeOn(Schedulers.computation())
        .subscribe { println("Schedulers.computation() - ${Thread.currentThread().name}") }
    ob.subscribeOn(Schedulers.newThread())
        .subscribe { println("Schedulers.newThread() - ${Thread.currentThread().name}") }
    ob.subscribeOn(Schedulers.single())
        .subscribe { println("Schedulers.single() - ${Thread.currentThread().name}") }
    ob.subscribeOn(Schedulers.trampoline())
        .subscribe { println("Schedulers.trampoline() - ${Thread.currentThread().name}") }

    val executor = Executors.newFixedThreadPool(2)
    val customScheduler = Schedulers.from(executor)
    ob.subscribeOn(customScheduler)
        .subscribe { println("Schedulers.from() - ${Thread.currentThread().name}") }

    delay(1000)
}


결과는 아래와 같습니다.

Schedulers.io() - RxCachedThreadScheduler-1

Schedulers.computation() - RxComputationThreadPool-1

Schedulers.newThread() - RxNewThreadScheduler-1

Schedulers.single() - RxSingleScheduler-1

Schedulers.trampoline() - main

Schedulers.from() - pool-2-thread-1


AndroidScheduler는 예제에는 넣지 않았지만 당연하게도 main thread에서 수행됩니다.

그럼 각 Scheduler를 사용시 Scheduler에 따라 코드 수행 순서는 예상처럼 동작하는지 확인해 봅니다.



Schedulers.io vs Schedulers.computation

둘다 비동기로 다른 thread에서 동작하도록 합니다.

이때 Scheduler를 지정하기 위해서는 subscribeOn 연산자를 이용합니다.

(이 연산자에 대한 자세한 설명은 아래에서 따로 합니다.)

fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.just(1,2,3)

    println("start Schedulers.io()")
    ob.subscribeOn(Schedulers.io())
        .subscribe {
            runBlocking { delay(100) }
            println("$it: Schedulers.io() - ${Thread.currentThread().name}")
        }

    println("start Schedulers.computation()")
    ob.subscribeOn(Schedulers.computation())
        .subscribe {
            runBlocking { delay(100) }
            println("$it: Schedulers.computation() - ${Thread.currentThread().name}")
        }

    println("done")

    delay(500)
}


결과

start Schedulers.io()

start Schedulers.computation()

done

1: Schedulers.io() - RxCachedThreadScheduler-1

1: Schedulers.computation() - RxComputationThreadPool-1

2: Schedulers.computation() - RxComputationThreadPool-1

2: Schedulers.io() - RxCachedThreadScheduler-1

3: Schedulers.computation() - RxComputationThreadPool-1

3: Schedulers.io() - RxCachedThreadScheduler-1


일단 두개의 Observable 자체가 다른 thread에서 수행되면 호출한 thread의 코드인 "start Schedulers.io()" "startSchedulers.computation()" 와 마지막에 있는 "done" 까지 먼저 찍히고 시작됩니다.

이것으로 Observable이 비동기로 동작하는걸 확인할수 있습니다.

두개의 Observable은 각각 다른 Thread에서 비동기로 동작합니다.


Schedulers.single() vsSchedulers.trampoline()

둘다 single 스레드를 사용하여 순서를 보장하지만, single()은 worker thread를 하나 만들어 해당 thread queue에 작업을 넘겨주는 방식이고, trampoline()은 호출한 thread의 queue에 넘겨주는 방식입니다.

둘다 순서를 보장하지만, singlesingle을 사용하는 Observable간의 순서만 보장하고, trampoline은 trampoline을 사용한 observable을 포함한 다른 코드에 까지 영향을 줍니다.

말로는 어려우니 예제로 차이점을 확인해 보겠습니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    val ob = Observable.just(1,2,3)

    println("start - Observable#1")
    ob.subscribeOn(Schedulers.single())
        .subscribe {
            runBlocking { delay(100) }
            println("$it: Observable#1 - ${Thread.currentThread().name}")
        }

    println("start - Observable#2")
    ob.subscribeOn(Schedulers.single())
        .subscribe {
            runBlocking { delay(100) }
            println("$it: Observable#2 - ${Thread.currentThread().name}")
        }

    println("done")

    delay(1000)
}


결과

start - Observable#1

start - Observable#2

done

1: Observable#1 - RxSingleScheduler-1

2: Observable#1 - RxSingleScheduler-1

3: Observable#1 - RxSingleScheduler-1

1: Observable#2 - RxSingleScheduler-1

2: Observable#2 - RxSingleScheduler-1

3: Observable#2 - RxSingleScheduler-1


"start xxx""done"는 먼저 찍혔지만 Observable간에는 순서가 보장됐습니다.

"start xxx" 로그와 "done"은 main thread에서 찍었지만 Observable의 동작은 하나의 worker thread에서 실행되었기 때문입니다.


만약 Scheduler 부분만 trampoline()으로 변경할 경우 모두 시작 thread인 main에서 시작되므로 아래와 같은 순서로 출력됩니다.

start - Observable#1

1: Observable#1 - main

2: Observable#1 - main

3: Observable#1 - main

start - Observable#2

1: Observable#2 - main

2: Observable#2 - main

3: Observable#2 - main

done


코드 순서로 진행되었으며, 모두 코드를 호출한 main thread에서 수행되었습니다.


두번째 예제로 subscribe 사용시 block되지 않는 interval을 이용한 Observable에 trampoline()을 사용해 보겠습니다.

fun main(args: Array) = runBlocking {
    println("start!")
    val ob = Observable.interval(100, TimeUnit.MILLISECONDS)

    ob.subscribeOn(Schedulers.trampoline())
            .subscribe { println(it) }
    
    println("end!")
}


결과는 아래와 같습니다.

start!

end


결국 trampoline을 쓰더라도 시간값으로 생산하는 Observable은 영향을 미치지 않습니다.


subscribeOn vs observeOn

subscribeOn은 어느 위치에서 선언되든지 Observabe과 Observer 모두 특정 scheduler에서 동작하도록 지정합니다.

데이터의 생산과 소비를 동일한 스케줄러를 사용하도록 지정해 줍니다.

fun main(args: Array<String>) = runBlocking<Unit> {
    Observable.range(1, 3)
        .map {
            println("mapping - ${Thread.currentThread().name}")
            it
        }
        .subscribeOn(Schedulers.io())
        .subscribe {
            println("subscribe $it - ${Thread.currentThread().name}")
        }

    delay(100)
}


아래 결과를 보면 Observable의 생산과 처리 모두 io scheduler에서 수행 했음을 알수 있습니다.

mapping - RxCachedThreadScheduler-1

subscribe 1 - RxCachedThreadScheduler-1

mapping - RxCachedThreadScheduler-1

subscribe 2 - RxCachedThreadScheduler-1

mapping - RxCachedThreadScheduler-1

subscribe 3 - RxCachedThreadScheduler-1


observeOn은 선언부분 이하의 downstream이 사용할 scheduler를 지정합니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    Observable.just(1)
        .observeOn(Schedulers.io())
        .map {
            println("mapping#1 - ${Thread.currentThread().name}")
            it
        }
        .observeOn(Schedulers.computation())
        .map {
            println("mapping#2 - ${Thread.currentThread().name}")
            it
        }
        .observeOn(Schedulers.single())
        .subscribe {
            println("subscribe $it - ${Thread.currentThread().name}")
        }

    delay(100)
}


mapping#1 - RxCachedThreadScheduler-1

mapping#2 - RxComputationThreadPool-1

subscribe 1 - RxSingleScheduler-1


observeOn을 이용하여 각각의 작업을 지정한 Scheduler에서 수행하도록 변경했습니다.

연산자 하나로 손쉽게 context를 switching 할수 있습니다.


만약 안드로이드였다면 백그라운드에서 작업을 수행하고, 처리가 완료되면 데이터를 UI에서 보여줄수 있도록 하기위해 observeOn(AndroidSchedulers.mainThread())를 통해 subscribe를 하도록 한다면 간단하게 비동기 작업을 처리할 수 있습니다.


subscribeOn vs observeOn 의 우선순위

subscribeOn은 구문 어디에 들어있든 선언된 Scheduler로 생산과, 소비 모두를 동작 시킵니다.
observeOn은 선언된 이후 하위 stream의 처리를 선언된 Scheduler로 실행시킵니다.
만약 이 두가지를 섞어서 쓴다면 어떤 Scheduler로 동작하는지 테스트해 봅니다.

case #1 subscribeOn과 observeOn의 혼용

데이터 생산 및 처리는 background에서 수행시키고, 결과를 받아와 UI에 출력하도록 합니다.
fun main(args: Array<String>) = runBlocking {
    println("start!")
    val ob = Observable.just(1)

    ob.subscribeOn(Schedulers.io())
            .map {
                println("processing in ${Thread.currentThread().name}")
                it
            }
//            .observeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.single())
            .subscribe { println("subscribed: $it - ${Thread.currentThread().name}") }

    delay(100)
    println("end!")
}


안드로이드 환경이라면 주석으로 막은 부분을 사용하면 되지만, 일단 일반적인 java main 프로그램에서 수행하기 위해 Observable의 생산은 io Scheduler, 수신은 single Scheduler에서 하도록 합니다.

start!

processing in RxCachedThreadScheduler-1

subscribed: kotlin.Unit - RxSingleScheduler-1

end!

의도한 대로 각각 해당 Scheduler에서 동작합니다.


case #2 observeOn 아래에 subscriberOn을 사용한다면?


fun main(args: Array<String>) = runBlocking {
    println("start!")
    val ob = Observable.just(1)

    ob.subscribeOn(Schedulers.io())
            .map {
                println("processing in ${Thread.currentThread().name}")
                it
            }
            .observeOn(Schedulers.single())
            .subscribeOn(Schedulers.computation())
            .subscribe { println("subscribed: $it - ${Thread.currentThread().name}") }

    delay(100)
    println("end!")
}


결과

start!

processing in RxCachedThreadScheduler-1

subscribed: 1 - RxSingleScheduler-1

end!


subscribeOn이 두번 쓰였지만 먼저 선언된 Schedulers.io()가 동작합니다. (따라서 Thread는 cachedThread가 쓰입니다.)

observeOn 아래에 subscribeOn이 선언되었지만 subscribe 블럭내 동작은 observeOn의 영향을 받습니다.


Case #3 subscribeOn을 여러개 사용한다면?

fun main(args: Array<String>) = runBlocking {
    println("start!")
    val ob = Observable.just(1)

    ob.subscribeOn(Schedulers.io())
            .map {
                println("processing in ${Thread.currentThread().name}")
                it
            }
            .subscribeOn(Schedulers.computation())
            .subscribe { println("subscribed: $it - ${Thread.currentThread().name}") }

    delay(100)
    println("end!")
}


결과
start!
processing in RxCachedThreadScheduler-1
subscribed: 1 - RxCachedThreadScheduler-1
end!

결국 subscribeOn을 중복해서 사용하면 먼저 선언된 Scheduler로 동작함을 알 수 있습니다.


Case #4 subscribeOn의 위치는 정말 상관 없을까?

fun main(args: Array) = runBlocking {
    println("start!")
    val ob = Observable.just(1)

    ob.map {
        println("processing in ${Thread.currentThread().name}")
        it
    }
            .observeOn(Schedulers.single())
            .subscribeOn(Schedulers.computation())
            .subscribe { println("subscribed: $it - ${Thread.currentThread().name}") }

    delay(100)
    println("end!")
}
 결과

start!
processing in RxComputationThreadPool-1
subscribed: 1 - RxSingleScheduler-1
end!

observerOn 아래에 subscribeOn을 위치시켰지만, Observable은 computation Scheduler에서 동작하고, subscribe 블럭은 observeOn에서 선언된 single Scheduler에서 정상적으로 수행됨을 알수 있습니다.


정리하면 아래와 같은 rule을 따릅니다.

  1. subscribeOn은 어디에 선언되든 Observable과 subscribe가 동작되는 전체 Scheduler를 지정한다.
  2. subscribeOn이 여러개 선언되면, 가장 먼저 선언된 Scheduler로 동작된다.
  3. subscribeOnobserveOn이 혼용될 경우 subscribeOn은 observeOn 선언 직전 부분의 코드를 실행하고, observeOn 선언 이후부터는 observeOn에서 선언된 Scheduler로 동작된다.


반응형