Skip to content

Commit 306f1d7

Browse files
authored
[CCR] Retry when no index shard stats can be found (#34852)
Index shard stats for the follower shard are fetched, when a shard follow task is started. This is needed in order to bootstap the shard follow task with the follower global checkpoint. Sometimes index shard stats are not available (e.g. during a restart) and we fail now, while it is very likely that these stats will be available some time later.
1 parent 3f1fec1 commit 306f1d7

File tree

4 files changed

+162
-83
lines changed

4 files changed

+162
-83
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,12 @@ private void fetchFollowerShardInfo(
205205
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
206206
IndexStats indexStats = r.getIndex(shardId.getIndexName());
207207
if (indexStats == null) {
208-
errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
208+
IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
209+
if (indexMetaData != null) {
210+
errorHandler.accept(new ShardNotFoundException(shardId));
211+
} else {
212+
errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
213+
}
209214
return;
210215
}
211216

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
1010
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
11+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
12+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
13+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
1114
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1215
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
1316
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@@ -24,9 +27,11 @@
2427
import org.elasticsearch.common.Priority;
2528
import org.elasticsearch.common.Strings;
2629
import org.elasticsearch.common.UUIDs;
30+
import org.elasticsearch.common.bytes.BytesReference;
2731
import org.elasticsearch.common.network.NetworkModule;
2832
import org.elasticsearch.common.settings.Settings;
2933
import org.elasticsearch.common.unit.TimeValue;
34+
import org.elasticsearch.common.xcontent.XContentBuilder;
3035
import org.elasticsearch.core.internal.io.IOUtils;
3136
import org.elasticsearch.env.NodeEnvironment;
3237
import org.elasticsearch.index.Index;
@@ -35,6 +40,7 @@
3540
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3641
import org.elasticsearch.plugins.Plugin;
3742
import org.elasticsearch.script.ScriptService;
43+
import org.elasticsearch.tasks.TaskInfo;
3844
import org.elasticsearch.test.ESIntegTestCase;
3945
import org.elasticsearch.test.ESTestCase;
4046
import org.elasticsearch.test.InternalTestCluster;
@@ -48,6 +54,9 @@
4854
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
4955
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
5056
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
57+
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
58+
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
59+
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
5160
import org.junit.After;
5261
import org.junit.AfterClass;
5362
import org.junit.Before;
@@ -59,14 +68,17 @@
5968
import java.util.Collection;
6069
import java.util.Collections;
6170
import java.util.Locale;
71+
import java.util.Map;
6272
import java.util.concurrent.CountDownLatch;
6373
import java.util.concurrent.TimeUnit;
6474
import java.util.function.Function;
6575

76+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
6677
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
6778
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
6879
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6980
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
81+
import static org.hamcrest.Matchers.empty;
7082
import static org.hamcrest.Matchers.equalTo;
7183
import static org.hamcrest.Matchers.lessThanOrEqualTo;
7284

@@ -279,6 +291,88 @@ protected void ensureEmptyWriteBuffers() throws Exception {
279291
});
280292
}
281293

294+
protected void pauseFollow(String... indices) throws Exception {
295+
for (String index : indices) {
296+
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
297+
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
298+
}
299+
ensureNoCcrTasks();
300+
}
301+
302+
protected void ensureNoCcrTasks() throws Exception {
303+
assertBusy(() -> {
304+
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
305+
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
306+
assertThat(tasks.tasks(), empty());
307+
308+
ListTasksRequest listTasksRequest = new ListTasksRequest();
309+
listTasksRequest.setDetailed(true);
310+
ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get();
311+
int numNodeTasks = 0;
312+
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
313+
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
314+
numNodeTasks++;
315+
}
316+
}
317+
assertThat(numNodeTasks, equalTo(0));
318+
}, 30, TimeUnit.SECONDS);
319+
}
320+
321+
protected String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
322+
final Map<String, String> additionalIndexSettings) throws IOException {
323+
final String settings;
324+
try (XContentBuilder builder = jsonBuilder()) {
325+
builder.startObject();
326+
{
327+
builder.startObject("settings");
328+
{
329+
builder.field("index.number_of_shards", numberOfShards);
330+
builder.field("index.number_of_replicas", numberOfReplicas);
331+
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
332+
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
333+
}
334+
}
335+
builder.endObject();
336+
builder.startObject("mappings");
337+
{
338+
builder.startObject("doc");
339+
{
340+
builder.startObject("properties");
341+
{
342+
builder.startObject("f");
343+
{
344+
builder.field("type", "integer");
345+
}
346+
builder.endObject();
347+
}
348+
builder.endObject();
349+
}
350+
builder.endObject();
351+
}
352+
builder.endObject();
353+
}
354+
builder.endObject();
355+
settings = BytesReference.bytes(builder).utf8ToString();
356+
}
357+
return settings;
358+
}
359+
360+
public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
361+
PutFollowAction.Request request = new PutFollowAction.Request();
362+
request.setRemoteCluster("leader_cluster");
363+
request.setLeaderIndex(leaderIndex);
364+
request.setFollowRequest(resumeFollow(followerIndex));
365+
return request;
366+
}
367+
368+
public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
369+
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
370+
request.setFollowerIndex(followerIndex);
371+
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
372+
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
373+
return request;
374+
}
375+
282376
static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
283377
CountDownLatch latch = new CountDownLatch(1);
284378
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import org.apache.lucene.store.AlreadyClosedException;
1010
import org.elasticsearch.ElasticsearchException;
11-
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
1211
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
1312
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
1413
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -756,33 +755,6 @@ private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, f
756755
};
757756
}
758757

