Skip to content

Commit 013b64a

Browse files
authored
[CCR] Change FollowIndexAction.Request class to be more user friendly (#33810)
Instead of having one constructor that accepts all arguments, all parameters should be provided via setters. Only leader and follower index are required arguments. This makes using this class in tests and transport client easier.
1 parent c6e3231 commit 013b64a

File tree

10 files changed

+267
-190
lines changed

10 files changed

+267
-190
lines changed

x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,13 +199,13 @@ private static void refresh(String index) throws IOException {
199199

200200
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
201201
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
202-
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
202+
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
203203
assertOK(client().performRequest(request));
204204
}
205205

206206
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
207207
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
208-
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
208+
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
209209
assertOK(client().performRequest(request));
210210
}
211211

x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,13 @@ private static void refresh(String index) throws IOException {
141141

142142
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
143143
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
144-
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
144+
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
145145
assertOK(client().performRequest(request));
146146
}
147147

148148
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
149149
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
150-
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
150+
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
151151
assertOK(client().performRequest(request));
152152
}
153153

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,12 +297,16 @@ private void followLeaderIndex(String clusterAlias, Index indexToFollow,
297297

298298
String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
299299
clusterAlias + ":" + leaderIndexName;
300-
FollowIndexAction.Request request =
301-
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
302-
pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(),
303-
pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(),
304-
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(),
305-
pattern.getIdleShardRetryDelay());
300+
FollowIndexAction.Request request = new FollowIndexAction.Request();
301+
request.setLeaderIndex(leaderIndexNameWithClusterAliasPrefix);
302+
request.setFollowerIndex(followIndexName);
303+
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
304+
request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
305+
request.setMaxOperationSizeInBytes(pattern.getMaxOperationSizeInBytes());
306+
request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
307+
request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
308+
request.setMaxRetryDelay(pattern.getMaxRetryDelay());
309+
request.setPollTimeout(pattern.getIdleShardRetryDelay());
306310

307311
// Execute if the create and follow api call succeeds:
308312
Runnable successHandler = () -> {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.indices.IndicesService;
3333
import org.elasticsearch.threadpool.ThreadPool;
3434
import org.elasticsearch.transport.TransportService;
35-
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
3635

3736
import java.io.IOException;
3837
import java.util.ArrayList;
@@ -64,8 +63,8 @@ public static class Request extends SingleShardRequest<Request> {
6463
private int maxOperationCount;
6564
private ShardId shardId;
6665
private String expectedHistoryUUID;
67-
private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT;
68-
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
66+
private TimeValue pollTimeout = TransportFollowIndexAction.DEFAULT_POLL_TIMEOUT;
67+
private long maxOperationSizeInBytes = TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
6968

7069
public Request(ShardId shardId, String expectedHistoryUUID) {
7170
super(shardId.getIndexName());

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.inject.Inject;
2020
import org.elasticsearch.common.settings.Setting;
2121
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.unit.TimeValue;
2223
import org.elasticsearch.index.IndexNotFoundException;
2324
import org.elasticsearch.index.IndexSettings;
2425
import org.elasticsearch.index.IndexingSlowLog;
@@ -55,6 +56,14 @@
5556

5657
public class TransportFollowIndexAction extends HandledTransportAction<FollowIndexAction.Request, AcknowledgedResponse> {
5758

59+
static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
60+
private static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500);
61+
private static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1;
62+
private static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240;
63+
private static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024;
64+
private static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1;
65+
static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1);
66+
5867
private final Client client;
5968
private final ThreadPool threadPool;
6069
private final ClusterService clusterService;
@@ -179,19 +188,8 @@ void start(
179188
String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(ccrIndexMetadata);
180189
String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId];
181190

182-
ShardFollowTask shardFollowTask = new ShardFollowTask(
183-
clusterNameAlias,
184-
new ShardId(followIndexMetadata.getIndex(), shardId),
185-
new ShardId(leaderIndexMetadata.getIndex(), shardId),
186-
request.getMaxBatchOperationCount(),
187-
request.getMaxConcurrentReadBatches(),
188-
request.getMaxOperationSizeInBytes(),
189-
request.getMaxConcurrentWriteBatches(),
190-
request.getMaxWriteBufferSize(),
191-
request.getMaxRetryDelay(),
192-
request.getPollTimeout(),
193-
recordedLeaderShardHistoryUUID,
194-
filteredHeaders);
191+
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request,
192+
leaderIndexMetadata, followIndexMetadata, recordedLeaderShardHistoryUUID, filteredHeaders);
195193
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
196194
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
197195
@Override
@@ -299,6 +297,69 @@ static void validate(
299297
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY);
300298
}
301299

