Skip to content

Commit 311f02e

Browse files
committed
Move MessageReader loop from MessageDispatcher to ProtocolEndpoint
This change moves the loop that reads messages from the MessageReader from the MessageDispatcher to ProtocolEndpoint. This allows the MessageDispatcher to be used separately from a specific connection so that message handlers can be registered and shared between connections.
1 parent 7ddb54c commit 311f02e

File tree

2 files changed

+155
-186
lines changed

2 files changed

+155
-186
lines changed

src/PowerShellEditorServices.Protocol/MessageProtocol/MessageDispatcher.cs

+1-178
Original file line numberDiff line numberDiff line change
@@ -17,81 +17,16 @@ public class MessageDispatcher
1717
{
1818
#region Fields
1919

20-
private ChannelBase protocolChannel;
21-
private AsyncContextThread messageLoopThread;
22-
2320
private Dictionary<string, Func<Message, MessageWriter, Task>> requestHandlers =
2421
new Dictionary<string, Func<Message, MessageWriter, Task>>();
2522

2623
private Dictionary<string, Func<Message, MessageWriter, Task>> eventHandlers =
2724
new Dictionary<string, Func<Message, MessageWriter, Task>>();
2825

29-
private Action<Message> responseHandler;
30-
31-
private CancellationTokenSource messageLoopCancellationToken =
32-
new CancellationTokenSource();
33-
34-
#endregion
35-
36-
#region Properties
37-
38-
public SynchronizationContext SynchronizationContext { get; private set; }
39-
40-
public bool InMessageLoopThread
41-
{
42-
get
43-
{
44-
// We're in the same thread as the message loop if the
45-
// current synchronization context equals the one we
46-
// know.
47-
return SynchronizationContext.Current == this.SynchronizationContext;
48-
}
49-
}
50-
51-
protected MessageReader MessageReader { get; private set; }
52-
53-
protected MessageWriter MessageWriter { get; private set; }
54-
55-
56-
#endregion
57-
58-
#region Constructors
59-
60-
public MessageDispatcher(ChannelBase protocolChannel)
61-
{
62-
this.protocolChannel = protocolChannel;
63-
}
64-
6526
#endregion
6627

6728
#region Public Methods
6829

69-
public void Start()
70-
{
71-
// At this point the MessageReader and MessageWriter should be ready
72-
this.MessageReader = protocolChannel.MessageReader;
73-
this.MessageWriter = protocolChannel.MessageWriter;
74-
75-
// Start the main message loop thread. The Task is
76-
// not explicitly awaited because it is running on
77-
// an independent background thread.
78-
this.messageLoopThread = new AsyncContextThread("Message Dispatcher");
79-
this.messageLoopThread
80-
.Run(() => this.ListenForMessages(this.messageLoopCancellationToken.Token))
81-
.ContinueWith(this.OnListenTaskCompleted);
82-
}
83-
84-
public void Stop()
85-
{
86-
// Stop the message loop thread
87-
if (this.messageLoopThread != null)
88-
{
89-
this.messageLoopCancellationToken.Cancel();
90-
this.messageLoopThread.Stop();
91-
SynchronizationContext.SetSynchronizationContext(null);
92-
}
93-
}
94-
9530
public void SetRequestHandler<TParams, TResult, TError, TRegistrationOptions>(
9631
RequestType<TParams, TResult, TError, TRegistrationOptions> requestType,
9732
Func<TParams, RequestContext<TResult>, Task> requestHandler)
@@ -171,98 +106,11 @@ public void SetEventHandler<TParams, TRegistrationOptions>(
171106
});
172107
}
173108

174-
public void SetResponseHandler(Action<Message> responseHandler)
175-
{
176-
this.responseHandler = responseHandler;
177-
}
178-
179-
#endregion
180-
181-
#region Events
182-
183-
public event EventHandler<Exception> UnhandledException;
184-
185-
protected void OnUnhandledException(Exception unhandledException)
186-
{
187-
if (this.UnhandledException != null)
188-
{
189-
this.UnhandledException(this, unhandledException);
190-
}
191-
}
192-
193109
#endregion
194110

