Skip to content

Commit e60ad02

Browse files
Revert "Stable-24-1 CS patch (ydb-platform#2363)"
This reverts commit 385b62e.
1 parent 3527639 commit e60ad02

File tree

10 files changed

+115
-31
lines changed

10 files changed

+115
-31
lines changed

ydb/core/tx/columnshard/blobs_reader/actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ void TActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev)
1515
bool aborted = false;
1616
if (event.Status != NKikimrProto::EReplyStatus::OK) {
1717
WaitingBlobsCount.Sub(Task->GetWaitingCount());
18-
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(0, 1024)))) {
18+
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(1024)))) {
1919
aborted = true;
2020
}
2121
} else {

ydb/core/tx/columnshard/engines/portions/column_record.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo&
1414
if (context.GetMetaProto().HasRawBytes()) {
1515
RawBytes = context.GetMetaProto().GetRawBytes();
1616
}
17+
if (context.GetMetaProto().HasMinValue()) {
18+
AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
19+
Min = ConstantToScalar(context.GetMetaProto().GetMinValue(), field->type());
20+
}
1721
if (context.GetMetaProto().HasMaxValue()) {
1822
AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
1923
Max = ConstantToScalar(context.GetMetaProto().GetMaxValue(), field->type());
@@ -33,9 +37,9 @@ NKikimrTxColumnShard::TIndexColumnMeta TChunkMeta::SerializeToProto() const {
3337
if (RawBytes) {
3438
meta.SetRawBytes(*RawBytes);
3539
}
36-
if (HasMax()) {
40+
if (HasMinMax()) {
41+
ScalarToConstant(*Min, *meta.MutableMinValue());
3742
ScalarToConstant(*Max, *meta.MutableMaxValue());
38-
ScalarToConstant(*Max, *meta.MutableMinValue());
3943
}
4044
return meta;
4145
}

ydb/core/tx/columnshard/engines/portions/portion_info.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,21 @@ void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArr
4343
Meta.SetTierName(tierName);
4444
}
4545

46+
// Returns the smallest chunk-level minimum stored for `columnId` among Records.
// Yields nullptr when any chunk of that column carries no minimum, or when the
// portion has no chunk for that column at all.
std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const {
    std::shared_ptr<arrow::Scalar> smallest;
    for (auto&& record : Records) {
        if (record.ColumnId != columnId) {
            continue;
        }
        const std::shared_ptr<arrow::Scalar> chunkMin = record.GetMeta().GetMin();
        if (!chunkMin) {
            // One chunk without statistics makes the aggregate unknown.
            return nullptr;
        }
        if (!smallest || NArrow::ScalarCompare(smallest, chunkMin) > 0) {
            smallest = chunkMin;
        }
    }
    return smallest;
}
60+
4661
std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
4762
std::shared_ptr<arrow::Scalar> result;
4863
for (auto&& i : Records) {
@@ -115,6 +130,14 @@ ui64 TPortionInfo::GetIndexBytes(const std::set<ui32>& entityIds) const {
115130
return sum;
116131
}
117132

133+
int TPortionInfo::CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
134+
return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns);
135+
}
136+
137+
int TPortionInfo::CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
138+
return CompareMinByColumnIds(item, info.KeyColumns);
139+
}
140+
118141
TString TPortionInfo::DebugString(const bool withDetails) const {
119142
TStringBuilder sb;
120143
sb << "(portion_id:" << Portion << ";" <<
@@ -156,6 +179,19 @@ void TPortionInfo::AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& r
156179
}
157180
}
158181

