// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "rocksdb/utilities/write_batch_with_index.h"

#include <memory>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "memory/arena.h"
#include "memtable/skiplist.h"
#include "options/db_options.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "util/cast_util.h"
#include "util/string_util.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

namespace ROCKSDB_NAMESPACE {

// Invariants maintained by UpdateCurrent():
// when direction == forward
// * current_at_base_ <=> base_iterator key < delta_iterator key
// when direction == backward
// * current_at_base_ <=> base_iterator key > delta_iterator key
// always:
// * equal_keys_ <=> base_iterator key == delta_iterator key
class BaseDeltaIterator : public Iterator {
 public:
  BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
                    const Comparator* comparator,
                    const ReadOptions* read_options = nullptr)
      : forward_(true),
        current_at_base_(true),
        equal_keys_(false),
        status_(Status::OK()),
        base_iterator_(base_iterator),
        delta_iterator_(delta_iterator),
        comparator_(comparator),
        iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
                                          : nullptr) {}

  ~BaseDeltaIterator() override {}

  bool Valid() const override {
    return current_at_base_ ? BaseValid() : DeltaValid();
  }

  void SeekToFirst() override {
    forward_ = true;
    base_iterator_->SeekToFirst();
    delta_iterator_->SeekToFirst();
    UpdateCurrent();
  }

  void SeekToLast() override {
    forward_ = false;
    base_iterator_->SeekToLast();
    delta_iterator_->SeekToLast();
    UpdateCurrent();
  }

  void Seek(const Slice& k) override {
    forward_ = true;
    base_iterator_->Seek(k);
    delta_iterator_->Seek(k);
    UpdateCurrent();
  }

  void SeekForPrev(const Slice& k) override {
    forward_ = false;
    base_iterator_->SeekForPrev(k);
    delta_iterator_->SeekForPrev(k);
    UpdateCurrent();
  }

  void Next() override {
    if (!Valid()) {
      status_ = Status::NotSupported("Next() on invalid iterator");
      return;
    }

    if (!forward_) {
      // Need to change direction; the previous direction was backward.
      // If the keys are not equal there are two states to fix up:
      // * only one iterator is valid: re-seek the invalid one to the first
      //   entry so it participates in the merge again
      // * both iterators are valid: the non-current iterator points to a key
      //   that lies behind the current key in forward order, so advance it
      //   once to catch up
      forward_ = true;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToFirst();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToFirst();
      } else if (current_at_base_) {
        // Move delta from behind base (smaller key) to at or past base
        AdvanceDelta();
      } else {
        // Move base from behind delta (smaller key) to at or past delta
        AdvanceBase();
      }
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }
    Advance();
  }

  void Prev() override {
    if (!Valid()) {
      status_ = Status::NotSupported("Prev() on invalid iterator");
      return;
    }

    if (forward_) {
      // Need to change direction; the previous direction was forward.
      // If the keys are not equal there are two states to fix up:
      // * only one iterator is valid: re-seek the invalid one to the last
      //   entry so it participates in the merge again
      // * both iterators are valid: the non-current iterator points to a key
      //   that lies ahead of the current key in forward order, so step it
      //   back once to catch up
      forward_ = false;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToLast();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToLast();
      } else if (current_at_base_) {
        // Move delta from past base (larger key) back to at or before base
        AdvanceDelta();
      } else {
        // Move base from past delta (larger key) back to at or before delta
        AdvanceBase();
      }
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }

    Advance();
  }

  Slice key() const override {
    return current_at_base_ ? base_iterator_->key()
                            : delta_iterator_->Entry().key;
  }

  Slice value() const override {
    return current_at_base_ ? base_iterator_->value()
                            : delta_iterator_->Entry().value;
  }

  Status status() const override {
    if (!status_.ok()) {
      return status_;
    }
    if (!base_iterator_->status().ok()) {
      return base_iterator_->status();
    }
    return delta_iterator_->status();
  }

 private:
  void AssertInvariants() {
#ifndef NDEBUG
    bool not_ok = false;
    if (!base_iterator_->status().ok()) {
      assert(!base_iterator_->Valid());
      not_ok = true;
    }
    if (!delta_iterator_->status().ok()) {
      assert(!delta_iterator_->Valid());
      not_ok = true;
    }
    if (not_ok) {
      assert(!Valid());
      assert(!status().ok());
      return;
    }

    if (!Valid()) {
      return;
    }
    if (!BaseValid()) {
      assert(!current_at_base_ && delta_iterator_->Valid());
      return;
    }
    if (!DeltaValid()) {
      assert(current_at_base_ && base_iterator_->Valid());
      return;
    }
    // we don't support those yet
    assert(delta_iterator_->Entry().type != kMergeRecord &&
           delta_iterator_->Entry().type != kLogDataRecord);
    int compare = comparator_->Compare(delta_iterator_->Entry().key,
                                       base_iterator_->key());
    if (forward_) {
      // current_at_base -> compare > 0
      assert(!current_at_base_ || compare > 0);
      // !current_at_base -> compare <= 0
      assert(current_at_base_ || compare <= 0);
    } else {
      // current_at_base -> compare < 0
      assert(!current_at_base_ || compare < 0);
      // !current_at_base -> compare >= 0
      assert(current_at_base_ || compare >= 0);
    }
    // equal_keys_ <=> compare == 0
    assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
#endif
  }

  void Advance() {
    if (equal_keys_) {
      assert(BaseValid() && DeltaValid());
      AdvanceBase();
      AdvanceDelta();
    } else {
      if (current_at_base_) {
        assert(BaseValid());
        AdvanceBase();
      } else {
        assert(DeltaValid());
        AdvanceDelta();
      }
    }
    UpdateCurrent();
  }

  void AdvanceDelta() {
    if (forward_) {
      delta_iterator_->Next();
    } else {
      delta_iterator_->Prev();
    }
  }
  void AdvanceBase() {
    if (forward_) {
      base_iterator_->Next();
    } else {
      base_iterator_->Prev();
    }
  }
  bool BaseValid() const { return base_iterator_->Valid(); }
  bool DeltaValid() const { return delta_iterator_->Valid(); }
  void UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
    status_ = Status::OK();
    while (true) {
      WriteEntry delta_entry;
      if (DeltaValid()) {
        assert(delta_iterator_->status().ok());
        delta_entry = delta_iterator_->Entry();
      } else if (!delta_iterator_->status().ok()) {
        // Expose the error status and stop.
        current_at_base_ = false;
        return;
      }
      equal_keys_ = false;
      if (!BaseValid()) {
        if (!base_iterator_->status().ok()) {
          // Expose the error status and stop.
          current_at_base_ = true;
          return;
        }

        // Base has finished.
        if (!DeltaValid()) {
          // Finished
          return;
        }
        if (iterate_upper_bound_) {
          if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >=
              0) {
            // out of upper bound -> finished.
            return;
          }
        }
        if (delta_entry.type == kDeleteRecord ||
            delta_entry.type == kSingleDeleteRecord) {
          AdvanceDelta();
        } else {
          current_at_base_ = false;
          return;
        }
      } else if (!DeltaValid()) {
        // Delta has finished.
        current_at_base_ = true;
        return;
      } else {
        int compare =
            (forward_ ? 1 : -1) *
            comparator_->Compare(delta_entry.key, base_iterator_->key());
        if (compare <= 0) {  // delta is at or before base in iteration order
          if (compare == 0) {
            equal_keys_ = true;
          }
          if (delta_entry.type != kDeleteRecord &&
              delta_entry.type != kSingleDeleteRecord) {
            current_at_base_ = false;
            return;
          }
          // Delta is at or before base and is a delete: skip it (and the
          // base entry it shadows if the keys are equal).
          AdvanceDelta();
          if (equal_keys_) {
            AdvanceBase();
          }
        } else {
          current_at_base_ = true;
          return;
        }
      }
    }

    AssertInvariants();
#endif  // __clang_analyzer__
  }

  bool forward_;
  bool current_at_base_;
  bool equal_keys_;
  Status status_;
  std::unique_ptr<Iterator> base_iterator_;
  std::unique_ptr<WBWIIterator> delta_iterator_;
  const Comparator* comparator_;  // not owned
  const Slice* iterate_upper_bound_;
};
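
// Illustrative sketch (not compiled here): a BaseDeltaIterator is normally
// obtained through WriteBatchWithIndex::NewIteratorWithBase() and merges the
// batch contents over a database iterator. Keys deleted in the batch are
// hidden, and values written in the batch override the base values.
// Identifier names below (db, read_options, batch, ...) are placeholders.
//
//   ROCKSDB_NAMESPACE::WriteBatchWithIndex batch(
//       ROCKSDB_NAMESPACE::BytewiseComparator(), 0, true /* overwrite_key */);
//   batch.Put("k1", "v1-from-batch");
//   batch.Delete("k2");
//   std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> iter(
//       batch.NewIteratorWithBase(db->NewIterator(read_options)));
//   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
//     // "k1" yields "v1-from-batch"; "k2" is skipped even if it is in the DB.
//   }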

typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
    WriteBatchEntrySkipList;

class WBWIIteratorImpl : public WBWIIterator {
 public:
  WBWIIteratorImpl(uint32_t column_family_id,
                   WriteBatchEntrySkipList* skip_list,
                   const ReadableWriteBatch* write_batch)
      : column_family_id_(column_family_id),
        skip_list_iter_(skip_list),
        write_batch_(write_batch) {}

  ~WBWIIteratorImpl() override {}

  bool Valid() const override {
    if (!skip_list_iter_.Valid()) {
      return false;
    }
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    return (iter_entry != nullptr &&
            iter_entry->column_family == column_family_id_);
  }

  void SeekToFirst() override {
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

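  // SeekToLast() below works by seeking to a sentinel entry that sorts as the
  // first entry of column family (column_family_id_ + 1): the position just
  // before it, if any, holds the last entry of this column family, so we step
  // back one entry (or use the skip list's last entry if the seek ran off the
  // end). Valid() still re-checks the column family id afterwards, so an empty
  // column family yields an invalid iterator.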
  void SeekToLast() override {
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_ + 1,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
    if (!skip_list_iter_.Valid()) {
      skip_list_iter_.SeekToLast();
    } else {
      skip_list_iter_.Prev();
    }
  }

  void Seek(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      true /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

  void SeekForPrev(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      false /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.SeekForPrev(&search_entry);
  }

  void Next() override { skip_list_iter_.Next(); }

  void Prev() override { skip_list_iter_.Prev(); }

  WriteEntry Entry() const override {
    WriteEntry ret;
    Slice blob, xid;
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    // this is guaranteed by Valid()
    assert(iter_entry != nullptr &&
           iter_entry->column_family == column_family_id_);
    auto s = write_batch_->GetEntryFromDataOffset(
        iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
    assert(s.ok());
    assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
           ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
           ret.type == kMergeRecord);
    return ret;
  }

  Status status() const override {
    // this is an in-memory data structure, so the only way status can be
    // non-ok is through memory corruption
    return Status::OK();
  }

  const WriteBatchIndexEntry* GetRawEntry() const {
    return skip_list_iter_.key();
  }

 private:
  uint32_t column_family_id_;
  WriteBatchEntrySkipList::Iterator skip_list_iter_;
  const ReadableWriteBatch* write_batch_;
};
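
// Illustrative sketch (not compiled here): WBWIIteratorImpl instances are
// created by WriteBatchWithIndex::NewIterator() and iterate only over the
// entries buffered in the batch, exposing each entry's type, key and value.
// Identifier names below (batch, ...) are placeholders.
//
//   ROCKSDB_NAMESPACE::WriteBatchWithIndex batch;
//   batch.Put("a", "1");
//   batch.Merge("b", "2");
//   std::unique_ptr<ROCKSDB_NAMESPACE::WBWIIterator> it(batch.NewIterator());
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     const ROCKSDB_NAMESPACE::WriteEntry entry = it->Entry();
//     // entry.type is kPutRecord for "a" and kMergeRecord for "b".
//   }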

struct WriteBatchWithIndex::Rep {
  explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
               size_t max_bytes = 0, bool _overwrite_key = false)
      : write_batch(reserved_bytes, max_bytes),
        comparator(index_comparator, &write_batch),
        skip_list(comparator, &arena),
        overwrite_key(_overwrite_key),
        last_entry_offset(0),
        last_sub_batch_offset(0),
        sub_batch_cnt(1) {}
  ReadableWriteBatch write_batch;
  WriteBatchEntryComparator comparator;
  Arena arena;
  WriteBatchEntrySkipList skip_list;
  bool overwrite_key;
  size_t last_entry_offset;
  // The starting offset of the last sub-batch. A sub-batch starts right before
  // inserting a key that duplicates a key already in the last sub-batch. Zero
  // (the default) means that no duplicate key has been detected so far.
  size_t last_sub_batch_offset;
  // Total number of sub-batches in the write batch. Default is 1.
  size_t sub_batch_cnt;

  // Remember current offset of internal write batch, which is used as
  // the starting offset of the next record.
  void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }

  // In overwrite mode, find the existing entry for the same key and update it
  // to point to the current entry.
  // Return true if the key is found and updated.
  bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
  bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);

  // Add the recent entry to the update.
  // In overwrite mode, if key already exists in the index, update it.
  void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
  void AddOrUpdateIndex(const Slice& key);

  // Allocate an index entry pointing to the last entry in the write batch and
  // put it to skip list.
  void AddNewEntry(uint32_t column_family_id);

  // Clear all updates buffered in this batch.
  void Clear();
  void ClearIndex();

  // Rebuild index by reading all records from the batch.
  // Returns non-ok status on corruption.
  Status ReBuildIndex();
};

bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
    ColumnFamilyHandle* column_family, const Slice& key) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  return UpdateExistingEntryWithCfId(cf_id, key);
}

bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
    uint32_t column_family_id, const Slice& key) {
  if (!overwrite_key) {
    return false;
  }

  WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
  iter.Seek(key);
  if (!iter.Valid()) {
    return false;
  }
  if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
    return false;
  }
  WriteBatchIndexEntry* non_const_entry =
      const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
  if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
    last_sub_batch_offset = last_entry_offset;
    sub_batch_cnt++;
  }
  non_const_entry->offset = last_entry_offset;
  return true;
}
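
// Worked example (sketch) of how sub_batch_cnt evolves when overwrite_key is
// true: after Put("a", "v1") the index holds one entry and sub_batch_cnt == 1.
// A second Put("a", "v2") finds the existing entry above; because
// last_sub_batch_offset (0) <= that entry's offset, a new sub-batch starts:
// last_sub_batch_offset is set to the offset of the new record and
// sub_batch_cnt becomes 2. The index entry is then redirected to the new
// record, so reads through the batch see "v2".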

void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
    ColumnFamilyHandle* column_family, const Slice& key) {
  if (!UpdateExistingEntry(column_family, key)) {
    uint32_t cf_id = GetColumnFamilyID(column_family);
    const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
    if (cf_cmp != nullptr) {
      comparator.SetComparatorForCF(cf_id, cf_cmp);
    }
    AddNewEntry(cf_id);
  }
}

void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
  if (!UpdateExistingEntryWithCfId(0, key)) {
    AddNewEntry(0);
  }
}

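// AddNewEntry() does not copy the key: the index entry records the offset of
// the new record in the write batch buffer together with the key's offset and
// size inside that buffer, and the entry itself is allocated from the arena.
// The skip list therefore only references memory owned by this
// WriteBatchWithIndex.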
void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
  const std::string& wb_data = write_batch.Data();
  Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
                          wb_data.size() - last_entry_offset);
  // Extract key
  Slice key;
  bool success __attribute__((__unused__));
  success =
      ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
  assert(success);

  auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
  auto* index_entry =
      new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
                                     key.data() - wb_data.data(), key.size());
  skip_list.Insert(index_entry);
}

void WriteBatchWithIndex::Rep::Clear() {
  write_batch.Clear();
  ClearIndex();
}

void WriteBatchWithIndex::Rep::ClearIndex() {
  skip_list.~WriteBatchEntrySkipList();
  arena.~Arena();
  new (&arena) Arena();
  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
  last_entry_offset = 0;
  last_sub_batch_offset = 0;
  sub_batch_cnt = 1;
}

Status WriteBatchWithIndex::Rep::ReBuildIndex() {
  Status s;

  ClearIndex();

  if (write_batch.Count() == 0) {
    // Nothing to re-index
    return s;
  }

  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

  Slice input(write_batch.Data());
  input.remove_prefix(offset);

  // Loop through all entries in Rep and add each one to the index
  uint32_t found = 0;
  while (s.ok() && !input.empty()) {
    Slice key, value, blob, xid;
    uint32_t column_family_id = 0;  // default
    char tag = 0;

    // set offset of current entry for call to AddNewEntry()
    last_entry_offset = input.data() - write_batch.Data().data();

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                 &value, &blob, &xid);
    if (!s.ok()) {
      break;
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        found++;
        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
          AddNewEntry(column_family_id);
        }
        break;
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeBeginPersistedPrepareXID:
      case kTypeBeginUnprepareXID:
      case kTypeEndPrepareXID:
      case kTypeCommitXID:
      case kTypeRollbackXID:
      case kTypeNoop:
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
                                  ToString(static_cast<unsigned int>(tag)));
    }
  }

  if (s.ok() && found != write_batch.Count()) {
    s = Status::Corruption("WriteBatch has wrong count");
  }

  return s;
}

WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key, size_t max_bytes)
    : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
                  overwrite_key)) {}

WriteBatchWithIndex::~WriteBatchWithIndex() {}

WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default;

WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) =
    default;

WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }

size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }

WBWIIterator* WriteBatchWithIndex::NewIterator() {
  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}

WBWIIterator* WriteBatchWithIndex::NewIterator(
    ColumnFamilyHandle* column_family) {
  return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
                              &(rep->skip_list), &rep->write_batch);
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(
    ColumnFamilyHandle* column_family, Iterator* base_iterator,
    const ReadOptions* read_options) {
  if (rep->overwrite_key == false) {
    assert(false);
    return nullptr;
  }
  return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
                               GetColumnFamilyUserComparator(column_family),
                               read_options);
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
  if (rep->overwrite_key == false) {
    assert(false);
    return nullptr;
  }
  // default column family's comparator
  return new BaseDeltaIterator(base_iterator, NewIterator(),
                               rep->comparator.default_comparator());
}
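
// Note on usage (summarizing the code above, not adding new requirements):
// NewIteratorWithBase() requires the WriteBatchWithIndex to have been built
// with overwrite_key == true; otherwise it asserts and returns nullptr. The
// returned BaseDeltaIterator owns both the base iterator and the batch
// iterator (they are held in unique_ptrs), so callers should not delete the
// base iterator themselves. If read_options is provided, its
// iterate_upper_bound is applied to keys that exist only in the batch; the
// base iterator is expected to enforce the bound on its own side.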

Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Put(column_family, key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Put(key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
                                   const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Delete(column_family, key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::Delete(const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Delete(key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
                                         const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.SingleDelete(column_family, key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.SingleDelete(key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
                                  const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Merge(column_family, key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Merge(key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
  return rep->write_batch.PutLogData(blob);
}

void WriteBatchWithIndex::Clear() { rep->Clear(); }

Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
                                         const DBOptions& options,
                                         const Slice& key, std::string* value) {
  Status s;
  MergeContext merge_context;
  const ImmutableDBOptions immuable_db_options(options);

  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immuable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, value, rep->overwrite_key, &s);

  switch (result) {
    case WriteBatchWithIndexInternal::Result::kFound:
    case WriteBatchWithIndexInternal::Result::kError:
      // use returned status
      break;
    case WriteBatchWithIndexInternal::Result::kDeleted:
    case WriteBatchWithIndexInternal::Result::kNotFound:
      s = Status::NotFound();
      break;
    case WriteBatchWithIndexInternal::Result::kMergeInProgress:
      s = Status::MergeInProgress();
      break;
    default:
      assert(false);
  }

  return s;
}
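
// Illustrative sketch (not compiled here): GetFromBatch() answers reads from
// the batch alone, without consulting the database. Identifier names below
// (options, batch, ...) are placeholders.
//
//   ROCKSDB_NAMESPACE::DBOptions options;
//   ROCKSDB_NAMESPACE::WriteBatchWithIndex batch;
//   std::string value;
//   batch.Put("a", "v");
//   ROCKSDB_NAMESPACE::Status s = batch.GetFromBatch(
//       nullptr /* default column family */, options, "a", &value);
//   // s.ok(), value == "v"
//   batch.Delete("b");
//   s = batch.GetFromBatch(nullptr, options, "b", &value);  // s.IsNotFound()
//   batch.Merge("c", "m");
//   s = batch.GetFromBatch(nullptr, options, "c", &value);
//   // Typically s.IsMergeInProgress(): the batch alone cannot resolve the
//   // merge without the base value (and a merge operator).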

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              std::string* value) {
  assert(value != nullptr);
  PinnableSlice pinnable_val(value);
  assert(!pinnable_val.IsPinned());
  auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                             &pinnable_val);
  if (s.ok() && pinnable_val.IsPinned()) {
    value->assign(pinnable_val.data(), pinnable_val.size());
  }  // else value is already assigned
  return s;
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              PinnableSlice* pinnable_val) {
  return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                           pinnable_val);
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              std::string* value) {
  assert(value != nullptr);
  PinnableSlice pinnable_val(value);
  assert(!pinnable_val.IsPinned());
  auto s =
      GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
  if (s.ok() && pinnable_val.IsPinned()) {
    value->assign(pinnable_val.data(), pinnable_val.size());
  }  // else value is already assigned
  return s;
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              PinnableSlice* pinnable_val) {
  return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
                           nullptr);
}

Status WriteBatchWithIndex::GetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
  Status s;
  MergeContext merge_context;
  const ImmutableDBOptions& immuable_db_options =
      static_cast_with_check<DBImpl, DB>(db->GetRootDB())
          ->immutable_db_options();
  // Since the lifetime of the WriteBatch is the same as that of the
  // transaction, we cannot pin it; otherwise the returned value will not be
  // available after the transaction finishes.
  std::string& batch_value = *pinnable_val->GetSelf();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immuable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, &batch_value, rep->overwrite_key, &s);

  if (result == WriteBatchWithIndexInternal::Result::kFound) {
    pinnable_val->PinSelf();
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
    return Status::NotFound();
  }
  if (result == WriteBatchWithIndexInternal::Result::kError) {
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
      rep->overwrite_key == true) {
    // Since we've overwritten keys, we do not know what other operations are
    // in this batch for this key, so we cannot do a Merge to compute the
    // result. Instead, we will simply return MergeInProgress.
    return Status::MergeInProgress();
  }

  assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
         result == WriteBatchWithIndexInternal::Result::kNotFound);

  // Did not find key in batch OR could not resolve Merges. Try DB.
  if (!callback) {
    s = db->Get(read_options, column_family, key, pinnable_val);
  } else {
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = column_family;
    get_impl_options.value = pinnable_val;
    get_impl_options.callback = callback;
    s = static_cast_with_check<DBImpl, DB>(db->GetRootDB())
            ->GetImpl(read_options, key, get_impl_options);
  }

  if (s.ok() || s.IsNotFound()) {  // DB Get succeeded or NotFound
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // Merge result from DB with merges in Batch
      auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
      const MergeOperator* merge_operator =
          cfh->cfd()->ioptions()->merge_operator;
      Statistics* statistics = immuable_db_options.statistics.get();
      Env* env = immuable_db_options.env;
      Logger* logger = immuable_db_options.info_log.get();

      Slice* merge_data;
      if (s.ok()) {
        merge_data = pinnable_val;
      } else {  // Key not present in db (s.IsNotFound())
        merge_data = nullptr;
      }

      if (merge_operator) {
        std::string merge_result;
        s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
                                        merge_context.GetOperands(),
                                        &merge_result, logger, statistics, env);
        pinnable_val->Reset();
        *pinnable_val->GetSelf() = std::move(merge_result);
        pinnable_val->PinSelf();
      } else {
        s = Status::InvalidArgument("Options::merge_operator must be set");
      }
    }
  }

  return s;
}
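
// Illustrative sketch (not compiled here): GetFromBatchAndDB() gives
// read-your-own-writes semantics: values buffered in the batch take precedence
// over the database, deletes in the batch hide database values, and pending
// Merge operands are combined with the database value when a merge operator is
// configured. Identifier names below (db, batch, ...) are placeholders.
//
//   ROCKSDB_NAMESPACE::WriteBatchWithIndex batch;
//   std::string value;
//   batch.Put("k", "from-batch");
//   ROCKSDB_NAMESPACE::Status s = batch.GetFromBatchAndDB(
//       db, ROCKSDB_NAMESPACE::ReadOptions(), "k", &value);
//   // s.ok(), value == "from-batch" even if the DB stores another value.
//   s = batch.GetFromBatchAndDB(db, ROCKSDB_NAMESPACE::ReadOptions(),
//                               "missing-key", &value);
//   // Falls through to the DB; s.IsNotFound() if the DB does not have it.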

void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input) {
  MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys,
                         values, statuses, sorted_input, nullptr);
}

void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input, ReadCallback* callback) {
  const ImmutableDBOptions& immuable_db_options =
      static_cast_with_check<DBImpl, DB>(db->GetRootDB())
          ->immutable_db_options();

  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  // To hold merges from the write batch
  autovector<std::pair<WriteBatchWithIndexInternal::Result, MergeContext>,
             MultiGetContext::MAX_BATCH_SIZE>
      merges;
  // Since the lifetime of the WriteBatch is the same as that of the
  // transaction, we cannot pin it; otherwise the returned value will not be
  // available after the transaction finishes.
  for (size_t i = 0; i < num_keys; ++i) {
    MergeContext merge_context;
    PinnableSlice* pinnable_val = &values[i];
    std::string& batch_value = *pinnable_val->GetSelf();
    Status* s = &statuses[i];
    WriteBatchWithIndexInternal::Result result =
        WriteBatchWithIndexInternal::GetFromBatch(
            immuable_db_options, this, column_family, keys[i], &merge_context,
            &rep->comparator, &batch_value, rep->overwrite_key, s);

    if (result == WriteBatchWithIndexInternal::Result::kFound) {
      pinnable_val->PinSelf();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
      *s = Status::NotFound();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kError) {
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        rep->overwrite_key == true) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result. Instead, we will simply return MergeInProgress.
      *s = Status::MergeInProgress();
      continue;
    }

    assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
           result == WriteBatchWithIndexInternal::Result::kNotFound);
    key_context.emplace_back(column_family, keys[i], &values[i],
                             /*timestamp*/ nullptr, &statuses[i]);
    merges.emplace_back(result, std::move(merge_context));
  }

  for (KeyContext& key : key_context) {
    sorted_keys.emplace_back(&key);
  }

  // Did not find key in batch OR could not resolve Merges. Try DB.
  static_cast_with_check<DBImpl, DB>(db->GetRootDB())
      ->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys);
  static_cast_with_check<DBImpl, DB>(db->GetRootDB())
      ->MultiGetWithCallback(read_options, column_family, callback,
                             &sorted_keys);

  ColumnFamilyHandleImpl* cfh =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  const MergeOperator* merge_operator = cfh->cfd()->ioptions()->merge_operator;
  for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) {
    KeyContext& key = *iter;
    if (key.s->ok() || key.s->IsNotFound()) {  // DB Get succeeded or NotFound
      size_t index = iter - key_context.begin();
      std::pair<WriteBatchWithIndexInternal::Result, MergeContext>&
          merge_result = merges[index];
      if (merge_result.first ==
          WriteBatchWithIndexInternal::Result::kMergeInProgress) {
        // Merge result from DB with merges in Batch
        Statistics* statistics = immuable_db_options.statistics.get();
        Env* env = immuable_db_options.env;
        Logger* logger = immuable_db_options.info_log.get();

        Slice* merge_data;
        if (key.s->ok()) {
          merge_data = iter->value;
        } else {  // Key not present in db (s.IsNotFound())
          merge_data = nullptr;
        }

        if (merge_operator) {
          *key.s = MergeHelper::TimedFullMerge(
              merge_operator, *key.key, merge_data,
              merge_result.second.GetOperands(), key.value->GetSelf(), logger,
              statistics, env);
          key.value->PinSelf();
        } else {
          *key.s =
              Status::InvalidArgument("Options::merge_operator must be set");
        }
      }
    }
  }
}
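
// Illustrative sketch (not compiled here): MultiGetFromBatchAndDB() follows
// the same precedence rules as GetFromBatchAndDB() but for a batch of keys;
// results and statuses are written to the caller-provided arrays. Identifier
// names below (db, batch, ...) are placeholders.
//
//   ROCKSDB_NAMESPACE::WriteBatchWithIndex batch;
//   batch.Put("a", "1");
//   ROCKSDB_NAMESPACE::Slice keys[2] = {"a", "b"};
//   ROCKSDB_NAMESPACE::PinnableSlice values[2];
//   ROCKSDB_NAMESPACE::Status statuses[2];
//   batch.MultiGetFromBatchAndDB(db, ROCKSDB_NAMESPACE::ReadOptions(),
//                                db->DefaultColumnFamily(), 2, keys, values,
//                                statuses, false /* sorted_input */);
//   // values[0] == "1" from the batch; "b" is looked up in the DB.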

void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }

Status WriteBatchWithIndex::RollbackToSavePoint() {
  Status s = rep->write_batch.RollbackToSavePoint();

  if (s.ok()) {
    rep->sub_batch_cnt = 1;
    rep->last_sub_batch_offset = 0;
    s = rep->ReBuildIndex();
  }

  return s;
}
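
// Illustrative sketch (not compiled here): rolling back to a save point
// truncates the underlying write batch and then rebuilds the whole skip-list
// index from the remaining records via Rep::ReBuildIndex(). Identifier names
// below (batch) are placeholders.
//
//   ROCKSDB_NAMESPACE::WriteBatchWithIndex batch;
//   batch.Put("a", "1");
//   batch.SetSavePoint();
//   batch.Put("b", "2");
//   ROCKSDB_NAMESPACE::Status s = batch.RollbackToSavePoint();
//   // s.ok(); "b" is no longer visible through batch.NewIterator(), while
//   // "a" still is.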

Status WriteBatchWithIndex::PopSavePoint() {
  return rep->write_batch.PopSavePoint();
}

void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
  rep->write_batch.SetMaxBytes(max_bytes);
}

size_t WriteBatchWithIndex::GetDataSize() const {
  return rep->write_batch.GetDataSize();
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // !ROCKSDB_LITE