Skip to content

Commit 9c9f748

Browse files
committed
[CCR] Improve error when operations are missing (#35179)
Improve error when operations are missing
1 parent 9b5849d commit 9c9f748

File tree

2 files changed

+51
-0
lines changed

2 files changed

+51
-0
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package org.elasticsearch.xpack.ccr.action;
77

88
import org.apache.logging.log4j.message.ParameterizedMessage;
9+
import org.elasticsearch.ElasticsearchException;
10+
import org.elasticsearch.ExceptionsHelper;
911
import org.elasticsearch.action.Action;
1012
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.action.ActionRequestValidationException;
@@ -26,13 +28,15 @@
2628
import org.elasticsearch.common.unit.ByteSizeValue;
2729
import org.elasticsearch.common.unit.TimeValue;
2830
import org.elasticsearch.index.IndexService;
31+
import org.elasticsearch.index.IndexSettings;
2932
import org.elasticsearch.index.seqno.SeqNoStats;
3033
import org.elasticsearch.index.shard.IndexShard;
3134
import org.elasticsearch.index.shard.IndexShardNotStartedException;
3235
import org.elasticsearch.index.shard.IndexShardState;
3336
import org.elasticsearch.index.shard.ShardId;
3437
import org.elasticsearch.index.translog.Translog;
3538
import org.elasticsearch.indices.IndicesService;
39+
import org.elasticsearch.tasks.Task;
3640
import org.elasticsearch.threadpool.ThreadPool;
3741
import org.elasticsearch.transport.TransportService;
3842

@@ -376,6 +380,21 @@ protected void asyncShardOperation(
376380
}
377381
}
378382

383+
@Override
384+
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
385+
ActionListener<Response> wrappedListener = ActionListener.wrap(listener::onResponse, e -> {
386+
Throwable cause = ExceptionsHelper.unwrapCause(e);
387+
if (cause instanceof IllegalStateException && cause.getMessage().contains("Not all operations between from_seqno [")) {
388+
String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
389+
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
390+
listener.onFailure(new ElasticsearchException(message, e));
391+
} else {
392+
listener.onFailure(e);
393+
}
394+
});
395+
super.doExecute(task, request, wrappedListener);
396+
}
397+
379398
private void globalCheckpointAdvanced(
380399
final ShardId shardId,
381400
final long globalCheckpoint,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
*/
66
package org.elasticsearch.xpack.ccr.action;
77

8+
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
10+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
811
import org.elasticsearch.action.admin.indices.stats.ShardStats;
912
import org.elasticsearch.common.settings.Settings;
1013
import org.elasticsearch.common.xcontent.XContentType;
@@ -84,4 +87,33 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
8487
assertThat(operation.id(), equalTo("5"));
8588
}
8689

90+
public void testMissingOperations() {
91+
client().admin().indices().prepareCreate("index")
92+
.setSettings(Settings.builder()
93+
.put("index.soft_deletes.enabled", true)
94+
.put("index.soft_deletes.retention.operations", 0)
95+
.put("index.number_of_shards", 1)
96+
.put("index.number_of_replicas", 0))
97+
.get();
98+
99+
for (int i = 0; i < 16; i++) {
100+
client().prepareIndex("index", "_doc", "1").setSource("{}", XContentType.JSON).get();
101+
client().prepareDelete("index", "_doc", "1").get();
102+
}
103+
client().admin().indices().refresh(new RefreshRequest("index")).actionGet();
104+
ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index");
105+
forceMergeRequest.maxNumSegments(1);
106+
client().admin().indices().forceMerge(forceMergeRequest).actionGet();
107+
108+
ShardStats shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
109+
String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY);
110+
ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID);
111+
request.setFromSeqNo(0L);
112+
request.setMaxOperationCount(1);
113+
114+
Exception e = expectThrows(ElasticsearchException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
115+
assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
116+
"[index.soft_deletes.retention.operations]?"));
117+
}
118+
87119
}

0 commit comments

Comments
 (0)