diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml index 7a4606ab..cded69c6 100644 --- a/.github/workflows/swift.yml +++ b/.github/workflows/swift.yml @@ -51,6 +51,11 @@ jobs: needs: cancel_previous runs-on: macos-14 steps: + - name: Install yeetd + run: | + wget https://github.com/biscuitehh/yeetd/releases/download/1.0/yeetd-normal.pkg + sudo installer -pkg yeetd-normal.pkg -target / + yeetd & - uses: maxim-lobanov/setup-xcode@v1 with: xcode-version: "15.2" diff --git a/Sources/Segment/Analytics.swift b/Sources/Segment/Analytics.swift index 797088ec..85dd400c 100644 --- a/Sources/Segment/Analytics.swift +++ b/Sources/Segment/Analytics.swift @@ -228,56 +228,23 @@ extension Analytics { /// called when flush has completed. public func flush(completion: (() -> Void)? = nil) { // only flush if we're enabled. - guard enabled == true else { return } - - let flushGroup = DispatchGroup() - // gotta call enter at least once before we ask to be notified. - flushGroup.enter() + guard enabled == true else { completion?(); return } + let completionGroup = CompletionGroup(queue: configuration.values.flushQueue) apply { plugin in - // we want to enter as soon as possible. waiting to do it from - // another queue just takes too long. - operatingMode.run(queue: configuration.values.flushQueue) { + completionGroup.add { group in if let p = plugin as? FlushCompletion { - // flush handles the groups enter/leave calls - p.flush(group: flushGroup) { plugin in - // we don't really care about the plugin value .. yet. - } + p.flush(group: group) } else if let p = plugin as? EventPlugin { - flushGroup.enter() - // we have no idea if this will be async or not, assume it's sync. + group.enter() p.flush() - flushGroup.leave() + group.leave() } } } - flushGroup.leave() // matches our initial enter(). - - // if we ARE in sync mode, we need to wait on the group. - // This effectively ends up being a `sync` operation. - if operatingMode == .synchronous { - flushGroup.wait() - // we need to call completion on our own since - // we skipped setting up notify. we don't need to do it on - // .main since we are in synchronous mode. - if let completion { completion() } - } else if operatingMode == .asynchronous { - // if we're not, flip over to our serial queue, tell it to wait on the flush - // group to complete if we have a completion to hit. Otherwise, no need to - // wait on completion. - if let completion { - // NOTE: DispatchGroup's `notify` method on linux ended up getting called - // before the tasks have actually completed, so we went with this instead. - OperatingMode.defaultQueue.async { [weak self] in - let timedOut = flushGroup.wait(timeout: .now() + 15 /*seconds*/) - if timedOut == .timedOut { - self?.log(message: "flush(completion:) timed out waiting for completion.") - } - completion() - //DispatchQueue.main.async { completion() } - } - } + completionGroup.run(mode: operatingMode) { + completion?() } } diff --git a/Sources/Segment/Plugins.swift b/Sources/Segment/Plugins.swift index f305896e..19705fb5 100644 --- a/Sources/Segment/Plugins.swift +++ b/Sources/Segment/Plugins.swift @@ -63,7 +63,7 @@ public protocol VersionedPlugin { } public protocol FlushCompletion { - func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) + func flush(group: DispatchGroup) } // For internal platform-specific bits diff --git a/Sources/Segment/Plugins/SegmentDestination.swift b/Sources/Segment/Plugins/SegmentDestination.swift index 5b6d846c..824b18da 100644 --- a/Sources/Segment/Plugins/SegmentDestination.swift +++ b/Sources/Segment/Plugins/SegmentDestination.swift @@ -123,15 +123,15 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion // unused .. see flush(group:completion:) } - public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) { + public func flush(group: DispatchGroup) { + group.enter() + defer { group.leave() } + guard let storage = self.storage else { return } guard let analytics = self.analytics else { return } // don't flush if analytics is disabled. guard analytics.enabled == true else { return } - - // enter for the high level flush, allow us time to run through any existing files.. - group.enter() eventCount = 0 cleanupUploads() @@ -143,25 +143,19 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion if pendingUploads == 0 { if type == .file, hasData { - flushFiles(group: group, completion: completion) + flushFiles(group: group) } else if type == .data, hasData { // we know it's a data-based transaction as opposed to file I/O - flushData(group: group, completion: completion) - } else { - // there was nothing to do ... - completion(self) + flushData(group: group) } } else { analytics.log(message: "Skipping processing; Uploads in progress.") } - - // leave for the high level flush - group.leave() } } extension SegmentDestination { - private func flushFiles(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) { + private func flushFiles(group: DispatchGroup) { guard let storage = self.storage else { return } guard let analytics = self.analytics else { return } guard let httpClient = self.httpClient else { return } @@ -175,6 +169,9 @@ extension SegmentDestination { // set up the task let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { [weak self] result in + defer { + group.leave() + } guard let self else { return } switch result { case .success(_): @@ -195,20 +192,19 @@ extension SegmentDestination { // make sure it gets removed and it's cleanup() called rather // than waiting on the next flush to come around. cleanupUploads() - // call the completion - completion(self) - // leave for the url we kicked off. - group.leave() } // we have a legit upload in progress now, so add it to our list. if let upload = uploadTask { add(uploadTask: UploadTaskInfo(url: url, data: nil, task: upload)) + } else { + // we couldn't get a task, so we need to leave the group or things will hang. + group.leave() } } } - private func flushData(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) { + private func flushData(group: DispatchGroup) { // DO NOT CALL THIS FROM THE MAIN THREAD, IT BLOCKS! // Don't make me add a check here; i'll be sad you didn't follow directions. guard let storage = self.storage else { return } @@ -239,6 +235,12 @@ extension SegmentDestination { // set up the task let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, data: data) { [weak self] result in + defer { + // leave for the url we kicked off. + group.leave() + semaphore.signal() + } + guard let self else { return } switch result { case .success(_): @@ -259,16 +261,15 @@ extension SegmentDestination { // make sure it gets removed and it's cleanup() called rather // than waiting on the next flush to come around. cleanupUploads() - // call the completion - completion(self) - // leave for the url we kicked off. - group.leave() - semaphore.signal() } // we have a legit upload in progress now, so add it to our list. if let upload = uploadTask { add(uploadTask: UploadTaskInfo(url: nil, data: data, task: upload)) + } else { + // we couldn't get a task, so we need to leave the group or things will hang. + group.leave() + semaphore.signal() } _ = semaphore.wait(timeout: .distantFuture) diff --git a/Sources/Segment/Utilities/CompletionGroup.swift b/Sources/Segment/Utilities/CompletionGroup.swift new file mode 100644 index 00000000..e2407875 --- /dev/null +++ b/Sources/Segment/Utilities/CompletionGroup.swift @@ -0,0 +1,54 @@ +// +// CompletionGroup.swift +// +// +// Created by Brandon Sneed on 4/17/24. +// + +import Foundation + +class CompletionGroup { + let queue: DispatchQueue + var items = [(DispatchGroup) -> Void]() + + init(queue: DispatchQueue) { + self.queue = queue + } + + func add(workItem: @escaping (DispatchGroup) -> Void) { + items.append(workItem) + } + + func run(mode: OperatingMode, completion: @escaping () -> Void) { + // capture self strongly on purpose + let task: () -> Void = { [self] in + let group = DispatchGroup() + group.enter() + group.notify(queue: queue) { [weak self] in + completion() + self?.items.removeAll() + } + + for item in items { + item(group) + } + + group.leave() + + if mode == .synchronous { + group.wait() + } + } + + switch mode { + case .synchronous: + queue.sync { + task() + } + case .asynchronous: + queue.async { + task() + } + } + } +} diff --git a/Sources/Segment/Utilities/Utils.swift b/Sources/Segment/Utilities/Utils.swift index 6e11385a..27914c1c 100644 --- a/Sources/Segment/Utilities/Utils.swift +++ b/Sources/Segment/Utilities/Utils.swift @@ -85,4 +85,3 @@ internal func eventStorageDirectory(writeKey: String) -> URL { return segmentURL } - diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift deleted file mode 100644 index b56db817..00000000 --- a/Tests/LinuxMain.swift +++ /dev/null @@ -1,7 +0,0 @@ -import XCTest - -import Segment_Tests - -var tests = [XCTestCaseEntry]() -tests += Segment_SwiftTests.allTests() -XCTMain(tests) diff --git a/Tests/Segment-Tests/Analytics_Tests.swift b/Tests/Segment-Tests/Analytics_Tests.swift index b7371bff..a7cba463 100644 --- a/Tests/Segment-Tests/Analytics_Tests.swift +++ b/Tests/Segment-Tests/Analytics_Tests.swift @@ -745,7 +745,6 @@ final class Analytics_Tests: XCTestCase { let shared2 = Analytics.shared() XCTAssertFalse(alive2 === shared2) XCTAssertTrue(shared2 === shared) - } func testAsyncOperatingMode() throws { @@ -755,20 +754,6 @@ final class Analytics_Tests: XCTestCase { .flushAt(9999) .operatingMode(.asynchronous)) - // set the httpclient to use our blocker session - let segment = analytics.find(pluginType: SegmentDestination.self) - let configuration = URLSessionConfiguration.ephemeral - configuration.allowsCellularAccess = true - configuration.timeoutIntervalForResource = 30 - configuration.timeoutIntervalForRequest = 60 - configuration.httpMaximumConnectionsPerHost = 2 - configuration.protocolClasses = [BlockNetworkCalls.self] - configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8", - "Authorization": "Basic test", - "User-Agent": "analytics-ios/\(Analytics.version())"] - let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil) - segment?.httpClient?.session = blockSession - waitUntilStarted(analytics: analytics) analytics.storage.hardReset(doYouKnowHowToUseThis: true) @@ -777,13 +762,16 @@ final class Analytics_Tests: XCTestCase { // put an event in the pipe ... analytics.track(name: "completion test1") + + RunLoop.main.run(until: .distantPast) + // flush it, that'll get us an upload going analytics.flush { // verify completion is called. expectation.fulfill() } - wait(for: [expectation], timeout: 5) + wait(for: [expectation], timeout: 10) XCTAssertNil(analytics.pendingUploads) } @@ -795,20 +783,6 @@ final class Analytics_Tests: XCTestCase { .flushAt(9999) .operatingMode(.synchronous)) - // set the httpclient to use our blocker session - let segment = analytics.find(pluginType: SegmentDestination.self) - let configuration = URLSessionConfiguration.ephemeral - configuration.allowsCellularAccess = true - configuration.timeoutIntervalForResource = 30 - configuration.timeoutIntervalForRequest = 60 - configuration.httpMaximumConnectionsPerHost = 2 - configuration.protocolClasses = [BlockNetworkCalls.self] - configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8", - "Authorization": "Basic test", - "User-Agent": "analytics-ios/\(Analytics.version())"] - let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil) - segment?.httpClient?.session = blockSession - waitUntilStarted(analytics: analytics) analytics.storage.hardReset(doYouKnowHowToUseThis: true) @@ -822,7 +796,7 @@ final class Analytics_Tests: XCTestCase { expectation.fulfill() } - wait(for: [expectation], timeout: 1) + wait(for: [expectation], timeout: 10) XCTAssertNil(analytics.pendingUploads) diff --git a/Tests/Segment-Tests/CompletionGroup_Tests.swift b/Tests/Segment-Tests/CompletionGroup_Tests.swift new file mode 100644 index 00000000..a57fd82c --- /dev/null +++ b/Tests/Segment-Tests/CompletionGroup_Tests.swift @@ -0,0 +1,67 @@ +// +// CompletionGroup_Tests.swift +// +// +// Created by Brandon Sneed on 4/17/24. +// + +import XCTest +@testable import Segment + +final class CompletionGroup_Tests: XCTestCase { + + override func setUpWithError() throws { + // Put setup code here. This method is called before the invocation of each test method in the class. + } + + override func tearDownWithError() throws { + // Put teardown code here. This method is called after the invocation of each test method in the class. + } + + /*func testCompletionGroup() throws { + defer { + RunLoop.main.run() + } + + //let flushQueue = DispatchQueue(label: "com.segment.flush") + let flushQueue = DispatchQueue(label: "com.segment.flush", attributes: .concurrent) + + let group = CompletionGroup(queue: flushQueue) + + group.add { group in + group.enter() + print("item1 - sleeping 10") + sleep(10) + print("item1 - done sleeping") + group.leave() + } + + group.add { group in + group.enter() + print("item2 - launching an async task") + DispatchQueue.global(qos: .background).async { + print("item2 - background, sleeping 5") + sleep(5) + print("item2 - background, done sleeping") + group.leave() + } + } + + group.add { group in + group.enter() + print("item3 - returning real quick") + group.leave() + } + + group.add { group in + print("item4 - not entering group") + } + + group.run(mode: .asynchronous) { + print("all items completed.") + } + + print("test exited.") + }*/ + +}