/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

/** Before making any changes in the implementation, please emulate algorithmic changes
    with the SPIN tool using <TBB directory>/tools/spin_models/ReaderWriterMutex.pml.
    Some code may look as if it "can be restructured", but its structure does matter! */

#include "oneapi/tbb/queuing_rw_mutex.h"
#include "oneapi/tbb/detail/_assert.h"
#include "oneapi/tbb/detail/_utils.h"
#include "itt_notify.h"

namespace tbb {
namespace detail {
namespace r1 {

#if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
    // Workaround for overzealous compiler warnings
    #pragma warning (push)
    #pragma warning (disable: 4311 4312)
#endif

//! A view of a T* with additional functionality for twiddling low-order bits.
template<typename T>
class tricky_atomic_pointer {
public:
    using word = uintptr_t;

    static T* fetch_add( std::atomic<word>& location, word addend, std::memory_order memory_order ) {
        return reinterpret_cast<T*>(location.fetch_add(addend, memory_order));
    }

    static T* exchange( std::atomic<word>& location, T* value, std::memory_order memory_order ) {
        return reinterpret_cast<T*>(location.exchange(reinterpret_cast<word>(value), memory_order));
    }

    static T* compare_exchange_strong( std::atomic<word>& obj, const T* expected, const T* desired, std::memory_order memory_order ) {
        word expd = reinterpret_cast<word>(expected);
        obj.compare_exchange_strong(expd, reinterpret_cast<word>(desired), memory_order);
        return reinterpret_cast<T*>(expd);
    }

    static void store( std::atomic<word>& location, const T* value, std::memory_order memory_order ) {
        location.store(reinterpret_cast<word>(value), memory_order);
    }

    static T* load( std::atomic<word>& location, std::memory_order memory_order ) {
        return reinterpret_cast<T*>(location.load(memory_order));
    }

    static void spin_wait_while_eq(const std::atomic<word>& location, const T* value) {
        tbb::detail::d0::spin_wait_while_eq(location, reinterpret_cast<word>(value) );
    }

    T* & ref;
    tricky_atomic_pointer( T*& original ) : ref(original) {};
    tricky_atomic_pointer(const tricky_atomic_pointer&) = delete;
    tricky_atomic_pointer& operator=(const tricky_atomic_pointer&) = delete;
    T* operator&( const word operand2 ) const {
        return reinterpret_cast<T*>( reinterpret_cast<word>(ref) & operand2 );
    }
    T* operator|( const word operand2 ) const {
        return reinterpret_cast<T*>( reinterpret_cast<word>(ref) | operand2 );
    }
};

using tricky_pointer = tricky_atomic_pointer<queuing_rw_mutex::scoped_lock>;

#if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
    // Workaround for overzealous compiler warnings
    #pragma warning (pop)
#endif
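
// Illustrative sketch, not part of the library: tricky_atomic_pointer/tricky_pointer lets the
// queue code keep a one-bit flag in the low-order bit of a node pointer stored in a
// std::atomic<uintptr_t>. The snippet below shows the intended usage pattern; `sample_node`
// and `demo_flag_tagging` are hypothetical names introduced only for this example.
//
//   struct alignas(2) sample_node { int payload; };
//
//   void demo_flag_tagging() {
//       static sample_node n{42};
//       std::atomic<uintptr_t> slot{ reinterpret_cast<uintptr_t>(&n) };
//       // Atomically set the low-order flag bit; the pointer part is unchanged
//       // because the node is at least 2-byte aligned.
//       tricky_atomic_pointer<sample_node>::fetch_add(slot, 0x1, std::memory_order_relaxed);
//       // Read the tagged value back and strip the flag before dereferencing.
//       sample_node* tagged = tricky_atomic_pointer<sample_node>::load(slot, std::memory_order_relaxed);
//       sample_node* clean  = tricky_atomic_pointer<sample_node>(tagged) & ~uintptr_t(1);
//       (void)clean->payload; // safe: the flag bit has been cleared
//   }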

//! Flag bits in a state_t that specify information about a locking request.
enum state_t_flags : unsigned char {
    STATE_NONE                   = 0,
    STATE_WRITER                 = 1<<0,
    STATE_READER                 = 1<<1,
    STATE_READER_UNBLOCKNEXT     = 1<<2,
    STATE_ACTIVEREADER           = 1<<3,
    STATE_UPGRADE_REQUESTED      = 1<<4,
    STATE_UPGRADE_WAITING        = 1<<5,
    STATE_UPGRADE_LOSER          = 1<<6,
    STATE_COMBINED_WAITINGREADER = STATE_READER | STATE_READER_UNBLOCKNEXT,
    STATE_COMBINED_READER        = STATE_COMBINED_WAITINGREADER | STATE_ACTIVEREADER,
    STATE_COMBINED_UPGRADING     = STATE_UPGRADE_WAITING | STATE_UPGRADE_LOSER
};
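
// Illustrative note, not part of the library: the STATE_COMBINED_* values are bit masks over
// the individual flags, so category tests are plain bitwise ANDs. The `state` variable below
// is hypothetical and used only for this example.
//
//   unsigned char state = STATE_READER_UNBLOCKNEXT;
//   bool any_reader     = (state & STATE_COMBINED_READER) != 0;         // true
//   bool waiting_reader = (state & STATE_COMBINED_WAITINGREADER) != 0;  // true
//   bool upgrading      = (state & STATE_COMBINED_UPGRADING) != 0;      // false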

static const unsigned char RELEASED = 0;
static const unsigned char ACQUIRED = 1;

struct queuing_rw_mutex_impl {
    //! Try to acquire the internal lock
    /** Returns true if lock was successfully acquired. */
    static bool try_acquire_internal_lock(d1::queuing_rw_mutex::scoped_lock& s)
    {
        auto expected = RELEASED;
        return s.my_internal_lock.compare_exchange_strong(expected, ACQUIRED);
    }

    //! Acquire the internal lock
    static void acquire_internal_lock(d1::queuing_rw_mutex::scoped_lock& s)
    {
        // Usually, we would use the test-test-and-set idiom here, with exponential backoff.
        // But so far, experiments indicate there is no value in doing so here.
        while( !try_acquire_internal_lock(s) ) {
            machine_pause(1);
        }
    }

    //! Release the internal lock
    static void release_internal_lock(d1::queuing_rw_mutex::scoped_lock& s)
    {
        s.my_internal_lock.store(RELEASED, std::memory_order_release);
    }

    //! Wait for internal lock to be released
    static void wait_for_release_of_internal_lock(d1::queuing_rw_mutex::scoped_lock& s)
    {
        spin_wait_until_eq(s.my_internal_lock, RELEASED);
    }

    //! A helper function: if the responsibility-transition flag is set, wait until our internal
    //! lock is released by the new owner; otherwise release it ourselves.
    static void unblock_or_wait_on_internal_lock(d1::queuing_rw_mutex::scoped_lock& s, uintptr_t flag ) {
        if( flag ) {
            wait_for_release_of_internal_lock(s);
        }
        else {
            release_internal_lock(s);
        }
    }

    //! Mask for low order bit of a pointer.
    static const tricky_pointer::word FLAG = 0x1;

    static uintptr_t get_flag( d1::queuing_rw_mutex::scoped_lock* ptr ) {
        return reinterpret_cast<uintptr_t>(ptr) & FLAG;
    }

    //------------------------------------------------------------------------
    // Methods of queuing_rw_mutex::scoped_lock
    //------------------------------------------------------------------------

    //! A method to acquire queuing_rw_mutex lock
    static void acquire(d1::queuing_rw_mutex& m, d1::queuing_rw_mutex::scoped_lock& s, bool write)
    {
        __TBB_ASSERT( !s.my_mutex, "scoped_lock is already holding a mutex");

        // Must set all fields before the exchange, because once the
        // exchange executes, *this becomes accessible to other threads.
        s.my_mutex = &m;
        s.my_prev.store(0U, std::memory_order_relaxed);
        s.my_next.store(0U, std::memory_order_relaxed);
        s.my_going.store(0U, std::memory_order_relaxed);
        s.my_state.store(d1::queuing_rw_mutex::scoped_lock::state_t(write ? STATE_WRITER : STATE_READER), std::memory_order_relaxed);
        s.my_internal_lock.store(RELEASED, std::memory_order_relaxed);


        // The CAS must have release semantics, because we are
        // "sending" the fields initialized above to other actors.
        // We need acquire semantics, because we are acquiring the predecessor (or mutex if no predecessor)
        queuing_rw_mutex::scoped_lock* predecessor = m.q_tail.exchange(&s, std::memory_order_acq_rel);

        if( write ) {       // Acquiring for write

            if( predecessor ) {
                ITT_NOTIFY(sync_prepare, s.my_mutex);
                predecessor = tricky_pointer(predecessor) & ~FLAG;
                __TBB_ASSERT( !predecessor->my_next, "the predecessor has another successor!");
                tricky_pointer::store(predecessor->my_next, &s, std::memory_order_release);
                // We are acquiring the mutex
                spin_wait_until_eq(s.my_going, 1U, std::memory_order_acquire);
            }

        } else {            // Acquiring for read
    #if __TBB_USE_ITT_NOTIFY
            bool sync_prepare_done = false;
    #endif
            if( predecessor ) {
                unsigned char pred_state{};
                __TBB_ASSERT( !s.my_prev.load(std::memory_order_relaxed), "the predecessor is already set" );
                if( tricky_pointer(predecessor) & FLAG ) {
                    /* this is only possible if predecessor is an upgrading reader and it signals us to wait */
                    pred_state = STATE_UPGRADE_WAITING;
                    predecessor = tricky_pointer(predecessor) & ~FLAG;
                } else {
                    // Load predecessor->my_state now, because once predecessor->my_next becomes
                    // non-null, we must assume that *predecessor might be destroyed.
                    pred_state = predecessor->my_state.load(std::memory_order_relaxed);
                    if (pred_state == STATE_READER) {
                        // Notify the previous reader to unblock us.
                        predecessor->my_state.compare_exchange_strong(pred_state, STATE_READER_UNBLOCKNEXT, std::memory_order_relaxed);
                    }
                    if (pred_state == STATE_ACTIVEREADER)  { // either we initially read it or CAS failed
                        // Active reader means that the predecessor already acquired the mutex and cannot notify us.
                        // Therefore, we need to acquire the mutex ourselves by re-reading the predecessor state.
                        (void)predecessor->my_state.load(std::memory_order_acquire);
                    }
                }
                tricky_pointer::store(s.my_prev, predecessor, std::memory_order_relaxed);
                __TBB_ASSERT( !( tricky_pointer(predecessor) & FLAG ), "use of corrupted pointer!" );
                __TBB_ASSERT( !predecessor->my_next.load(std::memory_order_relaxed), "the predecessor has another successor!");
                tricky_pointer::store(predecessor->my_next, &s, std::memory_order_release);
                if( pred_state != STATE_ACTIVEREADER ) {
    #if __TBB_USE_ITT_NOTIFY
                    sync_prepare_done = true;
                    ITT_NOTIFY(sync_prepare, s.my_mutex);
    #endif
                    // We are acquiring the mutex
                    spin_wait_until_eq(s.my_going, 1U, std::memory_order_acquire);
                }
            }

            // The protected state must have been acquired here before it can be further released to any other reader(s):
            unsigned char old_state = STATE_READER;
            // When this reader is signaled by the previous actor, it acquires the mutex.
            // We need to build a happens-before relation with all other incoming readers that will read our ACTIVEREADER
            // without blocking on my_going. Therefore, we need to publish ACTIVEREADER with release semantics.
            // On failure it is relaxed, because we will build the happens-before relation on my_going.
            s.my_state.compare_exchange_strong(old_state, STATE_ACTIVEREADER, std::memory_order_release, std::memory_order_relaxed);
            if( old_state!=STATE_READER ) {
#if __TBB_USE_ITT_NOTIFY
                if( !sync_prepare_done )
                    ITT_NOTIFY(sync_prepare, s.my_mutex);
#endif
                // Failed to become active reader -> need to unblock the next waiting reader first
                __TBB_ASSERT( s.my_state.load(std::memory_order_relaxed)==STATE_READER_UNBLOCKNEXT, "unexpected state" );
                spin_wait_while_eq(s.my_next, 0U, std::memory_order_acquire);
                /* my_state must be changed before unblocking the next; otherwise it might finish
                   and another thread could observe our old state and be left blocked */
                s.my_state.store(STATE_ACTIVEREADER, std::memory_order_relaxed);
                tricky_pointer::load(s.my_next, std::memory_order_relaxed)->my_going.store(1U, std::memory_order_release);
            }
            __TBB_ASSERT(s.my_state.load(std::memory_order_relaxed) == STATE_ACTIVEREADER, "unlocked reader is active reader");
        }

        ITT_NOTIFY(sync_acquired, s.my_mutex);
    }
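
    // Illustrative sketch, not part of the library: user code reaches this routine through the
    // public tbb::queuing_rw_mutex::scoped_lock interface, which acquires in its constructor and
    // releases in its destructor. `mutex` and `shared_data` are hypothetical names used only to
    // demonstrate the intended usage.
    //
    //   tbb::queuing_rw_mutex mutex;
    //   int shared_data = 0;
    //
    //   // Writer: pass write = true (the default) to wait in the FIFO queue for exclusive ownership.
    //   {
    //       tbb::queuing_rw_mutex::scoped_lock lock(mutex, /*write=*/true);
    //       ++shared_data;
    //   }   // released in the destructor
    //
    //   // Reader: pass write = false; concurrent readers share the lock.
    //   {
    //       tbb::queuing_rw_mutex::scoped_lock lock(mutex, /*write=*/false);
    //       int observed = shared_data;
    //       (void)observed;
    //   }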

    //! A method to acquire queuing_rw_mutex if it is free
    static bool try_acquire(d1::queuing_rw_mutex& m, d1::queuing_rw_mutex::scoped_lock& s, bool write)
    {
        __TBB_ASSERT( !s.my_mutex, "scoped_lock is already holding a mutex");

        if( m.q_tail.load(std::memory_order_relaxed) )
            return false; // Someone already took the lock

        // Must set all fields before the exchange, because once the
        // exchange executes, *this becomes accessible to other threads.
        s.my_prev.store(0U, std::memory_order_relaxed);
        s.my_next.store(0U, std::memory_order_relaxed);
        s.my_going.store(0U, std::memory_order_relaxed); // TODO: remove dead assignment?
        s.my_state.store(d1::queuing_rw_mutex::scoped_lock::state_t(write ? STATE_WRITER : STATE_ACTIVEREADER), std::memory_order_relaxed);
        s.my_internal_lock.store(RELEASED, std::memory_order_relaxed);

        // The CAS must have release semantics, because we are
        // "sending" the fields initialized above to other actors.
        // We need acquire semantics, because we are acquiring the mutex
        d1::queuing_rw_mutex::scoped_lock* expected = nullptr;
        if (!m.q_tail.compare_exchange_strong(expected, &s, std::memory_order_acq_rel))
            return false; // Someone already took the lock
        s.my_mutex = &m;
        ITT_NOTIFY(sync_acquired, s.my_mutex);
        return true;
    }
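
    // Illustrative sketch, not part of the library: the non-blocking path is exposed through
    // scoped_lock::try_acquire(), which returns immediately instead of queuing. `mutex` and
    // `do_alternative_work` are hypothetical names for this example.
    //
    //   tbb::queuing_rw_mutex mutex;
    //   tbb::queuing_rw_mutex::scoped_lock lock;      // not holding anything yet
    //   if( lock.try_acquire(mutex, /*write=*/true) ) {
    //       // ... exclusive access; released when `lock` is destroyed or release() is called
    //   } else {
    //       do_alternative_work();                    // lock was busy, nothing acquired
    //   }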

    //! A method to release queuing_rw_mutex lock
    static void release(d1::queuing_rw_mutex::scoped_lock& s) {
        __TBB_ASSERT(s.my_mutex!=nullptr, "no lock acquired");

        ITT_NOTIFY(sync_releasing, s.my_mutex);

        if( s.my_state.load(std::memory_order_relaxed) == STATE_WRITER ) { // Acquired for write

            // The logic below is the same as "writerUnlock", but elides
            // "return" from the middle of the routine.
            // In the statement below, acquire semantics of reading my_next is required
            // so that following operations with fields of my_next are safe.
            d1::queuing_rw_mutex::scoped_lock* next = tricky_pointer::load(s.my_next, std::memory_order_acquire);
            if( !next ) {
                d1::queuing_rw_mutex::scoped_lock* expected = &s;
                // Release mutex on success otherwise wait for successor publication
                if( s.my_mutex->q_tail.compare_exchange_strong(expected, nullptr,
                    std::memory_order_release, std::memory_order_relaxed) )
                {
                    // this was the only item in the queue, and the queue is now empty.
                    goto done;
                }
                spin_wait_while_eq(s.my_next, 0U, std::memory_order_relaxed);
                next = tricky_pointer::load(s.my_next, std::memory_order_acquire);
            }
            next->my_going.store(2U, std::memory_order_relaxed); // protect next queue node from being destroyed too early
            // If the next is STATE_UPGRADE_WAITING, it is expected to acquire all other released readers via the release
            // sequence in next->my_state. In that case, we need to preserve the release sequence in next->my_state
            // contributed by another reader. So, there are two approaches not to break the release sequence:
            //   1. Use a read-modify-write (exchange) operation to store the UPGRADE_LOSER state with release;
            //   2. Acquire the release sequence and store the sequence and the UPGRADE_LOSER state.
            // The second approach seems better on x86 because it does not involve interlocked operations.
            // Therefore, we read next->my_state with acquire, although it is not required for the else branch,
            // to get the release sequence.
            if( next->my_state.load(std::memory_order_acquire)==STATE_UPGRADE_WAITING ) {
                // the next node waiting for an upgrade means this writer was upgraded before.
                acquire_internal_lock(s);
                // Responsibility transition: the one who reads the uncorrupted my_prev will do the release.
                // Guarantee that the above store of 2 into next->my_going happens-before the resetting of next->my_prev
                d1::queuing_rw_mutex::scoped_lock* tmp = tricky_pointer::exchange(next->my_prev, nullptr, std::memory_order_release);
                // Pass the release sequence that we acquired with the above load of next->my_state.
                next->my_state.store(STATE_UPGRADE_LOSER, std::memory_order_release);
                // We are releasing the mutex
                next->my_going.store(1U, std::memory_order_release);
                unblock_or_wait_on_internal_lock(s, get_flag(tmp));
            } else {
                // next->state cannot be STATE_UPGRADE_REQUESTED
                __TBB_ASSERT( next->my_state.load(std::memory_order_relaxed) & (STATE_COMBINED_WAITINGREADER | STATE_WRITER), "unexpected state" );
                __TBB_ASSERT( !( next->my_prev.load(std::memory_order_relaxed) & FLAG ), "use of corrupted pointer!" );
                // Guarantee that the above store of 2 into next->my_going happens-before the resetting of next->my_prev
                tricky_pointer::store(next->my_prev, nullptr, std::memory_order_release);
                // We are releasing the mutex
                next->my_going.store(1U, std::memory_order_release);
            }

        } else { // Acquired for read
            // The basic idea is to build a happens-before relation with the left and right readers via prev and next. In addition,
            // the first reader should acquire the left (prev) signal and propagate it to the right (next). To simplify, we always
            // build the happens-before relation from left to right (left happens before right).
            queuing_rw_mutex::scoped_lock *tmp = nullptr;
    retry:
            // Addition to the original paper: Mark my_prev as in use
            queuing_rw_mutex::scoped_lock *predecessor = tricky_pointer::fetch_add(s.my_prev, FLAG, std::memory_order_acquire);

            if( predecessor ) {
                if( !(try_acquire_internal_lock(*predecessor)) )
                {
                    // Failed to acquire the lock on the predecessor. The predecessor either unlinks or upgrades.
                    // In the second case, it may or may not have seen my "in use" flag - need to check.
                    // Responsibility transition: the one who reads the uncorrupted my_prev will do the release.
                    tmp = tricky_pointer::compare_exchange_strong(s.my_prev, tricky_pointer(predecessor) | FLAG, predecessor, std::memory_order_acquire);
                    if( !(tricky_pointer(tmp) & FLAG) ) {
                        __TBB_ASSERT(tricky_pointer::load(s.my_prev, std::memory_order_relaxed) != (tricky_pointer(predecessor) | FLAG), nullptr);
                        // Now the owner of the predecessor is waiting for _us_ to release its lock
                        release_internal_lock(*predecessor);
                    }
                    // else the "in use" flag is back -> the predecessor didn't get it and will release itself; nothing to do

                    tmp = nullptr;
                    goto retry;
                }
                __TBB_ASSERT(predecessor && predecessor->my_internal_lock.load(std::memory_order_relaxed)==ACQUIRED, "predecessor's lock is not acquired");
                tricky_pointer::store(s.my_prev, predecessor, std::memory_order_relaxed);
                acquire_internal_lock(s);

                tricky_pointer::store(predecessor->my_next, nullptr, std::memory_order_release);

                d1::queuing_rw_mutex::scoped_lock* expected = &s;
                if( !tricky_pointer::load(s.my_next, std::memory_order_acquire) && !s.my_mutex->q_tail.compare_exchange_strong(expected, predecessor, std::memory_order_release) ) {
                    spin_wait_while_eq( s.my_next, 0U, std::memory_order_acquire );
                }
                __TBB_ASSERT( !(s.my_next.load(std::memory_order_relaxed) & FLAG), "use of corrupted pointer" );

                // my_next is acquired either with load or spin_wait.
                if(d1::queuing_rw_mutex::scoped_lock *const l_next = tricky_pointer::load(s.my_next, std::memory_order_relaxed) ) { // I->next != nil, TODO: rename to next after clearing up and adapting the n in the comment two lines below
                    // Equivalent to I->next->prev = I->prev but protected against (prev[n]&FLAG)!=0
                    tmp = tricky_pointer::exchange(l_next->my_prev, predecessor, std::memory_order_release);
                    // I->prev->next = I->next;
                    __TBB_ASSERT(tricky_pointer::load(s.my_prev, std::memory_order_relaxed)==predecessor, nullptr);
                    predecessor->my_next.store(s.my_next.load(std::memory_order_relaxed), std::memory_order_release);
                }
                // Safe to release in the order opposite to acquiring which makes the code simpler
                release_internal_lock(*predecessor);

            } else { // No predecessor when we looked
                acquire_internal_lock(s);  // "exclusiveLock(&I->EL)"
                d1::queuing_rw_mutex::scoped_lock* next = tricky_pointer::load(s.my_next, std::memory_order_acquire);
                if( !next ) {
                    d1::queuing_rw_mutex::scoped_lock* expected = &s;
                    // Release mutex on success otherwise wait for successor publication
                    if( !s.my_mutex->q_tail.compare_exchange_strong(expected, nullptr,
                        std::memory_order_release, std::memory_order_relaxed) )
                    {
                        spin_wait_while_eq( s.my_next, 0U, std::memory_order_relaxed );
                        next = tricky_pointer::load(s.my_next, std::memory_order_acquire);
                    } else {
                        goto unlock_self;
                    }
                }
                next->my_going.store(2U, std::memory_order_relaxed);
                // Responsibility transition, the one who reads uncorrupted my_prev will do release.
                tmp = tricky_pointer::exchange(next->my_prev, nullptr, std::memory_order_release);
                next->my_going.store(1U, std::memory_order_release);
            }
    unlock_self:
            unblock_or_wait_on_internal_lock(s, get_flag(tmp));
        }
    done:
        // Lifetime synchronization, no need to build happens-before relation
        spin_wait_while_eq( s.my_going, 2U, std::memory_order_relaxed );

        s.initialize();
    }

    static bool downgrade_to_reader(d1::queuing_rw_mutex::scoped_lock& s) {
        if ( s.my_state.load(std::memory_order_relaxed) == STATE_ACTIVEREADER ) return true; // Already a reader

        ITT_NOTIFY(sync_releasing, s.my_mutex);
        d1::queuing_rw_mutex::scoped_lock* next = tricky_pointer::load(s.my_next, std::memory_order_acquire);
        if( !next ) {
            s.my_state.store(STATE_READER, std::memory_order_seq_cst);
            // the following load of q_tail must not be reordered with setting STATE_READER above
            if( &s == s.my_mutex->q_tail.load(std::memory_order_seq_cst) ) {
                unsigned char old_state = STATE_READER;
                // When this reader is signaled by the previous actor, it acquires the mutex.
                // We need to build a happens-before relation with all other incoming readers that will read our ACTIVEREADER
                // without blocking on my_going. Therefore, we need to publish ACTIVEREADER with release semantics.
                // On failure it is relaxed, because we will build the happens-before relation on my_going.
                s.my_state.compare_exchange_strong(old_state, STATE_ACTIVEREADER, std::memory_order_release, std::memory_order_relaxed);
                if( old_state==STATE_READER )
                    return true; // Downgrade completed
            }
            /* wait for the next to register */
            spin_wait_while_eq(s.my_next, 0U, std::memory_order_relaxed);
            next = tricky_pointer::load(s.my_next, std::memory_order_acquire);
        }

        __TBB_ASSERT( next, "still no successor at this point!" );
        if( next->my_state.load(std::memory_order_relaxed) & STATE_COMBINED_WAITINGREADER )
            next->my_going.store(1U, std::memory_order_release);
        // If the next is STATE_UPGRADE_WAITING, it is expected to acquire all other released readers via the release
        // sequence in next->my_state. In that case, we need to preserve the release sequence in next->my_state
        // contributed by another reader. So, there are two approaches not to break the release sequence:
        //   1. Use a read-modify-write (exchange) operation to store the UPGRADE_LOSER state with release;
        //   2. Acquire the release sequence and store the sequence and the UPGRADE_LOSER state.
        // The second approach seems better on x86 because it does not involve interlocked operations.
        // Therefore, we read next->my_state with acquire, although it is not required for the else branch,
        // to get the release sequence.
        else if( next->my_state.load(std::memory_order_acquire)==STATE_UPGRADE_WAITING )
            // the next node waiting for an upgrade means this writer was upgraded before.
            // To preserve the release sequence on next->my_state, it is read with acquire above
            next->my_state.store(STATE_UPGRADE_LOSER, std::memory_order_release);
        s.my_state.store(STATE_ACTIVEREADER, std::memory_order_release);
        return true;
    }
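
    // Illustrative sketch, not part of the library: downgrading is exposed through
    // scoped_lock::downgrade_to_reader(). A writer that has finished mutating can keep reading
    // while letting other readers in. `mutex` is a hypothetical name for this example.
    //
    //   tbb::queuing_rw_mutex mutex;
    //   tbb::queuing_rw_mutex::scoped_lock lock(mutex, /*write=*/true);
    //   // ... perform the exclusive update ...
    //   lock.downgrade_to_reader();   // now shared; waiting readers may proceed
    //   // ... continue reading under the shared lock until `lock` is destroyed ...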

    static bool upgrade_to_writer(d1::queuing_rw_mutex::scoped_lock& s) {
        if (s.my_state.load(std::memory_order_relaxed) == STATE_WRITER) {
            // Already a writer
            return true;
        }

        __TBB_ASSERT(s.my_state.load(std::memory_order_relaxed) == STATE_ACTIVEREADER, "only an active reader can be upgraded");

        queuing_rw_mutex::scoped_lock* tmp{};
        queuing_rw_mutex::scoped_lock* me = &s;

        ITT_NOTIFY(sync_releasing, s.my_mutex);
        // Publish ourselves into my_state so that other UPGRADE_WAITING actors can acquire our state.
        s.my_state.store(STATE_UPGRADE_REQUESTED, std::memory_order_release);
    requested:
        __TBB_ASSERT( !(s.my_next.load(std::memory_order_relaxed) & FLAG), "use of corrupted pointer!" );
        acquire_internal_lock(s);
        d1::queuing_rw_mutex::scoped_lock* expected = &s;
        if( !s.my_mutex->q_tail.compare_exchange_strong(expected, tricky_pointer(me)|FLAG, std::memory_order_acq_rel) ) {
            spin_wait_while_eq( s.my_next, 0U, std::memory_order_relaxed );
            queuing_rw_mutex::scoped_lock * next;
            next = tricky_pointer::fetch_add(s.my_next, FLAG, std::memory_order_acquire);
            // While we were a READER, the next READER might have reached the STATE_UPGRADE_WAITING state.
            // Therefore, it did not build a happens-before relation with us, and we need to acquire
            // next->my_state to build the happens-before relation ourselves.
            unsigned short n_state = next->my_state.load(std::memory_order_acquire);
            /* the next reader can be blocked by our state; the best thing to do is to unblock it */
            if( n_state & STATE_COMBINED_WAITINGREADER )
                next->my_going.store(1U, std::memory_order_release);
            // Responsibility transition: the one who reads the uncorrupted my_prev will do the release.
            tmp = tricky_pointer::exchange(next->my_prev, &s, std::memory_order_release);
            unblock_or_wait_on_internal_lock(s, get_flag(tmp));
            if( n_state & (STATE_COMBINED_READER | STATE_UPGRADE_REQUESTED) ) {
                // save next|FLAG for simplicity of following comparisons
                tmp = tricky_pointer(next)|FLAG;
                for( atomic_backoff b; tricky_pointer::load(s.my_next, std::memory_order_relaxed)==tmp; b.pause() ) {
                    if( s.my_state.load(std::memory_order_acquire) & STATE_COMBINED_UPGRADING ) {
                        if( tricky_pointer::load(s.my_next, std::memory_order_acquire)==tmp )
                            tricky_pointer::store(s.my_next, next, std::memory_order_relaxed);
                        goto waiting;
                    }
                }
                __TBB_ASSERT(tricky_pointer::load(s.my_next, std::memory_order_relaxed) != (tricky_pointer(next)|FLAG), nullptr);
                goto requested;
            } else {
                __TBB_ASSERT( n_state & (STATE_WRITER | STATE_UPGRADE_WAITING), "unexpected state");
                __TBB_ASSERT( (tricky_pointer(next)|FLAG) == tricky_pointer::load(s.my_next, std::memory_order_relaxed), nullptr);
                tricky_pointer::store(s.my_next, next, std::memory_order_relaxed);
            }
        } else {
            /* We are in the tail; whoever comes next is blocked by q_tail&FLAG */
            release_internal_lock(s);
        } // if( this != my_mutex->q_tail... )
        {
            unsigned char old_state = STATE_UPGRADE_REQUESTED;
            // If we reach the STATE_UPGRADE_WAITING state, we do not build a happens-before relation with the READER on
            // the left. We delegate this responsibility to the READER on the left when it tries upgrading. Therefore, we
            // release on success.
            // Otherwise, on failure, we have already acquired next->my_state.
            s.my_state.compare_exchange_strong(old_state, STATE_UPGRADE_WAITING, std::memory_order_release, std::memory_order_relaxed);
        }
    waiting:
        __TBB_ASSERT( !( s.my_next.load(std::memory_order_relaxed) & FLAG ), "use of corrupted pointer!" );
        __TBB_ASSERT( s.my_state & STATE_COMBINED_UPGRADING, "wrong state at upgrade waiting_retry" );
        __TBB_ASSERT( me==&s, nullptr );
        ITT_NOTIFY(sync_prepare, s.my_mutex);
        /* if no one was blocked by the "corrupted" q_tail, turn it back */
        expected = tricky_pointer(me)|FLAG;
        s.my_mutex->q_tail.compare_exchange_strong(expected, &s, std::memory_order_release);
        queuing_rw_mutex::scoped_lock * predecessor;
        // Mark my_prev as 'in use' to prevent the predecessor from releasing
        predecessor = tricky_pointer::fetch_add(s.my_prev, FLAG, std::memory_order_acquire);
        if( predecessor ) {
            bool success = try_acquire_internal_lock(*predecessor);
            {
                // While the predecessor pointer (my_prev) is in use (FLAG is set), we can safely update the node's state.
                // A corrupted pointer transfers the responsibility to release the predecessor's node to us.
                unsigned char old_state = STATE_UPGRADE_REQUESTED;
                // Try to build a happens-before relation with the upgrading READER on the left. If it fails, the
                // predecessor state is not important for us because it will acquire our state.
                predecessor->my_state.compare_exchange_strong(old_state, STATE_UPGRADE_WAITING, std::memory_order_release,
                    std::memory_order_relaxed);
            }
            if( !success ) {
                // Responsibility transition: the one who reads the uncorrupted my_prev will do the release.
                tmp = tricky_pointer::compare_exchange_strong(s.my_prev, tricky_pointer(predecessor)|FLAG, predecessor, std::memory_order_acquire);
                if( tricky_pointer(tmp) & FLAG ) {
                    tricky_pointer::spin_wait_while_eq(s.my_prev, predecessor);
                    predecessor = tricky_pointer::load(s.my_prev, std::memory_order_relaxed);
                } else {
                    // TODO: spin_wait condition seems never reachable
                    tricky_pointer::spin_wait_while_eq(s.my_prev, tricky_pointer(predecessor)|FLAG);
                    release_internal_lock(*predecessor);
                }
            } else {
                tricky_pointer::store(s.my_prev, predecessor, std::memory_order_relaxed);
                release_internal_lock(*predecessor);
                tricky_pointer::spin_wait_while_eq(s.my_prev, predecessor);
                predecessor = tricky_pointer::load(s.my_prev, std::memory_order_relaxed);
            }
            if( predecessor )
                goto waiting;
        } else {
            tricky_pointer::store(s.my_prev, nullptr, std::memory_order_relaxed);
        }
        __TBB_ASSERT( !predecessor && !s.my_prev, nullptr );

        // additional lifetime issue prevention checks
        // wait for the successor to finish working with my fields
        wait_for_release_of_internal_lock(s);
        // now wait for the predecessor to finish working with my fields
        spin_wait_while_eq( s.my_going, 2U );

        bool result = ( s.my_state != STATE_UPGRADE_LOSER );
        s.my_state.store(STATE_WRITER, std::memory_order_relaxed);
        s.my_going.store(1U, std::memory_order_relaxed);

        ITT_NOTIFY(sync_acquired, s.my_mutex);
        return result;
    }
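
    // Illustrative sketch, not part of the library: upgrading is exposed through
    // scoped_lock::upgrade_to_writer(). A false return means the read lock had to be released
    // before the writer lock was obtained (another upgrade won the race), so any state examined
    // under the read lock must be revalidated. `examine_shared_state` and `modify_shared_state`
    // are hypothetical names for this example.
    //
    //   tbb::queuing_rw_mutex mutex;
    //   tbb::queuing_rw_mutex::scoped_lock lock(mutex, /*write=*/false);
    //   bool still_valid = examine_shared_state();
    //   if( !lock.upgrade_to_writer() ) {
    //       // The lock was temporarily released during the upgrade; re-check the shared state.
    //       still_valid = examine_shared_state();
    //   }
    //   if( still_valid ) {
    //       modify_shared_state();    // now holding the writer lock either way
    //   }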

    static bool is_writer(const d1::queuing_rw_mutex::scoped_lock& m) {
        return m.my_state.load(std::memory_order_relaxed) == STATE_WRITER;
    }

    static void construct(d1::queuing_rw_mutex& m) {
        suppress_unused_warning(m);
        ITT_SYNC_CREATE(&m, _T("tbb::queuing_rw_mutex"), _T(""));
    }
};

void __TBB_EXPORTED_FUNC acquire(d1::queuing_rw_mutex& m, d1::queuing_rw_mutex::scoped_lock& s, bool write) {
    queuing_rw_mutex_impl::acquire(m, s, write);
}

bool __TBB_EXPORTED_FUNC try_acquire(d1::queuing_rw_mutex& m, d1::queuing_rw_mutex::scoped_lock& s, bool write) {
    return queuing_rw_mutex_impl::try_acquire(m, s, write);
}

void __TBB_EXPORTED_FUNC release(d1::queuing_rw_mutex::scoped_lock& s) {
    queuing_rw_mutex_impl::release(s);
}

bool __TBB_EXPORTED_FUNC upgrade_to_writer(d1::queuing_rw_mutex::scoped_lock& s) {
    return queuing_rw_mutex_impl::upgrade_to_writer(s);
}

bool __TBB_EXPORTED_FUNC is_writer(const d1::queuing_rw_mutex::scoped_lock& s) {
    return queuing_rw_mutex_impl::is_writer(s);
}

bool __TBB_EXPORTED_FUNC downgrade_to_reader(d1::queuing_rw_mutex::scoped_lock& s) {
    return queuing_rw_mutex_impl::downgrade_to_reader(s);
}

void __TBB_EXPORTED_FUNC construct(d1::queuing_rw_mutex& m) {
    queuing_rw_mutex_impl::construct(m);
}

} // namespace r1
} // namespace detail
} // namespace tbb