Skip to content

Commit 304808a

Browse files
authored
[Multipart] Add a raw parts to frames serializer sequence. (#75)
1 parent 927f930 commit 304808a

File tree

2 files changed

+363
-0
lines changed

2 files changed

+363
-0
lines changed
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftOpenAPIGenerator open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import HTTPTypes
16+
import Foundation
17+
18+
/// A sequence that serializes raw multipart parts into multipart frames.
19+
struct MultipartRawPartsToFramesSequence<Upstream: AsyncSequence & Sendable>: Sendable
20+
where Upstream.Element == MultipartRawPart {
21+
22+
/// The source of raw parts.
23+
var upstream: Upstream
24+
}
25+
26+
extension MultipartRawPartsToFramesSequence: AsyncSequence {
27+
28+
/// The type of element produced by this asynchronous sequence.
29+
typealias Element = MultipartFrame
30+
31+
/// Creates the asynchronous iterator that produces elements of this
32+
/// asynchronous sequence.
33+
///
34+
/// - Returns: An instance of the `AsyncIterator` type used to produce
35+
/// elements of the asynchronous sequence.
36+
func makeAsyncIterator() -> Iterator { Iterator(upstream: upstream.makeAsyncIterator()) }
37+
38+
/// An iterator that pulls raw parts from the upstream iterator and provides
39+
/// multipart frames.
40+
struct Iterator: AsyncIteratorProtocol {
41+
42+
/// The iterator that provides the raw parts.
43+
var upstream: Upstream.AsyncIterator
44+
45+
/// The underlying parts to frames serializer.
46+
var serializer: Serializer
47+
48+
/// Creates a new iterator.
49+
/// - Parameter upstream: The iterator that provides the raw parts.
50+
init(upstream: Upstream.AsyncIterator) {
51+
self.upstream = upstream
52+
self.serializer = .init(upstream: upstream)
53+
}
54+
55+
/// Asynchronously advances to the next element and returns it, or ends the
56+
/// sequence if there is no next element.
57+
///
58+
/// - Returns: The next element, if it exists, or `nil` to signal the end of
59+
/// the sequence.
60+
mutating func next() async throws -> Element? { try await serializer.next() }
61+
}
62+
}
63+
64+
extension MultipartRawPartsToFramesSequence {
65+
66+
/// A state machine representing the raw part to frame serializer.
67+
struct StateMachine {
68+
69+
/// The possible states of the state machine.
70+
enum State {
71+
72+
/// Has not emitted any frames yet.
73+
case initial
74+
75+
/// Waiting for the next part.
76+
case waitingForPart
77+
78+
/// Returning body chunks from the current part's body.
79+
case streamingBody(HTTPBody.AsyncIterator)
80+
81+
/// Finished, the terminal state.
82+
case finished
83+
}
84+
85+
/// The current state of the state machine.
86+
private(set) var state: State
87+
88+
/// Creates a new state machine.
89+
init() { self.state = .initial }
90+
91+
/// An action returned by the `next` method.
92+
enum NextAction {
93+
94+
/// Return nil to the caller, no more parts.
95+
case returnNil
96+
97+
/// Fetch the next part.
98+
case fetchPart
99+
100+
/// Fetch the next body chunk from the provided iterator.
101+
case fetchBodyChunk(HTTPBody.AsyncIterator)
102+
}
103+
104+
/// Read the next part from the upstream frames.
105+
/// - Returns: An action to perform.
106+
mutating func next() -> NextAction {
107+
switch state {
108+
case .initial:
109+
state = .waitingForPart
110+
return .fetchPart
111+
case .streamingBody(let iterator): return .fetchBodyChunk(iterator)
112+
case .finished: return .returnNil
113+
case .waitingForPart: preconditionFailure("Invalid state: \(state)")
114+
}
115+
}
116+
117+
/// An action returned by the `receivedPart` method.
118+
enum ReceivedPartAction: Hashable {
119+
120+
/// Return nil to the caller, no more frames.
121+
case returnNil
122+
123+
/// Return the provided header fields.
124+
case emitHeaderFields(HTTPFields)
125+
}
126+
127+
/// Ingest the provided part.
128+
/// - Parameter part: A new part. If `nil`, then the source of parts is finished.
129+
/// - Returns: An action to perform.
130+
mutating func receivedPart(_ part: MultipartRawPart?) -> ReceivedPartAction {
131+
switch state {
132+
case .waitingForPart:
133+
if let part {
134+
state = .streamingBody(part.body.makeAsyncIterator())
135+
return .emitHeaderFields(part.headerFields)
136+
} else {
137+
state = .finished
138+
return .returnNil
139+
}
140+
case .finished: return .returnNil
141+
case .initial, .streamingBody: preconditionFailure("Invalid state: \(state)")
142+
}
143+
}
144+
145+
/// An action returned by the `receivedBodyChunk` method.
146+
enum ReceivedBodyChunkAction: Hashable {
147+
148+
/// Return nil to the caller, no more frames.
149+
case returnNil
150+
151+
/// Fetch the next part.
152+
case fetchPart
153+
154+
/// Return the provided body chunk.
155+
case emitBodyChunk(ArraySlice<UInt8>)
156+
}
157+
158+
/// Ingest the provided part.
159+
/// - Parameter bodyChunk: A new body chunk. If `nil`, then the current part's body is finished.
160+
/// - Returns: An action to perform.
161+
mutating func receivedBodyChunk(_ bodyChunk: ArraySlice<UInt8>?) -> ReceivedBodyChunkAction {
162+
switch state {
163+
case .streamingBody:
164+
if let bodyChunk {
165+
return .emitBodyChunk(bodyChunk)
166+
} else {
167+
state = .waitingForPart
168+
return .fetchPart
169+
}
170+
case .finished: return .returnNil
171+
case .initial, .waitingForPart: preconditionFailure("Invalid state: \(state)")
172+
}
173+
}
174+
}
175+
}
176+
177+
extension MultipartRawPartsToFramesSequence {
178+
179+
/// A serializer of multipart raw parts into multipart frames.
180+
struct Serializer {
181+
182+
/// The upstream source of raw parts.
183+
private var upstream: Upstream.AsyncIterator
184+
185+
/// The underlying state machine.
186+
private var stateMachine: StateMachine
187+
188+
/// Creates a new iterator.
189+
/// - Parameter upstream: The upstream source of raw parts.
190+
init(upstream: Upstream.AsyncIterator) {
191+
self.upstream = upstream
192+
self.stateMachine = .init()
193+
}
194+
195+
/// Requests the next frame.
196+
/// - Returns: A frame.
197+
/// - Throws: When a serialization error is encountered.
198+
mutating func next() async throws -> MultipartFrame? {
199+
func handleFetchPart() async throws -> MultipartFrame? {
200+
let part = try await upstream.next()
201+
switch stateMachine.receivedPart(part) {
202+
case .returnNil: return nil
203+
case .emitHeaderFields(let headerFields): return .headerFields(headerFields)
204+
}
205+
}
206+
switch stateMachine.next() {
207+
case .returnNil: return nil
208+
case .fetchPart: return try await handleFetchPart()
209+
case .fetchBodyChunk(var iterator):
210+
let bodyChunk = try await iterator.next()
211+
switch stateMachine.receivedBodyChunk(bodyChunk) {
212+
case .returnNil: return nil
213+
case .fetchPart: return try await handleFetchPart()
214+
case .emitBodyChunk(let bodyChunk): return .bodyChunk(bodyChunk)
215+
}
216+
}
217+
}
218+
}
219+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftOpenAPIGenerator open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
import XCTest
15+
@_spi(Generated) @testable import OpenAPIRuntime
16+
import Foundation
17+
18+
final class Test_MultipartRawPartsToFramesSequence: Test_Runtime {
19+
func test() async throws {
20+
var secondPartChunks = "{}".utf8.makeIterator()
21+
let secondPartBody = HTTPBody(
22+
AsyncStream(unfolding: { secondPartChunks.next().map { ArraySlice([$0]) } }),
23+
length: .unknown
24+
)
25+
let parts: [MultipartRawPart] = [
26+
.init(headerFields: [.contentDisposition: #"form-data; name="name""#], body: "24"),
27+
.init(headerFields: [.contentDisposition: #"form-data; name="info""#], body: secondPartBody),
28+
]
29+
var upstreamIterator = parts.makeIterator()
30+
let upstream = AsyncStream { upstreamIterator.next() }
31+
let sequence = MultipartRawPartsToFramesSequence(upstream: upstream)
32+
33+
var frames: [MultipartFrame] = []
34+
for try await frame in sequence { frames.append(frame) }
35+
let expectedFrames: [MultipartFrame] = [
36+
.headerFields([.contentDisposition: #"form-data; name="name""#]), .bodyChunk(chunkFromString("24")),
37+
.headerFields([.contentDisposition: #"form-data; name="info""#]), .bodyChunk(chunkFromString("{")),
38+
.bodyChunk(chunkFromString("}")),
39+
]
40+
XCTAssertEqual(frames, expectedFrames)
41+
}
42+
}
43+
44+
final class Test_MultipartRawPartsToFramesSequenceSerializer: Test_Runtime {
45+
func test() async throws {
46+
var secondPartChunks = "{}".utf8.makeIterator()
47+
let secondPartBody = HTTPBody(
48+
AsyncStream(unfolding: { secondPartChunks.next().map { ArraySlice([$0]) } }),
49+
length: .unknown
50+
)
51+
let parts: [MultipartRawPart] = [
52+
.init(headerFields: [.contentDisposition: #"form-data; name="name""#], body: "24"),
53+
.init(headerFields: [.contentDisposition: #"form-data; name="info""#], body: secondPartBody),
54+
]
55+
var upstreamIterator = parts.makeIterator()
56+
let upstream = AsyncStream { upstreamIterator.next() }
57+
var serializer = MultipartRawPartsToFramesSequence<AsyncStream<MultipartRawPart>>
58+
.Serializer(upstream: upstream.makeAsyncIterator())
59+
var frames: [MultipartFrame] = []
60+
while let frame = try await serializer.next() { frames.append(frame) }
61+
let expectedFrames: [MultipartFrame] = [
62+
.headerFields([.contentDisposition: #"form-data; name="name""#]), .bodyChunk(chunkFromString("24")),
63+
.headerFields([.contentDisposition: #"form-data; name="info""#]), .bodyChunk(chunkFromString("{")),
64+
.bodyChunk(chunkFromString("}")),
65+
]
66+
XCTAssertEqual(frames, expectedFrames)
67+
}
68+
}
69+
70+
private func newStateMachine() -> MultipartRawPartsToFramesSequence<AsyncStream<MultipartRawPart>>.StateMachine {
71+
.init()
72+
}
73+
74+
final class Test_MultipartRawPartsToFramesSequenceStateMachine: Test_Runtime {
75+
76+
func testTwoParts() throws {
77+
var stateMachine = newStateMachine()
78+
XCTAssertTrue(stateMachine.state.isInitial)
79+
XCTAssertTrue(stateMachine.next().isFetchPart)
80+
XCTAssertTrue(stateMachine.state.isWaitingForPart)
81+
XCTAssertEqual(
82+
stateMachine.receivedPart(
83+
.init(headerFields: [.contentDisposition: #"form-data; name="name""#], body: "24")
84+
),
85+
.emitHeaderFields([.contentDisposition: #"form-data; name="name""#])
86+
)
87+
XCTAssertTrue(stateMachine.state.isStreamingBody)
88+
XCTAssertTrue(stateMachine.next().isFetchBodyChunk)
89+
XCTAssertEqual(stateMachine.receivedBodyChunk(chunkFromString("24")), .emitBodyChunk(chunkFromString("24")))
90+
XCTAssertTrue(stateMachine.state.isStreamingBody)
91+
XCTAssertTrue(stateMachine.next().isFetchBodyChunk)
92+
XCTAssertEqual(stateMachine.receivedBodyChunk(nil), .fetchPart)
93+
XCTAssertEqual(
94+
stateMachine.receivedPart(
95+
.init(headerFields: [.contentDisposition: #"form-data; name="info""#], body: "{}")
96+
),
97+
.emitHeaderFields([.contentDisposition: #"form-data; name="info""#])
98+
)
99+
XCTAssertTrue(stateMachine.state.isStreamingBody)
100+
XCTAssertTrue(stateMachine.next().isFetchBodyChunk)
101+
XCTAssertEqual(stateMachine.receivedBodyChunk(chunkFromString("{")), .emitBodyChunk(chunkFromString("{")))
102+
XCTAssertTrue(stateMachine.state.isStreamingBody)
103+
XCTAssertTrue(stateMachine.next().isFetchBodyChunk)
104+
XCTAssertEqual(stateMachine.receivedBodyChunk(chunkFromString("}")), .emitBodyChunk(chunkFromString("}")))
105+
XCTAssertTrue(stateMachine.state.isStreamingBody)
106+
XCTAssertTrue(stateMachine.next().isFetchBodyChunk)
107+
XCTAssertEqual(stateMachine.receivedBodyChunk(nil), .fetchPart)
108+
XCTAssertEqual(stateMachine.receivedPart(nil), .returnNil)
109+
}
110+
}
111+
112+
extension MultipartRawPartsToFramesSequence.StateMachine.State {
113+
var isInitial: Bool {
114+
guard case .initial = self else { return false }
115+
return true
116+
}
117+
var isWaitingForPart: Bool {
118+
guard case .waitingForPart = self else { return false }
119+
return true
120+
}
121+
var isStreamingBody: Bool {
122+
guard case .streamingBody = self else { return false }
123+
return true
124+
}
125+
var isFinished: Bool {
126+
guard case .finished = self else { return false }
127+
return true
128+
}
129+
}
130+
131+
extension MultipartRawPartsToFramesSequence.StateMachine.NextAction {
132+
var isReturnNil: Bool {
133+
guard case .returnNil = self else { return false }
134+
return true
135+
}
136+
var isFetchPart: Bool {
137+
guard case .fetchPart = self else { return false }
138+
return true
139+
}
140+
var isFetchBodyChunk: Bool {
141+
guard case .fetchBodyChunk = self else { return false }
142+
return true
143+
}
144+
}

0 commit comments

Comments
 (0)