Skip to content

Commit a6a8b05

Browse files
Support for parquet file with type inferring (#7734)
1 parent ff5385f commit a6a8b05

File tree

6 files changed

+166
-16
lines changed

6 files changed

+166
-16
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
335335
}
336336
for (const auto& entry : entries.Objects) {
337337
if (entry.Size > 0) {
338-
return entry.Path;
338+
return entry;
339339
}
340340
}
341341
throw yexception() << "couldn't find any files for type inference, please check that the right path is provided";
@@ -354,7 +354,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
354354
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
355355
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));
356356

357-
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
357+
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<NYql::NS3Lister::TObjectListEntry>& entryFut) {
358358
auto promise = NThreading::NewPromise<TMetadataResult>();
359359
auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
360360
if (!response.Status.IsSuccess()) {
@@ -372,9 +372,10 @@ struct TObjectStorageExternalSource : public IExternalSource {
372372
result.Metadata = meta;
373373
metaPromise.SetValue(std::move(result));
374374
};
375+
auto [path, size, _] = entryFut.GetValue();
375376
actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
376377
arrowInferencinatorId,
377-
new NObjectStorage::TEvInferFileSchema(TString{pathFut.GetValue()}),
378+
new NObjectStorage::TEvInferFileSchema(TString{path}, size),
378379
promise,
379380
std::move(schemaToMetadata)
380381
));

ydb/core/external_sources/object_storage/events.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,13 @@ struct TEvArrowFile : public NActors::TEventLocal<TEvArrowFile, EvArrowFile> {
119119
};
120120

121121
struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvInferFileSchema> {
122-
explicit TEvInferFileSchema(TString&& path)
122+
explicit TEvInferFileSchema(TString&& path, ui64 size)
123123
: Path{std::move(path)}
124+
, Size{size}
124125
{}
125126

126127
TString Path;
128+
ui64 Size = 0;
127129
};
128130

129131
struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {

ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <arrow/csv/chunker.h>
77
#include <arrow/csv/options.h>
88
#include <arrow/io/memory.h>
9+
#include <arrow/util/endian.h>
910

1011
#include <util/generic/guid.h>
1112
#include <util/generic/size_literals.h>
@@ -47,15 +48,20 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
4748
const auto& request = *ev->Get();
4849
TRequest localRequest{
4950
.Path = request.Path,
50-
.RequestId = {},
51+
.RequestId = TGUID::Create(),
5152
.Requester = ev->Sender,
53+
.MetadataRequest = false,
5254
};
53-
CreateGuid(&localRequest.RequestId);
5455

5556
switch (Format_) {
5657
case EFileFormat::CsvWithNames:
5758
case EFileFormat::TsvWithNames: {
58-
HandleAsPrefixFile(std::move(localRequest), ctx);
59+
RequestPartialFile(std::move(localRequest), ctx, 0, 10_MB);
60+
break;
61+
}
62+
case EFileFormat::Parquet: {
63+
localRequest.MetadataRequest = true;
64+
RequestPartialFile(std::move(localRequest), ctx, request.Size - 8, request.Size - 4);
5965
break;
6066
}
6167
default: {
@@ -96,6 +102,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
96102
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
97103
break;
98104
}
105+
case EFileFormat::Parquet: {
106+
if (request.MetadataRequest) {
107+
HandleMetadataSizeRequest(data, request, ctx);
108+
return;
109+
}
110+
file = BuildParquetFileFromMetadata(data, request, ctx);
111+
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
112+
break;
113+
}
99114
case EFileFormat::Undefined:
100115
default:
101116
Y_ABORT("Invalid format should be unreachable");
@@ -120,14 +135,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
120135
uint64_t From = 0;
121136
uint64_t To = 0;
122137
NActors::TActorId Requester;
138+
bool MetadataRequest;
123139
};
124140

125141
// Reading file
126142

127-
void HandleAsPrefixFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx) {
143+
void RequestPartialFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx, uint64_t from, uint64_t to) {
128144
auto path = insertedRequest.Path;
129-
insertedRequest.From = 0;
130-
insertedRequest.To = 10_MB;
145+
insertedRequest.From = from;
146+
insertedRequest.To = to;
131147
auto it = InflightRequests_.try_emplace(path, std::move(insertedRequest));
132148
Y_ABORT_UNLESS(it.second, "couldn't insert request for path: %s", path.c_str());
133149

@@ -223,6 +239,58 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
223239
return std::make_shared<arrow::io::BufferReader>(std::move(whole));
224240
}
225241

