Skip to content

Commit 01e4972

Browse files
committed
Print out search request as part of async search task description (elastic#62057)
Currently, the async search task is the task that will be running through the whole execution of an async search. While the submit async search task prints out the search as part of its description, async search task doesn't while it should. With this commit we address that while also making sure that the description highlights that the task is originated from an async search. Also, we streamline the way the description is printed out by SearchTask so that it does not get forgotten in the future.
1 parent 146b2e6 commit 01e4972

File tree

8 files changed

+35
-17
lines changed

8 files changed

+35
-17
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -645,16 +645,10 @@ public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder s
645645

646646
@Override
647647
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
648-
// generating description in a lazy way since source can be quite big
649-
return new SearchTask(id, type, action, null, parentTaskId, headers) {
650-
@Override
651-
public String getDescription() {
652-
return buildDescription();
653-
}
654-
};
648+
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers);
655649
}
656650

657-
public String buildDescription() {
651+
public final String buildDescription() {
658652
StringBuilder sb = new StringBuilder();
659653
sb.append("indices[");
660654
Strings.arrayToDelimitedString(indices, ",", sb);

server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public SearchScrollRequest scroll(String keepAlive) {
114114

115115
@Override
116116
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
117-
return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
117+
return new SearchTask(id, type, action, this::getDescription, parentTaskId, headers);
118118
}
119119

120120
@Override

server/src/main/java/org/elasticsearch/action/search/SearchTask.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,25 @@
2323
import org.elasticsearch.tasks.TaskId;
2424

2525
import java.util.Map;
26+
import java.util.function.Supplier;
2627

2728
/**
2829
* Task storing information about a currently running {@link SearchRequest}.
2930
*/
3031
public class SearchTask extends CancellableTask {
32+
// generating description in a lazy way since source can be quite big
33+
private final Supplier<String> descriptionSupplier;
3134
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
3235

33-
public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
34-
super(id, type, action, description, parentTaskId, headers);
36+
public SearchTask(long id, String type, String action, Supplier<String> descriptionSupplier,
37+
TaskId parentTaskId, Map<String, String> headers) {
38+
super(id, type, action, null, parentTaskId, headers);
39+
this.descriptionSupplier = descriptionSupplier;
40+
}
41+
42+
@Override
43+
public final String getDescription() {
44+
return descriptionSupplier.get();
3545
}
3646

3747
/**

server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public Logger getLogger() {
7474

7575
@Override
7676
public SearchTask getTask() {
77-
return new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap());
77+
return new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
7878
}
7979

8080
@Override

server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest
146146
searchRequest.allowPartialSearchResults(false);
147147
SearchPhaseController controller = new SearchPhaseController(
148148
writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder());
149-
SearchTask task = new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap());
149+
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
150150
SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger,
151151
searchTransportService, (clusterAlias, node) -> lookup.get(node),
152152
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,15 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
8484
String type,
8585
String action,
8686
TaskId parentTaskId,
87+
Supplier<String> descriptionSupplier,
8788
TimeValue keepAlive,
8889
Map<String, String> originHeaders,
8990
Map<String, String> taskHeaders,
9091
AsyncExecutionId searchId,
9192
Client client,
9293
ThreadPool threadPool,
9394
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
94-
super(id, type, action, "async_search", parentTaskId, taskHeaders);
95+
super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders);
9596
this.expirationTimeMillis = getStartTime() + keepAlive.getMillis();
9697
this.originHeaders = originHeaders;
9798
this.searchId = searchId;

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa
143143
AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
144144
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
145145
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
146-
return new AsyncSearchTask(id, type, action, parentTaskId, keepAlive,
146+
return new AsyncSearchTask(id, type, action, parentTaskId, this::buildDescription, keepAlive,
147147
originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier);
148148
}
149149
};

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.OriginalIndices;
1212
import org.elasticsearch.action.search.SearchPhaseExecutionException;
13+
import org.elasticsearch.action.search.SearchRequest;
1314
import org.elasticsearch.action.search.SearchResponse;
1415
import org.elasticsearch.action.search.SearchShard;
1516
import org.elasticsearch.action.search.ShardSearchFailure;
@@ -18,6 +19,7 @@
1819
import org.elasticsearch.common.io.stream.DelayableWriteable;
1920
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2021
import org.elasticsearch.common.unit.TimeValue;
22+
import org.elasticsearch.index.query.QueryBuilders;
2123
import org.elasticsearch.index.shard.ShardId;
2224
import org.elasticsearch.search.DocValueFormat;
2325
import org.elasticsearch.search.SearchHits;
@@ -26,6 +28,7 @@
2628
import org.elasticsearch.search.aggregations.InternalAggregation;
2729
import org.elasticsearch.search.aggregations.InternalAggregations;
2830
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
31+
import org.elasticsearch.search.builder.SearchSourceBuilder;
2932
import org.elasticsearch.search.internal.InternalSearchResponse;
3033
import org.elasticsearch.tasks.TaskId;
3134
import org.elasticsearch.test.ESTestCase;
@@ -71,13 +74,23 @@ public void afterTest() {
7174
}
7275

7376
private AsyncSearchTask createAsyncSearchTask() {
74-
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
77+
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1),
7578
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
7679
new NoOpClient(threadPool), threadPool, null);
7780
}
7881

82+
public void testTaskDescription() {
83+
SearchRequest searchRequest = new SearchRequest("index1", "index2").source(
84+
new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value")));
85+
AsyncSearchTask asyncSearchTask = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), searchRequest::buildDescription,
86+
TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
87+
new NoOpClient(threadPool), threadPool, null);
88+
assertEquals("async_search{indices[index1,index2], types[], search_type[QUERY_THEN_FETCH], " +
89+
"source[{\"query\":{\"term\":{\"field\":{\"value\":\"value\",\"boost\":1.0}}}}]}", asyncSearchTask.getDescription());
90+
}
91+
7992
public void testWaitForInit() throws InterruptedException {
80-
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
93+
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1),
8194
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
8295
new NoOpClient(threadPool), threadPool, null);
8396
int numShards = randomIntBetween(0, 10);

0 commit comments

Comments
 (0)