45
45
import java .util .Arrays ;
46
46
import java .util .Collections ;
47
47
import java .util .List ;
48
+ import java .util .Map ;
48
49
import java .util .Set ;
49
50
import java .util .concurrent .ConcurrentHashMap ;
51
+ import java .util .concurrent .TimeUnit ;
50
52
import java .util .concurrent .atomic .AtomicLong ;
51
53
import java .util .concurrent .atomic .AtomicReference ;
52
54
@@ -60,6 +62,9 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
60
62
private static final Logger logger = LogManager .getLogger (AbstractHttpServerTransport .class );
61
63
private static final ActionListener <Void > NO_OP = ActionListener .wrap (() -> {});
62
64
65
+ private static final long PRUNE_THROTTLE_INTERVAL = TimeUnit .SECONDS .toMillis (60 );
66
+ private static final long MAX_CLIENT_STATS_AGE = TimeUnit .MINUTES .toMillis (5 );
67
+
63
68
protected final Settings settings ;
64
69
public final HttpHandlingSettings handlingSettings ;
65
70
protected final NetworkService networkService ;
@@ -78,10 +83,12 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
78
83
private final AtomicLong totalChannelsAccepted = new AtomicLong ();
79
84
private final Set <HttpChannel > httpChannels = Collections .newSetFromMap (new ConcurrentHashMap <>());
80
85
private final Set <HttpServerChannel > httpServerChannels = Collections .newSetFromMap (new ConcurrentHashMap <>());
86
+ private final Map <Integer , HttpStats .ClientStats > httpChannelStats = new ConcurrentHashMap <>();
81
87
82
88
private final HttpTracer tracer ;
83
89
84
90
private volatile long slowLogThresholdMs ;
91
+ protected volatile long lastClientStatsPruneTime ;
85
92
86
93
protected AbstractHttpServerTransport (Settings settings , NetworkService networkService , BigArrays bigArrays , ThreadPool threadPool ,
87
94
NamedXContentRegistry xContentRegistry , Dispatcher dispatcher , ClusterSettings clusterSettings ) {
@@ -128,7 +135,26 @@ public HttpInfo info() {
128
135
129
136
@ Override
130
137
public HttpStats stats () {
131
- return new HttpStats (httpChannels .size (), totalChannelsAccepted .get ());
138
+ pruneClientStats (false );
139
+ return new HttpStats (new ArrayList <>(httpChannelStats .values ()), httpChannels .size (), totalChannelsAccepted .get ());
140
+ }
141
+
142
+ /**
143
+ * Prunes client stats of entries that have been disconnected for more than five minutes.
144
+ *
145
+ * @param throttled When true, executes the prune process only if more than 60 seconds has elapsed since the last execution.
146
+ */
147
+ void pruneClientStats (boolean throttled ) {
148
+ if (throttled == false || (threadPool .relativeTimeInMillis () - lastClientStatsPruneTime > PRUNE_THROTTLE_INTERVAL )) {
149
+ long nowMillis = threadPool .absoluteTimeInMillis ();
150
+ for (var statsEntry : httpChannelStats .entrySet ()) {
151
+ long closedTimeMillis = statsEntry .getValue ().closedTimeMillis ;
152
+ if (closedTimeMillis > 0 && (nowMillis - closedTimeMillis > MAX_CLIENT_STATS_AGE )) {
153
+ httpChannelStats .remove (statsEntry .getKey ());
154
+ }
155
+ }
156
+ lastClientStatsPruneTime = threadPool .relativeTimeInMillis ();
157
+ }
132
158
}
133
159
134
160
protected void bindServer () {
@@ -291,7 +317,23 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) {
291
317
boolean addedOnThisCall = httpChannels .add (httpChannel );
292
318
assert addedOnThisCall : "Channel should only be added to http channel set once" ;
293
319
totalChannelsAccepted .incrementAndGet ();
294
- httpChannel .addCloseListener (ActionListener .wrap (() -> httpChannels .remove (httpChannel )));
320
+ httpChannelStats .put (
321
+ HttpStats .ClientStats .getChannelKey (httpChannel ),
322
+ new HttpStats .ClientStats (threadPool .absoluteTimeInMillis ())
323
+ );
324
+ httpChannel .addCloseListener (ActionListener .wrap (() -> {
325
+ try {
326
+ httpChannels .remove (httpChannel );
327
+ HttpStats .ClientStats clientStats = httpChannelStats .get (HttpStats .ClientStats .getChannelKey (httpChannel ));
328
+ if (clientStats != null ) {
329
+ clientStats .closedTimeMillis = threadPool .absoluteTimeInMillis ();
330
+ }
331
+ } catch (Exception e ) {
332
+ // the listener code about should never throw
333
+ logger .trace ("error removing HTTP channel listener" , e );
334
+ }
335
+ }));
336
+ pruneClientStats (true );
295
337
logger .trace (() -> new ParameterizedMessage ("Http channel accepted: {}" , httpChannel ));
296
338
}
297
339
@@ -302,6 +344,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) {
302
344
* @param httpChannel that received the http request
303
345
*/
304
346
public void incomingRequest (final HttpRequest httpRequest , final HttpChannel httpChannel ) {
347
+ updateClientStats (httpRequest , httpChannel );
305
348
final long startTime = threadPool .relativeTimeInMillis ();
306
349
try {
307
350
handleIncomingRequest (httpRequest , httpChannel , httpRequest .getInboundException ());
@@ -315,6 +358,41 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt
315
358
}
316
359
}
317
360
361
+ void updateClientStats (final HttpRequest httpRequest , final HttpChannel httpChannel ) {
362
+ HttpStats .ClientStats clientStats = httpChannelStats .get (HttpStats .ClientStats .getChannelKey (httpChannel ));
363
+ if (clientStats != null ) {
364
+ if (clientStats .agent == null ) {
365
+ if (hasAtLeastOneHeaderValue (httpRequest , "x-elastic-product-origin" )) {
366
+ clientStats .agent = httpRequest .getHeaders ().get ("x-elastic-product-origin" ).get (0 );
367
+ } else if (hasAtLeastOneHeaderValue (httpRequest , "User-Agent" )) {
368
+ clientStats .agent = httpRequest .getHeaders ().get ("User-Agent" ).get (0 );
369
+ }
370
+ }
371
+ if (clientStats .localAddress == null ) {
372
+ clientStats .localAddress = NetworkAddress .format (httpChannel .getLocalAddress ());
373
+ clientStats .remoteAddress = NetworkAddress .format (httpChannel .getRemoteAddress ());
374
+ }
375
+ if (clientStats .forwardedFor == null ) {
376
+ if (hasAtLeastOneHeaderValue (httpRequest , "x-forwarded-for" )) {
377
+ clientStats .forwardedFor = httpRequest .getHeaders ().get ("x-forwarded-for" ).get (0 );
378
+ }
379
+ }
380
+ if (clientStats .opaqueId == null ) {
381
+ if (hasAtLeastOneHeaderValue (httpRequest , "x-opaque-id" )) {
382
+ clientStats .opaqueId = httpRequest .getHeaders ().get ("x-opaque-id" ).get (0 );
383
+ }
384
+ }
385
+ clientStats .lastRequestTimeMillis = threadPool .absoluteTimeInMillis ();
386
+ clientStats .lastUri = httpRequest .uri ();
387
+ clientStats .requestCount .increment ();
388
+ clientStats .requestSizeBytes .add (httpRequest .content ().length ());
389
+ }
390
+ }
391
+
392
+ private static boolean hasAtLeastOneHeaderValue (final HttpRequest request , final String header ) {
393
+ return request .getHeaders ().containsKey (header ) && request .getHeaders ().get (header ).size () > 0 ;
394
+ }
395
+
318
396
// Visible for testing
319
397
void dispatchRequest (final RestRequest restRequest , final RestChannel channel , final Throwable badRequestCause ) {
320
398
final ThreadContext threadContext = threadPool .getThreadContext ();
0 commit comments