본문으로 바로가기
반응형


오랬만에 코루틴 페이지에 들어갔더니 coroutine builder가 추가되었네요.

그리고 channel도 experimental flag가 떼졌습니다.

(너무 오랬만에 들어가 봤나요?)


여튼 channel 사용이 가능해 지면서 actor의 사용도 가능해 졌으니, 동기화 부분에서도 좀더 코틀린 스럽고 코루틴스럽게 작성이 가능해 졌습니다.

그럼 새로 추가된 builder들에 대해서 알아봅니다.


이 글은 아래 링크의 내용을 기반으로 하여 설명합니다.

https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md

이 블로그 포스팅 당시의(2019.11.04) coroutine 버전은 1.3.2 입니다.


Asynchronous Flow

Suspending function은 비동기로 동작하면서 하나의 값을 반환합니다.
Flow는 비동기로 동작하면서 여러개의 값을 반환하는 function을 만들때 사용하는 coroutine builder 입니다.

fun foo(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    foo().forEach { value -> println(value) } 
}
보통 여러개의 값을 반환할때는 Collection을 사용합니다.

위 예제처럼 list로 세개의 값을 반환하는 함수를 만들고 하나씩 출력하면 아래와 같은 코드가 출력됩니다.
1
2
3

뭐 당연한 결과죠.

그럼 이번엔 값을 하나씩 연산하고 출력하도록 하는 코드를 작성하기 위해서 sequence block을 사용해 봅니다.

fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) } 
}


결과는 동일합니다. 

다만 100ms 간격으로 1,2,3을 찍습니다.

여기서 문제는 Thread.sleep를 사용했기 때문에 이를 실행한 (여기서는 main thread 입니다.) thread가 blocking 됩니다.

(sequence 는 coroutine builder가 아닌 크냥 kotlin에서 제공하는 builder 입니다.)

그래서 이를 방지하게 위해서 suspend 키워드를 사용해서 thread를 blocking 없이 사용하도록 만들 수 있습니다.

suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking {
    foo().forEach { value -> println(value) } 
}


runBlocking으로 coroutine scope을 만들고 그안에서 suspend 함수를 실행시켰습니다.

물론 Thread.sleep 대신에 suspend 함수인 delay를 사용했기 떄문에 delay 되는 1초동안 main thread는 block되지 않습니다.


그럼 동일한 형태를 flow builder를 사용해서 작성해 보겠습니다.


fun main(args: Array) = runBlocking {
    println("main start!")
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    foo2().collect { value -> println(value) }
    println("main end!")
}

fun foo2(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

main thread가 blocking 되는지를 판단하기 위해 먼저 launch를 하나 띄워놓고 foo2() 함수를 수행했습니다.


main start!

I'm not blocked 1

1

I'm not blocked 2

2

I'm not blocked 3

3

main end!

결과는 원했던 대로 위와같이 나옵니다.

List<Int>값은 return type은 모든 계산이 끝난후에 한번에 결과를 반환하는걸 의미합니다.
하지만 비동기적으로 계산하면서 계산이 끝날때마다 하나씩 stream 형태로 값을 전달할때 flow를 사용합니다.
(아직까지는 channel과 비슷하다고 생각되네요??)

launch 내부의 print가 중간중간 찍혔으므로 thread가 blocking 되지 않았음을 확인할 수 있습니다.

만약 Thread.sleep를 사용하는 foo() 함수를 사용했다면 어떻게 될까요?


의심이 생길수 있으니 예상대로 동작하는지 돌려봅니다~

fun main(args: Array) = runBlocking {
    println("main start!")
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    foo().forEach { value -> println(value) }
    println("main end!")
}

fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(1000) // pretend we are computing it
        yield(i) // yield next value
    }
}

sequence block을 사용하고 Thread.sleep을 사용했습니다.

결과는...

main start!

1

2

3

main end!

I'm not blocked 1

I'm not blocked 2

I'm not blocked 3


역시나, Thread.sleep으로 인하여 thread가 block되면서 launch 내부 구문이 돌지 못했다가 foo() 함수가 끝나고 나서야 한번에 도는걸 확인할 수 있습니다.

반쪽짜리 비동기네요.

두 코드를 비교해 보면 아래와 같은 부분이 다릅니다.

  • flow 키워드로 builder를 사용했고, return값으로는 Flow가 반환됩니다.
  • flow{...} 로 생성된 block은 suspend 할수 있습니다.
  • flow 역시 builder이기 때문에 suspend 키워드 없이 함수를 만들었습니다.
  • 값들을 emit 이란 함수로 내보냅니다.
  • 값들을 collect란 함수로 받아옵니다.
Thread의 blocking과 상관없이 collect를 만나면 collect 함수가 종료될때 까지 해당 코드 라인을 넘어가지는 않습니다.
(Thread blocking은 아닙니다만  코루틴의 기본 원직인 sequential 하게 동작한다를 만족합니다.)


