Skip to content

Commit 025e981

Browse files
committed
Refactors TransportReplicationAction to decouple request routing and shard operation logic
1 parent da5b07a commit 025e981

20 files changed

+706
-672
lines changed

core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,16 @@
2222
import org.elasticsearch.action.support.replication.ReplicationRequest;
2323
import org.elasticsearch.common.io.stream.StreamInput;
2424
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.index.shard.ShardId;
2526

2627
import java.io.IOException;
2728

2829
public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
2930

3031
private FlushRequest request = new FlushRequest();
3132

32-
public ShardFlushRequest(FlushRequest request) {
33-
super(request);
33+
public ShardFlushRequest(FlushRequest request, ShardId shardId) {
34+
super(request, shardId);
3435
this.request = request;
3536
}
3637

@@ -53,5 +54,8 @@ public void writeTo(StreamOutput out) throws IOException {
5354
request.writeTo(out);
5455
}
5556

56-
57+
@Override
58+
public String toString() {
59+
return "flush {" + super.toString() + "}";
60+
}
5761
}

core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected ActionWriteResponse newShardResponse() {
5353

5454
@Override
5555
protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId) {
56-
return new ShardFlushRequest(request).setShardId(shardId);
56+
return new ShardFlushRequest(request, shardId);
5757
}
5858

5959
@Override

core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,15 @@
2323
import org.elasticsearch.action.support.ActionFilters;
2424
import org.elasticsearch.action.support.replication.TransportReplicationAction;
2525
import org.elasticsearch.cluster.ClusterService;
26-
import org.elasticsearch.cluster.ClusterState;
2726
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
2827
import org.elasticsearch.cluster.action.shard.ShardStateAction;
29-
import org.elasticsearch.cluster.block.ClusterBlockException;
3028
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3129
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
32-
import org.elasticsearch.cluster.routing.ShardIterator;
30+
import org.elasticsearch.cluster.metadata.MetaData;
3331
import org.elasticsearch.common.collect.Tuple;
3432
import org.elasticsearch.common.inject.Inject;
3533
import org.elasticsearch.common.settings.Settings;
3634
import org.elasticsearch.index.shard.IndexShard;
37-
import org.elasticsearch.index.shard.ShardId;
3835
import org.elasticsearch.indices.IndicesService;
3936
import org.elasticsearch.threadpool.ThreadPool;
4037
import org.elasticsearch.transport.TransportService;
@@ -61,15 +58,15 @@ protected ActionWriteResponse newResponseInstance() {
6158
}
6259

6360
@Override
64-
protected Tuple<ActionWriteResponse, ShardFlushRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
65-
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).getShard(shardRequest.shardId.id());
66-
indexShard.flush(shardRequest.request.getRequest());
61+
protected Tuple<ActionWriteResponse, ShardFlushRequest> shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable {
62+
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
63+
indexShard.flush(shardRequest.getRequest());
6764
logger.trace("{} flush request executed on primary", indexShard.shardId());
68-
return new Tuple<>(new ActionWriteResponse(), shardRequest.request);
65+
return new Tuple<>(new ActionWriteResponse(), shardRequest);
6966
}
7067

7168
@Override
72-
protected void shardOperationOnReplica(ShardId shardId, ShardFlushRequest request) {
69+
protected void shardOperationOnReplica(ShardFlushRequest request) {
7370
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
7471
indexShard.flush(request.getRequest());
7572
logger.trace("{} flush request executed on replica", indexShard.shardId());
@@ -81,18 +78,13 @@ protected boolean checkWriteConsistency() {
8178
}
8279

8380
@Override
84-
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
85-
return clusterState.getRoutingTable().indicesRouting().get(request.concreteIndex()).getShards().get(request.request().shardId().getId()).shardsIt();
81+
protected ClusterBlockLevel globalBlockLevel() {
82+
return ClusterBlockLevel.METADATA_WRITE;
8683
}
8784

8885
@Override
89-
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
90-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
91-
}
92-
93-
@Override
94-
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
95-
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[]{request.concreteIndex()});
86+
protected ClusterBlockLevel indexBlockLevel() {
87+
return ClusterBlockLevel.METADATA_WRITE;
9688
}
9789

9890
@Override

