diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index ab078ad10d337..fa06f9c5698d7 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -41,9 +41,31 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) throws Exception { Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); - FullHttpRequest request = msg.getRequest(); + final FullHttpRequest request = httpRequest(msg); + final Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence()); + + if (request.decoderResult().isFailure()) { + Throwable cause = request.decoderResult().cause(); + if (cause instanceof Error) { + ExceptionsHelper.dieOnError(cause); + serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause)); + } else { + serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause); + } + } else { + serverTransport.incomingRequest(httpRequest, channel); + } + } - try { + // package private for unit-testing + static FullHttpRequest httpRequest(HttpPipelinedRequest msg) { + FullHttpRequest request = msg.getRequest(); + if (Netty4Utils.isUnpooled(request.content())) { + assert Netty4Utils.isBufferHierarchyUnpooled(request.content()) : "request body contains unpooled and pooled buffers"; + // if the buffer is unpooled its lifecycle is managed by the garbage collector instead of Netty's internal + // memory pool. Thus we we can avoiding copying the request content buffer and use the original request instead. + return request; + } else { final FullHttpRequest copiedRequest = new DefaultFullHttpRequest( request.protocolVersion(), @@ -52,23 +74,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest channels) throws IOEx } } + /** + * @param buffer A byte buffer instance. Must not be null. + * @return true iff this byte buffer has been allocated outside of Netty's buffer pool. + */ + public static boolean isUnpooled(final ByteBuf buffer) { + return buffer.alloc() instanceof UnpooledByteBufAllocator; + } + + /** + * Contrary to {@link #isUnpooled(ByteBuf)} which assumes that the top-level buffer's allocator is used for any associated buffers, + * this implementation does a more thorough check by inspecting all components of a CompositeByteBuf. + * + * @param buffer A byte buffer instance. Must not be null. + * @return true iff this byte buffer and all its components have been allocated outside of Netty's buffer pool. + */ + public static boolean isBufferHierarchyUnpooled(final ByteBuf buffer) { + if (isUnpooled(buffer)) { + if (buffer instanceof CompositeByteBuf) { + CompositeByteBuf compositeBuffer = (CompositeByteBuf) buffer; + for(int i = 0; i < compositeBuffer.numComponents(); i++) { + // access the internal component to avoid duplicating the buffer (see CompositeByteBuf#component(int)) + if (isBufferHierarchyUnpooled(compositeBuffer.internalComponent(i)) == false) { + return false; + } + } + } + return true; + } else { + return false; + } + } + /** * If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be * caught and bubbles up to the uncaught exception handler. diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandlerTests.java new file mode 100644 index 0000000000000..9e7973011b3fc --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandlerTests.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.test.ESTestCase; + +public class Netty4HttpRequestHandlerTests extends ESTestCase { + public void testExtractHttpRequestWithUnpooledContent() { + HttpPipelinedRequest pipelinedRequest = createPipelinedRequest(false); + FullHttpRequest extractedRequest = Netty4HttpRequestHandler.httpRequest(pipelinedRequest); + assertSame(extractedRequest, pipelinedRequest.getRequest()); + } + + public void testExtractHttpRequestWithPooledContent() { + HttpPipelinedRequest pipelinedRequest = createPipelinedRequest(true); + assertEquals(1, pipelinedRequest.getRequest().refCnt()); + FullHttpRequest extractedRequest = Netty4HttpRequestHandler.httpRequest(pipelinedRequest); + assertNotSame(extractedRequest, pipelinedRequest.getRequest()); + assertEquals(0, pipelinedRequest.getRequest().refCnt()); + } + + private HttpPipelinedRequest createPipelinedRequest(boolean usePooledAllocator) { + ByteBufAllocator alloc = usePooledAllocator ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT; + ByteBuf content = alloc.buffer(5); + ByteBufUtil.writeAscii(content, randomAlphaOfLength(content.capacity())); + final FullHttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", content); + HttpUtil.setContentLength(nettyRequest, content.capacity()); + return new HttpPipelinedRequest<>(0, nettyRequest); + } +} diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java index 8372a8540b8be..9775717bcc8fc 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java @@ -21,7 +21,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase; import org.elasticsearch.common.bytes.BytesArray; @@ -30,6 +32,8 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; +import org.junit.After; +import org.junit.Before; import java.io.IOException; @@ -37,6 +41,22 @@ public class Netty4UtilsTests extends ESTestCase { private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false); + private ByteBuf unpooledBuffer; + private ByteBuf pooledBuffer; + + @Before + public void setUp() throws Exception { + super.setUp(); + pooledBuffer = PooledByteBufAllocator.DEFAULT.buffer(1); + unpooledBuffer = UnpooledByteBufAllocator.DEFAULT.buffer(1); + } + + @After + public void tearDown() throws Exception { + pooledBuffer.release(); + unpooledBuffer.release(); + super.tearDown(); + } public void testToChannelBufferWithEmptyRef() throws IOException { ByteBuf buffer = Netty4Utils.toByteBuf(getRandomizedBytesReference(0)); @@ -95,4 +115,8 @@ private BytesReference getRandomizedBytesReference(int length) throws IOExceptio } } + public void testClassifiesBuffersAsPooledOrUnpooled() { + assertTrue(Netty4Utils.isUnpooled(unpooledBuffer)); + assertFalse(Netty4Utils.isUnpooled(pooledBuffer)); + } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java index b4108b3e6c7d0..01d81974e0e60 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.common.bytes.BytesReference; @@ -63,6 +64,38 @@ static ByteBuf toByteBuf(final BytesReference reference) { } } + /** + * @param buffer A byte buffer instance. Must not be null. + * @return true iff this byte buffer has been allocated outside of Netty's buffer pool. + */ + static boolean isUnpooled(final ByteBuf buffer) { + return buffer.alloc() instanceof UnpooledByteBufAllocator; + } + + /** + * Contrary to {@link #isUnpooled(ByteBuf)} which assumes that the top-level buffer's allocator is used for any associated buffers, + * this implementation does a more thorough check by inspecting all components of a CompositeByteBuf. + * + * @param buffer A byte buffer instance. Must not be null. + * @return true iff this byte buffer and all its components have been allocated outside of Netty's buffer pool. + */ + static boolean isBufferHierarchyUnpooled(final ByteBuf buffer) { + if (isUnpooled(buffer)) { + if (buffer instanceof CompositeByteBuf) { + CompositeByteBuf compositeBuffer = (CompositeByteBuf) buffer; + for(int i = 0; i < compositeBuffer.numComponents(); i++) { + // access the internal component to avoid duplicating the buffer (see CompositeByteBuf#component(int)) + if (isBufferHierarchyUnpooled(compositeBuffer.internalComponent(i)) == false) { + return false; + } + } + } + return true; + } else { + return false; + } + } + static BytesReference toBytesReference(final ByteBuf buffer) { return new ByteBufBytesReference(buffer, buffer.readableBytes()); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index 3dcd59cf8e28c..2cf0242374c47 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -122,9 +122,31 @@ public void close() throws IOException { @SuppressWarnings("unchecked") private void handleRequest(Object msg) { final HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest) msg; - FullHttpRequest request = pipelinedRequest.getRequest(); + final FullHttpRequest request = httpRequest(pipelinedRequest); + final NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence()); + + if (request.decoderResult().isFailure()) { + Throwable cause = request.decoderResult().cause(); + if (cause instanceof Error) { + ExceptionsHelper.dieOnError(cause); + transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause)); + } else { + transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause); + } + } else { + transport.incomingRequest(httpRequest, nioHttpChannel); + } + } - try { + // package private for unit-testing + static FullHttpRequest httpRequest(HttpPipelinedRequest pipelinedRequest) { + FullHttpRequest request = pipelinedRequest.getRequest(); + if (ByteBufUtils.isUnpooled(request.content())) { + assert ByteBufUtils.isBufferHierarchyUnpooled(request.content()) : "request body contains unpooled and pooled buffers"; + // if the buffer is unpooled its lifecycle is managed by the garbage collector instead of Netty's internal + // memory pool. Thus we we can avoiding copying the request content buffer and use the original request instead. + return request; + } else { final FullHttpRequest copiedRequest = new DefaultFullHttpRequest( request.protocolVersion(), @@ -133,23 +155,10 @@ private void handleRequest(Object msg) { Unpooled.copiedBuffer(request.content()), request.headers(), request.trailingHeaders()); - - NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence()); - - if (request.decoderResult().isFailure()) { - Throwable cause = request.decoderResult().cause(); - if (cause instanceof Error) { - ExceptionsHelper.dieOnError(cause); - transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause)); - } else { - transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause); - } - } else { - transport.incomingRequest(httpRequest, nioHttpChannel); - } - } finally { // As we have copied the buffer, we can release the request request.release(); + return copiedRequest; } } + } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/ByteBufUtilsTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/ByteBufUtilsTests.java new file mode 100644 index 0000000000000..96cc6e23bb35e --- /dev/null +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/ByteBufUtilsTests.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import org.elasticsearch.test.ESTestCase; +import org.junit.After; +import org.junit.Before; + +public class ByteBufUtilsTests extends ESTestCase { + private ByteBuf unpooledBuffer; + private ByteBuf pooledBuffer; + + @Before + public void setUp() throws Exception { + super.setUp(); + pooledBuffer = PooledByteBufAllocator.DEFAULT.buffer(1); + unpooledBuffer = UnpooledByteBufAllocator.DEFAULT.buffer(1); + } + + @After + public void tearDown() throws Exception { + pooledBuffer.release(); + unpooledBuffer.release(); + super.tearDown(); + } + + public void testClassifiesBuffersAsPooledOrUnpooled() { + assertTrue(ByteBufUtils.isUnpooled(unpooledBuffer)); + assertFalse(ByteBufUtils.isUnpooled(pooledBuffer)); + } +} diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java index 62bf845a77058..29d378aa8d43d 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java @@ -20,10 +20,15 @@ package org.elasticsearch.http.nio; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; @@ -39,6 +44,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpHandlingSettings; +import org.elasticsearch.http.HttpPipelinedRequest; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.http.HttpTransportSettings; @@ -217,6 +223,29 @@ public void testEncodeHttpResponse() throws IOException { } } + public void testExtractHttpRequestWithUnpooledContent() { + HttpPipelinedRequest pipelinedRequest = createPipelinedRequest(false); + FullHttpRequest extractedRequest = HttpReadWriteHandler.httpRequest(pipelinedRequest); + assertSame(extractedRequest, pipelinedRequest.getRequest()); + } + + public void testExtractHttpRequestWithPooledContent() { + HttpPipelinedRequest pipelinedRequest = createPipelinedRequest(true); + assertEquals(1, pipelinedRequest.getRequest().refCnt()); + FullHttpRequest extractedRequest = HttpReadWriteHandler.httpRequest(pipelinedRequest); + assertNotSame(extractedRequest, pipelinedRequest.getRequest()); + assertEquals(0, pipelinedRequest.getRequest().refCnt()); + } + + private HttpPipelinedRequest createPipelinedRequest(boolean usePooledAllocator) { + ByteBufAllocator alloc = usePooledAllocator ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT; + ByteBuf content = alloc.buffer(5); + ByteBufUtil.writeAscii(content, randomAlphaOfLength(content.capacity())); + final FullHttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", content); + HttpUtil.setContentLength(nettyRequest, content.capacity()); + return new HttpPipelinedRequest<>(0, nettyRequest); + } + public void testCorsEnabledWithoutAllowOrigins() throws IOException { // Set up a HTTP transport with only the CORS enabled setting Settings settings = Settings.builder() diff --git a/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java b/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java index db3a2bae16714..91a6b0911846d 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java +++ b/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java @@ -23,7 +23,7 @@ public class HttpPipelinedRequest implements HttpPipelinedMessage { private final R request; private final int sequence; - HttpPipelinedRequest(int sequence, R request) { + public HttpPipelinedRequest(int sequence, R request) { this.sequence = sequence; this.request = request; }