9
9
10
10
#include < library/cpp/testing/unittest/tests_data.h>
11
11
#include < library/cpp/testing/unittest/registar.h>
12
+ #include < library/cpp/digest/crc32c/crc32c.h>
12
13
13
14
#include < util/network/sock.h>
14
15
#include < util/network/poller.h>
@@ -21,11 +22,13 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
21
22
class TSenderActor : public TSenderBaseActor {
22
23
TDeque<ui64> InFly;
23
24
ui16 SendFlags;
25
+ bool UseRope;
24
26
25
27
public:
26
- TSenderActor (const TActorId& recipientActorId, ui16 sendFlags)
28
+ TSenderActor (const TActorId& recipientActorId, ui16 sendFlags, bool useRope )
27
29
: TSenderBaseActor(recipientActorId, 32 )
28
30
, SendFlags(sendFlags)
31
+ , UseRope(useRope)
29
32
{
30
33
}
31
34
@@ -36,8 +39,19 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
36
39
void SendMessage (const TActorContext& ctx) override {
37
40
const ui32 flags = IEventHandle::MakeFlags (0 , SendFlags);
38
41
const ui64 cookie = SequenceNumber;
39
- const TString payload (' @' , RandomNumber<size_t >(65536 ) + 4096 );
40
- ctx.Send (RecipientActorId, new TEvTest (SequenceNumber, payload), flags, cookie);
42
+
43
+ const TString payload (RandomNumber<size_t >(65536 ) + 4096 , ' @' );
44
+
45
+ auto ev = new TEvTest (SequenceNumber);
46
+ ev->Record .SetDataCrc (Crc32c (payload.data (), payload.size ()));
47
+
48
+ if (UseRope) {
49
+ ev->Record .SetPayloadId (ev->AddPayload (TRope (payload)));
50
+ } else {
51
+ ev->Record .SetPayload (payload);
52
+ }
53
+
54
+ ctx.Send (RecipientActorId, ev, flags, cookie);
41
55
InFly.push_back (SequenceNumber);
42
56
++InFlySize;
43
57
++SequenceNumber;
@@ -90,6 +104,14 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
90
104
Y_ABORT_UNLESS (m.HasSequenceNumber ());
91
105
Y_ABORT_UNLESS (m.GetSequenceNumber () >= ReceivedCount, " got #%" PRIu64 " expected at least #%" PRIu64,
92
106
m.GetSequenceNumber (), ReceivedCount);
107
+ if (m.HasPayloadId ()) {
108
+ auto rope = ev->Get ()->GetPayload (m.GetPayloadId ());
109
+ auto data = rope.GetContiguousSpan ();
110
+ auto crc = Crc32c (data.data (), data.size ());
111
+ Y_ABORT_UNLESS (m.GetDataCrc () == crc);
112
+ } else {
113
+ Y_ABORT_UNLESS (m.HasPayload ());
114
+ }
93
115
++ReceivedCount;
94
116
SenderNode->Send (ev->Sender , new TEvTestResponse (m.GetSequenceNumber ()));
95
117
}
@@ -109,7 +131,23 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
109
131
110
132
TReceiverActor* receiverActor = new TReceiverActor (testCluster.GetNode (1 ));
111
133
const TActorId recipient = testCluster.RegisterActor (receiverActor, 2 );
112
- TSenderActor* senderActor = new TSenderActor (recipient, flags);
134
+ TSenderActor* senderActor = new TSenderActor (recipient, flags, false );
135
+ testCluster.RegisterActor (senderActor, 1 );
136
+
137
+ NanoSleep (30ULL * 1000 * 1000 * 1000 );
138
+ }
139
+
140
+ Y_UNIT_TEST (InterconnectTestWithProxyUnsureUndeliveredWithRopeXdc) {
141
+ ui32 numNodes = 2 ;
142
+ double bandWidth = 1000000 ;
143
+ ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered;
144
+ TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds (2 ), bandWidth, true };
145
+
146
+ TTestICCluster testCluster (numNodes, TChannelsConfig (), &interrupterSettings);
147
+
148
+ TReceiverActor* receiverActor = new TReceiverActor (testCluster.GetNode (1 ));
149
+ const TActorId recipient = testCluster.RegisterActor (receiverActor, 2 );
150
+ TSenderActor* senderActor = new TSenderActor (recipient, flags, true );
113
151
testCluster.RegisterActor (senderActor, 1 );
114
152
115
153
NanoSleep (30ULL * 1000 * 1000 * 1000 );
@@ -125,7 +163,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
125
163
126
164
TReceiverActor* receiverActor = new TReceiverActor (testCluster.GetNode (1 ));
127
165
const TActorId recipient = testCluster.RegisterActor (receiverActor, 2 );
128
- TSenderActor* senderActor = new TSenderActor (recipient, flags);
166
+ TSenderActor* senderActor = new TSenderActor (recipient, flags, false );
129
167
testCluster.RegisterActor (senderActor, 1 );
130
168
131
169
NanoSleep (30ULL * 1000 * 1000 * 1000 );
0 commit comments