
Commit 0709088

Authored by maximizemaxwell, Wauplin, github-actions[bot], and hanouticelina
Fix dynamic commit size (#3016)
* add COMMIT_SIZE_SCALE list, methods in LargeUploadStatus, changed to dynamic scale
* revised style
* fixed more hard-coded parts
* Apply style fixes
* Update _upload_large_folder.py
  Co-authored-by: Lucain <[email protected]>
* Update _upload_large_folder.py
  Co-authored-by: Lucain <[email protected]>
* Update _upload_large_folder.py
  Co-authored-by: Lucain <[email protected]>
* add test_upload_large_folder
* Update test_upload_large_folder.py
  Co-authored-by: célina <[email protected]>

---------

Co-authored-by: Lucain <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Lucain <[email protected]>
Co-authored-by: célina <[email protected]>
1 parent 8693672 commit 0709088

File tree: 2 files changed (+60, -3 lines)

  src/huggingface_hub/_upload_large_folder.py
  tests/test_upload_large_folder.py
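Before the diffs, a minimal sketch of what the change does, assuming only what is visible in the hunks below: the number of files pulled per batch is no longer hard-coded (previously 50) but walks up and down COMMIT_SIZE_SCALE, growing after a fast, full, successful commit and shrinking after a failure. The loop and the sample (success, nb_items, duration) history here are made up for illustration; the real logic lives in LargeUploadStatus.update_chunk.

    # Illustrative sketch only, not library code. Scale values and the
    # 40-second threshold are copied from the diff below.
    COMMIT_SIZE_SCALE = [20, 50, 75, 100, 125, 200, 250, 400, 600, 1000]

    idx = 1  # LargeUploadStatus starts with _chunk_idx = 1, i.e. batches of 50
    history = [(True, 50, 12.0), (True, 75, 18.0), (False, 100, 90.0)]  # (success, nb_items, duration)
    for success, nb_items, duration in history:
        if not success:
            idx -= 1  # back off after a failed commit
        elif nb_items >= COMMIT_SIZE_SCALE[idx] and duration < 40:
            idx += 1  # grow when a full batch committed in under 40 seconds
        idx = max(0, min(idx, len(COMMIT_SIZE_SCALE) - 1))  # clamp to valid indices
        print(f"next batch size target: {COMMIT_SIZE_SCALE[idx]}")
    # prints 75, then 100, then back to 75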

src/huggingface_hub/_upload_large_folder.py

Lines changed: 26 additions & 3 deletions
@@ -44,6 +44,7 @@
 WAITING_TIME_IF_NO_TASKS = 10  # seconds
 MAX_NB_REGULAR_FILES_PER_COMMIT = 75
 MAX_NB_LFS_FILES_PER_COMMIT = 150
+COMMIT_SIZE_SCALE: List[int] = [20, 50, 75, 100, 125, 200, 250, 400, 600, 1000]


 def upload_large_folder_internal(
@@ -184,6 +185,8 @@ def __init__(self, items: List[JOB_ITEM_T]):
         self.last_commit_attempt: Optional[float] = None

         self._started_at = datetime.now()
+        self._chunk_idx: int = 1
+        self._chunk_lock: Lock = Lock()

         # Setup queues
         for item in self.items:
@@ -199,6 +202,21 @@ def __init__(self, items: List[JOB_ITEM_T]):
             else:
                 logger.debug(f"Skipping file {paths.path_in_repo} (already uploaded and committed)")

+    def target_chunk(self) -> int:
+        with self._chunk_lock:
+            return COMMIT_SIZE_SCALE[self._chunk_idx]
+
+    def update_chunk(self, success: bool, nb_items: int, duration: float) -> None:
+        with self._chunk_lock:
+            if not success:
+                logger.warn(f"Failed to commit {nb_items} files at once. Will retry with less files in next batch.")
+                self._chunk_idx -= 1
+            elif nb_items >= COMMIT_SIZE_SCALE[self._chunk_idx] and duration < 40:
+                logger.info(f"Successfully committed {nb_items} at once. Increasing the limit for next batch.")
+                self._chunk_idx += 1
+
+            self._chunk_idx = max(0, min(self._chunk_idx, len(COMMIT_SIZE_SCALE) - 1))
+
     def current_report(self) -> str:
         """Generate a report of the current status of the large upload."""
         nb_hashed = 0
@@ -351,6 +369,8 @@ def _worker_job(
                 status.nb_workers_preupload_lfs -= 1

         elif job == WorkerJob.COMMIT:
+            start_ts = time.time()
+            success = True
            try:
                _commit(items, api=api, repo_id=repo_id, repo_type=repo_type, revision=revision)
            except KeyboardInterrupt:
@@ -360,6 +380,9 @@ def _worker_job(
                traceback.format_exc()
                for item in items:
                    status.queue_commit.put(item)
+                success = False
+            duration = time.time() - start_ts
+            status.update_chunk(success, len(items), duration)
            with status.lock:
                status.last_commit_attempt = time.time()
                status.nb_workers_commit -= 1
@@ -393,7 +416,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
    elif status.queue_get_upload_mode.qsize() >= 10:
        status.nb_workers_get_upload_mode += 1
        logger.debug("Job: get upload mode (>10 files ready)")
-        return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50))
+        return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, status.target_chunk()))

    # 4. Preupload LFS file if at least 1 file and no worker is preuploading LFS
    elif status.queue_preupload_lfs.qsize() > 0 and status.nb_workers_preupload_lfs == 0:
@@ -411,7 +434,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
    elif status.queue_get_upload_mode.qsize() > 0 and status.nb_workers_get_upload_mode == 0:
        status.nb_workers_get_upload_mode += 1
        logger.debug("Job: get upload mode (no other worker getting upload mode)")
-        return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50))
+        return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, status.target_chunk()))

    # 7. Preupload LFS file if at least 1 file
    # Skip if hf_transfer is enabled and there is already a worker preuploading LFS
@@ -432,7 +455,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
    elif status.queue_get_upload_mode.qsize() > 0:
        status.nb_workers_get_upload_mode += 1
        logger.debug("Job: get upload mode")
-        return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50))
+        return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, status.target_chunk()))

    # 10. Commit if at least 1 file and 1 min since last commit attempt
    elif (
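As a usage illustration only (not part of the commit), the snippet below drives the two new methods the way the commit worker above does, with the actual _commit call stubbed out; constructing LargeUploadStatus with an empty item list mirrors the new tests further down.

    import time

    from huggingface_hub._upload_large_folder import LargeUploadStatus

    status = LargeUploadStatus(items=[])  # empty item list, as in the new tests
    print(status.target_chunk())          # 50: _chunk_idx starts at 1

    # A commit worker pulls up to target_chunk() items, tries to commit them,
    # then reports the outcome so the next batch size can adapt.
    batch = status.target_chunk()
    start_ts = time.time()
    # ... _commit(items, api=..., repo_id=..., repo_type=..., revision=...) would run here ...
    status.update_chunk(success=True, nb_items=batch, duration=time.time() - start_ts)
    print(status.target_chunk())          # 75: a full batch committed in under 40s, so the limit grows

    status.update_chunk(success=False, nb_items=batch, duration=5.0)
    print(status.target_chunk())          # back to 50 after a failed commit

Holding _chunk_lock inside both methods keeps the scale index consistent when several worker threads report commit results concurrently.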

tests/test_upload_large_folder.py

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+# tests/test_upload_large_folder.py
+import pytest
+
+from huggingface_hub._upload_large_folder import COMMIT_SIZE_SCALE, LargeUploadStatus
+
+
+@pytest.fixture
+def status():
+    return LargeUploadStatus(items=[])
+
+
+def test_target_chunk_default(status):
+    assert status.target_chunk() == COMMIT_SIZE_SCALE[1]
+
+
+@pytest.mark.parametrize(
+    "start_idx, success, delta_items, duration, expected_idx",
+    [
+        (2, False, 0, 10, 1),  # drop by one on failure
+        (0, False, 0, 10, 0),  # never go below zero
+        (1, True, 0, 50, 1),  # duration >= 40 --> no bump
+        (1, True, -1, 30, 1),  # nb_items < threshold --> no bump
+        (1, True, 0, 30, 2),  # fast enough and enough items
+        (len(COMMIT_SIZE_SCALE) - 1, True, 0, 10, len(COMMIT_SIZE_SCALE) - 1),  # never exceed last index
+    ],
+)
+def test_update_chunk_transitions(status, start_idx, success, delta_items, duration, expected_idx):
+    status._chunk_idx = start_idx
+    threshold = COMMIT_SIZE_SCALE[start_idx]
+    nb_items = threshold + delta_items
+    status.update_chunk(success=success, nb_items=nb_items, duration=duration)
+
+    assert status._chunk_idx == expected_idx
+    assert status.target_chunk() == COMMIT_SIZE_SCALE[expected_idx]
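Assuming a development checkout of huggingface_hub with pytest available, the new tests can be run in isolation with something like:

    python -m pytest tests/test_upload_large_folder.py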
