Skip to content

Commit df9a2ef

Browse files
committed
adds McpSchemaCodec interface
1 parent 9e8ab7e commit df9a2ef

File tree

12 files changed

+248
-62
lines changed

12 files changed

+248
-62
lines changed

mcp-reactor/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
import java.util.function.Consumer;
1717
import java.util.function.Function;
1818

19-
import com.fasterxml.jackson.core.type.TypeReference;
2019
import com.fasterxml.jackson.databind.ObjectMapper;
2120
import io.modelcontextprotocol.client.transport.FlowSseClient.SseEvent;
2221
import io.modelcontextprotocol.schema.McpJacksonCodec;
22+
import io.modelcontextprotocol.schema.McpSchemaCodec;
2323
import io.modelcontextprotocol.schema.McpType;
2424
import io.modelcontextprotocol.spec.McpClientTransport;
2525
import io.modelcontextprotocol.spec.McpError;
@@ -92,8 +92,8 @@ public class HttpClientSseClientTransport implements McpClientTransport {
9292
/** HTTP request builder for building requests to send messages to the server */
9393
private final HttpRequest.Builder requestBuilder;
9494

95-
/** JSON object mapper for message serialization/deserialization */
96-
protected McpJacksonCodec jacksonCodec;
95+
/** McpSchemaCodec for message serialization/deserialization */
96+
protected McpSchemaCodec schemaCodec;
9797

9898
/** Flag indicating if the transport is in closing state */
9999
private volatile boolean isClosing = false;
@@ -186,7 +186,33 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
186186
Assert.notNull(requestBuilder, "requestBuilder must not be null");
187187
this.baseUri = URI.create(baseUri);
188188
this.sseEndpoint = sseEndpoint;
189-
this.jacksonCodec = new McpJacksonCodec(objectMapper);
189+
this.schemaCodec = new McpJacksonCodec(objectMapper);
190+
this.httpClient = httpClient;
191+
this.requestBuilder = requestBuilder;
192+
193+
this.sseClient = new FlowSseClient(this.httpClient, requestBuilder);
194+
}
195+
196+
/**
197+
* Creates a new transport instance with custom HTTP client builder, object mapper,
198+
* and headers.
199+
* @param httpClient the HTTP client to use
200+
* @param requestBuilder the HTTP request builder to use
201+
* @param baseUri the base URI of the MCP server
202+
* @param sseEndpoint the SSE endpoint path
203+
* @param schemaCodec the schemaCodec for JSON serialization/deserialization
204+
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
205+
*/
206+
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
207+
String sseEndpoint, McpSchemaCodec schemaCodec) {
208+
Assert.notNull(schemaCodec, "ObjectMapper must not be null");
209+
Assert.hasText(baseUri, "baseUri must not be empty");
210+
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
211+
Assert.notNull(httpClient, "httpClient must not be null");
212+
Assert.notNull(requestBuilder, "requestBuilder must not be null");
213+
this.baseUri = URI.create(baseUri);
214+
this.sseEndpoint = sseEndpoint;
215+
this.schemaCodec = schemaCodec;
190216
this.httpClient = httpClient;
191217
this.requestBuilder = requestBuilder;
192218

@@ -217,6 +243,8 @@ public static class Builder {
217243

218244
private ObjectMapper objectMapper = new ObjectMapper();
219245

246+
private McpSchemaCodec schemaCodec;
247+
220248
private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
221249
.header("Content-Type", "application/json");
222250

@@ -317,13 +345,28 @@ public Builder objectMapper(ObjectMapper objectMapper) {
317345
return this;
318346
}
319347

348+
/**
349+
* Sets the schema codec for JSON serialization/deserialization.
350+
* @param schemaCodec the McpSchemaCodec implementation
351+
* @return this builder
352+
*/
353+
public Builder withSchemaCodec(final McpSchemaCodec schemaCodec) {
354+
Assert.notNull(schemaCodec, "McpSchemaCodec must not be null");
355+
this.schemaCodec = schemaCodec;
356+
return this;
357+
}
358+
320359
/**
321360
* Builds a new {@link HttpClientSseClientTransport} instance.
322361
* @return a new transport instance
323362
*/
324363
public HttpClientSseClientTransport build() {
364+
if (schemaCodec == null) {
365+
schemaCodec = new McpJacksonCodec(objectMapper);
366+
}
367+
325368
return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint,
326-
objectMapper);
369+
schemaCodec);
327370
}
328371

329372
}
@@ -362,7 +405,7 @@ public void onEvent(SseEvent event) {
362405
future.complete(null);
363406
}
364407
else if (MESSAGE_EVENT_TYPE.equals(event.type())) {
365-
JSONRPCMessage message = jacksonCodec.deserializeJsonRpcMessage(event.data());
408+
JSONRPCMessage message = schemaCodec.decodeFromString(event.data());
366409
Publisher<McpSchema.JSONRPCMessage> result = handler.apply(Mono.just(message));
367410
Mono.from(result).subscribe();
368411
}
@@ -419,7 +462,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
419462
}
420463

