Skip to content

Commit 888c474

Browse files
authored
share schemas between CS on same node (#12673)
1 parent e804b29 commit 888c474

30 files changed

+276
-82
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
8383
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
8484
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
8585
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
86-
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), info->TabletID)
86+
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr),
87+
std::make_shared<NOlap::TSchemaObjectsCache>(), info->TabletID)
8788
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
8889
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
8990
, InsertTable(std::make_unique<NOlap::TInsertTable>())

ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc,
1515
auto& index = Self->TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>();
1616

1717
for (auto& info : SchemeHistory) {
18-
index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetSchema());
18+
index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetProto().GetId(), info.GetSchema());
1919
}
2020

2121
TDbWrapper dbWrapper(txc.DB, nullptr);

ydb/core/tx/columnshard/engines/changes/indexation.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class TPathFieldsInfo {
108108
if (!Schemas.contains(data.GetSchemaVersion())) {
109109
Schemas.emplace(data.GetSchemaVersion(), blobSchema);
110110
}
111-
auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
111+
TColumnIdsView columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
112112
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end());
113113
if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) {
114114
filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
@@ -247,7 +247,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
247247
{
248248
const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange);
249249

250-
auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
250+
NArrow::TSchemaLiteView blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
251251
auto batchSchema =
252252
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end()));
253253
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));

ydb/core/tx/columnshard/engines/column_engine.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -354,9 +354,9 @@ class IColumnEngine {
354354
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept = 0;
355355
virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
356356
virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
357-
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
358-
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0;
359-
virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0;
357+
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) = 0;
358+
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0;
359+
virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0;
360360

361361
virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0;
362362
virtual const TColumnEngineStats& GetTotalStats() = 0;

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,30 +26,32 @@
2626

2727
namespace NKikimr::NOlap {
2828

29-
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
29+
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
3030
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
31-
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema)
31+
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema)
3232
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
3333
, DataAccessorsManager(dataAccessorsManager)
3434
, StoragesManager(storagesManager)
35+
, SchemaObjectsCache(schemaCache)
3536
, TabletId(tabletId)
3637
, LastPortion(0)
3738
, LastGranule(0) {
3839
ActualizationController = std::make_shared<NActualizer::TController>();
39-
RegisterSchemaVersion(snapshot, schema);
40+
RegisterSchemaVersion(snapshot, presetId, schema);
4041
}
4142

42-
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
43+
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
4344
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
44-
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema)
45+
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema)
4546
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
4647
, DataAccessorsManager(dataAccessorsManager)
4748
, StoragesManager(storagesManager)
49+
, SchemaObjectsCache(schemaCache)
4850
, TabletId(tabletId)
4951
, LastPortion(0)
5052
, LastGranule(0) {
5153
ActualizationController = std::make_shared<NActualizer::TController>();
52-
RegisterSchemaVersion(snapshot, std::move(schema));
54+
RegisterSchemaVersion(snapshot, presetId, std::move(schema));
5355
}
5456

5557
const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& TColumnEngineForLogs::GetStats() const {
@@ -138,7 +140,7 @@ void TColumnEngineForLogs::UpdatePortionStats(
138140
}
139141
}
140142

141-
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
143+
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& indexInfo) {
142144
AFL_VERIFY(DataAccessorsManager);
143145
bool switchOptimizer = false;
144146
bool switchAccessorsManager = false;
@@ -150,7 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
150152
}
151153

152154
const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization();
153-
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo));
155+
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(indexInfo)));
154156
if (isCriticalScheme) {
155157
StartActualization({});
156158
for (auto&& i : GranulesStorage->GetTables()) {
@@ -170,7 +172,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
170172
}
171173
}
172174

