Skip to content

Commit 5ddf920

Browse files
Stop Copying Every Http Request in Message Handler (#44564)
* Copying the request is not necessary here. We can simply release it once the response has been generated and a lot of `Unpooled` allocations that way * Relates #32228 * I think the issue that preventet that PR that PR from being merged was solved by #39634 that moved the bulk index marker search to ByteBuf bulk access so the composite buffer shouldn't require many additional bounds checks (I'd argue the bounds checks we add, we save when copying the composite buffer) * I couldn't neccessarily reproduce much of a speedup from this change, but I could reproduce a very measureable reduction in GC time with e.g. Rally's PMC (4g heap node and bulk requests of size 5k saw a reduction in young GC time by ~10% for me)
1 parent 540de4e commit 5ddf920

File tree

14 files changed

+151
-37
lines changed

14 files changed

+151
-37
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.http.netty4;
2121

22+
import io.netty.buffer.ByteBuf;
23+
import io.netty.buffer.Unpooled;
2224
import io.netty.handler.codec.http.DefaultFullHttpRequest;
2325
import io.netty.handler.codec.http.DefaultHttpHeaders;
2426
import io.netty.handler.codec.http.FullHttpRequest;
@@ -28,7 +30,6 @@
2830
import io.netty.handler.codec.http.cookie.Cookie;
2931
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
3032
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
31-
import org.elasticsearch.common.bytes.BytesArray;
3233
import org.elasticsearch.common.bytes.BytesReference;
3334
import org.elasticsearch.http.HttpRequest;
3435
import org.elasticsearch.rest.RestRequest;
@@ -41,23 +42,30 @@
4142
import java.util.List;
4243
import java.util.Map;
4344
import java.util.Set;
45+
import java.util.concurrent.atomic.AtomicBoolean;
4446
import java.util.stream.Collectors;
4547

4648
public class Netty4HttpRequest implements HttpRequest {
47-
private final FullHttpRequest request;
48-
private final BytesReference content;
4949
private final HttpHeadersMap headers;
5050
private final int sequence;
51+
private final AtomicBoolean released;
52+
private final FullHttpRequest request;
53+
private final boolean pooled;
54+
private final BytesReference content;
5155

5256
Netty4HttpRequest(FullHttpRequest request, int sequence) {
57+
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
58+
Netty4Utils.toBytesReference(request.content()));
59+
}
60+
61+
private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
62+
BytesReference content) {
5363
this.request = request;
54-
headers = new HttpHeadersMap(request.headers());
5564
this.sequence = sequence;
56-
if (request.content().isReadable()) {
57-
this.content = Netty4Utils.toBytesReference(request.content());
58-
} else {
59-
this.content = BytesArray.EMPTY;
60-
}
65+
this.headers = headers;
66+
this.content = content;
67+
this.pooled = pooled;
68+
this.released = released;
6169
}
6270

