Skip to content

tiering usage validations and control incorrect state usage #2585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1d13476
Update ya.make
ivanmorozov333 Mar 6, 2024
b5462e0
Merge pull request #1 from ivanmorozov333/ivanmorozov333-patch-1
ivanmorozov333 Mar 6, 2024
7482439
Merge branch 'ydb-platform:main' into main
ivanmorozov333 Mar 6, 2024
9b07ea4
Merge branch 'main' of https://github.com/ivanmorozov333/ydb
ivanmorozov333 Mar 6, 2024
416f1f2
Merge branch 'main' of https://github.com/ivanmorozov333/ydb
ivanmorozov333 Mar 7, 2024
83f51a4
Merge branch 'main' of https://github.com/ivanmorozov333/ydb
ivanmorozov333 Mar 7, 2024
dd61a53
tiering usage validations and control incorrect state usage
ivanmorozov333 Mar 10, 2024
921ec24
fix build
ivanmorozov333 Mar 10, 2024
760bebb
fix build
ivanmorozov333 Mar 10, 2024
df4f566
fix build
ivanmorozov333 Mar 10, 2024
45ddf24
fix build
ivanmorozov333 Mar 10, 2024
f41b344
fix build
ivanmorozov333 Mar 10, 2024
4ca9788
fix build
ivanmorozov333 Mar 10, 2024
cfd9cd3
fix build
ivanmorozov333 Mar 10, 2024
0f80f9d
fix build
ivanmorozov333 Mar 10, 2024
6646510
fix build
ivanmorozov333 Mar 10, 2024
ada342c
fix build
ivanmorozov333 Mar 11, 2024
01c544d
tiering usage validations and control incorrect state usage
ivanmorozov333 Mar 10, 2024
1a1eb75
fix build
ivanmorozov333 Mar 10, 2024
fbb6a4b
fix build
ivanmorozov333 Mar 10, 2024
f58939a
fix build
ivanmorozov333 Mar 10, 2024
655a69e
fix build
ivanmorozov333 Mar 10, 2024
2c05766
fix build
ivanmorozov333 Mar 10, 2024
b69a323
fix build
ivanmorozov333 Mar 10, 2024
4802e63
fix build
ivanmorozov333 Mar 10, 2024
8bc2494
fix build
ivanmorozov333 Mar 10, 2024
fe08f0f
fix build
ivanmorozov333 Mar 10, 2024
bdb7446
fix build
ivanmorozov333 Mar 11, 2024
a01dbf5
Merge branch 'tiering_usage_normalization' of https://github.com/ivan…
ivanmorozov333 Mar 11, 2024
b53fca2
fix build
ivanmorozov333 Mar 11, 2024
ba37e5a
fix
ivanmorozov333 Mar 11, 2024
6ac649a
fix build
ivanmorozov333 Mar 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/blobs_action/abstract/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class IBlobsStorageOperator {
return DoLoad(dbBlobs);
}
void OnTieringModified(const std::shared_ptr<NColumnShard::ITiersManager>& tiers) {
AFL_VERIFY(tiers);
return DoOnTieringModified(tiers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ std::shared_ptr<NKikimr::NOlap::IBlobsStorageOperator> IStoragesManager::GetOper
}

void IStoragesManager::OnTieringModified(const std::shared_ptr<NColumnShard::ITiersManager>& tiers) {
AFL_VERIFY(tiers);
for (auto&& i : tiers->GetManagers()) {
GetOperatorGuarantee(i.first)->OnTieringModified(tiers);
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SwitchToWork");

for (auto&& i : TablesManager.GetTables()) {
ActivateTiering(i.first, i.second.GetTieringUsage(), true);
ActivateTiering(i.first, i.second.GetTieringUsage());
}
OnTieringModified();

Become(&TThis::StateWork);
SignalTabletActive(ctx);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}

void TTxInitSchema::Complete(const TActorContext& ctx) {
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID();)
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID(););
Self->Execute(new TTxUpdateSchema(Self), ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const
if (!Self->SetupTtl(pathTtls)) {
return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL not started");
}
Self->TablesManager.MutablePrimaryIndex().OnTieringModified(Self->Tiers, Self->TablesManager.GetTtl());
Self->TablesManager.MutablePrimaryIndex().OnTieringModified(Self->Tiers, Self->TablesManager.GetTtl(), {});

return TTxController::TProposeResult();
}
Expand Down
56 changes: 32 additions & 24 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,21 +401,28 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
*tableVerProto.MutableSchema() = tableProto.GetSchema();
}

