Skip to content

Commit f319dde

Browse files
make local internal CS persistent snapshot pinger (#7431)
1 parent 899ccd7 commit f319dde

24 files changed

+430
-144
lines changed

ydb/core/protos/config.proto

-1
Original file line numberDiff line numberDiff line change
@@ -1516,7 +1516,6 @@ message TColumnShardConfig {
15161516

15171517
optional TIndexMetadataMemoryLimit IndexMetadataMemoryLimit = 12;
15181518
optional bool CleanupEnabled = 13 [default = true];
1519-
optional uint32 RemovedPortionLivetimeSeconds = 14 [default = 600];
15201519

15211520
message TRepairInfo {
15221521
optional string ClassName = 1;

ydb/core/tx/columnshard/columnshard.cpp

+13-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
6161
EnqueueBackgroundActivities();
6262
BackgroundSessionsManager->Start();
6363
ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
64+
ctx.Send(SelfId(), new TEvPrivate::TEvPingSnapshotsUsage());
6465
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
6566
AFL_VERIFY(!!StartInstant);
6667
Counters.GetCSCounters().Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
@@ -161,7 +162,9 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
161162
if (HasIndex()) {
162163
index = &GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex();
163164
}
164-
InFlightReadsTracker.RemoveInFlightRequest(ev->Get()->RequestCookie, index);
165+
166+
InFlightReadsTracker.RemoveInFlightRequest(
167+
ev->Get()->RequestCookie, index, TInstant::Now());
165168

166169
ui64 txId = ev->Get()->TxId;
167170
if (ScanTxInFlight.contains(txId)) {
@@ -173,6 +176,14 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
173176
}
174177
}
175178

179+
// Self-rescheduling handler for the snapshots-usage ping.
//
// Asks InFlightReadsTracker::Ping for a transaction (passing this shard, the
// check period supplied by the test controller for 0.6 * GetMaxReadStaleness(),
// and the current wall-clock time). When a transaction is returned it is handed
// to Execute(). Regardless of the outcome, the next ping is scheduled after
// 0.3 * GetMaxReadStaleness().
void TColumnShard::Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& /*ev*/, const TActorContext& ctx) {
    const auto checkPeriod =
        NYDBTest::TControllers::GetColumnShardController()->GetPingCheckPeriod(0.6 * GetMaxReadStaleness());
    const TInstant now = TInstant::Now();

    auto pingTx = InFlightReadsTracker.Ping(this, checkPeriod, now);
    if (pingTx) {
        // Ownership of the transaction is passed to the executor.
        Execute(pingTx.release(), ctx);
    }

    const auto nextPingDelay = 0.3 * GetMaxReadStaleness();
    ctx.Schedule(nextPingDelay, new TEvPrivate::TEvPingSnapshotsUsage());
}
186+
176187
void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) {
177188
if (ev->Get()->Manual) {
178189
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvPrivate::TEvPeriodicWakeup::MANUAL")("tablet_id", TabletID());
@@ -182,6 +193,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC
182193
SendWaitPlanStep(GetOutdatedStep());
183194

184195
SendPeriodicStats();
196+
EnqueueBackgroundActivities();
185197
ctx.Schedule(PeriodicWakeupActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
186198
}
187199
}

ydb/core/tx/columnshard/columnshard__init.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,14 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
225225
}
226226
Self->SharingSessionsManager = local;
227227
}
228+
{
229+
TMemoryProfileGuard g("TTxInit/TInFlightReadsTracker");
230+
TInFlightReadsTracker local(Self->StoragesManager, Self->Counters.GetRequestsTracingCounters());
231+
if (!local.LoadFromDatabase(txc.DB)) {
232+
return false;
233+
}
234+
Self->InFlightReadsTracker = std::move(local);
235+
}
228236

229237
Self->UpdateInsertTableCounters();
230238
Self->UpdateIndexCounters();

ydb/core/tx/columnshard/columnshard_impl.cpp

