9
9
10
10
import com .fasterxml .jackson .core .type .TypeReference ;
11
11
import com .fasterxml .jackson .databind .ObjectMapper ;
12
- import io .modelcontextprotocol .spec .ClientMcpTransport ;
12
+ import io .modelcontextprotocol .spec .McpClientTransport ;
13
13
import io .modelcontextprotocol .spec .McpError ;
14
14
import io .modelcontextprotocol .spec .McpSchema ;
15
15
import io .modelcontextprotocol .spec .McpSchema .JSONRPCMessage ;
58
58
* "https://spec.modelcontextprotocol.io/specification/basic/transports/#http-with-sse">MCP
59
59
* HTTP with SSE Transport Specification</a>
60
60
*/
61
- public class WebFluxSseClientTransport implements ClientMcpTransport {
61
+ public class WebFluxSseClientTransport implements McpClientTransport {
62
62
63
63
private static final Logger logger = LoggerFactory .getLogger (WebFluxSseClientTransport .class );
64
64
@@ -79,7 +79,7 @@ public class WebFluxSseClientTransport implements ClientMcpTransport {
79
79
* Default SSE endpoint path as specified by the MCP transport specification. This
80
80
* endpoint is used to establish the SSE connection with the server.
81
81
*/
82
- private static final String SSE_ENDPOINT = "/sse" ;
82
+ private static final String DEFAULT_SSE_ENDPOINT = "/sse" ;
83
83
84
84
/**
85
85
* Type reference for parsing SSE events containing string data.
@@ -117,6 +117,12 @@ public class WebFluxSseClientTransport implements ClientMcpTransport {
117
117
*/
118
118
protected final Sinks .One <String > messageEndpointSink = Sinks .one ();
119
119
120
+ /**
121
+ * The SSE endpoint URI provided by the server. Used for sending outbound messages via
122
+ * HTTP POST requests.
123
+ */
124
+ private String sseEndpoint ;
125
+
120
126
/**
121
127
* Constructs a new SseClientTransport with the specified WebClient builder. Uses a
122
128
* default ObjectMapper instance for JSON processing.
@@ -137,11 +143,27 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) {
137
143
* @throws IllegalArgumentException if either parameter is null
138
144
*/
139
145
public WebFluxSseClientTransport (WebClient .Builder webClientBuilder , ObjectMapper objectMapper ) {
146
+ this (webClientBuilder , objectMapper , DEFAULT_SSE_ENDPOINT );
147
+ }
148
+
149
+ /**
150
+ * Constructs a new SseClientTransport with the specified WebClient builder and
151
+ * ObjectMapper. Initializes both inbound and outbound message processing pipelines.
152
+ * @param webClientBuilder the WebClient.Builder to use for creating the WebClient
153
+ * instance
154
+ * @param objectMapper the ObjectMapper to use for JSON processing
155
+ * @param sseEndpoint the SSE endpoint URI to use for establishing the connection
156
+ * @throws IllegalArgumentException if either parameter is null
157
+ */
158
+ public WebFluxSseClientTransport (WebClient .Builder webClientBuilder , ObjectMapper objectMapper ,
159
+ String sseEndpoint ) {
140
160
Assert .notNull (objectMapper , "ObjectMapper must not be null" );
141
161
Assert .notNull (webClientBuilder , "WebClient.Builder must not be null" );
162
+ Assert .hasText (sseEndpoint , "SSE endpoint must not be null or empty" );
142
163
143
164
this .objectMapper = objectMapper ;
144
165
this .webClient = webClientBuilder .build ();
166
+ this .sseEndpoint = sseEndpoint ;
145
167
}
146
168
147
169
/**
@@ -254,7 +276,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
254
276
protected Flux <ServerSentEvent <String >> eventStream () {// @formatter:off
255
277
return this .webClient
256
278
.get ()
257
- .uri (SSE_ENDPOINT )
279
+ .uri (this . sseEndpoint )
258
280
.accept (MediaType .TEXT_EVENT_STREAM )
259
281
.retrieve ()
260
282
.bodyToFlux (SSE_TYPE )
@@ -321,4 +343,66 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
321
343
return this .objectMapper .convertValue (data , typeRef );
322
344
}
323
345
346
+ /**
347
+ * Creates a new builder for {@link WebFluxSseClientTransport}.
348
+ * @param webClientBuilder the WebClient.Builder to use for creating the WebClient
349
+ * instance
350
+ * @return a new builder instance
351
+ */
352
+ public static Builder builder (WebClient .Builder webClientBuilder ) {
353
+ return new Builder (webClientBuilder );
354
+ }
355
+
356
+ /**
357
+ * Builder for {@link WebFluxSseClientTransport}.
358
+ */
359
+ public static class Builder {
360
+
361
+ private final WebClient .Builder webClientBuilder ;
362
+
363
+ private String sseEndpoint = DEFAULT_SSE_ENDPOINT ;
364
+
365
+ private ObjectMapper objectMapper = new ObjectMapper ();
366
+
367
+ /**
368
+ * Creates a new builder with the specified WebClient.Builder.
369
+ * @param webClientBuilder the WebClient.Builder to use
370
+ */
371
+ public Builder (WebClient .Builder webClientBuilder ) {
372
+ Assert .notNull (webClientBuilder , "WebClient.Builder must not be null" );
373
+ this .webClientBuilder = webClientBuilder ;
374
+ }
375
+
376
+ /**
377
+ * Sets the SSE endpoint path.
378
+ * @param sseEndpoint the SSE endpoint path
379
+ * @return this builder
380
+ */
381
+ public Builder sseEndpoint (String sseEndpoint ) {
382
+ Assert .hasText (sseEndpoint , "sseEndpoint must not be empty" );
383
+ this .sseEndpoint = sseEndpoint ;
384
+ return this ;
385
+ }
386
+
387
+ /**
388
+ * Sets the object mapper for JSON serialization/deserialization.
389
+ * @param objectMapper the object mapper
390
+ * @return this builder
391
+ */
392
+ public Builder objectMapper (ObjectMapper objectMapper ) {
393
+ Assert .notNull (objectMapper , "objectMapper must not be null" );
394
+ this .objectMapper = objectMapper ;
395
+ return this ;
396
+ }
397
+
398
+ /**
399
+ * Builds a new {@link WebFluxSseClientTransport} instance.
400
+ * @return a new transport instance
401
+ */
402
+ public WebFluxSseClientTransport build () {
403
+ return new WebFluxSseClientTransport (webClientBuilder , objectMapper , sseEndpoint );
404
+ }
405
+
406
+ }
407
+
324
408
}
0 commit comments