Skip to content

Commit 51b4a1c

Browse files
authored
dataconnect: Improve usage of MutableStateFlow to improve readability (#6840)
1 parent e98dd2c commit 51b4a1c

File tree

5 files changed

+107
-108
lines changed

5 files changed

+107
-108
lines changed

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt

+57-57
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import kotlinx.coroutines.ensureActive
4747
import kotlinx.coroutines.flow.MutableStateFlow
4848
import kotlinx.coroutines.flow.filter
4949
import kotlinx.coroutines.flow.first
50+
import kotlinx.coroutines.flow.getAndUpdate
5051
import kotlinx.coroutines.launch
5152

5253
/** Base class that shares logic for managing the Auth token and AppCheck token. */
@@ -148,9 +149,18 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
148149
*/
149150
fun close() {
150151
logger.debug { "close()" }
152+
151153
weakThis.clear()
152154
coroutineScope.cancel()
153-
setClosedState()
155+
156+
val oldState = state.getAndUpdate { State.Closed }
157+
when (oldState) {
158+
is State.Closed -> {}
159+
is State.New -> {}
160+
is State.StateWithProvider -> {
161+
removeTokenListener(oldState.provider)
162+
}
163+
}
154164
}
155165

156166
/**
@@ -175,51 +185,41 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
175185
logger.debug { "awaitTokenProvider() done: currentState=$currentState" }
176186
}
177187

178-
// This function must ONLY be called from close().
179-
private fun setClosedState() {
180-
while (true) {
181-
val oldState = state.value
182-
val provider: T? =
183-
when (oldState) {
184-
is State.Closed -> return
185-
is State.New -> null
186-
is State.Idle -> oldState.provider
187-
is State.Active -> oldState.provider
188-
}
189-
190-
if (state.compareAndSet(oldState, State.Closed)) {
191-
provider?.let { removeTokenListener(it) }
192-
break
193-
}
194-
}
195-
}
196-
197188
/**
198189
* Sets a flag to force-refresh the token upon the next call to [getToken].
199190
*
200191
* If [close] has been called, this method does nothing.
201192
*/
202193
fun forceRefresh() {
203194
logger.debug { "forceRefresh()" }
204-
while (true) {
205-
val oldState = state.value
206-
val newState: State.StateWithForceTokenRefresh<T> =
207-
when (oldState) {
208-
is State.Closed -> return
209-
is State.New -> oldState.copy(forceTokenRefresh = true)
210-
is State.Idle -> oldState.copy(forceTokenRefresh = true)
211-
is State.Active -> {
212-
val message = "needs token refresh (wgrwbrvjxt)"
213-
oldState.job.cancel(message, ForceRefresh(message))
214-
State.Idle(oldState.provider, forceTokenRefresh = true)
195+
val oldState =
196+
state.getAndUpdate { currentState ->
197+
val newState =
198+
when (currentState) {
199+
is State.Closed -> State.Closed
200+
is State.New -> currentState.copy(forceTokenRefresh = true)
201+
is State.Idle -> currentState.copy(forceTokenRefresh = true)
202+
is State.Active -> State.Idle(currentState.provider, forceTokenRefresh = true)
215203
}
204+
205+
check(newState is State.Closed || newState is State.StateWithForceTokenRefresh<T>) {
206+
"internal error gbazc7qr66: newState should have been Closed or " +
207+
"StateWithForceTokenRefresh, but got: $newState"
208+
}
209+
check((newState as? State.StateWithForceTokenRefresh<T>)?.forceTokenRefresh !== false) {
210+
"internal error fnzwyrsez2: newState.forceTokenRefresh should have been true"
216211
}
217212

218-
check(newState.forceTokenRefresh) {
219-
"newState.forceTokenRefresh should be true (error code gnvr2wx7nz)"
213+
newState
220214
}
221-
if (state.compareAndSet(oldState, newState)) {
222-
break
215+
216+
when (oldState) {
217+
is State.Closed -> {}
218+
is State.New -> {}
219+
is State.Idle -> {}
220+
is State.Active -> {
221+
val message = "needs token refresh (wgrwbrvjxt)"
222+
oldState.job.cancel(message, ForceRefresh(message))
223223
}
224224
}
225225
}
@@ -350,30 +350,30 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
350350
logger.debug { "onProviderAvailable(newProvider=$newProvider)" }
351351
addTokenListener(newProvider)
352352

353-
while (true) {
354-
val oldState = state.value
355-
val newState =
356-
when (oldState) {
357-
is State.Closed -> {
358-
logger.debug {
359-
"onProviderAvailable(newProvider=$newProvider)" +
360-
" unregistering token listener that was just added"
361-
}
362-
removeTokenListener(newProvider)
363-
break
364-
}
365-
is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh)
366-
is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh)
367-
is State.Active -> {
368-
val newProviderClassName = newProvider::class.qualifiedName
369-
val message = "a new provider $newProviderClassName is available (symhxtmazy)"
370-
oldState.job.cancel(message, NewProvider(message))
371-
State.Idle(newProvider, forceTokenRefresh = false)
372-
}
353+
val oldState =
354+
state.getAndUpdate { currentState ->
355+
when (currentState) {
356+
is State.Closed -> State.Closed
357+
is State.New -> State.Idle(newProvider, currentState.forceTokenRefresh)
358+
is State.Idle -> State.Idle(newProvider, currentState.forceTokenRefresh)
359+
is State.Active -> State.Idle(newProvider, forceTokenRefresh = false)
373360
}
361+
}
374362

375-
if (state.compareAndSet(oldState, newState)) {
376-
break
363+
when (oldState) {
364+
is State.Closed -> {
365+
logger.debug {
366+
"onProviderAvailable(newProvider=$newProvider)" +
367+
" unregistering token listener that was just added"
368+
}
369+
removeTokenListener(newProvider)
370+
}
371+
is State.New -> {}
372+
is State.Idle -> {}
373+
is State.Active -> {
374+
val newProviderClassName = newProvider::class.qualifiedName
375+
val message = "a new provider $newProviderClassName is available (symhxtmazy)"
376+
oldState.job.cancel(message, NewProvider(message))
377377
}
378378
}
379379
}

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt

+30-22
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import kotlinx.coroutines.async
5454
import kotlinx.coroutines.cancel
5555
import kotlinx.coroutines.flow.MutableStateFlow
5656
import kotlinx.coroutines.flow.collect
57+
import kotlinx.coroutines.flow.updateAndGet
5758
import kotlinx.coroutines.runBlocking
5859
import kotlinx.coroutines.sync.Mutex
5960
import kotlinx.coroutines.sync.withLock
@@ -406,35 +407,42 @@ internal class FirebaseDataConnectImpl(
406407
dataConnectAuth.close()
407408
dataConnectAppCheck.close()
408409

409-
// Start the job to asynchronously close the gRPC client.
410-
while (true) {
411-
val oldCloseJob = closeJob.value
412-
413-
oldCloseJob.ref?.let {
414-
if (!it.isCancelled) {
415-
return it
416-
}
410+
// Create the "close job" to asynchronously close the gRPC client.
411+
@OptIn(DelicateCoroutinesApi::class)
412+
val newCloseJob =
413+
GlobalScope.async<Unit>(start = CoroutineStart.LAZY) {
414+
lazyGrpcRPCs.initializedValueOrNull?.close()
417415
}
416+
newCloseJob.invokeOnCompletion { exception ->
417+
if (exception === null) {
418+
logger.debug { "close() completed successfully" }
419+
} else {
420+
logger.warn(exception) { "close() failed" }
421+
}
422+
}
418423

419-
@OptIn(DelicateCoroutinesApi::class)
420-
val newCloseJob =
421-
GlobalScope.async<Unit>(start = CoroutineStart.LAZY) {
422-
lazyGrpcRPCs.initializedValueOrNull?.close()
423-
}
424-
425-
newCloseJob.invokeOnCompletion { exception ->
426-
if (exception === null) {
427-
logger.debug { "close() completed successfully" }
424+
// Register the new "close job". Do not overwrite a close job that is already in progress (to
425+
// avoid having more than one close job in progress at a time) or a close job that completed
426+
// successfully (since there is nothing to do if a previous close job was successful).
427+
val updatedCloseJobRef =
428+
closeJob.updateAndGet { currentCloseJobRef: NullableReference<Deferred<Unit>> ->
429+
if (currentCloseJobRef.ref !== null && !currentCloseJobRef.ref.isCancelled) {
430+
currentCloseJobRef
428431
} else {
429-
logger.warn(exception) { "close() failed" }
432+
NullableReference(newCloseJob)
430433
}
431434
}
432435

433-
if (closeJob.compareAndSet(oldCloseJob, NullableReference(newCloseJob))) {
434-
newCloseJob.start()
435-
return newCloseJob
436+
// Start the updated "close job" (if it was already started then start() is a no-op).
437+
val updatedCloseJob =
438+
checkNotNull(updatedCloseJobRef.ref) {
439+
"internal error: closeJob.updateAndGet() returned a NullableReference whose 'ref' " +
440+
"property was null; however it should NOT have been null (error code y5fk4ntdnd)"
436441
}
437-
}
442+
updatedCloseJob.start()
443+
444+
// Return the "close job", which _may_ already be completed, so the caller can await it.
445+
return updatedCloseJob
438446
}
439447

440448
// The generated SDK relies on equals() and hashCode() using object identity.

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt

+10-14
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import kotlinx.coroutines.cancelAndJoin
2727
import kotlinx.coroutines.flow.Flow
2828
import kotlinx.coroutines.flow.MutableStateFlow
2929
import kotlinx.coroutines.flow.channelFlow
30+
import kotlinx.coroutines.flow.update
3031
import kotlinx.coroutines.launch
3132

3233
internal class QuerySubscriptionImpl<Data, Variables>(query: QueryRefImpl<Data, Variables>) :
@@ -80,22 +81,17 @@ internal class QuerySubscriptionImpl<Data, Variables>(query: QueryRefImpl<Data,
8081
}
8182

8283
private fun updateLastResult(prospectiveLastResult: QuerySubscriptionResultImpl) {
83-
// Update the last result in a compare-and-swap loop so that there is no possibility of
84-
// clobbering a newer result with an older result, compared using their sequence numbers.
8584
// TODO: Fix this so that results from an old query do not clobber results from a new query,
8685
// as set by a call to update()
87-
while (true) {
88-
val currentLastResult = _lastResult.value
89-
if (currentLastResult.ref != null) {
90-
val currentSequenceNumber = currentLastResult.ref.sequencedResult.sequenceNumber
91-
val prospectiveSequenceNumber = prospectiveLastResult.sequencedResult.sequenceNumber
92-
if (currentSequenceNumber >= prospectiveSequenceNumber) {
93-
return
94-
}
95-
}
96-
97-
if (_lastResult.compareAndSet(currentLastResult, NullableReference(prospectiveLastResult))) {
98-
return
86+
_lastResult.update { currentLastResult ->
87+
if (
88+
currentLastResult.ref != null &&
89+
currentLastResult.ref.sequencedResult.sequenceNumber >=
90+
prospectiveLastResult.sequencedResult.sequenceNumber
91+
) {
92+
currentLastResult
93+
} else {
94+
NullableReference(prospectiveLastResult)
9995
}
10096
}
10197
}

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RegisteredDataDeserialzer.kt

+5-7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import kotlinx.coroutines.channels.BufferOverflow
3131
import kotlinx.coroutines.flow.MutableSharedFlow
3232
import kotlinx.coroutines.flow.MutableStateFlow
3333
import kotlinx.coroutines.flow.onSubscription
34+
import kotlinx.coroutines.flow.update
3435
import kotlinx.coroutines.withContext
3536
import kotlinx.serialization.DeserializationStrategy
3637
import kotlinx.serialization.modules.SerializersModule
@@ -84,17 +85,14 @@ internal class RegisteredDataDeserializer<T>(
8485
lazyDeserialize(requestId, sequencedResult)
8586
)
8687

87-
// Use a compare-and-swap ("CAS") loop to ensure that an old update never clobbers a newer one.
88-
while (true) {
89-
val currentUpdate = latestUpdate.value
88+
latestUpdate.update { currentUpdate ->
9089
if (
9190
currentUpdate.ref !== null &&
9291
currentUpdate.ref.sequenceNumber > sequencedResult.sequenceNumber
9392
) {
94-
break // don't clobber a newer update with an older one
95-
}
96-
if (latestUpdate.compareAndSet(currentUpdate, NullableReference(newUpdate))) {
97-
break
93+
currentUpdate // don't clobber a newer update with an older one
94+
} else {
95+
NullableReference(newUpdate)
9896
}
9997
}
10098

firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt

+5-8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package com.google.firebase.dataconnect.testutil
1919
import kotlinx.coroutines.flow.MutableStateFlow
2020
import kotlinx.coroutines.flow.filter
2121
import kotlinx.coroutines.flow.first
22+
import kotlinx.coroutines.flow.update
2223

2324
/**
2425
* An implementation of [java.util.concurrent.CountDownLatch] that suspends instead of blocking.
@@ -60,14 +61,10 @@ class SuspendingCountDownLatch(count: Int) {
6061
* @throws IllegalStateException if called when the count has already reached zero.
6162
*/
6263
fun countDown(): SuspendingCountDownLatch {
63-
while (true) {
64-
val oldValue = _count.value
65-
check(oldValue > 0) { "countDown() called too many times (oldValue=$oldValue)" }
66-
67-
val newValue = oldValue - 1
68-
if (_count.compareAndSet(oldValue, newValue)) {
69-
return this
70-
}
64+
_count.update { currentValue ->
65+
check(currentValue > 0) { "countDown() called too many times (currentValue=$currentValue)" }
66+
currentValue - 1
7167
}
68+
return this
7269
}
7370
}

0 commit comments

Comments
 (0)