Skip to content

YQL-17542 get rid of std::any in handling sources state #1635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,18 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvFetcherFinished::TPtr& ev)
}
}

void TKqpScanComputeActor::PollSources(std::any prev) {
void TKqpScanComputeActor::PollSources(ui64 prevFreeSpace) {
if (!ScanData || ScanData->IsFinished()) {
return;
}
const auto hasNewMemoryPred = [&]() {
if (!prev.has_value()) {
return false;
}
const ui64 freeSpace = CalculateFreeSpace();
const ui64 prevFreeSpace = std::any_cast<ui64>(prev);
return freeSpace > prevFreeSpace;
};
if (!hasNewMemoryPred() && ScanData->GetStoredBytes()) {
return;
}
const ui32 freeSpace = CalculateFreeSpace();
const ui64 freeSpace = CalculateFreeSpace();
CA_LOG_D("POLL_SOURCES:START:" << Fetchers.size() << ";fs=" << freeSpace);
for (auto&& i : Fetchers) {
Send(i, new TEvScanExchange::TEvAckData(freeSpace));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
: 0ul;
}

std::any GetSourcesState() override {
ui64 GetSourcesState() {
if (!ScanData) {
return 0;
}
return CalculateFreeSpace();
}

void PollSources(std::any prev) override;
void PollSources(ui64 prevFreeSpace);

void PassAway() override {
if (TaskRunner) {
Expand Down
20 changes: 11 additions & 9 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,15 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

virtual void DoExecuteImpl() {
auto sourcesState = GetSourcesState();
auto sourcesState = static_cast<TDerived*>(this)->GetSourcesState();

PollAsyncInput();
ERunStatus status = TaskRunner->Run();

CA_LOG_T("Resume execution, run status: " << status);

if (status != ERunStatus::Finished) {
PollSources(std::move(sourcesState));
static_cast<TDerived*>(this)->PollSources(std::move(sourcesState));
}

if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
Expand Down Expand Up @@ -1049,13 +1049,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
protected:
// virtual methods (TODO: replace with static_cast<TDerived*>(this)->Foo()

virtual std::any GetSourcesState() {
return nullptr;
}

virtual void PollSources(std::any /* state */) {
}

virtual void TerminateSources(const TIssues& /* issues */, bool /* success */) {
}

Expand All @@ -1071,6 +1064,15 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
return true;
}

protected:
// methods that are called via static_cast<TDerived*>(this) and may be overriden by a dervied class
void* GetSourcesState() const {
return nullptr;
}
void PollSources(void* /* state */) {
}


protected:
void HandleExecuteBase(TEvDqCompute::TEvResumeExecution::TPtr&) {
ResumeEventScheduled = false;
Expand Down