Skip to content

Commit 0b13d36

Browse files
Dry up TransportMasterNodeAction Usage
1. It is confusing and unnecessary to handle response deserialization via inheritance and request deserialization via composition. Consistently using composition saves hundreds of lines of code and as a matter of fact also removes some indirection in transport master node action since we pass a reader down to the response handling anyway. 2. Defining `executor` via inheritance but then assuming the return of the method is a constant is confusing and again not in line with how we handle the `executor` definition for other transport actions so this was simplified away as well. Somewhat relates to the dry-up in elastic#63335
1 parent 4e740c2 commit 0b13d36

File tree

145 files changed

+232
-1581
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

145 files changed

+232
-1581
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,14 @@
3737
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
3838
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation.DebugMode;
3939
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
40-
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
4140
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
4241
import org.elasticsearch.cluster.service.ClusterService;
4342
import org.elasticsearch.common.inject.Inject;
44-
import org.elasticsearch.common.io.stream.StreamInput;
4543
import org.elasticsearch.snapshots.SnapshotsInfoService;
4644
import org.elasticsearch.tasks.Task;
4745
import org.elasticsearch.threadpool.ThreadPool;
4846
import org.elasticsearch.transport.TransportService;
4947

50-
import java.io.IOException;
5148
import java.util.List;
5249

