Skip to content

Commit 5157076

Browse files
authored
Fix delete- and update-by-query on indices without sequence numbers (#50077)
This fixes a bug where delete-by-query and update-by-query were not working on an older index that did not have sequence numbers yet. The reason is that in a mixed-version 6.x/5.x cluster, documents can have a primary term but miss a sequence number.
1 parent a2697dc commit 5157076

File tree

7 files changed

+81
-14
lines changed

7 files changed

+81
-14
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.delete.DeleteRequest;
2626
import org.elasticsearch.client.ParentTaskAssigningClient;
2727
import org.elasticsearch.cluster.ClusterState;
28+
import org.elasticsearch.index.seqno.SequenceNumbers;
2829
import org.elasticsearch.script.ScriptService;
2930
import org.elasticsearch.threadpool.ThreadPool;
3031

@@ -33,18 +34,18 @@
3334
*/
3435
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {
3536

36-
private final boolean useSeqNoForCAS;
37+
private final boolean allowSeqNoForCAS;
3738

3839
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
3940
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
4041
ScriptService scriptService, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
4142
super(task,
42-
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
43-
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
43+
// always return version information, as sequence number information might not be available on an older index
44+
true,
4445
// all nodes support sequence number powered optimistic concurrency control and we can use it
4546
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
4647
logger, client, threadPool, action, request, listener);
47-
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
48+
allowSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
4849
}
4950

5051
@Override
@@ -60,10 +61,11 @@ protected RequestWrapper<DeleteRequest> buildRequest(ScrollableHitSource.Hit doc
6061
delete.index(doc.getIndex());
6162
delete.type(doc.getType());
6263
delete.id(doc.getId());
63-
if (useSeqNoForCAS) {
64+
if (allowSeqNoForCAS && doc.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
6465
delete.setIfSeqNo(doc.getSeqNo());
6566
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
6667
} else {
68+
assert doc.getVersion() != -1 : "no version retrieved";
6769
delete.version(doc.getVersion());
6870
}
6971
return wrap(delete);

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.index.mapper.ParentFieldMapper;
3939
import org.elasticsearch.index.mapper.RoutingFieldMapper;
4040
import org.elasticsearch.index.mapper.TypeFieldMapper;
41+
import org.elasticsearch.index.seqno.SequenceNumbers;
4142
import org.elasticsearch.script.Script;
4243
import org.elasticsearch.script.ScriptService;
4344
import org.elasticsearch.tasks.Task;
@@ -88,18 +89,18 @@ protected void doExecute(UpdateByQueryRequest request, ActionListener<BulkByScro
8889
*/
8990
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest, TransportUpdateByQueryAction> {
9091

91-
private final boolean useSeqNoForCAS;
92+
private final boolean allowSeqNoForCAS;
9293

9394
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
9495
ThreadPool threadPool, TransportUpdateByQueryAction action, UpdateByQueryRequest request, ClusterState clusterState,
9596
ActionListener<BulkByScrollResponse> listener) {
9697
super(task,
97-
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
98-
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
98+
// always return version information, as sequence number information might not be available on an older index
99+
true,
99100
// all nodes support sequence number powered optimistic concurrency control and we can use it
100101
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
101102
logger, client, threadPool, action, request, listener);
102-
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
103+
allowSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
103104
}
104105

105106
@Override
@@ -118,10 +119,11 @@ protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc)
118119
index.type(doc.getType());
119120
index.id(doc.getId());
120121
index.source(doc.getSource(), doc.getXContentType());
121-
if (useSeqNoForCAS) {
122+
if (allowSeqNoForCAS && doc.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
122123
index.setIfSeqNo(doc.getSeqNo());
123124
index.setIfPrimaryTerm(doc.getPrimaryTerm());
124125
} else {
126+
assert doc.getVersion() != -1 : "no version retrieved";
125127
index.versionType(VersionType.INTERNAL);
126128
index.version(doc.getVersion());
127129
}

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,62 @@ public void testUpdateDoc() throws Exception {
472472
}
473473
}
474474

475+
/** Ensure that we can always execute delete-by-query regardless of the version of cluster */
476+
public void testDeleteByQuery() throws Exception {
477+
final String index = "test_delete_by_query";
478+
if (CLUSTER_TYPE == ClusterType.OLD) {
479+
Settings.Builder settings = Settings.builder()
480+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
481+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2);
482+
createIndex(index, settings.build());
483+
}
484+
for (int i = 0; i < 100; i++) {
485+
Request indexDoc = new Request("POST", index + "/test");
486+
indexDoc.setJsonEntity("{\"test\": \"test_" + randomInt(5) + "\"}");
487+
client().performRequest(indexDoc);
488+
}
489+
client().performRequest(new Request("POST", index + "/_refresh"));
490+
if (randomBoolean()) {
491+
ensureGreen(index);
492+
}
493+
Request deleteByQuery = new Request("POST", index + "/_delete_by_query");
494+
deleteByQuery.setJsonEntity("{\"query\": {\"term\": { \"test\": \"test_" + CLUSTER_TYPE.ordinal() + "\" }}}");
495+
Map<String, Object> doc = entityAsMap(client().performRequest(deleteByQuery));
496+
logger.info(doc);
497+
498+
if (randomBoolean()) {
499+
syncedFlush(index);
500+
}
501+
}
502+
503+
/** Ensure that we can always execute delete-by-query regardless of the version of cluster */
504+
public void testUpdateByQuery() throws Exception {
505+
final String index = "test_update_by_query";
506+
if (CLUSTER_TYPE == ClusterType.OLD) {
507+
Settings.Builder settings = Settings.builder()
508+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
509+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2);
510+
createIndex(index, settings.build());
511+
}
512+
for (int i = 0; i < 100; i++) {
513+
Request indexDoc = new Request("POST", index + "/test");
514+
indexDoc.setJsonEntity("{\"test\": \"test_" + randomInt(5) + "\"}");
515+
client().performRequest(indexDoc);
516+
}
517+
client().performRequest(new Request("POST", index + "/_refresh"));
518+
if (randomBoolean()) {
519+
ensureGreen(index);
520+
}
521+
Request updateByQuery = new Request("POST", index + "/_update_by_query");
522+
updateByQuery.setJsonEntity("{\"query\": {\"term\": { \"test\": \"test_" + CLUSTER_TYPE.ordinal() + "\" }}}");
523+
Map<String, Object> doc = entityAsMap(client().performRequest(updateByQuery));
524+
logger.info(doc);
525+
526+
if (randomBoolean()) {
527+
syncedFlush(index);
528+
}
529+
}
530+
475531
private void syncedFlush(String index) throws Exception {
476532
// We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
477533
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.xcontent.XContentParser;
3232
import org.elasticsearch.common.xcontent.XContentType;
3333
import org.elasticsearch.index.engine.VersionConflictEngineException;
34+
import org.elasticsearch.index.seqno.SequenceNumbers;
3435
import org.elasticsearch.threadpool.ThreadPool;
3536
import org.elasticsearch.transport.TransportService;
3637
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
@@ -109,7 +110,8 @@ private void indexUpdatedFilter(MlFilter filter, final long version, final long
109110
UpdateFilterAction.Request request,
110111
ActionListener<PutFilterAction.Response> listener) {
111112
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId());
112-
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
113+
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) &&
114+
seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
113115
indexRequest.setIfSeqNo(seqNo);
114116
indexRequest.setIfPrimaryTerm(primaryTerm);
115117
} else {

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,8 @@ private void innerRefresh(String tokenDocId, Authentication userAuth, ActionList
884884
client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId)
885885
.setDoc("refresh_token", Collections.singletonMap("refreshed", true))
886886
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
887-
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
887+
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) &&
888+
response.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
888889
assert response.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
889890
: "reading a token [" + tokenDocId + "] with no sequence number";
890891
assert response.getPrimaryTerm() != SequenceNumbers.UNASSIGNED_PRIMARY_TERM :

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.common.xcontent.json.JsonXContent;
4040
import org.elasticsearch.index.engine.DocumentMissingException;
4141
import org.elasticsearch.index.engine.VersionConflictEngineException;
42+
import org.elasticsearch.index.seqno.SequenceNumbers;
4243
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
4344
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
4445
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
@@ -363,7 +364,8 @@ public void updateWatchStatus(Watch watch) throws IOException {
363364

364365
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id());
365366
updateRequest.doc(source);
366-
boolean useSeqNoForCAS = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
367+
boolean useSeqNoForCAS = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)
368+
&& watch.getSourceSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
367369
if (useSeqNoForCAS) {
368370
updateRequest.setIfSeqNo(watch.getSourceSeqNo());
369371
updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm());

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.xcontent.ToXContent;
2626
import org.elasticsearch.common.xcontent.XContentBuilder;
2727
import org.elasticsearch.common.xcontent.XContentType;
28+
import org.elasticsearch.index.seqno.SequenceNumbers;
2829
import org.elasticsearch.license.XPackLicenseState;
2930
import org.elasticsearch.rest.RestStatus;
3031
import org.elasticsearch.threadpool.ThreadPool;
@@ -109,7 +110,8 @@ protected void masterOperation(AckWatchRequest request, ClusterState state,
109110

110111
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
111112
// this may reject this action, but prevents concurrent updates from a watch execution
112-
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
113+
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) &&
114+
getResponse.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
113115
updateRequest.setIfSeqNo(getResponse.getSeqNo());
114116
updateRequest.setIfPrimaryTerm(getResponse.getPrimaryTerm());
115117
} else {

0 commit comments

Comments
 (0)