Skip to content

Commit 42cc419

Browse files
authored
SQL: Introduce an async querying mode for SQL (elastic#73991)
This adds an async query mode to SQL. It (re)uses the same request and response async-specific EQL object parameters. Also similar to EQL, the running search task can have its state monitored and canceled and its results stored and deleted, with intermediary responses not supported (the entire result is available once search finished). The async implementation is extended to work with the SQL-specific text formats (txt, csv, tsv) as well, besides xcontent. Closes elastic#71041.
1 parent a2c1d31 commit 42cc419

File tree

62 files changed

+2853
-342
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2853
-342
lines changed

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncResponse.java renamed to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncResponse.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,14 @@
55
* 2.0.
66
*/
77

8-
package org.elasticsearch.xpack.eql.async;
8+
package org.elasticsearch.xpack.core.async;
99

1010
import org.elasticsearch.action.ActionResponse;
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.common.io.stream.Writeable;
1414
import org.elasticsearch.common.xcontent.ToXContentObject;
1515
import org.elasticsearch.common.xcontent.XContentBuilder;
16-
import org.elasticsearch.xpack.core.async.AsyncResponse;
1716

1817
import java.io.IOException;
1918
import java.util.Objects;

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java renamed to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncTask.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,14 @@
55
* 2.0.
66
*/
77

8-
package org.elasticsearch.xpack.eql.async;
8+
package org.elasticsearch.xpack.core.async;
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.ActionResponse;
1212
import org.elasticsearch.core.TimeValue;
1313
import org.elasticsearch.tasks.CancellableTask;
1414
import org.elasticsearch.tasks.TaskId;
1515
import org.elasticsearch.tasks.TaskManager;
16-
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
17-
import org.elasticsearch.xpack.core.async.AsyncTask;
1816

1917
import java.util.ArrayList;
2018
import java.util.List;
@@ -71,7 +69,7 @@ public synchronized void removeCompletionListener(ActionListener<Response> liste
7169
/**
7270
* This method is called when the task is finished successfully before unregistering the task and storing the results
7371
*/
74-
protected synchronized void onResponse(Response response) {
72+
public synchronized void onResponse(Response response) {
7573
for (ActionListener<Response> listener : completionListeners) {
7674
listener.onResponse(response);
7775
}
@@ -80,7 +78,7 @@ protected synchronized void onResponse(Response response) {
8078
/**
8179
* This method is called when the task failed before unregistering the task and storing the results
8280
*/
83-
protected synchronized void onFailure(Exception e) {
81+
public synchronized void onFailure(Exception e) {
8482
for (ActionListener<Response> listener : completionListeners) {
8583
listener.onFailure(e);
8684
}
@@ -89,7 +87,7 @@ protected synchronized void onFailure(Exception e) {
8987
/**
9088
* Return currently available partial or the final results
9189
*/
92-
protected abstract Response getCurrentResult();
90+
public abstract Response getCurrentResult();
9391

9492
@Override
9593
public void cancelTask(TaskManager taskManager, Runnable runnable, String reason) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.sql;
9+
10+
/**
11+
* Exposes SQL async action names for the RBAC engine
12+
*/
13+
public class SqlAsyncActionNames {
14+
public static final String SQL_ASYNC_GET_RESULT_ACTION_NAME = "indices:data/read/sql/async/get";
15+
public static final String SQL_ASYNC_GET_STATUS_ACTION_NAME = "cluster:monitor/xpack/sql/async/status";
16+
}

x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
3434
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
3535
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
36-
import org.elasticsearch.xpack.eql.async.StoredAsyncResponse;
36+
import org.elasticsearch.xpack.core.async.StoredAsyncResponse;
3737
import org.elasticsearch.xpack.eql.plugin.EqlAsyncGetResultAction;
3838
import org.hamcrest.BaseMatcher;
3939
import org.hamcrest.Description;

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.index.get.GetResult;
3131
import org.elasticsearch.index.mapper.SourceFieldMapper;
3232
import org.elasticsearch.search.SearchHits;
33+
import org.elasticsearch.xpack.ql.async.QlStatusResponse;
3334

3435
import java.io.IOException;
3536
import java.util.Collections;
@@ -41,7 +42,7 @@
4142
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
4243
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
4344

44-
public class EqlSearchResponse extends ActionResponse implements ToXContentObject {
45+
public class EqlSearchResponse extends ActionResponse implements ToXContentObject, QlStatusResponse.AsyncStatus {
4546

4647
private final Hits hits;
4748
private final long tookInMillis;
@@ -150,14 +151,17 @@ public Hits hits() {
150151
return hits;
151152
}
152153

154+
@Override
153155
public String id() {
154156
return asyncExecutionId;
155157
}
156158

159+
@Override
157160
public boolean isRunning() {
158161
return isRunning;
159162
}
160163

164+
@Override
161165
public boolean isPartial() {
162166
return isPartial;
163167
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java

+1-16
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.elasticsearch.core.TimeValue;
1111
import org.elasticsearch.tasks.TaskId;
1212
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
13-
import org.elasticsearch.xpack.eql.async.StoredAsyncTask;
13+
import org.elasticsearch.xpack.core.async.StoredAsyncTask;
1414

1515
import java.util.Map;
1616

@@ -27,19 +27,4 @@ public EqlSearchResponse getCurrentResult() {
2727
return new EqlSearchResponse(EqlSearchResponse.Hits.EMPTY, System.currentTimeMillis() - getStartTime(), false,
2828
getExecutionId().getEncoded(), true, true);
2929
}
30-
31-
32-
/**
33-
* Returns the status from {@link EqlSearchTask}
34-
*/
35-
public static EqlStatusResponse getStatusResponse(EqlSearchTask asyncTask) {
36-
return new EqlStatusResponse(
37-
asyncTask.getExecutionId().getEncoded(),
38-
true,
39-
true,
40-
asyncTask.getStartTime(),
41-
asyncTask.getExpirationTimeMillis(),
42-
null
43-
);
44-
}
4530
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncGetStatusAction.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
package org.elasticsearch.xpack.eql.plugin;
88

99
import org.elasticsearch.action.ActionType;
10-
import org.elasticsearch.xpack.eql.action.EqlStatusResponse;
10+
import org.elasticsearch.xpack.ql.async.QlStatusResponse;
1111

12-
public class EqlAsyncGetStatusAction extends ActionType<EqlStatusResponse> {
12+
public class EqlAsyncGetStatusAction extends ActionType<QlStatusResponse> {
1313
public static final EqlAsyncGetStatusAction INSTANCE = new EqlAsyncGetStatusAction();
1414
public static final String NAME = "cluster:monitor/eql/async/status";
1515

1616
private EqlAsyncGetStatusAction() {
17-
super(NAME, EqlStatusResponse::new);
17+
super(NAME, QlStatusResponse::new);
1818
}
1919
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public List<Setting<?>> getSettings() {
8787
return List.of(
8888
new ActionHandler<>(EqlSearchAction.INSTANCE, TransportEqlSearchAction.class),
8989
new ActionHandler<>(EqlStatsAction.INSTANCE, TransportEqlStatsAction.class),
90-
new ActionHandler<>(EqlAsyncGetResultAction.INSTANCE, TransportEqlAsyncGetResultAction.class),
90+
new ActionHandler<>(EqlAsyncGetResultAction.INSTANCE, TransportEqlAsyncGetResultsAction.class),
9191
new ActionHandler<>(EqlAsyncGetStatusAction.INSTANCE, TransportEqlAsyncGetStatusAction.class),
9292
new ActionHandler<>(XPackUsageFeatureAction.EQL, EqlUsageTransportAction.class),
9393
new ActionHandler<>(XPackInfoFeatureAction.EQL, EqlInfoTransportAction.class)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.eql.plugin;
8+
9+
import org.elasticsearch.action.support.ActionFilters;
10+
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.cluster.service.ClusterService;
12+
import org.elasticsearch.common.inject.Inject;
13+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
14+
import org.elasticsearch.common.io.stream.Writeable;
15+
import org.elasticsearch.common.util.BigArrays;
16+
import org.elasticsearch.threadpool.ThreadPool;
17+
import org.elasticsearch.transport.TransportService;
18+
import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames;
19+
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
20+
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
21+
import org.elasticsearch.xpack.ql.plugin.AbstractTransportQlAsyncGetResultsAction;
22+
23+
public class TransportEqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction<EqlSearchResponse, EqlSearchTask> {
24+
25+
@Inject
26+
public TransportEqlAsyncGetResultsAction(TransportService transportService,
27+
ActionFilters actionFilters,
28+
ClusterService clusterService,
29+
NamedWriteableRegistry registry,
30+
Client client,
31+
ThreadPool threadPool,
32+
BigArrays bigArrays) {
33+
super(EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME, transportService, actionFilters, clusterService, registry, client,
34+
threadPool, bigArrays, EqlSearchTask.class);
35+
}
36+
37+
@Override
38+
public Writeable.Reader<EqlSearchResponse> responseReader() {
39+
return EqlSearchResponse::new;
40+
}
41+
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetStatusAction.java

+6-43
Original file line numberDiff line numberDiff line change
@@ -6,39 +6,21 @@
66
*/
77
package org.elasticsearch.xpack.eql.plugin;
88

9-
import org.elasticsearch.action.ActionListener;
10-
import org.elasticsearch.action.ActionListenerResponseHandler;
119
import org.elasticsearch.action.support.ActionFilters;
12-
import org.elasticsearch.action.support.HandledTransportAction;
1310
import org.elasticsearch.client.Client;
14-
import org.elasticsearch.cluster.node.DiscoveryNode;
1511
import org.elasticsearch.cluster.service.ClusterService;
1612
import org.elasticsearch.common.inject.Inject;
1713
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1814
import org.elasticsearch.common.io.stream.Writeable;
1915
import org.elasticsearch.common.util.BigArrays;
20-
import org.elasticsearch.tasks.Task;
2116
import org.elasticsearch.threadpool.ThreadPool;
2217
import org.elasticsearch.transport.TransportService;
23-
import org.elasticsearch.xpack.core.XPackPlugin;
24-
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
25-
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
26-
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
2718
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
2819
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
29-
import org.elasticsearch.xpack.eql.action.EqlStatusResponse;
30-
import org.elasticsearch.xpack.eql.async.StoredAsyncResponse;
20+
import org.elasticsearch.xpack.ql.plugin.AbstractTransportQlAsyncGetStatusAction;
3121

32-
import java.util.Objects;
33-
34-
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
35-
36-
37-
public class TransportEqlAsyncGetStatusAction extends HandledTransportAction<GetAsyncStatusRequest, EqlStatusResponse> {
38-
private final TransportService transportService;
39-
private final ClusterService clusterService;
40-
private final AsyncTaskIndexService<StoredAsyncResponse<EqlSearchResponse>> store;
4122

23+
public class TransportEqlAsyncGetStatusAction extends AbstractTransportQlAsyncGetStatusAction<EqlSearchResponse, EqlSearchTask> {
4224
@Inject
4325
public TransportEqlAsyncGetStatusAction(TransportService transportService,
4426
ActionFilters actionFilters,
@@ -47,31 +29,12 @@ public TransportEqlAsyncGetStatusAction(TransportService transportService,
4729
Client client,
4830
ThreadPool threadPool,
4931
BigArrays bigArrays) {
50-
super(EqlAsyncGetStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new);
51-
this.transportService = transportService;
52-
this.clusterService = clusterService;
53-
Writeable.Reader<StoredAsyncResponse<EqlSearchResponse>> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in);
54-
this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
55-
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry, bigArrays);
32+
super(EqlAsyncGetStatusAction.NAME, transportService, actionFilters, clusterService, registry, client, threadPool, bigArrays,
33+
EqlSearchTask.class);
5634
}
5735

5836
@Override
59-
protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener<EqlStatusResponse> listener) {
60-
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
61-
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
62-
DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode();
63-
if (node == null || Objects.equals(node, localNode)) {
64-
store.retrieveStatus(
65-
request,
66-
taskManager,
67-
EqlSearchTask.class,
68-
EqlSearchTask::getStatusResponse,
69-
EqlStatusResponse::getStatusFromStoredSearch,
70-
listener
71-
);
72-
} else {
73-
transportService.sendRequest(node, EqlAsyncGetStatusAction.NAME, request,
74-
new ActionListenerResponseHandler<>(listener, EqlStatusResponse::new, ThreadPool.Names.SAME));
75-
}
37+
protected Writeable.Reader<EqlSearchResponse> responseReader() {
38+
return EqlSearchResponse::new;
7639
}
7740
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@
3535
import org.elasticsearch.xpack.eql.action.EqlSearchRequest;
3636
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
3737
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
38-
import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService;
3938
import org.elasticsearch.xpack.eql.execution.PlanExecutor;
4039
import org.elasticsearch.xpack.eql.parser.ParserParams;
4140
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
4241
import org.elasticsearch.xpack.eql.session.Results;
42+
import org.elasticsearch.xpack.ql.async.AsyncTaskManagementService;
4343
import org.elasticsearch.xpack.ql.expression.Order;
4444

4545
import java.io.IOException;

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java renamed to x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* 2.0.
66
*/
77

8-
package org.elasticsearch.xpack.eql.async;
8+
package org.elasticsearch.xpack.ql.async;
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
@@ -34,6 +34,8 @@
3434
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
3535
import org.elasticsearch.xpack.core.async.AsyncTask;
3636
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
37+
import org.elasticsearch.xpack.core.async.StoredAsyncResponse;
38+
import org.elasticsearch.xpack.core.async.StoredAsyncTask;
3739

3840
import java.io.IOException;
3941
import java.util.Map;

0 commit comments

Comments
 (0)