Skip to content

Commit 7631b13

Browse files
committed
change broadcast support to be able to run on all shards replicas (in parallel) and not just one shard per replica group. Change flush and refresh to use broadcast and not replicaiton. Remove shards transport support since broadcast now does exactly the same, and refactor index status to use broadcast (across all shards).
1 parent c31d29b commit 7631b13

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+420
-1451
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,11 @@
3030
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
3131
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
3232
import org.elasticsearch.action.admin.indices.flush.TransportFlushAction;
33-
import org.elasticsearch.action.admin.indices.flush.TransportIndexFlushAction;
34-
import org.elasticsearch.action.admin.indices.flush.TransportShardFlushAction;
3533
import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportGatewaySnapshotAction;
3634
import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportIndexGatewaySnapshotAction;
3735
import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportShardGatewaySnapshotAction;
3836
import org.elasticsearch.action.admin.indices.mapping.create.TransportCreateMappingAction;
39-
import org.elasticsearch.action.admin.indices.refresh.TransportIndexRefreshAction;
4037
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
41-
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
4238
import org.elasticsearch.action.admin.indices.status.TransportIndicesStatusAction;
4339
import org.elasticsearch.action.count.TransportCountAction;
4440
import org.elasticsearch.action.delete.TransportDeleteAction;
@@ -76,18 +72,11 @@ public class TransportActionModule extends AbstractModule {
7672
bind(TransportIndexGatewaySnapshotAction.class).asEagerSingleton();
7773
bind(TransportGatewaySnapshotAction.class).asEagerSingleton();
7874

79-
bind(TransportShardRefreshAction.class).asEagerSingleton();
80-
bind(TransportIndexRefreshAction.class).asEagerSingleton();
8175
bind(TransportRefreshAction.class).asEagerSingleton();
82-
83-
bind(TransportShardFlushAction.class).asEagerSingleton();
84-
bind(TransportIndexFlushAction.class).asEagerSingleton();
8576
bind(TransportFlushAction.class).asEagerSingleton();
8677

8778
bind(TransportIndexAction.class).asEagerSingleton();
88-
8979
bind(TransportGetAction.class).asEagerSingleton();
90-
9180
bind(TransportDeleteAction.class).asEagerSingleton();
9281

9382
bind(TransportShardDeleteByQueryAction.class).asEagerSingleton();

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.elasticsearch.action.TransportActions;
2525
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
2626
import org.elasticsearch.cluster.ClusterService;
27+
import org.elasticsearch.cluster.ClusterState;
28+
import org.elasticsearch.cluster.routing.GroupShardsIterator;
2729
import org.elasticsearch.cluster.routing.ShardRouting;
2830
import org.elasticsearch.indices.IndicesService;
2931
import org.elasticsearch.threadpool.ThreadPool;
@@ -32,6 +34,8 @@
3234

3335
import java.util.concurrent.atomic.AtomicReferenceArray;
3436

37+
import static org.elasticsearch.action.Actions.*;
38+
3539
/**
3640
* @author kimchy (Shay Banon)
3741
*/
@@ -53,7 +57,11 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
5357
return new BroadcastPingRequest();
5458
}
5559