Flows are cold

Flow는 sequence 처럼 cold stream 입니다. 따라서 collect가 호출되기 전까지는 수행되지 않습니다.
또한 호출할때마다 처음부터 값을 전부 방출합니다.
(RxJava의 Observable 처럼 hot stream은 지원하지 않습니다.)

fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}


결과는 아래와 같습니다.

Calling foo...

Calling collect...

Flow started

1

2

3

Calling collect again...

Flow started

1

2

3


또한 flow는 소모성이 아니기 때문에 collect를 호출될때마다 다시 시작됩니다.
그래서 collect를 부를때마다 "Flow started"가 찍힙니다.

Flow cancellation

flow는 일반적인 코루틴의 cancel 로직을 따릅니다.

flow 내부에 delay 같은 suspend function을 만났을때 cancel되며, CPU를 계속 점유하거나 소비하는 연속적인 작업같은 경우에는 취소되지 않습니다.

네..launch나 async 같은 다른 builder의 cancel 동작과 동일하다고 보면 됩니다.


다만 flow 자체에는 cancel 함수를 지원하지 않습니다.

따라서 아래와 같이 타이머로 종료시키거나 launch로 감싸서 취소해야 합니다.

fun foo(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    println("main start!")
    withTimeoutOrNull(250) { // Timeout after 250ms 
        foo().collect { value -> println(value) } 
    }
    println("main end!")
}


또한 아래처럼 사용해도 취소가 가능합니다.


fun main(args: Array) = runBlocking {
    println("main start!")

    val fooLaunch = launch {
        // Timeout after 250ms
        foo3().collect { value -> println(value) }
    }

    delay(250)
    fooLaunch.cancel()

    println("main end!")
}
어떤 형식이든 아래와 같이 결과가 출력됩니다.

main start!

Emitting 1

1

Emitting 2

2

main end!


Flow builders

flow{...}를 이용해서 flow를 만드는건 가장 기본적인 방법입니다.
그 이외에도 아래와 같은 형태로 flow를 만들수 있습니다.
  • 값이 고정되어 있을경우 flowOf builder를 이용
  • 다양한 Collection들을 .asFlow() extension function으로 flow로 변경가능.
fun main(args: Array) = runBlocking {
    println("main start!")

    val flow1 = flowOf(1,2,3)
    flow1.collect{ value -> println("flow1:$value")}

    println("/////////////////")
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println("flow2:$value")}

    println("main end!")
}


결과는..

main start!

flow1:1

flow1:2

flow1:3

/////////////////

flow2:1

flow2:2

flow2:3

main end!


Intermediate flow operators

flow는 collection이나 sequence처럼 중간 연산자로 변환할 수 있으며, 중간 연산자는 flows의 기본동작과 같이 cold 하게 동작합니다.
하지만 이런 중간 연산자들은 suspending function은 아닙니다.
하지만 빠르게 동작하여 새로운 flow를 반환합니다.

일반적으로 collection이나 sequence에서 사용하는 map이나 filter를 flow에서도 사용할 수 있습니다.
flow에서 사용되는 map이나 filter의 블럭 안에서 delay 같은 suspending function을 사용할수 있다는점이 sequence나 collection의 연산자와 가장 크게 다른점입니다.

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}


위 예제에서 map 내부 블럭에서 performRequest()를 호출합니다.

이는 suspend function으로 delay 처럼 어떤 긴 비동기 작업이 존재하더라도 mapping하여 결과를 반환합니다.

response1

response2

response3


Transform operator

flow의 변환 연산자중에 가장 일반적인것중 하나가 transform 연산자 입니다.

이 연산자는 map 이나 filter처럼 간단하게 값들을 변환할수도 있고, 복잡한 변환을 수행하도록 할수도 있습니다.

transform 연산자를 이용해서 임의의 값을 여러번 반복해서 방출하도록 만들수 있습니다.


suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking {
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request") 
            emit(performRequest(request)) 
        }
        .collect { response -> println(response) }
}

1~3까지를 emit 하는 flow를 만들고 transform 명령어를 이용하여 하나의 원소당 두번 emit 하도록 변경한 코드 입니다.

performRequest() 함수는 시간이 걸리는 작업이지만 emit한 순서에 맞게 순차적으로 방출하며, 이를 collect를 통해서 받아올 수 있습니다.


결과

Making request 1

response 1

Making request 2

response 2

Making request 3

response 3


Size-limiting operators
flow는 연속적인 값들의 stream을 asynchronous 하게 (기존 thread를 blocking하지 않고) 반환하는 builder 입니다.
하지만 몇개의 값만 처리가 필요한 경우 take를 통하여 개수를 제한 할 수 있습니다.
기본적으로 take는 제한된 개수까지만 flow를 수행하고 그 이후에는 cancel 시키는 형태입니다.