+19-10
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
7474
, TabletCountersHolder(new TProtobufTabletCounters<ESimpleCounters_descriptor, ECumulativeCounters_descriptor,
7575
EPercentileCounters_descriptor, ETxTypes_descriptor>())
7676
, Counters(*TabletCountersHolder)
77-
, InFlightReadsTracker(StoragesManager)
77+
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
7878
, TablesManager(StoragesManager, info->TabletID)
7979
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
8080
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
@@ -84,8 +84,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
8484
, TTLTaskSubscription(NOlap::TTTLColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters())
8585
, BackgroundController(Counters.GetBackgroundControllerCounters())
8686
, NormalizerController(StoragesManager, Counters.GetSubscribeCounters())
87-
, SysLocks(this)
88-
, MaxReadStaleness(TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms())) {
87+
, SysLocks(this) {
8988
}
9089

9190
void TColumnShard::OnDetach(const TActorContext& ctx) {
@@ -186,12 +185,18 @@ ui64 TColumnShard::GetOutdatedStep() const {
186185
return step;
187186
}
188187

189-
ui64 TColumnShard::GetMinReadStep() const {
190-
const TDuration maxReadStaleness = NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(MaxReadStaleness);
191-
ui64 delayMillisec = maxReadStaleness.MilliSeconds();
188+
NOlap::TSnapshot TColumnShard::GetMinReadSnapshot() const {
189+
ui64 delayMillisec = GetMaxReadStaleness().MilliSeconds();
192190
ui64 passedStep = GetOutdatedStep();
193191
ui64 minReadStep = (passedStep > delayMillisec ? passedStep - delayMillisec : 0);
194-
return minReadStep;
192+
Counters.GetRequestsTracingCounters()->OnDefaultMinSnapshotInstant(TInstant::MilliSeconds(minReadStep));
193+
194+
if (auto ssClean = InFlightReadsTracker.GetSnapshotToClean()) {
195+
if (ssClean->GetPlanStep() < minReadStep) {
196+
return *ssClean;
197+
}
198+
}
199+
return NOlap::TSnapshot::MaxForPlanStep(minReadStep);
195200
}
196201

197202
TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) const {
@@ -785,9 +790,8 @@ void TColumnShard::SetupCleanupPortions() {
785790
return;
786791
}
787792

788-
NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};
789-
790-
auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(cleanupSnapshot, TablesManager.GetPathsToDrop(), DataLocksManager);
793+
auto changes =
794+
TablesManager.MutablePrimaryIndex().StartCleanupPortions(GetMinReadSnapshot(), TablesManager.GetPathsToDrop(), DataLocksManager);
791795
if (!changes) {
792796
ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes");
793797
return;
@@ -1134,4 +1138,9 @@ const NKikimr::NColumnShard::NTiers::TManager* TColumnShard::GetTierManagerPoint
11341138
return Tiers->GetManagerOptional(tierId);
11351139
}
11361140

1141+
// Returns the maximum read staleness: the MaxReadStaleness_ms value from
// ColumnShardConfig, converted to a TDuration and passed through the column
// shard controller's GetReadTimeoutClean().
TDuration TColumnShard::GetMaxReadStaleness() {
    // The controller is obtained first, matching the original evaluation order.
    auto&& controller = NYDBTest::TControllers::GetColumnShardController();
    const ui64 configuredMs = AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms();
    return controller->GetReadTimeoutClean(TDuration::MilliSeconds(configuredMs));
}
1145+
11371146
}

ydb/core/tx/columnshard/columnshard_impl.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ class TColumnShard
217217
void Handle(TEvPrivate::TEvScanStats::TPtr &ev, const TActorContext &ctx);
218218
void Handle(TEvPrivate::TEvReadFinished::TPtr &ev, const TActorContext &ctx);
219219
void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx);
220+
void Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& ev, const TActorContext& ctx);
221+
220222
void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx);
221223
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
222224
void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx);
@@ -361,6 +363,8 @@ class TColumnShard
361363
HFunc(TEvPrivate::TEvScanStats, Handle);
362364
HFunc(TEvPrivate::TEvReadFinished, Handle);
363365
HFunc(TEvPrivate::TEvPeriodicWakeup, Handle);
366+
HFunc(TEvPrivate::TEvPingSnapshotsUsage, Handle);
367+
364368
HFunc(NEvents::TDataEvents::TEvWrite, Handle);
365369
HFunc(TEvPrivate::TEvWriteDraft, Handle);
366370
HFunc(TEvPrivate::TEvGarbageCollectionFinished, Handle);
@@ -465,7 +469,7 @@ class TColumnShard
465469
TLimits Limits;
466470
NOlap::TNormalizationController NormalizerController;
467471
NDataShard::TSysLocks SysLocks;
468-
const TDuration MaxReadStaleness;
472+
static TDuration GetMaxReadStaleness();
469473

