Skip to content

Commit 01af8e5

Browse files
authored
Make flutter update-packages run in parallel (flutter#91006)
This modifies the flutter update-packages and flutter update-packages --force-upgrade commands so that the many invocations of "dart pub get" in each repo project run in parallel instead of in series.
1 parent fc02dcb commit 01af8e5

File tree

9 files changed

+385
-118
lines changed

9 files changed

+385
-118
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2014 The Flutter Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style license that can be
3+
// found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'dart:collection';
7+
8+
import '../globals_null_migrated.dart' as globals;
9+
10+
/// A closure type used by the [TaskQueue].
11+
typedef TaskQueueClosure<T> = Future<T> Function();
12+
13+
/// A task queue of Futures to be completed in parallel, throttling
14+
/// the number of simultaneous tasks.
15+
///
16+
/// The tasks return results of type T.
17+
class TaskQueue<T> {
18+
/// Creates a task queue with a maximum number of simultaneous jobs.
19+
/// The [maxJobs] parameter defaults to the number of CPU cores on the
20+
/// system.
21+
TaskQueue({int? maxJobs})
22+
: maxJobs = maxJobs ?? globals.platform.numberOfProcessors;
23+
24+
/// The maximum number of jobs that this queue will run simultaneously.
25+
final int maxJobs;
26+
27+
final Queue<_TaskQueueItem<T>> _pendingTasks = Queue<_TaskQueueItem<T>>();
28+
final Set<_TaskQueueItem<T>> _activeTasks = <_TaskQueueItem<T>>{};
29+
final Set<Completer<void>> _completeListeners = <Completer<void>>{};
30+
31+
/// Returns a future that completes when all tasks in the [TaskQueue] are
32+
/// complete.
33+
Future<void> get tasksComplete {
34+
// In case this is called when there are no tasks, we want it to
35+
// signal complete immediately.
36+
if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
37+
return Future<void>.value();
38+
}
39+
final Completer<void> completer = Completer<void>();
40+
_completeListeners.add(completer);
41+
return completer.future;
42+
}
43+
44+
/// Adds a single closure to the task queue, returning a future that
45+
/// completes when the task completes.
46+
Future<T> add(TaskQueueClosure<T> task) {
47+
final Completer<T> completer = Completer<T>();
48+
_pendingTasks.add(_TaskQueueItem<T>(task, completer));
49+
if (_activeTasks.length < maxJobs) {
50+
_processTask();
51+
}
52+
return completer.future;
53+
}
54+
55+
// Process a single task.
56+
void _processTask() {
57+
if (_pendingTasks.isNotEmpty && _activeTasks.length <= maxJobs) {
58+
final _TaskQueueItem<T> item = _pendingTasks.removeFirst();
59+
_activeTasks.add(item);
60+
item.onComplete = () {
61+
_activeTasks.remove(item);
62+
_processTask();
63+
};
64+
item.run();
65+
} else {
66+
_checkForCompletion();
67+
}
68+
}
69+
70+
void _checkForCompletion() {
71+
if (_activeTasks.isEmpty && _pendingTasks.isEmpty) {
72+
for (final Completer<void> completer in _completeListeners) {
73+
if (!completer.isCompleted) {
74+
completer.complete();
75+
}
76+
}
77+
_completeListeners.clear();
78+
}
79+
}
80+
}
81+
82+
class _TaskQueueItem<T> {
83+
_TaskQueueItem(this._closure, this._completer, {this.onComplete});
84+
85+
final TaskQueueClosure<T> _closure;
86+
final Completer<T> _completer;
87+
void Function()? onComplete;
88+
89+
Future<void> run() async {
90+
try {
91+
_completer.complete(await _closure());
92+
} catch (e) { // ignore: avoid_catches_without_on_clauses
93+
_completer.completeError(e);
94+
} finally {
95+
onComplete?.call();
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)