Skip to content

Commit caa36cf

Browse files
keertipCommit Queue
authored and
Commit Queue
committed
Add a MessageScheduler queue that all incoming messages to the analysis server go through.
Change-Id: I3770b5a1a8faa391fb35a5a1e3d2678dbc4fb3f2 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/383703 Commit-Queue: Keerti Parthasarathy <[email protected]> Reviewed-by: Brian Wilkerson <[email protected]>
1 parent 59c67b6 commit caa36cf

File tree

7 files changed

+290
-41
lines changed

7 files changed

+290
-41
lines changed

Diff for: pkg/analysis_server/lib/src/analysis_server.dart

+8-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import 'package:analysis_server/src/protocol_server.dart' as legacy
2626
import 'package:analysis_server/src/protocol_server.dart' as server;
2727
import 'package:analysis_server/src/server/crash_reporting_attachments.dart';
2828
import 'package:analysis_server/src/server/diagnostic_server.dart';
29+
import 'package:analysis_server/src/server/message_scheduler.dart';
2930
import 'package:analysis_server/src/server/performance.dart';
3031
import 'package:analysis_server/src/services/completion/completion_performance.dart';
3132
import 'package:analysis_server/src/services/correction/assist_internal.dart';
@@ -250,6 +251,10 @@ abstract class AnalysisServer {
250251
/// A completer for [lspUninitialized].
251252
final Completer<void> _lspUninitializedCompleter = Completer<void>();
252253

254+
/// A scheduler that keeps track of all incoming messages and schedules them
255+
/// for processing.
256+
final MessageScheduler messageScheduler;
257+
253258
AnalysisServer(
254259
this.options,
255260
this.sdkManager,
@@ -269,7 +274,9 @@ abstract class AnalysisServer {
269274
}) : resourceProvider = OverlayResourceProvider(baseResourceProvider),
270275
pubApi = PubApi(instrumentationService, httpClient,
271276
Platform.environment['PUB_HOSTED_URL']),
272-
producerGeneratorsForLintRules = AssistProcessor.computeLintRuleMap() {
277+
producerGeneratorsForLintRules = AssistProcessor.computeLintRuleMap(),
278+
messageScheduler = MessageScheduler() {
279+
messageScheduler.setServer(this);
273280
// Set the default URI converter. This uses the resource providers path
274281
// context (unlike the initialized value) which allows tests to override it.
275282
uriConverter = ClientUriConverter.noop(baseResourceProvider.pathContext);

Diff for: pkg/analysis_server/lib/src/legacy_analysis_server.dart

+3-1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ import 'package:analysis_server/src/server/detachable_filesystem_manager.dart';
8686
import 'package:analysis_server/src/server/diagnostic_server.dart';
8787
import 'package:analysis_server/src/server/error_notifier.dart';
8888
import 'package:analysis_server/src/server/features.dart';
89+
import 'package:analysis_server/src/server/message_scheduler.dart';
8990
import 'package:analysis_server/src/server/performance.dart';
9091
import 'package:analysis_server/src/server/sdk_configuration.dart';
9192
import 'package:analysis_server/src/services/completion/completion_state.dart';
@@ -606,7 +607,8 @@ class LegacyAnalysisServer extends AnalysisServer {
606607
/// Handle a [request] that was read from the communication channel.
607608
void handleRequestOrResponse(RequestOrResponse requestOrResponse) {
608609
if (requestOrResponse is Request) {
609-
handleRequest(requestOrResponse);
610+
messageScheduler.add(LegacyMessage(request: requestOrResponse));
611+
messageScheduler.notify();
610612
} else if (requestOrResponse is Response) {
611613
handleResponse(requestOrResponse);
612614
}

Diff for: pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart

+8-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import 'package:analysis_server/src/server/crash_reporting_attachments.dart';
2727
import 'package:analysis_server/src/server/detachable_filesystem_manager.dart';
2828
import 'package:analysis_server/src/server/diagnostic_server.dart';
2929
import 'package:analysis_server/src/server/error_notifier.dart';
30+
import 'package:analysis_server/src/server/message_scheduler.dart';
3031
import 'package:analysis_server/src/server/performance.dart';
3132
import 'package:analysis_server/src/services/user_prompts/dart_fix_prompt_manager.dart';
3233
import 'package:analysis_server/src/utilities/extensions/flutter.dart';
@@ -191,7 +192,8 @@ class LspAnalysisServer extends AnalysisServer {
191192
analysisDriverScheduler.start();
192193

193194
_channelSubscription =
194-
channel.listen(handleMessage, onDone: done, onError: socketError);
195+
channel.listen(scheduleMessage, onDone: done, onError: socketError);
196+
195197
if (AnalysisServer.supportsPlugins) {
196198
_pluginChangeSubscription =
197199
pluginManager.pluginsChanged.listen((_) => _onPluginsChanged());
@@ -757,6 +759,11 @@ class LspAnalysisServer extends AnalysisServer {
757759
}
758760
}
759761

762+
void scheduleMessage(Message message) {
763+
messageScheduler.add(LspMessage(message: message));
764+
messageScheduler.notify();
765+
}
766+
760767
void sendErrorResponse(Message message, ResponseError error) {
761768
if (message is RequestMessage) {
762769
sendResponse(ResponseMessage(
+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'dart:collection';
7+
8+
import 'package:analysis_server/lsp_protocol/protocol.dart' as lsp;
9+
import 'package:analysis_server/protocol/protocol.dart' as legacy;
10+
import 'package:analysis_server/src/analysis_server.dart';
11+
import 'package:analysis_server/src/legacy_analysis_server.dart';
12+
import 'package:analysis_server/src/lsp/lsp_analysis_server.dart';
13+
import 'package:analyzer/src/util/performance/operation_performance.dart';
14+
import 'package:meta/meta.dart';
15+
16+
/// Represents a message from DTD (Dart Tooling Daemon).
17+
final class DtdMessage extends MessageObject {
18+
final lsp.IncomingMessage message;
19+
final Completer<Map<String, Object?>> completer;
20+
final OperationPerformanceImpl performance;
21+
22+
DtdMessage(
23+
{required this.message,
24+
required this.completer,
25+
required this.performance});
26+
}
27+
28+
/// Represents a message in the Legacy protocol format.
29+
final class LegacyMessage extends MessageObject {
30+
final legacy.Request request;
31+
32+
LegacyMessage({required this.request});
33+
}
34+
35+
/// Represents a message in the LSP protocol format.
36+
final class LspMessage extends MessageObject {
37+
final lsp.Message message;
38+
39+
LspMessage({required this.message});
40+
}
41+
42+
/// Represents a message from a client, can be an IDE, DTD etc.
43+
sealed class MessageObject {}
44+
45+
/// The [MessageScheduler] receives messages from all clients of the
46+
/// [AnalysisServer]. Clients can include IDE's (LSP and Legacy protocol), DTD,
47+
/// and the Diagnostic server. The [MessageSchedular] acts as a hub for all
48+
/// incoming messages and forwards the messages to the appropriate handlers.
49+
final class MessageScheduler {
50+
/// The [AnalaysisServer] associated with the schedular.
51+
late final AnalysisServer server;
52+
53+
/// A queue of incoming messages from all the clients of the [AnalysisServer].
54+
final ListQueue<MessageObject> _incomingMessages = ListQueue<MessageObject>();
55+
56+
@visibleForTesting
57+
ListQueue<MessageObject> get incomingMessages => _incomingMessages;
58+
59+
/// Add a message to the end of the incoming messages queue.
60+
void add(MessageObject message) {
61+
_incomingMessages.addLast(message);
62+
}
63+
64+
/// Notify the [MessageSchedular] to process the messages queue.
65+
void notify() async {
66+
processMessages();
67+
}
68+
69+
/// Dispatch the first message in the queue to be executed.
70+
void processMessages() {
71+
if (_incomingMessages.isEmpty) {
72+
return;
73+
}
74+
var message = _incomingMessages.removeFirst();
75+
switch (message) {
76+
case LspMessage():
77+
var lspMessage = message.message;
78+
(server as LspAnalysisServer).handleMessage(lspMessage);
79+
case LegacyMessage():
80+
var request = message.request;
81+
(server as LegacyAnalysisServer).handleRequest(request);
82+
case DtdMessage():
83+
server.dtd!.processMessage(
84+
message.message, message.performance, message.completer);
85+
}
86+
}
87+
88+
/// Set the [AnalysisServer].
89+
void setServer(AnalysisServer analysisServer) {
90+
server = analysisServer;
91+
}
92+
}

Diff for: pkg/analysis_server/lib/src/services/dart_tooling_daemon/dtd_services.dart

+58-38
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import 'dart:async';
66

77
import 'package:analysis_server/lsp_protocol/protocol.dart';
88
import 'package:analysis_server/src/analysis_server.dart';
9-
import 'package:analysis_server/src/lsp/client_capabilities.dart';
109
import 'package:analysis_server/src/lsp/error_or.dart';
1110
import 'package:analysis_server/src/lsp/handlers/handler_states.dart';
1211
import 'package:analysis_server/src/lsp/handlers/handlers.dart';
12+
import 'package:analysis_server/src/server/message_scheduler.dart';
1313
import 'package:analysis_server/src/server/performance.dart';
1414
import 'package:analyzer/src/util/performance/operation_performance.dart';
1515
import 'package:dtd/dtd.dart';
@@ -63,6 +63,54 @@ class DtdServices {
6363

6464
DtdConnectionState get state => _state;
6565

66+
/// Executes the LSP handler [messageHandler] with [params] and returns the
67+
/// results as a map to provide back to DTD.
68+
///
69+
/// If the handler fails, throws an [RpcException] to be propagated to the
70+
/// client.
71+
void processMessage(
72+
IncomingMessage message,
73+
OperationPerformanceImpl performance,
74+
Completer<Map<String, Object?>> completer) async {
75+
// (TODO:keertip) Lookup the right handler, execute and return results.
76+
// For now, complete with exception.
77+
completer.completeError(RpcException(
78+
ErrorCodes.InvalidRequest.toJson(),
79+
'DTD requests are not yet supported',
80+
));
81+
82+
// (TODO:keertip) Uncomment when lookup has been implemented
83+
// var info = MessageInfo(
84+
// performance: performance,
85+
// // DTD clients requests are always executed with a fixed set of
86+
// // capabilities so that the responses don't change in format based on the
87+
// // owning editor.
88+
// clientCapabilities: fixedBasicLspClientCapabilities,
89+
// );
90+
// var token = NotCancelableToken(); // We don't currently support cancel.
91+
92+
// // Execute the handler.
93+
// var result = await messageHandler.handleMessage(message, info, token);
94+
95+
// // Map the result (or error) on to what a DTD handler needs to return.
96+
// return result.map(
97+
// // Map LSP errors on to equiv JSON-RPC errors for DTD.
98+
// (error) => throw RpcException(
99+
// error.code.toJson(),
100+
// error.message,
101+
// data: error.data,
102+
// ),
103+
// // DTD requires that all results are a Map and that they contain a
104+
// // 'type' field. This differs slightly from LSP where we could return a
105+
// // boolean (for example). This means we need to put the result in a
106+
// // field, which we're calling 'result'.
107+
// (result) => {
108+
// 'type': result?.runtimeType.toString(),
109+
// 'result': result,
110+
// },
111+
// );
112+
}
113+
66114
/// Closes the connection to DTD and cleans up.
67115
void _close([DtdConnectionState state = DtdConnectionState.Disconnected]) {
68116
_state = state;
@@ -122,54 +170,26 @@ class DtdServices {
122170
}
123171
}
124172

125-
/// Executes the LSP handler [messageHandler] with [params] and returns the
126-
/// results as a map to provide back to DTD.
127-
///
128-
/// If the handler fails, throws an [RpcException] to be propagated to the
129-
/// client.
173+
/// The incoming request is sent to the [MessageScheduler] for execution.
174+
/// A completer is returned which will be completed with the result of the
175+
/// execution of the request by the corresponding [MessageHandler].
130176
Future<Map<String, Object?>> _executeLspHandler(
131177
MessageHandler<Object?, Object?, AnalysisServer> messageHandler,
132178
Parameters params,
133179
OperationPerformanceImpl performance,
134180
) async {
135-
// TODO(dantup): Currently the handler just runs immediately, but this
136-
// should interact with the scheduler in future.
137-
138181
// Map the incoming request into types we use for LSP request handling.
139182
var message = IncomingMessage(
140183
jsonrpc: jsonRpcVersion,
141184
method: messageHandler.handlesMessage,
142185
params: params.asMap,
143186
);
144-
var info = MessageInfo(
145-
performance: performance,
146-
// DTD clients requests are always executed with a fixed set of
147-
// capabilities so that the responses don't change in format based on the
148-
// owning editor.
149-
clientCapabilities: fixedBasicLspClientCapabilities,
150-
);
151-
var token = NotCancelableToken(); // We don't currently support cancel.
152-
153-
// Execute the handler.
154-
var result = await messageHandler.handleMessage(message, info, token);
155-
156-
// Map the result (or error) on to what a DTD handler needs to return.
157-
return result.map(
158-
// Map LSP errors on to equiv JSON-RPC errors for DTD.
159-
(error) => throw RpcException(
160-
error.code.toJson(),
161-
error.message,
162-
data: error.data,
163-
),
164-
// DTD requires that all results are a Map and that they contain a
165-
// 'type' field. This differs slightly from LSP where we could return a
166-
// boolean (for example). This means we need to put the result in a
167-
// field, which we're calling 'result'.
168-
(result) => {
169-
'type': result?.runtimeType.toString(),
170-
'result': result,
171-
},
172-
);
187+
var scheduler = _server.messageScheduler;
188+
var completer = Completer<Map<String, Object?>>();
189+
scheduler.add(DtdMessage(
190+
message: message, performance: performance, completer: completer));
191+
scheduler.notify();
192+
return completer.future;
173193
}
174194

175195
/// Handles an unexpected error occurring on the DTD connection by logging and

0 commit comments

Comments
 (0)