Skip to content

Commit c318914

Browse files
authored
Make Async CA drain output channels logic similar to Sync CA (#5794)
1 parent 805af0e commit c318914

File tree

1 file changed

+13
-42
lines changed

1 file changed

+13
-42
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

+13-42
Original file line numberDiff line numberDiff line change
@@ -431,20 +431,19 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
431431
// << ", finished: " << outputChannel.Channel->IsFinished());
432432
);
433433

434-
outputChannel.PopStarted = true;
435-
const bool hasFreeMemory = peerState.HasFreeMemory();
434+
const bool shouldSkipData = Channels->ShouldSkipData(outputChannel.ChannelId);
435+
const bool hasFreeMemory = Channels->HasFreeMemoryInChannel(outputChannel.ChannelId);
436436
UpdateBlocked(outputChannel, !hasFreeMemory);
437-
ProcessOutputsState.Inflight++;
438-
if (!hasFreeMemory) {
439-
CA_LOG_T("Can not drain channel because it is blocked by capacity. ChannelId: " << channelId
440-
<< ", peerState:(" << peerState.DebugString() << ")"
441-
);
442-
auto ev = MakeHolder<NTaskRunnerActor::TEvOutputChannelData>(channelId);
443-
Y_ABORT_UNLESS(!ev->Finished);
444-
Send(SelfId(), std::move(ev)); // try again, ev.Finished == false
437+
438+
if (!shouldSkipData && !outputChannel.EarlyFinish && !hasFreeMemory) {
439+
CA_LOG_T("DrainOutputChannel return because No free memory in channel, channel: " << outputChannel.ChannelId);
440+
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
441+
ProcessOutputsState.AllOutputsFinished = !outputChannel.Finished;
445442
return;
446443
}
447444

445+
outputChannel.PopStarted = true;
446+
ProcessOutputsState.Inflight++;
448447
Send(TaskRunnerActorId, new NTaskRunnerActor::TEvOutputChannelDataRequest(channelId, wasFinished, peerState.GetFreeMemory()));
449448
}
450449

@@ -494,7 +493,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
494493
TOutputChannelInfo* outputChannel = OutputChannelsMap.FindPtr(channelId);
495494
outputChannel->Finished = true;
496495
outputChannel->EarlyFinish = true;
497-
TrySendAsyncChannelData(*outputChannel); // early finish (skip data)
498496
YQL_ENSURE(outputChannel, "task: " << Task.GetId() << ", output channelId: " << channelId);
499497

500498
if (outputChannel->PopStarted) { // There may be another in-flight message here
@@ -604,8 +602,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
604602
}
605603

606604
void DoExecuteImpl() override {
607-
TrySendAsyncChannelsData();
608-
609605
PollAsyncInput();
610606
if (ProcessSourcesState.Inflight == 0) {
611607
auto req = GetCheckpointRequest();
@@ -668,7 +664,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
668664
Stat->AddCounters2(ev->Get()->Sensors);
669665
}
670666
ContinueRunInflight = false;
671-
TrySendAsyncChannelsData(); // send from previous cycle
672667

673668
MkqlMemoryLimit = ev->Get()->MkqlMemoryLimit;
674669
ProfileStats = std::move(ev->Get()->ProfileStats);
@@ -774,26 +769,15 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
774769
outputChannel.AsyncData->Finished = ev->Get()->Finished;
775770
outputChannel.AsyncData->Changed = ev->Get()->Changed;
776771

777-
if (TrySendAsyncChannelData(outputChannel)) {
778-
CheckRunStatus();
779-
}
772+
SendAsyncChannelData(outputChannel);
773+
CheckRunStatus();
780774
}
781775

782-
bool TrySendAsyncChannelData(TOutputChannelInfo& outputChannel) {
783-
if (!outputChannel.AsyncData) {
784-
return false;
785-
}
776+
void SendAsyncChannelData(TOutputChannelInfo& outputChannel) {
777+
Y_ABORT_UNLESS(outputChannel.AsyncData);
786778

787779
// If the channel has finished, then the data received after drain is no longer needed
788780
const bool shouldSkipData = Channels->ShouldSkipData(outputChannel.ChannelId);
789-
if (!shouldSkipData && !Channels->CanSendChannelData(outputChannel.ChannelId)) { // When channel will be connected, they will call resume execution.
790-
CA_LOG_T("TrySendAsyncChannelData return false because Channel can't send channel data, channel: " << outputChannel.ChannelId);
791-
return false;
792-
}
793-
if (!shouldSkipData && !outputChannel.EarlyFinish && !Channels->HasFreeMemoryInChannel(outputChannel.ChannelId)) {
794-
CA_LOG_T("TrySendAsyncChannelData return false because No free memory in channel, channel: " << outputChannel.ChannelId);
795-
return false;
796-
}
797781

798782
auto& asyncData = *outputChannel.AsyncData;
799783
outputChannel.Finished = asyncData.Finished || shouldSkipData || outputChannel.EarlyFinish;
@@ -861,19 +845,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
861845
FinishedSinks.size() == SinksMap.size();
862846

863847
outputChannel.AsyncData = Nothing();
864-
865-
return true;
866-
}
867-
868-
bool TrySendAsyncChannelsData() {
869-
bool result = false;
870-
for (auto& [channelId, outputChannel] : OutputChannelsMap) {
871-
result |= TrySendAsyncChannelData(outputChannel);
872-
}
873-
if (result) {
874-
CheckRunStatus();
875-
}
876-
return result;
877848
}
878849

879850
void OnInputChannelDataAck(NTaskRunnerActor::TEvInputChannelDataAck::TPtr& ev) {

0 commit comments

Comments
 (0)