Add enrich policy runner #41088
Pinging @elastic/es-core-features
The general approach in the PR looks good to me.
90b124a to 6828919
I left a few nits. This change is going well!
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;

public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
It would be great to test this without starting a node, but I'm not sure how :)
Yeah, this was one of those cases where it felt like mocking out all of the dependencies needed was a bit much. Would be happy to change the test if there are any tools to make that easier, or if we feel that it's important enough that it doesn't spin up a node.
Yes, I think this is the right trade-off. Otherwise the code for mocking becomes unmaintainable.
Removed the WIP label; this should be good for a proper review at this point.
        "Enrich policy execution for [{}] failed. Could not locate enrich key field [{}] on mapping for index [{}]",
        policyName, policy.getEnrichKey(), sourceIndex));
}
for (String enrichField : policy.getEnrichValues()) {
I think it is OK if not all of the values are present in the source index. Missing values should be handled by the processor.
However, we may want to log a warning if the same value is coming from multiple indexes since only 1 will win and it's not deterministic.
I lean towards failing if a decorate field is missing in the mapping. I think this can lead to unexpected behaviour at enrich time? (why does this document not have all the enriched fields?)
Even if the mapping exists in the source index, the document we are using to enrich may not have that field, so we will need to handle partial decorations in the processor.
If the decision is to only allow full decorations, then the check here makes sense, but it will need to be changed slightly so that all enrich fields exist across the set of source indexes (as opposed to existing in every source index, as it does here).
I am in favor of removing this check and implementing ignore_missing (or the like) in the processor to allow partial decorations.
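The two validation options discussed in this thread differ only in how they quantify over the source indexes. A minimal sketch of that difference, assuming mappings are modeled as a plain map from index name to the set of mapped field names (the class and method names are hypothetical and not part of the PR):

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the PR: contrasts the two checks discussed above.
final class EnrichFieldValidation {

    // Strict check (the behaviour under review): every enrich field is mapped in every source index.
    static boolean presentInEveryIndex(Map<String, Set<String>> mappedFieldsByIndex, Set<String> enrichFields) {
        return mappedFieldsByIndex.values().stream().allMatch(fields -> fields.containsAll(enrichFields));
    }

    // Relaxed check: every enrich field is mapped in at least one of the source indexes.
    static boolean presentAcrossIndices(Map<String, Set<String>> mappedFieldsByIndex, Set<String> enrichFields) {
        Set<String> union = new LinkedHashSet<>();
        mappedFieldsByIndex.values().forEach(union::addAll);
        return union.containsAll(enrichFields);
    }
}
```

Neither check says anything about individual documents, so a processor would still have to tolerate documents that lack a mapped field.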
client.execute(ReindexAction.INSTANCE, reindexRequest, new ActionListener<BulkByScrollResponse>() {
    @Override
    public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
        // Do we want to fail the request if there were failures during the reindex process?
I am not familiar with this API. Does this get called once after the whole reindex is complete, or once per scroll response?
This gets invoked once, after the whole reindex has been completed.
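Because the callback fires once for the whole reindex, the open question in the code comment (whether to fail the run when the reindex reported failures) can be settled in a single place. A rough sketch of one option, using plain Java stand-ins rather than the real ActionListener and BulkByScrollResponse types; the class name, method name, and the list-of-strings failure model are all assumptions:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in; NOT the Elasticsearch ActionListener / BulkByScrollResponse API.
final class ReindexOutcomeSketch {

    // Called once, after the whole reindex has completed.
    // Fails the policy run if the reindex reported any failure, otherwise continues.
    static void onReindexComplete(List<String> failures, Runnable onSuccess, Consumer<Exception> onFailure) {
        if (failures.isEmpty() == false) {
            onFailure.accept(new IllegalStateException(
                "Reindex reported " + failures.size() + " failure(s): " + failures));
            return;
        }
        onSuccess.run();
    }
}
```

Failing hard here would avoid swapping the alias onto a partially populated enrich index; whether that is the desired behaviour is left open in the thread.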
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
The tests here are good for the happy case, but I would like to see more testing around the unhappy cases. Investing in this test now will greatly help future developers test changes quickly.
I did another review round.
        "Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
        policyName, sourceIndex, policy.getIndexPattern()));
}
if (properties.containsKey(policy.getEnrichKey()) == false) {
We should probably allow _id as the enrich key.
Agreed, though I think it's probably a good idea to put that into a separate PR. I don't think we should bog this one down with too many initial features.
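For reference, one possible shape of that follow-up check. `_id` is a metadata field and does not appear under a mapping's `properties`, so it would need a special case. The helper below is hypothetical and only models the mapping as a plain map:

```java
import java.util.Map;

// Hypothetical helper, not part of this PR.
final class EnrichKeyCheck {

    // Accepts "_id" unconditionally, since metadata fields are not listed under "properties";
    // any other key must be present in the mapping's properties.
    static boolean isValidEnrichKey(Map<String, Object> properties, String enrichKey) {
        return "_id".equals(enrichKey) || properties.containsKey(enrichKey);
    }
}
```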
Add EnrichPolicyExecutor which runs runners within a threadpool executor
Update the tests
I made the runner object a Runnable and added a basic EnrichPolicyExecutor implementation. I figured the management threadpool would be fine to execute these on. I also simplified the enrich mapping to just define the key, and rely on the eventual meta mapping type to be added later.
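The split described here can be sketched roughly as follows. This is a simplified illustration, not the code in the PR: a plain `ExecutorService` stands in for the management threadpool, and the class names and callback shape are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical simplification: a plain ExecutorService stands in for the management threadpool.
final class PolicyExecutorSketch {

    // The runner carries everything one execution needs and reports completion through a callback.
    static final class PolicyRunner implements Runnable {
        private final String policyName;
        private final Consumer<String> onCompletion;

        PolicyRunner(String policyName, Consumer<String> onCompletion) {
            this.policyName = policyName;
            this.onCompletion = onCompletion;
        }

        @Override
        public void run() {
            // Validation, index creation, reindexing and the alias swap would happen here.
            onCompletion.accept(policyName);
        }
    }

    private final ExecutorService executor;

    PolicyExecutorSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // The executor only decides where and when a runner runs; it holds no per-policy logic.
    void runPolicy(String policyName, Consumer<String> onCompletion) {
        executor.execute(new PolicyRunner(policyName, onCompletion));
    }
}
```

Keeping the runner a plain `Runnable` means it can also be invoked directly on the calling thread in a test, without any thread pool.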
I did another review round. I like the split between EnrichPolicyExecutor and EnrichPolicyRunner.
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(retainFields.toArray(new String[0]), new String[0]);
if (policy.getQuery() != null) {
    searchSourceBuilder.query(QueryBuilders.wrapperQuery(policy.getQuery().getQuery()));
Why is the policy query wrapped in a wrapper query?
I went with the wrapper query since the EnrichPolicy.QuerySource returns the raw query json as a byte sequence. Is there a more appropriate way to convert a raw query body into a query builder?
I see, actually that does make sense to me :)
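For context on why this works: in the query DSL, a wrapper query carries another query as a base64-encoded string, so raw query bytes can be passed through without being parsed into a builder first. The sketch below only illustrates that JSON shape with the JDK; it does not use the Elasticsearch classes, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustration only: builds the JSON form of a wrapper query by hand.
final class WrapperQuerySketch {

    // Wraps a raw query body (UTF-8 JSON bytes) as {"wrapper":{"query":"<base64 of the raw body>"}}.
    static String wrap(byte[] rawQueryJson) {
        String encoded = Base64.getEncoder().encodeToString(rawQueryJson);
        return "{\"wrapper\":{\"query\":\"" + encoded + "\"}}";
    }

    public static void main(String[] args) {
        byte[] raw = "{\"match_all\":{}}".getBytes(StandardCharsets.UTF_8);
        System.out.println(wrap(raw));
    }
}
```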
            listener.onFailure(e);
        }
    });
}
Should we also delete the old enrich index? Maybe not here and not in this PR, but I'm just wondering. These old indices should be purged at some point.
These should definitely be subject to some kind of background cleanup task.
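A cleanup task of that kind would mostly need to pick out the indices a policy has left behind. A rough sketch of the selection step, assuming (hypothetically) that every enrich index for a policy shares a name prefix and that the alias points at exactly one of them; nothing here is part of the PR:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical selection logic for a background cleanup task.
final class StaleEnrichIndexSketch {

    // Returns the indices that belong to the policy (by name prefix, an assumed naming scheme)
    // but are no longer the target of the policy's enrich alias.
    static List<String> staleIndices(List<String> allIndices, String policyIndexPrefix, String currentAliasTarget) {
        return allIndices.stream()
            .filter(name -> name.startsWith(policyIndexPrefix))
            .filter(name -> name.equals(currentAliasTarget) == false)
            .sorted()
            .collect(Collectors.toList());
    }
}
```

Prefix matching is only safe if one policy's prefix can never be a prefix of another policy's index names, which the naming scheme would have to guarantee.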
Adds the foundation of the execution logic to execute an enrich policy. Validates the source index existence as well as mappings, creates a new enrich index for the policy, reindexes the source index into the new enrich index, and swaps the enrich alias for the policy to the new index.
Backports #41088 Adds the foundation of the execution logic to execute an enrich policy. Validates the source index existence as well as mappings, creates a new enrich index for the policy, reindexes the source index into the new enrich index, and swaps the enrich alias for the policy to the new index.
Note that this is a PR against the enrich branch and will be backported to the enrich-7.x branch.
Adds a skeleton of the execution logic to execute an enrich policy. Validates the source index existence as well as mappings, creates a new enrich index for the policy, reindexes the source index into the new enrich index, and swaps the enrich alias for the policy to the new index.
WIP: Comments are welcome at this point. The code really needs some testing and will most likely be rebased onto other PRs as they come in (namely #41003 and #40997), but I'd like to get some feedback before then.
Good to go for review.
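The four steps in the description (validate, create the enrich index, reindex, swap the alias) can be sketched against an in-memory model. This is an illustration under stated assumptions, not the runner from the PR: the real code issues asynchronous requests through a client, and the index naming scheme, class name, and data model below are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a cluster; every name here is an assumption.
final class EnrichRunSketch {
    final Map<String, Set<String>> mappings = new HashMap<>();           // index -> mapped field names
    final Map<String, List<Map<String, Object>>> docs = new HashMap<>(); // index -> documents
    final Map<String, String> aliases = new HashMap<>();                 // alias -> index

    // Runs the four steps for one policy and returns the name of the new enrich index.
    String run(String policyName, String sourceIndex, String enrichKey, List<String> enrichValues, long now) {
        // 1. Validate that the source index exists and that its mapping contains the enrich key.
        Set<String> fields = mappings.get(sourceIndex);
        if (fields == null) {
            throw new IllegalArgumentException("Source index [" + sourceIndex + "] does not exist");
        }
        if (fields.contains(enrichKey) == false) {
            throw new IllegalArgumentException("Enrich key [" + enrichKey + "] is not mapped in [" + sourceIndex + "]");
        }

        // 2. Create a new enrich index whose mapping defines only the key (naming scheme is assumed).
        String alias = ".enrich-" + policyName;
        String enrichIndex = alias + "-" + now;
        mappings.put(enrichIndex, Set.of(enrichKey));

        // 3. "Reindex": copy each source document, retaining only the key and value fields.
        List<Map<String, Object>> target = new ArrayList<>();
        for (Map<String, Object> doc : docs.getOrDefault(sourceIndex, List.of())) {
            Map<String, Object> copy = new HashMap<>();
            if (doc.containsKey(enrichKey)) {
                copy.put(enrichKey, doc.get(enrichKey));
            }
            for (String value : enrichValues) {
                if (doc.containsKey(value)) {
                    copy.put(value, doc.get(value));
                }
            }
            target.add(copy);
        }
        docs.put(enrichIndex, target);

        // 4. Swap the policy's alias to the new index; the previous index is left in place.
        aliases.put(alias, enrichIndex);
        return enrichIndex;
    }
}
```

Building a fresh index and then moving the alias keeps lookups on the old data until the new index is complete, which is also why old indices accumulate and need a separate cleanup.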