From b09553aa15aaa7f80c13c0df79d6b20f89faed1c Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Fri, 17 Jan 2025 12:42:39 +0100 Subject: [PATCH 1/9] Add a pre-mapping logical plan processing step This adds a processing step that runs rules which can modify an optimized LogicalPlan by executing async calls to the services avaialble to ESQL's transport query action. The rules need to implement a new interface (`MappingPreProcessor`). The resulting plan won't be further optimized logically, but mapped and further processed as a physical plan. --- .../xpack/esql/EsqlTestUtils.java | 17 ++- .../xpack/esql/MockQueryBuilderResolver.java | 30 ---- .../xpack/esql/execution/PlanExecutor.java | 6 +- .../function/fulltext/FullTextFunction.java | 8 +- .../FullTextFunctionMapperPreprocessor.java | 140 ++++++++++++++++++ .../MapperPreprocessorExecutor.java | 59 ++++++++ .../preprocessor/MappingPreProcessor.java | 30 ++++ .../esql/plugin/TransportActionServices.java | 24 +++ .../esql/plugin/TransportEsqlQueryAction.java | 15 +- .../xpack/esql/session/EsqlSession.java | 42 +++--- .../elasticsearch/xpack/esql/CsvTests.java | 2 +- .../esql/stats/PlanExecutorMetricsTests.java | 4 +- 12 files changed, 311 insertions(+), 66 deletions(-) delete mode 100644 x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 7e25fb29fdb78..011072942f81b 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -11,6 +11,8 @@ import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; @@ -31,9 +33,12 @@ import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.index.IndexMode; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; @@ -71,8 +76,8 @@ import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.session.Configuration; -import org.elasticsearch.xpack.esql.session.QueryBuilderResolver; import org.elasticsearch.xpack.esql.stats.Metrics; import org.elasticsearch.xpack.esql.stats.SearchStats; import org.elasticsearch.xpack.versionfield.Version; @@ -138,6 +143,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; public final class EsqlTestUtils { @@ -358,7 +364,14 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L)); - public static final QueryBuilderResolver MOCK_QUERY_BUILDER_RESOLVER = new MockQueryBuilderResolver(); + public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices( + mock(TransportService.class), + mock(SearchService.class), + null, + mock(ClusterService.class), + mock(IndexNameExpressionResolver.class), + null + ); private EsqlTestUtils() {} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java deleted file mode 100644 index 7af3a89108fc0..0000000000000 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.session.QueryBuilderResolver; -import org.elasticsearch.xpack.esql.session.Result; - -import java.util.function.BiConsumer; - -public class MockQueryBuilderResolver extends QueryBuilderResolver { - public MockQueryBuilderResolver() { - super(null, null, null, null); - } - - @Override - public void resolveQueryBuilders( - LogicalPlan plan, - ActionListener listener, - BiConsumer> callback - ) { - callback.accept(plan, listener); - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java index 94913581f696d..c9aea94e1d36c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java @@ -21,10 +21,10 @@ import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.session.EsqlSession; import org.elasticsearch.xpack.esql.session.IndexResolver; -import org.elasticsearch.xpack.esql.session.QueryBuilderResolver; import org.elasticsearch.xpack.esql.session.Result; import org.elasticsearch.xpack.esql.stats.Metrics; import org.elasticsearch.xpack.esql.stats.PlanningMetrics; @@ -62,7 +62,7 @@ public void esql( EsqlExecutionInfo executionInfo, IndicesExpressionGrouper indicesExpressionGrouper, EsqlSession.PlanRunner planRunner, - QueryBuilderResolver queryBuilderResolver, + TransportActionServices services, ActionListener listener ) { final PlanningMetrics planningMetrics = new PlanningMetrics(); @@ -78,7 +78,7 @@ public void esql( verifier, planningMetrics, indicesExpressionGrouper, - queryBuilderResolver + services ); QueryMetric clientId = QueryMetric.fromString("rest"); metrics.total(clientId); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java index 4da7c01139c24..a8e8420ca6bc6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java @@ -7,7 +7,7 @@ package org.elasticsearch.xpack.esql.expression.function.fulltext; -import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware; import org.elasticsearch.xpack.esql.common.Failures; @@ -113,11 +113,7 @@ public Expression query() { */ public Object queryAsObject() { Object queryAsObject = query().fold(FoldContext.small() /* TODO remove me */); - if (queryAsObject instanceof BytesRef bytesRef) { - return bytesRef.utf8ToString(); - } - - return queryAsObject; + return BytesRefs.toString(queryAsObject); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java new file mode 100644 index 0000000000000..1d44ce7ab71af --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.planner.mapper.preprocessor; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ResolvedIndices; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryRewriteContext; +import org.elasticsearch.index.query.Rewriteable; +import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.planner.PlannerUtils; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; +import org.elasticsearch.xpack.esql.session.IndexResolver; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match} + * will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator. + * {@link FullTextFunctionMapperPreprocessor#preprocess(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by + * replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s. + */ +public class FullTextFunctionMapperPreprocessor implements MappingPreProcessor { + + @Override + public void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener listener) { + Set unresolved = fullTextFunctions(plan); + Set indexNames = indexNames(plan); + + if (indexNames == null || indexNames.isEmpty() || unresolved.isEmpty()) { + listener.onResponse(plan); + return; + } + QueryRewriteContext ctx = queryRewriteContext(services, indexNames); + FullTextFunctionsRewritable rewritable = new FullTextFunctionsRewritable(unresolved); + Rewriteable.rewriteAndFetch(rewritable, ctx, new ActionListener<>() { + @Override + public void onResponse(FullTextFunctionsRewritable fullTextFunctionsRewritable) { + try { + LogicalPlan newPlan = planWithResolvedQueryBuilders(plan, fullTextFunctionsRewritable.results()); + listener.onResponse(newPlan); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + public LogicalPlan planWithResolvedQueryBuilders(LogicalPlan plan, Map newQueryBuilders) { + LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, m -> { + if (newQueryBuilders.containsKey(m)) { + return m.replaceQueryBuilder(newQueryBuilders.get(m)); + } + return m; + }); + // The given plan was already analyzed and optimized, so we set the resulted plan to optimized as well. + newPlan.setOptimized(); + return newPlan; + } + + private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set indexNames) { + ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions( + indexNames.toArray(String[]::new), + IndexResolver.FIELD_CAPS_INDICES_OPTIONS, + services.clusterService().state(), + services.indexNameExpressionResolver(), + services.transportService().getRemoteClusterService(), + System.currentTimeMillis() + ); + + return services.searchService().getRewriteContext(System::currentTimeMillis, resolvedIndices, null); + } + + private static Set fullTextFunctions(LogicalPlan plan) { + Set functions = new HashSet<>(); + plan.forEachExpressionDown(FullTextFunction.class, functions::add); + return functions; + } + + public Set indexNames(LogicalPlan plan) { + Set indexNames = new HashSet<>(); + plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.index().concreteIndices())); + return indexNames; + } + + private static class FullTextFunctionsRewritable + implements + Rewriteable { + + private final Map queryBuilderMap; + + FullTextFunctionsRewritable(Map queryBuilderMap) { + this.queryBuilderMap = queryBuilderMap; + } + + FullTextFunctionsRewritable(Set functions) { + this.queryBuilderMap = new HashMap<>(); + + for (FullTextFunction func : functions) { + queryBuilderMap.put(func, func.asQuery(PlannerUtils.TRANSLATOR_HANDLER).asBuilder()); + } + } + + @Override + public FullTextFunctionMapperPreprocessor.FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException { + Map results = new HashMap<>(); + + boolean hasChanged = false; + for (var entry : queryBuilderMap.entrySet()) { + var initial = entry.getValue(); + var rewritten = initial.rewrite(ctx); + hasChanged |= rewritten != initial; + + results.put(entry.getKey(), rewritten); + } + + return hasChanged ? new FullTextFunctionMapperPreprocessor.FullTextFunctionsRewritable(results) : this; + } + + public Map results() { + return queryBuilderMap; + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java new file mode 100644 index 0000000000000..8d3a889e4bd07 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.planner.mapper.preprocessor; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class MapperPreprocessorExecutor { + + private final TransportActionServices services; + private final List proprocessors = new ArrayList<>(); + + public MapperPreprocessorExecutor(TransportActionServices services) { + this.services = services; + } + + public MapperPreprocessorExecutor addPreprocessor(MappingPreProcessor preProcessor) { + proprocessors.add(preProcessor); + return this; + } + + public MapperPreprocessorExecutor addPreprocessors(Collection preProcessors) { + proprocessors.addAll(preProcessors); + return this; + } + + public void execute(LogicalPlan plan, ActionListener listener) { + execute(plan, 0, listener); + } + + private void execute(LogicalPlan plan, int index, ActionListener listener) { + if (index == proprocessors.size()) { + listener.onResponse(plan); + return; + } + + proprocessors.get(index).preprocess(plan, services, new ActionListener<>() { + @Override + public void onResponse(LogicalPlan p) { + execute(p, index + 1, listener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java new file mode 100644 index 0000000000000..cc918a836ec81 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.planner.mapper.preprocessor; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; + +/** + * Interface for a LogicalPlan processing rule occurring after the optimization, but before mapping to a physical plan. + * This step occurs on the coordinator. The rule may use services provided to the transport action and thus can resolve indices, rewrite + * queries, perform substitutions, etc. + * Note that the LogicalPlan following the rules' changes will not undergo another logical optimization round. The changes these rules + * should apply are only those that require access to services that need to be performed asynchronously. + */ +public interface MappingPreProcessor { + + /** + * Process a logical plan making use of the available services and provide the updated plan to the provided listener. + * @param plan the logical plan to process + * @param services the services available from the transport action + * @param listener the listener to notify when processing is complete + */ + void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener listener); +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java new file mode 100644 index 0000000000000..ad112542e000a --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.usage.UsageService; + +public record TransportActionServices( + TransportService transportService, + SearchService searchService, + ExchangeService exchangeService, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + UsageService usageService +) {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 84173eeecc060..3d6dc0dc83e3e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.xpack.esql.execution.PlanExecutor; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.session.EsqlSession.PlanRunner; -import org.elasticsearch.xpack.esql.session.QueryBuilderResolver; import org.elasticsearch.xpack.esql.session.Result; import java.io.IOException; @@ -78,8 +77,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction asyncTaskManagementService; private final RemoteClusterService remoteClusterService; - private final QueryBuilderResolver queryBuilderResolver; private final UsageService usageService; + private final TransportActionServices services; @Inject @SuppressWarnings("this-escape") @@ -134,8 +133,16 @@ public TransportEsqlQueryAction( bigArrays ); this.remoteClusterService = transportService.getRemoteClusterService(); - this.queryBuilderResolver = new QueryBuilderResolver(searchService, clusterService, transportService, indexNameExpressionResolver); this.usageService = usageService; + + this.services = new TransportActionServices( + transportService, + searchService, + exchangeService, + clusterService, + indexNameExpressionResolver, + usageService + ); } @Override @@ -209,7 +216,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { recordCCSTelemetry(task, executionInfo, request, null); listener.onResponse(toResponse(task, request, configuration, result)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b10f766babb36..f61df90dbc5b0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -73,6 +73,9 @@ import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; +import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.FullTextFunctionMapperPreprocessor; +import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MapperPreprocessorExecutor; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.stats.PlanningMetrics; import java.util.ArrayList; @@ -114,7 +117,7 @@ public interface PlanRunner { private final PhysicalPlanOptimizer physicalPlanOptimizer; private final PlanningMetrics planningMetrics; private final IndicesExpressionGrouper indicesExpressionGrouper; - private final QueryBuilderResolver queryBuilderResolver; + private final MapperPreprocessorExecutor mapperPreprocessorExecutor; public EsqlSession( String sessionId, @@ -128,7 +131,7 @@ public EsqlSession( Verifier verifier, PlanningMetrics planningMetrics, IndicesExpressionGrouper indicesExpressionGrouper, - QueryBuilderResolver queryBuilderResolver + TransportActionServices services ) { this.sessionId = sessionId; this.configuration = configuration; @@ -142,7 +145,9 @@ public EsqlSession( this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)); this.planningMetrics = planningMetrics; this.indicesExpressionGrouper = indicesExpressionGrouper; - this.queryBuilderResolver = queryBuilderResolver; + this.mapperPreprocessorExecutor = new MapperPreprocessorExecutor(services); + + mapperPreprocessorExecutor.addPreprocessor(new FullTextFunctionMapperPreprocessor()); } public String sessionId() { @@ -162,16 +167,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { - try { - var optimizedPlan = optimizedPlan(analyzedPlan); - queryBuilderResolver.resolveQueryBuilders( - optimizedPlan, - listener, - (newPlan, next) -> executeOptimizedPlan(request, executionInfo, planRunner, newPlan, next) - ); - } catch (Exception e) { - listener.onFailure(e); - } + executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener); } } ); @@ -188,11 +184,21 @@ public void executeOptimizedPlan( LogicalPlan optimizedPlan, ActionListener listener ) { - PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); - // TODO: this could be snuck into the underlying listener - EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); - // execute any potential subplans - executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener); + mapperPreprocessorExecutor.execute(optimizedPlan, new ActionListener<>() { + @Override + public void onResponse(LogicalPlan preprocessedPlan) { + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(preprocessedPlan, request); + // TODO: this could be snuck into the underlying listener + EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + // execute any potential subplans + executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index ed1ee71ff1968..8afe8bb00ba6b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -516,7 +516,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { TEST_VERIFIER, new PlanningMetrics(), null, - EsqlTestUtils.MOCK_QUERY_BUILDER_RESOLVER + EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES ); TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java index a3c5cd9168b4f..12db9ad96e2d9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java @@ -125,7 +125,7 @@ public void testFailedMetric() { new EsqlExecutionInfo(randomBoolean()), groupIndicesByCluster, runPhase, - EsqlTestUtils.MOCK_QUERY_BUILDER_RESOLVER, + EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { @Override public void onResponse(Result result) { @@ -156,7 +156,7 @@ public void onFailure(Exception e) { new EsqlExecutionInfo(randomBoolean()), groupIndicesByCluster, runPhase, - EsqlTestUtils.MOCK_QUERY_BUILDER_RESOLVER, + EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { @Override public void onResponse(Result result) {} From cbddbc6b5e7701f992435ddebaef46b8c751e332 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Fri, 17 Jan 2025 12:49:19 +0100 Subject: [PATCH 2/9] Update docs/changelog/120368.yaml --- docs/changelog/120368.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/120368.yaml diff --git a/docs/changelog/120368.yaml b/docs/changelog/120368.yaml new file mode 100644 index 0000000000000..f51b42e3f241a --- /dev/null +++ b/docs/changelog/120368.yaml @@ -0,0 +1,5 @@ +pr: 120368 +summary: Add a pre-mapping logical plan processing step +area: ES|QL +type: enhancement +issues: [] From baec4eec67c0b1e699ea1a65d54144e764f2ba0f Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 17 Jan 2025 11:55:13 +0000 Subject: [PATCH 3/9] [CI] Auto commit changes from spotless --- .../main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 011072942f81b..49d5c397d06fc 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -36,7 +36,6 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.json.JsonXContent; From b17f6e60aafa450005ca234c5d6e7b26e5e97664 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Mon, 20 Jan 2025 14:08:56 +0100 Subject: [PATCH 4/9] Minor listerns refactoring --- .../FullTextFunctionMapperPreprocessor.java | 23 ++++----------- .../MapperPreprocessorExecutor.java | 15 ++-------- .../xpack/esql/session/EsqlSession.java | 28 +++++++------------ 3 files changed, 18 insertions(+), 48 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java index 1d44ce7ab71af..9dc1c1d0f154d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java @@ -44,25 +44,14 @@ public void preprocess(LogicalPlan plan, TransportActionServices services, Actio } QueryRewriteContext ctx = queryRewriteContext(services, indexNames); FullTextFunctionsRewritable rewritable = new FullTextFunctionsRewritable(unresolved); - Rewriteable.rewriteAndFetch(rewritable, ctx, new ActionListener<>() { - @Override - public void onResponse(FullTextFunctionsRewritable fullTextFunctionsRewritable) { - try { - LogicalPlan newPlan = planWithResolvedQueryBuilders(plan, fullTextFunctionsRewritable.results()); - listener.onResponse(newPlan); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + Rewriteable.rewriteAndFetch( + rewritable, + ctx, + listener.delegateFailureAndWrap((l, r) -> l.onResponse(updateQueryBuilders(plan, r.results()))) + ); } - public LogicalPlan planWithResolvedQueryBuilders(LogicalPlan plan, Map newQueryBuilders) { + public LogicalPlan updateQueryBuilders(LogicalPlan plan, Map newQueryBuilders) { LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, m -> { if (newQueryBuilders.containsKey(m)) { return m.replaceQueryBuilder(newQueryBuilders.get(m)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java index 8d3a889e4bd07..b7a942373e06b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java @@ -41,19 +41,8 @@ public void execute(LogicalPlan plan, ActionListener listener) { private void execute(LogicalPlan plan, int index, ActionListener listener) { if (index == proprocessors.size()) { listener.onResponse(plan); - return; + } else { + proprocessors.get(index).preprocess(plan, services, listener.delegateFailureAndWrap((l, p) -> execute(p, index + 1, l))); } - - proprocessors.get(index).preprocess(plan, services, new ActionListener<>() { - @Override - public void onResponse(LogicalPlan p) { - execute(p, index + 1, listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index f61df90dbc5b0..d652c43b98eb8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -145,9 +145,9 @@ public EsqlSession( this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)); this.planningMetrics = planningMetrics; this.indicesExpressionGrouper = indicesExpressionGrouper; - this.mapperPreprocessorExecutor = new MapperPreprocessorExecutor(services); - - mapperPreprocessorExecutor.addPreprocessor(new FullTextFunctionMapperPreprocessor()); + this.mapperPreprocessorExecutor = new MapperPreprocessorExecutor(services).addPreprocessor( + new FullTextFunctionMapperPreprocessor() + ); } public String sessionId() { @@ -184,21 +184,13 @@ public void executeOptimizedPlan( LogicalPlan optimizedPlan, ActionListener listener ) { - mapperPreprocessorExecutor.execute(optimizedPlan, new ActionListener<>() { - @Override - public void onResponse(LogicalPlan preprocessedPlan) { - PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(preprocessedPlan, request); - // TODO: this could be snuck into the underlying listener - EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); - // execute any potential subplans - executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + mapperPreprocessorExecutor.execute(optimizedPlan, listener.delegateFailureAndWrap((l, p) -> { + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(p, request); + // TODO: this could be snuck into the underlying listener + EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + // execute any potential subplans + executeSubPlans(physicalPlan, planRunner, executionInfo, request, l); + })); } private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {} From 8fbd0fcc54f788919acbc4b6c6cabecf3d6e39dd Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Mon, 20 Jan 2025 14:28:00 +0100 Subject: [PATCH 5/9] Fix wrong auto-merge --- .../preprocessor/FullTextFunctionMapperPreprocessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java index 9dc1c1d0f154d..a4a93f39981c9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java @@ -15,7 +15,7 @@ import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.planner.PlannerUtils; +import org.elasticsearch.xpack.esql.planner.TranslatorHandler; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.session.IndexResolver; @@ -102,7 +102,7 @@ private static class FullTextFunctionsRewritable this.queryBuilderMap = new HashMap<>(); for (FullTextFunction func : functions) { - queryBuilderMap.put(func, func.asQuery(PlannerUtils.TRANSLATOR_HANDLER).asBuilder()); + queryBuilderMap.put(func, func.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder()); } } From 62a586276a16cbd4bbb89cefda50b0841d980339 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Mon, 27 Jan 2025 17:53:59 +0100 Subject: [PATCH 6/9] Have the FTFMapperPreprocessor update the plan in-place --- .../xpack/esql/core/util/Holder.java | 13 ++ .../esql/capabilities/TranslationAware.java | 5 + .../function/fulltext/FullTextFunction.java | 32 ++-- .../FullTextFunctionMapperPreprocessor.java | 90 ++++++++++ .../FullTextFunctionMapperPreprocessor.java | 129 ------------- .../MapperPreprocessorExecutor.java | 33 ++-- .../xpack/esql/session/EsqlSession.java | 32 ++-- .../esql/session/QueryBuilderResolver.java | 169 ------------------ .../queries/SemanticQueryBuilder.java | 5 +- 9 files changed, 159 insertions(+), 349 deletions(-) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMapperPreprocessor.java delete mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java delete mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java index 1290bbca59ee7..a6f5aaae1731f 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java @@ -26,6 +26,19 @@ public void set(T value) { this.value = value; } + /** + * Sets a value in the holder, but only if none has already been set. + * @param value the new value to set. + * @return the previously held value, if any was set. + */ + public T trySet(T value) { + T old = this.value; + if (old == null) { + this.value = value; + } + return old; + } + public T get() { return value; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/TranslationAware.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/TranslationAware.java index 8ef528b6668ab..5228bfe5170ed 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/TranslationAware.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/TranslationAware.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.esql.core.querydsl.query.Query; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates; import org.elasticsearch.xpack.esql.planner.TranslatorHandler; +import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor; /** * Expressions implementing this interface can get called on data nodes to provide an Elasticsearch/Lucene query. @@ -42,4 +43,8 @@ interface SingleValueTranslationAware extends TranslationAware { */ Expression singleValueField(); } + + interface QueryRewriter extends TranslationAware { + MappingPreProcessor queryRewriter(); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java index ed59191e092b6..2d690550d4bf1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.OrderBy; import org.elasticsearch.xpack.esql.planner.TranslatorHandler; +import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor; import org.elasticsearch.xpack.esql.querydsl.query.TranslationAwareExpressionQuery; import java.util.List; @@ -50,7 +51,11 @@ * These functions needs to be pushed down to Lucene queries to be executed - there's no Evaluator for them, but depend on * {@link org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer} to rewrite them into Lucene queries. */ -public abstract class FullTextFunction extends Function implements TranslationAware, PostAnalysisPlanVerificationAware { +public abstract class FullTextFunction extends Function + implements + TranslationAware, + TranslationAware.QueryRewriter, + PostAnalysisPlanVerificationAware { private final Expression query; private final QueryBuilder queryBuilder; @@ -116,6 +121,11 @@ public Object queryAsObject() { return BytesRefs.toString(queryAsObject); } + @Override + public MappingPreProcessor queryRewriter() { + return FullTextFunctionMapperPreprocessor.INSTANCE; + } + /** * Returns the param ordinal for the query parameter so it can be used in error messages * @@ -279,26 +289,6 @@ private static boolean onlyFullTextFunctionsInExpression(Expression expression) return false; } - /** - * Checks whether an expression contains a full text function as part of it - * - * @param expression expression to check - * @return true if the expression or any of its children is a full text function, false otherwise - */ - private static boolean anyFullTextFunctionsInExpression(Expression expression) { - if (expression instanceof FullTextFunction) { - return true; - } - - for (Expression child : expression.children()) { - if (anyFullTextFunctionsInExpression(child)) { - return true; - } - } - - return false; - } - /** * Checks all commands that exist before a specific type satisfy conditions. * diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMapperPreprocessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMapperPreprocessor.java new file mode 100644 index 0000000000000..9c1f6cb33aa19 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMapperPreprocessor.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.fulltext; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ResolvedIndices; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryRewriteContext; +import org.elasticsearch.index.query.Rewriteable; +import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.planner.TranslatorHandler; +import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor; +import org.elasticsearch.xpack.esql.plugin.TransportActionServices; +import org.elasticsearch.xpack.esql.session.IndexResolver; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match} + * will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator. + * {@link FullTextFunctionMapperPreprocessor#preprocess(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by + * replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s. + */ +public final class FullTextFunctionMapperPreprocessor implements MappingPreProcessor { + + public static final FullTextFunctionMapperPreprocessor INSTANCE = new FullTextFunctionMapperPreprocessor(); + + @Override + public void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener listener) { + Rewriteable.rewriteAndFetch( + new FullTextFunctionsRewritable(plan), + queryRewriteContext(services, indexNames(plan)), + listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.plan)) + ); + } + + private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set indexNames) { + ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions( + indexNames.toArray(String[]::new), + IndexResolver.FIELD_CAPS_INDICES_OPTIONS, + services.clusterService().state(), + services.indexNameExpressionResolver(), + services.transportService().getRemoteClusterService(), + System.currentTimeMillis() + ); + + return services.searchService().getRewriteContext(System::currentTimeMillis, resolvedIndices, null); + } + + public Set indexNames(LogicalPlan plan) { + Set indexNames = new HashSet<>(); + plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.index().concreteIndices())); + return indexNames; + } + + private record FullTextFunctionsRewritable(LogicalPlan plan) + implements + Rewriteable { + @Override + public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException { + Holder exceptionHolder = new Holder<>(); + Holder updated = new Holder<>(false); + LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, f -> { + QueryBuilder builder = f.queryBuilder(), initial = builder; + builder = builder == null ? f.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder() : builder; + try { + builder = builder.rewrite(ctx); + } catch (IOException e) { + exceptionHolder.trySet(e); + } + var rewrite = builder != initial; + updated.set(updated.get() || rewrite); + return rewrite ? f.replaceQueryBuilder(builder) : f; + }); + if (exceptionHolder.get() != null) { + throw exceptionHolder.get(); + } + return updated.get() ? new FullTextFunctionsRewritable(newPlan) : this; + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java deleted file mode 100644 index a4a93f39981c9..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/FullTextFunctionMapperPreprocessor.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.planner.mapper.preprocessor; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ResolvedIndices; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryRewriteContext; -import org.elasticsearch.index.query.Rewriteable; -import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction; -import org.elasticsearch.xpack.esql.plan.logical.EsRelation; -import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.planner.TranslatorHandler; -import org.elasticsearch.xpack.esql.plugin.TransportActionServices; -import org.elasticsearch.xpack.esql.session.IndexResolver; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match} - * will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator. - * {@link FullTextFunctionMapperPreprocessor#preprocess(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by - * replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s. - */ -public class FullTextFunctionMapperPreprocessor implements MappingPreProcessor { - - @Override - public void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener listener) { - Set unresolved = fullTextFunctions(plan); - Set indexNames = indexNames(plan); - - if (indexNames == null || indexNames.isEmpty() || unresolved.isEmpty()) { - listener.onResponse(plan); - return; - } - QueryRewriteContext ctx = queryRewriteContext(services, indexNames); - FullTextFunctionsRewritable rewritable = new FullTextFunctionsRewritable(unresolved); - Rewriteable.rewriteAndFetch( - rewritable, - ctx, - listener.delegateFailureAndWrap((l, r) -> l.onResponse(updateQueryBuilders(plan, r.results()))) - ); - } - - public LogicalPlan updateQueryBuilders(LogicalPlan plan, Map newQueryBuilders) { - LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, m -> { - if (newQueryBuilders.containsKey(m)) { - return m.replaceQueryBuilder(newQueryBuilders.get(m)); - } - return m; - }); - // The given plan was already analyzed and optimized, so we set the resulted plan to optimized as well. - newPlan.setOptimized(); - return newPlan; - } - - private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set indexNames) { - ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions( - indexNames.toArray(String[]::new), - IndexResolver.FIELD_CAPS_INDICES_OPTIONS, - services.clusterService().state(), - services.indexNameExpressionResolver(), - services.transportService().getRemoteClusterService(), - System.currentTimeMillis() - ); - - return services.searchService().getRewriteContext(System::currentTimeMillis, resolvedIndices, null); - } - - private static Set fullTextFunctions(LogicalPlan plan) { - Set functions = new HashSet<>(); - plan.forEachExpressionDown(FullTextFunction.class, functions::add); - return functions; - } - - public Set indexNames(LogicalPlan plan) { - Set indexNames = new HashSet<>(); - plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.index().concreteIndices())); - return indexNames; - } - - private static class FullTextFunctionsRewritable - implements - Rewriteable { - - private final Map queryBuilderMap; - - FullTextFunctionsRewritable(Map queryBuilderMap) { - this.queryBuilderMap = queryBuilderMap; - } - - FullTextFunctionsRewritable(Set functions) { - this.queryBuilderMap = new HashMap<>(); - - for (FullTextFunction func : functions) { - queryBuilderMap.put(func, func.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder()); - } - } - - @Override - public FullTextFunctionMapperPreprocessor.FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException { - Map results = new HashMap<>(); - - boolean hasChanged = false; - for (var entry : queryBuilderMap.entrySet()) { - var initial = entry.getValue(); - var rewritten = initial.rewrite(ctx); - hasChanged |= rewritten != initial; - - results.put(entry.getKey(), rewritten); - } - - return hasChanged ? new FullTextFunctionMapperPreprocessor.FullTextFunctionsRewritable(results) : this; - } - - public Map results() { - return queryBuilderMap; - } - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java index b7a942373e06b..ea38b3d4e4ebf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java @@ -8,41 +8,42 @@ package org.elasticsearch.xpack.esql.planner.mapper.preprocessor; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.esql.capabilities.TranslationAware; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; -import java.util.ArrayList; -import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class MapperPreprocessorExecutor { private final TransportActionServices services; - private final List proprocessors = new ArrayList<>(); public MapperPreprocessorExecutor(TransportActionServices services) { this.services = services; } - public MapperPreprocessorExecutor addPreprocessor(MappingPreProcessor preProcessor) { - proprocessors.add(preProcessor); - return this; - } - - public MapperPreprocessorExecutor addPreprocessors(Collection preProcessors) { - proprocessors.addAll(preProcessors); - return this; + public void execute(LogicalPlan plan, ActionListener listener) { + execute(plan, queryRewriters(plan), 0, listener); } - public void execute(LogicalPlan plan, ActionListener listener) { - execute(plan, 0, listener); + private static List queryRewriters(LogicalPlan plan) { + Set queryRewriters = new HashSet<>(); + plan.forEachExpressionDown(e -> { + if (e instanceof TranslationAware.QueryRewriter qr) { + queryRewriters.add(qr.queryRewriter()); + } + }); + return List.copyOf(queryRewriters); } - private void execute(LogicalPlan plan, int index, ActionListener listener) { - if (index == proprocessors.size()) { + private void execute(LogicalPlan plan, List preprocessors, int index, ActionListener listener) { + if (index == preprocessors.size()) { listener.onResponse(plan); } else { - proprocessors.get(index).preprocess(plan, services, listener.delegateFailureAndWrap((l, p) -> execute(p, index + 1, l))); + preprocessors.get(index) + .preprocess(plan, services, listener.delegateFailureAndWrap((l, p) -> execute(p, preprocessors, index + 1, l))); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index d652c43b98eb8..bd891f1a94fce 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -73,7 +73,6 @@ import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; -import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.FullTextFunctionMapperPreprocessor; import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MapperPreprocessorExecutor; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.stats.PlanningMetrics; @@ -145,9 +144,7 @@ public EsqlSession( this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)); this.planningMetrics = planningMetrics; this.indicesExpressionGrouper = indicesExpressionGrouper; - this.mapperPreprocessorExecutor = new MapperPreprocessorExecutor(services).addPreprocessor( - new FullTextFunctionMapperPreprocessor() - ); + this.mapperPreprocessorExecutor = new MapperPreprocessorExecutor(services); } public String sessionId() { @@ -167,12 +164,25 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { - executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener); + preMapping(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener); } } ); } + public void preMapping( + EsqlQueryRequest request, + EsqlExecutionInfo executionInfo, + PlanRunner planRunner, + LogicalPlan optimizedPlan, + ActionListener listener + ) { + mapperPreprocessorExecutor.execute(optimizedPlan, listener.delegateFailureAndWrap((l, p) -> { + p.setOptimized(); // might have been updated by the preprocessor + executeOptimizedPlan(request, executionInfo, planRunner, p, listener); + })); + } + /** * Execute an analyzed plan. Most code should prefer calling {@link #execute} but * this is public for testing. @@ -184,13 +194,11 @@ public void executeOptimizedPlan( LogicalPlan optimizedPlan, ActionListener listener ) { - mapperPreprocessorExecutor.execute(optimizedPlan, listener.delegateFailureAndWrap((l, p) -> { - PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(p, request); - // TODO: this could be snuck into the underlying listener - EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); - // execute any potential subplans - executeSubPlans(physicalPlan, planRunner, executionInfo, request, l); - })); + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); + // TODO: this could be snuck into the underlying listener + EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + // execute any potential subplans + executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener); } private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java deleted file mode 100644 index 91103ef286f72..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/QueryBuilderResolver.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.session; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ResolvedIndices; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryRewriteContext; -import org.elasticsearch.index.query.Rewriteable; -import org.elasticsearch.search.SearchService; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.esql.core.util.Holder; -import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction; -import org.elasticsearch.xpack.esql.plan.logical.EsRelation; -import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.function.BiConsumer; - -import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; - -/** - * Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match} - * will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator. - * {@link QueryBuilderResolver#resolveQueryBuilders(LogicalPlan, ActionListener, BiConsumer)} will rewrite the plan by replacing - * {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s. - */ -public class QueryBuilderResolver { - private final SearchService searchService; - private final ClusterService clusterService; - private final TransportService transportService; - private final IndexNameExpressionResolver indexNameExpressionResolver; - - public QueryBuilderResolver( - SearchService searchService, - ClusterService clusterService, - TransportService transportService, - IndexNameExpressionResolver indexNameExpressionResolver - ) { - this.searchService = searchService; - this.clusterService = clusterService; - this.transportService = transportService; - this.indexNameExpressionResolver = indexNameExpressionResolver; - } - - public void resolveQueryBuilders( - LogicalPlan plan, - ActionListener listener, - BiConsumer> callback - ) { - if (plan.optimized() == false) { - listener.onFailure(new IllegalStateException("Expected optimized plan before query builder rewrite.")); - return; - } - - Set unresolved = fullTextFunctions(plan); - Set indexNames = indexNames(plan); - - if (indexNames == null || indexNames.isEmpty() || unresolved.isEmpty()) { - callback.accept(plan, listener); - return; - } - QueryRewriteContext ctx = queryRewriteContext(indexNames); - FullTextFunctionsRewritable rewritable = new FullTextFunctionsRewritable(unresolved); - Rewriteable.rewriteAndFetch(rewritable, ctx, new ActionListener() { - @Override - public void onResponse(FullTextFunctionsRewritable fullTextFunctionsRewritable) { - try { - LogicalPlan newPlan = planWithResolvedQueryBuilders(plan, fullTextFunctionsRewritable.results()); - callback.accept(newPlan, listener); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } - - private Set fullTextFunctions(LogicalPlan plan) { - Set functions = new HashSet<>(); - plan.forEachExpressionDown(FullTextFunction.class, func -> functions.add(func)); - return functions; - } - - public Set indexNames(LogicalPlan plan) { - Holder> indexNames = new Holder<>(); - - plan.forEachDown(EsRelation.class, esRelation -> { indexNames.set(esRelation.index().concreteIndices()); }); - - return indexNames.get(); - } - - public LogicalPlan planWithResolvedQueryBuilders(LogicalPlan plan, Map newQueryBuilders) { - LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, m -> { - if (newQueryBuilders.keySet().contains(m)) { - return m.replaceQueryBuilder(newQueryBuilders.get(m)); - } - return m; - }); - // The given plan was already analyzed and optimized, so we set the resulted plan to optimized as well. - newPlan.setOptimized(); - return newPlan; - } - - private QueryRewriteContext queryRewriteContext(Set indexNames) { - ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions( - indexNames.toArray(String[]::new), - IndexResolver.FIELD_CAPS_INDICES_OPTIONS, - clusterService.state(), - indexNameExpressionResolver, - transportService.getRemoteClusterService(), - System.currentTimeMillis() - ); - - return searchService.getRewriteContext(() -> System.currentTimeMillis(), resolvedIndices, null); - } - - private class FullTextFunctionsRewritable implements Rewriteable { - - private final Map queryBuilderMap; - - FullTextFunctionsRewritable(Map queryBuilderMap) { - this.queryBuilderMap = queryBuilderMap; - } - - FullTextFunctionsRewritable(Set functions) { - this.queryBuilderMap = new HashMap<>(); - - for (FullTextFunction func : functions) { - queryBuilderMap.put(func, TRANSLATOR_HANDLER.asQuery(func).asBuilder()); - } - } - - @Override - public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException { - Map results = new HashMap<>(); - - boolean hasChanged = false; - for (var entry : queryBuilderMap.entrySet()) { - var initial = entry.getValue(); - var rewritten = initial.rewrite(ctx); - hasChanged |= rewritten != initial; - - results.put(entry.getKey(), rewritten); - } - - return hasChanged ? new FullTextFunctionsRewritable(results) : this; - } - - public Map results() { - return queryBuilderMap; - } - } -} diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java index 285739fe0936f..eafdb6366afd4 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/queries/SemanticQueryBuilder.java @@ -332,11 +332,12 @@ private static String getInferenceIdForForField(Collection indexM protected boolean doEquals(SemanticQueryBuilder other) { return Objects.equals(fieldName, other.fieldName) && Objects.equals(query, other.query) - && Objects.equals(inferenceResults, other.inferenceResults); + && Objects.equals(inferenceResults, other.inferenceResults) + && Objects.equals(inferenceResultsSupplier, other.inferenceResultsSupplier); } @Override protected int doHashCode() { - return Objects.hash(fieldName, query, inferenceResults); + return Objects.hash(fieldName, query, inferenceResults, inferenceResultsSupplier); } } From 6040f3a1f0d0bab30133ba581757141efa0ea660 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Tue, 28 Jan 2025 08:29:54 +0100 Subject: [PATCH 7/9] rename if'aces back --- .../function/fulltext/FullTextFunction.java | 6 +++--- .../preprocessor/MapperPreprocessorExecutor.java | 13 ++++++------- .../mapper/preprocessor/MappingPreProcessor.java | 4 ++++ 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java index 2d690550d4bf1..f38d4967a76c5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java @@ -54,8 +54,8 @@ public abstract class FullTextFunction extends Function implements TranslationAware, - TranslationAware.QueryRewriter, - PostAnalysisPlanVerificationAware { + PostAnalysisPlanVerificationAware, + MappingPreProcessor.MappingPreProcessorSupplier { private final Expression query; private final QueryBuilder queryBuilder; @@ -122,7 +122,7 @@ public Object queryAsObject() { } @Override - public MappingPreProcessor queryRewriter() { + public MappingPreProcessor mappingPreProcessor() { return FullTextFunctionMapperPreprocessor.INSTANCE; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java index ea38b3d4e4ebf..10eca26107008 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.esql.planner.mapper.preprocessor; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.xpack.esql.capabilities.TranslationAware; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; @@ -25,17 +24,17 @@ public MapperPreprocessorExecutor(TransportActionServices services) { } public void execute(LogicalPlan plan, ActionListener listener) { - execute(plan, queryRewriters(plan), 0, listener); + execute(plan, preprocessors(plan), 0, listener); } - private static List queryRewriters(LogicalPlan plan) { - Set queryRewriters = new HashSet<>(); + private static List preprocessors(LogicalPlan plan) { + Set preprocessors = new HashSet<>(); plan.forEachExpressionDown(e -> { - if (e instanceof TranslationAware.QueryRewriter qr) { - queryRewriters.add(qr.queryRewriter()); + if (e instanceof MappingPreProcessor.MappingPreProcessorSupplier supplier) { + preprocessors.add(supplier.mappingPreProcessor()); } }); - return List.copyOf(queryRewriters); + return List.copyOf(preprocessors); } private void execute(LogicalPlan plan, List preprocessors, int index, ActionListener listener) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java index cc918a836ec81..4542df645bf60 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreProcessor.java @@ -27,4 +27,8 @@ public interface MappingPreProcessor { * @param listener the listener to notify when processing is complete */ void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener listener); + + interface MappingPreProcessorSupplier { + MappingPreProcessor mappingPreProcessor(); + } } From aebbccfda9537e6eef7831aa9551ee26b5a42dd6 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 28 Jan 2025 07:55:04 +0000 Subject: [PATCH 8/9] [CI] Auto commit changes from spotless --- .../main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index bbafaa13be063..4cebadc22f65a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -77,7 +77,6 @@ import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.session.Configuration; -import org.elasticsearch.xpack.esql.session.QueryBuilderResolver; import org.elasticsearch.xpack.esql.stats.SearchStats; import org.elasticsearch.xpack.esql.telemetry.Metrics; import org.elasticsearch.xpack.versionfield.Version; From 364d7af8a9995e6c30162bfc4ad467d382bdcc60 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Thu, 30 Jan 2025 08:44:02 +0100 Subject: [PATCH 9/9] s/mapper/mapping --- .../esql/capabilities/TranslationAware.java | 5 ----- .../function/fulltext/FullTextFunction.java | 2 +- ... => FullTextFunctionMappingPreprocessor.java} | 16 +++++++++------- .../esql/expression/function/fulltext/Match.java | 2 -- ...tor.java => MappingPreprocessorExecutor.java} | 4 ++-- .../xpack/esql/session/EsqlSession.java | 8 ++++---- 6 files changed, 16 insertions(+), 21 deletions(-) rename x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/{FullTextFunctionMapperPreprocessor.java => FullTextFunctionMappingPreprocessor.java} (84%) rename x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/{MapperPreprocessorExecutor.java => MappingPreprocessorExecutor.java} (93%) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/TranslationAware.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/TranslationAware.java index 5228bfe5170ed..8ef528b6668ab 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/TranslationAware.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/TranslationAware.java @@ -11,7 +11,6 @@ import org.elasticsearch.xpack.esql.core.querydsl.query.Query; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates; import org.elasticsearch.xpack.esql.planner.TranslatorHandler; -import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor; /** * Expressions implementing this interface can get called on data nodes to provide an Elasticsearch/Lucene query. @@ -43,8 +42,4 @@ interface SingleValueTranslationAware extends TranslationAware { */ Expression singleValueField(); } - - interface QueryRewriter extends TranslationAware { - MappingPreProcessor queryRewriter(); - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java index f38d4967a76c5..f38ee6953ff70 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java @@ -123,7 +123,7 @@ public Object queryAsObject() { @Override public MappingPreProcessor mappingPreProcessor() { - return FullTextFunctionMapperPreprocessor.INSTANCE; + return FullTextFunctionMappingPreprocessor.INSTANCE; } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMapperPreprocessor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMappingPreprocessor.java similarity index 84% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMapperPreprocessor.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMappingPreprocessor.java index 53ccc38e3364a..ec2ecd2919440 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMapperPreprocessor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunctionMappingPreprocessor.java @@ -27,12 +27,14 @@ /** * Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match} * will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator. - * {@link FullTextFunctionMapperPreprocessor#preprocess(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by + * {@link FullTextFunctionMappingPreprocessor#preprocess(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by * replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s. */ -public final class FullTextFunctionMapperPreprocessor implements MappingPreProcessor { +public final class FullTextFunctionMappingPreprocessor implements MappingPreProcessor { - public static final FullTextFunctionMapperPreprocessor INSTANCE = new FullTextFunctionMapperPreprocessor(); + public static final FullTextFunctionMappingPreprocessor INSTANCE = new FullTextFunctionMappingPreprocessor(); + + private FullTextFunctionMappingPreprocessor() {} @Override public void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener listener) { @@ -64,7 +66,7 @@ public Set indexNames(LogicalPlan plan) { private record FullTextFunctionsRewritable(LogicalPlan plan) implements - Rewriteable { + Rewriteable { @Override public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException { Holder exceptionHolder = new Holder<>(); @@ -77,9 +79,9 @@ public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOExc } catch (IOException e) { exceptionHolder.trySet(e); } - var rewrite = builder != initial; - updated.set(updated.get() || rewrite); - return rewrite ? f.replaceQueryBuilder(builder) : f; + var rewritten = builder != initial; + updated.set(updated.get() || rewritten); + return rewritten ? f.replaceQueryBuilder(builder) : f; }); if (exceptionHolder.get() != null) { throw exceptionHolder.get(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/Match.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/Match.java index ea5f3d9b83543..c87349e6b1288 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/Match.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/Match.java @@ -30,8 +30,6 @@ public class Match extends AbstractMatchFullTextFunction { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Match", Match::readFrom); - private transient Boolean isOperator; - @FunctionInfo( returnType = "boolean", preview = true, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreprocessorExecutor.java similarity index 93% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreprocessorExecutor.java index 10eca26107008..690e29a784c06 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MapperPreprocessorExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/preprocessor/MappingPreprocessorExecutor.java @@ -15,11 +15,11 @@ import java.util.List; import java.util.Set; -public class MapperPreprocessorExecutor { +public class MappingPreprocessorExecutor { private final TransportActionServices services; - public MapperPreprocessorExecutor(TransportActionServices services) { + public MappingPreprocessorExecutor(TransportActionServices services) { this.services = services; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b5bfe4b820b70..ca9d192656abe 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -73,7 +73,7 @@ import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; -import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MapperPreprocessorExecutor; +import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreprocessorExecutor; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; @@ -116,7 +116,7 @@ public interface PlanRunner { private final PhysicalPlanOptimizer physicalPlanOptimizer; private final PlanTelemetry planTelemetry; private final IndicesExpressionGrouper indicesExpressionGrouper; - private final MapperPreprocessorExecutor mapperPreprocessorExecutor; + private final MappingPreprocessorExecutor mappingPreprocessorExecutor; public EsqlSession( String sessionId, @@ -144,7 +144,7 @@ public EsqlSession( this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)); this.planTelemetry = planTelemetry; this.indicesExpressionGrouper = indicesExpressionGrouper; - this.mapperPreprocessorExecutor = new MapperPreprocessorExecutor(services); + this.mappingPreprocessorExecutor = new MappingPreprocessorExecutor(services); } public String sessionId() { @@ -177,7 +177,7 @@ public void preMapping( LogicalPlan optimizedPlan, ActionListener listener ) { - mapperPreprocessorExecutor.execute(optimizedPlan, listener.delegateFailureAndWrap((l, p) -> { + mappingPreprocessorExecutor.execute(optimizedPlan, listener.delegateFailureAndWrap((l, p) -> { p.setOptimized(); // might have been updated by the preprocessor executeOptimizedPlan(request, executionInfo, planRunner, p, listener); }));