From 27aa1cf843b5c0bfbb599b73576c74a67ae5b243 Mon Sep 17 00:00:00 2001 From: Evgeny Zverev Date: Fri, 2 Feb 2024 08:11:06 +0300 Subject: [PATCH] YQL-17542 split FillIoMaps --- .../actors/compute/dq_async_compute_actor.cpp | 2 +- .../dq/actors/compute/dq_compute_actor_impl.h | 74 +++++++------------ .../compute/dq_sync_compute_actor_base.h | 37 ++++++++-- 3 files changed, 61 insertions(+), 52 deletions(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 4c2b2beedfcd..9ea24cb6cf02 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -483,7 +483,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseAddCounters2(ev->Get()->Sensors); } TypeEnv = const_cast(&typeEnv); - FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges); + FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr); { // say "Hello" to executer diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 5b4a3fa59793..0803dbc40533 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1469,19 +1469,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const THashMap& secureParams, const THashMap& taskParams, - const TVector& readRanges) + const TVector& readRanges, + IRandomProvider* randomProvider + ) { - if (TaskRunner) { - for (auto& [channelId, channel] : InputChannelsMap) { - channel.Channel = TaskRunner->GetInputChannel(channelId); - } - } auto collectStatsLevel = StatsModeToCollectStatsLevel(RuntimeSettings.StatsMode); for (auto& [inputIndex, source] : SourcesMap) { - if constexpr (!TDerived::HasAsyncTaskRunner) { - source.Buffer = TaskRunner->GetSource(inputIndex); - Y_ABORT_UNLESS(source.Buffer); - } Y_ABORT_UNLESS(AsyncIoFactory); const auto& inputDesc = Task.GetInputs(inputIndex); Y_ABORT_UNLESS(inputDesc.HasSource()); @@ -1515,9 +1508,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped this->RegisterWithSameMailbox(source.Actor); } for (auto& [inputIndex, transform] : InputTransformsMap) { - if constexpr (!TDerived::HasAsyncTaskRunner) { - transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry); - std::tie(transform.InputBuffer, transform.Buffer) = TaskRunner->GetInputTransform(inputIndex); + Y_ABORT_UNLESS(TaskRunner); + transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); Y_ABORT_UNLESS(AsyncIoFactory); const auto& inputDesc = Task.GetInputs(inputIndex); CA_LOG_D("Create transform for input " << inputIndex << " " << inputDesc.ShortDebugString()); @@ -1543,43 +1535,33 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what(); } this->RegisterWithSameMailbox(transform.Actor); - } - } - if (TaskRunner) { - for (auto& [channelId, channel] : OutputChannelsMap) { - channel.Channel = TaskRunner->GetOutputChannel(channelId); - } } for (auto& [outputIndex, transform] : OutputTransformsMap) { - if (TaskRunner) { - transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry); - std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex); - Y_ABORT_UNLESS(AsyncIoFactory); - const auto& outputDesc = Task.GetOutputs(outputIndex); - CA_LOG_D("Create transform for output " << outputIndex << " " << outputDesc.ShortDebugString()); - try { - std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform( - IDqAsyncIoFactory::TOutputTransformArguments { - .OutputDesc = outputDesc, - .OutputIndex = outputIndex, - .StatsLevel = collectStatsLevel, - .TxId = TxId, - .TransformOutput = transform.OutputBuffer, - .Callback = static_cast(this), - .SecureParams = secureParams, - .TaskParams = taskParams, - .TypeEnv = typeEnv, - .HolderFactory = holderFactory, - .ProgramBuilder = *transform.ProgramBuilder - }); - } catch (const std::exception& ex) { - throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what(); - } - this->RegisterWithSameMailbox(transform.Actor); + transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); + Y_ABORT_UNLESS(AsyncIoFactory); + const auto& outputDesc = Task.GetOutputs(outputIndex); + CA_LOG_D("Create transform for output " << outputIndex << " " << outputDesc.ShortDebugString()); + try { + std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform( + IDqAsyncIoFactory::TOutputTransformArguments { + .OutputDesc = outputDesc, + .OutputIndex = outputIndex, + .StatsLevel = collectStatsLevel, + .TxId = TxId, + .TransformOutput = transform.OutputBuffer, + .Callback = static_cast(this), + .SecureParams = secureParams, + .TaskParams = taskParams, + .TypeEnv = typeEnv, + .HolderFactory = holderFactory, + .ProgramBuilder = *transform.ProgramBuilder + }); + } catch (const std::exception& ex) { + throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what(); } + this->RegisterWithSameMailbox(transform.Actor); } for (auto& [outputIndex, sink] : SinksMap) { - if (TaskRunner) { sink.Buffer = TaskRunner->GetSink(outputIndex); } Y_ABORT_UNLESS(AsyncIoFactory); const auto& outputDesc = Task.GetOutputs(outputIndex); Y_ABORT_UNLESS(outputDesc.HasSink()); @@ -1597,7 +1579,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .TaskParams = taskParams, .TypeEnv = typeEnv, .HolderFactory = holderFactory, - .RandomProvider = TaskRunner ? TaskRunner->GetRandomProvider() : nullptr + .RandomProvider = randomProvider }); } catch (const std::exception& ex) { throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what(); diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index 8d2ffb00924b..1d30d65257a1 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -62,12 +62,39 @@ class TDqSyncComputeActorBase: public TDqComputeActorBaseTaskRunner->Prepare(this->Task, limits, execCtx); + for (auto& [channelId, channel] : this->InputChannelsMap) { + channel.Channel = this->TaskRunner->GetInputChannel(channelId); + } + + for (auto& [inputIndex, source] : this->SourcesMap) { + source.Buffer = this->TaskRunner->GetSource(inputIndex); + Y_ABORT_UNLESS(source.Buffer); + } + + for (auto& [inputIndex, transform] : this->InputTransformsMap) { + std::tie(transform.InputBuffer, transform.Buffer) = this->TaskRunner->GetInputTransform(inputIndex); + } + + for (auto& [channelId, channel] : this->OutputChannelsMap) { + channel.Channel = this->TaskRunner->GetOutputChannel(channelId); + } + + for (auto& [outputIndex, transform] : this->OutputTransformsMap) { + std::tie(transform.Buffer, transform.OutputBuffer) = this->TaskRunner->GetOutputTransform(outputIndex); + } + + for (auto& [outputIndex, sink] : this->SinksMap) { + sink.Buffer = this->TaskRunner->GetSink(outputIndex); + } + TBase::FillIoMaps( - this->TaskRunner->GetHolderFactory(), - this->TaskRunner->GetTypeEnv(), - this->TaskRunner->GetSecureParams(), - this->TaskRunner->GetTaskParams(), - this->TaskRunner->GetReadRanges()); + this->TaskRunner->GetHolderFactory(), + this->TaskRunner->GetTypeEnv(), + this->TaskRunner->GetSecureParams(), + this->TaskRunner->GetTaskParams(), + this->TaskRunner->GetReadRanges(), + this->TaskRunner->GetRandomProvider() + ); } };