Skip to content

Commit a76bf46

Browse files
authored
Telemetry module for Swift (#369)
1 parent 98f1316 commit a76bf46

21 files changed

+542
-1
lines changed

Sources/Segment/Analytics.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,15 @@ public class Analytics {
8787

8888
// Get everything running
8989
platformStartup()
90+
91+
Telemetry.shared.increment(metric: Telemetry.INVOKE_METRIC) {it in
92+
it["message"] = "configured"
93+
it["apihost"] = configuration.values.apiHost
94+
it["cdnhost"] = configuration.values.cdnHost
95+
it["flush"] =
96+
"at:\(configuration.values.flushAt) int:\(configuration.values.flushInterval) pol:\(configuration.values.flushPolicies.count)"
97+
it["config"] = "seg:\(configuration.values.autoAddSegmentDestination) ua:\(configuration.values.userAgent ?? "N/A")"
98+
}
9099
}
91100

92101
deinit {

Sources/Segment/Errors.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ extension Analytics {
7373
if fatal {
7474
exceptionFailure("A critical error occurred: \(translatedError)")
7575
}
76+
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
77+
(_ it: inout [String: String]) in
78+
it["error"] = "\(translatedError)"
79+
it["writekey"] = configuration.values.writeKey
80+
it["caller"] = Thread.callStackSymbols[3]
81+
}
7682
}
7783

7884
static public func reportInternalError(_ error: Error, fatal: Bool = false) {
@@ -83,5 +89,9 @@ extension Analytics {
8389
if fatal {
8490
exceptionFailure("A critical error occurred: \(translatedError)")
8591
}
92+
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
93+
(_ it: inout [String: String]) in
94+
it["error"] = "\(translatedError)"
95+
}
8696
}
8797
}

Sources/Segment/Plugins/StartupQueue.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ public class StartupQueue: Plugin, Subscriber {
2020
analytics?.store.subscribe(self) { [weak self] (state: System) in
2121
self?.runningUpdate(state: state)
2222
}
23+
if let store = analytics?.store {
24+
Telemetry.shared.subscribe(store)
25+
}
2326
}
2427
}
2528

