Skip to content

Commit 215e94d

Browse files
authored
Auto-allocate searchable snapshots (#52527)
This commit allows plugins to supply their own allocator for existing shards, generalizing the default `GatewayAllocator`. It uses this to implement a very simple auto-allocation process for searchable snapshots: the process respects the allocation deciders, but otherwise a searchable snapshot shard can be assigned to any node.
1 parent 95b1acc commit 215e94d

File tree

26 files changed

+923
-170
lines changed

26 files changed

+923
-170
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/Allocators.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,21 @@ private static class NoopGatewayAllocator extends GatewayAllocator {
4747
public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator();
4848

4949
@Override
50-
public void applyStartedShards(RoutingAllocation allocation, List<ShardRouting> startedShards) {
50+
public void applyStartedShards(List<ShardRouting> startedShards, RoutingAllocation allocation) {
5151
// noop
5252
}
5353

5454
@Override
55-
public void applyFailedShards(RoutingAllocation allocation, List<FailedShard> failedShards) {
55+
public void applyFailedShards(List<FailedShard> failedShards, RoutingAllocation allocation) {
5656
// noop
5757
}
5858

5959
@Override
60-
public void allocateUnassigned(RoutingAllocation allocation) {
60+
public void allocateUnassigned(
61+
ShardRouting shardRouting,
62+
RoutingAllocation allocation,
63+
UnassignedAllocationHandler unassignedAllocationHandler
64+
) {
6165
// noop
6266
}
6367
}

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@
3333
import org.elasticsearch.cluster.node.DiscoveryNode;
3434
import org.elasticsearch.cluster.routing.RoutingNodes;
3535
import org.elasticsearch.cluster.routing.ShardRouting;
36-
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
37-
import org.elasticsearch.cluster.routing.allocation.MoveDecision;
36+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3837
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
3938
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation.DebugMode;
4039
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
@@ -43,7 +42,6 @@
4342
import org.elasticsearch.cluster.service.ClusterService;
4443
import org.elasticsearch.common.inject.Inject;
4544
import org.elasticsearch.common.io.stream.StreamInput;
46-
import org.elasticsearch.gateway.GatewayAllocator;
4745
import org.elasticsearch.tasks.Task;
4846
import org.elasticsearch.threadpool.ThreadPool;
4947
import org.elasticsearch.transport.TransportService;
@@ -63,20 +61,20 @@ public class TransportClusterAllocationExplainAction
6361
private final ClusterInfoService clusterInfoService;
6462
private final AllocationDeciders allocationDeciders;
6563
private final ShardsAllocator shardAllocator;
66-
private final GatewayAllocator gatewayAllocator;
64+
private final AllocationService allocationService;
6765

6866
@Inject
6967
public TransportClusterAllocationExplainAction(TransportService transportService, ClusterService clusterService,
7068
ThreadPool threadPool, ActionFilters actionFilters,
7169
IndexNameExpressionResolver indexNameExpressionResolver,
7270
ClusterInfoService clusterInfoService, AllocationDeciders allocationDeciders,
73-
ShardsAllocator shardAllocator, GatewayAllocator gatewayAllocator) {
71+
ShardsAllocator shardAllocator, AllocationService allocationService) {
7472
super(ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters,
7573
ClusterAllocationExplainRequest::new, indexNameExpressionResolver);
7674
this.clusterInfoService = clusterInfoService;
7775
this.allocationDeciders = allocationDeciders;
7876
this.shardAllocator = shardAllocator;
79-
this.gatewayAllocator = gatewayAllocator;
77+
this.allocationService = allocationService;
8078
}
8179

8280
@Override
@@ -106,27 +104,21 @@ protected void masterOperation(Task task, final ClusterAllocationExplainRequest
106104
logger.debug("explaining the allocation for [{}], found shard [{}]", request, shardRouting);
107105

108106
ClusterAllocationExplanation cae = explainShard(shardRouting, allocation,
109-
request.includeDiskInfo() ? clusterInfo : null, request.includeYesDecisions(), gatewayAllocator, shardAllocator);
107+
request.includeDiskInfo() ? clusterInfo : null, request.includeYesDecisions(), allocationService);
110108
listener.onResponse(new ClusterAllocationExplainResponse(cae));
111109
}
112110

113111
// public for testing
114112
public static ClusterAllocationExplanation explainShard(ShardRouting shardRouting, RoutingAllocation allocation,
115113
ClusterInfo clusterInfo, boolean includeYesDecisions,
116-
GatewayAllocator gatewayAllocator, ShardsAllocator shardAllocator) {
114+
AllocationService allocationService) {
117115
allocation.setDebugMode(includeYesDecisions ? DebugMode.ON : DebugMode.EXCLUDE_YES_DECISIONS);
118116

119117
ShardAllocationDecision shardDecision;
120118
if (shardRouting.initializing() || shardRouting.relocating()) {
121119
shardDecision = ShardAllocationDecision.NOT_TAKEN;
122120
} else {
123-
AllocateUnassignedDecision allocateDecision = shardRouting.unassigned() ?
124-
gatewayAllocator.decideUnassignedShardAllocation(shardRouting, allocation) : AllocateUnassignedDecision.NOT_TAKEN;
125-
if (allocateDecision.isDecisionTaken() == false) {
126-
shardDecision = shardAllocator.decideShardAllocation(shardRouting, allocation);
127-
} else {
128-
shardDecision = new ShardAllocationDecision(allocateDecision, MoveDecision.NOT_TAKEN);
129-
}
121+
shardDecision = allocationService.explainShardAllocation(shardRouting, allocation);
130122
}
131123

132124
return new ClusterAllocationExplanation(shardRouting,

server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@
3636
import org.elasticsearch.cluster.health.ClusterHealthStatus;
3737
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3838
import org.elasticsearch.cluster.routing.UnassignedInfo;
39+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3940
import org.elasticsearch.cluster.service.ClusterService;
4041
import org.elasticsearch.common.Strings;
4142
import org.elasticsearch.common.inject.Inject;
4243
import org.elasticsearch.common.io.stream.StreamInput;
4344
import org.elasticsearch.common.unit.TimeValue;
44-
import org.elasticsearch.gateway.GatewayAllocator;
4545
import org.elasticsearch.index.IndexNotFoundException;
4646
import org.elasticsearch.tasks.Task;
4747
import org.elasticsearch.threadpool.ThreadPool;
@@ -55,15 +55,15 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
5555

5656
private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class);
5757

58-
private final GatewayAllocator gatewayAllocator;
58+
private final AllocationService allocationService;
5959

6060
@Inject
6161
public TransportClusterHealthAction(TransportService transportService, ClusterService clusterService,
6262
ThreadPool threadPool, ActionFilters actionFilters,
63-
IndexNameExpressionResolver indexNameExpressionResolver, GatewayAllocator gatewayAllocator) {
63+
IndexNameExpressionResolver indexNameExpressionResolver, AllocationService allocationService) {
6464
super(ClusterHealthAction.NAME, false, transportService, clusterService, threadPool, actionFilters,
6565
ClusterHealthRequest::new, indexNameExpressionResolver);
66-
this.gatewayAllocator = gatewayAllocator;
66+
this.allocationService = allocationService;
6767
}
6868

6969
@Override
@@ -229,14 +229,14 @@ private static int getWaitCount(ClusterHealthRequest request) {
229229

230230
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitCount) {
231231
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
232-
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
232+
allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime());
233233
return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
234234
}
235235

236236
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState,
237237
final int waitFor, boolean timedOut) {
238238
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
239-
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
239+
allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime());
240240
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
241241
boolean valid = (readyCounter == waitFor);
242242
assert valid || timedOut;

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
3535
import org.elasticsearch.cluster.routing.DelayedAllocationService;
3636
import org.elasticsearch.cluster.routing.allocation.AllocationService;
37+
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
3738
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
3839
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
3940
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
@@ -97,12 +98,14 @@ public class ClusterModule extends AbstractModule {
9798
private final IndexNameExpressionResolver indexNameExpressionResolver;
9899
private final AllocationDeciders allocationDeciders;
99100
private final AllocationService allocationService;
101+
private final List<ClusterPlugin> clusterPlugins;
100102
// pkg private for tests
101103
final Collection<AllocationDecider> deciderList;
102104
final ShardsAllocator shardsAllocator;
103105

104106
public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
105107
ClusterInfoService clusterInfoService) {
108+
this.clusterPlugins = clusterPlugins;
106109
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
107110
this.allocationDeciders = new AllocationDeciders(deciderList);
108111
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
@@ -251,4 +254,22 @@ protected void configure() {
251254
bind(AllocationDeciders.class).toInstance(allocationDeciders);
252255
bind(ShardsAllocator.class).toInstance(shardsAllocator);
253256
}
257+
258+
public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator) {
259+
final Map<String, ExistingShardsAllocator> existingShardsAllocators = new HashMap<>();
260+
existingShardsAllocators.put(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator);
261+
262+
for (ClusterPlugin clusterPlugin : clusterPlugins) {
263+
for (Map.Entry<String, ExistingShardsAllocator> existingShardsAllocatorEntry
264+
: clusterPlugin.getExistingShardsAllocators().entrySet()) {
265+
final String allocatorName = existingShardsAllocatorEntry.getKey();
266+
if (existingShardsAllocators.put(allocatorName, existingShardsAllocatorEntry.getValue()) != null) {
267+
throw new IllegalArgumentException("ExistingShardsAllocator [" + allocatorName + "] from [" +
268+
clusterPlugin.getClass().getName() + "] was already defined");
269+
}
270+
}
271+
}
272+
allocationService.setExistingShardsAllocators(existingShardsAllocators);
273+
}
274+
254275
}

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.cluster.metadata.MetaData;
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
3131
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
32+
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
3233
import org.elasticsearch.common.Nullable;
3334
import org.elasticsearch.common.Randomness;
3435
import org.elasticsearch.common.collect.Tuple;
@@ -883,7 +884,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R
883884
ignored.add(shard);
884885
}
885886

