Skip to content

Commit b989646

Browse files
olcbeanjasontedor
authored andcommitted
Introducing took time for _msearch
This commit adds the took time to the response for _msearch. Relates #23767
1 parent 59657ad commit b989646

File tree

6 files changed

+263
-24
lines changed

6 files changed

+263
-24
lines changed

core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121

2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.ExceptionsHelper;
24+
import org.elasticsearch.Version;
2425
import org.elasticsearch.action.ActionResponse;
2526
import org.elasticsearch.common.Nullable;
2627
import org.elasticsearch.common.Strings;
2728
import org.elasticsearch.common.io.stream.StreamInput;
2829
import org.elasticsearch.common.io.stream.StreamOutput;
2930
import org.elasticsearch.common.io.stream.Streamable;
31+
import org.elasticsearch.common.unit.TimeValue;
3032
import org.elasticsearch.common.xcontent.ToXContentObject;
3133
import org.elasticsearch.common.xcontent.XContentBuilder;
3234

@@ -112,11 +114,14 @@ public Exception getFailure() {
112114

113115
private Item[] items;
114116

117+
private long tookInMillis;
118+
115119
MultiSearchResponse() {
116120
}
117121

118-
public MultiSearchResponse(Item[] items) {
122+
public MultiSearchResponse(Item[] items, long tookInMillis) {
119123
this.items = items;
124+
this.tookInMillis = tookInMillis;
120125
}
121126

122127
@Override
@@ -131,13 +136,23 @@ public Item[] getResponses() {
131136
return this.items;
132137
}
133138

139+
/**
140+
* How long the msearch took.
141+
*/
142+
public TimeValue getTook() {
143+
return new TimeValue(tookInMillis);
144+
}
145+
134146
@Override
135147
public void readFrom(StreamInput in) throws IOException {
136148
super.readFrom(in);
137149
items = new Item[in.readVInt()];
138150
for (int i = 0; i < items.length; i++) {
139151
items[i] = Item.readItem(in);
140152
}
153+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
154+
tookInMillis = in.readVLong();
155+
}
141156
}
142157

143158
@Override
@@ -147,11 +162,15 @@ public void writeTo(StreamOutput out) throws IOException {
147162
for (Item item : items) {
148163
item.writeTo(out);
149164
}
165+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
166+
out.writeVLong(tookInMillis);
167+
}
150168
}
151169

152170
@Override
153171
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
154172
builder.startObject();
173+
builder.field("took", tookInMillis);
155174
builder.startArray(Fields.RESPONSES);
156175
for (Item item : items) {
157176
builder.startObject();

core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java

+24-10
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,18 @@
3434
import org.elasticsearch.threadpool.ThreadPool;
3535
import org.elasticsearch.transport.TransportService;
3636

37-
import java.util.List;
3837
import java.util.Queue;
3938
import java.util.concurrent.ConcurrentLinkedQueue;
39+
import java.util.concurrent.TimeUnit;
4040
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.function.LongSupplier;
4142

4243
public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {
4344

4445
private final int availableProcessors;
4546
private final ClusterService clusterService;
4647
private final TransportAction<SearchRequest, SearchResponse> searchAction;
48+
private final LongSupplier relativeTimeProvider;
4749

4850
@Inject
4951
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
@@ -53,19 +55,23 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran
5355
this.clusterService = clusterService;
5456
this.searchAction = searchAction;
5557
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
58+
this.relativeTimeProvider = System::nanoTime;
5659
}
5760

5861
TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
5962
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
60-
IndexNameExpressionResolver resolver, int availableProcessors) {
63+
IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) {
6164
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new);
6265
this.clusterService = clusterService;
6366
this.searchAction = searchAction;
6467
this.availableProcessors = availableProcessors;
68+
this.relativeTimeProvider = relativeTimeProvider;
6569
}
6670

6771
@Override
6872
protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
73+
final long relativeStartTime = relativeTimeProvider.getAsLong();
74+
6975
ClusterState clusterState = clusterService.state();
7076
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
7177

@@ -85,7 +91,7 @@ protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchR
8591
final AtomicInteger responseCounter = new AtomicInteger(numRequests);
8692
int numConcurrentSearches = Math.min(numRequests, maxConcurrentSearches);
8793
for (int i = 0; i < numConcurrentSearches; i++) {
88-
executeSearch(searchRequestSlots, responses, responseCounter, listener);
94+
executeSearch(searchRequestSlots, responses, responseCounter, listener, relativeStartTime);
8995
}
9096
}
9197

@@ -111,11 +117,12 @@ static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState st
111117
* @param responseCounter incremented on each response
112118
* @param listener the listener attached to the multi-search request
113119
*/
114-
private void executeSearch(
120+
void executeSearch(
115121
final Queue<SearchRequestSlot> requests,
116122
final AtomicArray<MultiSearchResponse.Item> responses,
117123
final AtomicInteger responseCounter,
118-
final ActionListener<MultiSearchResponse> listener) {
124+
final ActionListener<MultiSearchResponse> listener,
125+
final long relativeStartTime) {
119126
SearchRequestSlot request = requests.poll();
120127
if (request == null) {
121128
/*
@@ -155,16 +162,25 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It
155162
} else {
156163
if (thread == Thread.currentThread()) {
157164
// we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread
158-
threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener));
165+
threadPool.generic()
166+
.execute(() -> executeSearch(requests, responses, responseCounter, listener, relativeStartTime));
159167
} else {
160168
// we are on a different thread (we went asynchronous), it's safe to recurse
161-
executeSearch(requests, responses, responseCounter, listener);
169+
executeSearch(requests, responses, responseCounter, listener, relativeStartTime);
162170
}
163171
}
164172
}
165173

166174
private void finish() {
167-
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
175+
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]),
176+
buildTookInMillis()));
177+
}
178+
179+
/**
180+
* Builds how long it took to execute the msearch.
181+
*/
182+
private long buildTookInMillis() {
183+
return TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - relativeStartTime);
168184
}
169185
});
170186
}
@@ -178,7 +194,5 @@ static final class SearchRequestSlot {
178194
this.request = request;
179195
this.responseSlot = responseSlot;
180196
}
181-
182197
}
183-
184198
}

core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
101101
mSearchResponses.add(new MultiSearchResponse.Item(response, null));
102102
}
103103

104-
listener.onResponse(new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0])));
104+
listener.onResponse(
105+
new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]), randomIntBetween(1, 10000)));
105106
}
106107
};
107108

@@ -153,10 +154,11 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
153154
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
154155
null, null, null, false, null, 1);
155156
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
156-
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
157-
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
158-
new MultiSearchResponse.Item(response, null)
159-
}));
157+
listener.onResponse(new MultiSearchResponse(
158+
new MultiSearchResponse.Item[]{
159+
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
160+
new MultiSearchResponse.Item(response, null)
161+
}, randomIntBetween(1, 10000)));
160162
}
161163
};
162164

0 commit comments

Comments
 (0)