Skip to content

Commit ed0f418

Browse files
authored
24-1: Fix followers not reading system tables and not processing requests (ydb-platform#4343)
1 parent 11b0174 commit ed0f418

9 files changed

+210
-34
lines changed

ydb/core/tablet_flat/flat_database.cpp

+30-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <ydb/core/util/pb.h>
1414
#include <ydb/core/scheme_types/scheme_type_registry.h>
1515
#include <util/generic/cast.h>
16+
#include <util/stream/output.h>
1617

1718

1819
#define MAX_REDO_BYTES_PER_COMMIT 268435456U // 256MB
@@ -21,6 +22,26 @@
2122
namespace NKikimr {
2223
namespace NTable {
2324

25+
bool TDatabase::TChangeCounter::operator<(const TChangeCounter& rhs) const {
26+
if (Serial && rhs.Serial) {
27+
// When both counters have serial they can be compared directly
28+
return Serial < rhs.Serial;
29+
}
30+
31+
if (Epoch == rhs.Epoch) {
32+
// When this counter is (0, epoch) but rhs is (non-zero, epoch), it
33+
// indicates rhs may have more changes. When serial is zero it means
34+
// the current memtable is empty, but rhs epoch is the same, so it
35+
// cannot have fewer changes.
36+
return Serial < rhs.Serial;
37+
}
38+
39+
// The one with the smaller epoch must have fewer changes. In the worst
40+
// case that change may have been a flush (incrementing epoch and serial)
41+
// and then compact (possibly resetting serial to zero).
42+
return Epoch < rhs.Epoch;
43+
}
44+
2445
TDatabase::TDatabase(TDatabaseImpl *databaseImpl) noexcept
2546
: DatabaseImpl(databaseImpl ? databaseImpl : new TDatabaseImpl(0, new TScheme, nullptr))
2647
, NoMoreReadsFlag(true)
@@ -500,7 +521,7 @@ void TDatabase::SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr)
500521
Require(table)->SetTableObserver(std::move(ptr));
501522
}
502523

503-
TDatabase::TChg TDatabase::Head(ui32 table) const noexcept
524+
TDatabase::TChangeCounter TDatabase::Head(ui32 table) const noexcept
504525
{
505526
if (table == Max<ui32>()) {
506527
return { DatabaseImpl->Serial(), TEpoch::Max() };
@@ -844,3 +865,11 @@ void DebugDumpDb(const TDatabase &db) {
844865
}
845866

846867
}}
868+
869+
Y_DECLARE_OUT_SPEC(, NKikimr::NTable::TDatabase::TChangeCounter, stream, value) {
870+
stream << "TChangeCounter{serial=";
871+
stream << value.Serial;
872+
stream << ", epoch=";
873+
stream << value.Epoch;
874+
stream << "}";
875+
}

ydb/core/tablet_flat/flat_database.h

+35-10
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,37 @@ class TDatabase {
4949
TVector<std::function<void()>> OnPersistent;
5050
};
5151

52-
struct TChg {
53-
ui64 Serial;
54-
TEpoch Epoch;
52+
struct TChangeCounter {
53+
/**
54+
* Monotonic change counter for a table or an entire database. Serial
55+
* is incremented and persisted on each successful Commit() that has
56+
* data changes (i.e. not empty). Note: this may or may not be zero
57+
* when table has no changes, or when all changes have been compacted.
58+
*/
59+
ui64 Serial = 0;
60+
61+
/**
62+
* Monotonic epoch of a table's current memtable. This is incremented
63+
* each time a memtable is flushed and a new one is started. The
64+
* current memtable may or may not have additional changes.
65+
*/
66+
TEpoch Epoch = TEpoch::Zero();
67+
68+
TChangeCounter() = default;
69+
70+
TChangeCounter(ui64 serial, TEpoch epoch)
71+
: Serial(serial)
72+
, Epoch(epoch)
73+
{}
74+
75+
bool operator==(const TChangeCounter& rhs) const = default;
76+
bool operator!=(const TChangeCounter& rhs) const = default;
77+
78+
/**
79+
* Compares two change counters, such that when a < b then b either
80+
* has more changes than a, or it's impossible to determine.
81+
*/
82+
bool operator<(const TChangeCounter& rhs) const;
5583
};
5684

5785
TDatabase(const TDatabase&) = delete;
@@ -60,14 +88,11 @@ class TDatabase {
6088

6189
void SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr) noexcept;
6290

63-
/* Returns durable monotonic change number for table or entire database
64-
on default (table = Max<ui32>()). Serial is incremented for each
65-
successful Commit(). AHTUNG: Serial may go to the past in case of
66-
migration to older db versions with (Evolution < 18). Thus do not
67-
rely on durability until of kikimr stable 18-08.
91+
/**
92+
* Returns durable monotonic change counter for a table (or a database when
93+
* table = Max<ui32>() by default).
6894
*/
69-
70-
TChg Head(ui32 table = Max<ui32>()) const noexcept;
95+
TChangeCounter Head(ui32 table = Max<ui32>()) const noexcept;
7196

7297
/*_ Call Next() before accessing each row including the 1st row. */
7398

ydb/core/tx/datashard/datashard__init.cpp

+12-16
Original file line numberDiff line numberDiff line change
@@ -698,10 +698,12 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
698698
Y_ABORT_UNLESS(userTablesSchema, "UserTables");
699699

700700
// Check if tables changed since last time we synchronized them
701-
ui64 lastSysUpdate = txc.DB.Head(Schema::Sys::TableId).Serial;
702-
ui64 lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId).Serial;
703-
ui64 lastSnapshotsUpdate = scheme.GetTableInfo(Schema::Snapshots::TableId)
704-
? txc.DB.Head(Schema::Snapshots::TableId).Serial : 0;
701+
NTable::TDatabase::TChangeCounter lastSysUpdate = txc.DB.Head(Schema::Sys::TableId);
702+
NTable::TDatabase::TChangeCounter lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId);
703+
NTable::TDatabase::TChangeCounter lastSnapshotsUpdate;
704+
if (scheme.GetTableInfo(Schema::Snapshots::TableId)) {
705+
lastSnapshotsUpdate = txc.DB.Head(Schema::Snapshots::TableId);
706+
}
705707

