Skip to content

Stable-24-1 patch for cs #2142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
++itExt;
}
}
Y_ABORT_UNLESS(itSelf == Filter.end() && itExt == extFilter.Filter.cend());
AFL_VERIFY(itSelf == Filter.end() && itExt == extFilter.Filter.cend());
TColumnFilter result = TColumnFilter::BuildAllowFilter();
std::swap(resultFilter, result.Filter);
std::swap(curCurrent, result.LastValue);
Expand Down Expand Up @@ -611,4 +611,12 @@ std::optional<ui32> TColumnFilter::GetFilteredCount() const {
return *FilteredCount;
}

// Appends the contents of `filter` to the end of this filter.
// The elements of `filter.Filter` are visited in order and each one is handed
// to Add() as the `count` argument; the boolean passed alongside starts at
// filter.GetStartValue() and is inverted after every element.
// NOTE(review): presumably Add() mutates this->Filter, in which case passing
// *this as `filter` would modify the container being iterated - confirm that
// callers never do that.
void TColumnFilter::Append(const TColumnFilter& filter) {
    bool runValue = filter.GetStartValue();
    for (const auto& runLength : filter.Filter) {
        Add(runValue, runLength);
        runValue = !runValue;
    }
}

}
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/arrow_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TColumnFilter {
FilteredCount.reset();
}
public:
void Append(const TColumnFilter& filter);
void Add(const bool value, const ui32 count = 1);
std::optional<ui32> GetFilteredCount() const;
const std::vector<bool>& BuildSimpleFilter() const;
Expand Down
37 changes: 34 additions & 3 deletions ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "common/validation.h"
#include "merging_sorted_input_stream.h"
#include "permutations.h"
#include "serializer/batch_only.h"
#include "serializer/native.h"
#include "serializer/abstract.h"
#include "serializer/stream.h"
#include "simple_arrays_cache.h"
Expand Down Expand Up @@ -106,7 +106,7 @@ std::shared_ptr<arrow::Schema> DeserializeSchema(const TString& str) {
}

// Serializes `batch` into a TString using the supplied IPC write options.
// NOTE(review): this text is a diff rendering whose +/- markers were lost. The
// enclosing hunk reports a same-size replacement, so the first return below
// appears to be the removed line and the second one its replacement - confirm
// against the real file before relying on either.
TString SerializeBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const arrow::ipc::IpcWriteOptions& options) {
return NSerialization::TBatchPayloadSerializer(options).Serialize(batch);
return NSerialization::TNativeSerializer(options).SerializePayload(batch);
}

TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& batch) {
Expand All @@ -117,7 +117,7 @@ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& b

std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const std::shared_ptr<arrow::Schema>& schema)
{
auto result = NSerialization::TBatchPayloadDeserializer(schema).Deserialize(blob);
auto result = NSerialization::TNativeSerializer().Deserialize(blob, schema);
if (result.ok()) {
return *result;
} else {
Expand Down Expand Up @@ -977,4 +977,35 @@ std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_p
return arrow::RecordBatch::Make(schema, *recordsCount, columns);
}

// Splits table `t` into record batches whose boundaries are the union of the
// chunk start offsets of all columns, so that every column contributes exactly
// one contiguous array to each batch.
//
// Steps:
//   1. Gather the start offset of every chunk of every column into an ordered
//      set, verifying that each column's chunk lengths sum to t->num_rows().
//   2. For each column, slice it at every pair of neighbouring offsets (the
//      last slice ends at the table's row count) and verify that each slice
//      consists of a single chunk.
//   3. Assemble one RecordBatch per offset from the per-column arrays and
//      verify that the batch row counts sum to t->num_rows().
//
// Returns the batches in ascending offset order.
std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std::shared_ptr<arrow::Table>& t) {
    const ui32 totalRows = t->num_rows();

    // Step 1: ordered, de-duplicated chunk start offsets across all columns.
    std::set<ui32> chunkStarts;
    for (auto&& column : t->columns()) {
        ui32 pos = 0;
        for (auto&& chunk : column->chunks()) {
            chunkStarts.emplace(pos);
            pos += chunk->length();
        }
        AFL_VERIFY(pos == t->num_rows());
    }
    const std::vector<ui32> starts(chunkStarts.begin(), chunkStarts.end());

    // Step 2: arraysPerBatch[k] collects, column by column, the array covering
    // rows [starts[k], starts[k + 1]) - or up to totalRows for the last one.
    std::vector<std::vector<std::shared_ptr<arrow::Array>>> arraysPerBatch(starts.size());
    for (auto&& column : t->columns()) {
        for (ui32 batchIdx = 0; batchIdx < starts.size(); ++batchIdx) {
            const bool isLastBatch = (batchIdx + 1 == starts.size());
            const ui32 finish = isLastBatch ? totalRows : starts[batchIdx + 1];
            auto slice = column->Slice(starts[batchIdx], finish - starts[batchIdx]);
            AFL_VERIFY(slice->num_chunks() == 1);
            arraysPerBatch[batchIdx].emplace_back(slice->chunks().front());
        }
    }

    // Step 3: build the batches and cross-check the total row count.
    std::vector<std::shared_ptr<arrow::RecordBatch>> result;
    ui32 count = 0;
    for (auto&& batchArrays : arraysPerBatch) {
        const auto batchRows = batchArrays.front()->length();
        result.emplace_back(arrow::RecordBatch::Make(t->schema(), batchRows, batchArrays));
        count += result.back()->num_rows();
    }
    AFL_VERIFY(count == t->num_rows())("count", count)("t", t->num_rows());
    return result;
}

}
2 changes: 2 additions & 0 deletions ydb/core/formats/arrow/arrow_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) {
return column->null_bitmap_data();
}

// Splits table `t` into record batches cut at the union of all columns' chunk
// start offsets, so that each returned batch holds exactly one array per
// column. The implementation verifies that every column's chunk lengths, and
// the row counts of the produced batches, sum to t->num_rows().
std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std::shared_ptr<arrow::Table>& t);

bool ArrayScalarsEqual(const std::shared_ptr<arrow::Array>& lhs, const std::shared_ptr<arrow::Array>& rhs);
std::shared_ptr<arrow::Array> BoolVecToArray(const std::vector<bool>& vec);

Expand Down
23 changes: 0 additions & 23 deletions ydb/core/formats/arrow/compression/CMakeLists.darwin-arm64.txt

This file was deleted.

23 changes: 0 additions & 23 deletions ydb/core/formats/arrow/compression/CMakeLists.darwin-x86_64.txt

This file was deleted.

24 changes: 0 additions & 24 deletions ydb/core/formats/arrow/compression/CMakeLists.linux-aarch64.txt

This file was deleted.

24 changes: 0 additions & 24 deletions ydb/core/formats/arrow/compression/CMakeLists.linux-x86_64.txt

This file was deleted.

19 changes: 0 additions & 19 deletions ydb/core/formats/arrow/compression/CMakeLists.txt

This file was deleted.

This file was deleted.

77 changes: 0 additions & 77 deletions ydb/core/formats/arrow/compression/diff.cpp

This file was deleted.

34 changes: 0 additions & 34 deletions ydb/core/formats/arrow/compression/diff.h

This file was deleted.

Loading