Skip to content

Commit 28e1d6d

Browse files
committed
Fix bug with broadcasting records (ydb-platform#6585)
1 parent 30b2fa8 commit 28e1d6d

File tree

2 files changed: 109 additions and 61 deletions.

ydb/core/change_exchange/change_sender_common_ops.cpp

+1-1
Original file line number | Diff line number | Diff line change
@@ -351,7 +351,7 @@ bool TBaseChangeSender::AddBroadcastPartition(ui64 order, ui64 partitionId) {
351351
Y_ABORT_UNLESS(it != Broadcasting.end());
352352

353353
auto& broadcast = it->second;
354-
if (broadcast.Partitions.contains(partitionId)) {
354+
if (broadcast.CompletedPartitions.contains(partitionId)) {
355355
return false;
356356
}
357357

ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp

+108-60
Original file line number | Diff line number | Diff line change
@@ -518,84 +518,87 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
518518
});
519519
}
520520

521+
// Queries every topic partition for the max-seqno records of the given table
// partitions and reports whether all of them are in `expectedState`.
//
// For each entry of `topicPartitions` a CmdGetMaxSeqNo request is sent to the
// partition's tablet. The request carries one source id per entry of
// `tablePartitions`, built from the datashard id via EncodeSimple(ToString(...)).
// The response must contain exactly one SourceIdInfo per table partition
// (enforced with UNIT_ASSERT_VALUES_EQUAL).
//
// Returns false as soon as any SourceIdInfo has a state other than
// `expectedState` (remaining topic partitions are then not queried);
// returns true if every entry of every partition matches.
bool CheckRegistrations(TTestActorRuntime& runtime, NKikimrPQ::TMessageGroupInfo::EState expectedState,
        const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TTablePartition>& tablePartitions,
        const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition>& topicPartitions)
{
    for (const auto& pqPartition : topicPartitions) {
        // Build a CmdGetMaxSeqNo request listing one source id per table partition.
        auto query = MakeHolder<TEvPersQueue::TEvRequest>();
        auto* partitionRequest = query->Record.MutablePartitionRequest();
        partitionRequest->SetPartition(pqPartition.GetPartitionId());

        auto* maxSeqNoCmd = partitionRequest->MutableCmdGetMaxSeqNo();
        for (const auto& shard : tablePartitions) {
            const auto shardIdText = ToString(shard.GetDatashardId());
            maxSeqNoCmd->AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(shardIdText));
        }

        // A fresh edge actor receives the reply for this particular partition.
        const auto edge = runtime.AllocateEdgeActor();
        ForwardToTablet(runtime, pqPartition.GetTabletId(), edge, query.Release());

        auto reply = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(edge);
        const auto& sourceIdInfos = reply->Get()->Record
            .GetPartitionResponse()
            .GetCmdGetMaxSeqNoResult()
            .GetSourceIdInfo();

        // One info record is expected per requested source id.
        UNIT_ASSERT_VALUES_EQUAL(sourceIdInfos.size(), tablePartitions.size());

        for (const auto& info : sourceIdInfos) {
            if (info.GetState() == expectedState) {
                continue;
            }
            return false;
        }
    }

    return true;
}
555+
521556
struct TItem {
522557
TString Path;
523-
ui32 nPartitions;
558+
ui32 ExpectedPartitionCount;
524559
};
525560

526-
void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic) {
561+
void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic,
562+
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TTablePartition>* initialTablePartitions = nullptr)
563+
{
527564
auto tableDesc = DescribePath(runtime, table.Path, true, true);
528565
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
529-
UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.nPartitions);
566+
UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.ExpectedPartitionCount);
530567

531568
auto topicDesc = DescribePrivatePath(runtime, topic.Path);
532569
const auto& topicPartitions = topicDesc.GetPathDescription().GetPersQueueGroup().GetPartitions();
533-
UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.nPartitions);
570+
UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.ExpectedPartitionCount);
534571

535572
while (true) {
536573
runtime.SimulateSleep(TDuration::Seconds(1));
537-
bool done = true;
538-
539-
for (ui32 i = 0; i < topic.nPartitions; ++i) {
540-
auto request = MakeHolder<TEvPersQueue::TEvRequest>();
541-
{
542-
auto& record = *request->Record.MutablePartitionRequest();
543-
record.SetPartition(topicPartitions[i].GetPartitionId());
544-
auto& cmd = *record.MutableCmdGetMaxSeqNo();
545-
for (const auto& tablePartition : tablePartitions) {
546-
cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId())));
547-
}
548-
}
549-
550-
const auto& sender = runtime.AllocateEdgeActor();
551-
ForwardToTablet(runtime, topicPartitions[i].GetTabletId(), sender, request.Release());
552-
553-
auto response = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(sender);
554-
{
555-
const auto& record = response->Get()->Record.GetPartitionResponse();
556-
const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo();
557-
558-
UNIT_ASSERT_VALUES_EQUAL(result.size(), table.nPartitions);
559-
for (const auto& item: result) {
560-
done &= item.GetState() == NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED;
561-
if (!done) {
562-
break;
563-
}
564-
}
565-
}
566-
567-
if (!done) {
568-
break;
569-
}
570-
}
571-
572-
if (done) {
574+
if (CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED, tablePartitions, topicPartitions)) {
573575
break;
574576
}
575577
}
578+
579+
if (initialTablePartitions) {
580+
UNIT_ASSERT(CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN, *initialTablePartitions, topicPartitions));
581+
}
576582
}
577583

