Skip to content

Commit f855f59

Browse files
Hor911gridnevvvit
authored andcommitted
Sync EvaluteExpr execution (ydb-platform#11801) (ydb-platform#14455)
1 parent df03f29 commit f855f59

File tree

5 files changed

+135
-103
lines changed

5 files changed

+135
-103
lines changed

ydb/core/kqp/gateway/kqp_gateway.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ class IKqpGateway : public NYql::IKikimrGateway {
200200
using NYql::IKikimrGateway::ExecuteLiteral;
201201
virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request,
202202
TQueryData::TPtr params, ui32 txIndex) = 0;
203+
using NYql::IKikimrGateway::ExecuteLiteralInstant;
204+
virtual TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request,
205+
TQueryData::TPtr params, ui32 txIndex) = 0;
203206

204207
/* Scripting */
205208
virtual NThreading::TFuture<TQueryResult> ExplainDataQueryAst(const TString& cluster, const TString& query) = 0;

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 105 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,79 @@ struct TAppConfigResult : public IKqpGateway::TGenericResult {
7777
std::shared_ptr<const NKikimrConfig::TAppConfig> Config;
7878
};
7979

80+
bool ContainOnlyLiteralStages(NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest& request) {
81+
for (const auto& tx : request.Transactions) {
82+
if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
83+
return false;
84+
}
85+
86+
for (const auto& stage : tx.Body->GetStages()) {
87+
if (stage.InputsSize() != 0) {
88+
return false;
89+
}
90+
}
91+
}
92+
93+
return true;
94+
}
95+
96+
void PrepareLiteralRequest(IKqpGateway::TExecPhysicalRequest& literalRequest, NKqpProto::TKqpPhyQuery& phyQuery, const TString& program, const NKikimrMiniKQL::TType& resultType) {
97+
literalRequest.NeedTxId = false;
98+
literalRequest.MaxAffectedShards = 0;
99+
literalRequest.TotalReadSizeLimitBytes = 0;
100+
literalRequest.MkqlMemoryLimit = 100_MB;
101+
102+
auto& transaction = *phyQuery.AddTransactions();
103+
transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);
104+
105+
auto& stage = *transaction.AddStages();
106+
auto& stageProgram = *stage.MutableProgram();
107+
stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
108+
stageProgram.SetRaw(program);
109+
stage.SetOutputsCount(1);
110+
111+
auto& taskResult = *transaction.AddResults();
112+
*taskResult.MutableItemType() = resultType;
113+
auto& taskConnection = *taskResult.MutableConnection();
114+
taskConnection.SetStageIndex(0);
115+
}
116+
117+
void FillLiteralResult(const IKqpGateway::TExecPhysicalResult& result, IKqpGateway::TExecuteLiteralResult& literalResult) {
118+
if (result.Success()) {
119+
YQL_ENSURE(result.Results.size() == 1);
120+
literalResult.SetSuccess();
121+
literalResult.Result = result.Results[0];
122+
} else {
123+
literalResult.SetStatus(result.Status());
124+
literalResult.AddIssues(result.Issues());
125+
}
126+
}
127+
128+
void FillPhysicalResult(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev, IKqpGateway::TExecPhysicalResult& result, TQueryData::TPtr params, ui32 txIndex) {
129+
auto& response = *ev->Record.MutableResponse();
130+
if (response.GetStatus() == Ydb::StatusIds::SUCCESS) {
131+
result.SetSuccess();
132+
result.ExecuterResult.Swap(response.MutableResult());
133+
{
134+
auto g = params->TypeEnv().BindAllocator();
135+
136+
auto& txResults = ev->GetTxResults();
137+
result.Results.reserve(txResults.size());
138+
for(auto& tx : txResults) {
139+
result.Results.emplace_back(tx.GetMkql());
140+
}
141+
params->AddTxHolders(std::move(ev->GetTxHolders()));
142+
143+
if (!txResults.empty()) {
144+
params->AddTxResults(txIndex, std::move(txResults));
145+
}
146+
}
147+
} else {
148+
for (auto& issue : response.GetIssues()) {
149+
result.AddIssue(NYql::IssueFromMessage(issue));
150+
}
151+
}
152+
}
80153

