Skip to content

Commit f2808e8

Browse files
fraction cpu usage in conveyors (#4870)
1 parent bc3818b commit f2808e8

File tree

7 files changed

+74
-14
lines changed

7 files changed

+74
-14
lines changed

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,7 @@ message TConveyorConfig {
592592
optional uint32 WorkersCount = 2;
593593
optional uint32 QueueSizeLimit = 3;
594594
optional double DefaultFractionOfThreadsCount = 4;
595+
optional double WorkersCountDouble = 5;
595596
}
596597

597598
message TExternalIndexConfig {

ydb/core/tx/conveyor/service/service.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@ void TDistributor::Bootstrap() {
1616
const ui32 workersCount = Config.GetWorkersCountForConveyor(NKqp::TStagePredictor::GetUsableThreads());
1717
AFL_NOTICE(NKikimrServices::TX_CONVEYOR)("action", "conveyor_registered")("actor_id", SelfId())("workers_count", workersCount)("config", Config.DebugString());
1818
for (ui32 i = 0; i < workersCount; ++i) {
19-
Workers.emplace_back(Register(new TWorker(ConveyorName)));
19+
const double usage = Config.GetWorkerCPUUsage(i);
20+
Workers.emplace_back(Register(new TWorker(ConveyorName, usage, SelfId())));
21+
if (usage < 1) {
22+
AFL_VERIFY(!SlowWorkerId);
23+
SlowWorkerId = Workers.back();
24+
}
2025
}
2126
Counters.AvailableWorkersCount->Set(Workers.size());
2227
Counters.WorkersCountLimit->Set(Workers.size());
@@ -62,8 +67,13 @@ void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
6267
Counters.WaitingHistogram->Collect(0);
6368

6469
wTask.OnBeforeStart();
65-
Send(Workers.back(), new TEvInternal::TEvNewTask(wTask));
66-
Workers.pop_back();
70+
if (Workers.size() == 1 || !SlowWorkerId || Workers.back() != *SlowWorkerId) {
71+
Send(Workers.back(), new TEvInternal::TEvNewTask(wTask));
72+
Workers.pop_back();
73+
} else {
74+
Send(Workers.front(), new TEvInternal::TEvNewTask(wTask));
75+
Workers.pop_front();
76+
}
6777
Counters.UseWorkerRate->Inc();
6878
} else if (Waiting.size() < Config.GetQueueSizeLimit()) {
6979
Waiting.push(wTask);

ydb/core/tx/conveyor/service/service.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ class TDistributor: public TActorBootstrapped<TDistributor> {
7575
const TConfig Config;
7676
const TString ConveyorName = "common";
7777
TDequePriorityFIFO Waiting;
78-
std::vector<TActorId> Workers;
78+
std::deque<TActorId> Workers;
79+
std::optional<NActors::TActorId> SlowWorkerId;
7980
TCounters Counters;
8081
THashMap<TString, std::shared_ptr<TTaskSignals>> Signals;
8182

ydb/core/tx/conveyor/service/worker.cpp

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,35 @@
22

33
namespace NKikimr::NConveyor {
44

5-
void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) {
6-
auto& workerTask = ev->Get()->GetTask();
5+
void TWorker::ExecuteTask(const TWorkerTask& workerTask) {
6+
std::optional<TMonotonic> start;
7+
if (CPUUsage < 1) {
8+
start = TMonotonic::Now();
9+
}
710
if (workerTask.GetTask()->Execute(workerTask.GetTaskSignals())) {
8-
TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask, workerTask.GetTask()).SendTo(ev->Sender);
11+
TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask, workerTask.GetTask()).SendTo(DistributorId);
12+
} else {
13+
TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask, workerTask.GetTask()->GetErrorMessage()).SendTo(DistributorId);
14+
}
15+
if (CPUUsage < 1) {
16+
Schedule((TMonotonic::Now() - *start) * (1 - CPUUsage), new NActors::TEvents::TEvWakeup);
17+
WaitWakeUp = true;
18+
}
19+
}
20+
21+
// Wakeup handler: ends the throttling pause started by ExecuteTask.
//
// Clears WaitWakeUp and, if a task was parked in WaitTask during the pause,
// executes it now and then empties the slot.
void TWorker::HandleMain(NActors::TEvents::TEvWakeup::TPtr& /*ev*/) {
    WaitWakeUp = false;
    if (!WaitTask) {
        // Nothing arrived while the worker was paused.
        return;
    }
    ExecuteTask(*WaitTask);
    WaitTask.reset();
}
28+
29+
void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) {
30+
if (!WaitWakeUp) {
31+
ExecuteTask(ev->Get()->GetTask());
932
} else {
10-
TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask, workerTask.GetTask()->GetErrorMessage()).SendTo(ev->Sender);
33+
WaitTask = ev->Get()->GetTask();
1134
}
1235
}
1336

