Skip to content

Refactored operatingMode related code. #328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/swift.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ jobs:
needs: cancel_previous
runs-on: macos-14
steps:
- name: Install yeetd
run: |
wget https://github.com/biscuitehh/yeetd/releases/download/1.0/yeetd-normal.pkg
sudo installer -pkg yeetd-normal.pkg -target /
yeetd &
- uses: maxim-lobanov/setup-xcode@v1
with:
xcode-version: "15.2"
Expand Down
49 changes: 8 additions & 41 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -228,56 +228,23 @@ extension Analytics {
/// called when flush has completed.
public func flush(completion: (() -> Void)? = nil) {
// only flush if we're enabled.
guard enabled == true else { return }

let flushGroup = DispatchGroup()
// gotta call enter at least once before we ask to be notified.
flushGroup.enter()
guard enabled == true else { completion?(); return }

let completionGroup = CompletionGroup(queue: configuration.values.flushQueue)
apply { plugin in
// we want to enter as soon as possible. waiting to do it from
// another queue just takes too long.
operatingMode.run(queue: configuration.values.flushQueue) {
completionGroup.add { group in
if let p = plugin as? FlushCompletion {
// flush handles the groups enter/leave calls
p.flush(group: flushGroup) { plugin in
// we don't really care about the plugin value .. yet.
}
p.flush(group: group)
} else if let p = plugin as? EventPlugin {
flushGroup.enter()
// we have no idea if this will be async or not, assume it's sync.
group.enter()
p.flush()
flushGroup.leave()
group.leave()
}
}
}

flushGroup.leave() // matches our initial enter().

// if we ARE in sync mode, we need to wait on the group.
// This effectively ends up being a `sync` operation.
if operatingMode == .synchronous {
flushGroup.wait()
// we need to call completion on our own since
// we skipped setting up notify. we don't need to do it on
// .main since we are in synchronous mode.
if let completion { completion() }
} else if operatingMode == .asynchronous {
// if we're not, flip over to our serial queue, tell it to wait on the flush
// group to complete if we have a completion to hit. Otherwise, no need to
// wait on completion.
if let completion {
// NOTE: DispatchGroup's `notify` method on linux ended up getting called
// before the tasks have actually completed, so we went with this instead.
OperatingMode.defaultQueue.async { [weak self] in
let timedOut = flushGroup.wait(timeout: .now() + 15 /*seconds*/)
if timedOut == .timedOut {
self?.log(message: "flush(completion:) timed out waiting for completion.")
}
completion()
//DispatchQueue.main.async { completion() }
}
}
completionGroup.run(mode: operatingMode) {
completion?()
}
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Plugins.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public protocol VersionedPlugin {
}

public protocol FlushCompletion {
func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void)
func flush(group: DispatchGroup)
}

// For internal platform-specific bits
Expand Down
47 changes: 24 additions & 23 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,15 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
// unused .. see flush(group:completion:)
}

public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
public func flush(group: DispatchGroup) {
group.enter()
defer { group.leave() }

guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }

// don't flush if analytics is disabled.
guard analytics.enabled == true else { return }

// enter for the high level flush, allow us time to run through any existing files..
group.enter()

eventCount = 0
cleanupUploads()
Expand All @@ -143,25 +143,19 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion

if pendingUploads == 0 {
if type == .file, hasData {
flushFiles(group: group, completion: completion)
flushFiles(group: group)
} else if type == .data, hasData {
// we know it's a data-based transaction as opposed to file I/O
flushData(group: group, completion: completion)
} else {
// there was nothing to do ...
completion(self)
flushData(group: group)
}
} else {
analytics.log(message: "Skipping processing; Uploads in progress.")
}

// leave for the high level flush
group.leave()
}
}

