44#include < util/system/guard.h>
55#include < util/system/spinlock.h>
66
7+ #include < map>
78#include < memory>
89#include < mutex>
910#include < shared_mutex>
11+ #include < thread>
1012
1113namespace NYdb ::NTopic {
1214
@@ -17,18 +19,61 @@ template <typename TGuardedObject>
1719class TCallbackContext {
1820 friend class TContextOwner <TGuardedObject>;
1921
22+ // thread_id -> number of LockShared calls from this thread
23+ using TSharedLockCounter = std::map<std::thread::id, size_t >;
24+ using TSharedLockCounterPtr = std::shared_ptr<TSharedLockCounter>;
25+ using TSpinLockPtr = std::shared_ptr<TSpinLock>;
26+
2027public:
2128 using TMutexPtr = std::shared_ptr<std::shared_mutex>;
2229
2330 class TBorrowed {
2431 public:
25- explicit TBorrowed (const TCallbackContext& parent) : Mutex(parent.Mutex) {
26- Mutex->lock_shared ();
32+ explicit TBorrowed (const TCallbackContext& parent)
33+ : Mutex(parent.Mutex)
34+ , SharedLockCounterMutex(parent.SharedLockCounterMutex)
35+ , SharedLockCounter(parent.SharedLockCounter)
36+ {
37+ // "Recursive shared lock".
38+ //
39+ // https://en.cppreference.com/w/cpp/thread/shared_mutex/lock_shared says:
40+ // If lock_shared is called by a thread that already owns the mutex
41+ // in any mode (exclusive or shared), the behavior is UNDEFINED.
42+ //
43+ // So if a thread calls LockShared more than once without releasing the lock,
44+ // we should call lock_shared only on the first call.
45+
46+ bool takeLock = false ;
47+
48+ with_lock (*SharedLockCounterMutex) {
49+ auto & counter = SharedLockCounter->emplace (std::this_thread::get_id (), 0 ).first ->second ;
50+ ++counter;
51+ takeLock = counter == 1 ;
52+ }
53+
54+ if (takeLock) {
55+ Mutex->lock_shared ();
56+ }
57+
2758 Ptr = parent.GuardedObjectPtr .get ();
2859 }
2960
3061 ~TBorrowed () {
31- Mutex->unlock_shared ();
62+ bool releaseLock = false ;
63+
64+ with_lock (*SharedLockCounterMutex) {
65+ auto it = SharedLockCounter->find (std::this_thread::get_id ());
66+ auto & counter = it->second ;
67+ --counter;
68+ if (counter == 0 ) {
69+ releaseLock = true ;
70+ SharedLockCounter->erase (it);
71+ }
72+ }
73+
74+ if (releaseLock) {
75+ Mutex->unlock_shared ();
76+ }
3277 }
3378
3479 TGuardedObject* operator ->() {
@@ -46,12 +91,17 @@ class TCallbackContext {
4691 private:
4792 TMutexPtr Mutex;
4893 TGuardedObject* Ptr = nullptr ;
94+
95+ TSpinLockPtr SharedLockCounterMutex;
96+ TSharedLockCounterPtr SharedLockCounter;
4997 };
5098
5199public:
52100 explicit TCallbackContext (std::shared_ptr<TGuardedObject> ptr)
53101 : Mutex(std::make_shared<std::shared_mutex>())
54102 , GuardedObjectPtr(std::move(ptr))
103+ , SharedLockCounterMutex(std::make_shared<TSpinLock>())
104+ , SharedLockCounter(std::make_shared<TSharedLockCounter>())
55105 {}
56106
57107 TBorrowed LockShared () {
@@ -75,8 +125,12 @@ class TCallbackContext {
75125 }
76126
77127private:
128+
78129 TMutexPtr Mutex;
79130 std::shared_ptr<TGuardedObject> GuardedObjectPtr;
131+
132+ TSpinLockPtr SharedLockCounterMutex;
133+ TSharedLockCounterPtr SharedLockCounter;
80134};
81135
82136template <typename T>
0 commit comments