Skip to content

Commit ae6ef0c

Browse files
authored
YQL-17542 finalize split sync async CAs (#1689)
* Yql 17542 simplify alloc in compute actor (#1452) * YQL-17542 Simplify allocator usage in ComputeActors * fix build * fix dup * YQL-17542 move SaveState LoadState (#1474) * YQL-17703 always use sized allocator in CA (#1522) * YQL-17542 split FillIoMaps (#1537) * YQL-17755 fix drying input up (#1604) * YQL-17542 split stat (#1553) * YQL-17542 remove transition guards (#1610) * YQL-17542 get rid of std::any in handling sources state (#1635) * YQL-17755 ut for TComputeActorAsyncInputHelperTest::PollAsyncInput (#1626) * YQL-17542 move TaskRunner dependent Execute to TDqSyncComputeActorBase (#1599) * YQL-17542 move TaskRunner dependent Execute to TDqSyncComputeActorBase (#1666)
1 parent 9608841 commit ae6ef0c

10 files changed

+281
-183
lines changed

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

+2-6
Original file line numberDiff line numberDiff line change
@@ -156,22 +156,18 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvFetcherFinished::TPtr& ev)
156156
}
157157
}
158158

159-
void TKqpScanComputeActor::PollSources(std::any prev) {
159+
void TKqpScanComputeActor::PollSources(ui64 prevFreeSpace) {
160160
if (!ScanData || ScanData->IsFinished()) {
161161
return;
162162
}
163163
const auto hasNewMemoryPred = [&]() {
164-
if (!prev.has_value()) {
165-
return false;
166-
}
167164
const ui64 freeSpace = CalculateFreeSpace();
168-
const ui64 prevFreeSpace = std::any_cast<ui64>(prev);
169165
return freeSpace > prevFreeSpace;
170166
};
171167
if (!hasNewMemoryPred() && ScanData->GetStoredBytes()) {
172168
return;
173169
}
174-
const ui32 freeSpace = CalculateFreeSpace();
170+
const ui64 freeSpace = CalculateFreeSpace();
175171
CA_LOG_D("POLL_SOURCES:START:" << Fetchers.size() << ";fs=" << freeSpace);
176172
for (auto&& i : Fetchers) {
177173
Send(i, new TEvScanExchange::TEvAckData(freeSpace));

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,14 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
8282
: 0ul;
8383
}
8484

85-
std::any GetSourcesState() override {
85+
ui64 GetSourcesState() {
8686
if (!ScanData) {
8787
return 0;
8888
}
8989
return CalculateFreeSpace();
9090
}
9191

92-
void PollSources(std::any prev) override;
92+
void PollSources(ui64 prevFreeSpace);
9393

9494
void PassAway() override {
9595
if (TaskRunner) {

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

+6-18
Original file line numberDiff line numberDiff line change
@@ -461,18 +461,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
461461
return inputChannel->FreeSpace;
462462
}
463463

464-
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() override {
465-
return TypeEnv->BindAllocator();
466-
}
467-
468-
std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> MaybeBindAllocator() override {
469-
std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard;
470-
if (TypeEnv) {
471-
guard.emplace(TypeEnv->BindAllocator());
472-
}
473-
return guard;
474-
}
475-
476464
void OnTaskRunnerCreated(NTaskRunnerActor::TEvTaskRunnerCreateFinished::TPtr& ev) {
477465
const auto& secureParams = ev->Get()->SecureParams;
478466
const auto& taskParams = ev->Get()->TaskParams;
@@ -483,7 +471,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
483471
Stat->AddCounters2(ev->Get()->Sensors);
484472
}
485473
TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
486-
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges);
474+
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr);
487475

488476
{
489477
// say "Hello" to executer
@@ -517,7 +505,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
517505

518506
MkqlMemoryLimit = ev->Get()->MkqlMemoryLimit;
519507
ProfileStats = std::move(ev->Get()->ProfileStats);
520-
auto sourcesState = GetSourcesState();
521508
auto status = ev->Get()->RunStatus;
522509

523510
CA_LOG_T("Resume execution, run status: " << status << " checkpoint: " << (bool) ev->Get()->ProgramState
@@ -536,10 +523,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
536523
}
537524
}
538525

539-
if (status != ERunStatus::Finished) {
540-
PollSources(std::move(sourcesState));
541-
}
542-
543526
if (ev->Get()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) {
544527
ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark());
545528
WatermarksTracker.PopPendingWatermark();
@@ -801,6 +784,11 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
801784
return TaskRunnerStats.Get();
802785
}
803786

787+
const NYql::NDq::TDqMeteringStats* GetMeteringStats() override {
788+
// TODO: support async CA
789+
return nullptr;
790+
}
791+
804792
template<typename TSecond>
805793
TVector<ui32> GetIds(const THashMap<ui64, TSecond>& collection) {
806794
TVector<ui32> ids;

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

-3
Original file line numberDiff line numberDiff line change
@@ -364,9 +364,6 @@ struct TComputeMemoryLimits {
364364
IMemoryQuotaManager::TPtr MemoryQuotaManager;
365365
};
366366

367-
//temporary flag to integarate changes in interface
368-
#define Y_YQL_DQ_TASK_RUNNER_REQUIRES_ALLOCATOR 1
369-
370367
using TTaskRunnerFactory = std::function<
371368
TIntrusivePtr<IDqTaskRunner>(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
372369
>;

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ struct TComputeActorAsyncInputHelper {
8383
Pause(*watermark);
8484
}
8585
}
86+
const bool emptyBatch = batch.empty();
8687
AsyncInputPush(std::move(batch), space, finished);
87-
if (!batch.empty()) {
88+
if (!emptyBatch) {
8889
// If we have read some data, we must run such reading again
8990
// to process the case when async input notified us about new data
9091
// but we haven't read all of it.

0 commit comments

Comments
 (0)