1
+ /*
2
+ * Licensed to Elastic Search and Shay Banon under one
3
+ * or more contributor license agreements. See the NOTICE file
4
+ * distributed with this work for additional information
5
+ * regarding copyright ownership. Elastic Search licenses this
6
+ * file to you under the Apache License, Version 2.0 (the
7
+ * "License"); you may not use this file except in compliance
8
+ * with the License. You may obtain a copy of the License at
9
+ *
10
+ * http://www.apache.org/licenses/LICENSE-2.0
11
+ *
12
+ * Unless required by applicable law or agreed to in writing,
13
+ * software distributed under the License is distributed on an
14
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
+ * KIND, either express or implied. See the License for the
16
+ * specific language governing permissions and limitations
17
+ * under the License.
18
+ */
19
+
20
+ package org .elasticsearch .threadpool .blocking ;
21
+
22
+ import com .google .inject .Inject ;
23
+ import org .elasticsearch .threadpool .support .AbstractThreadPool ;
24
+ import org .elasticsearch .util .SizeUnit ;
25
+ import org .elasticsearch .util .SizeValue ;
26
+ import org .elasticsearch .util .TimeValue ;
27
+ import org .elasticsearch .util .concurrent .DynamicExecutors ;
28
+ import org .elasticsearch .util .settings .Settings ;
29
+
30
+ import java .util .concurrent .Executors ;
31
+
32
+ import static org .elasticsearch .util .TimeValue .*;
33
+ import static org .elasticsearch .util .settings .ImmutableSettings .Builder .*;
34
+
35
+ /**
36
+ * A thread pool that will will block the execute if all threads are busy.
37
+ *
38
+ * @author kimchy (shay.banon)
39
+ */
40
+ public class BlockingThreadPool extends AbstractThreadPool {
41
+
42
+ private final int min ;
43
+ private final int max ;
44
+ private final int capacity ;
45
+ private final TimeValue waitTime ;
46
+ private final TimeValue keepAlive ;
47
+
48
+ private final int scheduledSize ;
49
+
50
+ public BlockingThreadPool () {
51
+ this (EMPTY_SETTINGS );
52
+ }
53
+
54
+ @ Inject public BlockingThreadPool (Settings settings ) {
55
+ super (settings );
56
+ this .min = componentSettings .getAsInt ("min" , 1 );
57
+ this .max = componentSettings .getAsInt ("max" , 100 );
58
+ this .capacity = (int ) componentSettings .getAsSize ("capacity" , new SizeValue (1 , SizeUnit .KB )).bytes ();
59
+ this .waitTime = componentSettings .getAsTime ("waitTime" , timeValueSeconds (60 ));
60
+ this .keepAlive = componentSettings .getAsTime ("keepAlive" , timeValueSeconds (60 ));
61
+ this .scheduledSize = componentSettings .getAsInt ("scheduledSize" , 20 );
62
+ logger .debug ("Initializing {} thread pool with min[{}], max[{}], keepAlive[{}], capacity[{}], waitTime[{}], scheduledSize[{}]" , new Object []{getType (), min , max , keepAlive , capacity , waitTime , scheduledSize });
63
+ executorService = DynamicExecutors .newBlockingThreadPool (min , max , keepAlive .millis (), capacity , waitTime .millis (), DynamicExecutors .daemonThreadFactory (settings , "[tp]" ));
64
+ scheduledExecutorService = Executors .newScheduledThreadPool (scheduledSize , DynamicExecutors .daemonThreadFactory (settings , "[sc]" ));
65
+ started = true ;
66
+ }
67
+
68
+ @ Override public String getType () {
69
+ return "blocking" ;
70
+ }
71
+ }
0 commit comments