Skip to content

Vendor swift-async-algorithms #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .swiftformat
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

--swiftversion 5.7
--exclude .build
--exclude Sources/_AsyncMergeSequence

# format options

Expand Down
9 changes: 9 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,12 @@ This product contains derivations of the Lock and LockedValueBox implementations
* https://github.com/apple/swift-nio

---

This product uses swift-async-algorithms.

* LICENSE (Apache License 2.0):
* https://www.apache.org/licenses/LICENSE-2.0
* HOMEPAGE:
* https://github.com/apple/swift-async-algorithms

---
18 changes: 12 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ let package = Package(
from: "1.0.0"
),
.package(
url: "https://github.com/apple/swift-async-algorithms",
from: "0.1.0"
url: "https://github.com/apple/swift-collections.git",
from: "1.0.0"
),
],
targets: [
Expand All @@ -46,10 +46,7 @@ let package = Package(
name: "Logging",
package: "swift-log"
),
.product(
name: "AsyncAlgorithms",
package: "swift-async-algorithms"
),
.target(name: "_AsyncMergeSequence"),
.target(name: "UnixSignals"),
.target(name: "ConcurrencyHelpers"),
]
Expand All @@ -69,6 +66,15 @@ let package = Package(
.target(
name: "ConcurrencyHelpers"
),
.target(
name: "_AsyncMergeSequence",
dependencies: [
.product(
name: "DequeModule",
package: "swift-collections"
),
]
),
.testTarget(
name: "ServiceLifecycleTests",
dependencies: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

import AsyncAlgorithms
import _AsyncMergeSequence

extension AsyncSequence where Self: Sendable, Element: Sendable {
/// Creates an asynchronous sequence that is cancelled once graceful shutdown has triggered.
Expand Down
155 changes: 155 additions & 0 deletions Sources/_AsyncMergeSequence/Locking.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftServiceLifecycle open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if canImport(Darwin)
@_implementationOnly import Darwin
#elseif canImport(Glibc)
@_implementationOnly import Glibc
#elseif canImport(WinSDK)
@_implementationOnly import WinSDK
#endif

internal struct Lock {
#if canImport(Darwin)
typealias Primitive = os_unfair_lock
#elseif canImport(Glibc)
typealias Primitive = pthread_mutex_t
#elseif canImport(WinSDK)
typealias Primitive = SRWLOCK
#endif

typealias PlatformLock = UnsafeMutablePointer<Primitive>
let platformLock: PlatformLock

private init(_ platformLock: PlatformLock) {
self.platformLock = platformLock
}

fileprivate static func initialize(_ platformLock: PlatformLock) {
#if canImport(Darwin)
platformLock.initialize(to: os_unfair_lock())
#elseif canImport(Glibc)
let result = pthread_mutex_init(platformLock, nil)
precondition(result == 0, "pthread_mutex_init failed")
#elseif canImport(WinSDK)
InitializeSRWLock(platformLock)
#endif
}

fileprivate static func deinitialize(_ platformLock: PlatformLock) {
#if canImport(Glibc)
let result = pthread_mutex_destroy(platformLock)
precondition(result == 0, "pthread_mutex_destroy failed")
#endif
platformLock.deinitialize(count: 1)
}

fileprivate static func lock(_ platformLock: PlatformLock) {
#if canImport(Darwin)
os_unfair_lock_lock(platformLock)
#elseif canImport(Glibc)
pthread_mutex_lock(platformLock)
#elseif canImport(WinSDK)
AcquireSRWLockExclusive(platformLock)
#endif
}

fileprivate static func unlock(_ platformLock: PlatformLock) {
#if canImport(Darwin)
os_unfair_lock_unlock(platformLock)
#elseif canImport(Glibc)
let result = pthread_mutex_unlock(platformLock)
precondition(result == 0, "pthread_mutex_unlock failed")
#elseif canImport(WinSDK)
ReleaseSRWLockExclusive(platformLock)
#endif
}

static func allocate() -> Lock {
let platformLock = PlatformLock.allocate(capacity: 1)
initialize(platformLock)
return Lock(platformLock)
}

func deinitialize() {
Lock.deinitialize(platformLock)
}

func lock() {
Lock.lock(platformLock)
}

func unlock() {
Lock.unlock(platformLock)
}

/// Acquire the lock for the duration of the given block.
///
/// This convenience method should be preferred to `lock` and `unlock` in
/// most situations, as it ensures that the lock will be released regardless
/// of how `body` exits.
///
/// - Parameter body: The block to execute while holding the lock.
/// - Returns: The value returned by the block.
func withLock<T>(_ body: () throws -> T) rethrows -> T {
self.lock()
defer {
self.unlock()
}
return try body()
}

// specialise Void return (for performance)
func withLockVoid(_ body: () throws -> Void) rethrows -> Void {
try self.withLock(body)
}
}

struct ManagedCriticalState<State> {
private final class LockedBuffer: ManagedBuffer<State, Lock.Primitive> {
deinit {
withUnsafeMutablePointerToElements { Lock.deinitialize($0) }
}
}

private let buffer: ManagedBuffer<State, Lock.Primitive>

init(_ initial: State) {
buffer = LockedBuffer.create(minimumCapacity: 1) { buffer in
buffer.withUnsafeMutablePointerToElements { Lock.initialize($0) }
return initial
}
}

func withCriticalRegion<R>(_ critical: (inout State) throws -> R) rethrows -> R {
try buffer.withUnsafeMutablePointers { header, lock in
Lock.lock(lock)
defer { Lock.unlock(lock) }
return try critical(&header.pointee)
}
}
}

extension ManagedCriticalState: @unchecked Sendable where State: Sendable { }
108 changes: 108 additions & 0 deletions Sources/_AsyncMergeSequence/Merge/AsyncMerge2Sequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftServiceLifecycle open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

@_implementationOnly import DequeModule

/// Creates an asynchronous sequence of elements from two underlying asynchronous sequences
public func merge<Base1: AsyncSequence, Base2: AsyncSequence>(_ base1: Base1, _ base2: Base2) -> AsyncMerge2Sequence<Base1, Base2>
where
Base1.Element == Base2.Element,
Base1: Sendable, Base2: Sendable,
Base1.Element: Sendable
{
return AsyncMerge2Sequence(base1, base2)
}

/// An ``Swift/AsyncSequence`` that takes two upstream ``Swift/AsyncSequence``s and combines their elements.
public struct AsyncMerge2Sequence<
Base1: AsyncSequence,
Base2: AsyncSequence
>: Sendable where
Base1.Element == Base2.Element,
Base1: Sendable, Base2: Sendable,
Base1.Element: Sendable
{
public typealias Element = Base1.Element

private let base1: Base1
private let base2: Base2

/// Initializes a new ``AsyncMerge2Sequence``.
///
/// - Parameters:
/// - base1: The first upstream ``Swift/AsyncSequence``.
/// - base2: The second upstream ``Swift/AsyncSequence``.
public init(
_ base1: Base1,
_ base2: Base2
) {
self.base1 = base1
self.base2 = base2
}
}

extension AsyncMerge2Sequence: AsyncSequence {
public func makeAsyncIterator() -> AsyncIterator {
let storage = MergeStorage<Base1, Base2, Base1>(
base1: base1,
base2: base2,
base3: nil
)
return AsyncIterator(storage: storage)
}
}

extension AsyncMerge2Sequence {
public struct AsyncIterator: AsyncIteratorProtocol {
/// This class is needed to hook the deinit to observe once all references to the ``AsyncIterator`` are dropped.
///
/// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncIterator`` struct itself.
final class InternalClass: Sendable {
private let storage: MergeStorage<Base1, Base2, Base1>

fileprivate init(storage: MergeStorage<Base1, Base2, Base1>) {
self.storage = storage
}

deinit {
self.storage.iteratorDeinitialized()
}

func next() async rethrows -> Element? {
try await storage.next()
}
}

let internalClass: InternalClass

fileprivate init(storage: MergeStorage<Base1, Base2, Base1>) {
internalClass = InternalClass(storage: storage)
}

public mutating func next() async rethrows -> Element? {
try await internalClass.next()
}
}
}
Loading