6371
@Override
@@ -105,9 +113,33 @@ public String uri() {
105113

106114
@Override
107115
public BytesReference content() {
116+
assert released.get() == false;
108117
return content;
109118
}
110119

120+
@Override
121+
public void release() {
122+
if (pooled && released.compareAndSet(false, true)) {
123+
request.release();
124+
}
125+
}
126+
127+
@Override
128+
public HttpRequest releaseAndCopy() {
129+
assert released.get() == false;
130+
if (pooled == false) {
131+
return this;
132+
}
133+
try {
134+
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
135+
return new Netty4HttpRequest(
136+
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
137+
request.trailingHeaders()),
138+
headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
139+
} finally {
140+
release();
141+
}
142+
}
111143

112144
@Override
113145
public final Map<String, List<String>> getHeaders() {
@@ -147,7 +179,8 @@ public HttpRequest removeHeader(String header) {
147179
trailingHeaders.remove(header);
148180
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
149181
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
150-
return new Netty4HttpRequest(requestWithoutHeader, sequence);
182+
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
183+
pooled, content);
151184
}
152185

153186
@Override

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919

2020
package org.elasticsearch.http.netty4;
2121

22-
import io.netty.buffer.Unpooled;
2322
import io.netty.channel.ChannelHandler;
2423
import io.netty.channel.ChannelHandlerContext;
2524
import io.netty.channel.SimpleChannelInboundHandler;
26-
import io.netty.handler.codec.http.DefaultFullHttpRequest;
2725
import io.netty.handler.codec.http.FullHttpRequest;
2826
import org.elasticsearch.ExceptionsHelper;
2927
import org.elasticsearch.http.HttpPipelinedRequest;
@@ -41,32 +39,25 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
4139
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
4240
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
4341
FullHttpRequest request = msg.getRequest();
44-
final FullHttpRequest copiedRequest;
42+
boolean success = false;
43+
Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
4544
try {
46-
copiedRequest =
47-
new DefaultFullHttpRequest(
48-
request.protocolVersion(),
49-
request.method(),
50-
request.uri(),
51-
Unpooled.copiedBuffer(request.content()),
52-
request.headers(),
53-
request.trailingHeaders());
54-
} finally {
55-
// As we have copied the buffer, we can release the request
56-
request.release();
57-
}
58-
Netty4HttpRequest httpRequest = new Netty4HttpRequest(copiedRequest, msg.getSequence());
59-
60-
if (request.decoderResult().isFailure()) {
61-
Throwable cause = request.decoderResult().cause();
62-
if (cause instanceof Error) {
63-
ExceptionsHelper.maybeDieOnAnotherThread(cause);
64-
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
45+
if (request.decoderResult().isFailure()) {
46+
Throwable cause = request.decoderResult().cause();
47+
if (cause instanceof Error) {
48+
ExceptionsHelper.maybeDieOnAnotherThread(cause);
49+
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
50+
} else {
51+
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
52+
}
6553
} else {
66-
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
54+
serverTransport.incomingRequest(httpRequest, channel);
55+
}
56+
success = true;
57+
} finally {
58+
if (success == false) {
59+
httpRequest.release();
6760
}
68-
} else {
69-
serverTransport.incomingRequest(httpRequest, channel);
7061
}
7162
}
7263

plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,15 @@ public BytesReference content() {
108108
return content;
109109
}
110110

111+
@Override
112+
public void release() {
113+
// NioHttpRequest works from copied unpooled bytes no need to release anything
114+
}
115+
116+
@Override
117+
public HttpRequest releaseAndCopy() {
118+
return this;
119+
}
111120

112121
@Override
113122
public final Map<String, List<String>> getHeaders() {

server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ protected BytesStreamOutput newBytesOutput() {
7777

7878
@Override
7979
public void sendResponse(RestResponse restResponse) {
80-
final ArrayList<Releasable> toClose = new ArrayList<>(3);
80+
final ArrayList<Releasable> toClose = new ArrayList<>(4);
81+
toClose.add(httpRequest::release);
8182
if (isCloseConnection()) {
8283
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
8384
}

server/src/main/java/org/elasticsearch/http/HttpRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,16 @@ enum HttpVersion {
6868
*/
6969
HttpResponse createResponse(RestStatus status, BytesReference content);
7070

71+
/**
72+
* Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()}
73+
* after this method has been invoked is undefined and implementation specific.
74+
*/
75+
void release();
76+
77+
/**
78+
* If this instances uses any pooled resources, creates a copy of this instance that does not use any pooled resources and releases
79+
* any resources associated with this instance. If the instance does not use any shared resources, returns itself.
80+
* @return a safe unpooled http request
81+
*/
82+
HttpRequest releaseAndCopy();
7183
}

server/src/main/java/org/elasticsearch/rest/RestController.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,10 @@ private void dispatchRequest(RestRequest request, RestChannel channel, RestHandl
218218
}
219219
// iff we could reserve bytes for the request we need to send the response also over this channel
220220
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
221+
// TODO: Count requests double in the circuit breaker if they need copying?
222+
if (handler.allowsUnsafeBuffers() == false) {
223+
request.ensureSafeBuffers();
224+
}
221225
handler.handleRequest(request, responseChannel, client);
222226
} catch (Exception e) {
223227
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));

server/src/main/java/org/elasticsearch/rest/RestHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,16 @@ default boolean canTripCircuitBreaker() {
4747
default boolean supportsContentStream() {
4848
return false;
4949
}
50+
51+
/**
52+
* Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return
53+
* {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the
54+
* {@link RestRequest} instance before passing a request to this handler. If this instance does not support pooled/unsafe buffers
55+
* {@link RestRequest#ensureSafeBuffers()} should be called on any request before passing it to {@link #handleRequest}.
56+
*
57+
* @return true iff the handler supports requests that make use of pooled buffers
58+
*/
59+
default boolean allowsUnsafeBuffers() {
60+
return false;
61+
}
5062
}

server/src/main/java/org/elasticsearch/rest/RestRequest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ public class RestRequest implements ToXContent.Params {
6464
private final String rawPath;
6565
private final Set<String> consumedParams = new HashSet<>();
6666
private final SetOnce<XContentType> xContentType = new SetOnce<>();
67-
private final HttpRequest httpRequest;
6867
private final HttpChannel httpChannel;
6968

69+
private HttpRequest httpRequest;
70+
7071
private boolean contentConsumed = false;
7172

7273
public boolean isContentConsumed() {
@@ -97,6 +98,15 @@ protected RestRequest(RestRequest restRequest) {
9798
restRequest.getHttpRequest(), restRequest.getHttpChannel());
9899
}
99100

101+
/**
102+
* Invoke {@link HttpRequest#releaseAndCopy()} on the http request in this instance and replace a pooled http request
103+
* with an unpooled copy. This is supposed to be used before passing requests to {@link RestHandler} instances that can not safely
104+
* handle http requests that use pooled buffers as determined by {@link RestHandler#allowsUnsafeBuffers()}.
105+
*/
106+
void ensureSafeBuffers() {
107+
httpRequest = httpRequest.releaseAndCopy();
108+
}
109+
100110
/**
101111
* Creates a new REST request. This method will throw {@link BadParameterException} if the path cannot be
102112
* decoded

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
8686
public boolean supportsContentStream() {
8787
return true;
8888
}
89+
90+
@Override
91+
public boolean allowsUnsafeBuffers() {
92+
return true;
93+
}
8994
}

server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,4 +310,9 @@ public static void checkRestTotalHits(RestRequest restRequest, SearchRequest sea
310310
protected Set<String> responseParams() {
311311
return RESPONSE_PARAMS;
312312
}
313+
314+
@Override
315+
public boolean allowsUnsafeBuffers() {
316+
return true;
317+
}
313318
}

server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,15 @@ public HttpRequest removeHeader(String header) {
460460
public HttpResponse createResponse(RestStatus status, BytesReference content) {
461461
return new TestResponse(status, content);
462462
}
463+
464+
@Override
465+
public void release() {
466+
}
467+
468+
@Override
469+
public HttpRequest releaseAndCopy() {
470+
return this;
471+
}
463472
}
464473

465474
private static class TestResponse implements HttpResponse {

server/src/test/java/org/elasticsearch/rest/RestControllerTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,15 @@ public HttpRequest removeHeader(String header) {
608608
public HttpResponse createResponse(RestStatus status, BytesReference content) {
609609
return null;
610610
}
611+
612+
@Override
613+
public void release() {
614+
}
615+
616+
@Override
617+
public HttpRequest releaseAndCopy() {
618+
return this;
619+
}
611620
}, null);
612621

613622
final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED);

test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,15 @@ public boolean containsHeader(String name) {
113113
}
114114
};
115115
}
116+
117+
@Override
118+
public void release() {
119+
}
120+
121+
@Override
122+
public HttpRequest releaseAndCopy() {
123+
return this;
124+
}
116125
}
117126

118127
private static class FakeHttpChannel implements HttpChannel {

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ public boolean supportsContentStream() {
8686
return restHandler.supportsContentStream();
8787
}
8888

89+
@Override
90+
public boolean allowsUnsafeBuffers() {
91+
return restHandler.allowsUnsafeBuffers();
92+
}
93+
8994
private RestRequest maybeWrapRestRequest(RestRequest restRequest) throws IOException {
9095
if (restHandler instanceof RestRequestFilter) {
9196
return ((RestRequestFilter)restHandler).getFilteredRequest(restRequest);

0 commit comments

Comments
 (0)