Skip to content

[ML] removing old 7.x bwc serialization code #80029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,15 @@ public MlMetadata(StreamInput in) throws IOException {
this.datafeeds = datafeeds;
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
this.upgradeMode = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one should really have been V_7_13_0, so this is actually a bad bug that would mean 8.0.0-alpha1, 8.0.0-alpha2 and 8.0.0-beta1 cannot safely run in a mixed version cluster with 7.16.x if the ML metadata exists and an MlMetadata diff gets bundled up into the same cluster state update as some other update. I guess we never hit that in testing. 😬

I'm wondering if we need a known issue for 8.0.0-beta1 telling people not to attempt a rolling upgrade to it if they are using ML.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at 7.16, at least its correct there 😌

IDK if we have a list of known issues or not for beta1. If we do, this should definitely be on it.

this.resetMode = in.readBoolean();
} else {
this.resetMode = false;
}
this.resetMode = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeMap(jobs, out);
writeMap(datafeeds, out);
out.writeBoolean(upgradeMode);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(resetMode);
}
out.writeBoolean(resetMode);
}

private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOutput out) throws IOException {
Expand Down Expand Up @@ -240,11 +234,7 @@ public MlMetadataDiff(StreamInput in) throws IOException {
MlMetadataDiff::readDatafeedDiffFrom
);
upgradeMode = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
resetMode = in.readBoolean();
} else {
resetMode = false;
}
resetMode = in.readBoolean();
}

