Skip to content

Commit 674cb15

Browse files
authoredApr 2, 2025··
Make supposedly unreachable code less reachable (#178)
* Move WriteJsonRpcMessageToBuffer to method * Fix indentation in McpClient * Make supposedly unreachable code less reachable * Guard against multiple RunAsync calls * Move RunAsync inside of Try in MapMcp
1 parent 25bcb44 commit 674cb15

File tree

7 files changed

+94
-80
lines changed

7 files changed

+94
-80
lines changed
 

Diff for: ‎src/ModelContextProtocol.AspNetCore/McpEndpointRouteBuilderExtensions.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpo
4747
{
4848
throw new Exception($"Unreachable given good entropy! Session with ID '{sessionId}' has already been created.");
4949
}
50-
await using var server = McpServerFactory.Create(transport, mcpServerOptions.Value, loggerFactory, endpoints.ServiceProvider);
5150

5251
try
5352
{
5453
var transportTask = transport.RunAsync(cancellationToken: requestAborted);
54+
await using var server = McpServerFactory.Create(transport, mcpServerOptions.Value, loggerFactory, endpoints.ServiceProvider);
5555

5656
try
5757
{
@@ -85,7 +85,7 @@ public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpo
8585

8686
if (!_sessions.TryGetValue(sessionId.ToString(), out var transport))
8787
{
88-
await Results.BadRequest($"Session {sessionId} not found.").ExecuteAsync(context);
88+
await Results.BadRequest($"Session ID not found.").ExecuteAsync(context);
8989
return;
9090
}
9191

Diff for: ‎src/ModelContextProtocol/Client/McpClient.cs

+15-17
Original file line numberDiff line numberDiff line change
@@ -82,29 +82,27 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default)
8282
{
8383
// Connect transport
8484
_sessionTransport = await _clientTransport.ConnectAsync(cancellationToken).ConfigureAwait(false);
85-
// We don't want the ConnectAsync token to cancel the session after we've successfully connected.
86-
// The base class handles cleaning up the session in DisposeAsync without our help.
87-
StartSession(_sessionTransport, fullSessionCancellationToken: CancellationToken.None);
85+
StartSession(_sessionTransport);
8886

8987
// Perform initialization sequence
9088
using var initializationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
9189
initializationCts.CancelAfter(_options.InitializationTimeout);
9290

93-
try
94-
{
95-
// Send initialize request
96-
var initializeResponse = await SendRequestAsync<InitializeResult>(
97-
new JsonRpcRequest
98-
{
99-
Method = RequestMethods.Initialize,
100-
Params = new InitializeRequestParams()
91+
try
92+
{
93+
// Send initialize request
94+
var initializeResponse = await SendRequestAsync<InitializeResult>(
95+
new JsonRpcRequest
10196
{
102-
ProtocolVersion = _options.ProtocolVersion,
103-
Capabilities = _options.Capabilities ?? new ClientCapabilities(),
104-
ClientInfo = _options.ClientInfo
105-
}
106-
},
107-
initializationCts.Token).ConfigureAwait(false);
97+
Method = RequestMethods.Initialize,
98+
Params = new InitializeRequestParams()
99+
{
100+
ProtocolVersion = _options.ProtocolVersion,
101+
Capabilities = _options.Capabilities ?? new ClientCapabilities(),
102+
ClientInfo = _options.ClientInfo
103+
}
104+
},
105+
initializationCts.Token).ConfigureAwait(false);
108106

109107
// Store server information
110108
_logger.ServerCapabilitiesReceived(EndpointName,

Diff for: ‎src/ModelContextProtocol/Protocol/Transport/SseResponseStreamTransport.cs

+13-13
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,30 @@ public sealed class SseResponseStreamTransport(Stream sseResponseStream, string
3232
/// <returns>A task representing the send loop that writes JSON-RPC messages to the SSE response stream.</returns>
3333
public Task RunAsync(CancellationToken cancellationToken)
3434
{
35-
void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<byte> writer)
36-
{
37-
if (item.EventType == "endpoint")
38-
{
39-
writer.Write(Encoding.UTF8.GetBytes(messageEndpoint));
40-
return;
41-
}
42-
43-
JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.DefaultOptions.GetTypeInfo<IJsonRpcMessage?>());
44-
}
45-
46-
IsConnected = true;
47-
4835
// The very first SSE event isn't really an IJsonRpcMessage, but there's no API to write a single item of a different type,
4936
// so we fib and special-case the "endpoint" event type in the formatter.
5037
if (!_outgoingSseChannel.Writer.TryWrite(new SseItem<IJsonRpcMessage?>(null, "endpoint")))
5138
{
5239
throw new InvalidOperationException($"You must call ${nameof(RunAsync)} before calling ${nameof(SendMessageAsync)}.");
5340
}
5441

42+
IsConnected = true;
43+
5544
var sseItems = _outgoingSseChannel.Reader.ReadAllAsync(cancellationToken);
5645
return _sseWriteTask = SseFormatter.WriteAsync(sseItems, sseResponseStream, WriteJsonRpcMessageToBuffer, cancellationToken);
5746
}
5847

48+
private void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<byte> writer)
49+
{
50+
if (item.EventType == "endpoint")
51+
{
52+
writer.Write(Encoding.UTF8.GetBytes(messageEndpoint));
53+
return;
54+
}
55+
56+
JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.DefaultOptions.GetTypeInfo<IJsonRpcMessage?>());
57+
}
58+
5959
/// <inheritdoc/>
6060
public ChannelReader<IJsonRpcMessage> MessageReader => _incomingChannel.Reader;
6161

Diff for: ‎src/ModelContextProtocol/Server/McpServer.cs

+11-5
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ internal sealed class McpServer : McpJsonRpcEndpoint, IMcpServer
1414
private readonly EventHandler? _toolsChangedDelegate;
1515
private readonly EventHandler? _promptsChangedDelegate;
1616

17-
private ITransport _sessionTransport;
1817
private string _endpointName;
18+
private int _started;
1919

2020
/// <summary>
2121
/// Creates a new instance of <see cref="McpServer"/>.
@@ -32,7 +32,6 @@ public McpServer(ITransport transport, McpServerOptions options, ILoggerFactory?
3232
Throw.IfNull(transport);
3333
Throw.IfNull(options);
3434

35-
_sessionTransport = transport;
3635
ServerOptions = options;
3736
Services = serviceProvider;
3837
_endpointName = $"Server ({options.ServerInfo.Name} {options.ServerInfo.Version})";
@@ -74,6 +73,8 @@ public McpServer(ITransport transport, McpServerOptions options, ILoggerFactory?
7473
SetPromptsHandler(options);
7574
SetResourcesHandler(options);
7675
SetSetLoggingLevelHandler(options);
76+
77+
StartSession(transport);
7778
}
7879

7980
public ServerCapabilities? ServerCapabilities { get; set; }
@@ -96,11 +97,16 @@ public McpServer(ITransport transport, McpServerOptions options, ILoggerFactory?
9697
/// <inheritdoc />
9798
public async Task RunAsync(CancellationToken cancellationToken = default)
9899
{
100+
if (Interlocked.Exchange(ref _started, 1) != 0)
101+
{
102+
throw new InvalidOperationException($"{nameof(RunAsync)} must only be called once.");
103+
}
104+
99105
try
100106
{
101-
// Start processing messages
102-
StartSession(_sessionTransport, fullSessionCancellationToken: cancellationToken);
103-
await MessageProcessingTask.ConfigureAwait(false);
107+
using var _ = cancellationToken.Register(static s => ((McpServer)s!).CancelSession(), this);
108+
// The McpServer ctor always calls StartSession, so MessageProcessingTask is always set.
109+
await MessageProcessingTask!.ConfigureAwait(false);
104110
}
105111
finally
106112
{

Diff for: ‎src/ModelContextProtocol/Shared/McpJsonRpcEndpoint.cs

+4-8
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ internal abstract class McpJsonRpcEndpoint : IAsyncDisposable
2222

2323
private McpSession? _session;
2424
private CancellationTokenSource? _sessionCts;
25-
private int _started;
2625

2726
private readonly SemaphoreSlim _disposeLock = new(1, 1);
2827
private bool _disposed;
@@ -61,18 +60,15 @@ public Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancella
6160
protected Task? MessageProcessingTask { get; set; }
6261

6362
[MemberNotNull(nameof(MessageProcessingTask))]
64-
protected void StartSession(ITransport sessionTransport, CancellationToken fullSessionCancellationToken = default)
63+
protected void StartSession(ITransport sessionTransport)
6564
{
66-
if (Interlocked.Exchange(ref _started, 1) != 0)
67-
{
68-
throw new InvalidOperationException("The MCP session has already stared.");
69-
}
70-
71-
_sessionCts = CancellationTokenSource.CreateLinkedTokenSource(fullSessionCancellationToken);
65+
_sessionCts = new CancellationTokenSource();
7266
_session = new McpSession(sessionTransport, EndpointName, _requestHandlers, _notificationHandlers, _logger);
7367
MessageProcessingTask = _session.ProcessMessagesAsync(_sessionCts.Token);
7468
}
7569

70+
protected void CancelSession() => _sessionCts?.Cancel();
71+
7672
public async ValueTask DisposeAsync()
7773
{
7874
using var _ = await _disposeLock.LockAsync().ConfigureAwait(false);

Diff for: ‎tests/ModelContextProtocol.Tests/Server/McpServerFactoryTests.cs

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
using ModelContextProtocol.Protocol.Transport;
2-
using ModelContextProtocol.Protocol.Types;
1+
using ModelContextProtocol.Protocol.Types;
32
using ModelContextProtocol.Server;
43
using ModelContextProtocol.Tests.Utils;
5-
using Moq;
64

75
namespace ModelContextProtocol.Tests.Server;
86

@@ -25,7 +23,8 @@ public McpServerFactoryTests(ITestOutputHelper testOutputHelper)
2523
public async Task Create_Should_Initialize_With_Valid_Parameters()
2624
{
2725
// Arrange & Act
28-
await using IMcpServer server = McpServerFactory.Create(Mock.Of<ITransport>(), _options, LoggerFactory);
26+
await using var transport = new TestServerTransport();
27+
await using IMcpServer server = McpServerFactory.Create(transport, _options, LoggerFactory);
2928

3029
// Assert
3130
Assert.NotNull(server);
@@ -39,9 +38,10 @@ public void Create_Throws_For_Null_ServerTransport()
3938
}
4039

4140
[Fact]
42-
public void Create_Throws_For_Null_Options()
41+
public async Task Create_Throws_For_Null_Options()
4342
{
4443
// Arrange, Act & Assert
45-
Assert.Throws<ArgumentNullException>("serverOptions", () => McpServerFactory.Create(Mock.Of<ITransport>(), null!, LoggerFactory));
44+
await using var transport = new TestServerTransport();
45+
Assert.Throws<ArgumentNullException>("serverOptions", () => McpServerFactory.Create(transport, null!, LoggerFactory));
4646
}
4747
}

Diff for: ‎tests/ModelContextProtocol.Tests/Server/McpServerTests.cs

+43-29
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,21 @@
11
using Microsoft.Extensions.AI;
22
using Microsoft.Extensions.DependencyInjection;
33
using ModelContextProtocol.Protocol.Messages;
4-
using ModelContextProtocol.Protocol.Transport;
54
using ModelContextProtocol.Protocol.Types;
65
using ModelContextProtocol.Server;
76
using ModelContextProtocol.Tests.Utils;
8-
using Moq;
97
using System.Reflection;
108

119
namespace ModelContextProtocol.Tests.Server;
1210

1311
public class McpServerTests : LoggedTest
1412
{
15-
private readonly Mock<ITransport> _serverTransport;
1613
private readonly McpServerOptions _options;
17-
private readonly IServiceProvider _serviceProvider;
1814

1915
public McpServerTests(ITestOutputHelper testOutputHelper)
2016
: base(testOutputHelper)
2117
{
22-
_serverTransport = new Mock<ITransport>();
2318
_options = CreateOptions();
24-
_serviceProvider = new ServiceCollection().BuildServiceProvider();
2519
}
2620

2721
private static McpServerOptions CreateOptions(ServerCapabilities? capabilities = null)
@@ -39,7 +33,8 @@ private static McpServerOptions CreateOptions(ServerCapabilities? capabilities =
3933
public async Task Constructor_Should_Initialize_With_Valid_Parameters()
4034
{
4135
// Arrange & Act
42-
await using var server = McpServerFactory.Create(_serverTransport.Object, _options, LoggerFactory, _serviceProvider);
36+
await using var transport = new TestServerTransport();
37+
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory);
4338

4439
// Assert
4540
Assert.NotNull(server);
@@ -49,21 +44,23 @@ public async Task Constructor_Should_Initialize_With_Valid_Parameters()
4944
public void Constructor_Throws_For_Null_Transport()
5045
{
5146
// Arrange, Act & Assert
52-
Assert.Throws<ArgumentNullException>(() => McpServerFactory.Create(null!, _options, LoggerFactory, _serviceProvider));
47+
Assert.Throws<ArgumentNullException>(() => McpServerFactory.Create(null!, _options, LoggerFactory));
5348
}
5449

5550
[Fact]
56-
public void Constructor_Throws_For_Null_Options()
51+
public async Task Constructor_Throws_For_Null_Options()
5752
{
5853
// Arrange, Act & Assert
59-
Assert.Throws<ArgumentNullException>(() => McpServerFactory.Create(_serverTransport.Object, null!, LoggerFactory, _serviceProvider));
54+
await using var transport = new TestServerTransport();
55+
Assert.Throws<ArgumentNullException>(() => McpServerFactory.Create(transport, null!, LoggerFactory));
6056
}
6157

6258
[Fact]
6359
public async Task Constructor_Does_Not_Throw_For_Null_Logger()
6460
{
6561
// Arrange & Act
66-
await using var server = McpServerFactory.Create(_serverTransport.Object, _options, null, _serviceProvider);
62+
await using var transport = new TestServerTransport();
63+
await using var server = McpServerFactory.Create(transport, _options, null);
6764

6865
// Assert
6966
Assert.NotNull(server);
@@ -73,7 +70,8 @@ public async Task Constructor_Does_Not_Throw_For_Null_Logger()
7370
public async Task Constructor_Does_Not_Throw_For_Null_ServiceProvider()
7471
{
7572
// Arrange & Act
76-
await using var server = McpServerFactory.Create(_serverTransport.Object, _options, LoggerFactory, null);
73+
await using var transport = new TestServerTransport();
74+
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory, null);
7775

7876
// Assert
7977
Assert.NotNull(server);
@@ -83,27 +81,23 @@ public async Task Constructor_Does_Not_Throw_For_Null_ServiceProvider()
8381
public async Task RunAsync_Should_Throw_InvalidOperationException_If_Already_Running()
8482
{
8583
// Arrange
86-
await using var server = McpServerFactory.Create(_serverTransport.Object, _options, LoggerFactory, _serviceProvider);
84+
await using var transport = new TestServerTransport();
85+
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory);
8786
var runTask = server.RunAsync(TestContext.Current.CancellationToken);
8887

8988
// Act & Assert
9089
await Assert.ThrowsAsync<InvalidOperationException>(() => server.RunAsync(TestContext.Current.CancellationToken));
9190

92-
try
93-
{
94-
await runTask;
95-
}
96-
catch (NullReferenceException)
97-
{
98-
// _serverTransport.Object returns a null MessageReader
99-
}
91+
await transport.DisposeAsync();
92+
await runTask;
10093
}
10194

10295
[Fact]
10396
public async Task RequestSamplingAsync_Should_Throw_McpServerException_If_Client_Does_Not_Support_Sampling()
10497
{
10598
// Arrange
106-
await using var server = McpServerFactory.Create(_serverTransport.Object, _options, LoggerFactory, _serviceProvider);
99+
await using var transport = new TestServerTransport();
100+
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory);
107101
SetClientCapabilities(server, new ClientCapabilities());
108102

109103
var action = () => server.RequestSamplingAsync(new CreateMessageRequestParams { Messages = [] }, CancellationToken.None);
@@ -117,7 +111,7 @@ public async Task RequestSamplingAsync_Should_SendRequest()
117111
{
118112
// Arrange
119113
await using var transport = new TestServerTransport();
120-
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory, _serviceProvider);
114+
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory);
121115
SetClientCapabilities(server, new ClientCapabilities { Sampling = new SamplingCapability() });
122116

123117
var runTask = server.RunAsync(TestContext.Current.CancellationToken);
@@ -138,7 +132,8 @@ public async Task RequestSamplingAsync_Should_SendRequest()
138132
public async Task RequestRootsAsync_Should_Throw_McpServerException_If_Client_Does_Not_Support_Roots()
139133
{
140134
// Arrange
141-
await using var server = McpServerFactory.Create(_serverTransport.Object, _options, LoggerFactory, _serviceProvider);
135+
await using var transport = new TestServerTransport();
136+
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory);
142137
SetClientCapabilities(server, new ClientCapabilities());
143138

144139
// Act & Assert
@@ -150,7 +145,7 @@ public async Task RequestRootsAsync_Should_SendRequest()
150145
{
151146
// Arrange
152147
await using var transport = new TestServerTransport();
153-
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory, _serviceProvider);
148+
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory);
154149
SetClientCapabilities(server, new ClientCapabilities { Roots = new RootsCapability() });
155150
var runTask = server.RunAsync(TestContext.Current.CancellationToken);
156151

@@ -507,7 +502,7 @@ private async Task Can_Handle_Requests(ServerCapabilities? serverCapabilities, s
507502
var options = CreateOptions(serverCapabilities);
508503
configureOptions?.Invoke(options);
509504

510-
await using var server = McpServerFactory.Create(transport, options, LoggerFactory, _serviceProvider);
505+
await using var server = McpServerFactory.Create(transport, options, LoggerFactory);
511506

512507
var runTask = server.RunAsync(TestContext.Current.CancellationToken);
513508

@@ -542,7 +537,7 @@ private async Task Throws_Exception_If_No_Handler_Assigned(ServerCapabilities se
542537
await using var transport = new TestServerTransport();
543538
var options = CreateOptions(serverCapabilities);
544539

545-
Assert.Throws<McpServerException>(() => McpServerFactory.Create(transport, options, LoggerFactory, _serviceProvider));
540+
Assert.Throws<McpServerException>(() => McpServerFactory.Create(transport, options, LoggerFactory));
546541
}
547542

548543
[Fact]
@@ -553,7 +548,6 @@ public async Task AsSamplingChatClient_NoSamplingSupport_Throws()
553548
Assert.Throws<ArgumentException>("server", () => server.AsSamplingChatClient());
554549
}
555550

556-
557551
[Fact]
558552
public async Task AsSamplingChatClient_HandlesRequestResponse()
559553
{
@@ -583,6 +577,26 @@ public async Task AsSamplingChatClient_HandlesRequestResponse()
583577
Assert.Equal(ChatRole.Assistant, response.Messages[0].Role);
584578
}
585579

580+
[Fact]
581+
public async Task Can_SendMessage_Before_RunAsync()
582+
{
583+
await using var transport = new TestServerTransport();
584+
await using var server = McpServerFactory.Create(transport, _options, LoggerFactory);
585+
586+
var logNotification = new JsonRpcNotification()
587+
{
588+
Method = NotificationMethods.LoggingMessageNotification
589+
};
590+
await server.SendMessageAsync(logNotification, TestContext.Current.CancellationToken);
591+
592+
var runTask = server.RunAsync(TestContext.Current.CancellationToken);
593+
await transport.DisposeAsync();
594+
await runTask;
595+
596+
Assert.NotEmpty(transport.SentMessages);
597+
Assert.Same(logNotification, transport.SentMessages[0]);
598+
}
599+
586600
private static void SetClientCapabilities(IMcpServer server, ClientCapabilities capabilities)
587601
{
588602
PropertyInfo? property = server.GetType().GetProperty("ClientCapabilities", BindingFlags.Public | BindingFlags.Instance);
@@ -644,7 +658,7 @@ public async Task NotifyProgress_Should_Be_Handled()
644658

645659
var notificationReceived = new TaskCompletionSource<JsonRpcNotification>();
646660

647-
var server = McpServerFactory.Create(transport, options, LoggerFactory, _serviceProvider);
661+
var server = McpServerFactory.Create(transport, options, LoggerFactory);
648662
server.AddNotificationHandler(NotificationMethods.ProgressNotification, notification =>
649663
{
650664
notificationReceived.SetResult(notification);

0 commit comments

Comments
 (0)
Please sign in to comment.