Skip to content

Commit 7a4c215

Browse files
Split ttl tasks by rw address of storages read/write (#2644)
1 parent 6173ab9 commit 7a4c215

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

50 files changed

+1255
-482
lines changed

ydb/core/tx/columnshard/background_controller.cpp

-5
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,6 @@
33

44
namespace NKikimr::NColumnShard {
55

6-
void TBackgroundController::StartTtl() {
7-
Y_ABORT_UNLESS(!TtlStarted);
8-
TtlStarted = true;
9-
}
10-
116
bool TBackgroundController::StartCompaction(const NOlap::TPlanCompactionInfo& info) {
127
Y_ABORT_UNLESS(ActiveCompactionInfo.emplace(info.GetPathId(), info).second);
138
return true;

ydb/core/tx/columnshard/background_controller.h

-10
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ class TBackgroundController {
5252
TCurrentCompaction ActiveCompactionInfo;
5353

5454
bool ActiveCleanup = false;
55-
bool TtlStarted = false;
5655
YDB_READONLY(TMonotonic, LastIndexationInstant, TMonotonic::Zero());
5756
public:
5857
THashSet<NOlap::TPortionAddress> GetConflictTTLPortions() const;
@@ -90,15 +89,6 @@ class TBackgroundController {
9089
// Returns the current value of the ActiveCleanup flag.
bool IsCleanupActive() const {
9190
return ActiveCleanup;
9291
}
93-
94-
void StartTtl();
95-
void FinishTtl() {
96-
Y_ABORT_UNLESS(TtlStarted);
97-
TtlStarted = false;
98-
}
99-
bool IsTtlActive() const {
100-
return TtlStarted;
101-
}
10292
};
10393

10494
}

ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
1313
Y_ABORT_UNLESS(Self->TablesManager.HasPrimaryIndex());
1414
txc.DB.NoMoreReadsForTx();
1515

16-
ACFL_DEBUG("event", "TTxWriteIndex::Execute")("change_type", changes->TypeString())("details", *changes);
16+
ACFL_DEBUG("event", "TTxWriteIndex::Execute")("change_type", changes->TypeString())("details", changes->DebugString());
1717
if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) {
1818
NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
19-
Y_ABORT_UNLESS(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot);
19+
Y_ABORT_UNLESS(Ev->Get()->IndexInfo->GetLastSchema()->GetSnapshot() <= snapshot);
2020

2121
TBlobGroupSelector dsGroupSelector(Self->Info());
2222
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
@@ -51,7 +51,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
5151
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID()));
5252
CompleteReady = true;
5353
auto changes = Ev->Get()->IndexChanges;
54-
ACFL_DEBUG("event", "TTxWriteIndex::Complete")("change_type", changes->TypeString())("details", *changes);
54+
ACFL_DEBUG("event", "TTxWriteIndex::Complete")("change_type", changes->TypeString())("details", changes->DebugString());
5555

5656
const ui64 blobsWritten = changes->GetBlobsAction().GetWritingBlobsCount();
5757
const ui64 bytesWritten = changes->GetBlobsAction().GetWritingTotalSize();

ydb/core/tx/columnshard/columnshard__stats_scan.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayB
8383
NArrow::Append<arrow::StringType>(*builders[9], blobIdString);
8484
NArrow::Append<arrow::UInt64Type>(*builders[10], r->BlobRange.Offset);
8585
NArrow::Append<arrow::UInt64Type>(*builders[11], r->BlobRange.Size);
86-
NArrow::Append<arrow::BooleanType>(*builders[12], !portion.HasRemoveSnapshot() || ReadMetadata->GetRequestSnapshot() < portion.GetRemoveSnapshot());
86+
NArrow::Append<arrow::BooleanType>(*builders[12], !portion.IsRemovedFor(ReadMetadata->GetRequestSnapshot()));
8787

8888
const auto tierName = portionSchema->GetIndexInfo().GetEntityStorageId(r->GetColumnId(), portion.GetMeta().GetTierName());
8989
std::string strTierName(tierName.data(), tierName.size());
@@ -114,7 +114,7 @@ void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayB
114114
NArrow::Append<arrow::StringType>(*builders[9], blobIdString);
115115
NArrow::Append<arrow::UInt64Type>(*builders[10], r->GetBlobRange().Offset);
116116
NArrow::Append<arrow::UInt64Type>(*builders[11], r->GetBlobRange().Size);
117-
NArrow::Append<arrow::BooleanType>(*builders[12], !portion.HasRemoveSnapshot() || ReadMetadata->GetRequestSnapshot() < portion.GetRemoveSnapshot());
117+
NArrow::Append<arrow::BooleanType>(*builders[12], !portion.IsRemovedFor(ReadMetadata->GetRequestSnapshot()));
118118
const auto tierName = portionSchema->GetIndexInfo().GetEntityStorageId(r->GetIndexId(), portion.GetMeta().GetTierName());
119119
std::string strTierName(tierName.data(), tierName.size());
120120
NArrow::Append<arrow::StringType>(*builders[13], strTierName);

ydb/core/tx/columnshard/columnshard_impl.cpp

+27-29
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ class TChangesTask: public NConveyor::ITask {
554554
virtual bool DoExecute() override {
555555
NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", ParentActorId));
556556
{
557-
NOlap::TConstructionContext context(TxEvent->IndexInfo, Counters);
557+
NOlap::TConstructionContext context(*TxEvent->IndexInfo, Counters);
558558
Y_ABORT_UNLESS(TxEvent->IndexChanges->ConstructBlobs(context).Ok());
559559
if (!TxEvent->IndexChanges->GetWritePortionsCount()) {
560560
TxEvent->SetPutStatus(NKikimrProto::OK);
@@ -601,7 +601,8 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
601601
}
602602
virtual bool DoOnError(const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
603603
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)("status", status.GetErrorMessage())("status_code", status.GetStatus());
604-
AFL_VERIFY(false)("blob_id", range)("status", status.GetStatus());
604+
AFL_VERIFY(false)("blob_id", range)("status", status.GetStatus())("error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier())
605+
("debug", TxEvent->IndexChanges->DebugString());
605606
TxEvent->SetPutStatus(NKikimrProto::ERROR);
606607
TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent));
607608
return false;
@@ -631,9 +632,9 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat
631632
auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data));
632633
Y_ABORT_UNLESS(indexChanges);
633634

634-
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
635+
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
635636
indexChanges->Start(*this);
636-
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing);
637+
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, indexChanges, Settings.CacheDataAfterIndexing);
637638

