1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10 
11 #include <cinttypes>
12 #include "db/error_handler.h"
13 #include "db/event_helpers.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "options/options_helper.h"
16 #include "test_util/sync_point.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 // Convenience methods
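// Illustrative sketch (not code from this file): the default DB::Put()
// builds a one-entry WriteBatch and forwards it to Write(), so a call such as
//   db->Put(WriteOptions(), cf_handle, "key", "value");
// ends up in DBImpl::WriteImpl() below via the thin wrappers that follow
// ("cf_handle" here is a hypothetical ColumnFamilyHandle*).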
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
21                    const Slice& key, const Slice& val) {
22   return DB::Put(o, column_family, key, val);
23 }
24 
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
26                      const Slice& key, const Slice& val) {
27   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
28   if (!cfh->cfd()->ioptions()->merge_operator) {
29     return Status::NotSupported("Provide a merge_operator when opening DB");
30   } else {
31     return DB::Merge(o, column_family, key, val);
32   }
33 }
34 
Status DBImpl::Delete(const WriteOptions& write_options,
36                       ColumnFamilyHandle* column_family, const Slice& key) {
37   return DB::Delete(write_options, column_family, key);
38 }
39 
Status DBImpl::SingleDelete(const WriteOptions& write_options,
41                             ColumnFamilyHandle* column_family,
42                             const Slice& key) {
43   return DB::SingleDelete(write_options, column_family, key);
44 }
45 
void DBImpl::SetRecoverableStatePreReleaseCallback(
47     PreReleaseCallback* callback) {
48   recoverable_state_pre_release_callback_.reset(callback);
49 }
50 
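// Example usage of the batched write path (a sketch under the public API;
// variable names are illustrative):
//   WriteBatch batch;
//   batch.Put("k1", "v1");
//   batch.Delete("k2");
//   Status s = db->Write(WriteOptions(), &batch);
// All operations in the batch are applied atomically by WriteImpl().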
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
52   return WriteImpl(write_options, my_batch, nullptr, nullptr);
53 }
54 
55 #ifndef ROCKSDB_LITE
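// WriteWithCallback is used internally (e.g. by optimistic transactions) to
// run a last-chance check: the WriteCallback is invoked while the writer holds
// its position in the write group, and a failed callback keeps the batch from
// being applied (see CheckCallback()/CallbackFailed() usage below).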
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
57                                  WriteBatch* my_batch,
58                                  WriteCallback* callback) {
59   return WriteImpl(write_options, my_batch, callback, nullptr);
60 }
61 #endif  // ROCKSDB_LITE
62 
63 // The main write queue. This is the only write queue that updates LastSequence.
64 // When using one write queue, the same sequence also indicates the last
65 // published sequence.
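// (When two_write_queues_ is enabled, a second, WAL-only queue also exists;
// writes with disable_memtable are routed to WriteImplWALOnly() below rather
// than through this queue.)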
Status DBImpl::WriteImpl(const WriteOptions& write_options,
67                          WriteBatch* my_batch, WriteCallback* callback,
68                          uint64_t* log_used, uint64_t log_ref,
69                          bool disable_memtable, uint64_t* seq_used,
70                          size_t batch_cnt,
71                          PreReleaseCallback* pre_release_callback) {
72   assert(!seq_per_batch_ || batch_cnt != 0);
73   if (my_batch == nullptr) {
74     return Status::Corruption("Batch is nullptr!");
75   }
76   if (tracer_) {
77     InstrumentedMutexLock lock(&trace_mutex_);
78     if (tracer_) {
79       tracer_->Write(my_batch);
80     }
81   }
82   if (write_options.sync && write_options.disableWAL) {
83     return Status::InvalidArgument("Sync writes has to enable WAL.");
84   }
85   if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
86     return Status::NotSupported(
87         "pipelined_writes is not compatible with concurrent prepares");
88   }
89   if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
90     // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
91     return Status::NotSupported(
92         "pipelined_writes is not compatible with seq_per_batch");
93   }
94   if (immutable_db_options_.unordered_write &&
95       immutable_db_options_.enable_pipelined_write) {
96     return Status::NotSupported(
97         "pipelined_writes is not compatible with unordered_write");
98   }
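  // In short: enable_pipelined_write is mutually exclusive with
  // two_write_queues_, seq_per_batch_ (used by WritePrepared/WriteUnprepared
  // transactions), and unordered_write; such combinations fail here with
  // NotSupported.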
99   // Otherwise IsLatestPersistentState optimization does not make sense
100   assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
101          disable_memtable);
102 
103   Status status;
104   IOStatus io_s;
105   if (write_options.low_pri) {
106     status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
107     if (!status.ok()) {
108       return status;
109     }
110   }
111 
112   if (two_write_queues_ && disable_memtable) {
113     AssignOrder assign_order =
114         seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
    // Otherwise these are WAL-only Prepare batches in the WriteCommitted
    // policy and they do not consume a sequence.
117     return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch,
118                             callback, log_used, log_ref, seq_used, batch_cnt,
119                             pre_release_callback, assign_order,
120                             kDontPublishLastSeq, disable_memtable);
121   }
122 
123   if (immutable_db_options_.unordered_write) {
124     const size_t sub_batch_cnt = batch_cnt != 0
125                                      ? batch_cnt
126                                      // every key is a sub-batch consuming a seq
127                                      : WriteBatchInternal::Count(my_batch);
128     uint64_t seq;
    // Use a write thread to i) optimize for WAL write, ii) publish last
    // sequence in increasing order, iii) call pre_release_callback serially
131     status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback,
132                               log_used, log_ref, &seq, sub_batch_cnt,
133                               pre_release_callback, kDoAssignOrder,
134                               kDoPublishLastSeq, disable_memtable);
135     TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
136     if (!status.ok()) {
137       return status;
138     }
139     if (seq_used) {
140       *seq_used = seq;
141     }
142     if (!disable_memtable) {
143       TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable");
144       status = UnorderedWriteMemtable(write_options, my_batch, callback,
145                                       log_ref, seq, sub_batch_cnt);
146     }
147     return status;
148   }
149 
150   if (immutable_db_options_.enable_pipelined_write) {
151     return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
152                               log_ref, disable_memtable, seq_used);
153   }
154 
155   PERF_TIMER_GUARD(write_pre_and_post_process_time);
156   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
157                         disable_memtable, batch_cnt, pre_release_callback);
158 
159   if (!write_options.disableWAL) {
160     RecordTick(stats_, WRITE_WITH_WAL);
161   }
162 
163   StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
164 
165   write_thread_.JoinBatchGroup(&w);
166   if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
167     // we are a non-leader in a parallel group
168 
169     if (w.ShouldWriteToMemtable()) {
170       PERF_TIMER_STOP(write_pre_and_post_process_time);
171       PERF_TIMER_GUARD(write_memtable_time);
172 
173       ColumnFamilyMemTablesImpl column_family_memtables(
174           versions_->GetColumnFamilySet());
175       w.status = WriteBatchInternal::InsertInto(
176           &w, w.sequence, &column_family_memtables, &flush_scheduler_,
177           &trim_history_scheduler_,
178           write_options.ignore_missing_column_families, 0 /*log_number*/, this,
179           true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
180           batch_per_txn_, write_options.memtable_insert_hint_per_batch);
181 
182       PERF_TIMER_START(write_pre_and_post_process_time);
183     }
184 
185     if (write_thread_.CompleteParallelMemTableWriter(&w)) {
186       // we're responsible for exit batch group
187       // TODO(myabandeh): propagate status to write_group
188       auto last_sequence = w.write_group->last_sequence;
189       versions_->SetLastSequence(last_sequence);
190       MemTableInsertStatusCheck(w.status);
191       write_thread_.ExitAsBatchGroupFollower(&w);
192     }
193     assert(w.state == WriteThread::STATE_COMPLETED);
194     // STATE_COMPLETED conditional below handles exit
195 
196     status = w.FinalStatus();
197   }
198   if (w.state == WriteThread::STATE_COMPLETED) {
199     if (log_used != nullptr) {
200       *log_used = w.log_used;
201     }
202     if (seq_used != nullptr) {
203       *seq_used = w.sequence;
204     }
205     // write is complete and leader has updated sequence
206     return w.FinalStatus();
207   }
208   // else we are the leader of the write batch group
209   assert(w.state == WriteThread::STATE_GROUP_LEADER);
210 
  // Once it reaches this point, the current writer "w" will try to do its
  // write job.  It may also pick up some of the remaining writers in the
  // "writers_" queue when it finds it suitable, and finish them in the same
  // write batch.  This is how a write job could be done by another writer.
215   WriteContext write_context;
216   WriteThread::WriteGroup write_group;
217   bool in_parallel_group = false;
218   uint64_t last_sequence = kMaxSequenceNumber;
219 
220   mutex_.Lock();
221 
222   bool need_log_sync = write_options.sync;
223   bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
224   if (!two_write_queues_ || !disable_memtable) {
    // With concurrent writes we preprocess only in the write thread that also
    // writes to the memtable, to avoid synchronization issues on shared data
    // structures with the other thread.
228 
229     // PreprocessWrite does its own perf timing.
230     PERF_TIMER_STOP(write_pre_and_post_process_time);
231 
232     status = PreprocessWrite(write_options, &need_log_sync, &write_context);
233     if (!two_write_queues_) {
234       // Assign it after ::PreprocessWrite since the sequence might advance
235       // inside it by WriteRecoverableState
236       last_sequence = versions_->LastSequence();
237     }
238 
239     PERF_TIMER_START(write_pre_and_post_process_time);
240   }
241   log::Writer* log_writer = logs_.back().writer;
242 
243   mutex_.Unlock();
244 
245   // Add to log and apply to memtable.  We can release the lock
246   // during this phase since &w is currently responsible for logging
247   // and protects against concurrent loggers and concurrent writes
248   // into memtables
249 
250   TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters");
251   last_batch_group_size_ =
252       write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
253 
254   if (status.ok()) {
255     // Rules for when we can update the memtable concurrently
256     // 1. supported by memtable
257     // 2. Puts are not okay if inplace_update_support
258     // 3. Merges are not okay
259     //
260     // Rules 1..2 are enforced by checking the options
261     // during startup (CheckConcurrentWritesSupported), so if
262     // options.allow_concurrent_memtable_write is true then they can be
263     // assumed to be true.  Rule 3 is checked for each batch.  We could
    // relax rule 2 if we could prevent write batches from referring
265     // more than once to a particular key.
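    // (allow_concurrent_memtable_write additionally requires a memtable
    // implementation that supports concurrent inserts, e.g. the default
    // skip-list memtable; this is validated together with
    // inplace_update_support when the column family is set up.)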
266     bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
267                     write_group.size > 1;
268     size_t total_count = 0;
269     size_t valid_batches = 0;
270     size_t total_byte_size = 0;
271     size_t pre_release_callback_cnt = 0;
272     for (auto* writer : write_group) {
273       if (writer->CheckCallback(this)) {
274         valid_batches += writer->batch_cnt;
275         if (writer->ShouldWriteToMemtable()) {
276           total_count += WriteBatchInternal::Count(writer->batch);
277           parallel = parallel && !writer->batch->HasMerge();
278         }
279         total_byte_size = WriteBatchInternal::AppendedByteSize(
280             total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
281         if (writer->pre_release_callback) {
282           pre_release_callback_cnt++;
283         }
284       }
285     }
286     // Note about seq_per_batch_: either disableWAL is set for the entire write
287     // group or not. In either case we inc seq for each write batch with no
288     // failed callback. This means that there could be a batch with
    // disable_memtable in between; although we do not write this batch to
290     // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
291     // the seq per valid written key to mem.
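    // Worked example (illustrative): for a group of three batches with
    // batch_cnt {2, 1, 1} where one batch has disable_memtable, seq_per_batch_
    // advances the sequence by 4 (= valid_batches), while the
    // non-seq_per_batch_ mode advances it by the number of keys actually bound
    // for the memtable (= total_count).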
292     size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;
293 
294     const bool concurrent_update = two_write_queues_;
295     // Update stats while we are an exclusive group leader, so we know
296     // that nobody else can be writing to these particular stats.
297     // We're optimistic, updating the stats before we successfully
298     // commit.  That lets us release our leader status early.
299     auto stats = default_cf_internal_stats_;
300     stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count,
301                       concurrent_update);
302     RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
303     stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
304                       concurrent_update);
305     RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
306     stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
307                       concurrent_update);
308     RecordTick(stats_, WRITE_DONE_BY_SELF);
309     auto write_done_by_other = write_group.size - 1;
310     if (write_done_by_other > 0) {
311       stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
312                         write_done_by_other, concurrent_update);
313       RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
314     }
315     RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
316 
317     if (write_options.disableWAL) {
318       has_unpersisted_data_.store(true, std::memory_order_relaxed);
319     }
320 
321     PERF_TIMER_STOP(write_pre_and_post_process_time);
322 
323     if (!two_write_queues_) {
324       if (status.ok() && !write_options.disableWAL) {
325         PERF_TIMER_GUARD(write_wal_time);
326         io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
327                           need_log_dir_sync, last_sequence + 1);
328       }
329     } else {
330       if (status.ok() && !write_options.disableWAL) {
331         PERF_TIMER_GUARD(write_wal_time);
332         // LastAllocatedSequence is increased inside WriteToWAL under
333         // wal_write_mutex_ to ensure ordered events in WAL
334         io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
335                                     seq_inc);
336       } else {
337         // Otherwise we inc seq number for memtable writes
338         last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
339       }
340     }
341     status = io_s;
342     assert(last_sequence != kMaxSequenceNumber);
343     const SequenceNumber current_sequence = last_sequence + 1;
344     last_sequence += seq_inc;
345 
346     // PreReleaseCallback is called after WAL write and before memtable write
347     if (status.ok()) {
348       SequenceNumber next_sequence = current_sequence;
349       size_t index = 0;
350       // Note: the logic for advancing seq here must be consistent with the
351       // logic in WriteBatchInternal::InsertInto(write_group...) as well as
352       // with WriteBatchInternal::InsertInto(write_batch...) that is called on
353       // the merged batch during recovery from the WAL.
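      // For example, under seq_per_batch_ a writer with batch_cnt == 3
      // advances next_sequence by 3 regardless of how many keys its batch
      // holds; otherwise the advance equals WriteBatchInternal::Count() of
      // each memtable-bound batch.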
354       for (auto* writer : write_group) {
355         if (writer->CallbackFailed()) {
356           continue;
357         }
358         writer->sequence = next_sequence;
359         if (writer->pre_release_callback) {
360           Status ws = writer->pre_release_callback->Callback(
361               writer->sequence, disable_memtable, writer->log_used, index++,
362               pre_release_callback_cnt);
363           if (!ws.ok()) {
364             status = ws;
365             break;
366           }
367         }
368         if (seq_per_batch_) {
369           assert(writer->batch_cnt);
370           next_sequence += writer->batch_cnt;
371         } else if (writer->ShouldWriteToMemtable()) {
372           next_sequence += WriteBatchInternal::Count(writer->batch);
373         }
374       }
375     }
376 
377     if (status.ok()) {
378       PERF_TIMER_GUARD(write_memtable_time);
379 
380       if (!parallel) {
381         // w.sequence will be set inside InsertInto
382         w.status = WriteBatchInternal::InsertInto(
383             write_group, current_sequence, column_family_memtables_.get(),
384             &flush_scheduler_, &trim_history_scheduler_,
385             write_options.ignore_missing_column_families,
386             0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
387             batch_per_txn_);
388       } else {
389         write_group.last_sequence = last_sequence;
390         write_thread_.LaunchParallelMemTableWriters(&write_group);
391         in_parallel_group = true;
392 
        // Each parallel follower does its own writes. The leader should also
        // do its own.
395         if (w.ShouldWriteToMemtable()) {
396           ColumnFamilyMemTablesImpl column_family_memtables(
397               versions_->GetColumnFamilySet());
398           assert(w.sequence == current_sequence);
399           w.status = WriteBatchInternal::InsertInto(
400               &w, w.sequence, &column_family_memtables, &flush_scheduler_,
401               &trim_history_scheduler_,
402               write_options.ignore_missing_column_families, 0 /*log_number*/,
403               this, true /*concurrent_memtable_writes*/, seq_per_batch_,
404               w.batch_cnt, batch_per_txn_,
405               write_options.memtable_insert_hint_per_batch);
406         }
407       }
408       if (seq_used != nullptr) {
409         *seq_used = w.sequence;
410       }
411     }
412   }
413   PERF_TIMER_START(write_pre_and_post_process_time);
414 
415   if (!w.CallbackFailed()) {
416     if (!io_s.ok()) {
417       IOStatusCheck(io_s);
418     } else {
419       WriteStatusCheck(status);
420     }
421   }
422 
423   if (need_log_sync) {
424     mutex_.Lock();
425     MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
426     mutex_.Unlock();
427     // Requesting sync with two_write_queues_ is expected to be very rare. We
428     // hence provide a simple implementation that is not necessarily efficient.
429     if (two_write_queues_) {
430       if (manual_wal_flush_) {
431         status = FlushWAL(true);
432       } else {
433         status = SyncWAL();
434       }
435     }
436   }
437 
438   bool should_exit_batch_group = true;
439   if (in_parallel_group) {
440     // CompleteParallelWorker returns true if this thread should
441     // handle exit, false means somebody else did
442     should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
443   }
444   if (should_exit_batch_group) {
445     if (status.ok()) {
446       // Note: if we are to resume after non-OK statuses we need to revisit how
      // we react to non-OK statuses here.
448       versions_->SetLastSequence(last_sequence);
449     }
450     MemTableInsertStatusCheck(w.status);
451     write_thread_.ExitAsBatchGroupLeader(write_group, status);
452   }
453 
454   if (status.ok()) {
455     status = w.FinalStatus();
456   }
457   return status;
458 }
459 
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
461                                   WriteBatch* my_batch, WriteCallback* callback,
462                                   uint64_t* log_used, uint64_t log_ref,
463                                   bool disable_memtable, uint64_t* seq_used) {
464   PERF_TIMER_GUARD(write_pre_and_post_process_time);
465   StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
466 
467   WriteContext write_context;
468 
469   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
470                         disable_memtable);
471   write_thread_.JoinBatchGroup(&w);
472   if (w.state == WriteThread::STATE_GROUP_LEADER) {
473     WriteThread::WriteGroup wal_write_group;
474     if (w.callback && !w.callback->AllowWriteBatching()) {
475       write_thread_.WaitForMemTableWriters();
476     }
477     mutex_.Lock();
478     bool need_log_sync = !write_options.disableWAL && write_options.sync;
479     bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
480     // PreprocessWrite does its own perf timing.
481     PERF_TIMER_STOP(write_pre_and_post_process_time);
482     w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
483     PERF_TIMER_START(write_pre_and_post_process_time);
484     log::Writer* log_writer = logs_.back().writer;
485     mutex_.Unlock();
486 
    // This can set a non-OK status if the callback fails.
488     last_batch_group_size_ =
489         write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
490     const SequenceNumber current_sequence =
491         write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
492     size_t total_count = 0;
493     size_t total_byte_size = 0;
494 
495     if (w.status.ok()) {
496       SequenceNumber next_sequence = current_sequence;
497       for (auto writer : wal_write_group) {
498         if (writer->CheckCallback(this)) {
499           if (writer->ShouldWriteToMemtable()) {
500             writer->sequence = next_sequence;
501             size_t count = WriteBatchInternal::Count(writer->batch);
502             next_sequence += count;
503             total_count += count;
504           }
505           total_byte_size = WriteBatchInternal::AppendedByteSize(
506               total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
507         }
508       }
509       if (w.disable_wal) {
510         has_unpersisted_data_.store(true, std::memory_order_relaxed);
511       }
512       write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
513     }
514 
515     auto stats = default_cf_internal_stats_;
516     stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
517     RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
518     stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size);
519     RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
520     RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
521 
522     PERF_TIMER_STOP(write_pre_and_post_process_time);
523 
524     IOStatus io_s;
525     if (w.status.ok() && !write_options.disableWAL) {
526       PERF_TIMER_GUARD(write_wal_time);
527       stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
528       RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
529       if (wal_write_group.size > 1) {
530         stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
531                           wal_write_group.size - 1);
532         RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
533       }
534       io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
535                         need_log_dir_sync, current_sequence);
536       w.status = io_s;
537     }
538 
539     if (!w.CallbackFailed()) {
540       if (!io_s.ok()) {
541         IOStatusCheck(io_s);
542       } else {
543         WriteStatusCheck(w.status);
544       }
545     }
546 
547     if (need_log_sync) {
548       mutex_.Lock();
549       MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
550       mutex_.Unlock();
551     }
552 
553     write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
554   }
555 
556   WriteThread::WriteGroup memtable_write_group;
557   if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
558     PERF_TIMER_GUARD(write_memtable_time);
559     assert(w.ShouldWriteToMemtable());
560     write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
561     if (memtable_write_group.size > 1 &&
562         immutable_db_options_.allow_concurrent_memtable_write) {
563       write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
564     } else {
565       memtable_write_group.status = WriteBatchInternal::InsertInto(
566           memtable_write_group, w.sequence, column_family_memtables_.get(),
567           &flush_scheduler_, &trim_history_scheduler_,
568           write_options.ignore_missing_column_families, 0 /*log_number*/, this,
569           false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
570       versions_->SetLastSequence(memtable_write_group.last_sequence);
571       write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
572     }
573   }
574 
575   if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
576     assert(w.ShouldWriteToMemtable());
577     ColumnFamilyMemTablesImpl column_family_memtables(
578         versions_->GetColumnFamilySet());
579     w.status = WriteBatchInternal::InsertInto(
580         &w, w.sequence, &column_family_memtables, &flush_scheduler_,
581         &trim_history_scheduler_, write_options.ignore_missing_column_families,
582         0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
583         false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
584         write_options.memtable_insert_hint_per_batch);
585     if (write_thread_.CompleteParallelMemTableWriter(&w)) {
586       MemTableInsertStatusCheck(w.status);
587       versions_->SetLastSequence(w.write_group->last_sequence);
588       write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
589     }
590   }
591   if (seq_used != nullptr) {
592     *seq_used = w.sequence;
593   }
594 
595   assert(w.state == WriteThread::STATE_COMPLETED);
596   return w.FinalStatus();
597 }
598 
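// Memtable-insert phase of unordered_write: WriteImpl() first runs the WAL
// phase through WriteImplWALOnly() and then calls this to apply the batch to
// the memtable outside the write queue, concurrently with other writers.
// pending_memtable_writes_ (incremented in the WAL phase) is decremented here
// so that WaitForPendingWrites()/switch_cv_ can quiesce in-flight inserts
// before a memtable switch.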
Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
600                                       WriteBatch* my_batch,
601                                       WriteCallback* callback, uint64_t log_ref,
602                                       SequenceNumber seq,
603                                       const size_t sub_batch_cnt) {
604   PERF_TIMER_GUARD(write_pre_and_post_process_time);
605   StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
606 
607   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
608                         false /*disable_memtable*/);
609 
610   if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
611     w.sequence = seq;
612     size_t total_count = WriteBatchInternal::Count(my_batch);
613     InternalStats* stats = default_cf_internal_stats_;
614     stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
615     RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
616 
617     ColumnFamilyMemTablesImpl column_family_memtables(
618         versions_->GetColumnFamilySet());
619     w.status = WriteBatchInternal::InsertInto(
620         &w, w.sequence, &column_family_memtables, &flush_scheduler_,
621         &trim_history_scheduler_, write_options.ignore_missing_column_families,
622         0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
623         seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
624         write_options.memtable_insert_hint_per_batch);
625 
626     WriteStatusCheck(w.status);
627     if (write_options.disableWAL) {
628       has_unpersisted_data_.store(true, std::memory_order_relaxed);
629     }
630   }
631 
632   size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1;
633   if (pending_cnt == 0) {
    // switch_cv_ waits until pending_memtable_writes_ = 0. Locking its mutex
    // before notifying ensures that the cv is in the waiting state when it is
    // notified, thus not missing the update to pending_memtable_writes_ even
    // though it is not modified under the mutex.
638     std::lock_guard<std::mutex> lck(switch_mutex_);
639     switch_cv_.notify_all();
640   }
641 
642   if (!w.FinalStatus().ok()) {
643     return w.FinalStatus();
644   }
645   return Status::OK();
646 }
647 
648 // The 2nd write queue. If enabled it will be used only for WAL-only writes.
649 // This is the only queue that updates LastPublishedSequence which is only
650 // applicable in a two-queue setting.
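// Note: besides the 2nd (nonmem) queue, this function is also invoked on the
// main queue by unordered_write for its WAL phase (see WriteImpl above), in
// which case kDoPublishLastSeq makes it publish the last sequence as well.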
Status DBImpl::WriteImplWALOnly(
652     WriteThread* write_thread, const WriteOptions& write_options,
653     WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used,
654     const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
655     PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
656     const PublishLastSeq publish_last_seq, const bool disable_memtable) {
657   Status status;
658   PERF_TIMER_GUARD(write_pre_and_post_process_time);
659   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
660                         disable_memtable, sub_batch_cnt, pre_release_callback);
661   RecordTick(stats_, WRITE_WITH_WAL);
662   StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
663 
664   write_thread->JoinBatchGroup(&w);
665   assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
666   if (w.state == WriteThread::STATE_COMPLETED) {
667     if (log_used != nullptr) {
668       *log_used = w.log_used;
669     }
670     if (seq_used != nullptr) {
671       *seq_used = w.sequence;
672     }
673     return w.FinalStatus();
674   }
675   // else we are the leader of the write batch group
676   assert(w.state == WriteThread::STATE_GROUP_LEADER);
677 
678   if (publish_last_seq == kDoPublishLastSeq) {
679     // Currently we only use kDoPublishLastSeq in unordered_write
680     assert(immutable_db_options_.unordered_write);
681     WriteContext write_context;
682     if (error_handler_.IsDBStopped()) {
683       status = error_handler_.GetBGError();
684     }
685     // TODO(myabandeh): Make preliminary checks thread-safe so we could do them
686     // without paying the cost of obtaining the mutex.
687     if (status.ok()) {
688       InstrumentedMutexLock l(&mutex_);
689       bool need_log_sync = false;
690       status = PreprocessWrite(write_options, &need_log_sync, &write_context);
691       WriteStatusCheck(status);
692     }
693     if (!status.ok()) {
694       WriteThread::WriteGroup write_group;
695       write_thread->EnterAsBatchGroupLeader(&w, &write_group);
696       write_thread->ExitAsBatchGroupLeader(write_group, status);
697       return status;
698     }
699   }
700 
701   WriteThread::WriteGroup write_group;
702   uint64_t last_sequence;
703   write_thread->EnterAsBatchGroupLeader(&w, &write_group);
704   // Note: no need to update last_batch_group_size_ here since the batch writes
705   // to WAL only
706 
707   size_t pre_release_callback_cnt = 0;
708   size_t total_byte_size = 0;
709   for (auto* writer : write_group) {
710     if (writer->CheckCallback(this)) {
711       total_byte_size = WriteBatchInternal::AppendedByteSize(
712           total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
713       if (writer->pre_release_callback) {
714         pre_release_callback_cnt++;
715       }
716     }
717   }
718 
719   const bool concurrent_update = true;
720   // Update stats while we are an exclusive group leader, so we know
721   // that nobody else can be writing to these particular stats.
722   // We're optimistic, updating the stats before we successfully
723   // commit.  That lets us release our leader status early.
724   auto stats = default_cf_internal_stats_;
725   stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
726                     concurrent_update);
727   RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
728   stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
729                     concurrent_update);
730   RecordTick(stats_, WRITE_DONE_BY_SELF);
731   auto write_done_by_other = write_group.size - 1;
732   if (write_done_by_other > 0) {
733     stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
734                       write_done_by_other, concurrent_update);
735     RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
736   }
737   RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
738 
739   PERF_TIMER_STOP(write_pre_and_post_process_time);
740 
741   PERF_TIMER_GUARD(write_wal_time);
742   // LastAllocatedSequence is increased inside WriteToWAL under
743   // wal_write_mutex_ to ensure ordered events in WAL
744   size_t seq_inc = 0 /* total_count */;
745   if (assign_order == kDoAssignOrder) {
746     size_t total_batch_cnt = 0;
747     for (auto* writer : write_group) {
748       assert(writer->batch_cnt || !seq_per_batch_);
749       if (!writer->CallbackFailed()) {
750         total_batch_cnt += writer->batch_cnt;
751       }
752     }
753     seq_inc = total_batch_cnt;
754   }
755   IOStatus io_s;
756   if (!write_options.disableWAL) {
757     io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
758     status = io_s;
759   } else {
760     // Otherwise we inc seq number to do solely the seq allocation
761     last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
762   }
763 
764   size_t memtable_write_cnt = 0;
765   auto curr_seq = last_sequence + 1;
766   for (auto* writer : write_group) {
767     if (writer->CallbackFailed()) {
768       continue;
769     }
770     writer->sequence = curr_seq;
771     if (assign_order == kDoAssignOrder) {
772       assert(writer->batch_cnt || !seq_per_batch_);
773       curr_seq += writer->batch_cnt;
774     }
775     if (!writer->disable_memtable) {
776       memtable_write_cnt++;
777     }
778     // else seq advances only by memtable writes
779   }
780   if (status.ok() && write_options.sync) {
781     assert(!write_options.disableWAL);
782     // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hence provide a simple implementation that is not necessarily efficient.
784     if (manual_wal_flush_) {
785       status = FlushWAL(true);
786     } else {
787       status = SyncWAL();
788     }
789   }
790   PERF_TIMER_START(write_pre_and_post_process_time);
791 
792   if (!w.CallbackFailed()) {
793     if (!io_s.ok()) {
794       IOStatusCheck(io_s);
795     } else {
796       WriteStatusCheck(status);
797     }
798   }
799   if (status.ok()) {
800     size_t index = 0;
801     for (auto* writer : write_group) {
802       if (!writer->CallbackFailed() && writer->pre_release_callback) {
803         assert(writer->sequence != kMaxSequenceNumber);
804         Status ws = writer->pre_release_callback->Callback(
805             writer->sequence, disable_memtable, writer->log_used, index++,
806             pre_release_callback_cnt);
807         if (!ws.ok()) {
808           status = ws;
809           break;
810         }
811       }
812     }
813   }
814   if (publish_last_seq == kDoPublishLastSeq) {
815     versions_->SetLastSequence(last_sequence + seq_inc);
816     // Currently we only use kDoPublishLastSeq in unordered_write
817     assert(immutable_db_options_.unordered_write);
818   }
819   if (immutable_db_options_.unordered_write && status.ok()) {
820     pending_memtable_writes_ += memtable_write_cnt;
821   }
822   write_thread->ExitAsBatchGroupLeader(write_group, status);
823   if (status.ok()) {
824     status = w.FinalStatus();
825   }
826   if (seq_used != nullptr) {
827     *seq_used = w.sequence;
828   }
829   return status;
830 }
831 
void DBImpl::WriteStatusCheck(const Status& status) {
833   // Is setting bg_error_ enough here?  This will at least stop
834   // compaction and fail any further writes.
835   if (immutable_db_options_.paranoid_checks && !status.ok() &&
836       !status.IsBusy() && !status.IsIncomplete()) {
837     mutex_.Lock();
838     error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
839     mutex_.Unlock();
840   }
841 }
842 
void DBImpl::IOStatusCheck(const IOStatus& io_status) {
844   // Is setting bg_error_ enough here?  This will at least stop
845   // compaction and fail any further writes.
846   if (immutable_db_options_.paranoid_checks && !io_status.ok() &&
847       !io_status.IsBusy() && !io_status.IsIncomplete()) {
848     mutex_.Lock();
849     error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback);
850     mutex_.Unlock();
851   }
852 }
853 
void DBImpl::MemTableInsertStatusCheck(const Status& status) {
855   // A non-OK status here indicates that the state implied by the
856   // WAL has diverged from the in-memory state.  This could be
857   // because of a corrupt write_batch (very bad), or because the
858   // client specified an invalid column family and didn't specify
859   // ignore_missing_column_families.
860   if (!status.ok()) {
861     mutex_.Lock();
862     assert(!error_handler_.IsBGWorkStopped());
863     error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
864     mutex_.Unlock();
865   }
866 }
867 
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
869                                bool* need_log_sync,
870                                WriteContext* write_context) {
871   mutex_.AssertHeld();
872   assert(write_context != nullptr && need_log_sync != nullptr);
873   Status status;
874 
875   if (error_handler_.IsDBStopped()) {
876     status = error_handler_.GetBGError();
877   }
878 
879   PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time);
880 
881   assert(!single_column_family_mode_ ||
882          versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
883   if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
884                total_log_size_ > GetMaxTotalWalSize())) {
885     WaitForPendingWrites();
886     status = SwitchWAL(write_context);
887   }
888 
889   if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
890     // Before a new memtable is added in SwitchMemtable(),
891     // write_buffer_manager_->ShouldFlush() will keep returning true. If another
892     // thread is writing to another DB with the same write buffer, they may also
893     // be flushed. We may end up with flushing much more DBs than needed. It's
894     // suboptimal but still correct.
895     WaitForPendingWrites();
896     status = HandleWriteBufferFull(write_context);
897   }
898 
899   if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
900     status = TrimMemtableHistory(write_context);
901   }
902 
903   if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
904     WaitForPendingWrites();
905     status = ScheduleFlushes(write_context);
906   }
907 
908   PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
909   PERF_TIMER_GUARD(write_pre_and_post_process_time);
910 
911   if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
912                                write_controller_.NeedsDelay()))) {
913     PERF_TIMER_STOP(write_pre_and_post_process_time);
914     PERF_TIMER_GUARD(write_delay_time);
    // We don't know the size of the current batch so we always use the size
    // of the previous one. It might create a fairness issue in that expiration
    // might happen for smaller writes while larger writes can go through.
    // Can optimize it if it is an issue.
919     status = DelayWrite(last_batch_group_size_, write_options);
920     PERF_TIMER_START(write_pre_and_post_process_time);
921   }
922 
923   if (status.ok() && *need_log_sync) {
924     // Wait until the parallel syncs are finished. Any sync process has to sync
925     // the front log too so it is enough to check the status of front()
926     // We do a while loop since log_sync_cv_ is signalled when any sync is
927     // finished
928     // Note: there does not seem to be a reason to wait for parallel sync at
929     // this early step but it is not important since parallel sync (SyncWAL) and
930     // need_log_sync are usually not used together.
931     while (logs_.front().getting_synced) {
932       log_sync_cv_.Wait();
933     }
934     for (auto& log : logs_) {
935       assert(!log.getting_synced);
      // This is just to prevent the logs from being synced by a parallel
      // SyncWAL call. We will do the actual syncing later, after we write to
      // the WAL.
939       // Note: there does not seem to be a reason to set this early before we
940       // actually write to the WAL
941       log.getting_synced = true;
942     }
943   } else {
944     *need_log_sync = false;
945   }
946 
947   return status;
948 }
949 
WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
951                                WriteBatch* tmp_batch, size_t* write_with_wal,
952                                WriteBatch** to_be_cached_state) {
953   assert(write_with_wal != nullptr);
954   assert(tmp_batch != nullptr);
955   assert(*to_be_cached_state == nullptr);
956   WriteBatch* merged_batch = nullptr;
957   *write_with_wal = 0;
958   auto* leader = write_group.leader;
959   assert(!leader->disable_wal);  // Same holds for all in the batch group
960   if (write_group.size == 1 && !leader->CallbackFailed() &&
961       leader->batch->GetWalTerminationPoint().is_cleared()) {
    // We simply write the first WriteBatch to the WAL if the group only
    // contains one batch, that batch should be written to the WAL,
    // and the batch does not want to be truncated.
965     merged_batch = leader->batch;
966     if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
967       *to_be_cached_state = merged_batch;
968     }
969     *write_with_wal = 1;
970   } else {
971     // WAL needs all of the batches flattened into a single batch.
972     // We could avoid copying here with an iov-like AddRecord
973     // interface
974     merged_batch = tmp_batch;
975     for (auto writer : write_group) {
976       if (!writer->CallbackFailed()) {
977         WriteBatchInternal::Append(merged_batch, writer->batch,
978                                    /*WAL_only*/ true);
979         if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
980           // We only need to cache the last of such write batch
981           *to_be_cached_state = writer->batch;
982         }
983         (*write_with_wal)++;
984       }
985     }
986   }
987   return merged_batch;
988 }
989 
990 // When two_write_queues_ is disabled, this function is called from the only
991 // write thread. Otherwise this must be called holding log_write_mutex_.
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
993                             log::Writer* log_writer, uint64_t* log_used,
994                             uint64_t* log_size) {
995   assert(log_size != nullptr);
996   Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
997   *log_size = log_entry.size();
  // When two_write_queues_ is enabled, WriteToWAL has to be protected from
  // concurrent calls from the two queues anyway and log_write_mutex_ is
  // already held. Otherwise, if manual_wal_flush_ is enabled, we need to
  // protect log_writer->AddRecord from possible concurrent calls via FlushWAL
  // by the application.
1002   const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
  // Due to performance concerns about missed branch prediction, penalize the
  // new manual_wal_flush_ feature (by UNLIKELY) instead of the more common
  // case when we do not need any locking.
1006   if (UNLIKELY(needs_locking)) {
1007     log_write_mutex_.Lock();
1008   }
1009   IOStatus io_s = log_writer->AddRecord(log_entry);
1010 
1011   if (UNLIKELY(needs_locking)) {
1012     log_write_mutex_.Unlock();
1013   }
1014   if (log_used != nullptr) {
1015     *log_used = logfile_number_;
1016   }
1017   total_log_size_ += log_entry.size();
1018   // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
1019   // since alive_log_files_ might be modified concurrently
1020   alive_log_files_.back().AddSize(log_entry.size());
1021   log_empty_ = false;
1022   return io_s;
1023 }
1024 
IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
1026                             log::Writer* log_writer, uint64_t* log_used,
1027                             bool need_log_sync, bool need_log_dir_sync,
1028                             SequenceNumber sequence) {
1029   IOStatus io_s;
1030   assert(!write_group.leader->disable_wal);
1031   // Same holds for all in the batch group
1032   size_t write_with_wal = 0;
1033   WriteBatch* to_be_cached_state = nullptr;
1034   WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
1035                                         &write_with_wal, &to_be_cached_state);
1036   if (merged_batch == write_group.leader->batch) {
1037     write_group.leader->log_used = logfile_number_;
1038   } else if (write_with_wal > 1) {
1039     for (auto writer : write_group) {
1040       writer->log_used = logfile_number_;
1041     }
1042   }
1043 
1044   WriteBatchInternal::SetSequence(merged_batch, sequence);
1045 
1046   uint64_t log_size;
1047   io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
1048   if (to_be_cached_state) {
1049     cached_recoverable_state_ = *to_be_cached_state;
1050     cached_recoverable_state_empty_ = false;
1051   }
1052 
1053   if (io_s.ok() && need_log_sync) {
1054     StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
1055     // It's safe to access logs_ with unlocked mutex_ here because:
1056     //  - we've set getting_synced=true for all logs,
1057     //    so other threads won't pop from logs_ while we're here,
1058     //  - only writer thread can push to logs_, and we're in
1059     //    writer thread, so no one will push to logs_,
1060     //  - as long as other threads don't modify it, it's safe to read
1061     //    from std::deque from multiple threads concurrently.
1062     for (auto& log : logs_) {
1063       io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);
1064       if (!io_s.ok()) {
1065         break;
1066       }
1067     }
1068 
1069     if (io_s.ok() && need_log_dir_sync) {
1070       // We only sync WAL directory the first time WAL syncing is
1071       // requested, so that in case users never turn on WAL sync,
1072       // we can avoid the disk I/O in the write code path.
1073       io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
1074     }
1075   }
1076 
1077   if (merged_batch == &tmp_batch_) {
1078     tmp_batch_.Clear();
1079   }
1080   if (io_s.ok()) {
1081     auto stats = default_cf_internal_stats_;
1082     if (need_log_sync) {
1083       stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
1084       RecordTick(stats_, WAL_FILE_SYNCED);
1085     }
1086     stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size);
1087     RecordTick(stats_, WAL_FILE_BYTES, log_size);
1088     stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
1089     RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
1090   }
1091   return io_s;
1092 }
1093 
IOStatus DBImpl::ConcurrentWriteToWAL(
1095     const WriteThread::WriteGroup& write_group, uint64_t* log_used,
1096     SequenceNumber* last_sequence, size_t seq_inc) {
1097   IOStatus io_s;
1098 
1099   assert(!write_group.leader->disable_wal);
1100   // Same holds for all in the batch group
1101   WriteBatch tmp_batch;
1102   size_t write_with_wal = 0;
1103   WriteBatch* to_be_cached_state = nullptr;
1104   WriteBatch* merged_batch =
1105       MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state);
1106 
  // We need to lock log_write_mutex_ since logs_ and alive_log_files_ might be
  // pushed back concurrently.
1109   log_write_mutex_.Lock();
1110   if (merged_batch == write_group.leader->batch) {
1111     write_group.leader->log_used = logfile_number_;
1112   } else if (write_with_wal > 1) {
1113     for (auto writer : write_group) {
1114       writer->log_used = logfile_number_;
1115     }
1116   }
1117   *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
1118   auto sequence = *last_sequence + 1;
1119   WriteBatchInternal::SetSequence(merged_batch, sequence);
1120 
1121   log::Writer* log_writer = logs_.back().writer;
1122   uint64_t log_size;
1123   io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
1124   if (to_be_cached_state) {
1125     cached_recoverable_state_ = *to_be_cached_state;
1126     cached_recoverable_state_empty_ = false;
1127   }
1128   log_write_mutex_.Unlock();
1129 
1130   if (io_s.ok()) {
1131     const bool concurrent = true;
1132     auto stats = default_cf_internal_stats_;
1133     stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
1134                       concurrent);
1135     RecordTick(stats_, WAL_FILE_BYTES, log_size);
1136     stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal,
1137                       concurrent);
1138     RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
1139   }
1140   return io_s;
1141 }
1142 
Status DBImpl::WriteRecoverableState() {
1144   mutex_.AssertHeld();
1145   if (!cached_recoverable_state_empty_) {
1146     bool dont_care_bool;
1147     SequenceNumber next_seq;
1148     if (two_write_queues_) {
1149       log_write_mutex_.Lock();
1150     }
1151     SequenceNumber seq;
1152     if (two_write_queues_) {
1153       seq = versions_->FetchAddLastAllocatedSequence(0);
1154     } else {
1155       seq = versions_->LastSequence();
1156     }
1157     WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
1158     auto status = WriteBatchInternal::InsertInto(
1159         &cached_recoverable_state_, column_family_memtables_.get(),
1160         &flush_scheduler_, &trim_history_scheduler_, true,
1161         0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */,
1162         &next_seq, &dont_care_bool, seq_per_batch_);
1163     auto last_seq = next_seq - 1;
1164     if (two_write_queues_) {
1165       versions_->FetchAddLastAllocatedSequence(last_seq - seq);
1166       versions_->SetLastPublishedSequence(last_seq);
1167     }
1168     versions_->SetLastSequence(last_seq);
1169     if (two_write_queues_) {
1170       log_write_mutex_.Unlock();
1171     }
1172     if (status.ok() && recoverable_state_pre_release_callback_) {
1173       const bool DISABLE_MEMTABLE = true;
1174       for (uint64_t sub_batch_seq = seq + 1;
1175            sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
1176         uint64_t const no_log_num = 0;
1177         // Unlock it since the callback might end up locking mutex. e.g.,
1178         // AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
1179         mutex_.Unlock();
1180         status = recoverable_state_pre_release_callback_->Callback(
1181             sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
1182         mutex_.Lock();
1183       }
1184     }
1185     if (status.ok()) {
1186       cached_recoverable_state_.Clear();
1187       cached_recoverable_state_empty_ = true;
1188     }
1189     return status;
1190   }
1191   return Status::OK();
1192 }
1193 
void DBImpl::SelectColumnFamiliesForAtomicFlush(
1195     autovector<ColumnFamilyData*>* cfds) {
1196   for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
1197     if (cfd->IsDropped()) {
1198       continue;
1199     }
1200     if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1201         !cached_recoverable_state_empty_.load()) {
1202       cfds->push_back(cfd);
1203     }
1204   }
1205 }
1206 
1207 // Assign sequence number for atomic flush.
void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
1209   assert(immutable_db_options_.atomic_flush);
1210   auto seq = versions_->LastSequence();
1211   for (auto cfd : cfds) {
1212     cfd->imm()->AssignAtomicFlushSeq(seq);
1213   }
1214 }
1215 
Status DBImpl::SwitchWAL(WriteContext* write_context) {
1217   mutex_.AssertHeld();
1218   assert(write_context != nullptr);
1219   Status status;
1220 
1221   if (alive_log_files_.begin()->getting_flushed) {
1222     return status;
1223   }
1224 
1225   auto oldest_alive_log = alive_log_files_.begin()->number;
1226   bool flush_wont_release_oldest_log = false;
1227   if (allow_2pc()) {
1228     auto oldest_log_with_uncommitted_prep =
1229         logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();
1230 
1231     assert(oldest_log_with_uncommitted_prep == 0 ||
1232            oldest_log_with_uncommitted_prep >= oldest_alive_log);
1233     if (oldest_log_with_uncommitted_prep > 0 &&
1234         oldest_log_with_uncommitted_prep == oldest_alive_log) {
1235       if (unable_to_release_oldest_log_) {
1236         // we already attempted to flush all column families dependent on
1237         // the oldest alive log but the log still contained uncommitted
1238         // transactions so there is still nothing that we can do.
1239         return status;
1240       } else {
1241         ROCKS_LOG_WARN(
1242             immutable_db_options_.info_log,
1243             "Unable to release oldest log due to uncommitted transaction");
1244         unable_to_release_oldest_log_ = true;
1245         flush_wont_release_oldest_log = true;
1246       }
1247     }
1248   }
1249   if (!flush_wont_release_oldest_log) {
1250     // we only mark this log as getting flushed if we have successfully
1251     // flushed all data in this log. If this log contains outstanding prepared
1252     // transactions then we cannot flush this log until those transactions are
    // committed.
1254     unable_to_release_oldest_log_ = false;
1255     alive_log_files_.begin()->getting_flushed = true;
1256   }

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing all column families with data in WAL number %" PRIu64
      ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
      oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
  // No need to refcount column families here: a drop can only happen in the
  // write thread, so it cannot race with us while we are the write thread.
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (cfd->OldestLogToKeep() <= oldest_alive_log) {
        cfds.push_back(cfd);
      }
    }
    MaybeFlushStatsCF(&cfds);
  }
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  for (const auto cfd : cfds) {
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->UnrefAndTryDelete();
    if (!status.ok()) {
      break;
    }
  }
  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (auto cfd : cfds) {
      cfd->imm()->FlushRequested();
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}
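
// Illustrative sketch (assumption, not a definitive usage): the 2PC handling
// above exists because a WAL that still holds a prepared but uncommitted
// transaction cannot be released. Roughly, with allow_2pc enabled (the path
// and transaction name below are hypothetical):
//
//   #include "rocksdb/utilities/transaction_db.h"
//
//   Options options;
//   options.create_if_missing = true;
//   options.allow_2pc = true;
//   TransactionDB* txn_db = nullptr;
//   Status s = TransactionDB::Open(options, TransactionDBOptions(),
//                                  "/tmp/two_pc_example", &txn_db);
//   Transaction* txn = txn_db->BeginTransaction(WriteOptions());
//   txn->SetName("xid-1");
//   txn->Put("key", "value");
//   txn->Prepare();  // the WAL containing this prepare stays alive ...
//   txn->Commit();   // ... until the transaction commits or rolls back
//   delete txn;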

Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
  mutex_.AssertHeld();
  assert(write_context != nullptr);
  Status status;

  // Before a new memtable is added in SwitchMemtable(),
  // write_buffer_manager_->ShouldFlush() will keep returning true. If another
  // thread is writing to another DB that shares the same write buffer, that
  // DB may be flushed as well. We may end up flushing many more DBs than
  // needed, which is suboptimal but still correct.
  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "Flushing column family with oldest memtable entry. Write buffer is "
      "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
      write_buffer_manager_->memory_usage(),
      write_buffer_manager_->buffer_size());
  // No need to refcount column families here: a drop can only happen in the
  // write thread, so it cannot race with us while we are the write thread.
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    ColumnFamilyData* cfd_picked = nullptr;
    SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;

    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (!cfd->mem()->IsEmpty()) {
        // We only consider the active memtable, hoping that any immutable
        // memtables are already in the process of flushing.
        uint64_t seq = cfd->mem()->GetCreationSeq();
        if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
          cfd_picked = cfd;
          seq_num_for_cf_picked = seq;
        }
      }
    }
    if (cfd_picked != nullptr) {
      cfds.push_back(cfd_picked);
    }
    MaybeFlushStatsCF(&cfds);
  }

  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }
  for (const auto cfd : cfds) {
    if (cfd->mem()->IsEmpty()) {
      continue;
    }
    cfd->Ref();
    status = SwitchMemtable(cfd, write_context);
    cfd->UnrefAndTryDelete();
    if (!status.ok()) {
      break;
    }
  }
  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (const auto cfd : cfds) {
      cfd->imm()->FlushRequested();
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}
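
// Illustrative sketch (assumption, not a definitive configuration): the
// write-buffer-full path above is driven by the memory budget configured via
// db_write_buffer_size or a shared WriteBufferManager, for example:
//
//   Options options;
//   // Cap the total memtable memory of this DB at 128 MB ...
//   options.db_write_buffer_size = 128 << 20;
//   // ... or share one budget (and thus one write-buffer-full trigger)
//   // across several DB instances:
//   options.write_buffer_manager =
//       std::make_shared<WriteBufferManager>(128 << 20);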

uint64_t DBImpl::GetMaxTotalWalSize() const {
  mutex_.AssertHeld();
  return mutable_db_options_.max_total_wal_size == 0
             ? 4 * max_total_in_memory_state_
             : mutable_db_options_.max_total_wal_size;
}
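
// Worked example (illustrative): with max_total_wal_size left at its default
// of 0, the WAL limit falls back to 4 * max_total_in_memory_state_. If
// max_total_in_memory_state_ (roughly the sum of the column families'
// memtable budgets) is 256 MB, WALs start being switched and their memtables
// flushed once the total WAL size exceeds about 1 GB. Setting
// DBOptions::max_total_wal_size explicitly overrides this heuristic.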

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::DelayWrite(uint64_t num_bytes,
                          const WriteOptions& write_options) {
  uint64_t time_delayed = 0;
  bool delayed = false;
  {
    StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
    uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
    if (delay > 0) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");

      // Notify write_thread_ about the stall so it can set up a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
      mutex_.Unlock();
      // We will delay the write until we have slept for `delay` microseconds
      // or we don't need a delay anymore.
      const uint64_t kDelayInterval = 1000;
      uint64_t stall_end = sw.start_time() + delay;
      while (write_controller_.NeedsDelay()) {
        if (env_->NowMicros() >= stall_end) {
          // We already delayed this write `delay` microseconds
          break;
        }

        delayed = true;
        // Sleep for 0.001 seconds
        env_->SleepForMicroseconds(kDelayInterval);
      }
      mutex_.Lock();
      write_thread_.EndWriteStall();
    }

    // Don't wait if there's a background error, even if it's a soft error. We
    // might wait here indefinitely as the background compaction may never
    // finish successfully, resulting in the stall condition lasting
    // indefinitely.
    while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
      if (write_options.no_slowdown) {
        return Status::Incomplete("Write stall");
      }
      delayed = true;

      // Notify write_thread_ about the stall so it can set up a barrier and
      // fail any pending writers with no_slowdown
      write_thread_.BeginWriteStall();
      TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
      bg_cv_.Wait();
      write_thread_.EndWriteStall();
    }
  }
  assert(!delayed || !write_options.no_slowdown);
  if (delayed) {
    default_cf_internal_stats_->AddDBStats(
        InternalStats::kIntStatsWriteStallMicros, time_delayed);
    RecordTick(stats_, STALL_MICROS, time_delayed);
  }

  // If the DB is not in read-only mode and the write_controller is not
  // stopping writes, we can ignore any background errors and allow the write
  // to proceed.
  Status s;
  if (write_controller_.IsStopped()) {
    // If writes are still stopped, it means we bailed due to a background
    // error.
    s = Status::Incomplete(error_handler_.GetBGError().ToString());
  }
  if (error_handler_.IsDBStopped()) {
    s = error_handler_.GetBGError();
  }
  return s;
}
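
// Illustrative sketch (assumption, not a definitive usage): writers that
// cannot afford to block in the delay loop above can opt out with
// WriteOptions::no_slowdown and handle the resulting Incomplete status
// (`db` below is assumed to be an already opened DB*):
//
//   WriteOptions wo;
//   wo.no_slowdown = true;
//   Status s = db->Put(wo, "key", "value");
//   if (s.IsIncomplete()) {
//     // The write would have stalled; retry later, or fall back to a
//     // blocking write with no_slowdown left at false.
//   }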

Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
                                            WriteBatch* my_batch) {
  assert(write_options.low_pri);
  // This is called outside the DB mutex. Although it is safe to make the call,
  // the consistency condition is not guaranteed to hold. It's OK to live with
  // it in this case.
  // If compaction needs to be sped up, it means compaction is falling behind,
  // so we start rate-limiting low-pri writes.
  if (write_controller_.NeedSpeedupCompaction()) {
    if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
      // For 2PC, we only rate limit prepare, not commit.
      return Status::OK();
    }
    if (write_options.no_slowdown) {
      return Status::Incomplete("Low priority write stall");
    } else {
      assert(my_batch != nullptr);
      // Rate limit these writes rather than blocking them completely: under
      // heavy write load, low-pri writes might otherwise never get a chance
      // to run, whereas this way they still make slow progress.
      PERF_TIMER_GUARD(write_delay_time);
      write_controller_.low_pri_rate_limiter()->Request(
          my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
          RateLimiter::OpType::kWrite);
    }
  }
  return Status::OK();
}
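
// Illustrative sketch (assumption, not a definitive usage): bulk or
// background writers can mark their writes low priority so that they are the
// first to be throttled by the rate limiter above when compaction lags
// (`db` is assumed to be an already opened DB*):
//
//   WriteOptions wo;
//   wo.low_pri = true;
//   Status s = db->Put(wo, "bulk_key", "bulk_value");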

void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
  assert(cfds != nullptr);
  if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) {
    ColumnFamilyData* cfd_stats =
        versions_->GetColumnFamilySet()->GetColumnFamily(
            kPersistentStatsColumnFamilyName);
    if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) {
      for (ColumnFamilyData* cfd : *cfds) {
        if (cfd == cfd_stats) {
          // stats CF already included in cfds
          return;
        }
      }
      // Force-flush the stats CF when its log number is less than all other
      // CFs' log numbers.
      bool force_flush_stats_cf = true;
      for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
        if (loop_cfd == cfd_stats) {
          continue;
        }
        if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
          force_flush_stats_cf = false;
        }
      }
      if (force_flush_stats_cf) {
        cfds->push_back(cfd_stats);
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Force flushing stats CF with automated flush "
                       "to avoid holding old logs");
      }
    }
  }
}

Status DBImpl::TrimMemtableHistory(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  ColumnFamilyData* tmp_cfd;
  while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) !=
         nullptr) {
    cfds.push_back(tmp_cfd);
  }
  for (auto& cfd : cfds) {
    autovector<MemTable*> to_delete;
    cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
    if (!to_delete.empty()) {
      for (auto m : to_delete) {
        delete m;
      }
      context->superversion_context.NewSuperVersion();
      assert(context->superversion_context.new_superversion.get() != nullptr);
      cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
    }

    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
  }
  return Status::OK();
}

Status DBImpl::ScheduleFlushes(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
    for (auto cfd : cfds) {
      cfd->Ref();
    }
    flush_scheduler_.Clear();
  } else {
    ColumnFamilyData* tmp_cfd;
    while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
      cfds.push_back(tmp_cfd);
    }
    MaybeFlushStatsCF(&cfds);
  }
  Status status;
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  for (auto& cfd : cfds) {
    if (!cfd->mem()->IsEmpty()) {
      status = SwitchMemtable(cfd, context);
    }
    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
    if (!status.ok()) {
      break;
    }
  }

  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

#ifndef ROCKSDB_LITE
void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
                                    const MemTableInfo& mem_table_info) {
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }

  for (auto listener : immutable_db_options_.listeners) {
    listener->OnMemTableSealed(mem_table_info);
  }
}
#endif  // ROCKSDB_LITE
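
// Illustrative sketch (assumption, not a definitive usage): the notification
// above reaches any EventListener registered in DBOptions::listeners, e.g.:
//
//   class MemTableSealedLogger : public EventListener {
//    public:
//     void OnMemTableSealed(const MemTableInfo& info) override {
//       // Called without holding the DB mutex; keep it cheap and
//       // non-blocking.
//       fprintf(stderr, "Sealed memtable of CF %s with %" PRIu64 " entries\n",
//               info.cf_name.c_str(), info.num_entries);
//     }
//   };
//
//   Options options;
//   options.listeners.push_back(std::make_shared<MemTableSealedLogger>());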

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// REQUIRES: this thread is currently at the front of the 2nd writer queue if
// two_write_queues_ is true (This is to simplify the reasoning.)
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  mutex_.AssertHeld();
  WriteThread::Writer nonmem_w;
  std::unique_ptr<WritableFile> lfile;
  log::Writer* new_log = nullptr;
  MemTable* new_mem = nullptr;

  // Recoverable state is persisted in the WAL. After a memtable switch, the
  // WAL might be deleted, so we write the state to the memtable so that it is
  // persisted as well.
  Status s = WriteRecoverableState();
  if (!s.ok()) {
    return s;
  }

  // Attempt to switch to a new memtable and trigger flush of old.
  // Do this without holding the dbmutex lock.
  assert(versions_->prev_log_number() == 0);
  if (two_write_queues_) {
    log_write_mutex_.Lock();
  }
  bool creating_new_log = !log_empty_;
  if (two_write_queues_) {
    log_write_mutex_.Unlock();
  }
  uint64_t recycle_log_number = 0;
  if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
      !log_recycle_files_.empty()) {
    recycle_log_number = log_recycle_files_.front();
  }
  uint64_t new_log_number =
      creating_new_log ? versions_->NewFileNumber() : logfile_number_;
  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

  // Set memtable_info for memtable sealed callback
#ifndef ROCKSDB_LITE
  MemTableInfo memtable_info;
  memtable_info.cf_name = cfd->GetName();
  memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
  memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
  memtable_info.num_entries = cfd->mem()->num_entries();
  memtable_info.num_deletes = cfd->mem()->num_deletes();
#endif  // ROCKSDB_LITE
  // Log this later after lock release. It may be outdated, e.g., if background
  // flush happens before logging, but that should be ok.
  int num_imm_unflushed = cfd->imm()->NumNotFlushed();
  const auto preallocate_block_size =
      GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
  mutex_.Unlock();
  if (creating_new_log) {
    // TODO: Write buffer size passed in should be max of all CF's instead
    // of mutable_cf_options.write_buffer_size.
    s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
                  &new_log);
  }
  if (s.ok()) {
    SequenceNumber seq = versions_->LastSequence();
    new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
    context->superversion_context.NewSuperVersion();
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] New memtable created with log file: #%" PRIu64
                 ". Immutable memtables: %d.\n",
                 cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
  mutex_.Lock();
  if (recycle_log_number != 0) {
    // Since renaming the file is done outside DB mutex, we need to ensure
    // concurrent full purges don't delete the file while we're recycling it.
    // To achieve that we hold the old log number in the recyclable list until
    // after it has been renamed.
    assert(log_recycle_files_.front() == recycle_log_number);
    log_recycle_files_.pop_front();
  }
  if (s.ok() && creating_new_log) {
    log_write_mutex_.Lock();
    assert(new_log != nullptr);
    if (!logs_.empty()) {
      // Always flush the buffer of the last log before switching to a new one
      log::Writer* cur_log_writer = logs_.back().writer;
      s = cur_log_writer->WriteBuffer();
      if (!s.ok()) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
                       "  WAL file\n",
                       cfd->GetName().c_str(), cur_log_writer->get_log_number(),
                       new_log_number);
      }
    }
    if (s.ok()) {
      logfile_number_ = new_log_number;
      log_empty_ = true;
      log_dir_synced_ = false;
      logs_.emplace_back(logfile_number_, new_log);
      alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
    }
    log_write_mutex_.Unlock();
  }

  if (!s.ok()) {
    // How do we fail if we're not creating a new log?
    assert(creating_new_log);
    if (new_mem) {
      delete new_mem;
    }
    if (new_log) {
      delete new_log;
    }
    SuperVersion* new_superversion =
        context->superversion_context.new_superversion.release();
    if (new_superversion != nullptr) {
      delete new_superversion;
    }
    // We may have lost data from the WritableFileBuffer in-memory buffer for
    // the current log, so treat it as a fatal error and set bg_error
    error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
    // Read back bg_error in order to get the right severity
    s = error_handler_.GetBGError();
    return s;
  }

  for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
    // All of this is just an optimization to delete logs that are no longer
    // needed -- if a CF is empty, it doesn't need that particular log to stay
    // alive, so we just advance its log number. No need to persist this in
    // the manifest.
    if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
        loop_cfd->imm()->NumNotFlushed() == 0) {
      if (creating_new_log) {
        loop_cfd->SetLogNumber(logfile_number_);
      }
      loop_cfd->mem()->SetCreationSeq(versions_->LastSequence());
    }
  }

  cfd->mem()->SetNextLogNumber(logfile_number_);
  cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
  new_mem->Ref();
  cfd->SetMemtable(new_mem);
  InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
                                     mutable_cf_options);
#ifndef ROCKSDB_LITE
  mutex_.Unlock();
  // Notify client that memtable is sealed, now that we have successfully
  // installed a new memtable
  NotifyOnMemTableSealed(cfd, memtable_info);
  mutex_.Lock();
#endif  // ROCKSDB_LITE
  return s;
}

size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
  mutex_.AssertHeld();
  size_t bsize =
      static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
  // Some users might set very high write_buffer_size and rely on
  // max_total_wal_size or other parameters to control the WAL size.
  if (mutable_db_options_.max_total_wal_size > 0) {
    bsize = std::min<size_t>(
        bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
  }
  if (immutable_db_options_.db_write_buffer_size > 0) {
    bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
  }
  if (immutable_db_options_.write_buffer_manager &&
      immutable_db_options_.write_buffer_manager->enabled()) {
    bsize = std::min<size_t>(
        bsize, immutable_db_options_.write_buffer_manager->buffer_size());
  }

  return bsize;
}
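
// Worked example (illustrative): with write_buffer_size = 64 MB, the initial
// preallocation size is 64 MB + 64 MB / 10, i.e. about 70 MB, leaving roughly
// 10% of headroom. That value is then capped by max_total_wal_size,
// db_write_buffer_size, and the WriteBufferManager budget, whichever of those
// is configured and smaller.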

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  if (nullptr == opt.timestamp) {
    // Pre-allocate the write batch size conservatively: 8 bytes for the
    // header, 4 bytes for the count, 1 byte for the record type, plus 11
    // extra bytes for the key and value lengths.
    WriteBatch batch(key.size() + value.size() + 24);
    Status s = batch.Put(column_family, key, value);
    if (!s.ok()) {
      return s;
    }
    return Write(opt, &batch);
  }
  const Slice* ts = opt.timestamp;
  assert(nullptr != ts);
  size_t ts_sz = ts->size();
  WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
                   ts_sz);
  Status s = batch.Put(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  s = batch.AssignTimestamp(*ts);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}
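
// Illustrative sketch (assumption, not a definitive usage): most callers go
// through this convenience wrapper instead of assembling a WriteBatch
// themselves (`db` and `cf_handle` are assumed to be an open DB* and one of
// its column family handles):
//
//   Status s = db->Put(WriteOptions(), cf_handle, "key", "value");
//
// When user-defined timestamps are enabled for the column family, the caller
// points WriteOptions::timestamp at a Slice of the comparator's timestamp
// size, and the wrapper above sizes the batch and assigns the timestamp.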

Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                  const Slice& key) {
  WriteBatch batch;
  batch.Delete(column_family, key);
  return Write(opt, &batch);
}

Status DB::SingleDelete(const WriteOptions& opt,
                        ColumnFamilyHandle* column_family, const Slice& key) {
  WriteBatch batch;
  batch.SingleDelete(column_family, key);
  return Write(opt, &batch);
}

Status DB::DeleteRange(const WriteOptions& opt,
                       ColumnFamilyHandle* column_family,
                       const Slice& begin_key, const Slice& end_key) {
  WriteBatch batch;
  batch.DeleteRange(column_family, begin_key, end_key);
  return Write(opt, &batch);
}
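
// Illustrative sketch (assumption, not a definitive usage): DeleteRange
// removes every key in [begin_key, end_key) with a single range tombstone
// rather than issuing one Delete per key (`db` is assumed to be an already
// opened DB*):
//
//   Status s = db->DeleteRange(WriteOptions(), db->DefaultColumnFamily(),
//                              "user:0000", "user:9999");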

Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                 const Slice& key, const Slice& value) {
  WriteBatch batch;
  Status s = batch.Merge(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}
}  // namespace ROCKSDB_NAMESPACE