Skip to content

Commit df8664b

Browse files
tiering usage validations and control incorrect state usage (#2585)
1 parent 661e989 commit df8664b

16 files changed

+98
-58
lines changed

ydb/core/tx/columnshard/blobs_action/abstract/storage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ class IBlobsStorageOperator {
7777
return DoLoad(dbBlobs);
7878
}
7979
void OnTieringModified(const std::shared_ptr<NColumnShard::ITiersManager>& tiers) {
80+
AFL_VERIFY(tiers);
8081
return DoOnTieringModified(tiers);
8182
}
8283

ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ std::shared_ptr<NKikimr::NOlap::IBlobsStorageOperator> IStoragesManager::GetOper
3333
}
3434

3535
void IStoragesManager::OnTieringModified(const std::shared_ptr<NColumnShard::ITiersManager>& tiers) {
36+
AFL_VERIFY(tiers);
3637
for (auto&& i : tiers->GetManagers()) {
3738
GetOperatorGuarantee(i.first)->OnTieringModified(tiers);
3839
}

ydb/core/tx/columnshard/columnshard.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
3939
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SwitchToWork");
4040

4141
for (auto&& i : TablesManager.GetTables()) {
42-
ActivateTiering(i.first, i.second.GetTieringUsage(), true);
42+
ActivateTiering(i.first, i.second.GetTieringUsage());
4343
}
44-
OnTieringModified();
4544

4645
Become(&TThis::StateWork);
4746
SignalTabletActive(ctx);

ydb/core/tx/columnshard/columnshard__init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
381381
}
382382

383383
void TTxInitSchema::Complete(const TActorContext& ctx) {
384-
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID();)
384+
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID(););
385385
Self->Execute(new TTxUpdateSchema(Self), ctx);
386386
}
387387

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const
109109
if (!Self->SetupTtl(pathTtls)) {
110110
return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL not started");
111111
}
112-
Self->TablesManager.MutablePrimaryIndex().OnTieringModified(Self->Tiers, Self->TablesManager.GetTtl());
112+
Self->TablesManager.MutablePrimaryIndex().OnTieringModified(Self->Tiers, Self->TablesManager.GetTtl(), {});
113113

114114
return TTxController::TProposeResult();
115115
}

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -401,21 +401,28 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
401401
*tableVerProto.MutableSchema() = tableProto.GetSchema();
402402
}
403403

404-
TTableInfo table(pathId);
405-
if (tableProto.HasTtlSettings()) {
406-
const auto& ttlSettings = tableProto.GetTtlSettings();
407-
*tableVerProto.MutableTtlSettings() = ttlSettings;
408-
if (ttlSettings.HasUseTiering()) {
409-
table.SetTieringUsage(ttlSettings.GetUseTiering());
410-
ActivateTiering(pathId, table.GetTieringUsage());
404+
{
405+
bool needTieringActivation = false;
406+
TTableInfo table(pathId);
407+
if (tableProto.HasTtlSettings()) {
408+
const auto& ttlSettings = tableProto.GetTtlSettings();
409+
*tableVerProto.MutableTtlSettings() = ttlSettings;
410+
if (ttlSettings.HasUseTiering()) {
411+
table.SetTieringUsage(ttlSettings.GetUseTiering());
412+
needTieringActivation = true;
413+
}
414+
}
415+
const TString tieringName = table.GetTieringUsage();
416+
TablesManager.RegisterTable(std::move(table), db);
417+
if (needTieringActivation) {
418+
ActivateTiering(pathId, tieringName);
411419
}
412420
}
413421

414422
tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());
415423
tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj());
416424

417-
TablesManager.RegisterTable(std::move(table), db);
418-
TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
425+
TablesManager.AddTableVersion(pathId, version, tableVerProto, db, Tiers);
419426

420427
SetCounter(COUNTER_TABLES, TablesManager.GetTables().size());
421428
SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size());
@@ -452,7 +459,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
452459
Schema::SaveTableInfo(db, pathId, tieringUsage);
453460

454461
tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
455-
TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
462+
TablesManager.AddTableVersion(pathId, version, tableVerProto, db, Tiers);
456463
}
457464

