Skip to content

Fix Rollup job creation to work with templates #43943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.elasticsearch.xpack.rollup;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -46,7 +45,6 @@
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.rollup.action.TransportDeleteRollupJobAction;
import org.elasticsearch.xpack.rollup.action.TransportGetRollupCapsAction;
import org.elasticsearch.xpack.rollup.action.TransportGetRollupIndexCapsAction;
Expand All @@ -73,7 +71,6 @@
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static java.util.Collections.emptyList;

Expand All @@ -89,20 +86,12 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
public static final String TASK_THREAD_POOL_NAME = RollupField.NAME + "_indexing";
public static final String SCHEDULE_THREAD_POOL_NAME = RollupField.NAME + "_scheduler";

public static final String MAPPING_METADATA_PLACEHOLDER = "\"ROLLUP_METADATA_PLACEHOLDER\":\"ROLLUP_METADATA_PLACEHOLDER\"";
public static final String ROLLUP_TEMPLATE_VERSION_FIELD = "rollup-version";
public static final String ROLLUP_TEMPLATE_VERSION_PATTERN =
Pattern.quote("${rollup.dynamic_template.version}");

private static final String ROLLUP_TEMPLATE_NAME = "/rollup-dynamic-template.json";
public static final String DYNAMIC_MAPPING_TEMPLATE = TemplateUtils.loadTemplate(ROLLUP_TEMPLATE_NAME,
Version.CURRENT.toString(), Rollup.ROLLUP_TEMPLATE_VERSION_PATTERN);

// list of headers that will be stored when a job is created
public static final Set<String> HEADER_FILTERS =
new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"));


private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
private final Settings settings;
private final boolean enabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
Expand Down Expand Up @@ -37,6 +38,7 @@
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
Expand Down Expand Up @@ -105,7 +107,7 @@ protected void masterOperation(Task task, PutRollupJobAction.Request request, Cl
.fields(request.getConfig().getAllFields().toArray(new String[0]));
fieldCapsRequest.setParentTask(clusterService.localNode().getId(), task.getId());

client.fieldCaps(fieldCapsRequest, new ActionListener<FieldCapabilitiesResponse>() {
client.fieldCaps(fieldCapsRequest, new ActionListener<>() {
@Override
public void onResponse(FieldCapabilitiesResponse fieldCapabilitiesResponse) {
ActionRequestValidationException validationException = request.validateMappings(fieldCapabilitiesResponse.get());
Expand Down Expand Up @@ -145,13 +147,14 @@ private static RollupJob createRollupJob(RollupJobConfig config, ThreadPool thre
static void createIndex(RollupJob job, ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService, Client client, Logger logger) {

String jobMetadata = "\"" + job.getConfig().getId() + "\":" + job.getConfig().toJSONString();

String mapping = Rollup.DYNAMIC_MAPPING_TEMPLATE
.replace(Rollup.MAPPING_METADATA_PLACEHOLDER, jobMetadata);

CreateIndexRequest request = new CreateIndexRequest(job.getConfig().getRollupIndex());
request.mapping(RollupField.TYPE_NAME, mapping, XContentType.JSON);
try {
XContentBuilder mapping = createMappings(job.getConfig());
request.source(mapping);
} catch (IOException e) {
listener.onFailure(e);
return;
}

client.execute(CreateIndexAction.INSTANCE, request,
ActionListener.wrap(createIndexResponse -> startPersistentTask(job, listener, persistentTasksService), e -> {
Expand All @@ -166,6 +169,40 @@ static void createIndex(RollupJob job, ActionListener<AcknowledgedResponse> list
}));
}

private static XContentBuilder createMappings(RollupJobConfig config) throws IOException {
return XContentBuilder.builder(XContentType.JSON.xContent())
.startObject()
.startObject("mappings")
.startObject("_doc")
.startObject("_meta")
.field(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD, Version.CURRENT.toString())
.startObject("_rollup")
.field(config.getId(), config)
.endObject()
.endObject()
.startArray("dynamic_templates")
.startObject()
.startObject("strings")
.field("match_mapping_type", "string")
.startObject("mapping")
.field("type", "keyword")
.endObject()
.endObject()
.endObject()
.startObject()
.startObject("date_histograms")
.field("path_match", "*.date_histogram.timestamp")
.startObject("mapping")
.field("type", "date")
.endObject()
.endObject()
.endObject()
.endArray()
.endObject()
.endObject()
.endObject();
}

@SuppressWarnings("unchecked")
static void updateMapping(RollupJob job, ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService, Client client, Logger logger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public void testIndexMetaData() throws InterruptedException {
String mapping = requestCaptor.getValue().mappings().get("_doc");

// Make sure the version is present, and we have our date template (the most important aspects)
assertThat(mapping, containsString("\"rollup-version\": \"" + Version.CURRENT.toString() + "\""));
assertThat(mapping, containsString("\"path_match\": \"*.date_histogram.timestamp\""));
assertThat(mapping, containsString("\"rollup-version\":\"" + Version.CURRENT.toString() + "\""));
assertThat(mapping, containsString("\"path_match\":\"*.date_histogram.timestamp\""));

listenerCaptor.getValue().onFailure(new ResourceAlreadyExistsException(job.getConfig().getRollupIndex()));
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,87 @@ setup:
]
}

---
"Test put job with templates":

- do:
indices.put_template:
name: test
body:
index_patterns: foo_*
mappings:
properties:
field:
type: keyword

- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
rollup.put_job:
id: foo
body: >
{
"index_pattern": "foo",
"rollup_index": "foo_rollup",
"cron": "*/30 * * * * ?",
"page_size" :10,
"groups" : {
"date_histogram": {
"field": "the_field",
"calendar_interval": "1h"
}
},
"metrics": [
{
"field": "value_field",
"metrics": ["min", "max", "sum"]
}
]
}
- is_true: acknowledged

- do:
indices.get_mapping:
index: foo_rollup

- set: {foo_rollup.mappings._meta.rollup-version: version}

- match:
foo_rollup:
mappings:
_meta:
_rollup:
foo:
id: "foo"
index_pattern: "foo"
rollup_index: "foo_rollup"
cron: "*/30 * * * * ?"
page_size: 10
groups :
date_histogram:
calendar_interval: "1h"
field: "the_field"
time_zone: "UTC"
metrics:
- field: "value_field"
metrics:
- "min"
- "max"
- "sum"
timeout: "20s"
rollup-version: $version
dynamic_templates:
- strings:
match_mapping_type: "string"
mapping:
type: "keyword"
- date_histograms:
path_match: "*.date_histogram.timestamp"
mapping:
type: "date"
properties:
field:
type: "keyword"