34
34
import org .elasticsearch .threadpool .ThreadPool ;
35
35
import org .elasticsearch .transport .TransportService ;
36
36
37
- import java .util .List ;
38
37
import java .util .Queue ;
39
38
import java .util .concurrent .ConcurrentLinkedQueue ;
39
+ import java .util .concurrent .TimeUnit ;
40
40
import java .util .concurrent .atomic .AtomicInteger ;
41
+ import java .util .function .LongSupplier ;
41
42
42
43
public class TransportMultiSearchAction extends HandledTransportAction <MultiSearchRequest , MultiSearchResponse > {
43
44
44
45
private final int availableProcessors ;
45
46
private final ClusterService clusterService ;
46
47
private final TransportAction <SearchRequest , SearchResponse > searchAction ;
48
+ private final LongSupplier relativeTimeProvider ;
47
49
48
50
@ Inject
49
51
public TransportMultiSearchAction (Settings settings , ThreadPool threadPool , TransportService transportService ,
@@ -53,19 +55,23 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran
53
55
this .clusterService = clusterService ;
54
56
this .searchAction = searchAction ;
55
57
this .availableProcessors = EsExecutors .numberOfProcessors (settings );
58
+ this .relativeTimeProvider = System ::nanoTime ;
56
59
}
57
60
58
61
TransportMultiSearchAction (ThreadPool threadPool , ActionFilters actionFilters , TransportService transportService ,
59
62
ClusterService clusterService , TransportAction <SearchRequest , SearchResponse > searchAction ,
60
- IndexNameExpressionResolver resolver , int availableProcessors ) {
63
+ IndexNameExpressionResolver resolver , int availableProcessors , LongSupplier relativeTimeProvider ) {
61
64
super (Settings .EMPTY , MultiSearchAction .NAME , threadPool , transportService , actionFilters , resolver , MultiSearchRequest ::new );
62
65
this .clusterService = clusterService ;
63
66
this .searchAction = searchAction ;
64
67
this .availableProcessors = availableProcessors ;
68
+ this .relativeTimeProvider = relativeTimeProvider ;
65
69
}
66
70
67
71
@ Override
68
72
protected void doExecute (MultiSearchRequest request , ActionListener <MultiSearchResponse > listener ) {
73
+ final long relativeStartTime = relativeTimeProvider .getAsLong ();
74
+
69
75
ClusterState clusterState = clusterService .state ();
70
76
clusterState .blocks ().globalBlockedRaiseException (ClusterBlockLevel .READ );
71
77
@@ -85,7 +91,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchR
85
91
final AtomicInteger responseCounter = new AtomicInteger (numRequests );
86
92
int numConcurrentSearches = Math .min (numRequests , maxConcurrentSearches );
87
93
for (int i = 0 ; i < numConcurrentSearches ; i ++) {
88
- executeSearch (searchRequestSlots , responses , responseCounter , listener );
94
+ executeSearch (searchRequestSlots , responses , responseCounter , listener , relativeStartTime );
89
95
}
90
96
}
91
97
@@ -111,11 +117,12 @@ static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState st
111
117
* @param responseCounter incremented on each response
112
118
* @param listener the listener attached to the multi-search request
113
119
*/
114
- private void executeSearch (
120
+ void executeSearch (
115
121
final Queue <SearchRequestSlot > requests ,
116
122
final AtomicArray <MultiSearchResponse .Item > responses ,
117
123
final AtomicInteger responseCounter ,
118
- final ActionListener <MultiSearchResponse > listener ) {
124
+ final ActionListener <MultiSearchResponse > listener ,
125
+ final long relativeStartTime ) {
119
126
SearchRequestSlot request = requests .poll ();
120
127
if (request == null ) {
121
128
/*
@@ -155,16 +162,25 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It
155
162
} else {
156
163
if (thread == Thread .currentThread ()) {
157
164
// we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread
158
- threadPool .generic ().execute (() -> executeSearch (requests , responses , responseCounter , listener ));
165
+ threadPool .generic ()
166
+ .execute (() -> executeSearch (requests , responses , responseCounter , listener , relativeStartTime ));
159
167
} else {
160
168
// we are on a different thread (we went asynchronous), it's safe to recurse
161
- executeSearch (requests , responses , responseCounter , listener );
169
+ executeSearch (requests , responses , responseCounter , listener , relativeStartTime );
162
170
}
163
171
}
164
172
}
165
173
166
174
private void finish () {
167
- listener .onResponse (new MultiSearchResponse (responses .toArray (new MultiSearchResponse .Item [responses .length ()])));
175
+ listener .onResponse (new MultiSearchResponse (responses .toArray (new MultiSearchResponse .Item [responses .length ()]),
176
+ buildTookInMillis ()));
177
+ }
178
+
179
+ /**
180
+ * Builds how long it took to execute the msearch.
181
+ */
182
+ private long buildTookInMillis () {
183
+ return TimeUnit .NANOSECONDS .toMillis (relativeTimeProvider .getAsLong () - relativeStartTime );
168
184
}
169
185
});
170
186
}
@@ -178,7 +194,5 @@ static final class SearchRequestSlot {
178
194
this .request = request ;
179
195
this .responseSlot = responseSlot ;
180
196
}
181
-
182
197
}
183
-
184
198
}
0 commit comments