Skip to content

Commit d4599ee

Browse files
authored
Field capabilities index action should not fork its execution (#69865)
This commit removes the usage of the management thread pool to execute the field capabilities index action. This action (similar to can_match) is cheap so it can be executed on the same thread (network).
1 parent 715eb90 commit d4599ee

File tree

3 files changed

+115
-66
lines changed

3 files changed

+115
-66
lines changed

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 68 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
5959
// retrieve the initial timestamp in case the action is a cross cluster search
6060
long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis();
6161
final ClusterState clusterState = clusterService.state();
62-
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(),
63-
request.indices());
62+
final Map<String, OriginalIndices> remoteClusterIndices =
63+
remoteClusterService.groupIndices(request.indicesOptions(), request.indices());
6464
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
6565
final String[] concreteIndices;
6666
if (localIndices == null) {
@@ -70,62 +70,81 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
7070
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices);
7171
}
7272
final int totalNumRequest = concreteIndices.length + remoteClusterIndices.size();
73-
final CountDown completionCounter = new CountDown(totalNumRequest);
74-
final List<FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedList(new ArrayList<>());
75-
final Runnable onResponse = () -> {
76-
if (completionCounter.countDown()) {
77-
if (request.isMergeResults()) {
78-
listener.onResponse(merge(indexResponses, request.includeUnmapped()));
79-
} else {
80-
listener.onResponse(new FieldCapabilitiesResponse(indexResponses));
81-
}
82-
}
83-
};
8473
if (totalNumRequest == 0) {
8574
listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap()));
86-
} else {
87-
ActionListener<FieldCapabilitiesIndexResponse> innerListener = new ActionListener<FieldCapabilitiesIndexResponse>() {
88-
@Override
89-
public void onResponse(FieldCapabilitiesIndexResponse result) {
90-
if (result.canMatch()) {
91-
indexResponses.add(result);
75+
return;
76+
}
77+
78+
final CountDown completionCounter = new CountDown(totalNumRequest);
79+
final List<FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedList(new ArrayList<>());
80+
final ActionListener<List<FieldCapabilitiesIndexResponse>> countDownListener = new ActionListener<>() {
81+
@Override
82+
public void onResponse(List<FieldCapabilitiesIndexResponse> results) {
83+
for (FieldCapabilitiesIndexResponse res : results) {
84+
if (res.canMatch()) {
85+
indexResponses.add(res);
9286
}
93-
onResponse.run();
9487
}
88+
countDown();
89+
}
9590

96-
@Override
97-
public void onFailure(Exception e) {
98-
// TODO we should somehow inform the user that we failed
99-
onResponse.run();
100-
}
101-
};
102-
for (String index : concreteIndices) {
103-
client.executeLocally(TransportFieldCapabilitiesIndexAction.TYPE, new FieldCapabilitiesIndexRequest(request.fields(),
104-
index, localIndices, request.indexFilter(), nowInMillis, request.runtimeFields()), innerListener);
91+
@Override
92+
public void onFailure(Exception e) {
93+
// TODO we should somehow inform the user that we failed
94+
countDown();
10595
}
10696

107-
// this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
108-
// send us back all individual index results.
109-
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
110-
String clusterAlias = remoteIndices.getKey();
111-
OriginalIndices originalIndices = remoteIndices.getValue();
112-
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
113-
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
114-
remoteRequest.setMergeResults(false); // we need to merge on this node
115-
remoteRequest.indicesOptions(originalIndices.indicesOptions());
116-
remoteRequest.indices(originalIndices.indices());
117-
remoteRequest.fields(request.fields());
118-
remoteRequest.runtimeFields(request.runtimeFields());
119-
remoteRequest.indexFilter(request.indexFilter());
120-
remoteRequest.nowInMillis(nowInMillis);
121-
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
122-
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
123-
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
124-
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get(), res.canMatch()));
97+
private void countDown() {
98+
if (completionCounter.countDown()) {
99+
if (request.isMergeResults()) {
100+
listener.onResponse(merge(indexResponses, request.includeUnmapped()));
101+
} else {
102+
listener.onResponse(new FieldCapabilitiesResponse(indexResponses));
125103
}
126-
onResponse.run();
127-
}, failure -> onResponse.run()));
104+
}
128105
}
106+
};
107+
108+
for (String index : concreteIndices) {
109+
client.executeLocally(TransportFieldCapabilitiesIndexAction.TYPE,
110+
new FieldCapabilitiesIndexRequest(
111+
request.fields(),
112+
index,
113+
localIndices,
114+
request.indexFilter(),
115+
nowInMillis, request.runtimeFields()
116+
),
117+
ActionListener.wrap(
118+
response -> countDownListener.onResponse(Collections.singletonList(response)),
119+
countDownListener::onFailure
120+
)
121+
);
122+
}
123+
124+
// this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
125+
// send us back all individual index results.
126+
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
127+
String clusterAlias = remoteIndices.getKey();
128+
OriginalIndices originalIndices = remoteIndices.getValue();
129+
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
130+
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
131+
remoteRequest.setMergeResults(false); // we need to merge on this node
132+
remoteRequest.indicesOptions(originalIndices.indicesOptions());
133+
remoteRequest.indices(originalIndices.indices());
134+
remoteRequest.fields(request.fields());
135+
remoteRequest.runtimeFields(request.runtimeFields());
136+
remoteRequest.indexFilter(request.indexFilter());
137+
remoteRequest.nowInMillis(nowInMillis);
138+
remoteClusterClient.fieldCaps(remoteRequest,
139+
ActionListener.wrap(response -> {
140+
List<FieldCapabilitiesIndexResponse> remotes = new ArrayList<>();
141+
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
142+
remotes.add(new FieldCapabilitiesIndexResponse(
143+
RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()),
144+
resp.get(), resp.canMatch()));
145+
}
146+
countDownListener.onResponse(remotes);
147+
}, countDownListener::onFailure));
129148
}
130149
}
131150

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
1414
import org.elasticsearch.action.ActionListener;
15-
import org.elasticsearch.action.ActionRunnable;
1615
import org.elasticsearch.action.ActionType;
1716
import org.elasticsearch.action.NoShardAvailableActionException;
1817
import org.elasticsearch.action.support.ActionFilters;
@@ -21,7 +20,6 @@
2120
import org.elasticsearch.cluster.ClusterState;
2221
import org.elasticsearch.cluster.block.ClusterBlockException;
2322
import org.elasticsearch.cluster.block.ClusterBlockLevel;
24-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2523
import org.elasticsearch.cluster.node.DiscoveryNode;
2624
import org.elasticsearch.cluster.node.DiscoveryNodes;
2725
import org.elasticsearch.cluster.routing.GroupShardsIterator;
@@ -60,7 +58,6 @@
6058
import java.util.HashSet;
6159
import java.util.Map;
6260
import java.util.Set;
63-
import java.util.concurrent.Executor;
6461
import java.util.function.Predicate;
6562

