Skip to content

Commit 7b6fe17

Browse files
committed
fix possible data race
1 parent 0777c80 commit 7b6fe17

File tree

1 file changed

+21
-17
lines changed

1 file changed

+21
-17
lines changed

Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift

+21-17
Original file line numberDiff line numberDiff line change
@@ -349,23 +349,25 @@ private struct LambdaHttpServer {
349349
private final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
350350
typealias Element = T
351351

352-
private let _buffer = Mutex<CircularBuffer<T>>(.init())
353-
private let _continuation = Mutex<CheckedContinuation<T, any Error>?>(nil)
352+
private let mutex = Mutex<(CircularBuffer<T>, CheckedContinuation<T, any Error>?)>((.init(), nil))
354353

355354
/// retrieve the first element from the buffer
356-
public func popFirst() async -> T? {
357-
self._buffer.withLock { $0.popFirst() }
355+
public func popFirst() -> T? {
356+
self.mutex.withLock { $0.0.popFirst() }
358357
}
359358

360359
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
361360
public func push(_ invocation: T) async {
362-
// if the iterator is waiting for an element, give it to it
363-
// otherwise, enqueue the element
364-
if let continuation = self._continuation.withLock({ $0 }) {
365-
self._continuation.withLock { $0 = nil }
366-
continuation.resume(returning: invocation)
367-
} else {
368-
self._buffer.withLock { $0.append(invocation) }
361+
self.mutex.withLock { mutexContent in
362+
var (_buffer, _continuation) = mutexContent
363+
// if the iterator is waiting for an element, give it to it
364+
// otherwise, enqueue the element
365+
if let continuation = _continuation {
366+
continuation.resume(returning: invocation)
367+
_continuation = nil
368+
} else {
369+
_buffer.append(invocation)
370+
}
369371
}
370372
}
371373

@@ -376,15 +378,17 @@ private struct LambdaHttpServer {
376378
return nil
377379
}
378380

379-
if let element = await self.popFirst() {
381+
if let element = self.popFirst() {
382+
// if there is an element in the buffer, dequeue it
380383
return element
381384
} else {
382385
// we can't return nil if there is nothing to dequeue otherwise the async for loop will stop
383-
// wait for an element to be enqueued
384-
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in
385-
// store the continuation for later, when an element is enqueued
386-
self._continuation.withLock {
387-
$0 = continuation
386+
// so, wait for an element to be enqueued
387+
return try await withCheckedThrowingContinuation {
388+
(continuation: CheckedContinuation<T, any Error>) in
389+
self.mutex.withLock { mutexContent in
390+
// store the continuation for later, when an element is enqueued
391+
mutexContent.1 = continuation
388392
}
389393
}
390394
}

0 commit comments

Comments
 (0)