Skip to content

Commit 75f11e8

Browse files
authored
Merge 2a9dc4c into d0fa983
2 parents d0fa983 + 2a9dc4c commit 75f11e8

9 files changed

+210
-34
lines changed

ydb/core/tablet_flat/flat_database.cpp

+30-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <ydb/core/util/pb.h>
1414
#include <ydb/core/scheme_types/scheme_type_registry.h>
1515
#include <util/generic/cast.h>
16+
#include <util/stream/output.h>
1617

1718

1819
#define MAX_REDO_BYTES_PER_COMMIT 268435456U // 256MB
@@ -21,6 +22,26 @@
2122
namespace NKikimr {
2223
namespace NTable {
2324

25+
// Orders two change counters by "has fewer changes than". Rules, in order:
//   1. both Serial values are non-zero -> compare Serial only (Epoch ignored);
//   2. Epoch values are equal          -> compare Serial (one of them is zero);
//   3. otherwise                       -> compare Epoch.
// NOTE(review): for arbitrary (Serial, Epoch) values this relation is not
// transitive, e.g. (5, e1) < (0, e2) and (0, e2) < (3, e3) hold for
// e1 < e2 < e3, while (5, e1) < (3, e3) does not. Presumably such combinations
// cannot be produced by a single table's history; confirm before using this
// operator as a comparator for sorting or ordered containers.
bool TDatabase::TChangeCounter::operator<(const TChangeCounter& rhs) const {
26+
if (Serial && rhs.Serial) {
27+
// When both counters have serial they can be compared directly
28+
return Serial < rhs.Serial;
29+
}
30+
31+
if (Epoch == rhs.Epoch) {
32+
// When this counter is (0, epoch) but rhs is (non-zero, epoch), it
33+
// indicates rhs may have more changes. When serial is zero it means
34+
// the current memtable is empty, but rhs epoch is the same, so it
35+
// cannot have fewer changes.
36+
return Serial < rhs.Serial;
37+
}
38+
39+
// The one with the smaller epoch must have fewer changes. In the worst
40+
// case that change may have been a flush (incrementing epoch and serial)
41+
// and then compact (possibly resetting serial to zero).
42+
return Epoch < rhs.Epoch;
43+
}
44+
2445
TDatabase::TDatabase(TDatabaseImpl *databaseImpl) noexcept
2546
: DatabaseImpl(databaseImpl ? databaseImpl : new TDatabaseImpl(0, new TScheme, nullptr))
2647
, NoMoreReadsFlag(true)
@@ -500,7 +521,7 @@ void TDatabase::SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr)
500521
Require(table)->SetTableObserver(std::move(ptr));
501522
}
502523

503-
TDatabase::TChg TDatabase::Head(ui32 table) const noexcept
524+
TDatabase::TChangeCounter TDatabase::Head(ui32 table) const noexcept
504525
{
505526
if (table == Max<ui32>()) {
506527
return { DatabaseImpl->Serial(), TEpoch::Max() };
@@ -844,3 +865,11 @@ void DebugDumpDb(const TDatabase &db) {
844865
}
845866

846867
}}
868+
869+
// Stream output for TDatabase::TChangeCounter. Writes the value as
// TChangeCounter{serial=<Serial>, epoch=<Epoch>}, delegating the formatting of
// each field to its own stream operator.
Y_DECLARE_OUT_SPEC(, NKikimr::NTable::TDatabase::TChangeCounter, stream, value) {
870+
stream << "TChangeCounter{serial=";
871+
stream << value.Serial;
872+
stream << ", epoch=";
873+
stream << value.Epoch;
874+
stream << "}";
875+
}

ydb/core/tablet_flat/flat_database.h

+35-10
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,37 @@ class TDatabase {
4949
TVector<std::function<void()>> OnPersistent;
5050
};
5151

