
Commit 6708a6e

Update table metadata throughout transaction (#471)
* Update table metadata throughout transaction

  This PR adds support for updating the table metadata throughout the transaction. This way, if a schema is first evolved and a snapshot is then created against the latest schema, the snapshot creation will be able to find that schema.

* Fix integration tests

* Thanks Honah!

* Include the partition evolution

* Cleanup
1 parent 9a5bb07 commit 6708a6e
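
To make the motivation concrete, here is a minimal sketch of the flow this change enables, assuming a recent pyiceberg in which Transaction exposes both update_schema() and append(); the catalog name, table identifier, and column are illustrative:

    import pyarrow as pa

    from pyiceberg.catalog import load_catalog
    from pyiceberg.types import StringType

    catalog = load_catalog("default")            # assumed catalog configuration
    tbl = catalog.load_table("examples.events")  # assumed table identifier

    with tbl.transaction() as tx:
        # 1. Evolve the schema inside the transaction.
        with tx.update_schema() as update:
            update.add_column("note", StringType())
        # 2. Create a snapshot against the evolved schema. Before this change the
        #    transaction kept reading the last committed metadata, so the column
        #    added above was not visible at this point.
        tx.append(pa.table({"note": ["hello"]}))  # toy data; must match the full evolved schema

On exit from the with-block the schema update and the append are committed together as a single transaction.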

File tree

8 files changed: +396 −339 lines


pyiceberg/io/pyarrow.py

+11-10
@@ -125,6 +125,7 @@
     visit_with_partner,
 )
 from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
 from pyiceberg.typedef import EMPTY_DICT, Properties, Record
@@ -1720,7 +1721,7 @@ def fill_parquet_file_metadata(
     data_file.split_offsets = split_offsets


-def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[Schema] = None) -> Iterator[DataFile]:
+def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     task = next(tasks)

     try:
@@ -1730,15 +1731,15 @@ def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[Schema] = None) -> Iterator[DataFile]:
     except StopIteration:
         pass

-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)

-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
-    file_schema = file_schema or table.schema()
-    arrow_file_schema = schema_to_pyarrow(file_schema)
+    file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
+    schema = table_metadata.schema()
+    arrow_file_schema = schema_to_pyarrow(schema)

-    fo = table.io.new_output(file_path)
+    fo = io.new_output(file_path)
     row_group_size = PropertyUtil.property_as_int(
-        properties=table.properties,
+        properties=table_metadata.properties,
         property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
         default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
     )
@@ -1757,16 +1758,16 @@ def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[Schema] = None) -> Iterator[DataFile]:
         # sort_order_id=task.sort_order_id,
         sort_order_id=None,
         # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
+        spec_id=table_metadata.default_spec_id,
         equality_ids=None,
         key_metadata=None,
     )

     fill_parquet_file_metadata(
         data_file=data_file,
         parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(file_schema, table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
+        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(schema),
     )
     return iter([data_file])
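
The crux of the refactor is the write_file signature: instead of a live Table, the writer now receives a FileIO and a TableMetadata, which lets a Transaction hand in its in-progress, not-yet-committed metadata. A hedged sketch of the call-site change, assuming a Table object tbl whose io and metadata attributes expose those two objects (the tasks iterator of WriteTask objects is elided):

    # Before: the writer pulled location, schema, properties, and IO off a
    # committed Table object.
    #   data_files = write_file(table=tbl, tasks=tasks)

    # After: the caller chooses which metadata the writer sees, e.g. the
    # transaction's updated metadata rather than the last committed state.
    data_files = write_file(io=tbl.io, table_metadata=tbl.metadata, tasks=tasks)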