File tree 3 files changed +31
-1
lines changed
3 files changed +31
-1
lines changed Original file line number Diff line number Diff line change 342
342
(array(), map(), array(struct(1)))
343
343
"""
344
344
)
345
+
346
+ spark .sql (
347
+ f"""
348
+ CREATE TABLE { catalog_name } .default.test_read_orc (
349
+ number integer,
350
+ letter string
351
+ )
352
+ USING iceberg
353
+ TBLPROPERTIES (
354
+ 'format-version'='2',
355
+ 'write.format.default'='orc'
356
+ );
357
+ """
358
+ )
359
+
360
+ spark .sql (f"INSERT INTO { catalog_name } .default.test_read_orc VALUES (1,'hello')" )
Original file line number Diff line number Diff line change @@ -627,6 +627,8 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
627
627
def _get_file_format (file_format : FileFormat , ** kwargs : Dict [str , Any ]) -> ds .FileFormat :
628
628
if file_format == FileFormat .PARQUET :
629
629
return ds .ParquetFileFormat (** kwargs )
630
+ elif file_format == FileFormat .ORC :
631
+ return ds .OrcFileFormat ()
630
632
else :
631
633
raise ValueError (f"Unsupported file format: { file_format } " )
632
634
@@ -972,8 +974,11 @@ def _task_to_table(
972
974
name_mapping : Optional [NameMapping ] = None ,
973
975
) -> Optional [pa .Table ]:
974
976
_ , _ , path = PyArrowFileIO .parse_location (task .file .file_path )
975
- arrow_format = ds .ParquetFileFormat (pre_buffer = True , buffer_size = (ONE_MEGABYTE * 8 ))
976
977
with fs .open_input_file (path ) as fin :
978
+ if task .file .file_format == FileFormat .PARQUET :
979
+ arrow_format = ds .ParquetFileFormat (pre_buffer = True , buffer_size = (ONE_MEGABYTE * 8 ))
980
+ if task .file .file_format == FileFormat .ORC :
981
+ arrow_format = ds .OrcFileFormat () # currently ORC doesn't support any fragment scan options
977
982
fragment = arrow_format .make_fragment (fin )
978
983
physical_schema = fragment .physical_schema
979
984
file_schema = pyarrow_to_schema (physical_schema , name_mapping )
Original file line number Diff line number Diff line change @@ -537,3 +537,12 @@ def another_task() -> None:
537
537
538
538
table .transaction ().set_properties (lock = "xxx" ).commit_transaction ()
539
539
assert table .properties .get ("lock" ) == "xxx"
540
+
541
+
542
+ @pytest .mark .integration
543
+ @pytest .mark .parametrize ("catalog" , [pytest .lazy_fixture ("session_catalog_hive" ), pytest .lazy_fixture ("session_catalog" )])
544
+ def test_pyarrow_read (catalog : Catalog ) -> None :
545
+ table_orc = catalog .load_table ("default.test_read_orc" )
546
+ arrow_table = table_orc .scan (row_filter = "number > -1" , selected_fields = ("number" , "letter" )).to_arrow ()
547
+ assert len (arrow_table ) == 1
548
+ assert arrow_table ["number" ][0 ].as_py () == 1
You can’t perform that action at this time.
0 commit comments