6663
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
@@ -77,20 +74,17 @@ public class TransportFieldCapabilitiesIndexAction
7774

7875
private final ClusterService clusterService;
7976
private final TransportService transportService;
80-
private final SearchService searchService;
8177
private final IndicesService indicesService;
82-
private final Executor executor;
8378

8479
@Inject
85-
public TransportFieldCapabilitiesIndexAction(ClusterService clusterService, TransportService transportService,
86-
IndicesService indicesService, SearchService searchService, ThreadPool threadPool,
87-
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
80+
public TransportFieldCapabilitiesIndexAction(ClusterService clusterService,
81+
TransportService transportService,
82+
IndicesService indicesService,
83+
ActionFilters actionFilters) {
8884
super(ACTION_NAME, transportService, actionFilters, FieldCapabilitiesIndexRequest::new);
8985
this.clusterService = clusterService;
9086
this.transportService = transportService;
91-
this.searchService = searchService;
9287
this.indicesService = indicesService;
93-
this.executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
9488
transportService.registerRequestHandler(ACTION_SHARD_NAME, ThreadPool.Names.SAME,
9589
FieldCapabilitiesIndexRequest::new, new ShardTransportHandler());
9690
}
@@ -305,7 +299,14 @@ public void messageReceived(final FieldCapabilitiesIndexRequest request,
305299
logger.trace("executing [{}]", request);
306300
}
307301
ActionListener<FieldCapabilitiesIndexResponse> listener = new ChannelActionListener<>(channel, ACTION_SHARD_NAME, request);
308-
executor.execute(ActionRunnable.supply(listener, () -> shardOperation(request)));
302+
final FieldCapabilitiesIndexResponse resp;
303+
try {
304+
resp = shardOperation(request);
305+
} catch (Exception exc) {
306+
listener.onFailure(exc);
307+
return;
308+
}
309+
listener.onResponse(resp);
309310
}
310311
}
311312
}

