[ML-DataFrame] Remove ID field from data frame indexer stats #44768

Merged: 3 commits merged on Jul 25, 2019

@@ -31,7 +31,7 @@ public class DataFrameIndexerTransformStatsTests extends AbstractHlrcXContentTes

public static DataFrameIndexerTransformStats fromHlrc(
org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats instance) {
return DataFrameIndexerTransformStats.withDefaultTransformId(instance.getNumPages(), instance.getNumDocuments(),
return new DataFrameIndexerTransformStats(instance.getNumPages(), instance.getNumDocuments(),
instance.getOutputDocuments(), instance.getNumInvocations(), instance.getIndexTime(), instance.getSearchTime(),
instance.getIndexTotal(), instance.getSearchTotal(), instance.getIndexFailures(), instance.getSearchFailures());
}
@@ -48,16 +48,16 @@ public DataFrameIndexerTransformStats convertHlrcToInternal(
return fromHlrc(instance);
}

public static DataFrameIndexerTransformStats randomStats(String transformId) {
return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L),
public static DataFrameIndexerTransformStats randomStats() {
return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
}

@Override
protected DataFrameIndexerTransformStats createTestInstance() {
return randomStats(DataFrameIndexerTransformStats.DEFAULT_TRANSFORM_ID);
return randomStats();
}

@Override
@@ -79,8 +79,7 @@ public static DataFrameTransformStats randomDataFrameTransformStats() {
randomFrom(DataFrameTransformTaskState.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : randomNodeAttributes(),
// TODO: remove this ID field from the server side as it's no longer needed
randomStats("_all"),
randomStats(),
DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
}

@@ -132,8 +131,8 @@ public static NodeAttributes randomNodeAttributes() {
attributes);
}

public static DataFrameIndexerTransformStats randomStats(String transformId) {
return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L),
public static DataFrameIndexerTransformStats randomStats() {
return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
@@ -6,24 +6,23 @@

package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerJobStats;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DataFrameIndexerTransformStats extends IndexerJobStats {
public static final String DEFAULT_TRANSFORM_ID = "_all";

private static final String DEFAULT_TRANSFORM_ID = "_all"; // TODO remove when no longer needed for wire BWC

public static final String NAME = "data_frame_indexer_transform_stats";
public static ParseField NUM_PAGES = new ParseField("pages_processed");
@@ -39,12 +38,11 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {

private static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
NAME, true,
args -> new DataFrameIndexerTransformStats(args[0] != null ? (String) args[0] : DEFAULT_TRANSFORM_ID,
(long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7],
(long) args[8], (long) args[9], (long) args[10]));
args -> new DataFrameIndexerTransformStats(
(long) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6],
(long) args[7], (long) args[8], (long) args[9]));

static {
LENIENT_PARSER.declareString(optionalConstructorArg(), DataFrameField.ID);
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
@@ -57,60 +55,38 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}

private final String transformId;

/**
* Certain situations call for a default transform ID, e.g. when merging many different transforms for statistics gather.
*
* The returned stats object cannot be stored in the index as the transformId does not refer to a real transform configuration
*
* @return new DataFrameIndexerTransformStats with empty stats and a default transform ID
* Create with all stats set to zero
*/
public static DataFrameIndexerTransformStats withDefaultTransformId() {
return new DataFrameIndexerTransformStats(DEFAULT_TRANSFORM_ID);
}

public static DataFrameIndexerTransformStats withDefaultTransformId(long numPages, long numInputDocuments, long numOutputDocuments,
long numInvocations, long indexTime, long searchTime,
long indexTotal, long searchTotal, long indexFailures,
long searchFailures) {
return new DataFrameIndexerTransformStats(DEFAULT_TRANSFORM_ID, numPages, numInputDocuments,
numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
indexFailures, searchFailures);
}

public DataFrameIndexerTransformStats(String transformId) {
public DataFrameIndexerTransformStats() {
super();
this.transformId = Objects.requireNonNull(transformId, "parameter transformId must not be null");
}

public DataFrameIndexerTransformStats(String transformId, long numPages, long numInputDocuments, long numOutputDocuments,
public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOutputDocuments,
long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal,
long indexFailures, long searchFailures) {
super(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
indexFailures, searchFailures);
this.transformId = Objects.requireNonNull(transformId, "parameter transformId must not be null");
}

public DataFrameIndexerTransformStats(DataFrameIndexerTransformStats other) {
this(other.transformId, other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations,
this(other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations,
other.indexTime, other.searchTime, other.indexTotal, other.searchTotal, other.indexFailures, other.searchFailures);
}

public DataFrameIndexerTransformStats(StreamInput in) throws IOException {
super(in);
transformId = in.readString();
if (in.getVersion().before(Version.V_8_0_0)) { // TODO change to 7.4.0 after backport
in.readString(); // was transformId
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(transformId);
}

@Nullable
public String getTransformId() {
return transformId;
if (out.getVersion().before(Version.V_8_0_0)) { // TODO change to 7.4.0 after backport
out.writeString(DEFAULT_TRANSFORM_ID);
}
}

@Override
@@ -126,21 +102,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime);
builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal);
builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures);
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
// If we are storing something, it should have a valid transform ID.
if (transformId.equals(DEFAULT_TRANSFORM_ID)) {
throw new IllegalArgumentException("when storing transform statistics, a valid transform id must be provided");
}
builder.field(DataFrameField.ID.getPreferredName(), transformId);
}
builder.endObject();
return builder;
}

public DataFrameIndexerTransformStats merge(DataFrameIndexerTransformStats other) {
// We should probably not merge two sets of stats unless one is an accumulation object (i.e. with the default transform id)
// or the stats are referencing the same transform
assert transformId.equals(DEFAULT_TRANSFORM_ID) || this.transformId.equals(other.transformId);
numPages += other.numPages;
numInputDocuments += other.numInputDocuments;
numOuputDocuments += other.numOuputDocuments;
@@ -167,8 +133,7 @@ public boolean equals(Object other) {

DataFrameIndexerTransformStats that = (DataFrameIndexerTransformStats) other;

return Objects.equals(this.transformId, that.transformId)
&& Objects.equals(this.numPages, that.numPages)
return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations)
@@ -182,7 +147,7 @@ public boolean equals(Object other) {

@Override
public int hashCode() {
return Objects.hash(transformId, numPages, numInputDocuments, numOuputDocuments, numInvocations,
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal);
}

@@ -75,7 +75,7 @@ public static DataFrameTransformStats fromXContent(XContentParser parser) throws
}

public static DataFrameTransformStats initialStats(String id) {
return stoppedStats(id, new DataFrameIndexerTransformStats(id));
return stoppedStats(id, new DataFrameIndexerTransformStats());
}

public static DataFrameTransformStats stoppedStats(String id, DataFrameIndexerTransformStats indexerTransformStats) {
@@ -7,19 +7,13 @@
package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;

import java.io.IOException;
import java.util.Collections;

public class DataFrameIndexerTransformStatsTests extends AbstractSerializingTestCase<DataFrameIndexerTransformStats> {

protected static ToXContent.Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"));

@Override
protected DataFrameIndexerTransformStats createTestInstance() {
return randomStats();
@@ -36,36 +30,26 @@ protected DataFrameIndexerTransformStats doParseInstance(XContentParser parser)
}

public static DataFrameIndexerTransformStats randomStats() {
return randomStats(randomAlphaOfLength(10));
}

public static DataFrameIndexerTransformStats randomStats(String transformId) {
return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L),
return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
}

@Override
protected ToXContent.Params getToXContentParams() {
return TO_XCONTENT_PARAMS;
}

public void testMerge() throws IOException {
String transformId = randomAlphaOfLength(10);
DataFrameIndexerTransformStats emptyStats = new DataFrameIndexerTransformStats(transformId);
DataFrameIndexerTransformStats randomStats = randomStats(transformId);
DataFrameIndexerTransformStats emptyStats = new DataFrameIndexerTransformStats();
DataFrameIndexerTransformStats randomStats = randomStats();

assertEquals(randomStats, emptyStats.merge(randomStats));
assertEquals(randomStats, randomStats.merge(emptyStats));

DataFrameIndexerTransformStats randomStatsClone = copyInstance(randomStats);

DataFrameIndexerTransformStats trippleRandomStats = new DataFrameIndexerTransformStats(transformId, 3 * randomStats.getNumPages(),
DataFrameIndexerTransformStats tripleRandomStats = new DataFrameIndexerTransformStats(3 * randomStats.getNumPages(),
3 * randomStats.getNumDocuments(), 3 * randomStats.getOutputDocuments(), 3 * randomStats.getNumInvocations(),
3 * randomStats.getIndexTime(), 3 * randomStats.getSearchTime(), 3 * randomStats.getIndexTotal(),
3 * randomStats.getSearchTotal(), 3 * randomStats.getIndexFailures(), 3 * randomStats.getSearchFailures());

assertEquals(trippleRandomStats, randomStats.merge(randomStatsClone).merge(randomStatsClone));
assertEquals(tripleRandomStats, randomStats.merge(randomStatsClone).merge(randomStatsClone));
}
}
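
As the updated merge test shows, accumulating stats no longer involves a transform ID: the no-argument constructor creates an all-zero stats object and merge() adds another object's counters into it. A short hypothetical helper illustrating that usage (the sumStats method is not part of this PR; it assumes imports of java.util.Collection and DataFrameIndexerTransformStats):

// Hypothetical helper, not in this PR: fold a collection of per-transform stats into one total.
static DataFrameIndexerTransformStats sumStats(Collection<DataFrameIndexerTransformStats> allStats) {
    DataFrameIndexerTransformStats totals = new DataFrameIndexerTransformStats(); // every counter starts at zero
    for (DataFrameIndexerTransformStats stats : allStats) {
        totals.merge(stats); // merge() sums the other object's counters into this one
    }
    return totals;
}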
@@ -20,7 +20,7 @@ public static DataFrameTransformStats randomDataFrameTransformStats() {
randomFrom(DataFrameTransformTaskState.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
DataFrameIndexerTransformStatsTests.randomStats(DataFrameIndexerTransformStats.DEFAULT_TRANSFORM_ID),
DataFrameIndexerTransformStatsTests.randomStats(),
DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
}

@@ -22,7 +22,7 @@ public class DataFrameTransformStoredDocTests extends AbstractSerializingDataFra
public static DataFrameTransformStoredDoc randomDataFrameTransformStoredDoc(String id) {
return new DataFrameTransformStoredDoc(id,
DataFrameTransformStateTests.randomDataFrameTransformState(),
DataFrameIndexerTransformStatsTests.randomStats(id));
DataFrameIndexerTransformStatsTests.randomStats());
}

public static DataFrameTransformStoredDoc randomDataFrameTransformStoredDoc() {
@@ -93,7 +93,7 @@ static DataFrameIndexerTransformStats parseSearchAggs(SearchResponse searchRespo
statisticsList.add(0L);
}
}
return DataFrameIndexerTransformStats.withDefaultTransformId(statisticsList.get(0), // numPages
return new DataFrameIndexerTransformStats(statisticsList.get(0), // numPages
statisticsList.get(1), // numInputDocuments
statisticsList.get(2), // numOutputDocuments
statisticsList.get(3), // numInvocations
@@ -130,7 +130,7 @@ static void getStatisticSummations(Client client, ActionListener<DataFrameIndexe
},
failure -> {
if (failure instanceof ResourceNotFoundException) {
statsListener.onResponse(DataFrameIndexerTransformStats.withDefaultTransformId());
statsListener.onResponse(new DataFrameIndexerTransformStats());
} else {
statsListener.onFailure(failure);
}
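
For reference, the positional statisticsList built in parseSearchAggs is assumed to line up with the ID-free constructor signature shown earlier in this diff; a hedged illustration of that mapping (variable names are illustrative):

// Illustrative only: how the aggregation sums map onto the constructor parameters,
// assuming statisticsList is the List<Long> assembled in parseSearchAggs.
DataFrameIndexerTransformStats aggregated = new DataFrameIndexerTransformStats(
    statisticsList.get(0),  // numPages
    statisticsList.get(1),  // numInputDocuments
    statisticsList.get(2),  // numOutputDocuments
    statisticsList.get(3),  // numInvocations
    statisticsList.get(4),  // indexTime
    statisticsList.get(5),  // searchTime
    statisticsList.get(6),  // indexTotal
    statisticsList.get(7),  // searchTotal
    statisticsList.get(8),  // indexFailures
    statisticsList.get(9)); // searchFailures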
@@ -65,8 +65,7 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat
ActionListener<XPackUsageFeatureResponse> listener) {
boolean available = licenseState.isDataFrameAllowed();
if (enabled == false) {
var usage = new DataFrameFeatureSetUsage(available, enabled, Collections.emptyMap(),
DataFrameIndexerTransformStats.withDefaultTransformId());
var usage = new DataFrameFeatureSetUsage(available, enabled, Collections.emptyMap(), new DataFrameIndexerTransformStats());
listener.onResponse(new XPackUsageFeatureResponse(usage));
return;
}
@@ -99,7 +98,7 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat
long totalTransforms = transformCountSuccess.getHits().getTotalHits().value;
if (totalTransforms == 0) {
var usage = new DataFrameFeatureSetUsage(available, enabled, transformsCountByState,
DataFrameIndexerTransformStats.withDefaultTransformId());
new DataFrameIndexerTransformStats());
listener.onResponse(new XPackUsageFeatureResponse(usage));
return;
}
@@ -143,7 +143,7 @@ private void getPreview(Pivot pivot,
r -> {
try {
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId();
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
// remove all internal fields

if (pipeline == null) {