81154
template<typename TRequest, typename TResponse, typename TResult>
82155
class TProxyRequestHandler: public TRequestHandlerBase<
@@ -619,32 +692,8 @@ class TKqpExecLiteralRequestHandler: public TActorBootstrapped<TKqpExecLiteralRe
619692
}
620693

621694
void ProcessPureExecution(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev) {
622-
auto* response = ev->Record.MutableResponse();
623-
624695
TResult result;
625-
if (response->GetStatus() == Ydb::StatusIds::SUCCESS) {
626-
result.SetSuccess();
627-
result.ExecuterResult.Swap(response->MutableResult());
628-
{
629-
auto g = Parameters->TypeEnv().BindAllocator();
630-
631-
auto& txResults = ev->GetTxResults();
632-
result.Results.reserve(txResults.size());
633-
for(auto& tx : txResults) {
634-
result.Results.emplace_back(tx.GetMkql());
635-
}
636-
Parameters->AddTxHolders(std::move(ev->GetTxHolders()));
637-
638-
if (!txResults.empty()) {
639-
Parameters->AddTxResults(TxIndex, std::move(txResults));
640-
}
641-
}
642-
} else {
643-
for (auto& issue : response->GetIssues()) {
644-
result.AddIssue(NYql::IssueFromMessage(issue));
645-
}
646-
}
647-
696+
FillPhysicalResult(ev, result, Parameters, TxIndex);
648697
Promise.SetValue(std::move(result));
649698
this->PassAway();
650699
}
@@ -1796,79 +1845,60 @@ class TKikimrIcGateway : public IKqpGateway {
17961845
auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
17971846
auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
17981847
NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
1799-
1800-
literalRequest.NeedTxId = false;
1801-
literalRequest.MaxAffectedShards = 0;
1802-
literalRequest.TotalReadSizeLimitBytes = 0;
1803-
literalRequest.MkqlMemoryLimit = 100_MB;
1804-
1805-
auto& transaction = *phyQuery.AddTransactions();
1806-
transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);
1807-
1808-
auto& stage = *transaction.AddStages();
1809-
auto& stageProgram = *stage.MutableProgram();
1810-
stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
1811-
stageProgram.SetRaw(program);
1812-
stage.SetOutputsCount(1);
1813-
1814-
auto& taskResult = *transaction.AddResults();
1815-
*taskResult.MutableItemType() = resultType;
1816-
auto& taskConnection = *taskResult.MutableConnection();
1817-
taskConnection.SetStageIndex(0);
1848+
PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);
18181849

18191850
NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
1820-
18211851
NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
1822-
18231852
literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);
18241853

18251854
return ExecuteLiteral(std::move(literalRequest), params, 0).Apply([](const auto& future) {
18261855
const auto& result = future.GetValue();
1827-
18281856
TExecuteLiteralResult literalResult;
1829-
1830-
if (result.Success()) {
1831-
YQL_ENSURE(result.Results.size() == 1);
1832-
literalResult.SetSuccess();
1833-
literalResult.Result = result.Results[0];
1834-
} else {
1835-
literalResult.SetStatus(result.Status());
1836-
literalResult.AddIssues(result.Issues());
1837-
}
1838-
1857+
FillLiteralResult(result, literalResult);
18391858
return literalResult;
18401859
});
18411860
}
18421861

1862+
TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override {
1863+
auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
1864+
auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
1865+
NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
1866+
PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);
1867+
1868+
NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
1869+
NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
1870+
literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);
1871+
1872+
auto result = ExecuteLiteralInstant(std::move(literalRequest), params, 0);
1873+
1874+
TExecuteLiteralResult literalResult;
1875+
FillLiteralResult(result, literalResult);
1876+
return literalResult;
1877+
}
18431878

18441879
TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
18451880
YQL_ENSURE(!request.Transactions.empty());
18461881
YQL_ENSURE(request.DataShardLocks.empty());
18471882
YQL_ENSURE(!request.NeedTxId);
1848-
1849-
auto containOnlyLiteralStages = [](const auto& request) {
1850-
for (const auto& tx : request.Transactions) {
1851-
if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
1852-
return false;
1853-
}
1854-
1855-
for (const auto& stage : tx.Body->GetStages()) {
1856-
if (stage.InputsSize() != 0) {
1857-
return false;
1858-
}
1859-
}
1860-
}
1861-
1862-
return true;
1863-
};
1864-
1865-
YQL_ENSURE(containOnlyLiteralStages(request));
1883+
YQL_ENSURE(ContainOnlyLiteralStages(request));
18661884
auto promise = NewPromise<TExecPhysicalResult>();
18671885
IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params, txIndex);
18681886
RegisterActor(requestHandler);
18691887
return promise.GetFuture();
18701888
}
18711889