core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ protected ActionWriteResponse newShardResponse() {
5454

5555
@Override
5656
protected ReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
57-
return new ReplicationRequest(request).setShardId(shardId);
57+
return new ReplicationRequest(request, shardId);
5858
}
5959

6060
@Override

core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@
2424
import org.elasticsearch.action.support.replication.ReplicationRequest;
2525
import org.elasticsearch.action.support.replication.TransportReplicationAction;
2626
import org.elasticsearch.cluster.ClusterService;
27-
import org.elasticsearch.cluster.ClusterState;
2827
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
2928
import org.elasticsearch.cluster.action.shard.ShardStateAction;
30-
import org.elasticsearch.cluster.block.ClusterBlockException;
3129
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3230
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
33-
import org.elasticsearch.cluster.routing.ShardIterator;
31+
import org.elasticsearch.cluster.metadata.MetaData;
3432
import org.elasticsearch.common.collect.Tuple;
3533
import org.elasticsearch.common.inject.Inject;
3634
import org.elasticsearch.common.settings.Settings;
@@ -62,15 +60,16 @@ protected ActionWriteResponse newResponseInstance() {
6260
}
6361

6462
@Override
65-
protected Tuple<ActionWriteResponse, ReplicationRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
66-
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).getShard(shardRequest.shardId.id());
63+
protected Tuple<ActionWriteResponse, ReplicationRequest> shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable {
64+
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
6765
indexShard.refresh("api");
6866
logger.trace("{} refresh request executed on primary", indexShard.shardId());
69-
return new Tuple<>(new ActionWriteResponse(), shardRequest.request);
67+
return new Tuple<>(new ActionWriteResponse(), shardRequest);
7068
}
7169

7270
@Override
73-
protected void shardOperationOnReplica(ShardId shardId, ReplicationRequest request) {
71+
protected void shardOperationOnReplica(ReplicationRequest request) {
72+
final ShardId shardId = request.shardId();
7473
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
7574
indexShard.refresh("api");
7675
logger.trace("{} refresh request executed on replica", indexShard.shardId());
@@ -82,18 +81,13 @@ protected boolean checkWriteConsistency() {
8281
}
8382

8483
@Override
85-
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
86-
return clusterState.getRoutingTable().indicesRouting().get(request.concreteIndex()).getShards().get(request.request().shardId().getId()).shardsIt();
84+
protected ClusterBlockLevel globalBlockLevel() {
85+
return ClusterBlockLevel.METADATA_WRITE;
8786
}
8887

8988
@Override
90-
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
91-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
92-
}
93-
94-
@Override
95-
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
96-
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[]{request.concreteIndex()});
89+
protected ClusterBlockLevel indexBlockLevel() {
90+
return ClusterBlockLevel.METADATA_WRITE;
9791
}
9892

9993
@Override

core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,8 @@ public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
4040
public BulkShardRequest() {
4141
}
4242

43-
BulkShardRequest(BulkRequest bulkRequest, String index, int shardId, boolean refresh, BulkItemRequest[] items) {
44-
super(bulkRequest);
45-
this.index = index;
46-
this.setShardId(new ShardId(index, shardId));
43+
BulkShardRequest(BulkRequest bulkRequest, ShardId shardId, boolean refresh, BulkItemRequest[] items) {
44+
super(bulkRequest, shardId);
4745
this.items = items;
4846
this.refresh = refresh;
4947
}
@@ -93,4 +91,9 @@ public void readFrom(StreamInput in) throws IOException {
9391
}
9492
refresh = in.readBoolean();
9593
}
94+
95+
@Override
96+
public String toString() {
97+
return "shard bulk {" + super.toString() + "}";
98+
}
9699
}

core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
275275
list.add(new BulkItemRequest(i, new DeleteRequest(deleteRequest)));
276276
}
277277
} else {
278-
ShardId shardId = clusterService.operationRouting().deleteShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
278+
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
279279
List<BulkItemRequest> list = requestsByShard.get(shardId);
280280
if (list == null) {
281281
list = new ArrayList<>();
@@ -312,7 +312,7 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
312312
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
313313
final ShardId shardId = entry.getKey();
314314
final List<BulkItemRequest> requests = entry.getValue();
315-
BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
315+
BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId, bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
316316
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
317317
bulkShardRequest.timeout(bulkRequest.timeout());
318318
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {

0 commit comments

Comments
 (0)