Skip to content

Enable XDC by default, fix ut to use XDC transfers #15540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ message TInterconnectConfig {
optional bool SuppressConnectivityCheck = 39 [default = false];
optional uint32 PreallocatedBufferSize = 40;
optional uint32 NumPreallocatedBuffers = 41;
optional bool EnableExternalDataChannel = 42;
optional bool EnableExternalDataChannel = 42 [default = true];
optional bool ValidateIncomingPeerViaDirectLookup = 44;
optional uint32 SocketBacklogSize = 45; // SOMAXCONN if not set or zero

Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/interconnect/interconnect_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace NActors {
ui32 MaxSerializedEventSize = NActors::EventMaxByteSize;
ui32 PreallocatedBufferSize = 8 << 10; // 8 KB
ui32 NumPreallocatedBuffers = 16;
bool EnableExternalDataChannel = false;
bool EnableExternalDataChannel = true;
bool ValidateIncomingPeerViaDirectLookup = false;
ui32 SocketBacklogSize = 0; // SOMAXCONN if zero
TDuration FirstErrorSleep = TDuration::MilliSeconds(10);
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/actors/interconnect/ut/lib/test_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ namespace NActors {
struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> {
TEvTest() = default;

explicit TEvTest(ui64 sequenceNumber) {
Record.SetSequenceNumber(sequenceNumber);
}

TEvTest(ui64 sequenceNumber, const TString& payload) {
Record.SetSequenceNumber(sequenceNumber);
Record.SetPayload(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package NInterconnectTest;

message TEvTest {
optional uint64 SequenceNumber = 1;
optional bytes Payload = 2;
optional uint64 DataCrc = 2;
oneof Data {
bytes Payload = 3;
uint32 PayloadId = 4;
}
}

message TEvTestChan {
Expand Down
48 changes: 43 additions & 5 deletions ydb/library/actors/interconnect/ut_fat/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <library/cpp/testing/unittest/tests_data.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/digest/crc32c/crc32c.h>

#include <util/network/sock.h>
#include <util/network/poller.h>
Expand All @@ -21,11 +22,13 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
class TSenderActor: public TSenderBaseActor {
TDeque<ui64> InFly;
ui16 SendFlags;
bool UseRope;

public:
TSenderActor(const TActorId& recipientActorId, ui16 sendFlags)
TSenderActor(const TActorId& recipientActorId, ui16 sendFlags, bool useRope)
: TSenderBaseActor(recipientActorId, 32)
, SendFlags(sendFlags)
, UseRope(useRope)
{
}

Expand All @@ -36,8 +39,19 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
void SendMessage(const TActorContext& ctx) override {
const ui32 flags = IEventHandle::MakeFlags(0, SendFlags);
const ui64 cookie = SequenceNumber;
const TString payload('@', RandomNumber<size_t>(65536) + 4096);
ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie);

const TString payload(RandomNumber<size_t>(65536) + 4096, '@');

auto ev = new TEvTest(SequenceNumber);
ev->Record.SetDataCrc(Crc32c(payload.data(), payload.size()));

if (UseRope) {
ev->Record.SetPayloadId(ev->AddPayload(TRope(payload)));
} else {
ev->Record.SetPayload(payload);
}

ctx.Send(RecipientActorId, ev, flags, cookie);
InFly.push_back(SequenceNumber);
++InFlySize;
++SequenceNumber;
Expand Down Expand Up @@ -90,6 +104,14 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
Y_ABORT_UNLESS(m.HasSequenceNumber());
Y_ABORT_UNLESS(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64,
m.GetSequenceNumber(), ReceivedCount);
if (m.HasPayloadId()) {
auto rope = ev->Get()->GetPayload(m.GetPayloadId());
auto data = rope.GetContiguousSpan();
auto crc = Crc32c(data.data(), data.size());
Y_ABORT_UNLESS(m.GetDataCrc() == crc);
} else {
Y_ABORT_UNLESS(m.HasPayload());
}
++ReceivedCount;
SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber()));
}
Expand All @@ -109,7 +131,23 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {

TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
TSenderActor* senderActor = new TSenderActor(recipient, flags);
TSenderActor* senderActor = new TSenderActor(recipient, flags, false);
testCluster.RegisterActor(senderActor, 1);

NanoSleep(30ULL * 1000 * 1000 * 1000);
}

Y_UNIT_TEST(InterconnectTestWithProxyUnsureUndeliveredWithRopeXdc) {
ui32 numNodes = 2;
double bandWidth = 1000000;
ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered;
TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true};

TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);

TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
TSenderActor* senderActor = new TSenderActor(recipient, flags, true);
testCluster.RegisterActor(senderActor, 1);

NanoSleep(30ULL * 1000 * 1000 * 1000);
Expand All @@ -125,7 +163,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {

TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
TSenderActor* senderActor = new TSenderActor(recipient, flags);
TSenderActor* senderActor = new TSenderActor(recipient, flags, false);
testCluster.RegisterActor(senderActor, 1);

NanoSleep(30ULL * 1000 * 1000 * 1000);
Expand Down
Loading