Skip to content

Sync EvaluteExpr execution (#11801) #14455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/kqp/gateway/kqp_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ class IKqpGateway : public NYql::IKikimrGateway {
using NYql::IKikimrGateway::ExecuteLiteral;
virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request,
TQueryData::TPtr params, ui32 txIndex) = 0;
using NYql::IKikimrGateway::ExecuteLiteralInstant;
virtual TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request,
TQueryData::TPtr params, ui32 txIndex) = 0;

/* Scripting */
virtual NThreading::TFuture<TQueryResult> ExplainDataQueryAst(const TString& cluster, const TString& query) = 0;
Expand Down
180 changes: 105 additions & 75 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,79 @@ struct TAppConfigResult : public IKqpGateway::TGenericResult {
std::shared_ptr<const NKikimrConfig::TAppConfig> Config;
};

bool ContainOnlyLiteralStages(NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest& request) {
for (const auto& tx : request.Transactions) {
if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
return false;
}

for (const auto& stage : tx.Body->GetStages()) {
if (stage.InputsSize() != 0) {
return false;
}
}
}

return true;
}

void PrepareLiteralRequest(IKqpGateway::TExecPhysicalRequest& literalRequest, NKqpProto::TKqpPhyQuery& phyQuery, const TString& program, const NKikimrMiniKQL::TType& resultType) {
literalRequest.NeedTxId = false;
literalRequest.MaxAffectedShards = 0;
literalRequest.TotalReadSizeLimitBytes = 0;
literalRequest.MkqlMemoryLimit = 100_MB;

auto& transaction = *phyQuery.AddTransactions();
transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);

auto& stage = *transaction.AddStages();
auto& stageProgram = *stage.MutableProgram();
stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
stageProgram.SetRaw(program);
stage.SetOutputsCount(1);

auto& taskResult = *transaction.AddResults();
*taskResult.MutableItemType() = resultType;
auto& taskConnection = *taskResult.MutableConnection();
taskConnection.SetStageIndex(0);
}

void FillLiteralResult(const IKqpGateway::TExecPhysicalResult& result, IKqpGateway::TExecuteLiteralResult& literalResult) {
if (result.Success()) {
YQL_ENSURE(result.Results.size() == 1);
literalResult.SetSuccess();
literalResult.Result = result.Results[0];
} else {
literalResult.SetStatus(result.Status());
literalResult.AddIssues(result.Issues());
}
}

void FillPhysicalResult(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev, IKqpGateway::TExecPhysicalResult& result, TQueryData::TPtr params, ui32 txIndex) {
auto& response = *ev->Record.MutableResponse();
if (response.GetStatus() == Ydb::StatusIds::SUCCESS) {
result.SetSuccess();
result.ExecuterResult.Swap(response.MutableResult());
{
auto g = params->TypeEnv().BindAllocator();

auto& txResults = ev->GetTxResults();
result.Results.reserve(txResults.size());
for(auto& tx : txResults) {
result.Results.emplace_back(tx.GetMkql());
}
params->AddTxHolders(std::move(ev->GetTxHolders()));

if (!txResults.empty()) {
params->AddTxResults(txIndex, std::move(txResults));
}
}
} else {
for (auto& issue : response.GetIssues()) {
result.AddIssue(NYql::IssueFromMessage(issue));
}
}
}

template<typename TRequest, typename TResponse, typename TResult>
class TProxyRequestHandler: public TRequestHandlerBase<
Expand Down Expand Up @@ -621,32 +694,8 @@ class TKqpExecLiteralRequestHandler: public TActorBootstrapped<TKqpExecLiteralRe
}

void ProcessPureExecution(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev) {
auto* response = ev->Record.MutableResponse();

TResult result;
if (response->GetStatus() == Ydb::StatusIds::SUCCESS) {
result.SetSuccess();
result.ExecuterResult.Swap(response->MutableResult());
{
auto g = Parameters->TypeEnv().BindAllocator();

auto& txResults = ev->GetTxResults();
result.Results.reserve(txResults.size());
for(auto& tx : txResults) {
result.Results.emplace_back(tx.GetMkql());
}
Parameters->AddTxHolders(std::move(ev->GetTxHolders()));

if (!txResults.empty()) {
Parameters->AddTxResults(TxIndex, std::move(txResults));
}
}
} else {
for (auto& issue : response->GetIssues()) {
result.AddIssue(NYql::IssueFromMessage(issue));
}
}

FillPhysicalResult(ev, result, Parameters, TxIndex);
Promise.SetValue(std::move(result));
this->PassAway();
}
Expand Down Expand Up @@ -1798,79 +1847,60 @@ class TKikimrIcGateway : public IKqpGateway {
auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);

literalRequest.NeedTxId = false;
literalRequest.MaxAffectedShards = 0;
literalRequest.TotalReadSizeLimitBytes = 0;
literalRequest.MkqlMemoryLimit = 100_MB;

auto& transaction = *phyQuery.AddTransactions();
transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);

