Skip to content

ESQL: Add a pre-mapping logical plan processing step #120368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
5 changes: 5 additions & 0 deletions docs/changelog/120368.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120368
summary: Add a pre-mapping logical plan processing step
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ public void set(T value) {
this.value = value;
}

/**
* Stores {@code value} in this holder only when nothing is held yet, i.e. when the current value is {@code null}.
* An already held value is never overwritten.
* @param value the value to store if the holder is still empty.
* @return the value held before this call, or {@code null} if the holder was empty (in which case {@code value} has been stored).
*/
public T trySet(T value) {
final T previous = this.value;
if (previous != null) {
// Something is already held: leave it untouched and report it back.
return previous;
}
this.value = value;
return null;
}

/**
* @return the value currently held by this holder.
*/
public T get() {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
Expand All @@ -31,9 +33,11 @@
import org.elasticsearch.geo.ShapeTestUtils;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
Expand Down Expand Up @@ -72,8 +76,8 @@
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.esql.telemetry.Metrics;
import org.elasticsearch.xpack.versionfield.Version;
Expand Down Expand Up @@ -140,6 +144,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;

public final class EsqlTestUtils {

Expand Down Expand Up @@ -360,7 +365,14 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {

public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));

public static final QueryBuilderResolver MOCK_QUERY_BUILDER_RESOLVER = new MockQueryBuilderResolver();
/**
* A {@link TransportActionServices} instance for tests, populated with Mockito mocks for the transport service, search service,
* cluster service and index name expression resolver. The third and sixth arguments are passed as {@code null};
* NOTE(review): by the record's component order these look like the exchange service and the usage service - confirm.
*/
public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
mock(TransportService.class),
mock(SearchService.class),
null,
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
null
);

// Private constructor: this class cannot be instantiated from outside.
private EsqlTestUtils() {}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlSession;
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.esql.telemetry.Metrics;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void esql(
EsqlExecutionInfo executionInfo,
IndicesExpressionGrouper indicesExpressionGrouper,
EsqlSession.PlanRunner planRunner,
QueryBuilderResolver queryBuilderResolver,
TransportActionServices services,
ActionListener<Result> listener
) {
final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry);
Expand All @@ -78,7 +78,7 @@ public void esql(
verifier,
planTelemetry,
indicesExpressionGrouper,
queryBuilderResolver
services
);
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

package org.elasticsearch.xpack.esql.expression.function.fulltext;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator;
import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator.ShardConfig;
import org.elasticsearch.compute.operator.EvalOperator;
Expand Down Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor;
import org.elasticsearch.xpack.esql.querydsl.query.TranslationAwareExpressionQuery;