706708
NIceDb::TNiceDb db(txc.DB);
707709

@@ -737,10 +739,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
737739
if (FollowerState.LastSysUpdate < lastSysUpdate) {
738740
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
739741
"Updating sys metadata on follower, tabletId " << TabletID()
740-
<< " prevGen " << (FollowerState.LastSysUpdate >> 32)
741-
<< " prevStep " << (FollowerState.LastSysUpdate & (ui32)-1)
742-
<< " newGen " << (lastSysUpdate >> 32)
743-
<< " newStep " << (lastSysUpdate & (ui32)-1));
742+
<< " prev " << FollowerState.LastSysUpdate
743+
<< " current " << lastSysUpdate);
744744

745745
bool ready = true;
746746
ready &= SysGetUi64(db, Schema::Sys_PathOwnerId, PathOwnerId);
@@ -756,10 +756,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
756756
if (FollowerState.LastSchemeUpdate < lastSchemeUpdate) {
757757
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
758758
"Updating tables metadata on follower, tabletId " << TabletID()
759-
<< " prevGen " << (FollowerState.LastSchemeUpdate >> 32)
760-
<< " prevStep " << (FollowerState.LastSchemeUpdate & (ui32)-1)
761-
<< " newGen " << (lastSchemeUpdate >> 32)
762-
<< " newStep " << (lastSchemeUpdate & (ui32)-1));
759+
<< " prev " << FollowerState.LastSchemeUpdate
760+
<< " current " << lastSchemeUpdate);
763761

764762
struct TRow {
765763
TPathId TableId;
@@ -829,10 +827,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
829827
if (FollowerState.LastSnapshotsUpdate < lastSnapshotsUpdate) {
830828
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
831829
"Updating snapshots metadata on follower, tabletId " << TabletID()
832-
<< " prevGen " << (FollowerState.LastSnapshotsUpdate >> 32)
833-
<< " prevStep " << (FollowerState.LastSnapshotsUpdate & (ui32)-1)
834-
<< " newGen " << (lastSnapshotsUpdate >> 32)
835-
<< " newStep " << (lastSnapshotsUpdate & (ui32)-1));
830+
<< " prev " << FollowerState.LastSnapshotsUpdate
831+
<< " current " << lastSnapshotsUpdate);
836832

