16
16
17
17
#include " llvm/ADT/ArrayRef.h"
18
18
#include " llvm/ADT/SmallString.h"
19
+ #include " llvm/ADT/STLExtras.h"
19
20
#include " llvm/ADT/Twine.h"
20
21
#include " llvm/Support/Path.h"
21
22
#include " llvm/Support/Program.h"
28
29
#include < thread>
29
30
#include < vector>
30
31
#include < string>
32
+ #include < unordered_set>
31
33
32
34
#include < fcntl.h>
33
35
#include < pthread.h>
@@ -64,6 +66,16 @@ class LaneBasedExecutionQueue : public BuildExecutionQueue {
64
66
std::mutex readyJobsMutex;
65
67
std::condition_variable readyJobsCondition;
66
68
69
+ /// The set of spawned processes to terminate if we get cancelled.
70
+ std::unordered_set<pid_t > spawnedProcesses;
71
+ std::mutex spawnedProcessesMutex;
72
+
73
+ /// Management of cancellation and SIGKILL escalation.
74
+ std::unique_ptr<std::thread> killAfterTimeoutThread = nullptr ;
75
+ std::atomic<bool > cancelled { false };
76
+ std::condition_variable stopKillingCondition;
77
+ std::mutex stopKillingMutex;
78
+
67
79
void executeLane (unsigned laneNumber) {
68
80
// Set the thread name, if available.
69
81
#if defined(__APPLE__)
@@ -78,7 +90,7 @@ class LaneBasedExecutionQueue : public BuildExecutionQueue {
78
90
#endif
79
91
80
92
// Execute items from the queue until shutdown.
81
- while (true ) {
93
+ while (!cancelled ) {
82
94
// Take a job from the ready queue.
83
95
QueueJob job{};
84
96
{
@@ -87,6 +99,10 @@ class LaneBasedExecutionQueue : public BuildExecutionQueue {
87
99
// While the queue is empty, wait for an item.
88
100
while (readyJobs.empty ()) {
89
101
readyJobsCondition.wait (lock);
102
+
103
+ if (cancelled) {
104
+ return ;
105
+ }
90
106
}
91
107
92
108
// Take an item according to the chosen policy.
@@ -106,6 +122,21 @@ class LaneBasedExecutionQueue : public BuildExecutionQueue {
106
122
}
107
123
}
108
124
125
+ void killAfterTimeout () {
126
+ std::unique_lock<std::mutex> lock (stopKillingMutex);
127
+ stopKillingCondition.wait_for (lock, std::chrono::seconds (10 ));
128
+ sendSignalToProcesses (SIGKILL);
129
+ }
130
+
131
+ void sendSignalToProcesses (int signal) {
132
+ std::unique_lock<std::mutex> lock (spawnedProcessesMutex);
133
+
134
+ for (pid_t pid: spawnedProcesses) {
135
+ // We are killing the whole process group here, this depends on us spawning each process in its own group earlier
136
+ ::kill (-pid, signal);
137
+ }
138
+ }
139
+
109
140
public:
110
141
LaneBasedExecutionQueue (BuildExecutionQueueDelegate& delegate,
111
142
unsigned numLanes)
@@ -120,20 +151,43 @@ class LaneBasedExecutionQueue : public BuildExecutionQueue {
120
151
121
152
/// Tear down the queue: mark it cancelled, wake and join every lane thread,
/// and finally wake and join the SIGKILL escalation thread if one exists.
virtual ~LaneBasedExecutionQueue() {
  // Shut down the lanes. Setting the flag and notifying wakes any lane that
  // is blocked waiting for work so that it can observe the cancellation.
  cancelled.store(true);
  readyJobsCondition.notify_all();

  for (unsigned laneIndex = 0; laneIndex != numLanes; ++laneIndex) {
    lanes[laneIndex]->join();
  }

  // The escalation thread is only present when it has been created (see
  // `killAfterTimeoutThread`); wake it early instead of waiting out its
  // timeout, then wait for it to finish.
  if (killAfterTimeoutThread != nullptr) {
    stopKillingCondition.notify_all();
    killAfterTimeoutThread->join();
  }
}
130
166
131
167
/// Enqueue \p job on the ready queue and wake one waiting lane.
///
/// Once the queue has been cancelled the job is silently dropped.
///
/// \param job The job to append to `readyJobs`.
virtual void addJob(QueueJob job) override {
  // FIXME: We should eventually raise an error here as new work should not
  // be enqueued after cancellation.
  if (!cancelled) {
    std::lock_guard<std::mutex> queueGuard(readyJobsMutex);
    readyJobs.push_back(job);
    readyJobsCondition.notify_one();
  }
}
136
177
178
+ virtual void cancelAllJobs () override {
179
+ auto wasAlreadyCancelled = cancelled.exchange (true );
180
+ // If we were already cancelled, do nothing.
181
+ if (wasAlreadyCancelled) {
182
+ return ;
183
+ }
184
+
185
+ readyJobsCondition.notify_all ();
186
+
187
+ sendSignalToProcesses (SIGINT);
188
+ killAfterTimeoutThread = llvm::make_unique<std::thread>(&LaneBasedExecutionQueue::killAfterTimeout, this );
189
+ }
190
+
137
191
virtual bool
138
192
executeProcess (QueueJobContext* opaqueContext,
139
193
ArrayRef<StringRef> commandLine,
@@ -274,19 +328,24 @@ class LaneBasedExecutionQueue : public BuildExecutionQueue {
274
328
}
275
329
276
330
// Spawn the command.
277
- //
278
- // FIXME: Need to track spawned processes for the purposes of cancellation.
279
-
280
331
pid_t pid;
281
- if (posix_spawn (&pid, args[0 ], /* file_actions=*/ &fileActions,
282
- /* attrp=*/ &attributes, const_cast <char **>(args.data ()),
283
- envp) != 0 ) {
284
- getDelegate ().commandProcessHadError (
285
- context.job .getForCommand (), handle,
286
- Twine (" unable to spawn process (" ) + strerror (errno) + " )" );
287
- getDelegate ().commandProcessFinished (context.job .getForCommand (), handle,
288
- -1 );
289
- return false ;
332
+ {
333
+ // We need to hold the spawn processes lock when we spawn, to ensure that
334
+ // we don't create a process in between when we are cancelled.
335
+ std::lock_guard<std::mutex> guard (spawnedProcessesMutex);
336
+
337
+ if (posix_spawn (&pid, args[0 ], /* file_actions=*/ &fileActions,
338
+ /* attrp=*/ &attributes, const_cast <char **>(args.data ()),
339
+ envp) != 0 ) {
340
+ getDelegate ().commandProcessHadError (
341
+ context.job .getForCommand (), handle,
342
+ Twine (" unable to spawn process (" ) + strerror (errno) + " )" );
343
+ getDelegate ().commandProcessFinished (context.job .getForCommand (), handle,
344
+ -1 );
345
+ return false ;
346
+ }
347
+
348
+ spawnedProcesses.insert (pid);
290
349
}
291
350
292
351
posix_spawn_file_actions_destroy (&fileActions);
@@ -323,6 +382,13 @@ class LaneBasedExecutionQueue : public BuildExecutionQueue {
323
382
int status, result = waitpid (pid, &status, 0 );
324
383
while (result == -1 && errno == EINTR)
325
384
result = waitpid (pid, &status, 0 );
385
+
386
+ // Update the set of spawned processes.
387
+ {
388
+ std::lock_guard<std::mutex> guard (spawnedProcessesMutex);
389
+ spawnedProcesses.erase (pid);
390
+ }
391
+
326
392
if (result == -1 ) {
327
393
getDelegate ().commandProcessHadError (
328
394
context.job .getForCommand (), handle,
0 commit comments