Skip to content

Add custom HTTP session abilities. #344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Sources/Segment/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class Configuration {
var jsonNonConformingNumberStrategy: JSONSafeEncoder.NonConformingFloatEncodingStrategy = .zero
var storageMode: StorageMode = .disk
var anonymousIdGenerator: AnonymousIdGenerator = SegmentAnonymousId()
var httpSession: (() -> any HTTPSession) = HTTPSessions.urlSession
}

internal var values: Values
Expand Down Expand Up @@ -272,6 +273,16 @@ public extension Configuration {
values.anonymousIdGenerator = generator
return self
}

/// Use a custom HTTP session; Useful for non-apple platforms where Swift networking isn't as mature
/// or has issues to work around.
/// - Parameter httpSession: A class conforming to the HTTPSession protocol
/// - Returns: The current configuration
@discardableResult
func httpSession(_ httpSession: @escaping @autoclosure () -> any HTTPSession) -> Configuration {
values.httpSession = httpSession
return self
}
}

extension Analytics {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
internal struct UploadTaskInfo {
let url: URL?
let data: Data?
let task: URLSessionDataTask
let task: DataTask
// set/used via an extension in iOSLifecycleMonitor.swift
typealias CleanupClosure = () -> Void
var cleanup: CleanupClosure? = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public enum HTTPClientErrors: Error {
public class HTTPClient {
private static let defaultAPIHost = "api.segment.io/v1"
private static let defaultCDNHost = "cdn-settings.segment.com/v1"
internal var session: URLSession

internal var session: any HTTPSession
private var apiHost: String
private var apiKey: String
private var cdnHost: String
Expand All @@ -35,7 +35,7 @@ public class HTTPClient {
self.apiHost = analytics.configuration.values.apiHost
self.cdnHost = analytics.configuration.values.cdnHost

self.session = Self.configuredSession(for: self.apiKey)
self.session = analytics.configuration.values.httpSession()
}

func segmentURL(for host: String, path: String) -> URL? {
Expand All @@ -52,7 +52,7 @@ public class HTTPClient {
/// - batch: The array of the events, considered a batch of events.
/// - completion: The closure executed when done. Passes if the task should be retried or not if failed.
@discardableResult
func startBatchUpload(writeKey: String, batch: URL, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> URLSessionDataTask? {
func startBatchUpload(writeKey: String, batch: URL, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> (any DataTask)? {
guard let uploadURL = segmentURL(for: apiHost, path: "/b") else {
self.analytics?.reportInternalError(HTTPClientErrors.failedToOpenBatch)
completion(.failure(HTTPClientErrors.failedToOpenBatch))
Expand All @@ -77,7 +77,7 @@ public class HTTPClient {
/// - batch: The array of the events, considered a batch of events.
/// - completion: The closure executed when done. Passes if the task should be retried or not if failed.
@discardableResult
func startBatchUpload(writeKey: String, data: Data, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> URLSessionDataTask? {
func startBatchUpload(writeKey: String, data: Data, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> (any UploadTask)? {
guard let uploadURL = segmentURL(for: apiHost, path: "/b") else {
self.analytics?.reportInternalError(HTTPClientErrors.failedToOpenBatch)
completion(.failure(HTTPClientErrors.failedToOpenBatch))
Expand Down Expand Up @@ -199,11 +199,4 @@ extension HTTPClient {

return request
}

internal static func configuredSession(for writeKey: String) -> URLSession {
let configuration = URLSessionConfiguration.ephemeral
configuration.httpMaximumConnectionsPerHost = 2
let session = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
return session
}
}
11 changes: 11 additions & 0 deletions Sources/Segment/Utilities/Networking/HTTPSession+Apple.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import Foundation

#if os(Linux) || os(Windows)
import FoundationNetworking
#endif

extension URLSessionDataTask: DataTask {}
extension URLSessionUploadTask: UploadTask {}

// Give the built in `URLSession` conformance to HTTPSession so that it can easily be used
extension URLSession: HTTPSession {}
34 changes: 34 additions & 0 deletions Sources/Segment/Utilities/Networking/HTTPSession.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import Foundation

#if os(Linux) || os(Windows)
import FoundationNetworking
#endif

public protocol DataTask {
var state: URLSessionTask.State { get }
func resume()
}

public protocol UploadTask: DataTask {}

// An enumeration of default `HTTPSession` configurations to be used
// This can be extended buy consumer to easily refer back to their configured session.
public enum HTTPSessions {
/// An implementation of `HTTPSession` backed by Apple's `URLSession`.
public static func urlSession() -> any HTTPSession {
let configuration = URLSessionConfiguration.ephemeral
configuration.httpMaximumConnectionsPerHost = 2
let session = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
return session
}
}

public protocol HTTPSession {
associatedtype DataTaskType: DataTask
associatedtype UploadTaskType: UploadTask

func uploadTask(with request: URLRequest, fromFile file: URL, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> UploadTaskType
func uploadTask(with request: URLRequest, from bodyData: Data?, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> UploadTaskType
func dataTask(with request: URLRequest, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> DataTaskType
func finishTasksAndInvalidate()
}
64 changes: 56 additions & 8 deletions Tests/Segment-Tests/HTTPClient_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,76 @@
// Created by Brandon Sneed on 1/21/21.
//

#if !os(Linux)

import XCTest
@testable import Segment

class HTTPClientTests: XCTestCase {

override func setUpWithError() throws {
// Put setup code here. This method is called before the invocation of each test method in the class.
RestrictedHTTPSession.reset()
}

override func tearDownWithError() throws {
// Put teardown code here. This method is called after the invocation of each test method in the class.
}

/*func testExample() throws {
func testCustomHTTPSessionUpload() throws {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(
configuration: Configuration(writeKey: "testCustomSesh")
.flushInterval(9999)
.flushAt(9999)
.httpSession(RestrictedHTTPSession())
)

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

analytics.identify(userId: "brandon", traits: MyTraits(email: "[email protected]"))

let flushDone = XCTestExpectation(description: "flush done")
analytics.flush {
flushDone.fulfill()
}

wait(for: [flushDone])

XCTAssertEqual(RestrictedHTTPSession.fileUploads, 1)
}

func testPerformanceExample() throws {
// This is an example of a performance test case.
self.measure {
// Put the code you want to measure the time of here.

func testDefaultHTTPSessionUpload() throws {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(
configuration: Configuration(writeKey: "testDefaultSesh")
.flushInterval(9999)
.flushAt(9999)
)

// reach in and set it, would be the same as the default ultimately
let segment = analytics.find(pluginType: SegmentDestination.self)
XCTAssertTrue(!(segment?.httpClient?.session is RestrictedHTTPSession))
XCTAssertTrue(segment?.httpClient?.session is URLSession)
segment?.httpClient?.session = RestrictedHTTPSession()

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

analytics.identify(userId: "brandon", traits: MyTraits(email: "[email protected]"))

let flushDone = XCTestExpectation(description: "flush done")
analytics.flush {
flushDone.fulfill()
}
}*/


wait(for: [flushDone])

XCTAssertEqual(RestrictedHTTPSession.fileUploads, 1)
}
}

#endif
54 changes: 18 additions & 36 deletions Tests/Segment-Tests/StressTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,34 @@
// Created by Brandon Sneed on 11/4/21.
//

#if !os(Linux) && !os(tvOS) && !os(watchOS)

import XCTest
@testable import Segment

class StressTests: XCTestCase {

override func setUpWithError() throws {
// Put setup code here. This method is called before the invocation of each test method in the class.
RestrictedHTTPSession.reset()
}

override func tearDownWithError() throws {
// Put teardown code here. This method is called after the invocation of each test method in the class.
}

// Linux doesn't know what URLProtocol is and on tvOS/watchOS it somehow works differently and isn't hit.
#if !os(Linux) && !os(tvOS) && !os(watchOS)
func testDirectoryStorageStress2() throws {
// register our network blocker
guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return }

let analytics = Analytics(configuration: Configuration(writeKey: "stressTest2").errorHandler({ error in
XCTFail("Storage Error: \(error)")
}))
let analytics = Analytics(configuration: Configuration(writeKey: "stressTest2")
.errorHandler({ error in
XCTFail("Storage Error: \(error)")
})
.httpSession(RestrictedHTTPSession())
)

analytics.purgeStorage()
analytics.storage.hardReset(doYouKnowHowToUseThis: true)

Expand All @@ -41,20 +47,6 @@ class StressTests: XCTestCase {

waitUntilStarted(analytics: analytics)

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

@Atomic var ready = false
var queues = [DispatchQueue]()
for i in 0..<30 {
Expand Down Expand Up @@ -110,9 +102,12 @@ class StressTests: XCTestCase {
// register our network blocker
guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return }

let analytics = Analytics(configuration: Configuration(writeKey: "stressTest").errorHandler({ error in
XCTFail("Storage Error: \(error)")
}))
let analytics = Analytics(configuration: Configuration(writeKey: "stressTest2")
.errorHandler({ error in
XCTFail("Storage Error: \(error)")
})
.httpSession(RestrictedHTTPSession())
)
analytics.storage.hardReset(doYouKnowHowToUseThis: true)

DirectoryStore.fileValidator = { url in
Expand All @@ -126,20 +121,6 @@ class StressTests: XCTestCase {

waitUntilStarted(analytics: analytics)

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

let writeQueue1 = DispatchQueue(label: "write queue 1", attributes: .concurrent)
let writeQueue2 = DispatchQueue(label: "write queue 2", attributes: .concurrent)
let writeQueue3 = DispatchQueue(label: "write queue 3", attributes: .concurrent)
Expand Down Expand Up @@ -317,5 +298,6 @@ class StressTests: XCTestCase {
RunLoop.main.run(until: Date.distantPast)
}
}
#endif
}

#endif
55 changes: 55 additions & 0 deletions Tests/Segment-Tests/Support/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,61 @@ extension XCTestCase {

#if !os(Linux)

class RestrictedHTTPSession: HTTPSession {
let sesh: URLSession
static var fileUploads: Int = 0
static var dataUploads: Int = 0
static var dataTasks: Int = 0
static var invalidated: Int = 0

init(blocking: Bool = true, failing: Bool = false) {
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]

var protos = [URLProtocol.Type]()
if blocking { protos.append(BlockNetworkCalls.self) }
if failing { protos.append(FailedNetworkCalls.self) }
configuration.protocolClasses = protos

sesh = URLSession(configuration: configuration)
}

static func reset() {
fileUploads = 0
dataUploads = 0
dataTasks = 0
invalidated = 0
}

func uploadTask(with request: URLRequest, fromFile file: URL, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> URLSessionUploadTask {
defer { Self.fileUploads += 1 }
return sesh.uploadTask(with: request, fromFile: file, completionHandler: completionHandler)
}

func uploadTask(with request: URLRequest, from bodyData: Data?, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> URLSessionUploadTask {
defer { Self.dataUploads += 1 }
return sesh.uploadTask(with: request, from: bodyData, completionHandler: completionHandler)
}

func dataTask(with request: URLRequest, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> URLSessionDataTask {
defer { Self.dataTasks += 1 }
return sesh.dataTask(with: request, completionHandler: completionHandler)
}

func finishTasksAndInvalidate() {
defer { Self.invalidated += 1 }
sesh.finishTasksAndInvalidate()
}
}



class BlockNetworkCalls: URLProtocol {
var initialURL: URL? = nil
override class func canInit(with request: URLRequest) -> Bool {
Expand Down
Loading