173-
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
175+
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) {
174176
AFL_VERIFY(VersionedIndex.IsEmpty() || schema.GetVersion() >= VersionedIndex.GetLastSchema()->GetVersion())("empty", VersionedIndex.IsEmpty())("current", schema.GetVersion())(
175177
"last", VersionedIndex.GetLastSchema()->GetVersion());
176178

@@ -184,10 +186,10 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons
184186
indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema.GetSchemaVerified(), StoragesManager, SchemaObjectsCache);
185187
}
186188
AFL_VERIFY(indexInfoOptional);
187-
RegisterSchemaVersion(snapshot, std::move(*indexInfoOptional));
189+
RegisterSchemaVersion(snapshot, presetId, std::move(*indexInfoOptional));
188190
}
189191

190-
void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
192+
void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) {
191193
AFL_VERIFY(!VersionedIndex.IsEmpty());
192194

193195
ui64 version = schema.GetVersion();
@@ -215,7 +217,7 @@ void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, c
215217
}
216218

217219
AFL_VERIFY(indexInfoOptional);
218-
VersionedIndex.AddIndex(snapshot, std::move(*indexInfoOptional));
220+
VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(*indexInfoOptional)));
219221
}
220222

221223
std::shared_ptr<ITxReader> TColumnEngineForLogs::BuildLoader(const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector) {

ydb/core/tx/columnshard/engines/column_engine_logs.h

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class TColumnEngineForLogs: public IColumnEngine {
6161
std::shared_ptr<IStoragesManager> StoragesManager;
6262

6363
std::shared_ptr<NActualizer::TController> ActualizationController;
64-
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>();
64+
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache;
6565
TVersionedIndex VersionedIndex;
6666
std::shared_ptr<TVersionedIndex> VersionedIndexCopy;
6767

@@ -98,10 +98,13 @@ class TColumnEngineForLogs: public IColumnEngine {
9898
ADD,
9999
};
100100

101-
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
102-
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema);
103-
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
104-
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema);
101+
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
102+
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
103+
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId,
104+
const TSchemaInitializationData& schema);
105+
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
106+
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
107+
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema);
105108

