Skip to content

Commit f8e93d3

Browse files
authored
Continue to emit resolved timestamps after merge (#6594)
1 parent da925c3 commit f8e93d3

File tree

3 files changed

+94
-1
lines changed

3 files changed

+94
-1
lines changed

ydb/core/persqueue/partition_sourcemanager.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() {
8181
}
8282

8383
bool TPartitionSourceManager::TModificationBatch::HasModifications() const {
84-
return !SourceIdWriter.GetSourceIdsToWrite().empty();
84+
return !SourceIdWriter.GetSourceIdsToWrite().empty()
85+
|| !SourceIdWriter.GetSourceIdsToDelete().empty();
8586
}
8687

8788
void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) {

ydb/core/persqueue/sourceid.h

+4
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ class TSourceIdWriter {
8585
return Registrations;
8686
}
8787

88+
const THashSet<TString>& GetSourceIdsToDelete() const {
89+
return Deregistrations;
90+
}
91+
8892
template <typename... Args>
8993
void RegisterSourceId(const TString& sourceId, Args&&... args) {
9094
Registrations[sourceId] = TSourceIdInfo(std::forward<Args>(args)...);

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

+88
Original file line numberDiff line numberDiff line change
@@ -3600,6 +3600,94 @@ Y_UNIT_TEST_SUITE(Cdc) {
36003600
MustNotLoseSchemaSnapshot(true);
36013601
}
36023602

3603+
Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) {
3604+
TPortManager portManager;
3605+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3606+
.SetUseRealThreads(false)
3607+
.SetDomainName("Root")
3608+
);
3609+
3610+
auto& runtime = *server->GetRuntime();
3611+
const auto edgeActor = runtime.AllocateEdgeActor();
3612+
3613+
SetupLogging(runtime);
3614+
InitRoot(server, edgeActor);
3615+
SetSplitMergePartCountLimit(&runtime, -1);
3616+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
3617+
3618+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3619+
WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
3620+
3621+
Cerr << "... prepare" << Endl;
3622+
{
3623+
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3624+
R"({"resolved":"***"})",
3625+
});
3626+
3627+
auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table");
3628+
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
3629+
3630+
WaitTxNotification(server, edgeActor, AsyncSplitTable(server, edgeActor, "/Root/Table", tabletIds.at(0), 2));
3631+
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3632+
R"({"resolved":"***"})",
3633+
R"({"resolved":"***"})",
3634+
});
3635+
}
3636+
3637+
auto initialTabletIds = GetTableShards(server, edgeActor, "/Root/Table");
3638+
UNIT_ASSERT_VALUES_EQUAL(initialTabletIds.size(), 2);
3639+
3640+
std::vector<std::unique_ptr<IEventHandle>> blockedSplitRequests;
3641+
auto blockSplitRequests = runtime.AddObserver<TEvPersQueue::TEvRequest>([&](auto& ev) {
3642+
if (ev->Get()->Record.GetPartitionRequest().HasCmdSplitMessageGroup()) {
3643+
blockedSplitRequests.emplace_back(ev.Release());
3644+
}
3645+
});
3646+
3647+
Cerr << "... merge table" << Endl;
3648+
const auto mergeTxId = AsyncMergeTable(server, edgeActor, "/Root/Table", initialTabletIds);
3649+
WaitFor(runtime, [&]{ return blockedSplitRequests.size() == initialTabletIds.size(); }, "blocked split requests");
3650+
blockSplitRequests.Remove();
3651+
3652+
std::vector<std::unique_ptr<IEventHandle>> blockedRegisterRequests;
3653+
auto blockRegisterRequests = runtime.AddObserver<TEvPersQueue::TEvRequest>([&](auto& ev) {
3654+
if (ev->Get()->Record.GetPartitionRequest().HasCmdRegisterMessageGroup()) {
3655+
blockedRegisterRequests.emplace_back(ev.Release());
3656+
}
3657+
});
3658+
3659+
ui32 splitResponses = 0;
3660+
auto countSplitResponses = runtime.AddObserver<TEvPersQueue::TEvResponse>([&](auto& ev) {
3661+
++splitResponses;
3662+
});
3663+
3664+
Cerr << "... release split requests" << Endl;
3665+
for (auto& ev : std::exchange(blockedSplitRequests, {})) {
3666+
runtime.Send(ev.release(), 0, true);
3667+
WaitFor(runtime, [prev = splitResponses, &splitResponses]{ return splitResponses > prev; }, "split response");
3668+
}
3669+
3670+
Cerr << "... reboot pq tablet" << Endl;
3671+
RebootTablet(runtime, ResolvePqTablet(runtime, edgeActor, "/Root/Table/Stream", 0), edgeActor);
3672+
countSplitResponses.Remove();
3673+
3674+
Cerr << "... release register requests" << Endl;
3675+
blockRegisterRequests.Remove();
3676+
for (auto& ev : std::exchange(blockedRegisterRequests, {})) {
3677+
runtime.Send(ev.release(), 0, true);
3678+
}
3679+
3680+
Cerr << "... wait for merge tx notification" << Endl;
3681+
WaitTxNotification(server, edgeActor, mergeTxId);
3682+
3683+
Cerr << "... wait for final heartbeat" << Endl;
3684+
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3685+
R"({"resolved":"***"})",
3686+
R"({"resolved":"***"})",
3687+
R"({"resolved":"***"})",
3688+
});
3689+
}
3690+
36033691
} // Cdc
36043692

36053693
} // NKikimr

0 commit comments

Comments
 (0)