diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index 039faafa8794..8c3feaa4cf72 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -2,8 +2,8 @@ namespace NKikimr::NColumnShard { -bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index, const std::function& callback) { - auto rowset = db.Table().Prefix(index).Select(); +bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, const std::function& callback) { + auto rowset = db.Table().Prefix(0).Select(); if (!rowset.IsReady()) { return false; } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index c55cf2ed57b4..b2317147a42d 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -510,13 +510,13 @@ struct Schema : NIceDb::Schema { // IndexColumns activities - static void IndexColumns_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { + static void IndexColumns_Write(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk); auto rowProto = row.GetMeta().SerializeToProto(); if (proto) { *rowProto.MutablePortionMeta() = std::move(*proto); } - db.Table().Key(index, portion.GetDeprecatedGranuleId(), row.ColumnId, + db.Table().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update( NIceDb::TUpdate(portion.GetRemoveSnapshot().GetPlanStep()), NIceDb::TUpdate(portion.GetRemoveSnapshot().GetTxId()), @@ -528,24 +528,24 @@ struct Schema : NIceDb::Schema { ); } - static void IndexColumns_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { - db.Table().Key(index, portion.GetDeprecatedGranuleId(), row.ColumnId, + static void IndexColumns_Erase(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { + db.Table().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete(); } - static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index, + static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, const std::function& callback); // IndexCounters - static void IndexCounters_Write(NIceDb::TNiceDb& db, ui32 index, ui32 counterId, ui64 value) { - db.Table().Key(index, counterId).Update( + static void IndexCounters_Write(NIceDb::TNiceDb& db, ui32 counterId, ui64 value) { + db.Table().Key(0, counterId).Update( NIceDb::TUpdate(value) ); } - static bool IndexCounters_Load(NIceDb::TNiceDb& db, ui32 index, const std::function& callback) { - auto rowset = db.Table().Prefix(index).Select(); + static bool IndexCounters_Load(NIceDb::TNiceDb& db, const std::function& callback) { + auto rowset = db.Table().Prefix(0).Select(); if (!rowset.IsReady()) return false; diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp index f0172442ddb4..ef91baef134a 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp +++ b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp @@ -40,7 +40,7 @@ bool TCleanupColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp continue; } for (auto& record : portionInfo.Records) { - self.ColumnsTable->Erase(context.DB, portionInfo, record); + context.DB.EraseColumn(portionInfo, record); } } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 24913b1ebbb6..0003e91cf625 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -68,7 +68,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange self.UpsertPortion(portionInfo, &oldInfo); for (auto& record : portionInfo.Records) { - self.ColumnsTable->Write(context.DB, portionInfo, record); + context.DB.WriteColumn(portionInfo, record); } } for (auto& portionInfoWithBlobs : AppendedPortions) { @@ -77,7 +77,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true)); self.UpsertPortion(portionInfo); for (auto& record : portionInfo.Records) { - self.ColumnsTable->Write(context.DB, portionInfo, record); + context.DB.WriteColumn(portionInfo, record); } } } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index d823af9629ef..29f7e74c2714 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -327,6 +327,10 @@ class TVersionedIndex { return Snapshots.rbegin()->second; } + bool IsEmpty() const { + return Snapshots.empty(); + } + const std::shared_ptr& GetPrimaryKey() const noexcept { return PrimaryKey; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 18290e79d90e..98cab6bef1ca 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -130,11 +130,7 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c } void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { - if (!ColumnsTable) { - ui32 indexId = indexInfo.GetId(); - ColumnsTable = std::make_shared(indexId); - CountersTable = std::make_shared(indexId); - } else { + if (!VersionedIndex.IsEmpty()) { const NOlap::TIndexInfo& lastIndexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo(); Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(indexInfo)); } @@ -173,7 +169,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) { bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { TSnapshot lastSnapshot(0, 0); const TIndexInfo* currentIndexInfo = nullptr; - auto result = ColumnsTable->Load(db, [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) { + auto result = db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) { if (!currentIndexInfo || lastSnapshot != portion.GetMinSnapshot()) { currentIndexInfo = &VersionedIndex.GetSchema(portion.GetMinSnapshot())->GetIndexInfo(); lastSnapshot = portion.GetMinSnapshot(); @@ -207,7 +203,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { } }; - return CountersTable->Load(db, callback); + return db.LoadCounters(callback); } std::shared_ptr TColumnEngineForLogs::StartInsert(std::vector&& dataToIndex) noexcept { @@ -476,13 +472,13 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptrApplyChanges(*this, context)); } - CountersTable->Write(db, LAST_PORTION, LastPortion); - CountersTable->Write(db, LAST_GRANULE, LastGranule); + db.WriteCounter(LAST_PORTION, LastPortion); + db.WriteCounter(LAST_GRANULE, LastGranule); if (LastSnapshot < snapshot) { LastSnapshot = snapshot; - CountersTable->Write(db, LAST_PLAN_STEP, LastSnapshot.GetPlanStep()); - CountersTable->Write(db, LAST_TX_ID, LastSnapshot.GetTxId()); + db.WriteCounter(LAST_PLAN_STEP, LastSnapshot.GetPlanStep()); + db.WriteCounter(LAST_TX_ID, LastSnapshot.GetTxId()); } return true; } diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h index 517a90c8a464..0b1cd1bdc806 100644 --- a/ydb/core/tx/columnshard/engines/columns_table.h +++ b/ydb/core/tx/columnshard/engines/columns_table.h @@ -7,44 +7,4 @@ namespace NKikimr::NOlap { -class TColumnsTable { -public: - TColumnsTable(ui32 indexId) - : IndexId(indexId) - {} - - void Write(IDbWrapper& db, const TPortionInfo& portion, const TColumnRecord& row) { - db.WriteColumn(IndexId, portion, row); - } - - void Erase(IDbWrapper& db, const TPortionInfo& portion, const TColumnRecord& row) { - db.EraseColumn(IndexId, portion, row); - } - - bool Load(IDbWrapper& db, std::function callback) { - return db.LoadColumns(IndexId, callback); - } - -private: - ui32 IndexId; -}; - -class TCountersTable { -public: - TCountersTable(ui32 indexId) - : IndexId(indexId) - {} - - void Write(IDbWrapper& db, ui32 counterId, ui64 value) { - db.WriteCounter(IndexId, counterId, value); - } - - bool Load(IDbWrapper& db, std::function callback) { - return db.LoadCounters(IndexId, callback); - } - -private: - ui32 IndexId; -}; - } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index ff825546b15f..57a7abb585ee 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -40,29 +40,29 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable, return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, insertTable, loadTime); } -void TDbWrapper::WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { +void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) { NIceDb::TNiceDb db(Database); - NColumnShard::Schema::IndexColumns_Write(db, index, portion, row); + NColumnShard::Schema::IndexColumns_Write(db, portion, row); } -void TDbWrapper::EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { +void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) { NIceDb::TNiceDb db(Database); - NColumnShard::Schema::IndexColumns_Erase(db, index, portion, row); + NColumnShard::Schema::IndexColumns_Erase(db, portion, row); } -bool TDbWrapper::LoadColumns(ui32 index, const std::function& callback) { +bool TDbWrapper::LoadColumns(const std::function& callback) { NIceDb::TNiceDb db(Database); - return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, index, callback); + return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, callback); } -void TDbWrapper::WriteCounter(ui32 index, ui32 counterId, ui64 value) { +void TDbWrapper::WriteCounter(ui32 counterId, ui64 value) { NIceDb::TNiceDb db(Database); - return NColumnShard::Schema::IndexCounters_Write(db, index, counterId, value); + return NColumnShard::Schema::IndexCounters_Write(db, counterId, value); } -bool TDbWrapper::LoadCounters(ui32 index, const std::function& callback) { +bool TDbWrapper::LoadCounters(const std::function& callback) { NIceDb::TNiceDb db(Database); - return NColumnShard::Schema::IndexCounters_Load(db, index, callback); + return NColumnShard::Schema::IndexCounters_Load(db, callback); } } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index 2767ec4f8848..a90684dbffd7 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -29,12 +29,12 @@ class IDbWrapper { virtual bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) = 0; - virtual void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0; - virtual void EraseColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0; - virtual bool LoadColumns(ui32 index, const std::function& callback) = 0; + virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0; + virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0; + virtual bool LoadColumns(const std::function& callback) = 0; - virtual void WriteCounter(ui32 index, ui32 counterId, ui64 value) = 0; - virtual bool LoadCounters(ui32 index, const std::function& callback) = 0; + virtual void WriteCounter(ui32 counterId, ui64 value) = 0; + virtual bool LoadCounters(const std::function& callback) = 0; }; class TDbWrapper : public IDbWrapper { @@ -53,12 +53,12 @@ class TDbWrapper : public IDbWrapper { bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) override; - void WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; - void EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; - bool LoadColumns(ui32 index, const std::function& callback) override; + void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; + void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; + bool LoadColumns(const std::function& callback) override; - void WriteCounter(ui32 index, ui32 counterId, ui64 value) override; - bool LoadCounters(ui32 index, const std::function& callback) override; + void WriteCounter(ui32 counterId, ui64 value) override; + bool LoadCounters(const std::function& callback) override; private: NTable::TDatabase& Database;