Skip to content

Commit bc8acf2

Browse files
authored
Merge 2795dbb into 6e8cb11
2 parents 6e8cb11 + 2795dbb commit bc8acf2

File tree

6 files changed

+118
-12
lines changed

6 files changed

+118
-12
lines changed

ydb/library/actors/core/actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ namespace NActors {
161161
return NHPTimer::GetSeconds(GetCurrentEventTicks());
162162
}
163163

164+
void TActivationContext::SetMailboxStatCallback(std::function<void(const TMailboxStat&)> callback) {
165+
TlsActivationContext->Mailbox.SetStatCallback(callback);
166+
}
167+
164168
TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
165169
return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfActorId);
166170
}

ydb/library/actors/core/actor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "actorsystem.h"
44
#include "event.h"
55
#include "executor_thread.h"
6+
#include "mailbox.h"
67
#include "monotonic.h"
78
#include "thread_context.h"
89

@@ -130,6 +131,8 @@ namespace NActors {
130131

131132
static i64 GetCurrentEventTicks();
132133
static double GetCurrentEventTicksAsSeconds();
134+
135+
static void SetMailboxStatCallback(std::function<void(const TMailboxStat&)> callback);
133136
};
134137

135138
struct TActorContext: public TActivationContext {

ydb/library/actors/core/actor_ut.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
#include "events.h"
33
#include "actorsystem.h"
44
#include "executor_pool_basic.h"
5+
#include "mailbox.h"
56
#include "scheduler_basic.h"
67
#include "actor_bootstrapped.h"
78
#include "actor_benchmark_helper.h"
89

10+
#include <atomic>
911
#include <ydb/library/actors/testlib/test_runtime.h>
1012
#include <ydb/library/actors/util/threadparkpad.h>
1113
#include <library/cpp/testing/unittest/registar.h>
@@ -480,16 +482,22 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
480482

481483
struct TTestActor : TActorBootstrapped<TTestActor> {
482484
static constexpr char ActorName[] = "TestActor";
485+
static std::atomic<ui64> MailboxStatCount;
483486

484487
void Bootstrap()
485488
{
489+
TActivationContext::SetMailboxStatCallback([](const TMailboxStat&) {
490+
MailboxStatCount++;
491+
});
486492
const auto& activityTypeIndex = GetActivityType();
487493
Y_ENSURE(activityTypeIndex < GetActivityTypeCount());
488494
Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor");
489495
PassAway();
490496
}
491497
};
492498

499+
std::atomic<ui64> TTestActor::MailboxStatCount = 0;
500+
493501
Y_UNIT_TEST(Basic) {
494502
THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>();
495503
setup->NodeId = 0;
@@ -522,6 +530,7 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
522530
pad.Park();
523531
actorSystem.Stop();
524532
UNIT_ASSERT(pongCounter == 2 && pingCounter == 2);
533+
UNIT_ASSERT(TTestActor::MailboxStatCount != 0);
525534
}
526535

527536
Y_UNIT_TEST(LocalProcessKey) {

ydb/library/actors/core/executor_thread.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ namespace NActors {
203203
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
204204
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
205205
NHPTimer::STime eventStart = Ctx.HPStart;
206+
NHPTimer::STime mailboxStart = Ctx.HPStart;
206207
TlsThreadContext->ActivationStartTS.store(Ctx.HPStart, std::memory_order_release);
207208

208209
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
@@ -372,7 +373,12 @@ namespace NActors {
372373
break; // empty queue, leave
373374
}
374375
}
375-
TlsThreadContext->ActivationStartTS.store(GetCycleCountFast(), std::memory_order_release);
376+
377+
NActors::TMailboxStat mailboxStat;
378+
mailboxStat.ElapsedCycles = hpnow - mailboxStart;
379+
mailbox->InvokeMailboxStat(mailboxStat);
380+
381+
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
376382
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
377383

378384
NProfiling::TMemoryTagScope::Reset(0);

ydb/library/actors/core/mailbox.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ namespace NActors {
373373
CleanupActors();
374374
}
375375

376-
bool TMailboxHeader::CleanupActors() {
376+
bool TMailboxHeader::CleanupActors(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo) {
377377
bool done = true;
378378
switch (ActorPack) {
379379
case TMailboxActorPack::Simple: {
@@ -399,13 +399,28 @@ namespace NActors {
399399
done = false;
400400
break;
401401
}
402+
case TMailboxActorPack::Complex:
403+
Y_ABORT("Unexpected ActorPack type");
402404
}
403405
ActorPack = TMailboxActorPack::Simple;
404406
ActorsInfo.Simple.ActorId = 0;
405407
ActorsInfo.Simple.Actor = nullptr;
406408
return done;
407409
}
408410

411+
bool TMailboxHeader::CleanupActors() {
412+
if (ActorPack != TMailboxActorPack::Complex) {
413+
return CleanupActors(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo);
414+
} else {
415+
bool done = CleanupActors(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo);
416+
delete ActorsInfo.Complex;
417+
ActorPack = TMailboxActorPack::Simple;
418+
ActorsInfo.Simple.ActorId = 0;
419+
ActorsInfo.Simple.Actor = nullptr;
420+
return done;
421+
}
422+
}
423+
409424
std::pair<ui32, ui32> TMailboxHeader::CountMailboxEvents(ui64 localActorId, ui32 maxTraverse) {
410425
switch (Type) {
411426
case TMailboxType::Simple:

ydb/library/actors/core/mailbox.h

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "executor_pool.h"
66
#include "mailbox_queue_simple.h"
77
#include "mailbox_queue_revolving.h"
8+
#include <functional>
89
#include <ydb/library/actors/util/unordered_cache.h>
910
#include <library/cpp/threading/queue/mpsc_htswap.h>
1011
#include <library/cpp/threading/queue/mpsc_read_as_filled.h>
@@ -27,6 +28,10 @@ namespace NActors {
2728

2829
struct TMailboxHeader;
2930

31+
struct TMailboxStat {
32+
ui64 ElapsedCycles = 0;
33+
};
34+
3035
template<bool>
3136
struct TMailboxUsageImpl {
3237
void Push(ui64 /*localId*/) {}
@@ -53,7 +58,8 @@ namespace NActors {
5358
enum EType {
5459
Simple = 0,
5560
Array = 1,
56-
Map = 2
61+
Map = 2,
62+
Complex = 3,
5763
};
5864
};
5965

@@ -91,6 +97,8 @@ namespace NActors {
9197
TActorPair Actors[ARRAY_CAPACITY];
9298
};
9399

100+
struct alignas(64) TComplexActorInfo;
101+
94102
union TActorsInfo {
95103
TActorPair Simple;
96104
struct {
@@ -100,11 +108,19 @@ namespace NActors {
100108
struct {
101109
TActorMap* ActorsMap;
102110
} Map;
111+
TComplexActorInfo* Complex;
103112
} ActorsInfo;
104113

114+
struct alignas(64) TComplexActorInfo{
115+
TActorsInfo ActorsInfo;
116+
std::function<void(const TMailboxStat&)> StatCallback;
117+
TMailboxActorPack::EType ActorPack;
118+
};
119+
105120
TMailboxHeader(TMailboxType::EType type);
106121
~TMailboxHeader();
107122

123+
static bool CleanupActors(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo);
108124
bool CleanupActors();
109125

110126
// this interface is used exclusively by executor thread, so implementation is there
@@ -123,7 +139,7 @@ namespace NActors {
123139
}
124140

125141
template<typename T>
126-
void ForEach(T&& callback) noexcept {
142+
static void ForEach(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo, T&& callback) noexcept {
127143
switch (ActorPack) {
128144
case TMailboxActorPack::Simple:
129145
if (ActorsInfo.Simple.ActorId) {
@@ -143,10 +159,22 @@ namespace NActors {
143159
callback(row.ActorId, row.Actor);
144160
}
145161
break;
162+
163+
case TMailboxActorPack::Complex:
164+
Y_ABORT("Unexpected ActorPack type");
146165
}
147166
}
148167

149-
IActor* FindActor(ui64 localActorId) noexcept {
168+
template<typename T>
169+
void ForEach(T&& callback) noexcept {
170+
if (ActorPack != TMailboxActorPack::Complex) {
171+
ForEach(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, std::move(callback));
172+
} else {
173+
ForEach(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, std::move(callback));
174+
}
175+
}
176+
177+
static IActor* FindActor(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept {
150178
switch (ActorPack) {
151179
case TMailboxActorPack::Simple: {
152180
if (ActorsInfo.Simple.ActorId == localActorId)
@@ -167,13 +195,21 @@ namespace NActors {
167195
}
168196
break;
169197
}
170-
default:
171-
Y_ABORT();
198+
case TMailboxActorPack::Complex:
199+
Y_ABORT("Unexpected ActorPack type");
172200
}
173201
return nullptr;
174202
}
175203

176-
void AttachActor(ui64 localActorId, IActor* actor) noexcept {
204+
IActor* FindActor(ui64 localActorId) noexcept {
205+
if (ActorPack != TMailboxActorPack::Complex) {
206+
return FindActor(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, localActorId);
207+
} else {
208+
return FindActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId);
209+
}
210+
}
211+
212+
static void AttachActor(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo, ui64 localActorId, IActor* actor) noexcept {
177213
switch (ActorPack) {
178214
case TMailboxActorPack::Simple: {
179215
if (ActorsInfo.Simple.ActorId == 0) {
@@ -210,13 +246,21 @@ namespace NActors {
210246
}
211247
break;
212248
}
213-
default:
214-
Y_ABORT();
249+
case TMailboxActorPack::Complex:
250+
Y_ABORT("Unexpected ActorPack type");
215251
}
216252
}
217253

218-
IActor* DetachActor(ui64 localActorId) noexcept {
219-
Y_DEBUG_ABORT_UNLESS(FindActor(localActorId) != nullptr);
254+
void AttachActor(ui64 localActorId, IActor* actor) noexcept {
255+
if (ActorPack != TMailboxActorPack::Complex) {
256+
AttachActor(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, localActorId, actor);
257+
} else {
258+
AttachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId, actor);
259+
}
260+
}
261+
262+
static IActor* DetachActor(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept {
263+
Y_DEBUG_ABORT_UNLESS(FindActor(ActorPack, ActorsInfo, localActorId) != nullptr);
220264

221265
IActor* actorToDestruct = nullptr;
222266

@@ -270,11 +314,36 @@ namespace NActors {
270314
}
271315
break;
272316
}
317+
case TMailboxActorPack::Complex:
318+
Y_ABORT("Unexpected ActorPack type");
273319
}
274320

275321
return actorToDestruct;
276322
}
277323

324+
IActor* DetachActor(ui64 localActorId) noexcept {
325+
if (ActorPack != TMailboxActorPack::Complex) {
326+
return DetachActor(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, localActorId);
327+
} else {
328+
return DetachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId);
329+
}
330+
}
331+
332+
void SetStatCallback(std::function<void(const TMailboxStat&)> callback) {
333+
TComplexActorInfo* complex = new TComplexActorInfo;
334+
complex->StatCallback = callback;
335+
complex->ActorPack = static_cast<TMailboxActorPack::EType>(ActorPack);
336+
complex->ActorsInfo = ActorsInfo;
337+
ActorPack = TMailboxActorPack::Complex;
338+
ActorsInfo.Complex = complex;
339+
}
340+
341+
void InvokeMailboxStat(const TMailboxStat &stat) {
342+
if (ActorPack == TMailboxActorPack::Complex) {
343+
ActorsInfo.Complex->StatCallback(stat);
344+
}
345+
}
346+
278347
std::pair<ui32, ui32> CountMailboxEvents(ui64 localActorId, ui32 maxTraverse);
279348
};
280349

0 commit comments

Comments
 (0)