Skip to content

Commit b12c122

Browse files
authored
Fix resolved timestamp emitted too early for some displaced upserts (#8847)
1 parent 3ca8730 commit b12c122

File tree

2 files changed

+159
-18
lines changed

2 files changed

+159
-18
lines changed

ydb/core/tx/datashard/cdc_stream_heartbeat.cpp

Lines changed: 16 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -95,27 +95,27 @@ void TDataShard::EmitHeartbeats() {
9595
return;
9696
}
9797

98+
// We may possibly have more writes at this version
99+
TRowVersion edge = GetMvccTxVersion(EMvccTxMode::ReadWrite);
100+
bool wait = true;
101+
98102
if (const auto& plan = TransQueue.GetPlan()) {
99-
const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion());
100-
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
101-
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
102-
}
103-
return;
103+
edge = Min(edge, plan.begin()->ToRowVersion());
104+
wait = false;
104105
}
105106

106107
if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) {
107-
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
108-
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
109-
}
110-
return;
108+
edge = Min(edge, version);
109+
wait = false;
111110
}
112111

113-
const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite);
114-
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) {
115-
return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite));
112+
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(edge)) {
113+
return Execute(new TTxCdcStreamEmitHeartbeats(this, edge));
116114
}
117115

118-
WaitPlanStep(lowest.Next().Step);
116+
if (wait) {
117+
WaitPlanStep(lowest.Next().Step);
118+
}
119119
}
120120

121121
void TCdcStreamHeartbeatManager::Reset() {
@@ -215,7 +215,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
215215
return false;
216216
}
217217

