Skip to content

Commit abf8cb6

Browse files
authored
[CCR] Cleanup pause follow action (#34183)
* Change the `TransportPauseFollowAction` to extend from `TransportMasterNodeAction` instead of `HandledAction`, this removes a sync cluster state api call. * Introduced `ResponseHandler` that removes duplicated code in `TransportPauseFollowAction` and `TransportResumeFollowAction`. * Changed `PauseFollowAction.Request` to not use `readFrom()`.
1 parent 52266d8 commit abf8cb6

File tree

7 files changed

+139
-148
lines changed

7 files changed

+139
-148
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ccr.action;
7+
8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
10+
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.concurrent.atomic.AtomicReferenceArray;
13+
14+
final class ResponseHandler {
15+
16+
private final AtomicInteger counter;
17+
private final AtomicReferenceArray<Object> responses;
18+
private final ActionListener<AcknowledgedResponse> listener;
19+
20+
ResponseHandler(int numRequests, ActionListener<AcknowledgedResponse> listener) {
21+
this.counter = new AtomicInteger(numRequests);
22+
this.responses = new AtomicReferenceArray<>(numRequests);
23+
this.listener = listener;
24+
}
25+
26+
<T> ActionListener<T> getActionListener(final int requestId) {
27+
return new ActionListener<T>() {
28+
29+
@Override
30+
public void onResponse(T response) {
31+
responses.set(requestId, response);
32+
finalizeResponse();
33+
}
34+
35+
@Override
36+
public void onFailure(Exception e) {
37+
responses.set(requestId, e);
38+
finalizeResponse();
39+
}
40+
};
41+
}
42+
43+
private void finalizeResponse() {
44+
Exception error = null;
45+
if (counter.decrementAndGet() == 0) {
46+
for (int j = 0; j < responses.length(); j++) {
47+
Object response = responses.get(j);
48+
if (response instanceof Exception) {
49+
if (error == null) {
50+
error = (Exception) response;
51+
} else {
52+
error.addSuppressed((Exception) response);
53+
}
54+
}
55+
}
56+
57+
if (error == null) {
58+
listener.onResponse(new AcknowledgedResponse(true));
59+
} else {
60+
listener.onFailure(error);
61+
}
62+
}
63+
}
64+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPauseFollowAction.java

Lines changed: 54 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -7,114 +7,88 @@
77
package org.elasticsearch.xpack.ccr.action;
88

99
import org.elasticsearch.action.ActionListener;
10-
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1110
import org.elasticsearch.action.support.ActionFilters;
12-
import org.elasticsearch.action.support.HandledTransportAction;
1311
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14-
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.block.ClusterBlockException;
15+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
16+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
17+
import org.elasticsearch.cluster.service.ClusterService;
1518
import org.elasticsearch.common.inject.Inject;
1619
import org.elasticsearch.common.settings.Settings;
1720
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1821
import org.elasticsearch.persistent.PersistentTasksService;
19-
import org.elasticsearch.tasks.Task;
22+
import org.elasticsearch.threadpool.ThreadPool;
2023
import org.elasticsearch.transport.TransportService;
2124
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
2225

2326
import java.util.List;
24-
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.concurrent.atomic.AtomicReferenceArray;
2627
import java.util.stream.Collectors;
2728

28-
public class TransportPauseFollowAction extends HandledTransportAction<PauseFollowAction.Request, AcknowledgedResponse> {
29+
public class TransportPauseFollowAction extends TransportMasterNodeAction<PauseFollowAction.Request, AcknowledgedResponse> {
2930

30-
private final Client client;
3131
private final PersistentTasksService persistentTasksService;
3232

3333
@Inject
3434
public TransportPauseFollowAction(
3535
final Settings settings,
3636
final TransportService transportService,
3737
final ActionFilters actionFilters,
38-
final Client client,
38+
final ClusterService clusterService,
39+
final ThreadPool threadPool,
40+
final IndexNameExpressionResolver indexNameExpressionResolver,
3941
final PersistentTasksService persistentTasksService) {
40-
super(settings, PauseFollowAction.NAME, transportService, actionFilters, PauseFollowAction.Request::new);
41-
this.client = client;
42+
super(settings, PauseFollowAction.NAME, transportService, clusterService, threadPool, actionFilters,
43+
PauseFollowAction.Request::new, indexNameExpressionResolver);
4244
this.persistentTasksService = persistentTasksService;
4345
}
4446

4547
@Override
46-
protected void doExecute(
47-
final Task task,
48-
final PauseFollowAction.Request request,
49-
final ActionListener<AcknowledgedResponse> listener) {
50-
51-
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
52-
PersistentTasksCustomMetaData persistentTasksMetaData = r.getState().metaData().custom(PersistentTasksCustomMetaData.TYPE);
53-
if (persistentTasksMetaData == null) {
54-
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
55-
return;
56-
}
57-
58-
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
59-
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
60-
.filter(persistentTask -> {
61-
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
62-
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
63-
})
64-
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
65-
.collect(Collectors.toList());
66-
67-
if (shardFollowTaskIds.isEmpty()) {
68-
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
69-
return;
70-
}
71-
72-
final AtomicInteger counter = new AtomicInteger(shardFollowTaskIds.size());
73-
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(shardFollowTaskIds.size());
74-
int i = 0;
75-
76-
for (String taskId : shardFollowTaskIds) {
77-
final int shardId = i++;
78-
persistentTasksService.sendRemoveRequest(taskId,
79-
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
80-
@Override
81-
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
82-
responses.set(shardId, task);
83-
finalizeResponse();
84-
}
48+
protected String executor() {
49+
return ThreadPool.Names.SAME;
50+
}
8551

86-
@Override
87-
public void onFailure(Exception e) {
88-
responses.set(shardId, e);
89-
finalizeResponse();
90-
}
52+
@Override
53+
protected AcknowledgedResponse newResponse() {
54+
return new AcknowledgedResponse();
55+
}
9156

92-
void finalizeResponse() {
93-
Exception error = null;
94-
if (counter.decrementAndGet() == 0) {
95-
for (int j = 0; j < responses.length(); j++) {
96-
Object response = responses.get(j);
97-
if (response instanceof Exception) {
98-
if (error == null) {
99-
error = (Exception) response;
100-
} else {
101-
error.addSuppressed((Throwable) response);
102-
}
103-
}
104-
}
57+
@Override
58+
protected void masterOperation(PauseFollowAction.Request request,
59+
ClusterState state,
60+
ActionListener<AcknowledgedResponse> listener) throws Exception {
61+
PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
62+
if (persistentTasksMetaData == null) {
63+
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
64+
return;
65+
}
66+
67+
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
68+
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
69+
.filter(persistentTask -> {
70+
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
71+
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
72+
})
73+
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
74+
.collect(Collectors.toList());
75+
76+
if (shardFollowTaskIds.isEmpty()) {
77+
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
78+
return;
79+
}
80+
81+
int i = 0;
82+
final ResponseHandler responseHandler = new ResponseHandler(shardFollowTaskIds.size(), listener);
83+
for (String taskId : shardFollowTaskIds) {
84+
final int taskSlot = i++;
85+
persistentTasksService.sendRemoveRequest(taskId, responseHandler.getActionListener(taskSlot));
86+
}
87+
}
10588

106-
if (error == null) {
107-
// include task ids?
108-
listener.onResponse(new AcknowledgedResponse(true));
109-
} else {
110-
// TODO: cancel all started tasks
111-
listener.onFailure(error);
112-
}
113-
}
114-
}
115-
});
116-
}
117-
}, listener::onFailure));
89+
@Override
90+
protected ClusterBlockException checkBlock(PauseFollowAction.Request request, ClusterState state) {
91+
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowIndex());
11892
}
11993