242+
// Handles the response to the first (metadata-size) request for a parquet file.
//
// Decodes a little-endian uint32 from the beginning of `data` and treats it as the
// size of the parquet metadata. If that size is acceptable, the in-flight entry for
// the path is replaced by a second request whose range is widened backwards by
// `metadataSize` bytes and forwards by 4 bytes relative to the first one.
//
// data    - bytes returned for the first request; must hold at least 4 bytes.
// request - the request being answered. Taken by value, so its fields stay usable
//           after the entry for request.Path is erased from InflightRequests_.
// ctx     - actor context used to report errors and to issue the follow-up request.
//
// All failures are reported through SendError *before* the in-flight entry is
// erased, because SendError looks the request up in InflightRequests_ by path.
void HandleMetadataSizeRequest(const TString& data, TRequest request, const NActors::TActorContext& ctx) {
    // Guard the unaligned read below: a short response would otherwise be read
    // past the end of the buffer.
    if (data.size() < sizeof(uint32_t)) {
        auto error = MakeError(
            request.Path,
            NFq::TIssuesIds::INTERNAL_ERROR,
            TStringBuilder{} << "couldn't load parquet metadata, size field is truncated : " << data.size()
        );
        SendError(ctx, error);
        return;
    }

    const uint32_t metadataSize = arrow::BitUtil::FromLittleEndian<uint32_t>(ReadUnaligned<uint32_t>(data.data()));

    if (metadataSize > 10_MB) {
        auto error = MakeError(
            request.Path,
            NFq::TIssuesIds::INTERNAL_ERROR,
            TStringBuilder{} << "couldn't load parquet metadata, size is bigger than 10MB : " << metadataSize
        );
        SendError(ctx, error);
        return;
    }

    // request.From is unsigned: subtracting a larger metadataSize would wrap around
    // and produce a nonsensical range, so reject such (corrupt or too small) files.
    if (metadataSize > request.From) {
        auto error = MakeError(
            request.Path,
            NFq::TIssuesIds::INTERNAL_ERROR,
            TStringBuilder{} << "couldn't load parquet metadata, size is bigger than the file : " << metadataSize
        );
        SendError(ctx, error);
        return;
    }

    // RequestPartialFile inserts a fresh entry for the same path, so the finished
    // metadata-size request has to be dropped first.
    InflightRequests_.erase(request.Path);

    TRequest localRequest{
        .Path = request.Path,
        .RequestId = TGUID::Create(),
        .Requester = request.Requester,
        .MetadataRequest = false,
    };
    RequestPartialFile(std::move(localRequest), ctx, request.From - metadataSize, request.To + 4);
}
265+
266+
// Copies the fetched bytes into an arrow buffer and exposes it as a readable file.
//
// data    - raw bytes received for the request.
// request - the request the bytes belong to; only its Path is used, for error reports.
// ctx     - actor context used to report errors.
//
// Returns a BufferReader over a copy of `data`, or nullptr after an error has been
// reported through SendError.
std::shared_ptr<arrow::io::RandomAccessFile> BuildParquetFileFromMetadata(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
    // Reports an INTERNAL_ERROR for this request's path; the message is `prefix`
    // followed by the text of the failed arrow status.
    const auto reportFailure = [&](const char* prefix, const arrow::Status& status) {
        auto error = MakeError(
            request.Path,
            NFq::TIssuesIds::INTERNAL_ERROR,
            TStringBuilder{} << prefix << status.ToString()
        );
        SendError(ctx, error);
    };

    auto buffer = std::make_shared<arrow::Buffer>(nullptr, 0);
    arrow::BufferBuilder bufferBuilder;

    auto status = bufferBuilder.Append(data.data(), data.size());
    if (!status.ok()) {
        reportFailure("couldn't read data from S3Fetcher: ", status);
        return nullptr;
    }

    status = bufferBuilder.Finish(&buffer);
    if (!status.ok()) {
        reportFailure("couldn't copy data from S3Fetcher: ", status);
        return nullptr;
    }

    return std::make_shared<arrow::io::BufferReader>(std::move(buffer));
}
293+
226294
// Utility
227295
void SendError(const NActors::TActorContext& ctx, TEvFileError* error) {
228296
auto requestIt = InflightRequests_.find(error->Path);

ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <arrow/table.h>
44
#include <arrow/csv/options.h>
55
#include <arrow/csv/reader.h>
6+
#include <parquet/arrow/reader.h>
67

78
#include <ydb/core/external_sources/object_storage/events.h>
89
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -202,12 +203,37 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
202203
return table->fields();
203204
}
204205

