Skip to content

Extract transport cluster settings/ilm execute logic #86941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -137,11 +138,7 @@ protected void masterOperation(
final ClusterState state,
final ActionListener<ClusterUpdateSettingsResponse> listener
) {
final SettingsUpdater updater = new SettingsUpdater(clusterSettings);
submitUnbatchedTask(UPDATE_TASK_SOURCE, new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) {

private volatile boolean changed = false;

submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) {
@Override
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
Expand Down Expand Up @@ -225,21 +222,40 @@ public void onFailure(Exception e) {
logger.debug(() -> "failed to perform [" + UPDATE_TASK_SOURCE + "]", e);
super.onFailure(e);
}

@Override
public ClusterState execute(final ClusterState currentState) {
final ClusterState clusterState = updater.updateSettings(
currentState,
clusterSettings.upgradeSettings(request.transientSettings()),
clusterSettings.upgradeSettings(request.persistentSettings()),
logger
);
changed = clusterState != currentState;
return clusterState;
}
});
}

public static class ClusterUpdateSettingsTask extends AckedClusterStateUpdateTask {
protected volatile boolean changed = false;
protected final SettingsUpdater updater;
protected final ClusterUpdateSettingsRequest request;
private final ClusterSettings clusterSettings;

public ClusterUpdateSettingsTask(
final ClusterSettings clusterSettings,
Priority priority,
ClusterUpdateSettingsRequest request,
ActionListener<? extends AcknowledgedResponse> listener
) {
super(priority, request, listener);
this.clusterSettings = clusterSettings;
this.updater = new SettingsUpdater(clusterSettings);
this.request = request;
}

@Override
public ClusterState execute(final ClusterState currentState) {
final ClusterState clusterState = updater.updateSettings(
currentState,
clusterSettings.upgradeSettings(request.transientSettings()),
clusterSettings.upgradeSettings(request.persistentSettings()),
logger
);
changed = clusterState != currentState;
return clusterState;
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,34 +59,43 @@ public TransportDeleteLifecycleAction(

@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new AckedClusterStateUpdateTask(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
String policyToDelete = request.getPolicyName();
List<String> indicesUsingPolicy = currentState.metadata()
.indices()
.values()
.stream()
.filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName()))
.map(idxMeta -> idxMeta.getIndex().getName())
.toList();
if (indicesUsingPolicy.isEmpty() == false) {
throw new IllegalArgumentException(
"Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy
);
}
ClusterState.Builder newState = ClusterState.builder(currentState);
IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
}
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
newPolicies.remove(request.getPolicyName());
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
return newState.build();
submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new DeleteLifecyclePolicyTask(request, listener));
}

public static class DeleteLifecyclePolicyTask extends AckedClusterStateUpdateTask {
private final Request request;

public DeleteLifecyclePolicyTask(Request request, ActionListener<AcknowledgedResponse> listener) {
super(request, listener);
this.request = request;
}

@Override
public ClusterState execute(ClusterState currentState) {
String policyToDelete = request.getPolicyName();
List<String> indicesUsingPolicy = currentState.metadata()
.indices()
.values()
.stream()
.filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName()))
.map(idxMeta -> idxMeta.getIndex().getName())
.toList();
if (indicesUsingPolicy.isEmpty() == false) {
throw new IllegalArgumentException(
"Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy
);
}
ClusterState.Builder newState = ClusterState.builder(currentState);
IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
}
});
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
newPolicies.remove(request.getPolicyName());
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
return newState.build();
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,61 +109,85 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
}
}

submitUnbatchedTask("put-lifecycle-" + request.getPolicy().getName(), new AckedClusterStateUpdateTask(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final IndexLifecycleMetadata currentMetadata = currentState.metadata()
.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas()
.get(request.getPolicy().getName());
submitUnbatchedTask(
"put-lifecycle-" + request.getPolicy().getName(),
new UpdateLifecyclePolicyTask(request, listener, licenseState, filteredHeaders, xContentRegistry, client)
);
}

// Double-check for no-op in the state update task, in case it was changed/reset in the meantime
if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) {
return currentState;
}
public static class UpdateLifecyclePolicyTask extends AckedClusterStateUpdateTask {
private final Request request;
private final XPackLicenseState licenseState;
private final Map<String, String> filteredHeaders;
private final NamedXContentRegistry xContentRegistry;
private final Client client;

validatePrerequisites(request.getPolicy(), currentState);
public UpdateLifecyclePolicyTask(
Request request,
ActionListener<AcknowledgedResponse> listener,
XPackLicenseState licenseState,
Map<String, String> filteredHeaders,
NamedXContentRegistry xContentRegistry,
Client client
) {
super(request, listener);
this.request = request;
this.licenseState = licenseState;
this.filteredHeaders = filteredHeaders;
this.xContentRegistry = xContentRegistry;
this.client = client;
}

ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(
request.getPolicy(),
filteredHeaders,
nextVersion,
Instant.now().toEpochMilli()
);
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
if (oldPolicy == null) {
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
} else {
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
}
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
stateBuilder.metadata(
Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()
);
ClusterState nonRefreshedState = stateBuilder.build();
if (oldPolicy == null) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final IndexLifecycleMetadata currentMetadata = currentState.metadata()
.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas().get(request.getPolicy().getName());

// Double-check for no-op in the state update task, in case it was changed/reset in the meantime
if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) {
return currentState;
}

validatePrerequisites(request.getPolicy(), currentState, licenseState);

ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(
request.getPolicy(),
filteredHeaders,
nextVersion,
Instant.now().toEpochMilli()
);
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
if (oldPolicy == null) {
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
} else {
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
}
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
stateBuilder.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
ClusterState nonRefreshedState = stateBuilder.build();
if (oldPolicy == null) {
return nonRefreshedState;
} else {
try {
return updateIndicesForPolicy(
nonRefreshedState,
xContentRegistry,
client,
oldPolicy.getPolicy(),
lifecyclePolicyMetadata,
licenseState
);
} catch (Exception e) {
logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e);
// Revert to the non-refreshed state
return nonRefreshedState;
} else {
try {
return updateIndicesForPolicy(
nonRefreshedState,
xContentRegistry,
client,
oldPolicy.getPolicy(),
lifecyclePolicyMetadata,
licenseState
);
} catch (Exception e) {
logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e);
// Revert to the non-refreshed state
return nonRefreshedState;
}
}
}
});
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
Expand Down Expand Up @@ -193,7 +217,7 @@ static boolean isNoopUpdate(
* @param policy The lifecycle policy
* @param state The cluster state
*/
private void validatePrerequisites(LifecyclePolicy policy, ClusterState state) {
private static void validatePrerequisites(LifecyclePolicy policy, ClusterState state, XPackLicenseState licenseState) {
List<Phase> phasesWithSearchableSnapshotActions = policy.getPhases()
.values()
.stream()
Expand Down