Skip to content

Commit 1acf95f

Browse files
add validations for inconsistency states for scanners (#3526)
1 parent 3c19574 commit 1acf95f

File tree

11 files changed

+170
-219
lines changed

11 files changed

+170
-219
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#include "batch_iterator.h"
2+
3+
namespace NKikimr::NArrow::NMerger {
4+
5+
NJson::TJsonValue TBatchIterator::DebugJson() const {
6+
NJson::TJsonValue result;
7+
result["is_cp"] = IsControlPoint();
8+
result["key"] = KeyColumns.DebugJson();
9+
return result;
10+
}
11+
12+
NKikimr::NArrow::NMerger::TSortableBatchPosition::TFoundPosition TBatchIterator::SkipToLower(const TSortableBatchPosition& pos) {
13+
const ui32 posStart = KeyColumns.GetPosition();
14+
auto result = KeyColumns.SkipToLower(pos);
15+
const i32 delta = IsReverse() ? (posStart - KeyColumns.GetPosition()) : (KeyColumns.GetPosition() - posStart);
16+
AFL_VERIFY(delta >= 0);
17+
AFL_VERIFY(VersionColumns.InitPosition(KeyColumns.GetPosition()))("pos", KeyColumns.GetPosition())
18+
("size", VersionColumns.GetRecordsCount())("key_size", KeyColumns.GetRecordsCount());
19+
if (FilterIterator && delta) {
20+
AFL_VERIFY(FilterIterator->Next(delta));
21+
}
22+
return result;
23+
}
24+
25+
bool TBatchIterator::Next() {
26+
const bool result = KeyColumns.NextPosition(ReverseSortKff) && VersionColumns.NextPosition(ReverseSortKff);
27+
if (FilterIterator) {
28+
Y_ABORT_UNLESS(result == FilterIterator->Next(1));
29+
}
30+
return result;
31+
}
32+
33+
bool TBatchIterator::operator<(const TBatchIterator& item) const {
34+
const std::partial_ordering result = KeyColumns.Compare(item.KeyColumns);
35+
if (result == std::partial_ordering::equivalent) {
36+
if (IsControlPoint() && item.IsControlPoint()) {
37+
return false;
38+
} else if (IsControlPoint()) {
39+
return false;
40+
} else if (item.IsControlPoint()) {
41+
return true;
42+
}
43+
//don't need inverse through we need maximal version at first (reverse analytic not included in VersionColumns)
44+
return VersionColumns.Compare(item.VersionColumns) == std::partial_ordering::less;
45+
} else {
46+
//inverse logic through we use max heap, but need minimal element if not reverse (reverse analytic included in KeyColumns)
47+
return result == std::partial_ordering::greater;
48+
}
49+
}
50+
51+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#pragma once
2+
#include "position.h"
3+
#include <ydb/core/formats/arrow/arrow_filter.h>
4+
5+
namespace NKikimr::NArrow::NMerger {
6+
7+
class TBatchIterator {
8+
private:
9+
bool ControlPointFlag;
10+
TSortableBatchPosition KeyColumns;
11+
TSortableBatchPosition VersionColumns;
12+
i64 RecordsCount;
13+
int ReverseSortKff;
14+
15+
std::shared_ptr<NArrow::TColumnFilter> Filter;
16+
std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
17+
18+
i32 GetFirstPosition() const {
19+
if (ReverseSortKff > 0) {
20+
return 0;
21+
} else {
22+
return RecordsCount - 1;
23+
}
24+
}
25+
26+
public:
27+
NJson::TJsonValue DebugJson() const;
28+
29+
const std::shared_ptr<NArrow::TColumnFilter>& GetFilter() const {
30+
return Filter;
31+
}
32+
33+
bool IsControlPoint() const {
34+
return ControlPointFlag;
35+
}
36+
37+
const TSortableBatchPosition& GetKeyColumns() const {
38+
return KeyColumns;
39+
}
40+
41+
const TSortableBatchPosition& GetVersionColumns() const {
42+
return VersionColumns;
43+
}
44+
45+
TBatchIterator(const TSortableBatchPosition& keyColumns)
46+
: ControlPointFlag(true)
47+
, KeyColumns(keyColumns) {
48+
49+
}
50+
51+
template <class TDataContainer>
52+
TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
53+
const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::vector<std::string>& versionColumnNames)
54+
: ControlPointFlag(false)
55+
, KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
56+
, VersionColumns(batch, 0, versionColumnNames, {}, false)
57+
, RecordsCount(batch->num_rows())
58+
, ReverseSortKff(reverseSort ? -1 : 1)
59+
, Filter(filter) {
60+
Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
61+
Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition()));
62+
if (Filter) {
63+
FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort, RecordsCount));
64+
}
65+
}
66+
67+
bool CheckNextBatch(const TBatchIterator& nextIterator) {
68+
return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less;
69+
}
70+
71+
bool IsReverse() const {
72+
return ReverseSortKff < 0;
73+
}
74+
75+
bool IsDeleted() const {
76+
if (!FilterIterator) {
77+
return false;
78+
}
79+
return !FilterIterator->GetCurrentAcceptance();
80+
}
81+
82+
TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& pos);
83+
84+
bool Next();
85+
86+
bool operator<(const TBatchIterator& item) const;
87+
};
88+
89+
}

