Skip to content

Commit 8418f77

Browse files
committed
24-3: Fix resolved timestamp emitted too early for some displaced upserts
1 parent fac9434 commit 8418f77

File tree

2 files changed

+154
-18
lines changed

2 files changed

+154
-18
lines changed

ydb/core/tx/datashard/cdc_stream_heartbeat.cpp

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -95,27 +95,27 @@ void TDataShard::EmitHeartbeats() {
9595
return;
9696
}
9797

98+
// We may possibly have more writes at this version
99+
TRowVersion edge = GetMvccTxVersion(EMvccTxMode::ReadWrite);
100+
bool wait = true;
101+
98102
if (const auto& plan = TransQueue.GetPlan()) {
99-
const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion());
100-
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
101-
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
102-
}
103-
return;
103+
edge = Min(edge, plan.begin()->ToRowVersion());
104+
wait = false;
104105
}
105106

106107
if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) {
107-
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
108-
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
109-
}
110-
return;
108+
edge = Min(edge, version);
109+
wait = false;
111110
}
112111

113-
const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite);
114-
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) {
115-
return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite));
112+
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(edge)) {
113+
return Execute(new TTxCdcStreamEmitHeartbeats(this, edge));
116114
}
117115

118-
WaitPlanStep(lowest.Next().Step);
116+
if (wait) {
117+
WaitPlanStep(lowest.Next().Step);
118+
}
119119
}
120120

121121
void TCdcStreamHeartbeatManager::Reset() {
@@ -215,7 +215,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
215215
return false;
216216
}
217217

