Skip to content

Commit 6fa14ca

Browse files
committed
Fix resolved timestamp emitted too early for some displaced upserts
1 parent 154f927 commit 6fa14ca

File tree

2 files changed

+162
-18
lines changed

2 files changed

+162
-18
lines changed

ydb/core/tx/datashard/cdc_stream_heartbeat.cpp

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -95,27 +95,27 @@ void TDataShard::EmitHeartbeats() {
9595
return;
9696
}
9797

98+
// More writes may still happen at this version
99+
TRowVersion edge = GetMvccTxVersion(EMvccTxMode::ReadWrite);
100+
bool wait = true;
101+
98102
if (const auto& plan = TransQueue.GetPlan()) {
99-
const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion());
100-
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
101-
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
102-
}
103-
return;
103+
edge = Min(edge, plan.begin()->ToRowVersion());
104+
wait = false;
104105
}
105106

106107
if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) {
107-
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
108-
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
109-
}
110-
return;
108+
edge = Min(edge, version);
109+
wait = false;
111110
}
112111

113-
const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite);
114-
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) {
115-
return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite));
112+
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(edge)) {
113+
return Execute(new TTxCdcStreamEmitHeartbeats(this, edge));
116114
}
117115

118-
WaitPlanStep(lowest.Next().Step);
116+
if (wait) {
117+
WaitPlanStep(lowest.Next().Step);
118+
}
119119
}
120120

121121
void TCdcStreamHeartbeatManager::Reset() {
@@ -215,7 +215,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
215215
return false;
216216
}
217217

