Skip to content

Commit bc22d07

Browse files
INGEST: Simplify IngestService (#33008) (#33314)
* INGEST: Simplify IngestService (#33008) * Follow up to #32617 * Flatten redundant inner classes of `IngestService`
1 parent fa7a1b8 commit bc22d07

21 files changed

+1381
-1490
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
8989
* {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare
9090
* the one with the least casts.
9191
*/
92-
final List<DocWriteRequest> requests = new ArrayList<>();
92+
final List<DocWriteRequest<?>> requests = new ArrayList<>();
9393
private final Set<String> indices = new HashSet<>();
9494
List<Object> payloads = null;
9595

@@ -105,14 +105,14 @@ public BulkRequest() {
105105
/**
106106
* Adds a list of requests to be executed. Either index or delete requests.
107107
*/
108-
public BulkRequest add(DocWriteRequest... requests) {
109-
for (DocWriteRequest request : requests) {
108+
public BulkRequest add(DocWriteRequest<?>... requests) {
109+
for (DocWriteRequest<?> request : requests) {
110110
add(request, null);
111111
}
112112
return this;
113113
}
114114

115-
public BulkRequest add(DocWriteRequest request) {
115+
public BulkRequest add(DocWriteRequest<?> request) {
116116
return add(request, null);
117117
}
118118

@@ -122,7 +122,7 @@ public BulkRequest add(DocWriteRequest request) {
122122
* @param payload Optional payload
123123
* @return the current bulk request
124124
*/
125-
public BulkRequest add(DocWriteRequest request, @Nullable Object payload) {
125+
public BulkRequest add(DocWriteRequest<?> request, @Nullable Object payload) {
126126
if (request instanceof IndexRequest) {
127127
add((IndexRequest) request, payload);
128128
} else if (request instanceof DeleteRequest) {
@@ -139,8 +139,8 @@ public BulkRequest add(DocWriteRequest request, @Nullable Object payload) {
139139
/**
140140
* Adds a list of requests to be executed. Either index or delete requests.
141141
*/
142-
public BulkRequest add(Iterable<DocWriteRequest> requests) {
143-
for (DocWriteRequest request : requests) {
142+
public BulkRequest add(Iterable<DocWriteRequest<?>> requests) {
143+
for (DocWriteRequest<?> request : requests) {
144144
add(request);
145145
}
146146
return this;
@@ -229,7 +229,7 @@ private void addPayload(Object payload) {
229229
/**
230230
* The list of requests in this bulk request.
231231
*/
232-
public List<DocWriteRequest> requests() {
232+
public List<DocWriteRequest<?>> requests() {
233233
return this.requests;
234234
}
235235

@@ -550,7 +550,7 @@ public ActionRequestValidationException validate() {
550550
if (requests.isEmpty()) {
551551
validationException = addValidationError("no requests added", validationException);
552552
}
553-
for (DocWriteRequest request : requests) {
553+
for (DocWriteRequest<?> request : requests) {
554554
// We first check if refresh has been set
555555
if (((WriteRequest<?>) request).getRefreshPolicy() != RefreshPolicy.NONE) {
556556
validationException = addValidationError(
@@ -585,7 +585,7 @@ public void writeTo(StreamOutput out) throws IOException {
585585
super.writeTo(out);
586586
waitForActiveShards.writeTo(out);
587587
out.writeVInt(requests.size());
588-
for (DocWriteRequest request : requests) {
588+
for (DocWriteRequest<?> request : requests) {
589589
DocWriteRequest.writeDocumentRequest(out, request);
590590
}
591591
refreshPolicy.writeTo(out);

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ private long relativeTime() {
525525
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
526526
long ingestStartTimeInNanos = System.nanoTime();
527527
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
528-
ingestService.getPipelineExecutionService().executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
528+
ingestService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
529529
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
530530
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
531531
bulkRequestModifier.markCurrentItemAsFailed(exception);
@@ -549,7 +549,7 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
549549
});
550550
}
551551

552-
static final class BulkRequestModifier implements Iterator<DocWriteRequest> {
552+
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
553553

554554
final BulkRequest bulkRequest;
555555
final SparseFixedBitSet failedSlots;
@@ -584,7 +584,7 @@ BulkRequest getBulkRequest() {
584584
modifiedBulkRequest.timeout(bulkRequest.timeout());
585585

586586
int slot = 0;
587-
List<DocWriteRequest> requests = bulkRequest.requests();
587+
List<DocWriteRequest<?>> requests = bulkRequest.requests();
588588
originalSlots = new int[requests.size()]; // oversize, but that's ok
589589
for (int i = 0; i < requests.size(); i++) {
590590
DocWriteRequest request = requests.get(i);

server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,23 @@
3030
import org.elasticsearch.cluster.service.ClusterService;
3131
import org.elasticsearch.common.inject.Inject;
3232
import org.elasticsearch.common.settings.Settings;
33-
import org.elasticsearch.ingest.PipelineStore;
34-
import org.elasticsearch.node.NodeService;
33+
import org.elasticsearch.ingest.IngestService;
3534
import org.elasticsearch.threadpool.ThreadPool;
3635
import org.elasticsearch.transport.TransportService;
3736

3837
public class DeletePipelineTransportAction extends TransportMasterNodeAction<DeletePipelineRequest, AcknowledgedResponse> {
3938

40-
private final PipelineStore pipelineStore;
39+
private final IngestService ingestService;
4140
private final ClusterService clusterService;
4241

4342
@Inject
4443
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
4544
TransportService transportService, ActionFilters actionFilters,
46-
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
47-
super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
45+
IndexNameExpressionResolver indexNameExpressionResolver, IngestService ingestService) {
46+
super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool,
47+
actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
4848
this.clusterService = clusterService;
49-
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
49+
this.ingestService = ingestService;
5050
}
5151

5252
@Override
@@ -60,8 +60,9 @@ protected AcknowledgedResponse newResponse() {
6060
}
6161

6262
@Override
63-
protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) throws Exception {
64-
pipelineStore.delete(clusterService, request, listener);
63+
protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
64+
throws Exception {
65+
ingestService.delete(request, listener);
6566
}
6667

6768
@Override

server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,18 @@
2929
import org.elasticsearch.cluster.service.ClusterService;
3030
import org.elasticsearch.common.inject.Inject;
3131
import org.elasticsearch.common.settings.Settings;
32-
import org.elasticsearch.ingest.PipelineStore;
33-
import org.elasticsearch.node.NodeService;
32+
import org.elasticsearch.ingest.IngestService;
3433
import org.elasticsearch.threadpool.ThreadPool;
3534
import org.elasticsearch.transport.TransportService;
3635

3736
public class GetPipelineTransportAction extends TransportMasterNodeReadAction<GetPipelineRequest, GetPipelineResponse> {
3837

39-
private final PipelineStore pipelineStore;
40-
4138
@Inject
4239
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
4340
TransportService transportService, ActionFilters actionFilters,
44-
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
45-
super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new);
46-
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
41+
IndexNameExpressionResolver indexNameExpressionResolver) {
42+
super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters,
43+
indexNameExpressionResolver, GetPipelineRequest::new);
4744
}
4845

4946
@Override
@@ -58,7 +55,7 @@ protected GetPipelineResponse newResponse() {
5855

5956
@Override
6057
protected void masterOperation(GetPipelineRequest request, ClusterState state, ActionListener<GetPipelineResponse> listener) throws Exception {
61-
listener.onResponse(new GetPipelineResponse(pipelineStore.getPipelines(state, request.getIds())));
58+
listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds())));
6259
}
6360

6461
@Override

server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@
3535
import org.elasticsearch.cluster.service.ClusterService;
3636
import org.elasticsearch.common.inject.Inject;
3737
import org.elasticsearch.common.settings.Settings;
38-
import org.elasticsearch.ingest.PipelineStore;
38+
import org.elasticsearch.ingest.IngestService;
3939
import org.elasticsearch.ingest.IngestInfo;
40-
import org.elasticsearch.node.NodeService;
4140
import org.elasticsearch.threadpool.ThreadPool;
4241
import org.elasticsearch.transport.TransportService;
4342

@@ -46,19 +45,19 @@
4645

4746
public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPipelineRequest, AcknowledgedResponse> {
4847

49-
private final PipelineStore pipelineStore;
48+
private final IngestService ingestService;
5049
private final ClusterService clusterService;
5150
private final TransportNodesInfoAction nodesInfoAction;
5251

5352
@Inject
5453
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
5554
TransportService transportService, ActionFilters actionFilters,
56-
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService,
55+
IndexNameExpressionResolver indexNameExpressionResolver, IngestService ingestService,
5756
TransportNodesInfoAction nodesInfoAction) {
5857
super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
5958
this.clusterService = clusterService;
6059
this.nodesInfoAction = nodesInfoAction;
61-
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
60+
this.ingestService = ingestService;
6261
}
6362

6463
@Override
@@ -84,7 +83,7 @@ public void onResponse(NodesInfoResponse nodeInfos) {
8483
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
8584
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
8685
}
87-
pipelineStore.put(clusterService, ingestInfos, request, listener);
86+
ingestService.putPipeline(ingestInfos, request, listener);
8887
} catch (Exception e) {
8988
onFailure(e);
9089
}

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import org.elasticsearch.index.VersionType;
3333
import org.elasticsearch.ingest.ConfigurationUtils;
3434
import org.elasticsearch.ingest.IngestDocument;
35+
import org.elasticsearch.ingest.IngestService;
3536
import org.elasticsearch.ingest.Pipeline;
36-
import org.elasticsearch.ingest.PipelineStore;
3737

3838
import java.io.IOException;
3939
import java.util.ArrayList;
@@ -160,24 +160,23 @@ public boolean isVerbose() {
160160
}
161161
}
162162

163-
private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
164163
static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
165164

166-
static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) {
165+
static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, IngestService ingestService) {
167166
if (pipelineId == null) {
168167
throw new IllegalArgumentException("param [pipeline] is null");
169168
}
170-
Pipeline pipeline = pipelineStore.get(pipelineId);
169+
Pipeline pipeline = ingestService.getPipeline(pipelineId);
171170
if (pipeline == null) {
172171
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
173172
}
174173
List<IngestDocument> ingestDocumentList = parseDocs(config);
175174
return new Parsed(pipeline, ingestDocumentList, verbose);
176175
}
177176

178-
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
177+
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService) throws Exception {
179178
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
180-
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
179+
Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories());
181180
List<IngestDocument> ingestDocumentList = parseDocs(config);
182181
return new Parsed(pipeline, ingestDocumentList, verbose);
183182
}

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.elasticsearch.common.inject.Inject;
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.xcontent.XContentHelper;
29-
import org.elasticsearch.ingest.PipelineStore;
29+
import org.elasticsearch.ingest.IngestService;
3030
import org.elasticsearch.node.NodeService;
3131
import org.elasticsearch.threadpool.ThreadPool;
3232
import org.elasticsearch.transport.TransportService;
@@ -35,13 +35,13 @@
3535

3636
public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> {
3737

38-
private final PipelineStore pipelineStore;
38+
private final IngestService ingestService;
3939
private final SimulateExecutionService executionService;
4040

4141
@Inject
4242
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
4343
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
44-
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
44+
this.ingestService = nodeService.getIngestService();
4545
this.executionService = new SimulateExecutionService(threadPool);
4646
}
4747

@@ -52,9 +52,9 @@ protected void doExecute(SimulatePipelineRequest request, ActionListener<Simulat
5252
final SimulatePipelineRequest.Parsed simulateRequest;
5353
try {
5454
if (request.getId() != null) {
55-
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore);
55+
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService);
5656
} else {
57-
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore);
57+
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService);
5858
}
5959
} catch (Exception e) {
6060
listener.onFailure(e);

0 commit comments

Comments
 (0)