458465
void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const NOlap::TSnapshot& version,
@@ -1026,18 +1033,17 @@ void TColumnShard::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr&
10261033
Tiers->TakeConfigs(ev->Get()->GetSnapshot(), nullptr);
10271034
}
10281035

1029-
void TColumnShard::ActivateTiering(const ui64 pathId, const TString& useTiering, const bool onTabletInit) {
1030-
Y_ABORT_UNLESS(!!Tiers);
1031-
if (!!Tiers) {
1032-
if (useTiering) {
1033-
Tiers->EnablePathId(pathId, useTiering);
1034-
} else {
1035-
Tiers->DisablePathId(pathId);
1036-
}
1036+
void TColumnShard::ActivateTiering(const ui64 pathId, const TString& useTiering) {
1037+
AFL_VERIFY(Tiers);
1038+
if (useTiering) {
1039+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "activate_tiering")("path_id", pathId)("tiering", useTiering);
10371040
}
1038-
if (!onTabletInit) {
1039-
OnTieringModified();
1041+
if (useTiering) {
1042+
Tiers->EnablePathId(pathId, useTiering);
1043+
} else {
1044+
Tiers->DisablePathId(pathId);
10401045
}
1046+
OnTieringModified(pathId);
10411047
}
10421048

10431049
void TColumnShard::Enqueue(STFUNC_SIG) {
@@ -1050,11 +1056,13 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
10501056
}
10511057
}
10521058

1053-
void TColumnShard::OnTieringModified() {
1059+
void TColumnShard::OnTieringModified(const std::optional<ui64> pathId) {
10541060
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified");
1055-
StoragesManager->OnTieringModified(Tiers);
1056-
if (TablesManager.HasPrimaryIndex()) {
1057-
TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers, TablesManager.GetTtl());
1061+
if (Tiers->IsReady()) {
1062+
StoragesManager->OnTieringModified(Tiers);
1063+
if (TablesManager.HasPrimaryIndex()) {
1064+
TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers, TablesManager.GetTtl(), pathId);
1065+
}
10581066
}
10591067
}
10601068

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ class TColumnShard
255255
TabletCounters->Cumulative()[counter].Increment(num);
256256
}
257257

258-
void ActivateTiering(const ui64 pathId, const TString& useTiering, const bool onTabletInit = false);
259-
void OnTieringModified();
258+
void ActivateTiering(const ui64 pathId, const TString& useTiering);
259+
void OnTieringModified(const std::optional<ui64> pathId = {});
260260
public:
261261
enum class EOverloadStatus {
262262
ShardTxInFly /* "shard_tx" */,

ydb/core/tx/columnshard/engines/column_engine.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
namespace NKikimr::NOlap {
66

7+
const std::shared_ptr<arrow::Schema>& IColumnEngine::GetReplaceKey() const {
8+
return GetVersionedIndex().GetLastSchema()->GetIndexInfo().GetReplaceKey();
9+
}
710
}
811

912
template <>

ydb/core/tx/columnshard/engines/column_engine.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,8 @@ class IColumnEngine {
361361
virtual ~IColumnEngine() = default;
362362

363363
virtual const TVersionedIndex& GetVersionedIndex() const = 0;
364-
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return GetVersionedIndex().GetLastSchema()->GetIndexInfo().GetReplaceKey(); }
364+
virtual std::shared_ptr<TVersionedIndex> CopyVersionedIndexPtr() const = 0;
365+
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const;
365366

366367
virtual bool HasDataInPathId(const ui64 pathId) const = 0;
367368
virtual bool Load(IDbWrapper& db) = 0;
@@ -383,7 +384,7 @@ class IColumnEngine {
383384
virtual const TColumnEngineStats& GetTotalStats() = 0;
384385
virtual ui64 MemoryUsage() const { return 0; }
385386
virtual TSnapshot LastUpdate() const { return TSnapshot::Zero(); }
386-
virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) = 0;
387+
virtual void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) = 0;
387388
};
388389

389390
}

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -573,17 +573,44 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
573573
return out;
574574
}
575575