52-
struct TChg {
53-
ui64 Serial;
54-
TEpoch Epoch;
52+
// A (Serial, Epoch) pair describing how far a table or database has changed.
// A default-constructed counter is (0, TEpoch::Zero()).
struct TChangeCounter {
53+
/**
54+
* Monotonic change counter for a table or an entire database. Serial
55+
* is incremented and persisted on each successful Commit() that has
56+
* data changes (i.e. not empty). Note: this may or may not be zero
57+
* when table has no changes, or when all changes have been compacted.
58+
*/
59+
ui64 Serial = 0;
60+
61+
/**
62+
* Monotonic epoch of a table's current memtable. This is incremented
63+
* each time a memtable is flushed and a new one is started. The
64+
* current memtable may or may not have additional changes.
65+
*/
66+
TEpoch Epoch = TEpoch::Zero();
67+
68+
TChangeCounter() = default;
69+
70+
TChangeCounter(ui64 serial, TEpoch epoch)
71+
: Serial(serial)
72+
, Epoch(epoch)
73+
{}
74+
75+
// Defaulted comparisons (a C++20 feature): equality is member-wise over
// Serial and Epoch, unlike operator< below which applies custom rules.
bool operator==(const TChangeCounter& rhs) const = default;
76+
bool operator!=(const TChangeCounter& rhs) const = default;
77+
78+
/**
79+
* Compares two change counters, such that when a < b then b either
80+
* has more changes than a, or it's impossible to determine.
81+
*/
82+
bool operator<(const TChangeCounter& rhs) const;
5583
};
5684

5785
TDatabase(const TDatabase&) = delete;
@@ -60,14 +88,11 @@ class TDatabase {
6088

6189
void SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr) noexcept;
6290

63-
/* Returns durable monotonic change number for table or entire database
64-
on default (table = Max<ui32>()). Serial is incremented for each
65-
successful Commit(). AHTUNG: Serial may go to the past in case of
66-
migration to older db versions with (Evolution < 18). Thus do not
67-
rely on durability until of kikimr stable 18-08.
91+
/**
92+
* Returns durable monotonic change counter for a table (or a database when
93+
* table = Max<ui32>() by default).
6894
*/
69-
70-
TChg Head(ui32 table = Max<ui32>()) const noexcept;
95+
TChangeCounter Head(ui32 table = Max<ui32>()) const noexcept;
7196

7297
/*_ Call Next() before accessing each row including the 1st row. */
7398

ydb/core/tx/datashard/datashard__init.cpp

+12-16
Original file line numberDiff line numberDiff line change
@@ -694,10 +694,12 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
694694
Y_ABORT_UNLESS(userTablesSchema, "UserTables");
695695

696696
// Check if tables changed since last time we synchronized them
697-
ui64 lastSysUpdate = txc.DB.Head(Schema::Sys::TableId).Serial;
698-
ui64 lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId).Serial;
699-
ui64 lastSnapshotsUpdate = scheme.GetTableInfo(Schema::Snapshots::TableId)
700-
? txc.DB.Head(Schema::Snapshots::TableId).Serial : 0;
697+
NTable::TDatabase::TChangeCounter lastSysUpdate = txc.DB.Head(Schema::Sys::TableId);
698+
NTable::TDatabase::TChangeCounter lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId);
699+
NTable::TDatabase::TChangeCounter lastSnapshotsUpdate;
700+
if (scheme.GetTableInfo(Schema::Snapshots::TableId)) {
701+
lastSnapshotsUpdate = txc.DB.Head(Schema::Snapshots::TableId);
702+
}
701703

702704
NIceDb::TNiceDb db(txc.DB);
703705

@@ -733,10 +735,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
733735
if (FollowerState.LastSysUpdate < lastSysUpdate) {
734736
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
735737
"Updating sys metadata on follower, tabletId " << TabletID()
736-
<< " prevGen " << (FollowerState.LastSysUpdate >> 32)
737-
<< " prevStep " << (FollowerState.LastSysUpdate & (ui32)-1)
738-
<< " newGen " << (lastSysUpdate >> 32)
739-
<< " newStep " << (lastSysUpdate & (ui32)-1));
738+
<< " prev " << FollowerState.LastSysUpdate
739+
<< " current " << lastSysUpdate);
740740