56-
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest broadcastPingRequest, AtomicReferenceArray shardsResponses) {
60+
@Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) {
61+
return indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint());
62+
}
63+
64+
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
5765
int successfulShards = 0;
5866
int failedShards = 0;
5967
for (int i = 0; i < shardsResponses.length(); i++) {

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,33 +19,35 @@
1919

2020
package org.elasticsearch.action.admin.indices.flush;
2121

22-
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
23-
import org.elasticsearch.util.TimeValue;
22+
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
23+
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
2424

2525
/**
2626
* @author kimchy (Shay Banon)
2727
*/
28-
public class FlushRequest extends IndicesReplicationOperationRequest {
28+
public class FlushRequest extends BroadcastOperationRequest {
29+
30+
FlushRequest() {
31+
32+
}
2933

3034
public FlushRequest(String index) {
3135
this(new String[]{index});
3236
}
3337

3438
public FlushRequest(String... indices) {
35-
this.indices = indices;
39+
super(indices, null);
40+
// we want to do the refresh in parallel on local shards...
41+
operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD);
3642
}
3743

3844
@Override public FlushRequest listenerThreaded(boolean threadedListener) {
3945
super.listenerThreaded(threadedListener);
4046
return this;
4147
}
4248

43-
public FlushRequest timeout(TimeValue timeout) {
44-
this.timeout = timeout;
49+
@Override public FlushRequest operationThreading(BroadcastOperationThreading operationThreading) {
50+
super.operationThreading(operationThreading);
4551
return this;
4652
}
47-
48-
FlushRequest() {
49-
50-
}
5153
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,47 +19,30 @@
1919

2020
package org.elasticsearch.action.admin.indices.flush;
2121

22-
import org.elasticsearch.action.ActionResponse;
23-
import org.elasticsearch.util.io.Streamable;
22+
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
2423

2524
import java.io.DataInput;
2625
import java.io.DataOutput;
2726
import java.io.IOException;
28-
import java.util.HashMap;
29-
import java.util.Map;
3027

3128
/**
3229
* @author kimchy (Shay Banon)
3330
*/
34-
public class FlushResponse implements ActionResponse, Streamable {
35-
36-
private Map<String, IndexFlushResponse> indices = new HashMap<String, IndexFlushResponse>();
31+
public class FlushResponse extends BroadcastOperationResponse {
3732

3833
FlushResponse() {
3934

4035
}
4136

42-
public Map<String, IndexFlushResponse> indices() {
43-
return indices;
44-
}
45-
46-
public IndexFlushResponse index(String index) {
47-
return indices.get(index);
37+
FlushResponse(int successfulShards, int failedShards) {
38+
super(successfulShards, failedShards);
4839
}
4940

5041
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
51-
int size = in.readInt();
52-
for (int i = 0; i < size; i++) {
53-
IndexFlushResponse indexFlushResponse = new IndexFlushResponse();
54-
indexFlushResponse.readFrom(in);
55-
indices.put(indexFlushResponse.index(), indexFlushResponse);
56-
}
42+
super.readFrom(in);
5743
}
5844

5945
@Override public void writeTo(DataOutput out) throws IOException {
60-
out.writeInt(indices.size());
61-
for (IndexFlushResponse indexFlushResponse : indices.values()) {
62-
indexFlushResponse.writeTo(out);
63-
}
46+
super.writeTo(out);
6447
}
65-
}
48+
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/IndexFlushRequest.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/IndexFlushResponse.java

Lines changed: 0 additions & 77 deletions
This file was deleted.

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.indices.flush;
2121

22-
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
22+
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
2323

2424
import java.io.DataInput;
2525
import java.io.DataOutput;
@@ -28,34 +28,20 @@
2828
/**
2929
* @author kimchy (Shay Banon)
3030
*/
31-
public class ShardFlushRequest extends ShardReplicationOperationRequest {
32-
33-
private int shardId;
34-
35-
public ShardFlushRequest(IndexFlushRequest indexFlushRequest, int shardId) {
36-
this(indexFlushRequest.index(), shardId);
37-
timeout = indexFlushRequest.timeout();
38-
}
39-
40-
public ShardFlushRequest(String index, int shardId) {
41-
this.index = index;
42-
this.shardId = shardId;
43-
}
31+
public class ShardFlushRequest extends BroadcastShardOperationRequest {
4432

4533
ShardFlushRequest() {
4634
}
4735

48-
public int shardId() {
49-
return this.shardId;
36+
public ShardFlushRequest(String index, int shardId) {
37+
super(index, shardId);
5038
}
5139

5240
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
5341
super.readFrom(in);
54-
shardId = in.readInt();
5542
}
5643

5744
@Override public void writeTo(DataOutput out) throws IOException {
5845
super.writeTo(out);
59-
out.writeInt(shardId);
6046
}
6147
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushResponse.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.indices.flush;
2121

22-
import org.elasticsearch.action.ActionResponse;
23-
import org.elasticsearch.util.io.Streamable;
22+
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
2423

2524
import java.io.DataInput;
2625
import java.io.DataOutput;
@@ -29,15 +28,21 @@
2928
/**
3029
* @author kimchy (Shay Banon)
3130
*/
32-
public class ShardFlushResponse implements ActionResponse, Streamable {
31+
public class ShardFlushResponse extends BroadcastShardOperationResponse {
3332

3433
ShardFlushResponse() {
3534

3635
}
3736

37+
public ShardFlushResponse(String index, int shardId) {
38+
super(index, shardId);
39+
}
40+
3841
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
42+
super.readFrom(in);
3943
}
4044

4145
@Override public void writeTo(DataOutput out) throws IOException {
46+
super.writeTo(out);
4247
}
4348
}

0 commit comments

Comments
 (0)