import java.util.List;
Expand All @@ -56,7 +57,12 @@
* These functions need to be pushed down to Lucene queries to be executed - there's no Evaluator for them, but they depend on
* {@link org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer} to rewrite them into Lucene queries.
*/
public abstract class FullTextFunction extends Function implements TranslationAware, PostAnalysisPlanVerificationAware, EvaluatorMapper {
public abstract class FullTextFunction extends Function
implements
TranslationAware,
PostAnalysisPlanVerificationAware,
EvaluatorMapper,
MappingPreProcessor.MappingPreProcessorSupplier {

private final Expression query;
private final QueryBuilder queryBuilder;
Expand Down Expand Up @@ -110,11 +116,12 @@ public Expression query() {
*/
public Object queryAsObject() {
Object queryAsObject = query().fold(FoldContext.small() /* TODO remove me */);
if (queryAsObject instanceof BytesRef bytesRef) {
return bytesRef.utf8ToString();
}
return BytesRefs.toString(queryAsObject);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

}

return queryAsObject;
// All full text functions hand out the same shared preprocessor instance.
@Override
public MappingPreProcessor mappingPreProcessor() {
return FullTextFunctionMappingPreprocessor.INSTANCE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.fulltext;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResolvedIndices;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.IndexResolver;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * Pre-mapping step for {@link FullTextFunction}s. Some implementations, such as
 * {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match}, are translated to a {@link QueryBuilder} that requires a
 * rewrite phase on the coordinator.
 * {@link FullTextFunctionMappingPreprocessor#preprocess(LogicalPlan, TransportActionServices, ActionListener)} rewrites the plan by
 * replacing such {@link FullTextFunction} expressions with new ones that hold the rewritten {@link QueryBuilder}s.
 */
public final class FullTextFunctionMappingPreprocessor implements MappingPreProcessor {

    /** The single shared instance; the class keeps no per-query state. */
    public static final FullTextFunctionMappingPreprocessor INSTANCE = new FullTextFunctionMappingPreprocessor();

    private FullTextFunctionMappingPreprocessor() {}

    /**
     * Rewrites the query builders of all {@link FullTextFunction}s in the plan and hands the resulting plan to the listener.
     * @param plan the logical plan to process
     * @param services the services used to build the {@link QueryRewriteContext}
     * @param listener receives the plan extracted from the final rewrite result; failures are delegated to it as well
     */
    @Override
    public void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener<LogicalPlan> listener) {
        FullTextFunctionsRewritable rewritable = new FullTextFunctionsRewritable(plan);
        QueryRewriteContext rewriteContext = queryRewriteContext(services, indexNames(plan));
        Rewriteable.rewriteAndFetch(
            rewritable,
            rewriteContext,
            listener.delegateFailureAndWrap((delegate, rewritten) -> delegate.onResponse(rewritten.plan()))
        );
    }

    /**
     * Builds the rewrite context for the given index names, resolving them against the current cluster state.
     */
    private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set<String> indexNames) {
        String[] names = indexNames.toArray(String[]::new);
        ResolvedIndices resolved = ResolvedIndices.resolveWithIndexNamesAndOptions(
            names,
            IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
            services.clusterService().state(),
            services.indexNameExpressionResolver(),
            services.transportService().getRemoteClusterService(),
            System.currentTimeMillis()
        );
        return services.searchService().getRewriteContext(System::currentTimeMillis, resolved, null);
    }

    /**
     * Collects the concrete index names of every {@link EsRelation} found in the plan.
     * @param plan the plan to inspect
     * @return a new mutable set with the collected names; empty if the plan has no {@link EsRelation}
     */
    public Set<String> indexNames(LogicalPlan plan) {
        Set<String> names = new HashSet<>();
        plan.forEachDown(EsRelation.class, relation -> names.addAll(relation.concreteIndices()));
        return names;
    }

    /**
     * Adapts a whole plan to {@link Rewriteable}, so that the query builders of all its {@link FullTextFunction}s are rewritten together.
     */
    private record FullTextFunctionsRewritable(LogicalPlan plan) implements Rewriteable<FullTextFunctionsRewritable> {

        /**
         * Performs one rewrite round over every {@link FullTextFunction} in the plan.
         * @return a new instance wrapping the updated plan if at least one query builder changed, otherwise this very instance.
         *         NOTE(review): the identity of the returned object presumably is what tells the caller to stop rewriting - confirm.
         * @throws IOException the first {@link IOException} raised while rewriting a query builder, thrown once the traversal is done
         */
        @Override
        public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException {
            // The transformation lambda cannot throw checked exceptions nor assign locals, hence the holders.
            Holder<IOException> firstFailure = new Holder<>();
            Holder<Boolean> anyRewritten = new Holder<>(false);

            LogicalPlan transformed = plan.transformExpressionsDown(FullTextFunction.class, function -> {
                QueryBuilder initial = function.queryBuilder();
                // Functions without a builder yet get one from their query translation.
                QueryBuilder current = initial != null ? initial : function.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder();
                try {
                    current = current.rewrite(ctx);
                } catch (IOException e) {
                    // Only the first failure is kept; it is rethrown after the traversal.
                    firstFailure.trySet(e);
                }
                // Identity comparison: a different builder instance means something was rewritten (or newly created).
                if (current == initial) {
                    return function;
                }
                anyRewritten.set(true);
                return function.replaceQueryBuilder(current);
            });

            IOException failure = firstFailure.get();
            if (failure != null) {
                throw failure;
            }
            if (anyRewritten.get()) {
                return new FullTextFunctionsRewritable(transformed);
            }
            return this;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner.mapper.preprocessor;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;

/**
* Interface for a LogicalPlan processing rule occurring after the optimization, but before mapping to a physical plan.
* This step occurs on the coordinator. The rule may use services provided to the transport action and thus can resolve indices, rewrite
* queries, perform substitutions, etc.
* Note that the LogicalPlan following the rules' changes will not undergo another logical optimization round. The changes these rules
* should apply are only those that require access to services that need to be performed asynchronously.
*/
public interface MappingPreProcessor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be multiple preprocessors?
Why not have a dedicated "stage" such as preMapping() (similar to preAnalyze) and encapsulate the logic there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be multiple preprocessors?

That was my assumption, yes.

Why not have a dedicated "stage" such as preMapping() (similar to preAnalyze) and encapsulate the logic there?

There are some particularities to how queries are repeatedly rewritten (and stopping that is done by identity checking of a return object) -- these should remain particular to FullTextFunction IMO, and this logic should stay within a dedicated class.

But the execution of the preprocessor is done at a dedicated stage.

Not sure if the current format meets your expectation, but happy to shape it further if it can be done better.


/**
* Process a logical plan making use of the available services and provide the updated plan to the provided listener.
* The method returns nothing: the processed plan is handed over exclusively through {@code listener}.
* @param plan the logical plan to process
* @param services the services available from the transport action
* @param listener the listener to notify with the updated plan when processing is complete
*/
void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener<LogicalPlan> listener);

/**
* Implemented by elements of a plan that provide a {@link MappingPreProcessor} to be applied to the plan.
*/
interface MappingPreProcessorSupplier {
/**
* @return the preprocessor this element provides.
*/
MappingPreProcessor mappingPreProcessor();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner.mapper.preprocessor;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Runs the {@link MappingPreProcessor}s required by a logical plan, one after the other, each one receiving the plan produced by the
 * previous one.
 */
public class MappingPreprocessorExecutor {

    private final TransportActionServices services;

    /**
     * @param services the services handed to every preprocessor that gets executed
     */
    public MappingPreprocessorExecutor(TransportActionServices services) {
        this.services = services;
    }

    /**
     * Applies all preprocessors supplied by the expressions of the plan and notifies the listener with the resulting plan.
     * If the plan supplies no preprocessor, the listener receives the plan unchanged.
     * @param plan the logical plan to process
     * @param listener notified with the final plan, or with the failure of the first preprocessor that fails
     */
    public void execute(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
        execute(plan, preprocessors(plan), 0, listener);
    }

    /**
     * Collects the distinct preprocessors supplied by the expressions of the plan, in the order in which the traversal first
     * encounters them. The set only serves de-duplication; the list keeps the order, so that the execution sequence does not depend
     * on hash iteration order, which for objects using identity hash codes can differ from one run to the next.
     */
    private static List<MappingPreProcessor> preprocessors(LogicalPlan plan) {
        Set<MappingPreProcessor> seen = new HashSet<>();
        List<MappingPreProcessor> ordered = new ArrayList<>();
        plan.forEachExpressionDown(e -> {
            if (e instanceof MappingPreProcessor.MappingPreProcessorSupplier supplier) {
                MappingPreProcessor preprocessor = supplier.mappingPreProcessor();
                if (seen.add(preprocessor)) {
                    ordered.add(preprocessor);
                }
            }
        });
        return List.copyOf(ordered);
    }

    /**
     * Runs the preprocessor at {@code index} and, once it responds, continues with the next one on the plan it produced.
     * When all preprocessors have run, the current plan is passed to the listener.
     */
    private void execute(LogicalPlan plan, List<MappingPreProcessor> preprocessors, int index, ActionListener<LogicalPlan> listener) {
        if (index == preprocessors.size()) {
            listener.onResponse(plan);
            return;
        }
        preprocessors.get(index)
            .preprocess(plan, services, listener.delegateFailureAndWrap((l, p) -> execute(p, preprocessors, index + 1, l)));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;

/**
 * Groups a set of service references into a single value, so that they can be passed around as one argument.
 * NOTE(review): presumably these are the services injected into the ES|QL transport action - confirm against the caller.
 */
public record TransportActionServices(
TransportService transportService,
SearchService searchService,
ExchangeService exchangeService,
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
UsageService usageService
) {}
Loading