638639
const TString externalTaskId = indexChanges->GetTaskIdentifier();
639640
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes)
@@ -712,8 +713,8 @@ void TColumnShard::SetupCompaction() {
712713

713714
indexChanges->Start(*this);
714715

715-
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
716-
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction);
716+
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
717+
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, indexChanges, Settings.CacheDataAfterCompaction);
717718
const TString externalTaskId = indexChanges->GetTaskIdentifier();
718719
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "compaction")("external_task_id", externalTaskId);
719720

@@ -731,37 +732,34 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
731732
return false;
732733
}
733734
CSCounters.OnSetupTtl();
734-
if (BackgroundController.IsTtlActive()) {
735-
ACFL_DEBUG("background", "ttl")("skip_reason", "in_progress");
736-
return false;
737-
}
738735
THashMap<ui64, NOlap::TTiering> eviction = pathTtls;
739736
for (auto&& i : eviction) {
740737
ACFL_DEBUG("background", "ttl")("path", i.first)("info", i.second.GetDebugString());
741738
}
742739

743-
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
740+
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
744741
const ui64 memoryUsageLimit = HasAppData() ? AppDataVerified().ColumnShardConfig.GetTieringsMemoryLimit() : ((ui64)512 * 1024 * 1024);
745-
std::shared_ptr<NOlap::TTTLColumnEngineChanges> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, DataLocksManager, memoryUsageLimit);
742+
std::vector<std::shared_ptr<NOlap::TTTLColumnEngineChanges>> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, DataLocksManager, memoryUsageLimit);
746743

