Skip to content
This repository was archived by the owner on Jan 10, 2025. It is now read-only.

Add a sample showing how to synchronously await for remote loads to be applied #987

Merged
merged 4 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import androidx.paging.LoadState
import com.android.example.paging.pagingwithnetwork.GlideApp
import com.android.example.paging.pagingwithnetwork.databinding.ActivityRedditBinding
import com.android.example.paging.pagingwithnetwork.reddit.ServiceLocator
import com.android.example.paging.pagingwithnetwork.reddit.paging.asMergedLoadStates
import com.android.example.paging.pagingwithnetwork.reddit.repository.RedditPostRepository
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectLatest
Expand Down Expand Up @@ -85,7 +86,7 @@ class RedditActivity : AppCompatActivity() {

lifecycleScope.launchWhenCreated {
adapter.loadStateFlow.collectLatest { loadStates ->
binding.swipeRefresh.isRefreshing = loadStates.refresh is LoadState.Loading
binding.swipeRefresh.isRefreshing = loadStates.mediator?.refresh is LoadState.Loading
}
}

Expand All @@ -97,10 +98,15 @@ class RedditActivity : AppCompatActivity() {

lifecycleScope.launchWhenCreated {
adapter.loadStateFlow
// Only emit when REFRESH LoadState for RemoteMediator changes.
// Use a state-machine to track LoadStates such that we only transition to
// NotLoading from a RemoteMediator load if it was also presented to UI.
.asMergedLoadStates()
// Only emit when REFRESH changes, as we only want to react on loads replacing the
// list.
.distinctUntilChangedBy { it.refresh }
// Only react to cases where Remote REFRESH completes i.e., NotLoading.
// Only react to cases where REFRESH completes i.e., NotLoading.
.filter { it.refresh is LoadState.NotLoading }
// Scroll to top is synchronous with UI updates, even if remote load was triggered.
.collect { binding.list.scrollToPosition(0) }
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package com.android.example.paging.pagingwithnetwork.reddit.paging

import androidx.annotation.VisibleForTesting
import androidx.paging.CombinedLoadStates
import androidx.paging.LoadState
import androidx.paging.LoadState.NotLoading
import androidx.paging.LoadState.Loading
import androidx.paging.LoadStates
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.scan
import kotlin.Error
import androidx.paging.PagingDataAdapter
import androidx.paging.RemoteMediator
import androidx.paging.PagingSource
import androidx.paging.LoadType.REFRESH
import androidx.paging.LoadType
import com.android.example.paging.pagingwithnetwork.reddit.paging.MergedState.NOT_LOADING
import com.android.example.paging.pagingwithnetwork.reddit.paging.MergedState.REMOTE_STARTED
import com.android.example.paging.pagingwithnetwork.reddit.paging.MergedState.REMOTE_ERROR
import com.android.example.paging.pagingwithnetwork.reddit.paging.MergedState.SOURCE_ERROR
import com.android.example.paging.pagingwithnetwork.reddit.paging.MergedState.SOURCE_LOADING

/**
* Converts the raw [CombinedLoadStates] [Flow] from [PagingDataAdapter.loadStateFlow] into a new
* [Flow] of [CombinedLoadStates] that track [CombinedLoadStates.mediator] states as they are
* synchronously applied in the UI. Any [Loading] state triggered by [RemoteMediator] will only
* transition back to [NotLoading] after the fetched items have been synchronously shown in UI by a
* successful [PagingSource] load of type [REFRESH].
*
* Note: This class assumes that the [RemoteMediator] implementation always invalidates
* [PagingSource] on a successful fetch, even if no data was modified (which Room does by default).
* Using this class without this guarantee can cause [LoadState] to get indefinitely stuck as
* [Loading] in cases where invalidation doesn't happen because the fetched network data represents
* exactly what is already cached in DB.
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun Flow<CombinedLoadStates>.asMergedLoadStates(): Flow<LoadStates> {
val syncRemoteState = LoadStatesMerger()
return scan(syncRemoteState.toLoadStates()) { _, combinedLoadStates ->
syncRemoteState.updateFromCombinedLoadStates(combinedLoadStates)
syncRemoteState.toLoadStates()
}
}

/**
* Track the combined [LoadState] of [RemoteMediator] and [PagingSource], so that each load type
* is only set to [NotLoading] when [RemoteMediator] load is applied on presenter-side.
*/
private class LoadStatesMerger {
var refresh: LoadState = NotLoading(endOfPaginationReached = false)
private set
var prepend: LoadState = NotLoading(endOfPaginationReached = false)
private set
var append: LoadState = NotLoading(endOfPaginationReached = false)
private set
var refreshState: MergedState = NOT_LOADING
private set
var prependState: MergedState = NOT_LOADING
private set
var appendState: MergedState = NOT_LOADING
private set

fun toLoadStates() = LoadStates(
refresh = refresh,
prepend = prepend,
append = append
)

/**
* For every new emission of [CombinedLoadStates] from the original [Flow], update the
* [MergedState] of each [LoadType] and compute the new [LoadState].
*/
fun updateFromCombinedLoadStates(combinedLoadStates: CombinedLoadStates) {
computeNextLoadStateAndMergedState(
sourceRefreshState = combinedLoadStates.source.refresh,
sourceState = combinedLoadStates.source.refresh,
remoteState = combinedLoadStates.mediator?.refresh,
currentMergedState = refreshState,
).also {
refresh = it.first
refreshState = it.second
}
computeNextLoadStateAndMergedState(
sourceRefreshState = combinedLoadStates.source.refresh,
sourceState = combinedLoadStates.source.prepend,
remoteState = combinedLoadStates.mediator?.prepend,
currentMergedState = prependState,
).also {
prepend = it.first
prependState = it.second
}
computeNextLoadStateAndMergedState(
sourceRefreshState = combinedLoadStates.source.refresh,
sourceState = combinedLoadStates.source.append,
remoteState = combinedLoadStates.mediator?.append,
currentMergedState = appendState,
).also {
append = it.first
appendState = it.second
}
}

/**
* Compute which [LoadState] and [MergedState] to transition, given the previous and current
* state for a particular [LoadType].
*/
private fun computeNextLoadStateAndMergedState(
sourceRefreshState: LoadState,
sourceState: LoadState,
remoteState: LoadState?,
currentMergedState: MergedState,
): Pair<LoadState, MergedState> {
if (remoteState == null) return sourceState to NOT_LOADING

return when (currentMergedState) {
NOT_LOADING -> when (remoteState) {
is Loading -> Loading to REMOTE_STARTED
is Error -> remoteState to REMOTE_ERROR
else -> NotLoading(remoteState.endOfPaginationReached) to NOT_LOADING
}
REMOTE_STARTED -> when {
remoteState is Error -> remoteState to REMOTE_ERROR
sourceRefreshState is Loading -> Loading to SOURCE_LOADING
else -> Loading to REMOTE_STARTED
}
REMOTE_ERROR -> when (remoteState) {
is Error -> remoteState to REMOTE_ERROR
else -> Loading to REMOTE_STARTED
}
SOURCE_LOADING -> when {
sourceRefreshState is Error -> sourceRefreshState to SOURCE_ERROR
remoteState is Error -> remoteState to REMOTE_ERROR
sourceRefreshState is NotLoading -> {
NotLoading(remoteState.endOfPaginationReached) to NOT_LOADING
}
else -> Loading to SOURCE_LOADING
}
SOURCE_ERROR -> when (sourceRefreshState) {
is Error -> sourceRefreshState to SOURCE_ERROR
else -> sourceRefreshState to SOURCE_LOADING
}
}
}
}

/**
* State machine used to compute [LoadState] values in [LoadStatesMerger].
*
* This allows [LoadStatesMerger] to track whether to block transitioning to [NotLoading] from the
* [Loading] state if it was triggered by [RemoteMediator], until [PagingSource] invalidates and
* completes [REFRESH].
*/
private enum class MergedState {
/**
* Idle state; defer to remote state for endOfPaginationReached.
*/
NOT_LOADING,

/**
* Remote load triggered; start listening for source refresh.
*/
REMOTE_STARTED,

/**
* Waiting for remote in error state to get retried
*/
REMOTE_ERROR,

/**
* Source refresh triggered by remote invalidation, once this completes we can be sure
* the next generation was loaded.
*/
SOURCE_LOADING,

/**
* Remote load completed, but waiting for source refresh in error state to get retried.
*/
SOURCE_ERROR,
}