diff --git a/dev/provision.py b/dev/provision.py
index 44086caf20..d8a454af47 100644
--- a/dev/provision.py
+++ b/dev/provision.py
@@ -342,3 +342,28 @@
             (array(), map(), array(struct(1)))
         """
     )
+
+    spark.sql(
+        f"""
+        CREATE TABLE {catalog_name}.default.test_read_orc (
+            dt     date,
+            ts     timestamp,
+            number integer,
+            letter string
+        )
+        USING iceberg
+        TBLPROPERTIES (
+            'format-version'='2',
+            'write.format.default'='orc'
+        );
+        """
+    )
+
+    spark.sql(f"""INSERT INTO {catalog_name}.default.test_read_orc
+        VALUES
+        (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'),
+        (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'),
+        (CAST('2022-03-03' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 3, 'c')
+    """)
+
+    spark.sql(f"DELETE FROM {catalog_name}.default.test_read_orc WHERE number = 3")
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 04f30ec63e..878f7d54ed 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -168,6 +168,7 @@
 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
+PYARROW_ORC_FIELD_ID_KEY = b"iceberg.id"
 PYARROW_FIELD_DOC_KEY = b"doc"
 LIST_ELEMENT_NAME = "element"
 MAP_KEY_NAME = "key"
@@ -627,6 +628,8 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
 
 def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat:
     if file_format == FileFormat.PARQUET:
         return ds.ParquetFileFormat(**kwargs)
+    elif file_format == FileFormat.ORC:
+        return ds.OrcFileFormat()
     else:
         raise ValueError(f"Unsupported file format: {file_format}")
@@ -799,11 +802,12 @@ def primitive(self, primitive: pa.DataType) -> T:
 
 
 def _get_field_id(field: pa.Field) -> Optional[int]:
-    return (
-        int(field_id_str.decode())
-        if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
-        else None
-    )
+    if field.metadata and (field_id_str := field.metadata.get(PYARROW_ORC_FIELD_ID_KEY)):
+        return int(field_id_str.decode())
+    elif field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)):
+        return int(field_id_str.decode())
+    else:
+        return None
 
 
 class _HasIds(PyArrowSchemaVisitor[bool]):
@@ -912,6 +916,9 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
                 return TimestamptzType()
             elif primitive.tz is None:
                 return TimestampType()
+            if primitive.unit == "ns":
+                if primitive.tz == "UTC":
+                    return TimestamptzType()
         elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive):
             return BinaryType()
         elif pa.types.is_fixed_size_binary(primitive):
@@ -972,8 +979,11 @@ def _task_to_table(
     name_mapping: Optional[NameMapping] = None,
 ) -> Optional[pa.Table]:
     _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
-    arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with fs.open_input_file(path) as fin:
+        if task.file.file_format == FileFormat.PARQUET:
+            arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
+        elif task.file.file_format == FileFormat.ORC:
+            arrow_format = ds.OrcFileFormat()  # currently ORC doesn't support any fragment scan options
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
         file_schema = pyarrow_to_schema(physical_schema, name_mapping)
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 80a6f18632..4a1ae015fd 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
-
+import datetime
 import math
 import time
 import uuid
@@ -537,3 +537,14 @@ def another_task() -> None:
    table.transaction().set_properties(lock="xxx").commit_transaction()

    assert table.properties.get("lock") == "xxx"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_pyarrow_read_orc(catalog: Catalog) -> None:
+    table_orc = catalog.load_table("default.test_read_orc")
+    arrow_table = table_orc.scan(row_filter="number > -1", selected_fields=("number", "letter", "dt")).to_arrow()
+    assert len(arrow_table) == 2
+    assert arrow_table["number"][0].as_py() == 1
+    assert arrow_table["letter"][1].as_py() == "b"
+    assert arrow_table["dt"][0].as_py() == datetime.date(2022, 3, 1)
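
Not part of the patch, but for reviewers who want to sanity-check the new ORC read path without spinning up the full integration stack: below is a minimal sketch of the same PyArrow calls `_task_to_table` now makes for ORC files (a bare `ds.OrcFileFormat()`, `make_fragment` on an open file, `physical_schema`, and a pushed-down filter). The file path and sample data are made up for illustration; it assumes only a `pyarrow` build with ORC support.

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.orc as orc

# Write a small ORC file to stand in for an Iceberg data file.
# (Path and data are arbitrary for this sketch.)
path = "/tmp/test_read_orc_demo.orc"
orc.write_table(pa.table({"number": [1, 2, 3], "letter": ["a", "b", "c"]}), path)

# Mirror the new branch in _task_to_table: ORC gets a bare OrcFileFormat(),
# since PyArrow currently exposes no ORC fragment scan options.
orc_format = ds.OrcFileFormat()
with pa.OSFile(path, "rb") as fin:
    fragment = orc_format.make_fragment(fin)
    print(fragment.physical_schema)  # schema read from the ORC footer
    # Filters and projections push down through the fragment, as in the scan path.
    result = fragment.to_table(filter=ds.field("number") > 1, columns=["number", "letter"])

print(result.to_pydict())  # expected: {'number': [2, 3], 'letter': ['b', 'c']}
```

One difference from a real Iceberg ORC data file: files written by Iceberg carry per-field `iceberg.id` attributes, which PyArrow surfaces as field metadata and the updated `_get_field_id` resolves via `PYARROW_ORC_FIELD_ID_KEY`; the plain file above has no field ids, which is the case the `name_mapping` fallback covers.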