837833
NIceDb::TNiceDb db(txc.DB);
838834
if (!SnapshotManager.ReloadSnapshots(db)) {

ydb/core/tx/datashard/datashard__stats.cpp

+1-3
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,7 @@ class TDataShard::TTxInitiateStatsUpdate : public NTabletFlatExecutor::TTransact
441441
void CheckIdleMemCompaction(const TUserTable& table, TTransactionContext& txc, const TActorContext& ctx) {
442442
// Note: we only care about changes in the main table
443443
auto lastTableChange = txc.DB.Head(table.LocalTid);
444-
if (table.LastTableChange.Serial != lastTableChange.Serial ||
445-
table.LastTableChange.Epoch != lastTableChange.Epoch)
446-
{
444+
if (table.LastTableChange != lastTableChange) {
447445
table.LastTableChange = lastTableChange;
448446
table.LastTableChangeTimestamp = ctx.Monotonic();
449447
return;

ydb/core/tx/datashard/datashard_impl.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -2390,9 +2390,9 @@ class TDataShard
23902390

23912391
// For follower only
23922392
struct TFollowerState {
2393-
ui64 LastSysUpdate = 0;
2394-
ui64 LastSchemeUpdate = 0;
2395-
ui64 LastSnapshotsUpdate = 0;
2393+
NTable::TDatabase::TChangeCounter LastSysUpdate;
2394+
NTable::TDatabase::TChangeCounter LastSchemeUpdate;
2395+
NTable::TDatabase::TChangeCounter LastSnapshotsUpdate;
23962396
};
23972397

23982398
//

ydb/core/tx/datashard/datashard_user_table.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ struct TUserTable : public TThrRefBase {
401401
mutable TStats Stats;
402402
mutable bool StatsUpdateInProgress = false;
403403
mutable bool StatsNeedUpdate = true;
404-
mutable NTable::TDatabase::TChg LastTableChange{ 0, NTable::TEpoch::Zero() };
404+
mutable NTable::TDatabase::TChangeCounter LastTableChange;
405405
mutable TMonotonic LastTableChangeTimestamp;
406406

407407
ui32 SpecialColTablet = Max<ui32>();

ydb/core/tx/datashard/datashard_ut_followers.cpp

+96
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
#include "datashard_ut_common_kqp.h"
33
#include "datashard_ut_read_table.h"
44

5+
#include <ydb/library/actors/core/mon.h>
6+
57
namespace NKikimr {
68

79
using namespace NKikimr::NDataShard;
@@ -162,6 +164,100 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) {
162164
}
163165
}
164166

167+
Y_UNIT_TEST(FollowerRebootAfterSysCompaction) {
168+
TPortManager pm;
169+
TServerSettings serverSettings(pm.GetPort(2134));
170+
serverSettings.SetDomainName("Root")
171+
.SetUseRealThreads(false)
172+
.SetEnableForceFollowers(true);
173+
174+
Tests::TServer::TPtr server = new TServer(serverSettings);
175+
auto &runtime = *server->GetRuntime();
176+
auto sender = runtime.AllocateEdgeActor();
177+
178+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
179+
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
180+
181+
InitRoot(server, sender);
182+
183+
CreateShardedTable(server, sender, "/Root", "table-1",
184+
TShardedTableOptions()
185+
.Shards(2)
186+
.Followers(1));
187+
188+
const auto shards = GetTableShards(server, sender, "/Root/table-1");
189+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
190+
191+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33);");
192+
193+
// Wait for leader to promote the follower read edge (and stop writing to the Sys table)
194+
Cerr << "... sleeping" << Endl;
195+
runtime.SimulateSleep(TDuration::Seconds(1));
196+
197+
UNIT_ASSERT_VALUES_EQUAL(
198+
KqpSimpleStaleRoExec(runtime,
199+
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
200+
"/Root"),
201+
"{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
202+
"{ items { uint32_value: 2 } items { uint32_value: 22 } }, "
203+
"{ items { uint32_value: 3 } items { uint32_value: 33 } }");
204+
205+
// Now we ask the leader to compact the Sys table
206+
{
207+
NActorsProto::TRemoteHttpInfo pb;
208+
pb.SetMethod(HTTP_METHOD_GET);
209+
pb.SetPath("/executorInternals");
210+
auto* p1 = pb.AddQueryParams();
211+
p1->SetKey("force_compaction");
212+
p1->SetValue("1");
213+
SendViaPipeCache(runtime, shards.at(0), sender,
214+
std::make_unique<NMon::TEvRemoteHttpInfo>(std::move(pb)));
215+
auto ev = runtime.GrabEdgeEventRethrow<NMon::TEvRemoteHttpInfoRes>(sender);
216+
UNIT_ASSERT_C(
217+
ev->Get()->Html.Contains("Table will be compacted in the near future"),
218+
ev->Get()->Html);
219+
}
220+
221+
// Allow table to finish compaction
222+
Cerr << "... sleeping" << Endl;
223+
runtime.SimulateSleep(TDuration::Seconds(1));
224+
225+
// Reboot follower
226+
Cerr << "... killing follower" << Endl;
227+
SendViaPipeCache(runtime, shards.at(0), sender,
228+
std::make_unique<TEvents::TEvPoison>(),
229+
{ .Follower = true });
230+
231+
// Allow it to boot properly
232+
Cerr << "... sleeping" << Endl;
233+
runtime.SimulateSleep(TDuration::Seconds(1));
234+
235+
// Read from follower must succeed
236+
Cerr << "... checking" << Endl;
237+
UNIT_ASSERT_VALUES_EQUAL(
238+
KqpSimpleStaleRoExec(runtime,
239+
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
240+
"/Root"),
241+
"{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
242+
"{ items { uint32_value: 2 } items { uint32_value: 22 } }, "
243+
"{ items { uint32_value: 3 } items { uint32_value: 33 } }");
244+
245+
// Update row values and sleep
246+
Cerr << "... updating rows" << Endl;
247+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 44), (2, 55), (3, 66);");
248+
runtime.SimulateSleep(TDuration::Seconds(1));
249+
250+
// Read from follower must see updated values
251+
Cerr << "... checking" << Endl;
252+
UNIT_ASSERT_VALUES_EQUAL(
253+
KqpSimpleStaleRoExec(runtime,
254+
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
255+
"/Root"),
256+
"{ items { uint32_value: 1 } items { uint32_value: 44 } }, "
257+
"{ items { uint32_value: 2 } items { uint32_value: 55 } }, "
258+
"{ items { uint32_value: 3 } items { uint32_value: 66 } }");
259+
}
260+
165261
} // Y_UNIT_TEST_SUITE(DataShardFollowers)
166262

167263
} // namespace NKikimr

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

