Allocation: add delay between retries for failed allocations #27086

Closed · wants to merge 1 commit
ShardStateAction.java
@@ -36,6 +36,9 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationRetryBackoffPolicy;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.StaleShard;
@@ -88,8 +91,13 @@ public ShardStateAction(Settings settings, ClusterService clusterService, Transp
this.clusterService = clusterService;
this.threadPool = threadPool;

transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME,
new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
AllocationRetryBackoffPolicy backoffPolicy = AllocationRetryBackoffPolicy.policyForSettings(settings);
ShardFailedClusterStateTaskExecutor shardFailedExecutor =
new ShardFailedClusterStateTaskExecutor(allocationService, routingService, backoffPolicy, logger);
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME,
new ShardFailedTransportHandler(clusterService, shardFailedExecutor, logger));
}

private void sendShardAction(final String actionName, final ClusterState currentState, final ShardEntry shardEntry, final Listener listener) {
@@ -251,11 +259,14 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardEntry> {
private final AllocationService allocationService;
private final RoutingService routingService;
private final AllocationRetryBackoffPolicy backoffPolicy;
private final Logger logger;

public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, Logger logger) {
public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService,
AllocationRetryBackoffPolicy backoffPolicy, Logger logger) {
this.allocationService = allocationService;
this.routingService = routingService;
this.backoffPolicy = backoffPolicy;
this.logger = logger;
}

@@ -341,14 +352,15 @@ ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> fail

@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
int numberOfUnassignedShards = clusterChangedEvent.state().getRoutingNodes().unassigned().size();
if (numberOfUnassignedShards > 0) {
String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
if (logger.isTraceEnabled()) {
logger.trace("{}, scheduling a reroute", reason);
}
routingService.reroute(reason);
}
List<ShardRouting> unassigned = clusterChangedEvent.state().getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED);
unassigned.stream()
.mapToInt(s -> s.unassignedInfo().getNumFailedAllocations())
.min()
.ifPresent(numberOfFailures -> {
String reason = String.format(Locale.ROOT, "Schedule rerouting after [%s] failures", numberOfFailures);
TimeValue delay = backoffPolicy.delayInterval(numberOfFailures);
routingService.scheduleReroute(reason, delay);
});
}
}
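For orientation, here is a standalone sketch of the scheduling decision made in clusterStatePublished above: take the smallest per-shard failure count among the unassigned shards and turn it into a single delayed reroute. The backoff is reduced to a deterministic stand-in (the upper bound of the randomized delay), and the class and variable names are invented, so the snippet runs without any Elasticsearch types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;
import java.util.function.IntToLongFunction;

public class RerouteDelayDemo {
    public static void main(String[] args) {
        // failure counts of three hypothetical unassigned shards
        List<Integer> failedAllocationsPerUnassignedShard = Arrays.asList(3, 1, 4);
        // deterministic stand-in for the backoff policy: 50ms * 2^n, capped at 30 minutes
        IntToLongFunction backoffUpperBoundMillis = n -> Math.min(50L * (1L << n), 30L * 60L * 1000L);

        OptionalInt minFailures = failedAllocationsPerUnassignedShard.stream()
            .mapToInt(Integer::intValue)
            .min();
        // the shard with the fewest failures determines how soon the next reroute runs
        minFailures.ifPresent(n -> System.out.println(
            "schedule reroute after [" + n + "] failures, delay <= " + backoffUpperBoundMillis.applyAsLong(n) + "ms"));
    }
}
```

Taking the minimum rather than the maximum keeps the reroute as early as the least-failed unassigned shard warrants, so shards with few failures are not held back by heavily failing ones.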

RoutingService.java
@@ -30,8 +30,14 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* A {@link RoutingService} listens to clusters state. When this service
@@ -51,14 +57,17 @@ public class RoutingService extends AbstractLifecycleComponent {

private final ClusterService clusterService;
private final AllocationService allocationService;
private final ThreadPool threadPool;

private AtomicBoolean rerouting = new AtomicBoolean();
private final AtomicBoolean rerouting = new AtomicBoolean();
private final AtomicReference<ScheduledFuture> pendingTask = new AtomicReference<>();

@Inject
public RoutingService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
public RoutingService(Settings settings, ClusterService clusterService, AllocationService allocationService, ThreadPool threadPool) {
super(settings);
this.clusterService = clusterService;
this.allocationService = allocationService;
this.threadPool = threadPool;
}

@Override
@@ -74,8 +83,39 @@ protected void doClose() {
}

/**
* Initiates a reroute.
* Schedules a one-shot reroute action after the given delay.
* The request may be skipped if a reroute is already pending that would run no later than the given delay.
*/
public void scheduleReroute(String reason, TimeValue delay) {
if (logger.isTraceEnabled()) {
logger.trace("Schedule reroute in [{}], reason [{}]", delay, reason);
}

ScheduledFuture newTask = null;
while (true) {
final ScheduledFuture existingTask = pendingTask.get();
final long existingDelayMS = (existingTask == null) ? Long.MAX_VALUE : existingTask.getDelay(TimeUnit.MILLISECONDS);

if (newTask == null && existingDelayMS > delay.millis()) {
newTask = threadPool.schedule(delay, ThreadPool.Names.SAME, () -> performReroute(reason));
}
if (newTask == null) {
return;
}
if (existingDelayMS > newTask.getDelay(TimeUnit.MILLISECONDS)) {
if (pendingTask.compareAndSet(existingTask, newTask)) {
if (existingTask != null) {
FutureUtils.cancel(existingTask);
}
return;
}
} else {
FutureUtils.cancel(newTask);
return;
}
}
}

public final void reroute(String reason) {
performReroute(reason);
}
@@ -119,6 +159,8 @@ public void onFailure(String source, Exception e) {
rerouting.set(false);
ClusterState state = clusterService.state();
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
} finally {
pendingTask.set(null);
}
}
}
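The compare-and-set loop in scheduleReroute above implements an earliest-request-wins scheduler: at most one reroute is pending at a time, and a new request only replaces it if it would fire sooner. A minimal sketch of that pattern, using plain java.util.concurrent types instead of the Elasticsearch ThreadPool and FutureUtils helpers (the class name is invented), might look like this:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class EarliestWinsScheduler {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<ScheduledFuture<?>> pending = new AtomicReference<>();

    public void schedule(Runnable action, long delayMillis) {
        ScheduledFuture<?> newTask = null;
        while (true) {
            ScheduledFuture<?> existing = pending.get();
            // an absent or already finished task never blocks a new schedule
            long existingDelay = (existing == null || existing.isDone())
                ? Long.MAX_VALUE : existing.getDelay(TimeUnit.MILLISECONDS);
            if (newTask == null && existingDelay > delayMillis) {
                // only create a task if it would beat whatever is already queued
                newTask = executor.schedule(action, delayMillis, TimeUnit.MILLISECONDS);
            }
            if (newTask == null) {
                return; // the existing task already fires soon enough
            }
            if (existingDelay > newTask.getDelay(TimeUnit.MILLISECONDS)) {
                if (pending.compareAndSet(existing, newTask)) {
                    if (existing != null) {
                        existing.cancel(false); // superseded by the sooner task
                    }
                    return;
                }
                // lost a race with another caller; re-read and retry
            } else {
                newTask.cancel(false); // another caller queued an even sooner task meanwhile
                return;
            }
        }
    }
}
```

The change above additionally clears pendingTask in a finally block once the reroute task has run, so a completed task never suppresses a later schedule.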
AllocationRetryBackoffPolicy.java (new file)
@@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Random;
import java.util.function.Function;

/**
* A policy that controls how long to wait before retrying a shard allocation that has failed.
*/
public abstract class AllocationRetryBackoffPolicy {
public enum PolicyType {
NO_BACKOFF(5) {
@Override
AllocationRetryBackoffPolicy policyForSettings(Settings settings) {
return noBackOffPolicy();
}
},
EXPONENTIAL_BACKOFF(1000) {
@Override
AllocationRetryBackoffPolicy policyForSettings(Settings settings) {
return exponentialBackoffPolicy(settings);
}
};
private final int defaultMaxRetries;

abstract AllocationRetryBackoffPolicy policyForSettings(Settings settings);

PolicyType(int defaultMaxRetries) {
this.defaultMaxRetries = defaultMaxRetries;
}

public int getDefaultMaxRetries() {
return defaultMaxRetries;
}

public static PolicyType fromString(String policyName) {
if ("exponential_backoff".equals(policyName)) {
return EXPONENTIAL_BACKOFF;
} else if ("no_backoff".equals(policyName)) {
return NO_BACKOFF;
}
throw new IllegalStateException("No backoff policy name match for [" + policyName + "]");
}
}

public static final Setting<PolicyType> SETTING_ALLOCATION_RETRY_POLICY =
new Setting<>("cluster.allocation.retry.policy", "exponential_backoff", PolicyType::fromString, Setting.Property.NodeScope);

public static final Function<Settings, Integer> SETTING_ALLOCATION_DEFAULT_MAX_RETRIES =
settings -> SETTING_ALLOCATION_RETRY_POLICY.get(settings).defaultMaxRetries;

public static final Setting<TimeValue> SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_BASE_DELAY =
Setting.positiveTimeSetting("cluster.allocation.retry.exponential_backoff.base_delay",
TimeValue.timeValueMillis(50), Setting.Property.NodeScope);

public static final Setting<TimeValue> SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_MAX_DELAY =
Setting.positiveTimeSetting("cluster.allocation.retry.exponential_backoff.max_delay",
TimeValue.timeValueMinutes(30), Setting.Property.NodeScope);

/**
* Determines the delay interval after a shard allocation has failed numOfFailures times.
* This method may return a different value on each call, since the delay can be randomized.
*/
public abstract TimeValue delayInterval(int numOfFailures);

/**
* Constructs the allocation retry policy for the given settings.
*/
public static AllocationRetryBackoffPolicy policyForSettings(Settings settings) {
return SETTING_ALLOCATION_RETRY_POLICY.get(settings).policyForSettings(settings);
}

public static AllocationRetryBackoffPolicy exponentialBackoffPolicy(Settings settings) {
return new ExponentialBackOffPolicy(settings);
}

public static AllocationRetryBackoffPolicy noBackOffPolicy() {
return new NoBackoffPolicy();
}

static class ExponentialBackOffPolicy extends AllocationRetryBackoffPolicy {
private final Random random;
private final long delayUnitMS;
private final long maxDelayMS;

ExponentialBackOffPolicy(Settings settings) {
this.random = new Random(Randomness.get().nextInt());
this.delayUnitMS = SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_BASE_DELAY.get(settings).millis();
this.maxDelayMS = SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_MAX_DELAY.get(settings).millis();
}

@Override
public TimeValue delayInterval(int numOfFailures) {
assert numOfFailures >= 0;
int bound = numOfFailures > 30 ? Integer.MAX_VALUE : 1 << numOfFailures;
return TimeValue.timeValueMillis(Math.min(maxDelayMS, delayUnitMS * random.nextInt(bound)));
}
}

static class NoBackoffPolicy extends AllocationRetryBackoffPolicy {
@Override
public TimeValue delayInterval(int numOfFailures) {
return TimeValue.ZERO;
}
}
}
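A hedged usage sketch of the new policy, assuming the class and setting keys land exactly as shown in this diff (the demo class and the values put into Settings are invented): build node settings, resolve the policy, and print the randomized delay for a few failure counts.

```java
import org.elasticsearch.cluster.routing.allocation.AllocationRetryBackoffPolicy;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

public class BackoffPolicyDemo {
    public static void main(String[] args) {
        Settings settings = Settings.builder()
            .put("cluster.allocation.retry.policy", "exponential_backoff")
            .put("cluster.allocation.retry.exponential_backoff.base_delay", "50ms")
            .put("cluster.allocation.retry.exponential_backoff.max_delay", "30m")
            .build();
        AllocationRetryBackoffPolicy policy = AllocationRetryBackoffPolicy.policyForSettings(settings);
        for (int failures = 1; failures <= 5; failures++) {
            // with a 50ms base, the delay after n failures is drawn from [0, 50ms * 2^n)
            // in 50ms steps, capped at max_delay
            TimeValue delay = policy.delayInterval(failures);
            System.out.println("after " + failures + " failure(s): " + delay);
        }
    }
}
```

With cluster.allocation.retry.policy set to no_backoff, delayInterval always returns TimeValue.ZERO and the policy's default max retries stays at 5; exponential_backoff raises that default to 1000, which the MaxRetryAllocationDecider change below picks up.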
MaxRetryAllocationDecider.java
@@ -23,6 +23,7 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationRetryBackoffPolicy;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -37,8 +38,8 @@
*/
public class MaxRetryAllocationDecider extends AllocationDecider {

public static final Setting<Integer> SETTING_ALLOCATION_MAX_RETRY = Setting.intSetting("index.allocation.max_retries", 5, 0,
Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<Integer> SETTING_ALLOCATION_MAX_RETRY = Setting.intSetting("index.allocation.max_retries",
AllocationRetryBackoffPolicy.SETTING_ALLOCATION_DEFAULT_MAX_RETRIES, 0, Setting.Property.Dynamic, Setting.Property.IndexScope);

public static final String NAME = "max_retry";

Setting.java
@@ -903,6 +903,10 @@ public static Setting<Integer> intSetting(String key, int defaultValue, int minV
return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key), properties);
}

public static Setting<Integer> intSetting(String key, Function<Settings, Integer> defaultValue, int minValue, Property... properties) {
return new Setting<>(key, defaultValue.andThen(n -> Integer.toString(n)), (s) -> parseInt(s, minValue, key), properties);
}

public static Setting<Integer> intSetting(String key, Setting<Integer> fallbackSetting, int minValue, Property... properties) {
return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, key), properties);
}
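To illustrate the new Function-based overload (assuming it is added as shown above; the setting keys and class here are invented), the default of one setting can now be computed from other node settings instead of being a fixed constant, which is exactly how index.allocation.max_retries derives its policy-dependent default in the MaxRetryAllocationDecider change above.

```java
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

public class DerivedDefaultSettings {
    // a plain int setting with a fixed default (existing overload)
    public static final Setting<Integer> BASE_THREADS =
        Setting.intSetting("example.base_threads", 4, 1, Property.NodeScope);

    // the new overload: the default is derived from other settings on the same node
    public static final Setting<Integer> WORKER_THREADS =
        Setting.intSetting("example.worker_threads",
            (Settings s) -> BASE_THREADS.get(s) * 2, 1, Property.NodeScope);
}
```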