Skip to content

Commit 7d61d34

Browse files
Isolate stream: replace existing stream implementation (#8)
1 parent f8e648c commit 7d61d34

File tree

4 files changed

+54
-81
lines changed

4 files changed

+54
-81
lines changed

benchmark/bin/query.dart

+1-13
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ void main() async {
88
await QueryFind().report();
99
await QueryFindIds().report();
1010
await QueryStream().report();
11-
await QueryStreamIsolate().report();
1211
}
1312

1413
class QueryBenchmark extends DbBenchmark {
@@ -57,21 +56,10 @@ class QueryFindIds extends QueryBenchmark {
5756
Future<void> run() async => query.findIds();
5857
}
5958

60-
/// Stream where visitor is running in native code.
59+
/// Stream where visitor is running in Dart isolate.
6160
class QueryStream extends QueryBenchmark {
6261
QueryStream() : super('${QueryStream}');
6362

6463
@override
6564
Future<void> run() async => await query.stream().toList();
6665
}
67-
68-
/// Stream where visitor is running in Dart isolate.
69-
class QueryStreamIsolate extends QueryBenchmark {
70-
QueryStreamIsolate() : super('${QueryStreamIsolate}');
71-
72-
@override
73-
Future<void> run() async {
74-
var stream = await query.streamAsync();
75-
await stream.toList();
76-
}
77-
}

objectbox/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
Linux, Windows). This is where the [`install.sh`](/install.sh) script downloads it by default.
88
E.g. it is no longer necessary to install the library globally to run `dart test` or `flutter test`.
99
* Windows: Support database directory paths that contain unicode (UTF-8) characters. #406
10+
* Changed `Query.stream` to collect results in a worker isolate, which should typically be faster.
1011
* Update: [objectbox-c 0.16.0](https://github.com/objectbox/objectbox-c/releases/tag/v0.16.0).
1112
* Update: [objectbox-android 3.1.3](https://github.com/objectbox/objectbox-java/releases/tag/V3.1.3).
1213
* Add new [task with tag list Flutter example app](example/flutter/objectbox_demo_relations) that

objectbox/lib/src/native/query/query.dart

+49-57
Original file line numberDiff line numberDiff line change
@@ -818,69 +818,61 @@ class Query<T> {
818818
return result;
819819
}
820820

821-
/// Finds Objects matching the query, streaming them while the query executes.
822-
///
823-
/// Note: make sure you evaluate performance in your use case - streams come
824-
/// with an overhead so a plain [find()] is usually faster.
825-
Stream<T> stream() => _stream1();
826-
827821
/// Finds Objects matching the query, streaming them while the query executes.
828822
///
829823
/// Results are streamed from a worker isolate in batches (the stream still
830824
/// returns objects one by one).
831-
///
832-
/// This is typically faster than [stream()] and even [find()].
833-
@experimental
834-
Stream<T> streamAsync() => _streamIsolate();
825+
Stream<T> stream() => _streamIsolate();
835826

836827
/// Stream items by sending full flatbuffers binary as a message.
837-
Stream<T> _stream1() {
838-
initializeDartAPI();
839-
final port = ReceivePort();
840-
final cStream = checkObxPtr(
841-
C.dartc_query_find(_cQuery, port.sendPort.nativePort), 'query stream');
842-
843-
var closed = false;
844-
final close = () {
845-
if (closed) return;
846-
closed = true;
847-
C.dartc_stream_close(cStream);
848-
port.close();
849-
reachabilityFence(this);
850-
};
851-
852-
try {
853-
final controller = StreamController<T>(onCancel: close);
854-
port.listen((dynamic message) {
855-
// We expect Uint8List for data and NULL when the query has finished.
856-
if (message is Uint8List) {
857-
try {
858-
controller.add(
859-
_entity.objectFromFB(_store, ByteData.view(message.buffer)));
860-
return;
861-
} catch (e) {
862-
controller.addError(e);
863-
}
864-
} else if (message is String) {
865-
controller.addError(
866-
ObjectBoxException('Query stream native exception: $message'));
867-
} else if (message != null) {
868-
controller.addError(ObjectBoxException(
869-
'Query stream received an invalid message type '
870-
'(${message.runtimeType}): $message'));
871-
}
872-
// Close the stream, this will call the onCancel function.
873-
// Do not call the onCancel function manually,
874-
// if cancel() is called on the Stream subscription right afterwards it
875-
// will use the shortcut in the onCancel function and not wait.
876-
controller.close(); // done
877-
});
878-
return controller.stream;
879-
} catch (e) {
880-
close();
881-
rethrow;
882-
}
883-
}
828+
/// Replaced by _streamIsolate which in benchmarks has been faster.
829+
// Stream<T> _stream1() {
830+
// initializeDartAPI();
831+
// final port = ReceivePort();
832+
// final cStream = checkObxPtr(
833+
// C.dartc_query_find(_cQuery, port.sendPort.nativePort), 'query stream');
834+
//
835+
// var closed = false;
836+
// final close = () {
837+
// if (closed) return;
838+
// closed = true;
839+
// C.dartc_stream_close(cStream);
840+
// port.close();
841+
// reachabilityFence(this);
842+
// };
843+
//
844+
// try {
845+
// final controller = StreamController<T>(onCancel: close);
846+
// port.listen((dynamic message) {
847+
// // We expect Uint8List for data and NULL when the query has finished.
848+
// if (message is Uint8List) {
849+
// try {
850+
// controller.add(
851+
// _entity.objectFromFB(_store, ByteData.view(message.buffer)));
852+
// return;
853+
// } catch (e) {
854+
// controller.addError(e);
855+
// }
856+
// } else if (message is String) {
857+
// controller.addError(
858+
// ObjectBoxException('Query stream native exception: $message'));
859+
// } else if (message != null) {
860+
// controller.addError(ObjectBoxException(
861+
// 'Query stream received an invalid message type '
862+
// '(${message.runtimeType}): $message'));
863+
// }
864+
// // Close the stream, this will call the onCancel function.
865+
// // Do not call the onCancel function manually,
866+
// // if cancel() is called on the Stream subscription right afterwards it
867+
// // will use the shortcut in the onCancel function and not wait.
868+
// controller.close(); // done
869+
// });
870+
// return controller.stream;
871+
// } catch (e) {
872+
// close();
873+
// rethrow;
874+
// }
875+
// }
884876

885877
/// Stream items by sending pointers from native code.
886878
/// Interestingly this is slower even though it transfers only pointers...

objectbox/test/query_test.dart

+3-11
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ void main() {
665665
q.close();
666666
});
667667

668-
testStream({required bool useIsolateStream}) async {
668+
test('stream items', () async {
669669
final count = env.short ? 100 : 1000;
670670
final items = List<TestEntity>.generate(
671671
count, (i) => TestEntity.filled(id: 0, tByte: i % 30));
@@ -678,7 +678,7 @@ void main() {
678678
expect(query.count(), countMatching);
679679

680680
final foundIds = query.findIds();
681-
final stream = useIsolateStream ? query.streamAsync() : query.stream();
681+
final stream = query.stream();
682682
final streamed = await stream.toList();
683683
expect(streamed.length, countMatching);
684684
final streamedIds = streamed.map((e) => e.id).toList(growable: false);
@@ -690,7 +690,7 @@ void main() {
690690
final streamListenedItems = <TestEntity>{};
691691

692692
final start = DateTime.now();
693-
final subStream = useIsolateStream ? query.streamAsync() : query.stream();
693+
final subStream = query.stream();
694694
final subscription = subStream.listen(streamListenedItems.add);
695695
// Note: no upper limit, global test timeout will stop if it takes too long.
696696
int millis = 1;
@@ -703,14 +703,6 @@ void main() {
703703
expect(streamListenedItems.length, isNonZero);
704704

705705
query.close();
706-
}
707-
708-
test('stream items', () async {
709-
await testStream(useIsolateStream: false);
710-
});
711-
712-
test('stream items via isolate', () async {
713-
await testStream(useIsolateStream: true);
714706
});
715707

716708
test('set param single', () async {

0 commit comments

Comments
 (0)