From e8cd3cc47c4572e345074ebad91b3cdcceb0f668 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Mon, 10 Mar 2025 16:15:19 +0100 Subject: [PATCH 1/3] Enable XDC by default, fix ut to use XDC transfers --- .../actors/interconnect/interconnect_common.h | 2 +- .../actors/interconnect/ut/lib/test_events.h | 4 ++ .../ut/protos/interconnect_test.proto | 6 ++- .../actors/interconnect/ut_fat/main.cpp | 48 +++++++++++++++++-- 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/ydb/library/actors/interconnect/interconnect_common.h b/ydb/library/actors/interconnect/interconnect_common.h index 2b2de7c44756..0b1a8be237f6 100644 --- a/ydb/library/actors/interconnect/interconnect_common.h +++ b/ydb/library/actors/interconnect/interconnect_common.h @@ -49,7 +49,7 @@ namespace NActors { ui32 MaxSerializedEventSize = NActors::EventMaxByteSize; ui32 PreallocatedBufferSize = 8 << 10; // 8 KB ui32 NumPreallocatedBuffers = 16; - bool EnableExternalDataChannel = false; + bool EnableExternalDataChannel = true; bool ValidateIncomingPeerViaDirectLookup = false; ui32 SocketBacklogSize = 0; // SOMAXCONN if zero TDuration FirstErrorSleep = TDuration::MilliSeconds(10); diff --git a/ydb/library/actors/interconnect/ut/lib/test_events.h b/ydb/library/actors/interconnect/ut/lib/test_events.h index 55cddb71527e..c59b1f76fb7a 100644 --- a/ydb/library/actors/interconnect/ut/lib/test_events.h +++ b/ydb/library/actors/interconnect/ut/lib/test_events.h @@ -15,6 +15,10 @@ namespace NActors { struct TEvTest : TEventPB { TEvTest() = default; + explicit TEvTest(ui64 sequenceNumber) { + Record.SetSequenceNumber(sequenceNumber); + } + TEvTest(ui64 sequenceNumber, const TString& payload) { Record.SetSequenceNumber(sequenceNumber); Record.SetPayload(payload); diff --git a/ydb/library/actors/interconnect/ut/protos/interconnect_test.proto b/ydb/library/actors/interconnect/ut/protos/interconnect_test.proto index b74d068a8b8b..c4218d67650c 100644 --- a/ydb/library/actors/interconnect/ut/protos/interconnect_test.proto +++ b/ydb/library/actors/interconnect/ut/protos/interconnect_test.proto @@ -2,7 +2,11 @@ package NInterconnectTest; message TEvTest { optional uint64 SequenceNumber = 1; - optional bytes Payload = 2; + optional uint64 DataCrc = 2; + oneof Data { + bytes Payload = 3; + uint32 PayloadId = 4; + } } message TEvTestChan { diff --git a/ydb/library/actors/interconnect/ut_fat/main.cpp b/ydb/library/actors/interconnect/ut_fat/main.cpp index 251ce1b75b60..1886f1238ac6 100644 --- a/ydb/library/actors/interconnect/ut_fat/main.cpp +++ b/ydb/library/actors/interconnect/ut_fat/main.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -21,11 +22,13 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { class TSenderActor: public TSenderBaseActor { TDeque InFly; ui16 SendFlags; + bool UseRope; public: - TSenderActor(const TActorId& recipientActorId, ui16 sendFlags) + TSenderActor(const TActorId& recipientActorId, ui16 sendFlags, bool useRope) : TSenderBaseActor(recipientActorId, 32) , SendFlags(sendFlags) + , UseRope(useRope) { } @@ -36,8 +39,19 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { void SendMessage(const TActorContext& ctx) override { const ui32 flags = IEventHandle::MakeFlags(0, SendFlags); const ui64 cookie = SequenceNumber; - const TString payload('@', RandomNumber(65536) + 4096); - ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie); + + const TString payload(RandomNumber(65536) + 4096, '@'); + + auto ev = new TEvTest(SequenceNumber); + ev->Record.SetDataCrc(Crc32c(payload.data(), payload.size())); + + if (UseRope) { + ev->Record.SetPayloadId(ev->AddPayload(TRope(payload))); + } else { + ev->Record.SetPayload(payload); + } + + ctx.Send(RecipientActorId, ev, flags, cookie); InFly.push_back(SequenceNumber); ++InFlySize; ++SequenceNumber; @@ -90,6 +104,14 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { Y_ABORT_UNLESS(m.HasSequenceNumber()); Y_ABORT_UNLESS(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64, m.GetSequenceNumber(), ReceivedCount); + if (m.HasPayloadId()) { + auto rope = ev->Get()->GetPayload(m.GetPayloadId()); + auto data = rope.GetContiguousSpan(); + auto crc = Crc32c(data.data(), data.size()); + Y_ABORT_UNLESS(m.GetDataCrc() == crc); + } else { + Y_ABORT_UNLESS(m.HasPayload()); + } ++ReceivedCount; SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber())); } @@ -109,7 +131,23 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); - TSenderActor* senderActor = new TSenderActor(recipient, flags); + TSenderActor* senderActor = new TSenderActor(recipient, flags, false); + testCluster.RegisterActor(senderActor, 1); + + NanoSleep(30ULL * 1000 * 1000 * 1000); + } + + Y_UNIT_TEST(InterconnectTestWithProxyUnsureUndeliveredWithRopeXdc) { + ui32 numNodes = 2; + double bandWidth = 1000000; + ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered; + TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true}; + + TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); + + TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); + const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); + TSenderActor* senderActor = new TSenderActor(recipient, flags, true); testCluster.RegisterActor(senderActor, 1); NanoSleep(30ULL * 1000 * 1000 * 1000); @@ -125,7 +163,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); - TSenderActor* senderActor = new TSenderActor(recipient, flags); + TSenderActor* senderActor = new TSenderActor(recipient, flags, false); testCluster.RegisterActor(senderActor, 1); NanoSleep(30ULL * 1000 * 1000 * 1000); From fc0c698b281e8dc7d8ab2571c420983b518094c1 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Mon, 10 Mar 2025 16:51:18 +0100 Subject: [PATCH 2/3] Enable XDC in core, dq --- ydb/core/protos/config.proto | 2 +- ydb/library/yql/providers/dq/config/config.proto | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index a0a36626fb02..21bc6e3f3ef7 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -450,7 +450,7 @@ message TInterconnectConfig { optional bool SuppressConnectivityCheck = 39 [default = false]; optional uint32 PreallocatedBufferSize = 40; optional uint32 NumPreallocatedBuffers = 41; - optional bool EnableExternalDataChannel = 42; + optional bool EnableExternalDataChannel = 42 [default = true]; optional bool ValidateIncomingPeerViaDirectLookup = 44; optional uint32 SocketBacklogSize = 45; // SOMAXCONN if not set or zero diff --git a/ydb/library/yql/providers/dq/config/config.proto b/ydb/library/yql/providers/dq/config/config.proto index 8b998de21ce1..edb78e042613 100644 --- a/ydb/library/yql/providers/dq/config/config.proto +++ b/ydb/library/yql/providers/dq/config/config.proto @@ -25,7 +25,7 @@ message TDqConfig { optional uint64 MessagePendingTimeoutMs = 15 [default = 5000]; optional uint64 MessagePendingSize = 16 [default = 18446744073709551615]; optional uint32 MaxSerializedEventSize = 17 [default = 67108000]; - optional bool EnableExternalDataChannel = 27 [default = false]; + optional bool EnableExternalDataChannel = 27 [default = true]; // Scheduler optional uint64 ResolutionMicroseconds = 18 [default = 1024]; From f00cf0aed2a159032938019479c7b5b4560fdd41 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Mon, 10 Mar 2025 19:05:09 +0100 Subject: [PATCH 3/3] Disable XDC for dq --- ydb/library/yql/providers/dq/config/config.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/dq/config/config.proto b/ydb/library/yql/providers/dq/config/config.proto index edb78e042613..8b998de21ce1 100644 --- a/ydb/library/yql/providers/dq/config/config.proto +++ b/ydb/library/yql/providers/dq/config/config.proto @@ -25,7 +25,7 @@ message TDqConfig { optional uint64 MessagePendingTimeoutMs = 15 [default = 5000]; optional uint64 MessagePendingSize = 16 [default = 18446744073709551615]; optional uint32 MaxSerializedEventSize = 17 [default = 67108000]; - optional bool EnableExternalDataChannel = 27 [default = true]; + optional bool EnableExternalDataChannel = 27 [default = false]; // Scheduler optional uint64 ResolutionMicroseconds = 18 [default = 1024];