forked from swift-server/swift-aws-lambda-runtime
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLambdaRunner.swift
156 lines (144 loc) · 7.07 KB
/
LambdaRunner.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Dispatch
import Logging
import NIO
extension Lambda {
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
internal final class Runner {
private let runtimeClient: RuntimeClient
private let eventLoop: EventLoop
private let allocator: ByteBufferAllocator
private var isGettingNextInvocation = false
init(eventLoop: EventLoop, configuration: Configuration) {
self.eventLoop = eventLoop
self.runtimeClient = RuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
self.allocator = ByteBufferAllocator()
}
/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize(logger: Logger, factory: @escaping HandlerFactory) -> EventLoopFuture<Handler> {
logger.debug("initializing lambda")
// 1. create the handler from the factory
// 2. report initialization error if one occured
let context = InitializationContext(logger: logger,
eventLoop: self.eventLoop,
allocator: self.allocator)
return factory(context)
// Hopping back to "our" EventLoop is importnant in case the factory returns a future
// that originated from a foreign EventLoop/EventLoopGroup.
// This can happen if the factory uses a library (let's say a database client) that manages its own threads/loops
// for whatever reason and returns a future that originated from that foreign EventLoop.
.hop(to: self.eventLoop)
.peekError { error in
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
// We're going to bail out because the init failed, so there's not a lot we can do other than log
// that we couldn't report this error back to the runtime.
logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)")
}
}
}
func run(logger: Logger, handler: Handler) -> EventLoopFuture<Void> {
logger.debug("lambda invocation sequence starting")
// 1. request invocation from lambda runtime engine
self.isGettingNextInvocation = true
return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in
logger.error("could not fetch work from lambda runtime engine: \(error)")
}.flatMap { invocation, event in
// 2. send invocation to handler
self.isGettingNextInvocation = false
let context = Context(logger: logger,
eventLoop: self.eventLoop,
allocator: self.allocator,
invocation: invocation)
logger.debug("sending invocation to lambda handler \(handler)")
return handler.handle(context: context, event: event)
// Hopping back to "our" EventLoop is importnant in case the handler returns a future that
// originiated from a foreign EventLoop/EventLoopGroup.
// This can happen if the handler uses a library (lets say a DB client) that manages its own threads/loops
// for whatever reason and returns a future that originated from that foreign EventLoop.
.hop(to: self.eventLoop)
.mapResult { result in
if case .failure(let error) = result {
logger.warning("lambda handler returned an error: \(error)")
}
return (invocation, result)
}
}.flatMap { invocation, result in
// 3. report results to runtime engine
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
logger.error("could not report results to lambda runtime engine: \(error)")
}
}
}
/// cancels the current run, if we are waiting for next invocation (long poll from Lambda control plane)
/// only needed for debugging purposes.
func cancelWaitingForNextInvocation() {
if self.isGettingNextInvocation {
self.runtimeClient.cancel()
}
}
}
}
private extension Lambda.Context {
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, invocation: Lambda.Invocation) {
self.init(requestID: invocation.requestID,
traceID: invocation.traceID,
invokedFunctionARN: invocation.invokedFunctionARN,
deadline: DispatchWallTime(millisSinceEpoch: invocation.deadlineInMillisSinceEpoch),
cognitoIdentity: invocation.cognitoIdentity,
clientContext: invocation.clientContext,
logger: logger,
eventLoop: eventLoop,
allocator: allocator)
}
}
// TODO: move to nio?
extension EventLoopFuture {
// callback does not have side effects, failing with original result
func peekError(_ callback: @escaping (Error) -> Void) -> EventLoopFuture<Value> {
self.flatMapError { error in
callback(error)
return self
}
}
// callback does not have side effects, failing with original result
func peekError(_ callback: @escaping (Error) -> EventLoopFuture<Void>) -> EventLoopFuture<Value> {
self.flatMapError { error in
let promise = self.eventLoop.makePromise(of: Value.self)
callback(error).whenComplete { _ in
promise.completeWith(self)
}
return promise.futureResult
}
}
func mapResult<NewValue>(_ callback: @escaping (Result<Value, Error>) -> NewValue) -> EventLoopFuture<NewValue> {
self.map { value in
callback(.success(value))
}.flatMapErrorThrowing { error in
callback(.failure(error))
}
}
}
private extension Result {
var successful: Bool {
switch self {
case .success:
return true
case .failure:
return false
}
}
}