코루틴은 기본적으로 cancel시 Cancellation exception을 발생시키므로 try-catch를 이용하여 resource 관리를 할 수 있습니다.
fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}            

try-catch문의 위치는 flow{..} 내부입니다.

만약 numbers()에 try-catch로 감싼다면 Cancellation exception을 catch 할수 없습니다.

따라서 try-catch하는 부분을 잘 염두해두고 써야 합니다. (코루틴의 exception 처리는 항상 헷깔리네요.)


결과...

1

2

Finally in numbers


코드를 보던중에 flow를 타고 들어가보니..

역시나..

내부적으로 Channel을 사용하고 있네요

Flow라는 builder 자체가 collection에 직접 channel 이용하여 stream하게 사용하는 형태를 좀더 사용하기 편하도록 만들어놓은 느낌입니다.

제 느낌이 맞는지..

계속 보도록 하겠습니다.


Terminal flow operators

Terminal operator는 collection을 시작시키는 suspending function 입니다. (flow는 cold 하므로 terminal operator를 만나야 시작됩니다.)
collect는 가장 기본적인 operator이고 그외에도 다양한 operator들이 존재합니다.
  • toList 또는 toSet : flow를 MutableList나 MutableSet으로 변환
  • first: 첫번째 원소를 반환하고 나머지는 cancel 시킴
  • reduce: 첫번째 원소에 주어진 operation을 이용하여 누적시켜 최종값을 반환
  • fold: 초기값을 입력받아 주어진 operation을 이용하여 누적시켜 최종값을 반환

 

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5                           
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
}


자바8 stream에서 제공하는 함수와 비슷합니다만 전부 coroutine package에서 제공하는 api들 입니다.

결과

55


Flows are sequential

각각의 colection으로 이루어진 flow들은 특별하게 multiple flow로 동작하도록 하는 operator를 사용하지 않는한 순차적으로(sequential)하게 동작합니다.
또한 기본적으로 terminal operator를 호출하는 coroutine에서 바로 수행되며, 새로운 코루틴을 생성해서 사용하지 않습니다.

여러개의 중간 operator로 조합된 경우 코틀린에서의 sequence처럼 순차적으로 동작합니다.
(일반 collection처럼 단계별로 새로운 collection을 만들면서 메모리를 낭비하지 않습니다.)

fun main() = runBlocking {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0              
        }              
        .map { 
            println("Map $it")
            "string $it"
        }.collect { 
            println("Collect $it")
        }    
}


위 설명에서 언급한대로 원소 하나씩 중간 operator를 거쳐서 변환됩니다.

Filter 1

Filter 2

Map 2

Collect string 2

Filter 3

Filter 4

Map 4

Collect string 4

Filter 5


첫번째 원소인 1은 filter를 거치면서 로그는 찍으나, filter 연산자를 통과하지 못합니다.

두번째 원소인 2는 filter를 거치면서 로그를 찍고 조건을 만족하여 filter를 통과하여 map 함수로 넘겨 집니다.

map 연산자를 통해 변경된 원소는 collector로 넘겨져 처리됩니다.

그리고 나서 다음 원소가 다시 filter부터 다시 거칩니다.


이런 방식은 Kotlin에서 일반적인 collection 처리가 아닌 sequence 처리와 동일합니다.


여기까지 봤을때 어떤 생각이 드시나요?

처음 의문은 "channel의 동작과 크게 다르지 않은데 왜 Flow라는 builder를 만들었을까" 였습니다.

하지만 여기까지 정리하고 보니 java8의 stream과 너무나 닮았습니다.

(Terminal 연산자를 만날때 까지 lazy하게 실행이 되지 않는것도 동일하구요.)


Kotlin을 사용하면서 지원하지 못해 아쉬웠던 java8의 stream의 처리를 너무나 유사하게 흉내? 내고 있네요.

만약 제 예상이 맞다면 문맥상 여기까지 나온걸 봐서, 이후에는 각 원소별로 thread를 분리해서 처리하는 방법과, 이에 따른 pipelining을 하는 방법이 나올만 합니다..

번역하면서, 코드를 돌려보면서, 정리하면서, 갑자기 두근두근 합니다.

(저도 원문 띄워놓고 테스트해보고 예제도 만들어보면서 동시에 포스팅을 하는거라..)


드디어, 코틀린에서 자바의 stream을  극복할 수있는 방향을 제시하는지는 저도 더 번역해 보고 돌려봐야 할것 같습니다.

내용이 많아 일단 flow의 첫번째 분량은 여기까지 포스팅 합니다.

다음 포스팅에 flow의 나머지  부분에 대해서 정리하도록 하겠습니다.

반응형