Skip to content

Commit 782de9c

Browse files
committed
Added retry/restart logic to RemoteSyncEngine without cancel downstream components
1 parent bd82f59 commit 782de9c

File tree

11 files changed

+200
-52
lines changed

11 files changed

+200
-52
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Action.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ extension RemoteSyncEngine {
2323
case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher)
2424
case activatedMutationQueue
2525
case notifiedSyncStarted
26+
case cleanedUp(AmplifyError?)
27+
case scheduleRestart(AmplifyError?)
2628

2729
// Terminal actions
2830
case receivedCancel
29-
case errored(AmplifyError)
31+
case errored(AmplifyError?)
3032

3133
var displayName: String {
3234
switch self {
@@ -46,6 +48,10 @@ extension RemoteSyncEngine {
4648
return "activatedMutationQueue"
4749
case .notifiedSyncStarted:
4850
return "notifiedSyncStarted"
51+
case .cleanedUp:
52+
return "cleanedUp"
53+
case .scheduleRestart:
54+
return "scheduleRestart"
4955
case .receivedCancel:
5056
return "receivedCancel"
5157
case .errored:

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,11 @@ import Foundation
1313
extension RemoteSyncEngine {
1414
@available(iOS 13.0, *)
1515
func onReceiveCompletion(receiveCompletion: Subscribers.Completion<DataStoreError>) {
16-
if case .failure(let error) = receiveCompletion {
17-
remoteSyncTopicPublisher.send(completion: .failure(error))
18-
}
19-
if case .finished = receiveCompletion {
20-
let unexpectedFinishError = DataStoreError.unknown("ReconcilationQueue sent .finished message",
21-
AmplifyErrorMessages.shouldNotHappenReportBugToAWS(),
22-
nil)
23-
remoteSyncTopicPublisher.send(completion: .failure(unexpectedFinishError))
16+
switch receiveCompletion {
17+
case .failure(let error):
18+
stateMachine.notify(action: .errored(error))
19+
case .finished:
20+
stateMachine.notify(action: .errored(nil))
2421
}
2522
}
2623

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Resolver.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,22 @@ extension RemoteSyncEngine {
3737

3838
case (.activateMutationQueue, .activatedMutationQueue):
3939
return .notifySyncStarted
40+
4041
case (.activateMutationQueue, .errored(let error)):
4142
return .cleanup(error)
4243

4344
case (.notifySyncStarted, .notifiedSyncStarted):
4445
return .syncEngineActive
4546

47+
case (.syncEngineActive, .errored(let error)):
48+
return .cleanup(error)
49+
50+
case (.cleanup, .cleanedUp(let error)):
51+
return .scheduleRestart(error)
52+
53+
case (.scheduleRestart, .receivedStart):
54+
return .pauseSubscriptions
55+
4656
default:
4757
log.warn("Unexpected state transition. In \(currentState.displayName), got \(action.displayName)")
4858
log.verbose("Unexpected state transition. In \(currentState), got \(action)")
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
import Foundation
10+
11+
@available(iOS 13.0, *)
12+
extension RemoteSyncEngine {
13+
14+
func resetCurrentAttemptNumber() {
15+
currentAttemptNumber = 1
16+
}
17+
18+
func scheduleRestart(error: AmplifyError?) {
19+
let advice = getRetryAdviceIfRetryable(error: error)
20+
if advice.shouldRetry {
21+
scheduleRestart(advice: advice)
22+
} else {
23+
if let error = error {
24+
remoteSyncTopicPublisher.send(completion: .failure(DataStoreError.api(error)))
25+
} else {
26+
remoteSyncTopicPublisher.send(completion: .finished)
27+
}
28+
}
29+
30+
}
31+
32+
private func getRetryAdviceIfRetryable(error: Error?) -> RequestRetryAdvice {
33+
//TODO: Parse error from the receive completion to use as an input into getting retry advice.
34+
// For now, specifying not connected to internet to force a retry up to our maximum
35+
let urlError = URLError(.notConnectedToInternet)
36+
let advice = requestRetryablePolicy.retryRequestAdvice(urlError: urlError,
37+
httpURLResponse: nil,
38+
attemptNumber: currentAttemptNumber)
39+
return advice
40+
}
41+
42+
private func scheduleRestart(advice: RequestRetryAdvice) {
43+
log.verbose("\(#function) scheduling retry for restarting remote sync engine")
44+
resolveReachabilityPublisher()
45+
mutationRetryNotifier = MutationRetryNotifier(advice: advice,
46+
networkReachabilityPublisher: networkReachabilityPublisher) {
47+
self.mutationRetryNotifier = nil
48+
self.stateMachine.notify(action: .receivedStart)
49+
}
50+
currentAttemptNumber += 1
51+
}
52+
53+
private func resolveReachabilityPublisher() {
54+
if networkReachabilityPublisher == nil {
55+
if let reachability = api as? APICategoryReachabilityBehavior {
56+
do {
57+
networkReachabilityPublisher = try reachability.reachabilityPublisher()
58+
} catch {
59+
log.error("\(#function): Unable to listen on reachability: \(error)")
60+
}
61+
}
62+
}
63+
}
64+
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+State.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ extension RemoteSyncEngine {
2424

2525
case syncEngineActive
2626

27-
case cleanup(AmplifyError)
27+
case cleanup(AmplifyError?)
28+
case scheduleRestart(AmplifyError?)
29+
2830
var displayName: String {
2931
switch self {
3032
case .notStarted:
@@ -47,6 +49,8 @@ extension RemoteSyncEngine {
4749
return "syncEngineActive"
4850
case .cleanup:
4951
return "cleanup"
52+
case .scheduleRestart:
53+
return "scheduleRestart"
5054
}
5155
}
5256
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine.swift

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,21 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
4444
let stateMachine: StateMachine<State, Action>
4545
private var stateMachineSink: AnyCancellable?
4646

47+
var networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, Never>?
48+
var mutationRetryNotifier: MutationRetryNotifier?
49+
let requestRetryablePolicy: RequestRetryablePolicy
50+
var currentAttemptNumber: Int
51+
4752
/// Initializes the CloudSyncEngine with the specified storageAdapter as the provider for persistence of
4853
/// MutationEvents, sync metadata, and conflict resolution metadata. Immediately initializes the incoming mutation
4954
/// queue so it can begin accepting incoming mutations from DataStore.
5055
convenience init(storageAdapter: StorageEngineAdapter,
5156
outgoingMutationQueue: OutgoingMutationQueueBehavior? = nil,
5257
initialSyncOrchestratorFactory: InitialSyncOrchestratorFactory? = nil,
5358
reconciliationQueueFactory: IncomingEventReconciliationQueueFactory? = nil,
54-
stateMachine: StateMachine<State, Action>? = nil) throws {
59+
stateMachine: StateMachine<State, Action>? = nil,
60+
networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, Never>? = nil,
61+
requestRetryablePolicy: RequestRetryablePolicy? = nil) throws {
5562
let mutationDatabaseAdapter = try AWSMutationDatabaseAdapter(storageAdapter: storageAdapter)
5663
let awsMutationEventPublisher = AWSMutationEventPublisher(eventSource: mutationDatabaseAdapter)
5764
let outgoingMutationQueue = outgoingMutationQueue ?? OutgoingMutationQueue()
@@ -61,14 +68,18 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
6168
AWSInitialSyncOrchestrator.init(api:reconciliationQueue:storageAdapter:)
6269
let stateMachine = stateMachine ?? StateMachine(initialState: .notStarted,
6370
resolver: RemoteSyncEngine.Resolver.resolve(currentState:action:))
71+
let requestRetryablePolicy = requestRetryablePolicy ?? RequestRetryablePolicy()
72+
6473

6574
self.init(storageAdapter: storageAdapter,
6675
outgoingMutationQueue: outgoingMutationQueue,
6776
mutationEventIngester: mutationDatabaseAdapter,
6877
mutationEventPublisher: awsMutationEventPublisher,
6978
initialSyncOrchestratorFactory: initialSyncOrchestratorFactory,
7079
reconciliationQueueFactory: reconciliationQueueFactory,
71-
stateMachine: stateMachine)
80+
stateMachine: stateMachine,
81+
networkReachabilityPublisher: networkReachabilityPublisher,
82+
requestRetryablePolicy: requestRetryablePolicy)
7283
}
7384

7485
init(storageAdapter: StorageEngineAdapter,
@@ -77,19 +88,25 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
7788
mutationEventPublisher: MutationEventPublisher,
7889
initialSyncOrchestratorFactory: @escaping InitialSyncOrchestratorFactory,
7990
reconciliationQueueFactory: @escaping IncomingEventReconciliationQueueFactory,
80-
stateMachine: StateMachine<State, Action>) {
91+
stateMachine: StateMachine<State, Action>,
92+
networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, Never>?,
93+
requestRetryablePolicy: RequestRetryablePolicy) {
8194
self.storageAdapter = storageAdapter
8295
self.mutationEventIngester = mutationEventIngester
8396
self.mutationEventPublisher = mutationEventPublisher
8497
self.outgoingMutationQueue = outgoingMutationQueue
8598
self.initialSyncOrchestratorFactory = initialSyncOrchestratorFactory
8699
self.reconciliationQueueFactory = reconciliationQueueFactory
87100
self.remoteSyncTopicPublisher = PassthroughSubject<RemoteSyncEngineEvent, DataStoreError>()
101+
self.networkReachabilityPublisher = networkReachabilityPublisher
102+
self.requestRetryablePolicy = requestRetryablePolicy
88103

89104
self.syncQueue = OperationQueue()
90105
syncQueue.name = "com.amazonaws.Amplify.\(AWSDataStorePlugin.self).CloudSyncEngine"
91106
syncQueue.maxConcurrentOperationCount = 1
92107

108+
self.currentAttemptNumber = 1
109+
93110
self.stateMachine = stateMachine
94111
self.stateMachineSink = self.stateMachine
95112
.$state
@@ -128,10 +145,11 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
128145
case .syncEngineActive:
129146
break
130147

131-
case .cleanup(let amplifyError):
132-
//todo
133-
print("error: \(amplifyError)")
148+
case .cleanup(let error):
149+
cleanup(error: error)
134150

151+
case .scheduleRestart(let error):
152+
scheduleRestart(error: error)
135153
}
136154
}
137155

@@ -217,7 +235,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
217235
private func activateCloudSubscriptions() {
218236
log.debug(#function)
219237
reconciliationQueue?.start()
220-
238+
221239
//Notifying the publisher & state machine are handled in:
222240
// RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift
223241
}
@@ -231,10 +249,20 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
231249
stateMachine.notify(action: .activatedMutationQueue)
232250
}
233251

252+
private func cleanup(error: AmplifyError?) {
253+
// TODO:
254+
// handle clean up here
255+
remoteSyncTopicPublisher.send(.cleanedUp)
256+
stateMachine.notify(action: .cleanedUp(error))
257+
}
258+
234259
private func notifySyncStarted() {
260+
resetCurrentAttemptNumber()
235261
Amplify.Hub.dispatch(to: .dataStore,
236262
payload: HubPayload(eventName: HubPayload.EventName.DataStore.syncStarted))
263+
237264
remoteSyncTopicPublisher.send(.syncStarted)
265+
stateMachine.notify(action: .notifiedSyncStarted)
238266
}
239267

240268
func reset(onComplete: () -> Void) {

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngineBehavior.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ enum RemoteSyncEngineEvent {
1717
case subscriptionsActivated
1818
case mutationQueueStarted
1919
case syncStarted
20+
case cleanedUp
2021
case mutationEvent(MutationEvent)
2122
}
2223

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/LocalSubscriptionTests.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ class LocalSubscriptionTests: XCTestCase {
4141
mutationEventPublisher: awsMutationEventPublisher,
4242
initialSyncOrchestratorFactory: NoOpInitialSyncOrchestrator.factory,
4343
reconciliationQueueFactory: MockAWSIncomingEventReconciliationQueue.factory,
44-
stateMachine: stateMachine)
44+
stateMachine: stateMachine,
45+
networkReachabilityPublisher: nil,
46+
requestRetryablePolicy: MockRequestRetryablePolicy())
4547

4648
storageEngine = StorageEngine(storageAdapter: storageAdapter,
4749
syncEngine: syncEngine)

0 commit comments

Comments
 (0)