43
43
44
44
WAITING_TIME_IF_NO_TASKS = 10 # seconds
45
45
MAX_NB_FILES_FETCH_UPLOAD_MODE = 100
46
- MAX_NB_REGULAR_FILES_PER_COMMIT = 75
47
- MAX_NB_LFS_FILES_PER_COMMIT = 150
48
46
COMMIT_SIZE_SCALE : List [int ] = [20 , 50 , 75 , 100 , 125 , 200 , 250 , 400 , 600 , 1000 ]
49
47
50
48
@@ -405,13 +403,13 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
405
403
):
406
404
status .nb_workers_commit += 1
407
405
logger .debug ("Job: commit (more than 5 minutes since last commit attempt)" )
408
- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
406
+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
409
407
410
408
# 2. Commit if at least 100 files are ready to commit
411
409
elif status .nb_workers_commit == 0 and status .queue_commit .qsize () >= 150 :
412
410
status .nb_workers_commit += 1
413
411
logger .debug ("Job: commit (>100 files ready)" )
414
- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
412
+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
415
413
416
414
# 3. Get upload mode if at least 100 files
417
415
elif status .queue_get_upload_mode .qsize () >= MAX_NB_FILES_FETCH_UPLOAD_MODE :
@@ -467,7 +465,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
467
465
):
468
466
status .nb_workers_commit += 1
469
467
logger .debug ("Job: commit (1 min since last commit attempt)" )
470
- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
468
+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
471
469
472
470
# 11. Commit if at least 1 file all other queues are empty and all workers are waiting
473
471
# e.g. when it's the last commit
@@ -483,7 +481,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
483
481
):
484
482
status .nb_workers_commit += 1
485
483
logger .debug ("Job: commit" )
486
- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
484
+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
487
485
488
486
# 12. If all queues are empty, exit
489
487
elif all (metadata .is_committed or metadata .should_ignore for _ , metadata in status .items ):
@@ -601,30 +599,6 @@ def _get_n(queue: "queue.Queue[JOB_ITEM_T]", n: int) -> List[JOB_ITEM_T]:
601
599
return [queue .get () for _ in range (min (queue .qsize (), n ))]
602
600
603
601
604
- def _get_items_to_commit (queue : "queue.Queue[JOB_ITEM_T]" ) -> List [JOB_ITEM_T ]:
605
- """Special case for commit job: the number of items to commit depends on the type of files."""
606
- # Can take at most 50 regular files and/or 100 LFS files in a single commit
607
- items : List [JOB_ITEM_T ] = []
608
- nb_lfs , nb_regular = 0 , 0
609
- while True :
610
- # If empty queue => commit everything
611
- if queue .qsize () == 0 :
612
- return items
613
-
614
- # If we have enough items => commit them
615
- if nb_lfs >= MAX_NB_LFS_FILES_PER_COMMIT or nb_regular >= MAX_NB_REGULAR_FILES_PER_COMMIT :
616
- return items
617
-
618
- # Else, get a new item and increase counter
619
- item = queue .get ()
620
- items .append (item )
621
- _ , metadata = item
622
- if metadata .upload_mode == "lfs" :
623
- nb_lfs += 1
624
- else :
625
- nb_regular += 1
626
-
627
-
628
602
def _print_overwrite (report : str ) -> None :
629
603
"""Print a report, overwriting the previous lines.
630
604
0 commit comments