Skip to content

Commit 1cbae05

Browse files
authored
Delete unsupported special column DELETE_FLAG (#7068)
1 parent 4eb6288 commit 1cbae05

File tree

8 files changed

+175
-1
lines changed

8 files changed

+175
-1
lines changed

ydb/core/tx/columnshard/common/portion.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ class TSpecialColumns {
1717
public:
1818
static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
1919
static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
20+
static constexpr const char* SPEC_COL_DELETE_FLAG = "_yql_delete_flag";
2021
static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00;
2122
static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1;
23+
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
2224
};
2325

2426
}

ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ class IIndexInfo {
99
public:
1010
enum class ESpecialColumn: ui32 {
1111
PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX,
12-
TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID_INDEX
12+
TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID_INDEX,
13+
DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX
1314
};
1415

1516
static constexpr const char* SPEC_COL_PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP;
1617
static constexpr const char* SPEC_COL_TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID;
18+
static constexpr const char* SPEC_COL_DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG;
1719
static const TString STORE_INDEX_STATS_TABLE;
1820
static const TString STORE_INDEX_PORTION_STATS_TABLE;
1921
static const TString STORE_INDEX_GRANULE_STATS_TABLE;

ydb/core/tx/columnshard/engines/scheme/index_info.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ TString TIndexInfo::GetColumnName(ui32 id, bool required) const {
9898
return SPEC_COL_PLAN_STEP;
9999
} else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) {
100100
return SPEC_COL_TX_ID;
101+
} else if (ESpecialColumn(id) == ESpecialColumn::DELETE_FLAG) {
102+
return SPEC_COL_DELETE_FLAG;
101103
} else {
102104
const auto ci = Columns.find(id);
103105

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ namespace NKikimr::NOlap {
5353
enum class ENormalizerSequentialId : ui32 {
5454
Granules = 1,
5555
Chunks,
56+
DeleteUnsupportedSpecialColumns,
5657
PortionsCleaner,
5758
TablesCleaner,
5859
// PortionsMetadata
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
#include "normalizer.h"
2+
3+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
4+
5+
namespace NKikimr::NOlap {
6+
7+
namespace {
8+
9+
constexpr ui32 ColumnIdxToDelete = (ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG;
10+
11+
12+
using namespace NColumnShard;
13+
14+
struct TKey {
15+
ui32 Index;
16+
ui64 Granule;
17+
ui32 ColumnIdx;
18+
ui64 PlanStep;
19+
ui64 TxId;
20+
ui64 Portion;
21+
ui32 Chunk;
22+
};
23+
24+
using TKeyBatch = std::vector<TKey>;
25+
26+
std::optional<std::vector<TKeyBatch>> KeysToDelete(NTabletFlatExecutor::TTransactionContext& txc, size_t maxBatchSize) {
27+
NIceDb::TNiceDb db(txc.DB);
28+
if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
29+
return std::nullopt;
30+
}
31+
std::vector<TKeyBatch> result;
32+
TKeyBatch currentBatch;
33+
auto rowset = db.Table<Schema::IndexColumns>().Select<
34+
Schema::IndexColumns::Index,
35+
Schema::IndexColumns::Granule,
36+
Schema::IndexColumns::ColumnIdx,
37+
Schema::IndexColumns::PlanStep,
38+
Schema::IndexColumns::TxId,
39+
Schema::IndexColumns::Portion,
40+
Schema::IndexColumns::Chunk
41+
>();
42+
if (!rowset.IsReady()) {
43+
return std::nullopt;
44+
}
45+
while (!rowset.EndOfSet()) {
46+
if (rowset.GetValue<Schema::IndexColumns::ColumnIdx>() == ColumnIdxToDelete) {
47+
auto key = TKey {
48+
.Index = rowset.GetValue<Schema::IndexColumns::Index>(),
49+
.Granule = rowset.GetValue<Schema::IndexColumns::Granule>(),
50+
.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(),
51+
.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(),
52+
.Portion = rowset.GetValue<Schema::IndexColumns::Portion>(),
53+
.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>()
54+
};
55+
currentBatch.emplace_back(std::move(key));
56+
if (currentBatch.size() == maxBatchSize) {
57+
result.emplace_back(std::move(currentBatch));
58+
currentBatch = TKeyBatch{};
59+
}
60+
}
61+
if (!rowset.Next()) {
62+
return std::nullopt;
63+
}
64+
}
65+
if (!currentBatch.empty()) {
66+
result.emplace_back(std::move(currentBatch));
67+
}
68+
69+
return result;
70+
}
71+
72+
class TChanges : public INormalizerChanges {
73+
public:
74+
TChanges(TKeyBatch&& keys)
75+
: Keys(keys)
76+
{}
77+
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
78+
using namespace NColumnShard;
79+
NIceDb::TNiceDb db(txc.DB);
80+
for(const auto& k: Keys) {
81+
db.Table<Schema::IndexColumns>().Key(
82+
k.Index,
83+
k.Granule,
84+
k.ColumnIdx,
85+
k.PlanStep,
86+
k.TxId,
87+
k.Portion,
88+
k.Chunk
89+
).Delete();
90+
}
91+
ACFL_INFO("normalizer", "TDeleteUnsupportedSpecialColumnsNormalier")("message", TStringBuilder() << GetSize() << " rows deleted");
92+
return true;
93+
}
94+
95+
ui64 GetSize() const override {
96+
return Keys.size();
97+
}
98+
private:
99+
const TKeyBatch Keys;
100+
};
101+
102+
} //namespace
103+
104+
TConclusion<std::vector<INormalizerTask::TPtr>> TDeleteUnsupportedSpecialColumnsNormalier::DoInit(const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
105+
using namespace NColumnShard;
106+
NIceDb::TNiceDb db(txc.DB);
107+
const size_t MaxBatchSize = 10000;
108+
auto keysToDelete = KeysToDelete(txc, MaxBatchSize);
109+
if (!keysToDelete) {
110+
return TConclusionStatus::Fail("Not ready");
111+
}
112+
ACFL_INFO("normalizer", "TDeleteUnsupportedSpecialColumnsNormalier")("message",
113+
TStringBuilder()
114+
<< "found "
115+
<< std::accumulate(cbegin(*keysToDelete), cend(*keysToDelete), 0, [](size_t a, const TKeyBatch& b){return a + b.size();})
116+
<< " rows to delete grouped in "
117+
<< keysToDelete->size()
118+
<< " batches"
119+
);
120+
121+
std::vector<INormalizerTask::TPtr> result;
122+
for (auto&& batch: *keysToDelete) {
123+
AFL_VERIFY(!batch.empty());
124+
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(
125+
std::make_shared<TChanges>(std::move(std::move(batch)))
126+
));
127+
}
128+
return result;
129+
}
130+
131+
} //namespace NKikimr::NOlap
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
7+
namespace NKikimr::NOlap {
8+
9+
class TDeleteUnsupportedSpecialColumnsNormalier: public TNormalizationController::INormalizerComponent {
10+
using TThisClass = TDeleteUnsupportedSpecialColumnsNormalier;
11+
static constexpr auto TypeId = ENormalizerSequentialId::DeleteUnsupportedSpecialColumns;
12+
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TThisClass>(TypeId);
13+
public:
14+
TDeleteUnsupportedSpecialColumnsNormalier(const TNormalizationController::TInitContext&)
15+
{}
16+
17+
virtual ENormalizerSequentialId GetType() const override {
18+
return TypeId;
19+
}
20+
21+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
22+
};
23+
24+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
GLOBAL normalizer.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/tx/columnshard/normalizer/abstract
9+
)
10+
11+
END()

ydb/core/tx/columnshard/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ PEERDIR(
5656
ydb/core/tx/columnshard/export
5757
ydb/core/tx/columnshard/resource_subscriber
5858
ydb/core/tx/columnshard/normalizer/granule
59+
ydb/core/tx/columnshard/normalizer/special_columns
5960
ydb/core/tx/columnshard/normalizer/portion
6061
ydb/core/tx/columnshard/normalizer/tables
6162
ydb/core/tx/columnshard/blobs_action/storages_manager

0 commit comments

Comments
 (0)