1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10
11 #include <cinttypes>
12 #include "db/error_handler.h"
13 #include "db/event_helpers.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "options/options_helper.h"
16 #include "test_util/sync_point.h"
17
18 namespace ROCKSDB_NAMESPACE {
19 // Convenience methods
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
                   const Slice& key, const Slice& val) {
22 return DB::Put(o, column_family, key, val);
23 }
24
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
                     const Slice& key, const Slice& val) {
27 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
28 if (!cfh->cfd()->ioptions()->merge_operator) {
29 return Status::NotSupported("Provide a merge_operator when opening DB");
30 } else {
31 return DB::Merge(o, column_family, key, val);
32 }
33 }
34
Status DBImpl::Delete(const WriteOptions& write_options,
                      ColumnFamilyHandle* column_family, const Slice& key) {
37 return DB::Delete(write_options, column_family, key);
38 }
39
Status DBImpl::SingleDelete(const WriteOptions& write_options,
                            ColumnFamilyHandle* column_family,
                            const Slice& key) {
43 return DB::SingleDelete(write_options, column_family, key);
44 }
45
void DBImpl::SetRecoverableStatePreReleaseCallback(
    PreReleaseCallback* callback) {
48 recoverable_state_pre_release_callback_.reset(callback);
49 }
50
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
52 return WriteImpl(write_options, my_batch, nullptr, nullptr);
53 }
54
55 #ifndef ROCKSDB_LITE
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
                                 WriteBatch* my_batch,
                                 WriteCallback* callback) {
59 return WriteImpl(write_options, my_batch, callback, nullptr);
60 }
61 #endif // ROCKSDB_LITE
62
63 // The main write queue. This is the only write queue that updates LastSequence.
64 // When using one write queue, the same sequence also indicates the last
65 // published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable, uint64_t* seq_used,
                         size_t batch_cnt,
                         PreReleaseCallback* pre_release_callback) {
72 assert(!seq_per_batch_ || batch_cnt != 0);
73 if (my_batch == nullptr) {
74 return Status::Corruption("Batch is nullptr!");
75 }
76 if (tracer_) {
77 InstrumentedMutexLock lock(&trace_mutex_);
78 if (tracer_) {
79 tracer_->Write(my_batch);
80 }
81 }
82 if (write_options.sync && write_options.disableWAL) {
83 return Status::InvalidArgument("Sync writes has to enable WAL.");
84 }
85 if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
86 return Status::NotSupported(
87 "pipelined_writes is not compatible with concurrent prepares");
88 }
89 if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
90 // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
91 return Status::NotSupported(
92 "pipelined_writes is not compatible with seq_per_batch");
93 }
94 if (immutable_db_options_.unordered_write &&
95 immutable_db_options_.enable_pipelined_write) {
96 return Status::NotSupported(
97 "pipelined_writes is not compatible with unordered_write");
98 }
99 // Otherwise IsLatestPersistentState optimization does not make sense
100 assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
101 disable_memtable);
102
103 Status status;
104 IOStatus io_s;
105 if (write_options.low_pri) {
106 status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
107 if (!status.ok()) {
108 return status;
109 }
110 }
111
112 if (two_write_queues_ && disable_memtable) {
113 AssignOrder assign_order =
114 seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
    // Otherwise these are WAL-only Prepare batches under the WriteCommitted
    // policy, and they do not consume a sequence number.
117 return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch,
118 callback, log_used, log_ref, seq_used, batch_cnt,
119 pre_release_callback, assign_order,
120 kDontPublishLastSeq, disable_memtable);
121 }
122
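  // With unordered_write, the WAL write still goes through the ordered write
  // queue below, but the memtable insert is done outside of it in
  // UnorderedWriteMemtable(), so memtable writes from different threads may
  // complete out of order.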
123 if (immutable_db_options_.unordered_write) {
124 const size_t sub_batch_cnt = batch_cnt != 0
125 ? batch_cnt
126 // every key is a sub-batch consuming a seq
127 : WriteBatchInternal::Count(my_batch);
128 uint64_t seq;
    // Use a write thread to i) optimize for WAL write, ii) publish the last
    // sequence in increasing order, iii) call pre_release_callback serially
131 status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback,
132 log_used, log_ref, &seq, sub_batch_cnt,
133 pre_release_callback, kDoAssignOrder,
134 kDoPublishLastSeq, disable_memtable);
135 TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
136 if (!status.ok()) {
137 return status;
138 }
139 if (seq_used) {
140 *seq_used = seq;
141 }
142 if (!disable_memtable) {
143 TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable");
144 status = UnorderedWriteMemtable(write_options, my_batch, callback,
145 log_ref, seq, sub_batch_cnt);
146 }
147 return status;
148 }
149
150 if (immutable_db_options_.enable_pipelined_write) {
151 return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
152 log_ref, disable_memtable, seq_used);
153 }
154
155 PERF_TIMER_GUARD(write_pre_and_post_process_time);
156 WriteThread::Writer w(write_options, my_batch, callback, log_ref,
157 disable_memtable, batch_cnt, pre_release_callback);
158
159 if (!write_options.disableWAL) {
160 RecordTick(stats_, WRITE_WITH_WAL);
161 }
162
163 StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
164
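  // JoinBatchGroup either makes this writer the leader of a new write group or
  // blocks until a leader has taken care of the write. On return, w.state is
  // one of the states handled below: parallel memtable writer, completed, or
  // group leader.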
165 write_thread_.JoinBatchGroup(&w);
166 if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
167 // we are a non-leader in a parallel group
168
169 if (w.ShouldWriteToMemtable()) {
170 PERF_TIMER_STOP(write_pre_and_post_process_time);
171 PERF_TIMER_GUARD(write_memtable_time);
172
173 ColumnFamilyMemTablesImpl column_family_memtables(
174 versions_->GetColumnFamilySet());
175 w.status = WriteBatchInternal::InsertInto(
176 &w, w.sequence, &column_family_memtables, &flush_scheduler_,
177 &trim_history_scheduler_,
178 write_options.ignore_missing_column_families, 0 /*log_number*/, this,
179 true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
180 batch_per_txn_, write_options.memtable_insert_hint_per_batch);
181
182 PERF_TIMER_START(write_pre_and_post_process_time);
183 }
184
185 if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // we're responsible for exiting the batch group
187 // TODO(myabandeh): propagate status to write_group
188 auto last_sequence = w.write_group->last_sequence;
189 versions_->SetLastSequence(last_sequence);
190 MemTableInsertStatusCheck(w.status);
191 write_thread_.ExitAsBatchGroupFollower(&w);
192 }
193 assert(w.state == WriteThread::STATE_COMPLETED);
194 // STATE_COMPLETED conditional below handles exit
195
196 status = w.FinalStatus();
197 }
198 if (w.state == WriteThread::STATE_COMPLETED) {
199 if (log_used != nullptr) {
200 *log_used = w.log_used;
201 }
202 if (seq_used != nullptr) {
203 *seq_used = w.sequence;
204 }
205 // write is complete and leader has updated sequence
206 return w.FinalStatus();
207 }
208 // else we are the leader of the write batch group
209 assert(w.state == WriteThread::STATE_GROUP_LEADER);
210
  // Once it reaches this point, the current writer "w" will try to do its
  // write job. It may also pick up some of the remaining writers in
  // "writers_" when suitable, and finish them in the same write batch.
  // This is how a write job could be done by another writer.
215 WriteContext write_context;
216 WriteThread::WriteGroup write_group;
217 bool in_parallel_group = false;
218 uint64_t last_sequence = kMaxSequenceNumber;
219
220 mutex_.Lock();
221
222 bool need_log_sync = write_options.sync;
223 bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
224 if (!two_write_queues_ || !disable_memtable) {
    // With concurrent writes we do the preprocessing only in the write thread
    // that also writes to the memtable, to avoid synchronization issues on
    // shared data structures with the other thread.
228
229 // PreprocessWrite does its own perf timing.
230 PERF_TIMER_STOP(write_pre_and_post_process_time);
231
232 status = PreprocessWrite(write_options, &need_log_sync, &write_context);
233 if (!two_write_queues_) {
234 // Assign it after ::PreprocessWrite since the sequence might advance
235 // inside it by WriteRecoverableState
236 last_sequence = versions_->LastSequence();
237 }
238
239 PERF_TIMER_START(write_pre_and_post_process_time);
240 }
241 log::Writer* log_writer = logs_.back().writer;
242
243 mutex_.Unlock();
244
245 // Add to log and apply to memtable. We can release the lock
246 // during this phase since &w is currently responsible for logging
247 // and protects against concurrent loggers and concurrent writes
248 // into memtables
249
250 TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters");
251 last_batch_group_size_ =
252 write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
253
254 if (status.ok()) {
255 // Rules for when we can update the memtable concurrently
256 // 1. supported by memtable
257 // 2. Puts are not okay if inplace_update_support
258 // 3. Merges are not okay
259 //
260 // Rules 1..2 are enforced by checking the options
261 // during startup (CheckConcurrentWritesSupported), so if
262 // options.allow_concurrent_memtable_write is true then they can be
263 // assumed to be true. Rule 3 is checked for each batch. We could
    // relax rule 2 if we could prevent write batches from referring
265 // more than once to a particular key.
266 bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
267 write_group.size > 1;
268 size_t total_count = 0;
269 size_t valid_batches = 0;
270 size_t total_byte_size = 0;
271 size_t pre_release_callback_cnt = 0;
272 for (auto* writer : write_group) {
273 if (writer->CheckCallback(this)) {
274 valid_batches += writer->batch_cnt;
275 if (writer->ShouldWriteToMemtable()) {
276 total_count += WriteBatchInternal::Count(writer->batch);
277 parallel = parallel && !writer->batch->HasMerge();
278 }
279 total_byte_size = WriteBatchInternal::AppendedByteSize(
280 total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
281 if (writer->pre_release_callback) {
282 pre_release_callback_cnt++;
283 }
284 }
285 }
286 // Note about seq_per_batch_: either disableWAL is set for the entire write
287 // group or not. In either case we inc seq for each write batch with no
288 // failed callback. This means that there could be a batch with
    // disable_memtable in between; although we do not write this batch to
290 // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
291 // the seq per valid written key to mem.
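    // For example (assuming each write is a single sub-batch): a group of
    // three writes where one callback failed consumes 2 seqs with
    // seq_per_batch_, while without seq_per_batch_ it consumes one seq per
    // key that is actually written to the memtable.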
292 size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;
293
294 const bool concurrent_update = two_write_queues_;
295 // Update stats while we are an exclusive group leader, so we know
296 // that nobody else can be writing to these particular stats.
297 // We're optimistic, updating the stats before we successfully
298 // commit. That lets us release our leader status early.
299 auto stats = default_cf_internal_stats_;
300 stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count,
301 concurrent_update);
302 RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
303 stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
304 concurrent_update);
305 RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
306 stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
307 concurrent_update);
308 RecordTick(stats_, WRITE_DONE_BY_SELF);
309 auto write_done_by_other = write_group.size - 1;
310 if (write_done_by_other > 0) {
311 stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
312 write_done_by_other, concurrent_update);
313 RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
314 }
315 RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
316
317 if (write_options.disableWAL) {
318 has_unpersisted_data_.store(true, std::memory_order_relaxed);
319 }
320
321 PERF_TIMER_STOP(write_pre_and_post_process_time);
322
323 if (!two_write_queues_) {
324 if (status.ok() && !write_options.disableWAL) {
325 PERF_TIMER_GUARD(write_wal_time);
326 io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
327 need_log_dir_sync, last_sequence + 1);
328 }
329 } else {
330 if (status.ok() && !write_options.disableWAL) {
331 PERF_TIMER_GUARD(write_wal_time);
        // LastAllocatedSequence is increased inside ConcurrentWriteToWAL under
        // log_write_mutex_ to ensure ordered events in the WAL
334 io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
335 seq_inc);
336 } else {
337 // Otherwise we inc seq number for memtable writes
338 last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
339 }
340 }
341 status = io_s;
342 assert(last_sequence != kMaxSequenceNumber);
343 const SequenceNumber current_sequence = last_sequence + 1;
344 last_sequence += seq_inc;
345
346 // PreReleaseCallback is called after WAL write and before memtable write
347 if (status.ok()) {
348 SequenceNumber next_sequence = current_sequence;
349 size_t index = 0;
350 // Note: the logic for advancing seq here must be consistent with the
351 // logic in WriteBatchInternal::InsertInto(write_group...) as well as
352 // with WriteBatchInternal::InsertInto(write_batch...) that is called on
353 // the merged batch during recovery from the WAL.
354 for (auto* writer : write_group) {
355 if (writer->CallbackFailed()) {
356 continue;
357 }
358 writer->sequence = next_sequence;
359 if (writer->pre_release_callback) {
360 Status ws = writer->pre_release_callback->Callback(
361 writer->sequence, disable_memtable, writer->log_used, index++,
362 pre_release_callback_cnt);
363 if (!ws.ok()) {
364 status = ws;
365 break;
366 }
367 }
368 if (seq_per_batch_) {
369 assert(writer->batch_cnt);
370 next_sequence += writer->batch_cnt;
371 } else if (writer->ShouldWriteToMemtable()) {
372 next_sequence += WriteBatchInternal::Count(writer->batch);
373 }
374 }
375 }
376
377 if (status.ok()) {
378 PERF_TIMER_GUARD(write_memtable_time);
379
380 if (!parallel) {
381 // w.sequence will be set inside InsertInto
382 w.status = WriteBatchInternal::InsertInto(
383 write_group, current_sequence, column_family_memtables_.get(),
384 &flush_scheduler_, &trim_history_scheduler_,
385 write_options.ignore_missing_column_families,
386 0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
387 batch_per_txn_);
388 } else {
389 write_group.last_sequence = last_sequence;
390 write_thread_.LaunchParallelMemTableWriters(&write_group);
391 in_parallel_group = true;
392
      // Each parallel follower does its own writes. The leader should also do
      // its own.
395 if (w.ShouldWriteToMemtable()) {
396 ColumnFamilyMemTablesImpl column_family_memtables(
397 versions_->GetColumnFamilySet());
398 assert(w.sequence == current_sequence);
399 w.status = WriteBatchInternal::InsertInto(
400 &w, w.sequence, &column_family_memtables, &flush_scheduler_,
401 &trim_history_scheduler_,
402 write_options.ignore_missing_column_families, 0 /*log_number*/,
403 this, true /*concurrent_memtable_writes*/, seq_per_batch_,
404 w.batch_cnt, batch_per_txn_,
405 write_options.memtable_insert_hint_per_batch);
406 }
407 }
408 if (seq_used != nullptr) {
409 *seq_used = w.sequence;
410 }
411 }
412 }
413 PERF_TIMER_START(write_pre_and_post_process_time);
414
415 if (!w.CallbackFailed()) {
416 if (!io_s.ok()) {
417 IOStatusCheck(io_s);
418 } else {
419 WriteStatusCheck(status);
420 }
421 }
422
423 if (need_log_sync) {
424 mutex_.Lock();
425 MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
426 mutex_.Unlock();
427 // Requesting sync with two_write_queues_ is expected to be very rare. We
428 // hence provide a simple implementation that is not necessarily efficient.
429 if (two_write_queues_) {
430 if (manual_wal_flush_) {
431 status = FlushWAL(true);
432 } else {
433 status = SyncWAL();
434 }
435 }
436 }
437
438 bool should_exit_batch_group = true;
439 if (in_parallel_group) {
440 // CompleteParallelWorker returns true if this thread should
441 // handle exit, false means somebody else did
442 should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
443 }
444 if (should_exit_batch_group) {
445 if (status.ok()) {
446 // Note: if we are to resume after non-OK statuses we need to revisit how
      // we react to non-OK statuses here.
448 versions_->SetLastSequence(last_sequence);
449 }
450 MemTableInsertStatusCheck(w.status);
451 write_thread_.ExitAsBatchGroupLeader(write_group, status);
452 }
453
454 if (status.ok()) {
455 status = w.FinalStatus();
456 }
457 return status;
458 }
459
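// In the pipelined write path, the WAL append and the memtable insert are two
// separate stages with their own write groups: a writer first joins (or leads)
// a WAL write group and then proceeds as a memtable writer, possibly in
// parallel with other memtable writers.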
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                                  WriteBatch* my_batch, WriteCallback* callback,
                                  uint64_t* log_used, uint64_t log_ref,
                                  bool disable_memtable, uint64_t* seq_used) {
464 PERF_TIMER_GUARD(write_pre_and_post_process_time);
465 StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
466
467 WriteContext write_context;
468
469 WriteThread::Writer w(write_options, my_batch, callback, log_ref,
470 disable_memtable);
471 write_thread_.JoinBatchGroup(&w);
472 if (w.state == WriteThread::STATE_GROUP_LEADER) {
473 WriteThread::WriteGroup wal_write_group;
474 if (w.callback && !w.callback->AllowWriteBatching()) {
475 write_thread_.WaitForMemTableWriters();
476 }
477 mutex_.Lock();
478 bool need_log_sync = !write_options.disableWAL && write_options.sync;
479 bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
480 // PreprocessWrite does its own perf timing.
481 PERF_TIMER_STOP(write_pre_and_post_process_time);
482 w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
483 PERF_TIMER_START(write_pre_and_post_process_time);
484 log::Writer* log_writer = logs_.back().writer;
485 mutex_.Unlock();
486
    // This can set a non-OK status if the callback fails.
488 last_batch_group_size_ =
489 write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
490 const SequenceNumber current_sequence =
491 write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
492 size_t total_count = 0;
493 size_t total_byte_size = 0;
494
495 if (w.status.ok()) {
496 SequenceNumber next_sequence = current_sequence;
497 for (auto writer : wal_write_group) {
498 if (writer->CheckCallback(this)) {
499 if (writer->ShouldWriteToMemtable()) {
500 writer->sequence = next_sequence;
501 size_t count = WriteBatchInternal::Count(writer->batch);
502 next_sequence += count;
503 total_count += count;
504 }
505 total_byte_size = WriteBatchInternal::AppendedByteSize(
506 total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
507 }
508 }
509 if (w.disable_wal) {
510 has_unpersisted_data_.store(true, std::memory_order_relaxed);
511 }
512 write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
513 }
514
515 auto stats = default_cf_internal_stats_;
516 stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
517 RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
518 stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size);
519 RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
520 RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
521
522 PERF_TIMER_STOP(write_pre_and_post_process_time);
523
524 IOStatus io_s;
525 if (w.status.ok() && !write_options.disableWAL) {
526 PERF_TIMER_GUARD(write_wal_time);
527 stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
528 RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
529 if (wal_write_group.size > 1) {
530 stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
531 wal_write_group.size - 1);
532 RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
533 }
534 io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
535 need_log_dir_sync, current_sequence);
536 w.status = io_s;
537 }
538
539 if (!w.CallbackFailed()) {
540 if (!io_s.ok()) {
541 IOStatusCheck(io_s);
542 } else {
543 WriteStatusCheck(w.status);
544 }
545 }
546
547 if (need_log_sync) {
548 mutex_.Lock();
549 MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
550 mutex_.Unlock();
551 }
552
553 write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
554 }
555
556 WriteThread::WriteGroup memtable_write_group;
557 if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
558 PERF_TIMER_GUARD(write_memtable_time);
559 assert(w.ShouldWriteToMemtable());
560 write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
561 if (memtable_write_group.size > 1 &&
562 immutable_db_options_.allow_concurrent_memtable_write) {
563 write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
564 } else {
565 memtable_write_group.status = WriteBatchInternal::InsertInto(
566 memtable_write_group, w.sequence, column_family_memtables_.get(),
567 &flush_scheduler_, &trim_history_scheduler_,
568 write_options.ignore_missing_column_families, 0 /*log_number*/, this,
569 false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
570 versions_->SetLastSequence(memtable_write_group.last_sequence);
571 write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
572 }
573 }
574
575 if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
576 assert(w.ShouldWriteToMemtable());
577 ColumnFamilyMemTablesImpl column_family_memtables(
578 versions_->GetColumnFamilySet());
579 w.status = WriteBatchInternal::InsertInto(
580 &w, w.sequence, &column_family_memtables, &flush_scheduler_,
581 &trim_history_scheduler_, write_options.ignore_missing_column_families,
582 0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
583 false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
584 write_options.memtable_insert_hint_per_batch);
585 if (write_thread_.CompleteParallelMemTableWriter(&w)) {
586 MemTableInsertStatusCheck(w.status);
587 versions_->SetLastSequence(w.write_group->last_sequence);
588 write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
589 }
590 }
591 if (seq_used != nullptr) {
592 *seq_used = w.sequence;
593 }
594
595 assert(w.state == WriteThread::STATE_COMPLETED);
596 return w.FinalStatus();
597 }
598
Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
                                      WriteBatch* my_batch,
                                      WriteCallback* callback, uint64_t log_ref,
                                      SequenceNumber seq,
                                      const size_t sub_batch_cnt) {
604 PERF_TIMER_GUARD(write_pre_and_post_process_time);
605 StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
606
607 WriteThread::Writer w(write_options, my_batch, callback, log_ref,
608 false /*disable_memtable*/);
609
610 if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
611 w.sequence = seq;
612 size_t total_count = WriteBatchInternal::Count(my_batch);
613 InternalStats* stats = default_cf_internal_stats_;
614 stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
615 RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
616
617 ColumnFamilyMemTablesImpl column_family_memtables(
618 versions_->GetColumnFamilySet());
619 w.status = WriteBatchInternal::InsertInto(
620 &w, w.sequence, &column_family_memtables, &flush_scheduler_,
621 &trim_history_scheduler_, write_options.ignore_missing_column_families,
622 0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
623 seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
624 write_options.memtable_insert_hint_per_batch);
625
626 WriteStatusCheck(w.status);
627 if (write_options.disableWAL) {
628 has_unpersisted_data_.store(true, std::memory_order_relaxed);
629 }
630 }
631
632 size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1;
633 if (pending_cnt == 0) {
634 // switch_cv_ waits until pending_memtable_writes_ = 0. Locking its mutex
635 // before notify ensures that cv is in waiting state when it is notified
636 // thus not missing the update to pending_memtable_writes_ even though it is
637 // not modified under the mutex.
638 std::lock_guard<std::mutex> lck(switch_mutex_);
639 switch_cv_.notify_all();
640 }
641
642 if (!w.FinalStatus().ok()) {
643 return w.FinalStatus();
644 }
645 return Status::OK();
646 }
647
648 // The 2nd write queue. If enabled it will be used only for WAL-only writes.
649 // This is the only queue that updates LastPublishedSequence which is only
650 // applicable in a two-queue setting.
Status DBImpl::WriteImplWALOnly(
    WriteThread* write_thread, const WriteOptions& write_options,
    WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used,
    const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
    PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
    const PublishLastSeq publish_last_seq, const bool disable_memtable) {
657 Status status;
658 PERF_TIMER_GUARD(write_pre_and_post_process_time);
659 WriteThread::Writer w(write_options, my_batch, callback, log_ref,
660 disable_memtable, sub_batch_cnt, pre_release_callback);
661 RecordTick(stats_, WRITE_WITH_WAL);
662 StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
663
664 write_thread->JoinBatchGroup(&w);
665 assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
666 if (w.state == WriteThread::STATE_COMPLETED) {
667 if (log_used != nullptr) {
668 *log_used = w.log_used;
669 }
670 if (seq_used != nullptr) {
671 *seq_used = w.sequence;
672 }
673 return w.FinalStatus();
674 }
675 // else we are the leader of the write batch group
676 assert(w.state == WriteThread::STATE_GROUP_LEADER);
677
678 if (publish_last_seq == kDoPublishLastSeq) {
679 // Currently we only use kDoPublishLastSeq in unordered_write
680 assert(immutable_db_options_.unordered_write);
681 WriteContext write_context;
682 if (error_handler_.IsDBStopped()) {
683 status = error_handler_.GetBGError();
684 }
685 // TODO(myabandeh): Make preliminary checks thread-safe so we could do them
686 // without paying the cost of obtaining the mutex.
687 if (status.ok()) {
688 InstrumentedMutexLock l(&mutex_);
689 bool need_log_sync = false;
690 status = PreprocessWrite(write_options, &need_log_sync, &write_context);
691 WriteStatusCheck(status);
692 }
693 if (!status.ok()) {
694 WriteThread::WriteGroup write_group;
695 write_thread->EnterAsBatchGroupLeader(&w, &write_group);
696 write_thread->ExitAsBatchGroupLeader(write_group, status);
697 return status;
698 }
699 }
700
701 WriteThread::WriteGroup write_group;
702 uint64_t last_sequence;
703 write_thread->EnterAsBatchGroupLeader(&w, &write_group);
704 // Note: no need to update last_batch_group_size_ here since the batch writes
705 // to WAL only
706
707 size_t pre_release_callback_cnt = 0;
708 size_t total_byte_size = 0;
709 for (auto* writer : write_group) {
710 if (writer->CheckCallback(this)) {
711 total_byte_size = WriteBatchInternal::AppendedByteSize(
712 total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
713 if (writer->pre_release_callback) {
714 pre_release_callback_cnt++;
715 }
716 }
717 }
718
719 const bool concurrent_update = true;
720 // Update stats while we are an exclusive group leader, so we know
721 // that nobody else can be writing to these particular stats.
722 // We're optimistic, updating the stats before we successfully
723 // commit. That lets us release our leader status early.
724 auto stats = default_cf_internal_stats_;
725 stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
726 concurrent_update);
727 RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
728 stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
729 concurrent_update);
730 RecordTick(stats_, WRITE_DONE_BY_SELF);
731 auto write_done_by_other = write_group.size - 1;
732 if (write_done_by_other > 0) {
733 stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
734 write_done_by_other, concurrent_update);
735 RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
736 }
737 RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
738
739 PERF_TIMER_STOP(write_pre_and_post_process_time);
740
741 PERF_TIMER_GUARD(write_wal_time);
  // LastAllocatedSequence is increased inside ConcurrentWriteToWAL under
  // log_write_mutex_ to ensure ordered events in the WAL
744 size_t seq_inc = 0 /* total_count */;
745 if (assign_order == kDoAssignOrder) {
746 size_t total_batch_cnt = 0;
747 for (auto* writer : write_group) {
748 assert(writer->batch_cnt || !seq_per_batch_);
749 if (!writer->CallbackFailed()) {
750 total_batch_cnt += writer->batch_cnt;
751 }
752 }
753 seq_inc = total_batch_cnt;
754 }
755 IOStatus io_s;
756 if (!write_options.disableWAL) {
757 io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
758 status = io_s;
759 } else {
760 // Otherwise we inc seq number to do solely the seq allocation
761 last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
762 }
763
764 size_t memtable_write_cnt = 0;
765 auto curr_seq = last_sequence + 1;
766 for (auto* writer : write_group) {
767 if (writer->CallbackFailed()) {
768 continue;
769 }
770 writer->sequence = curr_seq;
771 if (assign_order == kDoAssignOrder) {
772 assert(writer->batch_cnt || !seq_per_batch_);
773 curr_seq += writer->batch_cnt;
774 }
775 if (!writer->disable_memtable) {
776 memtable_write_cnt++;
777 }
778 // else seq advances only by memtable writes
779 }
780 if (status.ok() && write_options.sync) {
781 assert(!write_options.disableWAL);
782 // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hence provide a simple implementation that is not necessarily efficient.
784 if (manual_wal_flush_) {
785 status = FlushWAL(true);
786 } else {
787 status = SyncWAL();
788 }
789 }
790 PERF_TIMER_START(write_pre_and_post_process_time);
791
792 if (!w.CallbackFailed()) {
793 if (!io_s.ok()) {
794 IOStatusCheck(io_s);
795 } else {
796 WriteStatusCheck(status);
797 }
798 }
799 if (status.ok()) {
800 size_t index = 0;
801 for (auto* writer : write_group) {
802 if (!writer->CallbackFailed() && writer->pre_release_callback) {
803 assert(writer->sequence != kMaxSequenceNumber);
804 Status ws = writer->pre_release_callback->Callback(
805 writer->sequence, disable_memtable, writer->log_used, index++,
806 pre_release_callback_cnt);
807 if (!ws.ok()) {
808 status = ws;
809 break;
810 }
811 }
812 }
813 }
814 if (publish_last_seq == kDoPublishLastSeq) {
815 versions_->SetLastSequence(last_sequence + seq_inc);
816 // Currently we only use kDoPublishLastSeq in unordered_write
817 assert(immutable_db_options_.unordered_write);
818 }
819 if (immutable_db_options_.unordered_write && status.ok()) {
820 pending_memtable_writes_ += memtable_write_cnt;
821 }
822 write_thread->ExitAsBatchGroupLeader(write_group, status);
823 if (status.ok()) {
824 status = w.FinalStatus();
825 }
826 if (seq_used != nullptr) {
827 *seq_used = w.sequence;
828 }
829 return status;
830 }
831
void DBImpl::WriteStatusCheck(const Status& status) {
833 // Is setting bg_error_ enough here? This will at least stop
834 // compaction and fail any further writes.
835 if (immutable_db_options_.paranoid_checks && !status.ok() &&
836 !status.IsBusy() && !status.IsIncomplete()) {
837 mutex_.Lock();
838 error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
839 mutex_.Unlock();
840 }
841 }
842
void DBImpl::IOStatusCheck(const IOStatus& io_status) {
844 // Is setting bg_error_ enough here? This will at least stop
845 // compaction and fail any further writes.
846 if (immutable_db_options_.paranoid_checks && !io_status.ok() &&
847 !io_status.IsBusy() && !io_status.IsIncomplete()) {
848 mutex_.Lock();
849 error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback);
850 mutex_.Unlock();
851 }
852 }
853
void DBImpl::MemTableInsertStatusCheck(const Status& status) {
855 // A non-OK status here indicates that the state implied by the
856 // WAL has diverged from the in-memory state. This could be
857 // because of a corrupt write_batch (very bad), or because the
858 // client specified an invalid column family and didn't specify
859 // ignore_missing_column_families.
860 if (!status.ok()) {
861 mutex_.Lock();
862 assert(!error_handler_.IsBGWorkStopped());
863 error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
864 mutex_.Unlock();
865 }
866 }
867
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
871 mutex_.AssertHeld();
872 assert(write_context != nullptr && need_log_sync != nullptr);
873 Status status;
874
875 if (error_handler_.IsDBStopped()) {
876 status = error_handler_.GetBGError();
877 }
878
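  // The checks below may switch memtables, schedule flushes, or delay the
  // write; each is skipped once an earlier step has set a non-OK status.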
879 PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time);
880
881 assert(!single_column_family_mode_ ||
882 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
883 if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
884 total_log_size_ > GetMaxTotalWalSize())) {
885 WaitForPendingWrites();
886 status = SwitchWAL(write_context);
887 }
888
889 if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
890 // Before a new memtable is added in SwitchMemtable(),
891 // write_buffer_manager_->ShouldFlush() will keep returning true. If another
892 // thread is writing to another DB with the same write buffer, they may also
    // be flushed. We may end up flushing many more DBs than needed. It's
894 // suboptimal but still correct.
895 WaitForPendingWrites();
896 status = HandleWriteBufferFull(write_context);
897 }
898
899 if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
900 status = TrimMemtableHistory(write_context);
901 }
902
903 if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
904 WaitForPendingWrites();
905 status = ScheduleFlushes(write_context);
906 }
907
908 PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
909 PERF_TIMER_GUARD(write_pre_and_post_process_time);
910
911 if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
912 write_controller_.NeedsDelay()))) {
913 PERF_TIMER_STOP(write_pre_and_post_process_time);
914 PERF_TIMER_GUARD(write_delay_time);
    // We don't know the size of the current batch, so we always use the size
    // of the previous one. It might create a fairness issue in that expiration
    // might happen for smaller writes while larger writes can go through.
    // Can optimize it if it becomes an issue.
919 status = DelayWrite(last_batch_group_size_, write_options);
920 PERF_TIMER_START(write_pre_and_post_process_time);
921 }
922
923 if (status.ok() && *need_log_sync) {
    // Wait until the parallel syncs are finished. Any sync process has to sync
    // the front log too, so it is enough to check the status of front().
    // We use a while loop since log_sync_cv_ is signalled whenever any sync
    // finishes.
928 // Note: there does not seem to be a reason to wait for parallel sync at
929 // this early step but it is not important since parallel sync (SyncWAL) and
930 // need_log_sync are usually not used together.
931 while (logs_.front().getting_synced) {
932 log_sync_cv_.Wait();
933 }
934 for (auto& log : logs_) {
935 assert(!log.getting_synced);
      // This is just to prevent the logs from being synced by a parallel
      // SyncWAL call. We will do the actual syncing later, after we write to
      // the WAL.
      // Note: there does not seem to be a reason to set this early, before we
      // actually write to the WAL.
941 log.getting_synced = true;
942 }
943 } else {
944 *need_log_sync = false;
945 }
946
947 return status;
948 }
949
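// Builds the batch that will be appended to the WAL for a write group. If the
// group has a single batch that can be used as-is, it is returned directly;
// otherwise the eligible batches are flattened into *tmp_batch.
// *write_with_wal is set to the number of batches included, and
// *to_be_cached_state to the last batch flagged as the latest persistent
// state, if any.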
WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
                               WriteBatch* tmp_batch, size_t* write_with_wal,
                               WriteBatch** to_be_cached_state) {
953 assert(write_with_wal != nullptr);
954 assert(tmp_batch != nullptr);
955 assert(*to_be_cached_state == nullptr);
956 WriteBatch* merged_batch = nullptr;
957 *write_with_wal = 0;
958 auto* leader = write_group.leader;
959 assert(!leader->disable_wal); // Same holds for all in the batch group
960 if (write_group.size == 1 && !leader->CallbackFailed() &&
961 leader->batch->GetWalTerminationPoint().is_cleared()) {
    // We simply use the leader's batch if the group contains only one batch,
    // that batch should be written to the WAL, and the batch does not want to
    // be truncated.
965 merged_batch = leader->batch;
966 if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
967 *to_be_cached_state = merged_batch;
968 }
969 *write_with_wal = 1;
970 } else {
971 // WAL needs all of the batches flattened into a single batch.
972 // We could avoid copying here with an iov-like AddRecord
973 // interface
974 merged_batch = tmp_batch;
975 for (auto writer : write_group) {
976 if (!writer->CallbackFailed()) {
977 WriteBatchInternal::Append(merged_batch, writer->batch,
978 /*WAL_only*/ true);
979 if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
980 // We only need to cache the last of such write batch
981 *to_be_cached_state = writer->batch;
982 }
983 (*write_with_wal)++;
984 }
985 }
986 }
987 return merged_batch;
988 }
989
990 // When two_write_queues_ is disabled, this function is called from the only
991 // write thread. Otherwise this must be called holding log_write_mutex_.
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
                            log::Writer* log_writer, uint64_t* log_used,
                            uint64_t* log_size) {
995 assert(log_size != nullptr);
996 Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
997 *log_size = log_entry.size();
  // With two_write_queues_, WriteToWAL has to be protected from concurrent
  // calls from the two queues anyway, and log_write_mutex_ is already held.
  // Otherwise, if manual_wal_flush_ is enabled, we need to protect
  // log_writer->AddRecord from possible concurrent calls via FlushWAL by the
  // application.
1002 const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
  // Due to performance concerns of missed branch prediction, penalize the new
1004 // manual_wal_flush_ feature (by UNLIKELY) instead of the more common case
1005 // when we do not need any locking.
1006 if (UNLIKELY(needs_locking)) {
1007 log_write_mutex_.Lock();
1008 }
1009 IOStatus io_s = log_writer->AddRecord(log_entry);
1010
1011 if (UNLIKELY(needs_locking)) {
1012 log_write_mutex_.Unlock();
1013 }
1014 if (log_used != nullptr) {
1015 *log_used = logfile_number_;
1016 }
1017 total_log_size_ += log_entry.size();
1018 // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
1019 // since alive_log_files_ might be modified concurrently
1020 alive_log_files_.back().AddSize(log_entry.size());
1021 log_empty_ = false;
1022 return io_s;
1023 }
1024
IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                            log::Writer* log_writer, uint64_t* log_used,
                            bool need_log_sync, bool need_log_dir_sync,
                            SequenceNumber sequence) {
1029 IOStatus io_s;
1030 assert(!write_group.leader->disable_wal);
1031 // Same holds for all in the batch group
1032 size_t write_with_wal = 0;
1033 WriteBatch* to_be_cached_state = nullptr;
1034 WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
1035 &write_with_wal, &to_be_cached_state);
1036 if (merged_batch == write_group.leader->batch) {
1037 write_group.leader->log_used = logfile_number_;
1038 } else if (write_with_wal > 1) {
1039 for (auto writer : write_group) {
1040 writer->log_used = logfile_number_;
1041 }
1042 }
1043
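  // Stamp the starting sequence number into the merged batch so that replaying
  // this WAL record during recovery assigns the same sequence numbers.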
1044 WriteBatchInternal::SetSequence(merged_batch, sequence);
1045
1046 uint64_t log_size;
1047 io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
1048 if (to_be_cached_state) {
1049 cached_recoverable_state_ = *to_be_cached_state;
1050 cached_recoverable_state_empty_ = false;
1051 }
1052
1053 if (io_s.ok() && need_log_sync) {
1054 StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
1055 // It's safe to access logs_ with unlocked mutex_ here because:
1056 // - we've set getting_synced=true for all logs,
1057 // so other threads won't pop from logs_ while we're here,
1058 // - only writer thread can push to logs_, and we're in
1059 // writer thread, so no one will push to logs_,
1060 // - as long as other threads don't modify it, it's safe to read
1061 // from std::deque from multiple threads concurrently.
1062 for (auto& log : logs_) {
1063 io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);
1064 if (!io_s.ok()) {
1065 break;
1066 }
1067 }
1068
1069 if (io_s.ok() && need_log_dir_sync) {
1070 // We only sync WAL directory the first time WAL syncing is
1071 // requested, so that in case users never turn on WAL sync,
1072 // we can avoid the disk I/O in the write code path.
1073 io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
1074 }
1075 }
1076
1077 if (merged_batch == &tmp_batch_) {
1078 tmp_batch_.Clear();
1079 }
1080 if (io_s.ok()) {
1081 auto stats = default_cf_internal_stats_;
1082 if (need_log_sync) {
1083 stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
1084 RecordTick(stats_, WAL_FILE_SYNCED);
1085 }
1086 stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size);
1087 RecordTick(stats_, WAL_FILE_BYTES, log_size);
1088 stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
1089 RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
1090 }
1091 return io_s;
1092 }
1093
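// Used with two_write_queues_: sequence allocation and the WAL append happen
// together under log_write_mutex_ so that records appear in the WAL in
// sequence order even with two concurrent write queues.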
IOStatus DBImpl::ConcurrentWriteToWAL(
    const WriteThread::WriteGroup& write_group, uint64_t* log_used,
    SequenceNumber* last_sequence, size_t seq_inc) {
1097 IOStatus io_s;
1098
1099 assert(!write_group.leader->disable_wal);
1100 // Same holds for all in the batch group
1101 WriteBatch tmp_batch;
1102 size_t write_with_wal = 0;
1103 WriteBatch* to_be_cached_state = nullptr;
1104 WriteBatch* merged_batch =
1105 MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state);
1106
  // We need to lock log_write_mutex_ since logs_ and alive_log_files_ might be
1108 // pushed back concurrently
1109 log_write_mutex_.Lock();
1110 if (merged_batch == write_group.leader->batch) {
1111 write_group.leader->log_used = logfile_number_;
1112 } else if (write_with_wal > 1) {
1113 for (auto writer : write_group) {
1114 writer->log_used = logfile_number_;
1115 }
1116 }
1117 *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
1118 auto sequence = *last_sequence + 1;
1119 WriteBatchInternal::SetSequence(merged_batch, sequence);
1120
1121 log::Writer* log_writer = logs_.back().writer;
1122 uint64_t log_size;
1123 io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
1124 if (to_be_cached_state) {
1125 cached_recoverable_state_ = *to_be_cached_state;
1126 cached_recoverable_state_empty_ = false;
1127 }
1128 log_write_mutex_.Unlock();
1129
1130 if (io_s.ok()) {
1131 const bool concurrent = true;
1132 auto stats = default_cf_internal_stats_;
1133 stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
1134 concurrent);
1135 RecordTick(stats_, WAL_FILE_BYTES, log_size);
1136 stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal,
1137 concurrent);
1138 RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
1139 }
1140 return io_s;
1141 }
1142
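// Applies the cached recoverable state batch (saved by WriteToWAL when a batch
// is flagged as the latest persistent state) to the memtables and advances the
// sequence counters accordingly. Must be called with mutex_ held.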
Status DBImpl::WriteRecoverableState() {
1144 mutex_.AssertHeld();
1145 if (!cached_recoverable_state_empty_) {
1146 bool dont_care_bool;
1147 SequenceNumber next_seq;
1148 if (two_write_queues_) {
1149 log_write_mutex_.Lock();
1150 }
1151 SequenceNumber seq;
1152 if (two_write_queues_) {
1153 seq = versions_->FetchAddLastAllocatedSequence(0);
1154 } else {
1155 seq = versions_->LastSequence();
1156 }
1157 WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
1158 auto status = WriteBatchInternal::InsertInto(
1159 &cached_recoverable_state_, column_family_memtables_.get(),
1160 &flush_scheduler_, &trim_history_scheduler_, true,
1161 0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */,
1162 &next_seq, &dont_care_bool, seq_per_batch_);
1163 auto last_seq = next_seq - 1;
1164 if (two_write_queues_) {
1165 versions_->FetchAddLastAllocatedSequence(last_seq - seq);
1166 versions_->SetLastPublishedSequence(last_seq);
1167 }
1168 versions_->SetLastSequence(last_seq);
1169 if (two_write_queues_) {
1170 log_write_mutex_.Unlock();
1171 }
1172 if (status.ok() && recoverable_state_pre_release_callback_) {
1173 const bool DISABLE_MEMTABLE = true;
1174 for (uint64_t sub_batch_seq = seq + 1;
1175 sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
1176 uint64_t const no_log_num = 0;
1177 // Unlock it since the callback might end up locking mutex. e.g.,
1178 // AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
1179 mutex_.Unlock();
1180 status = recoverable_state_pre_release_callback_->Callback(
1181 sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
1182 mutex_.Lock();
1183 }
1184 }
1185 if (status.ok()) {
1186 cached_recoverable_state_.Clear();
1187 cached_recoverable_state_empty_ = true;
1188 }
1189 return status;
1190 }
1191 return Status::OK();
1192 }
1193
void DBImpl::SelectColumnFamiliesForAtomicFlush(
    autovector<ColumnFamilyData*>* cfds) {
1196 for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
1197 if (cfd->IsDropped()) {
1198 continue;
1199 }
1200 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1201 !cached_recoverable_state_empty_.load()) {
1202 cfds->push_back(cfd);
1203 }
1204 }
1205 }
1206
1207 // Assign sequence number for atomic flush.
void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
1209 assert(immutable_db_options_.atomic_flush);
1210 auto seq = versions_->LastSequence();
1211 for (auto cfd : cfds) {
1212 cfd->imm()->AssignAtomicFlushSeq(seq);
1213 }
1214 }
1215
Status DBImpl::SwitchWAL(WriteContext* write_context) {
1217 mutex_.AssertHeld();
1218 assert(write_context != nullptr);
1219 Status status;
1220
1221 if (alive_log_files_.begin()->getting_flushed) {
1222 return status;
1223 }
1224
1225 auto oldest_alive_log = alive_log_files_.begin()->number;
1226 bool flush_wont_release_oldest_log = false;
1227 if (allow_2pc()) {
1228 auto oldest_log_with_uncommitted_prep =
1229 logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();
1230
1231 assert(oldest_log_with_uncommitted_prep == 0 ||
1232 oldest_log_with_uncommitted_prep >= oldest_alive_log);
1233 if (oldest_log_with_uncommitted_prep > 0 &&
1234 oldest_log_with_uncommitted_prep == oldest_alive_log) {
1235 if (unable_to_release_oldest_log_) {
1236 // we already attempted to flush all column families dependent on
1237 // the oldest alive log but the log still contained uncommitted
1238 // transactions so there is still nothing that we can do.
1239 return status;
1240 } else {
1241 ROCKS_LOG_WARN(
1242 immutable_db_options_.info_log,
1243 "Unable to release oldest log due to uncommitted transaction");
1244 unable_to_release_oldest_log_ = true;
1245 flush_wont_release_oldest_log = true;
1246 }
1247 }
1248 }
1249 if (!flush_wont_release_oldest_log) {
1250 // we only mark this log as getting flushed if we have successfully
1251 // flushed all data in this log. If this log contains outstanding prepared
1252 // transactions then we cannot flush this log until those transactions are
    // committed.
1254 unable_to_release_oldest_log_ = false;
1255 alive_log_files_.begin()->getting_flushed = true;
1256 }
1257
1258 ROCKS_LOG_INFO(
1259 immutable_db_options_.info_log,
1260 "Flushing all column families with data in WAL number %" PRIu64
1261 ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
1262 oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
1263 // no need to refcount because drop is happening in write thread, so can't
1264 // happen while we're in the write thread
1265 autovector<ColumnFamilyData*> cfds;
1266 if (immutable_db_options_.atomic_flush) {
1267 SelectColumnFamiliesForAtomicFlush(&cfds);
1268 } else {
1269 for (auto cfd : *versions_->GetColumnFamilySet()) {
1270 if (cfd->IsDropped()) {
1271 continue;
1272 }
1273 if (cfd->OldestLogToKeep() <= oldest_alive_log) {
1274 cfds.push_back(cfd);
1275 }
1276 }
1277 MaybeFlushStatsCF(&cfds);
1278 }
1279 WriteThread::Writer nonmem_w;
1280 if (two_write_queues_) {
1281 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1282 }
1283
1284 for (const auto cfd : cfds) {
1285 cfd->Ref();
1286 status = SwitchMemtable(cfd, write_context);
1287 cfd->UnrefAndTryDelete();
1288 if (!status.ok()) {
1289 break;
1290 }
1291 }
1292 if (two_write_queues_) {
1293 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1294 }
1295
1296 if (status.ok()) {
1297 if (immutable_db_options_.atomic_flush) {
1298 AssignAtomicFlushSeq(cfds);
1299 }
1300 for (auto cfd : cfds) {
1301 cfd->imm()->FlushRequested();
1302 }
1303 FlushRequest flush_req;
1304 GenerateFlushRequest(cfds, &flush_req);
1305 SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
1306 MaybeScheduleFlushOrCompaction();
1307 }
1308 return status;
1309 }
1310
Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
1312 mutex_.AssertHeld();
1313 assert(write_context != nullptr);
1314 Status status;
1315
1316 // Before a new memtable is added in SwitchMemtable(),
1317 // write_buffer_manager_->ShouldFlush() will keep returning true. If another
1318 // thread is writing to another DB with the same write buffer, they may also
  // be flushed. We may end up flushing many more DBs than needed. It's
1320 // suboptimal but still correct.
1321 ROCKS_LOG_INFO(
1322 immutable_db_options_.info_log,
1323 "Flushing column family with oldest memtable entry. Write buffer is "
1324 "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
1325 write_buffer_manager_->memory_usage(),
1326 write_buffer_manager_->buffer_size());
1327 // no need to refcount because drop is happening in write thread, so can't
1328 // happen while we're in the write thread
1329 autovector<ColumnFamilyData*> cfds;
1330 if (immutable_db_options_.atomic_flush) {
1331 SelectColumnFamiliesForAtomicFlush(&cfds);
1332 } else {
1333 ColumnFamilyData* cfd_picked = nullptr;
1334 SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
1335
1336 for (auto cfd : *versions_->GetColumnFamilySet()) {
1337 if (cfd->IsDropped()) {
1338 continue;
1339 }
1340 if (!cfd->mem()->IsEmpty()) {
1341 // We only consider active mem table, hoping immutable memtable is
1342 // already in the process of flushing.
1343 uint64_t seq = cfd->mem()->GetCreationSeq();
1344 if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
1345 cfd_picked = cfd;
1346 seq_num_for_cf_picked = seq;
1347 }
1348 }
1349 }
1350 if (cfd_picked != nullptr) {
1351 cfds.push_back(cfd_picked);
1352 }
1353 MaybeFlushStatsCF(&cfds);
1354 }
1355
1356 WriteThread::Writer nonmem_w;
1357 if (two_write_queues_) {
1358 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1359 }
1360 for (const auto cfd : cfds) {
1361 if (cfd->mem()->IsEmpty()) {
1362 continue;
1363 }
1364 cfd->Ref();
1365 status = SwitchMemtable(cfd, write_context);
1366 cfd->UnrefAndTryDelete();
1367 if (!status.ok()) {
1368 break;
1369 }
1370 }
1371 if (two_write_queues_) {
1372 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1373 }
1374
1375 if (status.ok()) {
1376 if (immutable_db_options_.atomic_flush) {
1377 AssignAtomicFlushSeq(cfds);
1378 }
1379 for (const auto cfd : cfds) {
1380 cfd->imm()->FlushRequested();
1381 }
1382 FlushRequest flush_req;
1383 GenerateFlushRequest(cfds, &flush_req);
1384 SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
1385 MaybeScheduleFlushOrCompaction();
1386 }
1387 return status;
1388 }
1389
uint64_t DBImpl::GetMaxTotalWalSize() const {
1391 mutex_.AssertHeld();
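  // When max_total_wal_size is not set, the limit defaults to four times
  // max_total_in_memory_state_.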
1392 return mutable_db_options_.max_total_wal_size == 0
1393 ? 4 * max_total_in_memory_state_
1394 : mutable_db_options_.max_total_wal_size;
1395 }
1396
1397 // REQUIRES: mutex_ is held
1398 // REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::DelayWrite(uint64_t num_bytes,
                          const WriteOptions& write_options) {
1401 uint64_t time_delayed = 0;
1402 bool delayed = false;
1403 {
1404 StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
1405 uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
1406 if (delay > 0) {
1407 if (write_options.no_slowdown) {
1408 return Status::Incomplete("Write stall");
1409 }
1410 TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
1411
1412 // Notify write_thread_ about the stall so it can setup a barrier and
1413 // fail any pending writers with no_slowdown
1414 write_thread_.BeginWriteStall();
1415 TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
1416 mutex_.Unlock();
      // We will delay the write until we have slept for `delay` microseconds
      // or we don't need a delay anymore.
1419 const uint64_t kDelayInterval = 1000;
1420 uint64_t stall_end = sw.start_time() + delay;
1421 while (write_controller_.NeedsDelay()) {
1422 if (env_->NowMicros() >= stall_end) {
1423 // We already delayed this write `delay` microseconds
1424 break;
1425 }
1426
1427 delayed = true;
1428 // Sleep for 0.001 seconds
1429 env_->SleepForMicroseconds(kDelayInterval);
1430 }
1431 mutex_.Lock();
1432 write_thread_.EndWriteStall();
1433 }
1434
    // Don't wait if there's a background error, even if it's a soft error. We
1436 // might wait here indefinitely as the background compaction may never
1437 // finish successfully, resulting in the stall condition lasting
1438 // indefinitely
1439 while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
1440 if (write_options.no_slowdown) {
1441 return Status::Incomplete("Write stall");
1442 }
1443 delayed = true;
1444
1445 // Notify write_thread_ about the stall so it can setup a barrier and
1446 // fail any pending writers with no_slowdown
1447 write_thread_.BeginWriteStall();
1448 TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
1449 bg_cv_.Wait();
1450 write_thread_.EndWriteStall();
1451 }
1452 }
1453 assert(!delayed || !write_options.no_slowdown);
1454 if (delayed) {
1455 default_cf_internal_stats_->AddDBStats(
1456 InternalStats::kIntStatsWriteStallMicros, time_delayed);
1457 RecordTick(stats_, STALL_MICROS, time_delayed);
1458 }
1459
1460 // If DB is not in read-only mode and write_controller is not stopping
1461 // writes, we can ignore any background errors and allow the write to
1462 // proceed
1463 Status s;
1464 if (write_controller_.IsStopped()) {
1465 // If writes are still stopped, it means we bailed due to a background
1466 // error
1467 s = Status::Incomplete(error_handler_.GetBGError().ToString());
1468 }
1469 if (error_handler_.IsDBStopped()) {
1470 s = error_handler_.GetBGError();
1471 }
1472 return s;
1473 }
1474
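// Callers opt into this throttling through WriteOptions::low_pri. A minimal
// usage sketch (illustrative only; `db`, `key`, and `value` are assumed to
// exist):
//
//   WriteOptions wo;
//   wo.low_pri = true;  // may be rate-limited when compaction falls behind
//   Status s = db->Put(wo, key, value);
//   // With no_slowdown also set, the write would instead fail with
//   // Status::Incomplete("Low priority write stall").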
Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
                                            WriteBatch* my_batch) {
  assert(write_options.low_pri);
  // This is called outside the DB mutex. Although it is safe to make the call,
  // the consistency condition is not guaranteed to hold. It's OK to live with
  // it in this case.
  // If compaction needs to be sped up, it means compaction is falling behind,
  // so we start to rate-limit low-pri writes.
  if (write_controller_.NeedSpeedupCompaction()) {
    if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
      // For 2PC, we only rate limit prepare, not commit.
      return Status::OK();
    }
    if (write_options.no_slowdown) {
      return Status::Incomplete("Low priority write stall");
    } else {
      assert(my_batch != nullptr);
      // Rate limit these writes rather than blocking them completely: if the
      // write load is heavy, fully blocked low-pri writes might never get a
      // chance to run, whereas rate limiting guarantees they still make slow
      // progress.
      PERF_TIMER_GUARD(write_delay_time);
      write_controller_.low_pri_rate_limiter()->Request(
          my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
          RateLimiter::OpType::kWrite);
    }
  }
  return Status::OK();
}

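// If stats persistence is enabled and the persistent-stats column family is
// the only one still pinning an old WAL (its log number is below every other
// column family's), append it to `cfds` so it is flushed together with the
// column families already selected.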
void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
  assert(cfds != nullptr);
  if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) {
    ColumnFamilyData* cfd_stats =
        versions_->GetColumnFamilySet()->GetColumnFamily(
            kPersistentStatsColumnFamilyName);
    if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) {
      for (ColumnFamilyData* cfd : *cfds) {
        if (cfd == cfd_stats) {
          // stats CF already included in cfds
          return;
        }
      }
      // Force flushing the stats CF when its log number is less than all
      // other CFs' log numbers.
      bool force_flush_stats_cf = true;
      for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
        if (loop_cfd == cfd_stats) {
          continue;
        }
        if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
          force_flush_stats_cf = false;
        }
      }
      if (force_flush_stats_cf) {
        cfds->push_back(cfd_stats);
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Force flushing stats CF with automated flush "
                       "to avoid holding old logs");
      }
    }
  }
}

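// Trims memtables that are kept in each scheduled column family's immutable
// list purely as history: freed memtables are deleted and a new SuperVersion
// is installed so readers stop referencing them. (The history budget is
// assumed to be governed by max_write_buffer_size_to_maintain.)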
Status DBImpl::TrimMemtableHistory(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  ColumnFamilyData* tmp_cfd;
  while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) !=
         nullptr) {
    cfds.push_back(tmp_cfd);
  }
  for (auto& cfd : cfds) {
    autovector<MemTable*> to_delete;
    cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
    if (!to_delete.empty()) {
      for (auto m : to_delete) {
        delete m;
      }
      context->superversion_context.NewSuperVersion();
      assert(context->superversion_context.new_superversion.get() != nullptr);
      cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
    }

    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
  }
  return Status::OK();
}

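// Switches the memtable of every column family scheduled for flush (all
// column families when atomic_flush is enabled), then queues a single flush
// request for them with reason kWriteBufferFull.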
Status DBImpl::ScheduleFlushes(WriteContext* context) {
  autovector<ColumnFamilyData*> cfds;
  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
    for (auto cfd : cfds) {
      cfd->Ref();
    }
    flush_scheduler_.Clear();
  } else {
    ColumnFamilyData* tmp_cfd;
    while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
      cfds.push_back(tmp_cfd);
    }
    MaybeFlushStatsCF(&cfds);
  }
  Status status;
  WriteThread::Writer nonmem_w;
  if (two_write_queues_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  for (auto& cfd : cfds) {
    if (!cfd->mem()->IsEmpty()) {
      status = SwitchMemtable(cfd, context);
    }
    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
    if (!status.ok()) {
      break;
    }
  }

  if (two_write_queues_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }

  if (status.ok()) {
    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, &flush_req);
    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

#ifndef ROCKSDB_LITE
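// Invokes EventListener::OnMemTableSealed on every registered listener, unless
// there are no listeners or the DB is shutting down. Called without holding
// mutex_ (see the call site in SwitchMemtable()).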
void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
                                    const MemTableInfo& mem_table_info) {
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }

  for (auto listener : immutable_db_options_.listeners) {
    listener->OnMemTableSealed(mem_table_info);
  }
}
#endif  // ROCKSDB_LITE

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// REQUIRES: this thread is currently at the front of the 2nd writer queue if
// two_write_queues_ is true (This is to simplify the reasoning.)
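// Seals the current memtable of `cfd`, moves it to the immutable list, and
// installs a freshly constructed memtable. Unless the current WAL is empty,
// a new WAL is created as well so the old one can be released once all data
// depending on it has been flushed.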
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  mutex_.AssertHeld();
  WriteThread::Writer nonmem_w;
  std::unique_ptr<WritableFile> lfile;
  log::Writer* new_log = nullptr;
  MemTable* new_mem = nullptr;

  // Recoverable state is persisted in the WAL. After the memtable switch, the
  // WAL might be deleted, so write that state to the memtable now so it gets
  // persisted as well.
  Status s = WriteRecoverableState();
  if (!s.ok()) {
    return s;
  }

  // Attempt to switch to a new memtable and trigger flush of the old one.
  // Do this without holding the DB mutex.
  assert(versions_->prev_log_number() == 0);
  if (two_write_queues_) {
    log_write_mutex_.Lock();
  }
  bool creating_new_log = !log_empty_;
  if (two_write_queues_) {
    log_write_mutex_.Unlock();
  }
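  // Only roll the WAL if the current one actually contains data; an empty WAL
  // can keep serving the new memtable, in which case the existing
  // logfile_number_ is reused below.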
  uint64_t recycle_log_number = 0;
  if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
      !log_recycle_files_.empty()) {
    recycle_log_number = log_recycle_files_.front();
  }
  uint64_t new_log_number =
      creating_new_log ? versions_->NewFileNumber() : logfile_number_;
  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

  // Set memtable_info for the memtable-sealed callback
#ifndef ROCKSDB_LITE
  MemTableInfo memtable_info;
  memtable_info.cf_name = cfd->GetName();
  memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
  memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
  memtable_info.num_entries = cfd->mem()->num_entries();
  memtable_info.num_deletes = cfd->mem()->num_deletes();
#endif  // ROCKSDB_LITE
  // Log this later, after the lock is released. It may be outdated by then,
  // e.g. if a background flush happens before logging, but that is OK.
  int num_imm_unflushed = cfd->imm()->NumNotFlushed();
  const auto preallocate_block_size =
      GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
  mutex_.Unlock();
  if (creating_new_log) {
    // TODO: The write buffer size passed in should be the max of all CFs'
    // write buffer sizes, not mutable_cf_options.write_buffer_size.
    s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
                  &new_log);
  }
  if (s.ok()) {
    SequenceNumber seq = versions_->LastSequence();
    new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
    context->superversion_context.NewSuperVersion();
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] New memtable created with log file: #%" PRIu64
                 ". Immutable memtables: %d.\n",
                 cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
  mutex_.Lock();
  if (recycle_log_number != 0) {
    // Since renaming the file is done outside the DB mutex, we need to ensure
    // concurrent full purges don't delete the file while we're recycling it.
    // To achieve that, we hold the old log number in the recyclable list until
    // after it has been renamed.
    assert(log_recycle_files_.front() == recycle_log_number);
    log_recycle_files_.pop_front();
  }
  if (s.ok() && creating_new_log) {
    log_write_mutex_.Lock();
    assert(new_log != nullptr);
    if (!logs_.empty()) {
      // Always flush the buffer of the last log before switching to a new one
      log::Writer* cur_log_writer = logs_.back().writer;
      s = cur_log_writer->WriteBuffer();
      if (!s.ok()) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
                       " WAL file\n",
                       cfd->GetName().c_str(), cur_log_writer->get_log_number(),
                       new_log_number);
      }
    }
    if (s.ok()) {
      logfile_number_ = new_log_number;
      log_empty_ = true;
      log_dir_synced_ = false;
      logs_.emplace_back(logfile_number_, new_log);
      alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
    }
    log_write_mutex_.Unlock();
  }

  if (!s.ok()) {
    // How could we fail here without creating a new log? All failure paths
    // above involve a new WAL, hence the assert.
    assert(creating_new_log);
    if (new_mem) {
      delete new_mem;
    }
    if (new_log) {
      delete new_log;
    }
    SuperVersion* new_superversion =
        context->superversion_context.new_superversion.release();
    if (new_superversion != nullptr) {
      delete new_superversion;
    }
    // We may have lost data from the in-memory write buffer of the current
    // log, so treat this as a fatal error and set bg_error
    error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
    // Read back bg_error in order to get the right severity
    s = error_handler_.GetBGError();
    return s;
  }

  for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
    // All of this is just an optimization to delete logs that are no longer
    // needed: if a CF is empty, it does not need its current log to stay
    // alive, so we simply advance its log number. There is no need to persist
    // this in the manifest.
    if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
        loop_cfd->imm()->NumNotFlushed() == 0) {
      if (creating_new_log) {
        loop_cfd->SetLogNumber(logfile_number_);
      }
      loop_cfd->mem()->SetCreationSeq(versions_->LastSequence());
    }
  }

  cfd->mem()->SetNextLogNumber(logfile_number_);
  cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
  new_mem->Ref();
  cfd->SetMemtable(new_mem);
  InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
                                     mutable_cf_options);
#ifndef ROCKSDB_LITE
  mutex_.Unlock();
  // Notify the client that the memtable is sealed, now that we have
  // successfully installed a new memtable
  NotifyOnMemTableSealed(cfd, memtable_info);
  mutex_.Lock();
#endif  // ROCKSDB_LITE
  return s;
}

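// Computes the preallocation block size for new WAL files: roughly 1.1x the
// column family's write buffer size, capped by max_total_wal_size,
// db_write_buffer_size, and the write buffer manager's buffer size when those
// are configured. For example, with write_buffer_size = 64 MB and no caps
// configured, the result is about 70.4 MB.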
size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
  mutex_.AssertHeld();
  size_t bsize =
      static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
  // Some users might set a very high write_buffer_size and rely on
  // max_total_wal_size or other parameters to control the WAL size.
  if (mutable_db_options_.max_total_wal_size > 0) {
    bsize = std::min<size_t>(
        bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
  }
  if (immutable_db_options_.db_write_buffer_size > 0) {
    bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
  }
  if (immutable_db_options_.write_buffer_manager &&
      immutable_db_options_.write_buffer_manager->enabled()) {
    bsize = std::min<size_t>(
        bsize, immutable_db_options_.write_buffer_manager->buffer_size());
  }

  return bsize;
}

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
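// Usage sketch for the timestamp path of DB::Put below (illustrative only;
// assumes `db` is open and `cf` uses a timestamp-aware comparator whose
// timestamp size matches ts.size()):
//
//   WriteOptions wo;
//   Slice ts = ...;      // user-supplied timestamp
//   wo.timestamp = &ts;
//   Status s = db->Put(wo, cf, key, value);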
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  if (nullptr == opt.timestamp) {
    // Pre-allocate the size of the write batch conservatively.
    // 8 bytes are taken by the header, 4 bytes for the count, 1 byte for the
    // type, and 11 extra bytes are reserved for the key and value lengths.
    WriteBatch batch(key.size() + value.size() + 24);
    Status s = batch.Put(column_family, key, value);
    if (!s.ok()) {
      return s;
    }
    return Write(opt, &batch);
  }
  const Slice* ts = opt.timestamp;
  assert(nullptr != ts);
  size_t ts_sz = ts->size();
  WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
                   ts_sz);
  Status s = batch.Put(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  s = batch.AssignTimestamp(*ts);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                  const Slice& key) {
  WriteBatch batch;
  batch.Delete(column_family, key);
  return Write(opt, &batch);
}

Status DB::SingleDelete(const WriteOptions& opt,
                        ColumnFamilyHandle* column_family, const Slice& key) {
  WriteBatch batch;
  batch.SingleDelete(column_family, key);
  return Write(opt, &batch);
}

Status DB::DeleteRange(const WriteOptions& opt,
                       ColumnFamilyHandle* column_family,
                       const Slice& begin_key, const Slice& end_key) {
  WriteBatch batch;
  batch.DeleteRange(column_family, begin_key, end_key);
  return Write(opt, &batch);
}

Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                 const Slice& key, const Slice& value) {
  WriteBatch batch;
  Status s = batch.Merge(column_family, key, value);
  if (!s.ok()) {
    return s;
  }
  return Write(opt, &batch);
}
}  // namespace ROCKSDB_NAMESPACE