Skip to content

Commit 24c3a1d

Browse files
committed
Ignore replication for noop updates (#46458)
Previously, we ignore replication for noop updates because they do not have sequence numbers. Since #44603, we started assigning sequence numbers to noop updates leading them to be replicated to replicas. This bug occurs only on 8.0 for it requires #41065 and #44603. Closes #46366
1 parent 7b26a8c commit 24c3a1d

File tree

3 files changed

+46
-113
lines changed

3 files changed

+46
-113
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

+17-70
Original file line numberDiff line numberDiff line change
@@ -383,54 +383,6 @@ static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest,
383383
return response;
384384
}
385385

386-
387-
/** Modes for executing item request on replica depending on corresponding primary execution result */
388-
public enum ReplicaItemExecutionMode {
389-
390-
/**
391-
* When primary execution succeeded
392-
*/
393-
NORMAL,
394-
395-
/**
396-
* When primary execution failed before sequence no was generated
397-
* or primary execution was a noop (only possible when request is originating from pre-6.0 nodes)
398-
*/
399-
NOOP,
400-
401-
/**
402-
* When primary execution failed after sequence no was generated
403-
*/
404-
FAILURE
405-
}
406-
407-
/**
408-
* Determines whether a bulk item request should be executed on the replica.
409-
*
410-
* @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures
411-
* {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation
412-
* {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or
413-
* when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes)
414-
*/
415-
static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest request, final int index) {
416-
final BulkItemResponse primaryResponse = request.getPrimaryResponse();
417-
assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]";
418-
if (primaryResponse.isFailed()) {
419-
return primaryResponse.getFailure().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
420-
? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op
421-
: ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication
422-
} else {
423-
// TODO: once we know for sure that every operation that has been processed on the primary is assigned a seq#
424-
// (i.e., all nodes on the cluster are on v6.0.0 or higher) we can use the existence of a seq# to indicate whether
425-
// an operation should be processed or be treated as a noop. This means we could remove this method and the
426-
// ReplicaItemExecutionMode enum and have a simple boolean check for seq != UNASSIGNED_SEQ_NO which will work for
427-
// both failures and indexing operations.
428-
return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP
429-
? ReplicaItemExecutionMode.NORMAL // execution successful on primary
430-
: ReplicaItemExecutionMode.NOOP; // ignore replication
431-
}
432-
}
433-
434386
@Override
435387
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
436388
final Translog.Location location = performOnReplica(request, replica);
@@ -440,28 +392,23 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
440392
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
441393
Translog.Location location = null;
442394
for (int i = 0; i < request.items().length; i++) {
443-
BulkItemRequest item = request.items()[i];
395+
final BulkItemRequest item = request.items()[i];
396+
final BulkItemResponse response = item.getPrimaryResponse();
444397
final Engine.Result operationResult;
445-
DocWriteRequest<?> docWriteRequest = item.request();
446-
switch (replicaItemExecutionMode(item, i)) {
447-
case NORMAL:
448-
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
449-
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
450-
assert operationResult != null : "operation result must never be null when primary response has no failure";
451-
location = syncOperationResultOrThrow(operationResult, location);
452-
break;
453-
case NOOP:
454-
break;
455-
case FAILURE:
456-
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
457-
assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned";
458-
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
459-
assert operationResult != null : "operation result must never be null when primary response has no failure";
460-
location = syncOperationResultOrThrow(operationResult, location);
461-
break;
462-
default:
463-
throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
398+
if (item.getPrimaryResponse().isFailed()) {
399+
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
400+
continue; // ignore replication as we didn't generate a sequence number for this request.
401+
}
402+
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), response.getFailure().getMessage());
403+
} else {
404+
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
405+
continue; // ignore replication as it's a noop
406+
}
407+
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
408+
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
464409
}
410+
assert operationResult != null : "operation result must never be null when primary response has no failure";
411+
location = syncOperationResultOrThrow(operationResult, location);
465412
}
466413
return location;
467414
}
@@ -485,8 +432,8 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
485432
deleteRequest.type(), deleteRequest.id());
486433
break;
487434
default:
488-
throw new IllegalStateException("Unexpected request operation type on replica: "
489-
+ docWriteRequest.opType().getLowercase());
435+
assert false : "Unexpected request operation type on replica: " + docWriteRequest + ";primary result: " + primaryResponse;
436+
throw new IllegalStateException("Unexpected request operation type on replica: " + docWriteRequest.opType().getLowercase());
490437
}
491438
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
492439
// Even though the primary waits on all nodes to ack the mapping changes to the master

server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java

