Skip to content

[ML][Data Frame] Add deduced mappings to _preview response payload #43742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,32 @@
public class PreviewDataFrameTransformResponse {

private static final String PREVIEW = "preview";
private static final String MAPPINGS = "mappings";

@SuppressWarnings("unchecked")
public static PreviewDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
Object previewDocs = parser.map().get(PREVIEW);
return new PreviewDataFrameTransformResponse((List<Map<String, Object>>) previewDocs);
Map<String, Object> previewMap = parser.mapOrdered();
Object previewDocs = previewMap.get(PREVIEW);
Object mappings = previewMap.get(MAPPINGS);
return new PreviewDataFrameTransformResponse((List<Map<String, Object>>) previewDocs, (Map<String, Object>) mappings);
}

private List<Map<String, Object>> docs;
private Map<String, Object> mappings;

public PreviewDataFrameTransformResponse(List<Map<String, Object>> docs) {
public PreviewDataFrameTransformResponse(List<Map<String, Object>> docs, Map<String, Object> mappings) {
this.docs = docs;
this.mappings = mappings;
}

/**
* Returns the preview documents held by this response.
*
* @return the list of documents passed to the constructor; each document is a map of field name to value
*/
public List<Map<String, Object>> getDocs() {
return docs;
}

/**
* Returns the mappings held by this response.
*
* @return the mappings passed to the constructor; when the response was built by {@code fromXContent} this is the
* value of the "mappings" field, which is {@code null} if that field was absent
*/
public Map<String, Object> getMappings() {
return mappings;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
Expand All @@ -57,12 +66,12 @@ public boolean equals(Object obj) {
}

PreviewDataFrameTransformResponse other = (PreviewDataFrameTransformResponse) obj;
return Objects.equals(other.docs, docs);
return Objects.equals(other.docs, docs) && Objects.equals(other.mappings, mappings);
}

@Override
public int hashCode() {
return Objects.hashCode(docs);
return Objects.hash(docs, mappings);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
Expand Down Expand Up @@ -277,6 +278,7 @@ public void testStartStop() throws IOException {
assertThat(taskState, is(DataFrameTransformTaskState.STOPPED));
}

@SuppressWarnings("unchecked")
public void testPreview() throws IOException {
String sourceIndex = "transform-source";
createIndex(sourceIndex);
Expand All @@ -298,6 +300,12 @@ public void testPreview() throws IOException {
Optional<Map<String, Object>> michel = docs.stream().filter(doc -> "michel".equals(doc.get("reviewer"))).findFirst();
assertTrue(michel.isPresent());
assertEquals(3.6d, (double) michel.get().get("avg_rating"), 0.1d);

Map<String, Object> mappings = preview.getMappings();
assertThat(mappings, hasKey("properties"));
Map<String, Object> fields = (Map<String, Object>)mappings.get("properties");
assertThat(fields.get("reviewer"), equalTo(Map.of("type", "keyword")));
assertThat(fields.get("avg_rating"), equalTo(Map.of("type", "double")));
}

private DataFrameTransformConfig validDataFrameTransformConfig(String id, String source, String destination) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ private PreviewDataFrameTransformResponse createTestInstance() {
}
docs.add(doc);
}
int numMappingEntries = randomIntBetween(5, 10);
Map<String, Object> mappings = new HashMap<>(numMappingEntries);
for (int i = 0; i < numMappingEntries; i++) {
mappings.put(randomAlphaOfLength(10), Map.of("type", randomAlphaOfLength(10)));
}

return new PreviewDataFrameTransformResponse(docs);
return new PreviewDataFrameTransformResponse(docs, mappings);
}

private void toXContent(PreviewDataFrameTransformResponse response, XContentBuilder builder) throws IOException {
Expand All @@ -64,6 +69,7 @@ private void toXContent(PreviewDataFrameTransformResponse response, XContentBuil
builder.map(doc);
}
builder.endArray();
builder.field("mappings", response.getMappings());
builder.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ public void testPreview() throws IOException, InterruptedException {
// end::preview-data-frame-transform-execute

assertNotNull(response.getDocs());
assertNotNull(response.getMappings());
}
{
// tag::preview-data-frame-transform-execute-listener
Expand Down
12 changes: 11 additions & 1 deletion docs/reference/data-frames/apis/preview-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,17 @@ The data that is returned for this example is as follows:
"customer_id" : "12"
}
...
]
],
"mappings": {
"properties": {
"max_price": {
"type": "double"
},
"customer_id": {
"type": "keyword"
}
}
}
}
----
// NOTCONSOLE
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
Expand Down Expand Up @@ -137,11 +138,14 @@ public boolean equals(Object obj) {
public static class Response extends ActionResponse implements ToXContentObject {

private List<Map<String, Object>> docs;
private Map<String, Object> mappings;
public static ParseField PREVIEW = new ParseField("preview");
public static ParseField MAPPINGS = new ParseField("mappings");

static ObjectParser<Response, Void> PARSER = new ObjectParser<>("data_frame_transform_preview", Response::new);
static {
PARSER.declareObjectArray(Response::setDocs, (p, c) -> p.mapOrdered(), PREVIEW);
PARSER.declareObject(Response::setMappings, (p, c) -> p.mapOrdered(), MAPPINGS);
}
public Response() {}

Expand All @@ -151,6 +155,10 @@ public Response(StreamInput in) throws IOException {
for (int i = 0; i < size; i++) {
this.docs.add(in.readMap());
}
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
Map<String, Object> objectMap = in.readMap();
this.mappings = objectMap == null ? null : Map.copyOf(objectMap);
}
}

public Response(List<Map<String, Object>> docs) {
Expand All @@ -161,18 +169,56 @@ public void setDocs(List<Map<String, Object>> docs) {
this.docs = new ArrayList<>(docs);
}

/**
* Stores an unmodifiable copy of the given mappings.
*
* The mappings field is optional: the stream constructor leaves it {@code null} when nothing was sent and
* {@code toXContent} omits it when it is {@code null}, so a {@code null} argument is accepted here as well
* and clears the stored mappings.
*
* @param mappings the mappings in the typical mapping format, or {@code null} for "no mappings";
* a non-null map must not contain null keys or values, since {@code Map.copyOf} rejects them
*/
public void setMappings(Map<String, Object> mappings) {
// Map.copyOf throws a NullPointerException for a null argument, so guard it the same way the stream constructor does
this.mappings = mappings == null ? null : Map.copyOf(mappings);
}

/**
* Converts a flat {@code Map<String, String>} of "fieldname: fieldtype" pairs into the typical mapping format
* and stores the result as the mappings of this response.
*
* Field names are used verbatim as keys under "properties"; dotted names are not expanded into nested objects.
*
* Example:
*
* input:
* {"field1.subField1": "long", "field2": "keyword"}
*
* output:
* {
* "properties": {
* "field1.subField1": {
* "type": "long"
* },
* "field2": {
* "type": "keyword"
* }
* }
* }
* @param mappings A Map of the form {"fieldName": "fieldType"}
*/
public void setMappingsFromStringMap(Map<String, String> mappings) {
Map<String, Object> properties = new HashMap<>();
for (Map.Entry<String, String> field : mappings.entrySet()) {
// every field becomes a single-entry {"type": <fieldType>} object
properties.put(field.getKey(), Map.of("type", field.getValue()));
}
this.mappings = Map.of("properties", properties);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(docs.size());
for (Map<String, Object> doc : docs) {
out.writeMapWithConsistentOrder(doc);
}
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeMap(mappings);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems mappings can be null, so would it make sense to use optional?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hendrikmuhs writeMap and readMap, when given just a Map<String,Object>, handle null values internally.

There are also no BWC tests yet...still waiting on PR: #43506

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok checked it, implicit magic, well...

}
}

/**
* Writes this response as a single object containing the preview documents and, when set, the mappings.
*
* @param builder the builder to write into
* @param params serialization parameters; not consulted by this method
* @return the builder that was passed in
* @throws IOException propagated from the builder calls
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
final String previewField = PREVIEW.getPreferredName();
builder.field(previewField, docs);
// mappings are optional: omit the field entirely instead of emitting an explicit null
if (null != mappings) {
final String mappingsField = MAPPINGS.getPreferredName();
builder.field(mappingsField, mappings);
}
builder.endObject();
return builder;
}
Expand All @@ -188,12 +234,12 @@ public boolean equals(Object obj) {
}

Response other = (Response) obj;
return Objects.equals(other.docs, docs);
return Objects.equals(other.docs, docs) && Objects.equals(other.mappings, mappings);
}

@Override
public int hashCode() {
return Objects.hashCode(docs);
return Objects.hash(docs, mappings);
}

public static Response fromXContent(final XContentParser parser) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,28 @@ protected Response createTestInstance() {
int size = randomIntBetween(0, 10);
List<Map<String, Object>> data = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Map<String, Object> datum = new HashMap<>();
Map<String, Object> entry = new HashMap<>();
entry.put("value1", randomIntBetween(1, 100));
datum.put(randomAlphaOfLength(10), entry);
data.add(datum);
data.add(Map.of(randomAlphaOfLength(10), Map.of("value1", randomIntBetween(1, 100))));
}
return new Response(data);

Response response = new Response(data);
if (randomBoolean()) {
size = randomIntBetween(0, 10);
if (randomBoolean()) {
Map<String, Object> mappings = new HashMap<>(size);
for (int i = 0; i < size; i++) {
mappings.put(randomAlphaOfLength(10), Map.of("type", randomAlphaOfLength(10)));
}
response.setMappings(mappings);
} else {
Map<String, String> mappings = new HashMap<>(size);
for (int i = 0; i < size; i++) {
mappings.put(randomAlphaOfLength(10), randomAlphaOfLength(10));
}
response.setMappingsFromStringMap(mappings);
}
}

return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,16 @@ protected void doExecute(Task task,

Pivot pivot = new Pivot(config.getPivotConfig());

getPreview(pivot,
config.getSource(),
config.getDestination().getPipeline(),
config.getDestination().getIndex(),
ActionListener.wrap(
previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
error -> {
logger.error("Failure gathering preview", error);
listener.onFailure(error);
}
));
getPreview(pivot, config.getSource(), config.getDestination().getPipeline(), config.getDestination().getIndex(), listener);
}

@SuppressWarnings("unchecked")
private void getPreview(Pivot pivot,
SourceConfig source,
String pipeline,
String dest,
ActionListener<List<Map<String, Object>>> listener) {
ActionListener<PreviewDataFrameTransformAction.Response> listener) {
final PreviewDataFrameTransformAction.Response previewResponse = new PreviewDataFrameTransformAction.Response();
ActionListener<SimulatePipelineResponse> pipelineResponseActionListener = ActionListener.wrap(
simulatePipelineResponse -> {
List<Map<String, Object>> response = new ArrayList<>(simulatePipelineResponse.getResults().size());
Expand All @@ -135,12 +126,14 @@ private void getPreview(Pivot pivot,
response.add((Map<String, Object>)XContentMapValues.extractValue("doc._source", tempMap));
}
}
listener.onResponse(response);
previewResponse.setDocs(response);
listener.onResponse(previewResponse);
},
listener::onFailure
);
pivot.deduceMappings(client, source, ActionListener.wrap(
deducedMappings -> {
previewResponse.setMappingsFromStringMap(deducedMappings);
ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
ClientHelper.DATA_FRAME_ORIGIN,
client,
Expand All @@ -157,7 +150,8 @@ private void getPreview(Pivot pivot,
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
.peek(doc -> doc.keySet().removeIf(k -> k.startsWith("_")))
.collect(Collectors.toList());
listener.onResponse(results);
previewResponse.setDocs(results);
listener.onResponse(previewResponse);
} else {
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
.map(doc -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ setup:
- match: { preview.2.avg_response: 42.0 }
- match: { preview.2.time.max: "2017-02-18T01:01:00.000Z" }
- match: { preview.2.time.min: "2017-02-18T01:01:00.000Z" }
- match: { mappings.properties.airline.type: "keyword" }
- match: { mappings.properties.by-hour.type: "date" }
- match: { mappings.properties.avg_response.type: "double" }
- match: { mappings.properties.time\.max.type: "date" }
- match: { mappings.properties.time\.min.type: "date" }

- do:
ingest.put_pipeline:
Expand Down Expand Up @@ -141,6 +146,9 @@ setup:
- match: { preview.2.by-hour: 1487379600000 }
- match: { preview.2.avg_response: 42.0 }
- match: { preview.2.my_field: 42 }
- match: { mappings.properties.airline.type: "keyword" }
- match: { mappings.properties.by-hour.type: "date" }
- match: { mappings.properties.avg_response.type: "double" }

---
"Test preview transform with invalid config":
Expand Down