Skip to content

Commit 1a13cc1

Browse files
authored
Add whereType transformer (dart-archive/stream_transform#60)
This needs to be implemented as it's own class instead of using one of the helpers like `fromHandlers` because it has unique generic type requirements. We want to avoid requring the call site to re-specify the generic type of the input stream. The return type needs to be a `StreamTransformer<Null, R>` to satisfy static checking. However the argument to `bind` at runtime will never be a `Stream<Null>` so we need to define it explicitly to widen the allowed argument.
1 parent 4254811 commit 1a13cc1

File tree

6 files changed

+114
-1
lines changed

6 files changed

+114
-1
lines changed

pkgs/stream_transform/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 0.0.15
2+
3+
- Add `whereType`.
4+
15
## 0.0.14+1
26

37
- Allow using non-dev Dart 2 SDK.

pkgs/stream_transform/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,7 @@ being a real subscriber.
6363
# throttle
6464

6565
Blocks events for a duration after an event is successfully emitted.
66+
67+
# whereType
68+
69+
Like `Iterable.whereType` for a stream.
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
/// Emits only the events which have type [R].
8+
///
9+
/// If the source stream is a broadcast stream the result will be as well.
10+
///
11+
/// Errors from the source stream are forwarded directly to the result stream.
12+
///
13+
/// The static type of the returned transformer takes `Null` so that it can
14+
/// satisfy the subtype requirements for the `stream.transform()` argument on
15+
/// any source stream. The argument to `bind` has been broadened to take
16+
/// `Stream<Object>` since it will never be passed a `Stream<Null>` at runtime.
17+
/// This is safe to use on any source stream.
18+
///
19+
/// [R] should be a subtype of the stream's generic type, otherwise nothing of
20+
/// type [R] could possibly be emitted, however there is no static or runtime
21+
/// checking that this is the case.
22+
StreamTransformer<Null, R> whereType<R>() => _WhereType<R>();
23+
24+
class _WhereType<R> extends StreamTransformerBase<Null, R> {
25+
@override
26+
Stream<R> bind(Stream<Object> source) {
27+
var controller = source.isBroadcast
28+
? StreamController<R>.broadcast(sync: true)
29+
: StreamController<R>(sync: true);
30+
31+
StreamSubscription<Object> subscription;
32+
controller.onListen = () {
33+
if (subscription != null) return;
34+
subscription = source.listen(
35+
(value) {
36+
if (value is R) controller.add(value);
37+
},
38+
onError: controller.addError,
39+
onDone: () {
40+
subscription = null;
41+
controller.close();
42+
});
43+
if (!source.isBroadcast) {
44+
controller.onPause = subscription.pause;
45+
controller.onResume = subscription.resume;
46+
}
47+
controller.onCancel = () {
48+
subscription?.cancel();
49+
subscription = null;
50+
};
51+
};
52+
return controller.stream;
53+
}
54+
}

pkgs/stream_transform/lib/stream_transform.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ export 'src/switch.dart';
1919
export 'src/take_until.dart';
2020
export 'src/tap.dart';
2121
export 'src/throttle.dart';
22+
export 'src/where_type.dart';

pkgs/stream_transform/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name: stream_transform
22
description: A collection of utilities to transform and manipulate streams.
33
author: Dart Team <[email protected]>
44
homepage: https://www.github.com/dart-lang/stream_transform
5-
version: 0.0.15-dev
5+
version: 0.0.15
66

77
environment:
88
sdk: ">=2.1.0 <3.0.0"
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
import 'package:test/test.dart';
8+
9+
import 'package:stream_transform/stream_transform.dart';
10+
11+
void main() {
12+
test('forwards only events that match the type', () async {
13+
var values = Stream.fromIterable([1, 'a', 2, 'b']);
14+
var filtered = values.transform(whereType<String>());
15+
expect(await filtered.toList(), ['a', 'b']);
16+
});
17+
18+
test('can result in empty stream', () async {
19+
var values = Stream.fromIterable([1, 2, 3, 4]);
20+
var filtered = values.transform(whereType<String>());
21+
expect(await filtered.isEmpty, true);
22+
});
23+
24+
test('forwards values to multiple listeners', () async {
25+
var values = StreamController.broadcast();
26+
var filtered = values.stream.transform(whereType<String>());
27+
var firstValues = [];
28+
var secondValues = [];
29+
filtered..listen(firstValues.add)..listen(secondValues.add);
30+
values..add(1)..add('a')..add(2)..add('b');
31+
await Future(() {});
32+
expect(firstValues, ['a', 'b']);
33+
expect(secondValues, ['a', 'b']);
34+
});
35+
36+
test('closes streams with multiple listeners', () async {
37+
var values = StreamController.broadcast();
38+
var filtered = values.stream.transform(whereType<String>());
39+
var firstDone = false;
40+
var secondDone = false;
41+
filtered
42+
..listen(null, onDone: () => firstDone = true)
43+
..listen(null, onDone: () => secondDone = true);
44+
values.add(1);
45+
values.add('a');
46+
await values.close();
47+
expect(firstDone, true);
48+
expect(secondDone, true);
49+
});
50+
}

0 commit comments

Comments
 (0)