Skip to content

Support for parquet file with type inferring #7734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
for (const auto& entry : entries.Objects) {
if (entry.Size > 0) {
return entry.Path;
return entry;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А как оно раньше работало. Почему на entry заменилось?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

раньше мы скачивали первые 10МБ файла -> размер файла нам был не нужен.
теперь для паркета нам нужно скачать последние N -> нужно знать размер файла, тк S3Fetcher принимает на вход (начало, конец) участка памяти

соотв я добавил в TEvInferFileSchema размер файла

}
}
throw yexception() << "couldn't find any files for type inference, please check that the right path is provided";
Expand All @@ -352,7 +352,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));

return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<NYql::NS3Lister::TObjectListEntry>& entryFut) {
auto promise = NThreading::NewPromise<TMetadataResult>();
auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
if (!response.Status.IsSuccess()) {
Expand All @@ -370,9 +370,10 @@ struct TObjectStorageExternalSource : public IExternalSource {
result.Metadata = meta;
metaPromise.SetValue(std::move(result));
};
auto [path, size, _] = entryFut.GetValue();
actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
arrowInferencinatorId,
new NObjectStorage::TEvInferFileSchema(TString{pathFut.GetValue()}),
new NObjectStorage::TEvInferFileSchema(TString{path}, size),
promise,
std::move(schemaToMetadata)
));
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/external_sources/object_storage/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ struct TEvArrowFile : public NActors::TEventLocal<TEvArrowFile, EvArrowFile> {
};

struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvInferFileSchema> {
explicit TEvInferFileSchema(TString&& path)
explicit TEvInferFileSchema(TString&& path, ui64 size)
: Path{std::move(path)}
, Size{size}
{}

TString Path;
ui64 Size = 0;
};

struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <arrow/csv/chunker.h>
#include <arrow/csv/options.h>
#include <arrow/io/memory.h>
#include <arrow/util/endian.h>

#include <util/generic/guid.h>
#include <util/generic/size_literals.h>
Expand Down Expand Up @@ -47,15 +48,20 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
const auto& request = *ev->Get();
TRequest localRequest{
.Path = request.Path,
.RequestId = {},
.RequestId = TGUID::Create(),
.Requester = ev->Sender,
.MetadataRequest = false,
};
CreateGuid(&localRequest.RequestId);

switch (Format_) {
case EFileFormat::CsvWithNames:
case EFileFormat::TsvWithNames: {
HandleAsPrefixFile(std::move(localRequest), ctx);
RequestPartialFile(std::move(localRequest), ctx, 0, 10_MB);
break;
}
case EFileFormat::Parquet: {
localRequest.MetadataRequest = true;
RequestPartialFile(std::move(localRequest), ctx, request.Size - 8, request.Size - 4);
break;
}
default: {
Expand Down Expand Up @@ -96,6 +102,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
break;
}
case EFileFormat::Parquet: {
if (request.MetadataRequest) {
HandleMetadataSizeRequest(data, request, ctx);
return;
}
file = BuildParquetFileFromMetadata(data, request, ctx);
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
break;
}
case EFileFormat::Undefined:
default:
Y_ABORT("Invalid format should be unreachable");
Expand All @@ -120,14 +135,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
uint64_t From = 0;
uint64_t To = 0;
NActors::TActorId Requester;
bool MetadataRequest;
};

// Reading file

void HandleAsPrefixFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx) {
void RequestPartialFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx, uint64_t from, uint64_t to) {
auto path = insertedRequest.Path;
insertedRequest.From = 0;
insertedRequest.To = 10_MB;
insertedRequest.From = from;
insertedRequest.To = to;
auto it = InflightRequests_.try_emplace(path, std::move(insertedRequest));
Y_ABORT_UNLESS(it.second, "couldn't insert request for path: %s", path.c_str());

Expand Down Expand Up @@ -223,6 +239,58 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
return std::make_shared<arrow::io::BufferReader>(std::move(whole));
}

// First (metadata-size) stage of parquet schema inference: `data` holds the 4
// bytes fetched from [fileSize - 8, fileSize - 4), i.e. the little-endian length
// of the parquet footer metadata. On success issues a second fetch covering the
// metadata plus the trailing length+magic so the reader can parse it as a
// standalone buffer; on failure reports an error to the original requester.
void HandleMetadataSizeRequest(const TString& data, TRequest request, const NActors::TActorContext& ctx) {
    uint32_t metadataSize = arrow::BitUtil::FromLittleEndian<uint32_t>(ReadUnaligned<uint32_t>(data.data()));

    if (metadataSize > 10_MB) {
        auto error = MakeError(
            request.Path,
            NFq::TIssuesIds::INTERNAL_ERROR,
            TStringBuilder{} << "couldn't load parquet metadata, size is bigger than 10MB : " << metadataSize
        );
        SendError(ctx, error);
        return;
    }

    // BUG FIX: request.From == fileSize - 8, so a corrupt footer that claims
    // more metadata than the file actually holds would make the unsigned
    // subtraction below wrap around and produce a nonsensical byte range.
    if (metadataSize > request.From) {
        auto error = MakeError(
            request.Path,
            NFq::TIssuesIds::INTERNAL_ERROR,
            TStringBuilder{} << "couldn't load parquet metadata, size is bigger than file: " << metadataSize
        );
        SendError(ctx, error);
        return;
    }

    InflightRequests_.erase(request.Path);

    TRequest localRequest{
        .Path = request.Path,
        .RequestId = TGUID::Create(),
        .Requester = request.Requester,
        .MetadataRequest = false,
    };
    // Fetch [footerStart, fileSize): the metadata itself plus the 8-byte
    // length+magic tail that the parquet reader expects at the buffer end.
    RequestPartialFile(std::move(localRequest), ctx, request.From - metadataSize, request.To + 4);
}

// Wraps the fetched parquet footer bytes (metadata + length + magic) into an
// in-memory arrow file so the schema can be inferred from it. Returns nullptr
// after reporting an error to the requester if the buffer cannot be built.
std::shared_ptr<arrow::io::RandomAccessFile> BuildParquetFileFromMetadata(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
    // Both failure paths differ only in the message prefix, so report centrally.
    const auto reportError = [&](const char* prefix, const arrow::Status& status) {
        SendError(ctx, MakeError(
            request.Path,
            NFq::TIssuesIds::INTERNAL_ERROR,
            TStringBuilder{} << prefix << status.ToString()
        ));
    };

    arrow::BufferBuilder bufferBuilder;
    auto appendStatus = bufferBuilder.Append(data.data(), data.size());
    if (!appendStatus.ok()) {
        reportError("couldn't read data from S3Fetcher: ", appendStatus);
        return nullptr;
    }

    auto footerBuffer = std::make_shared<arrow::Buffer>(nullptr, 0);
    auto finishStatus = bufferBuilder.Finish(&footerBuffer);
    if (!finishStatus.ok()) {
        reportError("couldn't copy data from S3Fetcher: ", finishStatus);
        return nullptr;
    }

    return std::make_shared<arrow::io::BufferReader>(std::move(footerBuffer));
}

// Utility
void SendError(const NActors::TActorContext& ctx, TEvFileError* error) {
auto requestIt = InflightRequests_.find(error->Path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <arrow/table.h>
#include <arrow/csv/options.h>
#include <arrow/csv/reader.h>
#include <parquet/arrow/reader.h>

#include <ydb/core/external_sources/object_storage/events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -202,12 +203,37 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
return table->fields();
}

// Infers an Arrow schema from a parquet file already materialized in `file`.
// Returns the schema's fields on success, or a human-readable error string.
std::variant<ArrowFields, TString> InferParquetTypes(std::shared_ptr<arrow::io::RandomAccessFile> file) {
    parquet::arrow::FileReaderBuilder builder;
    builder.properties(parquet::ArrowReaderProperties(false)); // single-threaded read
    auto openStatus = builder.Open(std::move(file));
    if (!openStatus.ok()) {
        return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString();
    }

    std::unique_ptr<parquet::arrow::FileReader> reader;
    auto readerStatus = builder.Build(&reader);
    if (!readerStatus.ok()) {
        // BUG FIX: report readerStatus here — the original reported openStatus,
        // which is necessarily OK at this point, hiding the actual failure.
        return TStringBuilder{} << "couldn't parse parquet file, check format params: " << readerStatus.ToString();
    }

    std::shared_ptr<arrow::Schema> schema;
    auto schemaRes = reader->GetSchema(&schema);
    if (!schemaRes.ok()) {
        // BUG FIX: likewise, report schemaRes instead of the stale openStatus.
        return TStringBuilder{} << "couldn't parse parquet file, check format params: " << schemaRes.ToString();
    }

    return schema->fields();
}

std::variant<ArrowFields, TString> InferType(EFileFormat format, std::shared_ptr<arrow::io::RandomAccessFile> file, const FormatConfig& config) {
switch (format) {
case EFileFormat::CsvWithNames:
return InferCsvTypes(std::move(file), static_cast<const CsvConfig&>(config));
case EFileFormat::TsvWithNames:
return InferCsvTypes(std::move(file), static_cast<const TsvConfig&>(config));
case EFileFormat::Parquet:
return InferParquetTypes(std::move(file));
case EFileFormat::Undefined:
default:
return std::variant<ArrowFields, TString>{std::in_place_type_t<TString>{}, TStringBuilder{} << "unexpected format: " << ConvertFileFormat(format)};
Expand Down Expand Up @@ -240,7 +266,10 @@ std::unique_ptr<FormatConfig> MakeFormatConfig(EFileFormat format, const THashMa

class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencinator> {
public:
TArrowInferencinator(NActors::TActorId arrowFetcher, EFileFormat format, const THashMap<TString, TString>& params)
TArrowInferencinator(
NActors::TActorId arrowFetcher,
EFileFormat format,
const THashMap<TString, TString>& params)
: Format_{format}
, Config_{MakeFormatConfig(Format_, params)}
, ArrowFetcherId_{arrowFetcher}
Expand Down Expand Up @@ -270,7 +299,6 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
return;
}

auto& arrowFields = std::get<ArrowFields>(mbArrowFields);
std::vector<Ydb::Column> ydbFields;
for (const auto& field : arrowFields) {
Expand All @@ -297,7 +325,11 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
NActors::TActorId RequesterId_;
};

NActors::IActor* CreateArrowInferencinator(NActors::TActorId arrowFetcher, EFileFormat format, const THashMap<TString, TString>& params) {
// Factory: constructs the schema-inference actor bound to the given fetcher.
NActors::IActor* CreateArrowInferencinator(
    NActors::TActorId arrowFetcher,
    EFileFormat format,
    const THashMap<TString, TString>& params)
{
    return new TArrowInferencinator(arrowFetcher, format, params);
}
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ TEST_F(ArrowInferenceTest, csv_simple) {

auto inferencinatorId = RegisterInferencinator("csv_with_names");
ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}));
NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
});

std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
Expand Down Expand Up @@ -121,7 +121,7 @@ TEST_F(ArrowInferenceTest, tsv_simple) {

auto inferencinatorId = RegisterInferencinator("tsv_with_names");
ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}));
NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
});

std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
Expand Down
49 changes: 48 additions & 1 deletion ydb/tests/fq/s3/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import ydb.public.api.protos.draft.fq_pb2 as fq

import ydb.tests.fq.s3.s3_helpers as s3_helpers
from ydb.tests.tools.fq_runner.kikimr_utils import yq_all, YQ_STATS_FULL
from ydb.tests.tools.fq_runner.kikimr_utils import yq_all, yq_v2, YQ_STATS_FULL


class TestS3Formats:
Expand All @@ -38,6 +38,26 @@ def validate_result(self, result_set):
assert result_set.rows[2].items[0].bytes_value == b"Pear"
assert result_set.rows[2].items[1].int32_value == 15
assert result_set.rows[2].items[2].int32_value == 33

def validate_result_inference(self, result_set):
    # Checks the result of an inferred-schema read: Fruit is a required UTF8
    # column, Price and Weight are inferred as optional INT64 columns.
    logging.debug(str(result_set))

    expected_columns = [
        ("Fruit", ydb.Type.UTF8, False),
        ("Price", ydb.Type.INT64, True),
        ("Weight", ydb.Type.INT64, True),
    ]
    assert len(result_set.columns) == len(expected_columns)
    for column, (name, type_id, is_optional) in zip(result_set.columns, expected_columns):
        assert column.name == name
        actual_type_id = column.type.optional_type.item.type_id if is_optional else column.type.type_id
        assert actual_type_id == type_id

    expected_rows = [
        ("Banana", 3, 100),
        ("Apple", 2, 22),
        ("Pear", 15, 33),
    ]
    assert len(result_set.rows) == len(expected_rows)
    for row, (fruit, price, weight) in zip(result_set.rows, expected_rows):
        assert row.items[0].text_value == fruit
        assert row.items[1].int64_value == price
        assert row.items[2].int64_value == weight

def validate_pg_result(self, result_set):
logging.debug(str(result_set))
Expand Down Expand Up @@ -104,6 +124,33 @@ def test_format(self, kikimr, s3, client, filename, type_format, yq_version, uni
if type_format != "json_list":
assert stat["ResultSet"]["IngressRows"]["sum"] == 3

@yq_v2
@pytest.mark.parametrize(
    "filename, type_format",
    [
        ("test.csv", "csv_with_names"),
        ("test.tsv", "tsv_with_names"),
        ("test.parquet", "parquet"),
    ],
)
def test_format_inference(self, kikimr, s3, client, filename, type_format, unique_prefix):
    # Upload the fixture file, then read it back with schema inference enabled
    # and validate that both the inferred column types and the data match.
    self.create_bucket_and_upload_file(filename, s3, kikimr)
    storage_connection_name = unique_prefix + "fruitbucket"
    client.create_storage_connection(storage_connection_name, "fbucket")

    # BUG FIX: the table path must reference the parametrized uploaded file;
    # the previous text contained a literal "(unknown)" placeholder instead.
    sql = f'''
        SELECT *
        FROM `{storage_connection_name}`.`{filename}`
        WITH (format=`{type_format}`, with_infer='true');
        '''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    data = client.get_result_data(query_id)
    result_set = data.result.result_set
    self.validate_result_inference(result_set)

@yq_all
def test_btc(self, kikimr, s3, client, unique_prefix):
self.create_bucket_and_upload_file("btct.parquet", s3, kikimr)
Expand Down
Loading