Skip to content

Commit 6e7da62

Browse files
committed
Switch to Reactor snapshots and remove workaround
Following the 5.2 M1 release we can switch back to Reactor snapshots and remove the workaround for a fix coming in Reactor Core 3.2.9.
1 parent bb28477 commit 6e7da62

File tree

4 files changed

+17
-22
lines changed

4 files changed

+17
-22
lines changed

build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ ext {
4040
log4jVersion = "2.11.2"
4141
nettyVersion = "4.1.34.Final"
4242
quartzVersion = "2.3.1"
43-
reactorVersion = "Californium-SR6"
43+
reactorVersion = "Californium-BUILD-SNAPSHOT"
4444
rxjavaVersion = "1.3.8"
4545
rxjavaAdapterVersion = "1.2.1"
4646
rxjava2Version = "2.2.8"
@@ -149,6 +149,7 @@ configure(allprojects) { project ->
149149

150150
repositories {
151151
maven { url "https://repo.spring.io/libs-release" }
152+
maven { url "https://repo.spring.io/snapshot" } // Reactor
152153
maven { url "https://oss.jfrog.org/artifactory/libs-snapshot" } // RSocket
153154
mavenLocal()
154155
}

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

+2-10
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,6 @@ public abstract class DataBufferUtils {
5757

5858
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
5959

60-
/**
61-
* Workaround to disable use of pooled buffers:
62-
* https://github.com/reactor/reactor-core/issues/1634.
63-
*/
64-
private static final DataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
65-
6660

6761
//---------------------------------------------------------------------
6862
// Reading
@@ -141,14 +135,12 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(
141135
Assert.isTrue(position >= 0, "'position' must be >= 0");
142136
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
143137

144-
DataBufferFactory bufferFactoryToUse = defaultDataBufferFactory;
145-
146138
Flux<DataBuffer> flux = Flux.using(channelSupplier,
147139
channel -> Flux.create(sink -> {
148140
ReadCompletionHandler handler =
149-
new ReadCompletionHandler(channel, sink, position, bufferFactoryToUse, bufferSize);
141+
new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize);
150142
sink.onDispose(handler::dispose);
151-
DataBuffer dataBuffer = bufferFactoryToUse.allocateBuffer(bufferSize);
143+
DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize);
152144
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
153145
channel.read(byteBuffer, position, dataBuffer, handler);
154146
}),

spring-messaging/spring-messaging.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ dependencyManagement {
77
}
88
}
99

10-
def rsocketVersion = "0.12.1-RC3"
10+
def rsocketVersion = "0.12.1-RC4-SNAPSHOT"
1111

1212
dependencies {
1313
compile(project(":spring-beans"))

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,10 @@ public RSocketStrategies rsocketStrategies() {
238238

239239

240240
/**
241-
* Similar {@link org.springframework.core.io.buffer.LeakAwareDataBufferFactory}
242-
* but extends {@link NettyDataBufferFactory} rather than rely on
243-
* decoration, since {@link PayloadUtils} does instanceof checks.
241+
* Unlike {@link org.springframework.core.io.buffer.LeakAwareDataBufferFactory}
242+
* this one is an instance of {@link NettyDataBufferFactory} which is necessary
243+
* since {@link PayloadUtils} does instanceof checks, and that also allows
244+
* intercepting {@link NettyDataBufferFactory#wrap(ByteBuf)}.
244245
*/
245246
private static class LeakAwareNettyDataBufferFactory extends NettyDataBufferFactory {
246247

@@ -277,32 +278,33 @@ void reset() {
277278

278279
@Override
279280
public NettyDataBuffer allocateBuffer() {
280-
return (NettyDataBuffer) record(super.allocateBuffer());
281+
return (NettyDataBuffer) recordHint(super.allocateBuffer());
281282
}
282283

283284
@Override
284285
public NettyDataBuffer allocateBuffer(int initialCapacity) {
285-
return (NettyDataBuffer) record(super.allocateBuffer(initialCapacity));
286+
return (NettyDataBuffer) recordHint(super.allocateBuffer(initialCapacity));
286287
}
287288

288289
@Override
289290
public NettyDataBuffer wrap(ByteBuf byteBuf) {
290291
NettyDataBuffer dataBuffer = super.wrap(byteBuf);
291292
if (byteBuf != Unpooled.EMPTY_BUFFER) {
292-
record(dataBuffer);
293+
recordHint(dataBuffer);
293294
}
294295
return dataBuffer;
295296
}
296297

297298
@Override
298299
public DataBuffer join(List<? extends DataBuffer> dataBuffers) {
299-
return record(super.join(dataBuffers));
300+
return recordHint(super.join(dataBuffers));
300301
}
301302

302-
private DataBuffer record(DataBuffer buffer) {
303-
this.created.add(new DataBufferLeakInfo(buffer, new AssertionError(String.format(
303+
private DataBuffer recordHint(DataBuffer buffer) {
304+
AssertionError error = new AssertionError(String.format(
304305
"DataBuffer leak: {%s} {%s} not released.%nStacktrace at buffer creation: ", buffer,
305-
ObjectUtils.getIdentityHexString(((NettyDataBuffer) buffer).getNativeBuffer())))));
306+
ObjectUtils.getIdentityHexString(((NettyDataBuffer) buffer).getNativeBuffer())));
307+
this.created.add(new DataBufferLeakInfo(buffer, error));
306308
return buffer;
307309
}
308310
}

0 commit comments

Comments
 (0)