Skip to content

Commit 720647d

Browse files
authored
Merge 73be542 into 04968d0
2 parents 04968d0 + 73be542 commit 720647d

12 files changed

+539
-118
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#include "block_events.h"
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#include "test_runtime.h"
2+
3+
#include <deque>
4+
#include <functional>
5+
6+
namespace NActors {
7+
8+
/**
9+
* Easy blocking for events under the test actor runtime
10+
*
11+
* Matching events are blocked just before they are processed and stashed
12+
* into a deque.
13+
*/
14+
template<class TEvType>
15+
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
16+
public:
17+
TBlockEvents(TTestActorRuntime& runtime, std::function<bool(const typename TEvType::TPtr&)> condition = {})
18+
: Runtime(runtime)
19+
, Condition(std::move(condition))
20+
, Holder(Runtime.AddObserver<TEvType>(
21+
[this](typename TEvType::TPtr& ev) {
22+
this->Process(ev);
23+
}))
24+
{}
25+
26+
/**
27+
* Unblocks up to count events at the front of the deque, allowing them
28+
* to be handled by the destination actor.
29+
*/
30+
TBlockEvents& Unblock(size_t count = Max<size_t>()) {
31+
while (!this->empty() && count > 0) {
32+
auto& ev = this->front();
33+
if (!Stopped) {
34+
IEventHandle* ptr = ev.Get();
35+
UnblockedOnce.insert(ptr);
36+
}
37+
ui32 nodeId = ev->GetRecipientRewrite().NodeId();
38+
ui32 nodeIdx = nodeId - Runtime.GetFirstNodeId();
39+
Cerr << "... unblocking " << (ev->HasEvent() ? TypeName(*ev->GetBase()) : TypeName<TEvType>())
40+
<< " from " << Runtime.FindActorName(ev->Sender)
41+
<< " to " << Runtime.FindActorName(ev->GetRecipientRewrite())
42+
<< Endl;
43+
Runtime.Send(ev.Release(), nodeIdx, /* viaActorSystem */ true);
44+
this->pop_front();
45+
--count;
46+
}
47+
return *this;
48+
}
49+
50+
/**
51+
* Stops blocking any new events. Events currently in the deque are
52+
* not unblocked, but may be unblocked at a later time if needed.
53+
*/
54+
TBlockEvents& Stop() {
55+
UnblockedOnce.clear();
56+
Holder.Remove();
57+
Stopped = true;
58+
return *this;
59+
}
60+
61+
private:
62+
void Process(typename TEvType::TPtr& ev) {
63+
IEventHandle* ptr = ev.Get();
64+
auto it = UnblockedOnce.find(ptr);
65+
if (it != UnblockedOnce.end()) {
66+
UnblockedOnce.erase(it);
67+
return;
68+
}
69+
70+
if (Condition && !Condition(ev)) {
71+
return;
72+
}
73+
74+
Cerr << "... blocking " << (ev->HasEvent() ? TypeName(*ev->GetBase()) : TypeName<TEvType>())
75+
<< " from " << Runtime.FindActorName(ev->Sender)
76+
<< " to " << Runtime.FindActorName(ev->GetRecipientRewrite())
77+
<< Endl;
78+
this->emplace_back(std::move(ev));
79+
}
80+
81+
private:
82+
TTestActorRuntime& Runtime;
83+
std::function<bool(typename TEvType::TPtr&)> Condition;
84+
TTestActorRuntime::TEventObserverHolder Holder;
85+
THashSet<IEventHandle*> UnblockedOnce;
86+
bool Stopped = false;
87+
};
88+
89+
90+
} // namespace NActors

