Skip to content

ESQL: Add a pre-mapping logical plan processing step #120368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
5 changes: 5 additions & 0 deletions docs/changelog/120368.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120368
summary: Add a pre-mapping logical plan processing step
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
Expand All @@ -31,9 +33,11 @@
import org.elasticsearch.geo.ShapeTestUtils;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
Expand Down Expand Up @@ -71,8 +75,8 @@
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
import org.elasticsearch.xpack.esql.stats.Metrics;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.versionfield.Version;
Expand Down Expand Up @@ -138,6 +142,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;

public final class EsqlTestUtils {

Expand Down Expand Up @@ -358,7 +363,14 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {

public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));

public static final QueryBuilderResolver MOCK_QUERY_BUILDER_RESOLVER = new MockQueryBuilderResolver();
public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
mock(TransportService.class),
mock(SearchService.class),
null,
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
null
);

private EsqlTestUtils() {}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlSession;
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.esql.stats.Metrics;
import org.elasticsearch.xpack.esql.stats.PlanningMetrics;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void esql(
EsqlExecutionInfo executionInfo,
IndicesExpressionGrouper indicesExpressionGrouper,
EsqlSession.PlanRunner planRunner,
QueryBuilderResolver queryBuilderResolver,
TransportActionServices services,
ActionListener<Result> listener
) {
final PlanningMetrics planningMetrics = new PlanningMetrics();
Expand All @@ -78,7 +78,7 @@ public void esql(
verifier,
planningMetrics,
indicesExpressionGrouper,
queryBuilderResolver
services
);
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

package org.elasticsearch.xpack.esql.expression.function.fulltext;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
import org.elasticsearch.xpack.esql.capabilities.TranslationAware;
Expand Down Expand Up @@ -113,11 +113,7 @@ public Expression query() {
*/
public Object queryAsObject() {
Object queryAsObject = query().fold(FoldContext.small() /* TODO remove me */);
if (queryAsObject instanceof BytesRef bytesRef) {
return bytesRef.utf8ToString();
}

return queryAsObject;
return BytesRefs.toString(queryAsObject);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner.mapper.preprocessor;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResolvedIndices;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.IndexResolver;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match}
* will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator.
* {@link FullTextFunctionMapperPreprocessor#preprocess(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by
* replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s.
*/
public class FullTextFunctionMapperPreprocessor implements MappingPreProcessor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses a pull approach where a built-in pre-processor searches for FullTextFunctions before rewriting the query.
A push approach would be better, similar to the Aware interfaces: can the TranslationAware be used instead or potentially an extension to it.
Specifically this stage is important only for queries that depend on QueryRewriteContext (not its subclasses) - creating this context is not trivial so checking whether this is needed is valuable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reworked this part, to have the FullTextFunctions return the preprocessor as a singleton. This way the preprocessors are "discovered" in the plan.


@Override
public void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener<LogicalPlan> listener) {
Set<FullTextFunction> unresolved = fullTextFunctions(plan);
Set<String> indexNames = indexNames(plan);

if (indexNames == null || indexNames.isEmpty() || unresolved.isEmpty()) {
listener.onResponse(plan);
return;
}
QueryRewriteContext ctx = queryRewriteContext(services, indexNames);
FullTextFunctionsRewritable rewritable = new FullTextFunctionsRewritable(unresolved);
Rewriteable.rewriteAndFetch(
rewritable,
ctx,
listener.delegateFailureAndWrap((l, r) -> l.onResponse(updateQueryBuilders(plan, r.results())))
);
}

public LogicalPlan updateQueryBuilders(LogicalPlan plan, Map<FullTextFunction, QueryBuilder> newQueryBuilders) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't the rewrite applied on the spot and instead an associated map is used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reworked this bit. But my guess is that the intention was to avoid multiple tree passes.

LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, m -> {
if (newQueryBuilders.containsKey(m)) {
return m.replaceQueryBuilder(newQueryBuilders.get(m));
}
return m;
});
// The given plan was already analyzed and optimized, so we set the resulted plan to optimized as well.
newPlan.setOptimized();
return newPlan;
}