extension SegmentDestination {
private func flushFiles(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
private func flushFiles(group: DispatchGroup) {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
guard let httpClient = self.httpClient else { return }
Expand All @@ -175,6 +169,9 @@ extension SegmentDestination {

// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { [weak self] result in
defer {
group.leave()
}
guard let self else { return }
switch result {
case .success(_):
Expand All @@ -195,20 +192,19 @@ extension SegmentDestination {
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
cleanupUploads()
// call the completion
completion(self)
// leave for the url we kicked off.
group.leave()
}

// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
add(uploadTask: UploadTaskInfo(url: url, data: nil, task: upload))
} else {
// we couldn't get a task, so we need to leave the group or things will hang.
group.leave()
}
}
}

private func flushData(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
private func flushData(group: DispatchGroup) {
// DO NOT CALL THIS FROM THE MAIN THREAD, IT BLOCKS!
// Don't make me add a check here; i'll be sad you didn't follow directions.
guard let storage = self.storage else { return }
Expand Down Expand Up @@ -239,6 +235,12 @@ extension SegmentDestination {

// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, data: data) { [weak self] result in
defer {
// leave for the url we kicked off.
group.leave()
semaphore.signal()
}

guard let self else { return }
switch result {
case .success(_):
Expand All @@ -259,16 +261,15 @@ extension SegmentDestination {
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
cleanupUploads()
// call the completion
completion(self)
// leave for the url we kicked off.
group.leave()
semaphore.signal()
}

// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
add(uploadTask: UploadTaskInfo(url: nil, data: data, task: upload))
} else {
// we couldn't get a task, so we need to leave the group or things will hang.
group.leave()
semaphore.signal()
}

_ = semaphore.wait(timeout: .distantFuture)
Expand Down
54 changes: 54 additions & 0 deletions Sources/Segment/Utilities/CompletionGroup.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// CompletionGroup.swift
//
//
// Created by Brandon Sneed on 4/17/24.
//

import Foundation

class CompletionGroup {
let queue: DispatchQueue
var items = [(DispatchGroup) -> Void]()

init(queue: DispatchQueue) {
self.queue = queue
}

func add(workItem: @escaping (DispatchGroup) -> Void) {
items.append(workItem)
}

func run(mode: OperatingMode, completion: @escaping () -> Void) {
// capture self strongly on purpose
let task: () -> Void = { [self] in
let group = DispatchGroup()
group.enter()
group.notify(queue: queue) { [weak self] in
completion()
self?.items.removeAll()
}

for item in items {
item(group)
}

group.leave()

if mode == .synchronous {
group.wait()
}
}

switch mode {
case .synchronous:
queue.sync {
task()
}
case .asynchronous:
queue.async {
task()
}
}
}
}
1 change: 0 additions & 1 deletion Sources/Segment/Utilities/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,3 @@ internal func eventStorageDirectory(writeKey: String) -> URL {
return segmentURL
}


7 changes: 0 additions & 7 deletions Tests/LinuxMain.swift

This file was deleted.

36 changes: 5 additions & 31 deletions Tests/Segment-Tests/Analytics_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,6 @@ final class Analytics_Tests: XCTestCase {
let shared2 = Analytics.shared()
XCTAssertFalse(alive2 === shared2)
XCTAssertTrue(shared2 === shared)

}

func testAsyncOperatingMode() throws {
Expand All @@ -755,20 +754,6 @@ final class Analytics_Tests: XCTestCase {
.flushAt(9999)
.operatingMode(.asynchronous))

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)
Expand All @@ -777,13 +762,16 @@ final class Analytics_Tests: XCTestCase {

// put an event in the pipe ...
analytics.track(name: "completion test1")

RunLoop.main.run(until: .distantPast)

// flush it, that'll get us an upload going
analytics.flush {
// verify completion is called.
expectation.fulfill()
}

wait(for: [expectation], timeout: 5)
wait(for: [expectation], timeout: 10)

XCTAssertNil(analytics.pendingUploads)
}
Expand All @@ -795,20 +783,6 @@ final class Analytics_Tests: XCTestCase {
.flushAt(9999)
.operatingMode(.synchronous))

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)
Expand All @@ -822,7 +796,7 @@ final class Analytics_Tests: XCTestCase {
expectation.fulfill()
}

wait(for: [expectation], timeout: 1)
wait(for: [expectation], timeout: 10)

XCTAssertNil(analytics.pendingUploads)

Expand Down
Loading
Loading