759-
private void pauseFollow(String... indices) throws Exception {
760-
for (String index : indices) {
761-
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
762-
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
763-
}
764-
ensureNoCcrTasks();
765-
}
766-
767-
private void ensureNoCcrTasks() throws Exception {
768-
assertBusy(() -> {
769-
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
770-
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
771-
assertThat(tasks.tasks(), empty());
772-
773-
ListTasksRequest listTasksRequest = new ListTasksRequest();
774-
listTasksRequest.setDetailed(true);
775-
ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get();
776-
int numNodeTasks = 0;
777-
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
778-
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
779-
numNodeTasks++;
780-
}
781-
}
782-
assertThat(numNodeTasks, equalTo(0));
783-
}, 30, TimeUnit.SECONDS);
784-
}
785-
786758
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
787759
return () -> {
788760
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get();
@@ -792,45 +764,6 @@ private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int valu
792764
};
793765
}
794766

795-
private String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
796-
final Map<String, String> additionalIndexSettings) throws IOException {
797-
final String settings;
798-
try (XContentBuilder builder = jsonBuilder()) {
799-
builder.startObject();
800-
{
801-
builder.startObject("settings");
802-
{
803-
builder.field("index.number_of_shards", numberOfShards);
804-
builder.field("index.number_of_replicas", numberOfReplicas);
805-
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
806-
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
807-
}
808-
}
809-
builder.endObject();
810-
builder.startObject("mappings");
811-
{
812-
builder.startObject("doc");
813-
{
814-
builder.startObject("properties");
815-
{
816-
builder.startObject("f");
817-
{
818-
builder.field("type", "integer");
819-
}
820-
builder.endObject();
821-
}
822-
builder.endObject();
823-
}
824-
builder.endObject();
825-
}
826-
builder.endObject();
827-
}
828-
builder.endObject();
829-
settings = BytesReference.bytes(builder).utf8ToString();
830-
}
831-
return settings;
832-
}
833-
834767
private String getIndexSettingsWithNestedMapping(final int numberOfShards, final int numberOfReplicas,
835768
final Map<String, String> additionalIndexSettings) throws IOException {
836769
final String settings;
@@ -968,19 +901,4 @@ private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numbe
968901
});
969902
}
970903

971-
public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
972-
PutFollowAction.Request request = new PutFollowAction.Request();
973-
request.setRemoteCluster("leader_cluster");
974-
request.setLeaderIndex(leaderIndex);
975-
request.setFollowRequest(resumeFollow(followerIndex));
976-
return request;
977-
}
978-
979-
public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
980-
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
981-
request.setFollowerIndex(followerIndex);
982-
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
983-
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
984-
return request;
985-
}
986904
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr;
8+
9+
import org.elasticsearch.common.xcontent.XContentType;
10+
import org.elasticsearch.index.IndexSettings;
11+
import org.elasticsearch.xpack.CcrIntegTestCase;
12+
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
13+
14+
import java.util.Locale;
15+
16+
import static java.util.Collections.singletonMap;
17+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
18+
import static org.hamcrest.Matchers.equalTo;
19+
20+
public class RestartIndexFollowingIT extends CcrIntegTestCase {
21+
22+
@Override
23+
protected int numberOfNodesPerCluster() {
24+
return 1;
25+
}
26+
27+
public void testFollowIndex() throws Exception {
28+
final String leaderIndexSettings = getIndexSettings(1, 0,
29+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
30+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true");
31+
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
32+
ensureLeaderGreen("index1");
33+
34+
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
35+
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
36+
37+
final long firstBatchNumDocs = randomIntBetween(2, 64);
38+
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
39+
for (int i = 0; i < firstBatchNumDocs; i++) {
40+
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
41+
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
42+
}
43+
44+
assertBusy(() -> {
45+
assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs));
46+
});
47+
48+
getFollowerCluster().fullRestart();
49+
ensureFollowerGreen("index2");
50+
51+
final long secondBatchNumDocs = randomIntBetween(2, 64);
52+
for (int i = 0; i < secondBatchNumDocs; i++) {
53+
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
54+
}
55+
56+
assertBusy(() -> {
57+
assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits,
58+
equalTo(firstBatchNumDocs + secondBatchNumDocs));
59+
});
60+
}
61+
62+
}

0 commit comments

Comments
 (0)