Skip to content

Commit 1d0798e

Browse files
authored
Fix bug with broadcasting records (#6585)
1 parent 4940aa0 commit 1d0798e

File tree

2 files changed

+109
-61
lines changed

2 files changed

+109
-61
lines changed

ydb/core/change_exchange/change_sender_common_ops.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ class TBaseChangeSender {
336336
Y_ABORT_UNLESS(it != Broadcasting.end());
337337

338338
auto& broadcast = it->second;
339-
if (broadcast.Partitions.contains(partitionId)) {
339+
if (broadcast.CompletedPartitions.contains(partitionId)) {
340340
return false;
341341
}
342342

ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp

+108-60
Original file line numberDiff line numberDiff line change
@@ -556,84 +556,87 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
556556
});
557557
}
558558

559+
bool CheckRegistrations(TTestActorRuntime& runtime, NKikimrPQ::TMessageGroupInfo::EState expectedState,
560+
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TTablePartition>& tablePartitions,
561+
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition>& topicPartitions)
562+
{
563+
for (const auto& topicPartition : topicPartitions) {
564+
auto request = MakeHolder<TEvPersQueue::TEvRequest>();
565+
{
566+
auto& record = *request->Record.MutablePartitionRequest();
567+
record.SetPartition(topicPartition.GetPartitionId());
568+
auto& cmd = *record.MutableCmdGetMaxSeqNo();
569+
for (const auto& tablePartition : tablePartitions) {
570+
cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId())));
571+
}
572+
}
573+
574+
const auto& sender = runtime.AllocateEdgeActor();
575+
ForwardToTablet(runtime, topicPartition.GetTabletId(), sender, request.Release());
576+
577+
auto response = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(sender);
578+
{
579+
const auto& record = response->Get()->Record.GetPartitionResponse();
580+
const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo();
581+
582+
UNIT_ASSERT_VALUES_EQUAL(result.size(), tablePartitions.size());
583+
for (const auto& item: result) {
584+
if (item.GetState() != expectedState) {
585+
return false;
586+
}
587+
}
588+
}
589+
}
590+
591+
return true;
592+
}
593+
559594
struct TItem {
560595
TString Path;
561-
ui32 nPartitions;
596+
ui32 ExpectedPartitionCount;
562597
};
563598

564-
void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic) {
599+
void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic,
600+
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TTablePartition>* initialTablePartitions = nullptr)
601+
{
565602
auto tableDesc = DescribePath(runtime, table.Path, true, true);
566603
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
567-
UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.nPartitions);
604+
UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.ExpectedPartitionCount);
568605

569606
auto topicDesc = DescribePrivatePath(runtime, topic.Path);
570607
const auto& topicPartitions = topicDesc.GetPathDescription().GetPersQueueGroup().GetPartitions();
571-
UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.nPartitions);
608+
UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.ExpectedPartitionCount);
572609

573610
while (true) {
574611
runtime.SimulateSleep(TDuration::Seconds(1));
575-
bool done = true;
576-
577-
for (ui32 i = 0; i < topic.nPartitions; ++i) {
578-
auto request = MakeHolder<TEvPersQueue::TEvRequest>();
579-
{
580-
auto& record = *request->Record.MutablePartitionRequest();
581-
record.SetPartition(topicPartitions[i].GetPartitionId());
582-
auto& cmd = *record.MutableCmdGetMaxSeqNo();
583-
for (const auto& tablePartition : tablePartitions) {
584-
cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId())));
585-
}
586-
}
587-
588-
const auto& sender = runtime.AllocateEdgeActor();
589-
ForwardToTablet(runtime, topicPartitions[i].GetTabletId(), sender, request.Release());
590-
591-
auto response = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(sender);
592-
{
593-
const auto& record = response->Get()->Record.GetPartitionResponse();
594-
const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo();
595-
596-
UNIT_ASSERT_VALUES_EQUAL(result.size(), table.nPartitions);
597-
for (const auto& item: result) {
598-
done &= item.GetState() == NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED;
599-
if (!done) {
600-
break;
601-
}
602-
}
603-
}
604-
605-
if (!done) {
606-
break;
607-
}
608-
}
609-
610-
if (done) {
612+
if (CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED, tablePartitions, topicPartitions)) {
611613
break;
612614
}
613615
}
616+
617+
if (initialTablePartitions) {
618+
UNIT_ASSERT(CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN, *initialTablePartitions, topicPartitions));
619+
}
614620
}
615621