x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,16 @@
3030
import org.elasticsearch.tasks.TaskInfo;
3131
import org.elasticsearch.test.ESIntegTestCase;
3232

33+
import java.io.IOException;
3334
import java.nio.file.Path;
3435
import java.util.ArrayList;
3536
import java.util.Collection;
3637
import java.util.List;
38+
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
3740
import java.util.concurrent.atomic.AtomicBoolean;
3841
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.function.Consumer;
3943

4044
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
4145
import static org.hamcrest.Matchers.equalTo;
@@ -128,6 +132,8 @@ public static class SearchBlockPlugin extends Plugin implements ActionPlugin {
128132

129133
private final String nodeId;
130134

135+
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
136+
131137
public void reset() {
132138
contexts.set(0);
133139
fieldCaps.set(0);
@@ -183,11 +189,14 @@ public int order() {
183189

184190
@Override
185191
public <Request extends ActionRequest, Response extends ActionResponse> void apply(
186-
Task task, String action, Request request, ActionListener<Response> listener,
192+
Task task,
193+
String action,
194+
Request request,
195+
ActionListener<Response> listener,
187196
ActionFilterChain<Request, Response> chain) {
188-
ActionListener<Response> listenerWrapper = listener;
197+
189198
if (action.equals(FieldCapabilitiesAction.NAME)) {
190-
listenerWrapper = ActionListener.wrap(resp -> {
199+
final Consumer<Response> actionWrapper = resp -> {
191200
try {
192201
fieldCaps.incrementAndGet();
193202
logger.trace("blocking field caps on " + nodeId);
@@ -198,14 +207,34 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
198207
} finally {
199208
listener.onResponse(resp);
200209
}
201-
}, listener::onFailure);
202-
210+
logger.trace("unblocking field caps on " + nodeId);
211+
};
212+
final Thread originalThread = Thread.currentThread();
213+
chain.proceed(task, action, request,
214+
ActionListener.wrap(
215+
resp -> {
216+
if (originalThread == Thread.currentThread()) {
217+
// async if we never exited the original thread
218+
executorService.execute(() -> actionWrapper.accept(resp));
219+
} else {
220+
actionWrapper.accept(resp);
221+
}
222+
},
223+
listener::onFailure)
224+
);
225+
} else {
226+
chain.proceed(task, action, request, listener);
203227
}
204-
chain.proceed(task, action, request, listenerWrapper);
205228
}
206229
});
207230
return list;
208231
}
232+
233+
@Override
234+
public void close() throws IOException {
235+
List<Runnable> runnables = executorService.shutdownNow();
236+
assertTrue(runnables.isEmpty());
237+
}
209238
}
210239

211240
@Override

0 commit comments

Comments
 (0)