// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

#include "db/column_family.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/coding.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

class Env;
class Logger;
class Statistics;

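// Decodes the record that starts at `data_offset` within this batch's
// serialized representation, returning its logical type and payload slices.
// A minimal sketch of a caller (hypothetical; in practice offsets come from
// the WriteBatchWithIndex skip-list entries, not manual arithmetic):
//
//   WriteType type;
//   Slice key, value, blob, xid;
//   Status s = readable_batch.GetEntryFromDataOffset(
//       index_entry->offset, &type, &key, &value, &blob, &xid);
//   if (s.ok() && type == kPutRecord) { /* use key/value */ }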
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
                                                  WriteType* type, Slice* Key,
                                                  Slice* value, Slice* blob,
                                                  Slice* xid) const {
  if (type == nullptr || Key == nullptr || value == nullptr ||
      blob == nullptr || xid == nullptr) {
    return Status::InvalidArgument("Output parameters cannot be null");
  }

  if (data_offset == GetDataSize()) {
    // reached end of batch.
    return Status::NotFound();
  }

  if (data_offset > GetDataSize()) {
    return Status::InvalidArgument("data offset exceeds write batch size");
  }
  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
  char tag;
  uint32_t column_family;
  Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
                                      blob, xid);
  if (!s.ok()) {
    // Propagate decode failures instead of switching on an undefined tag.
    return s;
  }

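  // Map the physical record tag to the public WriteType; the per-column-
  // family tag variants collapse to the same logical type.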
  switch (tag) {
    case kTypeColumnFamilyValue:
    case kTypeValue:
      *type = kPutRecord;
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeDeletion:
      *type = kDeleteRecord;
      break;
    case kTypeColumnFamilySingleDeletion:
    case kTypeSingleDeletion:
      *type = kSingleDeleteRecord;
      break;
    case kTypeColumnFamilyRangeDeletion:
    case kTypeRangeDeletion:
      *type = kDeleteRangeRecord;
      break;
    case kTypeColumnFamilyMerge:
    case kTypeMerge:
      *type = kMergeRecord;
      break;
    case kTypeLogData:
      *type = kLogDataRecord;
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
    case kTypeBeginPersistedPrepareXID:
    case kTypeBeginUnprepareXID:
    case kTypeEndPrepareXID:
    case kTypeCommitXID:
    case kTypeRollbackXID:
      *type = kXIDRecord;
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag ",
                                ToString(static_cast<unsigned int>(tag)));
  }
  return Status::OK();
}

// If both `entry1` and `entry2` point to real entries in the write batch, we
// compare them as follows:
// 1. First compare the column family; the entry with the larger CF ID is
//    larger.
// 2. Within the same CF, decode each entry to find its key; the entry with
//    the larger key is larger.
// 3. If two entries have the same CF and key, the one with the larger offset
//    is larger.
// Sometimes `entry1` or `entry2` is a dummy entry that is really a search
// key. In that case, step 2 does not decode the entry but uses the value in
// WriteBatchIndexEntry::search_key instead.
// One special case is WriteBatchIndexEntry::key_size equal to kFlagMinInCf,
// which indicates a seek to the beginning of the column family. Such an
// entry compares smaller than every real entry of that column family.
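// For example, with bytewise key comparison the rules above give:
//   (cf=1, key="z", offset=8)  < (cf=2, key="a", offset=0)   // rule 1
//   (cf=1, key="a", offset=40) < (cf=1, key="b", offset=8)   // rule 2
//   (cf=1, key="a", offset=8)  < (cf=1, key="a", offset=40)  // rule 3
// (Offsets here are made up for illustration.)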
int WriteBatchEntryComparator::operator()(
    const WriteBatchIndexEntry* entry1,
    const WriteBatchIndexEntry* entry2) const {
  if (entry1->column_family > entry2->column_family) {
    return 1;
  } else if (entry1->column_family < entry2->column_family) {
    return -1;
  }

  // Deal with special case of seeking to the beginning of a column family
  if (entry1->is_min_in_cf()) {
    return -1;
  } else if (entry2->is_min_in_cf()) {
    return 1;
  }

  Slice key1, key2;
  if (entry1->search_key == nullptr) {
    key1 = Slice(write_batch_->Data().data() + entry1->key_offset,
                 entry1->key_size);
  } else {
    key1 = *(entry1->search_key);
  }
  if (entry2->search_key == nullptr) {
    key2 = Slice(write_batch_->Data().data() + entry2->key_offset,
                 entry2->key_size);
  } else {
    key2 = *(entry2->search_key);
  }

  int cmp = CompareKey(entry1->column_family, key1, key2);
  if (cmp != 0) {
    return cmp;
  } else if (entry1->offset > entry2->offset) {
    return 1;
  } else if (entry1->offset < entry2->offset) {
    return -1;
  }
  return 0;
}

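// Compares two user keys in `column_family`, preferring a comparator
// registered for that column family (see SetComparatorForCF in the header)
// and falling back to default_comparator_ otherwise.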
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
                                          const Slice& key1,
                                          const Slice& key2) const {
  if (column_family < cf_comparators_.size() &&
      cf_comparators_[column_family] != nullptr) {
    return cf_comparators_[column_family]->Compare(key1, key2);
  } else {
    return default_comparator_->Compare(key1, key2);
  }
}

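// Searches `batch` (newest write first) for `key` and classifies the result:
//   kFound           - a Put settles the value (later Merge records, if any,
//                      are folded in); *value holds the final bytes
//   kDeleted         - a Delete/SingleDelete settles the lookup
//   kMergeInProgress - only Merge operands were seen; the caller must
//                      consult the underlying DB to complete the merge
//   kNotFound        - the batch has no entry for `key`
//   kError           - corruption or a missing merge operator; *s has details
// A minimal sketch of a caller (hypothetical; mirrors how
// WriteBatchWithIndex::GetFromBatch drives this helper):
//
//   Status s;
//   std::string value;
//   MergeContext merge_context;
//   auto result = WriteBatchWithIndexInternal::GetFromBatch(
//       immutable_db_options, batch, cfh, key, &merge_context, &comparator,
//       &value, /*overwrite_key=*/false, &s);
//   if (result == WriteBatchWithIndexInternal::Result::kFound) {
//     // use value
//   }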
WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
    const ImmutableDBOptions& immutable_db_options, WriteBatchWithIndex* batch,
    ColumnFamilyHandle* column_family, const Slice& key,
    MergeContext* merge_context, WriteBatchEntryComparator* cmp,
    std::string* value, bool overwrite_key, Status* s) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  *s = Status::OK();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::Result::kNotFound;

  std::unique_ptr<WBWIIterator> iter(batch->NewIterator(column_family));

  // We want to iterate in the reverse order that the writes were added to the
  // batch. Since we don't have a reverse iterator, we must seek past the end.
  // TODO(agiardullo): consider adding support for reverse iteration
  iter->Seek(key);
  while (iter->Valid()) {
    const WriteEntry entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      break;
    }

    iter->Next();
  }

  if (!s->ok()) {
    return WriteBatchWithIndexInternal::Result::kError;
  }

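  // After the forward scan, the iterator rests on the first entry past the
  // last occurrence of `key` (or is invalid if `key` sorts last in the
  // batch), so one step back lands on the most recent write for `key`.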
  if (!iter->Valid()) {
    // Read past end of results. Reposition on last result.
    iter->SeekToLast();
  } else {
    iter->Prev();
  }

  Slice entry_value;
  while (iter->Valid()) {
    const WriteEntry entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      // Unexpected error or we've reached a different next key
      break;
    }

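    // Entries are visited newest-to-oldest: the first Put or Delete we meet
    // settles the result, while Merge records accumulate operands on the way.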
    switch (entry.type) {
      case kPutRecord: {
        result = WriteBatchWithIndexInternal::Result::kFound;
        entry_value = entry.value;
        break;
      }
      case kMergeRecord: {
        result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
        merge_context->PushOperand(entry.value);
        break;
      }
      case kDeleteRecord:
      case kSingleDeleteRecord: {
        result = WriteBatchWithIndexInternal::Result::kDeleted;
        break;
      }
      case kLogDataRecord:
      case kXIDRecord: {
        // ignore
        break;
      }
      default: {
        result = WriteBatchWithIndexInternal::Result::kError;
        *s = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
                                ToString(entry.type));
        break;
      }
    }
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted ||
        result == WriteBatchWithIndexInternal::Result::kError) {
      // We can stop iterating once we find a PUT or DELETE
      break;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        overwrite_key) {
      // Since we've overwritten keys, we do not know what other operations
      // are in this batch for this key, so we cannot do a Merge to compute
      // the result. Instead, we will simply return MergeInProgress.
      break;
    }

    iter->Prev();
  }

  if (s->ok()) {
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted) {
      // Found a Put or Delete. Merge if necessary.
      if (merge_context->GetNumOperands() > 0) {
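        // Later Merge records were stacked on top of this Put/Delete; fold
        // them into the final value with the column family's merge operator.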
        const MergeOperator* merge_operator;

        if (column_family != nullptr) {
          auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
          merge_operator = cfh->cfd()->ioptions()->merge_operator;
        } else {
          *s = Status::InvalidArgument("Must provide a column_family");
          result = WriteBatchWithIndexInternal::Result::kError;
          return result;
        }
        Statistics* statistics = immutable_db_options.statistics.get();
        Env* env = immutable_db_options.env;
        Logger* logger = immutable_db_options.info_log.get();

        if (merge_operator) {
          // A Delete leaves no base value; pass nullptr so the merge
          // operator sees "no existing value" rather than an empty slice.
          const Slice* existing_value =
              result == WriteBatchWithIndexInternal::Result::kFound
                  ? &entry_value
                  : nullptr;
          *s = MergeHelper::TimedFullMerge(merge_operator, key, existing_value,
                                           merge_context->GetOperands(), value,
                                           logger, statistics, env);
        } else {
          *s = Status::InvalidArgument("Options::merge_operator must be set");
        }
        if (s->ok()) {
          result = WriteBatchWithIndexInternal::Result::kFound;
        } else {
          result = WriteBatchWithIndexInternal::Result::kError;
        }
      } else {  // nothing to merge
        if (result == WriteBatchWithIndexInternal::Result::kFound) {  // PUT
          value->assign(entry_value.data(), entry_value.size());
        }
      }
    }
  }

  return result;
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // !ROCKSDB_LITE