ydb/core/tx/conveyor/service/worker.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,20 @@ struct TEvInternal {
9191
class TWorker: public NActors::TActorBootstrapped<TWorker> {
9292
private:
9393
using TBase = NActors::TActorBootstrapped<TWorker>;
94-
public:
94+
const double CPUUsage = 1;
95+
bool WaitWakeUp = false;
96+
const NActors::TActorId DistributorId;
97+
std::optional<TWorkerTask> WaitTask;
98+
void ExecuteTask(const TWorkerTask& workerTask);
9599
void HandleMain(TEvInternal::TEvNewTask::TPtr& ev);
100+
void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev);
101+
public:
96102

97103
STATEFN(StateMain) {
98104
switch (ev->GetTypeRewrite()) {
99105
hFunc(TEvInternal::TEvNewTask, HandleMain);
100-
default:
106+
hFunc(NActors::TEvents::TEvWakeup, HandleMain);
107+
default:
101108
ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "unexpected event for task executor: " << ev->GetTypeRewrite();
102109
break;
103110
}
@@ -107,8 +114,10 @@ class TWorker: public NActors::TActorBootstrapped<TWorker> {
107114
Become(&TWorker::StateMain);
108115
}
109116

110-
TWorker(const TString& conveyorName)
117+
TWorker(const TString& conveyorName, const double cpuUsage, const NActors::TActorId& distributorId)
111118
: TBase("CONVEYOR::" + conveyorName + "::WORKER")
119+
, CPUUsage(cpuUsage)
120+
, DistributorId(distributorId)
112121
{
113122

114123
}

ydb/core/tx/conveyor/usage/config.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config)
1212
if (config.HasQueueSizeLimit()) {
1313
QueueSizeLimit = config.GetQueueSizeLimit();
1414
}
15-
if (config.HasWorkersCount()) {
15+
if (config.HasWorkersCountDouble()) {
16+
WorkersCount = config.GetWorkersCountDouble();
17+
} else if (config.HasWorkersCount()) {
1618
WorkersCount = config.GetWorkersCount();
1719
}
1820
if (config.HasDefaultFractionOfThreadsCount()) {
@@ -23,7 +25,7 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config)
2325

2426
ui32 TConfig::GetWorkersCountForConveyor(const ui32 poolThreadsCount) const {
2527
if (WorkersCount) {
26-
return *WorkersCount;
28+
return std::ceil(*WorkersCount);
2729
} else if (DefaultFractionOfThreadsCount) {
2830
return Min<ui32>(poolThreadsCount, Max<ui32>(1, *DefaultFractionOfThreadsCount * poolThreadsCount));
2931
} else {
@@ -44,4 +46,17 @@ TString TConfig::DebugString() const {
4446
return sb;
4547
}
4648

49+
double TConfig::GetWorkerCPUUsage(const ui32 workerIdx) const {
50+
if (!WorkersCount) {
51+
return 1;
52+
}
53+
double wholePart;
54+
const double fractionalPart = std::modf(*WorkersCount, &wholePart);
55+
if (workerIdx + 1 <= wholePart) {
56+
return 1;
57+
} else {
58+
return fractionalPart;
59+
}
60+
}
61+
4762
}

ydb/core/tx/conveyor/usage/config.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ namespace NKikimr::NConveyor {
66

77
class TConfig {
88
private:
9-
YDB_OPT(ui32, WorkersCount);
9+
YDB_OPT(double, WorkersCount);
1010
YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024);
1111
YDB_READONLY_FLAG(Enabled, true);
1212
YDB_OPT(double, DefaultFractionOfThreadsCount);
1313
public:
1414
bool DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config);
1515
ui32 GetWorkersCountForConveyor(const ui32 poolThreadsCount) const;
16+
double GetWorkerCPUUsage(const ui32 workerIdx) const;
1617
TString DebugString() const;
1718
};
1819

0 commit comments

Comments
 (0)