Skip to content

Commit 352680b

Browse files
authored
fix: Clear inProcess state on startup of outgoing mutation queue (#391)
1 parent 2c9bc07 commit 352680b

File tree

10 files changed

+199
-31
lines changed

10 files changed

+199
-31
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
import Foundation
10+
11+
final class MutationEventClearState {
12+
13+
let storageAdapter: StorageEngineAdapter
14+
init(storageAdapter: StorageEngineAdapter) {
15+
self.storageAdapter = storageAdapter
16+
}
17+
18+
func clearStateOutgoingMutations(completion: @escaping BasicClosure) {
19+
let fields = MutationEvent.keys
20+
let predicate = fields.inProcess == true
21+
let orderClause = """
22+
ORDER BY \(MutationEvent.keys.createdAt.stringValue) ASC
23+
"""
24+
25+
storageAdapter.query(MutationEvent.self,
26+
predicate: predicate,
27+
paginationInput: nil,
28+
additionalStatements: orderClause) { result in
29+
switch result {
30+
case .failure(let dataStoreError):
31+
log.error("Failed on clearStateOutgoingMutations: \(dataStoreError)")
32+
case .success(let mutationEvents):
33+
if !mutationEvents.isEmpty {
34+
updateMutationsState(mutationEvents: mutationEvents,
35+
completion: completion)
36+
} else {
37+
completion()
38+
}
39+
}
40+
}
41+
}
42+
43+
private func updateMutationsState(mutationEvents: [MutationEvent], completion: @escaping BasicClosure) {
44+
var numMutationEventsUpdated = 0
45+
for mutationEvent in mutationEvents {
46+
var inProcessEvent = mutationEvent
47+
inProcessEvent.inProcess = false
48+
storageAdapter.save(inProcessEvent, condition: nil, completion: { result in
49+
switch result {
50+
case .success:
51+
numMutationEventsUpdated += 1
52+
if numMutationEventsUpdated >= mutationEvents.count {
53+
completion()
54+
}
55+
case .failure(let error):
56+
self.log.error("Failed to update mutationEvent:\(error)")
57+
}
58+
})
59+
}
60+
}
61+
62+
}
63+
64+
@available(iOS 13.0, *)
65+
extension MutationEventClearState: DefaultLogger { }

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Action.swift

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ extension RemoteSyncEngine {
1717
case receivedStart
1818

1919
case pausedSubscriptions
20-
case pausedMutationQueue(APICategoryGraphQLBehavior, StorageEngineAdapter)
20+
case pausedMutationQueue(StorageEngineAdapter)
21+
case clearedStateOutgoingMutations(APICategoryGraphQLBehavior, StorageEngineAdapter)
2122
case initializedSubscriptions
2223
case performedInitialSync
2324
case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher)
@@ -41,6 +42,8 @@ extension RemoteSyncEngine {
4142
return "pausedSubscriptions"
4243
case .pausedMutationQueue:
4344
return "pausedMutationQueue"
45+
case .clearedStateOutgoingMutations:
46+
return "resetStateOutgoingMutations"
4447
case .initializedSubscriptions:
4548
return "initializedSubscriptions"
4649
case .performedInitialSync:

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Resolver.swift

+4-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ extension RemoteSyncEngine {
2020
case (.pausingSubscriptions, .pausedSubscriptions):
2121
return .pausingMutationQueue
2222

23-
case (.pausingMutationQueue, .pausedMutationQueue(let api, let storageEngineAdapter)):
23+
case (.pausingMutationQueue, .pausedMutationQueue(let storageEngineAdapter)):
24+
return .clearingStateOutgoingMutations(storageEngineAdapter)
25+
26+
case (.clearingStateOutgoingMutations, .clearedStateOutgoingMutations(let api, let storageEngineAdapter)):
2427
return .initializingSubscriptions(api, storageEngineAdapter)
2528

2629
case (.initializingSubscriptions, .initializedSubscriptions):

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+State.swift

+3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ extension RemoteSyncEngine {
1616

1717
case pausingSubscriptions
1818
case pausingMutationQueue
19+
case clearingStateOutgoingMutations(StorageEngineAdapter)
1920
case initializingSubscriptions(APICategoryGraphQLBehavior, StorageEngineAdapter)
2021
case performingInitialSync
2122
case activatingCloudSubscriptions
@@ -38,6 +39,8 @@ extension RemoteSyncEngine {
3839
return "pausingSubscriptions"
3940
case .pausingMutationQueue:
4041
return "pausingMutationQueue"
42+
case .clearingStateOutgoingMutations:
43+
return "clearingStateOutgoingMutations"
4144
case .initializingSubscriptions:
4245
return "initializingSubscriptions"
4346
case .performingInitialSync:

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine.swift

+15-2
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
135135
pauseSubscriptions()
136136
case .pausingMutationQueue:
137137
pauseMutations()
138+
case .clearingStateOutgoingMutations(let storageAdapter):
139+
clearStateOutgoingMutations(storageAdapter: storageAdapter)
138140
case .initializingSubscriptions(let api, let storageAdapter):
139141
initializeSubscriptions(api: api, storageAdapter: storageAdapter)
140142
case .performingInitialSync:
@@ -209,8 +211,19 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
209211
outgoingMutationQueue.pauseSyncingToCloud()
210212

211213
remoteSyncTopicPublisher.send(.mutationsPaused)
212-
if let api = self.api, let storageAdapter = self.storageAdapter {
213-
stateMachine.notify(action: .pausedMutationQueue(api, storageAdapter))
214+
if let storageAdapter = self.storageAdapter {
215+
stateMachine.notify(action: .pausedMutationQueue(storageAdapter))
216+
}
217+
}
218+
219+
private func clearStateOutgoingMutations(storageAdapter: StorageEngineAdapter) {
220+
log.debug(#function)
221+
let mutationEventClearState = MutationEventClearState(storageAdapter: storageAdapter)
222+
mutationEventClearState.clearStateOutgoingMutations {
223+
if let api = self.api {
224+
self.remoteSyncTopicPublisher.send(.clearedStateOutgoingMutations)
225+
self.stateMachine.notify(action: .clearedStateOutgoingMutations(api, storageAdapter))
226+
}
214227
}
215228
}
216229

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngineBehavior.swift

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ enum RemoteSyncEngineEvent {
1212
case storageAdapterAvailable
1313
case subscriptionsPaused
1414
case mutationsPaused
15+
case clearedStateOutgoingMutations
1516
case subscriptionsInitialized
1617
case performedInitialSync
1718
case subscriptionsActivated
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Foundation
9+
import XCTest
10+
11+
@testable import Amplify
12+
@testable import AmplifyTestCommon
13+
@testable import AWSPluginsCore
14+
@testable import AWSDataStoreCategoryPlugin
15+
16+
class MutationEventClearStateTests: XCTestCase {
17+
var mockStorageAdapter: MockSQLiteStorageEngineAdapter!
18+
var mutationEventClearState: MutationEventClearState!
19+
20+
override func setUp() {
21+
mockStorageAdapter = MockSQLiteStorageEngineAdapter()
22+
mutationEventClearState = MutationEventClearState(storageAdapter: mockStorageAdapter)
23+
}
24+
25+
func testInProcessIsSetFromTrueToFalse() {
26+
let queryExpectation = expectation(description: "query is called")
27+
let saveExpectation = expectation(description: "save is Called")
28+
let completionExpectation = expectation(description: "completion handler is called")
29+
30+
let queryResponder = QueryModelTypePredicateAdditionalStatementsResponder<MutationEvent> { _, _, _ in
31+
queryExpectation.fulfill()
32+
var mutationEvent = MutationEvent(modelId: "1111-22",
33+
modelName: "Post",
34+
json: "{}",
35+
mutationType: .create)
36+
mutationEvent.inProcess = true
37+
return .success([mutationEvent])
38+
}
39+
mockStorageAdapter.responders[.queryModelTypePredicateAdditionalStatements] = queryResponder
40+
41+
let saveResponder = SaveModelCompletionResponder<MutationEvent> { model, completion in
42+
XCTAssertEqual("1111-22", model.modelId)
43+
XCTAssertFalse(model.inProcess)
44+
saveExpectation.fulfill()
45+
completion(.success(model))
46+
}
47+
mockStorageAdapter.responders[.saveModelCompletion] = saveResponder
48+
49+
mutationEventClearState.clearStateOutgoingMutations {
50+
completionExpectation.fulfill()
51+
}
52+
wait(for: [queryExpectation,
53+
saveExpectation,
54+
completionExpectation], timeout: 1.0)
55+
}
56+
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/MutationQueue/MutationIngesterConflictResolutionTests.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -622,9 +622,9 @@ class MutationIngesterConflictResolutionTests: SyncEngineTestBase {
622622

623623
tryOrFail {
624624
try setUpStorageAdapter(preCreating: [Post.self, Comment.self])
625-
try saveMutationEvent(of: .create, for: post, inProcess: true)
626625
try setUpDataStore()
627626
try startAmplifyAndWaitForSync()
627+
try saveMutationEvent(of: .create, for: post, inProcess: true)
628628
}
629629

630630
let saveResultReceived = expectation(description: "Save result received")
@@ -671,10 +671,10 @@ class MutationIngesterConflictResolutionTests: SyncEngineTestBase {
671671

672672
tryOrFail {
673673
try setUpStorageAdapter(preCreating: [Post.self, Comment.self])
674-
try saveMutationEvent(of: .create, for: post, inProcess: true)
675-
try savePost(post)
676674
try setUpDataStore()
677675
try startAmplifyAndWaitForSync()
676+
try savePost(post)
677+
try saveMutationEvent(of: .create, for: post, inProcess: true)
678678
}
679679

680680
var mutatedPost = post
@@ -725,9 +725,9 @@ class MutationIngesterConflictResolutionTests: SyncEngineTestBase {
725725

726726
tryOrFail {
727727
try setUpStorageAdapter(preCreating: [Post.self, Comment.self])
728-
try saveMutationEvent(of: .create, for: post, inProcess: true)
729728
try setUpDataStore()
730729
try startAmplifyAndWaitForSync()
730+
try saveMutationEvent(of: .create, for: post, inProcess: true)
731731
}
732732

733733
let deleteResultReceived = expectation(description: "Delete result received")

0 commit comments

Comments
 (0)