RxKotlin에도 Collection과 유사한 연산자들이 존재합니다.
가장 친숙한 연산자인 filter나 map을 비롯하여 다양한 연산자를 제공합니다.
Kotlin이나 java stream에서 Collection 관련 연산자를 많이 써봤다면, RxKotlin의 연산자 역시 쉽게 이해할수 있습니다.
(사실 역할은 똑같습니다.)
다만 입력이 Observable 또는 flowable이고, 연산자를 거쳐서 나오는 출력또한 Observable 또는 flowable입니다.
이미 Collection에서 많이 사용하는 연산자는 예제없이 간단하게 언급만하고 넘어가며, collection에 있더라도 잘 쓰지 않았던 연산자나, RxKotlin용 연산자 위주로 예제와 함께 설명합니다.
filter
debounce
fun main(args: Array<String>) = runBlocking<Unit> {
val ob = Observable.create {
runBlocking {
it.onNext("고")
it.onNext("고구")
it.onNext("고구마")
delay(300)
it.onNext("고구마 맛")
it.onNext("고구마 맛있")
it.onNext("고구마 맛있게")
delay(300)
it.onNext("고구마 맛있게 먹")
it.onNext("고구마 맛있게 먹는")
it.onNext("고구마 맛있게 먹는법")
it.onComplete()
}
}
ob.debounce(200, TimeUnit.MILLISECONDS)
.subscribe { println(it) }
}
결과distict | distictUntilChanged
fun distictTest() {
println("distict")
listOf(1,2,3,4,5,1,2,3,4,5,6).toObservable()
.distinct()
.subscribe { print("$it, ") }
println("")
println("distinctUntilChanged")
listOf(1,1,2,3,4,5,5,5,6,1,2,3 ).toObservable()
.distinctUntilChanged()
.subscribe { print("$it, ") }
}
결과
distict
1, 2, 3, 4, 5, 6,
distinctUntilChanged
1, 2, 3, 4, 5, 6, 1, 2, 3,
elementAt | ignoreElements
elementsAt은 배열 연산자와 동일하게 특정 순서에 배출되는 데이터를 획득할수 있습니다.
단, 앞단의 데이터가 모두 배출되고 나서 지정한 순서가 올때까지 기다려야 합니다.
만약 데이터의 배출에는 관심없고, 배출이 완료 되었는지에 대한 판단만 필요하다면 ignoreElements 연산자를 이용할수 있습니다.
ignoreElements 연산자는 onComplete()가 호출될때 값을 전달합니다.
fun main(args: Array) = runBlocking {
val ob = Observable.range(1, 50)
ob.elementAt(3)
.subscribe{println("element 3rd: $it")}
ob.elementAt(100)
.subscribe{println("element 100th: $it")}
ob.ignoreElements()
.subscribe{println("emission completed")}
}
element 3rd: 4
emission completed
위 예제에서 Observable은 50개의 값을 방출하며, elementAt(100)으로 잘못된 값(100번째 추출 요청) 요청시 해당 요청은 무시됩니다.
first | last
fun main(args: Array<String>) = runBlocking<Unit> {
val ob = Observable.range(1,100)
ob.first(2) //first 값이 없으면 기본값 2 출력
.subscribeBy { println("first item: $it") }
ob.last(2) //last 값이 없으면 기본값 2 출력
.subscribeBy { println("last item: $it") }
Observable.empty().first(-1) // 빈 Observable이므로 first가 없음 -> 기본값인 -1 출력
.subscribeBy { println("default item: $it") }
}
결과
first item: 1
last item: 100
default item: -1
take | cast
fun main(args: Array<String>) = runBlocking<Unit> {
val ob = Observable.range(1,100)
ob.cast(Number::class.java)
.take(5)
.subscribe { println(it) }
}
결과
1
2
3
4
5
take로 5개만 뽑아내도록 제한하였고, Int 타입을 부모 클래스인 Number 타입으로 casting해서 출력했습니다.
flatMap
fun main(args: Array<String>) = runBlocking<Unit> {
val ob = Observable.range(1, 3)
ob.flatMap { makeObserver(it) }
.subscribeBy(
onNext = {
println(" item: $it")
},
onComplete = {
println("complete")
}
)
}
fun makeObserver(seed: Int) = Observable.create<Int> {
for (i in 0 until seed) {
it.onNext(seed)
}
it.onComplete()
}
위 예제처럼 Observable을 수신하면서 방출된 결과를 다른 Observable로 넘겨서 또다른 Observable을 만들어 내도록 코드를 작성했습니다.
사실 예제는 간단하지만 실무에서 복잡한 코드를 작성하게 되면 observable 내부에서 observable이 생성되는 일이 간혹 발생할 수 있습니다.
(아마도 collection에서 flatmap 쓰는 정도..인데, 제 경우엔 한두번 써본것 같네요..)
이럴 경우 중간 배출값이 Observable<Observable<Int>> 가 되므로 flatMap으로 flatten 시켜야 하며, 이때 onCompelte()는 모든 Observable의 방출이 끝난 이후에 한번만 호출됩니다.
item: 1
item: 2
item: 2
item: 3
item: 3
item: 3
complete
defaultIfEmpty | switchIfEmpty
private fun createSlowObservable(startNum: Int): Observable {
val endNum = startNum + 10
return Observable.create {
for (i in startNum until endNum) {
it.onNext(i)
runBlocking { delay(100) }
}
it.onComplete()
}
}
fun main(args: Array) = runBlocking {
// 1부터 10까지 100ms 간격으로 출력
val ob = createSlowObservable(1)
val elapsedTime1 = measureTimeMillis {
ob.filter { it == 20 }
.defaultIfEmpty(-1)
.subscribe { println("received: $it") }
}
println("elapsed time1:$elapsedTime1")
// 11부터 20까지 100ms 간격으로 출력
val ob2 = createSlowObservable(11)
val elapsedTime2 = measureTimeMillis {
ob.filter { it == 20 }
.switchIfEmpty(ob2)
.filter { it == 20 }
.defaultIfEmpty(-1)
.subscribe { println("received: $it") }
}
println("elapsed time2:$elapsedTime2")
}
먼저 100ms 단위로 10개의 숫자를 생성하는 createSlowObservable(startNum: Int) 함수를 생성합니다.
이 함수는 넘겨받은 시작값으로 부터 순서대로 10개의 숫자를 생성하여 100ms 단위로 방출 하는 Observable 반환합니다.
첫번째 코드의 경우 1~10까지의 숫자를 생성하나 filter에서 모두 걸러져 방출 값이 없게 됩니다.
이럴경우 기본값으로 -1을 방출합니다.
두번째 코드의 경우 동일하게 1~10까지 숫자를 생성하여 filter으로 인하여 방출값이 없으므로 11~20을 방출하는 observable로 대체시킵니다.
그리고 나서 다시 filter { it == 20 }를 적용하고, 그래도 없을경우 -1을 찍도록 합니다.
received: -1
elapsed time1:1040
received: 20
elapsed time2:2073
결과를 보면 의도한 대로 기본값이 찍히거나, 대체 Observable로 전환되었으나, 걸린 시간은 모든 값을 방출하고 난 후 동작 했음을 알 수 있습니다.
startWith
fun main(args: Array<String>) = runBlocking<Unit> {
val ob = Observable.range(1, 5)
ob.startWith(0)
.subscribe { println("received: $it") }
}
received: 0
received: 1
received: 2
received: 3
received: 4
received: 5
Sorted
fun main(args: Array) = runBlocking {
val ob = Observable.just(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
println("sort()")
ob.startWith(0)
.sorted()
.subscribe { print("$it, ") }
println("")
println("comaprator sort()")
ob.startWith(0)
.sorted { num1, num2 -> if (num1 >= num2) -1 else 1 }
.subscribe { print("$it, ") }
}
sorted()를 이용하여 정렬할수도 있으며, 필요한 경우 comparator를 직접 구현하여 넘겨줄수 있습니다.
sort()
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
comaprator sort()
10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0,
scan | reduce
fun main(args: Array) = runBlocking {
val ob = scanTest()
println("----- scan() -----")
ob.scan { prev, next -> "$prev$next" }
.subscribe { println(it) }
println("")
println("----- reduce() -----")
ob.reduce { prev, next -> "$prev$next" }
.subscribe { println(it) }
println("")
println("----- scan() with debounce() -----")
ob.scan { prev, next -> "$prev$next" }
.debounce (200, TimeUnit.MILLISECONDS)
.subscribe { println(it) }
}
fun scanTest(): Observable {
return Observable.create {
runBlocking { // 중간 중간 delay 함수를 쓰기위해 runBlocking을 사용합니다.
it.onNext("고")
it.onNext("구")
it.onNext("마")
it.onNext(" ")
delay(300)
it.onNext("맛")
it.onNext("있")
it.onNext("게")
it.onNext(" ")
delay(300)
it.onNext("먹")
it.onNext("는")
it.onNext("법")
it.onComplete()
}
}
}
결과
----- scan() -----
고
고구
고구마
고구마
고구마 맛
고구마 맛있
고구마 맛있게
고구마 맛있게
고구마 맛있게 먹
고구마 맛있게 먹는
고구마 맛있게 먹는법
----- reduce() -----
고구마 맛있게 먹는법
----- scan() with debounce() -----
고구마
고구마 맛있게
고구마 맛있게 먹는법
scan은 방출할때마다 누적값을 찍고, reduce는 onComplete()가 호출되면 누적된 최종값을 출력합니다.
마지막 예제처럼 debounce()와 같이 사용하여 응용할 수도 있습니다.
참고로 scan의 경우 scan {..} 블럭의 최종 출력값이 다음 방출시 이전값으로 들어 옵니다.
위 예제처럼 '고'가 들어왔을 때는 이전값이 없으므로 subscribe에서는 '고'만 출력됩니다.
하지만 '구'가 방출될때는 scan param으로 prev 값은 '고' , next 값은 '구'가 됩니다.
다음 출력 '마'가 방출 될때는 이전 scan에 의해서 두값을 합친 string인 '고구'가 prev 값으로 들어오고 next 값은 '마' 가 됩니다.
Emit |
prev |
next |
scan 출력값 ($prev$next) |
고 |
null |
고 |
고 |
구 |
고 |
구 |
고구 |
마 |
고구 |
마 |
고구마 |
|
고구마 |
(공백) |
고구마(공백) |
맛 |
고구마(공백) |
맛 |
고구마 맛 |
... | ... | ... |
toMap | toMultimap
fun main(args: Array<String>) = runBlocking<Unit> {
val ob = Observable.range(1, 10)
ob.toMap({ key -> key }, { value -> value * value })
.subscribeBy { println(it) }
ob.toMultimap {
if (it % 2 == 0) {
"even"
} else {
"odd"
}
}.subscribeBy { println(it) }
}
{1=1, 2=4, 3=9, 4=16, 5=25, 6=36, 7=49, 8=64, 9=81, 10=100}
{even=[2, 4, 6, 8, 10], odd=[1, 3, 5, 7, 9]}
toList | toSortedList
'개발이야기 > Kotlin' 카테고리의 다른 글
[RxKotlin] Reactive 코틀린 #8 - skip, take operators (0) | 2019.12.19 |
---|---|
[RxKotlin] Reactive 코틀린 #7 - Composing operators (2) | 2019.12.18 |
[RxKotlin] Reactive 코틀린 #5 - Processor, sample, throttle, window, buffer (0) | 2019.12.16 |
[RxKotlin] Reactive 코틀린 #4 - Flowables 과 Backpressure (0) | 2019.12.12 |
[RxKotlin] Reactive 코틀린 #3 - Subject (0) | 2019.12.06 |