Skip to content

Commit f562e3c

Browse files
authored
adds Requester API with Decoder/Encoder to ReactiveSwift (#61)
Signed-off-by: David Nadoba <[email protected]>
1 parent 04f1fb8 commit f562e3c

29 files changed

+307
-129
lines changed

Sources/Examples/TimerClient/main.swift

+6-14
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,6 @@ import RSocketNIOChannel
66
import RSocketReactiveSwift
77
import RSocketWSTransport
88

9-
func route(_ route: String) -> Data {
10-
let encodedRoute = Data(route.utf8)
11-
precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded")
12-
let encodedRouteLength = Data([UInt8(encodedRoute.count)])
13-
14-
return encodedRouteLength + encodedRoute
15-
}
16-
179
extension URL: ExpressibleByArgument {
1810
public init?(argument: String) {
1911
guard let url = URL(string: argument) else { return nil }
@@ -43,12 +35,12 @@ struct TimerClientExample: ParsableCommand {
4335
)
4436

4537
let client = try bootstrap.connect(to: .init(url: url)).first()!.get()
46-
47-
try client.requester.requestStream(payload: Payload(
48-
metadata: route("timer"),
49-
data: Data()
50-
))
51-
.map() { String.init(decoding: $0.data, as: UTF8.self) }
38+
try client.requester.build(RequestStream {
39+
Encoder()
40+
.encodeStaticMetadata("timer", using: RoutingEncoder())
41+
Decoder()
42+
.mapData { String(decoding: $0, as: UTF8.self) }
43+
}, request: Data())
5244
.logEvents(identifier: "route.timer")
5345
.take(first: limit)
5446
.wait()

Sources/Examples/TwitterClient/main.swift

+15-20
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,6 @@ import RSocketNIOChannel
66
import RSocketReactiveSwift
77
import RSocketWSTransport
88

9-
func route(_ route: String) -> Data {
10-
let encodedRoute = Data(route.utf8)
11-
precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded")
12-
let encodedRouteLength = Data([UInt8(encodedRoute.count)])
13-
14-
return encodedRouteLength + encodedRoute
15-
}
16-
179
extension URL: ExpressibleByArgument {
1810
public init?(argument: String) {
1911
guard let url = URL(string: argument) else { return nil }
@@ -40,23 +32,26 @@ struct TwitterClientExample: ParsableCommand {
4032
func run() throws {
4133
let bootstrap = ClientBootstrap(
4234
transport: WSTransport(),
43-
config: ClientConfiguration.mobileToServer
35+
config: .mobileToServer
4436
.set(\.encoding.metadata, to: .messageXRSocketRoutingV0)
4537
.set(\.encoding.data, to: .applicationJson)
4638
)
4739

4840
let client = try bootstrap.connect(to: .init(url: url)).first()!.get()
49-
50-
try client.requester.requestStream(payload: Payload(
51-
metadata: route("searchTweets"),
52-
data: Data(searchString.utf8)
53-
))
54-
.attemptMap { payload -> String in
55-
// pretty print json
56-
let json = try JSONSerialization.jsonObject(with: payload.data, options: [])
57-
let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted])
58-
return String(decoding: data, as: UTF8.self)
59-
}
41+
try client.requester.build(RequestStream {
42+
Encoder()
43+
.encodeStaticMetadata("searchTweets", using: RoutingEncoder())
44+
.mapData { (string: String) in
45+
Data(string.utf8)
46+
}
47+
Decoder()
48+
.mapData { data -> String in
49+
// pretty print json
50+
let json = try JSONSerialization.jsonObject(with: data, options: [])
51+
let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted])
52+
return String(decoding: data, as: UTF8.self)
53+
}
54+
}, request: searchString)
6055
.logEvents(identifier: "route.searchTweets")
6156
.take(first: limit)
6257
.wait()

Sources/Examples/VanillaClient/main.swift

+5-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ struct VanillaClientExample: ParsableCommand {
2222

2323
let client = try bootstrap.connect(to: .init(host: host, port: port)).first()!.get()
2424

25-
let streamProducer = client.requester.requestStream(payload: .empty)
26-
let requestProducer = client.requester.requestResponse(payload: Payload(data: Data("HelloWorld".utf8)))
25+
let streamProducer = client.requester.build(
26+
RequestStream(),
27+
request: Data()
28+
)
29+
let requestProducer = client.requester.build(RequestResponse(), request: Data("HelloWorld".utf8))
2730

2831
streamProducer.logEvents(identifier: "stream1").take(first: 1).start()
2932
streamProducer.logEvents(identifier: "stream3").take(first: 10).start()

Sources/RSocketCore/Channel Handler/ConnectionEstablishment.swift

+5-23
Original file line numberDiff line numberDiff line change
@@ -44,27 +44,7 @@ public struct SetupInfo {
4444
/// Token used for client resume identification
4545
public let resumeIdentificationToken: Data?
4646

47-
/**
48-
MIME Type for encoding of Metadata
49-
50-
This SHOULD be a US-ASCII string that includes the Internet media type specified in RFC 2045.
51-
Many are registered with IANA such as CBOR.
52-
Suffix rules MAY be used for handling layout.
53-
For example, `application/x.netflix+cbor` or `application/x.reactivesocket+cbor` or `application/x.netflix+json`.
54-
The string MUST NOT be null terminated.
55-
*/
56-
public let metadataEncodingMimeType: String
57-
58-
/**
59-
MIME Type for encoding of Data
60-
61-
This SHOULD be a US-ASCII string that includes the Internet media type specified in RFC 2045.
62-
Many are registered with IANA such as CBOR.
63-
Suffix rules MAY be used for handling layout.
64-
For example, `application/x.netflix+cbor` or `application/x.reactivesocket+cbor` or `application/x.netflix+json`.
65-
The string MUST NOT be null terminated.
66-
*/
67-
public let dataEncodingMimeType: String
47+
public let encoding: ConnectionEncoding
6848

6949
/// Payload of this frame describing connection capabilities of the endpoint sending the Setup header
7050
public let payload: Payload
@@ -114,8 +94,10 @@ extension SetupInfo {
11494
self.timeBetweenKeepaliveFrames = setup.timeBetweenKeepaliveFrames
11595
self.maxLifetime = setup.maxLifetime
11696
self.resumeIdentificationToken = setup.resumeIdentificationToken
117-
self.metadataEncodingMimeType = setup.metadataEncodingMimeType
118-
self.dataEncodingMimeType = setup.dataEncodingMimeType
97+
self.encoding = .init(
98+
metadata: .init(rawValue: setup.metadataEncodingMimeType),
99+
data: .init(rawValue: setup.dataEncodingMimeType)
100+
)
119101
self.payload = setup.payload
120102
}
121103
}

Sources/RSocketCore/ChannelPipeline.swift

+9-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,13 @@ extension ChannelPipeline {
4646
self?.writeAndFlush(NIOAny(frame), promise: nil)
4747
}
4848
let promise = eventLoop.makePromise(of: Void.self)
49-
let requester = Requester(streamIdGenerator: .client, eventLoop: eventLoop, sendFrame: sendFrame)
49+
let requester = Requester(
50+
streamIdGenerator: .client,
51+
encoding: config.encoding,
52+
eventLoop: eventLoop,
53+
sendFrame: sendFrame
54+
)
55+
let responder = responder ?? DefaultRSocket(encoding: config.encoding)
5056
promise.futureResult.map { requester as RSocket }.cascade(to: connectedPromise)
5157
let (timeBetweenKeepaliveFrames, maxLifetime): (Int32, Int32)
5258
do {
@@ -108,14 +114,14 @@ extension ChannelPipeline {
108114
FrameEncoderHandler(maximumFrameSize: maximumFrameSize),
109115
ConnectionStateHandler(),
110116
ConnectionEstablishmentHandler(initializeConnection: { [unowned self] (info, channel) in
111-
let responder = makeResponder?(info)
117+
let responder = makeResponder?(info) ?? DefaultRSocket(encoding: info.encoding)
112118
let sendFrame: (Frame) -> () = { [weak self] frame in
113119
self?.writeAndFlush(NIOAny(frame), promise: nil)
114120
}
115121
return channel.pipeline.addHandlers([
116122
DemultiplexerHandler(
117123
connectionSide: .server,
118-
requester: Requester(streamIdGenerator: .server, eventLoop: eventLoop, sendFrame: sendFrame),
124+
requester: Requester(streamIdGenerator: .server, encoding: info.encoding, eventLoop: eventLoop, sendFrame: sendFrame),
119125
responder: Responder(responderSocket: responder, eventLoop: eventLoop, sendFrame: sendFrame)
120126
),
121127
KeepaliveHandler(timeBetweenKeepaliveFrames: info.timeBetweenKeepaliveFrames, maxLifetime: info.maxLifetime, connectionSide: ConnectionRole.server),

Sources/RSocketCore/Client/ClientBootstrap.swift

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public protocol ClientBootstrap {
2121
associatedtype Responder
2222
associatedtype Transport: TransportChannelHandler
2323

24+
var config: ClientConfiguration { get }
25+
2426
/// Creates a new connection to the given `endpoint`.
2527
/// - Parameters:
2628
/// - endpoint: endpoint to connect to

Sources/RSocketCore/Client/ClientConfiguration.swift

+2-23
Original file line numberDiff line numberDiff line change
@@ -57,27 +57,6 @@ public struct ClientConfiguration {
5757
}
5858
}
5959

60-
/// encoding configuration of metadata and data which is send to the server during setup
61-
public struct Encoding {
62-
63-
/// default encoding uses `.octetStream` for metadata and data
64-
public static let `default` = Encoding()
65-
66-
/// MIME Type for encoding of Metadata
67-
public var metadata: MIMEType
68-
69-
/// MIME Type for encoding of Data
70-
public var data: MIMEType
71-
72-
public init(
73-
metadata: MIMEType = .default,
74-
data: MIMEType = .default
75-
) {
76-
self.metadata = metadata
77-
self.data = data
78-
}
79-
}
80-
8160
/// local fragmentation configuration which are **not** send to the server
8261
public struct Fragmentation {
8362

@@ -109,14 +88,14 @@ public struct ClientConfiguration {
10988
public var timeout: Timeout
11089

11190
/// encoding configuration of metadata and data which is send to the server during setup
112-
public var encoding: Encoding
91+
public var encoding: ConnectionEncoding
11392

11493
/// local fragmentation configuration which are **not** send to the server
11594
public var fragmentation: Fragmentation
11695

11796
public init(
11897
timeout: Timeout,
119-
encoding: Encoding = .default,
98+
encoding: ConnectionEncoding = .default,
12099
fragmentation: Fragmentation = .default
121100
) {
122101
self.timeout = timeout

Sources/RSocketCore/DefaultRSocket.swift

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ fileprivate final class NoOpStream: UnidirectionalStream {
2929

3030
/// An RSocket which rejects all incoming requests (requestResponse, stream and channel) and ignores metadataPush and fireAndForget events.
3131
internal struct DefaultRSocket: RSocket {
32+
let encoding: ConnectionEncoding
3233
func metadataPush(metadata: Data) {}
3334
func fireAndForget(payload: Payload) {}
3435
func requestResponse(payload: Payload, responderStream: UnidirectionalStream) -> Cancellable {

Sources/RSocketCore/Extensions/Coder/Decoder/Decoder.swift

+10
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,15 @@ public struct Decoder: DecoderProtocol {
3838
}
3939
}
4040

41+
extension DecoderProtocol where Metadata == Void {
42+
@inlinable
43+
public mutating func decode(
44+
_ payload: Payload,
45+
encoding: ConnectionEncoding
46+
) throws -> Data {
47+
try decode(payload, encoding: encoding).1
48+
}
49+
}
50+
4151
/// Namespace for types conforming to the ``DecoderProtocol`` protocol
4252
public enum Decoders {}

Sources/RSocketCore/Extensions/Coder/Encoder/Encoder.swift

+10
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,15 @@ public struct Encoder: EncoderProtocol {
3636
}
3737
}
3838

39+
extension EncoderProtocol where Metadata == Void {
40+
@inlinable
41+
public mutating func encode(
42+
_ data: Data,
43+
encoding: ConnectionEncoding
44+
) throws -> Payload {
45+
try encode(metadata: (), data: data, encoding: encoding)
46+
}
47+
}
48+
3949
/// Namespace for types conforming to the ``EncoderProtocol`` protocol
4050
public enum Encoders {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2015-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
import NIO
19+
20+
public struct OctetStreamMetadataEncoder: MetadataEncoder {
21+
public typealias Metadata = Data?
22+
23+
@inlinable
24+
public init() {}
25+
26+
@inlinable
27+
public var mimeType: MIMEType { .applicationOctetStream }
28+
29+
@inlinable
30+
public func encode(_ metadata: Data?, into buffer: inout ByteBuffer) throws {
31+
guard let metadata = metadata else { return }
32+
buffer.writeData(metadata)
33+
}
34+
}
35+
36+
extension MetadataEncoder where Self == OctetStreamMetadataEncoder {
37+
@inlinable
38+
public static var octetStream: Self { .init() }
39+
}

Sources/RSocketCore/Extensions/RoutingDecoder.swift

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ import NIO
1818

1919
public struct RoutingDecoder: MetadataDecoder {
2020
public typealias Metadata = RouteMetadata
21-
21+
22+
@inlinable
23+
public init() {}
24+
2225
@inlinable
2326
public var mimeType: MIMEType { .messageXRSocketRoutingV0 }
2427

Sources/RSocketCore/Extensions/RoutingEncoder.swift

+3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import NIO
1818

1919
public struct RoutingEncoder: MetadataEncoder {
2020
public typealias Metadata = RouteMetadata
21+
22+
@inlinable
23+
public init() {}
2124

2225
@inlinable
2326
public var mimeType: MIMEType { .messageXRSocketRoutingV0 }

Sources/RSocketCore/RSocket.swift

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import Foundation
1818

1919
public protocol RSocket {
20+
var encoding: ConnectionEncoding { get }
2021
func metadataPush(metadata: Data)
2122

2223
func fireAndForget(payload: Payload)

0 commit comments

Comments
 (0)