Skip to content

Commit daaabde

Browse files
committed
Fixed issues #1
1 parent 048f59f commit daaabde

File tree

6 files changed

+54
-32
lines changed

6 files changed

+54
-32
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -372,12 +372,16 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
372372
PassAway();
373373
}
374374

375-
void FillPreparedQuery(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType) {
375+
void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType) {
376376
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
377377
preparingQuery.release(), AppData()->FunctionRegistry);
378378
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
379379
KqpCompileResult->PreparedQuery = preparedQueryHolder;
380380
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());
381+
382+
if (AstResult) {
383+
KqpCompileResult->Ast = AstResult->Ast;
384+
}
381385
}
382386

383387
void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
@@ -411,11 +415,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
411415

412416
if (status == Ydb::StatusIds::SUCCESS) {
413417
YQL_ENSURE(kqpResult.PreparingQuery);
414-
FillPreparedQuery(std::move(kqpResult.PreparingQuery), queryType);
415-
416-
if (AstResult) {
417-
KqpCompileResult->Ast = AstResult->Ast;
418-
}
418+
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);
419419

420420
auto now = TInstant::Now();
421421
auto duration = now - StartTime;
@@ -426,7 +426,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
426426
<< ", duration: " << duration);
427427
} else {
428428
if (kqpResult.PreparingQuery) {
429-
FillPreparedQuery(std::move(kqpResult.PreparingQuery), queryType);
429+
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);
430430
}
431431

432432
LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed"

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ class TAsyncValidateYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResul
182182
, SqlVersion(sqlVersion) {}
183183

