Skip to content

Commit 4cc5556

Browse files
authored
Fix bug in records broadcasting (#5513)
1 parent 9a49209 commit 4cc5556

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

ydb/core/change_exchange/change_sender_common_ops.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ class TBaseChangeSender {
238238
EraseNodesIf(broadcast.PendingPartitions, [&](ui64 partitionId) {
239239
if (Senders.contains(partitionId)) {
240240
auto& sender = Senders.at(partitionId);
241-
sender.Prepared.push_back(std::move(it->second));
241+
sender.Prepared.push_back(it->second);
242242
if (!sender.ActorId) {
243243
Y_ABORT_UNLESS(!sender.Ready);
244244
registrations.insert(partitionId);

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

+40
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <library/cpp/json/json_reader.h>
1717
#include <library/cpp/json/json_writer.h>
1818

19+
#include <util/generic/algorithm.h>
1920
#include <util/generic/size_literals.h>
2021
#include <util/string/join.h>
2122
#include <util/string/printf.h>
@@ -3230,6 +3231,45 @@ Y_UNIT_TEST_SUITE(Cdc) {
32303231
});
32313232
}
32323233

3234+
Y_UNIT_TEST(ResolvedTimestampsMultiplePartitions) {
3235+
TPortManager portManager;
3236+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3237+
.SetUseRealThreads(false)
3238+
.SetDomainName("Root")
3239+
);
3240+
3241+
auto& runtime = *server->GetRuntime();
3242+
const auto edgeActor = runtime.AllocateEdgeActor();
3243+
3244+
SetupLogging(runtime);
3245+
InitRoot(server, edgeActor);
3246+
CreateShardedTable(server, edgeActor, "/Root", "Table", TShardedTableOptions().Shards(2));
3247+
3248+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3249+
WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
3250+
3251+
TVector<TVector<std::pair<TString, TString>>> records(2); // partition to records
3252+
while (true) {
3253+
for (ui32 i = 0; i < records.size(); ++i) {
3254+
records[i] = GetRecords(*server->GetRuntime(), edgeActor, "/Root/Table/Stream", i);
3255+
}
3256+
3257+
if (AllOf(records, [](const auto& x) { return !x.empty(); })) {
3258+
break;
3259+
}
3260+
3261+
SimulateSleep(server, TDuration::Seconds(1));
3262+
}
3263+
3264+
UNIT_ASSERT(records.size() > 1);
3265+
UNIT_ASSERT(!records[0].empty());
3266+
AssertJsonsEqual(records[0][0].second, R"({"resolved":"***"})");
3267+
3268+
for (ui32 i = 1; i < records.size(); ++i) {
3269+
UNIT_ASSERT_VALUES_EQUAL(records[i][0].second, records[0][0].second);
3270+
}
3271+
}
3272+
32333273
Y_UNIT_TEST(InitialScanAndResolvedTimestamps) {
32343274
TPortManager portManager;
32353275
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

0 commit comments

Comments
 (0)