From 9d9379abdb5e2ecdf085502b9ad4211782d2a2a9 Mon Sep 17 00:00:00 2001 From: Evgeny Zverev Date: Tue, 6 Feb 2024 15:28:56 +0000 Subject: [PATCH] YQL-17542 get rid of std::any in handling sources state --- .../compute_actor/kqp_scan_compute_actor.cpp | 8 ++------ .../compute_actor/kqp_scan_compute_actor.h | 4 ++-- .../dq/actors/compute/dq_compute_actor_impl.h | 20 ++++++++++--------- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index f6a48b208c7b..d4cd835e37fe 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -156,22 +156,18 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvFetcherFinished::TPtr& ev) } } -void TKqpScanComputeActor::PollSources(std::any prev) { +void TKqpScanComputeActor::PollSources(ui64 prevFreeSpace) { if (!ScanData || ScanData->IsFinished()) { return; } const auto hasNewMemoryPred = [&]() { - if (!prev.has_value()) { - return false; - } const ui64 freeSpace = CalculateFreeSpace(); - const ui64 prevFreeSpace = std::any_cast(prev); return freeSpace > prevFreeSpace; }; if (!hasNewMemoryPred() && ScanData->GetStoredBytes()) { return; } - const ui32 freeSpace = CalculateFreeSpace(); + const ui64 freeSpace = CalculateFreeSpace(); CA_LOG_D("POLL_SOURCES:START:" << Fetchers.size() << ";fs=" << freeSpace); for (auto&& i : Fetchers) { Send(i, new TEvScanExchange::TEvAckData(freeSpace)); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h index ccf548819a8c..dd9d78091ec6 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h @@ -82,14 +82,14 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase } virtual void DoExecuteImpl() { - auto sourcesState = GetSourcesState(); + auto sourcesState = static_cast(this)->GetSourcesState(); PollAsyncInput(); ERunStatus status = TaskRunner->Run(); @@ -319,7 +319,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped CA_LOG_T("Resume execution, run status: " << status); if (status != ERunStatus::Finished) { - PollSources(std::move(sourcesState)); + static_cast(this)->PollSources(std::move(sourcesState)); } if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) { @@ -1049,13 +1049,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped protected: // virtual methods (TODO: replace with static_cast(this)->Foo() - virtual std::any GetSourcesState() { - return nullptr; - } - - virtual void PollSources(std::any /* state */) { - } - virtual void TerminateSources(const TIssues& /* issues */, bool /* success */) { } @@ -1071,6 +1064,15 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped return true; } +protected: + // methods that are called via static_cast(this) and may be overriden by a dervied class + void* GetSourcesState() const { + return nullptr; + } + void PollSources(void* /* state */) { + } + + protected: void HandleExecuteBase(TEvDqCompute::TEvResumeExecution::TPtr&) { ResumeEventScheduled = false;