33
33
import io .netty .handler .codec .http .HttpResponseStatus ;
34
34
import io .netty .handler .codec .http .HttpUtil ;
35
35
import io .netty .handler .codec .http .HttpVersion ;
36
-
37
36
import org .elasticsearch .common .bytes .BytesArray ;
38
37
import org .elasticsearch .common .settings .Settings ;
39
38
import org .elasticsearch .common .unit .ByteSizeValue ;
39
+ import org .elasticsearch .common .unit .TimeValue ;
40
40
import org .elasticsearch .http .HttpChannel ;
41
41
import org .elasticsearch .http .HttpHandlingSettings ;
42
+ import org .elasticsearch .http .HttpReadTimeoutException ;
42
43
import org .elasticsearch .http .HttpRequest ;
43
44
import org .elasticsearch .http .HttpResponse ;
44
45
import org .elasticsearch .http .HttpTransportSettings ;
48
49
import org .elasticsearch .nio .FlushOperation ;
49
50
import org .elasticsearch .nio .InboundChannelBuffer ;
50
51
import org .elasticsearch .nio .SocketChannelContext ;
52
+ import org .elasticsearch .nio .TaskScheduler ;
51
53
import org .elasticsearch .rest .RestRequest ;
52
54
import org .elasticsearch .rest .RestStatus ;
53
55
import org .elasticsearch .test .ESTestCase ;
56
58
57
59
import java .io .IOException ;
58
60
import java .nio .ByteBuffer ;
61
+ import java .util .Arrays ;
62
+ import java .util .Iterator ;
59
63
import java .util .List ;
60
64
import java .util .function .BiConsumer ;
61
65
62
66
import static org .elasticsearch .http .HttpTransportSettings .SETTING_CORS_ALLOW_CREDENTIALS ;
63
67
import static org .elasticsearch .http .HttpTransportSettings .SETTING_CORS_ALLOW_METHODS ;
64
68
import static org .elasticsearch .http .HttpTransportSettings .SETTING_CORS_ALLOW_ORIGIN ;
65
69
import static org .elasticsearch .http .HttpTransportSettings .SETTING_CORS_ENABLED ;
66
- import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_COMPRESSION ;
67
- import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_COMPRESSION_LEVEL ;
68
- import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_DETAILED_ERRORS_ENABLED ;
69
- import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_CHUNK_SIZE ;
70
- import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_HEADER_SIZE ;
71
- import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_INITIAL_LINE_LENGTH ;
72
- import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_RESET_COOKIES ;
73
- import static org .elasticsearch .http .HttpTransportSettings .SETTING_PIPELINING_MAX_EVENTS ;
70
+ import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_CONTENT_LENGTH ;
71
+ import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_READ_TIMEOUT ;
74
72
import static org .hamcrest .Matchers .equalTo ;
75
73
import static org .hamcrest .Matchers .is ;
76
74
import static org .hamcrest .Matchers .notNullValue ;
77
75
import static org .hamcrest .Matchers .nullValue ;
78
76
import static org .mockito .Matchers .any ;
77
+ import static org .mockito .Matchers .eq ;
79
78
import static org .mockito .Mockito .atLeastOnce ;
80
79
import static org .mockito .Mockito .mock ;
81
80
import static org .mockito .Mockito .times ;
84
83
public class HttpReadWriteHandlerTests extends ESTestCase {
85
84
86
85
private HttpReadWriteHandler handler ;
87
- private NioHttpChannel nioHttpChannel ;
86
+ private NioHttpChannel channel ;
88
87
private NioHttpServerTransport transport ;
88
+ private TaskScheduler taskScheduler ;
89
89
90
90
private final RequestEncoder requestEncoder = new RequestEncoder ();
91
91
private final ResponseDecoder responseDecoder = new ResponseDecoder ();
92
92
93
93
@ Before
94
94
public void setMocks () {
95
95
transport = mock (NioHttpServerTransport .class );
96
- Settings settings = Settings .EMPTY ;
97
- ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE .getDefault (settings );
98
- ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE .getDefault (settings );
99
- ByteSizeValue maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH .getDefault (settings );
100
- HttpHandlingSettings httpHandlingSettings = new HttpHandlingSettings (1024 ,
101
- Math .toIntExact (maxChunkSize .getBytes ()),
102
- Math .toIntExact (maxHeaderSize .getBytes ()),
103
- Math .toIntExact (maxInitialLineLength .getBytes ()),
104
- SETTING_HTTP_RESET_COOKIES .getDefault (settings ),
105
- SETTING_HTTP_COMPRESSION .getDefault (settings ),
106
- SETTING_HTTP_COMPRESSION_LEVEL .getDefault (settings ),
107
- SETTING_HTTP_DETAILED_ERRORS_ENABLED .getDefault (settings ),
108
- SETTING_PIPELINING_MAX_EVENTS .getDefault (settings ),
109
- SETTING_CORS_ENABLED .getDefault (settings ));
110
- nioHttpChannel = mock (NioHttpChannel .class );
111
- handler = new HttpReadWriteHandler (nioHttpChannel , transport , httpHandlingSettings , NioCorsConfigBuilder .forAnyOrigin ().build ());
96
+ Settings settings = Settings .builder ().put (SETTING_HTTP_MAX_CONTENT_LENGTH .getKey (), new ByteSizeValue (1024 )).build ();
97
+ HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings .fromSettings (settings );
98
+ channel = mock (NioHttpChannel .class );
99
+ taskScheduler = mock (TaskScheduler .class );
100
+
101
+ NioCorsConfig corsConfig = NioCorsConfigBuilder .forAnyOrigin ().build ();
102
+ handler = new HttpReadWriteHandler (channel , transport , httpHandlingSettings , corsConfig , taskScheduler , System ::nanoTime );
103
+ handler .channelRegistered ();
112
104
}
113
105
114
106
public void testSuccessfulDecodeHttpRequest () throws IOException {
@@ -188,7 +180,7 @@ public void testDecodeHttpRequestContentLengthToLongGeneratesOutboundMessage() t
188
180
flushOperation .getListener ().accept (null , null );
189
181
// Since we have keep-alive set to false, we should close the channel after the response has been
190
182
// flushed
191
- verify (nioHttpChannel ).close ();
183
+ verify (channel ).close ();
192
184
} finally {
193
185
response .release ();
194
186
}
@@ -335,10 +327,59 @@ public void testThatAnyOriginWorks() throws IOException {
335
327
}
336
328
}
337
329
338
- private FullHttpResponse executeCorsRequest (final Settings settings , final String originValue , final String host ) throws IOException {
330
+ @ SuppressWarnings ("unchecked" )
331
+ public void testReadTimeout () throws IOException {
332
+ TimeValue timeValue = TimeValue .timeValueMillis (500 );
333
+ Settings settings = Settings .builder ().put (SETTING_HTTP_READ_TIMEOUT .getKey (), timeValue ).build ();
339
334
HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings .fromSettings (settings );
340
- NioCorsConfig nioCorsConfig = NioHttpServerTransport .buildCorsConfig (settings );
341
- HttpReadWriteHandler handler = new HttpReadWriteHandler (nioHttpChannel , transport , httpHandlingSettings , nioCorsConfig );
335
+ DefaultFullHttpRequest nettyRequest = new DefaultFullHttpRequest (HttpVersion .HTTP_1_1 , HttpMethod .GET , "/" );
336
+ NioHttpRequest nioHttpRequest = new NioHttpRequest (nettyRequest , 0 );
337
+ NioHttpResponse httpResponse = nioHttpRequest .createResponse (RestStatus .OK , BytesArray .EMPTY );
338
+ httpResponse .addHeader (HttpHeaderNames .CONTENT_LENGTH .toString (), "0" );
339
+
340
+ NioCorsConfig corsConfig = NioCorsConfigBuilder .forAnyOrigin ().build ();
341
+ TaskScheduler taskScheduler = new TaskScheduler ();
342
+
343
+ Iterator <Integer > timeValues = Arrays .asList (0 , 2 , 4 , 6 , 8 ).iterator ();
344
+ handler = new HttpReadWriteHandler (channel , transport , httpHandlingSettings , corsConfig , taskScheduler , timeValues ::next );
345
+ handler .channelRegistered ();
346
+
347
+ prepareHandlerForResponse (handler );
348
+ SocketChannelContext context = mock (SocketChannelContext .class );
349
+ HttpWriteOperation writeOperation = new HttpWriteOperation (context , httpResponse , mock (BiConsumer .class ));
350
+ handler .writeToBytes (writeOperation );
351
+
352
+ taskScheduler .pollTask (timeValue .getNanos () + 1 ).run ();
353
+ // There was a read. Do not close.
354
+ verify (transport , times (0 )).onException (eq (channel ), any (HttpReadTimeoutException .class ));
355
+
356
+ prepareHandlerForResponse (handler );
357
+ prepareHandlerForResponse (handler );
358
+
359
+ taskScheduler .pollTask (timeValue .getNanos () + 3 ).run ();
360
+ // There was a read. Do not close.
361
+ verify (transport , times (0 )).onException (eq (channel ), any (HttpReadTimeoutException .class ));
362
+
363
+ handler .writeToBytes (writeOperation );
364
+
365
+ taskScheduler .pollTask (timeValue .getNanos () + 5 ).run ();
366
+ // There has not been a read, however there is still an inflight request. Do not close.
367
+ verify (transport , times (0 )).onException (eq (channel ), any (HttpReadTimeoutException .class ));
368
+
369
+ handler .writeToBytes (writeOperation );
370
+
371
+ taskScheduler .pollTask (timeValue .getNanos () + 7 ).run ();
372
+ // No reads and no inflight requests, close
373
+ verify (transport , times (1 )).onException (eq (channel ), any (HttpReadTimeoutException .class ));
374
+ assertNull (taskScheduler .pollTask (timeValue .getNanos () + 9 ));
375
+ }
376
+
377
+ private FullHttpResponse executeCorsRequest (final Settings settings , final String originValue , final String host ) throws IOException {
378
+ HttpHandlingSettings httpSettings = HttpHandlingSettings .fromSettings (settings );
379
+ NioCorsConfig corsConfig = NioHttpServerTransport .buildCorsConfig (settings );
380
+ HttpReadWriteHandler handler = new HttpReadWriteHandler (channel , transport , httpSettings , corsConfig , taskScheduler ,
381
+ System ::nanoTime );
382
+ handler .channelRegistered ();
342
383
prepareHandlerForResponse (handler );
343
384
DefaultFullHttpRequest httpRequest = new DefaultFullHttpRequest (HttpVersion .HTTP_1_1 , HttpMethod .GET , "/" );
344
385
if (originValue != null ) {
@@ -360,7 +401,7 @@ private FullHttpResponse executeCorsRequest(final Settings settings, final Strin
360
401
361
402
362
403
363
- private NioHttpRequest prepareHandlerForResponse (HttpReadWriteHandler handler ) throws IOException {
404
+ private void prepareHandlerForResponse (HttpReadWriteHandler handler ) throws IOException {
364
405
HttpMethod method = randomBoolean () ? HttpMethod .GET : HttpMethod .HEAD ;
365
406
HttpVersion version = randomBoolean () ? HttpVersion .HTTP_1_0 : HttpVersion .HTTP_1_1 ;
366
407
String uri = "http://localhost:9090/" + randomAlphaOfLength (8 );
@@ -385,7 +426,6 @@ private NioHttpRequest prepareHandlerForResponse(HttpReadWriteHandler handler) t
385
426
assertEquals (HttpRequest .HttpVersion .HTTP_1_0 , nioHttpRequest .protocolVersion ());
386
427
}
387
428
assertEquals (nioHttpRequest .uri (), uri );
388
- return nioHttpRequest ;
389
429
}
390
430
391
431
private InboundChannelBuffer toChannelBuffer (ByteBuf buf ) {
0 commit comments