4
4
#include < util/system/guard.h>
5
5
#include < util/system/spinlock.h>
6
6
7
+ #include < map>
7
8
#include < memory>
9
+ #include < mutex>
8
10
#include < shared_mutex>
9
- #include < vector >
11
+ #include < thread >
10
12
11
13
namespace NYdb ::NPersQueue {
12
14
@@ -17,18 +19,62 @@ template <typename TGuardedObject>
17
19
class TCallbackContext {
18
20
friend class TContextOwner <TGuardedObject>;
19
21
22
+ // thread_id -> number of LockShared calls from this thread
23
+ using TSharedLockCounter = std::map<std::thread::id, size_t >;
24
+ using TSharedLockCounterPtr = std::shared_ptr<TSharedLockCounter>;
25
+ using TSpinLockPtr = std::shared_ptr<TSpinLock>;
26
+
20
27
public:
21
28
using TMutexPtr = std::shared_ptr<std::shared_mutex>;
22
29
23
30
class TBorrowed {
24
31
public:
25
- explicit TBorrowed (const TCallbackContext& parent) : Mutex(parent.Mutex) {
26
- Mutex->lock_shared ();
32
+ explicit TBorrowed (const TCallbackContext& parent)
33
+ : Mutex(parent.Mutex)
34
+ , SharedLockCounterMutex(parent.SharedLockCounterMutex)
35
+ , SharedLockCounter(parent.SharedLockCounter)
36
+ {
37
+ // "Recursive shared lock".
38
+ //
39
+ // https://en.cppreference.com/w/cpp/thread/shared_mutex/lock_shared says:
40
+ // If lock_shared is called by a thread that already owns the mutex
41
+ // in any mode (exclusive or shared), the behavior is UNDEFINED.
42
+ //
43
+ // So if a thread calls LockShared more than once without releasing the lock,
44
+ // we should call lock_shared only on the first call.
45
+
46
+ bool takeLock = false ;
47
+
48
+ with_lock (*SharedLockCounterMutex) {
49
+ auto & counter = SharedLockCounter->emplace (std::this_thread::get_id (), 0 ).first ->second ;
50
+ ++counter;
51
+ takeLock = counter == 1 ;
52
+ }
53
+
54
+ if (takeLock) {
55
+ Mutex->lock_shared ();
56
+ }
57
+
27
58
Ptr = parent.GuardedObjectPtr .get ();
28
59
}
29
60
30
61
~TBorrowed () {
31
- Mutex->unlock_shared ();
62
+ bool releaseLock = false ;
63
+
64
+ with_lock (*SharedLockCounterMutex) {
65
+ auto it = SharedLockCounter->find (std::this_thread::get_id ());
66
+ Y_ABORT_UNLESS (it != SharedLockCounter->end ());
67
+ auto & counter = it->second ;
68
+ --counter;
69
+ if (counter == 0 ) {
70
+ releaseLock = true ;
71
+ SharedLockCounter->erase (it);
72
+ }
73
+ }
74
+
75
+ if (releaseLock) {
76
+ Mutex->unlock_shared ();
77
+ }
32
78
}
33
79
34
80
TGuardedObject* operator ->() {
@@ -46,12 +92,17 @@ class TCallbackContext {
46
92
private:
47
93
TMutexPtr Mutex;
48
94
TGuardedObject* Ptr = nullptr ;
95
+
96
+ TSpinLockPtr SharedLockCounterMutex;
97
+ TSharedLockCounterPtr SharedLockCounter;
49
98
};
50
99
51
100
public:
52
101
explicit TCallbackContext (std::shared_ptr<TGuardedObject> ptr)
53
102
: Mutex(std::make_shared<std::shared_mutex>())
54
103
, GuardedObjectPtr(std::move(ptr))
104
+ , SharedLockCounterMutex(std::make_shared<TSpinLock>())
105
+ , SharedLockCounter(std::make_shared<TSharedLockCounter>())
55
106
{}
56
107
57
108
TBorrowed LockShared () {
@@ -75,8 +126,12 @@ class TCallbackContext {
75
126
}
76
127
77
128
private:
129
+
78
130
TMutexPtr Mutex;
79
131
std::shared_ptr<TGuardedObject> GuardedObjectPtr;
132
+
133
+ TSpinLockPtr SharedLockCounterMutex;
134
+ TSharedLockCounterPtr SharedLockCounter;
80
135
};
81
136
82
137
template <typename T>
0 commit comments