22
22
import com .google .inject .Inject ;
23
23
import org .elasticsearch .ElasticSearchException ;
24
24
import org .elasticsearch .http .*;
25
- import org .elasticsearch .threadpool .ThreadPool ;
26
25
import org .elasticsearch .transport .BindTransportException ;
27
26
import org .elasticsearch .util .SizeUnit ;
28
27
import org .elasticsearch .util .SizeValue ;
29
- import org .elasticsearch .util .TimeValue ;
30
28
import org .elasticsearch .util .component .AbstractLifecycleComponent ;
31
29
import org .elasticsearch .util .settings .Settings ;
32
30
import org .elasticsearch .util .transport .BoundTransportAddress ;
40
38
import org .jboss .netty .handler .codec .http .HttpRequestDecoder ;
41
39
import org .jboss .netty .handler .codec .http .HttpResponseEncoder ;
42
40
import org .jboss .netty .handler .timeout .ReadTimeoutException ;
43
- import org .jboss .netty .handler .timeout .ReadTimeoutHandler ;
44
41
import org .jboss .netty .logging .InternalLogger ;
45
42
import org .jboss .netty .logging .InternalLoggerFactory ;
46
43
import org .jboss .netty .logging .Slf4JLoggerFactory ;
47
- import org .jboss .netty .util .HashedWheelTimer ;
48
44
49
45
import java .io .IOException ;
50
46
import java .net .InetAddress ;
51
47
import java .net .InetSocketAddress ;
52
48
import java .util .concurrent .Executors ;
53
- import java .util .concurrent .TimeUnit ;
54
49
import java .util .concurrent .atomic .AtomicReference ;
55
50
56
- import static org .elasticsearch .util .TimeValue .*;
57
51
import static org .elasticsearch .util .concurrent .DynamicExecutors .*;
58
52
import static org .elasticsearch .util .io .HostResolver .*;
59
53
@@ -70,8 +64,6 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
70
64
});
71
65
}
72
66
73
- private final ThreadPool threadPool ;
74
-
75
67
private final SizeValue maxContentLength ;
76
68
77
69
private final int workerCount ;
@@ -92,10 +84,6 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
92
84
93
85
private final SizeValue tcpReceiveBufferSize ;
94
86
95
- private final TimeValue httpKeepAlive ;
96
-
97
- private final TimeValue httpKeepAliveTickDuration ;
98
-
99
87
private volatile ServerBootstrap serverBootstrap ;
100
88
101
89
private volatile BoundTransportAddress boundAddress ;
@@ -106,11 +94,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
106
94
107
95
private volatile HttpServerAdapter httpServerAdapter ;
108
96
109
- private HashedWheelTimer keepAliveTimer ;
110
-
111
- @ Inject public NettyHttpServerTransport (Settings settings , ThreadPool threadPool ) {
97
+ @ Inject public NettyHttpServerTransport (Settings settings ) {
112
98
super (settings );
113
- this .threadPool = threadPool ;
114
99
SizeValue maxContentLength = componentSettings .getAsSize ("maxContentLength" , new SizeValue (100 , SizeUnit .MB ));
115
100
this .workerCount = componentSettings .getAsInt ("workerCount" , Runtime .getRuntime ().availableProcessors ());
116
101
this .port = componentSettings .get ("port" , "9200-9300" );
@@ -121,12 +106,6 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
121
106
this .reuseAddress = componentSettings .getAsBoolean ("reuseAddress" , true );
122
107
this .tcpSendBufferSize = componentSettings .getAsSize ("tcpSendBufferSize" , null );
123
108
this .tcpReceiveBufferSize = componentSettings .getAsSize ("tcpReceiveBufferSize" , null );
124
- this .httpKeepAlive = componentSettings .getAsTime ("httpKeepAlive" , timeValueSeconds (30 ));
125
- this .httpKeepAliveTickDuration = componentSettings .getAsTime ("httpKeepAliveTickDuration" , timeValueMillis (500 ));
126
-
127
- if ((httpKeepAliveTickDuration .millis () * 10 ) > httpKeepAlive .millis ()) {
128
- logger .warn ("Suspicious keep alive settings, httpKeepAlive set to [{}], while httpKeepAliveTickDuration is set to [{}]" , httpKeepAlive , httpKeepAliveTickDuration );
129
- }
130
109
131
110
// validate max content length
132
111
if (maxContentLength .bytes () > Integer .MAX_VALUE ) {
@@ -148,14 +127,12 @@ public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
148
127
Executors .newCachedThreadPool (daemonThreadFactory (settings , "httpIoWorker" )),
149
128
workerCount ));
150
129
151
- keepAliveTimer = new HashedWheelTimer (daemonThreadFactory (settings , "keepAliveTimer" ), httpKeepAliveTickDuration .millis (), TimeUnit .MILLISECONDS );
152
130
final HttpRequestHandler requestHandler = new HttpRequestHandler (this );
153
131
154
132
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory () {
155
133
@ Override public ChannelPipeline getPipeline () throws Exception {
156
134
ChannelPipeline pipeline = Channels .pipeline ();
157
135
pipeline .addLast ("openChannels" , serverOpenChannels );
158
- pipeline .addLast ("keepAliveTimeout" , new ReadTimeoutHandler (keepAliveTimer , httpKeepAlive .millis (), TimeUnit .MILLISECONDS ));
159
136
pipeline .addLast ("decoder" , new HttpRequestDecoder ());
160
137
pipeline .addLast ("aggregator" , new HttpChunkAggregator ((int ) maxContentLength .bytes ()));
161
138
pipeline .addLast ("encoder" , new HttpResponseEncoder ());
@@ -240,8 +217,6 @@ public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
240
217
serverOpenChannels = null ;
241
218
}
242
219
243
- keepAliveTimer .stop ();
244
-
245
220
if (serverBootstrap != null ) {
246
221
serverBootstrap .releaseExternalResources ();
247
222
serverBootstrap = null ;
0 commit comments