Skip to content

Commit bb28477

Browse files
committed
Merge branch '5.1.x'
2 parents 88e3b84 + bd2c213 commit bb28477

File tree

3 files changed

+17
-18
lines changed

3 files changed

+17
-18
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -532,13 +532,9 @@ public void completed(Integer read, DataBuffer dataBuffer) {
532532
long pos = this.position.addAndGet(read);
533533
dataBuffer.writePosition(read);
534534
this.sink.next(dataBuffer);
535-
// It's possible for cancellation to happen right before the push into the sink
535+
// onNext may have led to onCancel (e.g. downstream takeUntil)
536536
if (this.disposed.get()) {
537-
// TODO:
538-
// This is not ideal since we already passed the buffer into the sink and
539-
// releasing may cause something reading to fail. Maybe we won't have to
540-
// do this after https://github.com/reactor/reactor-core/issues/1634
541-
complete(dataBuffer);
537+
complete();
542538
}
543539
else {
544540
DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
@@ -547,12 +543,12 @@ public void completed(Integer read, DataBuffer dataBuffer) {
547543
}
548544
}
549545
else {
550-
complete(dataBuffer);
546+
release(dataBuffer);
547+
complete();
551548
}
552549
}
553550

554-
private void complete(DataBuffer dataBuffer) {
555-
release(dataBuffer);
551+
private void complete() {
556552
this.sink.complete();
557553
closeChannel(this.channel);
558554
}

spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ class LeakAwareDataBuffer implements PooledDataBuffer {
3737

3838
private final LeakAwareDataBufferFactory dataBufferFactory;
3939

40-
private int refCount = 1;
41-
4240

4341
LeakAwareDataBuffer(DataBuffer delegate, LeakAwareDataBufferFactory dataBufferFactory) {
4442
Assert.notNull(delegate, "Delegate must not be null");
@@ -67,19 +65,24 @@ AssertionError leakError() {
6765

6866
@Override
6967
public boolean isAllocated() {
70-
return this.refCount > 0;
68+
return this.delegate instanceof PooledDataBuffer &&
69+
((PooledDataBuffer) this.delegate).isAllocated();
7170
}
7271

7372
@Override
7473
public PooledDataBuffer retain() {
75-
this.refCount++;
74+
if (this.delegate instanceof PooledDataBuffer) {
75+
((PooledDataBuffer) this.delegate).retain();
76+
}
7677
return this;
7778
}
7879

7980
@Override
8081
public boolean release() {
81-
this.refCount--;
82-
return this.refCount == 0;
82+
if (this.delegate instanceof PooledDataBuffer) {
83+
((PooledDataBuffer) this.delegate).release();
84+
}
85+
return isAllocated();
8386
}
8487

8588
// delegation

spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,16 @@ public void checkForLeaks() {
9999

100100
@Override
101101
public DataBuffer allocateBuffer() {
102-
return allocateBufferInternal(this.delegate.allocateBuffer());
102+
return createLeakAwareDataBuffer(this.delegate.allocateBuffer());
103103
}
104104

105105
@Override
106106
public DataBuffer allocateBuffer(int initialCapacity) {
107-
return allocateBufferInternal(this.delegate.allocateBuffer(initialCapacity));
107+
return createLeakAwareDataBuffer(this.delegate.allocateBuffer(initialCapacity));
108108
}
109109

110110
@NotNull
111-
private DataBuffer allocateBufferInternal(DataBuffer delegateBuffer) {
111+
private DataBuffer createLeakAwareDataBuffer(DataBuffer delegateBuffer) {
112112
LeakAwareDataBuffer dataBuffer = new LeakAwareDataBuffer(delegateBuffer, this);
113113
this.created.add(dataBuffer);
114114
return dataBuffer;

0 commit comments

Comments
 (0)