Skip to content

Commit 28fbe05

Browse files
fix kernel usage through chunked arrays (#1987)
1 parent cb022d0 commit 28fbe05

File tree

5 files changed

+54
-6
lines changed

5 files changed

+54
-6
lines changed

ydb/core/formats/arrow/arrow_filter.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,4 +611,12 @@ std::optional<ui32> TColumnFilter::GetFilteredCount() const {
611611
return *FilteredCount;
612612
}
613613

614+
void TColumnFilter::Append(const TColumnFilter& filter) {
615+
bool currentVal = filter.GetStartValue();
616+
for (auto&& i : filter.Filter) {
617+
Add(currentVal, i);
618+
currentVal = !currentVal;
619+
}
620+
}
621+
614622
}

ydb/core/formats/arrow/arrow_filter.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class TColumnFilter {
5252
FilteredCount.reset();
5353
}
5454
public:
55+
// Appends `filter` to the end of this filter: each entry of `filter.Filter` is
// re-added through Add() with an alternating value that starts from
// `filter.GetStartValue()`.
void Append(const TColumnFilter& filter);
5556
void Add(const bool value, const ui32 count = 1);
5657
std::optional<ui32> GetFilteredCount() const;
5758
const std::vector<bool>& BuildSimpleFilter() const;

ydb/core/formats/arrow/arrow_helpers.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -977,4 +977,35 @@ std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_p
977977
return arrow::RecordBatch::Make(schema, *recordsCount, columns);
978978
}
979979

980+
// Splits a table into record batches so that every column of every batch is
// backed by exactly one array.
//
// The columns of `t` may be chunked differently. The table is cut at the union
// of all chunk start offsets over all columns, which guarantees that no slice
// crosses a chunk boundary in any column.
//
// @param t  source table; every column must contain exactly t->num_rows() rows
//           (verified).
// @return   batches in row order; their row counts sum to t->num_rows()
//           (verified). The result is empty when no split position was
//           collected (e.g. the table has no columns or the columns have no
//           chunks).
std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std::shared_ptr<arrow::Table>& t) {
    const ui32 numRows = t->num_rows();

    // Collect the start offset of every chunk of every column; std::set keeps
    // the offsets unique and sorted.
    std::set<ui32> splitPositions;
    for (auto&& column : t->columns()) {
        ui32 pos = 0;
        for (auto&& chunk : column->chunks()) {
            // A chunk that starts at numRows of a non-empty table is a trailing
            // zero-length chunk. Recording it would request a zero-row tail
            // slice: at best an empty batch, and the single-chunk check below
            // may fail if Slice() yields no chunks for it (depends on the Arrow
            // version - NOTE(review): confirm). Offset 0 is always kept, so an
            // empty table is handled exactly as before.
            if (pos == 0 || pos < numRows) {
                splitPositions.emplace(pos);
            }
            pos += chunk->length();
        }
        AFL_VERIFY(pos == t->num_rows());
    }

    const std::vector<ui32> positions(splitPositions.begin(), splitPositions.end());

    // slicedData[idx] holds, for the idx-th row range, one array per column.
    std::vector<std::vector<std::shared_ptr<arrow::Array>>> slicedData(positions.size());
    for (auto&& batchColumns : slicedData) {
        batchColumns.reserve(t->num_columns());
    }
    for (auto&& column : t->columns()) {
        for (ui32 idx = 0; idx < positions.size(); ++idx) {
            const ui32 begin = positions[idx];
            // The last range runs to the end of the table.
            const ui32 end = (idx + 1 == positions.size()) ? numRows : positions[idx + 1];
            auto slice = column->Slice(begin, end - begin);
            // The range lies between two adjacent split positions, so it must
            // fall inside a single chunk of this column.
            AFL_VERIFY(slice->num_chunks() == 1);
            slicedData[idx].emplace_back(slice->chunks().front());
        }
    }

    std::vector<std::shared_ptr<arrow::RecordBatch>> result;
    result.reserve(slicedData.size());
    ui32 count = 0;
    for (auto&& batchColumns : slicedData) {
        result.emplace_back(arrow::RecordBatch::Make(t->schema(), batchColumns.front()->length(), batchColumns));
        count += result.back()->num_rows();
    }
    AFL_VERIFY(count == t->num_rows())("count", count)("t", t->num_rows());
    return result;
}
1010+
9801011
}

ydb/core/formats/arrow/arrow_helpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) {
128128
return column->null_bitmap_data();
129129
}
130130

131+
// Splits table `t` into record batches cut at every chunk start offset found in
// any column, so each column of a returned batch is a single array.
// Batches are returned in row order; their total row count equals
// t->num_rows() (verified inside the implementation).
std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std::shared_ptr<arrow::Table>& t);
132+
131133
bool ArrayScalarsEqual(const std::shared_ptr<arrow::Array>& lhs, const std::shared_ptr<arrow::Array>& rhs);
132134
std::shared_ptr<arrow::Array> BoolVecToArray(const std::vector<bool>& vec);
133135

ydb/core/formats/arrow/program.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -871,12 +871,18 @@ std::shared_ptr<NArrow::TColumnFilter> TProgramStep::BuildFilter(const std::shar
871871
if (Filters.empty()) {
872872
return nullptr;
873873
}
874-
auto datumBatch = TDatumBatch::FromTable(t);
875-
876-
NArrow::TStatusValidator::Validate(ApplyAssignes(*datumBatch, NArrow::GetCustomExecContext()));
877-
NArrow::TColumnFilter local = NArrow::TColumnFilter::BuildAllowFilter();
878-
NArrow::TStatusValidator::Validate(MakeCombinedFilter(*datumBatch, local));
879-
return std::make_shared<NArrow::TColumnFilter>(std::move(local));
874+
std::vector<std::shared_ptr<arrow::RecordBatch>> batches = NArrow::SliceToRecordBatches(t);
875+
NArrow::TColumnFilter fullLocal = NArrow::TColumnFilter::BuildAllowFilter();
876+
for (auto&& rb : batches) {
877+
auto datumBatch = TDatumBatch::FromRecordBatch(rb);
878+
NArrow::TStatusValidator::Validate(ApplyAssignes(*datumBatch, NArrow::GetCustomExecContext()));
879+
NArrow::TColumnFilter local = NArrow::TColumnFilter::BuildAllowFilter();
880+
NArrow::TStatusValidator::Validate(MakeCombinedFilter(*datumBatch, local));
881+
AFL_VERIFY(local.Size() == datumBatch->Rows)("local", local.Size())("datum", datumBatch->Rows);
882+
fullLocal.Append(local);
883+
}
884+
AFL_VERIFY(fullLocal.Size() == t->num_rows())("filter", fullLocal.Size())("t", t->num_rows());
885+
return std::make_shared<NArrow::TColumnFilter>(std::move(fullLocal));
880886
}
881887

882888
const std::set<ui32>& TProgramStep::GetFilterOriginalColumnIds() const {

0 commit comments

Comments
 (0)