23
23
import org .elasticsearch .common .io .stream .StreamInput ;
24
24
import org .elasticsearch .common .io .stream .StreamOutput ;
25
25
import org .elasticsearch .common .io .stream .Writeable ;
26
+ import org .elasticsearch .common .logging .DeprecationLogger ;
26
27
import org .elasticsearch .common .logging .ESLoggerFactory ;
27
28
import org .elasticsearch .common .settings .Setting ;
28
29
import org .elasticsearch .common .settings .Setting .Property ;
29
30
import org .elasticsearch .common .settings .Settings ;
31
+ import org .elasticsearch .common .unit .ByteSizeValue ;
32
+ import org .elasticsearch .http .HttpTransportSettings ;
33
+
34
+ import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_COUNT ;
35
+ import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_SIZE ;
30
36
31
37
import java .io .Closeable ;
32
38
import java .io .IOException ;
39
45
import java .util .Set ;
40
46
import java .util .concurrent .CancellationException ;
41
47
import java .util .concurrent .ExecutionException ;
42
- import java .util .concurrent .FutureTask ;
43
48
import java .util .concurrent .RunnableFuture ;
44
49
import java .util .concurrent .atomic .AtomicBoolean ;
45
50
import java .util .function .Function ;
46
51
import java .util .function .Supplier ;
47
52
import java .util .stream .Collectors ;
48
53
import java .util .stream .Stream ;
54
+ import java .nio .charset .StandardCharsets ;
55
+
49
56
50
57
/**
51
58
* A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
@@ -81,6 +88,8 @@ public final class ThreadContext implements Closeable, Writeable {
81
88
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct ();
82
89
private final Map <String , String > defaultHeader ;
83
90
private final ContextThreadLocal threadLocal ;
91
+ private static volatile int maxWrnHeaderCount ;
92
+ private static volatile long maxWrnHeaderSize ;
84
93
85
94
/**
86
95
* Creates a new ThreadContext instance
@@ -98,13 +107,23 @@ public ThreadContext(Settings settings) {
98
107
this .defaultHeader = Collections .unmodifiableMap (defaultHeader );
99
108
}
100
109
threadLocal = new ContextThreadLocal ();
110
+ maxWrnHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT .get (settings );
111
+ maxWrnHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE .get (settings ).getBytes ();
101
112
}
102
113
103
114
@ Override
104
115
public void close () throws IOException {
105
116
threadLocal .close ();
106
117
}
107
118
119
+ public static void setMaxWarningHeaderCount (int newMaxWrnHeaderCount ){
120
+ maxWrnHeaderCount = newMaxWrnHeaderCount ;
121
+ }
122
+
123
+ public static void setMaxWarningHeaderSize (ByteSizeValue newMaxWarningHeaderSize ){
124
+ maxWrnHeaderSize = newMaxWarningHeaderSize .getBytes ();
125
+ }
126
+
108
127
/**
109
128
* Removes the current context and resets a default context. The removed context can be
110
129
* restored when closing the returned {@link StoredContext}
@@ -359,7 +378,8 @@ private static final class ThreadContextStruct {
359
378
private final Map <String , Object > transientHeaders ;
360
379
private final Map <String , List <String >> responseHeaders ;
361
380
private final boolean isSystemContext ;
362
-
381
+ private long wrnHeadersSize ; //saving current warning headers' size not to recalculate the size with every new warning header
382
+ private boolean isWrnLmtReached ;
363
383
private ThreadContextStruct (StreamInput in ) throws IOException {
364
384
final int numRequest = in .readVInt ();
365
385
Map <String , String > requestHeaders = numRequest == 0 ? Collections .emptyMap () : new HashMap <>(numRequest );
@@ -371,6 +391,8 @@ private ThreadContextStruct(StreamInput in) throws IOException {
371
391
this .responseHeaders = in .readMapOfLists (StreamInput ::readString , StreamInput ::readString );
372
392
this .transientHeaders = Collections .emptyMap ();
373
393
isSystemContext = false ; // we never serialize this it's a transient flag
394
+ wrnHeadersSize = 0L ;
395
+ isWrnLmtReached = false ;
374
396
}
375
397
376
398
private ThreadContextStruct setSystemContext () {
@@ -387,6 +409,20 @@ private ThreadContextStruct(Map<String, String> requestHeaders,
387
409
this .responseHeaders = responseHeaders ;
388
410
this .transientHeaders = transientHeaders ;
389
411
this .isSystemContext = isSystemContext ;
412
+ this .wrnHeadersSize = 0L ;
413
+ isWrnLmtReached = false ;
414
+ }
415
+
416
+ private ThreadContextStruct (Map <String , String > requestHeaders ,
417
+ Map <String , List <String >> responseHeaders ,
418
+ Map <String , Object > transientHeaders , boolean isSystemContext ,
419
+ long wrnHeadersSize , boolean isWrnLmtReached ) {
420
+ this .requestHeaders = requestHeaders ;
421
+ this .responseHeaders = responseHeaders ;
422
+ this .transientHeaders = transientHeaders ;
423
+ this .isSystemContext = isSystemContext ;
424
+ this .wrnHeadersSize = wrnHeadersSize ;
425
+ this .isWrnLmtReached = isWrnLmtReached ;
390
426
}
391
427
392
428
/**
@@ -442,6 +478,19 @@ private ThreadContextStruct putResponseHeaders(Map<String, List<String>> headers
442
478
443
479
private ThreadContextStruct putResponse (final String key , final String value , final Function <String , String > uniqueValue ) {
444
480
assert value != null ;
481
+ long curWrnHeaderSize = 0 ;
482
+ //check if we can add another warning header (max count or size within limits)
483
+ if (key .equals ("Warning" )) {
484
+ if (isWrnLmtReached ) return this ; //can't add warning headers - limit reached
485
+ if (maxWrnHeaderCount != -1 ) { //if count is NOT unbounded, check its limits
486
+ int wrnHeaderCount = this .responseHeaders .containsKey ("Warning" ) ? this .responseHeaders .get ("Warning" ).size () : 0 ;
487
+ if (wrnHeaderCount >= maxWrnHeaderCount ) return addWrnLmtReachedHeader ();
488
+ }
489
+ if (maxWrnHeaderSize != -1 ) { //if size is NOT unbounded, check its limits
490
+ curWrnHeaderSize = "Warning" .getBytes (StandardCharsets .UTF_8 ).length + value .getBytes (StandardCharsets .UTF_8 ).length ;
491
+ if ((wrnHeadersSize + curWrnHeaderSize ) > maxWrnHeaderSize ) return addWrnLmtReachedHeader ();
492
+ }
493
+ }
445
494
446
495
final Map <String , List <String >> newResponseHeaders = new HashMap <>(this .responseHeaders );
447
496
final List <String > existingValues = newResponseHeaders .get (key );
@@ -460,8 +509,37 @@ private ThreadContextStruct putResponse(final String key, final String value, fi
460
509
} else {
461
510
newResponseHeaders .put (key , Collections .singletonList (value ));
462
511
}
512
+ return new ThreadContextStruct (requestHeaders , newResponseHeaders , transientHeaders ,
513
+ isSystemContext , wrnHeadersSize + curWrnHeaderSize , isWrnLmtReached );
514
+ }
463
515
464
- return new ThreadContextStruct (requestHeaders , newResponseHeaders , transientHeaders , isSystemContext );
516
+ //replace last warning header(s) with "headers limit reached" warning
517
+ //respecting limitations on headers size if it is set by user
518
+ private ThreadContextStruct addWrnLmtReachedHeader (){
519
+ if ((maxWrnHeaderSize == 0 ) || (maxWrnHeaderCount ==0 )) //can't even add "headers limit reached" warning
520
+ return new ThreadContextStruct (requestHeaders , responseHeaders , transientHeaders ,
521
+ isSystemContext , wrnHeadersSize , true );
522
+ final Map <String , List <String >> newResponseHeaders = new HashMap <>(this .responseHeaders );
523
+ final List <String > wrns = new ArrayList <>(newResponseHeaders .get ("Warning" ));
524
+ final String lastWrnMessage = DeprecationLogger .formatWarning (
525
+ "There were more warnings, but they were dropped as [" +
526
+ HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_COUNT .getKey () + "] or [" +
527
+ HttpTransportSettings .SETTING_HTTP_MAX_WARNING_HEADER_SIZE .getKey () + "] were reached!" );
528
+
529
+ if (maxWrnHeaderSize > 0 ) {
530
+ final long wrnSize = "Warning" .getBytes (StandardCharsets .UTF_8 ).length ;
531
+ wrnHeadersSize = wrnHeadersSize + wrnSize + lastWrnMessage .getBytes (StandardCharsets .UTF_8 ).length ;
532
+ do {
533
+ String wrn = wrns .remove (wrns .size () - 1 );
534
+ wrnHeadersSize = wrnHeadersSize - wrnSize - wrn .getBytes (StandardCharsets .UTF_8 ).length ;
535
+ } while (wrnHeadersSize > maxWrnHeaderSize );
536
+ } else { //we don't care about size as it is unbounded
537
+ wrns .remove (wrns .size () - 1 );
538
+ }
539
+ wrns .add (lastWrnMessage );
540
+ newResponseHeaders .put ("Warning" , Collections .unmodifiableList (wrns ));
541
+ return new ThreadContextStruct (requestHeaders , newResponseHeaders , transientHeaders ,
542
+ isSystemContext , wrnHeadersSize , true );
465
543
}
466
544
467
545
private ThreadContextStruct putTransient (String key , Object value ) {
0 commit comments