
Commit f782c54

Read ORC-file-based Iceberg table, and an integration test for the same

1 parent fcad68e  commit f782c54

File tree: 3 files changed (+26, -10 lines)

dev/provision.py (+10, -1)
@@ -346,6 +346,8 @@
     spark.sql(
         f"""
         CREATE TABLE {catalog_name}.default.test_read_orc (
+            dt          date,
+            ts          timestamp,
             number      integer,
             letter      string
         )
@@ -357,4 +359,11 @@
         """
     )
 
-    spark.sql(f"INSERT INTO {catalog_name}.default.test_read_orc VALUES (1,'hello')")
+    spark.sql(f"""INSERT INTO {catalog_name}.default.test_read_orc
+        VALUES
+          (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'),
+          (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'),
+          (CAST('2022-03-03' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 3, 'c')
+    """)
+
+    spark.sql(f"DELETE FROM {catalog_name}.default.test_read_orc WHERE number = 3")

pyiceberg/io/pyarrow.py (+10, -5)
@@ -168,6 +168,7 @@
 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
+PYARROW_ORC_FIELD_ID_KEY = b"iceberg.id"
 PYARROW_FIELD_DOC_KEY = b"doc"
 LIST_ELEMENT_NAME = "element"
 MAP_KEY_NAME = "key"
@@ -801,11 +802,12 @@ def primitive(self, primitive: pa.DataType) -> T:
 
 
 def _get_field_id(field: pa.Field) -> Optional[int]:
-    return (
-        int(field_id_str.decode())
-        if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
-        else None
-    )
+    if field.metadata and (field_id_str := field.metadata.get(PYARROW_ORC_FIELD_ID_KEY)):
+        return int(field_id_str.decode())
+    elif field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)):
+        return int(field_id_str.decode())
+    else:
+        return None
 
 
 class _HasIds(PyArrowSchemaVisitor[bool]):
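
The rewritten _get_field_id checks the ORC metadata key before falling back to the Parquet one. A standalone sketch of that same lookup order (same constants as in the diff, a loop instead of the if/elif chain), runnable without the rest of the module:

from typing import Optional

import pyarrow as pa

PYARROW_ORC_FIELD_ID_KEY = b"iceberg.id"
PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"

def get_field_id(field: pa.Field) -> Optional[int]:
    # ORC key first, then the Parquet key; both store the ID as ASCII bytes.
    for key in (PYARROW_ORC_FIELD_ID_KEY, PYARROW_PARQUET_FIELD_ID_KEY):
        if field.metadata and (raw := field.metadata.get(key)):
            return int(raw.decode())
    return None

orc_field = pa.field("number", pa.int32(), metadata={b"iceberg.id": b"1"})
parquet_field = pa.field("number", pa.int32(), metadata={b"PARQUET:field_id": b"1"})
print(get_field_id(orc_field), get_field_id(parquet_field))  # prints: 1 1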
@@ -914,6 +916,9 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
                     return TimestamptzType()
                 elif primitive.tz is None:
                     return TimestampType()
+            if primitive.unit == "ns":
+                if primitive.tz == "UTC":
+                    return TimestamptzType()
         elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive):
             return BinaryType()
         elif pa.types.is_fixed_size_binary(primitive):
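
The extra "ns" branch is needed because ORC timestamps carry nanosecond precision, so PyArrow hands them back as timestamp[ns] rather than the timestamp[us] typical of Parquet reads. A quick standalone illustration of the two Arrow types involved:

import pyarrow as pa

us_utc = pa.timestamp("us", tz="UTC")  # what the existing "us" checks match
ns_utc = pa.timestamp("ns", tz="UTC")  # what ORC-backed reads appear to produce
print(us_utc.unit, ns_utc.unit)  # prints: us ns
# With the branch above, ns + UTC maps to Iceberg's TimestamptzType instead
# of falling out of the "us"-only checks unmatched.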

tests/integration/test_reads.py (+6, -4)
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
-
+import datetime
 import math
 import time
 import uuid
@@ -541,8 +541,10 @@ def another_task() -> None:
 
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
-def test_pyarrow_read(catalog: Catalog) -> None:
+def test_pyarrow_read_orc(catalog: Catalog) -> None:
     table_orc = catalog.load_table("default.test_read_orc")
-    arrow_table = table_orc.scan(row_filter="number > -1", selected_fields=("number", "letter")).to_arrow()
-    assert len(arrow_table) == 1
+    arrow_table = table_orc.scan(row_filter="number > -1", selected_fields=("number", "letter", "dt")).to_arrow()
+    assert len(arrow_table) == 2
     assert arrow_table["number"][0].as_py() == 1
+    assert arrow_table["letter"][1].as_py() == "b"
+    assert arrow_table["dt"][0].as_py() == datetime.date(2022, 3, 1)
