Skip to content

Commit c29b235

Browse files
Stop Copying Bulk HTTP Requests in NIO Networking (elastic#49819) (elastic#51393)
Same as elastic#44564 but for NIO.
1 parent 8703b88 commit c29b235

File tree

3 files changed

+53
-36
lines changed

3 files changed

+53
-36
lines changed

plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.lucene.util.BytesRef;
2525
import org.apache.lucene.util.BytesRefIterator;
2626
import org.elasticsearch.common.bytes.AbstractBytesReference;
27+
import org.elasticsearch.common.bytes.BytesArray;
2728
import org.elasticsearch.common.bytes.BytesReference;
2829
import org.elasticsearch.common.io.stream.StreamInput;
2930

@@ -70,7 +71,8 @@ static ByteBuf toByteBuf(final BytesReference reference) {
7071
}
7172

7273
static BytesReference toBytesReference(final ByteBuf buffer) {
73-
return new ByteBufBytesReference(buffer, buffer.readableBytes());
74+
final int readableBytes = buffer.readableBytes();
75+
return readableBytes == 0 ? BytesArray.EMPTY : new ByteBufBytesReference(buffer, readableBytes);
7476
}
7577

7678
private static class ByteBufBytesReference extends AbstractBytesReference {

plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919

2020
package org.elasticsearch.http.nio;
2121

22-
import io.netty.buffer.Unpooled;
2322
import io.netty.channel.ChannelHandler;
2423
import io.netty.handler.codec.ByteToMessageDecoder;
25-
import io.netty.handler.codec.http.DefaultFullHttpRequest;
2624
import io.netty.handler.codec.http.FullHttpRequest;
2725
import io.netty.handler.codec.http.HttpContentCompressor;
2826
import io.netty.handler.codec.http.HttpContentDecompressor;
@@ -158,32 +156,25 @@ public void close() throws IOException {
158156
private void handleRequest(Object msg) {
159157
final HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = (HttpPipelinedRequest<FullHttpRequest>) msg;
160158
FullHttpRequest request = pipelinedRequest.getRequest();
161-
162-
final FullHttpRequest copiedRequest;
159+
boolean success = false;
160+
NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence());
163161
try {
164-
copiedRequest = new DefaultFullHttpRequest(
165-
request.protocolVersion(),
166-
request.method(),
167-
request.uri(),
168-
Unpooled.copiedBuffer(request.content()),
169-
request.headers(),
170-
request.trailingHeaders());
171-
} finally {
172-
// As we have copied the buffer, we can release the request
173-
request.release();
174-
}
175-
NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence());
176-
177-
if (request.decoderResult().isFailure()) {
178-
Throwable cause = request.decoderResult().cause();
179-
if (cause instanceof Error) {
180-
ExceptionsHelper.maybeDieOnAnotherThread(cause);
181-
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
162+
if (request.decoderResult().isFailure()) {
163+
Throwable cause = request.decoderResult().cause();
164+
if (cause instanceof Error) {
165+
ExceptionsHelper.maybeDieOnAnotherThread(cause);
166+
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
167+
} else {
168+
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
169+
}
182170
} else {
183-
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
171+
transport.incomingRequest(httpRequest, nioHttpChannel);
172+
}
173+
success = true;
174+
} finally {
175+
if (success == false) {
176+
request.release();
184177
}
185-
} else {
186-
transport.incomingRequest(httpRequest, nioHttpChannel);
187178
}
188179
}
189180

plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.http.nio;
2121

22+
import io.netty.buffer.ByteBuf;
23+
import io.netty.buffer.Unpooled;
2224
import io.netty.handler.codec.http.DefaultFullHttpRequest;
2325
import io.netty.handler.codec.http.DefaultHttpHeaders;
2426
import io.netty.handler.codec.http.FullHttpRequest;
@@ -28,7 +30,6 @@
2830
import io.netty.handler.codec.http.cookie.Cookie;
2931
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
3032
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
31-
import org.elasticsearch.common.bytes.BytesArray;
3233
import org.elasticsearch.common.bytes.BytesReference;
3334
import org.elasticsearch.http.HttpRequest;
3435
import org.elasticsearch.rest.RestRequest;
@@ -40,6 +41,7 @@
4041
import java.util.List;
4142
import java.util.Map;
4243
import java.util.Set;
44+
import java.util.concurrent.atomic.AtomicBoolean;
4345
import java.util.stream.Collectors;
4446

4547
public class NioHttpRequest implements HttpRequest {
@@ -48,16 +50,22 @@ public class NioHttpRequest implements HttpRequest {
4850
private final BytesReference content;
4951
private final HttpHeadersMap headers;
5052
private final int sequence;
53+
private final AtomicBoolean released;
54+
private final boolean pooled;
5155

5256
NioHttpRequest(FullHttpRequest request, int sequence) {
57+
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
58+
ByteBufUtils.toBytesReference(request.content()));
59+
}
60+
61+
private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
62+
BytesReference content) {
5363
this.request = request;
54-
headers = new HttpHeadersMap(request.headers());
5564
this.sequence = sequence;
56-
if (request.content().isReadable()) {
57-
this.content = ByteBufUtils.toBytesReference(request.content());
58-
} else {
59-
this.content = BytesArray.EMPTY;
60-
}
65+
this.headers = headers;
66+
this.content = content;
67+
this.pooled = pooled;
68+
this.released = released;
6169
}
6270

6371
@Override
@@ -105,17 +113,32 @@ public String uri() {
105113

106114
@Override
107115
public BytesReference content() {
116+
assert released.get() == false;
108117
return content;
109118
}
110119

111120
@Override
112121
public void release() {
113-
// NioHttpRequest works from copied unpooled bytes no need to release anything
122+
if (pooled && released.compareAndSet(false, true)) {
123+
request.release();
124+
}
114125
}
115126

116127
@Override
117128
public HttpRequest releaseAndCopy() {
118-
return this;
129+
assert released.get() == false;
130+
if (pooled == false) {
131+
return this;
132+
}
133+
try {
134+
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
135+
return new NioHttpRequest(
136+
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
137+
request.trailingHeaders()),
138+
headers, sequence, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent));
139+
} finally {
140+
release();
141+
}
119142
}
120143

121144
@Override
@@ -156,7 +179,8 @@ public HttpRequest removeHeader(String header) {
156179
trailingHeaders.remove(header);
157180
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
158181
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
159-
return new NioHttpRequest(requestWithoutHeader, sequence);
182+
return new NioHttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
183+
pooled, content);
160184
}
161185

162186
@Override

0 commit comments

Comments
 (0)