Skip to content

Commit b16060e

Browse files
authored
Delete empty portions normalizer (#7596)
1 parent fb79e67 commit b16060e

File tree

6 files changed

+173
-3
lines changed

6 files changed

+173
-3
lines changed

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ enum class ENormalizerSequentialId: ui32 {
5656
TablesCleaner,
5757
PortionsMetadata,
5858
CleanGranuleId,
59+
EmptyPortionsCleaner,
5960

6061
MAX
6162
};
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#include "clean_empty.h"
2+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
3+
4+
5+
namespace NKikimr::NOlap {
6+
7+
namespace {
8+
std::optional<THashSet<TPortionAddress>> GetColumnPortionAddresses(NTabletFlatExecutor::TTransactionContext& txc) {
9+
using namespace NColumnShard;
10+
NIceDb::TNiceDb db(txc.DB);
11+
if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
12+
return std::nullopt;
13+
}
14+
THashSet<TPortionAddress> usedPortions;
15+
auto rowset = db.Table<Schema::IndexColumns>().Select<
16+
Schema::IndexColumns::PathId,
17+
Schema::IndexColumns::Portion
18+
>();
19+
if (!rowset.IsReady()) {
20+
return std::nullopt;
21+
}
22+
while (!rowset.EndOfSet()) {
23+
usedPortions.emplace(
24+
rowset.GetValue<Schema::IndexColumns::PathId>(),
25+
rowset.GetValue<Schema::IndexColumns::Portion>()
26+
);
27+
if (!rowset.Next()) {
28+
return std::nullopt;
29+
}
30+
}
31+
return usedPortions;
32+
}
33+
34+
using TBatch = std::vector<TPortionAddress>;
35+
36+
std::optional<std::vector<TBatch>> GetPortionsToDelete(NTabletFlatExecutor::TTransactionContext& txc) {
37+
using namespace NColumnShard;
38+
const auto usedPortions = GetColumnPortionAddresses(txc);
39+
if (!usedPortions) {
40+
return std::nullopt;
41+
}
42+
const size_t MaxBatchSize = 10000;
43+
NIceDb::TNiceDb db(txc.DB);
44+
if (!Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme())) {
45+
return std::nullopt;
46+
}
47+
auto rowset = db.Table<Schema::IndexPortions>().Select<
48+
Schema::IndexPortions::PathId,
49+
Schema::IndexPortions::PortionId
50+
>();
51+
if (!rowset.IsReady()) {
52+
return std::nullopt;
53+
}
54+
std::vector<TBatch> result;
55+
TBatch portionsToDelete;
56+
while (!rowset.EndOfSet()) {
57+
TPortionAddress addr(
58+
rowset.GetValue<Schema::IndexPortions::PathId>(),
59+
rowset.GetValue<Schema::IndexPortions::PortionId>()
60+
);
61+
if (!usedPortions->contains(addr)) {
62+
ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << addr.DebugString() << " marked for deletion");
63+
portionsToDelete.emplace_back(std::move(addr));
64+
if (portionsToDelete.size() == MaxBatchSize) {
65+
result.emplace_back(std::move(portionsToDelete));
66+
portionsToDelete = TBatch{};
67+
}
68+
}
69+
if (!rowset.Next()) {
70+
return std::nullopt;
71+
}
72+
}
73+
if (!portionsToDelete.empty()) {
74+
result.emplace_back(std::move(portionsToDelete));
75+
}
76+
return result;
77+
}
78+
79+
class TChanges : public INormalizerChanges {
80+
public:
81+
TChanges(TBatch&& addresses)
82+
: Addresses(addresses)
83+
{}
84+
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
85+
using namespace NColumnShard;
86+
NIceDb::TNiceDb db(txc.DB);
87+
for(const auto& a: Addresses) {
88+
db.Table<Schema::IndexPortions>().Key(
89+
a.GetPathId(),
90+
a.GetPortionId()
91+
).Delete();
92+
}
93+
ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << GetSize() << " portions deleted");
94+
return true;
95+
}
96+
97+
ui64 GetSize() const override {
98+
return Addresses.size();
99+
}
100+
private:
101+
const TBatch Addresses;
102+
};
103+
104+
} //namespace
105+
106+
TConclusion<std::vector<INormalizerTask::TPtr>> TCleanEmptyPortionsNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
107+
using namespace NColumnShard;
108+
auto batchesToDelete = GetPortionsToDelete(txc);
109+
if (!batchesToDelete) {
110+
return TConclusionStatus::Fail("Not ready");
111+
}
112+
113+
std::vector<INormalizerTask::TPtr> result;
114+
for (auto&& b: *batchesToDelete) {
115+
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(b))));
116+
}
117+
return result;
118+
}
119+
120+
} //namespace NKikimr::NOlap
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
5+
namespace NKikimr::NOlap {
6+
7+
class TCleanEmptyPortionsNormalizer : public TNormalizationController::INormalizerComponent {
8+
9+
static TString ClassName() {
10+
return ToString(ENormalizerSequentialId::EmptyPortionsCleaner);
11+
}
12+
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TCleanEmptyPortionsNormalizer>(ClassName());
13+
public:
14+
TCleanEmptyPortionsNormalizer(const TNormalizationController::TInitContext&)
15+
{}
16+
17+
std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
18+
return ENormalizerSequentialId::EmptyPortionsCleaner;
19+
}
20+
21+
TString GetClassName() const override {
22+
return ClassName();
23+
}
24+
25+
TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
26+
};
27+
28+
} //namespace NKikimr::NOlap

ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
namespace NKikimr::NOlap {
88

99
TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizerBase::DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
10-
auto initRes = DoInitImpl(controller,txc);
10+
auto initRes = DoInitImpl(controller, txc);
1111

1212
if (initRes.IsFail()) {
1313
return initRes;

ydb/core/tx/columnshard/normalizer/portion/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ SRCS(
55
GLOBAL portion.cpp
66
GLOBAL chunks.cpp
77
GLOBAL clean.cpp
8+
GLOBAL clean_empty.cpp
89
GLOBAL broken_blobs.cpp
910
)
1011

ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
55
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
6+
#include <ydb/core/tx/columnshard/engines/portions/constructor.h>
67

78
#include <ydb/core/tx/columnshard/operations/write_data.h>
89

@@ -161,7 +162,7 @@ class TColumnChunksCleaner : public NYDBTest::ILocalDBModifier {
161162
}
162163
};
163164

164-
class TPortinosCleaner : public NYDBTest::ILocalDBModifier {
165+
class TPortionsCleaner : public NYDBTest::ILocalDBModifier {
165166
public:
166167
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
167168
using namespace NColumnShard;
@@ -185,6 +186,21 @@ class TPortinosCleaner : public NYDBTest::ILocalDBModifier {
185186
}
186187
};
187188

189+
190+
class TEmptyPortionsCleaner : public NYDBTest::ILocalDBModifier {
191+
public:
192+
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
193+
using namespace NColumnShard;
194+
NIceDb::TNiceDb db(txc.DB);
195+
for (size_t pathId = 100; pathId != 299; ++pathId) {
196+
for (size_t portionId = 1000; portionId != 1199; ++portionId) {
197+
db.Table<Schema::IndexPortions>().Key(pathId, portionId).Update();
198+
}
199+
}
200+
}
201+
};
202+
203+
188204
class TTablesCleaner : public NYDBTest::ILocalDBModifier {
189205
public:
190206
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
@@ -317,7 +333,11 @@ Y_UNIT_TEST_SUITE(Normalizers) {
317333
}
318334

319335
Y_UNIT_TEST(PortionsNormalizer) {
320-
TestNormalizerImpl<TPortinosCleaner>();
336+
TestNormalizerImpl<TPortionsCleaner>();
337+
}
338+
339+
Y_UNIT_TEST(CleanEmptyPortionsNormalizer) {
340+
TestNormalizerImpl<TEmptyPortionsCleaner>();
321341
}
322342

323343
Y_UNIT_TEST(EmptyTablesNormalizer) {

0 commit comments

Comments
 (0)