// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <assert.h>
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <type_traits>
#include <vector>

#include "db/dbformat.h"
#include "db/pre_release_callback.h"
#include "db/write_callback.h"
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"

namespace ROCKSDB_NAMESPACE {

class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer. This is a Writer that is
    // waiting in JoinBatchGroup. This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel asks this Writer to apply its batch
    // (-> STATE_PARALLEL_MEMTABLE_WRITER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group. Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case. This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // The state used to inform a waiting writer that it has become the
    // leader of a memtable writer group. The leader will either write the
    // memtable for the whole group, or launch a parallel group write
    // to the memtable by calling LaunchParallelMemTableWriters.
    STATE_MEMTABLE_WRITER_LEADER = 4,

    // The state used to inform a waiting writer that it has become a
    // parallel memtable writer. It can be the group leader who launched the
    // parallel writer group, or one of the followers. The writer should then
    // apply its batch to the memtable concurrently and call
    // CompleteParallelMemTableWriter.
    STATE_PARALLEL_MEMTABLE_WRITER = 8,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work. This is a terminal
    // state.
    STATE_COMPLETED = 16,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCondVar().
    STATE_LOCKED_WAITING = 32,
  };
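
  // Note: the State values are distinct bits so that a single AwaitState()
  // call (declared below) can wait for any of several states at once via a
  // bitwise-or goal_mask. A rough illustration of the idea (a sketch, not
  // the actual implementation in write_thread.cc; jbg_ctx is a hypothetical
  // AdaptationContext):
  //
  //   uint8_t state = AwaitState(
  //       w,
  //       STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
  //           STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
  //       &jbg_ctx);
  //   // 'state' is whichever of those bits the writer ended up in.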

  struct Writer;

  struct WriteGroup {
    Writer* leader = nullptr;
    Writer* last_writer = nullptr;
    SequenceNumber last_sequence;
    // before running goes to zero, status needs leader->StateMutex()
    Status status;
    std::atomic<size_t> running;
    size_t size = 0;

    struct Iterator {
      Writer* writer;
      Writer* last_writer;

      explicit Iterator(Writer* w, Writer* last)
          : writer(w), last_writer(last) {}

      Writer* operator*() const { return writer; }

      Iterator& operator++() {
        assert(writer != nullptr);
        if (writer == last_writer) {
          writer = nullptr;
        } else {
          writer = writer->link_newer;
        }
        return *this;
      }

      bool operator!=(const Iterator& other) const {
        return writer != other.writer;
      }
    };

    Iterator begin() const { return Iterator(leader, last_writer); }
    Iterator end() const { return Iterator(nullptr, nullptr); }
  };

  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool no_slowdown;
    bool disable_wal;
    bool disable_memtable;
    size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
    PreReleaseCallback* pre_release_callback;
    uint64_t log_used;  // log number that this batch was inserted into
    uint64_t log_ref;   // log number that memtable insert should reference
    WriteCallback* callback;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    WriteGroup* write_group;
    SequenceNumber sequence;  // the sequence number to use for the first key
    Status status;
    Status callback_status;  // status returned by callback->Callback()

    std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
    std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader

    Writer()
        : batch(nullptr),
          sync(false),
          no_slowdown(false),
          disable_wal(false),
          disable_memtable(false),
          batch_cnt(0),
          pre_release_callback(nullptr),
          log_used(0),
          log_ref(0),
          callback(nullptr),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    Writer(const WriteOptions& write_options, WriteBatch* _batch,
           WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
           size_t _batch_cnt = 0,
           PreReleaseCallback* _pre_release_callback = nullptr)
        : batch(_batch),
          sync(write_options.sync),
          no_slowdown(write_options.no_slowdown),
          disable_wal(write_options.disableWAL),
          disable_memtable(_disable_memtable),
          batch_cnt(_batch_cnt),
          pre_release_callback(_pre_release_callback),
          log_used(0),
          log_ref(_log_ref),
          callback(_callback),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    ~Writer() {
      if (made_waitable) {
        StateMutex().~mutex();
        StateCV().~condition_variable();
      }
    }
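
    // Lifecycle note: state_mutex_bytes and state_cv_bytes above are raw
    // storage. The mutex and condition variable are only constructed (by
    // placement new inside CreateMutex(), below) if this Writer actually has
    // to block, and they are destroyed manually in ~Writer() above. A rough
    // sketch of how a blocked writer waits on them, assuming CreateMutex()
    // has already run (illustrative only; the real wait loop is
    // BlockingAwaitState() in write_thread.cc):
    //
    //   std::unique_lock<std::mutex> guard(w->StateMutex());
    //   w->StateCV().wait(guard, [w, goal_mask] {
    //     return (w->state.load(std::memory_order_acquire) & goal_mask) != 0;
    //   });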

    bool CheckCallback(DB* db) {
      if (callback != nullptr) {
        callback_status = callback->Callback(db);
      }
      return callback_status.ok();
    }

    void CreateMutex() {
      if (!made_waitable) {
        // Note that made_waitable is tracked separately from state
        // transitions, because we can't atomically create the mutex and
        // link into the list.
        made_waitable = true;
        new (&state_mutex_bytes) std::mutex;
        new (&state_cv_bytes) std::condition_variable;
      }
    }

    // returns the aggregate status of this Writer
    Status FinalStatus() {
      if (!status.ok()) {
        // a non-ok memtable write status takes precedence
        assert(callback == nullptr || callback_status.ok());
        return status;
      } else if (!callback_status.ok()) {
        // if the callback failed then that is the status we want
        // because a memtable insert should not have been attempted
        assert(callback != nullptr);
        assert(status.ok());
        return callback_status;
      } else {
        // if there is no callback then we only care about
        // the memtable insert status
        assert(callback == nullptr || callback_status.ok());
        return status;
      }
    }

    bool CallbackFailed() {
      return (callback != nullptr) && !callback_status.ok();
    }

    bool ShouldWriteToMemtable() {
      return status.ok() && !CallbackFailed() && !disable_memtable;
    }

    bool ShouldWriteToWAL() {
      return status.ok() && !CallbackFailed() && !disable_wal;
    }

    // No other mutexes may be acquired while holding StateMutex(); it is
    // always last in the lock order.
    std::mutex& StateMutex() {
      assert(made_waitable);
      return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
    }

    std::condition_variable& StateCV() {
      assert(made_waitable);
      return *static_cast<std::condition_variable*>(
          static_cast<void*>(&state_cv_bytes));
    }
  };

  struct AdaptationContext {
    const char* name;
    std::atomic<int32_t> value;

    explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
  };

  explicit WriteThread(const ImmutableDBOptions& db_options);

  virtual ~WriteThread() = default;

  // IMPORTANT: None of the methods in this class rely on the db mutex
  // for correctness. All of the methods except JoinBatchGroup and
  // EnterUnbatched may be called either with or without the db mutex held.
  // Correctness is maintained by ensuring that only a single thread is
  // a leader at a time.

  // Registers w as ready to become part of a batch group, waits until the
  // caller should perform some work, and returns the current state of the
  // writer. If w has become the leader of a write batch group, returns
  // STATE_GROUP_LEADER. If w has been made part of a sequential batch
  // group and the leader has performed the write, returns STATE_COMPLETED.
  // If w has been made part of a parallel batch group and is responsible
  // for updating the memtable, returns STATE_PARALLEL_MEMTABLE_WRITER.
  //
  // The db mutex SHOULD NOT be held when calling this function, because
  // it will block.
  //
  // Writer* w: Writer to be executed as part of a batch group
  void JoinBatchGroup(Writer* w);
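
  // A rough sketch of how a caller (DBImpl::WriteImpl in practice) drives a
  // write batch group; illustrative only, and the names write_thread,
  // my_batch, and w are placeholders, not part of this API:
  //
  //   WriteThread::Writer w(write_options, my_batch, /*_callback=*/nullptr,
  //                         /*_log_ref=*/0, /*_disable_memtable=*/false);
  //   write_thread.JoinBatchGroup(&w);
  //   if (w.state == WriteThread::STATE_GROUP_LEADER) {
  //     WriteThread::WriteGroup write_group;
  //     write_thread.EnterAsBatchGroupLeader(&w, &write_group);
  //     // ... write the WAL and memtable for every Writer in write_group ...
  //     write_thread.ExitAsBatchGroupLeader(write_group, w.status);
  //   }
  //   // Followers simply wait; once they reach STATE_COMPLETED they read the
  //   // outcome from w.FinalStatus().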

  // Constructs a write batch group led by leader, which should be a
  // Writer passed to JoinBatchGroup on the current thread.
  //
  // Writer* leader: Writer that is STATE_GROUP_LEADER
  // WriteGroup* write_group: Out-param of group members
  // returns: Total batch group byte size
  size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);

  // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
  // and wakes up the next leader (if any).
  //
  // WriteGroup& write_group: the write group
  // Status status: Status of write operation
  void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);

  // Exit batch group on behalf of batch group leader.
  void ExitAsBatchGroupFollower(Writer* w);

  // Constructs a write batch group led by leader from the
  // newest_memtable_writers_ list. The leader should either write the
  // memtable for the whole group and call ExitAsMemTableWriter, or launch a
  // parallel memtable write through LaunchParallelMemTableWriters (see the
  // usage sketch after EndWriteStall() below).
  void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_group);

  // The memtable writer group leader, or the last finished writer in a
  // parallel write group, exits from the newest_memtable_writers_ list, and
  // wakes up the next leader if needed.
  void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);

  // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all
  // of the non-leader members of this write batch group. Sets
  // Writer::sequence before waking them up.
  //
  // WriteGroup* write_group: Extra state used to coordinate the parallel add
  void LaunchParallelMemTableWriters(WriteGroup* write_group);

  // Reports the completion of w's batch to the parallel group leader, and
  // waits for the rest of the parallel batch to complete. Returns true
  // if this thread is the last to complete, and hence should advance
  // the sequence number and then call ExitAsMemTableWriter, false if
  // someone else has already taken responsibility for that.
  bool CompleteParallelMemTableWriter(Writer* w);

  // Waits for all preceding writers (unlocking mu while waiting), then
  // registers w as the currently proceeding writer.
  //
  // Writer* w: A Writer not eligible for batching
  // InstrumentedMutex* mu: The db mutex, to unlock while waiting
  // REQUIRES: db mutex held
  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);

  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
  // writers.
  void ExitUnbatched(Writer* w);

  // Wait for all parallel memtable writers to finish, in case pipelined
  // write is enabled.
  void WaitForMemTableWriters();

  SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
    if (sequence > last_sequence_) {
      last_sequence_ = sequence;
    }
    return last_sequence_;
  }

  // Insert a dummy writer at the tail of the write queue to indicate a write
  // stall, and fail any writers in the queue with no_slowdown set to true.
  void BeginWriteStall();

  // Remove the dummy writer and wake up waiting writers.
  void EndWriteStall();
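
  // A rough sketch of the pipelined / parallel memtable path referenced by
  // the declarations above (illustrative only; the actual sequencing lives in
  // the DBImpl write path, and write_thread / w are placeholder names):
  //
  //   // Memtable writer group leader: either write the memtable for the
  //   // whole group itself, or fan the work out:
  //   WriteThread::WriteGroup memtable_group;
  //   write_thread.EnterAsMemTableWriter(&w, &memtable_group);
  //   write_thread.LaunchParallelMemTableWriters(&memtable_group);
  //
  //   // Every STATE_PARALLEL_MEMTABLE_WRITER (leader included) then inserts
  //   // its own batch into the memtable and reports completion:
  //   if (write_thread.CompleteParallelMemTableWriter(&w)) {
  //     // The last writer to finish advances the last sequence and exits on
  //     // behalf of the whole group.
  //     write_thread.ExitAsMemTableWriter(&w, *w.write_group);
  //   }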

 private:
  // See AwaitState.
  const uint64_t max_yield_usec_;
  const uint64_t slow_yield_usec_;

  // Allow multiple writers to write to the memtable concurrently.
  const bool allow_concurrent_memtable_write_;

  // Enable pipelined write to WAL and memtable.
  const bool enable_pipelined_write_;

  // The maximum limit on the number of bytes written in a single batch of
  // WAL or memtable write. It is enforced when the leader's write size is
  // larger than 1/8 of this limit.
  const uint64_t max_write_batch_group_size_bytes;

  // Points to the newest pending writer. Only the leader can remove
  // elements; adding can be done lock-free by anybody.
  std::atomic<Writer*> newest_writer_;

  // Points to the newest pending memtable writer. Used only when pipelined
  // write is enabled.
  std::atomic<Writer*> newest_memtable_writer_;

  // The last sequence that has been consumed by a writer. The sequence is
  // not necessarily visible to reads because the writer can still be ongoing.
  SequenceNumber last_sequence_;

  // A dummy writer to indicate a write stall condition. This will be inserted
  // at the tail of the writer queue by the leader, so newer writers can just
  // check for this and bail out.
  Writer write_stall_dummy_;

  // Mutex and condvar for writers to block on a write stall. During a write
  // stall, writers with no_slowdown set to false will wait on this rather
  // than on the writer queue.
  port::Mutex stall_mu_;
  port::CondVar stall_cv_;

  // Waits for w->state & goal_mask using w->StateMutex(). Returns
  // the state that satisfies goal_mask.
  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);

  // Blocks until w->state & goal_mask, returning the state value
  // that satisfied the predicate. Uses ctx to adaptively use
  // std::this_thread::yield() to avoid mutex overheads. ctx should be
  // a context-dependent static.
  uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);

  // Set writer state and wake the writer up if it is waiting.
  void SetState(Writer* w, uint8_t new_state);

  // Links w into the newest_writer list. Returns true if w was linked
  // directly into the leader position. Safe to call from multiple threads
  // without external locking.
  bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);

  // Links a write group into the newest_writer list as a whole, while keeping
  // the order of the writers unchanged. Returns true if the group was linked
  // directly into the leader position.
  bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);

  // Computes any missing link_newer links. Should not be called
  // concurrently with itself.
  void CreateMissingNewerLinks(Writer* head);

  // Starting from a pending writer, follow link_older to search for the next
  // leader, until we hit the boundary.
  Writer* FindNextLeader(Writer* pending_writer, Writer* boundary);

  // Set the leader in write_group to completed state and remove it from the
  // write group.
  void CompleteLeader(WriteGroup& write_group);

  // Set a follower in write_group to completed state and remove it from the
  // write group.
  void CompleteFollower(Writer* w, WriteGroup& write_group);
};

}  // namespace ROCKSDB_NAMESPACE