300+
private static ShardFollowTask createShardFollowTask(
301+
int shardId,
302+
String clusterAliasName,
303+
FollowIndexAction.Request request,
304+
IndexMetaData leaderIndexMetadata,
305+
IndexMetaData followIndexMetadata,
306+
String recordedLeaderShardHistoryUUID,
307+
Map<String, String> filteredHeaders
308+
) {
309+
int maxBatchOperationCount;
310+
if (request.getMaxBatchOperationCount() != null) {
311+
maxBatchOperationCount = request.getMaxBatchOperationCount();
312+
} else {
313+
maxBatchOperationCount = DEFAULT_MAX_BATCH_OPERATION_COUNT;
314+
}
315+
316+
int maxConcurrentReadBatches;
317+
if (request.getMaxConcurrentReadBatches() != null){
318+
maxConcurrentReadBatches = request.getMaxConcurrentReadBatches();
319+
} else {
320+
maxConcurrentReadBatches = DEFAULT_MAX_CONCURRENT_READ_BATCHES;
321+
}
322+
323+
long maxOperationSizeInBytes;
324+
if (request.getMaxOperationSizeInBytes() != null) {
325+
maxOperationSizeInBytes = request.getMaxOperationSizeInBytes();
326+
} else {
327+
maxOperationSizeInBytes = DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
328+
}
329+
330+
int maxConcurrentWriteBatches;
331+
if (request.getMaxConcurrentWriteBatches() != null) {
332+
maxConcurrentWriteBatches = request.getMaxConcurrentWriteBatches();
333+
} else {
334+
maxConcurrentWriteBatches = DEFAULT_MAX_CONCURRENT_WRITE_BATCHES;
335+
}
336+
337+
int maxWriteBufferSize;
338+
if (request.getMaxWriteBufferSize() != null) {
339+
maxWriteBufferSize = request.getMaxWriteBufferSize();
340+
} else {
341+
maxWriteBufferSize = DEFAULT_MAX_WRITE_BUFFER_SIZE;
342+
}
343+
344+
TimeValue maxRetryDelay = request.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : request.getMaxRetryDelay();
345+
TimeValue pollTimeout = request.getPollTimeout() == null ? DEFAULT_POLL_TIMEOUT : request.getPollTimeout();
346+
347+
return new ShardFollowTask(
348+
clusterAliasName,
349+
new ShardId(followIndexMetadata.getIndex(), shardId),
350+
new ShardId(leaderIndexMetadata.getIndex(), shardId),
351+
maxBatchOperationCount,
352+
maxConcurrentReadBatches,
353+
maxOperationSizeInBytes,
354+
maxConcurrentWriteBatches,
355+
maxWriteBufferSize,
356+
maxRetryDelay,
357+
pollTimeout,
358+
recordedLeaderShardHistoryUUID,
359+
filteredHeaders
360+
);
361+
}
362+
302363
private static String[] extractIndexShardHistoryUUIDs(Map<String, String> ccrIndexMetaData) {
303364
String historyUUIDs = ccrIndexMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS);
304365
return historyUUIDs.split(",");

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -192,16 +192,12 @@ private void assertNonCompliantLicense(final Exception e) {
192192
}
193193