470474
void TryRegisterMediatorTimeCast();
471475
void UnregisterMediatorTimeCast();
@@ -475,7 +479,7 @@ class TColumnShard
475479
void SendWaitPlanStep(ui64 step);
476480
void RescheduleWaitingReads();
477481
NOlap::TSnapshot GetMaxReadVersion() const;
478-
ui64 GetMinReadStep() const;
482+
NOlap::TSnapshot GetMinReadSnapshot() const;
479483
ui64 GetOutdatedStep() const;
480484
TDuration GetTxCompleteLag() const {
481485
ui64 mediatorTime = MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0;

ydb/core/tx/columnshard/columnshard_private_events.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ struct TEvPrivate {
4646
EvExportSaveCursor,
4747

4848
EvTaskProcessedResult,
49+
EvPingSnapshotsUsage,
4950

5051
EvEnd
5152
};
@@ -158,7 +159,11 @@ struct TEvPrivate {
158159
bool Manual;
159160
};
160161

161-
class TEvWriteBlobsResult : public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> {
162+
// Local event with no payload, identified by EvPingSnapshotsUsage.
struct TEvPingSnapshotsUsage
    : public TEventLocal<TEvPingSnapshotsUsage, EvPingSnapshotsUsage>
{
    TEvPingSnapshotsUsage() = default;
};
165+
166+
class TEvWriteBlobsResult: public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> {
162167
private:
163168
NColumnShard::TBlobPutResult::TPtr PutResult;
164169
NOlap::TWritingBuffer WritesBuffer;

ydb/core/tx/columnshard/columnshard_schema.h

+12-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ struct Schema : NIceDb::Schema {
108108
TableVersionInfo = 11,
109109
SmallBlobs = 12,
110110
OneToOneEvictedBlobs = 13,
111-
BlobsToDeleteWT = 14
111+
BlobsToDeleteWT = 14,
112+
InFlightSnapshots = 15
112113
};
113114

114115
// Tablet tables
@@ -250,6 +251,14 @@ struct Schema : NIceDb::Schema {
250251
using TColumns = TableColumns<BlobId, TabletId>;
251252
};
252253

254+
// Table descriptor registered under ECommonTables::InFlightSnapshots.
// Each row is a (PlanStep, TxId) pair of Uint64 values; both columns form the key.
// NOTE(review): presumably the persisted set of snapshots held by in-flight reads — confirm against TInFlightReadsTracker.
struct InFlightSnapshots: Table<(ui32)ECommonTables::InFlightSnapshots> {
    struct PlanStep: Column<1, NScheme::NTypeIds::Uint64> {
    };
    struct TxId: Column<2, NScheme::NTypeIds::Uint64> {
    };

    using TKey = TableKey<PlanStep, TxId>;
    using TColumns = TableColumns<PlanStep, TxId>;
};
261+
253262
// Index tables
254263

255264
// InsertTable - common for all indices
@@ -545,7 +554,8 @@ struct Schema : NIceDb::Schema {
545554
BackgroundSessions,
546555
ShardingInfo,
547556
Normalizers,
548-
NormalizerEvents
557+
NormalizerEvents,
558+
InFlightSnapshots
549559
>;
550560

551561
//

ydb/core/tx/columnshard/common/snapshot.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,12 @@ TString TSnapshot::SerializeToString() const {
3535
return SerializeToProto().SerializeAsString();
3636
}
3737

38+
NKikimr::NOlap::TSnapshot TSnapshot::MaxForPlanStep(const ui64 planStep) noexcept {
39+
return TSnapshot(planStep, ::Max<ui64>());
40+
}
41+
42+
NKikimr::NOlap::TSnapshot TSnapshot::MaxForPlanInstant(const TInstant planInstant) noexcept {
43+
return TSnapshot(planInstant.MilliSeconds(), ::Max<ui64>());
44+
}
45+
3846
};

ydb/core/tx/columnshard/common/snapshot.h

+4
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ class TSnapshot {
5454
return TSnapshot(-1ll, -1ll);
5555
}
5656

57+
static TSnapshot MaxForPlanInstant(const TInstant planInstant) noexcept;
58+
59+
static TSnapshot MaxForPlanStep(const ui64 planStep) noexcept;
60+
5761
constexpr bool operator==(const TSnapshot&) const noexcept = default;
5862

5963
constexpr auto operator<=>(const TSnapshot&) const noexcept = default;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "counters_manager.h"
2+
3+
namespace NKikimr::NColumnShard {
4+
5+
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/counters/counters_manager.h

+13-9
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
#pragma once
22

3+
#include "background_controller.h"
4+
#include "column_tables.h"
35
#include "columnshard.h"
46
#include "indexation.h"
7+
#include "req_tracer.h"
58
#include "scan.h"
6-
#include "column_tables.h"
7-
#include "writes_monitor.h"
89
#include "tablet_counters.h"
9-
#include "background_controller.h"
10+
#include "writes_monitor.h"
1011

11-
#include <ydb/core/tx/columnshard/engines/column_engine.h>
12-
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
13-
#include <ydb/core/protos/table_stats.pb.h>
14-
#include <ydb/core/tablet/tablet_counters.h>
12+
#include <ydb/core/base/appdata_fwd.h>
1513
#include <ydb/core/protos/counters_columnshard.pb.h>
1614
#include <ydb/core/protos/counters_datashard.pb.h>
17-
#include <ydb/core/base/appdata_fwd.h>
15+
#include <ydb/core/protos/table_stats.pb.h>
16+
#include <ydb/core/tablet/tablet_counters.h>
17+
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
18+
#include <ydb/core/tx/columnshard/engines/column_engine.h>
19+
1820
#include <library/cpp/time_provider/time_provider.h>
1921

2022
namespace NKikimr::NColumnShard {
@@ -32,6 +34,7 @@ class TCountersManager {
3234
YDB_READONLY(TIndexationCounters, IndexationCounters, TIndexationCounters("Indexation"));
3335
YDB_READONLY(TIndexationCounters, CompactionCounters, TIndexationCounters("GeneralCompaction"));
3436
YDB_READONLY(TScanCounters, ScanCounters, TScanCounters("Scan"));
37+
YDB_READONLY_DEF(std::shared_ptr<TRequestsTracerCounters>, RequestsTracingCounters);
3538
YDB_READONLY_DEF(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>, SubscribeCounters);
3639

3740
public:
@@ -40,8 +43,9 @@ class TCountersManager {
4043
, WritesMonitor(std::make_shared<TWritesMonitor>(tabletCounters))
4144
, BackgroundControllerCounters(std::make_shared<TBackgroundControllerCounters>())
4245
, ColumnTablesCounters(std::make_shared<TColumnTablesCounters>())
46+
, RequestsTracingCounters(std::make_shared<TRequestsTracerCounters>())
4347
, SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>()) {
4448
}
4549
};
4650

47-
} // namespace NKikimr::NColumnShard
51+
} // namespace NKikimr::NColumnShard
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "req_tracer.h"
2+
3+
namespace NKikimr::NColumnShard {
4+
5+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#pragma once
2+
#include "common/owner.h"
3+
#include <ydb/core/tx/columnshard/common/snapshot.h>
4+
5+
namespace NKikimr::NColumnShard {
6+
7+
class TRequestsTracerCounters: public TCommonCountersOwner {
8+
private:
9+
using TBase = TCommonCountersOwner;
10+
NMonitoring::TDynamicCounters::TCounterPtr RequestedMinSnapshotAge;
11+
NMonitoring::TDynamicCounters::TCounterPtr DefaultMinSnapshotAge;
12+
NMonitoring::TDynamicCounters::TCounterPtr SnapshotsCount;
13+
NMonitoring::TDynamicCounters::TCounterPtr SnapshotLock;
14+
NMonitoring::TDynamicCounters::TCounterPtr SnapshotUnlock;
15+
16+
public:
17+
18+
TRequestsTracerCounters()
19+
: TBase("cs_requests_tracing")
20+
, RequestedMinSnapshotAge(TBase::GetValue("Snapshots/RequestedAge/Seconds"))
21+
, DefaultMinSnapshotAge(TBase::GetValue("Snapshots/DefaultAge/Seconds"))
22+
, SnapshotsCount(TBase::GetValue("Snapshots/Count"))
23+
, SnapshotLock(TBase::GetDeriviative("Snapshots/Lock"))
24+
, SnapshotUnlock(TBase::GetDeriviative("Snapshots/Unlock"))
25+
{
26+
27+
}
28+
29+
void OnDefaultMinSnapshotInstant(const TInstant instant) const {
30+
DefaultMinSnapshotAge->Set((TInstant::Now() - instant).Seconds());
31+
}
32+
33+
void OnSnapshotsInfo(const ui32 count, const std::optional<NOlap::TSnapshot> snapshotPlanStep) const {
34+
if (snapshotPlanStep) {
35+
RequestedMinSnapshotAge->Set((TInstant::Now() - snapshotPlanStep->GetPlanInstant()).Seconds());
36+
} else {
37+
RequestedMinSnapshotAge->Set(0);
38+
}
39+
SnapshotsCount->Set(count);
40+
41+
}
42+
43+
void OnSnapshotLocked() const {
44+
SnapshotLock->Add(1);
45+
}
46+
void OnSnapshotUnlocked() const {
47+
SnapshotUnlock->Add(1);
48+
}
49+
};
50+
51+
}

ydb/core/tx/columnshard/counters/ya.make

+7-5
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@ LIBRARY()
22

33
SRCS(
44
background_controller.cpp
5-
column_tables.cpp
6-
indexation.cpp
7-
scan.cpp
8-
engine_logs.cpp
5+
counters_manager.cpp
96
blobs_manager.cpp
7+
column_tables.cpp
108
columnshard.cpp
11-
insert_table.cpp
129
common_data.cpp
10+
engine_logs.cpp
11+
indexation.cpp
12+
insert_table.cpp
13+
req_tracer.cpp
14+
scan.cpp
1315
splitter.cpp
1416
)
1517

0 commit comments

Comments
 (0)