1890+
TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
1891+
YQL_ENSURE(!request.Transactions.empty());
1892+
YQL_ENSURE(request.DataShardLocks.empty());
1893+
YQL_ENSURE(!request.NeedTxId);
1894+
YQL_ENSURE(ContainOnlyLiteralStages(request));
1895+
1896+
auto ev = ::NKikimr::NKqp::ExecuteLiteral(std::move(request), Counters, TActorId{}, MakeIntrusive<TUserRequestContext>());
1897+
TExecPhysicalResult result;
1898+
FillPhysicalResult(ev, result, params, txIndex);
1899+
return result;
1900+
}
1901+
18721902
TFuture<TQueryResult> ExecScanQueryAst(const TString& cluster, const TString& query,
18731903
TQueryData::TPtr params, const TAstQuerySettings& settings, ui64 rowsLimit) override
18741904
{

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2219,6 +2219,12 @@ class TKqpGatewayProxy : public IKikimrGateway {
22192219
return Gateway->ExecuteLiteral(program, resultType, txAlloc);
22202220
}
22212221

2222+
TExecuteLiteralResult ExecuteLiteralInstant(const TString& program,
2223+
const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override
2224+
{
2225+
return Gateway->ExecuteLiteralInstant(program, resultType, txAlloc);
2226+
}
2227+
22222228
private:
22232229
bool IsPrepare() const {
22242230
if (!SessionCtx) {

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -892,39 +892,30 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
892892
if (status.Level != TStatus::Ok) {
893893
return SyncStatus(status);
894894
}
895-
auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
896895

897-
return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply(
898-
[this](const NThreading::TFuture<IKikimrGateway::TExecuteLiteralResult>& future) {
899-
return TAsyncTransformCallback(
900-
[future, this](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
896+
auto literalResult = Gateway->ExecuteLiteralInstant(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
901897

902-
const auto& literalResult = future.GetValueSync();
903-
904-
if (!literalResult.Success()) {
905-
for (const auto& issue : literalResult.Issues()) {
906-
ctx.AddError(issue);
907-
}
908-
input->SetState(TExprNode::EState::Error);
909-
return IGraphTransformer::TStatus::Error;
910-
}
898+
if (!literalResult.Success()) {
899+
for (const auto& issue : literalResult.Issues()) {
900+
ctx.AddError(issue);
901+
}
902+
input->SetState(TExprNode::EState::Error);
903+
return SyncError();
904+
}
911905

912-
bool truncated = false;
913-
auto yson = this->EncodeResultToYson(literalResult.Result, truncated);
914-
if (truncated) {
915-
input->SetState(TExprNode::EState::Error);
916-
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
917-
return IGraphTransformer::TStatus::Error;
918-
}
906+
bool truncated = false;
907+
auto yson = EncodeResultToYson(literalResult.Result, truncated);
908+
if (truncated) {
909+
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
910+
input->SetState(TExprNode::EState::Error);
911+
return SyncError();
912+
}
919913

920-
output = input;
921-
input->SetState(TExprNode::EState::ExecutionComplete);
922-
input->SetResult(ctx.NewAtom(input->Pos(), yson));
923-
return IGraphTransformer::TStatus::Ok;
924-
});
925-
}));
914+
output = input;
915+
input->SetState(TExprNode::EState::ExecutionComplete);
916+
input->SetResult(ctx.NewAtom(input->Pos(), yson));
917+
return SyncOk();
926918
}
927-
928919
if (input->Content() == ConfigureName) {
929920
auto requireStatus = RequireChild(*input, 0);
930921
if (requireStatus.Level != TStatus::Ok) {

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,8 @@ class IKikimrGateway : public TThrRefBase {
10281028

10291029
virtual NThreading::TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;
10301030

1031+
virtual TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;
1032+
10311033
public:
10321034
using TCreateDirFunc = std::function<void(const TString&, const TString&, NThreading::TPromise<TGenericResult>)>;
10331035

0 commit comments

Comments
 (0)