TTableInfo table(pathId);
if (tableProto.HasTtlSettings()) {
const auto& ttlSettings = tableProto.GetTtlSettings();
*tableVerProto.MutableTtlSettings() = ttlSettings;
if (ttlSettings.HasUseTiering()) {
table.SetTieringUsage(ttlSettings.GetUseTiering());
ActivateTiering(pathId, table.GetTieringUsage());
{
bool needTieringActivation = false;
TTableInfo table(pathId);
if (tableProto.HasTtlSettings()) {
const auto& ttlSettings = tableProto.GetTtlSettings();
*tableVerProto.MutableTtlSettings() = ttlSettings;
if (ttlSettings.HasUseTiering()) {
table.SetTieringUsage(ttlSettings.GetUseTiering());
needTieringActivation = true;
}
}
const TString tieringName = table.GetTieringUsage();
TablesManager.RegisterTable(std::move(table), db);
if (needTieringActivation) {
ActivateTiering(pathId, tieringName);
}
}

tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());
tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj());

TablesManager.RegisterTable(std::move(table), db);
TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
TablesManager.AddTableVersion(pathId, version, tableVerProto, db, Tiers);

SetCounter(COUNTER_TABLES, TablesManager.GetTables().size());
SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size());
Expand Down Expand Up @@ -452,7 +459,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
Schema::SaveTableInfo(db, pathId, tieringUsage);

tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
TablesManager.AddTableVersion(pathId, version, tableVerProto, db, Tiers);
}

void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const NOlap::TSnapshot& version,
Expand Down Expand Up @@ -1026,18 +1033,17 @@ void TColumnShard::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr&
Tiers->TakeConfigs(ev->Get()->GetSnapshot(), nullptr);
}

void TColumnShard::ActivateTiering(const ui64 pathId, const TString& useTiering, const bool onTabletInit) {
Y_ABORT_UNLESS(!!Tiers);
if (!!Tiers) {
if (useTiering) {
Tiers->EnablePathId(pathId, useTiering);
} else {
Tiers->DisablePathId(pathId);
}
void TColumnShard::ActivateTiering(const ui64 pathId, const TString& useTiering) {
AFL_VERIFY(Tiers);
if (useTiering) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "activate_tiering")("path_id", pathId)("tiering", useTiering);
}
if (!onTabletInit) {
OnTieringModified();
if (useTiering) {
Tiers->EnablePathId(pathId, useTiering);
} else {
Tiers->DisablePathId(pathId);
}
OnTieringModified(pathId);
}

void TColumnShard::Enqueue(STFUNC_SIG) {
Expand All @@ -1050,11 +1056,13 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
}
}