auto& stage = *transaction.AddStages();
auto& stageProgram = *stage.MutableProgram();
stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
stageProgram.SetRaw(program);
stage.SetOutputsCount(1);

auto& taskResult = *transaction.AddResults();
*taskResult.MutableItemType() = resultType;
auto& taskConnection = *taskResult.MutableConnection();
taskConnection.SetStageIndex(0);
PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);

NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());

NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);

literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);

return ExecuteLiteral(std::move(literalRequest), params, 0).Apply([](const auto& future) {
const auto& result = future.GetValue();

TExecuteLiteralResult literalResult;

if (result.Success()) {
YQL_ENSURE(result.Results.size() == 1);
literalResult.SetSuccess();
literalResult.Result = result.Results[0];
} else {
literalResult.SetStatus(result.Status());
literalResult.AddIssues(result.Issues());
}

FillLiteralResult(result, literalResult);
return literalResult;
});
}

TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override {
auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);

NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);

auto result = ExecuteLiteralInstant(std::move(literalRequest), params, 0);

TExecuteLiteralResult literalResult;
FillLiteralResult(result, literalResult);
return literalResult;
}

TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
YQL_ENSURE(!request.Transactions.empty());
YQL_ENSURE(request.DataShardLocks.empty());
YQL_ENSURE(!request.NeedTxId);

auto containOnlyLiteralStages = [](const auto& request) {
for (const auto& tx : request.Transactions) {
if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
return false;
}

for (const auto& stage : tx.Body->GetStages()) {
if (stage.InputsSize() != 0) {
return false;
}
}
}

return true;
};

YQL_ENSURE(containOnlyLiteralStages(request));
YQL_ENSURE(ContainOnlyLiteralStages(request));
auto promise = NewPromise<TExecPhysicalResult>();
IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params, txIndex);
RegisterActor(requestHandler);
return promise.GetFuture();
}

TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
YQL_ENSURE(!request.Transactions.empty());
YQL_ENSURE(request.DataShardLocks.empty());
YQL_ENSURE(!request.NeedTxId);
YQL_ENSURE(ContainOnlyLiteralStages(request));

auto ev = ::NKikimr::NKqp::ExecuteLiteral(std::move(request), Counters, TActorId{}, MakeIntrusive<TUserRequestContext>());
TExecPhysicalResult result;
FillPhysicalResult(ev, result, params, txIndex);
return result;
}

TFuture<TQueryResult> ExecScanQueryAst(const TString& cluster, const TString& query,
TQueryData::TPtr params, const TAstQuerySettings& settings, ui64 rowsLimit) override
{
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2219,6 +2219,12 @@ class TKqpGatewayProxy : public IKikimrGateway {
return Gateway->ExecuteLiteral(program, resultType, txAlloc);
}

TExecuteLiteralResult ExecuteLiteralInstant(const TString& program,
const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override
{
return Gateway->ExecuteLiteralInstant(program, resultType, txAlloc);
}

private:
bool IsPrepare() const {
if (!SessionCtx) {
Expand Down
47 changes: 19 additions & 28 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,39 +892,30 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
if (status.Level != TStatus::Ok) {
return SyncStatus(status);
}
auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState());

return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply(
[this](const NThreading::TFuture<IKikimrGateway::TExecuteLiteralResult>& future) {
return TAsyncTransformCallback(
[future, this](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
auto literalResult = Gateway->ExecuteLiteralInstant(program, resultType, SessionCtx->Query().QueryData->GetAllocState());

const auto& literalResult = future.GetValueSync();

if (!literalResult.Success()) {
for (const auto& issue : literalResult.Issues()) {
ctx.AddError(issue);
}
input->SetState(TExprNode::EState::Error);
return IGraphTransformer::TStatus::Error;
}
if (!literalResult.Success()) {
for (const auto& issue : literalResult.Issues()) {
ctx.AddError(issue);
}
input->SetState(TExprNode::EState::Error);
return SyncError();
}

bool truncated = false;
auto yson = this->EncodeResultToYson(literalResult.Result, truncated);
if (truncated) {
input->SetState(TExprNode::EState::Error);
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
return IGraphTransformer::TStatus::Error;
}
bool truncated = false;
auto yson = EncodeResultToYson(literalResult.Result, truncated);
if (truncated) {
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
input->SetState(TExprNode::EState::Error);
return SyncError();
}

output = input;
input->SetState(TExprNode::EState::ExecutionComplete);
input->SetResult(ctx.NewAtom(input->Pos(), yson));
return IGraphTransformer::TStatus::Ok;
});
}));
output = input;
input->SetState(TExprNode::EState::ExecutionComplete);
input->SetResult(ctx.NewAtom(input->Pos(), yson));
return SyncOk();
}

if (input->Content() == ConfigureName) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,8 @@ class IKikimrGateway : public TThrRefBase {

virtual NThreading::TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;

virtual TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;

public:
using TCreateDirFunc = std::function<void(const TString&, const TString&, NThreading::TPromise<TGenericResult>)>;

Expand Down
Loading