Skip to content

Commit a780cad

Browse files
committed
Updates to RSocket[Strategies|Requester] defaults
1. RSocketStrategies hooks in the basic codecs from spring-core by default. Now that we have support for composite metadata, it makes sense to have multiple codecs available. 2. RSocketStrategies is pre-configured with NettyDataBufferFactory. 3. DefaultRSocketRequesterBuilder configures RSocket with a frame decoder that matches the DataBufferFactory choice, i.e. ensuring consistency of zero copy vs default (copy) choice. 4. DefaultRSocketRequesterBuilder now tries to find a single non-basic decoder to select a default data MimeType (e.g. CBOR), or otherwise fall back on the first default decoder (e.g. String). See gh-23314
1 parent c3c152f commit a780cad

File tree

5 files changed

+286
-85
lines changed

5 files changed

+286
-85
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
import java.net.URI;
2020
import java.util.ArrayList;
21+
import java.util.Collections;
2122
import java.util.List;
2223
import java.util.function.Consumer;
23-
import java.util.stream.Stream;
2424

2525
import io.rsocket.RSocketFactory;
2626
import io.rsocket.frame.decoder.PayloadDecoder;
@@ -29,6 +29,9 @@
2929
import io.rsocket.transport.netty.client.WebsocketClientTransport;
3030
import reactor.core.publisher.Mono;
3131

32+
import org.springframework.core.codec.Decoder;
33+
import org.springframework.core.codec.StringDecoder;
34+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
3235
import org.springframework.lang.Nullable;
3336
import org.springframework.util.Assert;
3437
import org.springframework.util.MimeType;
@@ -110,7 +113,10 @@ private Mono<RSocketRequester> doConnect(ClientTransport transport) {
110113
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
111114
rsocketFactory.dataMimeType(dataMimeType.toString());
112115
rsocketFactory.metadataMimeType(this.metadataMimeType.toString());
113-
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
116+
117+
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
118+
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
119+
}
114120