182+
bool TPortionInfo::HasPkMinMax() const {
183+
bool result = false;
184+
for (auto&& i : Records) {
185+
if (i.ColumnId == Meta.FirstPkColumn) {
186+
if (!i.GetMeta().HasMinMax()) {
187+
return false;
188+
}
189+
result = true;
190+
}
191+
}
192+
return result;
193+
}
194+
159195
std::vector<const NKikimr::NOlap::TColumnRecord*> TPortionInfo::GetColumnChunksPointers(const ui32 columnId) const {
160196
std::vector<const TColumnRecord*> result;
161197
for (auto&& c : Records) {

ydb/core/tx/columnshard/engines/portions/portion_info.h

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class TPortionInfo {
2323
TSnapshot MinSnapshot = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion}
2424
TSnapshot RemoveSnapshot = TSnapshot::Zero(); // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one)
2525

26+
bool HasPkMinMax() const;
2627
TPortionMeta Meta;
2728
std::shared_ptr<NOlap::IBlobsStorageOperator> BlobsOperator;
2829
ui64 DeprecatedGranuleId = 0;
@@ -189,7 +190,7 @@ class TPortionInfo {
189190

190191
bool Empty() const { return Records.empty(); }
191192
bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; }
192-
bool Valid() const { return MinSnapshot.Valid() && PathId && Portion && !Empty() && Produced() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
193+
bool Valid() const { return MinSnapshot.Valid() && PathId && Portion && !Empty() && Produced() && HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
193194
bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && PathId && Portion; }
194195
bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; }
195196
bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; }
@@ -337,6 +338,7 @@ class TPortionInfo {
337338
void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys,
338339
const TString& tierName);
339340

341+
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
340342
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
341343

342344
const NArrow::TReplaceKey& IndexKeyStart() const {
@@ -412,7 +414,48 @@ class TPortionInfo {
412414
return GetRawBytes();
413415
}
414416

417+
private:
418+
class TMinGetter {
419+
public:
420+
static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
421+
return portionInfo.MinValue(columnId);
422+
}
423+
};
424+
425+
class TMaxGetter {
426+
public:
427+
static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
428+
return portionInfo.MaxValue(columnId);
429+
}
430+
};
431+
432+
template <class TSelfGetter, class TItemGetter = TSelfGetter>
433+
int CompareByColumnIdsImpl(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
434+
for (auto&& i : columnIds) {
435+
std::shared_ptr<arrow::Scalar> valueSelf = TSelfGetter::Get(*this, i);
436+
std::shared_ptr<arrow::Scalar> valueItem = TItemGetter::Get(item, i);
437+
if (!!valueSelf && !!valueItem) {
438+
const int cmpResult = NArrow::ScalarCompare(valueSelf, valueItem);
439+
if (cmpResult) {
440+
return cmpResult;
441+
}
442+
} else if (!!valueSelf) {
443+
return 1;
444+
} else if (!!valueItem) {
445+
return -1;
446+
}
447+
}
448+
return 0;
449+
}
415450
public:
451+
int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const;
452+
453+
int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const;
454+
455+
int CompareMinByColumnIds(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
456+
return CompareByColumnIdsImpl<TMinGetter>(item, columnIds);
457+
}
458+
416459
class TAssembleBlobInfo {
417460
private:
418461
ui32 NullRowsCount = 0;

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,6 @@ namespace NKikimr::NOlap {
4949
class TNormalizationContext {
5050
YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor);
5151
YDB_ACCESSOR_DEF(TActorId, ColumnshardActor);
52-
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
53-
public:
54-
void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> rg) {
55-
ResourcesGuard = rg;
56-
}
5752
};
5853

5954
class TNormalizationController;
@@ -122,7 +117,7 @@ namespace NKikimr::NOlap {
122117
// Builds a one-line diagnostic string: the number of normalizers, the current
// index, and the current normalizer's name (empty once the index is past the end).
TString DebugString() const {
    // The conditional must be fully parenthesised: `<<` binds tighter than `?:`,
    // so without the parentheses the whole stream expression becomes the ternary
    // condition and Normalizers[CurrentNormalizerIndex] is read without the
    // bounds check protecting it.
    return TStringBuilder() << "normalizers_count=" << Normalizers.size()
        << ";current_normalizer_idx=" << CurrentNormalizerIndex
        << ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size() ? Normalizers[CurrentNormalizerIndex]->GetName() : "");
}
127122

