Skip to content

Delete unsupported special column DELETE_FLAG #7068

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/common/portion.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ class TSpecialColumns {
public:
static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
static constexpr const char* SPEC_COL_DELETE_FLAG = "_yql_delete_flag";
static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00;
static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1;
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ class IIndexInfo {
public:
enum class ESpecialColumn: ui32 {
PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX,
TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID_INDEX
TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID_INDEX,
DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX
};

static constexpr const char* SPEC_COL_PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP;
static constexpr const char* SPEC_COL_TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID;
static constexpr const char* SPEC_COL_DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG;
static const TString STORE_INDEX_STATS_TABLE;
static const TString STORE_INDEX_PORTION_STATS_TABLE;
static const TString STORE_INDEX_GRANULE_STATS_TABLE;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ TString TIndexInfo::GetColumnName(ui32 id, bool required) const {
return SPEC_COL_PLAN_STEP;
} else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) {
return SPEC_COL_TX_ID;
} else if (ESpecialColumn(id) == ESpecialColumn::DELETE_FLAG) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

здесь я бы не добавлял. может не упасть там где надо

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В каком случае?

return SPEC_COL_DELETE_FLAG;
} else {
const auto ci = Columns.find(id);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace NKikimr::NOlap {
enum class ENormalizerSequentialId : ui32 {
Granules = 1,
Chunks,
DeleteUnsupportedSpecialColumns,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

добавление должно идти последовательно. это SequentialId
на этом построена схема запуска новых

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Но у нас же падение в следующем нормализаторе.

PortionsCleaner,
TablesCleaner,
// PortionsMetadata
Expand Down
123 changes: 123 additions & 0 deletions ydb/core/tx/columnshard/normalizer/special_columns/normalizer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#include "normalizer.h"

#include <ydb/core/tx/columnshard/columnshard_private_events.h>

namespace NKikimr::NOlap {

namespace {

constexpr ui32 ColumnIdxToDelete = (ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG;


using namespace NColumnShard;

struct TKey {
ui32 Index;
ui64 Granule;
ui32 ColumnIdx;
ui64 PlanStep;
ui64 TxId;
ui64 Portion;
ui32 Chunk;
};

using TKeyBatch = std::vector<TKey>;

std::optional<std::vector<TKeyBatch>> KeysToDelete(NTabletFlatExecutor::TTransactionContext& txc, size_t maxBatchSize) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const size_t

NIceDb::TNiceDb db(txc.DB);
if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
return std::nullopt;
}
std::vector<TKeyBatch> result;
TKeyBatch currentBatch;
auto rowset = db.Table<Schema::IndexColumns>().Select<
Schema::IndexColumns::Index,
Schema::IndexColumns::Granule,
Schema::IndexColumns::ColumnIdx,
Schema::IndexColumns::PlanStep,
Schema::IndexColumns::TxId,
Schema::IndexColumns::Portion,
Schema::IndexColumns::Chunk
>();
if (!rowset.IsReady()) {
return std::nullopt;
}
while (!rowset.EndOfSet()) {
if (rowset.GetValue<Schema::IndexColumns::ColumnIdx>() == ColumnIdxToDelete) {
auto key = TKey {
.Index = rowset.GetValue<Schema::IndexColumns::Index>(),
.Granule = rowset.GetValue<Schema::IndexColumns::Granule>(),
.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(),
.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(),
.Portion = rowset.GetValue<Schema::IndexColumns::Portion>(),
.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>()
};
currentBatch.emplace_back(std::move(key));
if (currentBatch.size() == maxBatchSize) {
TKeyBatch newBatch;
currentBatch.swap(newBatch);
result.emplace_back(std::move(newBatch));
}
}
if (!rowset.Next()) {
return std::nullopt;
}
}
if (!currentBatch.empty()) {
result.emplace_back(std::move(currentBatch));
}
return result;
}

class TChanges : public INormalizerChanges {
public:
TChanges(TKeyBatch&& keys)
: Keys(keys)
{}
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for(const auto& k: Keys) {
db.Table<Schema::IndexColumns>().Key(
k.Index,
k.Granule,
k.ColumnIdx,
k.PlanStep,
k.TxId,
k.Portion,
k.Chunk
).Delete();

}
return true;
}

ui64 GetSize() const override {
return 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а это точно правильно?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

поправил

}
private:
const TKeyBatch Keys;
};

} //namespace

TConclusion<std::vector<INormalizerTask::TPtr>> TDeleteUnsupportedSpecialColumnsNormalier::DoInit(const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
const size_t MaxBatchSize = 10000;
auto keysToDelete = KeysToDelete(txc, MaxBatchSize);
if (!keysToDelete) {
return TConclusionStatus::Fail("Not ready");
}

std::vector<INormalizerTask::TPtr> result;
for (auto&& batch: *keysToDelete) {
AFL_VERIFY(!batch.empty());
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(
std::make_shared<TChanges>(std::move(std::move(batch)))
));
}
return result;
}

} //namespace NKikimr::NOlap
24 changes: 24 additions & 0 deletions ydb/core/tx/columnshard/normalizer/special_columns/normalizer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>


namespace NKikimr::NOlap {

class TDeleteUnsupportedSpecialColumnsNormalier: public TNormalizationController::INormalizerComponent {
using TThisClass = TDeleteUnsupportedSpecialColumnsNormalier;
static constexpr auto TypeId = ENormalizerSequentialId::DeleteUnsupportedSpecialColumns;
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TThisClass>(TypeId);
public:
TDeleteUnsupportedSpecialColumnsNormalier(const TNormalizationController::TInitContext&)
{}

virtual ENormalizerSequentialId GetType() const override {
return TypeId;
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/normalizer/special_columns/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
LIBRARY()

SRCS(
GLOBAL normalizer.cpp
)

PEERDIR(
ydb/core/tx/columnshard/normalizer/abstract
)

END()
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ PEERDIR(
ydb/core/tx/columnshard/export
ydb/core/tx/columnshard/resource_subscriber
ydb/core/tx/columnshard/normalizer/granule
ydb/core/tx/columnshard/normalizer/special_columns
ydb/core/tx/columnshard/normalizer/portion
ydb/core/tx/columnshard/normalizer/tables
ydb/core/tx/columnshard/blobs_action/storages_manager
Expand Down
Loading