/**
Expand All @@ -264,9 +254,7 @@ public void writeTo(StreamOutput out) throws IOException {
jobs.writeTo(out);
datafeeds.writeTo(out);
out.writeBoolean(upgradeMode);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(resetMode);
}
out.writeBoolean(resetMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.TaskOperationFailure;
Expand Down Expand Up @@ -200,26 +199,10 @@ public Stats(StreamInput in) throws IOException {
id = in.readString();
state = DataFrameAnalyticsState.fromStream(in);
failureReason = in.readOptionalString();
if (in.getVersion().before(Version.V_7_4_0)) {
progress = readProgressFromLegacy(state, in);
} else {
progress = in.readList(PhaseProgress::new);
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
dataCounts = new DataCounts(in);
} else {
dataCounts = null;
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
memoryUsage = new MemoryUsage(in);
} else {
memoryUsage = null;
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
analysisStats = in.readOptionalNamedWriteable(AnalysisStats.class);
} else {
analysisStats = null;
}
progress = in.readList(PhaseProgress::new);
dataCounts = new DataCounts(in);
memoryUsage = new MemoryUsage(in);
analysisStats = in.readOptionalNamedWriteable(AnalysisStats.class);
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
}
Expand Down Expand Up @@ -355,50 +338,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
state.writeTo(out);
out.writeOptionalString(failureReason);
if (out.getVersion().before(Version.V_7_4_0)) {
writeProgressToLegacy(out);
} else {
out.writeList(progress);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
dataCounts.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
memoryUsage.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeOptionalNamedWriteable(analysisStats);
}
out.writeList(progress);
dataCounts.writeTo(out);
memoryUsage.writeTo(out);
out.writeOptionalNamedWriteable(analysisStats);
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
}

private void writeProgressToLegacy(StreamOutput out) throws IOException {
String targetPhase = null;
switch (state) {
case ANALYZING:
targetPhase = "analyzing";
break;
case REINDEXING:
targetPhase = "reindexing";
break;
case STARTING:
case STARTED:
case STOPPED:
case STOPPING:
default:
break;
}

Integer legacyProgressPercent = null;
for (PhaseProgress phaseProgress : progress) {
if (phaseProgress.getPhase().equals(targetPhase)) {
legacyProgressPercent = phaseProgress.getProgressPercent();
}
}
out.writeOptionalInt(legacyProgressPercent);
}

@Override
public int hashCode() {
return Objects.hash(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -36,7 +35,7 @@ private GetTrainedModelsAction() {
}

public static class Includes implements Writeable {
static final String DEFINITION = "definition";
public static final String DEFINITION = "definition";
static final String TOTAL_FEATURE_IMPORTANCE = "total_feature_importance";
static final String FEATURE_IMPORTANCE_BASELINE = "feature_importance_baseline";
static final String HYPERPARAMETERS = "hyperparameters";
Expand Down Expand Up @@ -126,18 +125,6 @@ public static class Request extends AbstractGetResourcesRequest {
private final Includes includes;
private final List<String> tags;

@Deprecated
public Request(String id, boolean includeModelDefinition, List<String> tags) {
setResourceId(id);
setAllowNoResources(true);
this.tags = tags == null ? Collections.emptyList() : tags;
if (includeModelDefinition) {
this.includes = Includes.forModelDefinition();
} else {
this.includes = Includes.empty();
}
}

public Request(String id) {
this(id, null, null);
}
Expand All @@ -151,11 +138,7 @@ public Request(String id, List<String> tags, Set<String> includes) {

public Request(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
this.includes = new Includes(in);
} else {
this.includes = in.readBoolean() ? Includes.forModelDefinition() : Includes.empty();
}
this.includes = new Includes(in);
this.tags = in.readStringList();
}

Expand All @@ -175,11 +158,7 @@ public Includes getIncludes() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
this.includes.writeTo(out);
} else {
out.writeBoolean(this.includes.isIncludeModelDefinition());
}
this.includes.writeTo(out);
out.writeStringCollection(tags);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -15,11 +14,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
import org.elasticsearch.xpack.core.ml.inference.results.InferenceResults;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceConfig;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfig;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfigUpdate;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

Expand Down Expand Up @@ -74,18 +69,7 @@ public Request(StreamInput in) throws IOException {
super(in);
this.modelId = in.readString();
this.objectsToInfer = Collections.unmodifiableList(in.readList(StreamInput::readMap));
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
this.update = in.readNamedWriteable(InferenceConfigUpdate.class);
} else {
InferenceConfig oldConfig = in.readNamedWriteable(InferenceConfig.class);
if (oldConfig instanceof RegressionConfig) {
this.update = RegressionConfigUpdate.fromConfig((RegressionConfig) oldConfig);
} else if (oldConfig instanceof ClassificationConfig) {
this.update = ClassificationConfigUpdate.fromConfig((ClassificationConfig) oldConfig);
} else {
throw new IOException("Unexpected configuration type [" + oldConfig.getName() + "]");
}
}
this.update = in.readNamedWriteable(InferenceConfigUpdate.class);
this.previouslyLicensed = in.readBoolean();
}

Expand Down Expand Up @@ -115,11 +99,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(modelId);
out.writeCollection(objectsToInfer, StreamOutput::writeMap);
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
out.writeNamedWriteable(update);
} else {
out.writeNamedWriteable(update.toConfig());
}
out.writeNamedWriteable(update);
out.writeBoolean(previouslyLicensed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
Expand Down Expand Up @@ -74,11 +73,7 @@ public Request(StreamInput in) throws IOException {
jobId = in.readString();
snapshotId = in.readString();
deleteInterveningResults = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_7_11_0)) {
force = in.readBoolean();
} else {
force = false;
}
force = in.readBoolean();
}

public Request(String jobId, String snapshotId) {
Expand Down Expand Up @@ -121,9 +116,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeString(snapshotId);
out.writeBoolean(deleteInterveningResults);
if (out.getVersion().onOrAfter(Version.V_7_11_0)) {
out.writeBoolean(force);
}
out.writeBoolean(force);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,30 +173,17 @@ private DataFrameAnalyticsConfig(

public DataFrameAnalyticsConfig(StreamInput in) throws IOException {
this.id = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
description = in.readOptionalString();
} else {
description = null;
}
this.description = in.readOptionalString();
this.source = new DataFrameAnalyticsSource(in);
this.dest = new DataFrameAnalyticsDest(in);
this.analysis = in.readNamedWriteable(DataFrameAnalysis.class);
this.analyzedFields = in.readOptionalWriteable(FetchSourceContext::new);
this.modelMemoryLimit = in.readOptionalWriteable(ByteSizeValue::new);
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
createTime = in.readOptionalInstant();
version = in.readBoolean() ? Version.readVersion(in) : null;
} else {
createTime = null;
version = null;
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
allowLazyStart = in.readBoolean();
} else {
allowLazyStart = false;
}
maxNumThreads = in.readVInt();
this.createTime = in.readOptionalInstant();
this.version = in.readBoolean() ? Version.readVersion(in) : null;
this.allowLazyStart = in.readBoolean();
this.maxNumThreads = in.readVInt();
}

public String getId() {
Expand Down Expand Up @@ -291,27 +278,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeOptionalString(description);
}
out.writeOptionalString(description);
source.writeTo(out);
dest.writeTo(out);
out.writeNamedWriteable(analysis);
out.writeOptionalWriteable(analyzedFields);
out.writeOptionalWriteable(modelMemoryLimit);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalInstant(createTime);
if (version != null) {
out.writeBoolean(true);
Version.writeVersion(version, out);
} else {
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(allowLazyStart);
out.writeOptionalInstant(createTime);
if (version != null) {
out.writeBoolean(true);
Version.writeVersion(version, out);
} else {
out.writeBoolean(false);
}
out.writeBoolean(allowLazyStart);
out.writeVInt(maxNumThreads);
}

Expand Down
Loading