본문으로 바로가기
반응형

이전 포스팅에서 Android에서 Coroutine이 사용 가능하도록 설정 된 상태라면 Room에서 coroutine을 이용한 DB 접근방법에 대해서 알아봅니다.

상세내용이 담긴 포스팅

2019/11/25 - [개발이야기/Android] - Android Room & Coroutines

2019/11/04 - [개발이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#10- Asynchronous Flow(1/2)

2019/11/16 - [개발이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#11- Asynchronous Flow(2/2)

 

 

room 2.1에서 부터 coroutine을 지원하기 시작했고 room 2.2부터 Flow를 지원합니다.

실제로 이 두가지를 어떻게 호출하고 사용하는지 알아봅니다.

 

Room의 suspend function 지원

아래처럼 사용자를 관리하는 Table에 CRUD를 하는 Dao가 존재한다고 예를 듭니다.
@Dao
interface UsersDao {
    @Query("SELECT * FROM users")
    fun getUsers(): List<User>
    @Query("UPDATE users SET age = age + 1 WHERE userId = :userId")
    fun incrementUserAge(userId: String)
    @Insert
    fun insertUser(user: User)
    @Update
    fun updateUser(user: User)
    @Delete
    fun deleteUser(user: User)
}

이제 coroutine의 사용이 가능해 졌으니 MainActivity에서 쉽제 비동기로 호출할 수 있습니다.

 (MVVM에서라면 repository를 만들거나, ViewModel에서 호출해야 하지만 ViewModel에서 코루틴을 사용하는 방법은 다음 포스팅에서 다룹니다.)

class MainActivity : BaseActivity() { // BaseActivity에서 CoroutineScope을 implement한 상태
    @Inject
    private lateinit var db: UserDatabase  
    ...
    fun insertUser(user: User) {
        launch(Dispatchers.IO) { db.userDao().insertUser(user) }
    }
    ...
}

이전 포스팅에서 BaseActivity에 Coroutine scope을 만들어 놓았으니, MainActivity도 이미 coroutine scope안에 존재합니다.

따라서 insertUser() 함수 내에서 바로 launch를 호출합니다.

만약 동작하는 동안 loading progress를 보여줘야 한다면 아래와 같이 수정 할 수 있습니다.

fun insertUser(user: User) {
    launch {
        showProgressDialog() //loading progress bar 화면 출력   
        withContext(Dispatchers.IO) { db.userDao().insertUser(user) }
        hideProgressDialog() //loading progress bar 화면 삭제  
    }
}

이제 Dao를 suspend로 바꿔 봅니다.

@Dao
interface UsersDao {
    @Query("SELECT * FROM users")
    suspend fun getUsers(): List<User>
    @Query("UPDATE users SET age = age + 1 WHERE userId = :userId")
    suspend fun incrementUserAge(userId: String)
    @Insert
    suspend fun insertUser(user: User)
    @Update
    suspend fun updateUser(user: User)
    @Delete
    suspend fun deleteUser(user: User)
}

간단하게 Dao의 함수에 suspend만 추가하면 됩니다.

이제 해당 함수들은 더이상 Background thread에서 호출할 필요가 없습니다.

따라서 아래와 같이 호출 부분이 변경됩니다.

fun insertUser(user: User) {
    launch {
        showProgressDialog() //loading progress bar 화면 출력
        db.userDao().insertUser(user)
        hideProgressDialog() //loading progress bar 화면 삭제
    }
}

Dao에  suspend 함수로 만들면 해당 함수들은 room 자체에서 Background thread에서 수행 후 호출한 coroutine context로 값을 반환해 줍니다.

따라서 해당 함수는 Main Thread에서 호출이 가능합니다.

만약 RxJava를 사용해야 했다면 아래와 같이 작성되어야 합니다.

 

fun insertUser(user: User) {
    Observalbe.fromCallable { db.userDao().insertUser(user) }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe { showProgressDialog() }
        .doOnTerminate { hideProgressDialog() }
        .subscribe()
}

물론 여기서 Dao의 insertUser() 함수는 suspend 함수가 아니여야 합니다.

어떤 구문이 더 편할지는 개발자 개인의 판단입니다.

withTransaction의 사용

Dao에서 @Transaction 명령을 이용하여 Transaction을 사용할수 있습니다.

만약 여러개의 Dao의 함수들 또는 하나의 Dao에서 여러작업을 하나의 Transaction으로 묶고 싶다면 withTransaction 함수를 사용할 수 있습니다.

class MainActivity : BaseActivity() {
    @Inject
    private lateinit var db: UserDatabase  ...
    fun insertUser(user: User) {
        launch {
            db.withTransaction {
                db.userDao().deleteUser(user) //deleteUser()는 suspend function인 상태  
                db.userDao().insertUser(user) //insertUser()는 suspend function인 상태   
            }
        }
    }  ...
}

예제에서는 같은 dao의 함수들을 호출했지만 다른 Dao에 존재하는 함수 역시 하나의 transaction으로 묶을수 있다는 점이 큰 장점 입니다.

withTransaction 함수 역시 suspend 함수 입니다. 따라서 coroutine scope 내에서 호출 되어야 합니다.

Transaction 또는 suspend function은 room에서 정해진 Thread를 이용하여 동작합니다.

이는 setTransaction 또는 setQueryExecutor를 이용하여 원하는 Thread를 각각 세팅해 줄수도 있습니다.

만약 room에서 사용할 Background thread를 따로 세팅해 줄 경우 LiveData도 영향을 받습니다.

여기서는 thread를 세팅하는 방법을 설명하지는 않습니다. 상세 내용이 필요하다면 글 처음에 링크된 포스팅을 확인하시기 바랍니다.

 

Flow의 사용

위에서 언급 했듯이 Room 2.2부터는 함수의 반환값으로 Flow를 설정할 수 있습니다.

이제 Dao에서 query 함수에 Flow를 넣어 봅니다.

@Dao
interface UsersDao {
    @Query("SELECT * FROM users")
    fun getUsersFlow(): Flow<List<User>>
    @Query("SELECT * FROM users")
    fun getUsersLiveData(): LiveData<List<User>>  ...
}

Flow는 suspend function이 아닙니다.

flow 자체가 coroutine builder이기 때문에 suspend function일 필요가 없는거죠.

사실 쓰임새만 봐서는 LiveData와 다르지 않습니다.

따라서 똑같은 역할을 하는 함수를 return값만 다르게 하여 호출하는 부분이 어떻게 달라지는지 확인해 보겠습니다.

 

위 두개의 함수를 호출하는 부분은 아래와 같습니다.

(ViewModel과 Repository를 이용해야 하지만 일단 여기서는 activity에서 db dao를 직접 호출하도록 합니다.

class MainActivity : BaseActivity() {
    @Inject
    private lateinit var db: UserDatabase
    private lateinit var adapter: UserListAdapter   ...
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        ...
        // Flow를 이용한 user list refresh  
        launch {
            db.userDao().getUsersFlow().collect {
                adapter.setData(it)
                adapter.notifyDataSetChanged () 
            }
        }
        
        // LiveData를 이용한 user list refresh
        db.userDao().getUsersLivedata().observe(this, Observer { 
            adapter.setData(it)
            adapter.notifyDataSetChanged ()
        })
    }
    ...
}

두개의 동작은 동일합니다.

실제 user list를 가져오는 동작 역시 Room이 자체 background thread에서 작업후 호출한 thread로 값을 반환해 줍니다.

따라서 예제 코드처럼 해당 함수들은 main thread에서 함수를 호출하며, background에서 데이터를 처리를 완료하면 그때 데이터를 반환해 줍니다.

 


flow는 cold stream이기 때문에 collect()가 호출되는 순간 동작합니다.

또한 모든 방출(emission)이 끝나야지만 블럭이 종료 됩니다.

Room은 flow로 데이터를 반환하며, 데이터 변동사항이 생길때마다 데이터를 방출합니다.

위 예제에서 누군가 user table에 update, insert, delete를 한다면 내부적으로 emit(List<user>)가 발생하여 collect 내부의 함수가 동작합니다.

Flow를 반환하는 Dao의 getUserFlow()는 suspend function이 아니기 때문에 coroutine Builder로 coroutine scope을 만들어 그 안에서 호출 할 필요는 없습니다.

다만 collect() 함수가 호출되면 해당 flow의 방출이 끝나야지만 해당 코드 라인을 벗어날 수 있습니다. (Thread를 block하는게 아니고 code만 block 됩니다.)

따라서 launch로 감싸주지 않으면 다음 라인인 getUserLiveData()를 호출할 수 없습니다.

여기까지만 보면 LiveData보다 Flow가 나은게 없습니다.^^a

써야 할 이유가 딱히 없습니다.

하지만 Flow는 LiveData보다 다양한 operator를 제공합니다.

예를 들면 map, filter의 중간 operator를 제공하기 때문에 collect로 넘어가기전에 데이터를 원하는 형태로 가공해서 전환 할 수 있습니다.

물론 map, filter의 기능은 LiveData에서 Observer {...} 블럭안에서도 코드로 만들 수도 있습니다.

 

하지만 내부에서 발생하는 Exception의 처리, 데이터의 병합(zip, comblne, flatten)등 다양한 operator를 통해서 좀더 간결한 코드를 만들어 낼수 있습니다.

자세한 사용법은 최상단에 링크해 놓았지만 Flow의 기본 사용 및 operator를  확인 가능합니다.

2019/11/04 - [개발이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#10- Asynchronous Flow(1/2)

2019/11/16 - [개발이야기/Kotlin] - [Kotlin] 코틀린 - 코루틴#11- Asynchronous Flow(2/2)

 

Flow cancel

flow는 자체적으로 cancel 함수를 제공하지 않습니다.
따라서 flow를 취소하려면 val job = launch{...} 감싼 이후에 launch를 cancel 시키거나, withTimeoutOrNull 같은 timer 함수로 감싸서 취소 시켜야 합니다.
취소와 관련된 내용은 상단 링크의 "코루틴#10 - Asynchronous Flow(1/2)" 에서 확인 가능합니다.
 
만약에 데이터를 일회성으로 한번만 받아야 한다면 어떻게 해야 하는 코드로 알아봅니다.
특정 user의 정보만 받아와야 하는 코드가 있습니다.
@Dao
interface UsersDao {
    @Query("SELECT * FROM users WHERE _id = :userId")
    fun getUserFlow(userId: Long): Flow<User>
    @Query("SELECT * FROM users WHERE _id = :userId")
    fun getUserLiveData(userId: Long): LiveData<User>  ...
}

이런 형태의 코드는 사실 변경점을 지속적으로 받을 필요가 없이 호출시 한번만 전달 받고 취소되어야 합니다.

먼저 liveData에서의 사용법을 보겠습니다

val userLiveData = getUserLiveData(1L)
userLiveData.observe (object : Observer {
    override fun onChanged(user: User) {
        showUser(user)
        userLiveData.removeObserver(this)
    }
})

userId가 1번인 User객체를 Room에서 받아 옵니다.

다만 1회성으로 받아와야 하기 때문에 Observer를 Anonymous object로 만들어 등록하고 사용이 끝내면 내부에서 this를 removeObserver로 해제해 줍니다.

launch {
    getUserFlow(1L).first{ showUser(it) }
}

Flow의 경우 first terminal operator를 사용하면 하나의 emission 이후에 나머지는 취소 됩니다.

cancel 명령어가 없으므로 first 같은 operator를 이용하여 cancel을 유도합니다.

gerUserFlow(1L).onEach { showUser(it) }
.take(1)
.launchIn(this)

take를 이용해서 1개의 값만 반환하고 cancel을 유도할 수도 있습니다.

onEachlaunchIn 같은 operator에 대한 설명은 링크된 Flow 글에서 확인 가능합니다.

상황에 따라 다르겠지만 여기서는 사실 Flow나 Live데이터 보다는 suspend function을 이용하는게 훨씬더 효율적입니다.

즉 Dao와 호출 부분을 아래와 같이 사용하는게 더 좋겠죠?

//DAO를 suspend function으로 변경
interface UsersDao {
    @Query("SELECT * FROM users WHERE _id = :userId")
    suspend fun getUser(userId: Long): User ...
}
// 호출부분
launch { showUser(getUser(1L)) }

coroutine을 사용하지 않았다면 livedata를 이용하여 observer를 뗏다 붙였다 하는 수고로움을 필요로 하거나, 일반적인 Dao function을 background에서 호출하고 결과는 UI에서 처리하도록 부가적인 코드가 들어가야 합니다.

coroutine으로 인한 간결함이 잘 나타나는 예제입니다.

 


Under the hood

Flow에 대해서 좀더 얘기해 봅니다.

Room에서 Flow를 반환하도록 하면 suspend function과 같이 Room이 내부 동작은 Background thread에서 처리해 주고, 결과는 호출한 Thread (정확하게는 호출한 coroutine scope의 coroutine context)로 반환해 줍니다.

Flow의 기본소개 두번째 글에 있듯이 Flow는 특성상 동일한 context에서 producing고 consuming 해야 합니다. (context preservation)

그렇지 않으면 IllegalStateException을 발생시킵니다. (Flow invariant is violated)

따라서 produce는 Background에서 consume은 UI thread에서 하도록 하려면 Flow에서 제공하는 flowOn 이라는 operator를 사용해야 합니다.

추측하건데 Room에서도 flowOn operator를 사용 할거라는 예측이 가능합니다.

하지만 이 operator는 아직 Experimental operator 로써 정식 operator는 아닙니다. (tentative stable 주석이 붙어 있기는 함.)

Room에서 만들어 놓은 Dao의 코드들은 자동생성되어 IDE에서 실제 Room이 auto generation한 코드를 열어볼 수 있습니다.

실제 자동 생성된 코드는 아래와 같습니다.

@Override public Flow getUser(final long userd) {
    final String _sql = "SELECT * FROM users WHERE _id = ?";
    final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 1);
    int _argIndex = 1;
    _statement.bindLong(
        _argIndex,
        cardId
    ); 
    return CoroutinesRoom.createFlow(...
    ...

Flow의 생성은 CoroutineRoom.createFlow() 함수를 통해서 하고 있으니 Room의 소스 코드를 따라가 보면 아래와 같습니다.

https://android.googlesource.com/platform/frameworks/support/+/refs/heads/androidx-master-dev/room/ktx/src/main/java/androidx/room/CoroutinesRoom.kt

@JvmStatic
fun createFlow(db: RoomDatabase, inTransaction: Boolean, tableNames: Array, callable: Callable): Flow<@JvmSuppressWildcards R> =
    flow {     // Observer channel receives signals from the invalidation tracker to emit queries.
        val observerChannel = Channel(Channel.CONFLATED)
        val observer = object : InvalidationTracker.Observer(tableNames) {
            override fun onInvalidated(tables: MutableSet) {
                observerChannel.offer(Unit)
            }
        }
        observerChannel.offer(Unit) // Initial signal to perform first query.
        val flowContext = coroutineContext
        val queryContext = if (inTransaction)
                db.transactionDispatcher
            else
                db.queryDispatcher
        withContext(queryContext) {
            db.invalidationTracker.addObserver(observer)
            try {
                // Iterate until cancelled, transforming observer signals to query results to
                // be emitted to the flow.
                for (signal in observerChannel) {
                    val result = callable.call()
                    withContext(flowContext) { emit(result) }
                }
            } finally {
                db.invalidationTracker.removeObserver(observer)
            }
        }
    }

코드에서 보면 먼저 flow{..} 를 이용하여 Flow를 생성합니다.실제 query를 실행하기 위해서 withContext로 dispatcher를 전환합니다.

이때 dispatcher는 transaction 그외 구문에 따라 기본 설정된 executor를 사용합니다.

실제 IO 작업이 끝나면 결과값은 다시 withContext를 이용하여 caller의 coroutine context로 전환하여 보냅니다.

결론적으로 flowOn을 사용하지 않고,  결과 방출때 마다 withContext로 context를 switching 한다는걸 알 수 있습니다.

 

반응형