106109
void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) override;
107110
void OnTieringModified(const THashMap<ui64, NOlap::TTiering>& ttl) override;
@@ -157,9 +160,9 @@ class TColumnEngineForLogs: public IColumnEngine {
157160
virtual bool ApplyChangesOnExecute(
158161
IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override;
159162

160-
void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override;
161-
void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override;
162-
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override;
163+
void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) override;
164+
void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
165+
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
163166

164167
std::shared_ptr<TSelectInfo> Select(
165168
ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#include "schema_version.h"
2+
3+
namespace NKikimr::NOlap {}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#pragma once
2+
3+
#include <ydb/library/accessor/accessor.h>
4+
5+
#include <util/digest/numeric.h>
6+
7+
namespace NKikimr::NOlap {
8+
9+
class TSchemaVersionId {
10+
private:
11+
YDB_READONLY_DEF(ui64, PresetId);
12+
YDB_READONLY_DEF(ui64, Version);
13+
14+
public:
15+
bool operator==(const TSchemaVersionId& other) const {
16+
return std::tie(PresetId, Version) == std::tie(other.PresetId, other.Version);
17+
}
18+
19+
TSchemaVersionId(const ui64 presetId, const ui64 version)
20+
: PresetId(presetId)
21+
, Version(version) {
22+
}
23+
};
24+
25+
}
26+
27+
template <>
28+
struct THash<NKikimr::NOlap::TSchemaVersionId> {
29+
inline size_t operator()(const NKikimr::NOlap::TSchemaVersionId& key) const {
30+
return CombineHashes(key.GetPresetId(), key.GetVersion());
31+
}
32+
};

ydb/core/tx/columnshard/engines/scheme/abstract/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ LIBRARY()
33
SRCS(
44
index_info.cpp
55
column_ids.cpp
6+
schema_version.cpp
67
)
78

89
PEERDIR(
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#include "cache.h"
2+
3+
namespace NKikimr::NOlap {}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#pragma once
2+
3+
#include <util/generic/hash.h>
4+
#include <util/system/guard.h>
5+
#include <util/system/mutex.h>
6+
7+
#include <memory>
8+
9+
namespace NKikimr::NOlap {
10+
11+
template <typename TKey, typename TObject>
12+
class TObjectCache : std::enable_shared_from_this<TObjectCache<TKey, TObject>> {
13+
private:
14+
THashMap<TKey, std::weak_ptr<const TObject>> Objects;
15+
mutable TMutex Mutex;
16+
17+
public:
18+
class TEntryGuard {
19+
private:
20+
TKey Key;
21+
std::shared_ptr<const TObject> Object;
22+
std::weak_ptr<TObjectCache> Cache;
23+
24+
public:
25+
TEntryGuard(TKey key, const std::shared_ptr<const TObject> object, TObjectCache* cache)
26+
: Key(key)
27+
, Object(object)
28+
, Cache(cache->weak_from_this()) {
29+
}
30+
31+
const TObject* operator->() const {
32+
return Object.get();
33+
}
34+
const TObject& operator*() const {
35+
return *Object;
36+
}
37+
38+
~TEntryGuard() {
39+
Object.reset();
40+
if (auto cache = Cache.lock()) {
41+
cache->TryFree(Key);
42+
}
43+
}
44+
};
45+
46+
public:
47+
TEntryGuard Upsert(TKey key, TObject&& object) {
48+
TGuard lock(Mutex);
49+
auto* findSchema = Objects.FindPtr(key);
50+
std::shared_ptr<const TObject> cachedObject;
51+
if (findSchema) {
52+
cachedObject = findSchema->lock();
53+
}
54+
if (!cachedObject) {
55+
cachedObject = std::make_shared<const TObject>(std::move(object));
56+
Objects[key] = cachedObject;
57+
}
58+
return TEntryGuard(std::move(key), cachedObject, this);
59+
}
60+
61+
void TryFree(const TKey& key) {
62+
TGuard lock(Mutex);
63+
auto findObject = Objects.FindPtr(key);
64+
if (findObject) {
65+
if (findObject->expired()) {
66+
Objects.erase(key);
67+
}
68+
}
69+
}
70+
};
71+
72+
} // namespace NKikimr::NOlap
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
cache.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/library/actors/core
9+
)
10+
11+
YQL_LAST_ABI_VERSION()
12+
13+
END()

ydb/core/tx/columnshard/engines/scheme/index_info.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) co
7676
}
7777

7878
std::vector<std::string> TIndexInfo::GetColumnSTLNames(const bool withSpecial) const {
79-
const auto ids = GetColumnIds(withSpecial);
79+
const TColumnIdsView ids = GetColumnIds(withSpecial);
8080
std::vector<std::string> out;
8181
out.reserve(ids.size());
8282
for (ui32 id : ids) {
@@ -457,7 +457,7 @@ std::shared_ptr<NIndexes::NCountMinSketch::TIndexMeta> TIndexInfo::GetIndexMetaC
457457
}
458458

459459
std::vector<ui32> TIndexInfo::GetEntityIds() const {
460-
const auto columnIds = GetColumnIds(true);
460+
const TColumnIdsView columnIds = GetColumnIds(true);
461461
std::vector<ui32> result(columnIds.begin(), columnIds.end());
462462
for (auto&& i : Indexes) {
463463
result.emplace_back(i.first);
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
#include "objects_cache.h"
22

3+
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
4+
35
namespace NKikimr::NOlap {
46

7+
TSchemaObjectsCache::TSchemasCache::TEntryGuard TSchemaObjectsCache::UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo) {
8+
const TSchemaVersionId versionId(presetId, indexInfo.GetVersion());
9+
return SchemasByVersion.Upsert(versionId, std::move(indexInfo));
10+
}
11+
512
} // namespace NKikimr::NOlap

0 commit comments

Comments
 (0)