218-
if (Schedule.top().Version > edge) {
218+
if (Schedule.top().Version >= edge) {
219219
return false;
220220
}
221221

@@ -225,7 +225,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
225225
THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbeatManager::EmitHeartbeats(
226226
NTable::TDatabase& db, const TRowVersion& edge)
227227
{
228-
if (Schedule.empty() || Schedule.top().Version > edge) {
228+
if (!ShouldEmitHeartbeat(edge)) {
229229
return {};
230230
}
231231

@@ -234,7 +234,7 @@ THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbea
234234

235235
while (true) {
236236
const auto& top = Schedule.top();
237-
if (top.Version > edge) {
237+
if (top.Version >= edge) {
238238
break;
239239
}
240240

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 138 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/persqueue/events/global.h>
77
#include <ydb/core/persqueue/user_info.h>
88
#include <ydb/core/persqueue/write_meta.h>
9+
#include <ydb/core/testlib/actors/block_events.h>
910
#include <ydb/core/tx/scheme_board/events.h>
1011
#include <ydb/core/tx/scheme_board/events_internal.h>
1112
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
@@ -1985,7 +1986,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
19851986
return result;
19861987
}
19871988

1988-
void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
1989+
TVector<NJson::TJsonValue> WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
19891990
while (true) {
19901991
const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
19911992
for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) {
@@ -1995,7 +1996,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
19951996
if (records.size() >= expected.size()) {
19961997
UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(),
19971998
"Unexpected record: " << records.at(expected.size()).second);
1998-
break;
1999+
TVector<NJson::TJsonValue> values;
2000+
for (const auto& pr : records) {
2001+
bool ok = NJson::ReadJsonTree(pr.second, &values.emplace_back());
2002+
Y_ABORT_UNLESS(ok);
2003+
}
2004+
return values;
19992005
}
20002006

20012007
SimulateSleep(server, TDuration::Seconds(1));
@@ -3692,6 +3698,136 @@ Y_UNIT_TEST_SUITE(Cdc) {
36923698
});
36933699
}
36943700

3701+
// Regression test for the ordering between change records and resolved
// timestamps in a CDC stream: every record carrying "ts" must have a version
// strictly greater than the last "resolved" record that precedes it in the
// stream (checked by the loop at the end of the test).
// Scenario: a snapshot read is performed, then two upserts are started while
// mediator timecast updates above the snapshot step are intercepted. Per the
// log messages below the upserts are expected to be "displaced"
// (NOTE(review): presumably moved to a later version because of the read;
// confirm against the datashard logic).
Y_UNIT_TEST(ResolvedTimestampForDisplacedUpsert) {
3702+
TPortManager portManager;
3703+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3704+
.SetUseRealThreads(false)
3705+
.SetDomainName("Root")
3706+
);
3707+
3708+
TDisableDataShardLogBatching disableDataShardLogBatching;
3709+
3710+
auto& runtime = *server->GetRuntime();
3711+
const auto edgeActor = runtime.AllocateEdgeActor();
3712+
3713+
SetupLogging(runtime);
3714+
InitRoot(server, edgeActor);
3715+
SetSplitMergePartCountLimit(&runtime, -1);
3716+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
3717+
3718+
// Add a JSON "updates" stream with virtual and resolved timestamps
// (TDuration::Seconds(3) is presumably the resolved timestamp interval; confirm).
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3719+
WithVirtualTimestamps(WithResolvedTimestamps(
3720+
TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))));
3721+
3722+
Cerr << "... prepare" << Endl;
3723+
// Wait until the stream contains exactly one resolved record.
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3724+
R"({"resolved":"***"})",
3725+
});
3726+
3727+
KqpSimpleExec(runtime, R"(
3728+
UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
3729+
)");
3730+
3731+
// Wait for the first update followed by one more resolved record.
auto records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3732+
R"({"resolved":"***"})",
3733+
R"({"update":{"value":10},"key":[1],"ts":"***"})",
3734+
R"({"resolved":"***"})",
3735+
});
3736+
3737+
// Take the final step
// The last record is a "resolved" entry; element [0] is used as the step
// (the final loop of this test reads [0] as Step and [1] as TxId).
3738+
ui64 lastStep = records.back()["resolved"][0].GetUInteger();
3739+
Cerr << "... last heartbeat at " << lastStep << Endl;
3740+
3741+
const auto tableId = ResolveTableId(server, edgeActor, "/Root/Table");
3742+
const auto shards = GetTableShards(server, edgeActor, "/Root/Table");
3743+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
3744+
3745+
ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
3746+
// NOTE(review): 3000 presumably is the 3 second interval above expressed in
// milliseconds, making snapshotStep one step short of lastStep + 3000;
// confirm the unit of plan steps.
ui64 snapshotStep = lastStep + 3000 - 1;
3747+
// Send TEvRequirePlanSteps for snapshotStep to the coordinator
// (presumably forces planning up to that step; semantics not visible here).
ForwardToTablet(runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps(coordinator, snapshotStep));
3748+
3749+
// Intercept TEvMediatorTimecast::TEvUpdate events whose time barrier is above
// snapshotStep; Unblock() is called on this object further down.
TBlockEvents<TEvMediatorTimecast::TEvUpdate> blockedUpdates(runtime,
3750+
[&](auto& ev) {
3751+
return ev->Get()->Record.GetTimeBarrier() > snapshotStep;
3752+
});
3753+
3754+
Cerr << "... performing a read from snapshot just before the next heartbeat" << Endl;
3755+
{
3756+
// Build a read of key 1 (columns 1 and 2) at snapshot
// (snapshotStep, Max<ui64>()) in CELLVEC format.
auto req = std::make_unique<TEvDataShard::TEvRead>();
3757+
{
3758+
auto& record = req->Record;
3759+
record.SetReadId(1);
3760+
record.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
3761+
record.MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
3762+
record.AddColumns(1);
3763+
record.AddColumns(2);
3764+
record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);
3765+
ui32 key = 1;
3766+
TVector<TCell> keys;
3767+
keys.push_back(TCell::Make(key));
3768+
req->Keys.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(keys)));
3769+
record.MutableSnapshot()->SetStep(snapshotStep);
3770+
record.MutableSnapshot()->SetTxId(Max<ui64>());
3771+
}
3772+
// Ownership of the request is handed over via release().
ForwardToTablet(runtime, shards.at(0), edgeActor, req.release());
3773+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(edgeActor);
3774+
auto* res = ev->Get();
3775+
// The read must succeed and be reported as finished in this first result.
UNIT_ASSERT_VALUES_EQUAL(res->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
3776+
UNIT_ASSERT_VALUES_EQUAL(res->Record.GetFinished(), true);
3777+
Cerr << "... read finished" << Endl;
3778+
}
3779+
// Advance simulated time in ten 1 ms increments.
for (int i = 0; i < 10; ++i) {
3780+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3781+
}
3782+
3783+
Cerr << "... starting upsert 1 (expected to displace)" << Endl;
3784+
// Started with KqpSimpleSend (not KqpSimpleExec); the result is not awaited here.
auto upsert1 = KqpSimpleSend(runtime, R"(
3785+
UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
3786+
)");
3787+
for (int i = 0; i < 10; ++i) {
3788+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3789+
}
3790+
3791+
Cerr << "... starting upsert 2 (expected to displace)" << Endl;
3792+
auto upsert2 = KqpSimpleSend(runtime, R"(
3793+
UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
3794+
)");
3795+
for (int i = 0; i < 10; ++i) {
3796+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3797+
}
3798+
3799+
Cerr << "... unblocking updates" << Endl;
3800+
// Release the intercepted timecast updates and stop intercepting further ones.
blockedUpdates.Unblock().Stop();
3801+
for (int i = 0; i < 10; ++i) {
3802+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3803+
}
3804+
3805+
Cerr << "... checking the update is logged before the new resolved timestamp" << Endl;
3806+
// Both new updates must appear in the stream before the next resolved record.
records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3807+
R"({"resolved":"***"})",
3808+
R"({"update":{"value":10},"key":[1],"ts":"***"})",
3809+
R"({"resolved":"***"})",
3810+
R"({"update":{"value":20},"key":[2],"ts":"***"})",
3811+
R"({"update":{"value":30},"key":[3],"ts":"***"})",
3812+
R"({"resolved":"***"})",
3813+
});
3814+
3815+
// Walk the stream in order, remembering the most recent resolved version and
// requiring the ts of every record seen after it to be strictly greater.
TRowVersion resolved(0, 0);
3816+
for (auto& record : records) {
3817+
if (record.Has("resolved")) {
3818+
resolved.Step = record["resolved"][0].GetUInteger();
3819+
resolved.TxId = record["resolved"][1].GetUInteger();
3820+
}
3821+
if (record.Has("ts")) {
3822+
TRowVersion ts(
3823+
record["ts"][0].GetUInteger(),
3824+
record["ts"][1].GetUInteger());
3825+
UNIT_ASSERT_C(resolved < ts,
3826+
"Record with ts " << ts << " after resolved " << resolved);
3827+
}
3828+
}
3829+
}
3830+
36953831
} // Cdc
36963832

36973833
} // NKikimr

0 commit comments

Comments
 (0)