Skip to content

Commit 1c163a8

Browse files
committed
Fix bug in records broadcasting (ydb-platform#5513)
1 parent 2d79dc5 commit 1c163a8

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

ydb/core/change_exchange/change_sender_common_ops.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ void TBaseChangeSender::SendRecords() {
197197
EraseNodesIf(broadcast.PendingPartitions, [&](ui64 partitionId) {
198198
if (Senders.contains(partitionId)) {
199199
auto& sender = Senders.at(partitionId);
200-
sender.Prepared.push_back(std::move(it->second));
200+
sender.Prepared.push_back(it->second);
201201
if (!sender.ActorId) {
202202
Y_ABORT_UNLESS(!sender.Ready);
203203
registrations.insert(partitionId);

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

+40
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <library/cpp/json/json_reader.h>
1616
#include <library/cpp/json/json_writer.h>
1717

18+
#include <util/generic/algorithm.h>
1819
#include <util/generic/size_literals.h>
1920
#include <util/string/join.h>
2021
#include <util/string/printf.h>
@@ -3147,6 +3148,45 @@ Y_UNIT_TEST_SUITE(Cdc) {
31473148
});
31483149
}
31493150

3151+
Y_UNIT_TEST(ResolvedTimestampsMultiplePartitions) {
3152+
TPortManager portManager;
3153+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3154+
.SetUseRealThreads(false)
3155+
.SetDomainName("Root")
3156+
);
3157+
3158+
auto& runtime = *server->GetRuntime();
3159+
const auto edgeActor = runtime.AllocateEdgeActor();
3160+
3161+
SetupLogging(runtime);
3162+
InitRoot(server, edgeActor);
3163+
CreateShardedTable(server, edgeActor, "/Root", "Table", TShardedTableOptions().Shards(2));
3164+
3165+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3166+
WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
3167+
3168+
TVector<TVector<std::pair<TString, TString>>> records(2); // partition to records
3169+
while (true) {
3170+
for (ui32 i = 0; i < records.size(); ++i) {
3171+
records[i] = GetRecords(*server->GetRuntime(), edgeActor, "/Root/Table/Stream", i);
3172+
}
3173+
3174+
if (AllOf(records, [](const auto& x) { return !x.empty(); })) {
3175+
break;
3176+
}
3177+
3178+
SimulateSleep(server, TDuration::Seconds(1));
3179+
}
3180+
3181+
UNIT_ASSERT(records.size() > 1);
3182+
UNIT_ASSERT(!records[0].empty());
3183+
AssertJsonsEqual(records[0][0].second, R"({"resolved":"***"})");
3184+
3185+
for (ui32 i = 1; i < records.size(); ++i) {
3186+
UNIT_ASSERT_VALUES_EQUAL(records[i][0].second, records[0][0].second);
3187+
}
3188+
}
3189+
31503190
Y_UNIT_TEST(InitialScanAndResolvedTimestamps) {
31513191
TPortManager portManager;
31523192
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

0 commit comments

Comments
 (0)