128123
const INormalizerComponent::TPtr& GetNormalizer() const;

ydb/core/tx/columnshard/normalizer/portion/chunks.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class TRowsAndBytesChangesTask: public NConveyor::ITask {
6868
}
6969

7070
public:
71-
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>>)
71+
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, THashMap<ui64, ISnapshotSchema::TPtr>&&)
7272
: Blobs(std::move(blobs))
7373
, Chunks(std::move(chunks))
7474
, NormContext(nCtx)

ydb/core/tx/columnshard/normalizer/portion/min_max.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
1616
private:
1717
THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
1818
TDataContainer Portions;
19-
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
19+
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
2020
TNormalizationContext NormContext;
2121
protected:
2222
virtual bool DoExecute() override {
23-
Y_ABORT_UNLESS(!Schemas->empty());
24-
auto pkColumnIds = Schemas->begin()->second->GetPkColumnsIds();
23+
Y_ABORT_UNLESS(!Schemas.empty());
24+
auto pkColumnIds = Schemas.begin()->second->GetPkColumnsIds();
2525
pkColumnIds.insert(TIndexInfo::GetSpecialColumnIds().begin(), TIndexInfo::GetSpecialColumnIds().end());
2626

2727
for (auto&& portionInfo : Portions) {
28-
auto blobSchema = Schemas->FindPtr(portionInfo->GetPortionId());
28+
auto blobSchema = Schemas.FindPtr(portionInfo->GetPortionId());
2929
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
3030
for (auto&& i : portionInfo->Records) {
3131
auto blobIt = Blobs.find(i.BlobRange);
@@ -47,10 +47,10 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
4747
}
4848

4949
public:
50-
TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
50+
TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
5151
: Blobs(std::move(blobs))
5252
, Portions(std::move(portions))
53-
, Schemas(schemas)
53+
, Schemas(std::move(schemas))
5454
, NormContext(nCtx)
5555
{}
5656

@@ -135,7 +135,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
135135
}
136136

137137
THashMap<ui64, std::shared_ptr<TPortionInfo>> portions;
138-
auto schemas = std::make_shared<THashMap<ui64, ISnapshotSchema::TPtr>>();
138+
THashMap<ui64, ISnapshotSchema::TPtr> schemas;
139139
auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());
140140

141141
{
@@ -161,7 +161,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
161161
auto portionMeta = loadContext.GetPortionMeta();
162162
if (it == portions.end()) {
163163
Y_ABORT_UNLESS(portion.Records.empty());
164-
(*schemas)[portion.GetPortionId()] = currentSchema;
164+
schemas[portion.GetPortionId()] = currentSchema;
165165
auto portionNew = std::make_shared<TPortionInfo>(portion);
166166
portionNew->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta);
167167
it = portions.emplace(portion.GetPortion(), portionNew).first;
@@ -202,7 +202,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
202202
}
203203
++brokenPortioncCount;
204204
package.emplace_back(portion.second);
205-
if (package.size() == 1000) {
205+
if (package.size() == 100) {
206206
std::vector<std::shared_ptr<TPortionInfo>> local;
207207
local.swap(package);
208208
tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(local), schemas));

ydb/core/tx/columnshard/normalizer/portion/normalizer.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
1818
private:
1919
using TBase = NOlap::NBlobOperations::NRead::ITask;
2020
typename TConveyorTask::TDataContainer Data;
21-
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
21+
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
2222
TNormalizationContext NormContext;
2323

2424
public:
25-
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
25+
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
2626
: TBase(actions, "CS::NORMALIZER")
2727
, Data(std::move(data))
2828
, Schemas(std::move(schemas))
@@ -32,8 +32,8 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
3232

