Skip to content

Commit c86e7f1

Browse files
Simplify TransportShardBulkAction#performOnReplica (#41065)
* Simplify TransortShardBulkAction#performOnReplica * Resolve TODO since 8.0 doesn't have to worry about pre 6.x nodes * Remove test for removed method since the logic is now completely internal to `performOnReplica`
1 parent 07e0f0d commit c86e7f1

File tree

2 files changed

+17
-112
lines changed

2 files changed

+17
-112
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 17 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,8 @@ private static boolean isConflictException(final Exception e) {
338338
/**
339339
* Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
340340
*/
341-
static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex,
342-
BulkItemResponse operationResponse,
343-
final UpdateHelper.Result translate) {
341+
private static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex,
342+
BulkItemResponse operationResponse, final UpdateHelper.Result translate) {
344343
final BulkItemResponse response;
345344
DocWriteResponse.Result translatedResult = translate.getResponseResult();
346345
if (operationResponse.isFailed()) {
@@ -382,54 +381,6 @@ static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest,
382381
return response;
383382
}
384383

385-
386-
/** Modes for executing item request on replica depending on corresponding primary execution result */
387-
public enum ReplicaItemExecutionMode {
388-
389-
/**
390-
* When primary execution succeeded
391-
*/
392-
NORMAL,
393-
394-
/**
395-
* When primary execution failed before sequence no was generated
396-
* or primary execution was a noop (only possible when request is originating from pre-6.0 nodes)
397-
*/
398-
NOOP,
399-
400-
/**
401-
* When primary execution failed after sequence no was generated
402-
*/
403-
FAILURE
404-
}
405-
406-
/**
407-
* Determines whether a bulk item request should be executed on the replica.
408-
*
409-
* @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures
410-
* {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation
411-
* {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or
412-
* when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes)
413-
*/
414-
static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest request, final int index) {
415-
final BulkItemResponse primaryResponse = request.getPrimaryResponse();
416-
assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]";
417-
if (primaryResponse.isFailed()) {
418-
return primaryResponse.getFailure().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
419-
? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op
420-
: ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication
421-
} else {
422-
// TODO: once we know for sure that every operation that has been processed on the primary is assigned a seq#
423-
// (i.e., all nodes on the cluster are on v6.0.0 or higher) we can use the existence of a seq# to indicate whether
424-
// an operation should be processed or be treated as a noop. This means we could remove this method and the
425-
// ReplicaItemExecutionMode enum and have a simple boolean check for seq != UNASSIGNED_SEQ_NO which will work for
426-
// both failures and indexing operations.
427-
return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP
428-
? ReplicaItemExecutionMode.NORMAL // execution successful on primary
429-
: ReplicaItemExecutionMode.NOOP; // ignore replication
430-
}
431-
}
432-
433384
@Override
434385
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
435386
final Translog.Location location = performOnReplica(request, replica);
@@ -442,25 +393,22 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
442393
BulkItemRequest item = request.items()[i];
443394
final Engine.Result operationResult;
444395
DocWriteRequest<?> docWriteRequest = item.request();
445-
switch (replicaItemExecutionMode(item, i)) {
446-
case NORMAL:
447-
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
448-
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
449-
assert operationResult != null : "operation result must never be null when primary response has no failure";
450-
location = syncOperationResultOrThrow(operationResult, location);
451-
break;
452-
case NOOP:
453-
break;
454-
case FAILURE:
455-
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
456-
assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned";
457-
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
458-
assert operationResult != null : "operation result must never be null when primary response has no failure";
459-
location = syncOperationResultOrThrow(operationResult, location);
460-
break;
461-
default:
462-
throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
396+
final BulkItemResponse response = item.getPrimaryResponse();
397+
final BulkItemResponse.Failure failure = response.getFailure();
398+
final DocWriteResponse writeResponse = response.getResponse();
399+
final long seqNum = failure == null ? writeResponse.getSeqNo() : failure.getSeqNo();
400+
if (seqNum == SequenceNumbers.UNASSIGNED_SEQ_NO) {
401+
assert failure != null || writeResponse.getResult() == DocWriteResponse.Result.NOOP
402+
|| writeResponse.getResult() == DocWriteResponse.Result.NOT_FOUND;
403+
continue;
404+
}
405+
if (failure == null) {
406+
operationResult = performOpOnReplica(writeResponse, docWriteRequest, replica);
407+
} else {
408+
operationResult = replica.markSeqNoAsNoop(seqNum, failure.getMessage());
463409
}
410+
assert operationResult != null : "operation result must never be null when primary response has no failure";
411+
location = syncOperationResultOrThrow(operationResult, location);
464412
}
465413
return location;
466414
}

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.action.DocWriteRequest;
2727
import org.elasticsearch.action.DocWriteResponse;
2828
import org.elasticsearch.action.LatchedActionListener;
29-
import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode;
3029
import org.elasticsearch.action.delete.DeleteRequest;
3130
import org.elasticsearch.action.delete.DeleteResponse;
3231
import org.elasticsearch.action.index.IndexRequest;
@@ -59,7 +58,6 @@
5958
import java.util.concurrent.CountDownLatch;
6059
import java.util.concurrent.atomic.AtomicInteger;
6160

62-
import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode;
6361
import static org.hamcrest.CoreMatchers.equalTo;
6462
import static org.hamcrest.CoreMatchers.not;
6563
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -96,47 +94,6 @@ private IndexMetaData indexMetaData() throws IOException {
9694
.primaryTerm(0, 1).build();
9795
}
9896

99-
public void testShouldExecuteReplicaItem() throws Exception {
100-
// Successful index request should be replicated
101-
DocWriteRequest<IndexRequest> writeRequest = new IndexRequest("index", "_doc", "id")
102-
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
103-
DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean());
104-
BulkItemRequest request = new BulkItemRequest(0, writeRequest);
105-
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response));
106-
assertThat(replicaItemExecutionMode(request, 0),
107-
equalTo(ReplicaItemExecutionMode.NORMAL));
108-
109-
// Failed index requests without sequence no should not be replicated
110-
writeRequest = new IndexRequest("index", "_doc", "id")
111-
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
112-
request = new BulkItemRequest(0, writeRequest);
113-
request.setPrimaryResponse(
114-
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
115-
new BulkItemResponse.Failure("index", "type", "id",
116-
new IllegalArgumentException("i died"))));
117-
assertThat(replicaItemExecutionMode(request, 0),
118-
equalTo(ReplicaItemExecutionMode.NOOP));
119-
120-
// Failed index requests with sequence no should be replicated
121-
request = new BulkItemRequest(0, writeRequest);
122-
request.setPrimaryResponse(
123-
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
124-
new BulkItemResponse.Failure("index", "type", "id",
125-
new IllegalArgumentException(
126-
"i died after sequence no was generated"),
127-
1)));
128-
assertThat(replicaItemExecutionMode(request, 0),
129-
equalTo(ReplicaItemExecutionMode.FAILURE));
130-
// NOOP requests should not be replicated
131-
DocWriteRequest<UpdateRequest> updateRequest = new UpdateRequest("index", "type", "id");
132-
response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP);
133-
request = new BulkItemRequest(0, updateRequest);
134-
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE,
135-
response));
136-
assertThat(replicaItemExecutionMode(request, 0),
137-
equalTo(ReplicaItemExecutionMode.NOOP));
138-
}
139-
14097
public void testExecuteBulkIndexRequest() throws Exception {
14198
IndexShard shard = newStartedShard(true);
14299

0 commit comments

Comments
 (0)