void TColumnShard::OnTieringModified() {
void TColumnShard::OnTieringModified(const std::optional<ui64> pathId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified");
StoragesManager->OnTieringModified(Tiers);
if (TablesManager.HasPrimaryIndex()) {
TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers, TablesManager.GetTtl());
if (Tiers->IsReady()) {
StoragesManager->OnTieringModified(Tiers);
if (TablesManager.HasPrimaryIndex()) {
TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers, TablesManager.GetTtl(), pathId);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ class TColumnShard
TabletCounters->Cumulative()[counter].Increment(num);
}

void ActivateTiering(const ui64 pathId, const TString& useTiering, const bool onTabletInit = false);
void OnTieringModified();
void ActivateTiering(const ui64 pathId, const TString& useTiering);
void OnTieringModified(const std::optional<ui64> pathId = {});
public:
enum class EOverloadStatus {
ShardTxInFly /* "shard_tx" */,
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/engines/column_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace NKikimr::NOlap {

const std::shared_ptr<arrow::Schema>& IColumnEngine::GetReplaceKey() const {
return GetVersionedIndex().GetLastSchema()->GetIndexInfo().GetReplaceKey();
}
}

template <>
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ class IColumnEngine {
virtual ~IColumnEngine() = default;

virtual const TVersionedIndex& GetVersionedIndex() const = 0;
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const { return GetVersionedIndex().GetLastSchema()->GetIndexInfo().GetReplaceKey(); }
virtual std::shared_ptr<TVersionedIndex> CopyVersionedIndexPtr() const = 0;
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const;

virtual bool HasDataInPathId(const ui64 pathId) const = 0;
virtual bool Load(IDbWrapper& db) = 0;
Expand All @@ -383,7 +384,7 @@ class IColumnEngine {
virtual const TColumnEngineStats& GetTotalStats() = 0;
virtual ui64 MemoryUsage() const { return 0; }
virtual TSnapshot LastUpdate() const { return TSnapshot::Zero(); }
virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) = 0;
virtual void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) = 0;
};

}
41 changes: 33 additions & 8 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,17 +573,44 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
return out;
}

void TColumnEngineForLogs::OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified");
std::optional<THashMap<ui64, TTiering>> tierings;
void TColumnEngineForLogs::OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) {
if (!TiersInitialized) {
for (auto&& i : Tables) {
i.second->StartActualizationIndex();
}
}

TiersInitialized = true;
AFL_VERIFY(manager);
THashMap<ui64, TTiering> tierings;
if (manager) {
tierings = manager->GetTiering();
}
ttl.AddTtls(tierings);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified")
("new_count_tierings", tierings ? ::ToString(tierings->size()) : TString("undefined"))
("new_count_tierings", tierings.size())
("new_count_ttls", ttl.PathsCount());
EvictionsController.RefreshTierings(std::move(tierings), ttl);

if (pathId) {
auto itGranule = Tables.find(*pathId);
AFL_VERIFY(itGranule != Tables.end());
auto it = tierings.find(*pathId);
if (it == tierings.end()) {
itGranule->second->RefreshTiering({});
} else {
itGranule->second->RefreshTiering(it->second);
}
} else {
for (auto&& g : Tables) {
auto it = tierings.find(g.first);
if (it == tierings.end()) {
g.second->RefreshTiering({});
} else {
g.second->RefreshTiering(it->second);
}
}
}
}

void TColumnEngineForLogs::DoRegisterTable(const ui64 pathId) {
Expand All @@ -606,9 +633,7 @@ void TEvictionsController::RefreshTierings(std::optional<THashMap<ui64, TTiering
OriginalTierings = std::move(*tierings);
}
auto copy = OriginalTierings;
if (!ttl.AddTtls(copy)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Broken ttl");
}
ttl.AddTtls(copy);
NextCheckInstantForTierings = BuildNextInstantCheckers(std::move(copy));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "RefreshTierings")("count", NextCheckInstantForTierings.size());
}
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class TColumnEngineForLogs : public IColumnEngine {
friend class TCleanupColumnEngineChanges;
friend class NDataSharing::TDestinationSession;
private:
bool TiersInitialized = false;
const NColumnShard::TEngineLogsCounters SignalCounters;
std::shared_ptr<TGranulesStorage> GranulesStorage;
std::shared_ptr<IStoragesManager> StoragesManager;
Expand Down Expand Up @@ -148,7 +149,11 @@ class TColumnEngineForLogs : public IColumnEngine {

TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits, const std::shared_ptr<IStoragesManager>& storagesManager);

virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) override;
virtual void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) override;

virtual std::shared_ptr<TVersionedIndex> CopyVersionedIndexPtr() const override {
return std::make_shared<TVersionedIndex>(VersionedIndex);
}