115121
this.rsocketFactoryConfigurers.forEach(configurer -> {
116122
configurer.configureWithStrategies(rsocketStrategies);
@@ -139,16 +145,35 @@ private MimeType getDataMimeType(RSocketStrategies strategies) {
139145
if (this.dataMimeType != null) {
140146
return this.dataMimeType;
141147
}
142-
return Stream
143-
.concat(
144-
strategies.encoders().stream()
145-
.flatMap(encoder -> encoder.getEncodableMimeTypes().stream()),
146-
strategies.decoders().stream()
147-
.flatMap(encoder -> encoder.getDecodableMimeTypes().stream())
148-
)
149-
.filter(MimeType::isConcrete)
150-
.findFirst()
151-
.orElseThrow(() -> new IllegalArgumentException("Failed to select data MimeType to use."));
148+
// Look for non-basic Decoder (e.g. CBOR, Protobuf)
149+
MimeType selected = null;
150+
List<Decoder<?>> decoders = strategies.decoders();
151+
for (Decoder<?> candidate : decoders) {
152+
if (!isCoreCodec(candidate) && !candidate.getDecodableMimeTypes().isEmpty()) {
153+
Assert.state(selected == null,
154+
() -> "Cannot select default data MimeType based on configured decoders: " + decoders);
155+
selected = getMimeType(candidate);
156+
}
157+
}
158+
if (selected != null) {
159+
return selected;
160+
}
161+
// Fall back on 1st decoder (e.g. String)
162+
for (Decoder<?> decoder : decoders) {
163+
if (!decoder.getDecodableMimeTypes().isEmpty()) {
164+
return getMimeType(decoder);
165+
}
166+
}
167+
throw new IllegalArgumentException("Failed to select data MimeType to use.");
168+
}
169+
170+
private static boolean isCoreCodec(Object codec) {
171+
return codec.getClass().getPackage().equals(StringDecoder.class.getPackage());
172+
}
173+
174+
private static MimeType getMimeType(Decoder<?> decoder) {
175+
MimeType mimeType = decoder.getDecodableMimeTypes().get(0);
176+
return new MimeType(mimeType, Collections.emptyMap());
152177
}
153178

154179
}

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,21 @@
2222
import java.util.List;
2323
import java.util.function.Consumer;
2424

25+
import io.netty.buffer.PooledByteBufAllocator;
26+
2527
import org.springframework.core.ReactiveAdapterRegistry;
28+
import org.springframework.core.codec.ByteArrayDecoder;
29+
import org.springframework.core.codec.ByteArrayEncoder;
30+
import org.springframework.core.codec.ByteBufferDecoder;
31+
import org.springframework.core.codec.ByteBufferEncoder;
32+
import org.springframework.core.codec.CharSequenceEncoder;
33+
import org.springframework.core.codec.DataBufferDecoder;
34+
import org.springframework.core.codec.DataBufferEncoder;
2635
import org.springframework.core.codec.Decoder;
2736
import org.springframework.core.codec.Encoder;
37+
import org.springframework.core.codec.StringDecoder;
2838
import org.springframework.core.io.buffer.DataBufferFactory;
29-
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
39+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
3040
import org.springframework.lang.Nullable;
3141
import org.springframework.util.Assert;
3242

@@ -90,18 +100,33 @@ static class DefaultRSocketStrategiesBuilder implements RSocketStrategies.Builde
90100
private ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
91101

92102
@Nullable
93-
private DataBufferFactory dataBufferFactory;
103+
private DataBufferFactory bufferFactory;
104+
105+
106+
DefaultRSocketStrategiesBuilder() {
107+
108+
// Order of decoders may be significant for default data MimeType
109+
// selection in RSocketRequester.Builder
94110

95-
public DefaultRSocketStrategiesBuilder() {
111+
this.decoders.add(StringDecoder.allMimeTypes());
112+
this.decoders.add(new ByteBufferDecoder());
113+
this.decoders.add(new ByteArrayDecoder());
114+
this.decoders.add(new DataBufferDecoder());
115+
116+
this.encoders.add(CharSequenceEncoder.allMimeTypes());
117+
this.encoders.add(new ByteBufferEncoder());
118+
this.encoders.add(new ByteArrayEncoder());
119+
this.encoders.add(new DataBufferEncoder());
96120
}
97121

98-
public DefaultRSocketStrategiesBuilder(RSocketStrategies other) {
122+
DefaultRSocketStrategiesBuilder(RSocketStrategies other) {
99123
this.encoders.addAll(other.encoders());
100124
this.decoders.addAll(other.decoders());
101125
this.adapterRegistry = other.reactiveAdapterRegistry();
102-
this.dataBufferFactory = other.dataBufferFactory();
126+
this.bufferFactory = other.dataBufferFactory();
103127
}
104128

129+
105130
@Override
106131
public Builder encoder(Encoder<?>... encoders) {
107132
this.encoders.addAll(Arrays.asList(encoders));
@@ -135,14 +160,19 @@ public Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry) {
135160

136161
@Override
137162
public Builder dataBufferFactory(DataBufferFactory bufferFactory) {
138-
this.dataBufferFactory = bufferFactory;
163+
this.bufferFactory = bufferFactory;
139164
return this;
140165
}
141166

142167
@Override
143168
public RSocketStrategies build() {
144-
return new DefaultRSocketStrategies(this.encoders, this.decoders, this.adapterRegistry,
145-
this.dataBufferFactory != null ? this.dataBufferFactory : new DefaultDataBufferFactory());
169+
return new DefaultRSocketStrategies(
170+
this.encoders, this.decoders, this.adapterRegistry, initBufferFactory());
171+
}
172+
173+
private DataBufferFactory initBufferFactory() {
174+
return this.bufferFactory != null ? this.bufferFactory :
175+
new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
146176
}
147177
}
148178

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.springframework.core.ParameterizedTypeReference;
3030
import org.springframework.core.ReactiveAdapterRegistry;
31+
import org.springframework.core.codec.Decoder;
3132
import org.springframework.lang.Nullable;
3233
import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer;
3334
import org.springframework.util.MimeType;
@@ -125,32 +126,32 @@ static RSocketRequester wrap(
125126
interface Builder {
126127

127128
/**
128-
* Configure the MimeType for payload data which is then specified
129-
* on the {@code SETUP} frame and applies to the whole connection.
130-
* <p>By default this is set to the first concrete mime type supported
131-
* by the configured encoders and decoders.
132-
* @param mimeType the data MimeType to use
129+
* Configure the payload data MimeType to specify on the {@code SETUP}
130+
* frame that applies to the whole connection.
131+
* <p>If this is not set, the builder will try to select the mime type
132+
* based on the presence of a single
133+
* {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default}
134+
* {@code Decoder}, or the first default decoder otherwise
135+
* (i.e. {@code String}) if no others are configured.
133136
*/
134137
RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType);
135138

136139
/**
137-
* Configure the MimeType for payload metadata which is then specified
138-
* on the {@code SETUP} frame and applies to the whole connection.
140+
* Configure the payload metadata MimeType to specify on the {@code SETUP}
141+
* frame and applies to the whole connection.
139142
* <p>By default this is set to
140143
* {@code "message/x.rsocket.composite-metadata.v0"} in which case the
141144
* route, if provided, is encoded as a
142-
* {@code "message/x.rsocket.routing.v0"} metadata entry, potentially
143-
* with other metadata entries added too. If this is set to any other
144-
* mime type, and a route is provided, it is assumed the mime type is
145-
* for the route.
146-
* @param mimeType the data MimeType to use
145+
* {@code "message/x.rsocket.routing.v0"} composite metadata entry.
146+
* For any other MimeType, it is assumed to be the MimeType for the
147+
* route, if provided.
147148
*/
148149
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
149150

150151
/**
151-
* Set the {@link RSocketStrategies} to use for access to encoders,
152-
* decoders, and a factory for {@code DataBuffer's}.
153-
* @param strategies the codecs strategies to use
152+
* Set the {@link RSocketStrategies} to use.
153+
* <p>By default this is set to {@code RSocketStrategies.builder().build()}
154+
* but may be further customized via {@link #rsocketStrategies(Consumer)}.
154155
*/
155156
RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies);
156157

@@ -159,7 +160,6 @@ interface Builder {
159160
* <p>By default this starts out with an empty builder, i.e.
160161
* {@link RSocketStrategies#builder()}, but the strategies can also be
161162
* set via {@link #rsocketStrategies(RSocketStrategies)}.
162-
* @param configurer the configurer to apply
163163
*/
164164
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
165165

@@ -172,7 +172,6 @@ interface Builder {
172172
* {@code ClientRSocketFactory}. Use the shortcuts on this builder
173173
* instead since the created {@code RSocketRequester} needs to be aware
174174
* of those settings.
175-
* @param configurer consumer to customize the factory
176175
* @see AnnotationClientResponderConfigurer
177176
*/
178177
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919
import java.util.List;
2020
import java.util.function.Consumer;
2121

22-
import io.netty.buffer.PooledByteBufAllocator;
22+
import io.rsocket.Payload;
23+
import io.rsocket.RSocketFactory.ClientRSocketFactory;
24+
import io.rsocket.RSocketFactory.ServerRSocketFactory;
2325

2426
import org.springframework.core.ReactiveAdapterRegistry;
2527
import org.springframework.core.ResolvableType;
2628
import org.springframework.core.codec.Decoder;
2729
import org.springframework.core.codec.Encoder;
2830
import org.springframework.core.io.buffer.DataBufferFactory;
31+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
32+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
2933
import org.springframework.lang.Nullable;
3034
import org.springframework.util.MimeType;
3135

@@ -120,24 +124,28 @@ default Builder mutate() {
120124
interface Builder {
121125

122126
/**
123-
* Add encoders to use for serializing Objects.
124-
* <p>By default this is empty.
127+
* Append to the list of encoders to use for serializing Objects to the
128+
* data or metadata of a {@link Payload}.
129+
* <p>By default this is initialized with encoders for {@code String},
130+
* {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}.
125131
*/
126132
Builder encoder(Encoder<?>... encoder);
127133

128134
/**
129-
* Access and manipulate the list of configured {@link #encoder encoders}.
135+
* Apply the consumer to the list of configured encoders, immediately.
130136
*/
131137
Builder encoders(Consumer<List<Encoder<?>>> consumer);
132138

133139
/**
134-
* Add decoders for de-serializing Objects.
135-
* <p>By default this is empty.
140+
* Append to the list of decoders to use for de-serializing Objects from
141+
* the data or metadata of a {@link Payload}.
142+
* <p>By default this is initialized with decoders for {@code String},
143+
* {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}.
136144
*/
137145
Builder decoder(Decoder<?>... decoder);
138146

139147
/**
140-
* Access and manipulate the list of configured {@link #encoder decoders}.
148+
* Apply the consumer to the list of configured decoders, immediately.
141149
*/
142150
Builder decoders(Consumer<List<Decoder<?>>> consumer);
143151

@@ -146,28 +154,23 @@ interface Builder {
146154
* to adapt to, and/or determine the semantics of a given
147155
* {@link org.reactivestreams.Publisher Publisher}.
148156
* <p>By default this {@link ReactiveAdapterRegistry#getSharedInstance()}.
149-
* @param registry the registry to use
150157
*/
151158
Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry);
152159

153160
/**
154-
* Configure the DataBufferFactory to use for allocating buffers, for
155-
* example when preparing requests or when responding. The choice here
156-
* must be aligned with the frame decoder configured in
157-
* {@link io.rsocket.RSocketFactory}.
158-
* <p>By default this property is an instance of
159-
* {@link org.springframework.core.io.buffer.DefaultDataBufferFactory
160-
* DefaultDataBufferFactory} matching to the default frame decoder in
161-
* {@link io.rsocket.RSocketFactory} which copies the payload. This
162-
* comes at cost to performance but does not require reference counting
163-
* and eliminates possibility for memory leaks.
164-
* <p>To switch to a zero-copy strategy,
165-
* <a href="https://github.com/rsocket/rsocket-java#zero-copy">configure RSocket</a>
166-
* accordingly, and then configure this property with an instance of
167-
* {@link org.springframework.core.io.buffer.NettyDataBufferFactory
168-
* NettyDataBufferFactory} with a pooled allocator such as
169-
* {@link PooledByteBufAllocator#DEFAULT}.
170-
* @param bufferFactory the DataBufferFactory to use
161+
* Configure the DataBufferFactory to use for allocating buffers when
162+
* preparing requests or creating responses.
163+
* <p>By default this is set to {@link NettyDataBufferFactory} with
164+
* pooled, allocated buffers for zero copy. RSocket must also be
165+
* <a href="https://github.com/rsocket/rsocket-java#zero-copy">configured</a>
166+
* for zero copy. For client setup, {@link RSocketRequester.Builder}
167+
* adapts automatically to the {@code DataBufferFactory} configured
168+
* here, and sets the frame decoder in {@link ClientRSocketFactory
169+
* ClientRSocketFactory} accordingly. For server setup, the
170+
* {@link ServerRSocketFactory ServerRSocketFactory} must be configured
171+
* accordingly too for zero copy.
172+
* <p>If using {@link DefaultDataBufferFactory} instead, there is no
173+
* need for related config changes in RSocket.
171174
*/
172175
Builder dataBufferFactory(DataBufferFactory bufferFactory);
173176

0 commit comments

Comments
 (0)