616-
Y_UNIT_TEST_WITH_REBOOTS(SplitTable) {
622+
template <typename T>
623+
void SplitTable(const TString& cdcStreamDesc) {
617624
T t;
618625
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
626+
NKikimrScheme::TEvDescribeSchemeResult initialTableDesc;
619627
{
620628
TInactiveZone inactive(activeZone);
629+
621630
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
622631
Name: "Table"
623632
Columns { Name: "key" Type: "Uint32" }
624633
Columns { Name: "value" Type: "Uint32" }
625634
KeyColumnNames: ["key"]
626635
)");
627636
t.TestEnv->TestWaitNotification(runtime, t.TxId);
637+
initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true);
628638

629-
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
630-
TableName: "Table"
631-
StreamDescription {
632-
Name: "Stream"
633-
Mode: ECdcStreamModeKeysOnly
634-
Format: ECdcStreamFormatProto
635-
}
636-
)");
639+
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc);
637640
t.TestEnv->TestWaitNotification(runtime, t.TxId);
638641
}
639642

@@ -651,16 +654,43 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
651654
TInactiveZone inactive(activeZone);
652655
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
653656
UploadRow(runtime, "/MyRoot/Table", 1, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
654-
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1});
657+
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1},
658+
&initialTableDesc.GetPathDescription().GetTablePartitions());
655659
}
656660
});
657661
}
658662

659-
Y_UNIT_TEST_WITH_REBOOTS(MergeTable) {
663+
Y_UNIT_TEST_WITH_REBOOTS(SplitTable) {
664+
SplitTable<T>(R"(
665+
TableName: "Table"
666+
StreamDescription {
667+
Name: "Stream"
668+
Mode: ECdcStreamModeKeysOnly
669+
Format: ECdcStreamFormatProto
670+
}
671+
)");
672+
}
673+
674+
Y_UNIT_TEST_WITH_REBOOTS(SplitTableResolvedTimestamps) {
675+
SplitTable<T>(R"(
676+
TableName: "Table"
677+
StreamDescription {
678+
Name: "Stream"
679+
Mode: ECdcStreamModeKeysOnly
680+
Format: ECdcStreamFormatProto
681+
ResolvedTimestampsIntervalMs: 1000
682+
}
683+
)");
684+
}
685+
686+
template <typename T>
687+
void MergeTable(const TString& cdcStreamDesc) {
660688
T t;
661689
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
690+
NKikimrScheme::TEvDescribeSchemeResult initialTableDesc;
662691
{
663692
TInactiveZone inactive(activeZone);
693+
664694
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
665695
Name: "Table"
666696
Columns { Name: "key" Type: "Uint32" }
@@ -674,15 +704,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
674704
}
675705
)");
676706
t.TestEnv->TestWaitNotification(runtime, t.TxId);
707+
initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true);
677708

678-
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
679-
TableName: "Table"
680-
StreamDescription {
681-
Name: "Stream"
682-
Mode: ECdcStreamModeKeysOnly
683-
Format: ECdcStreamFormatProto
684-
}
685-
)");
709+
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc);
686710
t.TestEnv->TestWaitNotification(runtime, t.TxId);
687711
}
688712

@@ -696,11 +720,35 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
696720
TInactiveZone inactive(activeZone);
697721
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
698722
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
699-
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2});
723+
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2},
724+
&initialTableDesc.GetPathDescription().GetTablePartitions());
700725
}
701726
});
702727
}
703728

729+
Y_UNIT_TEST_WITH_REBOOTS(MergeTable) {
730+
MergeTable<T>(R"(
731+
TableName: "Table"
732+
StreamDescription {
733+
Name: "Stream"
734+
Mode: ECdcStreamModeKeysOnly
735+
Format: ECdcStreamFormatProto
736+
}
737+
)");
738+
}
739+
740+
Y_UNIT_TEST_WITH_REBOOTS(MergeTableResolvedTimestamps) {
741+
MergeTable<T>(R"(
742+
TableName: "Table"
743+
StreamDescription {
744+
Name: "Stream"
745+
Mode: ECdcStreamModeKeysOnly
746+
Format: ECdcStreamFormatProto
747+
ResolvedTimestampsIntervalMs: 1000
748+
}
749+
)");
750+
}
751+
704752
Y_UNIT_TEST_WITH_REBOOTS(RacySplitTableAndCreateStream) {
705753
T t;
706754
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {

0 commit comments

Comments
 (0)