194194
private FollowIndexAction.Request getFollowRequest() {
195-
return new FollowIndexAction.Request(
196-
"leader",
197-
"follower",
198-
FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT,
199-
FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES,
200-
FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
201-
FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES,
202-
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE,
203-
TimeValue.timeValueMillis(10),
204-
TimeValue.timeValueMillis(10));
195+
FollowIndexAction.Request request = new FollowIndexAction.Request();
196+
request.setLeaderIndex("leader");
197+
request.setFollowerIndex("follower");
198+
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
199+
request.setPollTimeout(TimeValue.timeValueMillis(10));
200+
return request;
205201
}
206202

207203
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -319,9 +319,11 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
319319
long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10));
320320
atLeastDocsIndexed("index1", numDocsIndexed / 3);
321321

322-
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", maxReadSize,
323-
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
324-
randomIntBetween(1024, 10240), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
322+
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
323+
followRequest.setMaxBatchOperationCount(maxReadSize);
324+
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
325+
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
326+
followRequest.setMaxWriteBufferSize(randomIntBetween(1024, 10240));
325327
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
326328
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
327329

@@ -358,9 +360,10 @@ public void testFollowIndexAndCloseNode() throws Exception {
358360
});
359361
thread.start();
360362

361-
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", randomIntBetween(32, 2048),
362-
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
363-
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
363+
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
364+
followRequest.setMaxBatchOperationCount(randomIntBetween(32, 2048));
365+
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
366+
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
364367
client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get();
365368

366369
long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(),
@@ -447,7 +450,7 @@ public void testFollowNonExistentIndex() throws Exception {
447450
.actionGet());
448451
}
449452

450-
public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
453+
public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
451454
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
452455
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
453456
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
@@ -460,8 +463,8 @@ public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
460463
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
461464
}
462465

463-
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
464-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
466+
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
467+
followRequest.setMaxOperationSizeInBytes(1L);
465468
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
466469
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
467470

@@ -489,25 +492,21 @@ public void testDontFollowTheWrongIndex() throws Exception {
489492
assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
490493
ensureGreen("index3");
491494

492-
FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
493-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
495+
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
494496
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
495497
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
496498

497-
followRequest = new FollowIndexAction.Request("index3", "index4", 1024, 1, 1024L,
498-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
499+
followRequest = createFollowRequest("index3", "index4");
499500
createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
500501
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
501502
unfollowIndex("index2", "index4");
502503

503-
FollowIndexAction.Request wrongRequest1 = new FollowIndexAction.Request("index1", "index4", 1024, 1, 1024L,
504-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
504+
FollowIndexAction.Request wrongRequest1 = createFollowRequest("index1", "index4");
505505
Exception e = expectThrows(IllegalArgumentException.class,
506506
() -> client().execute(FollowIndexAction.INSTANCE, wrongRequest1).actionGet());
507507
assertThat(e.getMessage(), containsString("follow index [index4] should reference"));
508508

509-
FollowIndexAction.Request wrongRequest2 = new FollowIndexAction.Request("index3", "index2", 1024, 1, 1024L,
510-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
509+
FollowIndexAction.Request wrongRequest2 = createFollowRequest("index3", "index2");
511510
e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest2).actionGet());
512511
assertThat(e.getMessage(), containsString("follow index [index2] should reference"));
513512
}
@@ -716,10 +715,12 @@ private void assertSameDocCount(String index1, String index2) throws Exception {
716715
}, 60, TimeUnit.SECONDS);
717716
}
718717

719-
public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followIndex) {
720-
return new FollowIndexAction.Request(leaderIndex, followIndex, FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT,
721-
FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES, FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
722-
FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE,
723-
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10));
718+
public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) {
719+
FollowIndexAction.Request request = new FollowIndexAction.Request();
720+
request.setLeaderIndex(leaderIndex);
721+
request.setFollowerIndex(followerIndex);
722+
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
723+
request.setPollTimeout(TimeValue.timeValueMillis(10));
724+
return request;
724725
}
725726
}

0 commit comments

Comments
 (0)