Skip to content

Commit 6fcb51d

Browse files
authored
Fix issue with pipeline releasing bytes early (#54458)
Currently there is an issue with the InboundPipeline releasing bytes earlier than appropriate. This can lead to the bytes being reused before the message is handled. This commit fixes that issue and adds a test to detect when it is occurring.
1 parent 9600073 commit 6fcb51d

File tree

2 files changed

+72
-29
lines changed

2 files changed

+72
-29
lines changed

server/src/main/java/org/elasticsearch/transport/InboundPipeline.java

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -69,37 +69,22 @@ public void close() {
6969
public void handleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
7070
pending.add(reference.retain());
7171

72-
final ReleasableBytesReference composite;
73-
if (pending.size() == 1) {
74-
composite = pending.peekFirst();
75-
} else {
76-
final ReleasableBytesReference[] bytesReferences = pending.toArray(new ReleasableBytesReference[0]);
77-
final Releasable releasable = () -> Releasables.closeWhileHandlingException(bytesReferences);
78-
composite = new ReleasableBytesReference(new CompositeBytesReference(bytesReferences), releasable);
79-
}
80-
8172
final ArrayList<Object> fragments = fragmentList.get();
82-
int bytesConsumed = 0;
8373
boolean continueHandling = true;
8474

8575
while (continueHandling && isClosed == false) {
8676
boolean continueDecoding = true;
87-
while (continueDecoding) {
88-
final int remaining = composite.length() - bytesConsumed;
89-
if (remaining != 0) {
90-
try (ReleasableBytesReference slice = composite.retainedSlice(bytesConsumed, remaining)) {
91-
final int bytesDecoded = decoder.decode(slice, fragments::add);
92-
if (bytesDecoded != 0) {
93-
bytesConsumed += bytesDecoded;
94-
if (fragments.isEmpty() == false && endOfMessage(fragments.get(fragments.size() - 1))) {
95-
continueDecoding = false;
96-
}
97-
} else {
77+
while (continueDecoding && pending.isEmpty() == false) {
78+
try (ReleasableBytesReference toDecode = getPendingBytes()) {
79+
final int bytesDecoded = decoder.decode(toDecode, fragments::add);
80+
if (bytesDecoded != 0) {
81+
releasePendingBytes(bytesDecoded);
82+
if (fragments.isEmpty() == false && endOfMessage(fragments.get(fragments.size() - 1))) {
9883
continueDecoding = false;
9984
}
85+
} else {
86+
continueDecoding = false;
10087
}
101-
} else {
102-
continueDecoding = false;
10388
}
10489
}
10590

@@ -118,8 +103,6 @@ public void handleBytes(TcpChannel channel, ReleasableBytesReference reference)
118103
}
119104
}
120105
}
121-
122-
releasePendingBytes(bytesConsumed);
123106
}
124107

125108
private void forwardFragments(TcpChannel channel, ArrayList<Object> fragments) throws IOException {
@@ -155,11 +138,22 @@ private boolean endOfMessage(Object fragment) {
155138
return fragment == InboundDecoder.PING || fragment == InboundDecoder.END_CONTENT || fragment instanceof Exception;
156139
}
157140

158-
private void releasePendingBytes(int bytesConsumed) {
159-
if (isClosed) {
160-
// Are released by the close method
161-
return;
141+
private ReleasableBytesReference getPendingBytes() {
142+
if (pending.size() == 1) {
143+
return pending.peekFirst().retain();
144+
} else {
145+
final ReleasableBytesReference[] bytesReferences = new ReleasableBytesReference[pending.size()];
146+
int index = 0;
147+
for (ReleasableBytesReference pendingReference : pending) {
148+
bytesReferences[index] = pendingReference.retain();
149+
++index;
150+
}
151+
final Releasable releasable = () -> Releasables.closeWhileHandlingException(bytesReferences);
152+
return new ReleasableBytesReference(new CompositeBytesReference(bytesReferences), releasable);
162153
}
154+
}
155+
156+
private void releasePendingBytes(int bytesConsumed) {
163157
int bytesToRelease = bytesConsumed;
164158
while (bytesToRelease != 0) {
165159
try (ReleasableBytesReference reference = pending.pollFirst()) {

server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.collect.Tuple;
2626
import org.elasticsearch.common.io.Streams;
2727
import org.elasticsearch.common.io.stream.BytesStreamOutput;
28+
import org.elasticsearch.common.lease.Releasable;
2829
import org.elasticsearch.common.settings.Settings;
2930
import org.elasticsearch.common.util.PageCacheRecycler;
3031
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -34,6 +35,7 @@
3435
import java.util.ArrayList;
3536
import java.util.List;
3637
import java.util.Objects;
38+
import java.util.concurrent.atomic.AtomicBoolean;
3739
import java.util.function.BiConsumer;
3840

3941
import static org.hamcrest.Matchers.instanceOf;
@@ -171,6 +173,53 @@ public void testPipelineHandling() throws IOException {
171173
}
172174
}
173175

176+
public void testEnsureBodyIsNotPrematurelyReleased() throws IOException {
177+
final PageCacheRecycler recycler = PageCacheRecycler.NON_RECYCLING_INSTANCE;
178+
BiConsumer<TcpChannel, InboundMessage> messageHandler = (c, m) -> {};
179+
BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler = (c, e) -> {};
180+
final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, recycler, messageHandler, errorHandler);
181+
182+
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
183+
String actionName = "actionName";
184+
final Version version = Version.CURRENT;
185+
final String value = randomAlphaOfLength(1000);
186+
final boolean isRequest = randomBoolean();
187+
final long requestId = randomNonNegativeLong();
188+
189+
OutboundMessage message;
190+
if (isRequest) {
191+
message = new OutboundMessage.Request(threadContext, new TestRequest(value),
192+
version, actionName, requestId, false, false);
193+
} else {
194+
message = new OutboundMessage.Response(threadContext, new TestResponse(value),
195+
version, requestId, false, false);
196+
}
197+
198+
final BytesReference reference = message.serialize(streamOutput);
199+
final int fixedHeaderSize = TcpHeader.headerSize(Version.CURRENT);
200+
final int variableHeaderSize = reference.getInt(fixedHeaderSize - 4);
201+
final int totalHeaderSize = fixedHeaderSize + variableHeaderSize;
202+
final AtomicBoolean bodyReleased = new AtomicBoolean(false);
203+
for (int i = 0; i < totalHeaderSize - 1; ++i) {
204+
try (ReleasableBytesReference slice = ReleasableBytesReference.wrap(reference.slice(i, 1))) {
205+
pipeline.handleBytes(new FakeTcpChannel(), slice);
206+
}
207+
}
208+
209+
final Releasable releasable = () -> bodyReleased.set(true);
210+
final int from = totalHeaderSize - 1;
211+
final BytesReference partHeaderPartBody = reference.slice(from, reference.length() - from - 1);
212+
try (ReleasableBytesReference slice = new ReleasableBytesReference(partHeaderPartBody, releasable)) {
213+
pipeline.handleBytes(new FakeTcpChannel(), slice);
214+
}
215+
assertFalse(bodyReleased.get());
216+
try (ReleasableBytesReference slice = new ReleasableBytesReference(reference.slice(reference.length() - 1, 1), releasable)) {
217+
pipeline.handleBytes(new FakeTcpChannel(), slice);
218+
}
219+
assertTrue(bodyReleased.get());
220+
}
221+
}
222+
174223
private static class MessageData {
175224

176225
private final Version version;

0 commit comments

Comments
 (0)