ydb/core/formats/arrow/reader/merger.cpp

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -12,60 +12,6 @@ void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition
1212
SortHeap.Push(TBatchIterator(*point));
1313
}
1414

15-
void TMergePartialStream::AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
16-
if (!batch || !batch->num_rows()) {
17-
return;
18-
}
19-
Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
20-
AddNewToHeap(batch, filter);
21-
}
22-
23-
void TMergePartialStream::AddSource(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
24-
if (!batch || !batch->num_rows()) {
25-
return;
26-
}
27-
// Y_DEBUG_ABORT_UNLESS(batch->IsSorted(SortSchema));
28-
AddNewToHeap(batch, filter);
29-
}
30-
31-
void TMergePartialStream::AddSource(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
32-
if (!batch || !batch->num_rows()) {
33-
return;
34-
}
35-
// Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
36-
AddNewToHeap(batch, filter);
37-
}
38-
39-
void TMergePartialStream::AddNewToHeap(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
40-
if (!filter || filter->IsTotalAllowFilter()) {
41-
SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
42-
} else if (filter->IsTotalDenyFilter()) {
43-
return;
44-
} else {
45-
SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
46-
}
47-
}
48-
49-
void TMergePartialStream::AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
50-
if (!filter || filter->IsTotalAllowFilter()) {
51-
SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
52-
} else if (filter->IsTotalDenyFilter()) {
53-
return;
54-
} else {
55-
SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
56-
}
57-
}
58-
59-
void TMergePartialStream::AddNewToHeap(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
60-
if (!filter || filter->IsTotalAllowFilter()) {
61-
SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
62-
} else if (filter->IsTotalDenyFilter()) {
63-
return;
64-
} else {
65-
SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
66-
}
67-
}
68-
6915
void TMergePartialStream::RemoveControlPoint() {
7016
Y_ABORT_UNLESS(ControlPoints == 1);
7117
Y_ABORT_UNLESS(ControlPointEnriched());
@@ -252,11 +198,4 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
252198
return result;
253199
}
254200

255-
NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const {
256-
NJson::TJsonValue result;
257-
result["is_cp"] = IsControlPoint();
258-
result["key"] = KeyColumns.DebugJson();
259-
return result;
260-
}
261-
262201
}

ydb/core/formats/arrow/reader/merger.h

Lines changed: 13 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "position.h"
33
#include "heap.h"
44
#include "result_builder.h"
5+
#include "batch_iterator.h"
56

67
#include <ydb/core/formats/arrow/arrow_filter.h>
78