218-
if (Schedule.top().Version > edge) {
218+
if (Schedule.top().Version >= edge) {
219219
return false;
220220
}
221221

@@ -225,7 +225,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
225225
THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbeatManager::EmitHeartbeats(
226226
NTable::TDatabase& db, const TRowVersion& edge)
227227
{
228-
if (Schedule.empty() || Schedule.top().Version > edge) {
228+
if (!ShouldEmitHeartbeat(edge)) {
229229
return {};
230230
}
231231

@@ -234,7 +234,7 @@ THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbea
234234

235235
while (true) {
236236
const auto& top = Schedule.top();
237-
if (top.Version > edge) {
237+
if (top.Version >= edge) {
238238
break;
239239
}
240240

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 143 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/persqueue/events/global.h>
77
#include <ydb/core/persqueue/user_info.h>
88
#include <ydb/core/persqueue/write_meta.h>
9+
#include <ydb/core/testlib/actors/block_events.h>
910
#include <ydb/core/tx/scheme_board/events.h>
1011
#include <ydb/core/tx/scheme_board/events_internal.h>
1112
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
@@ -2065,7 +2066,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
20652066
return result;
20662067
}
20672068

2068-
void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
2069+
TVector<NJson::TJsonValue> WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
20692070
while (true) {
20702071
const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
20712072
for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) {
@@ -2075,7 +2076,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
20752076
if (records.size() >= expected.size()) {
20762077
UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(),
20772078
"Unexpected record: " << records.at(expected.size()).second);
2078-
break;
2079+
TVector<NJson::TJsonValue> values;
2080+
for (const auto& pr : records) {
2081+
bool ok = NJson::ReadJsonTree(pr.second, &values.emplace_back());
2082+
Y_ABORT_UNLESS(ok);
2083+
}
2084+
return values;
20792085
}
20802086

20812087
SimulateSleep(server, TDuration::Seconds(1));
@@ -3779,6 +3785,141 @@ Y_UNIT_TEST_SUITE(Cdc) {
37793785
});
37803786
}
37813787

3788+
Y_UNIT_TEST(ResolvedTimestampForDisplacedUpsert) {
3789+
TPortManager portManager;
3790+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3791+
.SetUseRealThreads(false)
3792+
.SetDomainName("Root")
3793+
);
3794+
3795+
TDisableDataShardLogBatching disableDataShardLogBatching;
3796+
3797+
auto& runtime = *server->GetRuntime();
3798+
const auto edgeActor = runtime.AllocateEdgeActor();
3799+
3800+
SetupLogging(runtime);
3801+
InitRoot(server, edgeActor);
3802+
SetSplitMergePartCountLimit(&runtime, -1);
3803+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
3804+
3805+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3806+
WithVirtualTimestamps(WithResolvedTimestamps(
3807+
TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))));
3808+
3809+
Cerr << "... prepare" << Endl;
3810+
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3811+
R"({"resolved":"***"})",
3812+
});
3813+
3814+
KqpSimpleExec(runtime, R"(
3815+
UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
3816+
)");
3817+
3818+
auto records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3819+
R"({"resolved":"***"})",
3820+
R"({"update":{"value":10},"key":[1],"ts":"***"})",
3821+
R"({"resolved":"***"})",
3822+
});
3823+
3824+
// Take the final step
3825+
ui64 lastStep = records.back()["resolved"][0].GetUInteger();
3826+
Cerr << "... last heartbeat at " << lastStep << Endl;
3827+
3828+
const auto tableId = ResolveTableId(server, edgeActor, "/Root/Table");
3829+
const auto shards = GetTableShards(server, edgeActor, "/Root/Table");
3830+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
3831+
3832+
ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
3833+
ui64 snapshotStep = lastStep + 3000 - 1;
3834+
ForwardToTablet(runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps(coordinator, snapshotStep));
3835+
3836+
TBlockEvents<TEvMediatorTimecast::TEvGranularUpdate> blockedGranularUpdates(runtime,
3837+
[&](auto& ev) {
3838+
return ev->Get()->Record.GetLatestStep() > snapshotStep;
3839+
});
3840+
TBlockEvents<TEvMediatorTimecast::TEvUpdate> blockedUpdates(runtime,
3841+
[&](auto& ev) {
3842+
return ev->Get()->Record.GetTimeBarrier() > snapshotStep;
3843+
});
3844+
3845+
Cerr << "... performing a read from snapshot just before the next heartbeat" << Endl;
3846+
{
3847+
auto req = std::make_unique<TEvDataShard::TEvRead>();
3848+
{
3849+
auto& record = req->Record;
3850+
record.SetReadId(1);
3851+
record.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
3852+
record.MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
3853+
record.AddColumns(1);
3854+
record.AddColumns(2);
3855+
record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);
3856+
ui32 key = 1;
3857+
TVector<TCell> keys;
3858+
keys.push_back(TCell::Make(key));
3859+
req->Keys.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(keys)));
3860+
record.MutableSnapshot()->SetStep(snapshotStep);
3861+
record.MutableSnapshot()->SetTxId(Max<ui64>());
3862+
}
3863+
ForwardToTablet(runtime, shards.at(0), edgeActor, req.release());
3864+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(edgeActor);
3865+
auto* res = ev->Get();
3866+
UNIT_ASSERT_VALUES_EQUAL(res->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
3867+
UNIT_ASSERT_VALUES_EQUAL(res->Record.GetFinished(), true);
3868+
Cerr << "... read finished" << Endl;
3869+
}
3870+
for (int i = 0; i < 10; ++i) {
3871+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3872+
}
3873+
3874+
Cerr << "... starting upsert 1 (expected to displace)" << Endl;
3875+
auto upsert1 = KqpSimpleSend(runtime, R"(
3876+
UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
3877+
)");
3878+
for (int i = 0; i < 10; ++i) {
3879+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3880+
}
3881+
3882+
Cerr << "... starting upsert 2 (expected to displace)" << Endl;
3883+
auto upsert2 = KqpSimpleSend(runtime, R"(
3884+
UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
3885+
)");
3886+
for (int i = 0; i < 10; ++i) {
3887+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3888+
}
3889+
3890+
Cerr << "... unblocking updates" << Endl;
3891+
blockedGranularUpdates.Unblock().Stop();
3892+
blockedUpdates.Unblock().Stop();
3893+
for (int i = 0; i < 10; ++i) {
3894+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3895+
}
3896+
3897+
Cerr << "... checking the update is logged before the new resolved timestamp" << Endl;
3898+
records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3899+
R"({"resolved":"***"})",
3900+
R"({"update":{"value":10},"key":[1],"ts":"***"})",
3901+
R"({"resolved":"***"})",
3902+
R"({"update":{"value":20},"key":[2],"ts":"***"})",
3903+
R"({"update":{"value":30},"key":[3],"ts":"***"})",
3904+
R"({"resolved":"***"})",
3905+
});
3906+
3907+
TRowVersion resolved(0, 0);
3908+
for (auto& record : records) {
3909+
if (record.Has("resolved")) {
3910+
resolved.Step = record["resolved"][0].GetUInteger();
3911+
resolved.TxId = record["resolved"][1].GetUInteger();
3912+
}
3913+
if (record.Has("ts")) {
3914+
TRowVersion ts(
3915+
record["ts"][0].GetUInteger(),
3916+
record["ts"][1].GetUInteger());
3917+
UNIT_ASSERT_C(resolved < ts,
3918+
"Record with ts " << ts << " after resolved " << resolved);
3919+
}
3920+
}
3921+
}
3922+
37823923
} // Cdc
37833924

37843925
} // NKikimr

0 commit comments

Comments
 (0)