ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 코루틴(Coroutine) - 입문2(Asynchronous Flow)
    Android(+ Kotlin) 2020. 2. 6. 19:05

    2020/01/22 - [Android(+ Kotlin)] - 코루틴(Coroutine) - 입문

     

    코루틴(Coroutine) - 입문

    2020/01/16 - [Android(+ Kotlin)] - 코루틴(Coroutine) 기본개념 코루틴(Coroutine) 기본개념 코루틴은 Kotlin언어를 개발한 jetbrains에서 만들어졌다. Java에서는 사용할 수 없다. 서브루틴(subroutine) 먼저..

    charko.tistory.com

    이전글 참조요망

     

    https://kotlinlang.org/docs/reference/coroutines/flow.html

     

    Asynchronous Flow - Kotlin Programming Language

     

    kotlinlang.org

    위 링크 참조하여 작성되었습니다. (작성도중 너무 길다 생각됐는데 1,2부로 나누었어야 했나 ㅠㅠ 스압주의)

    Asynchronous Flow

    서스팬딩 함수는 비동기로 단일값을 리턴한다. 그렇다면 비동기로 계산된 여러 값(리스트 등)을 어떻게 리턴될까?

    Representing multiple values

    코틀린에서는 여러개의 값을 표시할때 collections 타입을 사용한다.(배열 리스트등)

    Int타입의 데이터를 List<>타입으로 묶어 일괄 리턴한다.

    fun foo(): List<Int> = listOf(1, 2, 3)
    
    fun main() {
        foo().forEach { value -> println(value) }
    }

    Sequences

    메인스레드에서 CPU프로세싱을 100ms마다 sleep 후 하나씩 전달한다. (비동기가 아니다)

    fun foo(): Sequence<Int> = sequence {
        for (i in 1..3) {
            Thread.sleep(100)
            yield(i)
        }
    }
    
    fun main() {
        foo().forEach {
                value -> println(value)
        }
    }

    리턴되는 값은 sequence의 주소값을 리턴하며, 해당 값을 확인할 수 없고, 해당 함수에서 yield() 메소드를 통해 하나씩 발생 시킨다.

    suspending functions

    코루틴에서는 "suspend"의 키워드를 통해 시작, 일시중단, 재시작이 가능한 함수로 지정하였다.

    suspend fun foo(): List<Int> {
        delay(1000)
        return listOf(1, 2, 3)
    }
    
    fun main() = runBlocking<Unit> {
        foo().forEach { value -> println(value) }
    }

    Flows

    비동기로 계산된 값을 실시간으로 나타낼 수 있다. Flow<Int> 타입을 사용하며 Sequence의 비동기형으로 보아도 무방할것 같다.

    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        launch {
            for (k in 1..3) {
                println("I'm not blocked $k")
                delay(100)
            }
        }
        foo().collect { value -> println(value) }
    }
    
    // 출력결과
    // I'm not blocked 1
    // 1
    // I'm not blocked 2
    // 2
    // I'm not blocked 3
    // 3

    위 코드는 메인 스레드를 block하지 않고, 0.1초를 기다린 후 숫자를 리턴하여 출력하며, 메인에서 실행된 코루틴이 차단된 상태가 아니라고 반복해서 나타낸다.

     

    위에서 설명했던 예제들과 Flows가 다른 부분을 살펴보면

    • Flow타입의 빌드함수는 flow로 불린다.
    • 코드 내부가 일시 중단될 수 있다.
    • foo()함수에 suspend 수정자를 붙이지 않아도 된다.
    • emit을 통해 flow에서 해당 값이 리턴된다. (함수 내부)
    • collect을 통해 flow에서 값의 수집이 시작된다. (함수 호출시)

    * delay를 대신하여 Thread.sleep()을 사용할 경우 메인스레드에서 block이 된다.

    Flows are cold

    sequence와 매우 유사한 cold(?) 스트림이다. 아래의 코드의 Flow는 flow가 수집이 완료될때까지 진행되지 않는다.

    fun foo(): Flow<Int> = flow {
        println("Flow started")
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        println("Calling foo...")
        val flow = foo()
        println("Calling collect...")
        flow.collect { value -> println(value) }
        println("Calling collect again...")
        flow.collect { value -> println(value) }
    }
    
    // 출력결과
    // Calling foo...
    // Calling collect...
    // Flow started
    // 1
    // 2
    // 3
    // Calling collect again...
    // Flow started
    // 1
    // 2
    // 3

    foo() 함수에 suspend 수정자를 작성하지 않은 이유이다.(사용하더라도 동일하게 동작이 수행된다.) foo() 함수는 빠르게 리턴되며 아무것도 기다리지 않는다. flow가 연달아 collect 호출 시 모두 수집된 후 시작한다.

    Flow cancellation

    Flow는 일반적인 코루틴의 취소를 준수하지만 구조 내 별도 취소지점이 없다. 함수 내 취소가 가능한 상태일 때(delay함수등의 suspend 기능이 있을 때.) 취소가 되며 그렇지 않을 경우 취소가 불가능하다.

    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100)
            println("Emitting $i")
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        withTimeoutOrNull(200) {
            foo().collect { value -> println(value) }
        }
        println("Done")
    }
    
    // 출력결과
    // Emitting 1
    // 1
    // Emitting 2
    // 2
    // Done

    withTimeoutOrNull로 인해 0.2초만에 foo()함수가 진행 중에 종료가 되었다.

     

    quiz 아래의 코드의 출력결과는?

    fun foo(): Flow<Int> = flow {
        for (i in 1..10) {
            println("Emitting $i")
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        withTimeoutOrNull(1) {
            foo().collect { value -> println(value) }
        }
        println("Done")
    }

    *정답은 아래 블록지정 해보면 보입니다.

    foo()함수의 내부에 suspend 기능을 가진 함수가 없음 으로, 1~10까지 출력이 진행 된다.

    ⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️⬆️

     

    Flow builders

    flow { ... } 빌더는 가장 기본이 되는 예제이다. 쉽게 선언하는 다른 방법에 대해서 알아보자

    flowOf builder - 고정값을 외부로 보내는 flow

    .asFlow() - 변수의 collections, sequences 를 변경할 수 있다.

    (1..3).asFlow().collect { value -> println(value) }

    Intermediate flow operators (중간연산자)

    suspend fun performRequest(request: Int): String {
        delay(1000)
        return "response $request"
    }
    
    fun main() = runBlocking<Unit> {
        (1..3).asFlow()
            .map { request -> performRequest(request) }
            .collect { response -> println(response) }
    }

    사용된 map, filter 오퍼레이터는 내부를 suspend함수를 사용할 수 있게 만들 수 있다. 인바운드된 데이터를 map을(업스트림) 통해 결과(다운스트림)를 매핑시킬 수 있다. (collections, sequence에서 동일)

    Transform operator

    transform을 통해 여러개의 값을 매핑시켜 전달 할 수 있다.

    (1..3).asFlow()
        .take(2)
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }

    Size-limiting operators

    take( int ) 를 통해 해당 리스트(?)의 갯수를 제한 시킬 수 있다.

    (1..3).asFlow()
        .take(2)
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
        
    // 출력결과
    // Making request 1
    // response 1
    // Making request 2
    // response 2

    Terminal flow operators

    시작하는 콜랙션 흐름을 일시중단한다. collect()는 기본이지만 더 쉽게 만들 수 있는 터미널 오퍼레이터들이 있다.

    toList, toSet 변수의 변환이 가능하다.

    first - 변수의 값이 true일 때 최초 1번만 아웃스트림으로 전달(해당 리스트 중 true의 값이 없을 경우 NoSuchElementException 호출)

    val sum = (1..5).asFlow()
        .map { it * it }
        .first { it ->
            it == 4
        }
    println(sum)
    
    // 출력결과
    // 4

    single - 리스트의 값이 오직 1개일 때 처리, 그 외 예외처리 진행(Exception in thread "main" java.lang.IllegalStateException: Expected only one element)

    val target: MutableList<Int> = arrayListOf()
    target.add(2)
    // target.add(1) // 값이 2개가 되어 에러 발생
    
    val sum = target.asFlow().single()
    println(sum)
    
    // 출력결과
    // 2

    reduce - 누산 기능(첫번째 리스트의 값은 accumulator 바로 들어감)

    val sum = (1..5).asFlow()
        .map { it * it }
        .reduce { accumulator, value ->
            accumulator + value
        }
    println(sum)
    
    // 출력결과
    // 55

    fold - 누산기능2(초기값 지정)

    val sum = (1..5).asFlow()
        .map() { it * it }
        .fold(0) { R, T ->
            R + T
        }
    println(sum)
    
    // 출력결과
    // 55

    Flows are sequential

    여러 플로우에서 작동하는 특수연산자를 사용하지 않으면 각각의 collection이 순차적으로 수행된다. (element-by-element or lazy order 처리방식) 기본적으로는 새로운 코루틴이 시작되지 않고, collection은 터미널 연산자에 의해 업스트림에서 다운스트림으로 처리 된 후 전달된다.

    fun main() = runBlocking {
        (1..5).asFlow()
            .filter {
                println("filter $it")
                it % 2 == 0
            }
            .map {
                println("map $it")
                "string $it"
            }
            .collect {
                println("collect $it")
            }
    }
    
    // 출력결과
    // filter 1
    // filter 2
    // map 2
    // collect string 2
    // filter 3
    // filter 4
    // map 4
    // collect string 4
    // filter 5

    Flow context

    컬렉션 흐름은 항상 코루틴의 컨텍스트에서 발생한다. 아래의 코드는 foo() 흐름의 세부 구현사항에 관계없이 코드의 작성자가 지정한 컨텍스트에서 실행된다.

    withContext(coroutineContext) {
        foo().collect {
            println(it)
            log("$it")
        }
    }

    이러한 특성은 context preservation(컨텍스트 보전)라고 한다.

    따라서 기본적으로 flow { ... } 빌더의 코드는 해당 플로우 컬렉터가 제공하는 컨텍스트에서 실행 된다.

    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
    
    fun foo(): Flow<Int> = flow {
        log("Started foo flow")
        for (i in 1..3) {
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        foo().collect { value -> log("Collected $value") }
    }
    
    // 출력결과
    // [main @coroutine#1] Started foo flow
    // [main @coroutine#1] Collected 1
    // [main @coroutine#1] Collected 2
    // [main @coroutine#1] Collected 3

    메인 스레드에서 foo().collect 호출되어 하나의 코루틴 인스턴스가 발생되가 때문에 foo의 flow 사용된 context도 메인에 호출된 스레드이다.

    실행 컨텍스트를 신경 쓰지 않고 호출자를 차단하지 않는 빠른실행 or 비동기 코드의 기본이다.

    Wrong emission withContext

    장기 실행되는 CPU소비되는 코드가 컨텍스트(Dispatchers.Default)에서 실행해야 할 수 있다. 그리고 UI 업데이트 코드 일 경우 컨텍스트(.Main)에서 실행 해야 할 수 있다.

    일반적으로 withContext는 코틀린 코루틴을 사용하여 코드의 컨텍스트를 변경하는데 사용되지만 flow { ... } 빌더의 코드는 context preservation(컨텍스트 보전) 특성을 준수해야하며, 다른 컨텍스트에서 emit 할 수 없다.

    un foo(): Flow<Int> = flow {
        withContext(Dispatchers.Default) {
            for (i in 1..3) {
                sleep(100)
                emit(i)
            }
        }
    }
    
    fun main() = runBlocking<Unit> {
        foo().collect { value -> log("Collected $value") }
    }
    
    // 출력결과
    // Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
    // ~~~ Please refer to 'flow' documentation or use 'flowOn' ins

    컴파일 결과 예외가 발생한다.

    결국 호출할 때 main에서 사용된(?) dispatchers context를 사용하는데(context preservation 특성을 통해 별도 호출은 필요 없음) 다른 context로 인해 오류가 발생한다.

    flowOn operator

    예외 내용중 Please refer to 'flow' documentation or use 'flowOn' instead 있다.

    flowOn을 사용하라 라는 내용이다. 다른 context를 이용을 위해서는 아래의 코드를 참조하자.

    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            sleep(100)
            log("Emitting $i")
            emit(i)
        }
    }.flowOn(Dispatchers.Default)
    
    fun main() = runBlocking<Unit> {
        foo().collect { value -> log("Collected $value") }
    }
    
    // 출력결과
    // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
    // [main @coroutine#1] Collected 1
    // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
    // [main @coroutine#1] Collected 2
    // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
    // [main @coroutine#1] Collected 3

    메인 스레드에서 collect되는 동안 foo() 함스에서는 다른 스레드에서 진행된 것을 확인 할 수 있다.

    flowOn이 내부 스레드의 특성을 변경했고, CorutineDispatcher를 다른 context로 변경하여 다른 코루틴을 통해 업스트림 flow가 진행되었다.

    Buffering

    다른 코루틴 플로우의 다른부분을 실행하면 flow를 collect하는데 걸리는 전체 시간, 특히 장기 실행 비동기 작동일 때 도움을 준다.

    아래의 코드로 예를 들면 foo()함수의 흐름의해 100ms가 delay된 후 emit이 동작한다. collect에서도 delay로 300ms가 걸린다.

    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        val time = measureTimeMillis {
            foo().collect { value ->
                delay(300)
                println(value)
            }
        }
        println("collect in $time ms")
    }
    
    // 출력결과
    // 1
    // 2
    // 3
    // collect in 1233 ms (성능에 따라 상이)

    foo()함수에서 100ms, collect에서 300ms에 따라 최소 1200ms 소요가 된다.

     

    buffer 오퍼레이터를 사용하면 순차적으로 실행 되지만 foo()함수 내의 속도를 줄일 수 있다.

    val time = measureTimeMillis {
        foo()
            .buffer()
            .collect { value ->
                delay(300)
                println(value)
            }
    }
    println("collect in $time ms")
    
    // 출력결과
    // ...
    // collect in 1057 ms

    *flowOn 오퍼레이터도 비슷한 버퍼링 매커니즘을 사용한다.

    Conflation

    flow작업의 일부 결과를 나타내거나 작업 상태를 업데이트할 때 각 값을 처리 할 필요 없거나, 가장 최근의 하나만 처리가 필요할때(?) — 여기 한번더 체크체체체

    conflate 오퍼레이터는 collector가 느릴 때 중간값을 스킵한다.

    val time = measureTimeMillis {
        foo()
            .conflate()
            .collect { value ->
                delay(300)
                println(value)
            }
    }
    println("collect in $time ms")
    
    // 출력결과
    // 1
    // 3
    // collect in 756 ms

    Processing the latest value

    Conflation은 emitter, collector가 느릴 때 프로세싱을 빠르게 하는 한가지 방법으로 emit() 값을 삭제 한다. 다른 방법으로 느린 collector를 삭제 하고 새로운 값으로 재시작하여 전달한다. xxxLatest 쓰는 오퍼레이터들이다.(transform, collect, combine(deprecate??), flatMap, map) 이 것들은 해당 코드블록을 취소 한다.

    val time = measureTimeMillis {
        foo().collectLatest { value ->
            delay(300)
            println(value)
        }
    }
    println("collect in $time ms")
    
    // 출력결과
    // 3
    // collect in 720 ms (성능에 따라 상이)

    위 foo()함수에서 100ms씩 3번(300ms) emit되며 collectLatest의 바디에서 300ms가 걸리지만 마지막 값만 처리 하기 때문에 300ms시간이 소요된다. 그래서 총 600ms이상의 시간이 걸린다.

    Zip

    코틀린 기본 lib중 Sequence.zip의 확장함수와 동일하게 flow에서 zip 오퍼레이터는 두개의 flow를 결합시킨다.

    fun main() = runBlocking {
        val nums = (1..3).asFlow()
        val strs = flowOf("one", "two", "tree")
    
        num.zip(strs, { T1, T2 ->
            "$T1 -> $T2"
        }).collect { it -> println(it) }
    }
    
    // 출력결과
    // 1 -> one
    // 2 -> two
    // 3 -> tree

    *nums, strs의 sequence의 사이즈가 다를 경우 작은크기를 따른다(내부 exception 처리가 되어 있는 듯 하다.)

    num.zip(strs, { T1, T2 ->
        "$T1 -> $T2"
    }).collect { it -> println(it) }

    위 코드와 아래 코드는 같다.

    num.zip(strs) { T1, T2 ->
        "$T1 -> $T2"
    }.collect { println(it) }

    Combine

    flow의 변수 또는 연산의 최신 값을 나타내는 경우(conflation 참조), 해당 flow 최신 값을 계산하고 업스트림 flow에 emit 때마다 다시 계산한다.

    fun main() = runBlocking {
        val nums = (1..4).asFlow().onEach { delay(300)}
        val strs = flowOf("one", "two", "tree").onEach { delay(400) }
        val startTime = System.currentTimeMillis()
    
        num.zip(strs) { T1, T2 ->
            "$T1 -> $T2"
        }.collect {
            println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
    }
    
    // 출력결과 (성능에 따라 상이)
    // 1 -> one at 430 ms from start
    // 2 -> two at 832 ms from start
    // 3 -> tree at 1259 ms from start

    위 코드를 보면 num은 300ms, strs는 400ms다, zip 오퍼레이터를 통해 합쳐질 때 400ms가 소모된다.

    onEach() 중간연산자는 각 element를 delay시키고 코드의 간결성을 가지고 온다.

    하지만 combine 오퍼레이터를 사용할때는 다르다.

    fun main() = runBlocking {
        val nums = (1..4).asFlow().onEach { delay(300)}
        val strs = flowOf("one", "two", "tree").onEach { delay(400) }
        val startTime = System.currentTimeMillis()
    
        num.combine(strs) { T1, T2 ->
            "$T1 -> $T2"
        }.collect {
            println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
    }
    
    // 출력결과 (성능에 따라 상이)
    // 1 -> one at 436 ms from start
    // 2 -> one at 636 ms from start
    // 2 -> two at 839 ms from start
    // 3 -> two at 937 ms from start
    // 4 -> two at 1237 ms from start
    // 4 -> tree at 1239 ms from start

    zip은 두개의 element 값을 기다려 합친 값을 전달 하였지만, combine은 각 element값이 있다면 기다리지 않고 출력한다.

    nums하나의 element를 가져오는데 300ms 대기한다. strs는 400ms 대기한다. 이 두개의 flow는 element를 전달하는 시간이 다르다.

    nums, strs 각 collect되면서 T1, T2의 값을 가질 때 해당값을 출력한다.

    nums가 300ms후 T1에 값이 들어 갔으나, 해당 시간에 T2에는 값이 없어 진행되지 않으며

    400ms후 출력(strs 400ms 지남 nums: 1, strs: one), 600ms후 출력(nums 300ms 지남 - nums: 2, strs: one), 800ms후 출력(strs 400ms지남 nums: 2, sts: two) ....... 형태로 진행이 된다.

    Flattening flows

    플로우는 비동기식으로 수신 된 값 시퀀스를 나타내므로 각 값이 다른 시퀀스에 대한 요청을 트리거하는 상황에 쉽게 도달할 수 있다. 아래의 코드를 참조하여 flow의 emit → delay → emit 단계를 거치는 함수이다.

    fun requestFlow(i: Int): Flow<String> = flow {
        emit("$i: First")
        delay(500)
        emit("$i: Second")
    }
    (1..3).asFlow().map { requestFlow(it) }

    3개의 int값을 requestFlow로 전달 후 리턴 타입은 Flow<Flow<String>>이다. 이 타입을 Flow<String>으로 만들 수 있는 flatten(평판화) 작업이 필요하다. collection, sequence에는 flatten, flatMap 오퍼레이터를 가지고 있다.

    flatMapConcat

    즉시 연결 모드로 flatMapConcat, flattenConcat 오퍼레이터가 있다.

    • faltMapConcat
    fun main() = runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow().onEach { delay(100) }
            .flatMapConcat {
                requestFlow(it)
            }.collect { value ->
                println("$value at ${System.currentTimeMillis() - startTime} ms from start")
            }
    }
    
    // 출력결과
    // 1: First at 124 ms from start
    // 1: Second at 626 ms from start
    // 2: First at 731 ms from start
    // 2: Second at 1235 ms from start
    // 3: First at 1339 ms from start
    // 3: Second at 1843 ms from start

     

    • flattenConcat
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        .map { requestFlow(it) }
        .flattenConcat()
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
    
    // 출력결과
    // 1: First at 121 ms from start
    // 1: Second at 626 ms from start
    // 2: First at 730 ms from start
    // 2: Second at 1234 ms from start
    // 3: First at 1338 ms from start
    // 3: Second at 1841 ms from start

     

     

    flatMapMerge

    들어오는 flow를 동시에 실행시켜 단일 flow로 가능한 빨리 값을 전달한다. flatMapMerge, flattenMerge 오퍼레이터가 있다. 두개는 concurrency를 파라미터로 받는 옵션이 있다.( flow의 갯수를 재한할 수 있다. 기본값 16개)

    fun main() = runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow().onEach { delay(100) }
            .flatMapMerge { requestFlow(it) }
            .collect { value ->
                println("$value at ${System.currentTimeMillis() - startTime} ms from start")
            }
    }
    
    // 출력결과
    // 1: First at 158 ms from start
    // 2: First at 259 ms from start
    // 3: First at 365 ms from start
    // 1: Second at 661 ms from start
    // 2: Second at 764 ms from start
    // 3: Second at 870 ms from start
    (1..3).asFlow().onEach { delay(100) }
        .map { requestFlow(it) }
        .flattenMerge()
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
    
    // 출력결과
    // 1: First at 158 ms from start
    // 2: First at 259 ms from start
    // 3: First at 365 ms from start
    // 1: Second at 661 ms from start
    // 2: Second at 764 ms from start
    // 3: Second at 870 ms from start

    * requestFlow(it) 함수를 순서대로 호출 하지만, collect의 결과는 flow를 동시에(순서 보장X) 받는다.

    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        .map { requestFlow(it) }
        .flattenMerge(2)
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
    
    // 출력결과는 위와 유사(성능에 따라 상이)
    // 1: First at 143 ms from start
    // 2: First at 244 ms from start
    // 1: Second at 645 ms from start
    // 3: First at 646 ms from start
    // 2: Second at 744 ms from start
    // 3: Second at 1148 ms from start

    flatMapLatest

    xxxLatest오퍼레이터와 동일하다 호출하여 마지막값이 아니면 취소하고 마지막 값만 진행이 끝까지 된다.

    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        .flatMapLatest { requestFlow(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
    // 출력결과
    // 1: First at 164 ms from start
    // 2: First at 338 ms from start
    // 3: First at 443 ms from start
    // 3: Second at 947 ms from start

    * 호출되는 requestFlow 함수에서 delay가 없다면 Lastest기능을 활용 할 수 없다. 취소를 시키기 위해서는 delay와 같은 멈춤이 있을 때 가능하다.(기본개념의 cancel확인)

    Flow exception

    collect 과정에서 emit혹은 코드 내부상황에서 예외처리가 가능하다. 몇개의 예외처리방법이 있다.

    Collector try and catch

    어느 코드에서나 사용되는 try/catch, collect { ... } 내부에 사용가능하다.

    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> {
        try {
            foo().collect { value ->
                println(value)
                check(value <= 1) { "Collected $value" }
            }
        } catch (e: Throwable) {
            println("Caught $e")
        }
    }
    
    // 출력결과
    // Emitting 1
    // 1
    // Emitting 2
    // 2
    // Caught java.lang.IllegalStateException: Collected 2

    check함수를 통해 exception이 진행된다.

    Everything is caught

    fun foo(): Flow<String> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }.map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

    foo 함수 내부에 check하여 예외처리를 발생시킨다.

    *foo 함수를 사용할때 IDE에서 별도의 표시는 없다. 직접 try/catch를 하지 않을 경우 오류가 발생되어 시스템이 중단될 수 있다.(throw처럼 예외 발생의 가능성이 있는 만큼 IDE에서 잡아주면 좋을 것 같다)

    Exception transparency

    위 코드처럼 flow { ... } 빌더 내부에 try/catch를 사용했었다. 하지만 데이터 방출시 예외처리를 캡슐화 할수있다.

    • catch 오퍼레이터를 사용하여 예외처리를 진행 할 수 있다.
    • 다시 한번더 throw를 할 수 있다.
    • 예외처리 문으로 들어온 데이터를 다시한번더 emit할 수 있다.
    • 무시할 수 있고, 다른 코드를 삽입할 수 있다.
    foo().catch { prlintln("Caught $it")}
        .collect { println(it) }

    Transparent catch

    위와 같이 catch가 중간연산자로 있을경우, collect { ... } 내부에 발생되는 예외처리가 발생했을때 시스템이 죽는다.

    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        foo().catch { println("Caught $it") }
            .collect { value ->
                check(value <= 1) { "Collected $value" }
                println(value)
            }
    }

    Catching declaratively

    위 문제를 해결하기 위해 onEach { ... }블럭에서 처리하면 된다.

    foo().onEach {
        check(it <= 1) { "Collected $it" }
        println(it)
    }
        .catch { println("Caught $it") }
        .collect()

    * 예외처리를 위해 안정적인 코드를 짜기 위해선, onEach에서 비즈니스 로직을 구현하고, catch로 예외에 대한 처리를 하는게 좋을 것 같다.

    Imperative finally block

    try/catch 문 외 collect는 finally 블럭을 사용하여 완료 후 작업을 요청 할 수 있다.

    fun foo(): Flow<Int> = (1..3).asFlow()
    
    fun main() = runBlocking<Unit> {
        try {
            foo().collect { value -> println(value) }
        } finally {
            println("Done")
        }
    }
    
    // 출력결과
    // 1
    // 2
    // 3
    // Done

    Declarative handling

    위 코드처럼 flow의 완료에 대한 로직 구성보다 onCompletion 오퍼레이터는 간단한 방법을 제시한다.

    foo().onCompletion { println("Done") }
            .collect { value -> println(value) }

    그리고 onCompletion의 장점은 Throwable?(nullable)로 파라미터를 받아 정상완료 또는 예외처리에 대해 판별이 가능하다.

    fun foo(): Flow<Int> = flow {
        emit(1)
        throw RuntimeException()
    }
    
    fun main() = runBlocking<Unit> {
        foo()
            .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
            .catch { cause -> println("Caught exception") }
            .collect { value -> println(value) }
    }
    
    // 출력결과
    // 1
    // Flow completed exceptionally
    // Caught exception

    onCompletion은 catch와 달리 예외처리를 핸들링하지 않는다.

    * 예외처리 발생시 catch이전에 onCompletion을 먼저 진행된다.

    Upstream exceptions only

    onCompletion은 업스트림(foo 함수)에서 내려오는 예외처리를 확인할 수 있지만, 다운스트림(collect { ... })의 예외처리는 확인할 수 없다.

    fun foo(): Flow<Int> = (1..2).asFlow()
    
    fun main() = runBlocking {
        foo()
            .onCompletion { cause -> println("Flow completed with $cause") }
            .collect { value ->
                check(value <= 1) { "Collected $value" }
                println(value)
            }
    }
    
    // 출력결과
    // 1
    // Flow completed with null
    // Exception in thread "main" java.lang.IllegalStateException: Collected 2

    이 문제는 위 Transparent catch에서 언급했던것과 같이 onEach에서 비즈니스 로직을 작성하는것이 좋을 것 같다.

    Imperative versus declarative

    위 예제에서 보여준 flow의 collect 사용방식과 예외처리와 완료에 대한 핸들링 모든것들의 사용방법이 다양하다. 어떤 코드가 베스트인지는 모르고 kotlin에서는 위 방법중 하나를 지지하지 않았다. 각자의 개발스타일에 맞춰 선택하여 사용하는것이 좋다.

    * 저의 사견은 exception처리는 업스트림에서, onEach에 비즈니스 로직을, catch문과, 필요에 따라 onCompletion을 써주는께 좋을 것 같다.

    Launching flow

    flow를 사용하여 일부 소스에서 오는 비동기 이벤트를 쉽게 표현할 수 있다. 이 경우 addEventListener와 같은 들어오는 이벤트를 생성할 수 있어야 한다. onEach 오퍼레이터 반응으로 코드를 등록하고 추가 작업을 계속할 수 있다.

    fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
    
    fun main() = runBlocking<Unit> {
        events()
            .onEach { event -> println("Event: $event") }
            .collect()
        println("Done")
    }
    
    // 출력결과
    // Event: 1
    // Event: 2
    // Event: 3
    // Done

    위 출력결과를 보면 이벤트에 대해 리스너 한것이 아닌 코루틴을 실행한 결과와 동일하다.

    onEach는 중간 오퍼레이터라 flow를 collect하려면 터미널 오퍼레이터도 필요하다.

    터미널 오퍼레이터로 launchIn을 사용한다.(collect 대신)

    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this)
    println("Done")
    
    // 출력결과
    // Done
    // Event: 1
    // Event: 2
    // Event: 3

    launchIn은 CoroutineScope를 필수 파라미터로 사용한다. runBlocking 코루틴 빌더에서 해당 scope가 파라미터로 사용되어 자식 코루틴이 종료되기까지 메인 함수는 대기한다.

    실제 애플리케이션 범위는 수명이 제한된 엔티티에서 제공된다. 이 엔티티의 수명이 종료되자마자 해당 scope가 취소되고 해당 flow의 collect가 취소된다.

    onEach { ... }.launchIn(this)는 addEventListener와 동일하다 그러나 취소 및 구조적 동시성 용도로 사용되므로 removeEventListener가 필요하지 않다.

    * launchIn은 Job을 리턴한다. 전체 범위를 취소하지 않고 flow collect만 취소하거나 join(끝날때 까지 대기)을 사용할 수 있다.

     

    Flow and Reactive Streams

    For those who are familiar with Reactive Streams or reactive frameworks such as RxJava and project Reactor, design of the Flow may look very familiar.

    Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow main goal is to have as simple design as possible, be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in Reactive Streams and Kotlin Flows article.

    While being different, conceptually, Flow is a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa. Such converters are provided by kotlinx.coroutines out-of-the-box and can be found in corresponding reactive modules (kotlinx-coroutines-reactive for Reactive Streams, kotlinx-coroutines-reactor for Project Reactor and kotlinx-coroutines-rx2 for RxJava2). Integration modules include conversions from and to Flow, integration with Reactor's Context and suspension-friendly ways to work with various reactive entities.

     

    Java의 Reactive Stream 또는 RxJava 및 Reactor 프로젝트와 같은 reactive framework 친숙하면 flow도 어렵지 않을것이다.

    실제로 설계도 Reactive Streams와 다양한 구현에서 영감을 얻었다. 그러나 Flow의 주요 목표는 가능한 한 단순한구조를 유지하고 Kotlin과 suspend 친화적이며 구조적 동시성을 존중하는 것이다. 이 목표를 달성하는것은 적극적 개척자의 노력 없이는 불가능할 것입니다. https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4 (한번 읽어보자)

    개념적으로 다르지만, Flow는 반응 스트림으로 반응(spec and TCK compliant?) 게시자로 변환하거나 그 반대로 변환 할 수 있습니다. 이러한 변환기는 기본적으로 제공되며 해당 반응성 모듈( kotlinx-coroutines-reactive반응 형 스트림, kotlinx-coroutines-reactor프로젝트 리액터 및 kotlinx-coroutines-rx2RxJava2)에서 찾을 수 있습니다. 통합 모듈에는 Flow Reactor와의 통합 및 Context및 다양한 반응 엔티티와 작업하기위한 suspend 친화적인 방법이 포함됩니다.

     

    끝.

     

    참조문서 : https://tourspace.tistory.com/258, https://tourspace.tistory.com/260?category=797357

    댓글

Designed by Tistory.