Skip to content

Commit df84217

Browse files
normalizer for remove granule_id in stored portions (#5049)
1 parent dd3fb68 commit df84217

File tree

5 files changed

+180
-2
lines changed

5 files changed

+180
-2
lines changed

ydb/core/tx/columnshard/engines/db_wrapper.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRe
5151
}
5252
using IndexColumns = NColumnShard::Schema::IndexColumns;
5353
auto removeSnapshot = portion.GetRemoveSnapshotOptional();
54-
db.Table<IndexColumns>().Key(0, portion.GetPathId(), row.ColumnId,
54+
db.Table<IndexColumns>().Key(0, 0, row.ColumnId,
5555
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortion(), row.Chunk).Update(
5656
NIceDb::TUpdate<IndexColumns::XPlanStep>(removeSnapshot ? removeSnapshot->GetPlanStep() : 0),
5757
NIceDb::TUpdate<IndexColumns::XTxId>(removeSnapshot ? removeSnapshot->GetTxId() : 0),
@@ -85,7 +85,7 @@ void TDbWrapper::ErasePortion(const NOlap::TPortionInfo& portion) {
8585
void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
8686
NIceDb::TNiceDb db(Database);
8787
using IndexColumns = NColumnShard::Schema::IndexColumns;
88-
db.Table<IndexColumns>().Key(0, portion.GetPathId(), row.ColumnId,
88+
db.Table<IndexColumns>().Key(0, 0, row.ColumnId,
8989
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortion(), row.Chunk).Delete();
9090
}
9191

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ namespace NKikimr::NOlap {
5656
PortionsCleaner,
5757
TablesCleaner,
5858
PortionsMetadata,
59+
CleanGranuleId
5960
};
6061

6162
class TNormalizationContext {
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
#include "clean_granule.h"
2+
3+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
4+
5+
namespace NKikimr::NOlap {
6+
7+
namespace {
8+
9+
struct TChunkData {
10+
ui64 Index = 0;
11+
ui64 GranuleId = 0;
12+
ui64 PlanStep = 0;
13+
ui64 TxId = 0;
14+
ui64 PortionId = 0;
15+
ui32 Chunk = 0;
16+
ui64 ColumnIdx = 0;
17+
18+
ui64 XPlanStep = 0;
19+
ui64 XTxId = 0;
20+
TString Blob;
21+
TString Metadata;
22+
ui64 Offset;
23+
ui32 Size;
24+
ui64 PathId;
25+
26+
template <class TRowSet>
27+
TChunkData(const TRowSet& rowset) {
28+
using Schema = NColumnShard::Schema;
29+
PlanStep = rowset.template GetValue<Schema::IndexColumns::PlanStep>();
30+
TxId = rowset.template GetValue<Schema::IndexColumns::TxId>();
31+
PortionId = rowset.template GetValue<Schema::IndexColumns::Portion>();
32+
GranuleId = rowset.template GetValue<Schema::IndexColumns::Granule>();
33+
Chunk = rowset.template GetValue<Schema::IndexColumns::Chunk>();
34+
Index = rowset.template GetValue<Schema::IndexColumns::Index>();
35+
ColumnIdx = rowset.template GetValue<Schema::IndexColumns::ColumnIdx>();
36+
37+
XPlanStep = rowset.template GetValue<Schema::IndexColumns::XPlanStep>();
38+
XTxId = rowset.template GetValue<Schema::IndexColumns::XTxId>();
39+
Blob = rowset.template GetValue<Schema::IndexColumns::Blob>();
40+
Metadata = rowset.template GetValue<Schema::IndexColumns::Metadata>();
41+
Offset = rowset.template GetValue<Schema::IndexColumns::Offset>();
42+
Size = rowset.template GetValue<Schema::IndexColumns::Size>();
43+
PathId = rowset.template GetValue<Schema::IndexColumns::PathId>();
44+
}
45+
};
46+
}
47+
48+
class TCleanGranuleIdNormalizer::TNormalizerResult : public INormalizerChanges {
49+
private:
50+
std::vector<TChunkData> Chunks;
51+
52+
void AddChunk(TChunkData&& chunk) {
53+
Chunks.push_back(std::move(chunk));
54+
}
55+
56+
TNormalizerResult() = default;
57+
58+
public:
59+
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
60+
using Schema = NColumnShard::Schema;
61+
NIceDb::TNiceDb db(txc.DB);
62+
ACFL_INFO("normalizer", "TCleanGranuleIdNormalizer")("message", TStringBuilder() << "apply " << Chunks.size() << " chunks");
63+
64+
for (auto&& key : Chunks) {
65+
db.Table<Schema::IndexColumns>().Key(key.Index, key.GranuleId, key.ColumnIdx,
66+
key.PlanStep, key.TxId, key.PortionId, key.Chunk).Delete();
67+
68+
db.Table<Schema::IndexColumns>().Key(0, 0, key.ColumnIdx,
69+
key.PlanStep, key.TxId, key.PortionId, key.Chunk).Update(
70+
NIceDb::TUpdate<Schema::IndexColumns::PathId>(key.PathId),
71+
NIceDb::TUpdate<Schema::IndexColumns::Blob>(key.Blob),
72+
NIceDb::TUpdate<Schema::IndexColumns::Metadata>(key.Metadata),
73+
NIceDb::TUpdate<Schema::IndexColumns::Offset>(key.Offset),
74+
NIceDb::TUpdate<Schema::IndexColumns::Size>(key.Size),
75+
NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(key.XPlanStep),
76+
NIceDb::TUpdate<Schema::IndexColumns::XTxId>(key.XTxId)
77+
78+
);
79+
}
80+
return true;
81+
}
82+
83+
ui64 GetSize() const override {
84+
return Chunks.size();
85+
}
86+
87+
static std::optional<std::vector<INormalizerChanges::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
88+
using namespace NColumnShard;
89+
NIceDb::TNiceDb db(txc.DB);
90+
91+
bool ready = true;
92+
ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
93+
if (!ready) {
94+
return std::nullopt;
95+
}
96+
97+
std::vector<INormalizerChanges::TPtr> tasks;
98+
ui64 fullChunksCount = 0;
99+
{
100+
auto rowset = db.Table<Schema::IndexColumns>().Select();
101+
if (!rowset.IsReady()) {
102+
return std::nullopt;
103+
}
104+
std::shared_ptr<TNormalizerResult> changes(new TNormalizerResult());
105+
ui64 chunksCount = 0;
106+
107+
while (!rowset.EndOfSet()) {
108+
if (rowset.GetValue<Schema::IndexColumns::Granule>() || rowset.GetValue<Schema::IndexColumns::Index>()) {
109+
TChunkData key(rowset);
110+
111+
changes->AddChunk(std::move(key));
112+
++chunksCount;
113+
++fullChunksCount;
114+
115+
if (chunksCount == 10000) {
116+
tasks.emplace_back(changes);
117+
changes.reset(new TNormalizerResult());
118+
controller.GetCounters().CountObjects(chunksCount);
119+
chunksCount = 0;
120+
}
121+
}
122+
123+
if (!rowset.Next()) {
124+
return std::nullopt;
125+
}
126+
}
127+
128+
if (chunksCount > 0) {
129+
tasks.emplace_back(changes);
130+
controller.GetCounters().CountObjects(chunksCount);
131+
}
132+
}
133+
ACFL_INFO("normalizer", "TCleanGranuleIdNormalizer")("message", TStringBuilder() << fullChunksCount << " chunks found");
134+
return tasks;
135+
}
136+
137+
};
138+
139+
TConclusion<std::vector<INormalizerTask::TPtr>> TCleanGranuleIdNormalizer::DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
140+
auto changes = TNormalizerResult::Init(controller, txc);
141+
if (!changes) {
142+
return TConclusionStatus::Fail("Not ready");;
143+
}
144+
std::vector<INormalizerTask::TPtr> tasks;
145+
for (auto&& c : *changes) {
146+
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(c));
147+
}
148+
return tasks;
149+
}
150+
151+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
7+
namespace NKikimr::NOlap {
8+
9+
class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerComponent {
10+
class TNormalizerResult;
11+
12+
static inline INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer> Registrator =
13+
INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer>(ENormalizerSequentialId::CleanGranuleId);
14+
public:
15+
TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext&) {
16+
}
17+
18+
virtual ENormalizerSequentialId GetType() const override {
19+
return ENormalizerSequentialId::CleanGranuleId;
20+
}
21+
22+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
23+
};
24+
25+
}

ydb/core/tx/columnshard/normalizer/granule/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
GLOBAL normalizer.cpp
5+
GLOBAL clean_granule.cpp
56
)
67

78
PEERDIR(

0 commit comments

Comments
 (0)