195111
#region Private Methods
196112

197-
private async Task ListenForMessages(CancellationToken cancellationToken)
198-
{
199-
this.SynchronizationContext = SynchronizationContext.Current;
200-
201-
// Run the message loop
202-
bool isRunning = true;
203-
while (isRunning && !cancellationToken.IsCancellationRequested)
204-
{
205-
Message newMessage = null;
206-
207-
try
208-
{
209-
// Read a message from the channel
210-
newMessage = await this.MessageReader.ReadMessage();
211-
}
212-
catch (MessageParseException e)
213-
{
214-
// TODO: Write an error response
215-
216-
Logger.Write(
217-
LogLevel.Error,
218-
"Could not parse a message that was received:\r\n\r\n" +
219-
e.ToString());
220-
221-
// Continue the loop
222-
continue;
223-
}
224-
catch (IOException e)
225-
{
226-
// The stream has ended, end the message loop
227-
Logger.Write(
228-
LogLevel.Error,
229-
string.Format(
230-
"Stream terminated unexpectedly, ending MessageDispatcher loop\r\n\r\nException: {0}\r\n{1}",
231-
e.GetType().Name,
232-
e.Message));
233-
234-
break;
235-
}
236-
catch (ObjectDisposedException)
237-
{
238-
Logger.Write(
239-
LogLevel.Verbose,
240-
"MessageReader attempted to read from a disposed stream, ending MessageDispatcher loop");
241-
242-
break;
243-
}
244-
catch (Exception e)
245-
{
246-
Logger.Write(
247-
LogLevel.Verbose,
248-
"Caught unexpected exception '{0}' in MessageDispatcher loop:\r\n{1}",
249-
e.GetType().Name,
250-
e.Message);
251-
}
252-
253-
// The message could be null if there was an error parsing the
254-
// previous message. In this case, do not try to dispatch it.
255-
if (newMessage != null)
256-
{
257-
// Process the message
258-
await this.DispatchMessage(
259-
newMessage,
260-
this.MessageWriter);
261-
}
262-
}
263-
}
264-
265-
protected async Task DispatchMessage(
113+
public async Task DispatchMessage(
266114
Message messageToDispatch,
267115
MessageWriter messageWriter)
268116
{
@@ -281,13 +129,6 @@ protected async Task DispatchMessage(
281129
Logger.Write(LogLevel.Error, $"MessageDispatcher: No handler registered for Request type '{messageToDispatch.Method}'");
282130
}
283131
}
284-
else if (messageToDispatch.MessageType == MessageType.Response)
285-
{
286-
if (this.responseHandler != null)
287-
{
288-
this.responseHandler(messageToDispatch);
289-
}
290-
}
291132
else if (messageToDispatch.MessageType == MessageType.Event)
292133
{
293134
Func<Message, MessageWriter, Task> eventHandler = null;
@@ -330,24 +171,6 @@ protected async Task DispatchMessage(
330171
}
331172
}
332173

333-
private void OnListenTaskCompleted(Task listenTask)
334-
{
335-
if (listenTask.IsFaulted)
336-
{
337-
Logger.Write(
338-
LogLevel.Error,
339-
string.Format(
340-
"MessageDispatcher loop terminated due to unhandled exception:\r\n\r\n{0}",
341-
listenTask.Exception.ToString()));
342-
343-
this.OnUnhandledException(listenTask.Exception);
344-
}
345-
else if (listenTask.IsCompleted || listenTask.IsCanceled)
346-
{
347-
// TODO: Dispose of anything?
348-
}
349-
}
350-
351174
#endregion
352175
}
353176
}

0 commit comments

Comments
 (0)