184184
void FillResult(TResult& validateResult) const override {
185+
if (!validateResult.Success()) {
186+
return;
187+
}
188+
185189
YQL_ENSURE(SessionCtx->Query().PrepareOnly);
186190
validateResult.PreparedQuery.reset(SessionCtx->Query().PreparingQuery.release());
187191
validateResult.SqlVersion = SqlVersion;
@@ -211,6 +215,10 @@ class TAsyncExplainYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
211215
, UseDqExplain(useDqExplain) {}
212216

213217
void FillResult(TResult& queryResult) const override {
218+
if (!queryResult.Success()) {
219+
return;
220+
}
221+
214222
if (UseDqExplain) {
215223
TVector<const TString> plans;
216224
for (auto id : SessionCtx->Query().ExecutionOrder) {
@@ -253,6 +261,10 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
253261
, SqlVersion(sqlVersion) {}
254262

255263
void FillResult(TResult& queryResult) const override {
264+
if (!queryResult.Success()) {
265+
return;
266+
}
267+
256268
for (auto& resultStr : ResultProviderConfig.CommittedResults) {
257269
queryResult.Results.emplace_back(
258270
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
@@ -300,6 +312,10 @@ class TAsyncExecuteKqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
300312
, ExecuteCtx(executeCtx) {}
301313

302314
void FillResult(TResult& queryResult) const override {
315+
if (!queryResult.Success()) {
316+
return;
317+
}
318+
303319
YQL_ENSURE(ExecuteCtx.QueryResults.size() == 1);
304320
queryResult = std::move(ExecuteCtx.QueryResults[0]);
305321
queryResult.QueryPlan = queryResult.PreparingQuery->GetPhysicalQuery().GetQueryPlan();
@@ -320,14 +336,24 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
320336
using TResult = IKqpHost::TQueryResult;
321337

322338
TAsyncPrepareYqlResult(TExprNode* queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer,
323-
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion)
339+
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion,
340+
TIntrusivePtr<TKqlTransformContext> transformCtx = nullptr)
324341
: TKqpAsyncResultBase(queryRoot, exprCtx, transformer)
325342
, QueryCtx(queryCtx)
326343
, ExprCtx(exprCtx)
344+
, TransformCtx(transformCtx)
327345
, QueryText(query.Text)
328346
, SqlVersion(sqlVersion) {}
329347

330348
void FillResult(TResult& prepareResult) const override {
349+
if (!prepareResult.Success() && TransformCtx) {
350+
if (auto exprRoot = TransformCtx->ExplainTransformerInput ? TransformCtx->ExplainTransformerInput : GetExprRoot()) {
351+
prepareResult.PreparingQuery = std::move(QueryCtx->PreparingQuery);
352+
prepareResult.PreparingQuery->MutablePhysicalQuery()->SetQueryAst(KqpExprToPrettyString(*GetExprRoot(), ExprCtx));
353+
}
354+
return;
355+
}
356+
331357
YQL_ENSURE(QueryCtx->PrepareOnly);
332358
YQL_ENSURE(QueryCtx->PreparingQuery);
333359

@@ -343,18 +369,10 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
343369
prepareResult.QueryAst = prepareResult.PreparingQuery->GetPhysicalQuery().GetQueryAst();
344370
}
345371

346-
void FillPartialResult(TResult& prepareResult) const override {
347-
YQL_ENSURE(QueryCtx->PrepareOnly);
348-
349-
if (auto exprRoot = GetExprRoot()) {
350-
prepareResult.PreparingQuery = std::move(QueryCtx->PreparingQuery);
351-
prepareResult.PreparingQuery->MutablePhysicalQuery()->SetQueryAst(KqpExprToPrettyString(*GetExprRoot(), ExprCtx));
352-
}
353-
}
354-
355372
private:
356373
TIntrusivePtr<TKikimrQueryContext> QueryCtx;
357374
NYql::TExprContext& ExprCtx;
375+
TIntrusivePtr<TKqlTransformContext> TransformCtx;
358376
TString QueryText;
359377
TMaybe<TSqlVersion> SqlVersion;
360378
};
@@ -944,6 +962,7 @@ class TKqpHost : public IKqpHost {
944962
, IsInternalCall(isInternalCall)
945963
, FederatedQuerySetup(federatedQuerySetup)
946964
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
965+
, Config(config)
947966
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
948967
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
949968
, FakeWorld(ExprCtx->NewWorld(TPosition()))
@@ -1338,7 +1357,7 @@ class TKqpHost : public IKqpHost {
13381357
}
13391358

13401359
return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
1341-
query.Text, sqlVersion);
1360+
query.Text, sqlVersion, TransformCtx);
13421361
}
13431362

13441363
IAsyncQueryResultPtr PrepareScanQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx,
@@ -1513,7 +1532,8 @@ class TKqpHost : public IKqpHost {
15131532
}
15141533

15151534
void Init(EKikimrQueryType queryType) {
1516-
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);
1535+
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
1536+
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry);
15171537

15181538
ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
15191539
ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef();
@@ -1646,6 +1666,7 @@ class TKqpHost : public IKqpHost {
16461666
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
16471667

16481668
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
1669+
TKikimrConfiguration::TPtr Config;
16491670

16501671
TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistryHolder;
16511672
const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
@@ -1659,6 +1680,7 @@ class TKqpHost : public IKqpHost {
16591680
TExprNode::TPtr FakeWorld;
16601681

16611682
TIntrusivePtr<TExecuteContext> ExecuteCtx;
1683+
TIntrusivePtr<TKqlTransformContext> TransformCtx;
16621684
TIntrusivePtr<IKqpRunner> KqpRunner;
16631685
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};
16641686

ydb/core/kqp/host/kqp_host_impl.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class TKqpAsyncResultBase : public NYql::IKikimrAsyncResult<TResult> {
3535

3636
if (Status.GetValue() == NYql::IGraphTransformer::TStatus::Error) {
3737
TResult result = NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
38-
FillPartialResult(result);
38+
FillResult(result);
3939
return result;
4040
}
4141

@@ -85,8 +85,6 @@ class TKqpAsyncResultBase : public NYql::IKikimrAsyncResult<TResult> {
8585
protected:
8686
virtual void FillResult(TResult& result) const = 0;
8787

88-
virtual void FillPartialResult(TResult&) const {}
89-
9088
NYql::TExprNode::TPtr GetExprRoot() const { return ExprRoot; }
9189
NYql::TExprContext& GetExprContext() const { return ExprCtx; }
9290
NYql::IGraphTransformer& GetTransformer() const { return Transformer; }
@@ -248,7 +246,7 @@ class IKqpRunner : public TThrRefBase {
248246

249247
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
250248
const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
251-
const NMiniKQL::IFunctionRegistry& funcRegistry);
249+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry);
252250

253251
TAutoPtr<NYql::IGraphTransformer> CreateKqpExplainPreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
254252
const TString& cluster, TIntrusivePtr<TKqlTransformContext> transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry,

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,14 @@ class TKqpRunner : public IKqpRunner {
137137
public:
138138
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
139139
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
140-
const NMiniKQL::IFunctionRegistry& funcRegistry)
140+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
141141
: Gateway(gateway)
142142
, Cluster(cluster)
143143
, TypesCtx(*typesCtx)
144144
, SessionCtx(sessionCtx)
145145
, FunctionRegistry(funcRegistry)
146146
, Config(sessionCtx->ConfigPtr())
147-
, TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr()))
147+
, TransformCtx(transformCtx)
148148
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
149149
sessionCtx->TablesPtr()))
150150
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
@@ -377,9 +377,9 @@ class TKqpRunner : public IKqpRunner {
377377

378378
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
379379
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
380-
const NMiniKQL::IFunctionRegistry& funcRegistry)
380+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
381381
{
382-
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, funcRegistry);
382+
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry);
383383
}
384384

385385
} // namespace NKqp

ydb/tests/fq/s3/test_bindings.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import boto3
55
import logging
66
import pytest
7+
import sys
78

89
import ydb.public.api.protos.ydb_value_pb2 as ydb
910
import ydb.public.api.protos.draft.fq_pb2 as fq
@@ -617,5 +618,5 @@ def test_ast_in_failed_query_compilation(self, kikimr, s3, client):
617618
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
618619
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
619620

620-
ast = str(client.describe_query(query_id).result.query.ast)
621-
assert ast != "", "Query ast not found"
621+
ast = client.describe_query(query_id).result.query.ast.data
622+
assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast"

ydb/tests/fq/yds/test_select_1.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# -*- coding: utf-8 -*-
33

44
import logging
5+
import sys
56

67
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_all
78

@@ -121,10 +122,10 @@ def test_compile_error(self, client, yq_version):
121122

122123
@yq_all
123124
def test_ast_in_failed_query_runtime(self, client):
124-
sql = "SELECT unwrap(1 / 0)"
125+
sql = "SELECT unwrap(42 / 0) AS error_column"
125126

126127
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
127128
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
128129

129-
ast = str(client.describe_query(query_id).result.query.ast)
130-
assert ast != "", "Query ast not found"
130+
ast = client.describe_query(query_id).result.query.ast.data
131+
assert "(\'\"error_column\" (Unwrap (/ (Int32 \'\"42\")" in ast, "Invalid query ast"

0 commit comments

Comments
 (0)