Skip to content

Commit 2b9b581

Browse files
committed
Add streaming support to MVC functional endpoints
Prior to this commit, MVC function endpoints would allow Server Sent Event responses through `ServerResponse.sse()`. While this covers a common use case for streaming responses, other technologies would benefit from a "low-level", unopinionated streaming support. This commit introduces a new `BodyBuilder.stream()` methods that enables such use cases. Developers are in charge of setting the relevant HTTP response headers beforehand, and then can write to the response as raw `String`, `byte[]` or using complex objects and the configured message converters for serialization. Because each streaming protocol has different message separator semantics, it is also the developers' responsibility to flush buffered content to the network once a message has been fully written. Closes gh-32710
1 parent 6c8a859 commit 2b9b581

File tree

4 files changed

+466
-27
lines changed

4 files changed

+466
-27
lines changed

Diff for: spring-webmvc/src/main/java/org/springframework/web/servlet/function/DefaultServerResponseBuilder.java

+4
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,10 @@ public ServerResponse render(String name, Map<String, ?> model) {
209209
.build();
210210
}
211211

212+
@Override
213+
public ServerResponse stream(Consumer<ServerResponse.StreamBuilder> streamConsumer) {
214+
return StreamingServerResponse.create(this.statusCode, this.headers, this.cookies, streamConsumer, null);
215+
}
212216

213217
private static class WriteFunctionResponse extends AbstractServerResponse {
214218

Diff for: spring-webmvc/src/main/java/org/springframework/web/servlet/function/ServerResponse.java

+108-27
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,87 @@ interface BodyBuilder extends HeadersBuilder<BodyBuilder> {
547547
* @return the built response
548548
*/
549549
ServerResponse render(String name, Map<String, ?> model);
550+
551+
/**
552+
* Create a low-level streaming response; for SSE support, see {@link #sse(Consumer)}.
553+
* <p>The {@link StreamBuilder} provided to the {@code streamConsumer} can
554+
* be used to write to the response in a streaming fashion. Note, the builder is
555+
* responsible for flushing the buffered content to the network.
556+
* <p>For example:
557+
* <pre class="code">
558+
* public ServerResponse handleStream(ServerRequest request) {
559+
* return ServerResponse.ok()
560+
* .contentType(MediaType.APPLICATION_ND_JSON)
561+
* .stream(stream -&gt; {
562+
* try {
563+
* // Write and flush a first item
564+
* stream.write(new Person("John", 51), MediaType.APPLICATION_JSON)
565+
* .write(new byte[]{'\n'})
566+
* .flush();
567+
* // Write and complete with the last item
568+
* stream.write(new Person("Jane", 42), MediaType.APPLICATION_JSON)
569+
* .write(new byte[]{'\n'})
570+
* .complete();
571+
* }
572+
* catch (IOException ex) {
573+
* throw new UncheckedIOException(ex);
574+
* }
575+
* });
576+
* }
577+
* </pre>
578+
* @param streamConsumer consumer that will be provided with a stream builder
579+
* @return the server-side streaming response
580+
* @since 6.2
581+
*/
582+
ServerResponse stream(Consumer<StreamBuilder> streamConsumer);
583+
584+
}
585+
586+
/**
587+
* Defines a builder for async response bodies.
588+
* @since 6.2
589+
* @param <B> the builder subclass
590+
*/
591+
interface AsyncBuilder<B extends AsyncBuilder<B>> {
592+
593+
/**
594+
* Completes the stream with the given error.
595+
*
596+
* <p>The throwable is dispatched back into Spring MVC, and passed to
597+
* its exception handling mechanism. Since the response has
598+
* been committed by this point, the response status can not change.
599+
* @param t the throwable to dispatch
600+
*/
601+
void error(Throwable t);
602+
603+
/**
604+
* Completes the stream.
605+
*/
606+
void complete();
607+
608+
/**
609+
* Register a callback to be invoked when a request times
610+
* out.
611+
* @param onTimeout the callback to invoke on timeout
612+
* @return this builder
613+
*/
614+
B onTimeout(Runnable onTimeout);
615+
616+
/**
617+
* Register a callback to be invoked when an error occurs during
618+
* processing.
619+
* @param onError the callback to invoke on error
620+
* @return this builder
621+
*/
622+
B onError(Consumer<Throwable> onError);
623+
624+
/**
625+
* Register a callback to be invoked when the request completes.
626+
* @param onCompletion the callback to invoked on completion
627+
* @return this builder
628+
*/
629+
B onComplete(Runnable onCompletion);
630+
550631
}
551632

552633

@@ -555,7 +636,7 @@ interface BodyBuilder extends HeadersBuilder<BodyBuilder> {
555636
*
556637
* @since 5.3.2
557638
*/
558-
interface SseBuilder {
639+
interface SseBuilder extends AsyncBuilder<SseBuilder> {
559640

560641
/**
561642
* Sends the given object as a server-sent event.
@@ -618,45 +699,45 @@ interface SseBuilder {
618699
*/
619700
void data(Object object) throws IOException;
620701

621-
/**
622-
* Completes the event stream with the given error.
623-
*
624-
* <p>The throwable is dispatched back into Spring MVC, and passed to
625-
* its exception handling mechanism. Since the response has
626-
* been committed by this point, the response status can not change.
627-
* @param t the throwable to dispatch
628-
*/
629-
void error(Throwable t);
702+
}
630703

631-
/**
632-
* Completes the event stream.
633-
*/
634-
void complete();
704+
/**
705+
* Defines a builder for a streaming response body.
706+
*
707+
* @since 6.2
708+
*/
709+
interface StreamBuilder extends AsyncBuilder<StreamBuilder> {
635710

636711
/**
637-
* Register a callback to be invoked when an SSE request times
638-
* out.
639-
* @param onTimeout the callback to invoke on timeout
712+
* Write the given object to the response stream, without flushing.
713+
* Strings will be sent as UTF-8 encoded bytes, byte arrays will be sent as-is,
714+
* and other objects will be converted into JSON using
715+
* {@linkplain HttpMessageConverter message converters}.
716+
* @param object the object to send as data
640717
* @return this builder
718+
* @throws IOException in case of I/O errors
641719
*/
642-
SseBuilder onTimeout(Runnable onTimeout);
720+
StreamBuilder write(Object object) throws IOException;
643721

644722
/**
645-
* Register a callback to be invoked when an error occurs during SSE
646-
* processing.
647-
* @param onError the callback to invoke on error
723+
* Write the given object to the response stream, without flushing.
724+
* Strings will be sent as UTF-8 encoded bytes, byte arrays will be sent as-is,
725+
* and other objects will be converted into JSON using
726+
* {@linkplain HttpMessageConverter message converters}.
727+
* @param object the object to send as data
728+
* @param mediaType the media type to use for encoding the provided data
648729
* @return this builder
730+
* @throws IOException in case of I/O errors
649731
*/
650-
SseBuilder onError(Consumer<Throwable> onError);
732+
StreamBuilder write(Object object, @Nullable MediaType mediaType) throws IOException;
651733

652734
/**
653-
* Register a callback to be invoked when the SSE request completes.
654-
* @param onCompletion the callback to invoked on completion
655-
* @return this builder
735+
* Flush the buffered response stream content to the network.
736+
* @throws IOException in case of I/O errors
656737
*/
657-
SseBuilder onComplete(Runnable onCompletion);
658-
}
738+
void flush() throws IOException;
659739

740+
}
660741

661742
/**
662743
* Defines the context used during the {@link #writeTo(HttpServletRequest, HttpServletResponse, Context)}.

0 commit comments

Comments
 (0)