5350
/**
@@ -62,35 +59,23 @@ public class TransportClusterAllocationExplainAction
6259
private final ClusterInfoService clusterInfoService;
6360
private final SnapshotsInfoService snapshotsInfoService;
6461
private final AllocationDeciders allocationDeciders;
65-
private final ShardsAllocator shardAllocator;
6662
private final AllocationService allocationService;
6763

6864
@Inject
6965
public TransportClusterAllocationExplainAction(TransportService transportService, ClusterService clusterService,
7066
ThreadPool threadPool, ActionFilters actionFilters,
7167
IndexNameExpressionResolver indexNameExpressionResolver,
7268
ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService,
73-
AllocationDeciders allocationDeciders,
74-
ShardsAllocator shardAllocator, AllocationService allocationService) {
69+
AllocationDeciders allocationDeciders, AllocationService allocationService) {
7570
super(ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters,
76-
ClusterAllocationExplainRequest::new, indexNameExpressionResolver);
71+
ClusterAllocationExplainRequest::new, indexNameExpressionResolver, ClusterAllocationExplainResponse::new,
72+
ThreadPool.Names.MANAGEMENT);
7773
this.clusterInfoService = clusterInfoService;
7874
this.snapshotsInfoService = snapshotsInfoService;
7975
this.allocationDeciders = allocationDeciders;
80-
this.shardAllocator = shardAllocator;
8176
this.allocationService = allocationService;
8277
}
8378

84-
@Override
85-
protected String executor() {
86-
return ThreadPool.Names.MANAGEMENT;
87-
}
88-
89-
@Override
90-
protected ClusterAllocationExplainResponse read(StreamInput in) throws IOException {
91-
return new ClusterAllocationExplainResponse(in);
92-
}
93-
9479
@Override
9580
protected ClusterBlockException checkBlock(ClusterAllocationExplainRequest request, ClusterState state) {
9681
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);

server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,15 @@
3838
import org.elasticsearch.cluster.service.ClusterService;
3939
import org.elasticsearch.common.Priority;
4040
import org.elasticsearch.common.inject.Inject;
41-
import org.elasticsearch.common.io.stream.StreamInput;
4241
import org.elasticsearch.common.settings.ClusterSettings;
4342
import org.elasticsearch.common.settings.Setting;
4443
import org.elasticsearch.common.settings.Setting.Property;
4544
import org.elasticsearch.common.settings.Settings;
4645
import org.elasticsearch.common.unit.TimeValue;
4746
import org.elasticsearch.tasks.Task;
4847
import org.elasticsearch.threadpool.ThreadPool;
49-
import org.elasticsearch.threadpool.ThreadPool.Names;
5048
import org.elasticsearch.transport.TransportService;
5149

52-
import java.io.IOException;
5350
import java.util.Set;
5451
import java.util.function.Predicate;
5552
import java.util.stream.Collectors;
@@ -69,7 +66,8 @@ public TransportAddVotingConfigExclusionsAction(Settings settings, ClusterSettin
6966
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
7067
IndexNameExpressionResolver indexNameExpressionResolver) {
7168
super(AddVotingConfigExclusionsAction.NAME, transportService, clusterService, threadPool, actionFilters,
72-
AddVotingConfigExclusionsRequest::new, indexNameExpressionResolver);
69+
AddVotingConfigExclusionsRequest::new, indexNameExpressionResolver, AddVotingConfigExclusionsResponse::new,
70+
ThreadPool.Names.SAME);
7371

7472
maxVotingConfigExclusions = MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.get(settings);
7573
clusterSettings.addSettingsUpdateConsumer(MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING, this::setMaxVotingConfigExclusions);
@@ -79,16 +77,6 @@ private void setMaxVotingConfigExclusions(int maxVotingConfigExclusions) {
7977
this.maxVotingConfigExclusions = maxVotingConfigExclusions;
8078
}
8179

82-
@Override
83-
protected String executor() {
84-
return Names.SAME;
85-
}
86-
87-
@Override
88-
protected AddVotingConfigExclusionsResponse read(StreamInput in) throws IOException {
89-
return new AddVotingConfigExclusionsResponse(in);
90-
}
91-
9280
@Override
9381
protected void masterOperation(Task task, AddVotingConfigExclusionsRequest request, ClusterState state,
9482
ActionListener<AddVotingConfigExclusionsResponse> listener) throws Exception {

server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,11 @@
3838
import org.elasticsearch.cluster.service.ClusterService;
3939
import org.elasticsearch.common.Priority;
4040
import org.elasticsearch.common.inject.Inject;
41-
import org.elasticsearch.common.io.stream.StreamInput;
4241
import org.elasticsearch.common.unit.TimeValue;
4342
import org.elasticsearch.tasks.Task;
4443
import org.elasticsearch.threadpool.ThreadPool;
45-
import org.elasticsearch.threadpool.ThreadPool.Names;
4644
import org.elasticsearch.transport.TransportService;
4745

48-
import java.io.IOException;
4946
import java.util.function.Predicate;
5047

5148
public class TransportClearVotingConfigExclusionsAction
@@ -58,17 +55,8 @@ public TransportClearVotingConfigExclusionsAction(TransportService transportServ
5855
ThreadPool threadPool, ActionFilters actionFilters,
5956
IndexNameExpressionResolver indexNameExpressionResolver) {
6057
super(ClearVotingConfigExclusionsAction.NAME, transportService, clusterService, threadPool, actionFilters,
61-
ClearVotingConfigExclusionsRequest::new, indexNameExpressionResolver);
62-
}
63-
64-
@Override
65-
protected String executor() {
66-
return Names.SAME;
67-
}
68-
69-
@Override
70-
protected ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException {
71-
return new ClearVotingConfigExclusionsResponse(in);
58+
ClearVotingConfigExclusionsRequest::new, indexNameExpressionResolver, ClearVotingConfigExclusionsResponse::new,
59+
ThreadPool.Names.SAME);
7260
}
7361

7462
@Override

server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.cluster.service.ClusterService;
4242
import org.elasticsearch.common.Strings;
4343
import org.elasticsearch.common.inject.Inject;
44-
import org.elasticsearch.common.io.stream.StreamInput;
4544
import org.elasticsearch.common.unit.TimeValue;
4645
import org.elasticsearch.common.util.CollectionUtils;
4746
import org.elasticsearch.index.IndexNotFoundException;
@@ -50,7 +49,6 @@
5049
import org.elasticsearch.threadpool.ThreadPool;
5150
import org.elasticsearch.transport.TransportService;
5251

53-
import java.io.IOException;
5452
import java.util.function.Consumer;
5553
import java.util.function.Predicate;
5654

@@ -65,21 +63,10 @@ public TransportClusterHealthAction(TransportService transportService, ClusterSe
6563
ThreadPool threadPool, ActionFilters actionFilters,
6664
IndexNameExpressionResolver indexNameExpressionResolver, AllocationService allocationService) {
6765
super(ClusterHealthAction.NAME, false, transportService, clusterService, threadPool, actionFilters,
68-
ClusterHealthRequest::new, indexNameExpressionResolver);
66+
ClusterHealthRequest::new, indexNameExpressionResolver,ClusterHealthResponse::new, ThreadPool.Names.SAME);
6967
this.allocationService = allocationService;
7068
}
7169

72-
@Override
73-
protected String executor() {
74-
// this should be executing quickly no need to fork off
75-
return ThreadPool.Names.SAME;
76-
}
77-
78-
@Override
79-
protected ClusterHealthResponse read(StreamInput in) throws IOException {
80-
return new ClusterHealthResponse(in);
81-
}
82-
8370
@Override
8471
protected ClusterBlockException checkBlock(ClusterHealthRequest request, ClusterState state) {
8572
// we want users to be able to call this even when there are global blocks, just to check the health (are there blocks?)

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.elasticsearch.cluster.service.ClusterService;
3939
import org.elasticsearch.common.Nullable;
4040
import org.elasticsearch.common.inject.Inject;
41-
import org.elasticsearch.common.io.stream.StreamInput;
4241
import org.elasticsearch.repositories.RepositoriesService;
4342
import org.elasticsearch.repositories.Repository;
4443
import org.elasticsearch.repositories.RepositoryCleanupResult;
@@ -49,7 +48,6 @@
4948
import org.elasticsearch.threadpool.ThreadPool;
5049
import org.elasticsearch.transport.TransportService;
5150

52-
import java.io.IOException;
5351
import java.util.List;
5452

5553
/**
@@ -78,18 +76,13 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
7876

7977
private final SnapshotsService snapshotsService;
8078

81-
@Override
82-
protected String executor() {
83-
return ThreadPool.Names.SAME;
84-
}
85-
8679
@Inject
8780
public TransportCleanupRepositoryAction(TransportService transportService, ClusterService clusterService,
8881
RepositoriesService repositoriesService, SnapshotsService snapshotsService,
8982
ThreadPool threadPool, ActionFilters actionFilters,
9083
IndexNameExpressionResolver indexNameExpressionResolver) {
9184
super(CleanupRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters,
92-
CleanupRepositoryRequest::new, indexNameExpressionResolver);
85+
CleanupRepositoryRequest::new, indexNameExpressionResolver, CleanupRepositoryResponse::new, ThreadPool.Names.SAME);
9386
this.repositoriesService = repositoriesService;
9487
this.snapshotsService = snapshotsService;
9588
// We add a state applier that will remove any dangling repository cleanup actions on master failover.
@@ -136,11 +129,6 @@ private static ClusterState removeInProgressCleanup(final ClusterState currentSt
136129
: currentState;
137130
}
138131

139-
@Override
140-
protected CleanupRepositoryResponse read(StreamInput in) throws IOException {
141-
return new CleanupRepositoryResponse(in);
142-
}
143-
144132
@Override
145133
protected void masterOperation(Task task, CleanupRepositoryRequest request, ClusterState state,
146134
ActionListener<CleanupRepositoryResponse> listener) {

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,10 @@ public TransportDeleteRepositoryAction(TransportService transportService, Cluste
4646
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters,
4747
IndexNameExpressionResolver indexNameExpressionResolver) {
4848
super(DeleteRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters,
49-
DeleteRepositoryRequest::new, indexNameExpressionResolver);
49+
DeleteRepositoryRequest::new, indexNameExpressionResolver, ThreadPool.Names.SAME);
5050
this.repositoriesService = repositoriesService;
5151
}
5252

53-
@Override
54-
protected String executor() {
55-
return ThreadPool.Names.SAME;
56-
}
57-
5853
@Override
5954
protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, ClusterState state) {
6055
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,12 @@
3131
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
3232
import org.elasticsearch.cluster.service.ClusterService;
3333
import org.elasticsearch.common.inject.Inject;
34-
import org.elasticsearch.common.io.stream.StreamInput;
3534
import org.elasticsearch.common.regex.Regex;
3635
import org.elasticsearch.repositories.RepositoryMissingException;
3736
import org.elasticsearch.tasks.Task;
3837
import org.elasticsearch.threadpool.ThreadPool;
3938
import org.elasticsearch.transport.TransportService;
4039

41-
import java.io.IOException;
4240
import java.util.ArrayList;
4341
import java.util.Collections;
4442
import java.util.LinkedHashSet;
@@ -55,17 +53,7 @@ public TransportGetRepositoriesAction(TransportService transportService, Cluster
5553
ThreadPool threadPool, ActionFilters actionFilters,
5654
IndexNameExpressionResolver indexNameExpressionResolver) {
5755
super(GetRepositoriesAction.NAME, transportService, clusterService, threadPool, actionFilters,
58-
GetRepositoriesRequest::new, indexNameExpressionResolver);
59-
}
60-
61-
@Override
62-
protected String executor() {
63-
return ThreadPool.Names.SAME;
64-
}
65-
66-
@Override
67-
protected GetRepositoriesResponse read(StreamInput in) throws IOException {
68-
return new GetRepositoriesResponse(in);
56+
GetRepositoriesRequest::new, indexNameExpressionResolver, GetRepositoriesResponse::new, ThreadPool.Names.SAME);
6957
}
7058

7159
@Override

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,10 @@ public TransportPutRepositoryAction(TransportService transportService, ClusterSe
4646
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters,
4747
IndexNameExpressionResolver indexNameExpressionResolver) {
4848
super(PutRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters,
49-
PutRepositoryRequest::new, indexNameExpressionResolver);
49+
PutRepositoryRequest::new, indexNameExpressionResolver, ThreadPool.Names.SAME);
5050
this.repositoriesService = repositoriesService;
5151
}
5252

53-
@Override
54-
protected String executor() {
55-
return ThreadPool.Names.SAME;
56-
}
57-
5853
@Override
5954
protected ClusterBlockException checkBlock(PutRepositoryRequest request, ClusterState state) {
6055
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,11 @@
2929
import org.elasticsearch.cluster.node.DiscoveryNode;
3030
import org.elasticsearch.cluster.service.ClusterService;
3131
import org.elasticsearch.common.inject.Inject;
32-
import org.elasticsearch.common.io.stream.StreamInput;
3332
import org.elasticsearch.repositories.RepositoriesService;
3433
import org.elasticsearch.tasks.Task;
3534
import org.elasticsearch.threadpool.ThreadPool;
3635
import org.elasticsearch.transport.TransportService;
3736

38-
import java.io.IOException;
39-
4037
/**
4138
* Transport action for verifying repository operation
4239
*/
@@ -51,20 +48,10 @@ public TransportVerifyRepositoryAction(TransportService transportService, Cluste
5148
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters,
5249
IndexNameExpressionResolver indexNameExpressionResolver) {
5350
super(VerifyRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters,
54-
VerifyRepositoryRequest::new, indexNameExpressionResolver);
51+
VerifyRepositoryRequest::new, indexNameExpressionResolver, VerifyRepositoryResponse::new, ThreadPool.Names.SAME);
5552
this.repositoriesService = repositoriesService;
5653
}
5754