const TVersionedIndex& GetVersionedIndex() const override {
return VersionedIndex;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/engines/storage/granule.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ class TGranuleMeta: TNonCopyable {
void OnAdditiveSummaryChange() const;
YDB_READONLY(TMonotonic, LastCompactionInstant, TMonotonic::Zero());
public:
void RefreshTiering(const std::optional<TTiering>& /*tiering*/) {
}

void StartActualizationIndex() {
}

NJson::TJsonValue OptimizerSerializeToJson() const {
return OptimizerPlanner->SerializeToJsonVisual();
}
Expand Down
8 changes: 2 additions & 6 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,6 @@ void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& versio
table.SetDropVersion(version);
PathsToDrop.insert(pathId);
Ttl.DropPathTtl(pathId);
if (PrimaryIndex) {
PrimaryIndex->OnTieringModified(nullptr, Ttl);
}
Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId());
}

Expand Down Expand Up @@ -269,7 +266,7 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho
}
}

void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db) {
void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager) {
auto it = Tables.find(pathId);
AFL_VERIFY(it != Tables.end());
auto& table = it->second;
Expand All @@ -296,7 +293,7 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot&
Ttl.DropPathTtl(pathId);
}
if (PrimaryIndex) {
PrimaryIndex->OnTieringModified(nullptr, Ttl);
PrimaryIndex->OnTieringModified(manager, Ttl, pathId);
}
}
Schema::SaveTableVersionInfo(db, pathId, version, versionInfo);
Expand All @@ -318,7 +315,6 @@ void TTablesManager::IndexSchemaVersion(const NOlap::TSnapshot& snapshot, const
PrimaryIndex->RegisterTable(i.first);
}
}
PrimaryIndex->OnTieringModified(nullptr, Ttl);
}

TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const ui64 tabletId)
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class TTablesManager {
bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db);

void AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db);
void AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db);
void AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager);
bool FillMonitoringReport(NTabletFlatExecutor::TTransactionContext& txc, NJson::TJsonValue& json);
private:
void IndexSchemaVersion(const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema);
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/tiering/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,14 @@ NMetadata::NFetcher::ISnapshotsFetcher::TPtr TTiersManager::GetExternalDataManip

THashMap<ui64, NKikimr::NOlap::TTiering> TTiersManager::GetTiering() const {
THashMap<ui64, NKikimr::NOlap::TTiering> result;
if (!IsReady()) {
return result;
}
AFL_VERIFY(IsReady());
auto snapshotPtr = std::dynamic_pointer_cast<NTiers::TConfigsSnapshot>(Snapshot);
Y_ABORT_UNLESS(snapshotPtr);
auto& tierConfigs = snapshotPtr->GetTierConfigs();
for (auto&& i : PathIdTiering) {
auto* tiering = snapshotPtr->GetTieringById(i.second);
if (tiering) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("path_id", i.first)("tiering_name", i.second)("event", "activation");
result.emplace(i.first, tiering->BuildOlapTiers());
for (auto& [pathId, pathTiering] : result) {
for (auto& [name, tier] : pathTiering.GetTierByName()) {
Expand All @@ -213,6 +212,8 @@ THashMap<ui64, NKikimr::NOlap::TTiering> TTiersManager::GetTiering() const {
}
}
}
} else {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("path_id", i.first)("tiering_name", i.second)("event", "not_found");
}
}
return result;
Expand Down
8 changes: 1 addition & 7 deletions ydb/core/tx/tiering/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,12 @@ class TTiersManager: public ITiersManager {
TTiersManager& Start(std::shared_ptr<TTiersManager> ownerPtr);
TTiersManager& Stop(const bool needStopActor);
virtual const std::map<TString, NTiers::TManager>& GetManagers() const override {
AFL_VERIFY(IsReady());
return Managers;
}
virtual const NTiers::TManager* GetManagerOptional(const TString& tierId) const override;
NMetadata::NFetcher::ISnapshotsFetcher::TPtr GetExternalDataManipulation() const;

TManagers::const_iterator begin() const {
return Managers.begin();
}

TManagers::const_iterator end() const {
return Managers.end();
}
};

}