576-
void TColumnEngineForLogs::OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) {
577-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified");
578-
std::optional<THashMap<ui64, TTiering>> tierings;
576+
void TColumnEngineForLogs::OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) {
577+
if (!TiersInitialized) {
578+
for (auto&& i : Tables) {
579+
i.second->StartActualizationIndex();
580+
}
581+
}
582+
583+
TiersInitialized = true;
584+
AFL_VERIFY(manager);
585+
THashMap<ui64, TTiering> tierings;
579586
if (manager) {
580587
tierings = manager->GetTiering();
581588
}
589+
ttl.AddTtls(tierings);
590+
582591
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified")
583-
("new_count_tierings", tierings ? ::ToString(tierings->size()) : TString("undefined"))
592+
("new_count_tierings", tierings.size())
584593
("new_count_ttls", ttl.PathsCount());
585594
EvictionsController.RefreshTierings(std::move(tierings), ttl);
586-
595+
if (pathId) {
596+
auto itGranule = Tables.find(*pathId);
597+
AFL_VERIFY(itGranule != Tables.end());
598+
auto it = tierings.find(*pathId);
599+
if (it == tierings.end()) {
600+
itGranule->second->RefreshTiering({});
601+
} else {
602+
itGranule->second->RefreshTiering(it->second);
603+
}
604+
} else {
605+
for (auto&& g : Tables) {
606+
auto it = tierings.find(g.first);
607+
if (it == tierings.end()) {
608+
g.second->RefreshTiering({});
609+
} else {
610+
g.second->RefreshTiering(it->second);
611+
}
612+
}
613+
}
587614
}
588615

589616
void TColumnEngineForLogs::DoRegisterTable(const ui64 pathId) {
@@ -606,9 +633,7 @@ void TEvictionsController::RefreshTierings(std::optional<THashMap<ui64, TTiering
606633
OriginalTierings = std::move(*tierings);
607634
}
608635
auto copy = OriginalTierings;
609-
if (!ttl.AddTtls(copy)) {
610-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Broken ttl");
611-
}
636+
ttl.AddTtls(copy);
612637
NextCheckInstantForTierings = BuildNextInstantCheckers(std::move(copy));
613638
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "RefreshTierings")("count", NextCheckInstantForTierings.size());
614639
}

ydb/core/tx/columnshard/engines/column_engine_logs.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class TColumnEngineForLogs : public IColumnEngine {
8585
friend class TCleanupColumnEngineChanges;
8686
friend class NDataSharing::TDestinationSession;
8787
private:
88+
bool TiersInitialized = false;
8889
const NColumnShard::TEngineLogsCounters SignalCounters;
8990
std::shared_ptr<TGranulesStorage> GranulesStorage;
9091
std::shared_ptr<IStoragesManager> StoragesManager;
@@ -148,7 +149,11 @@ class TColumnEngineForLogs : public IColumnEngine {
148149

149150
TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits, const std::shared_ptr<IStoragesManager>& storagesManager);
150151

151-
virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) override;
152+
virtual void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) override;
153+
154+
virtual std::shared_ptr<TVersionedIndex> CopyVersionedIndexPtr() const override {
155+
return std::make_shared<TVersionedIndex>(VersionedIndex);
156+
}
152157

153158
const TVersionedIndex& GetVersionedIndex() const override {
154159
return VersionedIndex;

ydb/core/tx/columnshard/engines/storage/granule.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,12 @@ class TGranuleMeta: TNonCopyable {
177177
void OnAdditiveSummaryChange() const;
178178
YDB_READONLY(TMonotonic, LastCompactionInstant, TMonotonic::Zero());
179179
public:
180+
void RefreshTiering(const std::optional<TTiering>& /*tiering*/) {
181+
}
182+
183+
void StartActualizationIndex() {
184+
}
185+
180186
NJson::TJsonValue OptimizerSerializeToJson() const {
181187
return OptimizerPlanner->SerializeToJsonVisual();
182188
}

ydb/core/tx/columnshard/tables_manager.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,6 @@ void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& versio
214214
table.SetDropVersion(version);
215215
PathsToDrop.insert(pathId);
216216
Ttl.DropPathTtl(pathId);
217-
if (PrimaryIndex) {
218-
PrimaryIndex->OnTieringModified(nullptr, Ttl);
219-
}
220217
Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId());
221218
}
222219