206+
// Infers column types of a parquet file from the schema stored in the file.
//
// Opens `file` through parquet::arrow::FileReaderBuilder, builds a FileReader and
// asks it for the arrow schema.
//
// Returns the schema's fields on success. If opening, building the reader or
// reading the schema fails, returns a TString with an error message that carries
// the text of the status of the step that actually failed.
std::variant<ArrowFields, TString> InferParquetTypes(std::shared_ptr<arrow::io::RandomAccessFile> file) {
    parquet::arrow::FileReaderBuilder builder;
    // NOTE(review): the `false` argument is presumably ArrowReaderProperties'
    // use_threads flag - confirm against the Arrow version in use.
    builder.properties(parquet::ArrowReaderProperties(false));
    auto openStatus = builder.Open(std::move(file));
    if (!openStatus.ok()) {
        return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString();
    }

    std::unique_ptr<parquet::arrow::FileReader> reader;
    auto readerStatus = builder.Build(&reader);
    if (!readerStatus.ok()) {
        // Report the status of Build itself; openStatus is OK at this point.
        return TStringBuilder{} << "couldn't parse parquet file, check format params: " << readerStatus.ToString();
    }

    std::shared_ptr<arrow::Schema> schema;
    auto schemaRes = reader->GetSchema(&schema);
    if (!schemaRes.ok()) {
        // Report the status of GetSchema itself; openStatus is OK at this point.
        return TStringBuilder{} << "couldn't parse parquet file, check format params: " << schemaRes.ToString();
    }

    return schema->fields();
}
228+
205229
std::variant<ArrowFields, TString> InferType(EFileFormat format, std::shared_ptr<arrow::io::RandomAccessFile> file, const FormatConfig& config) {
206230
switch (format) {
207231
case EFileFormat::CsvWithNames:
208232
return InferCsvTypes(std::move(file), static_cast<const CsvConfig&>(config));
209233
case EFileFormat::TsvWithNames:
210234
return InferCsvTypes(std::move(file), static_cast<const TsvConfig&>(config));
235+
case EFileFormat::Parquet:
236+
return InferParquetTypes(std::move(file));
211237
case EFileFormat::Undefined:
212238
default:
213239
return std::variant<ArrowFields, TString>{std::in_place_type_t<TString>{}, TStringBuilder{} << "unexpected format: " << ConvertFileFormat(format)};
@@ -240,7 +266,10 @@ std::unique_ptr<FormatConfig> MakeFormatConfig(EFileFormat format, const THashMa
240266

241267
class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencinator> {
242268
public:
243-
TArrowInferencinator(NActors::TActorId arrowFetcher, EFileFormat format, const THashMap<TString, TString>& params)
269+
TArrowInferencinator(
270+
NActors::TActorId arrowFetcher,
271+
EFileFormat format,
272+
const THashMap<TString, TString>& params)
244273
: Format_{format}
245274
, Config_{MakeFormatConfig(Format_, params)}
246275
, ArrowFetcherId_{arrowFetcher}
@@ -270,7 +299,6 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
270299
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
271300
return;
272301
}
273-
274302
auto& arrowFields = std::get<ArrowFields>(mbArrowFields);
275303
std::vector<Ydb::Column> ydbFields;
276304
for (const auto& field : arrowFields) {
@@ -297,7 +325,11 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
297325
NActors::TActorId RequesterId_;
298326
};
299327

300-
NActors::IActor* CreateArrowInferencinator(NActors::TActorId arrowFetcher, EFileFormat format, const THashMap<TString, TString>& params) {
328+
NActors::IActor* CreateArrowInferencinator(
329+
NActors::TActorId arrowFetcher,
330+
EFileFormat format,
331+
const THashMap<TString, TString>& params) {
332+
301333
return new TArrowInferencinator{arrowFetcher, format, params};
302334
}
303335
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference

ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ TEST_F(ArrowInferenceTest, csv_simple) {
8585

8686
auto inferencinatorId = RegisterInferencinator("csv_with_names");
8787
ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
88-
NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}));
88+
NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
8989
});
9090

