Skip to content

Commit 77a07c9

Browse files
authored
Support MergeAppend operations (apache#363)
* add ListPacker + tests * add merge append * add merge_append * fix snapshot inheritance * test manifest file and entries * add doc * fix lint * change test name * address review comments * rename _MergingSnapshotProducer to _SnapshotProducer * fix a serious bug * update the doc * remove merge_append as public API * make default to false * add test description * fix merge conflict * fix snapshot_id issue
1 parent 66b92ff commit 77a07c9

File tree

7 files changed

+567
-41
lines changed

7 files changed

+567
-41
lines changed

mkdocs/docs/configuration.md

+15
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,21 @@ Iceberg tables support table properties to configure table behavior.
6161
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
6262
| `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit |
6363

64+
## Table behavior options
65+
66+
| Key | Options | Default | Description |
67+
| ------------------------------------ | ------------------- | ------------- | ----------------------------------------------------------- |
68+
| `commit.manifest.target-size-bytes` | Size in bytes | 8388608 (8MB) | Target size when merging manifest files |
69+
| `commit.manifest.min-count-to-merge` | Number of manifests | 100 | Minimum number of manifest files to accumulate before merging |
70+
| `commit.manifest-merge.enabled` | Boolean | False | Controls whether to automatically merge manifests on writes |
71+
72+
<!-- prettier-ignore-start -->
73+
74+
!!! note "Fast append"
75+
Unlike the Java implementation, PyIceberg defaults to [fast append](api.md#write-support), and thus `commit.manifest-merge.enabled` is set to `False` by default.
76+
77+
<!-- prettier-ignore-end -->
78+
6479
# FileIO
6580

6681
Iceberg works with the concept of a FileIO which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed.

pyiceberg/manifest.py

+69
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,48 @@ class ManifestEntry(Record):
404404
def __init__(self, *data: Any, **named_data: Any) -> None:
    """Initialize the entry with the manifest-entry struct for the default read version.

    Args:
        *data: Positional field values forwarded unchanged to the parent initializer.
        **named_data: Keyword values forwarded to the parent initializer. A caller-supplied
            ``struct`` takes precedence over the default one.
    """
    record_kwargs = {"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION]}
    # Applied after the default so explicit keyword arguments win.
    record_kwargs.update(named_data)
    super().__init__(*data, **record_kwargs)
406406

407+
def _wrap(
    self,
    new_status: ManifestEntryStatus,
    new_snapshot_id: Optional[int],
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Overwrite all five fields of this entry in place and return the same instance.

    Args:
        new_status: Value stored in ``status``.
        new_snapshot_id: Value stored in ``snapshot_id``.
        new_data_sequence_number: Value stored in ``data_sequence_number``.
        new_file_sequence_number: Value stored in ``file_sequence_number``.
        new_file: Value stored in ``data_file``.

    Returns:
        This entry (``self``), mutated; no new object is created.
    """
    # Targets are assigned left to right, i.e. in the same order as the parameters.
    (
        self.status,
        self.snapshot_id,
        self.data_sequence_number,
        self.file_sequence_number,
        self.data_file,
    ) = (new_status, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file)
    return self
421+
422+
def _wrap_append(
    self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile
) -> ManifestEntry:
    """Re-point this entry at ``new_file`` with status ``ADDED`` and return it.

    The file sequence number is always reset to ``None``.

    Args:
        new_snapshot_id: Snapshot id to store on the entry.
        new_data_sequence_number: Data sequence number to store on the entry.
        new_file: Data file to store on the entry.

    Returns:
        This entry (``self``), mutated in place.
    """
    return self._wrap(
        new_status=ManifestEntryStatus.ADDED,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=None,
        new_file=new_file,
    )
426+
427+
def _wrap_delete(
    self,
    new_snapshot_id: Optional[int],
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Re-point this entry at ``new_file`` with status ``DELETED`` and return it.

    Args:
        new_snapshot_id: Snapshot id to store on the entry.
        new_data_sequence_number: Data sequence number to store on the entry.
        new_file_sequence_number: File sequence number to store on the entry.
        new_file: Data file to store on the entry.

    Returns:
        This entry (``self``), mutated in place.
    """
    return self._wrap(
        new_status=ManifestEntryStatus.DELETED,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=new_file_sequence_number,
        new_file=new_file,
    )
437+
438+
def _wrap_existing(
    self,
    new_snapshot_id: Optional[int],
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Re-point this entry at ``new_file`` with status ``EXISTING`` and return it.

    Args:
        new_snapshot_id: Snapshot id to store on the entry.
        new_data_sequence_number: Data sequence number to store on the entry.
        new_file_sequence_number: File sequence number to store on the entry.
        new_file: Data file to store on the entry.

    Returns:
        This entry (``self``), mutated in place.
    """
    return self._wrap(
        new_status=ManifestEntryStatus.EXISTING,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=new_file_sequence_number,
        new_file=new_file,
    )
448+
407449

408450
PARTITION_FIELD_SUMMARY_TYPE = StructType(
409451
NestedField(509, "contains_null", BooleanType(), required=True),
@@ -655,6 +697,7 @@ class ManifestWriter(ABC):
655697
_deleted_rows: int
656698
_min_data_sequence_number: Optional[int]
657699
_partitions: List[Record]
700+
_reused_entry_wrapper: ManifestEntry
658701

659702
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
660703
self.closed = False
@@ -671,6 +714,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
671714
self._deleted_rows = 0
672715
self._min_data_sequence_number = None
673716
self._partitions = []
717+
self._reused_entry_wrapper = ManifestEntry()
674718

675719
def __enter__(self) -> ManifestWriter:
676720
"""Open the writer."""
@@ -776,6 +820,31 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
776820
self._writer.write_block([self.prepare_entry(entry)])
777821
return self
778822

823+
def add(self, entry: ManifestEntry) -> ManifestWriter:
    """Write the data file of ``entry`` through the reused wrapper's append path.

    The written entry carries this writer's snapshot id. The data sequence number of
    ``entry`` is carried over only when it is set and non-negative; otherwise ``None``
    is written in its place.

    Args:
        entry: Entry whose ``data_file`` (and, if valid, ``data_sequence_number``) is written.

    Returns:
        This writer, so calls can be chained.
    """
    sequence_number = entry.data_sequence_number
    carried_sequence_number = sequence_number if sequence_number is not None and sequence_number >= 0 else None
    # The wrapper is a single shared instance that is overwritten on every call.
    wrapped = self._reused_entry_wrapper._wrap_append(
        new_snapshot_id=self._snapshot_id,
        new_data_sequence_number=carried_sequence_number,
        new_file=entry.data_file,
    )
    self.add_entry(wrapped)
    return self
831+
832+
def delete(self, entry: ManifestEntry) -> ManifestWriter:
    """Write ``entry`` through the reused wrapper's delete path.

    The written entry carries this writer's snapshot id, while both sequence numbers
    and the data file are taken from ``entry`` unchanged.

    Args:
        entry: Entry supplying the sequence numbers and data file to write.

    Returns:
        This writer, so calls can be chained.
    """
    # The wrapper is a single shared instance that is overwritten on every call.
    wrapped = self._reused_entry_wrapper._wrap_delete(
        new_snapshot_id=self._snapshot_id,
        new_data_sequence_number=entry.data_sequence_number,
        new_file_sequence_number=entry.file_sequence_number,
        new_file=entry.data_file,
    )
    self.add_entry(wrapped)
    return self
839+
840+
def existing(self, entry: ManifestEntry) -> ManifestWriter:
    """Write ``entry`` through the reused wrapper's existing path.

    Unlike ``add`` and ``delete``, the snapshot id is taken from ``entry`` itself and
    not from this writer; the sequence numbers and data file are also copied from ``entry``.

    Args:
        entry: Entry supplying the snapshot id, sequence numbers and data file to write.

    Returns:
        This writer, so calls can be chained.
    """
    # The wrapper is a single shared instance that is overwritten on every call.
    wrapped = self._reused_entry_wrapper._wrap_existing(
        new_snapshot_id=entry.snapshot_id,
        new_data_sequence_number=entry.data_sequence_number,
        new_file_sequence_number=entry.file_sequence_number,
        new_file=entry.data_file,
    )
    self.add_entry(wrapped)
    return self
847+
779848

780849
class ManifestWriterV1(ManifestWriter):
781850
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):

0 commit comments

Comments
 (0)