diff --git a/docs/changelog/120368.yaml b/docs/changelog/120368.yaml new file mode 100644 index 0000000000000..f51b42e3f241a --- /dev/null +++ b/docs/changelog/120368.yaml @@ -0,0 +1,5 @@ +pr: 120368 +summary: Add a pre-mapping logical plan processing step +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java index 1290bbca59ee7..a6f5aaae1731f 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java @@ -26,6 +26,19 @@ public void set(T value) { this.value = value; } + /** + * Sets a value in the holder, but only if none has already been set. + * @param value the new value to set. + * @return the previously held value, if any was set. + */ + public T trySet(T value) { + T old = this.value; + if (old == null) { + this.value = value; + } + return old; + } + public T get() { return value; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 3e072e9a05c20..6deda725dcad4 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -11,6 +11,8 @@ import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; @@ -31,9 +33,11 @@ import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.index.IndexMode; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; @@ -72,8 +76,8 @@ import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.session.Configuration; -import org.elasticsearch.xpack.esql.session.QueryBuilderResolver; import org.elasticsearch.xpack.esql.stats.SearchStats; import org.elasticsearch.xpack.esql.telemetry.Metrics; import org.elasticsearch.xpack.versionfield.Version; @@ -140,6 +144,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; public final class EsqlTestUtils { @@ -360,7 +365,14 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L)); - public static final QueryBuilderResolver MOCK_QUERY_BUILDER_RESOLVER = new MockQueryBuilderResolver(); + public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices( + mock(TransportService.class), + mock(SearchService.class), + null, + mock(ClusterService.class), + mock(IndexNameExpressionResolver.class), + null + ); private EsqlTestUtils() {} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java deleted file mode 100644 index 7af3a89108fc0..0000000000000 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.session.QueryBuilderResolver; -import org.elasticsearch.xpack.esql.session.Result; - -import java.util.function.BiConsumer; - -public class MockQueryBuilderResolver extends QueryBuilderResolver { - public MockQueryBuilderResolver() { - super(null, null, null, null); - } - - @Override - public void resolveQueryBuilders( - LogicalPlan plan, - ActionListener listener, - BiConsumer> callback - ) { - callback.accept(plan, listener); - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java index 81f63fd9d37a6..611516fc55342 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java @@ -21,10 +21,10 @@ import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.session.EsqlSession; import org.elasticsearch.xpack.esql.session.IndexResolver; -import org.elasticsearch.xpack.esql.session.QueryBuilderResolver; import org.elasticsearch.xpack.esql.session.Result; import org.elasticsearch.xpack.esql.telemetry.Metrics; import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; @@ -62,7 +62,7 @@ public void esql( EsqlExecutionInfo executionInfo, IndicesExpressionGrouper indicesExpressionGrouper, EsqlSession.PlanRunner planRunner, - QueryBuilderResolver queryBuilderResolver, + TransportActionServices services, ActionListener listener ) { final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry); @@ -78,7 +78,7 @@ public void esql( verifier, planTelemetry, indicesExpressionGrouper, - queryBuilderResolver + services ); QueryMetric clientId = QueryMetric.fromString("rest"); metrics.total(clientId); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java index 32a350ac7351e..deec3e382f899 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java @@ -7,7 +7,7 @@ package org.elasticsearch.xpack.esql.expression.function.fulltext; -import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator; import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator.ShardConfig; import org.elasticsearch.compute.operator.EvalOperator; @@ -38,6 +38,7 @@ import org.elasticsearch.xpack.esql.plan.logical.OrderBy; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.TranslatorHandler; +import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor; import org.elasticsearch.xpack.esql.querydsl.query.TranslationAwareExpressionQuery; import java.util.List; @@ -56,7 +57,12 @@ * These functions needs to be pushed down to Lucene queries to be executed - there's no Evaluator for them, but depend on * {@link org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer} to rewrite them into Lucene queries. */ -public abstract class FullTextFunction extends Function implements TranslationAware, PostAnalysisPlanVerificationAware, EvaluatorMapper { +public abstract class FullTextFunction extends Function + implements + TranslationAware, + PostAnalysisPlanVerificationAware, + EvaluatorMapper, + MappingPreProcessor.MappingPreProcessorSupplier { private final Expression query; private final QueryBuilder queryBuilder; @@ -110,11 +116,12 @@ public Expression query() { */ public Object queryAsObject() { Object queryAsObject = query().fold(FoldContext.small() /* TODO remove me */); - if (queryAsObject instanceof BytesRef bytesRef) { - return bytesRef.utf8ToString(); - } + return BytesRefs.toString(queryAsObject); + } - return queryAsObject; + @Override + public MappingPreProcessor mappingPreProcessor() { + return FullTextFunctionMappingPreprocessor.INSTANCE; } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMappingPreprocessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMappingPreprocessor.java new file mode 100644 index 0000000000000..ec2ecd2919440 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMappingPreprocessor.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.fulltext; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ResolvedIndices; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryRewriteContext; +import org.elasticsearch.index.query.Rewriteable; +import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.planner.TranslatorHandler; +import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; +import org.elasticsearch.xpack.esql.session.IndexResolver; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match} + * will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator. + * {@link FullTextFunctionMappingPreprocessor#preprocess(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by + * replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s. + */ +public final class FullTextFunctionMappingPreprocessor implements MappingPreProcessor { + + public static final FullTextFunctionMappingPreprocessor INSTANCE = new FullTextFunctionMappingPreprocessor(); + + private FullTextFunctionMappingPreprocessor() {} + + @Override + public void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener listener) { + Rewriteable.rewriteAndFetch( + new FullTextFunctionsRewritable(plan), + queryRewriteContext(services, indexNames(plan)), + listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.plan)) + ); + } + + private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set indexNames) { + ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions( + indexNames.toArray(String[]::new), + IndexResolver.FIELD_CAPS_INDICES_OPTIONS, + services.clusterService().state(), + services.indexNameExpressionResolver(), + services.transportService().getRemoteClusterService(), + System.currentTimeMillis() + ); + + return services.searchService().getRewriteContext(System::currentTimeMillis, resolvedIndices, null); + } + + public Set indexNames(LogicalPlan plan) { + Set indexNames = new HashSet<>(); + plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.concreteIndices())); + return indexNames; + } + + private record FullTextFunctionsRewritable(LogicalPlan plan) + implements + Rewriteable { + @Override + public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException { + Holder exceptionHolder = new Holder<>(); + Holder updated = new Holder<>(false); + LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, f -> { + QueryBuilder builder = f.queryBuilder(), initial = builder; + builder = builder == null ? f.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder() : builder; + try { + builder = builder.rewrite(ctx); + } catch (IOException e) { + exceptionHolder.trySet(e); + } + var rewritten = builder != initial; + updated.set(updated.get() || rewritten); + return rewritten ? f.replaceQueryBuilder(builder) : f; + }); + if (exceptionHolder.get() != null) { + throw exceptionHolder.get(); + } + return updated.get() ? new FullTextFunctionsRewritable(newPlan) : this; + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java new file mode 100644 index 0000000000000..4542df645bf60 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.planner.mapper.preprocessor; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; + +/** + * Interface for a LogicalPlan processing rule occurring after the optimization, but before mapping to a physical plan. + * This step occurs on the coordinator. The rule may use services provided to the transport action and thus can resolve indices, rewrite + * queries, perform substitutions, etc. + * Note that the LogicalPlan following the rules' changes will not undergo another logical optimization round. The changes these rules + * should apply are only those that require access to services that need to be performed asynchronously. + */ +public interface MappingPreProcessor { + + /** + * Process a logical plan making use of the available services and provide the updated plan to the provided listener. + * @param plan the logical plan to process + * @param services the services available from the transport action + * @param listener the listener to notify when processing is complete + */ + void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener listener); + + interface MappingPreProcessorSupplier { + MappingPreProcessor mappingPreProcessor(); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreprocessorExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreprocessorExecutor.java new file mode 100644 index 0000000000000..690e29a784c06 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreprocessorExecutor.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.planner.mapper.preprocessor; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class MappingPreprocessorExecutor { + + private final TransportActionServices services; + + public MappingPreprocessorExecutor(TransportActionServices services) { + this.services = services; + } + + public void execute(LogicalPlan plan, ActionListener listener) { + execute(plan, preprocessors(plan), 0, listener); + } + + private static List preprocessors(LogicalPlan plan) { + Set preprocessors = new HashSet<>(); + plan.forEachExpressionDown(e -> { + if (e instanceof MappingPreProcessor.MappingPreProcessorSupplier supplier) { + preprocessors.add(supplier.mappingPreProcessor()); + } + }); + return List.copyOf(preprocessors); + } + + private void execute(LogicalPlan plan, List preprocessors, int index, ActionListener listener) { + if (index == preprocessors.size()) { + listener.onResponse(plan); + } else { + preprocessors.get(index) + .preprocess(plan, services, listener.delegateFailureAndWrap((l, p) -> execute(p, preprocessors, index + 1, l))); + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java new file mode 100644 index 0000000000000..ad112542e000a --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.usage.UsageService; + +public record TransportActionServices( + TransportService transportService, + SearchService searchService, + ExchangeService exchangeService, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + UsageService usageService +) {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index a32b4591943f4..b3a2c403137f3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -53,7 +53,6 @@ import org.elasticsearch.xpack.esql.execution.PlanExecutor; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.session.EsqlSession.PlanRunner; -import org.elasticsearch.xpack.esql.session.QueryBuilderResolver; import org.elasticsearch.xpack.esql.session.Result; import java.io.IOException; @@ -81,8 +80,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction asyncTaskManagementService; private final RemoteClusterService remoteClusterService; - private final QueryBuilderResolver queryBuilderResolver; private final UsageService usageService; + private final TransportActionServices services; // Listeners for active async queries, key being the async task execution ID private final Map asyncListeners = ConcurrentCollections.newConcurrentMap(); @@ -153,8 +152,16 @@ public TransportEsqlQueryAction( bigArrays ); this.remoteClusterService = transportService.getRemoteClusterService(); - this.queryBuilderResolver = new QueryBuilderResolver(searchService, clusterService, transportService, indexNameExpressionResolver); this.usageService = usageService; + + this.services = new TransportActionServices( + transportService, + searchService, + exchangeService, + clusterService, + indexNameExpressionResolver, + usageService + ); } @Override @@ -258,7 +265,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { recordCCSTelemetry(task, executionInfo, request, null); listener.onResponse(toResponse(task, request, configuration, result)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 94bf414da1b9d..19e4b0a185439 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -73,6 +73,8 @@ import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; +import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreprocessorExecutor; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; import java.util.ArrayList; @@ -114,7 +116,7 @@ public interface PlanRunner { private final PhysicalPlanOptimizer physicalPlanOptimizer; private final PlanTelemetry planTelemetry; private final IndicesExpressionGrouper indicesExpressionGrouper; - private final QueryBuilderResolver queryBuilderResolver; + private final MappingPreprocessorExecutor mappingPreprocessorExecutor; public EsqlSession( String sessionId, @@ -128,7 +130,7 @@ public EsqlSession( Verifier verifier, PlanTelemetry planTelemetry, IndicesExpressionGrouper indicesExpressionGrouper, - QueryBuilderResolver queryBuilderResolver + TransportActionServices services ) { this.sessionId = sessionId; this.configuration = configuration; @@ -142,7 +144,7 @@ public EsqlSession( this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)); this.planTelemetry = planTelemetry; this.indicesExpressionGrouper = indicesExpressionGrouper; - this.queryBuilderResolver = queryBuilderResolver; + this.mappingPreprocessorExecutor = new MappingPreprocessorExecutor(services); } public String sessionId() { @@ -162,21 +164,25 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { - try { - var optimizedPlan = optimizedPlan(analyzedPlan); - queryBuilderResolver.resolveQueryBuilders( - optimizedPlan, - listener, - (newPlan, next) -> executeOptimizedPlan(request, executionInfo, planRunner, newPlan, next) - ); - } catch (Exception e) { - listener.onFailure(e); - } + preMapping(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener); } } ); } + public void preMapping( + EsqlQueryRequest request, + EsqlExecutionInfo executionInfo, + PlanRunner planRunner, + LogicalPlan optimizedPlan, + ActionListener listener + ) { + mappingPreprocessorExecutor.execute(optimizedPlan, listener.delegateFailureAndWrap((l, p) -> { + p.setOptimized(); // might have been updated by the preprocessor + executeOptimizedPlan(request, executionInfo, planRunner, p, listener); + })); + } + /** * Execute an analyzed plan. Most code should prefer calling {@link #execute} but * this is public for testing. diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index bae20bb9b26d3..340d5a00e80b7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -516,7 +516,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { TEST_VERIFIER, new PlanTelemetry(functionRegistry), null, - EsqlTestUtils.MOCK_QUERY_BUILDER_RESOLVER + EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES ); TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java index 4c2913031271f..aa735e5cb6d86 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java @@ -125,7 +125,7 @@ public void testFailedMetric() { new EsqlExecutionInfo(randomBoolean()), groupIndicesByCluster, runPhase, - EsqlTestUtils.MOCK_QUERY_BUILDER_RESOLVER, + EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { @Override public void onResponse(Result result) { @@ -156,7 +156,7 @@ public void onFailure(Exception e) { new EsqlExecutionInfo(randomBoolean()), groupIndicesByCluster, runPhase, - EsqlTestUtils.MOCK_QUERY_BUILDER_RESOLVER, + EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { @Override public void onResponse(Result result) {} diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java index 285739fe0936f..eafdb6366afd4 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java @@ -332,11 +332,12 @@ private static String getInferenceIdForForField(Collection indexM protected boolean doEquals(SemanticQueryBuilder other) { return Objects.equals(fieldName, other.fieldName) && Objects.equals(query, other.query) - && Objects.equals(inferenceResults, other.inferenceResults); + && Objects.equals(inferenceResults, other.inferenceResults) + && Objects.equals(inferenceResultsSupplier, other.inferenceResultsSupplier); } @Override protected int doHashCode() { - return Objects.hash(fieldName, query, inferenceResults); + return Objects.hash(fieldName, query, inferenceResults, inferenceResultsSupplier); } }