-
Notifications
You must be signed in to change notification settings - Fork 113
/
Copy pathLambdaRuntime.swift
143 lines (124 loc) · 5.62 KB
/
LambdaRuntime.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOConcurrencyHelpers
import NIOCore
import Synchronization
#if canImport(FoundationEssentials)
import FoundationEssentials
#else
import Foundation
#endif
// This is our gardian to ensure only one LambdaRuntime is running at the time
// We use an Atomic here to ensure thread safety
private let _isRunning = Atomic<Bool>(false)
// We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today.
// We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this
// sadly crashes the compiler today (on Linux).
public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: StreamingLambdaHandler {
// TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore
let handlerMutex: NIOLockedValueBox<Handler?>
let logger: Logger
let eventLoop: EventLoop
public init(
handler: sending Handler,
eventLoop: EventLoop = Lambda.defaultEventLoop,
logger: Logger = Logger(label: "LambdaRuntime")
) {
self.handlerMutex = NIOLockedValueBox(handler)
self.eventLoop = eventLoop
// by setting the log level here, we understand it can not be changed dynamically at runtime
// developers have to wait for AWS Lambda to dispose and recreate a runtime environment to pickup a change
// this approach is less flexible but more performant than reading the value of the environment variable at each invocation
var log = logger
log.logLevel = Lambda.env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
self.logger = log
self.logger.debug("LambdaRuntime initialized")
}
/// Make sure only one run() is called at a time
public func run() async throws {
// we use an atomic global variable to ensure only one LambdaRuntime is running at the time
let (_, original) = _isRunning.compareExchange(expected: false, desired: true, ordering: .relaxed)
// if the original value was already true, run() is already running
if original {
throw LambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
}
try await withTaskCancellationHandler {
// call the internal _run() method
do {
try await self._run()
} catch {
// when we catch an error, flip back the global variable to false
_isRunning.store(false, ordering: .relaxed)
throw error
}
} onCancel: {
// when task is cancelled, flip back the global variable to false
_isRunning.store(false, ordering: .relaxed)
}
// when we're done without error and without cancellation, flip back the global variable to false
_isRunning.store(false, ordering: .relaxed)
}
private func _run() async throws {
let handler = self.handlerMutex.withLockedValue { handler in
let result = handler
handler = nil
return result
}
guard let handler else {
throw LambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
}
// are we running inside an AWS Lambda runtime environment ?
// AWS_LAMBDA_RUNTIME_API is set when running on Lambda
// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html
if let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") {
let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1)
let ip = String(ipAndPort[0])
guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) }
try await LambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
eventLoop: self.eventLoop,
logger: self.logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: self.logger
)
}
} else {
#if LocalServerSupport
// we're not running on Lambda and we're compiled in DEBUG mode,
// let's start a local server for testing
try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT"))
{
try await LambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: "127.0.0.1", port: 7000),
eventLoop: self.eventLoop,
logger: self.logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: self.logger
)
}
}
#else
// in release mode, we can't start a local server because the local server code is not compiled.
throw LambdaRuntimeError(code: .missingLambdaRuntimeAPIEnvironmentVariable)
#endif
}
}
}