Skip to content

Commit 413ee9d

Browse files
authored
Remove ChannelManager class (flutter#64)
Fixes flutter#55 This class was intended to manage lifecycle around a `StreamChannel`, but in the case of a `Peer` there is a conflict having 3 instances of a `ChannelManager` attempting to manage lifecycle constraints which results in completing the same `Completer` instance twice. Since `Peer` needs a different behavior than `Server` or `Client` the `ChannelManager` abstraction isn't helpful. - Inline the important behavior around completing `done` appropriately for the end of the channel `stream`, or for errors, into `Peer`, `Server`, and `Client`. - Inline the important behavior around closing the channel `sink` and completing `done` when a `Server` or `Client` is closed. `Peer` has different behavior and only forwards to it's `Server` and `Client`. The returned future from `channel.sink.close` is ignored, since this future does not complete in the case where a done even can't be delivered to listeners.
1 parent 46c80d3 commit 413ee9d

File tree

7 files changed

+69
-115
lines changed

7 files changed

+69
-115
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
## 2.2.2-dev
1+
## 2.2.2
2+
3+
* Fix `Peer.close()` throwing `Bad state: Future already completed`.
24

35
## 2.2.1
46

lib/src/channel_manager.dart

Lines changed: 0 additions & 78 deletions
This file was deleted.

lib/src/client.dart

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import 'dart:async';
77
import 'package:stack_trace/stack_trace.dart';
88
import 'package:stream_channel/stream_channel.dart';
99

10-
import 'channel_manager.dart';
1110
import 'exception.dart';
1211
import 'utils.dart';
1312

@@ -17,7 +16,7 @@ import 'utils.dart';
1716
/// those method calls. Methods can be called with [sendRequest], or with
1817
/// [sendNotification] if no response is expected.
1918
class Client {
20-
final ChannelManager _manager;
19+
final StreamChannel<dynamic> _channel;
2120

2221
/// The next request id.
2322
var _id = 0;
@@ -30,19 +29,21 @@ class Client {
3029
/// The map of request ids to pending requests.
3130
final _pendingRequests = <int, _Request>{};
3231

32+
final _done = Completer<void>();
33+
3334
/// Returns a [Future] that completes when the underlying connection is
3435
/// closed.
3536
///
3637
/// This is the same future that's returned by [listen] and [close]. It may
3738
/// complete before [close] is called if the remote endpoint closes the
3839
/// connection.
39-
Future get done => _manager.done;
40+
Future get done => _done.future;
4041

4142
/// Whether the underlying connection is closed.
4243
///
4344
/// Note that this will be `true` before [close] is called if the remote
4445
/// endpoint closes the connection.
45-
bool get isClosed => _manager.isClosed;
46+
bool get isClosed => _done.isCompleted;
4647

4748
/// Creates a [Client] that communicates over [channel].
4849
///
@@ -60,9 +61,8 @@ class Client {
6061
///
6162
/// Note that the client won't begin listening to [responses] until
6263
/// [Client.listen] is called.
63-
Client.withoutJson(StreamChannel channel)
64-
: _manager = ChannelManager('Client', channel) {
65-
_manager.done.whenComplete(() {
64+
Client.withoutJson(this._channel) {
65+
done.whenComplete(() {
6666
for (var request in _pendingRequests.values) {
6767
request.completer.completeError(
6868
StateError(
@@ -81,13 +81,26 @@ class Client {
8181
/// when it has an error. This is the same as [done].
8282
///
8383
/// [listen] may only be called once.
84-
Future listen() => _manager.listen(_handleResponse);
84+
Future listen() {
85+
_channel.stream.listen(_handleResponse, onError: (error, stackTrace) {
86+
_done.completeError(error, stackTrace);
87+
_channel.sink.close();
88+
}, onDone: () {
89+
if (!_done.isCompleted) _done.complete();
90+
close();
91+
});
92+
return done;
93+
}
8594

8695
/// Closes the underlying connection.
8796
///
8897
/// Returns a [Future] that completes when all resources have been released.
8998
/// This is the same as [done].
90-
Future close() => _manager.close();
99+
Future close() {
100+
_channel.sink.close();
101+
if (!_done.isCompleted) _done.complete();
102+
return done;
103+
}
91104

92105
/// Sends a JSON-RPC 2 request to invoke the given [method].
93106
///
@@ -145,7 +158,7 @@ class Client {
145158
if (_batch != null) {
146159
_batch.add(message);
147160
} else {
148-
_manager.add(message);
161+
_channel.sink.add(message);
149162
}
150163
}
151164

@@ -166,7 +179,7 @@ class Client {
166179

167180
_batch = [];
168181
return tryFinally(callback, () {
169-
_manager.add(_batch);
182+
_channel.sink.add(_batch);
170183
_batch = null;
171184
});
172185
}

lib/src/peer.dart

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import 'dart:async';
66

77
import 'package:stream_channel/stream_channel.dart';
88

9-
import 'channel_manager.dart';
109
import 'client.dart';
1110
import 'parameters.dart';
1211
import 'server.dart';
@@ -18,7 +17,7 @@ import 'utils.dart';
1817
/// 2.0 endpoint. It sends both requests and responses across the same
1918
/// communication channel and expects to connect to a peer that does the same.
2019
class Peer implements Client, Server {
21-
final ChannelManager _manager;
20+
final StreamChannel<dynamic> _channel;
2221

2322
/// The underlying client that handles request-sending and response-receiving
2423
/// logic.
@@ -36,10 +35,11 @@ class Peer implements Client, Server {
3635
/// they're responses.
3736
final _clientIncomingForwarder = StreamController(sync: true);
3837

38+
final _done = Completer<void>();
3939
@override
40-
Future get done => _manager.done;
40+
Future get done => _done.future;
4141
@override
42-
bool get isClosed => _manager.isClosed;
42+
bool get isClosed => _done.isCompleted;
4343

4444
@override
4545
ErrorCallback get onUnhandledError => _server?.onUnhandledError;
@@ -81,15 +81,14 @@ class Peer implements Client, Server {
8181
/// some requests which are not conformant with the JSON-RPC 2.0
8282
/// specification. In particular, requests missing the `jsonrpc` parameter
8383
/// will be accepted.
84-
Peer.withoutJson(StreamChannel channel,
85-
{ErrorCallback onUnhandledError, bool strictProtocolChecks = true})
86-
: _manager = ChannelManager('Peer', channel) {
84+
Peer.withoutJson(this._channel,
85+
{ErrorCallback onUnhandledError, bool strictProtocolChecks = true}) {
8786
_server = Server.withoutJson(
88-
StreamChannel(_serverIncomingForwarder.stream, channel.sink),
87+
StreamChannel(_serverIncomingForwarder.stream, _channel.sink),
8988
onUnhandledError: onUnhandledError,
9089
strictProtocolChecks: strictProtocolChecks);
9190
_client = Client.withoutJson(
92-
StreamChannel(_clientIncomingForwarder.stream, channel.sink));
91+
StreamChannel(_clientIncomingForwarder.stream, _channel.sink));
9392
}
9493

9594
// Client methods.
@@ -121,7 +120,7 @@ class Peer implements Client, Server {
121120
Future listen() {
122121
_client.listen();
123122
_server.listen();
124-
return _manager.listen((message) {
123+
_channel.stream.listen((message) {
125124
if (message is Map) {
126125
if (message.containsKey('result') || message.containsKey('error')) {
127126
_clientIncomingForwarder.add(message);
@@ -142,10 +141,16 @@ class Peer implements Client, Server {
142141
// server since it knows how to send error responses.
143142
_serverIncomingForwarder.add(message);
144143
}
145-
}).whenComplete(close);
144+
}, onError: (error, stackTrace) {
145+
_done.completeError(error, stackTrace);
146+
_channel.sink.close();
147+
}, onDone: () {
148+
if (!_done.isCompleted) _done.complete();
149+
close();
150+
});
151+
return done;
146152
}
147153

148154
@override
149-
Future close() =>
150-
Future.wait([_client.close(), _server.close(), _manager.close()]);
155+
Future close() => Future.wait([_client.close(), _server.close()]);
151156
}

lib/src/server.dart

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import 'package:stack_trace/stack_trace.dart';
1010
import 'package:stream_channel/stream_channel.dart';
1111

1212
import '../error_code.dart' as error_code;
13-
import 'channel_manager.dart';
1413
import 'exception.dart';
1514
import 'parameters.dart';
1615
import 'utils.dart';
@@ -29,7 +28,7 @@ typedef ErrorCallback = void Function(dynamic error, dynamic stackTrace);
2928
/// asynchronously, it's possible for multiple methods to be invoked at the same
3029
/// time, or even for a single method to be invoked multiple times at once.
3130
class Server {
32-
final ChannelManager _manager;
31+
final StreamChannel<dynamic> _channel;
3332

3433
/// The methods registered for this server.
3534
final _methods = <String, Function>{};
@@ -40,19 +39,21 @@ class Server {
4039
/// [RpcException.methodNotFound] exception.
4140
final _fallbacks = Queue<Function>();
4241

42+
final _done = Completer<void>();
43+
4344
/// Returns a [Future] that completes when the underlying connection is
4445
/// closed.
4546
///
4647
/// This is the same future that's returned by [listen] and [close]. It may
4748
/// complete before [close] is called if the remote endpoint closes the
4849
/// connection.
49-
Future get done => _manager.done;
50+
Future get done => _done.future;
5051

5152
/// Whether the underlying connection is closed.
5253
///
5354
/// Note that this will be `true` before [close] is called if the remote
5455
/// endpoint closes the connection.
55-
bool get isClosed => _manager.isClosed;
56+
bool get isClosed => _done.isCompleted;
5657

5758
/// A callback that is fired on unhandled exceptions.
5859
///
@@ -102,23 +103,34 @@ class Server {
102103
/// If [strictProtocolChecks] is false, this [Server] will accept some
103104
/// requests which are not conformant with the JSON-RPC 2.0 specification. In
104105
/// particular, requests missing the `jsonrpc` parameter will be accepted.
105-
Server.withoutJson(StreamChannel channel,
106-
{this.onUnhandledError, this.strictProtocolChecks = true})
107-
: _manager = ChannelManager('Server', channel);
106+
Server.withoutJson(this._channel,
107+
{this.onUnhandledError, this.strictProtocolChecks = true});
108108

109109
/// Starts listening to the underlying stream.
110110
///
111111
/// Returns a [Future] that will complete when the connection is closed or
112112
/// when it has an error. This is the same as [done].
113113
///
114114
/// [listen] may only be called once.
115-
Future listen() => _manager.listen(_handleRequest);
115+
Future listen() {
116+
_channel.stream.listen(_handleRequest, onError: (error, stackTrace) {
117+
_done.completeError(error, stackTrace);
118+
_channel.sink.close();
119+
}, onDone: () {
120+
if (!_done.isCompleted) _done.complete();
121+
});
122+
return done;
123+
}
116124

117125
/// Closes the underlying connection.
118126
///
119127
/// Returns a [Future] that completes when all resources have been released.
120128
/// This is the same as [done].
121-
Future close() => _manager.close();
129+
Future close() {
130+
_channel.sink.close();
131+
if (!_done.isCompleted) _done.complete();
132+
return done;
133+
}
122134

123135
/// Registers a method named [name] on this server.
124136
///
@@ -177,7 +189,7 @@ class Server {
177189
if (response == null) return;
178190
}
179191

180-
if (!isClosed) _manager.add(response);
192+
if (!isClosed) _channel.sink.add(response);
181193
}
182194

183195
/// Handles an individual parsed request.

pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: json_rpc_2
2-
version: 2.2.2-dev
2+
version: 2.2.2
33
description: >-
44
Utilities to write a client or server using the JSON-RPC 2.0 spec.
55
homepage: https://github.com/dart-lang/json_rpc_2

test/peer_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ void main() {
111111
var peer = json_rpc.Peer.withoutJson(channel);
112112
unawaited(peer.listen());
113113
await peer.close();
114-
}, skip: 'https://github.com/dart-lang/json_rpc_2/issues/55');
114+
});
115115

116116
group('like a server,', () {
117117
test('can receive a call and return a response', () {

0 commit comments

Comments
 (0)