Skip to content

Commit 27aa1cf

Browse files
committed
YQL-17542 split FillIoMaps
1 parent 83b8570 commit 27aa1cf

File tree

3 files changed

+61
-52
lines changed

3 files changed

+61
-52
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
483483
Stat->AddCounters2(ev->Get()->Sensors);
484484
}
485485
TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
486-
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges);
486+
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr);
487487

488488
{
489489
// say "Hello" to executer

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 28 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1469,19 +1469,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14691469
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
14701470
const THashMap<TString, TString>& secureParams,
14711471
const THashMap<TString, TString>& taskParams,
1472-
const TVector<TString>& readRanges)
1472+
const TVector<TString>& readRanges,
1473+
IRandomProvider* randomProvider
1474+
)
14731475
{
1474-
if (TaskRunner) {
1475-
for (auto& [channelId, channel] : InputChannelsMap) {
1476-
channel.Channel = TaskRunner->GetInputChannel(channelId);
1477-
}
1478-
}
14791476
auto collectStatsLevel = StatsModeToCollectStatsLevel(RuntimeSettings.StatsMode);
14801477
for (auto& [inputIndex, source] : SourcesMap) {
1481-
if constexpr (!TDerived::HasAsyncTaskRunner) {
1482-
source.Buffer = TaskRunner->GetSource(inputIndex);
1483-
Y_ABORT_UNLESS(source.Buffer);
1484-
}
14851478
Y_ABORT_UNLESS(AsyncIoFactory);
14861479
const auto& inputDesc = Task.GetInputs(inputIndex);
14871480
Y_ABORT_UNLESS(inputDesc.HasSource());
@@ -1515,9 +1508,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
15151508
this->RegisterWithSameMailbox(source.Actor);
15161509
}
15171510
for (auto& [inputIndex, transform] : InputTransformsMap) {
1518-
if constexpr (!TDerived::HasAsyncTaskRunner) {
1519-
transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry);
1520-
std::tie(transform.InputBuffer, transform.Buffer) = TaskRunner->GetInputTransform(inputIndex);
1511+
Y_ABORT_UNLESS(TaskRunner);
1512+
transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry);
15211513
Y_ABORT_UNLESS(AsyncIoFactory);
15221514
const auto& inputDesc = Task.GetInputs(inputIndex);
15231515
CA_LOG_D("Create transform for input " << inputIndex << " " << inputDesc.ShortDebugString());
@@ -1543,43 +1535,33 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
15431535
throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what();
15441536
}
15451537
this->RegisterWithSameMailbox(transform.Actor);
1546-
}
1547-
}
1548-
if (TaskRunner) {
1549-
for (auto& [channelId, channel] : OutputChannelsMap) {
1550-
channel.Channel = TaskRunner->GetOutputChannel(channelId);
1551-
}
15521538
}
15531539
for (auto& [outputIndex, transform] : OutputTransformsMap) {
1554-
if (TaskRunner) {
1555-
transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry);
1556-
std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex);
1557-
Y_ABORT_UNLESS(AsyncIoFactory);
1558-
const auto& outputDesc = Task.GetOutputs(outputIndex);
1559-
CA_LOG_D("Create transform for output " << outputIndex << " " << outputDesc.ShortDebugString());
1560-
try {
1561-
std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform(
1562-
IDqAsyncIoFactory::TOutputTransformArguments {
1563-
.OutputDesc = outputDesc,
1564-
.OutputIndex = outputIndex,
1565-
.StatsLevel = collectStatsLevel,
1566-
.TxId = TxId,
1567-
.TransformOutput = transform.OutputBuffer,
1568-
.Callback = static_cast<TOutputTransformCallbacks*>(this),
1569-
.SecureParams = secureParams,
1570-
.TaskParams = taskParams,
1571-
.TypeEnv = typeEnv,
1572-
.HolderFactory = holderFactory,
1573-
.ProgramBuilder = *transform.ProgramBuilder
1574-
});
1575-
} catch (const std::exception& ex) {
1576-
throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what();
1577-
}
1578-
this->RegisterWithSameMailbox(transform.Actor);
1540+
transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry);
1541+
Y_ABORT_UNLESS(AsyncIoFactory);
1542+
const auto& outputDesc = Task.GetOutputs(outputIndex);
1543+
CA_LOG_D("Create transform for output " << outputIndex << " " << outputDesc.ShortDebugString());
1544+
try {
1545+
std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform(
1546+
IDqAsyncIoFactory::TOutputTransformArguments {
1547+
.OutputDesc = outputDesc,
1548+
.OutputIndex = outputIndex,
1549+
.StatsLevel = collectStatsLevel,
1550+
.TxId = TxId,
1551+
.TransformOutput = transform.OutputBuffer,
1552+
.Callback = static_cast<TOutputTransformCallbacks*>(this),
1553+
.SecureParams = secureParams,
1554+
.TaskParams = taskParams,
1555+
.TypeEnv = typeEnv,
1556+
.HolderFactory = holderFactory,
1557+
.ProgramBuilder = *transform.ProgramBuilder
1558+
});
1559+
} catch (const std::exception& ex) {
1560+
throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what();
15791561
}
1562+
this->RegisterWithSameMailbox(transform.Actor);
15801563
}
15811564
for (auto& [outputIndex, sink] : SinksMap) {
1582-
if (TaskRunner) { sink.Buffer = TaskRunner->GetSink(outputIndex); }
15831565
Y_ABORT_UNLESS(AsyncIoFactory);
15841566
const auto& outputDesc = Task.GetOutputs(outputIndex);
15851567
Y_ABORT_UNLESS(outputDesc.HasSink());
@@ -1597,7 +1579,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
15971579
.TaskParams = taskParams,
15981580
.TypeEnv = typeEnv,
15991581
.HolderFactory = holderFactory,
1600-
.RandomProvider = TaskRunner ? TaskRunner->GetRandomProvider() : nullptr
1582+
.RandomProvider = randomProvider
16011583
});
16021584
} catch (const std::exception& ex) {
16031585
throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what();

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,39 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
6262

6363
this->TaskRunner->Prepare(this->Task, limits, execCtx);
6464

65+
for (auto& [channelId, channel] : this->InputChannelsMap) {
66+
channel.Channel = this->TaskRunner->GetInputChannel(channelId);
67+
}
68+
69+
for (auto& [inputIndex, source] : this->SourcesMap) {
70+
source.Buffer = this->TaskRunner->GetSource(inputIndex);
71+
Y_ABORT_UNLESS(source.Buffer);
72+
}
73+
74+
for (auto& [inputIndex, transform] : this->InputTransformsMap) {
75+
std::tie(transform.InputBuffer, transform.Buffer) = this->TaskRunner->GetInputTransform(inputIndex);
76+
}
77+
78+
for (auto& [channelId, channel] : this->OutputChannelsMap) {
79+
channel.Channel = this->TaskRunner->GetOutputChannel(channelId);
80+
}
81+
82+
for (auto& [outputIndex, transform] : this->OutputTransformsMap) {
83+
std::tie(transform.Buffer, transform.OutputBuffer) = this->TaskRunner->GetOutputTransform(outputIndex);
84+
}
85+
86+
for (auto& [outputIndex, sink] : this->SinksMap) {
87+
sink.Buffer = this->TaskRunner->GetSink(outputIndex);
88+
}
89+
6590
TBase::FillIoMaps(
66-
this->TaskRunner->GetHolderFactory(),
67-
this->TaskRunner->GetTypeEnv(),
68-
this->TaskRunner->GetSecureParams(),
69-
this->TaskRunner->GetTaskParams(),
70-
this->TaskRunner->GetReadRanges());
91+
this->TaskRunner->GetHolderFactory(),
92+
this->TaskRunner->GetTypeEnv(),
93+
this->TaskRunner->GetSecureParams(),
94+
this->TaskRunner->GetTaskParams(),
95+
this->TaskRunner->GetReadRanges(),
96+
this->TaskRunner->GetRandomProvider()
97+
);
7198
}
7299
};
73100

0 commit comments

Comments
 (0)