Skip to content

Commit b643907

Browse files
author
Anna Gringauze
authored
Batch evaluation requests when possible (#1746)
* Batch expression evaluations from the same library * Update changelog * Fix tests failures * Destroy isolate on app connection exit * Fix failure to compile debug extension and client with dart2js
1 parent f6f000c commit b643907

12 files changed

+348
-7
lines changed

dwds/CHANGELOG.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
## 16.0.1-dev
22

3-
- Allow `LoadStrategy.serverPathForModule` and `LoadStrategy.sourceMapPathForModule`
4-
to return `null` and add error handling.
3+
- Allow the following API to return `null` and add error handling:
4+
- `LoadStrategy.serverPathForModule`
5+
- `LoadStrategy.sourceMapPathForModule`
6+
- Expression evaluation performance improvement:
7+
- Batch `ChromeProxyService.evaluate()` requests that are close in time
8+
and are executed in the same library and scope.
59

610
## 16.0.0
711

dwds/debug_extension/web/background.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ import 'package:dwds/data/devtools_request.dart';
2020
import 'package:dwds/data/extension_request.dart';
2121
import 'package:dwds/data/serializers.dart';
2222
import 'package:dwds/src/sockets.dart';
23-
import 'package:dwds/src/utilities/batched_stream.dart';
23+
// NOTE(annagrin): using 'package:dwds/src/utilities/batched_stream.dart'
24+
// makes dart2js skip creating background.js, so we use a copy instead.
25+
// import 'package:dwds/src/utilities/batched_stream.dart';
26+
import 'package:dwds/src/web_utilities/batched_stream.dart';
2427
import 'package:js/js.dart';
2528
import 'package:js/js_util.dart' as js_util;
2629
import 'package:pub_semver/pub_semver.dart';

dwds/lib/src/connections/app_connection.dart

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,19 @@ class AppConnection {
1515
/// The initial connection request sent from the application in the browser.
1616
final ConnectRequest request;
1717
final _startedCompleter = Completer<void>();
18+
final _doneCompleter = Completer<void>();
1819
final SocketConnection _connection;
1920

20-
AppConnection(this.request, this._connection);
21+
AppConnection(this.request, this._connection) {
22+
unawaited(_connection.sink.done.then((v) => _doneCompleter.complete()));
23+
}
2124

2225
bool get isInKeepAlivePeriod => _connection.isInKeepAlivePeriod;
2326
void shutDown() => _connection.shutdown();
2427
bool get isStarted => _startedCompleter.isCompleted;
2528
Future<void> get onStart => _startedCompleter.future;
29+
bool get isDone => _doneCompleter.isCompleted;
30+
Future<void> get onDone => _doneCompleter.future;
2631

2732
void runMain() {
2833
if (_startedCompleter.isCompleted) {

dwds/lib/src/injected/client.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
import 'package:collection/collection.dart';
8+
import 'package:dwds/src/utilities/domain.dart';
9+
import 'package:logging/logging.dart';
10+
import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart';
11+
12+
import '../debugging/debugger.dart';
13+
import '../debugging/location.dart';
14+
import '../debugging/modules.dart';
15+
import '../utilities/batched_stream.dart';
16+
import 'expression_compiler.dart';
17+
import 'expression_evaluator.dart';
18+
19+
class EvaluateRequest {
20+
final String isolateId;
21+
final String? libraryUri;
22+
final String expression;
23+
final Map<String, String>? scope;
24+
final completer = Completer<RemoteObject>();
25+
26+
EvaluateRequest(this.isolateId, this.libraryUri, this.expression, this.scope);
27+
}
28+
29+
class BatchedExpressionEvaluator extends ExpressionEvaluator {
30+
final _logger = Logger('BatchedExpressionEvaluator');
31+
final Debugger _debugger;
32+
final _requestController =
33+
BatchedStreamController<EvaluateRequest>(delay: 200);
34+
35+
BatchedExpressionEvaluator(
36+
String entrypoint,
37+
AppInspectorInterface inspector,
38+
this._debugger,
39+
Locations locations,
40+
Modules modules,
41+
ExpressionCompiler compiler,
42+
) : super(entrypoint, inspector, _debugger, locations, modules, compiler) {
43+
_requestController.stream.listen(_processRequest);
44+
}
45+
46+
@override
47+
void close() {
48+
_logger.fine('Closed');
49+
_requestController.close();
50+
}
51+
52+
@override
53+
Future<RemoteObject> evaluateExpression(
54+
String isolateId,
55+
String? libraryUri,
56+
String expression,
57+
Map<String, String>? scope,
58+
) {
59+
final request = EvaluateRequest(isolateId, libraryUri, expression, scope);
60+
_requestController.sink.add(request);
61+
return request.completer.future;
62+
}
63+
64+
void _processRequest(List<EvaluateRequest> requests) async {
65+
String? libraryUri;
66+
String? isolateId;
67+
Map<String, String>? scope;
68+
List<EvaluateRequest> currentRequests = [];
69+
70+
for (var request in requests) {
71+
libraryUri ??= request.libraryUri;
72+
isolateId ??= request.isolateId;
73+
scope ??= request.scope;
74+
75+
if (libraryUri != request.libraryUri ||
76+
isolateId != request.isolateId ||
77+
!MapEquality().equals(scope, request.scope)) {
78+
_logger.fine('New batch due to');
79+
if (libraryUri != request.libraryUri) {
80+
_logger.fine(' - library uri: $libraryUri != ${request.libraryUri}');
81+
}
82+
if (isolateId != request.isolateId) {
83+
_logger.fine(' - isolateId: $isolateId != ${request.isolateId}');
84+
}
85+
if (!MapEquality().equals(scope, request.scope)) {
86+
_logger.fine(' - scope: $scope != ${request.scope}');
87+
}
88+
89+
unawaited(_evaluateBatch(currentRequests));
90+
currentRequests = [];
91+
libraryUri = request.libraryUri;
92+
isolateId = request.isolateId;
93+
scope = request.scope;
94+
}
95+
currentRequests.add(request);
96+
}
97+
unawaited(_evaluateBatch(currentRequests));
98+
}
99+
100+
Future<void> _evaluateBatch(List<EvaluateRequest> requests) async {
101+
if (requests.isEmpty) return;
102+
103+
final first = requests.first;
104+
if (requests.length == 1) {
105+
if (first.completer.isCompleted) return;
106+
return super
107+
.evaluateExpression(
108+
first.isolateId, first.libraryUri, first.expression, first.scope)
109+
.then(requests.first.completer.complete);
110+
}
111+
112+
final expressions = requests.map((r) => r.expression).join(', ');
113+
final batchedExpression = '[ $expressions ]';
114+
115+
_logger.fine('Evaluating batch of expressions $batchedExpression');
116+
117+
final RemoteObject list = await super.evaluateExpression(
118+
first.isolateId, first.libraryUri, batchedExpression, first.scope);
119+
120+
for (var i = 0; i < requests.length; i++) {
121+
final request = requests[i];
122+
if (request.completer.isCompleted) continue;
123+
_logger.fine('Getting result out of a batch for ${request.expression}');
124+
_debugger
125+
.getProperties(list.objectId!,
126+
offset: i, count: 1, length: requests.length)
127+
.then((v) {
128+
final result = v.first.value;
129+
_logger.fine(
130+
'Got result out of a batch for ${request.expression}: $result');
131+
request.completer.complete(result);
132+
});
133+
}
134+
}
135+
}

dwds/lib/src/services/chrome_proxy_service.dart

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import '../utilities/dart_uri.dart';
3030
import '../utilities/sdk_configuration.dart';
3131
import '../utilities/shared.dart';
3232
import 'expression_evaluator.dart';
33+
import 'batched_expression_evaluator.dart';
3334

3435
/// A proxy from the chrome debug protocol to the dart vm service protocol.
3536
class ChromeProxyService implements VmServiceInterface {
@@ -241,7 +242,7 @@ class ChromeProxyService implements VmServiceInterface {
241242
final compiler = _compiler;
242243
_expressionEvaluator = compiler == null
243244
? null
244-
: ExpressionEvaluator(
245+
: BatchedExpressionEvaluator(
245246
entrypoint,
246247
inspector,
247248
debugger,
@@ -259,6 +260,8 @@ class ChromeProxyService implements VmServiceInterface {
259260
_startedCompleter.complete();
260261
}));
261262

263+
unawaited(appConnection.onDone.then((_) => destroyIsolate()));
264+
262265
final isolateRef = inspector.isolateRef;
263266
final timestamp = DateTime.now().millisecondsSinceEpoch;
264267

@@ -301,6 +304,7 @@ class ChromeProxyService implements VmServiceInterface {
301304
///
302305
/// Clears out the [_inspector] and all related cached information.
303306
void destroyIsolate() {
307+
_logger.fine('Destroying isolate');
304308
if (!_isIsolateRunning) return;
305309
final isolate = inspector.isolate;
306310
final isolateRef = inspector.isolateRef;
@@ -318,6 +322,7 @@ class ChromeProxyService implements VmServiceInterface {
318322
_inspector = null;
319323
_previousBreakpoints.clear();
320324
_previousBreakpoints.addAll(isolate.breakpoints ?? []);
325+
_expressionEvaluator?.close();
321326
_consoleSubscription?.cancel();
322327
_consoleSubscription = null;
323328
}

dwds/lib/src/services/expression_evaluator.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// for details. All rights reserved. Use of this source code is governed by a
33
// BSD-style license that can be found in the LICENSE file.
44

5+
import 'dart:async';
6+
57
import 'package:dwds/src/utilities/domain.dart';
68
import 'package:logging/logging.dart';
79
import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart';
@@ -61,6 +63,8 @@ class ExpressionEvaluator {
6163
<String, String>{'type': '$severity', 'value': message});
6264
}
6365

66+
void close() {}
67+
6468
/// Evaluate dart expression inside a given library.
6569
///
6670
/// Uses ExpressionCompiler interface to compile the expression to
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'package:async/async.dart';
7+
8+
/// Stream controller allowing to batch events.
9+
class BatchedStreamController<T> {
10+
static const _defaultBatchDelayMilliseconds = 1000;
11+
static const _checkDelayMilliseconds = 100;
12+
13+
final int _batchDelayMilliseconds;
14+
15+
final StreamController<T> _inputController;
16+
late StreamQueue<T> _inputQueue;
17+
18+
final StreamController<List<T>> _outputController;
19+
final Completer<bool> _completer = Completer<bool>();
20+
21+
/// Create batched stream controller.
22+
///
23+
/// Collects events from input [sink] and emits them in batches to the
24+
/// output [stream] every [delay] milliseconds. Keeps the original order.
25+
BatchedStreamController({
26+
int delay = _defaultBatchDelayMilliseconds,
27+
}) : _batchDelayMilliseconds = delay,
28+
_inputController = StreamController<T>(),
29+
_outputController = StreamController<List<T>>() {
30+
_inputQueue = StreamQueue<T>(_inputController.stream);
31+
unawaited(_batchAndSendEvents());
32+
}
33+
34+
/// Sink collecting events.
35+
StreamSink<T> get sink => _inputController.sink;
36+
37+
/// Output stream of batch events.
38+
Stream<List<T>> get stream => _outputController.stream;
39+
40+
/// Close the controller.
41+
Future<dynamic> close() async {
42+
unawaited(_inputController.close());
43+
return _completer.future.then((value) => _outputController.close());
44+
}
45+
46+
/// Send events to the output in a batch every [_batchDelayMilliseconds].
47+
Future<void> _batchAndSendEvents() async {
48+
const duration = Duration(milliseconds: _checkDelayMilliseconds);
49+
final buffer = <T>[];
50+
51+
// Batch events every `_batchDelayMilliseconds`.
52+
//
53+
// Note that events might arrive at random intervals, so collecting
54+
// a predetermined number of events to send in a batch might delay
55+
// the batch indefinitely. Instead, check for new events every
56+
// `_checkDelayMilliseconds` to make sure batches are sent in regular
57+
// intervals.
58+
var lastSendTime = DateTime.now().millisecondsSinceEpoch;
59+
while (await _hasEventOrTimeOut(duration)) {
60+
if (await _hasEventDuring(duration)) {
61+
buffer.add(await _inputQueue.next);
62+
}
63+
64+
final now = DateTime.now().millisecondsSinceEpoch;
65+
if (now > lastSendTime + _batchDelayMilliseconds) {
66+
lastSendTime = now;
67+
if (buffer.isNotEmpty) {
68+
_outputController.sink.add(List.from(buffer));
69+
buffer.clear();
70+
}
71+
}
72+
}
73+
74+
if (buffer.isNotEmpty) {
75+
_outputController.sink.add(List.from(buffer));
76+
}
77+
_completer.complete(true);
78+
}
79+
80+
Future<bool> _hasEventOrTimeOut(Duration duration) =>
81+
_inputQueue.hasNext.timeout(duration, onTimeout: () => true);
82+
83+
Future<bool> _hasEventDuring(Duration duration) =>
84+
_inputQueue.hasNext.timeout(duration, onTimeout: () => false);
85+
}

dwds/test/evaluate_common.dart

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,25 @@ void testAll({
539539

540540
tearDown(() async {});
541541

542+
test('in parallel (in a batch)', () async {
543+
final library = isolate.rootLib!;
544+
final evaluation1 = setup.service
545+
.evaluate(isolateId, library.id!, 'MainClass(0).toString()');
546+
final evaluation2 = setup.service
547+
.evaluate(isolateId, library.id!, 'MainClass(1).toString()');
548+
549+
final results = await Future.wait([evaluation1, evaluation2]);
550+
expect(
551+
results[0],
552+
const TypeMatcher<InstanceRef>().having(
553+
(instance) => instance.valueAsString, 'valueAsString', '0'));
554+
555+
expect(
556+
results[1],
557+
const TypeMatcher<InstanceRef>().having(
558+
(instance) => instance.valueAsString, 'valueAsString', '1'));
559+
});
560+
542561
test('with scope override', () async {
543562
final library = isolate.rootLib!;
544563
final object = await setup.service

0 commit comments

Comments
 (0)