Skip to content

Commit 191f3e1

Browse files
authored
Ignore replication for noop updates (#46458)
Previously, we ignore replication for noop updates because they do not have sequence numbers. Since #44603, we started assigning sequence numbers to noop updates leading them to be replicated to replicas. This bug occurs only on 8.0 for it requires #41065 and #44603. Closes #46366
1 parent a6152f4 commit 191f3e1

File tree

2 files changed

+43
-16
lines changed

2 files changed

+43
-16
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -393,22 +393,20 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
393393
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
394394
Translog.Location location = null;
395395
for (int i = 0; i < request.items().length; i++) {
396-
BulkItemRequest item = request.items()[i];
397-
final Engine.Result operationResult;
398-
DocWriteRequest<?> docWriteRequest = item.request();
396+
final BulkItemRequest item = request.items()[i];
399397
final BulkItemResponse response = item.getPrimaryResponse();
400-
final BulkItemResponse.Failure failure = response.getFailure();
401-
final DocWriteResponse writeResponse = response.getResponse();
402-
final long seqNum = failure == null ? writeResponse.getSeqNo() : failure.getSeqNo();
403-
if (seqNum == SequenceNumbers.UNASSIGNED_SEQ_NO) {
404-
assert failure != null || writeResponse.getResult() == DocWriteResponse.Result.NOOP
405-
|| writeResponse.getResult() == DocWriteResponse.Result.NOT_FOUND;
406-
continue;
407-
}
408-
if (failure == null) {
409-
operationResult = performOpOnReplica(writeResponse, docWriteRequest, replica);
398+
final Engine.Result operationResult;
399+
if (item.getPrimaryResponse().isFailed()) {
400+
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
401+
continue; // ignore replication as we didn't generate a sequence number for this request.
402+
}
403+
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), response.getFailure().getMessage());
410404
} else {
411-
operationResult = replica.markSeqNoAsNoop(seqNum, failure.getMessage());
405+
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
406+
continue; // ignore replication as it's a noop
407+
}
408+
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
409+
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
412410
}
413411
assert operationResult != null : "operation result must never be null when primary response has no failure";
414412
location = syncOperationResultOrThrow(operationResult, location);
@@ -435,8 +433,8 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
435433
deleteRequest.type(), deleteRequest.id());
436434
break;
437435
default:
438-
throw new IllegalStateException("Unexpected request operation type on replica: "
439-
+ docWriteRequest.opType().getLowercase());
436+
assert false : "Unexpected request operation type on replica: " + docWriteRequest + ";primary result: " + primaryResponse;
437+
throw new IllegalStateException("Unexpected request operation type on replica: " + docWriteRequest.opType().getLowercase());
440438
}
441439
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
442440
// Even though the primary waits on all nodes to ack the mapping changes to the master

server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@
2525
import org.elasticsearch.action.delete.DeleteRequest;
2626
import org.elasticsearch.action.get.GetResponse;
2727
import org.elasticsearch.action.index.IndexRequest;
28+
import org.elasticsearch.action.index.IndexResponse;
2829
import org.elasticsearch.action.search.SearchResponse;
2930
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
3031
import org.elasticsearch.action.update.UpdateRequest;
3132
import org.elasticsearch.action.update.UpdateRequestBuilder;
3233
import org.elasticsearch.action.update.UpdateResponse;
3334
import org.elasticsearch.client.Requests;
3435
import org.elasticsearch.cluster.metadata.IndexMetaData;
36+
import org.elasticsearch.common.Strings;
3537
import org.elasticsearch.common.settings.Settings;
3638
import org.elasticsearch.common.xcontent.XContentType;
3739
import org.elasticsearch.index.VersionType;
@@ -57,6 +59,7 @@
5759
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5860
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
5961
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
62+
import static org.hamcrest.Matchers.arrayWithSize;
6063
import static org.hamcrest.Matchers.containsString;
6164
import static org.hamcrest.Matchers.equalTo;
6265
import static org.hamcrest.Matchers.instanceOf;
@@ -618,5 +621,31 @@ public void testInvalidIndexNamesCorrectOpType() {
618621
assertThat(bulkResponse.getItems()[1].getOpType(), is(OpType.UPDATE));
619622
assertThat(bulkResponse.getItems()[2].getOpType(), is(OpType.DELETE));
620623
}
624+
625+
public void testNoopUpdate() {
626+
String indexName = "test";
627+
createIndex(indexName, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build());
628+
internalCluster().ensureAtLeastNumDataNodes(2);
629+
ensureGreen(indexName);
630+
IndexResponse doc = index(indexName, "_doc", "1", Map.of("user", "xyz"));
631+
assertThat(doc.getShardInfo().getSuccessful(), equalTo(2));
632+
final BulkResponse bulkResponse = client().prepareBulk()
633+
.add(new UpdateRequest().index(indexName).id("1").detectNoop(true).doc("user", "xyz")) // noop update
634+
.add(new UpdateRequest().index(indexName).id("2").docAsUpsert(false).doc("f", "v")) // not_found update
635+
.add(new DeleteRequest().index(indexName).id("2")) // not_found delete
636+
.get();
637+
assertThat(bulkResponse.getItems(), arrayWithSize(3));
638+
639+
final BulkItemResponse noopUpdate = bulkResponse.getItems()[0];
640+
assertThat(noopUpdate.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOOP));
641+
assertThat(Strings.toString(noopUpdate), noopUpdate.getResponse().getShardInfo().getSuccessful(), equalTo(2));
642+
643+
final BulkItemResponse notFoundUpdate = bulkResponse.getItems()[1];
644+
assertNotNull(notFoundUpdate.getFailure());
645+
646+
final BulkItemResponse notFoundDelete = bulkResponse.getItems()[2];
647+
assertThat(notFoundDelete.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOT_FOUND));
648+
assertThat(Strings.toString(notFoundDelete), notFoundDelete.getResponse().getShardInfo().getSuccessful(), equalTo(2));
649+
}
621650
}
622651

0 commit comments

Comments
 (0)