Skip to content

Commit 488a1d4

Browse files
committed
Review DataBufferUtils for cancellation memory leaks
Issue: SPR-17408
1 parent 611019b commit 488a1d4

File tree

3 files changed

+293
-110
lines changed

3 files changed

+293
-110
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

+57-20
Original file line numberDiff line numberDiff line change
@@ -246,23 +246,12 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteC
246246
Assert.notNull(channel, "'channel' must not be null");
247247

248248
Flux<DataBuffer> flux = Flux.from(source);
249-
return Flux.create(sink ->
250-
flux.subscribe(dataBuffer -> {
251-
try {
252-
ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
253-
while (byteBuffer.hasRemaining()) {
254-
channel.write(byteBuffer);
255-
}
256-
sink.next(dataBuffer);
257-
}
258-
catch (IOException ex) {
259-
sink.next(dataBuffer);
260-
sink.error(ex);
261-
}
262-
263-
},
264-
sink::error,
265-
sink::complete));
249+
return Flux.create(sink -> {
250+
WritableByteChannelSubscriber subscriber =
251+
new WritableByteChannelSubscriber(sink, channel);
252+
sink.onDispose(subscriber);
253+
flux.subscribe(subscriber);
254+
});
266255
}
267256

268257
/**
@@ -305,11 +294,15 @@ public static Flux<DataBuffer> write(
305294
Assert.isTrue(position >= 0, "'position' must be >= 0");
306295

307296
Flux<DataBuffer> flux = Flux.from(source);
308-
return Flux.create(sink ->
309-
flux.subscribe(new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position)));
297+
return Flux.create(sink -> {
298+
AsynchronousFileChannelWriteCompletionHandler completionHandler =
299+
new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
300+
sink.onDispose(completionHandler);
301+
flux.subscribe(completionHandler);
302+
});
310303
}
311304

312-
private static void closeChannel(@Nullable Channel channel) {
305+
static void closeChannel(@Nullable Channel channel) {
313306
if (channel != null && channel.isOpen()) {
314307
try {
315308
channel.close();
@@ -554,6 +547,50 @@ public void dispose() {
554547
}
555548

556549

550+
private static class WritableByteChannelSubscriber extends BaseSubscriber<DataBuffer> {
551+
552+
private final FluxSink<DataBuffer> sink;
553+
554+
private final WritableByteChannel channel;
555+
556+
public WritableByteChannelSubscriber(FluxSink<DataBuffer> sink, WritableByteChannel channel) {
557+
this.sink = sink;
558+
this.channel = channel;
559+
}
560+
561+
@Override
562+
protected void hookOnSubscribe(Subscription subscription) {
563+
request(1);
564+
}
565+
566+
@Override
567+
protected void hookOnNext(DataBuffer dataBuffer) {
568+
try {
569+
ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
570+
while (byteBuffer.hasRemaining()) {
571+
this.channel.write(byteBuffer);
572+
}
573+
this.sink.next(dataBuffer);
574+
request(1);
575+
}
576+
catch (IOException ex) {
577+
this.sink.next(dataBuffer);
578+
this.sink.error(ex);
579+
}
580+
}
581+
582+
@Override
583+
protected void hookOnError(Throwable throwable) {
584+
this.sink.error(throwable);
585+
}
586+
587+
@Override
588+
protected void hookOnComplete() {
589+
this.sink.complete();
590+
}
591+
}
592+
593+
557594
private static class AsynchronousFileChannelWriteCompletionHandler extends BaseSubscriber<DataBuffer>
558595
implements CompletionHandler<Integer, ByteBuffer> {
559596

spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java

+5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.junit.rules.Verifier;
3131
import org.junit.runner.RunWith;
3232
import org.junit.runners.Parameterized;
33+
import reactor.core.publisher.Mono;
3334

3435
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
3536

@@ -69,6 +70,10 @@ protected DataBuffer stringBuffer(String value) {
6970
return byteBuffer(value.getBytes(StandardCharsets.UTF_8));
7071
}
7172

73+
protected Mono<DataBuffer> deferStringBuffer(String value) {
74+
return Mono.defer(() -> Mono.just(stringBuffer(value)));
75+
}
76+
7277
protected DataBuffer byteBuffer(byte[] value) {
7378
DataBuffer buffer = this.bufferFactory.allocateBuffer(value.length);
7479
buffer.write(value);

0 commit comments

Comments
 (0)