Skip to content

Commit fbf335d

Browse files
authored
Extract transport cluster settings/ilm execute logic (#86941)
Extract execute logic from the transport actions for cluster update settings and ILM put/delete to support future reuse for operator file based updates. Relates to #86224
1 parent 3ddad4a commit fbf335d

File tree

3 files changed

+143
-94
lines changed

3 files changed

+143
-94
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.ElasticsearchException;
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.support.ActionFilters;
16+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1617
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
1718
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
1819
import org.elasticsearch.cluster.ClusterState;
@@ -137,11 +138,7 @@ protected void masterOperation(
137138
final ClusterState state,
138139
final ActionListener<ClusterUpdateSettingsResponse> listener
139140
) {
140-
final SettingsUpdater updater = new SettingsUpdater(clusterSettings);
141-
submitUnbatchedTask(UPDATE_TASK_SOURCE, new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) {
142-
143-
private volatile boolean changed = false;
144-
141+
submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) {
145142
@Override
146143
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
147144
return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
@@ -225,21 +222,40 @@ public void onFailure(Exception e) {
225222
logger.debug(() -> "failed to perform [" + UPDATE_TASK_SOURCE + "]", e);
226223
super.onFailure(e);
227224
}
228-
229-
@Override
230-
public ClusterState execute(final ClusterState currentState) {
231-
final ClusterState clusterState = updater.updateSettings(
232-
currentState,
233-
clusterSettings.upgradeSettings(request.transientSettings()),
234-
clusterSettings.upgradeSettings(request.persistentSettings()),
235-
logger
236-
);
237-
changed = clusterState != currentState;
238-
return clusterState;
239-
}
240225
});
241226
}
242227

228+
public static class ClusterUpdateSettingsTask extends AckedClusterStateUpdateTask {
229+
protected volatile boolean changed = false;
230+
protected final SettingsUpdater updater;
231+
protected final ClusterUpdateSettingsRequest request;
232+
private final ClusterSettings clusterSettings;
233+
234+
public ClusterUpdateSettingsTask(
235+
final ClusterSettings clusterSettings,
236+
Priority priority,
237+
ClusterUpdateSettingsRequest request,
238+
ActionListener<? extends AcknowledgedResponse> listener
239+
) {
240+
super(priority, request, listener);
241+
this.clusterSettings = clusterSettings;
242+
this.updater = new SettingsUpdater(clusterSettings);
243+
this.request = request;
244+
}
245+
246+
@Override
247+
public ClusterState execute(final ClusterState currentState) {
248+
final ClusterState clusterState = updater.updateSettings(
249+
currentState,
250+
clusterSettings.upgradeSettings(request.transientSettings()),
251+
clusterSettings.upgradeSettings(request.persistentSettings()),
252+
logger
253+
);
254+
changed = clusterState != currentState;
255+
return clusterState;
256+
}
257+
}
258+
243259
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
244260
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
245261
clusterService.submitUnbatchedStateUpdateTask(source, task);

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -59,34 +59,43 @@ public TransportDeleteLifecycleAction(
5959

6060
@Override
6161
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
62-
submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new AckedClusterStateUpdateTask(request, listener) {
63-
@Override
64-
public ClusterState execute(ClusterState currentState) {
65-
String policyToDelete = request.getPolicyName();
66-
List<String> indicesUsingPolicy = currentState.metadata()
67-
.indices()
68-
.values()
69-
.stream()
70-
.filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName()))
71-
.map(idxMeta -> idxMeta.getIndex().getName())
72-
.toList();
73-
if (indicesUsingPolicy.isEmpty() == false) {
74-
throw new IllegalArgumentException(
75-
"Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy
76-
);
77-
}
78-
ClusterState.Builder newState = ClusterState.builder(currentState);
79-
IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE);
80-
if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
81-
throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
82-
}
83-
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
84-
newPolicies.remove(request.getPolicyName());
85-
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
86-
newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
87-
return newState.build();
62+
submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new DeleteLifecyclePolicyTask(request, listener));
63+
}
64+
65+
public static class DeleteLifecyclePolicyTask extends AckedClusterStateUpdateTask {
66+
private final Request request;
67+
68+
public DeleteLifecyclePolicyTask(Request request, ActionListener<AcknowledgedResponse> listener) {
69+
super(request, listener);
70+
this.request = request;
71+
}
72+
73+
@Override
74+
public ClusterState execute(ClusterState currentState) {
75+
String policyToDelete = request.getPolicyName();
76+
List<String> indicesUsingPolicy = currentState.metadata()
77+
.indices()
78+
.values()
79+
.stream()
80+
.filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName()))
81+
.map(idxMeta -> idxMeta.getIndex().getName())
82+
.toList();
83+
if (indicesUsingPolicy.isEmpty() == false) {
84+
throw new IllegalArgumentException(
85+
"Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy
86+
);
87+
}
88+
ClusterState.Builder newState = ClusterState.builder(currentState);
89+
IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE);
90+
if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
91+
throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
8892
}
89-
});
93+
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
94+
newPolicies.remove(request.getPolicyName());
95+
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
96+
newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
97+
return newState.build();
98+
}
9099
}
91100

