
Fix dynamic commit size #3016


Merged: 14 commits, Apr 29, 2025
29 changes: 26 additions & 3 deletions src/huggingface_hub/_upload_large_folder.py
@@ -44,6 +44,7 @@
WAITING_TIME_IF_NO_TASKS = 10 # seconds
MAX_NB_REGULAR_FILES_PER_COMMIT = 75
MAX_NB_LFS_FILES_PER_COMMIT = 150
COMMIT_SIZE_SCALE: List[int] = [20, 50, 75, 100, 125, 200, 250, 400, 600, 1000]


def upload_large_folder_internal(
@@ -184,6 +185,8 @@ def __init__(self, items: List[JOB_ITEM_T]):
        self.last_commit_attempt: Optional[float] = None

        self._started_at = datetime.now()
        self._chunk_idx: int = 1
        self._chunk_lock: Lock = Lock()

        # Setup queues
        for item in self.items:
@@ -199,6 +202,21 @@ def __init__(self, items: List[JOB_ITEM_T]):
            else:
                logger.debug(f"Skipping file {paths.path_in_repo} (already uploaded and committed)")

    def target_chunk(self) -> int:
        with self._chunk_lock:
            return COMMIT_SIZE_SCALE[self._chunk_idx]

    def update_chunk(self, success: bool, nb_items: int, duration: float) -> None:
        with self._chunk_lock:
            if not success:
                logger.warning(f"Failed to commit {nb_items} files at once. Will retry with fewer files in the next batch.")
                self._chunk_idx -= 1
            elif nb_items >= COMMIT_SIZE_SCALE[self._chunk_idx] and duration < 40:
                logger.info(f"Successfully committed {nb_items} files at once. Increasing the limit for the next batch.")
                self._chunk_idx += 1

            self._chunk_idx = max(0, min(self._chunk_idx, len(COMMIT_SIZE_SCALE) - 1))

    def current_report(self) -> str:
        """Generate a report of the current status of the large upload."""
        nb_hashed = 0
@@ -351,6 +369,8 @@ def _worker_job(
                status.nb_workers_preupload_lfs -= 1

        elif job == WorkerJob.COMMIT:
            start_ts = time.time()
            success = True
            try:
                _commit(items, api=api, repo_id=repo_id, repo_type=repo_type, revision=revision)
            except KeyboardInterrupt:
@@ -360,6 +380,9 @@
                traceback.format_exc()
                for item in items:
                    status.queue_commit.put(item)
                success = False
            duration = time.time() - start_ts
            status.update_chunk(success, len(items), duration)
            with status.lock:
                status.last_commit_attempt = time.time()
                status.nb_workers_commit -= 1
@@ -393,7 +416,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
        elif status.queue_get_upload_mode.qsize() >= 10:
            status.nb_workers_get_upload_mode += 1
            logger.debug("Job: get upload mode (>10 files ready)")
-            return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50))
+            return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, status.target_chunk()))

        # 4. Preupload LFS file if at least 1 file and no worker is preuploading LFS
        elif status.queue_preupload_lfs.qsize() > 0 and status.nb_workers_preupload_lfs == 0:
@@ -411,7 +434,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
        elif status.queue_get_upload_mode.qsize() > 0 and status.nb_workers_get_upload_mode == 0:
            status.nb_workers_get_upload_mode += 1
            logger.debug("Job: get upload mode (no other worker getting upload mode)")
-            return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50))
+            return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, status.target_chunk()))

        # 7. Preupload LFS file if at least 1 file
        # Skip if hf_transfer is enabled and there is already a worker preuploading LFS
@@ -432,7 +455,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
        elif status.queue_get_upload_mode.qsize() > 0:
            status.nb_workers_get_upload_mode += 1
            logger.debug("Job: get upload mode")
-            return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50))
+            return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, status.target_chunk()))

        # 10. Commit if at least 1 file and 1 min since last commit attempt
        elif (
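
Taken together, the diff above replaces the fixed batch size of 50 with an index into COMMIT_SIZE_SCALE: every failed commit steps the index down, and every full batch committed in under 40 seconds steps it up, clamped to the bounds of the scale. The standalone sketch below mirrors that logic so the ramp-up/back-off behavior can be run without an actual upload; the AdaptiveChunk name and the simulated durations are illustrative, not part of the PR.

# adaptive_chunk_demo.py -- illustrative sketch, not part of the PR
from threading import Lock
from typing import List

COMMIT_SIZE_SCALE: List[int] = [20, 50, 75, 100, 125, 200, 250, 400, 600, 1000]


class AdaptiveChunk:
    def __init__(self) -> None:
        self._idx = 1  # start at 50 files per commit, as in the PR
        self._lock = Lock()

    def target(self) -> int:
        with self._lock:
            return COMMIT_SIZE_SCALE[self._idx]

    def update(self, success: bool, nb_items: int, duration: float) -> None:
        with self._lock:
            if not success:
                self._idx -= 1  # back off after a failed commit
            elif nb_items >= COMMIT_SIZE_SCALE[self._idx] and duration < 40:
                self._idx += 1  # ramp up after a fast, full batch
            self._idx = max(0, min(self._idx, len(COMMIT_SIZE_SCALE) - 1))


if __name__ == "__main__":
    chunk = AdaptiveChunk()
    for duration in (12.0, 18.0, 25.0):  # three fast, full commits: 50 -> 75 -> 100 -> 125
        chunk.update(success=True, nb_items=chunk.target(), duration=duration)
        print("target after success:", chunk.target())
    chunk.update(success=False, nb_items=chunk.target(), duration=60.0)
    print("target after failure:", chunk.target())  # one failure drops the target back to 100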
34 changes: 34 additions & 0 deletions tests/test_upload_large_folder.py
@@ -0,0 +1,34 @@
# tests/test_upload_large_folder.py
import pytest

from huggingface_hub._upload_large_folder import COMMIT_SIZE_SCALE, LargeUploadStatus


@pytest.fixture
def status():
    return LargeUploadStatus(items=[])


def test_target_chunk_default(status):
    assert status.target_chunk() == COMMIT_SIZE_SCALE[1]


@pytest.mark.parametrize(
    "start_idx, success, delta_items, duration, expected_idx",
    [
        (2, False, 0, 10, 1),  # drop by one on failure
        (0, False, 0, 10, 0),  # never go below zero
        (1, True, 0, 50, 1),  # duration >= 40 --> no bump
        (1, True, -1, 30, 1),  # nb_items < threshold --> no bump
        (1, True, 0, 30, 2),  # fast enough and enough items
        (len(COMMIT_SIZE_SCALE) - 1, True, 0, 10, len(COMMIT_SIZE_SCALE) - 1),  # never exceed last index
    ],
)
def test_update_chunk_transitions(status, start_idx, success, delta_items, duration, expected_idx):
    status._chunk_idx = start_idx
    threshold = COMMIT_SIZE_SCALE[start_idx]
    nb_items = threshold + delta_items
    status.update_chunk(success=success, nb_items=nb_items, duration=duration)

    assert status._chunk_idx == expected_idx
    assert status.target_chunk() == COMMIT_SIZE_SCALE[expected_idx]
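
The last two parametrized cases pin the clamping behavior at both ends of the scale. The same checks can also be run directly in a Python session against a checkout with this PR applied; this is a usage sketch, not part of the test suite:

from huggingface_hub._upload_large_folder import COMMIT_SIZE_SCALE, LargeUploadStatus

status = LargeUploadStatus(items=[])

# A failure at the bottom of the scale must not push the index below zero.
status._chunk_idx = 0
status.update_chunk(success=False, nb_items=COMMIT_SIZE_SCALE[0], duration=10.0)
assert status.target_chunk() == COMMIT_SIZE_SCALE[0]

# A fast, full batch at the top must not push the index past the last entry.
status._chunk_idx = len(COMMIT_SIZE_SCALE) - 1
status.update_chunk(success=True, nb_items=COMMIT_SIZE_SCALE[-1], duration=10.0)
assert status.target_chunk() == COMMIT_SIZE_SCALE[-1]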