ydb/core/testlib/actors/test_runtime.h

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,42 @@ namespace NActors {
6868
void SimulateSleep(TDuration duration);
6969

7070
template<class TResult>
71-
inline TResult WaitFuture(NThreading::TFuture<TResult> f) {
71+
inline TResult WaitFuture(NThreading::TFuture<TResult> f, TDuration simTimeout = TDuration::Max()) {
7272
if (!f.HasValue() && !f.HasException()) {
7373
TDispatchOptions options;
7474
options.CustomFinalCondition = [&]() {
7575
return f.HasValue() || f.HasException();
7676
};
77-
options.FinalEvents.emplace_back([&](IEventHandle&) {
78-
return f.HasValue() || f.HasException();
79-
});
77+
// Quirk: non-empty FinalEvents enables full simulation
78+
options.FinalEvents.emplace_back([](IEventHandle&) { return false; });
8079

81-
this->DispatchEvents(options);
80+
this->DispatchEvents(options, simTimeout);
8281

8382
Y_ABORT_UNLESS(f.HasValue() || f.HasException());
8483
}
8584

86-
return f.ExtractValue();
85+
if constexpr (!std::is_same_v<TResult, void>) {
86+
return f.ExtractValue();
87+
} else {
88+
return f.GetValue();
89+
}
90+
}
91+
92+
template<class TCondition>
93+
inline void WaitFor(const TString& description, const TCondition& condition, TDuration simTimeout = TDuration::Max()) {
94+
if (!condition()) {
95+
TDispatchOptions options;
96+
options.CustomFinalCondition = [&]() {
97+
return condition();
98+
};
99+
// Quirk: non-empty FinalEvents enables full simulation
100+
options.FinalEvents.emplace_back([](IEventHandle&) { return false; });
101+
102+
Cerr << "... waiting for " << description << Endl;
103+
this->DispatchEvents(options, simTimeout);
104+
105+
Y_ABORT_UNLESS(condition(), "Timeout while waiting for %s", description.c_str());
106+
}
87107
}
88108

89109
TIntrusivePtr<NKikimr::TMemObserver> GetMemObserver(ui32 nodeIndex = 0) {

ydb/core/testlib/actors/test_runtime_ut.cpp

Lines changed: 205 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <ydb/core/testlib/actors/test_runtime.h>
2+
#include <ydb/core/testlib/actors/block_events.h>
23
#include <ydb/core/base/appdata.h>
34
#include <ydb/library/actors/core/event_local.h>
45
#include <ydb/library/actors/core/events.h>
@@ -622,6 +623,209 @@ Y_UNIT_TEST_SUITE(TActorTest) {
622623
UNIT_ASSERT_VALUES_EQUAL(event->Get()->Index, 12u);
623624
}
624625
}
625-
};
626+
627+
Y_UNIT_TEST(TestWaitFuture) {
628+
enum EEv {
629+
EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE)
630+
};
631+
632+
struct TEvTrigger : public TEventLocal<TEvTrigger, EvTrigger> {
633+
TEvTrigger() = default;
634+
};
635+
636+
class TTriggerActor : public TActorBootstrapped<TTriggerActor> {
637+
public:
638+
TTriggerActor(NThreading::TPromise<void> promise)
639+
: Promise(std::move(promise))
640+
{}
641+
642+
void Bootstrap() {
643+
Schedule(TDuration::Seconds(1), new TEvTrigger);
644+
Become(&TThis::StateWork);
645+
}
646+
647+
private:
648+
STFUNC(StateWork) {
649+
switch (ev->GetTypeRewrite()) {
650+
hFunc(TEvTrigger, Handle);
651+
}
652+
}
653+
654+
void Handle(TEvTrigger::TPtr&) {
655+
Promise.SetValue();
656+
PassAway();
657+
}
658+
659+
private:
660+
NThreading::TPromise<void> Promise;
661+
};
662+
663+
TTestActorRuntime runtime;
664+
runtime.Initialize(MakeEgg());
665+
666+
NThreading::TPromise<void> promise = NThreading::NewPromise<void>();
667+
NThreading::TFuture<void> future = promise.GetFuture();
668+
669+
auto actor = runtime.Register(new TTriggerActor(std::move(promise)));
670+
runtime.EnableScheduleForActor(actor);
671+
672+
runtime.WaitFuture(std::move(future));
673+
}
674+
675+
Y_UNIT_TEST(TestWaitFor) {
676+
enum EEv {
677+
EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE)
678+
};
679+
680+
struct TEvTrigger : public TEventLocal<TEvTrigger, EvTrigger> {
681+
TEvTrigger() = default;
682+
};
683+
684+
class TTriggerActor : public TActorBootstrapped<TTriggerActor> {
685+
public:
686+
TTriggerActor(int* ptr)
687+
: Ptr(ptr)
688+
{}
689+
690+
void Bootstrap() {
691+
Schedule(TDuration::Seconds(1), new TEvTrigger);
692+
Become(&TThis::StateWork);
693+
}
694+
695+
private:
696+
STFUNC(StateWork) {
697+
switch (ev->GetTypeRewrite()) {
698+
hFunc(TEvTrigger, Handle);
699+
}
700+
}
701+
702+
void Handle(TEvTrigger::TPtr&) {
703+
*Ptr = 42;
704+
PassAway();
705+
}
706+
707+
private:
708+
int* Ptr;
709+
};
710+
711+
TTestActorRuntime runtime;
712+
runtime.Initialize(MakeEgg());
713+
714+
int value = 0;
715+
auto actor = runtime.Register(new TTriggerActor(&value));
716+
runtime.EnableScheduleForActor(actor);
717+
718+
runtime.WaitFor("value = 42", [&]{ return value == 42; });
719+
UNIT_ASSERT_VALUES_EQUAL(value, 42);
720+
}
721+
722+
Y_UNIT_TEST(TestBlockEvents) {
723+
enum EEv {
724+
EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE)
725+
};
726+
727+
struct TEvTrigger : public TEventLocal<TEvTrigger, EvTrigger> {
728+
int Value;
729+
730+
TEvTrigger(int value)
731+
: Value(value)
732+
{}
733+
};
734+
735+
class TTargetActor : public TActorBootstrapped<TTargetActor> {
736+
public:
737+
TTargetActor(std::vector<int>* ptr)
738+
: Ptr(ptr)
739+
{}
740+
741+
void Bootstrap() {
742+
Become(&TThis::StateWork);
743+
}
744+
745+
private:
746+
STFUNC(StateWork) {
747+
switch (ev->GetTypeRewrite()) {
748+
hFunc(TEvTrigger, Handle);
749+
}
750+
}
751+
752+
void Handle(TEvTrigger::TPtr& ev) {
753+
Ptr->push_back(ev->Get()->Value);
754+
}
755+
756+
private:
757+
std::vector<int>* Ptr;
758+
};
759+
760+
class TSourceActor : public TActorBootstrapped<TSourceActor> {
761+
public:
762+
TSourceActor(const TActorId& target)
763+
: Target(target)
764+
{}
765+
766+
void Bootstrap() {
767+
Become(&TThis::StateWork);
768+
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
769+
}
770+
771+
private:
772+
STFUNC(StateWork) {
773+
switch (ev->GetTypeRewrite()) {
774+
hFunc(TEvents::TEvWakeup, Handle);
775+
}
776+
}
777+
778+
void Handle(TEvents::TEvWakeup::TPtr&) {
779+
Send(Target, new TEvTrigger(++Counter));
780+
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
781+
}
782+
783+
private:
784+
TActorId Target;
785+
int Counter = 0;
786+
};
787+
788+
TTestActorRuntime runtime(2);
789+
runtime.Initialize(MakeEgg());
790+
791+
std::vector<int> values;
792+
auto target = runtime.Register(new TTargetActor(&values), /* nodeIdx */ 1);
793+
auto source = runtime.Register(new TSourceActor(target), /* nodeIdx */ 1);
794+
runtime.EnableScheduleForActor(source);
795+
796+
TBlockEvents<TEvTrigger> block(runtime, [&](const TEvTrigger::TPtr& ev){ return ev->GetRecipientRewrite() == target; });
797+
runtime.WaitFor("blocked 3 events", [&]{ return block.size() >= 3; });
798+
UNIT_ASSERT_VALUES_EQUAL(block.size(), 3u);
799+
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);
800+
801+
block.Unblock(2);
802+
UNIT_ASSERT_VALUES_EQUAL(block.size(), 1u);
803+
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);
804+
805+
runtime.WaitFor("blocked 1 more event", [&]{ return block.size() >= 2; });
806+
UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u);
807+
UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u);
808+
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 1);
809+
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 2);
810+
values.clear();
811+
812+
block.Stop();
813+
runtime.WaitFor("processed 2 more events", [&]{ return values.size() >= 2; });
814+
UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u);
815+
UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u);
816+
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 5);
817+
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 6);
818+
values.clear();
819+
820+
block.Unblock();
821+
UNIT_ASSERT_VALUES_EQUAL(block.size(), 0u);
822+
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);
823+
runtime.WaitFor("processed 3 more events", [&]{ return values.size() >= 3; });
824+
UNIT_ASSERT_VALUES_EQUAL(values.size(), 3u);
825+
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 3);
826+
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 4);
827+
UNIT_ASSERT_VALUES_EQUAL(values.at(2), 7);
828+
}
829+
}
626830

627831
}

ydb/core/testlib/actors/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
LIBRARY()
22

33
SRCS(
4+
block_events.cpp
5+
block_events.h
46
test_runtime.cpp
7+
test_runtime.h
58
)
69

710
PEERDIR(

0 commit comments

Comments
 (0)