Skip to content

Commit 543a2d8

Browse files
Merge 1ae2d56 into 4a621d8
2 parents 4a621d8 + 1ae2d56 commit 543a2d8

File tree

117 files changed

+2465
-1128
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

117 files changed

+2465
-1128
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#include "constructor.h"
2+
#include <ydb/core/formats/arrow/accessor/plain/constructor.h>
3+
4+
namespace NKikimr::NArrow::NAccessor {
5+
6+
TConstructorContainer TConstructorContainer::GetDefaultConstructor() {
7+
static std::shared_ptr<IConstructor> result = std::make_shared<NPlain::TConstructor>();
8+
return result;
9+
}
10+
11+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#pragma once
2+
#include <ydb/core/formats/arrow/accessor/common/chunk_data.h>
3+
#include <ydb/core/formats/arrow/common/accessor.h>
4+
#include <ydb/core/formats/arrow/protos/accessor.pb.h>
5+
6+
#include <ydb/services/bg_tasks/abstract/interface.h>
7+
8+
#include <library/cpp/object_factory/object_factory.h>
9+
10+
namespace NKikimr::NArrow::NAccessor {
11+
12+
class IConstructor {
13+
public:
14+
using TFactory = NObjectFactory::TObjectFactory<IConstructor, TString>;
15+
using TProto = NKikimrArrowAccessorProto::TConstructor;
16+
17+
private:
18+
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
19+
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const = 0;
20+
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const = 0;
21+
virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) = 0;
22+
virtual std::shared_ptr<arrow::Schema> DoGetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const = 0;
23+
virtual TString DoDebugString() const {
24+
return "";
25+
}
26+
27+
public:
28+
virtual ~IConstructor() = default;
29+
30+
TString DebugString() const {
31+
return TStringBuilder() << GetClassName() << ":" << DoDebugString();
32+
}
33+
34+
TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> Construct(
35+
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const {
36+
return DoConstruct(originalData, externalInfo);
37+
}
38+
39+
bool DeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) {
40+
return DoDeserializeFromProto(proto);
41+
}
42+
43+
NKikimrArrowAccessorProto::TConstructor SerializeToProto() const {
44+
return DoSerializeToProto();
45+
}
46+
47+
void SerializeToProto(NKikimrArrowAccessorProto::TConstructor& proto) const {
48+
proto = DoSerializeToProto();
49+
}
50+
51+
std::shared_ptr<arrow::Schema> GetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const {
52+
AFL_VERIFY(resultColumn);
53+
return DoGetExpectedSchema(resultColumn);
54+
}
55+
56+
virtual TString GetClassName() const = 0;
57+
};
58+
59+
class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IConstructor> {
60+
private:
61+
using TBase = NBackgroundTasks::TInterfaceProtoContainer<IConstructor>;
62+
63+
public:
64+
using TBase::TBase;
65+
66+
static TConstructorContainer GetDefaultConstructor();
67+
};
68+
69+
} // namespace NKikimr::NArrow::NAccessor
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#include "request.h"
2+
3+
namespace NKikimr::NArrow::NAccessor {
4+
5+
TConclusionStatus TRequestedConstructorContainer::DeserializeFromRequest(NYql::TFeaturesExtractor& features) {
6+
const std::optional<TString> className = features.Extract("DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME");
7+
if (!className) {
8+
return TConclusionStatus::Success();
9+
}
10+
if (!TBase::Initialize(*className)) {
11+
return TConclusionStatus::Fail("don't know anything about class_name=" + *className);
12+
}
13+
return TBase::GetObjectPtr()->DeserializeFromRequest(features);
14+
}
15+
16+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#pragma once
2+
#include "constructor.h"
3+
4+
#include <ydb/core/formats/arrow/protos/accessor.pb.h>
5+
6+
#include <ydb/services/bg_tasks/abstract/interface.h>
7+
#include <ydb/services/metadata/abstract/request_features.h>
8+
9+
#include <library/cpp/object_factory/object_factory.h>
10+
11+
namespace NKikimr::NArrow::NAccessor {
12+
13+
class IRequestedConstructor {
14+
public:
15+
using TFactory = NObjectFactory::TObjectFactory<IRequestedConstructor, TString>;
16+
using TProto = NKikimrArrowAccessorProto::TRequestedConstructor;
17+
private:
18+
virtual TConclusion<NArrow::NAccessor::TConstructorContainer> DoBuildConstructor() const = 0;
19+
virtual NKikimrArrowAccessorProto::TRequestedConstructor DoSerializeToProto() const = 0;
20+
virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TRequestedConstructor& proto) = 0;
21+
virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) = 0;
22+
23+
public:
24+
virtual ~IRequestedConstructor() = default;
25+
26+
NKikimrArrowAccessorProto::TRequestedConstructor SerializeToProto() const {
27+
return DoSerializeToProto();
28+
}
29+
30+
void SerializeToProto(NKikimrArrowAccessorProto::TRequestedConstructor& proto) const {
31+
proto = DoSerializeToProto();
32+
}
33+
34+
bool DeserializeFromProto(const NKikimrArrowAccessorProto::TRequestedConstructor& proto) {
35+
return DoDeserializeFromProto(proto);
36+
}
37+
38+
TConclusionStatus DeserializeFromRequest(NYql::TFeaturesExtractor& features) {
39+
return DoDeserializeFromRequest(features);
40+
}
41+
42+
TConclusion<TConstructorContainer> BuildConstructor() const {
43+
return DoBuildConstructor();
44+
}
45+
46+
virtual TString GetClassName() const = 0;
47+
};
48+
49+
class TRequestedConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IRequestedConstructor> {
50+
private:
51+
using TBase = NBackgroundTasks::TInterfaceProtoContainer<IRequestedConstructor>;
52+
53+
public:
54+
using TBase::TBase;
55+
TConclusionStatus DeserializeFromRequest(NYql::TFeaturesExtractor& features);
56+
};
57+
58+
} // namespace NKikimr::NArrow::NAccessor
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
LIBRARY()
2+
3+
PEERDIR(
4+
ydb/core/formats/arrow/protos
5+
ydb/core/formats/arrow/accessor/common
6+
contrib/libs/apache/arrow
7+
ydb/library/conclusion
8+
ydb/services/metadata/abstract
9+
)
10+
11+
SRCS(
12+
constructor.cpp
13+
request.cpp
14+
)
15+
16+
END()
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "chunk_data.h"
2+
3+
namespace NKikimr::NArrow::NAccessor {
4+
5+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#pragma once
2+
#include <ydb/library/accessor/accessor.h>
3+
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
4+
#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>
5+
6+
namespace NKikimr::NArrow::NAccessor {
7+
8+
class TChunkConstructionData {
9+
private:
10+
YDB_READONLY(ui32, RecordsCount, 0);
11+
YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, DefaultValue);
12+
YDB_READONLY_DEF(std::shared_ptr<arrow::DataType>, ColumnType);
13+
14+
public:
15+
TChunkConstructionData(
16+
const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& columnType)
17+
: RecordsCount(recordsCount)
18+
, DefaultValue(defaultValue)
19+
, ColumnType(columnType) {
20+
}
21+
};
22+
23+
} // namespace NKikimr::NArrow::NAccessor
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
LIBRARY()
2+
3+
PEERDIR(
4+
contrib/libs/apache/arrow
5+
)
6+
7+
SRCS(
8+
chunk_data.cpp
9+
)
10+
11+
END()
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#include "accessor.h"
2+
namespace NKikimr::NArrow::NAccessor {
3+
4+
namespace {
5+
class TCompositeChunkAccessor {
6+
private:
7+
const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& Chunks;
8+
std::optional<IChunkedArray::TCurrentChunkAddress>* ResultChunkAddress = nullptr;
9+
std::optional<IChunkedArray::TCurrentArrayAddress>* ResultArrayAddress = nullptr;
10+
11+
public:
12+
TCompositeChunkAccessor(
13+
const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& chunks, std::optional<IChunkedArray::TCurrentChunkAddress>& result)
14+
: Chunks(chunks)
15+
, ResultChunkAddress(&result) {
16+
}
17+
TCompositeChunkAccessor(
18+
const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& chunks, std::optional<IChunkedArray::TCurrentArrayAddress>& result)
19+
: Chunks(chunks)
20+
, ResultArrayAddress(&result) {
21+
}
22+
ui64 GetChunksCount() const {
23+
return Chunks.size();
24+
}
25+
ui64 GetChunkLength(const ui32 idx) const {
26+
return Chunks[idx]->GetRecordsCount();
27+
}
28+
void OnArray(const ui32 chunkIdx, const ui32 startPosition, const ui32 internalPosition) const {
29+
if (ResultChunkAddress) {
30+
*ResultChunkAddress = NArrow::NAccessor::IChunkedArray::TCurrentChunkAddress(
31+
Chunks[chunkIdx]->GetChunk({}, internalPosition).GetArray(), startPosition, chunkIdx);
32+
}
33+
if (ResultArrayAddress) {
34+
*ResultArrayAddress = NArrow::NAccessor::IChunkedArray::TCurrentArrayAddress(Chunks[chunkIdx], startPosition, chunkIdx);
35+
}
36+
}
37+
};
38+
} // namespace
39+
40+
NArrow::NAccessor::IChunkedArray::TCurrentChunkAddress TCompositeChunkedArray::DoGetChunk(
41+
const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
42+
std::optional<IChunkedArray::TCurrentChunkAddress> result;
43+
TCompositeChunkAccessor accessor(Chunks, result);
44+
SelectChunk(chunkCurrent, position, accessor);
45+
AFL_VERIFY(result);
46+
return *result;
47+
}
48+
49+
NArrow::NAccessor::IChunkedArray::TCurrentArrayAddress TCompositeChunkedArray::DoGetArray(
50+
const std::optional<TCurrentArrayAddress>& chunkCurrent, const ui64 position, const std::shared_ptr<IChunkedArray>& /*selfPtr*/) const {
51+
std::optional<IChunkedArray::TCurrentArrayAddress> result;
52+
TCompositeChunkAccessor accessor(Chunks, result);
53+
SelectChunk(chunkCurrent, position, accessor);
54+
AFL_VERIFY(result);
55+
return *result;
56+
}
57+
58+
} // namespace NKikimr::NArrow::NAccessor
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#pragma once
2+
#include <ydb/core/formats/arrow/common/accessor.h>
3+
#include <ydb/library/accessor/accessor.h>
4+
5+
namespace NKikimr::NArrow::NAccessor {
6+
7+
class TCompositeChunkedArray: public NArrow::NAccessor::IChunkedArray {
8+
private:
9+
using TBase = NArrow::NAccessor::IChunkedArray;
10+
11+
private:
12+
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> Chunks;
13+
14+
protected:
15+
virtual TCurrentArrayAddress DoGetArray(const std::optional<TCurrentArrayAddress>& chunkCurrent, const ui64 /*position*/,
16+
const std::shared_ptr<IChunkedArray>& selfPtr) const override;
17+
18+
virtual std::vector<NArrow::NAccessor::TChunkedArraySerialized> DoSplitBySizes(
19+
const TColumnSaver& /*saver*/, const TString& /*fullSerializedData*/, const std::vector<ui64>& /*splitSizes*/) override {
20+
AFL_VERIFY(false);
21+
return {};
22+
}
23+
24+
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override {
25+
AFL_VERIFY(false)("problem", "cannot use method");
26+
return nullptr;
27+
}
28+
virtual std::optional<ui64> DoGetRawSize() const override {
29+
return {};
30+
}
31+
virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override {
32+
AFL_VERIFY(false);
33+
return nullptr;
34+
}
35+
virtual TCurrentChunkAddress DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const override;
36+
virtual std::shared_ptr<arrow::ChunkedArray> DoGetChunkedArray() const override {
37+
AFL_VERIFY(false);
38+
return nullptr;
39+
}
40+
41+
TCompositeChunkedArray(std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>&& chunks, const ui32 recordsCount,
42+
const std::shared_ptr<arrow::DataType>& type)
43+
: TBase(recordsCount, NArrow::NAccessor::IChunkedArray::EType::SerializedChunkedArray, type)
44+
, Chunks(std::move(chunks)) {
45+
}
46+
47+
public:
48+
class TBuilder {
49+
private:
50+
ui32 RecordsCount = 0;
51+
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> Chunks;
52+
const std::shared_ptr<arrow::DataType> Type;
53+
54+
public:
55+
TBuilder(const std::shared_ptr<arrow::DataType>& type)
56+
: Type(type) {
57+
AFL_VERIFY(Type);
58+
}
59+
60+
void AddChunk(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& arr) {
61+
AFL_VERIFY(arr->GetDataType()->id() == Type->id())("incoming", arr->GetDataType()->ToString())("main", Type->ToString());
62+
Chunks.emplace_back(arr);
63+
RecordsCount += arr->GetRecordsCount();
64+
}
65+
66+
std::shared_ptr<TCompositeChunkedArray> Finish() {
67+
return std::shared_ptr<TCompositeChunkedArray>(new TCompositeChunkedArray(std::move(Chunks), RecordsCount, Type));
68+
}
69+
};
70+
};
71+
72+
} // namespace NKikimr::NArrow::NAccessor
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
LIBRARY()
2+
3+
PEERDIR(
4+
contrib/libs/apache/arrow
5+
ydb/core/formats/arrow/common
6+
)
7+
8+
SRCS(
9+
accessor.cpp
10+
)
11+
12+
END()

0 commit comments

Comments
 (0)