//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"
#include <chrono>
#include <thread>
#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

WriteThread::WriteThread(const ImmutableDBOptions& db_options)
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
                          ? db_options.write_thread_max_yield_usec
                          : 0),
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
      allow_concurrent_memtable_write_(
          db_options.allow_concurrent_memtable_write),
      enable_pipelined_write_(db_options.enable_pipelined_write),
      max_write_batch_group_size_bytes(
          db_options.max_write_batch_group_size_bytes),
      newest_writer_(nullptr),
      newest_memtable_writer_(nullptr),
      last_sequence_(0),
      write_stall_dummy_(),
      stall_mu_(),
      stall_cv_(&stall_mu_) {}

uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block.  Lazily create the mutex.  We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state.  The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky.  Goal is met or CAS failed.  In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one.  At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}

uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state = 0;

  // 1. Busy loop using "pause" for 1 micro sec
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
  // 3. Else blocking wait

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond.  This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // This is below the fast path, so that the stat is zero when all writes are
  // from the same thread.
  PERF_TIMER_GUARD(write_thread_wait_nanos);

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex().  For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec.  That can be a big
  // drag on RocksDB's single-writer design.  Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time.  How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long.  If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad.  We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect.  sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block.  If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision.  Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2.  We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.

  const size_t kMaxSlowYieldsWhileSpinning = 3;

  // Whether the yield approach has any credit in this context. The credit is
  // increased when a yield succeeds before timing out, and decreased
  // otherwise.
  auto& yield_credit = ctx->value;
  // Update the yield_credit based on sampled runs, or right after a hard
  // failure
  bool update_ctx = false;
  // Should we reinforce the yield credit
  bool would_spin_again = false;
  // The sampling base for updating the yield credit. The sampling rate would
  // be 1/sampling_base.
  const int sampling_base = 256;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several.  Immediately update
            // yield_credit and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    // Since our update is sample based, it is ok if a thread overwrites
    // updates made by other threads. Thus the update does not have to be
    // atomic.
    auto v = yield_credit.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    //
    // On each update the positive credit is decayed by a factor of 1/1024
    // (i.e., ~0.1%). If the sampled yield was successful, the credit is also
    // increased by X. Setting X=2^17 ensures that the credit never exceeds
    // 2^17*2^10=2^27, which is lower than 2^31, the upper bound of int32_t.
    // The same logic applies to negative credits.
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }

  assert((state & goal_mask) != 0);
  return state;
}

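// Transition w->state to new_state. If the writer has parked itself in
// BlockingAwaitState (STATE_LOCKED_WAITING), the update must be made under
// the writer's StateMutex and followed by a notify so the waiter wakes up;
// otherwise a single CAS is enough.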
void WriteThread::SetState(Writer* w, uint8_t new_state) {
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}

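// Push w onto the front (newest end) of the list headed by *newest_writer
// with a CAS loop. Returns true if the list was previously empty, i.e. w is
// now the leader. If a write stall is in effect (the head is
// &write_stall_dummy_), w either fails immediately with Status::Incomplete
// (no_slowdown) or waits on stall_cv_ until the stall is cleared.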
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If a write stall is in effect and w->no_slowdown is not true, block
    // here until the stall is cleared. If it is true, return immediately
    // instead.
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          stall_cv_.Wait();
          // Load newest_writers_ again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

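// Link a whole write group (leader .. last_writer) onto the list headed by
// *newest_writer, resetting each member's link_newer and write_group along
// the way. Returns true if the list was empty, i.e. the group's leader
// becomes the leader of that queue.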
bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  Writer* w = last_writer;
  while (true) {
    // Unset link_newer pointers to make sure that when we call
    // CreateMissingNewerLinks later it creates all the missing links.
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      return (newest == nullptr);
    }
  }
}

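// Writers enqueue themselves by only setting link_older. Starting from head,
// walk toward older writers and fill in the link_newer back-pointers until
// reaching a node that already has one (or the end of the list), so the list
// can later be traversed from oldest to newest.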
void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}

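// Return the oldest writer that was linked after `boundary`: follow
// link_older from `from` until the next-older node is `boundary`.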
WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
                                                 Writer* boundary) {
  assert(from != nullptr && from != boundary);
  Writer* current = from;
  while (current->link_older != boundary) {
    current = current->link_older;
    assert(current != nullptr);
  }
  return current;
}

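// Remove the leader from the write group, promote its next-newer member (if
// any) to group leader, and mark the removed writer completed.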
void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}

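// Unlink a non-leader member from the write group and mark it completed.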
void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}

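// Install write_stall_dummy_ at the head of the writer queue so that writers
// arriving in LinkOne either wait for the stall to clear or fail immediately
// (no_slowdown). Also fail any already-queued no_slowdown writers that have
// not yet been assigned to a write group.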
void WriteThread::BeginWriteStall() {
  LinkOne(&write_stall_dummy_, &newest_writer_);

  // Walk the writer list until w->write_group != nullptr. The current write
  // group will not have a mix of slowdown/no_slowdown writers, so it's ok to
  // stop at that point.
  Writer* w = write_stall_dummy_.link_older;
  Writer* prev = &write_stall_dummy_;
  while (w != nullptr && w->write_group == nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;
      w->status = Status::Incomplete("Write stall");
      SetState(w, STATE_COMPLETED);
      if (prev->link_older) {
        prev->link_older->link_newer = prev;
      }
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}

void WriteThread::EndWriteStall() {
  MutexLock lock(&stall_mu_);

  // Unlink write_stall_dummy_ from the write queue. This will unblock
  // pending write threads to enqueue themselves
  assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
  assert(write_stall_dummy_.link_older != nullptr);
  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
  newest_writer_.exchange(write_stall_dummy_.link_older);

  // Wake up writers
  stall_cv_.SignalAll();
}

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes
     * 2) An existing leader picks us as its follower and
     * 2.1) finishes the memtable writes on our behalf
     * 2.2) Or tells us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader picks us as its follower,
     *    finishes book-keeping and the WAL write for us, enqueues us as a
     *    pending memtable writer, and
     * 3.1) we become the memtable writer group leader, or
     * 3.2) an existing memtable writer group leader tells us to finish
     *      memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

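// With `leader` at the head of the queue, build a write group by walking the
// queue from oldest to newest and adding compatible writers (matching sync,
// no_slowdown and WAL settings, non-null batch, batching allowed) until the
// byte-size cap is reached. Returns the combined byte size of the batches in
// the group.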
size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourselves as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourselves,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request fail on delays.
      break;
    }

    if (w->disable_wal != leader->disable_wal) {
      // Do not mix writes that enable WAL with the ones whose
      // WAL is disabled.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include writes with a nullptr batch. Those are not writes;
      // they are something else. They want to be alone.
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

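// Build a memtable write group led by `leader` from the pending memtable
// writer queue. Batches containing merges end the group, and when concurrent
// memtable writes are disabled the group is also capped by size. Also
// records the group's last sequence number.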
void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  assert(leader != nullptr);
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      if (!allow_concurrent_memtable_write_) {
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
        if (size + batch_size > max_size) {
          // Do not make batch too big
          break;
        }
        size += batch_size;
      }

      w->write_group = write_group;
      last_writer = w;
      write_group->size++;
    }
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}

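// Finish a memtable write group: hand memtable-writer leadership to the next
// pending writer (if any), propagate the group status to every member, and
// mark all members completed, leader last since it owns the group.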
void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }
  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    w = next;
  }
  // Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}

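// Set the group's running counter to its size and mark every member
// (including the leader) as a parallel memtable writer so that each can
// proceed with its own memtable insert.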
void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  write_group->running.store(write_group->size);
  for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}

static WriteThread::AdaptationContext cpmtw_ctx(
    "CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  return true;
}

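// Run the leader's exit duties on behalf of a parallel memtable writer `w`,
// then mark the group leader itself completed.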
void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* write_group = w->write_group;

  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
  assert(write_group->status.ok());
  ExitAsBatchGroupLeader(*write_group, write_group->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(write_group->leader, STATE_COMPLETED);
}

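// Dismantle the write group: propagate the status to its members, hand
// leadership of the writer queue to the next waiting writer, and, with
// pipelined writes enabled, move writers that still need to do their
// memtable inserts onto the memtable writer queue.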
static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  if (enable_pipelined_write_) {
    // Notify writers that do not write to the memtable, so they can exit.
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    Writer* next_leader = nullptr;

    // Look for the next leader before we call LinkGroup. If there are no
    // pending writers, place a dummy writer at the tail of the queue so we
    // know the boundary of the current write group.
    Writer dummy;
    Writer* expected = last_writer;
    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
    if (!has_dummy) {
      // We found at least one pending writer when we inserted the dummy. We
      // search for the next leader from there.
      next_leader = FindNextLeader(expected, last_writer);
      assert(next_leader != nullptr && next_leader != last_writer);
    }

    // Link the remainder of the group to the memtable writer list.
    //
    // We have to link our group to the memtable writer queue before waking up
    // the next leader or setting newest_writer_ to null, otherwise the next
    // leader can run ahead of us and link to the memtable writer queue before
    // we do.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from the current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // If we have inserted the dummy in the queue, remove it now and check
    // whether any pending writers joined the queue since we inserted it. If
    // so, look for the next leader again.
    if (has_dummy) {
      assert(next_leader == nullptr);
      expected = &dummy;
      bool has_pending_writer =
          !newest_writer_.compare_exchange_strong(expected, nullptr);
      if (has_pending_writer) {
        next_leader = FindNextLeader(expected, &dummy);
        assert(next_leader != nullptr && next_leader != &dummy);
      }
    }

    if (next_leader != nullptr) {
      next_leader->link_older = nullptr;
      SetState(next_leader, STATE_GROUP_LEADER);
    }
    AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                           STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      // Either w wasn't the head during the load(), or it was the head
      // during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail).  In the
      // latter case compare_exchange_strong has the effect of re-reading
      // its first param (head).  No need to retry a failing CAS, because
      // only a departing leader (which we are at the moment) can remove
      // nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done)
      // we will be able to traverse w->link_newer below. This function
      // can only be called from an active leader, only a leader can
      // clear newest_writer_, we didn't, and only a clear newest_writer_
      // could cause the next leader to start their work without a call
      // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer->link_older == last_writer);
      last_writer->link_newer->link_older = nullptr;

      // Next leader didn't self-identify, because newest_writer_ wasn't
      // nullptr when they enqueued (we were definitely enqueued before them
      // and are still in the list).  That means leader handoff occurs when
      // we call MarkJoined
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
    }
    // else nobody else was waiting, although there might already be a new
    // leader now

    while (last_writer != leader) {
      last_writer->status = status;
      // we need to read link_older before calling SetState, because as soon
      // as it is marked committed the other thread's Await may return and
      // deallocate the Writer.
      auto next = last_writer->link_older;
      SetState(last_writer, STATE_COMPLETED);

      last_writer = next;
    }
  }
}

static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  assert(w != nullptr && w->batch == nullptr);
  mu->Unlock();
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (!linked_as_leader) {
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // Last leader will not pick us as a follower since our batch is nullptr
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
  }
  if (enable_pipelined_write_) {
    WaitForMemTableWriters();
  }
  mu->Lock();
}

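// Remove the unbatched writer w from the queue; if other writers have queued
// up behind it, fill in their links and promote the oldest of them to be the
// next group leader.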
void WriteThread::ExitUnbatched(Writer* w) {
  assert(w != nullptr);
  Writer* newest_writer = w;
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = w->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
  }
}

static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
void WriteThread::WaitForMemTableWriters() {
  assert(enable_pipelined_write_);
  if (newest_memtable_writer_.load() == nullptr) {
    return;
  }
  Writer w;
  if (!LinkOne(&w, &newest_memtable_writer_)) {
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
  }
  newest_memtable_writer_.store(nullptr);
}

}  // namespace ROCKSDB_NAMESPACE