private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set<String> indexNames) {
ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions(
indexNames.toArray(String[]::new),
IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
services.clusterService().state(),
services.indexNameExpressionResolver(),
services.transportService().getRemoteClusterService(),
System.currentTimeMillis()
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indices have already been resolved through field-caps; double check if this information can be reused instead of going back to string format.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResolvedIndices splits the indices by local and remote and uses the string format for that. Not sure if/how we can simplify this.


return services.searchService().getRewriteContext(System::currentTimeMillis, resolvedIndices, null);
}

private static Set<FullTextFunction> fullTextFunctions(LogicalPlan plan) {
Set<FullTextFunction> functions = new HashSet<>();
plan.forEachExpressionDown(FullTextFunction.class, functions::add);
return functions;
}

public Set<String> indexNames(LogicalPlan plan) {
Set<String> indexNames = new HashSet<>();
plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.index().concreteIndices()));
return indexNames;
}

private static class FullTextFunctionsRewritable
implements
Rewriteable<FullTextFunctionMapperPreprocessor.FullTextFunctionsRewritable> {

private final Map<FullTextFunction, QueryBuilder> queryBuilderMap;

FullTextFunctionsRewritable(Map<FullTextFunction, QueryBuilder> queryBuilderMap) {
this.queryBuilderMap = queryBuilderMap;
}

FullTextFunctionsRewritable(Set<FullTextFunction> functions) {
this.queryBuilderMap = new HashMap<>();

for (FullTextFunction func : functions) {
queryBuilderMap.put(func, func.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder());
}
}

@Override
public FullTextFunctionMapperPreprocessor.FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException {
Map<FullTextFunction, QueryBuilder> results = new HashMap<>();

boolean hasChanged = false;
for (var entry : queryBuilderMap.entrySet()) {
var initial = entry.getValue();
var rewritten = initial.rewrite(ctx);
hasChanged |= rewritten != initial;

results.put(entry.getKey(), rewritten);
}

return hasChanged ? new FullTextFunctionMapperPreprocessor.FullTextFunctionsRewritable(results) : this;
}

public Map<FullTextFunction, QueryBuilder> results() {
return queryBuilderMap;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner.mapper.preprocessor;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class MapperPreprocessorExecutor {

private final TransportActionServices services;
private final List<MappingPreProcessor> proprocessors = new ArrayList<>();

public MapperPreprocessorExecutor(TransportActionServices services) {
this.services = services;
}

public MapperPreprocessorExecutor addPreprocessor(MappingPreProcessor preProcessor) {
proprocessors.add(preProcessor);
return this;
}

public MapperPreprocessorExecutor addPreprocessors(Collection<MappingPreProcessor> preProcessors) {
proprocessors.addAll(preProcessors);
return this;
}

public void execute(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
execute(plan, 0, listener);
}

private void execute(LogicalPlan plan, int index, ActionListener<LogicalPlan> listener) {
if (index == proprocessors.size()) {
listener.onResponse(plan);
} else {
proprocessors.get(index).preprocess(plan, services, listener.delegateFailureAndWrap((l, p) -> execute(p, index + 1, l)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner.mapper.preprocessor;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;

/**
* Interface for a LogicalPlan processing rule occurring after the optimization, but before mapping to a physical plan.
* This step occurs on the coordinator. The rule may use services provided to the transport action and thus can resolve indices, rewrite
* queries, perform substitutions, etc.
* Note that the LogicalPlan following the rules' changes will not undergo another logical optimization round. The changes these rules
* should apply are only those that require access to services that need to be performed asynchronously.
*/
public interface MappingPreProcessor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be multiple preprocessors?
Why not have a dedicated "stage" such as preMapping() (similar to preAnalyze) and encapsulate the logic there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be multiple preprocessors?

That was my assumption, yes.

Why not have a dedicated "stage" such as preMapping() (similar to preAnalyze) and encapsulate the logic there?

There are some particularities to how queries are repeatedly rewritten (and stopping that is done by identity checking of a return object) -- these should remain particular to FullTextFunction IMO and this logic stay within a dedicated class.

But the execution of the preprocessor is done at a dedicated stage.

Not sure if the current format meets your expectation, but happy to shape it further if it can be done better.


/**
* Process a logical plan making use of the available services and provide the updated plan to the provided listener.
* @param plan the logical plan to process
* @param services the services available from the transport action
* @param listener the listener to notify when processing is complete
*/
void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener<LogicalPlan> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;

public record TransportActionServices(
TransportService transportService,
SearchService searchService,
ExchangeService exchangeService,
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
UsageService usageService
) {}
Loading