Skip to content

Catch exceptions in unawaited Futures #1938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dwds/lib/src/connections/app_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import '../../data/connect_request.dart';
import '../../data/run_request.dart';
import '../../data/serializers.dart';
import '../handlers/socket_connections.dart';
import '../utilities/shared.dart';

/// A connection between the application loaded in the browser and DWDS.
class AppConnection {
Expand All @@ -19,7 +20,7 @@ class AppConnection {
final SocketConnection _connection;

AppConnection(this.request, this._connection) {
unawaited(_connection.sink.done.then((v) => _doneCompleter.complete()));
safeUnawaited(_connection.sink.done.then((v) => _doneCompleter.complete()));
}

bool get isInKeepAlivePeriod => _connection.isInKeepAlivePeriod;
Expand Down
12 changes: 7 additions & 5 deletions dwds/lib/src/handlers/dev_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import '../servers/extension_backend.dart';
import '../services/app_debug_services.dart';
import '../services/debug_service.dart';
import '../services/expression_compiler.dart';
import '../utilities/shared.dart';
import 'injector.dart';

/// When enabled, this logs VM service protocol and Chrome debug protocol
Expand Down Expand Up @@ -161,7 +162,7 @@ class DevHandler {
.takeUntilGap(const Duration(milliseconds: 50));
// We enqueue this work as we need to begin listening (`.hasNext`)
// before events are received.
unawaited(Future.microtask(() => connection.runtime.enable()));
safeUnawaited(Future.microtask(() => connection.runtime.enable()));

await for (var contextId in contextIds) {
final result = await connection.sendCommand('Runtime.evaluate', {
Expand All @@ -177,7 +178,7 @@ class DevHandler {
}
}
if (appTab != null) break;
unawaited(connection.close());
safeUnawaited(connection.close());
}
if (appTab == null || tabConnection == null || executionContext == null) {
throw AppConnectionException(
Expand Down Expand Up @@ -236,7 +237,7 @@ class DevHandler {
await _chromeConnection(), appConnection);
appServices = await _createAppDebugServices(
appConnection.request.appId, debugService);
unawaited(appServices.chromeProxyService.remoteDebugger.onClose.first
safeUnawaited(appServices.chromeProxyService.remoteDebugger.onClose.first
.whenComplete(() async {
await appServices?.close();
_servicesByAppId.remove(appConnection.request.appId);
Expand Down Expand Up @@ -303,7 +304,7 @@ class DevHandler {
}
});

unawaited(injectedConnection.sink.done.then((_) async {
safeUnawaited(injectedConnection.sink.done.then((_) async {
_injectedConnections.remove(injectedConnection);
final connection = appConnection;
if (connection != null) {
Expand Down Expand Up @@ -535,7 +536,8 @@ class DevHandler {
);
final encodedUri = await debugService.encodedUri;
extensionDebugger.sendEvent('dwds.encodedUri', encodedUri);
unawaited(appServices.chromeProxyService.remoteDebugger.onClose.first
safeUnawaited(appServices
.chromeProxyService.remoteDebugger.onClose.first
.whenComplete(() async {
appServices?.chromeProxyService.destroyIsolate();
await appServices?.close();
Expand Down
7 changes: 4 additions & 3 deletions dwds/lib/src/services/batched_expression_evaluator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import '../debugging/debugger.dart';
import '../debugging/location.dart';
import '../debugging/modules.dart';
import '../utilities/batched_stream.dart';
import '../utilities/shared.dart';
import 'expression_compiler.dart';
import 'expression_evaluator.dart';

Expand Down Expand Up @@ -93,15 +94,15 @@ class BatchedExpressionEvaluator extends ExpressionEvaluator {
_logger.fine(' - scope: $scope != ${request.scope}');
}

unawaited(_evaluateBatch(currentRequests));
safeUnawaited(_evaluateBatch(currentRequests));
currentRequests = [];
libraryUri = request.libraryUri;
isolateId = request.isolateId;
scope = request.scope;
}
currentRequests.add(request);
}
unawaited(_evaluateBatch(currentRequests));
safeUnawaited(_evaluateBatch(currentRequests));
}

Future<void> _evaluateBatch(List<EvaluateRequest> requests) async {
Expand Down Expand Up @@ -135,7 +136,7 @@ class BatchedExpressionEvaluator extends ExpressionEvaluator {
createError(ErrorKind.internal, 'No batch result object ID.');
request.completer.complete(error);
} else {
unawaited(_debugger
safeUnawaited(_debugger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what happens on failure inside safeUnawaited - will .then below be executed and with what result. I think we might need to add an onError(e,s) function as a parameter to safeUnawaited so the user (batched expression evaluator in this case) can decide what to do with the error. In case of expression evaluation we should complete with an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated so we are now completing with error

.getProperties(listId, offset: i, count: 1, length: requests.length)
.then((v) {
final result = v.first.value;
Expand Down
10 changes: 5 additions & 5 deletions dwds/lib/src/services/chrome_proxy_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class ChromeProxyService implements VmServiceInterface {
executionContext,
expressionCompiler,
);
unawaited(service.createIsolate(appConnection));
safeUnawaited(service.createIsolate(appConnection));
return service;
}

Expand All @@ -171,7 +171,7 @@ class ChromeProxyService implements VmServiceInterface {
_skipLists.initialize();
// We do not need to wait for compiler dependencies to be updated as the
// [ExpressionEvaluator] is robust to evaluation requests during updates.
unawaited(_updateCompilerDependencies(entrypoint));
safeUnawaited(_updateCompilerDependencies(entrypoint));
}

Future<void> _updateCompilerDependencies(String entrypoint) async {
Expand Down Expand Up @@ -271,18 +271,18 @@ class ChromeProxyService implements VmServiceInterface {
compiler,
);

unawaited(_prewarmExpressionCompilerCache());
safeUnawaited(_prewarmExpressionCompilerCache());

await debugger.reestablishBreakpoints(
_previousBreakpoints, _disabledBreakpoints);
_disabledBreakpoints.clear();

unawaited(appConnection.onStart.then((_) async {
safeUnawaited(appConnection.onStart.then((_) async {
await debugger.resumeFromStart();
_startedCompleter.complete();
}));

unawaited(appConnection.onDone.then((_) => destroyIsolate()));
safeUnawaited(appConnection.onDone.then((_) => destroyIsolate()));

final isolateRef = inspector.isolateRef;
final timestamp = DateTime.now().millisecondsSinceEpoch;
Expand Down
7 changes: 4 additions & 3 deletions dwds/lib/src/services/debug_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ Future<void> _handleSseConnections(
if (onResponse != null) onResponse(response);
return jsonEncode(response);
}).listen(connection.sink.add);
unawaited(chromeProxyService.remoteDebugger.onClose.first.whenComplete(() {
safeUnawaited(
chromeProxyService.remoteDebugger.onClose.first.whenComplete(() {
connection.sink.close();
sub.cancel();
}));
Expand All @@ -97,7 +98,7 @@ Future<void> _handleSseConnections(
++_clientsConnected;
final vmServerConnection = VmServerConnection(inputStream,
responseController.sink, serviceExtensionRegistry, chromeProxyService);
unawaited(vmServerConnection.done.whenComplete(() {
safeUnawaited(vmServerConnection.done.whenComplete(() {
--_clientsConnected;
if (!_acceptNewConnections && _clientsConnected == 0) {
// DDS has disconnected so we can allow for clients to connect directly
Expand Down Expand Up @@ -236,7 +237,7 @@ class DebugService {
final sseHandler = SseHandler(Uri.parse('/$authToken/\$debugHandler'),
keepAlive: const Duration(seconds: 5));
handler = sseHandler.handler;
unawaited(_handleSseConnections(
safeUnawaited(_handleSseConnections(
sseHandler, chromeProxyService, serviceExtensionRegistry,
onRequest: onRequest, onResponse: onResponse));
} else {
Expand Down
5 changes: 3 additions & 2 deletions dwds/lib/src/utilities/batched_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import 'dart:async';
import 'package:async/async.dart';
import 'package:dwds/src/utilities/shared.dart';

/// Stream controller allowing to batch events.
class BatchedStreamController<T> {
Expand All @@ -28,7 +29,7 @@ class BatchedStreamController<T> {
_inputController = StreamController<T>(),
_outputController = StreamController<List<T>>() {
_inputQueue = StreamQueue<T>(_inputController.stream);
unawaited(_batchAndSendEvents());
safeUnawaited(_batchAndSendEvents());
}

/// Sink collecting events.
Expand All @@ -39,7 +40,7 @@ class BatchedStreamController<T> {

/// Close the controller.
Future<dynamic> close() async {
unawaited(_inputController.close());
safeUnawaited(_inputController.close());
return _completer.future.then((value) => _outputController.close());
}

Expand Down
9 changes: 9 additions & 0 deletions dwds/lib/src/utilities/shared.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'dart:async';
import 'dart:io';

import 'package:http_multi_server/http_multi_server.dart';
import 'package:logging/logging.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart';
import 'package:stack_trace/stack_trace.dart';
Expand All @@ -23,6 +24,8 @@ String createId() {
return '$_nextId';
}

final _logger = Logger('Utilities');

/// Returns `true` if [hostname] is bound to an IPv6 address.
Future<bool> useIPv6ForHost(String hostname) async {
final addresses = await InternetAddress.lookup(hostname);
Expand Down Expand Up @@ -115,3 +118,9 @@ Map<String, dynamic> getResultOrHandleError(wip.WipResponse? response,
}
return result;
}

void safeUnawaited(Future<void> future) {
Copy link
Contributor

@annagrin annagrin Feb 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion, so we an safely use this in batched expression evaluator:

void safeUnawaited(Future<void> future, { 
  void Function(Exception, StackTrace)? onError,
}) {
  onError ??= (e,s) => 
     _logger.warning('Error in unawaited Future:', e, s);
  unawaited(future.catchError(onError));
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG, made that change!

unawaited(future.catchError((error, stackTrace) {
_logger.warning('Error in unawaited Future:', error, stackTrace);
}));
}
6 changes: 4 additions & 2 deletions dwds/lib/src/web_utilities/batched_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import 'dart:async';
import 'package:async/async.dart';

import '../utilities/shared.dart';

/// Stream controller allowing to batch events.
class BatchedStreamController<T> {
static const _defaultBatchDelayMilliseconds = 1000;
Expand All @@ -28,7 +30,7 @@ class BatchedStreamController<T> {
_inputController = StreamController<T>(),
_outputController = StreamController<List<T>>() {
_inputQueue = StreamQueue<T>(_inputController.stream);
unawaited(_batchAndSendEvents());
safeUnawaited(_batchAndSendEvents());
}

/// Sink collecting events.
Expand All @@ -39,7 +41,7 @@ class BatchedStreamController<T> {

/// Close the controller.
Future<dynamic> close() async {
unawaited(_inputController.close());
safeUnawaited(_inputController.close());
return _completer.future.then((value) => _outputController.close());
}

Expand Down
7 changes: 4 additions & 3 deletions dwds/test/chrome_proxy_service_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import 'package:dwds/src/connections/debug_connection.dart';
import 'package:dwds/src/loaders/strategy.dart';
import 'package:dwds/src/services/chrome_proxy_service.dart';
import 'package:dwds/src/utilities/dart_uri.dart';
import 'package:dwds/src/utilities/shared.dart';
import 'package:http/http.dart' as http;
import 'package:path/path.dart' as path;
import 'package:test/test.dart';
Expand Down Expand Up @@ -1509,12 +1510,12 @@ void main() {
test('basic Pause/Resume', () async {
expect(service.streamListen('Debug'), completion(_isSuccess));
final stream = service.onEvent('Debug');
unawaited(tabConnection.debugger.pause());
safeUnawaited(tabConnection.debugger.pause());
await expectLater(
stream,
emitsThrough(const TypeMatcher<Event>()
.having((e) => e.kind, 'kind', EventKind.kPauseInterrupted)));
unawaited(tabConnection.debugger.resume());
safeUnawaited(tabConnection.debugger.resume());
expect(
eventStream,
emitsThrough(const TypeMatcher<Event>()
Expand Down Expand Up @@ -1720,7 +1721,7 @@ void main() {
final stream = service.onEvent(EventStreams.kLogging);
final message = 'myMessage';

unawaited(tabConnection.runtime.evaluate("sendLog('$message');"));
safeUnawaited(tabConnection.runtime.evaluate("sendLog('$message');"));

final event = await stream.first;
expect(event.kind, EventKind.kLogging);
Expand Down