//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "rocksdb/utilities/write_batch_with_index.h"

#include <memory>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "memory/arena.h"
#include "memtable/skiplist.h"
#include "options/db_options.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "util/cast_util.h"
#include "util/string_util.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

namespace ROCKSDB_NAMESPACE {

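// BaseDeltaIterator presents a single, merged view over two child iterators:
// a "base" iterator (typically over the DB) and a "delta" iterator over the
// indexed write batch. Delete/SingleDelete entries in the delta hide the
// corresponding base entries. The iterator takes ownership of both children.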
// Invariants (when both child iterators are valid):
// when direction == forward
// * current_at_base_ <=> base_iterator is positioned at the smaller key
// when direction == backward
// * current_at_base_ <=> base_iterator is positioned at the larger key
// always:
// * equal_keys_ <=> base_iterator and delta_iterator are at equal keys
class BaseDeltaIterator : public Iterator {
 public:
  BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
                    const Comparator* comparator,
                    const ReadOptions* read_options = nullptr)
      : forward_(true),
        current_at_base_(true),
        equal_keys_(false),
        status_(Status::OK()),
        base_iterator_(base_iterator),
        delta_iterator_(delta_iterator),
        comparator_(comparator),
        iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
                                          : nullptr) {}

  ~BaseDeltaIterator() override {}

  bool Valid() const override {
    return current_at_base_ ? BaseValid() : DeltaValid();
  }

  void SeekToFirst() override {
    forward_ = true;
    base_iterator_->SeekToFirst();
    delta_iterator_->SeekToFirst();
    UpdateCurrent();
  }

  void SeekToLast() override {
    forward_ = false;
    base_iterator_->SeekToLast();
    delta_iterator_->SeekToLast();
    UpdateCurrent();
  }

  void Seek(const Slice& k) override {
    forward_ = true;
    base_iterator_->Seek(k);
    delta_iterator_->Seek(k);
    UpdateCurrent();
  }

  void SeekForPrev(const Slice& k) override {
    forward_ = false;
    base_iterator_->SeekForPrev(k);
    delta_iterator_->SeekForPrev(k);
    UpdateCurrent();
  }

  void Next() override {
    if (!Valid()) {
      status_ = Status::NotSupported("Next() on invalid iterator");
      return;
    }

    if (!forward_) {
      // Need to change direction from backward to forward.
      // If the keys were not equal, there are two cases:
      // * both iterators are valid: the current iterator already points at
      //   the larger key, the other one lags behind it
      // * only one iterator is valid: the exhausted one must be re-seeked to
      //   the first entry
      forward_ = true;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToFirst();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToFirst();
      } else if (current_at_base_) {
        // Delta is at a smaller key than the current position; move it forward
        AdvanceDelta();
      } else {
        // Base is at a smaller or equal key than the current position; move
        // it forward
        AdvanceBase();
      }
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }
    Advance();
  }

  void Prev() override {
    if (!Valid()) {
      status_ = Status::NotSupported("Prev() on invalid iterator");
      return;
    }

    if (forward_) {
      // Need to change direction from forward to backward.
      // If the keys were not equal, there are two cases:
      // * both iterators are valid: the current iterator already points at
      //   the smaller key, the other one is ahead of it
      // * only one iterator is valid: the exhausted one must be re-seeked to
      //   the last entry
      forward_ = false;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToLast();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToLast();
      } else if (current_at_base_) {
        // Delta is at a larger key than the current position; move it backward
        AdvanceDelta();
      } else {
        // Base is at a larger or equal key than the current position; move it
        // backward
        AdvanceBase();
      }
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }

    Advance();
  }

  Slice key() const override {
    return current_at_base_ ? base_iterator_->key()
                            : delta_iterator_->Entry().key;
  }

  Slice value() const override {
    return current_at_base_ ? base_iterator_->value()
                            : delta_iterator_->Entry().value;
  }

  Status status() const override {
    if (!status_.ok()) {
      return status_;
    }
    if (!base_iterator_->status().ok()) {
      return base_iterator_->status();
    }
    return delta_iterator_->status();
  }

 private:
  void AssertInvariants() {
#ifndef NDEBUG
    bool not_ok = false;
    if (!base_iterator_->status().ok()) {
      assert(!base_iterator_->Valid());
      not_ok = true;
    }
    if (!delta_iterator_->status().ok()) {
      assert(!delta_iterator_->Valid());
      not_ok = true;
    }
    if (not_ok) {
      assert(!Valid());
      assert(!status().ok());
      return;
    }

    if (!Valid()) {
      return;
    }
    if (!BaseValid()) {
      assert(!current_at_base_ && delta_iterator_->Valid());
      return;
    }
    if (!DeltaValid()) {
      assert(current_at_base_ && base_iterator_->Valid());
      return;
    }
    // we don't support those yet
    assert(delta_iterator_->Entry().type != kMergeRecord &&
           delta_iterator_->Entry().type != kLogDataRecord);
    int compare = comparator_->Compare(delta_iterator_->Entry().key,
                                       base_iterator_->key());
    if (forward_) {
      // current_at_base -> compare > 0 (base is at the smaller key)
      assert(!current_at_base_ || compare > 0);
      // !current_at_base -> compare <= 0 (delta is at the smaller or equal key)
      assert(current_at_base_ || compare <= 0);
    } else {
      // current_at_base -> compare < 0 (base is at the larger key)
      assert(!current_at_base_ || compare < 0);
      // !current_at_base -> compare >= 0 (delta is at the larger or equal key)
      assert(current_at_base_ || compare >= 0);
    }
    // equal_keys_ <=> compare == 0
    assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
#endif
  }

  void Advance() {
    if (equal_keys_) {
      assert(BaseValid() && DeltaValid());
      AdvanceBase();
      AdvanceDelta();
    } else {
      if (current_at_base_) {
        assert(BaseValid());
        AdvanceBase();
      } else {
        assert(DeltaValid());
        AdvanceDelta();
      }
    }
    UpdateCurrent();
  }

  void AdvanceDelta() {
    if (forward_) {
      delta_iterator_->Next();
    } else {
      delta_iterator_->Prev();
    }
  }
  void AdvanceBase() {
    if (forward_) {
      base_iterator_->Next();
    } else {
      base_iterator_->Prev();
    }
  }
  bool BaseValid() const { return base_iterator_->Valid(); }
  bool DeltaValid() const { return delta_iterator_->Valid(); }
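  // UpdateCurrent() repositions the logical cursor after any seek or step: it
  // skips Delete/SingleDelete entries in the delta (together with the matching
  // base keys), honors iterate_upper_bound_ for delta entries once the base
  // iterator is exhausted, and finally decides whether the current entry comes
  // from the base or the delta iterator.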
  void UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
    status_ = Status::OK();
    while (true) {
      WriteEntry delta_entry;
      if (DeltaValid()) {
        assert(delta_iterator_->status().ok());
        delta_entry = delta_iterator_->Entry();
      } else if (!delta_iterator_->status().ok()) {
        // Expose the error status and stop.
        current_at_base_ = false;
        return;
      }
      equal_keys_ = false;
      if (!BaseValid()) {
        if (!base_iterator_->status().ok()) {
          // Expose the error status and stop.
          current_at_base_ = true;
          return;
        }

        // Base has finished.
        if (!DeltaValid()) {
          // Finished
          return;
        }
        if (iterate_upper_bound_) {
          if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >=
              0) {
            // out of upper bound -> finished.
            return;
          }
        }
        if (delta_entry.type == kDeleteRecord ||
            delta_entry.type == kSingleDeleteRecord) {
          AdvanceDelta();
        } else {
          current_at_base_ = false;
          return;
        }
      } else if (!DeltaValid()) {
        // Delta has finished.
        current_at_base_ = true;
        return;
      } else {
        int compare =
            (forward_ ? 1 : -1) *
            comparator_->Compare(delta_entry.key, base_iterator_->key());
        if (compare <= 0) {  // delta is at or before base in iteration order
          if (compare == 0) {
            equal_keys_ = true;
          }
          if (delta_entry.type != kDeleteRecord &&
              delta_entry.type != kSingleDeleteRecord) {
            current_at_base_ = false;
            return;
          }
          // Delta is at or before base and is a tombstone: skip it.
          AdvanceDelta();
          if (equal_keys_) {
            AdvanceBase();
          }
        } else {
          current_at_base_ = true;
          return;
        }
      }
    }

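    // NOTE: every exit from the loop above returns directly, so the invariant
    // check below is not reached in practice; it is kept so it can easily be
    // re-enabled if the loop structure changes.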
    AssertInvariants();
#endif  // __clang_analyzer__
  }

  bool forward_;
  bool current_at_base_;
  bool equal_keys_;
  Status status_;
  std::unique_ptr<Iterator> base_iterator_;
  std::unique_ptr<WBWIIterator> delta_iterator_;
  const Comparator* comparator_;  // not owned
  const Slice* iterate_upper_bound_;
};

typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
    WriteBatchEntrySkipList;

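// WBWIIteratorImpl iterates over the subset of index entries that belong to a
// single column family. Entries for all column families share one skip list;
// the column family id is the leading component of the ordering, which is
// what makes the Seek()/SeekToLast() logic below work.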
class WBWIIteratorImpl : public WBWIIterator {
 public:
  WBWIIteratorImpl(uint32_t column_family_id,
                   WriteBatchEntrySkipList* skip_list,
                   const ReadableWriteBatch* write_batch)
      : column_family_id_(column_family_id),
        skip_list_iter_(skip_list),
        write_batch_(write_batch) {}

  ~WBWIIteratorImpl() override {}

  bool Valid() const override {
    if (!skip_list_iter_.Valid()) {
      return false;
    }
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    return (iter_entry != nullptr &&
            iter_entry->column_family == column_family_id_);
  }

  void SeekToFirst() override {
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

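  // SeekToLast() has no direct skip-list primitive for "last entry of this
  // column family", so it seeks to the first possible entry of the next
  // column family id and then steps back one position.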
  void SeekToLast() override {
    WriteBatchIndexEntry search_entry(
        nullptr /* search_key */, column_family_id_ + 1,
        true /* is_forward_direction */, true /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
    if (!skip_list_iter_.Valid()) {
      skip_list_iter_.SeekToLast();
    } else {
      skip_list_iter_.Prev();
    }
  }

  void Seek(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      true /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.Seek(&search_entry);
  }

  void SeekForPrev(const Slice& key) override {
    WriteBatchIndexEntry search_entry(&key, column_family_id_,
                                      false /* is_forward_direction */,
                                      false /* is_seek_to_first */);
    skip_list_iter_.SeekForPrev(&search_entry);
  }

  void Next() override { skip_list_iter_.Next(); }

  void Prev() override { skip_list_iter_.Prev(); }

  WriteEntry Entry() const override {
    WriteEntry ret;
    Slice blob, xid;
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
    // this is guaranteed with Valid()
    assert(iter_entry != nullptr &&
           iter_entry->column_family == column_family_id_);
    auto s = write_batch_->GetEntryFromDataOffset(
        iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
    assert(s.ok());
    assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
           ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
           ret.type == kMergeRecord);
    return ret;
  }

  Status status() const override {
    // This is an in-memory data structure, so the only way status can be
    // non-ok is through memory corruption.
    return Status::OK();
  }

  const WriteBatchIndexEntry* GetRawEntry() const {
    return skip_list_iter_.key();
  }

 private:
  uint32_t column_family_id_;
  WriteBatchEntrySkipList::Iterator skip_list_iter_;
  const ReadableWriteBatch* write_batch_;
};

struct WriteBatchWithIndex::Rep {
  explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
               size_t max_bytes = 0, bool _overwrite_key = false)
      : write_batch(reserved_bytes, max_bytes),
        comparator(index_comparator, &write_batch),
        skip_list(comparator, &arena),
        overwrite_key(_overwrite_key),
        last_entry_offset(0),
        last_sub_batch_offset(0),
        sub_batch_cnt(1) {}
  ReadableWriteBatch write_batch;
  WriteBatchEntryComparator comparator;
  Arena arena;
  WriteBatchEntrySkipList skip_list;
  bool overwrite_key;
  size_t last_entry_offset;
  // The starting offset of the last sub-batch. A sub-batch starts right before
  // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
  // the default, means that no duplicate key is detected so far.
  size_t last_sub_batch_offset;
  // Total number of sub-batches in the write batch. Default is 1.
  size_t sub_batch_cnt;

  // Remember current offset of internal write batch, which is used as
  // the starting offset of the next record.
  void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }

  // In overwrite mode, find the existing entry for the same key and update it
  // to point to the current entry.
  // Return true if the key is found and updated.
  bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
  bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);

  // Add the most recent entry to the index.
  // In overwrite mode, if the key already exists in the index, update it.
  void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
  void AddOrUpdateIndex(const Slice& key);

  // Allocate an index entry pointing to the last entry in the write batch and
  // put it into the skip list.
  void AddNewEntry(uint32_t column_family_id);

  // Clear all updates buffered in this batch.
  void Clear();
  void ClearIndex();

  // Rebuild index by reading all records from the batch.
  // Returns non-ok status on corruption.
  Status ReBuildIndex();
};

bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
    ColumnFamilyHandle* column_family, const Slice& key) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  return UpdateExistingEntryWithCfId(cf_id, key);
}

bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
    uint32_t column_family_id, const Slice& key) {
  if (!overwrite_key) {
    return false;
  }

  WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
  iter.Seek(key);
  if (!iter.Valid()) {
    return false;
  }
  if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
    return false;
  }
  WriteBatchIndexEntry* non_const_entry =
      const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
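  // A duplicate key was found. If the existing entry already belongs to the
  // current sub-batch (its offset is at or past last_sub_batch_offset), this
  // write starts a new sub-batch. For example, with overwrite_key == true,
  // Put(k1), Put(k2), Put(k1) leaves sub_batch_cnt == 2.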
  if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
    last_sub_batch_offset = last_entry_offset;
    sub_batch_cnt++;
  }
  non_const_entry->offset = last_entry_offset;
  return true;
}

void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
    ColumnFamilyHandle* column_family, const Slice& key) {
  if (!UpdateExistingEntry(column_family, key)) {
    uint32_t cf_id = GetColumnFamilyID(column_family);
    const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
    if (cf_cmp != nullptr) {
      comparator.SetComparatorForCF(cf_id, cf_cmp);
    }
    AddNewEntry(cf_id);
  }
}

void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
  if (!UpdateExistingEntryWithCfId(0, key)) {
    AddNewEntry(0);
  }
}

void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
  const std::string& wb_data = write_batch.Data();
  Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
                          wb_data.size() - last_entry_offset);
  // Extract key
  Slice key;
  bool success __attribute__((__unused__));
  success =
      ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
  assert(success);

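  // The index entry records the key as an (offset, size) pair into the write
  // batch's own data buffer rather than as a raw pointer, so it stays valid
  // even if the underlying buffer is reallocated as the batch grows.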
  auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
  auto* index_entry =
      new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
                                     key.data() - wb_data.data(), key.size());
  skip_list.Insert(index_entry);
}

void WriteBatchWithIndex::Rep::Clear() {
  write_batch.Clear();
  ClearIndex();
}

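// ClearIndex() destroys the skip list and the arena in place and then
// placement-news fresh instances, because the skip list's nodes live in the
// arena and cannot be released individually.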
void WriteBatchWithIndex::Rep::ClearIndex() {
  skip_list.~WriteBatchEntrySkipList();
  arena.~Arena();
  new (&arena) Arena();
  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
  last_entry_offset = 0;
  last_sub_batch_offset = 0;
  sub_batch_cnt = 1;
}

Status WriteBatchWithIndex::Rep::ReBuildIndex() {
  Status s;

  ClearIndex();

  if (write_batch.Count() == 0) {
    // Nothing to re-index
    return s;
  }

  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

  Slice input(write_batch.Data());
  input.remove_prefix(offset);

  // Loop through all entries in Rep and add each one to the index
  uint32_t found = 0;
  while (s.ok() && !input.empty()) {
    Slice key, value, blob, xid;
    uint32_t column_family_id = 0;  // default
    char tag = 0;

    // set offset of current entry for call to AddNewEntry()
    last_entry_offset = input.data() - write_batch.Data().data();

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                 &value, &blob, &xid);
    if (!s.ok()) {
      break;
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        found++;
        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
          AddNewEntry(column_family_id);
        }
        break;
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeBeginPersistedPrepareXID:
      case kTypeBeginUnprepareXID:
      case kTypeEndPrepareXID:
      case kTypeCommitXID:
      case kTypeRollbackXID:
      case kTypeNoop:
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
                                  ToString(static_cast<unsigned int>(tag)));
    }
  }

  if (s.ok() && found != write_batch.Count()) {
    s = Status::Corruption("WriteBatch has wrong count");
  }

  return s;
}

WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key, size_t max_bytes)
    : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
                  overwrite_key)) {}

WriteBatchWithIndex::~WriteBatchWithIndex() {}

WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default;

WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) =
    default;

WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }

size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }

WBWIIterator* WriteBatchWithIndex::NewIterator() {
  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}

WBWIIterator* WriteBatchWithIndex::NewIterator(
    ColumnFamilyHandle* column_family) {
  return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
                              &(rep->skip_list), &rep->write_batch);
}

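// NewIteratorWithBase() requires overwrite_key == true: if the batch were
// allowed to hold duplicate keys, the delta iterator could surface several
// entries for one key and the merged view would be ambiguous.
//
// Minimal usage sketch (illustrative only; `db` and `cf` are assumed to be a
// valid DB and column family handle):
//
//   WriteBatchWithIndex wbwi(BytewiseComparator(), 0, /*overwrite_key=*/true);
//   wbwi.Put(cf, "key", "batch_value");
//   std::unique_ptr<Iterator> it(wbwi.NewIteratorWithBase(
//       cf, db->NewIterator(ReadOptions(), cf)));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // Sees the union of the DB and the batch, with batch entries winning.
//   }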
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
    ColumnFamilyHandle* column_family, Iterator* base_iterator,
    const ReadOptions* read_options) {
  if (rep->overwrite_key == false) {
    assert(false);
    return nullptr;
  }
  return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
                               GetColumnFamilyUserComparator(column_family),
                               read_options);
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
  if (rep->overwrite_key == false) {
    assert(false);
    return nullptr;
  }
  // default column family's comparator
  return new BaseDeltaIterator(base_iterator, NewIterator(),
                               rep->comparator.default_comparator());
}

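// All of the update methods below follow the same pattern: remember the
// current end offset of the underlying write batch, append the record to it,
// and, only if that append succeeds, add (or overwrite) the index entry for
// the key.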
Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Put(column_family, key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Put(key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
                                   const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Delete(column_family, key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::Delete(const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Delete(key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
                                         const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.SingleDelete(column_family, key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.SingleDelete(key);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
                                  const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Merge(column_family, key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(column_family, key);
  }
  return s;
}

Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  auto s = rep->write_batch.Merge(key, value);
  if (s.ok()) {
    rep->AddOrUpdateIndex(key);
  }
  return s;
}

Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
  return rep->write_batch.PutLogData(blob);
}

void WriteBatchWithIndex::Clear() { rep->Clear(); }

Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
                                         const DBOptions& options,
                                         const Slice& key, std::string* value) {
  Status s;
  MergeContext merge_context;
  const ImmutableDBOptions immutable_db_options(options);

  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immutable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, value, rep->overwrite_key, &s);

  switch (result) {
    case WriteBatchWithIndexInternal::Result::kFound:
    case WriteBatchWithIndexInternal::Result::kError:
      // use returned status
      break;
    case WriteBatchWithIndexInternal::Result::kDeleted:
    case WriteBatchWithIndexInternal::Result::kNotFound:
      s = Status::NotFound();
      break;
    case WriteBatchWithIndexInternal::Result::kMergeInProgress:
      s = Status::MergeInProgress();
      break;
    default:
      assert(false);
  }

  return s;
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              std::string* value) {
  assert(value != nullptr);
  PinnableSlice pinnable_val(value);
  assert(!pinnable_val.IsPinned());
  auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                             &pinnable_val);
  if (s.ok() && pinnable_val.IsPinned()) {
    value->assign(pinnable_val.data(), pinnable_val.size());
  }  // else value is already assigned
  return s;
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              PinnableSlice* pinnable_val) {
  return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                           pinnable_val);
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              std::string* value) {
  assert(value != nullptr);
  PinnableSlice pinnable_val(value);
  assert(!pinnable_val.IsPinned());
  auto s =
      GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
  if (s.ok() && pinnable_val.IsPinned()) {
    value->assign(pinnable_val.data(), pinnable_val.size());
  }  // else value is already assigned
  return s;
}

Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              PinnableSlice* pinnable_val) {
  return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
                           nullptr);
}

Status WriteBatchWithIndex::GetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
  Status s;
  MergeContext merge_context;
  const ImmutableDBOptions& immutable_db_options =
      static_cast_with_check<DBImpl, DB>(db->GetRootDB())
          ->immutable_db_options();

  // Since the WriteBatch has the same lifetime as the transaction, we cannot
  // pin the value from it; otherwise the returned value would not be
  // available after the transaction finishes.
  std::string& batch_value = *pinnable_val->GetSelf();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          immutable_db_options, this, column_family, key, &merge_context,
          &rep->comparator, &batch_value, rep->overwrite_key, &s);

  if (result == WriteBatchWithIndexInternal::Result::kFound) {
    pinnable_val->PinSelf();
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
    return Status::NotFound();
  }
  if (result == WriteBatchWithIndexInternal::Result::kError) {
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
      rep->overwrite_key == true) {
    // Since we've overwritten keys, we do not know what other operations are
    // in this batch for this key, so we cannot do a Merge to compute the
    // result.  Instead, we will simply return MergeInProgress.
    return Status::MergeInProgress();
  }

  assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
         result == WriteBatchWithIndexInternal::Result::kNotFound);

  // Did not find key in batch OR could not resolve Merges.  Try DB.
  if (!callback) {
    s = db->Get(read_options, column_family, key, pinnable_val);
  } else {
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = column_family;
    get_impl_options.value = pinnable_val;
    get_impl_options.callback = callback;
    s = static_cast_with_check<DBImpl, DB>(db->GetRootDB())
            ->GetImpl(read_options, key, get_impl_options);
  }

  if (s.ok() || s.IsNotFound()) {  // DB Get Succeeded
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // Merge result from DB with merges in Batch
      auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
      const MergeOperator* merge_operator =
          cfh->cfd()->ioptions()->merge_operator;
      Statistics* statistics = immutable_db_options.statistics.get();
      Env* env = immutable_db_options.env;
      Logger* logger = immutable_db_options.info_log.get();

      Slice* merge_data;
      if (s.ok()) {
        merge_data = pinnable_val;
      } else {  // Key not present in db (s.IsNotFound())
        merge_data = nullptr;
      }

      if (merge_operator) {
        std::string merge_result;
        s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
                                        merge_context.GetOperands(),
                                        &merge_result, logger, statistics, env);
        pinnable_val->Reset();
        *pinnable_val->GetSelf() = std::move(merge_result);
        pinnable_val->PinSelf();
      } else {
        s = Status::InvalidArgument("Options::merge_operator must be set");
      }
    }
  }

  return s;
}

void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input) {
  MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys,
                         values, statuses, sorted_input, nullptr);
}

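// MultiGetFromBatchAndDB first tries to resolve every key against the write
// batch. Keys that are fully answered by the batch (found, deleted, or in an
// unresolvable merge state) are finalized immediately; the remaining keys are
// forwarded to the DB in one MultiGet call, and any pending merge operands
// from the batch are then applied on top of the DB results.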
void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input, ReadCallback* callback) {
  const ImmutableDBOptions& immutable_db_options =
      static_cast_with_check<DBImpl, DB>(db->GetRootDB())
          ->immutable_db_options();

  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  // To hold merges from the write batch
  autovector<std::pair<WriteBatchWithIndexInternal::Result, MergeContext>,
             MultiGetContext::MAX_BATCH_SIZE>
      merges;
  // Since the WriteBatch has the same lifetime as the transaction, we cannot
  // pin the value from it; otherwise the returned value would not be
  // available after the transaction finishes.
  for (size_t i = 0; i < num_keys; ++i) {
    MergeContext merge_context;
    PinnableSlice* pinnable_val = &values[i];
    std::string& batch_value = *pinnable_val->GetSelf();
    Status* s = &statuses[i];
    WriteBatchWithIndexInternal::Result result =
        WriteBatchWithIndexInternal::GetFromBatch(
            immutable_db_options, this, column_family, keys[i], &merge_context,
            &rep->comparator, &batch_value, rep->overwrite_key, s);

    if (result == WriteBatchWithIndexInternal::Result::kFound) {
      pinnable_val->PinSelf();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
      *s = Status::NotFound();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kError) {
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        rep->overwrite_key == true) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result.  Instead, we will simply return MergeInProgress.
      *s = Status::MergeInProgress();
      continue;
    }

    assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
           result == WriteBatchWithIndexInternal::Result::kNotFound);
    key_context.emplace_back(column_family, keys[i], &values[i],
                             /*timestamp*/ nullptr, &statuses[i]);
    merges.emplace_back(result, std::move(merge_context));
  }

  for (KeyContext& key : key_context) {
    sorted_keys.emplace_back(&key);
  }

  // Did not find key in batch OR could not resolve Merges.  Try DB.
  static_cast_with_check<DBImpl, DB>(db->GetRootDB())
      ->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys);
  static_cast_with_check<DBImpl, DB>(db->GetRootDB())
      ->MultiGetWithCallback(read_options, column_family, callback,
                             &sorted_keys);

  ColumnFamilyHandleImpl* cfh =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  const MergeOperator* merge_operator = cfh->cfd()->ioptions()->merge_operator;
  for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) {
    KeyContext& key = *iter;
    if (key.s->ok() || key.s->IsNotFound()) {  // DB Get Succeeded
      size_t index = iter - key_context.begin();
      std::pair<WriteBatchWithIndexInternal::Result, MergeContext>&
          merge_result = merges[index];
      if (merge_result.first ==
          WriteBatchWithIndexInternal::Result::kMergeInProgress) {
        // Merge result from DB with merges in Batch
        Statistics* statistics = immutable_db_options.statistics.get();
        Env* env = immutable_db_options.env;
        Logger* logger = immutable_db_options.info_log.get();

        Slice* merge_data;
        if (key.s->ok()) {
          merge_data = iter->value;
        } else {  // Key not present in db (s.IsNotFound())
          merge_data = nullptr;
        }

        if (merge_operator) {
          *key.s = MergeHelper::TimedFullMerge(
              merge_operator, *key.key, merge_data,
              merge_result.second.GetOperands(), key.value->GetSelf(), logger,
              statistics, env);
          key.value->PinSelf();
        } else {
          *key.s =
              Status::InvalidArgument("Options::merge_operator must be set");
        }
      }
    }
  }
}

void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }

Status WriteBatchWithIndex::RollbackToSavePoint() {
  Status s = rep->write_batch.RollbackToSavePoint();

  if (s.ok()) {
    rep->sub_batch_cnt = 1;
    rep->last_sub_batch_offset = 0;
    s = rep->ReBuildIndex();
  }

  return s;
}

Status WriteBatchWithIndex::PopSavePoint() {
  return rep->write_batch.PopSavePoint();
}

void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
  rep->write_batch.SetMaxBytes(max_bytes);
}

size_t WriteBatchWithIndex::GetDataSize() const {
  return rep->write_batch.GetDataSize();
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // !ROCKSDB_LITE