218-
if (Schedule.top().Version > edge) {
218+
if (Schedule.top().Version >= edge) {
219219
return false;
220220
}
221221

@@ -225,7 +225,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
225225
THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbeatManager::EmitHeartbeats(
226226
NTable::TDatabase& db, const TRowVersion& edge)
227227
{
228-
if (Schedule.empty() || Schedule.top().Version > edge) {
228+
if (!ShouldEmitHeartbeat(edge)) {
229229
return {};
230230
}
231231

@@ -234,7 +234,7 @@ THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbea
234234

235235
while (true) {
236236
const auto& top = Schedule.top();
237-
if (top.Version > edge) {
237+
if (top.Version >= edge) {
238238
break;
239239
}
240240

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 146 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/persqueue/events/global.h>
77
#include <ydb/core/persqueue/user_info.h>
88
#include <ydb/core/persqueue/write_meta.h>
9+
#include <ydb/core/testlib/actors/block_events.h>
910
#include <ydb/core/tx/scheme_board/events.h>
1011
#include <ydb/core/tx/scheme_board/events_internal.h>
1112
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
@@ -2065,17 +2066,25 @@ Y_UNIT_TEST_SUITE(Cdc) {
20652066
return result;
20662067
}
20672068

2068-
void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
2069+
TVector<NJson::TJsonValue> WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
20692070
while (true) {
20702071
const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
2072+
for (size_t i = 0; i < records.size(); ++i) {
2073+
Cerr << "... " << i << ": " << records[i].second << Endl;
2074+
}
20712075
for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) {
20722076
AssertJsonsEqual(records.at(i).second, expected.at(i));
20732077
}
20742078

20752079
if (records.size() >= expected.size()) {
20762080
UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(),
20772081
"Unexpected record: " << records.at(expected.size()).second);
2078-
break;
2082+
TVector<NJson::TJsonValue> values;
2083+
for (const auto& pr : records) {
2084+
bool ok = NJson::ReadJsonTree(pr.second, &values.emplace_back());
2085+
Y_ABORT_UNLESS(ok);
2086+
}
2087+
return values;
20792088
}
20802089

20812090
SimulateSleep(server, TDuration::Seconds(1));
@@ -3779,6 +3788,141 @@ Y_UNIT_TEST_SUITE(Cdc) {
37793788
});
37803789
}
37813790

3791+
Y_UNIT_TEST(ResolvedTimestampForDisplacedUpsert) {
3792+
TPortManager portManager;
3793+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3794+
.SetUseRealThreads(false)
3795+
.SetDomainName("Root")
3796+
);
3797+
3798+
TDisableDataShardLogBatching disableDataShardLogBatching;
3799+
3800+
auto& runtime = *server->GetRuntime();
3801+
const auto edgeActor = runtime.AllocateEdgeActor();
3802+
3803+
SetupLogging(runtime);
3804+
InitRoot(server, edgeActor);
3805+
SetSplitMergePartCountLimit(&runtime, -1);
3806+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
3807+
3808+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3809+
WithVirtualTimestamps(WithResolvedTimestamps(
3810+
TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))));
3811+
3812+
Cerr << "... prepare" << Endl;
3813+
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3814+
R"({"resolved":"***"})",
3815+
});
3816+
3817+
KqpSimpleExec(runtime, R"(
3818+
UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
3819+
)");
3820+
3821+
auto records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3822+
R"({"resolved":"***"})",
3823+
R"({"update":{"value":10},"key":[1],"ts":"***"})",
3824+
R"({"resolved":"***"})",
3825+
});
3826+
3827+
// Take the step of the last resolved timestamp (the most recent heartbeat)
3828+
ui64 lastStep = records.back()["resolved"][0].GetUInteger();
3829+
Cerr << "... last heartbeat at " << lastStep << Endl;
3830+
3831+
const auto tableId = ResolveTableId(server, edgeActor, "/Root/Table");
3832+
const auto shards = GetTableShards(server, edgeActor, "/Root/Table");
3833+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
3834+
3835+
ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
3836+
ui64 snapshotStep = lastStep + 3000 - 1;
3837+
ForwardToTablet(runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps(coordinator, snapshotStep));
3838+
3839+
TBlockEvents<TEvMediatorTimecast::TEvGranularUpdate> blockedGranularUpdates(runtime,
3840+
[&](auto& ev) {
3841+
return ev->Get()->Record.GetLatestStep() > snapshotStep;
3842+
});
3843+
TBlockEvents<TEvMediatorTimecast::TEvUpdate> blockedUpdates(runtime,
3844+
[&](auto& ev) {
3845+
return ev->Get()->Record.GetTimeBarrier() > snapshotStep;
3846+
});
3847+
3848+
Cerr << "... performing a read from snapshot just before the next heartbeat" << Endl;
3849+
{
3850+
auto req = std::make_unique<TEvDataShard::TEvRead>();
3851+
{
3852+
auto& record = req->Record;
3853+
record.SetReadId(1);
3854+
record.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
3855+
record.MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
3856+
record.AddColumns(1);
3857+
record.AddColumns(2);
3858+
record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);
3859+
ui32 key = 1;
3860+
TVector<TCell> keys;
3861+
keys.push_back(TCell::Make(key));
3862+
req->Keys.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(keys)));
3863+
record.MutableSnapshot()->SetStep(snapshotStep);
3864+
record.MutableSnapshot()->SetTxId(Max<ui64>());
3865+
}
3866+
ForwardToTablet(runtime, shards.at(0), edgeActor, req.release());
3867+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(edgeActor);
3868+
auto* res = ev->Get();
3869+
UNIT_ASSERT_VALUES_EQUAL(res->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
3870+
UNIT_ASSERT_VALUES_EQUAL(res->Record.GetFinished(), true);
3871+
Cerr << "... read finished" << Endl;
3872+
}
3873+
for (int i = 0; i < 10; ++i) {
3874+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3875+
}
3876+
3877+
Cerr << "... starting upsert 1 (expected to displace)" << Endl;
3878+
auto upsert1 = KqpSimpleSend(runtime, R"(
3879+
UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
3880+
)");
3881+
for (int i = 0; i < 10; ++i) {
3882+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3883+
}
3884+
3885+
Cerr << "... starting upsert 2 (expected to displace)" << Endl;
3886+
auto upsert2 = KqpSimpleSend(runtime, R"(
3887+
UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
3888+
)");
3889+
for (int i = 0; i < 10; ++i) {
3890+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3891+
}
3892+
3893+
Cerr << "... unblocking updates" << Endl;
3894+
blockedGranularUpdates.Unblock().Stop();
3895+
blockedUpdates.Unblock().Stop();
3896+
for (int i = 0; i < 10; ++i) {
3897+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3898+
}
3899+
3900+
Cerr << "... checking the update is logged before the new resolved timestamp" << Endl;
3901+
records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3902+
R"({"resolved":"***"})",
3903+
R"({"update":{"value":10},"key":[1],"ts":"***"})",
3904+
R"({"resolved":"***"})",
3905+
R"({"update":{"value":20},"key":[2],"ts":"***"})",
3906+
R"({"update":{"value":30},"key":[3],"ts":"***"})",
3907+
R"({"resolved":"***"})",
3908+
});
3909+
3910+
TRowVersion resolved(0, 0);
3911+
for (auto& record : records) {
3912+
if (record.Has("resolved")) {
3913+
resolved.Step = record["resolved"][0].GetUInteger();
3914+
resolved.TxId = record["resolved"][1].GetUInteger();
3915+
}
3916+
if (record.Has("ts")) {
3917+
TRowVersion ts(
3918+
record["ts"][0].GetUInteger(),
3919+
record["ts"][1].GetUInteger());
3920+
UNIT_ASSERT_C(resolved < ts,
3921+
"Record with ts " << ts << " after resolved " << resolved);
3922+
}
3923+
}
3924+
}
3925+
37823926
} // Cdc
37833927

37843928
} // NKikimr

0 commit comments

Comments
 (0)