741741
bool ready = true;
742742
ready &= SysGetUi64(db, Schema::Sys_PathOwnerId, PathOwnerId);
@@ -752,10 +752,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
752752
if (FollowerState.LastSchemeUpdate < lastSchemeUpdate) {
753753
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
754754
"Updating tables metadata on follower, tabletId " << TabletID()
755-
<< " prevGen " << (FollowerState.LastSchemeUpdate >> 32)
756-
<< " prevStep " << (FollowerState.LastSchemeUpdate & (ui32)-1)
757-
<< " newGen " << (lastSchemeUpdate >> 32)
758-
<< " newStep " << (lastSchemeUpdate & (ui32)-1));
755+
<< " prev " << FollowerState.LastSchemeUpdate
756+
<< " current " << lastSchemeUpdate);
759757

760758
struct TRow {
761759
TPathId TableId;
@@ -825,10 +823,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
825823
if (FollowerState.LastSnapshotsUpdate < lastSnapshotsUpdate) {
826824
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
827825
"Updating snapshots metadata on follower, tabletId " << TabletID()
828-
<< " prevGen " << (FollowerState.LastSnapshotsUpdate >> 32)
829-
<< " prevStep " << (FollowerState.LastSnapshotsUpdate & (ui32)-1)
830-
<< " newGen " << (lastSnapshotsUpdate >> 32)
831-
<< " newStep " << (lastSnapshotsUpdate & (ui32)-1));
826+
<< " prev " << FollowerState.LastSnapshotsUpdate
827+
<< " current " << lastSnapshotsUpdate);
832828

