Skip to content

Commit 566b4e1

Browse files
authored
Add AsyncStream-based API to AsyncProcess (#7830)
Existing `AsyncProcess` closure-based API for `stdout` and `stderr` consumption requires a lot of tedious and error-prone ceremony for use of `async`. One needs to manually create `AsyncStream`s and their continuations, feed the output via the continuations, spawn a task group and consume the output in separate tasks, while launching the process at the same time. We should provide a simpler `popen`-like API that allows passing `async` closures that receive ready for iteration `AsyncStream`s as arguments.
1 parent d2a0167 commit 566b4e1

File tree

2 files changed

+132
-20
lines changed

2 files changed

+132
-20
lines changed

Sources/Basics/AsyncProcess.swift

+83-9
Original file line numberDiff line numberDiff line change
@@ -173,20 +173,31 @@ package final class AsyncProcess {
173173
case stdinUnavailable
174174
}
175175

176-
package typealias OutputStream = AsyncStream<[UInt8]>
176+
package typealias ReadableStream = AsyncStream<[UInt8]>
177177

178-
package enum OutputRedirection {
178+
package enum OutputRedirection: Sendable {
179179
/// Do not redirect the output
180180
case none
181-
/// Collect stdout and stderr output and provide it back via ProcessResult object. If redirectStderr is true,
182-
/// stderr be redirected to stdout.
181+
182+
/// Collect stdout and stderr output and provide it back via ``AsyncProcessResult`` object. If
183+
/// `redirectStderr` is `true`, `stderr` be redirected to `stdout`.
183184
case collect(redirectStderr: Bool)
184-
/// Stream stdout and stderr via the corresponding closures. If redirectStderr is true, stderr be redirected to
185-
/// stdout.
185+
186+
/// Stream `stdout` and `stderr` via the corresponding closures. If `redirectStderr` is `true`, `stderr` will
187+
/// be redirected to `stdout`.
186188
case stream(stdout: OutputClosure, stderr: OutputClosure, redirectStderr: Bool)
187189

190+
/// Stream stdout and stderr as `AsyncSequence` provided as an argument to closures passed to
191+
/// ``AsyncProcess/launch(stdoutStream:stderrStream:)``.
192+
case asyncStream(
193+
stdoutStream: ReadableStream,
194+
stdoutContinuation: ReadableStream.Continuation,
195+
stderrStream: ReadableStream,
196+
stderrContinuation: ReadableStream.Continuation
197+
)
198+
188199
/// Default collect OutputRedirection that defaults to not redirect stderr. Provided for API compatibility.
189-
package static let collect: OutputRedirection = .collect(redirectStderr: false)
200+
package static let collect: Self = .collect(redirectStderr: false)
190201

191202
/// Default stream OutputRedirection that defaults to not redirect stderr. Provided for API compatibility.
192203
package static func stream(stdout: @escaping OutputClosure, stderr: @escaping OutputClosure) -> Self {
@@ -197,15 +208,19 @@ package final class AsyncProcess {
197208
switch self {
198209
case .none:
199210
false
200-
case .collect, .stream:
211+
case .collect, .stream, .asyncStream:
201212
true
202213
}
203214
}
204215

205216
package var outputClosures: (stdoutClosure: OutputClosure, stderrClosure: OutputClosure)? {
206217
switch self {
207-
case .stream(let stdoutClosure, let stderrClosure, _):
218+
case let .stream(stdoutClosure, stderrClosure, _):
208219
(stdoutClosure: stdoutClosure, stderrClosure: stderrClosure)
220+
221+
case let .asyncStream(stdoutStream, stdoutContinuation, stderrStream, stderrContinuation):
222+
(stdoutClosure: { stdoutContinuation.yield($0) }, stderrClosure: { stderrContinuation.yield($0) })
223+
209224
case .collect, .none:
210225
nil
211226
}
@@ -946,6 +961,65 @@ extension AsyncProcess {
946961
try await self.popen(arguments: args, environment: environment, loggingHandler: loggingHandler)
947962
}
948963

964+
package typealias DuplexStreamHandler =
965+
@Sendable (_ stdinStream: WritableByteStream, _ stdoutStream: ReadableStream) async throws -> ()
966+
package typealias ReadableStreamHandler =
967+
@Sendable (_ stderrStream: ReadableStream) async throws -> ()
968+
969+
/// Launches a new `AsyncProcess` instances, allowing the caller to consume `stdout` and `stderr` output
970+
/// with handlers that support structured concurrency.
971+
/// - Parameters:
972+
/// - arguments: CLI command used to launch the process.
973+
/// - environment: environment variables passed to the launched process.
974+
/// - loggingHandler: handler used for logging,
975+
/// - stdoutHandler: asynchronous bidirectional handler closure that receives `stdin` and `stdout` streams as
976+
/// arguments.
977+
/// - stderrHandler: asynchronous unidirectional handler closure that receives `stderr` stream as an argument.
978+
/// - Returns: ``AsyncProcessResult`` value as received from the underlying ``AsyncProcess/waitUntilExit()`` call
979+
/// made on ``AsyncProcess`` instance.
980+
package static func popen(
981+
arguments: [String],
982+
environment: Environment = .current,
983+
loggingHandler: LoggingHandler? = .none,
984+
stdoutHandler: @escaping DuplexStreamHandler,
985+
stderrHandler: ReadableStreamHandler? = nil
986+
) async throws -> AsyncProcessResult {
987+
let (stdoutStream, stdoutContinuation) = ReadableStream.makeStream()
988+
let (stderrStream, stderrContinuation) = ReadableStream.makeStream()
989+
990+
let process = AsyncProcess(
991+
arguments: arguments,
992+
environment: environment,
993+
outputRedirection: .stream {
994+
stdoutContinuation.yield($0)
995+
} stderr: {
996+
stderrContinuation.yield($0)
997+
},
998+
loggingHandler: loggingHandler
999+
)
1000+
1001+
return try await withThrowingTaskGroup(of: Void.self) { group in
1002+
let stdinStream = try process.launch()
1003+
1004+
group.addTask {
1005+
try await stdoutHandler(stdinStream, stdoutStream)
1006+
}
1007+
1008+
if let stderrHandler {
1009+
group.addTask {
1010+
try await stderrHandler(stderrStream)
1011+
}
1012+
}
1013+
1014+
defer {
1015+
stdoutContinuation.finish()
1016+
stderrContinuation.finish()
1017+
}
1018+
1019+
return try await process.waitUntilExit()
1020+
}
1021+
}
1022+
9491023
/// Execute a subprocess and get its (UTF-8) output if it has a non zero exit.
9501024
///
9511025
/// - Parameters:

Tests/BasicsTests/AsyncProcessTests.swift

+49-11
Original file line numberDiff line numberDiff line change
@@ -396,8 +396,8 @@ final class AsyncProcessTests: XCTestCase {
396396
}
397397

398398
func testAsyncStream() async throws {
399-
let (stdoutStream, stdoutContinuation) = AsyncProcess.OutputStream.makeStream()
400-
let (stderrStream, stderrContinuation) = AsyncProcess.OutputStream.makeStream()
399+
let (stdoutStream, stdoutContinuation) = AsyncProcess.ReadableStream.makeStream()
400+
let (stderrStream, stderrContinuation) = AsyncProcess.ReadableStream.makeStream()
401401

402402
let process = AsyncProcess(
403403
scriptName: "echo",
@@ -408,15 +408,15 @@ final class AsyncProcessTests: XCTestCase {
408408
}
409409
)
410410

411-
try await withThrowingTaskGroup(of: Void.self) { group in
411+
let result = try await withThrowingTaskGroup(of: Void.self) { group in
412412
let stdin = try process.launch()
413413

414414
group.addTask {
415415
var counter = 0
416416
stdin.write("Hello \(counter)\n")
417417
stdin.flush()
418418

419-
for try await output in stdoutStream {
419+
for await output in stdoutStream {
420420
XCTAssertEqual(output, .init("Hello \(counter)\n".utf8))
421421
counter += 1
422422

@@ -431,9 +431,8 @@ final class AsyncProcessTests: XCTestCase {
431431

432432
group.addTask {
433433
var counter = 0
434-
for try await output in stderrStream {
434+
for await output in stderrStream {
435435
counter += 1
436-
XCTAssertTrue(output.isEmpty)
437436
}
438437

439438
XCTAssertEqual(counter, 0)
@@ -444,8 +443,43 @@ final class AsyncProcessTests: XCTestCase {
444443
stderrContinuation.finish()
445444
}
446445

447-
try await process.waitUntilExit()
446+
return try await process.waitUntilExit()
448447
}
448+
449+
XCTAssertEqual(result.exitStatus, .terminated(code: 0))
450+
}
451+
452+
func testAsyncStreamHighLevelAPI() async throws {
453+
let result = try await AsyncProcess.popen(
454+
scriptName: "echo",
455+
stdout: { stdin, stdout in
456+
var counter = 0
457+
stdin.write("Hello \(counter)\n")
458+
stdin.flush()
459+
460+
for await output in stdout {
461+
XCTAssertEqual(output, .init("Hello \(counter)\n".utf8))
462+
counter += 1
463+
464+
stdin.write(.init("Hello \(counter)\n".utf8))
465+
stdin.flush()
466+
}
467+
468+
XCTAssertEqual(counter, 5)
469+
470+
try stdin.close()
471+
},
472+
stderr: { stderr in
473+
var counter = 0
474+
for await output in stderr {
475+
counter += 1
476+
}
477+
478+
XCTAssertEqual(counter, 0)
479+
}
480+
)
481+
482+
XCTAssertEqual(result.exitStatus, .terminated(code: 0))
449483
}
450484
}
451485

@@ -466,9 +500,7 @@ extension AsyncProcess {
466500
)
467501
}
468502

469-
#if compiler(>=5.8)
470503
@available(*, noasync)
471-
#endif
472504
fileprivate static func checkNonZeroExit(
473505
scriptName: String,
474506
environment: Environment = .current,
@@ -494,9 +526,7 @@ extension AsyncProcess {
494526
)
495527
}
496528

497-
#if compiler(>=5.8)
498529
@available(*, noasync)
499-
#endif
500530
@discardableResult
501531
fileprivate static func popen(
502532
scriptName: String,
@@ -515,4 +545,12 @@ extension AsyncProcess {
515545
) async throws -> AsyncProcessResult {
516546
try await self.popen(arguments: [self.script(scriptName)], environment: .current, loggingHandler: loggingHandler)
517547
}
548+
549+
fileprivate static func popen(
550+
scriptName: String,
551+
stdout: @escaping AsyncProcess.DuplexStreamHandler,
552+
stderr: AsyncProcess.ReadableStreamHandler? = nil
553+
) async throws -> AsyncProcessResult {
554+
try await self.popen(arguments: [self.script(scriptName)], stdoutHandler: stdout, stderrHandler: stderr)
555+
}
518556
}

0 commit comments

Comments
 (0)