//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <assert.h>
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <type_traits>
#include <vector>

#include "db/dbformat.h"
#include "db/pre_release_callback.h"
#include "db/write_callback.h"
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"

namespace ROCKSDB_NAMESPACE {

class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer.  This is a Writer that is
    // waiting in JoinBatchGroup.  This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel needs this Writer to apply its batch
    // (-> STATE_PARALLEL_MEMTABLE_WRITER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group.  Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case.  This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // The state used to inform a waiting writer that it has become the
    // leader of a memtable writer group. The leader will either write the
    // memtable for the whole group, or launch a parallel group write to the
    // memtable by calling LaunchParallelMemTableWriters.
    STATE_MEMTABLE_WRITER_LEADER = 4,

    // The state used to inform a waiting writer that it has become a
    // parallel memtable writer. It can be the group leader who launched the
    // parallel writer group, or one of the followers. The writer should then
    // apply its batch to the memtable concurrently and call
    // CompleteParallelMemTableWriter.
    STATE_PARALLEL_MEMTABLE_WRITER = 8,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work.  This is a terminal
    // state.
    STATE_COMPLETED = 16,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCV()
    STATE_LOCKED_WAITING = 32,
  };
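
  // The state values above are distinct bits so that a writer can wait on
  // several of them at once through the goal_mask parameter of AwaitState(),
  // e.g. STATE_GROUP_LEADER | STATE_COMPLETED (see the sketch after
  // AwaitState() below).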

  struct Writer;

  struct WriteGroup {
    Writer* leader = nullptr;
    Writer* last_writer = nullptr;
    SequenceNumber last_sequence;
    // before `running` goes to zero, access to `status` needs
    // leader->StateMutex()
    Status status;
    std::atomic<size_t> running;
    size_t size = 0;

    struct Iterator {
      Writer* writer;
      Writer* last_writer;

      explicit Iterator(Writer* w, Writer* last)
          : writer(w), last_writer(last) {}

      Writer* operator*() const { return writer; }

      Iterator& operator++() {
        assert(writer != nullptr);
        if (writer == last_writer) {
          writer = nullptr;
        } else {
          writer = writer->link_newer;
        }
        return *this;
      }

      bool operator!=(const Iterator& other) const {
        return writer != other.writer;
      }
    };
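
    // begin()/end() below make a WriteGroup usable in a range-based for loop.
    // A minimal sketch (illustrative only; `write_group` is an assumed name):
    //
    //   for (Writer* member : write_group) {
    //     // visits each writer from leader to last_writer
    //   }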

    Iterator begin() const { return Iterator(leader, last_writer); }
    Iterator end() const { return Iterator(nullptr, nullptr); }
  };

  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool no_slowdown;
    bool disable_wal;
    bool disable_memtable;
    size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
    PreReleaseCallback* pre_release_callback;
    uint64_t log_used;  // log number that this batch was inserted into
    uint64_t log_ref;   // log number that memtable insert should reference
    WriteCallback* callback;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    WriteGroup* write_group;
    SequenceNumber sequence;  // the sequence number to use for the first key
    Status status;
    Status callback_status;  // status returned by callback->Callback()

    std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
    std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader

    Writer()
        : batch(nullptr),
          sync(false),
          no_slowdown(false),
          disable_wal(false),
          disable_memtable(false),
          batch_cnt(0),
          pre_release_callback(nullptr),
          log_used(0),
          log_ref(0),
          callback(nullptr),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    Writer(const WriteOptions& write_options, WriteBatch* _batch,
           WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
           size_t _batch_cnt = 0,
           PreReleaseCallback* _pre_release_callback = nullptr)
        : batch(_batch),
          sync(write_options.sync),
          no_slowdown(write_options.no_slowdown),
          disable_wal(write_options.disableWAL),
          disable_memtable(_disable_memtable),
          batch_cnt(_batch_cnt),
          pre_release_callback(_pre_release_callback),
          log_used(0),
          log_ref(_log_ref),
          callback(_callback),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    ~Writer() {
      if (made_waitable) {
        StateMutex().~mutex();
        StateCV().~condition_variable();
      }
    }

    bool CheckCallback(DB* db) {
      if (callback != nullptr) {
        callback_status = callback->Callback(db);
      }
      return callback_status.ok();
    }

    void CreateMutex() {
      if (!made_waitable) {
        // Note that made_waitable is tracked separately from state
        // transitions, because we can't atomically create the mutex and
        // link into the list.
        made_waitable = true;
        new (&state_mutex_bytes) std::mutex;
        new (&state_cv_bytes) std::condition_variable;
      }
    }

    // returns the aggregate status of this Writer
    Status FinalStatus() {
      if (!status.ok()) {
        // a non-ok memtable write status takes precedence
        assert(callback == nullptr || callback_status.ok());
        return status;
      } else if (!callback_status.ok()) {
        // if the callback failed then that is the status we want
        // because a memtable insert should not have been attempted
        assert(callback != nullptr);
        assert(status.ok());
        return callback_status;
      } else {
        // if there is no callback then we only care about
        // the memtable insert status
        assert(callback == nullptr || callback_status.ok());
        return status;
      }
    }

    bool CallbackFailed() {
      return (callback != nullptr) && !callback_status.ok();
    }

    bool ShouldWriteToMemtable() {
      return status.ok() && !CallbackFailed() && !disable_memtable;
    }

    bool ShouldWriteToWAL() {
      return status.ok() && !CallbackFailed() && !disable_wal;
    }

    // No other mutexes may be acquired while holding StateMutex(), it is
    // always last in the order
    std::mutex& StateMutex() {
      assert(made_waitable);
      return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
    }

    std::condition_variable& StateCV() {
      assert(made_waitable);
      return *static_cast<std::condition_variable*>(
          static_cast<void*>(&state_cv_bytes));
    }
  };

  struct AdaptationContext {
    const char* name;
    std::atomic<int32_t> value;

    explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
  };

  explicit WriteThread(const ImmutableDBOptions& db_options);

  virtual ~WriteThread() = default;

  // IMPORTANT: None of the methods in this class rely on the db mutex
  // for correctness. All of the methods except JoinBatchGroup and
  // EnterUnbatched may be called either with or without the db mutex held.
  // Correctness is maintained by ensuring that only a single thread is
  // a leader at a time.

  // Registers w as ready to become part of a batch group, waits until the
  // caller should perform some work, and returns the current state of the
  // writer.  If w has become the leader of a write batch group, returns
  // STATE_GROUP_LEADER.  If w has been made part of a sequential batch
  // group and the leader has performed the write, returns STATE_COMPLETED.
  // If w has been made part of a parallel batch group and is responsible
  // for updating the memtable, returns STATE_PARALLEL_MEMTABLE_WRITER.
  //
  // The db mutex SHOULD NOT be held when calling this function, because
  // it will block.
  //
  // Writer* w:        Writer to be executed as part of a batch group
  void JoinBatchGroup(Writer* w);
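
  // A minimal usage sketch (illustrative only; the real call site is
  // DBImpl::WriteImpl, and `write_thread` / `my_batch` are assumed names):
  //
  //   WriteThread::Writer w(write_options, my_batch, /*callback=*/nullptr,
  //                         /*log_ref=*/0, /*disable_memtable=*/false);
  //   write_thread.JoinBatchGroup(&w);
  //   // On return, w.state is one of STATE_GROUP_LEADER,
  //   // STATE_MEMTABLE_WRITER_LEADER, STATE_PARALLEL_MEMTABLE_WRITER or
  //   // STATE_COMPLETED, and the caller dispatches on it.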

  // Constructs a write batch group led by leader, which should be a
  // Writer passed to JoinBatchGroup on the current thread.
  //
  // Writer* leader:          Writer that is STATE_GROUP_LEADER
  // WriteGroup* write_group: Out-param of group members
  // returns:                 Total batch group byte size
  size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);

  // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
  // and wakes up the next leader (if any).
  //
  // WriteGroup* write_group: the write group
  // Status status:           Status of write operation
  void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);
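
  // Sketch of the leader path (illustrative only; `write_thread`, `leader`
  // and WriteToWAL are assumed names, not the actual DBImpl code):
  //
  //   if (leader.state == WriteThread::STATE_GROUP_LEADER) {
  //     WriteThread::WriteGroup group;
  //     write_thread.EnterAsBatchGroupLeader(&leader, &group);
  //     Status s = WriteToWAL(group);  // hypothetical WAL write helper
  //     write_thread.ExitAsBatchGroupLeader(group, s);
  //   }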

  // Exit batch group on behalf of batch group leader.
  void ExitAsBatchGroupFollower(Writer* w);

  // Constructs a write batch group led by leader from the
  // newest_memtable_writers_ list. The leader should either write the
  // memtable for the whole group and call ExitAsMemTableWriter, or launch a
  // parallel memtable write through LaunchParallelMemTableWriters.
  void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_group);

  // The memtable writer group leader, or the last finished writer in a
  // parallel write group, exits from the newest_memtable_writers_ list and
  // wakes up the next leader if needed.
  void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);
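
  // Sketch of the pipelined memtable-writer leader path (illustrative only;
  // `write_thread`, `w` and WriteGroupToMemtable are assumed names):
  //
  //   if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
  //     WriteThread::WriteGroup memtable_group;
  //     write_thread.EnterAsMemTableWriter(&w, &memtable_group);
  //     WriteGroupToMemtable(memtable_group);  // hypothetical helper
  //     write_thread.ExitAsMemTableWriter(&w, memtable_group);
  //   }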

  // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all of
  // the non-leader members of this write batch group.  Sets Writer::sequence
  // before waking them up.
  //
  // WriteGroup* write_group: Extra state used to coordinate the parallel add
  void LaunchParallelMemTableWriters(WriteGroup* write_group);

  // Reports the completion of w's batch to the parallel group leader, and
  // waits for the rest of the parallel batch to complete.  Returns true
  // if this thread is the last to complete, and hence should advance
  // the sequence number and then call ExitAsMemTableWriter, false if
  // someone else has already taken responsibility for that.
  bool CompleteParallelMemTableWriter(Writer* w);
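
  // Sketch of the parallel memtable-writer path (illustrative only;
  // `write_thread`, `w` and WriteBatchToMemtable are assumed names):
  //
  //   if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
  //     WriteBatchToMemtable(w);  // hypothetical per-writer memtable insert
  //     if (write_thread.CompleteParallelMemTableWriter(&w)) {
  //       // the last writer to finish exits on behalf of the whole group
  //       write_thread.ExitAsMemTableWriter(&w, *w.write_group);
  //     }
  //   }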

  // Waits for all preceding writers (unlocking mu while waiting), then
  // registers w as the currently proceeding writer.
  //
  // Writer* w:              A Writer not eligible for batching
  // InstrumentedMutex* mu:  The db mutex, to unlock while waiting
  // REQUIRES: db mutex held
  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);

  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
  // writers.
  void ExitUnbatched(Writer* w);
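
  // Sketch of the unbatched path, used for writes that must not share a
  // group (illustrative only; `write_thread` and `mutex` are assumed names):
  //
  //   WriteThread::Writer w;
  //   write_thread.EnterUnbatched(&w, &mutex);  // waits for earlier writers
  //   // ... perform the exclusive work while no other write proceeds ...
  //   write_thread.ExitUnbatched(&w);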

  // Wait for all parallel memtable writers to finish, in case pipelined
  // write is enabled.
  void WaitForMemTableWriters();

  SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
    if (sequence > last_sequence_) {
      last_sequence_ = sequence;
    }
    return last_sequence_;
  }

  // Insert a dummy writer at the tail of the write queue to indicate a write
  // stall, and fail any writers in the queue with no_slowdown set to true
  void BeginWriteStall();

  // Remove the dummy writer and wake up waiting writers
  void EndWriteStall();

 private:
  // See AwaitState.
  const uint64_t max_yield_usec_;
  const uint64_t slow_yield_usec_;

  // Allow multiple writers to write to the memtable concurrently.
  const bool allow_concurrent_memtable_write_;

  // Enable pipelined write to WAL and memtable.
  const bool enable_pipelined_write_;

  // The maximum number of bytes that may be written in a single batch of WAL
  // or memtable writes. The limit is only enforced when the leader's write
  // size is larger than 1/8 of this limit.
  const uint64_t max_write_batch_group_size_bytes;

  // Points to the newest pending writer. Only the leader can remove
  // elements; adding can be done lock-free by anybody.
  std::atomic<Writer*> newest_writer_;

  // Points to the newest pending memtable writer. Used only when pipelined
  // write is enabled.
  std::atomic<Writer*> newest_memtable_writer_;

  // The last sequence that has been consumed by a writer. The sequence
  // is not necessarily visible to reads because the writer can be ongoing.
  SequenceNumber last_sequence_;

  // A dummy writer to indicate a write stall condition. This will be inserted
  // at the tail of the writer queue by the leader, so newer writers can just
  // check for this and bail
  Writer write_stall_dummy_;

  // Mutex and condvar for writers to block on a write stall. During a write
  // stall, writers with no_slowdown set to false will wait on this rather
  // than on the writer queue.
  port::Mutex stall_mu_;
  port::CondVar stall_cv_;

  // Waits for w->state & goal_mask using w->StateMutex().  Returns
  // the state that satisfies goal_mask.
  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);

  // Blocks until w->state & goal_mask, returning the state value
  // that satisfied the predicate.  Uses ctx to adaptively use
  // std::this_thread::yield() to avoid mutex overheads.  ctx should be
  // a context-dependent static.
  uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
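
  // A sketch of the intended calling pattern (illustrative only; the context
  // name "JoinBatchGroup" and the particular goal_mask are assumed examples):
  //
  //   static AdaptationContext jbg_ctx("JoinBatchGroup");
  //   uint8_t state = AwaitState(
  //       w, STATE_GROUP_LEADER | STATE_PARALLEL_MEMTABLE_WRITER |
  //              STATE_COMPLETED,
  //       &jbg_ctx);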

  // Set writer state and wake the writer up if it is waiting.
  void SetState(Writer* w, uint8_t new_state);

  // Links w into the newest_writer list. Return true if w was linked directly
  // into the leader position.  Safe to call from multiple threads without
  // external locking.
  bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);

  // Link write group into the newest_writer list as a whole, while keeping the
  // order of the writers unchanged. Return true if the group was linked
  // directly into the leader position.
  bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);

  // Computes any missing link_newer links.  Should not be called
  // concurrently with itself.
  void CreateMissingNewerLinks(Writer* head);

  // Starting from a pending writer, follow link_older to search for next
  // leader, until we hit boundary.
  Writer* FindNextLeader(Writer* pending_writer, Writer* boundary);

  // Set the leader in write_group to completed state and remove it from the
  // write group.
  void CompleteLeader(WriteGroup& write_group);

  // Set a follower in write_group to completed state and remove it from the
  // write group.
  void CompleteFollower(Writer* w, WriteGroup& write_group);
};

}  // namespace ROCKSDB_NAMESPACE