본문으로 바로가기
반응형

Observable이 생산하는 값들중 일부를 skip하거나 반대로 일부만 take하는 operator들에 대해 설명합니다.


skip

skip 연산자는 말그대로 방출시점에 조건에 따라 방출을 전달하지 않습니다.

fun main(args: Array<String>) = runBlocking {
    val ob = Observable.intervalRange(1, 10, 0, 100, TimeUnit.MILLISECONDS)

    println("----- skip(count) ------")
    ob.skip(5)
            .blockingSubscribe{
                println(it)
            }

    println("----- skip(time) ------")
    ob.skip(300, TimeUnit.MILLISECONDS)
            .blockingSubscribe{
                println(it)
            }
}


인자값으로 개수를 넘길수도 있고, skip할 시간을 넘길수도 있습니다.

----- skip(count) ------

6

7

8

9

10

----- skip(time) ------

5

6

7

8

9

10


skipLast

skipLast는 정해진 개수 만큼 마지막 방출을 전달하지 않습니다.
skip은 시작점부터 건너뛰고, skipLast는 방출 끝에서 건너뜁니다.
Observable의 특성상 모든 방출이 완료 되어야만 끝을 알수 있기 때문에 skipLast을 사용하면 뒤에서부터 특정개수를 방출 하지는 않지만, 모든 데이터의 방출이 완료되는만큼의 시간이 소요됩니다.
fun main(args: Array<String>) {
    val ob = Observable.create {
        it.onNext(1)
        it.onNext(2)
        it.onNext(3)
        it.onNext(4)
        it.onNext(5)
        it.onNext(6)
        it.onNext(7)
        it.onNext(8)
        it.onNext(9)
        it.onNext(10)
        runBlocking { delay(300) }
        it.onComplete()
    }

    val time = System.currentTimeMillis()
    val elapsedTime = measureTimeMillis {
        ob
                .skipLast(3)
                .subscribe { println("Emission Time:${System.currentTimeMillis() - time} - value:$it") }
    }

    println(elapsedTime)
}


Emission Time:6 - value:1

Emission Time:6 - value:2

Emission Time:6 - value:3

Emission Time:6 - value:4

Emission Time:6 - value:5

Emission Time:7 - value:6

Emission Time:7 - value:7

312


실제로 모든 방출이 끝난 이후에 마지막에 300ms만큼 대기하도록 되어 있지만, 전체 걸린시간은 312ms로 전부 방출될때까지 기다린걸 확인할 수 있습니다.
skipLast역시 인자값으로 시간을 사용할 수 있습니다.


skipWhile

skipWhile은 skip 조건이 만족할때까지 skip되다가 조건이 맞지 않으면 그 이후 나머지는 조건과 상관없이 모두 방출 됩니다.
즉, 1회성 filter 역할을 합니다.
fun main(args: Array<String>) {
    Observable.range(1, 10)
            .skipWhile { it % 3 != 0 }
            .subscribe { println(it) }
}

1~10사이에서 3으로 나눠지지 않는 값은 1,2,4,5,7,8,10 이지만 1회성 filter의 역할을 하기 때문에 3을 방출하면 더이상 조건을 체크하지 않고 모두 방출합니다.

3

4

5

6

7

8

9

10



skipUntil

Observable의 배출 조건을 다른 Observable의 배출시점으로 맞춥니다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val ob1 = Observable.interval(100, TimeUnit.MILLISECONDS)
    val ob2 = Observable.timer(400, TimeUnit.MILLISECONDS)

    ob1.skipUntil(ob2)
            .subscribe { println(it) }

    delay(1000)
}


결과

3

4

5

6

7

8

9


take

take는 skip과는 반대로 특정 조건에 따라 방출된 데이터를 획득합니다.
fun main(args: Array) = runBlocking {
    val ob = Observable.intervalRange(1, 10, 0, 100, TimeUnit.MILLISECONDS)

    println("----- take(count) ------")
    ob.take(5)
        .blockingSubscribe{
            println(it)
        }

    println("----- take(time) ------")
    ob.take(300, TimeUnit.MILLISECONDS)
        .blockingSubscribe{
            println(it)
        }
} 

----- take(count) ------

1

2

3

4

5

----- take(time) ------

1

2

3

4


takeLast

skipLast와는 반대로 끝에서 조건을 만족하는 개수만큼 가져옵니다.
조건으로 개수를 넘길수도 있고, 시간을 지정할 수 도 있습니다.
이 연산자 또한 모두 방출되야만 끝을 알 수 있기 때문에 모든 방출이 끝날때까지 기다렸다가 값이 전달 됩니다.
fun main(args: Array<String>) = runBlocking {
    val ob = Observable.create {
        it.onNext(1)
        it.onNext(2)
        it.onNext(3)
        it.onNext(4)
        it.onNext(5)
        it.onNext(6)
        it.onNext(7)
        it.onNext(8)
        it.onNext(9)
        it.onNext(10)
        runBlocking { delay(300) }
        it.onComplete()
    }

    val time = System.currentTimeMillis()
    val elapsedTime = measureTimeMillis {
        ob
            .takeLast(4)
            .subscribe { println("Emission Time:${System.currentTimeMillis() - time} - value:$it") }
    }

    println(elapsedTime)
}


Emission Time:313 - value:7

Emission Time:314 - value:8

Emission Time:314 - value:9

Emission Time:314 - value:10

314


takeWhile

특정 조건을 만족하면 방출된값을 획득하고, 조건을 만족하지 않으면 그 이후의 모든값은 버려집니다.
fun main(args: Array<String>) = runBlocking<Unit> {
    Observable.range(1, 10)
        .takeWhile { it % 3 != 0 }
        .subscribe { println(it) }
}

결과

1

2


takeUtil

다른 observable이 출력을 시작하기 직전까지만 값을 획득하고 나머지는 버립니다.
fun main(args: Array) = runBlocking {
    val ob1 = Observable.interval(100, TimeUnit.MILLISECONDS)
    val ob2 = Observable.timer(400, TimeUnit.MILLISECONDS)

    ob1.takeUntil(ob2)
        .subscribe { println(it) }

    delay(1000)
}

결과

0
1
2
3


반응형