Skip to content

Commit d4dd78c

Browse files
authored
Allow ingest processors access to node client. (elastic#46077)
This is the first PR that merges changes made to server module from the enrich branch (see elastic#32789) into the master branch. The plan is to merge changes made to the server module separately from the pr that will merge enrich into master, so that these changes can be reviewed in isolation.
1 parent d49a658 commit d4dd78c

File tree

5 files changed

+23
-9
lines changed

5 files changed

+23
-9
lines changed

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.action.ingest.DeletePipelineRequest;
3232
import org.elasticsearch.action.ingest.PutPipelineRequest;
3333
import org.elasticsearch.action.support.master.AcknowledgedResponse;
34+
import org.elasticsearch.client.Client;
3435
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
3536
import org.elasticsearch.cluster.ClusterChangedEvent;
3637
import org.elasticsearch.cluster.ClusterState;
@@ -86,7 +87,7 @@ public class IngestService implements ClusterStateApplier {
8687

8788
public IngestService(ClusterService clusterService, ThreadPool threadPool,
8889
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
89-
List<IngestPlugin> ingestPlugins) {
90+
List<IngestPlugin> ingestPlugins, Client client) {
9091
this.clusterService = clusterService;
9192
this.scriptService = scriptService;
9293
this.processorFactories = processorFactories(
@@ -96,7 +97,7 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool,
9697
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
9798
(delay, command) -> threadPool.schedule(
9899
command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
99-
), this
100+
), this, client
100101
)
101102
);
102103
this.threadPool = threadPool;

server/src/main/java/org/elasticsearch/ingest/Processor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.ingest;
2121

22+
import org.elasticsearch.client.Client;
2223
import org.elasticsearch.common.util.concurrent.ThreadContext;
2324
import org.elasticsearch.env.Environment;
2425
import org.elasticsearch.index.analysis.AnalysisRegistry;
@@ -110,16 +111,22 @@ class Parameters {
110111
*/
111112
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;
112113

114+
/**
115+
* Provides access to the node client
116+
*/
117+
public final Client client;
118+
113119
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
114120
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
115-
IngestService ingestService) {
121+
IngestService ingestService, Client client) {
116122
this.env = env;
117123
this.scriptService = scriptService;
118124
this.threadContext = threadContext;
119125
this.analysisRegistry = analysisRegistry;
120126
this.relativeTimeSupplier = relativeTimeSupplier;
121127
this.scheduler = scheduler;
122128
this.ingestService = ingestService;
129+
this.client = client;
123130
}
124131

125132
}

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,8 @@ protected Node(
366366
new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings())
367367
.newHashPublisher());
368368
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
369-
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
369+
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
370+
pluginsService.filterPlugins(IngestPlugin.class), client);
370371
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
371372
final UsageService usageService = new UsageService();
372373

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.action.ingest.DeletePipelineRequest;
3636
import org.elasticsearch.action.ingest.PutPipelineRequest;
3737
import org.elasticsearch.action.update.UpdateRequest;
38+
import org.elasticsearch.client.Client;
3839
import org.elasticsearch.client.Requests;
3940
import org.elasticsearch.cluster.ClusterChangedEvent;
4041
import org.elasticsearch.cluster.ClusterName;
@@ -106,27 +107,30 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
106107

107108
public void testIngestPlugin() {
108109
ThreadPool tp = mock(ThreadPool.class);
110+
Client client = mock(Client.class);
109111
IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null,
110-
null, Collections.singletonList(DUMMY_PLUGIN));
112+
null, Collections.singletonList(DUMMY_PLUGIN), client);
111113
Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
112114
assertTrue(factories.containsKey("foo"));
113115
assertEquals(1, factories.size());
114116
}
115117

116118
public void testIngestPluginDuplicate() {
117119
ThreadPool tp = mock(ThreadPool.class);
120+
Client client = mock(Client.class);
118121
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
119122
new IngestService(mock(ClusterService.class), tp, null, null,
120-
null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN)));
123+
null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), client));
121124
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
122125
}
123126

124127
public void testExecuteIndexPipelineDoesNotExist() {
125128
ThreadPool threadPool = mock(ThreadPool.class);
129+
Client client = mock(Client.class);
126130
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
127131
when(threadPool.executor(anyString())).thenReturn(executorService);
128132
IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null,
129-
null, Collections.singletonList(DUMMY_PLUGIN));
133+
null, Collections.singletonList(DUMMY_PLUGIN), client);
130134
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
131135

132136
final SetOnce<Boolean> failure = new SetOnce<>();
@@ -1138,6 +1142,7 @@ private static IngestService createWithProcessors() {
11381142

11391143
private static IngestService createWithProcessors(Map<String, Processor.Factory> processors) {
11401144
ThreadPool threadPool = mock(ThreadPool.class);
1145+
Client client = mock(Client.class);
11411146
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
11421147
when(threadPool.executor(anyString())).thenReturn(executorService);
11431148
return new IngestService(mock(ClusterService.class), threadPool, null, null,
@@ -1146,7 +1151,7 @@ private static IngestService createWithProcessors(Map<String, Processor.Factory>
11461151
public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
11471152
return processors;
11481153
}
1149-
}));
1154+
}), client);
11501155
}
11511156

11521157
private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1045,7 +1045,7 @@ allocationService, new AliasValidator(), environment, indexScopedSettings,
10451045
new IngestService(
10461046
clusterService, threadPool, environment, scriptService,
10471047
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
1048-
Collections.emptyList()),
1048+
Collections.emptyList(), client),
10491049
client, actionFilters, indexNameExpressionResolver,
10501050
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver)
10511051
));

0 commit comments

Comments
 (0)