Skip to content

Commit 1febe01

Browse files
dcherednikblinkov
authored andcommitted
Enable XDC by default, fix ut to use XDC transfers (#15540)
XDC allows to send huge events without splitting into small IC chunks.
1 parent d975679 commit 1febe01

File tree

5 files changed

+54
-8
lines changed

5 files changed

+54
-8
lines changed

ydb/core/protos/config.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ message TInterconnectConfig {
450450
optional bool SuppressConnectivityCheck = 39 [default = false];
451451
optional uint32 PreallocatedBufferSize = 40;
452452
optional uint32 NumPreallocatedBuffers = 41;
453-
optional bool EnableExternalDataChannel = 42;
453+
optional bool EnableExternalDataChannel = 42 [default = true];
454454
optional bool ValidateIncomingPeerViaDirectLookup = 44;
455455
optional uint32 SocketBacklogSize = 45; // SOMAXCONN if not set or zero
456456

ydb/library/actors/interconnect/interconnect_common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ namespace NActors {
4949
ui32 MaxSerializedEventSize = NActors::EventMaxByteSize;
5050
ui32 PreallocatedBufferSize = 8 << 10; // 8 KB
5151
ui32 NumPreallocatedBuffers = 16;
52-
bool EnableExternalDataChannel = false;
52+
bool EnableExternalDataChannel = true;
5353
bool ValidateIncomingPeerViaDirectLookup = false;
5454
ui32 SocketBacklogSize = 0; // SOMAXCONN if zero
5555
TDuration FirstErrorSleep = TDuration::MilliSeconds(10);

ydb/library/actors/interconnect/ut/lib/test_events.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ namespace NActors {
1515
struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> {
1616
TEvTest() = default;
1717

18+
explicit TEvTest(ui64 sequenceNumber) {
19+
Record.SetSequenceNumber(sequenceNumber);
20+
}
21+
1822
TEvTest(ui64 sequenceNumber, const TString& payload) {
1923
Record.SetSequenceNumber(sequenceNumber);
2024
Record.SetPayload(payload);

ydb/library/actors/interconnect/ut/protos/interconnect_test.proto

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package NInterconnectTest;
22

33
message TEvTest {
44
optional uint64 SequenceNumber = 1;
5-
optional bytes Payload = 2;
5+
optional uint64 DataCrc = 2;
6+
oneof Data {
7+
bytes Payload = 3;
8+
uint32 PayloadId = 4;
9+
}
610
}
711

812
message TEvTestChan {

ydb/library/actors/interconnect/ut_fat/main.cpp

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <library/cpp/testing/unittest/tests_data.h>
1111
#include <library/cpp/testing/unittest/registar.h>
12+
#include <library/cpp/digest/crc32c/crc32c.h>
1213

1314
#include <util/network/sock.h>
1415
#include <util/network/poller.h>
@@ -21,11 +22,13 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
2122
class TSenderActor: public TSenderBaseActor {
2223
TDeque<ui64> InFly;
2324
ui16 SendFlags;
25+
bool UseRope;
2426

2527
public:
26-
TSenderActor(const TActorId& recipientActorId, ui16 sendFlags)
28+
TSenderActor(const TActorId& recipientActorId, ui16 sendFlags, bool useRope)
2729
: TSenderBaseActor(recipientActorId, 32)
2830
, SendFlags(sendFlags)
31+
, UseRope(useRope)
2932
{
3033
}
3134

@@ -36,8 +39,19 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
3639
void SendMessage(const TActorContext& ctx) override {
3740
const ui32 flags = IEventHandle::MakeFlags(0, SendFlags);
3841
const ui64 cookie = SequenceNumber;
39-
const TString payload('@', RandomNumber<size_t>(65536) + 4096);
40-
ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie);
42+
43+
const TString payload(RandomNumber<size_t>(65536) + 4096, '@');
44+
45+
auto ev = new TEvTest(SequenceNumber);
46+
ev->Record.SetDataCrc(Crc32c(payload.data(), payload.size()));
47+
48+
if (UseRope) {
49+
ev->Record.SetPayloadId(ev->AddPayload(TRope(payload)));
50+
} else {
51+
ev->Record.SetPayload(payload);
52+
}
53+
54+
ctx.Send(RecipientActorId, ev, flags, cookie);
4155
InFly.push_back(SequenceNumber);
4256
++InFlySize;
4357
++SequenceNumber;
@@ -90,6 +104,14 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
90104
Y_ABORT_UNLESS(m.HasSequenceNumber());
91105
Y_ABORT_UNLESS(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64,
92106
m.GetSequenceNumber(), ReceivedCount);
107+
if (m.HasPayloadId()) {
108+
auto rope = ev->Get()->GetPayload(m.GetPayloadId());
109+
auto data = rope.GetContiguousSpan();
110+
auto crc = Crc32c(data.data(), data.size());
111+
Y_ABORT_UNLESS(m.GetDataCrc() == crc);
112+
} else {
113+
Y_ABORT_UNLESS(m.HasPayload());
114+
}
93115
++ReceivedCount;
94116
SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber()));
95117
}
@@ -109,7 +131,23 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
109131

110132
TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
111133
const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
112-
TSenderActor* senderActor = new TSenderActor(recipient, flags);
134+
TSenderActor* senderActor = new TSenderActor(recipient, flags, false);
135+
testCluster.RegisterActor(senderActor, 1);
136+
137+
NanoSleep(30ULL * 1000 * 1000 * 1000);
138+
}
139+
140+
Y_UNIT_TEST(InterconnectTestWithProxyUnsureUndeliveredWithRopeXdc) {
141+
ui32 numNodes = 2;
142+
double bandWidth = 1000000;
143+
ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered;
144+
TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true};
145+
146+
TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);
147+
148+
TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
149+
const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
150+
TSenderActor* senderActor = new TSenderActor(recipient, flags, true);
113151
testCluster.RegisterActor(senderActor, 1);
114152

115153
NanoSleep(30ULL * 1000 * 1000 * 1000);
@@ -125,7 +163,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
125163

126164
TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
127165
const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
128-
TSenderActor* senderActor = new TSenderActor(recipient, flags);
166+
TSenderActor* senderActor = new TSenderActor(recipient, flags, false);
129167
testCluster.RegisterActor(senderActor, 1);
130168

131169
NanoSleep(30ULL * 1000 * 1000 * 1000);

0 commit comments

Comments
 (0)