Skip to content

Commit a7fa5d3

Browse files
authored
Remove dangerous ByteBufStreamInput methods (#27076)
This commit removes the `ByteBufStreamInput` `readBytesReference` and `readBytesRef` methods. These methods are zero-copy which means that they retain a reference to the underlying netty buffer. The problem is that our `TcpTransport` is not designed to handle zero-copy. The netty implementation sets the read index past the current message once it has been deserialized, handled, and mostly likely dispatched to another thread. This means that netty is free to release this buffer. So it is unsafe to retain a reference to it without calling `retain`. And we cannot call `retain` because we are not currently designed to handle reference counting past the transport level. This should not currently impact us as we wrap the `ByteBufStreamInput` in `NamedWriteableAwareStreamInput` in the `TcpTransport`. This stream essentially delegates to the underling stream. However, in the case of `readBytesReference` and `readBytesRef` it leaves thw implementations to the standard `StreamInput` methods. These methods call the read byte array method which delegates to `ByteBufStreamInput`. The read byte array method on `ByteBufStreamInput` copies so it is safe. The only impact of this commit should be removing methods that could be dangerous if they were eventually called due to some refactoring.
1 parent d5efc30 commit a7fa5d3

File tree

1 file changed

+11
-11
lines changed

1 file changed

+11
-11
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufStreamInput.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,34 +33,34 @@
3333
class ByteBufStreamInput extends StreamInput {
3434

3535
private final ByteBuf buffer;
36-
private final int startIndex;
3736
private final int endIndex;
3837

3938
ByteBufStreamInput(ByteBuf buffer, int length) {
4039
if (length > buffer.readableBytes()) {
4140
throw new IndexOutOfBoundsException();
4241
}
4342
this.buffer = buffer;
44-
startIndex = buffer.readerIndex();
43+
int startIndex = buffer.readerIndex();
4544
endIndex = startIndex + length;
4645
buffer.markReaderIndex();
4746
}
4847

4948
@Override
5049
public BytesReference readBytesReference(int length) throws IOException {
51-
BytesReference ref = Netty4Utils.toBytesReference(buffer.slice(buffer.readerIndex(), length));
52-
buffer.skipBytes(length);
53-
return ref;
50+
// NOTE: It is unsafe to share a reference of the internal structure, so we
51+
// use the default implementation which will copy the bytes. It is unsafe because
52+
// a netty ByteBuf might be pooled which requires a manual release to prevent
53+
// memory leaks.
54+
return super.readBytesReference(length);
5455
}
5556

5657
@Override
5758
public BytesRef readBytesRef(int length) throws IOException {
58-
if (!buffer.hasArray()) {
59-
return super.readBytesRef(length);
60-
}
61-
BytesRef bytesRef = new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), length);
62-
buffer.skipBytes(length);
63-
return bytesRef;
59+
// NOTE: It is unsafe to share a reference of the internal structure, so we
60+
// use the default implementation which will copy the bytes. It is unsafe because
61+
// a netty ByteBuf might be pooled which requires a manual release to prevent
62+
// memory leaks.
63+
return super.readBytesRef(length);
6464
}
6565

6666
@Override

0 commit comments

Comments
 (0)