92101
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java

Lines changed: 74 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -109,61 +109,85 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
109109
}
110110
}
111111

112-
submitUnbatchedTask("put-lifecycle-" + request.getPolicy().getName(), new AckedClusterStateUpdateTask(request, listener) {
113-
@Override
114-
public ClusterState execute(ClusterState currentState) throws Exception {
115-
final IndexLifecycleMetadata currentMetadata = currentState.metadata()
116-
.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
117-
final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas()
118-
.get(request.getPolicy().getName());
112+
submitUnbatchedTask(
113+
"put-lifecycle-" + request.getPolicy().getName(),
114+
new UpdateLifecyclePolicyTask(request, listener, licenseState, filteredHeaders, xContentRegistry, client)
115+
);
116+
}
119117

120-
// Double-check for no-op in the state update task, in case it was changed/reset in the meantime
121-
if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) {
122-
return currentState;
123-
}
118+
public static class UpdateLifecyclePolicyTask extends AckedClusterStateUpdateTask {
119+
private final Request request;
120+
private final XPackLicenseState licenseState;
121+
private final Map<String, String> filteredHeaders;
122+
private final NamedXContentRegistry xContentRegistry;
123+
private final Client client;
124124

125-
validatePrerequisites(request.getPolicy(), currentState);
125+
public UpdateLifecyclePolicyTask(
126+
Request request,
127+
ActionListener<AcknowledgedResponse> listener,
128+
XPackLicenseState licenseState,
129+
Map<String, String> filteredHeaders,
130+
NamedXContentRegistry xContentRegistry,
131+
Client client
132+
) {
133+
super(request, listener);
134+
this.request = request;
135+
this.licenseState = licenseState;
136+
this.filteredHeaders = filteredHeaders;
137+
this.xContentRegistry = xContentRegistry;
138+
this.client = client;
139+
}
126140

127-
ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
128-
long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
129-
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
130-
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(
131-
request.getPolicy(),
132-
filteredHeaders,
133-
nextVersion,
134-
Instant.now().toEpochMilli()
135-
);
136-
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
137-
if (oldPolicy == null) {
138-
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
139-
} else {
140-
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
141-
}
142-
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
143-
stateBuilder.metadata(
144-
Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()
145-
);
146-
ClusterState nonRefreshedState = stateBuilder.build();
147-
if (oldPolicy == null) {
141+
@Override
142+
public ClusterState execute(ClusterState currentState) throws Exception {
143+
final IndexLifecycleMetadata currentMetadata = currentState.metadata()
144+
.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
145+
final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas().get(request.getPolicy().getName());
146+
147+
// Double-check for no-op in the state update task, in case it was changed/reset in the meantime
148+
if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) {
149+
return currentState;
150+
}
151+
152+
validatePrerequisites(request.getPolicy(), currentState, licenseState);
153+
154+
ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
155+
long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
156+
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
157+
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(
158+
request.getPolicy(),
159+
filteredHeaders,
160+
nextVersion,
161+
Instant.now().toEpochMilli()
162+
);
163+
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
164+
if (oldPolicy == null) {
165+
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
166+
} else {
167+
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
168+
}
169+
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
170+
stateBuilder.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
171+
ClusterState nonRefreshedState = stateBuilder.build();
172+
if (oldPolicy == null) {
173+
return nonRefreshedState;
174+
} else {
175+
try {
176+
return updateIndicesForPolicy(
177+
nonRefreshedState,
178+
xContentRegistry,
179+
client,
180+
oldPolicy.getPolicy(),
181+
lifecyclePolicyMetadata,
182+
licenseState
183+
);
184+
} catch (Exception e) {
185+
logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e);
186+
// Revert to the non-refreshed state
148187
return nonRefreshedState;
149-
} else {
150-
try {
151-
return updateIndicesForPolicy(
152-
nonRefreshedState,
153-
xContentRegistry,
154-
client,
155-
oldPolicy.getPolicy(),
156-
lifecyclePolicyMetadata,
157-
licenseState
158-
);
159-
} catch (Exception e) {
160-
logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e);
161-
// Revert to the non-refreshed state
162-
return nonRefreshedState;
163-
}
164188
}
165189
}
166-
});
190+
}
167191
}
168192

169193
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
@@ -193,7 +217,7 @@ static boolean isNoopUpdate(
193217
* @param policy The lifecycle policy
194218
* @param state The cluster state
195219
*/
196-
private void validatePrerequisites(LifecyclePolicy policy, ClusterState state) {
220+
private static void validatePrerequisites(LifecyclePolicy policy, ClusterState state, XPackLicenseState licenseState) {
197221
List<Phase> phasesWithSearchableSnapshotActions = policy.getPhases()
198222
.values()
199223
.stream()

0 commit comments

Comments
 (0)