747-
if (!indexChanges) {
744+
if (indexChanges.empty()) {
748745
ACFL_DEBUG("background", "ttl")("skip_reason", "no_changes");
749746
return false;
750747
}
751-
const TString externalTaskId = indexChanges->GetTaskIdentifier();
752-
const bool needWrites = indexChanges->NeedConstruction();
753-
ACFL_DEBUG("background", "ttl")("need_writes", needWrites);
754-
755-
indexChanges->Start(*this);
756-
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
757-
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexStart(TabletID(), indexChanges->TypeString());
758-
if (needWrites) {
759-
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
760-
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
761-
std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
762-
} else {
763-
ev->SetPutStatus(NKikimrProto::OK);
764-
ActorContext().Send(SelfId(), std::move(ev));
748+
for (auto&& i : indexChanges) {
749+
const TString externalTaskId = i->GetTaskIdentifier();
750+
const bool needWrites = i->NeedConstruction();
751+
ACFL_DEBUG("background", "ttl")("need_writes", needWrites);
752+
i->Start(*this);
753+
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, i, false);
754+
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexStart(TabletID(), i->TypeString());
755+
if (needWrites) {
756+
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
757+
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
758+
std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, i->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
759+
} else {
760+
ev->SetPutStatus(NKikimrProto::OK);
761+
ActorContext().Send(SelfId(), std::move(ev));
762+
}
765763
}
766764
return true;
767765
}
@@ -782,8 +780,8 @@ void TColumnShard::SetupCleanup() {
782780
}
783781

784782
ACFL_DEBUG("background", "cleanup")("changes_info", changes->DebugString());
785-
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
786-
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false);
783+
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
784+
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, changes, false);
787785
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write
788786

789787
changes->Start(*this);