58-
@Override
59-
protected String executor() {
60-
return ThreadPool.Names.SAME;
61-
}
62-
63-
@Override
64-
protected VerifyRepositoryResponse read(StreamInput in) throws IOException {
65-
return new VerifyRepositoryResponse(in);
66-
}
67-
6855
@Override
6956
protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, ClusterState state) {
7057
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);

server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,10 @@
4747
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
4848
import org.elasticsearch.common.collect.ImmutableOpenMap;
4949
import org.elasticsearch.common.inject.Inject;
50-
import org.elasticsearch.common.io.stream.StreamInput;
5150
import org.elasticsearch.tasks.Task;
5251
import org.elasticsearch.threadpool.ThreadPool;
5352
import org.elasticsearch.transport.TransportService;
5453

55-
import java.io.IOException;
5654
import java.util.ArrayList;
5755
import java.util.HashMap;
5856
import java.util.List;
@@ -69,26 +67,15 @@ public TransportClusterRerouteAction(TransportService transportService, ClusterS
6967
ThreadPool threadPool, AllocationService allocationService, ActionFilters actionFilters,
7068
IndexNameExpressionResolver indexNameExpressionResolver) {
7169
super(ClusterRerouteAction.NAME, transportService, clusterService, threadPool, actionFilters,
72-
ClusterRerouteRequest::new, indexNameExpressionResolver);
70+
ClusterRerouteRequest::new, indexNameExpressionResolver, ClusterRerouteResponse::new, ThreadPool.Names.SAME);
7371
this.allocationService = allocationService;
7472
}
7573

76-
@Override
77-
protected String executor() {
78-
// we go async right away
79-
return ThreadPool.Names.SAME;
80-
}
81-
8274
@Override
8375
protected ClusterBlockException checkBlock(ClusterRerouteRequest request, ClusterState state) {
8476
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
8577
}
8678

87-
@Override
88-
protected ClusterRerouteResponse read(StreamInput in) throws IOException {
89-
return new ClusterRerouteResponse(in);
90-
}
91-
9279
@Override
9380
protected void masterOperation(Task task, final ClusterRerouteRequest request, final ClusterState state,
9481
final ActionListener<ClusterRerouteResponse> listener) {

0 commit comments

Comments
 (0)