Sources/Segment/Timeline.swift

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,20 @@ public class Timeline {
6565
internal class Mediator {
6666
/// Appends `plugin` to this mediator's plugin list and reports an "added"
/// integration metric tagged with the plugin's type and description.
internal func add(plugin: Plugin) {
    plugins.append(plugin)

    Telemetry.shared.increment(metric: Telemetry.INTEGRATION_METRIC) { tags in
        tags["message"] = "added"
        tags["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
    }
}
6974

7075
/// Removes every stored reference to `plugin` (compared by identity) and, when
/// something was actually removed, reports a single "removed" integration metric.
///
/// The metric is recorded outside the `removeAll` predicate: the predicate runs
/// once per stored plugin, so reporting from inside it would emit one "removed"
/// metric for every plugin in the list regardless of whether it matched.
internal func remove(plugin: Plugin) {
    let countBefore = plugins.count
    plugins.removeAll { storedPlugin in
        plugin === storedPlugin
    }

    // Nothing matched, so there is no removal to report.
    guard plugins.count < countBefore else { return }

    Telemetry.shared.increment(metric: Telemetry.INTEGRATION_METRIC) { tags in
        tags["message"] = "removed"
        tags["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
    }
}
@@ -86,6 +96,11 @@ internal class Mediator {
8696
} else {
8797
result = plugin.execute(event: r)
8898
}
99+
Telemetry.shared.increment(metric: Telemetry.INTEGRATION_METRIC) {
100+
(_ it: inout [String: String]) in
101+
it["message"] = "event-\(r.type ?? "unknown")"
102+
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
103+
}
89104
}
90105
}
91106

Lines changed: 313 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
import Foundation
2+
import Sovran
3+
4+
/// A single metric entry as it is queued and later JSON-encoded for upload.
public struct RemoteMetric: Codable {
    /// Kind of metric (the telemetry code in this file always passes "Counter").
    let type: String
    /// Fully qualified metric name.
    let metric: String
    /// Current count for this metric; mutable so it can be adjusted before sending.
    var value: Int
    /// Key/value tags attached to the metric.
    let tags: [String: String]
    /// Optional log payload attached to the metric.
    let log: [String: String]?

    /// Creates a metric entry.
    /// - Parameters:
    ///   - type: Kind of metric.
    ///   - metric: Metric name.
    ///   - value: Initial count.
    ///   - tags: Tags describing the metric.
    ///   - log: Optional log payload; defaults to `nil`.
    init(
        type: String,
        metric: String,
        value: Int,
        tags: [String: String],
        log: [String: String]? = nil
    ) {
        self.type = type
        self.metric = metric
        self.value = value
        self.tags = tags
        self.log = log
    }
}
19+
20+
/// Metric type assigned to every `RemoteMetric` created in this file.
private let METRIC_TYPE = "Counter"

/// Default telemetry error handler: forwards `error` to
/// `Analytics.reportInternalError` as a non-fatal error.
func logError(_ error: Error) {
    Analytics.reportInternalError(error, fatal: false)
}
25+
26+
/// Collects SDK usage and error metrics and uploads them to Segment.
///
/// The data is used to improve the SDK. Collection can be disabled at any time by
/// setting `Telemetry.shared.enable` to `false`. Error metrics are sent with the
/// write key unless `Telemetry.shared.sendWriteKeyOnError` is set to `false`.
/// All data is downsampled and no PII is collected.
public class Telemetry: Subscriber {
    public static let shared = Telemetry(session: HTTPSessions.urlSession())
    private static let METRICS_BASE_TAG = "analytics_mobile"
    public static let INVOKE_METRIC = "\(METRICS_BASE_TAG).invoke"
    public static let INVOKE_ERROR_METRIC = "\(METRICS_BASE_TAG).invoke.error"
    public static let INTEGRATION_METRIC = "\(METRICS_BASE_TAG).integration.invoke"
    public static let INTEGRATION_ERROR_METRIC = "\(METRICS_BASE_TAG).integration.invoke.error"

    /// Failures raised while preparing an upload request.
    private enum TelemetryError: Error {
        /// The configured host could not be turned into a valid URL.
        case invalidHost(String)
    }

    init(session: any HTTPSession) {
        self.session = session
    }

    // Don't collect data in debug mode (i.e. test environments).
    #if DEBUG
    private static let enabledByDefault = false
    #else
    private static let enabledByDefault = true
    #endif

    /// A Boolean value indicating whether to enable telemetry.
    /// Setting it to `true` attempts to start the send loop.
    public var enable: Bool = Telemetry.enabledByDefault {
        didSet {
            if enable {
                start()
            }
        }
    }

    /// A Boolean value indicating whether to send the write key with error metrics.
    public var sendWriteKeyOnError: Bool = true
    /// A Boolean value indicating whether to send the error log data with error metrics.
    public var sendErrorLogData: Bool = false
    /// A Callback for reporting errors that occur during telemetry.
    public var errorHandler: ((Error) -> Void)? = logError

    internal var session: any HTTPSession
    internal var host: String = HTTPClient.getDefaultAPIHost()
    /// Fraction of metrics that are kept; values outside `(0, 1]` disable recording and sending.
    var sampleRate: Double = 0.10
    /// Interval between automatic flushes, in milliseconds.
    private var flushTimer: Int = 30 * 1000
    /// Maximum number of distinct metrics held in the queue.
    internal var maxQueueSize: Int = 20
    /// Maximum number of characters of error log kept per error metric.
    var errorLogSizeMax: Int = 4000

    static private let MAX_QUEUE_BYTES = 28000
    /// Byte budget for queued metrics; assignments are clamped to `MAX_QUEUE_BYTES`.
    var maxQueueBytes: Int = Telemetry.MAX_QUEUE_BYTES {
        didSet {
            maxQueueBytes = min(maxQueueBytes, Telemetry.MAX_QUEUE_BYTES)
        }
    }

    internal var queue = [RemoteMetric]()
    private var queueBytes = 0
    private var queueSizeExceeded = false
    /// Number of repeats seen per error string since that error was last reported.
    private var seenErrors = [String: Int]()
    internal var started = false
    /// Unix time before which flushes are skipped after an HTTP 429 response.
    private var rateLimitEndTime: TimeInterval = 0
    /// Serial queue guarding `queue` and its byte accounting.
    private var telemetryQueue = DispatchQueue(label: "telemetryQueue")
    /// Queue on which store updates are delivered. It is deliberately distinct from
    /// `telemetryQueue`: `systemUpdate` calls `start()`, which may call `resetQueue()`,
    /// which performs `telemetryQueue.sync` and would deadlock if it were already
    /// running on that serial queue.
    /// NOTE(review): assumes the store invokes the handler on the queue passed to `subscribe` — confirm.
    private let updateQueue = DispatchQueue(label: "telemetryUpdateQueue")
    private var telemetryTimer: Timer?

    /// Whether `sampleRate` is inside the usable range `(0, 1]`.
    private var hasValidSampleRate: Bool {
        return sampleRate > 0.0 && sampleRate <= 1.0
    }

    /// Cheap preconditions shared by `increment` and `error`, evaluated before any tags are built.
    private func canRecord(_ metric: String) -> Bool {
        return enable && hasValidSampleRate && metric.hasPrefix(Telemetry.METRICS_BASE_TAG)
    }

    /// Starts the Telemetry send loop. Requires both `enable` to be set and a configuration to be retrieved from Segment.
    func start() {
        guard enable, !started, hasValidSampleRate else { return }
        started = true

        // With probability (1 - sampleRate), discard whatever has been queued so far.
        if Double.random(in: 0...1) > sampleRate {
            resetQueue()
        }

        // NOTE(review): Timer.scheduledTimer attaches to the current thread's run loop;
        // if start() runs on a thread whose run loop is never serviced, this timer will
        // not fire — confirm the calling contexts.
        let interval = TimeInterval(flushTimer) / 1000.0
        telemetryTimer = Timer.scheduledTimer(withTimeInterval: interval, repeats: true) { [weak self] _ in
            guard let self = self else { return }
            if !self.enable {
                self.started = false
                self.telemetryTimer?.invalidate()
            }
            // flush() is a no-op when telemetry is disabled.
            self.flush()
        }
    }

    /// Resets the telemetry state, including the queue and seen errors.
    func reset() {
        telemetryTimer?.invalidate()
        resetQueue()
        seenErrors.removeAll()
        started = false
        rateLimitEndTime = 0
    }

    /// Increments a metric with the provided tags.
    /// - Parameters:
    ///   - metric: The metric name. Must start with the telemetry base tag.
    ///   - buildTags: A closure to build the tags dictionary. It is only invoked when
    ///     telemetry is enabled and the metric name is acceptable.
    func increment(metric: String, buildTags: (inout [String: String]) -> Void) {
        // Check the cheap conditions first so callers don't pay for tag building when disabled.
        guard canRecord(metric) else { return }

        var tags = [String: String]()
        buildTags(&tags)

        guard !tags.isEmpty, queueHasSpace() else { return }
        if Double.random(in: 0...1) > sampleRate { return }

        addRemoteMetric(metric: metric, tags: tags)
    }

    /// Logs an error metric with the provided log data and tags.
    /// - Parameters:
    ///   - metric: The metric name. Must start with the telemetry base tag.
    ///   - log: The log data; only sent when `sendErrorLogData` is set, truncated to `errorLogSizeMax`.
    ///   - buildTags: A closure to build the tags dictionary. It is only invoked when
    ///     telemetry is enabled and the metric name is acceptable.
    func error(metric: String, log: String, buildTags: (inout [String: String]) -> Void) {
        guard canRecord(metric) else { return }

        var tags = [String: String]()
        buildTags(&tags)

        guard !tags.isEmpty, queueHasSpace() else { return }

        let filteredTags = sendWriteKeyOnError
            ? tags
            : tags.filter { $0.key.lowercased() != "writekey" }
        let logData: String? = sendErrorLogData ? String(log.prefix(errorLogSizeMax)) : nil

        // Errors without an "error" tag cannot be de-duplicated; queue them as-is.
        guard let errorKey = tags["error"] else {
            addRemoteMetric(metric: metric, tags: filteredTags, log: logData)
            return
        }

        if let count = seenErrors[errorKey] {
            // Repeat of a known error: count it, and only report a sampled subset.
            seenErrors[errorKey] = count + 1
            if Double.random(in: 0...1) > sampleRate { return }
            // NOTE(review): the reported value uses the count from before this occurrence
            // was added — confirm whether `count + 1` was intended.
            addRemoteMetric(metric: metric, tags: filteredTags, value: Int(Double(count) * sampleRate), log: logData)
            seenErrors[errorKey] = 0
        } else {
            // First time this error is seen: report it and flush immediately.
            addRemoteMetric(metric: metric, tags: filteredTags, log: logData)
            flush()
            seenErrors[errorKey] = 0
        }
    }

    /// Flushes the telemetry queue, sending the metrics to the server.
    ///
    /// If preparing the upload throws, sampling is set to zero (which stops further
    /// recording and sending) and the error is passed to `errorHandler`.
    internal func flush() {
        guard enable else { return }

        var failure: Error?
        telemetryQueue.sync {
            guard !queue.isEmpty else { return }
            if rateLimitEndTime > Date().timeIntervalSince1970 {
                return
            }
            rateLimitEndTime = 0

            do {
                try send()
                queueBytes = 0
            } catch {
                sampleRate = 0.0
                failure = error
            }
        }

        // Report outside the queue: the default handler routes back into
        // `Telemetry.error`, which itself does `telemetryQueue.sync`; calling it
        // from inside the block above would re-enter the serial queue.
        if let failure = failure {
            errorHandler?(failure)
        }
    }

    /// Drains the queue, scales each value by `1 / sampleRate`, and posts the batch.
    /// Must be called on `telemetryQueue`.
    /// - Throws: Encoding errors from `JSONEncoder`, or `TelemetryError.invalidHost`.
    private func send() throws {
        guard hasValidSampleRate else { return }

        let rate = sampleRate
        let sendQueue: [RemoteMetric] = queue.map { queued in
            var scaled = queued
            scaled.value = Int(Double(queued.value) / rate)
            return scaled
        }
        queue.removeAll()
        queueBytes = 0
        queueSizeExceeded = false

        let payload = try JSONEncoder().encode(["series": sendQueue])
        var request = try upload(apiHost: host)
        request.httpBody = payload

        let task = session.dataTask(with: request) { _, response, error in
            if let error = error {
                self.errorHandler?(error)
                return
            }

            // On HTTP 429, honour a numeric Retry-After header by pausing flushes until then.
            guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 429,
                  let retryAfter = httpResponse.allHeaderFields["Retry-After"] as? String,
                  let retryAfterSeconds = TimeInterval(retryAfter) else { return }
            self.rateLimitEndTime = retryAfterSeconds + Date().timeIntervalSince1970
        }
        task.resume()
    }

    /// Platform name plus the first run of digits of the OS version string, e.g. "iOS-17".
    /// Computed once because it is the same for every metric.
    private static let osTag: String = {
        let fullVersion = ProcessInfo.processInfo.operatingSystemVersionString
        var osVersion = fullVersion
        if let digits = fullVersion.range(of: "[0-9]+", options: .regularExpression) {
            osVersion = String(fullVersion[digits])
        }
        #if os(iOS)
        osVersion = "iOS-\(osVersion)"
        #elseif os(macOS)
        osVersion = "macOS-\(osVersion)"
        #elseif os(tvOS)
        osVersion = "tvOS-\(osVersion)"
        #elseif os(watchOS)
        osVersion = "watchOS-\(osVersion)"
        #else
        osVersion = "unknown-\(osVersion)"
        #endif
        return osVersion
    }()

    /// Tags added to every metric: OS, library name and library version.
    private var additionalTags: [String: String] {
        return [
            "os": Telemetry.osTag,
            "library": "analytics.swift",
            "library_version": __segment_version
        ]
    }

    /// Adds `value` to an already-queued metric with the same name and tags, or
    /// queues a new metric if the byte budget allows it.
    private func addRemoteMetric(metric: String, tags: [String: String], value: Int = 1, log: String? = nil) {
        let fullTags = tags.merging(additionalTags) { (_, new) in new }

        telemetryQueue.sync {
            // Update the stored element in place; `RemoteMetric` is a struct, so mutating
            // a value returned by `first(where:)` would only change a copy.
            if let index = queue.firstIndex(where: { $0.metric == metric && $0.tags == fullTags }) {
                queue[index].value += value
                return
            }

            let newMetric = RemoteMetric(
                type: METRIC_TYPE,
                metric: metric,
                value: value,
                tags: fullTags,
                log: log.map { ["timestamp": Date().iso8601(), "trace": $0] }
            )
            // Size is estimated from the textual description of the metric.
            let newMetricSize = String(describing: newMetric).utf8.count
            if queueBytes + newMetricSize <= maxQueueBytes {
                queue.append(newMetric)
                queueBytes += newMetricSize
            } else {
                queueSizeExceeded = true
            }
        }
    }

    /// Subscribes to the given store to receive system updates.
    /// - Parameter store: The store on which a sampleRate setting is expected.
    public func subscribe(_ store: Store) {
        store.subscribe(self,
            initialState: true,
            queue: updateQueue,
            handler: systemUpdate
        )
    }

    /// Applies the `sampleRate` found in the system settings' metrics section, if any,
    /// and then attempts to start the send loop.
    private func systemUpdate(system: System) {
        if let settings = system.settings, let sampleRate = settings.metrics?["sampleRate"]?.doubleValue {
            self.sampleRate = sampleRate
            start()
        }
    }

    /// Builds the POST request for the metrics endpoint on `apiHost`.
    /// - Throws: `TelemetryError.invalidHost` when the host does not form a valid URL.
    private func upload(apiHost: String) throws -> URLRequest {
        guard let url = URL(string: "https://\(apiHost)/m") else {
            throw TelemetryError.invalidHost(apiHost)
        }
        var request = URLRequest(url: url)
        request.setValue("text/plain", forHTTPHeaderField: "Content-Type")
        request.httpMethod = "POST"

        return request
    }

    /// Whether the queue currently holds fewer than `maxQueueSize` metrics.
    private func queueHasSpace() -> Bool {
        return telemetryQueue.sync { queue.count < maxQueueSize }
    }

    /// Empties the queue and clears its byte accounting.
    private func resetQueue() {
        telemetryQueue.sync {
            queue.removeAll()
            queueBytes = 0
            queueSizeExceeded = false
        }
    }
}

0 commit comments

Comments
 (0)