9191
std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
@@ -121,7 +121,7 @@ TEST_F(ArrowInferenceTest, tsv_simple) {
121121

122122
auto inferencinatorId = RegisterInferencinator("tsv_with_names");
123123
ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
124-
NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}));
124+
NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
125125
});
126126

127127
std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});

ydb/tests/fq/s3/test_formats.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import ydb.public.api.protos.draft.fq_pb2 as fq
1212

1313
import ydb.tests.fq.s3.s3_helpers as s3_helpers
14-
from ydb.tests.tools.fq_runner.kikimr_utils import yq_all, YQ_STATS_FULL
14+
from ydb.tests.tools.fq_runner.kikimr_utils import yq_all, yq_v2, YQ_STATS_FULL
1515

1616

1717
class TestS3Formats:
@@ -38,6 +38,26 @@ def validate_result(self, result_set):
3838
assert result_set.rows[2].items[0].bytes_value == b"Pear"
3939
assert result_set.rows[2].items[1].int32_value == 15
4040
assert result_set.rows[2].items[2].int32_value == 33
41+
42+
def validate_result_inference(self, result_set):
    """Check a result set produced by a query that relied on type inference.

    Expects three columns: "Fruit" (UTF8), "Price" and "Weight" (both optional
    INT64), and exactly three rows: Banana/3/100, Apple/2/22, Pear/15/33.
    """
    logging.debug(str(result_set))

    columns = result_set.columns
    assert len(columns) == 3
    assert columns[0].name == "Fruit"
    assert columns[0].type.type_id == ydb.Type.UTF8
    # The two numeric columns share the same shape: optional INT64.
    for index, expected_name in ((1, "Price"), (2, "Weight")):
        assert columns[index].name == expected_name
        assert columns[index].type.optional_type.item.type_id == ydb.Type.INT64

    expected_rows = (
        ("Banana", 3, 100),
        ("Apple", 2, 22),
        ("Pear", 15, 33),
    )
    rows = result_set.rows
    assert len(rows) == 3
    for index, (fruit, price, weight) in enumerate(expected_rows):
        items = rows[index].items
        assert items[0].text_value == fruit
        assert items[1].int64_value == price
        assert items[2].int64_value == weight
4161

4262
def validate_pg_result(self, result_set):
4363
logging.debug(str(result_set))
@@ -104,6 +124,33 @@ def test_format(self, kikimr, s3, client, filename, type_format, yq_version, uni
104124
if type_format != "json_list":
105125
assert stat["ResultSet"]["IngressRows"]["sum"] == 3
106126

127+
@yq_v2
@pytest.mark.parametrize(
    "filename, type_format",
    [
        ("test.csv", "csv_with_names"),
        ("test.tsv", "tsv_with_names"),
        ("test.parquet", "parquet"),
    ],
)
def test_format_inference(self, kikimr, s3, client, filename, type_format, unique_prefix):
    """Run SELECT * with with_infer='true' over an uploaded file and validate the result.

    Parametrized over the csv_with_names, tsv_with_names and parquet formats.
    """
    self.create_bucket_and_upload_file(filename, s3, kikimr)
    storage_connection_name = unique_prefix + "fruitbucket"
    client.create_storage_connection(storage_connection_name, "fbucket")

    sql = f'''
SELECT *
FROM `{storage_connection_name}`.`{filename}`
WITH (format=`{type_format}`, with_infer='true');
'''

    created = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS)
    query_id = created.result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    # No schema is given in the query, so the column types checked below are
    # whatever the service reported for the inferred schema.
    result_set = client.get_result_data(query_id).result.result_set
    self.validate_result_inference(result_set)
153+
107154
@yq_all
108155
def test_btc(self, kikimr, s3, client, unique_prefix):
109156
self.create_bucket_and_upload_file("btct.parquet", s3, kikimr)

0 commit comments

Comments
 (0)