421464
try {
422-
String jsonText = this.jacksonCodec.getMapper().writeValueAsString(message);
465+
String jsonText = this.schemaCodec.encodeAsString(message);
423466
URI requestUri = Utils.resolveUri(baseUri, endpoint);
424467
HttpRequest request = this.requestBuilder.uri(requestUri)
425468
.POST(HttpRequest.BodyPublishers.ofString(jsonText))
@@ -474,7 +517,7 @@ public Mono<Void> closeGracefully() {
474517
*/
475518
@Override
476519
public <T> T unmarshalFrom(Object data, McpType<T> typeRef) {
477-
return jacksonCodec.decodeResult(data, typeRef);
520+
return schemaCodec.decodeResult(data, typeRef);
478521
}
479522

480523
}

mcp-reactor/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515
import java.util.function.Consumer;
1616
import java.util.function.Function;
1717

18-
import com.fasterxml.jackson.core.type.TypeReference;
1918
import com.fasterxml.jackson.databind.ObjectMapper;
2019

2120
import io.modelcontextprotocol.schema.McpJacksonCodec;
21+
import io.modelcontextprotocol.schema.McpSchemaCodec;
2222
import io.modelcontextprotocol.schema.McpType;
2323
import io.modelcontextprotocol.spec.McpClientTransport;
24-
import io.modelcontextprotocol.schema.McpSchema;
2524
import io.modelcontextprotocol.schema.McpSchema.JSONRPCMessage;
2625
import io.modelcontextprotocol.util.Assert;
2726

@@ -53,7 +52,7 @@ public class StdioClientTransport implements McpClientTransport {
5352
/** The server process being communicated with */
5453
private Process process;
5554

56-
private McpJacksonCodec jacksonCodec;
55+
private McpSchemaCodec schemaCodec;
5756

5857
/** Scheduler for handling inbound messages from the server process */
5958
private Scheduler inboundScheduler;
@@ -89,15 +88,24 @@ public StdioClientTransport(ServerParameters params) {
8988
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
9089
*/
9190
public StdioClientTransport(ServerParameters params, ObjectMapper objectMapper) {
91+
this(params, new McpJacksonCodec(objectMapper));
92+
}
93+
94+
/**
95+
* Creates a new StdioClientTransport with the specified parameters and ObjectMapper.
96+
* @param params The parameters for configuring the server process
97+
* @param schemaCodec The McpSchemaCodec to use for JSON serialization/deserialization
98+
*/
99+
public StdioClientTransport(ServerParameters params, McpSchemaCodec schemaCodec) {
92100
Assert.notNull(params, "The params can not be null");
93-
Assert.notNull(objectMapper, "The ObjectMapper can not be null");
101+
Assert.notNull(schemaCodec, "The ObjectMapper can not be null");
94102

95103
this.inboundSink = Sinks.many().unicast().onBackpressureBuffer();
96104
this.outboundSink = Sinks.many().unicast().onBackpressureBuffer();
97105

98106
this.params = params;
99107

100-
this.jacksonCodec = new McpJacksonCodec(objectMapper);
108+
this.schemaCodec = schemaCodec;
101109

102110
this.errorSink = Sinks.many().unicast().onBackpressureBuffer();
103111

@@ -263,7 +271,7 @@ private void startInboundProcessing() {
263271
String line;
264272
while (!isClosing && (line = processReader.readLine()) != null) {
265273
try {
266-
JSONRPCMessage message = jacksonCodec.deserializeJsonRpcMessage(line);
274+
JSONRPCMessage message = schemaCodec.decodeFromString(line);
267275
if (!this.inboundSink.tryEmitNext(message).isSuccess()) {
268276
if (!isClosing) {
269277
logger.error("Failed to enqueue inbound message: {}", message);
@@ -304,7 +312,7 @@ private void startOutboundProcessing() {
304312
.handle((message, s) -> {
305313
if (message != null && !isClosing) {
306314
try {
307-
String jsonMessage = jacksonCodec.getMapper().writeValueAsString(message);
315+
String jsonMessage = schemaCodec.encodeAsString(message);
308316
// Escape any embedded newlines in the JSON message as per spec:
309317
// https://spec.modelcontextprotocol.io/specification/basic/transports/#stdio
310318
// - Messages are delimited by newlines, and MUST NOT contain
@@ -399,7 +407,7 @@ public Sinks.Many<String> getErrorSink() {
399407

400408
@Override
401409
public <T> T unmarshalFrom(Object data, McpType<T> typeRef) {
402-
return jacksonCodec.decodeResult(data, typeRef);
410+
return schemaCodec.decodeResult(data, typeRef);
403411
}
404412

405413
}

mcp-reactor/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.fasterxml.jackson.databind.ObjectMapper;
1919

2020
import io.modelcontextprotocol.schema.McpJacksonCodec;
21+
import io.modelcontextprotocol.schema.McpSchemaCodec;
22+
import io.modelcontextprotocol.schema.McpType;
2123
import io.modelcontextprotocol.session.McpClientSession;
2224
import io.modelcontextprotocol.spec.McpError;
2325
import io.modelcontextprotocol.schema.McpSchema;
@@ -252,7 +254,7 @@ private static class AsyncServerImpl extends McpAsyncServer {
252254

253255
private final McpServerTransportProvider mcpTransportProvider;
254256

255-
private final McpJacksonCodec jacksonCodec;
257+
private final McpSchemaCodec schemaCodec;
256258

257259
private final McpSchema.ServerCapabilities serverCapabilities;
258260

@@ -278,8 +280,13 @@ private static class AsyncServerImpl extends McpAsyncServer {
278280

279281
AsyncServerImpl(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,
280282
Duration requestTimeout, McpServerFeatures.Async features) {
283+
this(mcpTransportProvider, new McpJacksonCodec(objectMapper), requestTimeout, features);
284+
}
285+
286+
AsyncServerImpl(McpServerTransportProvider mcpTransportProvider, McpSchemaCodec schemaCodec,
287+
Duration requestTimeout, McpServerFeatures.Async features) {
281288
this.mcpTransportProvider = mcpTransportProvider;
282-
this.jacksonCodec = new McpJacksonCodec(objectMapper);
289+
this.schemaCodec = schemaCodec;
283290
this.serverInfo = features.serverInfo();
284291
this.serverCapabilities = features.serverCapabilities();
285292
this.instructions = features.instructions();
@@ -484,9 +491,8 @@ private McpServerSession.RequestHandler<McpSchema.ListToolsResult> toolsListRequ
484491

485492
private McpServerSession.RequestHandler<CallToolResult> toolsCallRequestHandler() {
486493
return (exchange, params) -> {
487-
McpSchema.CallToolRequest callToolRequest = jacksonCodec.getMapper()
488-
.convertValue(params, new TypeReference<McpSchema.CallToolRequest>() {
489-
});
494+
McpSchema.CallToolRequest callToolRequest = schemaCodec.decodeResult(params,
495+
McpType.of(McpSchema.CallToolRequest.class));
490496

491497
Optional<McpServerFeatures.AsyncToolSpecification> toolSpecification = this.tools.stream()
492498
.filter(tr -> callToolRequest.name().equals(tr.tool().name()))
@@ -574,9 +580,8 @@ private McpServerSession.RequestHandler<McpSchema.ListResourceTemplatesResult> r
574580

575581
private McpServerSession.RequestHandler<McpSchema.ReadResourceResult> resourcesReadRequestHandler() {
576582
return (exchange, params) -> {
577-
McpSchema.ReadResourceRequest resourceRequest = jacksonCodec.getMapper()
578-
.convertValue(params, new TypeReference<McpSchema.ReadResourceRequest>() {
579-
});
583+
McpSchema.ReadResourceRequest resourceRequest = schemaCodec.decodeResult(params,
584+
McpType.of(McpSchema.ReadResourceRequest.class));
580585
var resourceUri = resourceRequest.uri();
581586
McpServerFeatures.AsyncResourceSpecification specification = this.resources.get(resourceUri);
582587
if (specification != null) {
@@ -668,9 +673,8 @@ private McpServerSession.RequestHandler<McpSchema.ListPromptsResult> promptsList
668673

669674
private McpServerSession.RequestHandler<McpSchema.GetPromptResult> promptsGetRequestHandler() {
670675
return (exchange, params) -> {
671-
McpSchema.GetPromptRequest promptRequest = jacksonCodec.getMapper()
672-
.convertValue(params, new TypeReference<McpSchema.GetPromptRequest>() {
673-
});
676+
McpSchema.GetPromptRequest promptRequest = schemaCodec.decodeResult(params,
677+
McpType.of(McpSchema.GetPromptRequest.class));
674678

675679
// Implement prompt retrieval logic here
676680
McpServerFeatures.AsyncPromptSpecification specification = this.prompts.get(promptRequest.name());
@@ -705,9 +709,8 @@ private McpServerSession.RequestHandler<Object> setLoggerRequestHandler() {
705709
return (exchange, params) -> {
706710
return Mono.defer(() -> {
707711

708-
SetLevelRequest newMinLoggingLevel = jacksonCodec.getMapper()
709-
.convertValue(params, new TypeReference<SetLevelRequest>() {
710-
});
712+
SetLevelRequest newMinLoggingLevel = schemaCodec.decodeResult(params,
713+
McpType.of(SetLevelRequest.class));
711714

712715
exchange.setMinLoggingLevel(newMinLoggingLevel.level());
713716

mcp-reactor/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.concurrent.ConcurrentHashMap;
1212
import java.util.concurrent.atomic.AtomicBoolean;
1313

14-
import com.fasterxml.jackson.core.type.TypeReference;
1514
import com.fasterxml.jackson.databind.ObjectMapper;
1615

1716
import io.modelcontextprotocol.schema.McpJacksonCodec;
@@ -297,7 +296,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
297296
body.append(line);
298297
}
299298

300-
McpSchema.JSONRPCMessage message = jacksonCodec.deserializeJsonRpcMessage(body.toString());
299+
McpSchema.JSONRPCMessage message = jacksonCodec.decodeFromString(body.toString());
301300

302301
// Process the message through the session's handle method
303302
session.handle(message).block(); // Block for Servlet compatibility

mcp-reactor/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
import java.util.concurrent.atomic.AtomicBoolean;
1515
import java.util.function.Function;
1616

17-
import com.fasterxml.jackson.core.type.TypeReference;
1817
import com.fasterxml.jackson.databind.ObjectMapper;
1918

2019
import io.modelcontextprotocol.schema.McpJacksonCodec;
20+
import io.modelcontextprotocol.schema.McpSchemaCodec;
2121
import io.modelcontextprotocol.schema.McpType;
2222
import io.modelcontextprotocol.spec.McpError;
2323
import io.modelcontextprotocol.schema.McpSchema;
@@ -47,7 +47,7 @@ public class StdioServerTransportProvider implements McpServerTransportProvider
4747

4848
private static final Logger logger = LoggerFactory.getLogger(StdioServerTransportProvider.class);
4949

50-
private final McpJacksonCodec jacksonCodec;
50+
private final McpSchemaCodec schemaCodec;
5151

5252
private final InputStream inputStream;
5353

@@ -84,11 +84,23 @@ public StdioServerTransportProvider(ObjectMapper objectMapper) {
8484
* @param outputStream The output stream to write to
8585
*/
8686
public StdioServerTransportProvider(ObjectMapper objectMapper, InputStream inputStream, OutputStream outputStream) {
87-
Assert.notNull(objectMapper, "The ObjectMapper can not be null");
87+
this(new McpJacksonCodec(objectMapper), inputStream, outputStream);
88+
}
89+
90+
/**
91+
* Creates a new StdioServerTransportProvider with the specified ObjectMapper and
92+
* streams.
93+
* @param schemaCodec The McpSchemaCodec to use for JSON serialization/deserialization
94+
* @param inputStream The input stream to read from
95+
* @param outputStream The output stream to write to
96+
*/
97+
public StdioServerTransportProvider(McpSchemaCodec schemaCodec, InputStream inputStream,
98+
OutputStream outputStream) {
99+
Assert.notNull(schemaCodec, "The ObjectMapper can not be null");
88100
Assert.notNull(inputStream, "The InputStream can not be null");
89101
Assert.notNull(outputStream, "The OutputStream can not be null");
90102

91-
this.jacksonCodec = new McpJacksonCodec(objectMapper);
103+
this.schemaCodec = schemaCodec;
92104
this.inputStream = inputStream;
93105
this.outputStream = outputStream;
94106
}
@@ -169,7 +181,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
169181

170182
@Override
171183
public <T> T unmarshalFrom(Object data, McpType<T> typeRef) {
172-
return jacksonCodec.decodeResult(data, typeRef);
184+
return schemaCodec.decodeResult(data, typeRef);
173185
}
174186

175187
@Override
@@ -222,7 +234,7 @@ private void startInboundProcessing() {
222234
logger.debug("Received JSON message: {}", line);
223235

224236
try {
225-
McpSchema.JSONRPCMessage message = jacksonCodec.deserializeJsonRpcMessage(line);
237+
McpSchema.JSONRPCMessage message = schemaCodec.decodeFromString(line);
226238
if (!this.inboundSink.tryEmitNext(message).isSuccess()) {
227239
// logIfNotClosing("Failed to enqueue message");
228240
break;
@@ -265,7 +277,7 @@ private void startOutboundProcessing() {
265277
.handle((message, sink) -> {
266278
if (message != null && !isClosing.get()) {
267279
try {
268-
String jsonMessage = jacksonCodec.getMapper().writeValueAsString(message);
280+
String jsonMessage = schemaCodec.encodeAsString(message);
269281
// Escape any embedded newlines in the JSON message as per spec
270282
jsonMessage = jsonMessage.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n");
271283

0 commit comments

Comments
 (0)