+19
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "datashard_ut_common.h"
22

33
#include <ydb/core/base/tablet.h>
4+
#include <ydb/core/base/tablet_pipecache.h>
45
#include <ydb/core/base/tablet_resolver.h>
56
#include <ydb/core/scheme/scheme_types_defs.h>
67
#include <ydb/core/scheme/scheme_types_proto.h>
@@ -2274,4 +2275,22 @@ TString ReadShardedTable(
22742275
return StartReadShardedTable(server, path, snapshot, /* pause = */ false).Result;
22752276
}
22762277

2278+
void SendViaPipeCache(
2279+
TTestActorRuntime& runtime,
2280+
ui64 tabletId, const TActorId& sender,
2281+
std::unique_ptr<IEventBase> msg,
2282+
const TSendViaPipeCacheOptions& options)
2283+
{
2284+
ui32 nodeIndex = sender.NodeId() - runtime.GetNodeId(0);
2285+
runtime.Send(
2286+
new IEventHandle(
2287+
MakePipePeNodeCacheID(options.Follower),
2288+
sender,
2289+
new TEvPipeCache::TEvForward(msg.release(), tabletId, options.Subscribe),
2290+
options.Flags,
2291+
options.Cookie),
2292+
nodeIndex,
2293+
/* viaActorSystem */ true);
2294+
}
2295+
22772296
}

ydb/core/tx/datashard/ut_common/datashard_ut_common.h

+13
Original file line numberDiff line numberDiff line change
@@ -798,4 +798,17 @@ void WaitFor(TTestActorRuntime& runtime, TCondition&& condition, const TString&
798798
UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
799799
}
800800

801+
struct TSendViaPipeCacheOptions {
802+
ui32 Flags = 0;
803+
ui64 Cookie = 0;
804+
bool Follower = false;
805+
bool Subscribe = false;
806+
};
807+
808+
void SendViaPipeCache(
809+
TTestActorRuntime& runtime,
810+
ui64 tabletId, const TActorId& sender,
811+
std::unique_ptr<IEventBase> msg,
812+
const TSendViaPipeCacheOptions& options = {});
813+
801814
}

0 commit comments

Comments
 (0)