Skip to content

Commit f2ccd9c

Browse files
authored
Improved handling of empty cells in CSV YQ-2727 (#796)
* csv null poc
* cleanup
* not null string as default
* cleanup tests
1 parent 0f64a08 commit f2ccd9c

File tree

7 files changed

+135
-0
lines changed

7 files changed

+135
-0
lines changed

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -3034,6 +3034,10 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
30343034

30353035
readSpec->Format = params.GetFormat();
30363036

3037+
if (readSpec->Format == "csv_with_names") {
3038+
readSpec->Settings.csv.empty_as_default = true;
3039+
}
3040+
30373041
if (const auto it = settings.find("compression"); settings.cend() != it)
30383042
readSpec->Compression = it->second;
30393043

ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/CSVRowInputFormat.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,9 @@ bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, co
429429
if (format_settings.csv.empty_as_default
430430
&& (at_delimiter || at_last_column_line_end))
431431
{
432+
if (!type->isNullable() && type->getName() != "String" && !type->getName().starts_with("FixedString")) {
433+
throw ParsingException("Invalid data format", NDB::ErrorCodes::INCORRECT_DATA);
434+
}
432435
/// Treat empty unquoted column value as default value, if
433436
/// specified in the settings. Tuple columns might seem
434437
/// problematic, because they are never quoted but still contain

ydb/tests/fq/s3/test_format_setting.py

+120
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import ydb.tests.fq.s3.s3_helpers as s3_helpers
1717
from ydb.tests.tools.fq_runner.kikimr_utils import yq_all
18+
from google.protobuf import struct_pb2
1819

1920

2021
class TestS3(TestYdsBase):
@@ -759,3 +760,122 @@ def test_date_time_completeness_iso(self, kikimr, s3, client, filename, type_for
759760
assert result_set.columns[3].type.type_id == ydb.Type.INT32
760761

761762
assert len(result_set.rows) == 6
763+
764+
@yq_all
765+
@pytest.mark.parametrize("filename", [
766+
("date_null/as_default/test.csv"),
767+
("date_null/parse_error/test.csv")
768+
])
769+
def test_date_null(self, kikimr, s3, client, filename):
770+
self.create_bucket_and_upload_file(filename, s3, kikimr)
771+
client.create_storage_connection("hcpp", "fbucket")
772+
773+
sql = '''
774+
SELECT
775+
`put`
776+
FROM
777+
`hcpp`.`{name}`
778+
WITH (FORMAT="csv_with_names",
779+
csv_delimiter=",",
780+
SCHEMA=(
781+
`put` Date
782+
))
783+
LIMIT 10;
784+
'''.format(name="/" + filename)
785+
786+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
787+
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
788+
data = client.get_result_data(query_id, limit=50)
789+
assert data.result.result_set.rows[0].items[0].null_flag_value == struct_pb2.NULL_VALUE, str(data.result.result_set)
790+
791+
@yq_all
792+
@pytest.mark.parametrize("filename", [
793+
("date_null/as_default/test.csv"),
794+
("date_null/parse_error/test.csv")
795+
])
796+
def test_date_null_with_not_null_type(self, kikimr, s3, client, filename):
797+
self.create_bucket_and_upload_file(filename, s3, kikimr)
798+
client.create_storage_connection("hcpp", "fbucket")
799+
800+
sql = '''
801+
SELECT
802+
`put`
803+
FROM
804+
`hcpp`.`{name}`
805+
WITH (FORMAT="csv_with_names",
806+
csv_delimiter=",",
807+
SCHEMA=(
808+
`put` Date NOT NULL
809+
))
810+
LIMIT 10;
811+
'''.format(name="/" + filename)
812+
813+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
814+
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
815+
describe_result = client.describe_query(query_id).result
816+
issues = describe_result.query.issue[0].issues
817+
assert "Invalid data format" in str(issues), str(describe_result)
818+
assert "name: put, type: Date, ERROR: text " in str(issues), str(describe_result)
819+
assert "is not like Date" in str(issues), str(describe_result)
820+
821+
@yq_all
822+
@pytest.mark.parametrize("filename", [
823+
("date_null/as_default/multi_null.csv"),
824+
("date_null/parse_error/multi_null.csv")
825+
])
826+
def test_date_null_multi(self, kikimr, s3, client, filename):
827+
self.create_bucket_and_upload_file(filename, s3, kikimr)
828+
client.create_storage_connection("hcpp", "fbucket")
829+
830+
sql = '''
831+
SELECT
832+
`put`, `a`, `t`
833+
FROM
834+
`hcpp`.`{name}`
835+
WITH (FORMAT="csv_with_names",
836+
csv_delimiter=",",
837+
SCHEMA=(
838+
`put` Date,
839+
`a` Date,
840+
`t` Date
841+
))
842+
LIMIT 10;
843+
'''.format(name="/" + filename)
844+
845+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
846+
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
847+
data = client.get_result_data(query_id, limit=50)
848+
assert data.result.result_set.rows[0].items[0].null_flag_value == struct_pb2.NULL_VALUE, str(data.result.result_set)
849+
assert data.result.result_set.rows[0].items[1].null_flag_value == struct_pb2.NULL_VALUE, str(data.result.result_set)
850+
assert data.result.result_set.rows[0].items[2].null_flag_value == struct_pb2.NULL_VALUE, str(data.result.result_set)
851+
852+
@yq_all
853+
@pytest.mark.parametrize("filename", [
854+
("date_null/as_default/multi_null.csv"),
855+
("date_null/parse_error/multi_null.csv")
856+
])
857+
def test_string_not_null_multi(self, kikimr, s3, client, filename):
858+
self.create_bucket_and_upload_file(filename, s3, kikimr)
859+
client.create_storage_connection("hcpp", "fbucket")
860+
861+
sql = '''
862+
SELECT
863+
`put`, `a`, `t`
864+
FROM
865+
`hcpp`.`{name}`
866+
WITH (FORMAT="csv_with_names",
867+
csv_delimiter=",",
868+
SCHEMA=(
869+
`put` String NOT NULL,
870+
`a` Utf8 NOT NULL,
871+
`t` String NOT NULL
872+
))
873+
LIMIT 10;
874+
'''.format(name="/" + filename)
875+
876+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
877+
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
878+
data = client.get_result_data(query_id, limit=50)
879+
assert data.result.result_set.rows[0].items[0].bytes_value == b"", str(data.result.result_set)
880+
assert data.result.result_set.rows[0].items[1].bytes_value == b"", str(data.result.result_set)
881+
assert data.result.result_set.rows[0].items[2].bytes_value == b"", str(data.result.result_set)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
id,put,call,a,t
2+
id1,,2022-12-22,,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
id,put,call
2+
id1,,2022-12-22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
id,put,call,a,t
2+
id1,,hello2,,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
id,put,call
2+
id1,,hello2

0 commit comments

Comments
 (0)