
Add enrich policy runner #41088

Merged
29 commits merged on May 2, 2019

Changes from 16 commits

Commits (29)
6336351
WIP Policy execution code for enrich
jbaiera Apr 10, 2019
05769e5
Get the policy runner to pass a basic test, add licenses, generally f…
jbaiera Apr 12, 2019
6828919
Fix up implementation after merging in new enrich model
jbaiera Apr 15, 2019
0b6ea3a
Modify runner to accept index patterns for input
jbaiera Apr 16, 2019
41a886c
Make enrich policy result immutable
jbaiera Apr 16, 2019
3c4b24b
Make enrich policy runner constructor package private
jbaiera Apr 16, 2019
96899fb
Add assert to mapping validation
jbaiera Apr 16, 2019
3dc1e4d
Remove SuppressWarnings annotation in policy runner
jbaiera Apr 16, 2019
9c5aa7a
Rename EnrichPolicyResult to PolicyExecutionResult
jbaiera Apr 18, 2019
47099af
Check for null policy when executing id
jbaiera Apr 18, 2019
ea22658
Make ".enrich" a constant
jbaiera Apr 18, 2019
f3e7afe
Simplify the enrich mapping to just the enrich key, disable _source
jbaiera Apr 18, 2019
7546e67
Make EnrichPolicyRunner a Runnable
jbaiera Apr 18, 2019
1be4639
Remove enrich value field validation
jbaiera Apr 18, 2019
6d10db0
Use keySet on mapping validation instead of Map
jbaiera Apr 18, 2019
23493bf
Swap to diamond operators on ActionListener impls
jbaiera Apr 18, 2019
b751660
Merge branch 'enrich' into enrich-policy-runner
jbaiera Apr 30, 2019
35c4a2a
Use Generic thread pool when executing policy.
jbaiera Apr 30, 2019
31eeb40
Auto expand replicas on enrich indices.
jbaiera Apr 30, 2019
6c884d3
Enable source on enrich indices
jbaiera Apr 30, 2019
112a40b
set enrich mapping dynamic field to false
jbaiera Apr 30, 2019
4137585
Remove default match all query
jbaiera May 1, 2019
05f201c
Add additional asserts to test
jbaiera May 1, 2019
ac69e7d
Force content type of enrich index to use SMILE
jbaiera May 1, 2019
08dc157
Fix precommit
jbaiera May 1, 2019
7e5d52c
Merge branch 'enrich' into enrich-policy-runner
jbaiera May 1, 2019
74df5a7
Merge branch 'enrich' into enrich-policy-runner
jbaiera May 1, 2019
6522151
Merge branch 'enrich' into enrich-policy-runner
jbaiera May 2, 2019
064371d
Merge branch 'enrich' into enrich-policy-runner
jbaiera May 2, 2019
@@ -7,6 +7,7 @@

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -20,6 +21,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -42,7 +44,10 @@ public Collection<Object> createComponents(Client client,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry) {
return Collections.singleton(new EnrichStore(clusterService));
EnrichStore enrichStore = new EnrichStore(clusterService);
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(enrichStore, clusterService, client, threadPool,
new IndexNameExpressionResolver(), System::currentTimeMillis);
return Arrays.asList(enrichStore, enrichPolicyExecutor);
}

@Override
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.enrich;

import java.util.function.LongSupplier;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.threadpool.ThreadPool;

class EnrichPolicyExecutor {

private final EnrichStore enrichStore;
private final ClusterService clusterService;
private final Client client;
private final ThreadPool threadPool;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final LongSupplier nowSupplier;

EnrichPolicyExecutor(EnrichStore enrichStore, ClusterService clusterService, Client client, ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier) {
this.enrichStore = enrichStore;
this.clusterService = clusterService;
this.client = client;
this.threadPool = threadPool;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.nowSupplier = nowSupplier;
}

public void runPolicy(String policyId, ActionListener<PolicyExecutionResult> listener) {
// Look up policy in policy store and execute it
EnrichPolicy policy = enrichStore.getPolicy(policyId);
if (policy == null) {
throw new ElasticsearchException("Policy execution failed. Could not locate policy with id [{}]", policyId);
} else {
runPolicy(policyId, policy, listener);
}
}

public void runPolicy(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
EnrichPolicyRunner runnable =
new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier);
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(runnable);
}
}
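
For context, a minimal caller-side sketch (not part of this diff) of how the executor might be invoked. It assumes the same components created in EnrichPlugin above are in scope; the policy id and logger are hypothetical:

    // Hypothetical usage sketch, not part of this PR: run a stored policy by id
    // and react to the PolicyExecutionResult through an ActionListener.
    EnrichPolicyExecutor executor = new EnrichPolicyExecutor(enrichStore, clusterService, client, threadPool,
        new IndexNameExpressionResolver(), System::currentTimeMillis);
    executor.runPolicy("users-policy", new ActionListener<PolicyExecutionResult>() {
        @Override
        public void onResponse(PolicyExecutionResult result) {
            logger.info("Policy [users-policy]: execution completed [{}]", result.isCompleted());
        }

        @Override
        public void onFailure(Exception e) {
            logger.error("Policy [users-policy]: execution failed", e);
        }
    });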
@@ -0,0 +1,259 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class EnrichPolicyRunner implements Runnable {

private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);

private static final String ENRICH_INDEX_NAME_BASE = ".enrich-";

private final String policyName;
private final EnrichPolicy policy;
private final ActionListener<PolicyExecutionResult> listener;
private final ClusterService clusterService;
private final Client client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final LongSupplier nowSupplier;

EnrichPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener,
ClusterService clusterService, Client client, IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier nowSupplier) {
this.policyName = policyName;
this.policy = policy;
this.listener = listener;
this.clusterService = clusterService;
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.nowSupplier = nowSupplier;
}

@Override
public void run() {
// Collect the source index information
logger.info("Policy [{}]: Running enrich policy", policyName);
final String sourceIndexPattern = policy.getIndexPattern();
logger.debug("Policy [{}]: Checking source index [{}]", policyName, sourceIndexPattern);
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndexPattern);
client.admin().indices().getIndex(getIndexRequest, new ActionListener<>() {
@Override
public void onResponse(GetIndexResponse getIndexResponse) {
validateMappings(getIndexResponse);
prepareAndCreateEnrichIndex();
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

private Map<String, Object> getMappings(final GetIndexResponse getIndexResponse, final String sourceIndexName) {
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getIndexResponse.mappings();
ImmutableOpenMap<String, MappingMetaData> indexMapping = mappings.get(sourceIndexName);
assert indexMapping.keys().size() == 1 : "Expecting only one type per index";
MappingMetaData typeMapping = indexMapping.iterator().next().value;
return typeMapping.sourceAsMap();
}

private void validateMappings(final GetIndexResponse getIndexResponse) {
String[] sourceIndices = getIndexResponse.getIndices();
logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices);
for (String sourceIndex : sourceIndices) {
Map<String, Object> mapping = getMappings(getIndexResponse, sourceIndex);
Set<?> properties = ((Map<?, ?>) mapping.get("properties")).keySet();
if (properties == null) {
listener.onFailure(
new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndexPattern()));
}
if (properties.contains(policy.getEnrichKey()) == false) {
listener.onFailure(
new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not locate enrich key field [{}] on mapping for index [{}]",
policyName, policy.getEnrichKey(), sourceIndex));
}
}
}

private String getEnrichIndexBase(final String policyName) {
return ENRICH_INDEX_NAME_BASE + policyName;
}

private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
// Currently the only supported policy type is EnrichPolicy.EXACT_MATCH_TYPE, which is a keyword type
String keyType;
if (EnrichPolicy.EXACT_MATCH_TYPE.equals(policy.getType())) {
keyType = "keyword";
} else {
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
}

// Disable _source on enrich index. Explicitly mark key mapping type.
try {
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject()
.startObject(MapperService.SINGLE_MAPPING_NAME)
.startObject("_source")
.field("enabled", false)
.endObject()
.startObject("properties")
.startObject(policy.getEnrichKey())
.field("type", keyType)
.field("index", true)
.field("doc_values", false)
.endObject()
.endObject()
.endObject()
.endObject();

return builder;
} catch (IOException ioe) {
throw new UncheckedIOException("Could not render enrich mapping", ioe);
}
}
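
For reference, the mapping rendered by this builder would look roughly as follows (MapperService.SINGLE_MAPPING_NAME resolves to "_doc"; the "user_id" field stands in for whatever enrich key the policy defines):

    {
      "_doc": {
        "_source": { "enabled": false },
        "properties": {
          "user_id": { "type": "keyword", "index": true, "doc_values": false }
        }
      }
    }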

private void prepareAndCreateEnrichIndex() {
long nowTimestamp = nowSupplier.getAsLong();
String enrichIndexName = getEnrichIndexBase(policyName) + "-" + nowTimestamp;
// TODO: Settings for localizing enrich indices to nodes that are ingest+data only
Settings enrichIndexSettings = Settings.EMPTY;
CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings);
createEnrichIndexRequest.mapping(MapperService.SINGLE_MAPPING_NAME, resolveEnrichMapping(policy));
logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName);
client.admin().indices().create(createEnrichIndexRequest, new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
transferDataToEnrichIndex(enrichIndexName);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

private void transferDataToEnrichIndex(final String destinationIndexName) {
logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName);
// Filter down the source fields to just the ones required by the policy
final Set<String> retainFields = new HashSet<>();
retainFields.add(policy.getEnrichKey());
retainFields.addAll(policy.getEnrichValues());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(retainFields.toArray(new String[0]), new String[0]);
if (policy.getQuery() != null) {
searchSourceBuilder.query(QueryBuilders.wrapperQuery(policy.getQuery().getQuery()));
Member:
Why is the policy query wrapped in a wrapper query?

Member Author:
I went with the wrapper query since the EnrichPolicy.QuerySource returns the raw query json as a byte sequence. Is there a more appropriate way to convert a raw query body into a query builder?

Member:
I see, actually that does make sense to me :)

} else {
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
}
ReindexRequest reindexRequest = new ReindexRequest()
.setDestIndex(destinationIndexName)
.setSourceIndices(policy.getIndexPattern());
reindexRequest.getSearchRequest().source(searchSourceBuilder);
client.execute(ReindexAction.INSTANCE, reindexRequest, new ActionListener<>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
// Do we want to fail the request if there were failures during the reindex process?
Contributor:
I am not familiar with this API... does this get called once after the whole re-index is complete, or per scroll response?

Member:
This gets invoked after the whole reindex has been completed.

if (bulkByScrollResponse.getBulkFailures().size() > 0) {
listener.onFailure(new ElasticsearchException("Encountered bulk failures during reindex process"));
} else if (bulkByScrollResponse.getSearchFailures().size() > 0) {
listener.onFailure(new ElasticsearchException("Encountered search failures during reindex process"));
} else {
logger.info("Policy [{}]: Transferred [{}] documents to enrich index [{}]", policyName,
bulkByScrollResponse.getCreated(), destinationIndexName);
refreshEnrichIndex(destinationIndexName);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
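
A minimal illustration of the wrapper-query point discussed above, assuming a hypothetical raw JSON query body of the kind EnrichPolicy.QuerySource exposes:

    // Illustration only, not part of the diff: wrapperQuery accepts a raw JSON query body,
    // which is why it fits the byte sequence returned by the policy's query source.
    String rawQueryJson = "{\"term\":{\"user\":\"kimchy\"}}"; // hypothetical policy query
    SearchSourceBuilder source = new SearchSourceBuilder()
        .query(QueryBuilders.wrapperQuery(rawQueryJson));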

private void refreshEnrichIndex(final String destinationIndexName) {
logger.debug("Policy [{}]: Refreshing newly created enrich index [{}]", policyName, destinationIndexName);
client.admin().indices().refresh(new RefreshRequest(destinationIndexName), new ActionListener<>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
updateEnrichPolicyAlias(destinationIndexName);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

private void updateEnrichPolicyAlias(final String destinationIndexName) {
String enrichIndexBase = getEnrichIndexBase(policyName);
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), aliasRequest);
ImmutableOpenMap<String, List<AliasMetaData>> aliases =
clusterService.state().metaData().findAliases(aliasRequest, concreteIndices);
IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest();
String[] indices = aliases.keys().toArray(String.class);
if (indices.length > 0) {
aliasToggleRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove().indices(indices).alias(enrichIndexBase));
}
aliasToggleRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(destinationIndexName).alias(enrichIndexBase));
client.admin().indices().aliases(aliasToggleRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
logger.info("Policy [{}]: Policy execution complete", policyName);
listener.onResponse(new PolicyExecutionResult(true));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
Member:
Should we also delete the old enrich index? Maybe not here and not in the PR, but just wondering. These old indices should be purged at some point in time.

Member Author:
These should definitely be subject to some kind of background cleanup task.

}
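
The alias toggle built in updateEnrichPolicyAlias above is effectively the following _aliases request, shown with a hypothetical policy name and timestamps following the ".enrich-<policy>-<timestamp>" pattern the runner uses:

    POST /_aliases
    {
      "actions": [
        { "remove": { "index": ".enrich-users-policy-1556720000000", "alias": ".enrich-users-policy" } },
        { "add":    { "index": ".enrich-users-policy-1556730000000", "alias": ".enrich-users-policy" } }
      ]
    }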
@@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;

public class PolicyExecutionResult {
private final boolean completed;

public PolicyExecutionResult(boolean completed) {
this.completed = completed;
}

public boolean isCompleted() {
return completed;
}
}