Skip to content

Commit 2773bc7

Browse files
authored
Recycle ServletChannel in ServletContextHandler (#10801)
Recycle ServletChannel in ServletContextHandler rather than in ServletHandler, so that completion events on servlet API request/response can be handled.
1 parent 2cfe7c4 commit 2773bc7

File tree

12 files changed

+269
-101
lines changed

12 files changed

+269
-101
lines changed

jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java

+71
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Objects;
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicReference;
3132
import java.util.function.Consumer;
3233
import java.util.function.Function;
3334
import java.util.function.Predicate;
@@ -45,6 +46,7 @@
4546
import org.eclipse.jetty.server.internal.HttpChannelState;
4647
import org.eclipse.jetty.util.Attributes;
4748
import org.eclipse.jetty.util.Callback;
49+
import org.eclipse.jetty.util.ExceptionUtil;
4850
import org.eclipse.jetty.util.Fields;
4951
import org.eclipse.jetty.util.HostPort;
5052
import org.eclipse.jetty.util.NanoTime;
@@ -54,6 +56,8 @@
5456
import org.eclipse.jetty.util.annotation.ManagedAttribute;
5557
import org.eclipse.jetty.util.annotation.ManagedObject;
5658
import org.eclipse.jetty.util.thread.Invocable;
59+
import org.slf4j.Logger;
60+
import org.slf4j.LoggerFactory;
5761

5862
/**
5963
* <p>The representation of an HTTP request, for any protocol version (HTTP/1.1, HTTP/2, HTTP/3).</p>
@@ -123,6 +127,8 @@
123127
*/
124128
public interface Request extends Attributes, Content.Source
125129
{
130+
Logger LOG = LoggerFactory.getLogger(Request.class);
131+
126132
String CACHE_ATTRIBUTE = Request.class.getCanonicalName() + ".CookieCache";
127133
String COOKIE_ATTRIBUTE = Request.class.getCanonicalName() + ".Cookies";
128134
List<Locale> DEFAULT_LOCALES = List.of(Locale.getDefault());
@@ -316,8 +322,73 @@ default void push(MetaData.Request resource)
316322

317323
TunnelSupport getTunnelSupport();
318324

325+
/**
326+
* Add a {@link HttpStream.Wrapper} to the current {@link HttpStream}.
327+
* @param wrapper A function that wraps the passed stream.
328+
* @see #addCompletionListener(Request, Consumer)
329+
*/
319330
void addHttpStreamWrapper(Function<HttpStream, HttpStream> wrapper);
320331

332+
/**
333+
* Adds a completion listener that is an optimized equivalent to overriding the
334+
* {@link HttpStream#succeeded()} and {@link HttpStream#failed(Throwable)} methods
335+
* of a {@link HttpStream.Wrapper} created by a call to {@link #addHttpStreamWrapper(Function)}.
336+
* In the case of a failure, the {@link Throwable} cause is passed to the listener, but unlike
337+
* {@link #addFailureListener(Consumer)} listeners, which are called when the failure occurs, completion
338+
* listeners are called only once the {@link HttpStream} is completed at the very end of processing.
339+
*
340+
* @param listener A {@link Consumer} of {@link Throwable} to call when the request handling is complete. The
341+
* listener is passed a null {@link Throwable} on success.
342+
* @see #addHttpStreamWrapper(Function)
343+
*/
344+
static void addCompletionListener(Request request, Consumer<Throwable> listener)
345+
{
346+
// Look for a ChannelRequest to use its optimized addCompletionLister
347+
HttpChannelState.ChannelRequest channelRequest = as(request, HttpChannelState.ChannelRequest.class);
348+
if (channelRequest != null)
349+
{
350+
channelRequest.addCompletionListener(listener);
351+
}
352+
else
353+
{
354+
// No ChannelRequest, so directly implement listener with a stream wrapper.
355+
AtomicReference<Consumer<Throwable>> onCompletion = new AtomicReference<>(listener);
356+
request.addHttpStreamWrapper(s -> new HttpStream.Wrapper(s)
357+
{
358+
@Override
359+
public void succeeded()
360+
{
361+
onCompletion(null);
362+
super.succeeded();
363+
}
364+
365+
@Override
366+
public void failed(Throwable x)
367+
{
368+
onCompletion(x);
369+
super.failed(x);
370+
}
371+
372+
private void onCompletion(Throwable x)
373+
{
374+
Consumer<Throwable> l = onCompletion.getAndSet(null);
375+
if (l != null)
376+
{
377+
try
378+
{
379+
l.accept(x);
380+
}
381+
catch (Throwable t)
382+
{
383+
ExceptionUtil.addSuppressedIfNotAssociated(x, t);
384+
LOG.warn("{} threw", l, t);
385+
}
386+
}
387+
}
388+
});
389+
}
390+
}
391+
321392
/**
322393
* <p>Get a {@link Session} associated with the request.
323394
* Sessions may not be supported by a given configuration, in which case

jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/EventsHandler.java

+4-19
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.eclipse.jetty.http.MetaData;
2121
import org.eclipse.jetty.io.Content;
2222
import org.eclipse.jetty.server.Handler;
23-
import org.eclipse.jetty.server.HttpStream;
2423
import org.eclipse.jetty.server.Request;
2524
import org.eclipse.jetty.server.Response;
2625
import org.eclipse.jetty.util.BufferUtil;
@@ -72,25 +71,11 @@ public boolean handle(Request request, Response response, Callback callback) thr
7271
{
7372
EventsRequest wrappedRequest = new EventsRequest(request, roRequest);
7473
EventsResponse wrappedResponse = new EventsResponse(roRequest, response);
75-
request.addHttpStreamWrapper(stream -> new HttpStream.Wrapper(stream)
74+
Request.addCompletionListener(request, x ->
7675
{
77-
@Override
78-
public void succeeded()
79-
{
80-
notifyOnResponseBegin(roRequest, wrappedResponse);
81-
notifyOnResponseTrailersComplete(roRequest, wrappedResponse);
82-
notifyOnComplete(roRequest, null);
83-
super.succeeded();
84-
}
85-
86-
@Override
87-
public void failed(Throwable x)
88-
{
89-
notifyOnResponseBegin(roRequest, wrappedResponse);
90-
notifyOnResponseTrailersComplete(roRequest, wrappedResponse);
91-
notifyOnComplete(roRequest, x);
92-
super.failed(x);
93-
}
76+
notifyOnResponseBegin(roRequest, wrappedResponse);
77+
notifyOnResponseTrailersComplete(roRequest, wrappedResponse);
78+
notifyOnComplete(roRequest, x);
9479
});
9580

9681
boolean handled = super.handle(wrappedRequest, wrappedResponse, callback);

jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java

+3-31
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.eclipse.jetty.http.HttpStatus;
2929
import org.eclipse.jetty.io.CyclicTimeouts;
3030
import org.eclipse.jetty.server.Handler;
31-
import org.eclipse.jetty.server.HttpStream;
3231
import org.eclipse.jetty.server.Request;
3332
import org.eclipse.jetty.server.Response;
3433
import org.eclipse.jetty.util.Callback;
@@ -263,7 +262,7 @@ private boolean handleWithPermit(Request request, Response response, Callback ca
263262
{
264263
if (LOG.isDebugEnabled())
265264
LOG.debug("{} forwarding {}", this, request);
266-
request.addHttpStreamWrapper(stream -> new Resumer(stream, request));
265+
Request.addCompletionListener(request, this::resume);
267266
return nextHandler(request, response, callback);
268267
}
269268

@@ -286,14 +285,14 @@ private void suspend(Request request, Response response, Callback callback)
286285
timeouts.schedule(entry);
287286
}
288287

289-
private void resume()
288+
private void resume(Throwable x)
290289
{
291290
// See correspondent state machine logic in handle() and expire().
292291
int permits = state.getAndIncrement();
293292
if (permits >= 0)
294293
{
295294
if (LOG.isDebugEnabled())
296-
LOG.debug("{} no suspended requests to resume", this);
295+
LOG.debug("{} no suspended requests to resume", this, x);
297296
return;
298297
}
299298

@@ -391,33 +390,6 @@ public void run()
391390
}
392391
}
393392

394-
private class Resumer extends HttpStream.Wrapper
395-
{
396-
private final Request request;
397-
398-
private Resumer(HttpStream wrapped, Request request)
399-
{
400-
super(wrapped);
401-
this.request = request;
402-
}
403-
404-
@Override
405-
public void succeeded()
406-
{
407-
if (LOG.isDebugEnabled())
408-
LOG.debug("{} succeeded {}", QoSHandler.this, request);
409-
resume();
410-
}
411-
412-
@Override
413-
public void failed(Throwable x)
414-
{
415-
if (LOG.isDebugEnabled())
416-
LOG.debug("{} failed {}", QoSHandler.this, request, x);
417-
resume();
418-
}
419-
}
420-
421393
private class Timeouts extends CyclicTimeouts<Entry>
422394
{
423395
private Timeouts(Scheduler scheduler)

jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java

+58
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515

1616
import java.io.IOException;
1717
import java.nio.ByteBuffer;
18+
import java.util.ArrayList;
1819
import java.util.HashMap;
20+
import java.util.List;
1921
import java.util.Objects;
2022
import java.util.Set;
2123
import java.util.concurrent.CompletableFuture;
2224
import java.util.concurrent.TimeoutException;
25+
import java.util.concurrent.atomic.AtomicReference;
2326
import java.util.concurrent.atomic.LongAdder;
2427
import java.util.function.Consumer;
2528
import java.util.function.Function;
@@ -774,6 +777,7 @@ public static class ChannelRequest implements Attributes, Request
774777
private final AutoLock _lock;
775778
private final LongAdder _contentBytesRead = new LongAdder();
776779
private final Attributes _attributes = new Attributes.Lazy();
780+
private final AtomicReference<List<Consumer<Throwable>>> _onCompletion = new AtomicReference<>();
777781
private HttpChannelState _httpChannelState;
778782
private Request _loggedRequest;
779783
private HttpFields _trailers;
@@ -1101,6 +1105,60 @@ public void addHttpStreamWrapper(Function<HttpStream, HttpStream> wrapper)
11011105
getHttpChannelState().addHttpStreamWrapper(wrapper);
11021106
}
11031107

1108+
public void addCompletionListener(Consumer<Throwable> listener)
1109+
{
1110+
List<Consumer<Throwable>> listeners = _onCompletion.updateAndGet(list -> list == null ? new ArrayList<>() : list);
1111+
if (listeners.isEmpty())
1112+
{
1113+
addHttpStreamWrapper(stream -> new HttpStream.Wrapper(stream)
1114+
{
1115+
@Override
1116+
public void succeeded()
1117+
{
1118+
if (LOG.isDebugEnabled())
1119+
LOG.debug("succeeded {}", this);
1120+
onCompletion(null);
1121+
super.succeeded();
1122+
}
1123+
1124+
@Override
1125+
public void failed(Throwable x)
1126+
{
1127+
if (LOG.isDebugEnabled())
1128+
LOG.debug("failed {}", this, x);
1129+
onCompletion(x);
1130+
super.failed(x);
1131+
}
1132+
1133+
private void onCompletion(Throwable x)
1134+
{
1135+
List<Consumer<Throwable>> onCompletion = _onCompletion.getAndSet(null);
1136+
if (onCompletion == null)
1137+
{
1138+
if (LOG.isDebugEnabled())
1139+
LOG.warn("onCompletion called twice", new IllegalStateException(Thread.currentThread().getName()));
1140+
return;
1141+
}
1142+
// completion events in reverse order
1143+
for (int i = onCompletion.size(); i-- > 0;)
1144+
{
1145+
Consumer<Throwable> r = onCompletion.get(i);
1146+
try
1147+
{
1148+
r.accept(x);
1149+
}
1150+
catch (Throwable t)
1151+
{
1152+
ExceptionUtil.addSuppressedIfNotAssociated(x, t);
1153+
LOG.warn("{} threw", r, t);
1154+
}
1155+
}
1156+
}
1157+
});
1158+
}
1159+
listeners.add(listener);
1160+
}
1161+
11041162
@Override
11051163
public Session getSession(boolean create)
11061164
{

jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ContextHandlerTest.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -339,17 +339,12 @@ public boolean handle(Request request, Response response, Callback callback)
339339
{
340340
assertInContext(request);
341341
scopeListener.assertInContext(request.getContext(), request);
342-
343-
request.addHttpStreamWrapper(s -> new HttpStream.Wrapper(s)
342+
Request.addCompletionListener(request, x ->
344343
{
345-
@Override
346-
public void succeeded()
347-
{
348-
assertInContext(request);
349-
scopeListener.assertInContext(request.getContext(), request);
350-
super.succeeded();
351-
}
344+
assertInContext(request);
345+
scopeListener.assertInContext(request.getContext(), request);
352346
});
347+
353348
request.demand(() ->
354349
{
355350
assertInContext(request);

jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,6 @@ else if (_state == State.CLOSE)
244244

245245
try
246246
{
247-
if (failure != null)
248-
_servletChannel.abort(failure);
249-
250247
if (closedCallback != null)
251248
{
252249
if (failure == null)
@@ -435,7 +432,7 @@ public void complete(Callback callback)
435432
}
436433

437434
if (content != null)
438-
channelWrite(content, true, new WriteCompleteCB());
435+
channelWrite(content, true, new CompleteWriteCompleteCB());
439436
}
440437

441438
/**
@@ -1769,4 +1766,15 @@ public InvocationType getInvocationType()
17691766
return InvocationType.NON_BLOCKING;
17701767
}
17711768
}
1769+
1770+
private class CompleteWriteCompleteCB extends WriteCompleteCB
1771+
{
1772+
@Override
1773+
public void failed(Throwable x)
1774+
{
1775+
// TODO why is this needed for h2/h3?
1776+
HttpOutput.this._servletChannel.abort(x);
1777+
super.failed(x);
1778+
}
1779+
}
17721780
}

0 commit comments

Comments
 (0)