Commit 89f880c

YT-21634: Make shutdown sequences of Invokers more robust
Basically, there are two bugs/bad behaviors:

1) Some parts of shutdown are delegated to the Finalizer/Shutdown invoker. This causes a race (not a data race, just non-deterministic behavior) between a deadlocking producer (if the destruction of its callback causes a deadlock) and the finalizer/shutdown thread that closes the queue, triggering the deadlock event. We change the behavior so that queues are closed immediately; thus, if any deadlocks are present, they are guaranteed to happen and to be reliably detected.

2) Most queues that implement a shutdown mechanism have no proper way of double-checking whether the queue was closed after they last checked but before they enqueued their task. This results in a callback being stuck in the queue forever. On its own this is not an issue, but since the callback can (and usually does) hold a strong reference to the class which owns the queue itself, we get a reference cycle: queue -owns-> callback -owns-> queue. We implement appropriate double-checking mechanics which ensure such a thing never occurs (a minimal sketch of this pattern follows the diff stats below).

ad8c42eb67f74bc19c32c85fd8b7790b294a7144
1 parent 8252f93 commit 89f880c

21 files changed: +386 −167 lines
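The double-check described in point 2 of the commit message works roughly as follows. This is a minimal sketch under assumed names (TQueue, TItem, Enqueue, Drain are illustrative, not the commit's actual types; a mutex stands in for the real lock-free queue): the producer re-checks the stopped flag after enqueuing and re-drains on failure, so no callback can stay stranded in a closed queue and keep its owner alive through the queue -owns-> callback -owns-> queue cycle.

// Hypothetical sketch of the "double check after enqueue" pattern from
// point 2. TQueue/TItem are illustrative names, not the commit's actual types.
#include <atomic>
#include <mutex>
#include <queue>

template <class TItem>
class TQueue
{
public:
    // Returns false if the queue is stopped: the item either was never pushed
    // (first check) or has already been drained (second check), so it will not run.
    bool Enqueue(TItem item)
    {
        if (Stopped_.load(std::memory_order::acquire)) {
            return false; // First check: fast path, purely an optimization.
        }
        {
            std::lock_guard guard(Lock_);
            Items_.push(std::move(item));
        }
        // Second check: Shutdown may have set the flag and drained between
        // our first check and the push above. Re-drain so nothing is stuck.
        if (Stopped_.load(std::memory_order::acquire)) {
            Drain();
            return false;
        }
        return true;
    }

    void Shutdown()
    {
        Stopped_.store(true, std::memory_order::release);
        Drain(); // Any racing Enqueue re-drains via its second check.
    }

private:
    void Drain()
    {
        std::lock_guard guard(Lock_);
        while (!Items_.empty()) {
            Items_.pop(); // Destroying items here breaks queue -> callback -> queue cycles.
        }
    }

    std::atomic<bool> Stopped_{false};
    std::mutex Lock_;
    std::queue<TItem> Items_;
};

Either Shutdown's drain removes the item, or the producer's second check observes the flag and re-drains; in both cases the callback is destroyed rather than stranded.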

yt/yt/core/concurrency/action_queue.cpp

Lines changed: 7 additions & 15 deletions
@@ -54,16 +54,14 @@ class TActionQueue::TImpl

     void Shutdown(bool graceful)
     {
-        if (Stopped_.exchange(true)) {
+        // Proper synchronization done via Queue_->Shutdown().
+        if (Stopped_.exchange(true, std::memory_order::relaxed)) {
             return;
         }

-        Queue_->Shutdown();
-
-        ShutdownInvoker_->Invoke(BIND_NO_PROPAGATE([graceful, thread = Thread_, queue = Queue_] {
-            thread->Stop(graceful);
-            queue->DrainConsumer();
-        }));
+        Queue_->Shutdown(graceful);
+        Thread_->Stop(graceful);
+        Queue_->OnConsumerFinished();
     }

     const IInvokerPtr& GetInvoker()
@@ -79,20 +77,14 @@ class TActionQueue::TImpl
     const TMpscSingleQueueSchedulerThreadPtr Thread_;

     const TShutdownCookie ShutdownCookie_;
-    const IInvokerPtr ShutdownInvoker_ = GetShutdownInvoker();

-    std::atomic<bool> Started_ = false;
     std::atomic<bool> Stopped_ = false;


     void EnsureStarted()
     {
-        if (Started_.load(std::memory_order::relaxed)) {
-            return;
-        }
-        if (Started_.exchange(true)) {
-            return;
-        }
+        // Thread::Start already has
+        // its own short-circ mechanism.
         Thread_->Start();
     }
 };

yt/yt/core/concurrency/delayed_executor.cpp

Lines changed: 92 additions & 4 deletions
@@ -3,6 +3,7 @@
 #include "scheduler.h"
 #include "private.h"

+#include <yt/yt/core/actions/invoker_util.h>
 #include <yt/yt/core/misc/relaxed_mpsc_queue.h>
 #include <yt/yt/core/misc/singleton.h>

@@ -60,6 +61,41 @@ DEFINE_REFCOUNTED_TYPE(TDelayedExecutorEntry)

 ////////////////////////////////////////////////////////////////////////////////

+//! (arkady-e1ppa) MO's/Fence explanation:
+/*
+    We want shutdown to guarantee that no callback is left in any queue
+    once it is over. Otherwise, a memory leak or a deadlock of Poller/GrpcServer
+    (or whoever else blocks a thread until some callback is run) will occur.
+
+    We model our queue with Enqueue being RMW^rel(Queue_, x, y) and Dequeue
+    being RMW^acq(Queue_, x, y), where x is what we have read and y is what we
+    have observed. A missing callback would imply that in the Submit method
+    enqueueing |CB| we observed Stopping_ |false| (e.g. TThread::Start (c)
+    returned |true|), but also in ThreadMain during the SubmitQueue drain (f)
+    we did not observe |CB|. The execution is schematically listed below:
+
+    T1(Submit)                        T2(Shutdown)
+    RMW^rel(Queue_, empty, CB) (a)    W^rel(Stopping_, true) (d)
+    |sequenced-before                 |simply hb
+    Fence^sc (b)                      Fence^sc (e)
+    |sequenced-before                 |sequenced-before
+    R^acq(Stopping_, false) (c)       RMW^acq(Queue_, empty, empty) (f)

+    Since (c) reads |false| it must be reading from the Stopping_ ctor, which is
+    W^na(Stopping_, false) and precedes (d) in modification order. Thus
+    (c) must read-from some modification preceding (d) in modification order (the ctor)
+    and therefore (c) -cob-> (d) (coherence ordered before).
+    Likewise, (f) reads |empty|, which can only be read from the Queue_ ctor or a
+    prior Dequeue, both of which precede (a) in modification order (the ctor is obvious;
+    the prior Dequeue by the assumption that no one has ever read |CB|: if some (a) were
+    prior to some Dequeue in modification order, |CB| would inevitably be read).
+    So, (f) -cob-> (a). For the fences we now have two relations:
+    (b) -sb-> (c) -cob-> (d) -simply hb-> (e)  =>  (b) -S-> (e)
+    (e) -sb-> (f) -cob-> (a) -sb-> (b)         =>  (e) -S-> (b)
+    Here sb is sequenced-before and S is the sequentially-consistent total order.
+    We have formed a loop in S, thus contradicting the assumption.
+*/
+
 class TDelayedExecutorImpl
 {
 public:
@@ -139,9 +175,11 @@ class TDelayedExecutorImpl
     {
         YT_VERIFY(callback);
         auto entry = New<TDelayedExecutorEntry>(std::move(callback), deadline, std::move(invoker));
-        PollerThread_->EnqueueSubmission(entry);
+        PollerThread_->EnqueueSubmission(entry); // <- (a)
+
+        std::atomic_thread_fence(std::memory_order::seq_cst); // <- (b)

-        if (!PollerThread_->Start()) {
+        if (!PollerThread_->Start()) { // <- (c)
             if (auto callback = TakeCallback(entry)) {
                 callback(/*aborted*/ true);
             }
@@ -213,6 +251,43 @@ class TDelayedExecutorImpl
     NProfiling::TCounter CanceledCallbacksCounter_ = ConcurrencyProfiler.Counter("/delayed_executor/canceled_callbacks");
     NProfiling::TCounter StaleCallbacksCounter_ = ConcurrencyProfiler.Counter("/delayed_executor/stale_callbacks");

+    class TCallbackGuard
+    {
+    public:
+        TCallbackGuard(TCallback<void(bool)> callback, bool aborted) noexcept
+            : Callback_(std::move(callback))
+            , Aborted_(aborted)
+        { }
+
+        TCallbackGuard(TCallbackGuard&& other) = default;
+
+        TCallbackGuard(const TCallbackGuard&) = delete;
+
+        TCallbackGuard& operator=(const TCallbackGuard&) = delete;
+        TCallbackGuard& operator=(TCallbackGuard&&) = delete;
+
+        void operator()()
+        {
+            auto callback = std::move(Callback_);
+            YT_VERIFY(callback);
+            callback.Run(Aborted_);
+        }
+
+        ~TCallbackGuard()
+        {
+            if (Callback_) {
+                YT_LOG_DEBUG("Aborting delayed executor callback");
+
+                auto callback = std::move(Callback_);
+                callback(/*aborted*/ true);
+            }
+        }
+
+    private:
+        TCallback<void(bool)> Callback_;
+        bool Aborted_;
+    };

     void StartPrologue() override
     {
@@ -240,6 +315,13 @@ class TDelayedExecutorImpl
             ProcessQueues();

             if (IsStopping()) {
+                // We have Stopping_.store(true) in simply happens-before relation.
+                // Assume Stopping_.store(true, release) is (d).
+                // NB(arkady-e1ppa): At the time of writing it is actually seq_cst, but
+                // 1. We don't need it to be for correctness hehe
+                // 2. It won't help us here anyway
+                // 3. It might be changed as it could be suboptimal.
                 break;
             }
@@ -251,6 +333,8 @@ class TDelayedExecutorImpl
             EventCount_->Wait(cookie, deadline);
         }

+        std::atomic_thread_fence(std::memory_order::seq_cst); // <- (e)
+
         // Perform graceful shutdown.

        // First run the scheduled callbacks with |aborted = true|.
@@ -267,7 +351,7 @@ class TDelayedExecutorImpl
         // Now we handle the queued callbacks similarly.
         {
             TDelayedExecutorEntryPtr entry;
-            while (SubmitQueue_.TryDequeue(&entry)) {
+            while (SubmitQueue_.TryDequeue(&entry)) { // <- (f)
                 runAbort(entry);
             }
         }
@@ -349,7 +433,11 @@ class TDelayedExecutorImpl
     void RunCallback(const TDelayedExecutorEntryPtr& entry, bool abort)
     {
         if (auto callback = TakeCallback(entry)) {
-            (entry->Invoker ? entry->Invoker : DelayedInvoker_)->Invoke(BIND_NO_PROPAGATE(std::move(callback), abort));
+            auto invoker = entry->Invoker
+                ? entry->Invoker
+                : DelayedInvoker_;
+            invoker
+                ->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), abort)));
         }
     }
 };
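For readers mapping the (a)-(f) labels in the comment above onto code: this is the classic store-buffering pattern. Below is a self-contained sketch with stand-in names (a single atomic pointer replaces the MPSC submission queue, and a plain load replaces the Stopping_ check that the real code performs via TThread::Start); it is an illustration of the argument, not the commit's actual code.

// Store-buffering sketch of the (a)-(f) handshake above. Hypothetical names;
// a single pointer stands in for the MPSC submission queue.
#include <atomic>

std::atomic<void*> Queue{nullptr};   // stands in for SubmitQueue_
std::atomic<bool> Stopping{false};   // stands in for Stopping_

// T1: the Submit path.
bool Submit(void* cb)
{
    Queue.store(cb, std::memory_order::release);             // (a) enqueue
    std::atomic_thread_fence(std::memory_order::seq_cst);    // (b)
    if (Stopping.load(std::memory_order::acquire)) {         // (c) Start() failed
        return false; // The caller aborts the callback itself.
    }
    return true;
}

// T2: the Shutdown path.
void* Drain()
{
    Stopping.store(true, std::memory_order::release);        // (d)
    std::atomic_thread_fence(std::memory_order::seq_cst);    // (e)
    return Queue.exchange(nullptr, std::memory_order::acquire); // (f) drain
}

The forbidden outcome is Submit() returning true while Drain() returns nullptr: that would require (c) to read |false| and (f) to read |empty| simultaneously, which the two seq_cst fences rule out, exactly the loop-in-S contradiction derived in the comment.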

yt/yt/core/concurrency/fair_share_action_queue.cpp

Lines changed: 7 additions & 14 deletions
@@ -105,16 +105,14 @@ class TFairShareActionQueue

     void Shutdown(bool graceful)
     {
-        if (Stopped_.exchange(true)) {
+        // Synchronization done via Queue_->Shutdown().
+        if (Stopped_.exchange(true, std::memory_order::relaxed)) {
             return;
         }

-        Queue_->Shutdown();
-
-        ShutdownInvoker_->Invoke(BIND([graceful, thread = Thread_, queue = Queue_] {
-            thread->Stop(graceful);
-            queue->DrainConsumer();
-        }));
+        Queue_->Shutdown(graceful);
+        Thread_->Stop(graceful);
+        Queue_->OnConsumerFinished();
     }

     const IInvokerPtr& GetInvoker(int index) override
@@ -156,18 +154,13 @@ class TFairShareActionQueue

     std::vector<TString> BucketNames_;

-    std::atomic<bool> Started_ = false;
     std::atomic<bool> Stopped_ = false;


     void EnsuredStarted()
     {
-        if (Started_.load(std::memory_order::relaxed)) {
-            return;
-        }
-        if (Started_.exchange(true)) {
-            return;
-        }
+        // Thread::Start already has
+        // its own short-circ.
         Thread_->Start();
     }
 };

yt/yt/core/concurrency/fair_share_invoker_queue.cpp

Lines changed: 4 additions & 11 deletions
@@ -54,24 +54,17 @@ const IInvokerPtr& TFairShareInvokerQueue::GetInvoker(int bucketIndex, int queue
     return bucket.Invokers[queueIndex];
 }

-void TFairShareInvokerQueue::Shutdown()
+void TFairShareInvokerQueue::Shutdown(bool graceful)
 {
     for (auto& bucket : Buckets_) {
-        bucket.Queue->Shutdown();
+        bucket.Queue->Shutdown(graceful);
     }
 }

-void TFairShareInvokerQueue::DrainProducer()
+void TFairShareInvokerQueue::OnConsumerFinished()
 {
     for (auto& bucket : Buckets_) {
-        bucket.Queue->DrainProducer();
-    }
-}
-
-void TFairShareInvokerQueue::DrainConsumer()
-{
-    for (auto& bucket : Buckets_) {
-        bucket.Queue->DrainConsumer();
+        bucket.Queue->OnConsumerFinished();
     }
 }

yt/yt/core/concurrency/fair_share_invoker_queue.h

Lines changed: 3 additions & 4 deletions
@@ -40,10 +40,9 @@ class TFairShareInvokerQueue

     const IInvokerPtr& GetInvoker(int bucketIndex, int queueIndex) const;

-    void Shutdown();
-
-    void DrainProducer();
-    void DrainConsumer();
+    // See TInvokerQueue::Shutdown/OnConsumerFinished.
+    void Shutdown(bool graceful = false);
+    void OnConsumerFinished();

     bool IsRunning() const;

yt/yt/core/concurrency/fair_share_thread_pool.cpp

Lines changed: 11 additions & 14 deletions
@@ -187,6 +187,10 @@ class TFairShareQueue
     void Invoke(TClosure callback, TBucket* bucket)
     {
         auto guard = Guard(SpinLock_);
+        // See Shutdown.
+        if (Stopping_) {
+            return;
+        }

         QueueSize_.fetch_add(1, std::memory_order::relaxed);

@@ -227,13 +231,14 @@ class TFairShareQueue
     }

     void Shutdown()
-    {
-        Drain();
-    }
-
-    void Drain()
     {
         auto guard = Guard(SpinLock_);
+        // Setting under the spinlock because this way
+        // we have atomicity of two actions:
+        // 1) write/read of the flag and 2) drain/enqueue of a callback.
+        // See the two_level_fair_share_thread_pool Queue
+        // for a more detailed explanation.
+        Stopping_ = true;
         for (const auto& item : Heap_) {
             item.Bucket->Drain();
         }
@@ -339,7 +344,7 @@ class TFairShareQueue
     const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_;

     YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
-
+    bool Stopping_ = false;
     std::vector<THeapItem> Heap_;

     std::atomic<int> ThreadCount_ = 0;
@@ -548,14 +553,6 @@ class TFairShareThreadPool
         TThreadPoolBase::DoShutdown();
     }

-    TClosure MakeFinalizerCallback() override
-    {
-        return BIND_NO_PROPAGATE([queue = Queue_, callback = TThreadPoolBase::MakeFinalizerCallback()] {
-            callback();
-            queue->Drain();
-        });
-    }
-
     void DoConfigure(int threadCount) override
     {
         Queue_->Configure(threadCount);
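The spinlock comment above is the crux of this pool's fix: the flag write plus drain, and the flag read plus enqueue, each form one atomic step under the same lock. A minimal sketch of that invariant with assumed names (TPoolQueue and std::mutex stand in for the pool's real types and its NThreading::TSpinLock):

// Sketch of "flag and queue mutated atomically under one lock" from the
// Shutdown comment above. Names are illustrative, not the pool's real types.
#include <deque>
#include <functional>
#include <mutex>

class TPoolQueue
{
public:
    void Invoke(std::function<void()> callback)
    {
        std::lock_guard guard(SpinLock_);
        if (Stopping_) {
            return; // Dropped and destroyed here; never enters the queue.
        }
        Callbacks_.push_back(std::move(callback));
    }

    void Shutdown()
    {
        std::lock_guard guard(SpinLock_);
        // Flag write and drain are one atomic step with respect to Invoke:
        // any Invoke either ran before (its callback is drained below) or
        // runs after (and sees Stopping_ == true).
        Stopping_ = true;
        Callbacks_.clear();
    }

private:
    std::mutex SpinLock_; // The real code uses NThreading::TSpinLock.
    bool Stopping_ = false;
    std::deque<std::function<void()>> Callbacks_;
};

Because both paths hold the same lock, no callback can land in the queue after the drain, so nothing can sit there holding a strong reference back to its owner.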
