14
14
import org .opensearch .common .settings .Setting ;
15
15
import org .opensearch .common .settings .Settings ;
16
16
import org .opensearch .common .transport .PortsRange ;
17
+ import org .opensearch .common .unit .TimeValue ;
17
18
import org .opensearch .common .util .concurrent .OpenSearchExecutors ;
18
19
import org .opensearch .core .common .Strings ;
19
20
import org .opensearch .core .common .transport .BoundTransportAddress ;
20
21
import org .opensearch .core .common .transport .TransportAddress ;
22
+ import org .opensearch .core .common .unit .ByteSizeUnit ;
23
+ import org .opensearch .core .common .unit .ByteSizeValue ;
21
24
import org .opensearch .plugins .NetworkPlugin ;
22
25
import org .opensearch .transport .BindTransportException ;
23
26
@@ -115,6 +118,60 @@ public class Netty4GrpcServerTransport extends NetworkPlugin.AuxTransport {
115
118
Setting .Property .NodeScope
116
119
);
117
120
121
+ /**
122
+ * Controls the number of allowed simultaneous in flight requests a single client connection may send.
123
+ */
124
+ public static final Setting <Integer > SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS = Setting .intSetting (
125
+ "grpc.netty.max_concurrent_connection_calls" ,
126
+ 100 ,
127
+ 1 ,
128
+ Integer .MAX_VALUE ,
129
+ Setting .Property .NodeScope
130
+ );
131
+
132
+ /**
133
+ * Configure maximum inbound message size in bytes.
134
+ */
135
+ public static final Setting <ByteSizeValue > SETTING_GRPC_MAX_MSG_SIZE = Setting .byteSizeSetting (
136
+ "grpc.netty.max_msg_size" ,
137
+ new ByteSizeValue (10 , ByteSizeUnit .MB ),
138
+ new ByteSizeValue (0 , ByteSizeUnit .MB ),
139
+ new ByteSizeValue (Integer .MAX_VALUE , ByteSizeUnit .BYTES ),
140
+ Setting .Property .NodeScope
141
+ );
142
+
143
+ /**
144
+ * Connections lasting longer than configured age will be gracefully terminated.
145
+ * No max connection age by default.
146
+ */
147
+ public static final Setting <TimeValue > SETTING_GRPC_MAX_CONNECTION_AGE = Setting .timeSetting (
148
+ "grpc.netty.max_connection_age" ,
149
+ new TimeValue (Long .MAX_VALUE ),
150
+ new TimeValue (0 ),
151
+ Setting .Property .NodeScope
152
+ );
153
+
154
+ /**
155
+ * Idle connections lasting longer than configured value will be gracefully terminated.
156
+ * No max idle time by default.
157
+ */
158
+ public static final Setting <TimeValue > SETTING_GRPC_MAX_CONNECTION_IDLE = Setting .timeSetting (
159
+ "grpc.netty.max_connection_idle" ,
160
+ new TimeValue (Long .MAX_VALUE ),
161
+ new TimeValue (0 ),
162
+ Setting .Property .NodeScope
163
+ );
164
+
165
+ /**
166
+ * Timeout for keepalive ping requests of an established connection.
167
+ */
168
+ public static final Setting <TimeValue > SETTING_GRPC_KEEPALIVE_TIMEOUT = Setting .timeSetting (
169
+ "grpc.netty.keepalive_timeout" ,
170
+ new TimeValue (Long .MAX_VALUE ),
171
+ new TimeValue (0 ),
172
+ Setting .Property .NodeScope
173
+ );
174
+
118
175
/**
119
176
* Port range on which servers bind.
120
177
*/
@@ -136,6 +193,11 @@ public class Netty4GrpcServerTransport extends NetworkPlugin.AuxTransport {
136
193
private final String [] bindHosts ;
137
194
private final String [] publishHosts ;
138
195
private final int nettyEventLoopThreads ;
196
+ private final long maxInboundMessageSize ;
197
+ private final long maxConcurrentConnectionCalls ;
198
+ private final TimeValue maxConnectionAge ;
199
+ private final TimeValue maxConnectionIdle ;
200
+ private final TimeValue keepAliveTimeout ;
139
201
private final CopyOnWriteArrayList <Server > servers = new CopyOnWriteArrayList <>();
140
202
private final List <UnaryOperator <NettyServerBuilder >> serverBuilderConfigs = new ArrayList <>();
141
203
@@ -153,18 +215,20 @@ public Netty4GrpcServerTransport(Settings settings, List<BindableService> servic
153
215
this .settings = Objects .requireNonNull (settings );
154
216
this .services = Objects .requireNonNull (services );
155
217
this .networkService = Objects .requireNonNull (networkService );
156
-
157
218
final List <String > grpcBindHost = SETTING_GRPC_BIND_HOST .get (settings );
158
219
this .bindHosts = (grpcBindHost .isEmpty () ? NetworkService .GLOBAL_NETWORK_BIND_HOST_SETTING .get (settings ) : grpcBindHost ).toArray (
159
220
Strings .EMPTY_ARRAY
160
221
);
161
-
162
222
final List <String > grpcPublishHost = SETTING_GRPC_PUBLISH_HOST .get (settings );
163
223
this .publishHosts = (grpcPublishHost .isEmpty () ? NetworkService .GLOBAL_NETWORK_PUBLISH_HOST_SETTING .get (settings ) : grpcPublishHost )
164
224
.toArray (Strings .EMPTY_ARRAY );
165
-
166
225
this .port = SETTING_GRPC_PORT .get (settings );
167
226
this .nettyEventLoopThreads = SETTING_GRPC_WORKER_COUNT .get (settings );
227
+ this .maxInboundMessageSize = SETTING_GRPC_MAX_MSG_SIZE .get (settings ).getBytes ();
228
+ this .maxConcurrentConnectionCalls = SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS .get (settings );
229
+ this .maxConnectionAge = SETTING_GRPC_MAX_CONNECTION_AGE .get (settings );
230
+ this .maxConnectionIdle = SETTING_GRPC_MAX_CONNECTION_IDLE .get (settings );
231
+ this .keepAliveTimeout = SETTING_GRPC_KEEPALIVE_TIMEOUT .get (settings );
168
232
this .portSettingKey = SETTING_GRPC_PORT .getKey ();
169
233
}
170
234
@@ -285,12 +349,16 @@ private TransportAddress bindAddress(InetAddress hostAddress, PortsRange portRan
285
349
286
350
boolean success = portRange .iterate (portNumber -> {
287
351
try {
288
-
289
352
final InetSocketAddress address = new InetSocketAddress (hostAddress , portNumber );
290
353
final NettyServerBuilder serverBuilder = NettyServerBuilder .forAddress (address )
291
354
.directExecutor ()
292
355
.bossEventLoopGroup (eventLoopGroup )
293
356
.workerEventLoopGroup (eventLoopGroup )
357
+ .maxInboundMessageSize ((int ) maxInboundMessageSize )
358
+ .maxConcurrentCallsPerConnection ((int ) maxConcurrentConnectionCalls )
359
+ .maxConnectionAge (maxConnectionAge .duration (), maxConnectionAge .timeUnit ())
360
+ .maxConnectionIdle (maxConnectionIdle .duration (), maxConnectionIdle .timeUnit ())
361
+ .keepAliveTimeout (keepAliveTimeout .duration (), keepAliveTimeout .timeUnit ())
294
362
.channelType (NioServerSocketChannel .class )
295
363
.addService (new HealthStatusManager ().getHealthService ())
296
364
.addService (ProtoReflectionService .newInstance ());
0 commit comments