886-
public class UnassignedIterator implements Iterator<ShardRouting> {
887+
public class UnassignedIterator implements Iterator<ShardRouting>, ExistingShardsAllocator.UnassignedAllocationHandler {
887888

888889
private final ListIterator<ShardRouting> iterator;
889890
private ShardRouting current;
@@ -907,6 +908,7 @@ public ShardRouting next() {
907908
*
908909
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
909910
*/
911+
@Override
910912
public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize,
911913
RoutingChangesObserver routingChangesObserver) {
912914
nodes.ensureMutable();
@@ -922,6 +924,7 @@ public ShardRouting initialize(String nodeId, @Nullable String existingAllocatio
922924
*
923925
* @param attempt the result of the allocation attempt
924926
*/
927+
@Override
925928
public void removeAndIgnore(AllocationStatus attempt, RoutingChangesObserver changes) {
926929
nodes.ensureMutable();
927930
innerRemove();
@@ -940,6 +943,7 @@ private void updateShardRouting(ShardRouting shardRouting) {
940943
* @param recoverySource the new recovery source to use
941944
* @return the shard with unassigned info updated
942945
*/
946+
@Override
943947
public ShardRouting updateUnassigned(UnassignedInfo unassignedInfo, RecoverySource recoverySource,
944948
RoutingChangesObserver changes) {
945949
nodes.ensureMutable();

0 commit comments

Comments
 (0)