From d5d31d7a879f0f3fbb5594a77bb8135f29637c15 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 20 Jun 2024 22:50:43 +0000 Subject: [PATCH 1/6] checkpoint --- tests/integration/test_writes/test_writes.py | 46 +++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 4585406cbb..8fd6456790 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -916,7 +916,7 @@ def test_sanitize_character_partitioned(catalog: Catalog) -> None: @pytest.mark.parametrize("format_version", [1, 2]) -def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: +def test_table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: identifier = "default.table_append_subset_of_schema" tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) arrow_table_without_some_columns = arrow_table_with_null.combine_chunks().drop(arrow_table_with_null.column_names[0]) @@ -925,3 +925,47 @@ def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null tbl.append(arrow_table_without_some_columns) # overwrite and then append should produce twice the data assert len(tbl.scan().to_arrow()) == len(arrow_table_without_some_columns) * 2 + + +@pytest.mark.parametrize("format_version", [1, 2]) +def test_write_all_timestamp_precision(session_catalog: Catalog, format_version: int) -> None: + identifier = "default.table_all_timestamp_precision" + arrow_table_schema_with_all_timestamp_precisions = pa.schema([ + ("timestamp_s", pa.timestamp(unit="s")), + ("timestamptz_s", pa.timestamp(unit="s", tz="UTC")), + ("timestamp_ms", pa.timestamp(unit="ms")), + ("timestamptz_ms", pa.timestamp(unit="ms", tz="UTC")), + ("timestamp_us", pa.timestamp(unit="us")), + ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")), + ("timestamp_ns", pa.timestamp(unit="ns")), + ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")), + ]) + TEST_DATA_WITH_NULL = { + "timestamp_s": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)], + "timestamptz_s": [ + datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc), + None, + datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), + ], + "timestamp_ms": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)], + "timestamptz_ms": [ + datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc), + None, + datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), + ], + "timestamp_us": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)], + "timestamptz_us": [ + datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc), + None, + datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), + ], + "timestamp_ns": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)], + "timestamptz_ns": [ + datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc), + None, + datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), + ], + } + tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_schema_with_all_timestamp_precisions]) + test_arrow_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=arrow_table_schema_with_all_timestamp_precisions) + tbl.overwrite(test_arrow_table) From ae6ea72ea6e39ec77fcb1583f560957e251b45a7 Mon Sep 17 
00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Sat, 22 Jun 2024 22:19:05 +0000 Subject: [PATCH 2/6] support more timestamps --- mkdocs/docs/configuration.md | 6 +++- pyiceberg/io/pyarrow.py | 24 ++++++++++--- tests/integration/test_writes/test_writes.py | 35 ++++++++++++++---- tests/io/test_pyarrow_visitor.py | 38 +++++++++++--------- 4 files changed, 74 insertions(+), 29 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index f8a69119c8..67bb2c1baa 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -299,4 +299,8 @@ PyIceberg uses multiple threads to parallelize operations. The number of workers # Backward Compatibility -Previous versions of Java (`<1.4.0`) implementations incorrectly assume the optional attribute `current-snapshot-id` to be a required attribute in TableMetadata. This means that if `current-snapshot-id` is missing in the metadata file (e.g. on table creation), the application will throw an exception without being able to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create a table with a metadata file that will be compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` entry as "True" in the configuration file, or by setting the `PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue +Previous versions of Java (`<1.4.0`) implementations incorrectly assume the optional attribute `current-snapshot-id` to be a required attribute in TableMetadata. This means that if `current-snapshot-id` is missing in the metadata file (e.g. on table creation), the application will throw an exception without being able to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create a table with a metadata file that will be compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` property as "True" in the configuration file, or by setting the `PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue + +# Nanoseconds Support + +PyIceberg currently only supports upto microsecond precision in its TimestampType. PyArrow timestamp types in 's' and 'ms' will be upcast automatically to 'us' precision timestamps on write. Timestamps in 'ns' precision can also be downcast automatically on write if desired. This can be configured by setting the `downcast-ns-timestamp-on-write` property as "True" in the configuration file, or by setting the `PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE` environment variable. 
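For example, the following sketch mirrors the unit test added in this patch: it opts in through the environment variable and converts a nanosecond-precision PyArrow type with the schema visitor. The standalone-script framing (`os.environ`, the `assert`) is illustrative only, not code from the change.

```python
import os

import pyarrow as pa

from pyiceberg.io.pyarrow import _ConvertToIceberg, visit_pyarrow
from pyiceberg.types import TimestampType

# Opt in before the conversion runs; 'ns' would otherwise raise a TypeError.
os.environ["PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE"] = "True"

# 's' and 'ms' are upcast to 'us' automatically; 'ns' is downcast because of the flag above.
iceberg_type = visit_pyarrow(pa.timestamp(unit="ns"), _ConvertToIceberg())
assert iceberg_type == TimestampType()
```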
Refer to the [nanoseconds timestamp proposal document](https://docs.google.com/document/d/1bE1DcEGNzZAMiVJSZ0X1wElKLNkT9kRkk0hDlfkXzvU/edit#heading=h.ibflcctc9i1d) for more details on the long term roadmap for nanoseconds support diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 935b78cece..19af58c9b2 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -154,6 +154,7 @@ UUIDType, ) from pyiceberg.utils.concurrent import ExecutorFactory +from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import millis_to_datetime from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -918,11 +919,24 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: return TimeType() elif pa.types.is_timestamp(primitive): primitive = cast(pa.TimestampType, primitive) - if primitive.unit == "us": - if primitive.tz == "UTC" or primitive.tz == "+00:00": - return TimestamptzType() - elif primitive.tz is None: - return TimestampType() + if primitive.unit in ("s", "ms", "us"): + # Supported types, will be upcast automatically to 'us' + pass + elif primitive.unit == "ns": + if Config().get_bool("downcast-ns-timestamp-on-write"): + logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.") + else: + raise TypeError( + "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write." + ) + else: + raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}") + + if primitive.tz == "UTC" or primitive.tz == "+00:00": + return TimestamptzType() + elif primitive.tz is None: + return TimestampType() + elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive): return BinaryType() elif pa.types.is_fixed_size_binary(primitive): diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 8fd6456790..61bf46e9e6 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -18,7 +18,7 @@ import math import os import time -from datetime import date, datetime +from datetime import date, datetime, timezone from pathlib import Path from typing import Any, Dict from urllib.parse import urlparse @@ -916,7 +916,7 @@ def test_sanitize_character_partitioned(catalog: Catalog) -> None: @pytest.mark.parametrize("format_version", [1, 2]) -def test_table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: +def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: identifier = "default.table_append_subset_of_schema" tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_with_null]) arrow_table_without_some_columns = arrow_table_with_null.combine_chunks().drop(arrow_table_with_null.column_names[0]) @@ -927,8 +927,9 @@ def test_table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with assert len(tbl.scan().to_arrow()) == len(arrow_table_without_some_columns) * 2 +@pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) -def test_write_all_timestamp_precision(session_catalog: Catalog, format_version: int) -> None: +def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: Catalog, format_version: int) -> None: 
identifier = "default.table_all_timestamp_precision" arrow_table_schema_with_all_timestamp_precisions = pa.schema([ ("timestamp_s", pa.timestamp(unit="s")), @@ -966,6 +967,28 @@ def test_write_all_timestamp_precision(session_catalog: Catalog, format_version: datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), ], } - tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, [arrow_table_schema_with_all_timestamp_precisions]) - test_arrow_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=arrow_table_schema_with_all_timestamp_precisions) - tbl.overwrite(test_arrow_table) + input_arrow_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=arrow_table_schema_with_all_timestamp_precisions) + mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE": "True"}) + + tbl = _create_table( + session_catalog, + identifier, + {"format-version": format_version}, + data=[input_arrow_table], + schema=arrow_table_schema_with_all_timestamp_precisions, + ) + tbl.overwrite(input_arrow_table) + written_arrow_table = tbl.scan().to_arrow() + + expected_schema_in_all_us = pa.schema([ + ("timestamp_s", pa.timestamp(unit="us")), + ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")), + ("timestamp_ms", pa.timestamp(unit="us")), + ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")), + ("timestamp_us", pa.timestamp(unit="us")), + ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")), + ("timestamp_ns", pa.timestamp(unit="us")), + ("timestamptz_ns", pa.timestamp(unit="us", tz="UTC")), + ]) + assert written_arrow_table.schema == expected_schema_in_all_us + assert written_arrow_table == input_arrow_table.cast(expected_schema_in_all_us) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index d3b6217c7b..227e88af3c 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -15,10 +15,12 @@ # specific language governing permissions and limitations # under the License. 
# pylint: disable=protected-access,unused-argument,redefined-outer-name +import os import re import pyarrow as pa import pytest +from pytest_mock.plugin import MockerFixture from pyiceberg.io.pyarrow import ( _ConvertToArrowSchema, @@ -161,22 +163,25 @@ def test_pyarrow_time64_ns_to_iceberg() -> None: visit_pyarrow(pyarrow_type, _ConvertToIceberg()) -def test_pyarrow_timestamp_to_iceberg() -> None: - pyarrow_type = pa.timestamp(unit="us") +@pytest.mark.parametrize("precision", ["s", "ms", "us", "ns"]) +def test_pyarrow_timestamp_to_iceberg(mocker: MockerFixture, precision: str) -> None: + mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE": "True"}) + + pyarrow_type = pa.timestamp(unit=precision) converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg()) assert converted_iceberg_type == TimestampType() - assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pyarrow_type + # all timestamp types are converted to 'us' precision + assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us") def test_pyarrow_timestamp_invalid_units() -> None: - pyarrow_type = pa.timestamp(unit="ms") - with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[ms]")): - visit_pyarrow(pyarrow_type, _ConvertToIceberg()) - pyarrow_type = pa.timestamp(unit="s") - with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[s]")): - visit_pyarrow(pyarrow_type, _ConvertToIceberg()) pyarrow_type = pa.timestamp(unit="ns") - with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[ns]")): + with pytest.raises( + TypeError, + match=re.escape( + "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write." + ), + ): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) @@ -192,14 +197,13 @@ def test_pyarrow_timestamp_tz_to_iceberg() -> None: def test_pyarrow_timestamp_tz_invalid_units() -> None: - pyarrow_type = pa.timestamp(unit="ms", tz="UTC") - with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[ms, tz=UTC]")): - visit_pyarrow(pyarrow_type, _ConvertToIceberg()) - pyarrow_type = pa.timestamp(unit="s", tz="UTC") - with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[s, tz=UTC]")): - visit_pyarrow(pyarrow_type, _ConvertToIceberg()) pyarrow_type = pa.timestamp(unit="ns", tz="UTC") - with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[ns, tz=UTC]")): + with pytest.raises( + TypeError, + match=re.escape( + "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write." 
+ ), + ): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) From e4471ab3adea8cd6d3953d492e8dbdaa76aa6c57 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 5 Jul 2024 17:48:28 +0000 Subject: [PATCH 3/6] adopt review feedback --- mkdocs/docs/configuration.md | 2 +- pyiceberg/catalog/__init__.py | 3 +- pyiceberg/io/pyarrow.py | 56 +++++++++++++------ pyiceberg/table/__init__.py | 6 +- tests/integration/test_add_files.py | 59 ++++++++++++++++++++ tests/integration/test_writes/test_writes.py | 2 +- tests/io/test_pyarrow_visitor.py | 12 ++-- 7 files changed, 110 insertions(+), 30 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 67bb2c1baa..d7c10b3484 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -303,4 +303,4 @@ Previous versions of Java (`<1.4.0`) implementations incorrectly assume the opti # Nanoseconds Support -PyIceberg currently only supports upto microsecond precision in its TimestampType. PyArrow timestamp types in 's' and 'ms' will be upcast automatically to 'us' precision timestamps on write. Timestamps in 'ns' precision can also be downcast automatically on write if desired. This can be configured by setting the `downcast-ns-timestamp-on-write` property as "True" in the configuration file, or by setting the `PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE` environment variable. Refer to the [nanoseconds timestamp proposal document](https://docs.google.com/document/d/1bE1DcEGNzZAMiVJSZ0X1wElKLNkT9kRkk0hDlfkXzvU/edit#heading=h.ibflcctc9i1d) for more details on the long term roadmap for nanoseconds support +PyIceberg currently only supports upto microsecond precision in its TimestampType. PyArrow timestamp types in 's' and 'ms' will be upcast automatically to 'us' precision timestamps on write. Timestamps in 'ns' precision can also be downcast automatically on write if desired. This can be configured by setting the `downcast-ns-timestamp-to-us-on-write` property as "True" in the configuration file, or by setting the `PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE` environment variable. 
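As a rough end-to-end sketch of the renamed property (paralleling the integration test updated in this patch), the example below writes a nanosecond-precision Arrow table and reads it back at microsecond precision; the catalog name and table identifier are placeholders, not part of the change.

```python
import os
from datetime import datetime, timezone

import pyarrow as pa

from pyiceberg.catalog import load_catalog

# Opt in before any PyIceberg call; the flag can also be set in the configuration file.
os.environ["PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE"] = "True"

ns_table = pa.Table.from_pydict(
    {"ts": [datetime(2023, 1, 1, 19, 25, tzinfo=timezone.utc), None]},
    schema=pa.schema([("ts", pa.timestamp(unit="ns", tz="UTC"))]),
)

catalog = load_catalog("default")  # placeholder catalog name
tbl = catalog.create_table("default.ns_timestamps", schema=ns_table.schema)
tbl.append(ns_table)  # 'ns' values are downcast to 'us' on write
assert tbl.scan().to_arrow().schema.field("ts").type == pa.timestamp(unit="us", tz="UTC")
```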
Refer to the [nanoseconds timestamp proposal document](https://docs.google.com/document/d/1bE1DcEGNzZAMiVJSZ0X1wElKLNkT9kRkk0hDlfkXzvU/edit#heading=h.ibflcctc9i1d) for more details on the long term roadmap for nanoseconds support diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 9a951b5c8e..ea4a053f9a 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -675,8 +675,9 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow + downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") if isinstance(schema, pa.Schema): - schema: Schema = visit_pyarrow(schema, _ConvertToIcebergWithoutIDs()) # type: ignore + schema: Schema = visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)) # type: ignore return schema except ModuleNotFoundError: pass diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 65c5417af5..cf61d11957 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -471,7 +471,9 @@ def __setstate__(self, state: Dict[str, Any]) -> None: def schema_to_pyarrow( - schema: Union[Schema, IcebergType], metadata: Dict[bytes, bytes] = EMPTY_DICT, include_field_ids: bool = True + schema: Union[Schema, IcebergType], + metadata: Dict[bytes, bytes] = EMPTY_DICT, + include_field_ids: bool = True, ) -> pa.schema: return visit(schema, _ConvertToArrowSchema(metadata, include_field_ids)) @@ -664,12 +666,12 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start return np.subtract(np.setdiff1d(np.arange(start_index, end_index), all_chunks, assume_unique=False), start_index) -def pyarrow_to_schema(schema: pa.Schema, name_mapping: Optional[NameMapping] = None) -> Schema: +def pyarrow_to_schema(schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False) -> Schema: has_ids = visit_pyarrow(schema, _HasIds()) if has_ids: - visitor = _ConvertToIceberg() + visitor = _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) elif name_mapping is not None: - visitor = _ConvertToIceberg(name_mapping=name_mapping) + visitor = _ConvertToIceberg(name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) else: raise ValueError( "Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined" @@ -677,8 +679,8 @@ def pyarrow_to_schema(schema: pa.Schema, name_mapping: Optional[NameMapping] = N return visit_pyarrow(schema, visitor) -def _pyarrow_to_schema_without_ids(schema: pa.Schema) -> Schema: - return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs()) +def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> Schema: + return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)) def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema: @@ -850,9 +852,10 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): _field_names: List[str] _name_mapping: Optional[NameMapping] - def __init__(self, name_mapping: Optional[NameMapping] = None) -> None: + def __init__(self, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False) -> None: self._field_names = [] self._name_mapping = name_mapping + self._downcast_ns_timestamp_to_us = 
downcast_ns_timestamp_to_us def _field_id(self, field: pa.Field) -> int: if self._name_mapping: @@ -923,11 +926,11 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: # Supported types, will be upcast automatically to 'us' pass elif primitive.unit == "ns": - if Config().get_bool("downcast-ns-timestamp-on-write"): + if self._downcast_ns_timestamp_to_us: logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.") else: raise TypeError( - "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write." + "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." ) else: raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}") @@ -1024,8 +1027,11 @@ def _task_to_record_batches( with fs.open_input_file(path) as fin: fragment = arrow_format.make_fragment(fin) physical_schema = fragment.physical_schema - file_schema = pyarrow_to_schema(physical_schema, name_mapping) - + # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema + # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read. + # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on + # the table format version. + file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True) pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) @@ -1063,7 +1069,7 @@ def _task_to_record_batches( arrow_table = pa.Table.from_batches([batch]) arrow_table = arrow_table.filter(pyarrow_filter) batch = arrow_table.to_batches()[0] - yield to_requested_schema(projected_schema, file_project_schema, batch) + yield to_requested_schema(projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True) current_index += len(batch) @@ -1262,8 +1268,12 @@ def project_batches( total_row_count += len(batch) -def to_requested_schema(requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch) -> pa.RecordBatch: - struct_array = visit_with_partner(requested_schema, batch, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema)) +def to_requested_schema( + requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch, downcast_ns_timestamp_to_us: bool = False +) -> pa.RecordBatch: + struct_array = visit_with_partner( + requested_schema, batch, ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us), ArrowAccessor(file_schema) + ) arrays = [] fields = [] @@ -1277,8 +1287,9 @@ def to_requested_schema(requested_schema: Schema, file_schema: Schema, batch: pa class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): file_schema: Schema - def __init__(self, file_schema: Schema): + def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False): self.file_schema = file_schema + self.downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: file_field = self.file_schema.find_field(field.field_id) @@ -1289,7 +1300,15 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: # if file_field and field_type (e.g. 
String) are the same # but the pyarrow type of the array is different from the expected type # (e.g. string vs larger_string), we want to cast the array to the larger type - return values.cast(target_type) + safe = True + if ( + pa.types.is_timestamp(target_type) + and target_type.unit == "us" + and pa.types.is_timestamp(values.type) + and values.type.unit == "ns" + ): + safe = False + return values.cast(target_type, safe=safe) return values def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field: @@ -1926,8 +1945,11 @@ def write_parquet(task: WriteTask) -> DataFile: else: file_schema = table_schema + downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") batches = [ - to_requested_schema(requested_schema=file_schema, file_schema=table_schema, batch=batch) + to_requested_schema( + requested_schema=file_schema, file_schema=table_schema, batch=batch, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us + ) for batch in task.record_batches ] arrow_table = pa.Table.from_batches(batches) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8c1493974b..91eb9c089a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -138,6 +138,7 @@ transform_dict_value_to_str, ) from pyiceberg.utils.concurrent import ExecutorFactory +from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import datetime_to_millis from pyiceberg.utils.deprecated import deprecated from pyiceberg.utils.singleton import _convert_to_hashable_type @@ -168,11 +169,12 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") -> """ from pyiceberg.io.pyarrow import _pyarrow_to_schema_without_ids, pyarrow_to_schema + downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") name_mapping = table_schema.name_mapping try: - task_schema = pyarrow_to_schema(other_schema, name_mapping=name_mapping) + task_schema = pyarrow_to_schema(other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) except ValueError as e: - other_schema = _pyarrow_to_schema_without_ids(other_schema) + other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) additional_names = set(other_schema.column_names) - set(table_schema.column_names) raise ValueError( f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 84729fcca4..fffdfc8ef9 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -16,6 +16,7 @@ # under the License. 
# pylint:disable=redefined-outer-name +import os from datetime import date from typing import Iterator, Optional @@ -23,6 +24,7 @@ import pyarrow.parquet as pq import pytest from pyspark.sql import SparkSession +from pytest_mock.plugin import MockerFixture from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError @@ -36,6 +38,7 @@ IntegerType, NestedField, StringType, + TimestamptzType, ) TABLE_SCHEMA = Schema( @@ -448,3 +451,59 @@ def test_add_files_snapshot_properties(spark: SparkSession, session_catalog: Cat assert "snapshot_prop_a" in summary assert summary["snapshot_prop_a"] == "test_prop_a" + + +@pytest.mark.integration +def test_timestamp_tz_ns_downcast_on_read(session_catalog: Catalog, format_version: int, mocker: MockerFixture) -> None: + nanoseconds_schema_iceberg = Schema(NestedField(1, "quux", TimestamptzType())) + + nanoseconds_schema = pa.schema([ + ("quux", pa.timestamp("ns", tz="UTC")), + ]) + + arrow_table = pa.Table.from_pylist( + [ + { + "quux": 1615967687249846175, # 2021-03-17 07:54:47.249846159 + } + ], + schema=nanoseconds_schema, + ) + mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"}) + + identifier = f"default.timestamptz_ns_added{format_version}" + + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + tbl = session_catalog.create_table( + identifier=identifier, + schema=nanoseconds_schema_iceberg, + properties={"format-version": str(format_version)}, + partition_spec=PartitionSpec(), + ) + + file_paths = [f"s3://warehouse/default/test_timestamp_tz/v{format_version}/test-{i}.parquet" for i in range(5)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=nanoseconds_schema) as writer: + writer.write_table(arrow_table) + + # add the parquet files as data files + tbl.add_files(file_paths=file_paths) + + assert tbl.scan().to_arrow() == pa.concat_tables( + [ + arrow_table.cast( + pa.schema([ + ("quux", pa.timestamp("us", tz="UTC")), + ]), + safe=False, + ) + ] + * 5 + ) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 61bf46e9e6..3ef4033fa4 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -968,7 +968,7 @@ def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: C ], } input_arrow_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=arrow_table_schema_with_all_timestamp_precisions) - mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE": "True"}) + mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"}) tbl = _create_table( session_catalog, diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 227e88af3c..897af1bbbd 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -15,12 +15,10 @@ # specific language governing permissions and limitations # under the License. 
# pylint: disable=protected-access,unused-argument,redefined-outer-name -import os import re import pyarrow as pa import pytest -from pytest_mock.plugin import MockerFixture from pyiceberg.io.pyarrow import ( _ConvertToArrowSchema, @@ -164,11 +162,9 @@ def test_pyarrow_time64_ns_to_iceberg() -> None: @pytest.mark.parametrize("precision", ["s", "ms", "us", "ns"]) -def test_pyarrow_timestamp_to_iceberg(mocker: MockerFixture, precision: str) -> None: - mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE": "True"}) - +def test_pyarrow_timestamp_to_iceberg(precision: str) -> None: pyarrow_type = pa.timestamp(unit=precision) - converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg()) + converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg(downcast_ns_timestamp_to_us=True)) assert converted_iceberg_type == TimestampType() # all timestamp types are converted to 'us' precision assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us") @@ -179,7 +175,7 @@ def test_pyarrow_timestamp_invalid_units() -> None: with pytest.raises( TypeError, match=re.escape( - "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write." + "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." ), ): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) @@ -201,7 +197,7 @@ def test_pyarrow_timestamp_tz_invalid_units() -> None: with pytest.raises( TypeError, match=re.escape( - "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write." + "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." 
), ): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) From e41a81338f2a576fc98e59a22187254432fa11f8 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 5 Jul 2024 17:59:19 +0000 Subject: [PATCH 4/6] fix --- pyiceberg/catalog/__init__.py | 6 ++++-- pyiceberg/io/pyarrow.py | 15 ++++++++++----- pyiceberg/table/__init__.py | 6 ++++-- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index ea4a053f9a..fe1efd1856 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -675,9 +675,11 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow - downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") + downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") or False if isinstance(schema, pa.Schema): - schema: Schema = visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)) # type: ignore + schema: Schema = visit_pyarrow( # type: ignore + schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) + ) return schema except ModuleNotFoundError: pass diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index cf61d11957..6fe01d59ff 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -666,7 +666,9 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start return np.subtract(np.setdiff1d(np.arange(start_index, end_index), all_chunks, assume_unique=False), start_index) -def pyarrow_to_schema(schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False) -> Schema: +def pyarrow_to_schema( + schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False +) -> Schema: has_ids = visit_pyarrow(schema, _HasIds()) if has_ids: visitor = _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) @@ -1900,8 +1902,8 @@ def data_file_statistics_from_parquet_metadata( stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length ) - col_aggs[field_id].update_min(statistics.min) - col_aggs[field_id].update_max(statistics.max) + col_aggs[field_id].update_min(statistics.min_raw) + col_aggs[field_id].update_max(statistics.max_raw) except pyarrow.lib.ArrowNotImplementedError as e: invalidate_col.add(field_id) @@ -1945,10 +1947,13 @@ def write_parquet(task: WriteTask) -> DataFile: else: file_schema = table_schema - downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") + downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") or False batches = [ to_requested_schema( - requested_schema=file_schema, file_schema=table_schema, batch=batch, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us + requested_schema=file_schema, + file_schema=table_schema, + batch=batch, + downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, ) for batch in task.record_batches ] diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 91eb9c089a..2e464bb3ea 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -169,10 +169,12 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") -> """ from pyiceberg.io.pyarrow import _pyarrow_to_schema_without_ids, pyarrow_to_schema 
- downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") + downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") or False name_mapping = table_schema.name_mapping try: - task_schema = pyarrow_to_schema(other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) + task_schema = pyarrow_to_schema( + other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us + ) except ValueError as e: other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) additional_names = set(other_schema.column_names) - set(table_schema.column_names) From d7483db94afd8340f1e31b7a1d080a5f6dbf3f01 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Sat, 6 Jul 2024 00:31:43 +0000 Subject: [PATCH 5/6] revert min_raw max_raw change --- pyiceberg/io/pyarrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 6fe01d59ff..762b009e2a 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1902,8 +1902,8 @@ def data_file_statistics_from_parquet_metadata( stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length ) - col_aggs[field_id].update_min(statistics.min_raw) - col_aggs[field_id].update_max(statistics.max_raw) + col_aggs[field_id].update_min(statistics.min) + col_aggs[field_id].update_max(statistics.max) except pyarrow.lib.ArrowNotImplementedError as e: invalidate_col.add(field_id) From 97ce9a062cc759296cee2d1b4d36dbba201abde6 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Tue, 9 Jul 2024 16:32:47 +0000 Subject: [PATCH 6/6] adopt nits --- pyiceberg/catalog/__init__.py | 3 ++- pyiceberg/io/pyarrow.py | 4 ++-- pyiceberg/table/__init__.py | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index fe1efd1856..ae978329a0 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -49,6 +49,7 @@ from pyiceberg.schema import Schema from pyiceberg.serializers import ToOutputFile from pyiceberg.table import ( + DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, CommitTableRequest, CommitTableResponse, CreateTableTransaction, @@ -675,7 +676,7 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow - downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") or False + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False if isinstance(schema, pa.Schema): schema: Schema = visit_pyarrow( # type: ignore schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 762b009e2a..f3561416a9 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -126,7 +126,7 @@ visit, visit_with_partner, ) -from pyiceberg.table import PropertyUtil, TableProperties, WriteTask +from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, PropertyUtil, TableProperties, WriteTask from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.transforms import TruncateTransform @@ -1947,7 +1947,7 @@ def write_parquet(task: WriteTask) -> DataFile: else: 
file_schema = table_schema - downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") or False + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False batches = [ to_requested_schema( requested_schema=file_schema, diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 2e464bb3ea..c4261864cc 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -154,7 +154,7 @@ ALWAYS_TRUE = AlwaysTrue() TABLE_ROOT_ID = -1 - +DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" _JAVA_LONG_MAX = 9223372036854775807 @@ -169,7 +169,7 @@ def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") -> """ from pyiceberg.io.pyarrow import _pyarrow_to_schema_without_ids, pyarrow_to_schema - downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") or False + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False name_mapping = table_schema.name_mapping try: task_schema = pyarrow_to_schema(
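As a closing reference, here is a condensed sketch of the flag-consumption pattern this last commit standardises: the shared constant lives in `pyiceberg.table`, the boolean is read through `Config` with a `False` default, and the result is threaded into the Arrow-to-Iceberg schema conversion. The free-standing helper below is illustrative only and is not part of the patch.

```python
import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.schema import Schema
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.utils.config import Config


def arrow_schema_for_write(arrow_schema: pa.Schema, name_mapping: NameMapping) -> Schema:
    # Same pattern as the hunks above: read the flag via the shared constant,
    # defaulting to False when the property is unset.
    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    return pyarrow_to_schema(
        arrow_schema,
        name_mapping=name_mapping,
        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
    )
```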