Skip to content

Clear inProcess state on startup of outgoing mutation queue #391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// Copyright 2018-2020 Amazon.com,
// Inc. or its affiliates. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Amplify
import Foundation

final class MutationEventClearState {

let storageAdapter: StorageEngineAdapter
init(storageAdapter: StorageEngineAdapter) {
self.storageAdapter = storageAdapter
}

func clearStateOutgoingMutations(completion: @escaping BasicClosure) {
let fields = MutationEvent.keys
let predicate = fields.inProcess == true
let orderClause = """
ORDER BY \(MutationEvent.keys.createdAt.stringValue) ASC
"""

storageAdapter.query(MutationEvent.self,
predicate: predicate,
paginationInput: nil,
additionalStatements: orderClause) { result in
switch result {
case .failure(let dataStoreError):
log.error("Failed on clearStateOutgoingMutations: \(dataStoreError)")
case .success(let mutationEvents):
if !mutationEvents.isEmpty {
updateMutationsState(mutationEvents: mutationEvents,
completion: completion)
} else {
completion()
}
}
}
}

private func updateMutationsState(mutationEvents: [MutationEvent], completion: @escaping BasicClosure) {
var numMutationEventsUpdated = 0
for mutationEvent in mutationEvents {
var inProcessEvent = mutationEvent
inProcessEvent.inProcess = false
storageAdapter.save(inProcessEvent, condition: nil, completion: { result in
switch result {
case .success:
numMutationEventsUpdated += 1
if numMutationEventsUpdated >= mutationEvents.count {
completion()
}
Comment on lines +52 to +54
Copy link
Contributor

@drochetti drochetti Apr 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

man, we really need something like Promise.all() in Swift - just ranting, no action needed :D

case .failure(let error):
self.log.error("Failed to update mutationEvent:\(error)")
}
})
}
}

}

