- Coroutine Flow
Coroutine에서 Flow는 단일 값만을 반환하는 suspend function과는 달리 여러 값들을 순차적으로 방출할 수 있는 Data Stream이다.
Flow는 순차적인 값을 생성하는 Iterator와 비슷하지만 suspend function을 이용하여 값을 비동기적으로 생성하고 사용한다.
Data Stream은 크게 생산자(Producer), 중개자(Intermediary), 소비자(Consumer)로 구성된다.

생산자(Producer): Stream에 추가되는 데이터를 생산한다. Flow는 비동기적으로 데이터를 생산할 수 있다.
중간 연산자(Intermediary): Stream에서 방출하는 개별 값이나 Stream 그 자체를 수정할 수 있다.
소비자(Consumer): Stream의 값을 사용한다.
- Flow 생성 및 데이터 발행
Flow는 flow builder를 사용하여 쉽게 만들 수 있다. flow{ } 블록 내에서 생산자는 emit()이라는 메서드를 이용하여 값을 생성하고 발행할 수 있다.
하단의 DataSource는 서버로부터 스케줄 정보를 받아오는 Remote DataSource이다. 생성자는 데이터를 발행하고 5초를 기다린 후 반복해서 데이터를 발행하고 있다.
class ScheduleDataSourceImpl @Inject constructor(
private val apiService: ApiService
): ScheduleDataSource {
override suspend fun fetchSchedule(): Flow<ResponseScheduleDTO?> = flow {
while (true) {
val schedule =trainerMainApiService.fetchSchedule().verify()
emit(schedule)
delay(5000)
}
}
}
interface ApiService {
@GET("/api/schedule/today")
suspend fun fetchSchedule(): Response<ResponseScheduleDTO>
}
- 중간 연산자
중간연산자를 활용하여 값을 소비하지 않고도 데이터 스트림을 변경시킬 수 있다.
대표적인 중간 연산자에는 map, filter, onEach등이 있다.
스트림에 중간연산자를 적용하는 것만으로는 데이터를 수집이 트리거되지 않는다.
이렇게 DataSoruce를 이용하여 가져온 DTO형태의 데이터를 Repository에서 domain에서 사용하는 entity로 변경을 시켜줄 수 있다.
class DashBoardRepositoryImpl @Inject constructor(
private val dashBoardDataSource: DashBoardDataSource
): DashBoardRepository {
override suspend fun fetchTrainerSchedule(): Flow<ReservedScheduleEntity?> =
dashBoardDataSource.fetchTrainerSchedule().map {
it?.convertToReservedScheduleEntity()
}
}
- 데이터 소비
터미널 연산자를 이용하면 값 수신대기를 시작한다. 즉 flow 수집을 트리거 할 수 있다.
대표적인 터미널 연산자는 collect(), single(), reduce(), toList() 등이 있다.
데이터를 발행할 때 스트림의 모든 값을 가져오기 위해서는 collect()메서드를 이용한다.
이는 정지함수이므로 코루틴 내에서 실행해야한다.
이렇게 데이터를 발행하고 필요에 따라 변형시킨 후에는 데이터를 사용할 곳에서 소비하면 된다.
예를 들어 데이터를 viewModel에서 소비하고자 한다면 다음과 같이 사용하면 된다.
viewModelScope.launch {
val schedules = fetchScheduleUseCase().collect {
//update
}
}'Coroutines' 카테고리의 다른 글
| [Coroutines] Channel, Pipeline (0) | 2022.02.10 |
|---|---|
| [Coroutines] StateFlow (0) | 2022.02.09 |
| [Coroutines] Job (status, cancel error handling) (0) | 2022.02.08 |
| [Coroutines] Coroutine Basic1 (CoroutineScope, CoroutineContext, Dispatcher, Async, launch) (0) | 2022.02.08 |