diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java index 352cbfb67fcde..186c67bf42ce2 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java @@ -43,6 +43,7 @@ public class DataFrameTransformState { private static final ParseField CHECKPOINT = new ParseField("checkpoint"); private static final ParseField REASON = new ParseField("reason"); private static final ParseField PROGRESS = new ParseField("progress"); + private static final ParseField NODE = new ParseField("node"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = @@ -52,7 +53,8 @@ public class DataFrameTransformState { (Map) args[2], (long) args[3], (String) args[4], - (DataFrameTransformProgress) args[5])); + (DataFrameTransformProgress) args[5], + (NodeAttributes) args[6])); static { PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING); @@ -61,6 +63,7 @@ public class DataFrameTransformState { PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON); PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT); + PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT); } public static DataFrameTransformState fromXContent(XContentParser parser) throws IOException { @@ -73,19 +76,22 @@ public static DataFrameTransformState fromXContent(XContentParser parser) throws private final Map currentPosition; private final String reason; private final DataFrameTransformProgress progress; + private final NodeAttributes node; public DataFrameTransformState(DataFrameTransformTaskState taskState, IndexerState indexerState, @Nullable Map position, long checkpoint, @Nullable String reason, - @Nullable DataFrameTransformProgress progress) { + @Nullable DataFrameTransformProgress progress, + @Nullable NodeAttributes node) { this.taskState = taskState; this.indexerState = indexerState; this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position)); this.checkpoint = checkpoint; this.reason = reason; this.progress = progress; + this.node = node; } public IndexerState getIndexerState() { @@ -115,6 +121,11 @@ public DataFrameTransformProgress getProgress() { return progress; } + @Nullable + public NodeAttributes getNode() { + return node; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -132,12 +143,13 @@ public boolean equals(Object other) { Objects.equals(this.currentPosition, that.currentPosition) && Objects.equals(this.progress, that.progress) && this.checkpoint == that.checkpoint && + Objects.equals(this.node, that.node) && Objects.equals(this.reason, that.reason); } @Override public int hashCode() { - return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress); + return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/NodeAttributes.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/NodeAttributes.java new file mode 100644 index 0000000000000..85c2b9644c2fd --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/NodeAttributes.java @@ -0,0 +1,156 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.dataframe.transforms; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * A Pojo class containing an Elastic Node's attributes + */ +public class NodeAttributes implements ToXContentObject { + + public static final ParseField ID = new ParseField("id"); + public static final ParseField NAME = new ParseField("name"); + public static final ParseField EPHEMERAL_ID = new ParseField("ephemeral_id"); + public static final ParseField TRANSPORT_ADDRESS = new ParseField("transport_address"); + public static final ParseField ATTRIBUTES = new ParseField("attributes"); + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("node", true, + (a) -> { + int i = 0; + String id = (String) a[i++]; + String name = (String) a[i++]; + String ephemeralId = (String) a[i++]; + String transportAddress = (String) a[i++]; + Map attributes = (Map) a[i]; + return new NodeAttributes(id, name, ephemeralId, transportAddress, attributes); + }); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME); + PARSER.declareString(ConstructingObjectParser.constructorArg(), EPHEMERAL_ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), TRANSPORT_ADDRESS); + PARSER.declareField(ConstructingObjectParser.constructorArg(), + (p, c) -> p.mapStrings(), + ATTRIBUTES, + ObjectParser.ValueType.OBJECT); + } + + private final String id; + private final String name; + private final String ephemeralId; + private final String transportAddress; + private final Map attributes; + + public NodeAttributes(String id, String name, String ephemeralId, String transportAddress, Map attributes) { + this.id = id; + this.name = name; + this.ephemeralId = ephemeralId; + this.transportAddress = transportAddress; + this.attributes = Collections.unmodifiableMap(attributes); + } + + /** + * The unique identifier of the node. + */ + public String getId() { + return id; + } + + /** + * The node name. + */ + public String getName() { + return name; + } + + /** + * The ephemeral id of the node. + */ + public String getEphemeralId() { + return ephemeralId; + } + + /** + * The host and port where transport HTTP connections are accepted. + */ + public String getTransportAddress() { + return transportAddress; + } + + /** + * Additional attributes related to this node + */ + public Map getAttributes() { + return attributes; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ID.getPreferredName(), id); + builder.field(NAME.getPreferredName(), name); + builder.field(EPHEMERAL_ID.getPreferredName(), ephemeralId); + builder.field(TRANSPORT_ADDRESS.getPreferredName(), transportAddress); + builder.field(ATTRIBUTES.getPreferredName(), attributes); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(id, name, ephemeralId, transportAddress, attributes); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + NodeAttributes that = (NodeAttributes) other; + return Objects.equals(id, that.id) && + Objects.equals(name, that.name) && + Objects.equals(ephemeralId, that.ephemeralId) && + Objects.equals(transportAddress, that.transportAddress) && + Objects.equals(attributes, that.attributes); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java index 4ada50c20d219..ebb62890c3cdd 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.client.dataframe.transforms; import org.elasticsearch.client.core.IndexerState; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; @@ -37,7 +38,8 @@ public void testFromXContent() throws IOException { DataFrameTransformStateTests::toXContent, DataFrameTransformState::fromXContent) .supportsUnknownFields(true) - .randomFieldsExcludeFilter(field -> field.equals("current_position")) + .randomFieldsExcludeFilter(field -> field.equals("current_position") || + field.equals("node.attributes")) .test(); } @@ -47,7 +49,8 @@ public static DataFrameTransformState randomDataFrameTransformState() { randomPositionMap(), randomLongBetween(0,10), randomBoolean() ? null : randomAlphaOfLength(10), - randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance()); + randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance(), + randomBoolean() ? null : NodeAttributesTests.createRandom()); } public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException { @@ -65,6 +68,10 @@ public static void toXContent(DataFrameTransformState state, XContentBuilder bui builder.field("progress"); DataFrameTransformProgressTests.toXContent(state.getProgress(), builder); } + if (state.getNode() != null) { + builder.field("node"); + state.getNode().toXContent(builder, ToXContent.EMPTY_PARAMS); + } builder.endObject(); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/NodeAttributesTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/NodeAttributesTests.java new file mode 100644 index 0000000000000..661aa9f7a30a4 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/NodeAttributesTests.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.dataframe.transforms; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Predicate; + +public class NodeAttributesTests extends AbstractXContentTestCase { + + public static NodeAttributes createRandom() { + int numberOfAttributes = randomIntBetween(1, 10); + Map attributes = new HashMap<>(numberOfAttributes); + for(int i = 0; i < numberOfAttributes; i++) { + String val = randomAlphaOfLength(10); + attributes.put("key-"+i, val); + } + return new NodeAttributes(randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + attributes); + } + + @Override + protected NodeAttributes createTestInstance() { + return createRandom(); + } + + @Override + protected NodeAttributes doParseInstance(XContentParser parser) throws IOException { + return NodeAttributes.PARSER.parse(parser, null); + } + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + return field -> !field.isEmpty(); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java index ad08881fb5641..dde44898bf90b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java @@ -64,7 +64,7 @@ protected boolean supportsUnknownFields() { @Override protected Predicate getRandomFieldsExcludeFilter() { - return field -> field.equals("state.current_position"); + return field -> field.equals("state.current_position") || field.equals("state.node") || field.equals("state.node.attributes"); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java index 4c80365bc539a..b97e0a72c1fa2 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes; import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; @@ -40,8 +41,20 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase getRandomFieldsExcludeFilter() { - return field -> field.equals("current_position"); + return field -> field.equals("current_position") || field.equals("node.attributes"); } public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) { @@ -97,6 +110,20 @@ public static DataFrameTransformProgress randomDataFrameTransformProgress() { return new DataFrameTransformProgress(totalDocs, remainingDocs); } + public static NodeAttributes randomNodeAttributes() { + int numberOfAttributes = randomIntBetween(1, 10); + Map attributes = new HashMap<>(numberOfAttributes); + for(int i = 0; i < numberOfAttributes; i++) { + String val = randomAlphaOfLength(10); + attributes.put("key-"+i, val); + } + return new NodeAttributes(randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + attributes); + } + public static DataFrameIndexerTransformStats randomStats(String transformId) { return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), @@ -110,7 +137,8 @@ public static DataFrameTransformState randomDataFrameTransformState() { randomPosition(), randomLongBetween(0,10), randomBoolean() ? null : randomAlphaOfLength(10), - randomBoolean() ? null : randomDataFrameTransformProgress()); + randomBoolean() ? null : randomDataFrameTransformProgress(), + randomBoolean() ? null : randomNodeAttributes()); } private static Map randomPosition() { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 213bb7a02e211..731d42f902c50 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -45,6 +45,7 @@ import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.client.dataframe.transforms.DestConfig; +import org.elasticsearch.client.dataframe.transforms.NodeAttributes; import org.elasticsearch.client.dataframe.transforms.QueryConfig; import org.elasticsearch.client.dataframe.transforms.SourceConfig; import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig; @@ -533,6 +534,8 @@ public void testGetStats() throws IOException, InterruptedException { stateAndStats.getTransformStats(); // <4> DataFrameTransformProgress progress = stateAndStats.getTransformState().getProgress(); // <5> + NodeAttributes node = + stateAndStats.getTransformState().getNode(); // <6> // end::get-data-frame-transform-stats-response assertEquals(IndexerState.STOPPED, indexerState); diff --git a/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc b/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc index 4360157b4a445..8a3e1a96acb14 100644 --- a/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc +++ b/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc @@ -52,4 +52,5 @@ include-tagged::{doc-tests-file}[{api}-response] <3> The running state of the transform indexer e.g `started`, `indexing`, etc. <4> The overall transform statistics recording the number of documents indexed etc. <5> The progress of the current run in the transform. Supplies the number of docs left until the next checkpoint -and the total number of docs expected. \ No newline at end of file +and the total number of docs expected. +<6> The assigned node information if the task is currently assigned to a node and running. \ No newline at end of file diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java index d4480caa0b9a4..2c3ad36d6849a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.dataframe.transforms; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -41,6 +42,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState private final Map currentPosition; @Nullable private final String reason; + @Nullable + private NodeAttributes node; public static final ParseField TASK_STATE = new ParseField("task_state"); public static final ParseField INDEXER_STATE = new ParseField("indexer_state"); @@ -48,6 +51,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState public static final ParseField CHECKPOINT = new ParseField("checkpoint"); public static final ParseField REASON = new ParseField("reason"); public static final ParseField PROGRESS = new ParseField("progress"); + public static final ParseField NODE = new ParseField("node"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, @@ -57,7 +61,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState (Map) args[2], (long) args[3], (String) args[4], - (DataFrameTransformProgress) args[5])); + (DataFrameTransformProgress) args[5], + (NodeAttributes) args[6])); static { PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING); @@ -66,6 +71,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT); PARSER.declareString(optionalConstructorArg(), REASON); PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT); + PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT); } public DataFrameTransformState(DataFrameTransformTaskState taskState, @@ -73,13 +79,24 @@ public DataFrameTransformState(DataFrameTransformTaskState taskState, @Nullable Map position, long checkpoint, @Nullable String reason, - @Nullable DataFrameTransformProgress progress) { + @Nullable DataFrameTransformProgress progress, + @Nullable NodeAttributes node) { this.taskState = taskState; this.indexerState = indexerState; this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position)); this.checkpoint = checkpoint; this.reason = reason; this.progress = progress; + this.node = node; + } + + public DataFrameTransformState(DataFrameTransformTaskState taskState, + IndexerState indexerState, + @Nullable Map position, + long checkpoint, + @Nullable String reason, + @Nullable DataFrameTransformProgress progress) { + this(taskState, indexerState, position, checkpoint, reason, progress, null); } public DataFrameTransformState(StreamInput in) throws IOException { @@ -90,6 +107,11 @@ public DataFrameTransformState(StreamInput in) throws IOException { checkpoint = in.readLong(); reason = in.readOptionalString(); progress = in.readOptionalWriteable(DataFrameTransformProgress::new); + if (in.getVersion().onOrAfter(Version.V_7_3_0)) { + node = in.readOptionalWriteable(NodeAttributes::new); + } else { + node = null; + } } public DataFrameTransformTaskState getTaskState() { @@ -125,6 +147,15 @@ public String getReason() { return reason; } + public NodeAttributes getNode() { + return node; + } + + public DataFrameTransformState setNode(NodeAttributes node) { + this.node = node; + return this; + } + public static DataFrameTransformState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); @@ -148,6 +179,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (progress != null) { builder.field(PROGRESS.getPreferredName(), progress); } + if (node != null) { + builder.field(NODE.getPreferredName(), node); + } builder.endObject(); return builder; } @@ -165,6 +199,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(checkpoint); out.writeOptionalString(reason); out.writeOptionalWriteable(progress); + if (out.getVersion().onOrAfter(Version.V_7_3_0)) { + out.writeOptionalWriteable(node); + } } @Override @@ -184,12 +221,13 @@ public boolean equals(Object other) { Objects.equals(this.currentPosition, that.currentPosition) && this.checkpoint == that.checkpoint && Objects.equals(this.reason, that.reason) && - Objects.equals(this.progress, that.progress); + Objects.equals(this.progress, that.progress) && + Objects.equals(this.node, that.node); } @Override public int hashCode() { - return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress); + return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/NodeAttributes.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/NodeAttributes.java new file mode 100644 index 0000000000000..76b40bd23778a --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/NodeAttributes.java @@ -0,0 +1,171 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * A Pojo class containing an Elastic Node's attributes + */ +public class NodeAttributes implements ToXContentObject, Writeable { + + public static final ParseField ID = new ParseField("id"); + public static final ParseField NAME = new ParseField("name"); + public static final ParseField EPHEMERAL_ID = new ParseField("ephemeral_id"); + public static final ParseField TRANSPORT_ADDRESS = new ParseField("transport_address"); + public static final ParseField ATTRIBUTES = new ParseField("attributes"); + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("node", true, + (a) -> { + int i = 0; + String id = (String) a[i++]; + String name = (String) a[i++]; + String ephemeralId = (String) a[i++]; + String transportAddress = (String) a[i++]; + Map attributes = (Map) a[i]; + return new NodeAttributes(id, name, ephemeralId, transportAddress, attributes); + }); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME); + PARSER.declareString(ConstructingObjectParser.constructorArg(), EPHEMERAL_ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), TRANSPORT_ADDRESS); + PARSER.declareField(ConstructingObjectParser.constructorArg(), + (p, c) -> p.mapStrings(), + ATTRIBUTES, + ObjectParser.ValueType.OBJECT); + } + + private final String id; + private final String name; + private final String ephemeralId; + private final String transportAddress; + private final Map attributes; + + public static NodeAttributes fromDiscoveryNode(DiscoveryNode node) { + return new NodeAttributes(node.getId(), + node.getName(), + node.getEphemeralId(), + node.getAddress().toString(), + // TODO add data_frame attributes when/if they are added + Collections.emptyMap()); + } + + public NodeAttributes(String id, String name, String ephemeralId, String transportAddress, Map attributes) { + this.id = ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); + this.name = ExceptionsHelper.requireNonNull(name, NAME.getPreferredName()); + this.ephemeralId = ExceptionsHelper.requireNonNull(ephemeralId, EPHEMERAL_ID.getPreferredName()); + this.transportAddress = ExceptionsHelper.requireNonNull(transportAddress, TRANSPORT_ADDRESS.getPreferredName()); + this.attributes = Collections.unmodifiableMap(ExceptionsHelper.requireNonNull(attributes, ATTRIBUTES.getPreferredName())); + } + + public NodeAttributes(StreamInput in) throws IOException { + this.id = in.readString(); + this.name = in.readString(); + this.ephemeralId = in.readString(); + this.transportAddress = in.readString(); + this.attributes = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); + } + + /** + * The unique identifier of the node. + */ + public String getId() { + return id; + } + + /** + * The node name. + */ + public String getName() { + return name; + } + + /** + * The ephemeral id of the node. + */ + public String getEphemeralId() { + return ephemeralId; + } + + /** + * The host and port where transport HTTP connections are accepted. + */ + public String getTransportAddress() { + return transportAddress; + } + + public Map getAttributes() { + return attributes; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ID.getPreferredName(), id); + builder.field(NAME.getPreferredName(), name); + builder.field(EPHEMERAL_ID.getPreferredName(), ephemeralId); + builder.field(TRANSPORT_ADDRESS.getPreferredName(), transportAddress); + builder.field(ATTRIBUTES.getPreferredName(), attributes); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(id, name, ephemeralId, transportAddress, attributes); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + NodeAttributes that = (NodeAttributes) other; + return Objects.equals(id, that.id) && + Objects.equals(name, that.name) && + Objects.equals(ephemeralId, that.ephemeralId) && + Objects.equals(transportAddress, that.transportAddress) && + Objects.equals(attributes, that.attributes); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeString(name); + out.writeString(ephemeralId); + out.writeString(transportAddress); + out.writeMap(attributes, StreamOutput::writeString, StreamOutput::writeString); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java index c978978b0589b..9f4ac546c89a4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java @@ -17,6 +17,7 @@ import java.util.function.Predicate; import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests.randomDataFrameTransformProgress; +import static org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributeTests.randomNodeAttributes; public class DataFrameTransformStateTests extends AbstractSerializingTestCase { @@ -26,7 +27,8 @@ public static DataFrameTransformState randomDataFrameTransformState() { randomPosition(), randomLongBetween(0,10), randomBoolean() ? null : randomAlphaOfLength(10), - randomBoolean() ? null : randomDataFrameTransformProgress()); + randomBoolean() ? null : randomDataFrameTransformProgress(), + randomBoolean() ? null : randomNodeAttributes()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/NodeAttributeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/NodeAttributeTests.java new file mode 100644 index 0000000000000..fdc7692c412cb --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/NodeAttributeTests.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.function.Predicate; + + +public class NodeAttributeTests extends AbstractSerializingTestCase { + + public static NodeAttributes randomNodeAttributes() { + return new NodeAttributes(randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10))); + } + + @Override + protected NodeAttributes doParseInstance(XContentParser parser) throws IOException { + return NodeAttributes.PARSER.apply(parser, null); + } + + @Override + protected NodeAttributes createTestInstance() { + return randomNodeAttributes(); + } + + @Override + protected Reader instanceReader() { + return NodeAttributes::new; + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + return field -> field.equals("attributes"); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java index 6b578d040d2fd..99b5704c7fd53 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -29,6 +30,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; @@ -110,15 +112,23 @@ protected void doExecute(Task task, Request request, ActionListener fi request.isAllowNoMatch(), ActionListener.wrap(hitsAndIds -> { request.setExpandedIds(hitsAndIds.v2()); - request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state())); + final ClusterState state = clusterService.state(); + request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), state)); super.doExecute(task, request, ActionListener.wrap( - response -> collectStatsForTransformsWithoutTasks(request, response, ActionListener.wrap( - finalResponse -> finalListener.onResponse(new Response(finalResponse.getTransformsStateAndStats(), - hitsAndIds.v1(), - finalResponse.getTaskFailures(), - finalResponse.getNodeFailures())), - finalListener::onFailure - )), + response -> { + PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (tasksInProgress != null) { + // Mutates underlying state object with the assigned node attributes + response.getTransformsStateAndStats().forEach(dtsas -> setNodeAttributes(dtsas, tasksInProgress, state)); + } + collectStatsForTransformsWithoutTasks(request, response, ActionListener.wrap( + finalResponse -> finalListener.onResponse(new Response(finalResponse.getTransformsStateAndStats(), + hitsAndIds.v1(), + finalResponse.getTaskFailures(), + finalResponse.getNodeFailures())), + finalListener::onFailure + )); + }, finalListener::onFailure )); }, @@ -133,6 +143,16 @@ protected void doExecute(Task task, Request request, ActionListener fi )); } + private static void setNodeAttributes(DataFrameTransformStateAndStats dataFrameTransformStateAndStats, + PersistentTasksCustomMetaData persistentTasksCustomMetaData, + ClusterState state) { + var pTask = persistentTasksCustomMetaData.getTask(dataFrameTransformStateAndStats.getTransformId()); + if (pTask != null) { + dataFrameTransformStateAndStats.getTransformState() + .setNode(NodeAttributes.fromDiscoveryNode(state.nodes().get(pTask.getExecutorNode()))); + } + } + private void collectStatsForTransformsWithoutTasks(Request request, Response response, ActionListener listener) { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index 616befdb494ec..a6066aceb969b 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -209,3 +209,56 @@ teardown: - match: { transforms.0.stats.search_time_in_ms: 0 } - match: { transforms.0.stats.search_total: 0 } - match: { transforms.0.stats.search_failures: 0 } +--- +"Test get continuous transform stats": + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform-stats-continuous" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-by-airline-stats-continuous" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "sync": { "time": { "field": "time", "delay": "1m" } } + } + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-stats-continuous" + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "airline-transform-stats-continuous" + - match: { count: 1 } + - match: { transforms.0.id: "airline-transform-stats-continuous" } + - match: { transforms.0.state.indexer_state: "/started|indexing|stopped/" } + # Since this is continuous, there is no worry of it automatically stopping + - match: { transforms.0.state.task_state: "started" } + - lte: { transforms.0.state.checkpoint: 1 } + # Since this is continuous, and _start does not return until it is assigned + # we should see a node assignment + - is_true: transforms.0.state.node + - is_true: transforms.0.state.node.id + - is_true: transforms.0.state.node.name + - is_true: transforms.0.state.node.ephemeral_id + - is_true: transforms.0.state.node.transport_address + - lte: { transforms.0.stats.pages_processed: 1 } + - match: { transforms.0.stats.documents_processed: 0 } + - match: { transforms.0.stats.documents_indexed: 0 } + - lte: { transforms.0.stats.trigger_count: 1 } + - match: { transforms.0.stats.index_time_in_ms: 0 } + - match: { transforms.0.stats.index_total: 0 } + - match: { transforms.0.stats.index_failures: 0 } + - gte: { transforms.0.stats.search_time_in_ms: 0 } + - lte: { transforms.0.stats.search_total: 1 } + - match: { transforms.0.stats.search_failures: 0 } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "airline-transform-stats-continuous" + wait_for_completion: true + + - do: + data_frame.delete_data_frame_transform: + transform_id: "airline-transform-stats-continuous"