ydb/core/tx/columnshard/columnshard_private_events.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,18 +75,18 @@ struct TEvPrivate {
7575

7676
/// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction.
7777
struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> {
78-
NOlap::TVersionedIndex IndexInfo;
78+
std::shared_ptr<NOlap::TVersionedIndex> IndexInfo;
7979
std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges;
8080
bool GranuleCompaction{false};
8181
TUsage ResourceUsage;
8282
bool CacheData{false};
8383
TDuration Duration;
8484
TBlobPutResult::TPtr PutResult;
8585

86-
TEvWriteIndex(NOlap::TVersionedIndex&& indexInfo,
86+
TEvWriteIndex(const std::shared_ptr<NOlap::TVersionedIndex>& indexInfo,
8787
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges,
8888
bool cacheData)
89-
: IndexInfo(std::move(indexInfo))
89+
: IndexInfo(indexInfo)
9090
, IndexChanges(indexChanges)
9191
, CacheData(cacheData)
9292
{

ydb/core/tx/columnshard/counters/engine_logs.cpp

+22
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,21 @@ TEngineLogsCounters::TEngineLogsCounters()
3838

3939
PortionToDropCount = TBase::GetDeriviative("Ttl/PortionToDrop/Count");
4040
PortionToDropBytes = TBase::GetDeriviative("Ttl/PortionToDrop/Bytes");
41+
PortionToDropLag = TBase::GetHistogram("Ttl/PortionToDrop/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
42+
SkipDeleteWithProcessMemory = TBase::GetHistogram("Ttl/PortionToDrop/Skip/ProcessMemory/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
43+
SkipDeleteWithTxLimit = TBase::GetHistogram("Ttl/PortionToDrop/Skip/TxLimit/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
4144

4245
PortionToEvictCount = TBase::GetDeriviative("Ttl/PortionToEvict/Count");
4346
PortionToEvictBytes = TBase::GetDeriviative("Ttl/PortionToEvict/Bytes");
47+
PortionToEvictLag = TBase::GetHistogram("Ttl/PortionToEvict/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
48+
SkipEvictionWithProcessMemory = TBase::GetHistogram("Ttl/PortionToEvict/Skip/ProcessMemory/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
49+
SkipEvictionWithTxLimit = TBase::GetHistogram("Ttl/PortionToEvict/Skip/TxLimit/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
50+
51+
ActualizationTaskSizeRemove = TBase::GetHistogram("Actualization/RemoveTasks/Size", NMonitoring::ExponentialHistogram(18, 2));
52+
ActualizationTaskSizeEvict = TBase::GetHistogram("Actualization/EvictTasks/Size", NMonitoring::ExponentialHistogram(18, 2));
53+
54+
ActualizationSkipRWProgressCount = TBase::GetDeriviative("Actualization/Skip/RWProgress/Count");
55+
ActualizationSkipTooFreshPortion = TBase::GetHistogram("Actualization//Skip/TooFresh/Duration", NMonitoring::LinearHistogram(12, 0, 360));
4456

4557
PortionNoTtlColumnCount = TBase::GetDeriviative("Ttl/PortionNoTtlColumn/Count");
4658
PortionNoTtlColumnBytes = TBase::GetDeriviative("Ttl/PortionNoTtlColumn/Bytes");
@@ -52,6 +64,16 @@ TEngineLogsCounters::TEngineLogsCounters()
5264
ChunkUsageForTTLCount = TBase::GetDeriviative("Ttl/ChunkUsageForTTLCount/Count");
5365
}
5466

67+
// Records the size of one actualization task in the matching histogram.
// evictCount  - number of evictions in the task.
// removeCount - number of removals in the task.
// Exactly one of the two counts must be non-zero (enforced with AFL_VERIFY):
// a task is collected either as an eviction task or as a removal task.
void TEngineLogsCounters::OnActualizationTask(const ui32 evictCount, const ui32 removeCount) const {
68+
// Test for zero directly: a ui32 product can wrap around to 0 even when both operands are non-zero.
AFL_VERIFY(!evictCount || !removeCount)("evict", evictCount)("remove", removeCount);
69+
AFL_VERIFY(evictCount + removeCount);
70+
if (evictCount) {
71+
ActualizationTaskSizeEvict->Collect(evictCount);
72+
} else {
73+
ActualizationTaskSizeRemove->Collect(removeCount);
74+
}
75+
}
76+
5577
void TEngineLogsCounters::TPortionsInfoGuard::OnNewPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const {
5678
const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
5779
Y_ABORT_UNLESS(producedId < BlobGuards.size());

ydb/core/tx/columnshard/counters/engine_logs.h

+43-2
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,21 @@ class TEngineLogsCounters: public TCommonCountersOwner {
236236
using TBase = TCommonCountersOwner;
237237
NMonitoring::TDynamicCounters::TCounterPtr PortionToDropCount;
238238
NMonitoring::TDynamicCounters::TCounterPtr PortionToDropBytes;
239+
NMonitoring::THistogramPtr PortionToDropLag;
240+
NMonitoring::THistogramPtr SkipDeleteWithProcessMemory;
241+
NMonitoring::THistogramPtr SkipDeleteWithTxLimit;
239242

240243
NMonitoring::TDynamicCounters::TCounterPtr PortionToEvictCount;
241244
NMonitoring::TDynamicCounters::TCounterPtr PortionToEvictBytes;
245+
NMonitoring::THistogramPtr PortionToEvictLag;
246+
NMonitoring::THistogramPtr SkipEvictionWithProcessMemory;
247+
NMonitoring::THistogramPtr SkipEvictionWithTxLimit;
248+
249+
NMonitoring::THistogramPtr ActualizationTaskSizeRemove;
250+
NMonitoring::THistogramPtr ActualizationTaskSizeEvict;
251+
252+
NMonitoring::TDynamicCounters::TCounterPtr ActualizationSkipRWProgressCount;
253+
NMonitoring::THistogramPtr ActualizationSkipTooFreshPortion;
242254

243255
NMonitoring::TDynamicCounters::TCounterPtr PortionNoTtlColumnCount;
244256
NMonitoring::TDynamicCounters::TCounterPtr PortionNoTtlColumnBytes;
@@ -253,6 +265,7 @@ class TEngineLogsCounters: public TCommonCountersOwner {
253265
std::vector<std::shared_ptr<TIncrementalHistogram>> BlobSizeDistribution;
254266
std::vector<std::shared_ptr<TIncrementalHistogram>> PortionSizeDistribution;
255267
std::vector<std::shared_ptr<TIncrementalHistogram>> PortionRecordsDistribution;
268+
256269
public:
257270

258271
class TPortionsInfoGuard {
@@ -282,6 +295,8 @@ class TEngineLogsCounters: public TCommonCountersOwner {
282295

283296
};
284297

298+
void OnActualizationTask(const ui32 evictCount, const ui32 removeCount) const;
299+
285300
// Builds a TPortionsInfoGuard over the blob-size, portion-size and portion-records distributions.
TPortionsInfoGuard BuildPortionBlobsGuard() const {
286301
return TPortionsInfoGuard(BlobSizeDistribution, PortionSizeDistribution, PortionRecordsDistribution);
287302
}
@@ -290,14 +305,40 @@ class TEngineLogsCounters: public TCommonCountersOwner {
290305
return GranuleDataAgent.RegisterClient();
291306
}
292307

293-
void OnPortionToEvict(const ui64 size) const {
308+
// Increments the ActualizationSkipRWProgressCount counter by one.
void OnActualizationSkipRWProgress() const {
309+
ActualizationSkipRWProgressCount->Add(1);
310+
}
311+
312+
// Adds dWait, converted to whole seconds, to the ActualizationSkipTooFreshPortion histogram.
void OnActualizationSkipTooFreshPortion(const TDuration dWait) const {
313+
ActualizationSkipTooFreshPortion->Collect(dWait.Seconds());
314+
}
315+
316+
// Adds lag, converted to whole seconds, to the SkipDeleteWithProcessMemory histogram.
void OnSkipDeleteWithProcessMemory(const TDuration lag) const {
317+
SkipDeleteWithProcessMemory->Collect(lag.Seconds());
318+
}
319+
320+
// Adds lag, converted to whole seconds, to the SkipDeleteWithTxLimit histogram.
void OnSkipDeleteWithTxLimit(const TDuration lag) const {
321+
SkipDeleteWithTxLimit->Collect(lag.Seconds());
322+
}
323+
324+
// Adds lag, converted to whole seconds, to the SkipEvictionWithProcessMemory histogram.
void OnSkipEvictionWithProcessMemory(const TDuration lag) const {
325+
SkipEvictionWithProcessMemory->Collect(lag.Seconds());
326+
}
327+
328+
// Adds lag, converted to whole seconds, to the SkipEvictionWithTxLimit histogram.
void OnSkipEvictionWithTxLimit(const TDuration lag) const {
329+
SkipEvictionWithTxLimit->Collect(lag.Seconds());
330+
}
331+
332+
// Accounts for one portion selected for eviction: bumps the count by one, adds `size` to the
// bytes counter and collects `lag` (in whole seconds) into the eviction lag histogram.
void OnPortionToEvict(const ui64 size, const TDuration lag) const {
294333
PortionToEvictCount->Add(1);
295334
PortionToEvictBytes->Add(size);
335+
PortionToEvictLag->Collect(lag.Seconds());
296336
}
297337

298-
void OnPortionToDrop(const ui64 size) const {
338+
// Accounts for one portion selected for dropping: bumps the count by one, adds `size` to the
// bytes counter and collects `lag` (in whole seconds) into the drop lag histogram.
void OnPortionToDrop(const ui64 size, const TDuration lag) const {
299339
PortionToDropCount->Add(1);
300340
PortionToDropBytes->Add(size);
341+
PortionToDropLag->Collect(lag.Seconds());
301342
}
302343

303344
void OnPortionNoTtlColumn(const ui64 size) const {

0 commit comments

Comments
 (0)