[7.x] ILM: Make all the shrink action steps retryable (#70107) #70573

Merged 4 commits into 7.x on Mar 18, 2021
Changes from all commits
68 changes: 40 additions & 28 deletions docs/reference/ilm/actions/ilm-shrink.asciidoc
@@ -4,41 +4,29 @@

Phases allowed: hot, warm.

-Sets an index to <<dynamic-index-settings, read-only>>
-and shrinks it into a new index with fewer primary shards.
-The name of the new index is of the form `shrink-<original-index-name>`.
-For example, if the name of the source index is _logs_,
-the name of the shrunken index is _shrink-logs_.
+Sets a source index to <<index-blocks-read-only,read-only>> and shrinks it into
+a new index with fewer primary shards. The name of the resulting index is
+`shrink-<random-uuid>-<original-index-name>`. This action corresponds to the
+<<indices-shrink-index,shrink API>>.
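For reference, the standalone shrink API call that this action corresponds to looks roughly like the following sketch (the index names and shard count are illustrative, not taken from this change):

[source,console]
--------------------------------------------------
POST /my-source-index/_shrink/shrink-my-source-index
{
  "settings": {
    "index.number_of_shards": 1
  }
}
--------------------------------------------------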

-The shrink action allocates all primary shards of the index to one node so it
-can call the <<indices-shrink-index,Shrink API>> to shrink the index.
-After shrinking, it swaps aliases that point to the original index to the new shrunken index.
+After the `shrink` action, any aliases that pointed to the source index point to
+the new shrunken index. If {ilm-init} performs the `shrink` action on a backing
+index for a data stream, the shrunken index replaces the source index in the
+stream. You cannot perform the `shrink` action on a write index.

-To use the `shrink` action in the `hot` phase, the `rollover` action *must* be present.
-If no rollover action is configured, {ilm-init} will reject the policy.
+To use the `shrink` action in the `hot` phase, the `rollover` action *must* be
+present. If no rollover action is configured, {ilm-init} will reject the policy.
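As an illustrative sketch (the policy name and rollover trigger below are examples, not taken from this change), a hot-phase policy that satisfies this requirement pairs the two actions:

[source,console]
--------------------------------------------------
PUT _ilm/policy/my_hot_shrink_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50gb"
          },
          "shrink": {
            "number_of_shards": 1
          }
        }
      }
    }
  }
}
--------------------------------------------------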

[IMPORTANT]
-If the shrink action is used on a <<ccr-put-follow,follower index>>,
-policy execution waits until the leader index rolls over (or is
-<<skipping-rollover, otherwise marked complete>>),
-then converts the follower index into a regular index with the
-<<ilm-unfollow,unfollow>> action before performing the shrink operation.
-
-If the managed index is part of a <<data-streams, data stream>>,
-the shrunken index replaces the original index in the data stream.
-
-[NOTE]
-This action cannot be performed on a data stream's write index. Attempts to do
-so will fail. To shrink the index, first
-<<manually-roll-over-a-data-stream,manually roll over>> the data stream. This
-creates a new write index. Because the index is no longer the stream's write
-index, the action can resume shrinking it.
-Using a policy that makes use of the <<ilm-rollover, rollover>> action
-in the hot phase will avoid this situation and the need for a manual rollover for future
-managed indices.
+If the shrink action is used on a <<ccr-put-follow,follower index>>, policy
+execution waits until the leader index rolls over (or is <<skipping-rollover,
+otherwise marked complete>>), then converts the follower index into a regular
+index with the <<ilm-unfollow,unfollow>> action before performing the shrink
+operation.

[[ilm-shrink-options]]
==== Shrink options

`number_of_shards`::
(Optional, integer)
Number of shards to shrink to.
@@ -103,3 +91,27 @@ PUT _ilm/policy/my_policy
}
}
--------------------------------------------------

[[ilm-shrink-shard-allocation]]
==== Shard allocation for shrink

During a `shrink` action, {ilm-init} allocates the source index's primary shards
to one node. After shrinking the index, {ilm-init} reallocates the shrunken
index's shards to the appropriate nodes based on your allocation rules.

These allocation steps can fail for several reasons, including:

* A node is removed during the `shrink` action.
* No node has enough disk space to host the source index's shards.
* {es} cannot reallocate the shrunken index due to conflicting allocation rules.

When one of the allocation steps fails, {ilm-init} waits for the period set in
<<index-lifecycle-step-wait-time-threshold,`index.lifecycle.step.wait_time_threshold`>>,
which defaults to 12 hours. This threshold period lets the cluster resolve any
issues causing the allocation failure.

If the threshold period passes and {ilm-init} has not yet shrunk the index,
{ilm-init} attempts to allocate the source index's primary shards to another
node. If {ilm-init} shrunk the index but could not reallocate the shrunken
index's shards during the threshold period, {ilm-init} deletes the shrunken
index and re-attempts the entire `shrink` action.
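
One way to check where a managed index is in this process is the ILM explain API; its response includes the current step and when that step started, which is what the wait threshold is measured against. A minimal sketch (the index name is illustrative):

[source,console]
--------------------------------------------------
GET my-index-000001/_ilm/explain
--------------------------------------------------
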
7 changes: 7 additions & 0 deletions docs/reference/settings/ilm-settings.asciidoc
@@ -59,6 +59,13 @@ An index that was rolled over would normally match the full format,
for example `logs-2016.10.31-000002`).
If the index name doesn't match the pattern, index creation fails.

[[index-lifecycle-step-wait-time-threshold]]
`index.lifecycle.step.wait_time_threshold`::
(<<indices-update-settings,Dynamic>>, <<time-units,time value>>)
Time to wait for the cluster to resolve allocation issues during an {ilm-init}
<<ilm-shrink,`shrink`>> action. Must be greater than `1h` (1 hour). Defaults to
`12h` (12 hours). See <<ilm-shrink-shard-allocation>>.
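For example, to give the cluster more time to resolve allocation issues for a particular index, the threshold can be raised dynamically with the update index settings API (the index name and value below are illustrative):

[source,console]
--------------------------------------------------
PUT my-index-000001/_settings
{
  "index.lifecycle.step.wait_time_threshold": "24h"
}
--------------------------------------------------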

`index.lifecycle.rollover_alias`::
(<<indices-update-settings,Dynamic>>, string)
The index alias to update when the index rolls over. Specify when using a
CheckShrinkReadyStep.java
@@ -39,6 +39,11 @@ public class CheckShrinkReadyStep extends ClusterStateWaitStep {
super(key, nextStepKey);
}

// Marking this wait step as retryable allows ILM to retry it automatically
// when it fails, instead of leaving the index parked in the ERROR step.
@Override
public boolean isRetryable() {
return true;
}

@Override
public Result isConditionMet(Index index, ClusterState clusterState) {
IndexMetadata idxMeta = clusterState.metadata().index(index);
CleanupShrinkIndexStep.java (new file)
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.IndexNotFoundException;

import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.fromIndexMetadata;

/**
* Deletes the index identified by the shrink index name stored in the lifecycle state of the managed index (if any was generated)
*/
public class CleanupShrinkIndexStep extends AsyncRetryDuringSnapshotActionStep {
public static final String NAME = "cleanup-shrink-index";
private static final Logger logger = LogManager.getLogger(CleanupShrinkIndexStep.class);

public CleanupShrinkIndexStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey, client);
}

@Override
public boolean isRetryable() {
return true;
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, Listener listener) {
final String shrunkenIndexSource = IndexMetadata.INDEX_RESIZE_SOURCE_NAME.get(indexMetadata.getSettings());
if (Strings.isNullOrEmpty(shrunkenIndexSource) == false) {
// the current managed index is a shrunk index
if (currentClusterState.metadata().index(shrunkenIndexSource) == null) {
// if the source index does not exist, we'll skip deleting the
// (managed) shrunk index as that will cause data loss
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetadata.getSettings());
logger.warn("managed index [{}] as part of policy [{}] is a shrunk index and the source index [{}] does not exist " +
"anymore. will skip the [{}] step", indexMetadata.getIndex().getName(), policyName, shrunkenIndexSource, NAME);
listener.onResponse(true);
return;
}
}

LifecycleExecutionState lifecycleState = fromIndexMetadata(indexMetadata);
final String shrinkIndexName = lifecycleState.getShrinkIndexName();
// if the shrink index was not generated there is nothing to delete so we move on
if (Strings.hasText(shrinkIndexName) == false) {
listener.onResponse(true);
return;
}
getClient().admin().indices()
.delete(new DeleteIndexRequest(shrinkIndexName).masterNodeTimeout(getMasterTimeout(currentClusterState)),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
// even if not all nodes acked the delete request yet we can consider this operation as successful as
// we'll generate a new index name and attempt to shrink into the newly generated name
listener.onResponse(true);
}

@Override
public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
// we can move on if the index was deleted in the meantime
listener.onResponse(true);
} else {
listener.onFailure(e);
}
}
});
}

}
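
A rough wiring sketch (not part of this diff; the phase, action, and follow-up step name are hypothetical) of how this step can drive the retry cycle: its nextStepKey points at a step that picks a fresh shrink index name, so deleting a partial shrunken index is immediately followed by another shrink attempt.

package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

// Illustrative only: the key names below are examples, not the real step names.
class CleanupShrinkWiringSketch {
    static CleanupShrinkIndexStep buildCleanupStep(Client client) {
        StepKey cleanupKey = new StepKey("warm", "shrink", CleanupShrinkIndexStep.NAME);
        // After cleanup, fall through to a (hypothetical) step that generates a new
        // shrink index name so the whole shrink attempt can run again.
        StepKey restartKey = new StepKey("warm", "shrink", "generate-shrink-index-name");
        return new CleanupShrinkIndexStep(cleanupKey, restartKey, client);
    }
}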
ClusterStateWaitUntilThresholdStep.java (new file)
@@ -0,0 +1,143 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ilm.step.info.SingleMessageFieldInfo;

import java.time.Clock;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.fromIndexMetadata;

/**
* This step wraps a {@link ClusterStateWaitStep} in order to be able to manipulate what the next step will be, depending on the result of
* the wrapped {@link ClusterStateWaitStep}.
* <p>
* If the action response is complete, the {@link ClusterStateWaitUntilThresholdStep}'s nextStepKey will be the nextStepKey of the
* wrapped action. When the threshold level is surpassed, if the underlying step's condition was not met, the nextStepKey will be changed to
* the provided {@link #nextKeyOnThresholdBreach} and this step will stop waiting.
*
* Failures encountered whilst executing the wrapped action will be propagated directly.
*/
public class ClusterStateWaitUntilThresholdStep extends ClusterStateWaitStep {

private static final Logger logger = LogManager.getLogger(ClusterStateWaitUntilThresholdStep.class);

private final ClusterStateWaitStep stepToExecute;
private final StepKey nextKeyOnThresholdBreach;
private final AtomicBoolean thresholdPassed = new AtomicBoolean(false);

public ClusterStateWaitUntilThresholdStep(ClusterStateWaitStep stepToExecute, StepKey nextKeyOnThresholdBreach) {
super(stepToExecute.getKey(), stepToExecute.getNextStepKey());
this.stepToExecute = stepToExecute;
this.nextKeyOnThresholdBreach = nextKeyOnThresholdBreach;
}

@Override
public boolean isRetryable() {
return true;
}

@Override
public Result isConditionMet(Index index, ClusterState clusterState) {
IndexMetadata idxMeta = clusterState.metadata().index(index);
if (idxMeta == null) {
// Index must have been since deleted, ignore it
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists",
getKey().getAction(), index.getName());
return new Result(false, null);
}

Result stepResult = stepToExecute.isConditionMet(index, clusterState);

if (stepResult.isComplete() == false) {
// checking the threshold after we execute the step to make sure we execute the wrapped step at least once (because time is a
// wonderful thing)
TimeValue retryThreshold = LifecycleSettings.LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING.get(idxMeta.getSettings());
LifecycleExecutionState lifecycleState = fromIndexMetadata(idxMeta);
if (waitedMoreThanThresholdLevel(retryThreshold, lifecycleState, Clock.systemUTC())) {
// we retried this step enough, next step will be the configured to {@code nextKeyOnThresholdBreach}
thresholdPassed.set(true);

String message = String.format(Locale.ROOT, "[%s] lifecycle step, as part of [%s] action, for index [%s] executed for" +
" more than [%s]. Abandoning execution and moving to the next fallback step [%s]",
getKey().getName(), getKey().getAction(), idxMeta.getIndex().getName(), retryThreshold,
nextKeyOnThresholdBreach);
logger.debug(message);

return new Result(true, new SingleMessageFieldInfo(message));
}
}

return stepResult;
}

static boolean waitedMoreThanThresholdLevel(@Nullable TimeValue retryThreshold, LifecycleExecutionState lifecycleState, Clock clock) {
assert lifecycleState.getStepTime() != null : "lifecycle state [" + lifecycleState + "] does not have the step time set";
if (retryThreshold != null) {
// return true if the threshold was surpassed and false otherwise
return (lifecycleState.getStepTime() + retryThreshold.millis()) < clock.millis();
}
return false;
}

@Override
public StepKey getNextStepKey() {
if (thresholdPassed.get()) {
return nextKeyOnThresholdBreach;
} else {
return super.getNextStepKey();
}
}

/**
* Represents the {@link ClusterStateWaitStep} that's wrapped by this branching step.
*/
ClusterStateWaitStep getStepToExecute() {
return stepToExecute;
}

/**
* The step key to be reported as the {@link ClusterStateWaitUntilThresholdStep#getNextStepKey()} if the index configured a max wait
* time using {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING} and the threshold was passed.
*/
StepKey getNextKeyOnThreshold() {
return nextKeyOnThresholdBreach;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (super.equals(o) == false) {
return false;
}
ClusterStateWaitUntilThresholdStep that = (ClusterStateWaitUntilThresholdStep) o;
return super.equals(o)
&& Objects.equals(stepToExecute, that.stepToExecute)
&& Objects.equals(nextKeyOnThresholdBreach, that.nextKeyOnThresholdBreach);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), stepToExecute, nextKeyOnThresholdBreach);
}
}
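
To make the branching concrete, here is an illustrative sketch (not part of this diff; the phase, action, and step-name strings are examples) that wraps the shrink-allocation wait step so that, once the threshold is breached, ILM diverts to the cleanup step instead of waiting indefinitely. Because the wrapper adopts the wrapped step's key and next key, callers register only the wrapper, and getNextStepKey() decides at runtime whether to proceed normally or branch to the fallback.

package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.xpack.core.ilm.Step.StepKey;

// Illustrative only: wraps CheckShrinkReadyStep and falls back to CleanupShrinkIndexStep
// when index.lifecycle.step.wait_time_threshold is exceeded.
class ThresholdWrappingSketch {
    static ClusterStateWaitUntilThresholdStep wrapAllocationCheck() {
        StepKey checkKey = new StepKey("warm", "shrink", "check-shrink-allocation");
        StepKey onSuccessKey = new StepKey("warm", "shrink", "shrink");
        StepKey onThresholdBreachKey = new StepKey("warm", "shrink", CleanupShrinkIndexStep.NAME);
        return new ClusterStateWaitUntilThresholdStep(
            new CheckShrinkReadyStep(checkKey, onSuccessKey),
            onThresholdBreachKey);
    }
}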