3333
protected:
3434
// Invoked when the requested blobs are available: packs the blob data, the
// normalization context, the portions and the schemas into a TConveyorTask and
// submits it for execution. The resources guard is not retained here.
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
    Y_UNUSED(resourcesGuard);
    std::shared_ptr<NConveyor::ITask> conveyorTask = std::make_shared<TConveyorTask>(
        std::move(ExtractBlobsData()), NormContext, std::move(Data), std::move(Schemas));
    NConveyor::TCompServiceOperator::SendTaskToExecute(conveyorTask);
}
3939

@@ -49,13 +49,13 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
4949
template <class TConveyorTask>
5050
class TPortionsNormalizerTask : public INormalizerTask {
5151
typename TConveyorTask::TDataContainer Package;
52-
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
52+
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
5353
public:
5454
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package)
5555
: Package(std::move(package))
5656
{}
5757

58-
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
58+
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const THashMap<ui64, ISnapshotSchema::TPtr>& schemas)
5959
: Package(std::move(package))
6060
, Schemas(schemas)
6161
{}
@@ -71,7 +71,7 @@ class TPortionsNormalizerTask : public INormalizerTask {
7171
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction};
7272
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
7373
nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
74-
std::make_shared<TReadPortionsTask<TConveyorTask>>(nCtx, actions, std::move(Package), Schemas), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
74+
std::make_shared<TReadPortionsTask<TConveyorTask>>( nCtx, actions, std::move(Package), std::move(Schemas) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
7575
}
7676
};
7777
}

ydb/core/tx/columnshard/splitter/chunk_meta.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,24 @@
44

55
namespace NKikimr::NOlap {
66

7-
// Collects chunk statistics for `column`: row count, raw byte size and, when
// `needMinMax` is set, the minimum and maximum scalars.
// Aborts on a null or empty column.
TSimpleChunkMeta::TSimpleChunkMeta(const std::shared_ptr<arrow::Array>& column, const bool needMinMax, const bool isSortedColumn) {
    Y_ABORT_UNLESS(column);
    Y_ABORT_UNLESS(column->length());
    NumRows = column->length();
    RawBytes = NArrow::GetArrayDataSize(column);

    if (!needMinMax) {
        return;
    }

    // For a sorted column the first and last rows are taken as min and max;
    // otherwise the positions are searched for.
    std::pair<i32, i32> minMaxPos = {0, (column->length() - 1)};
    if (!isSortedColumn) {
        minMaxPos = NArrow::FindMinMaxPosition(column);
        Y_ABORT_UNLESS(minMaxPos.first >= 0);
        Y_ABORT_UNLESS(minMaxPos.second >= 0);
    }

    Min = NArrow::GetScalar(column, minMaxPos.first);
    Max = NArrow::GetScalar(column, minMaxPos.second);

    Y_ABORT_UNLESS(Min);
    Y_ABORT_UNLESS(Max);
}

ydb/core/tx/columnshard/splitter/chunk_meta.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ namespace NKikimr::NOlap {
1212

1313
class TSimpleChunkMeta {
1414
protected:
15+
std::shared_ptr<arrow::Scalar> Min;
1516
std::shared_ptr<arrow::Scalar> Max;
1617
std::optional<ui32> NumRows;
1718
std::optional<ui32> RawBytes;
@@ -24,6 +25,9 @@ class TSimpleChunkMeta {
2425
return sizeof(ui32) + sizeof(ui32) + 8 * 3 * 2;
2526
}
2627

28+
std::shared_ptr<arrow::Scalar> GetMin() const {
29+
return Min;
30+
}
2731
std::shared_ptr<arrow::Scalar> GetMax() const {
2832
return Max;
2933
}
@@ -45,8 +49,8 @@ class TSimpleChunkMeta {
4549
return *RawBytes;
4650
}
4751

48-
bool HasMax() const noexcept {
49-
return Max.get();
52+
bool HasMinMax() const noexcept {
53+
return Min.get() && Max.get();
5054
}
5155

5256
};

0 commit comments

Comments
 (0)