12094
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.indices.IndicesRequestCache;
3333
import org.elasticsearch.indices.IndicesService;
3434
import org.elasticsearch.license.LicenseUtils;
35-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3635
import org.elasticsearch.persistent.PersistentTasksService;
3736
import org.elasticsearch.tasks.Task;
3837
import org.elasticsearch.threadpool.ThreadPool;
@@ -49,8 +48,6 @@
4948
import java.util.Map;
5049
import java.util.Objects;
5150
import java.util.Set;
52-
import java.util.concurrent.atomic.AtomicInteger;
53-
import java.util.concurrent.atomic.AtomicReferenceArray;
5451
import java.util.stream.Collectors;
5552

5653
public class TransportResumeFollowAction extends HandledTransportAction<ResumeFollowAction.Request, AcknowledgedResponse> {
@@ -144,62 +141,22 @@ void start(
144141
IndexMetaData leaderIndexMetadata,
145142
IndexMetaData followIndexMetadata,
146143
String[] leaderIndexHistoryUUIDs,
147-
ActionListener<AcknowledgedResponse> handler) throws IOException {
144+
ActionListener<AcknowledgedResponse> listener) throws IOException {
148145

149146
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
150147
validate(request, leaderIndexMetadata, followIndexMetadata, leaderIndexHistoryUUIDs, mapperService);
151148
final int numShards = followIndexMetadata.getNumberOfShards();
152-
final AtomicInteger counter = new AtomicInteger(numShards);
153-
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
149+
final ResponseHandler handler = new ResponseHandler(numShards, listener);
154150
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
155151
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
156152
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
157153

158-
for (int i = 0; i < numShards; i++) {
159-
final int shardId = i;
154+
for (int shardId = 0; shardId < numShards; shardId++) {
160155
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
161156

162157
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request,
163158
leaderIndexMetadata, followIndexMetadata, filteredHeaders);
164-
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
165-
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
166-
@Override
167-
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
168-
responses.set(shardId, task);
169-
finalizeResponse();
170-
}
171-
172-
@Override
173-
public void onFailure(Exception e) {
174-
responses.set(shardId, e);
175-
finalizeResponse();
176-
}
177-
178-
void finalizeResponse() {
179-
Exception error = null;
180-
if (counter.decrementAndGet() == 0) {
181-
for (int j = 0; j < responses.length(); j++) {
182-
Object response = responses.get(j);
183-
if (response instanceof Exception) {
184-
if (error == null) {
185-
error = (Exception) response;
186-
} else {
187-
error.addSuppressed((Throwable) response);
188-
}
189-
}
190-
}
191-
192-
if (error == null) {
193-
// include task ids?
194-
handler.onResponse(new AcknowledgedResponse(true));
195-
} else {
196-
// TODO: cancel all started tasks
197-
handler.onFailure(error);
198-
}
199-
}
200-
}
201-
}
202-
);
159+
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, handler.getActionListener(shardId));
203160
}
204161
}
205162

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseFollowAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ public String getName() {
3131

3232
@Override
3333
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
34-
Request request = new Request();
35-
request.setFollowIndex(restRequest.param("index"));
34+
Request request = new Request(restRequest.param("index"));
3635
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
3736
}
3837
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -357,8 +357,7 @@ public void testFollowIndexWithNestedField() throws Exception {
357357
}
358358

359359
public void testUnfollowNonExistingIndex() {
360-
PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
361-
unfollowRequest.setFollowIndex("non-existing-index");
360+
PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request("non-existing-index");
362361
expectThrows(IllegalArgumentException.class,
363362
() -> followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).actionGet());
364363
}
@@ -750,8 +749,7 @@ private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, f
750749

751750
private void pauseFollow(String... indices) throws Exception {
752751
for (String index : indices) {
753-
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
754-
unfollowRequest.setFollowIndex(index);
752+
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
755753
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
756754
}
757755
ensureNoCcrTasks();

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ public void testFollowIndex() throws Exception {
5252
assertThat(client().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs + secondBatchNumDocs));
5353
});
5454

55-
PauseFollowAction.Request pauseRequest = new PauseFollowAction.Request();
56-
pauseRequest.setFollowIndex("follower");
55+
PauseFollowAction.Request pauseRequest = new PauseFollowAction.Request("follower");
5756
client().execute(PauseFollowAction.INSTANCE, pauseRequest);
5857

5958
final long thirdBatchNumDocs = randomIntBetween(2, 64);

0 commit comments

Comments
 (0)