+29
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@
2525
import org.elasticsearch.action.delete.DeleteRequest;
2626
import org.elasticsearch.action.get.GetResponse;
2727
import org.elasticsearch.action.index.IndexRequest;
28+
import org.elasticsearch.action.index.IndexResponse;
2829
import org.elasticsearch.action.search.SearchResponse;
2930
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
3031
import org.elasticsearch.action.update.UpdateRequest;
3132
import org.elasticsearch.action.update.UpdateRequestBuilder;
3233
import org.elasticsearch.action.update.UpdateResponse;
3334
import org.elasticsearch.client.Requests;
3435
import org.elasticsearch.cluster.metadata.IndexMetaData;
36+
import org.elasticsearch.common.Strings;
3537
import org.elasticsearch.common.settings.Settings;
3638
import org.elasticsearch.common.xcontent.XContentType;
3739
import org.elasticsearch.index.VersionType;
@@ -57,6 +59,7 @@
5759
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5860
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
5961
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
62+
import static org.hamcrest.Matchers.arrayWithSize;
6063
import static org.hamcrest.Matchers.containsString;
6164
import static org.hamcrest.Matchers.equalTo;
6265
import static org.hamcrest.Matchers.instanceOf;
@@ -618,5 +621,31 @@ public void testInvalidIndexNamesCorrectOpType() {
618621
assertThat(bulkResponse.getItems()[1].getOpType(), is(OpType.UPDATE));
619622
assertThat(bulkResponse.getItems()[2].getOpType(), is(OpType.DELETE));
620623
}
624+
625+
public void testNoopUpdate() {
626+
String indexName = "test";
627+
createIndex(indexName, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build());
628+
internalCluster().ensureAtLeastNumDataNodes(2);
629+
ensureGreen(indexName);
630+
IndexResponse doc = index(indexName, "_doc", "1", Collections.singletonMap("user", "xyz"));
631+
assertThat(doc.getShardInfo().getSuccessful(), equalTo(2));
632+
final BulkResponse bulkResponse = client().prepareBulk()
633+
.add(new UpdateRequest().index(indexName).id("1").detectNoop(true).doc("user", "xyz")) // noop update
634+
.add(new UpdateRequest().index(indexName).id("2").docAsUpsert(false).doc("f", "v")) // not_found update
635+
.add(new DeleteRequest().index(indexName).id("2")) // not_found delete
636+
.get();
637+
assertThat(bulkResponse.getItems(), arrayWithSize(3));
638+
639+
final BulkItemResponse noopUpdate = bulkResponse.getItems()[0];
640+
assertThat(noopUpdate.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOOP));
641+
assertThat(Strings.toString(noopUpdate), noopUpdate.getResponse().getShardInfo().getSuccessful(), equalTo(2));
642+
643+
final BulkItemResponse notFoundUpdate = bulkResponse.getItems()[1];
644+
assertNotNull(notFoundUpdate.getFailure());
645+
646+
final BulkItemResponse notFoundDelete = bulkResponse.getItems()[2];
647+
assertThat(notFoundDelete.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOT_FOUND));
648+
assertThat(Strings.toString(notFoundDelete), notFoundDelete.getResponse().getShardInfo().getSuccessful(), equalTo(2));
649+
}
621650
}
622651

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

-43
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.action.DocWriteRequest;
2727
import org.elasticsearch.action.DocWriteResponse;
2828
import org.elasticsearch.action.LatchedActionListener;
29-
import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode;
3029
import org.elasticsearch.action.delete.DeleteRequest;
3130
import org.elasticsearch.action.delete.DeleteResponse;
3231
import org.elasticsearch.action.index.IndexRequest;
@@ -58,7 +57,6 @@
5857
import java.util.concurrent.CountDownLatch;
5958
import java.util.concurrent.atomic.AtomicInteger;
6059

61-
import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode;
6260
import static org.hamcrest.CoreMatchers.equalTo;
6361
import static org.hamcrest.CoreMatchers.not;
6462
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -95,47 +93,6 @@ private IndexMetaData indexMetaData() throws IOException {
9593
.primaryTerm(0, 1).build();
9694
}
9795

98-
public void testShouldExecuteReplicaItem() throws Exception {
99-
// Successful index request should be replicated
100-
DocWriteRequest<IndexRequest> writeRequest = new IndexRequest("index", "_doc", "id")
101-
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
102-
DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean());
103-
BulkItemRequest request = new BulkItemRequest(0, writeRequest);
104-
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response));
105-
assertThat(replicaItemExecutionMode(request, 0),
106-
equalTo(ReplicaItemExecutionMode.NORMAL));
107-
108-
// Failed index requests without sequence no should not be replicated
109-
writeRequest = new IndexRequest("index", "_doc", "id")
110-
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
111-
request = new BulkItemRequest(0, writeRequest);
112-
request.setPrimaryResponse(
113-
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
114-
new BulkItemResponse.Failure("index", "type", "id",
115-
new IllegalArgumentException("i died"))));
116-
assertThat(replicaItemExecutionMode(request, 0),
117-
equalTo(ReplicaItemExecutionMode.NOOP));
118-
119-
// Failed index requests with sequence no should be replicated
120-
request = new BulkItemRequest(0, writeRequest);
121-
request.setPrimaryResponse(
122-
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
123-
new BulkItemResponse.Failure("index", "type", "id",
124-
new IllegalArgumentException(
125-
"i died after sequence no was generated"),
126-
1)));
127-
assertThat(replicaItemExecutionMode(request, 0),
128-
equalTo(ReplicaItemExecutionMode.FAILURE));
129-
// NOOP requests should not be replicated
130-
DocWriteRequest<UpdateRequest> updateRequest = new UpdateRequest("index", "type", "id");
131-
response = new UpdateResponse(shardId, "type", "id", 0, 1, 1, DocWriteResponse.Result.NOOP);
132-
request = new BulkItemRequest(0, updateRequest);
133-
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE,
134-
response));
135-
assertThat(replicaItemExecutionMode(request, 0),
136-
equalTo(ReplicaItemExecutionMode.NOOP));
137-
}
138-
13996
public void testExecuteBulkIndexRequest() throws Exception {
14097
IndexShard shard = newStartedShard(true);
14198

0 commit comments

Comments
 (0)