Skip to content

Commit 284c508

Browse files
authored
Expose Engine.Searcher provider to ingest plugins. (#41010)
Relates to #32789
1 parent ba32255 commit 284c508

File tree

6 files changed

+323
-10
lines changed

6 files changed

+323
-10
lines changed

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@
3030
import org.elasticsearch.action.index.IndexRequest;
3131
import org.elasticsearch.action.ingest.DeletePipelineRequest;
3232
import org.elasticsearch.action.ingest.PutPipelineRequest;
33+
import org.elasticsearch.action.support.IndicesOptions;
3334
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3435
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
3536
import org.elasticsearch.cluster.ClusterChangedEvent;
3637
import org.elasticsearch.cluster.ClusterState;
3738
import org.elasticsearch.cluster.ClusterStateApplier;
39+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3840
import org.elasticsearch.cluster.metadata.MetaData;
3941
import org.elasticsearch.cluster.node.DiscoveryNode;
4042
import org.elasticsearch.cluster.service.ClusterService;
@@ -45,8 +47,12 @@
4547
import org.elasticsearch.common.xcontent.XContentHelper;
4648
import org.elasticsearch.env.Environment;
4749
import org.elasticsearch.gateway.GatewayService;
50+
import org.elasticsearch.index.Index;
51+
import org.elasticsearch.index.IndexService;
4852
import org.elasticsearch.index.VersionType;
4953
import org.elasticsearch.index.analysis.AnalysisRegistry;
54+
import org.elasticsearch.index.shard.IndexShard;
55+
import org.elasticsearch.indices.IndicesService;
5056
import org.elasticsearch.plugins.IngestPlugin;
5157
import org.elasticsearch.script.ScriptService;
5258
import org.elasticsearch.threadpool.ThreadPool;
@@ -86,17 +92,43 @@ public class IngestService implements ClusterStateApplier {
8692

8793
public IngestService(ClusterService clusterService, ThreadPool threadPool,
8894
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
89-
List<IngestPlugin> ingestPlugins) {
95+
List<IngestPlugin> ingestPlugins, IndicesService indicesService) {
9096
this.clusterService = clusterService;
9197
this.scriptService = scriptService;
98+
final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
9299
this.processorFactories = processorFactories(
93100
ingestPlugins,
94101
new Processor.Parameters(
95102
env, scriptService, analysisRegistry,
96103
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
97104
(delay, command) -> threadPool.schedule(
98105
command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
99-
), this
106+
), this, indexExpression -> {
107+
ClusterState state = clusterService.state();
108+
Index[] resolvedIndices = resolver.concreteIndices(state, IndicesOptions.STRICT_EXPAND_OPEN, indexExpression);
109+
if (resolvedIndices.length != 1) {
110+
throw new IllegalStateException("expression [" + indexExpression + "] can only point to a single concrete index");
111+
}
112+
Index index = resolvedIndices[0];
113+
114+
// check if indexExpression matches with an alias that has a filter
115+
// There is no guarantee that alias filters are applied, so fail if this is the case.
116+
Set<String> indicesAndAliases = resolver.resolveExpressions(state, indexExpression);
117+
String[] aliasesWithFilter = resolver.filteringAliases(state, index.getName(), indicesAndAliases);
118+
if (aliasesWithFilter != null && aliasesWithFilter.length > 0) {
119+
throw new IllegalStateException("expression [" + indexExpression + "] points an alias with a filter");
120+
}
121+
122+
IndexService indexService = indicesService.indexServiceSafe(index);
123+
int numShards = indexService.getMetaData().getNumberOfShards();
124+
if (numShards != 1) {
125+
throw new IllegalStateException("index [" + index.getName() + "] must have 1 shard, but has " + numShards +
126+
" shards");
127+
}
128+
129+
IndexShard indexShard = indexService.getShard(0);
130+
return indexShard.acquireSearcher("ingest");
131+
}
100132
)
101133
);
102134
this.threadPool = threadPool;

server/src/main/java/org/elasticsearch/ingest/Processor.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import org.elasticsearch.common.util.concurrent.ThreadContext;
2323
import org.elasticsearch.env.Environment;
2424
import org.elasticsearch.index.analysis.AnalysisRegistry;
25+
import org.elasticsearch.index.engine.Engine;
2526
import org.elasticsearch.script.ScriptService;
2627
import org.elasticsearch.threadpool.Scheduler;
2728

2829
import java.util.Map;
2930
import java.util.function.BiFunction;
31+
import java.util.function.Function;
3032
import java.util.function.LongSupplier;
3133

3234
/**
@@ -110,16 +112,24 @@ class Parameters {
110112
*/
111113
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;
112114

115+
/**
116+
* Provides access to an engine searcher of a locally allocated index specified for the provided index.
117+
*
118+
* The locally allocated index must be have a single primary shard.
119+
*/
120+
public final Function<String, Engine.Searcher> localShardSearcher;
121+
113122
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
114123
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
115-
IngestService ingestService) {
124+
IngestService ingestService, Function<String, Engine.Searcher> localShardSearcher) {
116125
this.env = env;
117126
this.scriptService = scriptService;
118127
this.threadContext = threadContext;
119128
this.analysisRegistry = analysisRegistry;
120129
this.relativeTimeSupplier = relativeTimeSupplier;
121130
this.scheduler = scheduler;
122131
this.ingestService = ingestService;
132+
this.localShardSearcher = localShardSearcher;
123133
}
124134

125135
}

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,6 @@ protected Node(
350350
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
351351
clusterService.addStateApplier(scriptModule.getScriptService());
352352
resourcesToClose.add(clusterService);
353-
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
354-
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
355353
final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
356354
clusterService.getClusterSettings(), client);
357355
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
@@ -421,6 +419,10 @@ protected Node(
421419
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
422420
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders, indexStoreFactories);
423421

422+
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
423+
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
424+
pluginsService.filterPlugins(IngestPlugin.class), indicesService);
425+
424426
final AliasValidator aliasValidator = new AliasValidator();
425427

426428
final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(

0 commit comments

Comments
 (0)