@@ -269,7 +266,7 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho
269266
}
270267
}
271268

272-
void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db) {
269+
void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager) {
273270
auto it = Tables.find(pathId);
274271
AFL_VERIFY(it != Tables.end());
275272
auto& table = it->second;
@@ -296,7 +293,7 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot&
296293
Ttl.DropPathTtl(pathId);
297294
}
298295
if (PrimaryIndex) {
299-
PrimaryIndex->OnTieringModified(nullptr, Ttl);
296+
PrimaryIndex->OnTieringModified(manager, Ttl, pathId);
300297
}
301298
}
302299
Schema::SaveTableVersionInfo(db, pathId, version, versionInfo);
@@ -318,7 +315,6 @@ void TTablesManager::IndexSchemaVersion(const NOlap::TSnapshot& snapshot, const
318315
PrimaryIndex->RegisterTable(i.first);
319316
}
320317
}
321-
PrimaryIndex->OnTieringModified(nullptr, Ttl);
322318
}
323319

324320
TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const ui64 tabletId)

ydb/core/tx/columnshard/tables_manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ class TTablesManager {
232232
bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db);
233233

234234
void AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db);
235-
void AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db);
235+
void AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager);
236236
bool FillMonitoringReport(NTabletFlatExecutor::TTransactionContext& txc, NJson::TJsonValue& json);
237237
private:
238238
void IndexSchemaVersion(const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema);

ydb/core/tx/tiering/manager.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,15 +194,14 @@ NMetadata::NFetcher::ISnapshotsFetcher::TPtr TTiersManager::GetExternalDataManip
194194

195195
THashMap<ui64, NKikimr::NOlap::TTiering> TTiersManager::GetTiering() const {
196196
THashMap<ui64, NKikimr::NOlap::TTiering> result;
197-
if (!IsReady()) {
198-
return result;
199-
}
197+
AFL_VERIFY(IsReady());
200198
auto snapshotPtr = std::dynamic_pointer_cast<NTiers::TConfigsSnapshot>(Snapshot);
201199
Y_ABORT_UNLESS(snapshotPtr);
202200
auto& tierConfigs = snapshotPtr->GetTierConfigs();
203201
for (auto&& i : PathIdTiering) {
204202
auto* tiering = snapshotPtr->GetTieringById(i.second);
205203
if (tiering) {
204+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("path_id", i.first)("tiering_name", i.second)("event", "activation");
206205
result.emplace(i.first, tiering->BuildOlapTiers());
207206
for (auto& [pathId, pathTiering] : result) {
208207
for (auto& [name, tier] : pathTiering.GetTierByName()) {
@@ -213,6 +212,8 @@ THashMap<ui64, NKikimr::NOlap::TTiering> TTiersManager::GetTiering() const {
213212
}
214213
}
215214
}
215+
} else {
216+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("path_id", i.first)("tiering_name", i.second)("event", "not_found");
216217
}
217218
}
218219
return result;

ydb/core/tx/tiering/manager.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,12 @@ class TTiersManager: public ITiersManager {
8585
TTiersManager& Start(std::shared_ptr<TTiersManager> ownerPtr);
8686
TTiersManager& Stop(const bool needStopActor);
8787
virtual const std::map<TString, NTiers::TManager>& GetManagers() const override {
88+
AFL_VERIFY(IsReady());
8889
return Managers;
8990
}
9091
virtual const NTiers::TManager* GetManagerOptional(const TString& tierId) const override;
9192
NMetadata::NFetcher::ISnapshotsFetcher::TPtr GetExternalDataManipulation() const;
9293

93-
TManagers::const_iterator begin() const {
94-
return Managers.begin();
95-
}
96-
97-
TManagers::const_iterator end() const {
98-
return Managers.end();
99-
}
10094
};
10195

10296
}

0 commit comments

Comments
 (0)