Add support for ORC format #790

Draft · wants to merge 2 commits into base: main
25 changes: 25 additions & 0 deletions dev/provision.py
@@ -342,3 +342,28 @@
 (array(), map(), array(struct(1)))
 """
 )
+
+spark.sql(
+    f"""
+    CREATE TABLE {catalog_name}.default.test_read_orc (
+        dt date,
+        ts timestamp,
+        number integer,
+        letter string
+    )
+    USING iceberg
+    TBLPROPERTIES (
+        'format-version'='2',
+        'write.format.default'='orc'
+    );
+    """
+)
+
+spark.sql(f"""INSERT INTO {catalog_name}.default.test_read_orc
+VALUES
+    (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'),
+    (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'),
+    (CAST('2022-03-03' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 3, 'c')
+""")
+
+spark.sql(f"DELETE FROM {catalog_name}.default.test_read_orc WHERE number = 3")
22 changes: 16 additions & 6 deletions pyiceberg/io/pyarrow.py
@@ -168,6 +168,7 @@
 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
+PYARROW_ORC_FIELD_ID_KEY = b"iceberg.id"
 PYARROW_FIELD_DOC_KEY = b"doc"
 LIST_ELEMENT_NAME = "element"
 MAP_KEY_NAME = "key"
@@ -627,6 +628,8 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
 def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat:
     if file_format == FileFormat.PARQUET:
         return ds.ParquetFileFormat(**kwargs)
+    elif file_format == FileFormat.ORC:
+        return ds.OrcFileFormat()
     else:
         raise ValueError(f"Unsupported file format: {file_format}")
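For illustration, a minimal sketch of how this dispatch behaves; note that ds.OrcFileFormat() takes no arguments, so any kwargs are silently dropped on the ORC path:

    import pyarrow.dataset as ds

    # FileFormat here is pyiceberg.manifest.FileFormat, as used in this module.
    assert isinstance(_get_file_format(FileFormat.PARQUET), ds.ParquetFileFormat)
    assert isinstance(_get_file_format(FileFormat.ORC), ds.OrcFileFormat)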

@@ -799,11 +802,12 @@ def primitive(self, primitive: pa.DataType) -> T:


 def _get_field_id(field: pa.Field) -> Optional[int]:
-    return (
-        int(field_id_str.decode())
-        if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
-        else None
-    )
+    if field.metadata and (field_id_str := field.metadata.get(PYARROW_ORC_FIELD_ID_KEY)):
Contributor:
We may want to add a doc somewhere to mention that ORC read support requires pyarrow >= 13.0.0, since the ORC metadata is only exposed in the pyarrow schema as of 13.0.0 (apache/arrow#35304).

Also, shall we check PYARROW_PARQUET_FIELD_ID_KEY first, since Parquet is the default file format?

Contributor Author:
Seems like the right approach. I will promote Parquet to the top, then ORC, and will make the necessary doc changes when releasing ORC read support; it will definitely help users get started. Thanks for the feedback.

+        return int(field_id_str.decode())
+    elif field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)):
+        return int(field_id_str.decode())
+    else:
+        return None


 class _HasIds(PyArrowSchemaVisitor[bool]):
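A small usage sketch of the new lookup, with hand-built fields. The byte keys mirror the module constants above, and per the review note, pyarrow only surfaces the ORC key from 13.0.0 onward:

    import pyarrow as pa

    # Fields carrying the two metadata keys _get_field_id checks.
    orc_field = pa.field("number", pa.int32(), metadata={b"iceberg.id": b"3"})
    parquet_field = pa.field("number", pa.int32(), metadata={b"PARQUET:field_id": b"3"})

    assert _get_field_id(orc_field) == 3
    assert _get_field_id(parquet_field) == 3
    assert _get_field_id(pa.field("no_meta", pa.int32())) is None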
@@ -912,6 +916,9 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
                     return TimestamptzType()
                 elif primitive.tz is None:
                     return TimestampType()
+            if primitive.unit == "ns":
+                if primitive.tz == "UTC":
+                    return TimestamptzType()
HonahX (Contributor) commented on lines +919 to +921, Jun 10, 2024:
The nanosecond timestamp is added in format version 3, which is still under development and not formally adopted. PyIceberg does not support nanoseconds yet, so I think we should not add the conversion here (and it should map to a separate TimestampNanoType in the future: https://iceberg.apache.org/spec/#primitive-types).

I think you added this because arrow reads ORC timestamps as nanoseconds, since ORC's timestamp types always carry nanosecond information, but we want to read them as microseconds.

It seems the Java side currently just treats ORC's timestamp type as a us-unit one.

We could probably fix this at the arrow schema level. For example, we can add an additional conversion for physical_schema here to change the unit of the arrow timestamps from ns to us:

    fragment = arrow_format.make_fragment(fin)
    physical_schema = fragment.physical_schema
    file_schema = pyarrow_to_schema(physical_schema, name_mapping)

Changing the physical schema also ensures that the actual timestamp data is read with the us unit, as required by TimestampType:

    fragment_scanner = ds.Scanner.from_fragment(
        fragment=fragment,
        schema=physical_schema,
        # This will push down the query to Arrow.
        # But in case there are positional deletes, we have to apply them first
        filter=pyarrow_filter if not positional_deletes else None,
        columns=[col.name for col in file_project_schema.columns],

However, I also feel this is not the ultimate solution, because we assume the unit is microseconds. When TimestampNanoType is in, we may need some additional steps to ensure we read the data using the correct unit.

@MehulBatra @Fokko Would love to hear your thoughts on this. Please correct me if I make any mistakes about ORC's behavior.

Contributor Author:
Yes @HonahX, you got it right: arrow is reading the ORC timestamp unit as ns, which is why I added the schema conversion at the primitive types, even while aware that we will support nanoseconds in the coming version 3. Your suggestion makes sense until the time we start supporting version 3 altogether, so I will try to incorporate the changes to read ns as us.
@Fokko what do you think, are we good with this for the time being?
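To make the suggestion above concrete, a minimal sketch of the kind of schema conversion being discussed. The helper name is hypothetical, it only handles top-level fields, and nested types would need a recursive version:

    import pyarrow as pa

    def _ns_to_us_schema(schema: pa.Schema) -> pa.Schema:
        """Downcast top-level nanosecond timestamps to microseconds (sketch)."""
        fields = []
        for field in schema:
            if pa.types.is_timestamp(field.type) and field.type.unit == "ns":
                fields.append(field.with_type(pa.timestamp("us", tz=field.type.tz)))
            else:
                fields.append(field)
        return pa.schema(fields, metadata=schema.metadata)

    # The converted schema could then be passed to ds.Scanner.from_fragment(...)
    # in place of the raw physical_schema, so the data is materialized as us.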

         elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive):
             return BinaryType()
         elif pa.types.is_fixed_size_binary(primitive):
@@ -972,8 +979,11 @@ def _task_to_table(
     name_mapping: Optional[NameMapping] = None,
 ) -> Optional[pa.Table]:
     _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
-    arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with fs.open_input_file(path) as fin:
+        if task.file.file_format == FileFormat.PARQUET:
+            arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
+        if task.file.file_format == FileFormat.ORC:
+            arrow_format = ds.OrcFileFormat()  # currently ORC doesn't support any fragment scan options
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
         file_schema = pyarrow_to_schema(physical_schema, name_mapping)
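As a standalone illustration of the ORC branch above, a sketch with a hypothetical local file path; the real code receives the filesystem and path from the scan task:

    import pyarrow.dataset as ds
    import pyarrow.fs as pafs

    # Open a local ORC file and inspect its physical schema, mirroring
    # what _task_to_table does for FileFormat.ORC.
    fs = pafs.LocalFileSystem()
    with fs.open_input_file("/tmp/example.orc") as fin:  # hypothetical path
        fragment = ds.OrcFileFormat().make_fragment(fin)
        print(fragment.physical_schema)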
13 changes: 12 additions & 1 deletion tests/integration/test_reads.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
-
+import datetime
 import math
 import time
 import uuid
@@ -537,3 +537,14 @@ def another_task() -> None:

     table.transaction().set_properties(lock="xxx").commit_transaction()
     assert table.properties.get("lock") == "xxx"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_pyarrow_read_orc(catalog: Catalog) -> None:
+    table_orc = catalog.load_table("default.test_read_orc")
+    arrow_table = table_orc.scan(row_filter="number > -1", selected_fields=("number", "letter", "dt")).to_arrow()
+    assert len(arrow_table) == 2
+    assert arrow_table["number"][0].as_py() == 1
+    assert arrow_table["letter"][1].as_py() == "b"
+    assert arrow_table["dt"][0].as_py() == datetime.date(2022, 3, 1)