833829
NIceDb::TNiceDb db(txc.DB);
834830
if (!SnapshotManager.ReloadSnapshots(db)) {

ydb/core/tx/datashard/datashard__stats.cpp

+1-3
Original file line numberDiff line numberDiff line change
@@ -428,9 +428,7 @@ class TDataShard::TTxInitiateStatsUpdate : public NTabletFlatExecutor::TTransact
428428
void CheckIdleMemCompaction(const TUserTable& table, TTransactionContext& txc, const TActorContext& ctx) {
429429
// Note: we only care about changes in the main table
430430
auto lastTableChange = txc.DB.Head(table.LocalTid);
431-
if (table.LastTableChange.Serial != lastTableChange.Serial ||
432-
table.LastTableChange.Epoch != lastTableChange.Epoch)
433-
{
431+
if (table.LastTableChange != lastTableChange) {
434432
table.LastTableChange = lastTableChange;
435433
table.LastTableChangeTimestamp = ctx.Monotonic();
436434
return;

ydb/core/tx/datashard/datashard_impl.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -2395,9 +2395,9 @@ class TDataShard
23952395

23962396
// For follower only
23972397
struct TFollowerState {
2398-
ui64 LastSysUpdate = 0;
2399-
ui64 LastSchemeUpdate = 0;
2400-
ui64 LastSnapshotsUpdate = 0;
2398+
NTable::TDatabase::TChangeCounter LastSysUpdate;
2399+
NTable::TDatabase::TChangeCounter LastSchemeUpdate;
2400+
NTable::TDatabase::TChangeCounter LastSnapshotsUpdate;
24012401
};
24022402

24032403
//

ydb/core/tx/datashard/datashard_user_table.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ struct TUserTable : public TThrRefBase {
400400
mutable TStats Stats;
401401
mutable bool StatsUpdateInProgress = false;
402402
mutable bool StatsNeedUpdate = true;
403-
mutable NTable::TDatabase::TChg LastTableChange{ 0, NTable::TEpoch::Zero() };
403+
mutable NTable::TDatabase::TChangeCounter LastTableChange;
404404
mutable TMonotonic LastTableChangeTimestamp;
405405

406406
ui32 SpecialColTablet = Max<ui32>();

ydb/core/tx/datashard/datashard_ut_followers.cpp

+96
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
#include "datashard_ut_common_kqp.h"
33
#include "datashard_ut_read_table.h"
44

5+
#include <ydb/library/actors/core/mon.h>
6+
57
namespace NKikimr {
68

79
using namespace NKikimr::NDataShard;
@@ -162,6 +164,100 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) {
162164
}
163165
}
164166

167+
// Scenario: a follower is restarted after the leader was asked to compact,
// and must still answer reads and observe later writes.
// Steps: create a table with Shards(2) and Followers(1), upsert three rows and
// read them back via KqpSimpleStaleRoExec; request force_compaction through
// the /executorInternals page of shards.at(0); poison the follower of that
// shard; then check that the same read succeeds and, after a second upsert,
// returns the updated values.
Y_UNIT_TEST(FollowerRebootAfterSysCompaction) {
168+
TPortManager pm;
169+
TServerSettings serverSettings(pm.GetPort(2134));
170+
serverSettings.SetDomainName("Root")
171+
.SetUseRealThreads(false)
172+
.SetEnableForceFollowers(true);
173+
174+
Tests::TServer::TPtr server = new TServer(serverSettings);
175+
auto &runtime = *server->GetRuntime();
176+
auto sender = runtime.AllocateEdgeActor();
177+
178+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
179+
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
180+
181+
InitRoot(server, sender);
182+
183+
CreateShardedTable(server, sender, "/Root", "table-1",
184+
TShardedTableOptions()
185+
.Shards(2)
186+
.Followers(1));
187+
188+
const auto shards = GetTableShards(server, sender, "/Root/table-1");
189+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
190+
191+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33);");
192+
193+
// Wait for leader to promote the follower read edge (and stop writing to the Sys table)
194+
Cerr << "... sleeping" << Endl;
195+
runtime.SimulateSleep(TDuration::Seconds(1));
196+
197+
UNIT_ASSERT_VALUES_EQUAL(
198+
KqpSimpleStaleRoExec(runtime,
199+
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
200+
"/Root"),
201+
"{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
202+
"{ items { uint32_value: 2 } items { uint32_value: 22 } }, "
203+
"{ items { uint32_value: 3 } items { uint32_value: 33 } }");
204+
205+
// Now we ask the leader to compact the Sys table
206+
{
207+
NActorsProto::TRemoteHttpInfo pb;
208+
pb.SetMethod(HTTP_METHOD_GET);
209+
pb.SetPath("/executorInternals");
210+
auto* p1 = pb.AddQueryParams();
211+
p1->SetKey("force_compaction");
212+
p1->SetValue("1");
213+
SendViaPipeCache(runtime, shards.at(0), sender,
214+
std::make_unique<NMon::TEvRemoteHttpInfo>(std::move(pb)));
215+
auto ev = runtime.GrabEdgeEventRethrow<NMon::TEvRemoteHttpInfoRes>(sender);
216+
UNIT_ASSERT_C(
217+
ev->Get()->Html.Contains("Table will be compacted in the near future"),
218+
ev->Get()->Html);
219+
}
220+
221+
// Allow table to finish compaction
222+
Cerr << "... sleeping" << Endl;
223+
runtime.SimulateSleep(TDuration::Seconds(1));
224+
225+
// Reboot follower
226+
Cerr << "... killing follower" << Endl;
227+
SendViaPipeCache(runtime, shards.at(0), sender,
228+
std::make_unique<TEvents::TEvPoison>(),
229+
{ .Follower = true });
230+
231+
// Allow it to boot properly
232+
Cerr << "... sleeping" << Endl;
233+
runtime.SimulateSleep(TDuration::Seconds(1));
234+
235+
// Read from follower must succeed
236+
Cerr << "... checking" << Endl;
237+
UNIT_ASSERT_VALUES_EQUAL(
238+
KqpSimpleStaleRoExec(runtime,
239+
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
240+
"/Root"),
241+
"{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
242+
"{ items { uint32_value: 2 } items { uint32_value: 22 } }, "
243+
"{ items { uint32_value: 3 } items { uint32_value: 33 } }");
244+
245+
// Update row values and sleep
246+
Cerr << "... updating rows" << Endl;
247+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 44), (2, 55), (3, 66);");
248+
runtime.SimulateSleep(TDuration::Seconds(1));
249+
250+
// Read from follower must see updated values
251+
Cerr << "... checking" << Endl;
252+
UNIT_ASSERT_VALUES_EQUAL(
253+
KqpSimpleStaleRoExec(runtime,
254+
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
255+
"/Root"),
256+
"{ items { uint32_value: 1 } items { uint32_value: 44 } }, "
257+
"{ items { uint32_value: 2 } items { uint32_value: 55 } }, "
258+
"{ items { uint32_value: 3 } items { uint32_value: 66 } }");
259+
}
260+
165261
} // Y_UNIT_TEST_SUITE(DataShardFollowers)
166262

167263
} // namespace NKikimr

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