578-
Y_UNIT_TEST_WITH_REBOOTS(SplitTable) {
584+
template <typename T>
585+
void SplitTable(const TString& cdcStreamDesc) {
579586
T t;
580587
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
588+
NKikimrScheme::TEvDescribeSchemeResult initialTableDesc;
581589
{
582590
TInactiveZone inactive(activeZone);
591+
583592
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
584593
Name: "Table"
585594
Columns { Name: "key" Type: "Uint32" }
586595
Columns { Name: "value" Type: "Uint32" }
587596
KeyColumnNames: ["key"]
588597
)");
589598
t.TestEnv->TestWaitNotification(runtime, t.TxId);
599+
initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true);
590600

591-
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
592-
TableName: "Table"
593-
StreamDescription {
594-
Name: "Stream"
595-
Mode: ECdcStreamModeKeysOnly
596-
Format: ECdcStreamFormatProto
597-
}
598-
)");
601+
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc);
599602
t.TestEnv->TestWaitNotification(runtime, t.TxId);
600603
}
601604

@@ -613,16 +616,43 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
613616
TInactiveZone inactive(activeZone);
614617
UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1});
615618
UploadRows(runtime, "/MyRoot/Table", 1, {1}, {2}, {Max<ui32>()});
616-
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1});
619+
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1},
620+
&initialTableDesc.GetPathDescription().GetTablePartitions());
617621
}
618622
});
619623
}
620624

621-
Y_UNIT_TEST_WITH_REBOOTS(MergeTable) {
625+
Y_UNIT_TEST_WITH_REBOOTS(SplitTable) {
    // Keys-only stream in proto format; the scenario itself is implemented
    // by the SplitTable<T> helper, which receives this description verbatim.
    const TString streamDescription = R"(
        TableName: "Table"
        StreamDescription {
          Name: "Stream"
          Mode: ECdcStreamModeKeysOnly
          Format: ECdcStreamFormatProto
        }
    )";
    SplitTable<T>(streamDescription);
}
635+
636+
Y_UNIT_TEST_WITH_REBOOTS(SplitTableResolvedTimestamps) {
    // Same stream as the plain SplitTable test, plus a 1000 ms
    // ResolvedTimestampsIntervalMs setting; runs through SplitTable<T>.
    const TString streamDescription = R"(
        TableName: "Table"
        StreamDescription {
          Name: "Stream"
          Mode: ECdcStreamModeKeysOnly
          Format: ECdcStreamFormatProto
          ResolvedTimestampsIntervalMs: 1000
        }
    )";
    SplitTable<T>(streamDescription);
}
647+
648+
template <typename T>
649+
void MergeTable(const TString& cdcStreamDesc) {
622650
T t;
623651
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
652+
NKikimrScheme::TEvDescribeSchemeResult initialTableDesc;
624653
{
625654
TInactiveZone inactive(activeZone);
655+
626656
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
627657
Name: "Table"
628658
Columns { Name: "key" Type: "Uint32" }
@@ -636,15 +666,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
636666
}
637667
)");
638668
t.TestEnv->TestWaitNotification(runtime, t.TxId);
669+
initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true);
639670

640-
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
641-
TableName: "Table"
642-
StreamDescription {
643-
Name: "Stream"
644-
Mode: ECdcStreamModeKeysOnly
645-
Format: ECdcStreamFormatProto
646-
}
647-
)");
671+
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc);
648672
t.TestEnv->TestWaitNotification(runtime, t.TxId);
649673
}
650674

@@ -657,11 +681,35 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
657681
{
658682
TInactiveZone inactive(activeZone);
659683
UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1, Max<ui32>()});
660-
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2});
684+
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2},
685+
&initialTableDesc.GetPathDescription().GetTablePartitions());
661686
}
662687
});
663688
}
664689

690+
Y_UNIT_TEST_WITH_REBOOTS(MergeTable) {
    // Keys-only stream in proto format; the scenario itself is implemented
    // by the MergeTable<T> helper, which receives this description verbatim.
    const TString streamDescription = R"(
        TableName: "Table"
        StreamDescription {
          Name: "Stream"
          Mode: ECdcStreamModeKeysOnly
          Format: ECdcStreamFormatProto
        }
    )";
    MergeTable<T>(streamDescription);
}
700+
701+
Y_UNIT_TEST_WITH_REBOOTS(MergeTableResolvedTimestamps) {
    // Same stream as the plain MergeTable test, plus a 1000 ms
    // ResolvedTimestampsIntervalMs setting; runs through MergeTable<T>.
    const TString streamDescription = R"(
        TableName: "Table"
        StreamDescription {
          Name: "Stream"
          Mode: ECdcStreamModeKeysOnly
          Format: ECdcStreamFormatProto
          ResolvedTimestampsIntervalMs: 1000
        }
    )";
    MergeTable<T>(streamDescription);
}
712+
665713
Y_UNIT_TEST_WITH_REBOOTS(RacySplitTableAndCreateStream) {
666714
T t;
667715
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {

0 commit comments

Comments
 (0)