@available(iOS 13.0, *)
extension MutationEventClearState: DefaultLogger { }
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ extension RemoteSyncEngine {
case receivedStart

case pausedSubscriptions
case pausedMutationQueue(APICategoryGraphQLBehavior, StorageEngineAdapter)
case pausedMutationQueue(StorageEngineAdapter)
case clearedStateOutgoingMutations(APICategoryGraphQLBehavior, StorageEngineAdapter)
case initializedSubscriptions
case performedInitialSync
case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher)
Expand All @@ -41,6 +42,8 @@ extension RemoteSyncEngine {
return "pausedSubscriptions"
case .pausedMutationQueue:
return "pausedMutationQueue"
case .clearedStateOutgoingMutations:
return "resetStateOutgoingMutations"
case .initializedSubscriptions:
return "initializedSubscriptions"
case .performedInitialSync:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ extension RemoteSyncEngine {
case (.pausingSubscriptions, .pausedSubscriptions):
return .pausingMutationQueue

case (.pausingMutationQueue, .pausedMutationQueue(let api, let storageEngineAdapter)):
case (.pausingMutationQueue, .pausedMutationQueue(let storageEngineAdapter)):
return .clearingStateOutgoingMutations(storageEngineAdapter)

case (.clearingStateOutgoingMutations, .clearedStateOutgoingMutations(let api, let storageEngineAdapter)):
return .initializingSubscriptions(api, storageEngineAdapter)

case (.initializingSubscriptions, .initializedSubscriptions):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ extension RemoteSyncEngine {

case pausingSubscriptions
case pausingMutationQueue
case clearingStateOutgoingMutations(StorageEngineAdapter)
case initializingSubscriptions(APICategoryGraphQLBehavior, StorageEngineAdapter)
case performingInitialSync
case activatingCloudSubscriptions
Expand All @@ -38,6 +39,8 @@ extension RemoteSyncEngine {
return "pausingSubscriptions"
case .pausingMutationQueue:
return "pausingMutationQueue"
case .clearingStateOutgoingMutations:
return "clearingStateOutgoingMutations"
case .initializingSubscriptions:
return "initializingSubscriptions"
case .performingInitialSync:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
pauseSubscriptions()
case .pausingMutationQueue:
pauseMutations()
case .clearingStateOutgoingMutations(let storageAdapter):
clearStateOutgoingMutations(storageAdapter: storageAdapter)
case .initializingSubscriptions(let api, let storageAdapter):
initializeSubscriptions(api: api, storageAdapter: storageAdapter)
case .performingInitialSync:
Expand Down Expand Up @@ -209,8 +211,19 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
outgoingMutationQueue.pauseSyncingToCloud()

remoteSyncTopicPublisher.send(.mutationsPaused)
if let api = self.api, let storageAdapter = self.storageAdapter {
stateMachine.notify(action: .pausedMutationQueue(api, storageAdapter))
if let storageAdapter = self.storageAdapter {
stateMachine.notify(action: .pausedMutationQueue(storageAdapter))
}
}

private func clearStateOutgoingMutations(storageAdapter: StorageEngineAdapter) {
log.debug(#function)
let mutationEventClearState = MutationEventClearState(storageAdapter: storageAdapter)
mutationEventClearState.clearStateOutgoingMutations {
if let api = self.api {
self.remoteSyncTopicPublisher.send(.clearedStateOutgoingMutations)
self.stateMachine.notify(action: .clearedStateOutgoingMutations(api, storageAdapter))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum RemoteSyncEngineEvent {
case storageAdapterAvailable
case subscriptionsPaused
case mutationsPaused
case clearedStateOutgoingMutations
case subscriptionsInitialized
case performedInitialSync
case subscriptionsActivated
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//
// Copyright 2018-2020 Amazon.com,
// Inc. or its affiliates. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
import XCTest

@testable import Amplify
@testable import AmplifyTestCommon
@testable import AWSPluginsCore
@testable import AWSDataStoreCategoryPlugin

class MutationEventClearStateTests: XCTestCase {
var mockStorageAdapter: MockSQLiteStorageEngineAdapter!
var mutationEventClearState: MutationEventClearState!

override func setUp() {
mockStorageAdapter = MockSQLiteStorageEngineAdapter()
mutationEventClearState = MutationEventClearState(storageAdapter: mockStorageAdapter)
}

func testInProcessIsSetFromTrueToFalse() {
let queryExpectation = expectation(description: "query is called")
let saveExpectation = expectation(description: "save is Called")
let completionExpectation = expectation(description: "completion handler is called")

let queryResponder = QueryModelTypePredicateAdditionalStatementsResponder<MutationEvent> { _, _, _ in
queryExpectation.fulfill()
var mutationEvent = MutationEvent(modelId: "1111-22",
modelName: "Post",
json: "{}",
mutationType: .create)
mutationEvent.inProcess = true
return .success([mutationEvent])
}
mockStorageAdapter.responders[.queryModelTypePredicateAdditionalStatements] = queryResponder

let saveResponder = SaveModelCompletionResponder<MutationEvent> { model, completion in
XCTAssertEqual("1111-22", model.modelId)
XCTAssertFalse(model.inProcess)
saveExpectation.fulfill()
completion(.success(model))
}
mockStorageAdapter.responders[.saveModelCompletion] = saveResponder

mutationEventClearState.clearStateOutgoingMutations {
completionExpectation.fulfill()
}
wait(for: [queryExpectation,
saveExpectation,
completionExpectation], timeout: 1.0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -622,9 +622,9 @@ class MutationIngesterConflictResolutionTests: SyncEngineTestBase {

tryOrFail {
try setUpStorageAdapter(preCreating: [Post.self, Comment.self])
try saveMutationEvent(of: .create, for: post, inProcess: true)
try setUpDataStore()
try startAmplifyAndWaitForSync()
try saveMutationEvent(of: .create, for: post, inProcess: true)
}

let saveResultReceived = expectation(description: "Save result received")
Expand Down Expand Up @@ -671,10 +671,10 @@ class MutationIngesterConflictResolutionTests: SyncEngineTestBase {

tryOrFail {
try setUpStorageAdapter(preCreating: [Post.self, Comment.self])
try saveMutationEvent(of: .create, for: post, inProcess: true)
try savePost(post)
try setUpDataStore()
try startAmplifyAndWaitForSync()
try savePost(post)
try saveMutationEvent(of: .create, for: post, inProcess: true)
}

var mutatedPost = post
Expand Down Expand Up @@ -725,9 +725,9 @@ class MutationIngesterConflictResolutionTests: SyncEngineTestBase {

tryOrFail {
try setUpStorageAdapter(preCreating: [Post.self, Comment.self])
try saveMutationEvent(of: .create, for: post, inProcess: true)
try setUpDataStore()
try startAmplifyAndWaitForSync()
try saveMutationEvent(of: .create, for: post, inProcess: true)
}

let deleteResultReceived = expectation(description: "Delete result received")
Expand Down
Loading