//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

#include "db/column_family.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/coding.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

class Env;
class Logger;
class Statistics;

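// Decodes the record stored at `data_offset` in the underlying WriteBatch
// representation and reports its logical type along with the key, value,
// blob and xid slices (whichever apply to that record type).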
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
                                                  WriteType* type, Slice* Key,
                                                  Slice* value, Slice* blob,
                                                  Slice* xid) const {
  if (type == nullptr || Key == nullptr || value == nullptr ||
      blob == nullptr || xid == nullptr) {
    return Status::InvalidArgument("Output parameters cannot be null");
  }

  if (data_offset == GetDataSize()) {
    // reached end of batch.
    return Status::NotFound();
  }

  if (data_offset > GetDataSize()) {
    return Status::InvalidArgument("data offset exceeds write batch size");
  }
  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
  char tag;
  uint32_t column_family;
  Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
                                      blob, xid);
  if (!s.ok()) {
    return s;
  }

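  // Translate the low-level WriteBatch tag into the WriteType visible through
  // the WBWI iterator.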
  switch (tag) {
    case kTypeColumnFamilyValue:
    case kTypeValue:
      *type = kPutRecord;
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeDeletion:
      *type = kDeleteRecord;
      break;
    case kTypeColumnFamilySingleDeletion:
    case kTypeSingleDeletion:
      *type = kSingleDeleteRecord;
      break;
    case kTypeColumnFamilyRangeDeletion:
    case kTypeRangeDeletion:
      *type = kDeleteRangeRecord;
      break;
    case kTypeColumnFamilyMerge:
    case kTypeMerge:
      *type = kMergeRecord;
      break;
    case kTypeLogData:
      *type = kLogDataRecord;
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
    case kTypeBeginPersistedPrepareXID:
    case kTypeBeginUnprepareXID:
    case kTypeEndPrepareXID:
    case kTypeCommitXID:
    case kTypeRollbackXID:
      *type = kXIDRecord;
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag ",
                                ToString(static_cast<unsigned int>(tag)));
  }
  return Status::OK();
}

// If both `entry1` and `entry2` point to real entries in the write batch, we
// compare the entries as follows:
// 1. First compare the column family; the one with the larger CF is larger.
// 2. Within the same CF, decode the entries to find their keys; the entry
//    with the larger key is larger.
// 3. If two entries have the same CF and key, the one with the larger offset
//    is larger.
// Sometimes either `entry1` or `entry2` is a dummy entry, which is actually
// a search key. In that case, step 2 does not decode the entry but uses the
// value in WriteBatchIndexEntry::search_key instead.
// One special case is when WriteBatchIndexEntry::key_size is kFlagMinInCf.
// This indicates that we are seeking to the first entry of the column family.
// Such an entry compares smaller than all real entries of that column family.
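// For example, within one column family under the default bytewise
// comparator: ("bar", offset 40) < ("foo", offset 20) < ("foo", offset 30).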
int WriteBatchEntryComparator::operator()(
    const WriteBatchIndexEntry* entry1,
    const WriteBatchIndexEntry* entry2) const {
  if (entry1->column_family > entry2->column_family) {
    return 1;
  } else if (entry1->column_family < entry2->column_family) {
    return -1;
  }

  // Deal with special case of seeking to the beginning of a column family
  if (entry1->is_min_in_cf()) {
    return -1;
  } else if (entry2->is_min_in_cf()) {
    return 1;
  }

  Slice key1, key2;
  if (entry1->search_key == nullptr) {
    key1 = Slice(write_batch_->Data().data() + entry1->key_offset,
                 entry1->key_size);
  } else {
    key1 = *(entry1->search_key);
  }
  if (entry2->search_key == nullptr) {
    key2 = Slice(write_batch_->Data().data() + entry2->key_offset,
                 entry2->key_size);
  } else {
    key2 = *(entry2->search_key);
  }

  int cmp = CompareKey(entry1->column_family, key1, key2);
  if (cmp != 0) {
    return cmp;
  } else if (entry1->offset > entry2->offset) {
    return 1;
  } else if (entry1->offset < entry2->offset) {
    return -1;
  }
  return 0;
}

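// Compares two user keys with the comparator registered for `column_family`,
// falling back to the default comparator when none has been registered.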
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
                                          const Slice& key1,
                                          const Slice& key2) const {
  if (column_family < cf_comparators_.size() &&
      cf_comparators_[column_family] != nullptr) {
    return cf_comparators_[column_family]->Compare(key1, key2);
  } else {
    return default_comparator_->Compare(key1, key2);
  }
}

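// Searches the indexed batch for entries matching `key` in `column_family` and
// resolves them newest-first: merge operands are accumulated and, once a base
// Put or Delete is reached, combined via the column family's merge operator.
// The outcome is reported through the returned Result; `*s` carries any error.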
WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
    const ImmutableDBOptions& immuable_db_options, WriteBatchWithIndex* batch,
    ColumnFamilyHandle* column_family, const Slice& key,
    MergeContext* merge_context, WriteBatchEntryComparator* cmp,
    std::string* value, bool overwrite_key, Status* s) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  *s = Status::OK();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::Result::kNotFound;

  std::unique_ptr<WBWIIterator> iter =
      std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family));

  // We want to iterate in the reverse order that the writes were added to the
  // batch.  Since we don't have a reverse iterator, we must seek past the end.
  // TODO(agiardullo): consider adding support for reverse iteration
  iter->Seek(key);
  while (iter->Valid()) {
    const WriteEntry entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      break;
    }

    iter->Next();
  }

  if (!(*s).ok()) {
    return WriteBatchWithIndexInternal::Result::kError;
  }

  if (!iter->Valid()) {
    // Read past end of results.  Reposition on last result.
    iter->SeekToLast();
  } else {
    iter->Prev();
  }

  Slice entry_value;
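  // Walk the entries for `key` from newest to oldest, collecting merge
  // operands until a base record (Put/Delete/SingleDelete) or an error stops
  // the scan.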
  while (iter->Valid()) {
    const WriteEntry entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      // Unexpected error or we've reached a different next key
      break;
    }

    switch (entry.type) {
      case kPutRecord: {
        result = WriteBatchWithIndexInternal::Result::kFound;
        entry_value = entry.value;
        break;
      }
      case kMergeRecord: {
        result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
        merge_context->PushOperand(entry.value);
        break;
      }
      case kDeleteRecord:
      case kSingleDeleteRecord: {
        result = WriteBatchWithIndexInternal::Result::kDeleted;
        break;
      }
      case kLogDataRecord:
      case kXIDRecord: {
        // ignore
        break;
      }
      default: {
        result = WriteBatchWithIndexInternal::Result::kError;
        (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
                                  ToString(entry.type));
        break;
      }
    }
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted ||
        result == WriteBatchWithIndexInternal::Result::kError) {
      // We can stop iterating once we find a PUT or DELETE
      break;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        overwrite_key == true) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result.  Instead, we will simply return MergeInProgress.
      break;
    }

    iter->Prev();
  }

  if ((*s).ok()) {
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted) {
      // Found a Put or Delete.  Merge if necessary.
      if (merge_context->GetNumOperands() > 0) {
        const MergeOperator* merge_operator;

        if (column_family != nullptr) {
          auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
          merge_operator = cfh->cfd()->ioptions()->merge_operator;
        } else {
          *s = Status::InvalidArgument("Must provide a column_family");
          result = WriteBatchWithIndexInternal::Result::kError;
          return result;
        }
        Statistics* statistics = immuable_db_options.statistics.get();
        Env* env = immuable_db_options.env;
        Logger* logger = immuable_db_options.info_log.get();

        if (merge_operator) {
          *s = MergeHelper::TimedFullMerge(merge_operator, key, &entry_value,
                                           merge_context->GetOperands(), value,
                                           logger, statistics, env);
        } else {
          *s = Status::InvalidArgument("Options::merge_operator must be set");
        }
        if ((*s).ok()) {
          result = WriteBatchWithIndexInternal::Result::kFound;
        } else {
          result = WriteBatchWithIndexInternal::Result::kError;
        }
      } else {  // nothing to merge
        if (result == WriteBatchWithIndexInternal::Result::kFound) {  // PUT
          value->assign(entry_value.data(), entry_value.size());
        }
      }
    }
  }

  return result;
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // !ROCKSDB_LITE