@@ -3922,29 +3922,24 @@ def __init__(
3922
3922
self ._merge_enabled = merge_enabled
3923
3923
self ._snapshot_producer = snapshot_producer
3924
3924
3925
- def _group_by_spec (
3926
- self , first_manifest : ManifestFile , remaining_manifests : List [ManifestFile ]
3927
- ) -> Dict [int , List [ManifestFile ]]:
3925
+ def _group_by_spec (self , manifests : List [ManifestFile ]) -> Dict [int , List [ManifestFile ]]:
3928
3926
groups = defaultdict (list )
3929
- groups [first_manifest .partition_spec_id ].append (first_manifest )
3930
- for manifest in remaining_manifests :
3927
+ for manifest in manifests :
3931
3928
groups [manifest .partition_spec_id ].append (manifest )
3932
3929
return groups
3933
3930
3934
3931
def _create_manifest (self , spec_id : int , manifest_bin : List [ManifestFile ]) -> ManifestFile :
3935
3932
with self ._snapshot_producer .new_manifest_writer (spec = self ._snapshot_producer .spec (spec_id )) as writer :
3936
3933
for manifest in manifest_bin :
3937
3934
for entry in self ._snapshot_producer .fetch_manifest_entry (manifest = manifest , discard_deleted = False ):
3938
- if entry .status == ManifestEntryStatus .DELETED :
3939
- # suppress deletes from previous snapshots. only files deleted by this snapshot
3940
- # should be added to the new manifest
3941
- if entry .snapshot_id == self ._snapshot_producer .snapshot_id :
3942
- writer .delete (entry )
3935
+ if entry .status == ManifestEntryStatus .DELETED and entry .snapshot_id == self ._snapshot_producer .snapshot_id :
3936
+ # only files deleted by this snapshot should be added to the new manifest
3937
+ writer .delete (entry )
3943
3938
elif entry .status == ManifestEntryStatus .ADDED and entry .snapshot_id == self ._snapshot_producer .snapshot_id :
3944
- # adds from this snapshot are still adds , otherwise they should be existing
3939
+ # added entries from this snapshot are still added , otherwise they should be existing
3945
3940
writer .add (entry )
3946
- else :
3947
- # add all files from the old manifest as existing files
3941
+ elif entry . status != ManifestEntryStatus . DELETED :
3942
+ # add all non-deleted files from the old manifest as existing files
3948
3943
writer .existing (entry )
3949
3944
3950
3945
return writer .to_manifest_file ()
@@ -3958,11 +3953,9 @@ def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]:
3958
3953
if len (manifest_bin ) == 1 :
3959
3954
output_manifests .append (manifest_bin [0 ])
3960
3955
elif first_manifest in manifest_bin and len (manifest_bin ) < self ._min_count_to_merge :
3961
- # if the bin has the first manifest (the new data files or an appended manifest file)
3962
- # then only merge it
3963
- # if the number of manifests is above the minimum count. this is applied only to bins
3964
- # with an in-memory
3965
- # manifest so that large manifests don't prevent merging older groups.
3956
+ # if the bin has the first manifest (the new data files or an appended manifest file) then only
3957
+ # merge it if the number of manifests is above the minimum count. this is applied only to bins
3958
+ # with an in-memory manifest so that large manifests don't prevent merging older groups.
3966
3959
output_manifests .extend (manifest_bin )
3967
3960
else :
3968
3961
output_manifests .append (self ._create_manifest (spec_id , manifest_bin ))
@@ -3987,8 +3980,7 @@ def merge_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
3987
3980
return manifests
3988
3981
3989
3982
first_manifest = manifests [0 ]
3990
- remaining_manifests = manifests [1 :]
3991
- groups = self ._group_by_spec (first_manifest , remaining_manifests )
3983
+ groups = self ._group_by_spec (manifests )
3992
3984
3993
3985
merged_manifests = []
3994
3986
for spec_id in reversed (groups .keys ()):
0 commit comments