diff --git a/src/huggingface_hub/_upload_large_folder.py b/src/huggingface_hub/_upload_large_folder.py index c925a31ff5..756d79d19c 100644 --- a/src/huggingface_hub/_upload_large_folder.py +++ b/src/huggingface_hub/_upload_large_folder.py @@ -44,6 +44,7 @@ WAITING_TIME_IF_NO_TASKS = 10 # seconds MAX_NB_REGULAR_FILES_PER_COMMIT = 75 MAX_NB_LFS_FILES_PER_COMMIT = 150 +COMMIT_SIZE_SCALE: List[int] = [20, 50, 75, 100, 125, 200, 250, 400, 600, 1000] def upload_large_folder_internal( @@ -184,6 +185,8 @@ def __init__(self, items: List[JOB_ITEM_T]): self.last_commit_attempt: Optional[float] = None self._started_at = datetime.now() + self._chunk_idx: int = 1 + self._chunk_lock: Lock = Lock() # Setup queues for item in self.items: @@ -199,6 +202,21 @@ def __init__(self, items: List[JOB_ITEM_T]): else: logger.debug(f"Skipping file {paths.path_in_repo} (already uploaded and committed)") + def target_chunk(self) -> int: + with self._chunk_lock: + return COMMIT_SIZE_SCALE[self._chunk_idx] + + def update_chunk(self, success: bool, nb_items: int, duration: float) -> None: + with self._chunk_lock: + if not success: + logger.warn(f"Failed to commit {nb_items} files at once. Will retry with less files in next batch.") + self._chunk_idx -= 1 + elif nb_items >= COMMIT_SIZE_SCALE[self._chunk_idx] and duration < 40: + logger.info(f"Successfully committed {nb_items} at once. Increasing the limit for next batch.") + self._chunk_idx += 1 + + self._chunk_idx = max(0, min(self._chunk_idx, len(COMMIT_SIZE_SCALE) - 1)) + def current_report(self) -> str: """Generate a report of the current status of the large upload.""" nb_hashed = 0 @@ -351,6 +369,8 @@ def _worker_job( status.nb_workers_preupload_lfs -= 1 elif job == WorkerJob.COMMIT: + start_ts = time.time() + success = True try: _commit(items, api=api, repo_id=repo_id, repo_type=repo_type, revision=revision) except KeyboardInterrupt: @@ -360,6 +380,9 @@ def _worker_job( traceback.format_exc() for item in items: status.queue_commit.put(item) + success = False + duration = time.time() - start_ts + status.update_chunk(success, len(items), duration) with status.lock: status.last_commit_attempt = time.time() status.nb_workers_commit -= 1 @@ -393,7 +416,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob, elif status.queue_get_upload_mode.qsize() >= 10: status.nb_workers_get_upload_mode += 1 logger.debug("Job: get upload mode (>10 files ready)") - return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50)) + return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, status.target_chunk())) # 4. Preupload LFS file if at least 1 file and no worker is preuploading LFS elif status.queue_preupload_lfs.qsize() > 0 and status.nb_workers_preupload_lfs == 0: @@ -411,7 +434,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob, elif status.queue_get_upload_mode.qsize() > 0 and status.nb_workers_get_upload_mode == 0: status.nb_workers_get_upload_mode += 1 logger.debug("Job: get upload mode (no other worker getting upload mode)") - return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50)) + return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, status.target_chunk())) # 7. Preupload LFS file if at least 1 file # Skip if hf_transfer is enabled and there is already a worker preuploading LFS @@ -432,7 +455,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob, elif status.queue_get_upload_mode.qsize() > 0: status.nb_workers_get_upload_mode += 1 logger.debug("Job: get upload mode") - return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50)) + return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, status.target_chunk())) # 10. Commit if at least 1 file and 1 min since last commit attempt elif ( diff --git a/tests/test_upload_large_folder.py b/tests/test_upload_large_folder.py new file mode 100644 index 0000000000..f2de3131a9 --- /dev/null +++ b/tests/test_upload_large_folder.py @@ -0,0 +1,34 @@ +# tests/test_upload_large_folder.py +import pytest + +from huggingface_hub._upload_large_folder import COMMIT_SIZE_SCALE, LargeUploadStatus + + +@pytest.fixture +def status(): + return LargeUploadStatus(items=[]) + + +def test_target_chunk_default(status): + assert status.target_chunk() == COMMIT_SIZE_SCALE[1] + + +@pytest.mark.parametrize( + "start_idx, success, delta_items, duration, expected_idx", + [ + (2, False, 0, 10, 1), # drop by one on failure + (0, False, 0, 10, 0), # never go below zero + (1, True, 0, 50, 1), # duration >= 40 --> no bump + (1, True, -1, 30, 1), # nb_items < threshold --> no bump + (1, True, 0, 30, 2), # fast enough and enough items + (len(COMMIT_SIZE_SCALE) - 1, True, 0, 10, len(COMMIT_SIZE_SCALE) - 1), # never exceed last index + ], +) +def test_update_chunk_transitions(status, start_idx, success, delta_items, duration, expected_idx): + status._chunk_idx = start_idx + threshold = COMMIT_SIZE_SCALE[start_idx] + nb_items = threshold + delta_items + status.update_chunk(success=success, nb_items=nb_items, duration=duration) + + assert status._chunk_idx == expected_idx + assert status.target_chunk() == COMMIT_SIZE_SCALE[expected_idx]