Skip to content

Ignore replication for noop updates #46458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -393,22 +393,20 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
final Engine.Result operationResult;
DocWriteRequest<?> docWriteRequest = item.request();
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
final BulkItemResponse.Failure failure = response.getFailure();
final DocWriteResponse writeResponse = response.getResponse();
final long seqNum = failure == null ? writeResponse.getSeqNo() : failure.getSeqNo();
if (seqNum == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assert failure != null || writeResponse.getResult() == DocWriteResponse.Result.NOOP
|| writeResponse.getResult() == DocWriteResponse.Result.NOT_FOUND;
continue;
}
if (failure == null) {
operationResult = performOpOnReplica(writeResponse, docWriteRequest, replica);
final Engine.Result operationResult;
if (item.getPrimaryResponse().isFailed()) {
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
continue; // ignore replication as we didn't generate a sequence number for this request.
}
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), response.getFailure().getMessage());
} else {
operationResult = replica.markSeqNoAsNoop(seqNum, failure.getMessage());
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
continue; // ignore replication as it's a noop
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
Expand All @@ -435,8 +433,8 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
deleteRequest.type(), deleteRequest.id());
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
+ docWriteRequest.opType().getLowercase());
assert false : "Unexpected request operation type on replica: " + docWriteRequest + ";primary result: " + primaryResponse;
throw new IllegalStateException("Unexpected request operation type on replica: " + docWriteRequest.opType().getLowercase());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
// Even though the primary waits on all nodes to ack the mapping changes to the master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
Expand All @@ -57,6 +59,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -618,5 +621,31 @@ public void testInvalidIndexNamesCorrectOpType() {
assertThat(bulkResponse.getItems()[1].getOpType(), is(OpType.UPDATE));
assertThat(bulkResponse.getItems()[2].getOpType(), is(OpType.DELETE));
}

public void testNoopUpdate() {
String indexName = "test";
createIndex(indexName, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build());
internalCluster().ensureAtLeastNumDataNodes(2);
ensureGreen(indexName);
IndexResponse doc = index(indexName, "_doc", "1", Map.of("user", "xyz"));
assertThat(doc.getShardInfo().getSuccessful(), equalTo(2));
final BulkResponse bulkResponse = client().prepareBulk()
.add(new UpdateRequest().index(indexName).id("1").detectNoop(true).doc("user", "xyz")) // noop update
.add(new UpdateRequest().index(indexName).id("2").docAsUpsert(false).doc("f", "v")) // not_found update
.add(new DeleteRequest().index(indexName).id("2")) // not_found delete
.get();
assertThat(bulkResponse.getItems(), arrayWithSize(3));

final BulkItemResponse noopUpdate = bulkResponse.getItems()[0];
assertThat(noopUpdate.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOOP));
assertThat(Strings.toString(noopUpdate), noopUpdate.getResponse().getShardInfo().getSuccessful(), equalTo(2));

final BulkItemResponse notFoundUpdate = bulkResponse.getItems()[1];
assertNotNull(notFoundUpdate.getFailure());

final BulkItemResponse notFoundDelete = bulkResponse.getItems()[2];
assertThat(notFoundDelete.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOT_FOUND));
assertThat(Strings.toString(notFoundDelete), notFoundDelete.getResponse().getShardInfo().getSuccessful(), equalTo(2));
}
}