From ee545a8e06928ee432a974727f58fc1c38036ba8 Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Mon, 11 Feb 2019 09:58:51 -0800 Subject: [PATCH 1/7] Add whereType transformer This needs to be implemented as it's own class instead of using one of the helpers like `fromHandlers` because it has unique generic type requirements. We want to avoid requring the call site to re-specify the generic type of the input stream. The return type needs to be a `StreamTransformer` to allow to satisfy static checking. However the argument to `bind` at runtime will never be a `Stream` so we need to define it explicitly to widen the allowed argument. --- CHANGELOG.md | 4 ++++ README.md | 4 ++++ lib/src/where_type.dart | 47 ++++++++++++++++++++++++++++++++++++ lib/stream_transform.dart | 1 + pubspec.yaml | 2 +- test/where_type_test.dart | 50 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 lib/src/where_type.dart create mode 100644 test/where_type_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 515f0ed..e26b1f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.0.15 + +- Add `whereType`. + ## 0.0.14+1 - Allow using non-dev Dart 2 SDK. diff --git a/README.md b/README.md index 5e54457..89ce15d 100644 --- a/README.md +++ b/README.md @@ -63,3 +63,7 @@ being a real subscriber. # throttle Blocks events for a duration after an event is successfully emitted. + +# whereType + +Like `Iterable.whereType` for a stream. diff --git a/lib/src/where_type.dart b/lib/src/where_type.dart new file mode 100644 index 0000000..9b24157 --- /dev/null +++ b/lib/src/where_type.dart @@ -0,0 +1,47 @@ +// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +/// Emits only the events which have type [R]. +/// +/// If the source stream is a broadcast stream the result will be as well. +/// +/// Errors from the source stream are forwarded directly to the result stream. +StreamTransformer whereType() => _WhereType(); + +class _WhereType extends StreamTransformerBase { + @override + Stream bind(Stream values) { + var controller = values.isBroadcast + ? StreamController.broadcast(sync: true) + : StreamController(sync: true); + + StreamSubscription subscription; + controller.onListen = () { + if (subscription != null) return; + var valuesDone = false; + subscription = values.listen( + (value) { + if (value is R) controller.add(value); + }, + onError: controller.addError, + onDone: () { + valuesDone = true; + controller.close(); + }); + if (!values.isBroadcast) { + controller.onPause = subscription.pause; + controller.onResume = subscription.resume; + } + controller.onCancel = () { + var toCancel = subscription; + subscription = null; + if (!valuesDone) return toCancel.cancel(); + return null; + }; + }; + return controller.stream; + } +} diff --git a/lib/stream_transform.dart b/lib/stream_transform.dart index 13b80a4..bddb744 100644 --- a/lib/stream_transform.dart +++ b/lib/stream_transform.dart @@ -19,3 +19,4 @@ export 'src/switch.dart'; export 'src/take_until.dart'; export 'src/tap.dart'; export 'src/throttle.dart'; +export 'src/where_type.dart'; diff --git a/pubspec.yaml b/pubspec.yaml index b32a050..a29a467 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -2,7 +2,7 @@ name: stream_transform description: A collection of utilities to transform and manipulate streams. author: Dart Team homepage: https://www.github.com/dart-lang/stream_transform -version: 0.0.15-dev +version: 0.0.15 environment: sdk: ">=2.1.0 <3.0.0" diff --git a/test/where_type_test.dart b/test/where_type_test.dart new file mode 100644 index 0000000..2c6d7ed --- /dev/null +++ b/test/where_type_test.dart @@ -0,0 +1,50 @@ +// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:test/test.dart'; + +import 'package:stream_transform/stream_transform.dart'; + +void main() { + test('forwards only events that match the type', () async { + var values = Stream.fromIterable([1, 'a', 2, 'b']); + var filtered = values.transform(whereType()); + expect(await filtered.toList(), ['a', 'b']); + }); + + test('can result in empty stream', () async { + var values = Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.transform(whereType()); + expect(await filtered.isEmpty, true); + }); + + test('forwards values to multiple listeners', () async { + var values = StreamController.broadcast(); + var filtered = values.stream.transform(whereType()); + var firstValues = []; + var secondValues = []; + filtered..listen(firstValues.add)..listen(secondValues.add); + values..add(1)..add('a')..add(2)..add('b'); + await Future(() {}); + expect(firstValues, ['a', 'b']); + expect(secondValues, ['a', 'b']); + }); + + test('closes streams with multiple listeners', () async { + var values = StreamController.broadcast(); + var filtered = values.stream.transform(whereType()); + var firstDone = false; + var secondDone = false; + filtered + ..listen(null, onDone: () => firstDone = true) + ..listen(null, onDone: () => secondDone = true); + values.add(1); + values.add('a'); + await values.close(); + expect(firstDone, true); + expect(secondDone, true); + }); +} From 9449f17894d320a3027cf7e65d3ffc0877f643b0 Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Tue, 12 Feb 2019 10:55:01 -0800 Subject: [PATCH 2/7] Use null state of subscription instead of valuesDone --- lib/src/where_type.dart | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/src/where_type.dart b/lib/src/where_type.dart index 9b24157..d7c8305 100644 --- a/lib/src/where_type.dart +++ b/lib/src/where_type.dart @@ -21,14 +21,13 @@ class _WhereType extends StreamTransformerBase { StreamSubscription subscription; controller.onListen = () { if (subscription != null) return; - var valuesDone = false; subscription = values.listen( (value) { if (value is R) controller.add(value); }, onError: controller.addError, onDone: () { - valuesDone = true; + subscription = null; controller.close(); }); if (!values.isBroadcast) { @@ -36,10 +35,8 @@ class _WhereType extends StreamTransformerBase { controller.onResume = subscription.resume; } controller.onCancel = () { - var toCancel = subscription; + subscription?.cancel(); subscription = null; - if (!valuesDone) return toCancel.cancel(); - return null; }; }; return controller.stream; From d003e1f90de9f9e003b539dc5e5969cd6e485430 Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Tue, 12 Feb 2019 11:11:20 -0800 Subject: [PATCH 3/7] Include note on typing in doc comment --- lib/src/where_type.dart | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/src/where_type.dart b/lib/src/where_type.dart index d7c8305..7ad226a 100644 --- a/lib/src/where_type.dart +++ b/lib/src/where_type.dart @@ -9,6 +9,14 @@ import 'dart:async'; /// If the source stream is a broadcast stream the result will be as well. /// /// Errors from the source stream are forwarded directly to the result stream. +/// +/// The static type of the returned transformer takes `Null` so that it can +/// satisfy the subtype requirements for `stream.transform()` argument on any +/// source Stream. The argument to `bind` has been broaded to take +/// `Stream` since it never be passed a `Stream` at runtime. This +/// is safe to use on any source stream and there is no static or runtime +/// checking that [R] is sensible - that is that is a subtype of the stream's +/// type such that some values of that type may be possible. StreamTransformer whereType() => _WhereType(); class _WhereType extends StreamTransformerBase { From 95aa5061417c9872802664f0ba407fc290ba8b82 Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Tue, 12 Feb 2019 12:02:56 -0800 Subject: [PATCH 4/7] Rephrase commend and rename to input --- lib/src/where_type.dart | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/src/where_type.dart b/lib/src/where_type.dart index 7ad226a..2ee207d 100644 --- a/lib/src/where_type.dart +++ b/lib/src/where_type.dart @@ -11,25 +11,27 @@ import 'dart:async'; /// Errors from the source stream are forwarded directly to the result stream. /// /// The static type of the returned transformer takes `Null` so that it can -/// satisfy the subtype requirements for `stream.transform()` argument on any -/// source Stream. The argument to `bind` has been broaded to take -/// `Stream` since it never be passed a `Stream` at runtime. This -/// is safe to use on any source stream and there is no static or runtime -/// checking that [R] is sensible - that is that is a subtype of the stream's -/// type such that some values of that type may be possible. +/// satisfy the subtype requirements for the `stream.transform()` argument on +/// any source stream. The argument to `bind` has been broaded to take +/// `Stream` since it will never be passed a `Stream` at runtime. +/// This is safe to use on any source stream. +/// +/// [R] should b a subtype of the stream's generic type otherwise nothing of +/// type [R] could possibly be emitted, however there is no static or runtime +/// checking that this is the case. StreamTransformer whereType() => _WhereType(); class _WhereType extends StreamTransformerBase { @override - Stream bind(Stream values) { - var controller = values.isBroadcast + Stream bind(Stream input) { + var controller = input.isBroadcast ? StreamController.broadcast(sync: true) : StreamController(sync: true); StreamSubscription subscription; controller.onListen = () { if (subscription != null) return; - subscription = values.listen( + subscription = input.listen( (value) { if (value is R) controller.add(value); }, @@ -38,7 +40,7 @@ class _WhereType extends StreamTransformerBase { subscription = null; controller.close(); }); - if (!values.isBroadcast) { + if (!input.isBroadcast) { controller.onPause = subscription.pause; controller.onResume = subscription.resume; } From a319ebc84475f360f7dee501b19dc0a3392b13bb Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Tue, 12 Feb 2019 13:18:01 -0800 Subject: [PATCH 5/7] rename to source --- lib/src/where_type.dart | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/src/where_type.dart b/lib/src/where_type.dart index 2ee207d..843ab1f 100644 --- a/lib/src/where_type.dart +++ b/lib/src/where_type.dart @@ -23,15 +23,15 @@ StreamTransformer whereType() => _WhereType(); class _WhereType extends StreamTransformerBase { @override - Stream bind(Stream input) { - var controller = input.isBroadcast + Stream bind(Stream source) { + var controller = source.isBroadcast ? StreamController.broadcast(sync: true) : StreamController(sync: true); StreamSubscription subscription; controller.onListen = () { if (subscription != null) return; - subscription = input.listen( + subscription = source.listen( (value) { if (value is R) controller.add(value); }, @@ -40,7 +40,7 @@ class _WhereType extends StreamTransformerBase { subscription = null; controller.close(); }); - if (!input.isBroadcast) { + if (!source.isBroadcast) { controller.onPause = subscription.pause; controller.onResume = subscription.resume; } From 7b52759943bde71116bcc556686716e66759cbb5 Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Tue, 12 Feb 2019 15:29:06 -0800 Subject: [PATCH 6/7] spelling --- lib/src/where_type.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/where_type.dart b/lib/src/where_type.dart index 843ab1f..2680a46 100644 --- a/lib/src/where_type.dart +++ b/lib/src/where_type.dart @@ -12,7 +12,7 @@ import 'dart:async'; /// /// The static type of the returned transformer takes `Null` so that it can /// satisfy the subtype requirements for the `stream.transform()` argument on -/// any source stream. The argument to `bind` has been broaded to take +/// any source stream. The argument to `bind` has been broadened to take /// `Stream` since it will never be passed a `Stream` at runtime. /// This is safe to use on any source stream. /// From 59ce371473e0b016b8ad68bedc68ed5344b7fbe4 Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Tue, 12 Feb 2019 15:29:54 -0800 Subject: [PATCH 7/7] More typos --- lib/src/where_type.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/where_type.dart b/lib/src/where_type.dart index 2680a46..96eb394 100644 --- a/lib/src/where_type.dart +++ b/lib/src/where_type.dart @@ -16,7 +16,7 @@ import 'dart:async'; /// `Stream` since it will never be passed a `Stream` at runtime. /// This is safe to use on any source stream. /// -/// [R] should b a subtype of the stream's generic type otherwise nothing of +/// [R] should be a subtype of the stream's generic type, otherwise nothing of /// type [R] could possibly be emitted, however there is no static or runtime /// checking that this is the case. StreamTransformer whereType() => _WhereType();