23
23
import org .elasticsearch .common .io .stream .StreamInput ;
24
24
import org .elasticsearch .common .io .stream .StreamOutput ;
25
25
import org .elasticsearch .common .io .stream .Writeable ;
26
+ import org .elasticsearch .common .logging .DeprecationLogger ;
26
27
import org .elasticsearch .common .logging .ESLoggerFactory ;
27
28
import org .elasticsearch .common .settings .Setting ;
28
29
import org .elasticsearch .common .settings .Setting .Property ;
29
30
import org .elasticsearch .common .settings .Settings ;
31
+ import org .elasticsearch .common .unit .ByteSizeValue ;
32
+ import org .elasticsearch .http .HttpTransportSettings ;
33
+
34
+ import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_COUNT ;
35
+ import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_SIZE ;
30
36
31
37
import java .io .Closeable ;
32
38
import java .io .IOException ;
39
45
import java .util .Set ;
40
46
import java .util .concurrent .CancellationException ;
41
47
import java .util .concurrent .ExecutionException ;
42
- import java .util .concurrent .FutureTask ;
43
48
import java .util .concurrent .RunnableFuture ;
44
49
import java .util .concurrent .atomic .AtomicBoolean ;
45
50
import java .util .function .Function ;
46
51
import java .util .function .Supplier ;
47
52
import java .util .stream .Collectors ;
48
53
import java .util .stream .Stream ;
54
+ import java .nio .charset .StandardCharsets ;
55
+
49
56
50
57
/**
51
58
* A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
@@ -81,6 +88,8 @@ public final class ThreadContext implements Closeable, Writeable {
81
88
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct ();
82
89
private final Map <String , String > defaultHeader ;
83
90
private final ContextThreadLocal threadLocal ;
91
+ private final int maxWarningHeaderCount ;
92
+ private final long maxWarningHeaderSize ;
84
93
85
94
/**
86
95
* Creates a new ThreadContext instance
@@ -98,6 +107,8 @@ public ThreadContext(Settings settings) {
98
107
this .defaultHeader = Collections .unmodifiableMap (defaultHeader );
99
108
}
100
109
threadLocal = new ContextThreadLocal ();
110
+ this .maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT .get (settings );
111
+ this .maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE .get (settings ).getBytes ();
101
112
}
102
113
103
114
@ Override
@@ -282,7 +293,7 @@ public void addResponseHeader(final String key, final String value) {
282
293
* @param uniqueValue the function that produces de-duplication values
283
294
*/
284
295
public void addResponseHeader (final String key , final String value , final Function <String , String > uniqueValue ) {
285
- threadLocal .set (threadLocal .get ().putResponse (key , value , uniqueValue ));
296
+ threadLocal .set (threadLocal .get ().putResponse (key , value , uniqueValue , maxWarningHeaderCount , maxWarningHeaderSize ));
286
297
}
287
298
288
299
/**
@@ -359,7 +370,7 @@ private static final class ThreadContextStruct {
359
370
private final Map <String , Object > transientHeaders ;
360
371
private final Map <String , List <String >> responseHeaders ;
361
372
private final boolean isSystemContext ;
362
-
373
+ private long warningHeadersSize ; //saving current warning headers' size not to recalculate the size with every new warning header
363
374
private ThreadContextStruct (StreamInput in ) throws IOException {
364
375
final int numRequest = in .readVInt ();
365
376
Map <String , String > requestHeaders = numRequest == 0 ? Collections .emptyMap () : new HashMap <>(numRequest );
@@ -371,6 +382,7 @@ private ThreadContextStruct(StreamInput in) throws IOException {
371
382
this .responseHeaders = in .readMapOfLists (StreamInput ::readString , StreamInput ::readString );
372
383
this .transientHeaders = Collections .emptyMap ();
373
384
isSystemContext = false ; // we never serialize this it's a transient flag
385
+ this .warningHeadersSize = 0L ;
374
386
}
375
387
376
388
private ThreadContextStruct setSystemContext () {
@@ -387,6 +399,18 @@ private ThreadContextStruct(Map<String, String> requestHeaders,
387
399
this .responseHeaders = responseHeaders ;
388
400
this .transientHeaders = transientHeaders ;
389
401
this .isSystemContext = isSystemContext ;
402
+ this .warningHeadersSize = 0L ;
403
+ }
404
+
405
+ private ThreadContextStruct (Map <String , String > requestHeaders ,
406
+ Map <String , List <String >> responseHeaders ,
407
+ Map <String , Object > transientHeaders , boolean isSystemContext ,
408
+ long warningHeadersSize ) {
409
+ this .requestHeaders = requestHeaders ;
410
+ this .responseHeaders = responseHeaders ;
411
+ this .transientHeaders = transientHeaders ;
412
+ this .isSystemContext = isSystemContext ;
413
+ this .warningHeadersSize = warningHeadersSize ;
390
414
}
391
415
392
416
/**
@@ -440,30 +464,58 @@ private ThreadContextStruct putResponseHeaders(Map<String, List<String>> headers
440
464
return new ThreadContextStruct (requestHeaders , newResponseHeaders , transientHeaders , isSystemContext );
441
465
}
442
466
443
- private ThreadContextStruct putResponse (final String key , final String value , final Function <String , String > uniqueValue ) {
467
+ private ThreadContextStruct putResponse (final String key , final String value , final Function <String , String > uniqueValue ,
468
+ final int maxWarningHeaderCount , final long maxWarningHeaderSize ) {
444
469
assert value != null ;
470
+ long newWarningHeaderSize = warningHeadersSize ;
471
+ //check if we can add another warning header - if max size within limits
472
+ if (key .equals ("Warning" ) && (maxWarningHeaderSize != -1 )) { //if size is NOT unbounded, check its limits
473
+ if (warningHeadersSize > maxWarningHeaderSize ) { // if max size has already been reached before
474
+ final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" +
475
+ maxWarningHeaderSize + "] bytes set in [" +
476
+ HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_SIZE .getKey () + "]!" ;
477
+ ESLoggerFactory .getLogger (ThreadContext .class ).warn (message );
478
+ return this ;
479
+ }
480
+ newWarningHeaderSize += "Warning" .getBytes (StandardCharsets .UTF_8 ).length + value .getBytes (StandardCharsets .UTF_8 ).length ;
481
+ if (newWarningHeaderSize > maxWarningHeaderSize ) {
482
+ final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" +
483
+ maxWarningHeaderSize + "] bytes set in [" +
484
+ HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_SIZE .getKey () + "]!" ;
485
+ ESLoggerFactory .getLogger (ThreadContext .class ).warn (message );
486
+ return new ThreadContextStruct (requestHeaders , responseHeaders , transientHeaders , isSystemContext , newWarningHeaderSize );
487
+ }
488
+ }
445
489
446
490
final Map <String , List <String >> newResponseHeaders = new HashMap <>(this .responseHeaders );
447
491
final List <String > existingValues = newResponseHeaders .get (key );
448
-
449
492
if (existingValues != null ) {
450
493
final Set <String > existingUniqueValues = existingValues .stream ().map (uniqueValue ).collect (Collectors .toSet ());
451
494
assert existingValues .size () == existingUniqueValues .size ();
452
495
if (existingUniqueValues .contains (uniqueValue .apply (value ))) {
453
496
return this ;
454
497
}
455
-
456
498
final List <String > newValues = new ArrayList <>(existingValues );
457
499
newValues .add (value );
458
-
459
500
newResponseHeaders .put (key , Collections .unmodifiableList (newValues ));
460
501
} else {
461
502
newResponseHeaders .put (key , Collections .singletonList (value ));
462
503
}
463
504
464
- return new ThreadContextStruct (requestHeaders , newResponseHeaders , transientHeaders , isSystemContext );
505
+ //check if we can add another warning header - if max count within limits
506
+ if ((key .equals ("Warning" )) && (maxWarningHeaderCount != -1 )) { //if count is NOT unbounded, check its limits
507
+ final int warningHeaderCount = newResponseHeaders .containsKey ("Warning" ) ? newResponseHeaders .get ("Warning" ).size () : 0 ;
508
+ if (warningHeaderCount > maxWarningHeaderCount ) {
509
+ final String message = "Dropping a warning header, as their total count reached the maximum allowed of [" +
510
+ maxWarningHeaderCount + "] set in [" + HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_COUNT .getKey () + "]!" ;
511
+ ESLoggerFactory .getLogger (ThreadContext .class ).warn (message );
512
+ return this ;
513
+ }
514
+ }
515
+ return new ThreadContextStruct (requestHeaders , newResponseHeaders , transientHeaders , isSystemContext , newWarningHeaderSize );
465
516
}
466
517
518
+
467
519
private ThreadContextStruct putTransient (String key , Object value ) {
468
520
Map <String , Object > newTransient = new HashMap <>(this .transientHeaders );
469
521
if (newTransient .putIfAbsent (key , value ) != null ) {
0 commit comments