@@ -14,45 +14,23 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstructDefault(con
14
14
15
15
TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromString (
16
16
const TString& originalData, const TChunkConstructionData& externalInfo) const {
17
- TStringInput si (originalData);
18
- ui32 protoSize;
19
- si.Read (&protoSize, sizeof (protoSize));
20
- ui64 currentIndex = sizeof (protoSize);
21
- NKikimrArrowAccessorProto::TSubColumnsAccessor proto;
22
- if (!proto.ParseFromArray (originalData.data () + currentIndex, protoSize)) {
23
- return TConclusionStatus::Fail (" cannot parse proto" );
17
+ auto headerConclusion = TSubColumnsHeader::ReadHeader (originalData, externalInfo);
18
+ if (headerConclusion.IsFail ()) {
19
+ return headerConclusion;
24
20
}
25
- currentIndex += protoSize;
26
- TDictStats columnStats = [&]() {
27
- if (proto.GetColumnStatsSize ()) {
28
- std::shared_ptr<arrow::RecordBatch> rbColumnStats = TStatusValidator::GetValid (externalInfo.GetDefaultSerializer ()->Deserialize (
29
- TString (originalData.data () + currentIndex, proto.GetColumnStatsSize ()), TDictStats::GetStatsSchema ()));
30
- return TDictStats (rbColumnStats);
31
- } else {
32
- return TDictStats::BuildEmpty ();
33
- }
34
- }();
35
- currentIndex += proto.GetColumnStatsSize ();
36
- TDictStats otherStats = [&]() {
37
- if (proto.GetOtherStatsSize ()) {
38
- std::shared_ptr<arrow::RecordBatch> rbOtherStats = TStatusValidator::GetValid (externalInfo.GetDefaultSerializer ()->Deserialize (
39
- TString (originalData.data () + currentIndex, proto.GetOtherStatsSize ()), TDictStats::GetStatsSchema ()));
40
- return TDictStats (rbOtherStats);
41
- } else {
42
- return TDictStats::BuildEmpty ();
43
- }
44
- }();
45
- currentIndex += proto.GetOtherStatsSize ();
21
+ ui32 currentIndex = headerConclusion->GetHeaderSize ();
22
+ const auto & proto = headerConclusion->GetAddressesProto ();
46
23
47
24
std::shared_ptr<TGeneralContainer> columnKeysContainer;
48
25
{
49
26
std::vector<std::shared_ptr<IChunkedArray>> columns;
50
- auto schema = columnStats.BuildColumnsSchema ();
51
- AFL_VERIFY (columnStats.GetColumnsCount () == (ui32)proto.GetKeyColumns ().size ())(" schema" , columnStats.GetColumnsCount ())(
27
+ auto schema = headerConclusion->GetColumnStats ().BuildColumnsSchema ();
28
+ AFL_VERIFY (headerConclusion->GetColumnStats ().GetColumnsCount () == (ui32)proto.GetKeyColumns ().size ())(
29
+ " schema" , headerConclusion->GetColumnStats ().GetColumnsCount ())(
52
30
" proto" , proto.GetKeyColumns ().size ());
53
31
for (ui32 i = 0 ; i < (ui32)proto.GetKeyColumns ().size (); ++i) {
54
32
std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>(
55
- externalInfo.GetDefaultSerializer (), columnStats .GetAccessorConstructor (i), schema->field (i), nullptr , 0 );
33
+ externalInfo.GetDefaultSerializer (), headerConclusion-> GetColumnStats () .GetAccessorConstructor (i), schema->field (i), nullptr , 0 );
56
34
std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk (
57
35
externalInfo.GetRecordsCount (), TStringBuf (originalData.data () + currentIndex, proto.GetKeyColumns (i).GetSize ())) };
58
36
columns.emplace_back (std::make_shared<TDeserializeChunkedArray>(externalInfo.GetRecordsCount (), columnLoader, std::move (chunks), true ));
@@ -62,26 +40,17 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromStrin
62
40
}
63
41
TOthersData otherData = TOthersData::BuildEmpty ();
64
42
if (proto.GetOtherColumns ().size () && proto.GetOtherRecordsCount ()) {
65
- std::shared_ptr<TGeneralContainer> otherKeysContainer;
66
- std::vector<std::shared_ptr<IChunkedArray>> columns;
67
- AFL_VERIFY (TOthersData::GetSchema ()->num_fields () == proto.GetOtherColumns ().size ())(" proto" , proto.GetOtherColumns ().size ())(
68
- " schema" , TOthersData::GetSchema ()->num_fields ());
69
- auto schema = TOthersData::GetSchema ();
70
- for (ui32 i = 0 ; i < (ui32)proto.GetOtherColumns ().size (); ++i) {
71
- std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>(
72
- externalInfo.GetDefaultSerializer (), std::make_shared<NPlain::TConstructor>(), schema->field (i), nullptr , 0 );
73
- std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk (
74
- proto.GetOtherRecordsCount (), TStringBuf (originalData.data () + currentIndex, proto.GetOtherColumns (i).GetSize ())) };
75
- columns.emplace_back (std::make_shared<TDeserializeChunkedArray>(proto.GetOtherRecordsCount (), columnLoader, std::move (chunks), true ));
76
- currentIndex += proto.GetOtherColumns (i).GetSize ();
77
- }
78
- otherKeysContainer = std::make_shared<TGeneralContainer>(schema, std::move (columns));
79
- otherData = TOthersData (otherStats, otherKeysContainer);
43
+ AFL_VERIFY (currentIndex < originalData.size ());
44
+ std::shared_ptr<TGeneralContainer> otherKeysContainer = BuildOthersContainer (
45
+ TStringBuf (originalData.data () + currentIndex, originalData.size () - currentIndex), proto, externalInfo).DetachResult ();
46
+ currentIndex += headerConclusion->GetOthersSize ();
47
+ otherData = TOthersData (headerConclusion->GetOtherStats (), otherKeysContainer);
80
48
}
81
- TColumnsData columnData (columnStats , columnKeysContainer);
49
+ TColumnsData columnData (headerConclusion-> GetColumnStats () , columnKeysContainer);
82
50
auto result = std::make_shared<TSubColumnsArray>(
83
51
std::move (columnData), std::move (otherData), externalInfo.GetColumnType (), externalInfo.GetRecordsCount (), Settings);
84
52
result->StoreSourceString (originalData);
53
+ AFL_VERIFY (currentIndex == originalData.size ())(" index" , currentIndex)(" size" , originalData.size ());
85
54
return result;
86
55
}
87
56
@@ -105,4 +74,32 @@ TString TConstructor::DoSerializeToString(const std::shared_ptr<IChunkedArray>&
105
74
return arr->SerializeToString (externalInfo);
106
75
}
107
76
77
+ TConclusion<std::shared_ptr<TGeneralContainer>> TConstructor::BuildOthersContainer (
78
+ const TStringBuf data, const NKikimrArrowAccessorProto::TSubColumnsAccessor& proto, const TChunkConstructionData& externalInfo) {
79
+ std::vector<std::shared_ptr<IChunkedArray>> columns;
80
+ AFL_VERIFY (TOthersData::GetSchema ()->num_fields () == proto.GetOtherColumns ().size ())(" proto" , proto.GetOtherColumns ().size ())(
81
+ " schema" , TOthersData::GetSchema ()->num_fields ());
82
+ auto schema = TOthersData::GetSchema ();
83
+ ui32 currentIndex = 0 ;
84
+ for (ui32 i = 0 ; i < (ui32)proto.GetOtherColumns ().size (); ++i) {
85
+ std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>(
86
+ externalInfo.GetDefaultSerializer (), std::make_shared<NPlain::TConstructor>(), schema->field (i), nullptr , 0 );
87
+ std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk (
88
+ proto.GetOtherRecordsCount (), TStringBuf (data.data () + currentIndex, proto.GetOtherColumns (i).GetSize ())) };
89
+ columns.emplace_back (std::make_shared<TDeserializeChunkedArray>(proto.GetOtherRecordsCount (), columnLoader, std::move (chunks), true ));
90
+ currentIndex += proto.GetOtherColumns (i).GetSize ();
91
+ }
92
+ return std::make_shared<TGeneralContainer>(schema, std::move (columns));
93
+ }
94
+
95
+ TConclusion<std::shared_ptr<TSubColumnsPartialArray>> TConstructor::BuildPartialReader (
96
+ const TString& originalData, const TChunkConstructionData& externalInfo) {
97
+ auto headerConclusion = TSubColumnsHeader::ReadHeader (originalData, externalInfo);
98
+ if (headerConclusion.IsFail ()) {
99
+ return headerConclusion;
100
+ }
101
+ return std::make_shared<TSubColumnsPartialArray>(
102
+ headerConclusion.DetachResult (), externalInfo.GetRecordsCount (), externalInfo.GetColumnType ());
103
+ }
104
+
108
105
} // namespace NKikimr::NArrow::NAccessor::NSubColumns
0 commit comments