+19
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "datashard_ut_common.h"
22

33
#include <ydb/core/base/tablet.h>
4+
#include <ydb/core/base/tablet_pipecache.h>
45
#include <ydb/core/base/tablet_resolver.h>
56
#include <ydb/core/scheme/scheme_types_defs.h>
67
#include <ydb/core/scheme/scheme_types_proto.h>
@@ -2389,4 +2390,22 @@ TString ReadShardedTable(
23892390
return StartReadShardedTable(server, path, snapshot, /* pause = */ false).Result;
23902391
}
23912392

2393+
// Test helper: forwards `msg` to tablet `tabletId` by wrapping it in a
// TEvPipeCache::TEvForward event addressed to the actor id returned by
// MakePipePeNodeCacheID(options.Follower).
//   - The event is dispatched via runtime.Send using the node index
//     sender.NodeId() - runtime.GetNodeId(0).
//   - Ownership of `msg` is released into the TEvForward event.
//   - options.Subscribe is passed to TEvForward; options.Flags and
//     options.Cookie are passed to the IEventHandle.
// NOTE(review): presumably Follower = true selects a pipe cache that may
// connect to followers; confirm against MakePipePeNodeCacheID.
void SendViaPipeCache(
2394+
TTestActorRuntime& runtime,
2395+
ui64 tabletId, const TActorId& sender,
2396+
std::unique_ptr<IEventBase> msg,
2397+
const TSendViaPipeCacheOptions& options)
2398+
{
2399+
ui32 nodeIndex = sender.NodeId() - runtime.GetNodeId(0);
2400+
runtime.Send(
2401+
new IEventHandle(
2402+
MakePipePeNodeCacheID(options.Follower),
2403+
sender,
2404+
new TEvPipeCache::TEvForward(msg.release(), tabletId, options.Subscribe),
2405+
options.Flags,
2406+
options.Cookie),
2407+
nodeIndex,
2408+
/* viaActorSystem */ true);
2409+
}
2410+
23922411
}

ydb/core/tx/datashard/ut_common/datashard_ut_common.h

+13
Original file line numberDiff line numberDiff line change
@@ -819,4 +819,17 @@ void WaitFor(TTestActorRuntime& runtime, TCondition&& condition, const TString&
819819
UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
820820
}
821821

822+
// Optional settings for SendViaPipeCache; every field has a default so callers
// can set only what they need (e.g. { .Follower = true }).
//   Flags, Cookie - passed to the IEventHandle that carries the request.
//   Follower      - passed to MakePipePeNodeCacheID to pick the pipe cache.
//   Subscribe     - passed to TEvPipeCache::TEvForward.
struct TSendViaPipeCacheOptions {
823+
ui32 Flags = 0;
824+
ui64 Cookie = 0;
825+
bool Follower = false;
826+
bool Subscribe = false;
827+
};
828+
829+
// Sends `msg` to tablet `tabletId` through a pipe cache actor on behalf of
// `sender`. Takes ownership of `msg`. `options` defaults to no flags, zero
// cookie, no follower routing and no subscription.
void SendViaPipeCache(
830+
TTestActorRuntime& runtime,
831+
ui64 tabletId, const TActorId& sender,
832+
std::unique_ptr<IEventBase> msg,
833+
const TSendViaPipeCacheOptions& options = {});
834+
822835
}

0 commit comments

Comments
 (0)