Skip to content

Expose Engine.Searcher provider to ingest plugins. #41010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -45,8 +47,12 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -86,17 +92,43 @@ public class IngestService implements ClusterStateApplier {

public IngestService(ClusterService clusterService, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins) {
List<IngestPlugin> ingestPlugins, IndicesService indicesService) {
this.clusterService = clusterService;
this.scriptService = scriptService;
final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
this.processorFactories = processorFactories(
ingestPlugins,
new Processor.Parameters(
env, scriptService, analysisRegistry,
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule(
command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
), this
), this, indexExpression -> {
ClusterState state = clusterService.state();
Index[] resolvedIndices = resolver.concreteIndices(state, IndicesOptions.STRICT_EXPAND_OPEN, indexExpression);
if (resolvedIndices.length != 1) {
throw new IllegalStateException("expression [" + indexExpression + "] can only point to a single concrete index");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use indexServiceSafe?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we prevent filtered aliases too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that makes sense. There is no guarantee that they will be applied in an ingest processor. I will fail with an error if an alias has a filter.

Index index = resolvedIndices[0];

// check if indexExpression matches with an alias that has a filter
// There is no guarantee that alias filters are applied, so fail if this is the case.
Set<String> indicesAndAliases = resolver.resolveExpressions(state, indexExpression);
String[] aliasesWithFilter = resolver.filteringAliases(state, index.getName(), indicesAndAliases);
if (aliasesWithFilter != null && aliasesWithFilter.length > 0) {
throw new IllegalStateException("expression [" + indexExpression + "] points an alias with a filter");
}

IndexService indexService = indicesService.indexServiceSafe(index);
int numShards = indexService.getMetaData().getNumberOfShards();
if (numShards != 1) {
throw new IllegalStateException("index [" + index.getName() + "] must have 1 shard, but has " + numShards +
" shards");
}

IndexShard indexShard = indexService.getShard(0);
return indexShard.acquireSearcher("ingest");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my education, does this create a new searcher or give you back a pre-existing one ? If the former, do we need to do anything with the existing one w/r/t disabling refreshes ? (sorry if this a bogus question, I don't have much experience at this level).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This always creates a new searcher. The code invoking this should be aware of this and make sure to close it after it is done with the searcher. In our case we will need to reuse the searcher from an ingest processor. We don't want to invoke this every time an enrich processor is invoked.

}
)
);
this.threadPool = threadPool;
Expand Down
12 changes: 11 additions & 1 deletion server/src/main/java/org/elasticsearch/ingest/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;

import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
Expand Down Expand Up @@ -110,16 +112,24 @@ class Parameters {
*/
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;

/**
* Provides access to an engine searcher of a locally allocated index specified for the provided index.
*
* The locally allocated index must be have a single primary shard.
*/
public final Function<String, Engine.Searcher> localShardSearcher;

public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService) {
IngestService ingestService, Function<String, Engine.Searcher> localShardSearcher) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
this.analysisRegistry = analysisRegistry;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
this.ingestService = ingestService;
this.localShardSearcher = localShardSearcher;
}

}
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,6 @@ protected Node(
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
Expand Down Expand Up @@ -421,6 +419,10 @@ protected Node(
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders, indexStoreFactories);

final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class), indicesService);

final AliasValidator aliasValidator = new AliasValidator();

final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(
Expand Down
Loading