Skip to content

Commit e7efaba

Browse files
committed
[Transform] finalize feature reset integration (elastic#71133)
This commit updates transform feature reset to: - wait for transform tasks to complete - wait for all indexing actions to transform indices to complete - and prevents transform audit messages from being written while the reset is being processed related to elastic#70008 & elastic#69581
1 parent 5c0a773 commit e7efaba

File tree

21 files changed

+780
-196
lines changed

21 files changed

+780
-196
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@
260260
import org.elasticsearch.xpack.core.textstructure.action.FindStructureAction;
261261
import org.elasticsearch.xpack.core.transform.TransformFeatureSetUsage;
262262
import org.elasticsearch.xpack.core.transform.TransformField;
263+
import org.elasticsearch.xpack.core.transform.TransformMetadata;
263264
import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction;
264265
import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
265266
import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
@@ -662,12 +663,14 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
662663
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TransformField.TASK_NAME, TransformTaskParams::new),
663664
new NamedWriteableRegistry.Entry(Task.Status.class, TransformField.TASK_NAME, TransformState::new),
664665
new NamedWriteableRegistry.Entry(PersistentTaskState.class, TransformField.TASK_NAME, TransformState::new),
665-
new NamedWriteableRegistry.Entry(SyncConfig.class, TransformField.TIME.getPreferredName(), TimeSyncConfig::new),
666-
new NamedWriteableRegistry.Entry(
667-
RetentionPolicyConfig.class,
668-
TransformField.TIME.getPreferredName(),
669-
TimeRetentionPolicyConfig::new
670-
),
666+
new NamedWriteableRegistry.Entry(Metadata.Custom.class, TransformMetadata.TYPE, TransformMetadata::new),
667+
new NamedWriteableRegistry.Entry(NamedDiff.class, TransformMetadata.TYPE, TransformMetadata.TransformMetadataDiff::new),
668+
new NamedWriteableRegistry.Entry(SyncConfig.class, TransformField.TIME.getPreferredName(), TimeSyncConfig::new),
669+
new NamedWriteableRegistry.Entry(
670+
RetentionPolicyConfig.class,
671+
TransformField.TIME.getPreferredName(),
672+
TimeRetentionPolicyConfig::new
673+
),
671674
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.FLATTENED, FlattenedFeatureSetUsage::new),
672675
// Vectors
673676
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.VECTORS, VectorsFeatureSetUsage::new),
@@ -751,7 +754,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
751754
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TransformField.TASK_NAME),
752755
TransformState::fromXContent),
753756
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(TransformField.TASK_NAME),
754-
TransformState::fromXContent)
757+
TransformState::fromXContent),
758+
new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(TransformMetadata.TYPE),
759+
parser -> TransformMetadata.LENIENT_PARSER.parse(parser, null).build())
755760
);
756761
}
757762

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
8181
import org.elasticsearch.xpack.core.ssl.SSLConfigurationReloader;
8282
import org.elasticsearch.xpack.core.ssl.SSLService;
83+
import org.elasticsearch.xpack.core.transform.TransformMetadata;
8384
import org.elasticsearch.xpack.core.watcher.WatcherMetadata;
8485
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
8586

@@ -244,7 +245,8 @@ private static boolean alreadyContainsXPackCustomMetadata(ClusterState clusterSt
244245
return metadata.custom(LicensesMetadata.TYPE) != null ||
245246
metadata.custom(MlMetadata.TYPE) != null ||
246247
metadata.custom(WatcherMetadata.TYPE) != null ||
247-
clusterState.custom(TokenMetadata.TYPE) != null;
248+
clusterState.custom(TokenMetadata.TYPE) != null ||
249+
metadata.custom(TransformMetadata.TYPE) != null;
248250
}
249251

250252
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.action;
9+
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
13+
import org.elasticsearch.ElasticsearchTimeoutException;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.support.ActionFilters;
16+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
17+
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
18+
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
19+
import org.elasticsearch.cluster.ClusterState;
20+
import org.elasticsearch.cluster.block.ClusterBlockException;
21+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
22+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.elasticsearch.cluster.service.ClusterService;
24+
import org.elasticsearch.common.inject.Inject;
25+
import org.elasticsearch.threadpool.ThreadPool;
26+
import org.elasticsearch.transport.TransportService;
27+
28+
public abstract class AbstractTransportSetResetModeAction extends AcknowledgedTransportMasterNodeAction<SetResetModeActionRequest> {
29+
30+
private static final Logger logger = LogManager.getLogger(AbstractTransportSetResetModeAction.class);
31+
private final ClusterService clusterService;
32+
33+
@Inject
34+
public AbstractTransportSetResetModeAction(
35+
String actionName,
36+
TransportService transportService,
37+
ThreadPool threadPool,
38+
ClusterService clusterService,
39+
ActionFilters actionFilters,
40+
IndexNameExpressionResolver indexNameExpressionResolver) {
41+
super(
42+
actionName,
43+
transportService,
44+
clusterService,
45+
threadPool,
46+
actionFilters,
47+
SetResetModeActionRequest::new,
48+
indexNameExpressionResolver,
49+
ThreadPool.Names.SAME
50+
);
51+
this.clusterService = clusterService;
52+
}
53+
54+
protected abstract boolean isResetMode(ClusterState clusterState);
55+
56+
protected abstract String featureName();
57+
58+
protected abstract ClusterState setState(ClusterState oldState, SetResetModeActionRequest request);
59+
60+
@Override
61+
protected void masterOperation(SetResetModeActionRequest request,
62+
ClusterState state,
63+
ActionListener<AcknowledgedResponse> listener) throws Exception {
64+
65+
final boolean isResetModeEnabled = isResetMode(state);
66+
// Noop, nothing for us to do, simply return fast to the caller
67+
if (request.isEnabled() == isResetModeEnabled) {
68+
logger.debug(() -> new ParameterizedMessage("Reset mode noop for [{}]", featureName()));
69+
listener.onResponse(AcknowledgedResponse.TRUE);
70+
return;
71+
}
72+
73+
logger.debug(
74+
() -> new ParameterizedMessage(
75+
"Starting to set [reset_mode] for [{}] to [{}] from [{}]",
76+
featureName(),
77+
request.isEnabled(),
78+
isResetModeEnabled
79+
)
80+
);
81+
82+
ActionListener<AcknowledgedResponse> wrappedListener = ActionListener.wrap(
83+
r -> {
84+
logger.debug(() -> new ParameterizedMessage("Completed reset mode request for [{}]", featureName()));
85+
listener.onResponse(r);
86+
},
87+
e -> {
88+
logger.debug(
89+
() -> new ParameterizedMessage("Completed reset mode for [{}] request but with failure", featureName()),
90+
e
91+
);
92+
listener.onFailure(e);
93+
}
94+
);
95+
96+
ActionListener<AcknowledgedResponse> clusterStateUpdateListener = ActionListener.wrap(
97+
acknowledgedResponse -> {
98+
if (acknowledgedResponse.isAcknowledged() == false) {
99+
wrappedListener.onFailure(new ElasticsearchTimeoutException("Unknown error occurred while updating cluster state"));
100+
return;
101+
}
102+
wrappedListener.onResponse(acknowledgedResponse);
103+
},
104+
wrappedListener::onFailure
105+
);
106+
107+
clusterService.submitStateUpdateTask(featureName() + "-set-reset-mode",
108+
new AckedClusterStateUpdateTask(request, clusterStateUpdateListener) {
109+
110+
@Override
111+
protected AcknowledgedResponse newResponse(boolean acknowledged) {
112+
logger.trace(() -> new ParameterizedMessage("Cluster update response built for [{}]: {}", featureName(), acknowledged));
113+
return AcknowledgedResponse.of(acknowledged);
114+
}
115+
116+
@Override
117+
public ClusterState execute(ClusterState currentState) {
118+
logger.trace(() -> new ParameterizedMessage("Executing cluster state update for [{}]", featureName()));
119+
return setState(currentState, request);
120+
}
121+
});
122+
}
123+
124+
@Override
125+
protected ClusterBlockException checkBlock(SetResetModeActionRequest request, ClusterState state) {
126+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
127+
}
128+
129+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.action;
9+
10+
import org.elasticsearch.action.ActionRequestValidationException;
11+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
12+
import org.elasticsearch.common.ParseField;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
16+
import org.elasticsearch.common.xcontent.ToXContent;
17+
import org.elasticsearch.common.xcontent.ToXContentObject;
18+
import org.elasticsearch.common.xcontent.XContentBuilder;
19+
20+
import java.io.IOException;
21+
import java.util.Objects;
22+
23+
public class SetResetModeActionRequest extends AcknowledgedRequest<SetResetModeActionRequest> implements ToXContentObject {
24+
public static SetResetModeActionRequest enabled() {
25+
return new SetResetModeActionRequest(true);
26+
}
27+
28+
public static SetResetModeActionRequest disabled() {
29+
return new SetResetModeActionRequest(false);
30+
}
31+
32+
private final boolean enabled;
33+
34+
private static final ParseField ENABLED = new ParseField("enabled");
35+
public static final ConstructingObjectParser<SetResetModeActionRequest, Void> PARSER =
36+
new ConstructingObjectParser<>("set_reset_mode_action_request", a -> new SetResetModeActionRequest((Boolean)a[0]));
37+
38+
static {
39+
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
40+
}
41+
42+
SetResetModeActionRequest(boolean enabled) {
43+
this.enabled = enabled;
44+
}
45+
46+
public SetResetModeActionRequest(StreamInput in) throws IOException {
47+
super(in);
48+
this.enabled = in.readBoolean();
49+
}
50+
51+
public boolean isEnabled() {
52+
return enabled;
53+
}
54+
55+
@Override
56+
public ActionRequestValidationException validate() {
57+
return null;
58+
}
59+
60+
@Override
61+
public void writeTo(StreamOutput out) throws IOException {
62+
super.writeTo(out);
63+
out.writeBoolean(enabled);
64+
}
65+
66+
@Override
67+
public int hashCode() {
68+
return Objects.hash(enabled);
69+
}
70+
71+
@Override
72+
public boolean equals(Object obj) {
73+
if (this == obj) {
74+
return true;
75+
}
76+
if (obj == null || obj.getClass() != getClass()) {
77+
return false;
78+
}
79+
SetResetModeActionRequest other = (SetResetModeActionRequest) obj;
80+
return Objects.equals(enabled, other.enabled);
81+
}
82+
83+
@Override
84+
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
85+
builder.startObject();
86+
builder.field(ENABLED.getPreferredName(), enabled);
87+
builder.endObject();
88+
return builder;
89+
}
90+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java

-79
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,9 @@
66
*/
77
package org.elasticsearch.xpack.core.ml.action;
88

9-
import org.elasticsearch.action.ActionRequestValidationException;
109
import org.elasticsearch.action.ActionType;
11-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1210
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13-
import org.elasticsearch.common.ParseField;
14-
import org.elasticsearch.common.io.stream.StreamInput;
15-
import org.elasticsearch.common.io.stream.StreamOutput;
16-
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
17-
import org.elasticsearch.common.xcontent.ToXContentObject;
18-
import org.elasticsearch.common.xcontent.XContentBuilder;
1911

20-
import java.io.IOException;
21-
import java.util.Objects;
2212

2313
public class SetResetModeAction extends ActionType<AcknowledgedResponse> {
2414

@@ -29,73 +19,4 @@ private SetResetModeAction() {
2919
super(NAME, AcknowledgedResponse::readFrom);
3020
}
3121

32-
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
33-
34-
public static Request enabled() {
35-
return new Request(true);
36-
}
37-
38-
public static Request disabled() {
39-
return new Request(false);
40-
}
41-
42-
private final boolean enabled;
43-
44-
private static final ParseField ENABLED = new ParseField("enabled");
45-
public static final ConstructingObjectParser<Request, Void> PARSER =
46-
new ConstructingObjectParser<>(NAME, a -> new Request((Boolean)a[0]));
47-
48-
static {
49-
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
50-
}
51-
52-
Request(boolean enabled) {
53-
this.enabled = enabled;
54-
}
55-
56-
public Request(StreamInput in) throws IOException {
57-
super(in);
58-
this.enabled = in.readBoolean();
59-
}
60-
61-
public boolean isEnabled() {
62-
return enabled;
63-
}
64-
65-
@Override
66-
public ActionRequestValidationException validate() {
67-
return null;
68-
}
69-
70-
@Override
71-
public void writeTo(StreamOutput out) throws IOException {
72-
super.writeTo(out);
73-
out.writeBoolean(enabled);
74-
}
75-
76-
@Override
77-
public int hashCode() {
78-
return Objects.hash(enabled);
79-
}
80-
81-
@Override
82-
public boolean equals(Object obj) {
83-
if (this == obj) {
84-
return true;
85-
}
86-
if (obj == null || obj.getClass() != getClass()) {
87-
return false;
88-
}
89-
Request other = (Request) obj;
90-
return Objects.equals(enabled, other.enabled);
91-
}
92-
93-
@Override
94-
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
95-
builder.startObject();
96-
builder.field(ENABLED.getPreferredName(), enabled);
97-
builder.endObject();
98-
return builder;
99-
}
100-
}
10122
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

+3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ public class TransformMessages {
2929
public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
3030
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
3131

32+
public static final String FAILED_TO_UNSET_RESET_MODE =
33+
"Failed to set [reset_mode] to [false] after {0}. To allow transforms to run, please call the feature reset API again";
34+
3235
public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";
3336
public static final String REST_WARN_NO_TRANSFORM_NODES =
3437
"Transform requires the transform node role for at least 1 node, found no transform nodes";

0 commit comments

Comments
 (0)