@@ -20,135 +21,6 @@ class TMergePartialStream {
2021
const std::vector<std::string> VersionColumnNames;
2122
ui32 ControlPoints = 0;
2223

23-
class TBatchIterator {
24-
private:
25-
bool ControlPointFlag;
26-
TSortableBatchPosition KeyColumns;
27-
TSortableBatchPosition VersionColumns;
28-
i64 RecordsCount;
29-
int ReverseSortKff;
30-
31-
std::shared_ptr<NArrow::TColumnFilter> Filter;
32-
std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
33-
34-
i32 GetFirstPosition() const {
35-
if (ReverseSortKff > 0) {
36-
return 0;
37-
} else {
38-
return RecordsCount - 1;
39-
}
40-
}
41-
42-
public:
43-
NJson::TJsonValue DebugJson() const;
44-
45-
const std::shared_ptr<NArrow::TColumnFilter>& GetFilter() const {
46-
return Filter;
47-
}
48-
49-
bool IsControlPoint() const {
50-
return ControlPointFlag;
51-
}
52-
53-
const TSortableBatchPosition& GetKeyColumns() const {
54-
return KeyColumns;
55-
}
56-
57-
const TSortableBatchPosition& GetVersionColumns() const {
58-
return VersionColumns;
59-
}
60-
61-
TBatchIterator(const TSortableBatchPosition& keyColumns)
62-
: ControlPointFlag(true)
63-
, KeyColumns(keyColumns)
64-
{
65-
66-
}
67-
68-
template <class TDataContainer>
69-
TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
70-
const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::vector<std::string>& versionColumnNames)
71-
: ControlPointFlag(false)
72-
, KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
73-
, VersionColumns(batch, 0, versionColumnNames, {}, false)
74-
, RecordsCount(batch->num_rows())
75-
, ReverseSortKff(reverseSort ? -1 : 1)
76-
, Filter(filter)
77-
{
78-
Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
79-
Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition()));
80-
if (Filter) {
81-
FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort, RecordsCount));
82-
}
83-
}
84-
85-
bool CheckNextBatch(const TBatchIterator& nextIterator) {
86-
return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less;
87-
}
88-
89-
bool IsReverse() const {
90-
return ReverseSortKff < 0;
91-
}
92-
93-
bool IsDeleted() const {
94-
if (!FilterIterator) {
95-
return false;
96-
}
97-
return !FilterIterator->GetCurrentAcceptance();
98-
}
99-
100-
TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& pos) {
101-
const ui32 posStart = KeyColumns.GetPosition();
102-
auto result = KeyColumns.SkipToLower(pos);
103-
const i32 delta = IsReverse() ? (posStart - KeyColumns.GetPosition()) : (KeyColumns.GetPosition() - posStart);
104-
AFL_VERIFY(delta >= 0);
105-
AFL_VERIFY(VersionColumns.InitPosition(KeyColumns.GetPosition()))("pos", KeyColumns.GetPosition())("size", VersionColumns.GetRecordsCount());
106-
if (FilterIterator && delta) {
107-
AFL_VERIFY(FilterIterator->Next(delta));
108-
}
109-
return result;
110-
}
111-
112-
bool Next() {
113-
const bool result = KeyColumns.NextPosition(ReverseSortKff) && VersionColumns.NextPosition(ReverseSortKff);
114-
if (FilterIterator) {
115-
Y_ABORT_UNLESS(result == FilterIterator->Next(1));
116-
}
117-
return result;
118-
}
119-
120-
bool operator<(const TBatchIterator& item) const {
121-
const std::partial_ordering result = KeyColumns.Compare(item.KeyColumns);
122-
if (result == std::partial_ordering::equivalent) {
123-
if (IsControlPoint() && item.IsControlPoint()) {
124-
return false;
125-
} else if (IsControlPoint()) {
126-
return false;
127-
} else if (item.IsControlPoint()) {
128-
return true;
129-
}
130-
//don't need inverse through we need maximal version at first (reverse analytic not included in VersionColumns)
131-
return VersionColumns.Compare(item.VersionColumns) == std::partial_ordering::less;
132-
} else {
133-
//inverse logic through we use max heap, but need minimal element if not reverse (reverse analytic included in KeyColumns)
134-
return result == std::partial_ordering::greater;
135-
}
136-
}
137-
};
138-
139-
class TIteratorData {
140-
private:
141-
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
142-
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
143-
public:
144-
TIteratorData(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter)
145-
: Batch(batch)
146-
, Filter(filter)
147-
{
148-
149-
}
150-
};
151-
15224
TSortingHeap<TBatchIterator> SortHeap;
15325

15426
NJson::TJsonValue DebugJson() const {
@@ -164,9 +36,6 @@ class TMergePartialStream {
16436

16537
std::optional<TSortableBatchPosition> DrainCurrentPosition();
16638

167-
void AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
168-
void AddNewToHeap(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
169-
void AddNewToHeap(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
17039
void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition);
17140
public:
17241
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
@@ -233,9 +102,18 @@ class TMergePartialStream {
233102
return SortHeap.Size() && SortHeap.Current().IsControlPoint();
234103
}
235104

236-
void AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
237-
void AddSource(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
238-
void AddSource(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
105+
template <class TDataContainer>
106+
void AddSource(const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter) {
107+
if (!batch || !batch->num_rows()) {
108+
return;
109+
}
110+
if (filter && filter->IsTotalDenyFilter()) {
111+
return;
112+
}
113+
// Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
114+
auto filterImpl = (!filter || filter->IsTotalAllowFilter()) ? nullptr : filter;
115+
SortHeap.Push(TBatchIterator(batch, filterImpl, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
116+
}
239117

240118
bool IsEmpty() const {
241119
return !SortHeap.Size();

ydb/core/formats/arrow/reader/position.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ class TSortableBatchPosition {
284284
Y_ABORT_UNLESS(batch);
285285
Y_ABORT_UNLESS(batch->num_rows());
286286
RecordsCount = batch->num_rows();
287+
AFL_VERIFY(Position < RecordsCount)("position", Position)("count", RecordsCount);
287288

288289
if (dataColumns.size()) {
289290
Data = std::make_shared<TSortableScanData>(batch, dataColumns);

0 commit comments

Comments
 (0)