Skip to content

Commit b2df7c8

Browse files
authored
Fix busywait on adding to full async input buffer (#14522)
1 parent 7bc905c commit b2df7c8

File tree

3 files changed

+39
-8
lines changed

3 files changed

+39
-8
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,20 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
696696
}
697697

698698
void DoExecuteImpl() override {
699-
PollAsyncInput();
699+
LastPollResult = PollAsyncInput();
700+
701+
if (LastPollResult && *LastPollResult != EResumeSource::CAPollAsyncNoSpace) {
702+
// When (some) source buffers were not full, and (some) were successfully polled,
703+
// initiate next DoExecute run immediately;
704+
// If the only reason for continuing was lack of space on all source
705+
// buffers, only continue execution after run completed,
706+
// (some) sources were consumed and compute waits for input
707+
// (Otherwise we enter busy-poll, and there is an especially bad scenario
708+
// when compute is delayed by rate-limiter, we enter busy-poll here,
709+
// this spends CPU, and the rate-limiter delays compute execution even more)
710+
ContinueExecute(*std::exchange(LastPollResult, {}));
711+
}
712+
700713
if (ProcessSourcesState.Inflight == 0) {
701714
auto req = GetCheckpointRequest();
702715
CA_LOG_T("DoExecuteImpl: " << (bool) req);
@@ -1194,6 +1207,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
11941207
CA_LOG_T("AsyncCheckRunStatus: TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size());
11951208
return;
11961209
}
1210+
if (ProcessOutputsState.LastRunStatus == ERunStatus::PendingInput && LastPollResult) {
1211+
ContinueExecute(*LastPollResult);
1212+
}
11971213
TBase::CheckRunStatus();
11981214
}
11991215

@@ -1242,6 +1258,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
12421258
NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay;
12431259
NMonitoring::TDynamicCounters::TCounterPtr CpuTime;
12441260
NDqProto::TEvComputeActorState ComputeActorState;
1261+
TMaybe<EResumeSource> LastPollResult;
12451262
};
12461263

12471264

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
459459
return;
460460
}
461461

462-
if (status != ERunStatus::Finished) {
462+
if (status == ERunStatus::PendingInput) {
463463
for (auto& [id, inputTransform] : InputTransformsMap) {
464464
if (!inputTransform.Buffer->Empty()) {
465465
ContinueExecute(EResumeSource::CAPendingInput);
@@ -1465,31 +1465,38 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14651465
}
14661466
}
14671467

1468-
void PollAsyncInput() {
1468+
[[nodiscard]]
1469+
TMaybe<EResumeSource> PollAsyncInput() {
1470+
TMaybe<EResumeSource> pollResult;
14691471
if (!Running) {
14701472
CA_LOG_T("Skip polling inputs and sources because not running");
1471-
return;
1473+
return pollResult;
14721474
}
14731475

14741476
CA_LOG_T("Poll inputs");
14751477
for (auto& [inputIndex, transform] : InputTransformsMap) {
14761478
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1477-
ContinueExecute(*resume);
1479+
if (!pollResult || *pollResult == EResumeSource::CAPollAsyncNoSpace) {
1480+
pollResult = resume;
1481+
}
14781482
}
14791483
}
14801484

14811485
// Don't produce any input from sources if we're about to save checkpoint.
14821486
if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
14831487
CA_LOG_T("Skip polling sources because of pending checkpoint");
1484-
return;
1488+
return pollResult;
14851489
}
14861490

14871491
CA_LOG_T("Poll sources");
14881492
for (auto& [inputIndex, source] : SourcesMap) {
14891493
if (auto resume = source.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1490-
ContinueExecute(*resume);
1494+
if (!pollResult || *pollResult == EResumeSource::CAPollAsyncNoSpace) {
1495+
pollResult = resume;
1496+
}
14911497
}
14921498
}
1499+
return pollResult;
14931500
}
14941501

14951502
void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
3030
void DoExecuteImpl() override{
3131
auto sourcesState = static_cast<TDerived*>(this)->GetSourcesState();
3232

33-
TBase::PollAsyncInput();
33+
auto lastPollResult = TBase::PollAsyncInput();
3434
ERunStatus status = TaskRunner->Run();
3535

3636
CA_LOG_T("Resume execution, run status: " << status);
@@ -44,6 +44,13 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
4444
}
4545

4646
TBase::ProcessOutputsImpl(status);
47+
48+
if (lastPollResult && (*lastPollResult != EResumeSource::CAPollAsyncNoSpace || status == ERunStatus::PendingInput)) {
49+
// If the only reason for continuing was lack of space on all sources,
50+
// only continue execution when input was consumed;
51+
// otherwise this may result in busy-poll
52+
TBase::ContinueExecute(*lastPollResult);
53+
}
4754
}
4855

4956
void DoTerminateImpl() override {

0 commit comments

Comments
 (0)