Skip to content

Expose Engine.Searcher provider to ingest plugins. #41010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

martijnvg
Copy link
Member

@martijnvg martijnvg commented Apr 9, 2019

Note that this is a PR against the enrich branch and will be backported to enrich-7.x branch.

Needed for #32789 in order for ingest processors to perform local Lucene searches.

@martijnvg martijnvg added >non-issue :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP labels Apr 9, 2019
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-core-features

@martijnvg
Copy link
Member Author

Pinging @elastic/es-core-infra to raise awareness of this change. This PR modifies the ingest plugin extension to provide the ability to an ingest processor to access a locally allocated shard.

@jakelandis jakelandis self-requested a review April 16, 2019 23:38
@@ -96,7 +99,19 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool,
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule(
command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
), this
), this, index -> {
IndexService indexService = indicesService.indexService(index);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume from the exception message that the IndexService returns null unless it it local to this node ? (it is not very clear from the code if InicesService is per node or per cluster).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, IndicesServices only contains a IndexService instance for locally allocated indices.

@@ -96,7 +99,19 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool,
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule(
command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
), this
), this, index -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need to do some alias resolution here as it appears that this call requires the concrete index ?

Copy link
Contributor

@jakelandis jakelandis Apr 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now you do it in the test plugin ... but maybe move the logic here to have one place for failures of that kind ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 yeah it makes sense to move the index expression resolution to here.

The Function<Index, Engine.Searcher> would then change into Function<String, Engine.Searcher>.

}

IndexShard indexShard = indexService.getShard(0);
return indexShard.acquireSearcher("ingest");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my education, does this create a new searcher or give you back a pre-existing one ? If the former, do we need to do anything with the existing one w/r/t disabling refreshes ? (sorry if this a bogus question, I don't have much experience at this level).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This always creates a new searcher. The code invoking this should be aware of this and make sure to close it after it is done with the searcher. In our case we will need to reuse the searcher from an ingest processor. We don't want to invoke this every time an enrich processor is invoked.

public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService) {
IngestService ingestService, Function<Index, Engine.Searcher> searcherProvider) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: searcherProvider -> localShardSearcher

@jakelandis
Copy link
Contributor

LGTM, just a couple questions and a nitpick

However, I am a bit concerned by exposing this to all ingest plugins and would like another opinion on that point before fully commiting to this approach.

@martijnvg
Copy link
Member Author

@elasticmachine run elasticsearch-ci/packaging-sample

@martijnvg
Copy link
Member Author

However, I am a bit concerned by exposing this to all ingest plugins and would like another opinion on that point before fully commiting to this approach.

Not sure how else we can expose a local allocated shard to an ingest processor. We either allow it for all plugins or not at all. I think the same applies also to other components (ClusterService, ScriptService etc.) which we already expose to ingest plugins.

@jakelandis
Copy link
Contributor

I am not advocating for nor against exposing this as-is. I simply don't fully understand the ramifications (if any) this may brings.

@jpountz or @jasontedor - do you see any potential issues of exposing an IndexSearcher (for local single shard index) directly to any Ingest plugin ?

Not sure how else we can expose a local allocated shard to an ingest processor.

There is likely a way, but would require some refactoring.

Copy link
Contributor

@jpountz jpountz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpountz or @jasontedor - do you see any potential issues of exposing an IndexSearcher (for local single shard index) directly to any Ingest plugin ?

The main issue I can think of is that the searcher might provide access to more information than it should, eg. if a filtered alias is queried, if the index is not allowed for reading, or if there are FLS/DLS rules configured. Searchers are read-only so there is no risk of altering data however.

IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
throw new ResourceNotFoundException("index [" + index + "] not locally allocated");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use indexServiceSafe?

Index[] resolvedIndices = resolver.concreteIndices(state, IndicesOptions.STRICT_EXPAND_OPEN, indexExpression);
if (resolvedIndices.length != 1) {
throw new IllegalStateException("expression [" + indexExpression + "] can only point to a single concrete index");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we prevent filtered aliases too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that makes sense. There is no guarantee that they will be applied in an ingest processor. I will fail with an error if an alias has a filter.

@martijnvg
Copy link
Member Author

or if there are FLS/DLS rules configured

I think FLS/DLS rules still apply, because IndexShard#acquireSearcher(...) uses an IndexSearcherWrapper it has been configured and that is the case if dls/fls has been configured.

@jpountz
Copy link
Contributor

jpountz commented Apr 18, 2019

Let's have a test that the searcher wrapper is applied when acquired via an ingest processor?

@martijnvg martijnvg requested review from jpountz and jakelandis April 18, 2019 09:21
@martijnvg
Copy link
Member Author

@elasticmachine run elasticsearch-ci/packaging-sample

1 similar comment
@martijnvg
Copy link
Member Author

@elasticmachine run elasticsearch-ci/packaging-sample

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String indexExpression = "reference-index";
try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asserting that the wrapper has been called doesn't feel enough to me, I'd like to have a custom directory reader that the wrapper uses, and assert here that we get the expected implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've adjusted this test: fc939ef

@martijnvg martijnvg requested a review from jpountz April 24, 2019 13:29
Copy link
Contributor

@jpountz jpountz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @martijnvg !

@martijnvg
Copy link
Member Author

@elasticmachine run elasticsearch-ci/1

(unrelated test failure, IntervalQueryBuilderTests.testToQuery, java.lang.IllegalArgumentException: Too many disjunctions to expand)

@martijnvg martijnvg merged commit 284c508 into elastic:enrich Apr 24, 2019
martijnvg added a commit that referenced this pull request Apr 24, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >non-issue
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants