Commit ee6b772

handle EVFileError event in TObjectStorageExternalSource (#7569)
1 parent 1269ae5 · commit ee6b772

4 files changed: +58 −6 lines

ydb/core/external_sources/object_storage.cpp

Lines changed: 5 additions & 1 deletion
@@ -355,13 +355,17 @@ struct TObjectStorageExternalSource : public IExternalSource {
         return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
             auto promise = NThreading::NewPromise<TMetadataResult>();
             auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
+                if (!response.Status.IsSuccess()) {
+                    metaPromise.SetValue(NYql::NCommon::ResultFromError<TMetadataResult>(response.Status.GetIssues()));
+                    return;
+                }
+                TMetadataResult result;
                 meta->Changed = true;
                 meta->Schema.clear_column();
                 for (const auto& column : response.Fields) {
                     auto& destColumn = *meta->Schema.add_column();
                     destColumn = column;
                 }
-                TMetadataResult result;
                 result.SetSuccess();
                 result.Metadata = meta;
                 metaPromise.SetValue(std::move(result));
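
The patched callback now short-circuits on a failed inference status instead of always reporting success. Below is a minimal, self-contained sketch of that promise-based error propagation; TStatus, TInferredSchema and TMetadataResult are simplified stand-ins for the YDB types involved (NYdb::TStatus, NObjectStorage::TEvInferredFileSchema, TMetadataResult), and std::promise stands in for NThreading::TPromise.

    #include <future>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    // Simplified stand-ins for the YDB types used by schemaToMetadata.
    struct TStatus {
        bool Success = true;
        std::string Issues;
        bool IsSuccess() const { return Success; }
    };

    struct TInferredSchema {              // plays the role of TEvInferredFileSchema
        TStatus Status;
        std::vector<std::string> Fields;
    };

    struct TMetadataResult {
        bool Ok = false;
        std::string Error;
        std::vector<std::string> Columns;
    };

    // Mirrors the patched callback: bail out with the issues from a failed
    // status before touching the metadata, otherwise copy the inferred
    // columns and report success.
    void SchemaToMetadata(std::promise<TMetadataResult>& metaPromise, TInferredSchema&& response) {
        if (!response.Status.IsSuccess()) {
            metaPromise.set_value(TMetadataResult{false, response.Status.Issues, {}});
            return;
        }
        TMetadataResult result;
        result.Columns = std::move(response.Fields);
        result.Ok = true;
        metaPromise.set_value(std::move(result));
    }

    int main() {
        std::promise<TMetadataResult> p;
        auto f = p.get_future();
        SchemaToMetadata(p, TInferredSchema{{false, "couldn't parse csv/tsv file"}, {}});
        std::cout << (f.get().Ok ? "ok" : "failed") << '\n';   // prints "failed"
    }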

ydb/core/external_sources/object_storage/events.h

Lines changed: 7 additions & 0 deletions
@@ -10,6 +10,7 @@
 #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
 #include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
 #include <ydb/public/api/protos/ydb_value.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>

 namespace NKikimr::NExternalSource::NObjectStorage {

@@ -128,10 +129,16 @@ struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvIn
 struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {
     TEvInferredFileSchema(TString path, std::vector<Ydb::Column>&& fields)
         : Path{std::move(path)}
+        , Status{NYdb::EStatus::SUCCESS, {}}
         , Fields{std::move(fields)}
     {}
+    TEvInferredFileSchema(TString path, NYql::TIssues&& issues)
+        : Path{std::move(path)}
+        , Status{NYdb::EStatus::INTERNAL_ERROR, std::move(issues)}
+    {}

     TString Path;
+    NYdb::TStatus Status;
     std::vector<Ydb::Column> Fields;
 };

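
For context, a small stand-alone sketch of the pattern this header change introduces: one response event, two constructors, and a status field that tells the receiver which path was taken. EStatus and TInferredFileSchema below are simplified stand-ins; the real event derives from NActors::TEventLocal and carries NYdb::TStatus and NYql::TIssues.

    #include <string>
    #include <utility>
    #include <vector>

    // Stand-in for NYdb::EStatus.
    enum class EStatus { Success, InternalError };

    // Stand-in sketch of TEvInferredFileSchema after this commit: the success
    // constructor fills Fields, the error constructor fills Issues, and
    // Status records which one was used.
    struct TInferredFileSchema {
        TInferredFileSchema(std::string path, std::vector<std::string>&& fields)
            : Path(std::move(path))
            , Status(EStatus::Success)
            , Fields(std::move(fields))
        {}

        TInferredFileSchema(std::string path, std::string&& issues)
            : Path(std::move(path))
            , Status(EStatus::InternalError)
            , Issues(std::move(issues))
        {}

        std::string Path;
        EStatus Status;
        std::string Issues;                  // error details, empty on success
        std::vector<std::string> Fields;     // inferred columns, empty on error
    };

    // A receiver handles both outcomes through the same event type.
    bool IsUsableSchema(const TInferredFileSchema& ev) {
        return ev.Status == EStatus::Success && !ev.Fields.empty();
    }

    int main() {
        TInferredFileSchema ok{"data.csv", std::vector<std::string>{"a", "b"}};
        TInferredFileSchema bad{"data.json", std::string{"couldn't parse csv/tsv file"}};
        return IsUsableSchema(ok) && !IsUsableSchema(bad) ? 0 : 1;
    }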

ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp

Lines changed: 13 additions & 5 deletions
@@ -153,6 +153,14 @@ bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type) {
     }
     return false;
 }
+
+TEvInferredFileSchema* MakeErrorSchema(TString path, NFq::TIssuesIds::EIssueCode code, TString message) {
+    NYql::TIssues issues;
+    issues.AddIssue(std::move(message));
+    issues.back().SetCode(code, NYql::TSeverityIds::S_ERROR);
+    return new TEvInferredFileSchema{std::move(path), std::move(issues)};
+}
+
 }

 struct FormatConfig {
@@ -181,14 +189,14 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
         .Value(&reader);

     if (!readerStatus.ok()) {
-        return TString{TStringBuilder{} << "couldn't make table from data: " << readerStatus.ToString()};
+        return TString{TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString()};
     }

     std::shared_ptr<arrow::Table> table;
     auto tableRes = reader->Read().Value(&table);

     if (!tableRes.ok()) {
-        return TStringBuilder{} << "couldn't read table from data: " << readerStatus.ToString();
+        return TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString();
     }

     return table->fields();
@@ -259,7 +267,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
         auto& file = *ev->Get();
         auto mbArrowFields = InferType(Format_, file.File, *Config_);
         if (std::holds_alternative<TString>(mbArrowFields)) {
-            ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
+            ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
             return;
         }

@@ -269,7 +277,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
             ydbFields.emplace_back();
             auto& ydbField = ydbFields.back();
             if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type())) {
-                ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
+                ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
                 return;
             }
             ydbField.mutable_name()->assign(field->name());
@@ -279,7 +287,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin

     void HandleFileError(TEvFileError::TPtr& ev, const NActors::TActorContext& ctx) {
         Cout << "TArrowInferencinator::HandleFileError" << Endl;
-        ctx.Send(RequesterId_, ev->Release());
+        ctx.Send(RequesterId_, new TEvInferredFileSchema(ev->Get()->Path, std::move(ev->Get()->Issues)));
     }

private:
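
In HandleFileError the actor now converts a TEvFileError into the error form of TEvInferredFileSchema instead of relaying the raw error event, so the requester only ever has to handle one response type. A rough sketch of that convert-and-forward idea, with plain structs standing in for the actor events and for MakeErrorSchema:

    #include <memory>
    #include <string>
    #include <utility>
    #include <vector>

    // Stand-ins for the two events involved; the real ones are actor events.
    struct TFileError {
        std::string Path;
        std::vector<std::string> Issues;
    };

    struct TInferredFileSchema {
        std::string Path;
        bool Success = true;
        std::vector<std::string> Issues;
    };

    // Role of MakeErrorSchema: wrap an error message (plus an issue code in
    // the real helper) into the error form of the response event.
    std::unique_ptr<TInferredFileSchema> MakeErrorSchema(std::string path, std::string message) {
        auto ev = std::make_unique<TInferredFileSchema>();
        ev->Path = std::move(path);
        ev->Success = false;
        ev->Issues.push_back(std::move(message));
        return ev;
    }

    // Role of HandleFileError after this commit: convert the file error into
    // the schema-response event rather than forwarding it as-is.
    std::unique_ptr<TInferredFileSchema> OnFileError(TFileError&& err) {
        auto ev = std::make_unique<TInferredFileSchema>();
        ev->Path = std::move(err.Path);
        ev->Success = false;
        ev->Issues = std::move(err.Issues);
        return ev;
    }

    int main() {
        auto ev = OnFileError(TFileError{"data.json", {"download failed"}});
        return ev->Success ? 1 : 0;   // error path: Success is false
    }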

ydb/tests/fq/s3/test_s3_0.py

Lines changed: 33 additions & 0 deletions
@@ -310,6 +310,39 @@ def test_inference_multiple_files(self, kikimr, s3, client, unique_prefix):
         assert result_set.rows[2].items[2].int64_value == 3
         assert sum(kikimr.control_plane.get_metering(1)) == 10

+    @yq_v2
+    @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+    def test_inference_file_error(self, kikimr, s3, client, unique_prefix):
+        resource = boto3.resource(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        bucket = resource.Bucket("fbucket")
+        bucket.create(ACL='public-read')
+        bucket.objects.all().delete()
+
+        s3_client = boto3.client(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        read_data = '''{"a" : [10, 20, 30]}'''
+        s3_client.put_object(Body=read_data, Bucket='fbucket', Key='data.json', ContentType='text/plain')
+        kikimr.control_plane.wait_bootstrap(1)
+        storage_connection_name = unique_prefix + "json_bucket"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = f'''
+            SELECT *
+            FROM `{storage_connection_name}`.`data.json`
+            WITH (format=csv_with_names, with_infer='true');
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        assert "couldn\\'t parse csv/tsv file, check format and compression params:" in str(
+            client.describe_query(query_id).result
+        )
+
     @yq_all
     @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
     def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):
