diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index 0b684b30705a..8c9f45a1c698 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -238,7 +238,7 @@ class TBaseChangeSender { EraseNodesIf(broadcast.PendingPartitions, [&](ui64 partitionId) { if (Senders.contains(partitionId)) { auto& sender = Senders.at(partitionId); - sender.Prepared.push_back(std::move(it->second)); + sender.Prepared.push_back(it->second); if (!sender.ActorId) { Y_ABORT_UNLESS(!sender.Ready); registrations.insert(partitionId); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index af843eca5676..5b35463f1fcc 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -3230,6 +3231,45 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(ResolvedTimestampsMultiplePartitions) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", TShardedTableOptions().Shards(2)); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + TVector>> records(2); // partition to records + while (true) { + for (ui32 i = 0; i < records.size(); ++i) { + records[i] = GetRecords(*server->GetRuntime(), edgeActor, "/Root/Table/Stream", i); + } + + if (AllOf(records, [](const auto& x) { return !x.empty(); })) { + break; + } + + SimulateSleep(server, TDuration::Seconds(1)); + } + + UNIT_ASSERT(records.size() > 1); + UNIT_ASSERT(!records[0].empty()); + AssertJsonsEqual(records[0][0].second, R"({"resolved":"***"})"); + + for (ui32 i = 1; i < records.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(records[i][0].second, records[0][0].second); + } + } + Y_UNIT_TEST(InitialScanAndResolvedTimestamps) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())