1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7 #include "table/plain/plain_table_key_coding.h"
8
9 #include <algorithm>
10 #include <string>
11 #include "db/dbformat.h"
12 #include "file/writable_file_writer.h"
13 #include "table/plain/plain_table_factory.h"
14 #include "table/plain/plain_table_reader.h"
15
16 namespace ROCKSDB_NAMESPACE {
17
// Kind of entry announced by the control byte of a prefix-encoded size field.
// The numeric values are stored in the two most significant bits of that byte
// (see EncodeSize / DecodeSize), so they must not be renumbered.
enum PlainTableEntryType : unsigned char {
  // The size is the length of a complete user key, which follows.
  kFullKey = 0,
  // The size is the length of the prefix shared with the previous key; no key
  // bytes follow, a kKeySuffix size field comes next.
  kPrefixFromPreviousKey = 1,
  // The size is the length of the key suffix that follows; the prefix is
  // taken from the previously decoded key.
  kKeySuffix = 2,
};
23
24 namespace {
25
26 // Control byte:
27 // First two bits indicate type of entry
28 // Other bytes are inlined sizes. If all bits are 1 (0x03F), overflow bytes
29 // are used. key_size-0x3F will be encoded as a variint32 after this bytes.
30
31 const unsigned char kSizeInlineLimit = 0x3F;
32
33 // Return 0 for error
EncodeSize(PlainTableEntryType type,uint32_t key_size,char * out_buffer)34 size_t EncodeSize(PlainTableEntryType type, uint32_t key_size,
35 char* out_buffer) {
36 out_buffer[0] = type << 6;
37
38 if (key_size < static_cast<uint32_t>(kSizeInlineLimit)) {
39 // size inlined
40 out_buffer[0] |= static_cast<char>(key_size);
41 return 1;
42 } else {
43 out_buffer[0] |= kSizeInlineLimit;
44 char* ptr = EncodeVarint32(out_buffer + 1, key_size - kSizeInlineLimit);
45 return ptr - out_buffer;
46 }
47 }
48 } // namespace
49
50 // Fill bytes_read with number of bytes read.
DecodeSize(uint32_t start_offset,PlainTableEntryType * entry_type,uint32_t * key_size,uint32_t * bytes_read)51 inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset,
52 PlainTableEntryType* entry_type,
53 uint32_t* key_size,
54 uint32_t* bytes_read) {
55 Slice next_byte_slice;
56 bool success = file_reader_.Read(start_offset, 1, &next_byte_slice);
57 if (!success) {
58 return file_reader_.status();
59 }
60 *entry_type = static_cast<PlainTableEntryType>(
61 (static_cast<unsigned char>(next_byte_slice[0]) & ~kSizeInlineLimit) >>
62 6);
63 char inline_key_size = next_byte_slice[0] & kSizeInlineLimit;
64 if (inline_key_size < kSizeInlineLimit) {
65 *key_size = inline_key_size;
66 *bytes_read = 1;
67 return Status::OK();
68 } else {
69 uint32_t extra_size;
70 uint32_t tmp_bytes_read;
71 success = file_reader_.ReadVarint32(start_offset + 1, &extra_size,
72 &tmp_bytes_read);
73 if (!success) {
74 return file_reader_.status();
75 }
76 assert(tmp_bytes_read > 0);
77 *key_size = kSizeInlineLimit + extra_size;
78 *bytes_read = tmp_bytes_read + 1;
79 return Status::OK();
80 }
81 }
82
AppendKey(const Slice & key,WritableFileWriter * file,uint64_t * offset,char * meta_bytes_buf,size_t * meta_bytes_buf_size)83 IOStatus PlainTableKeyEncoder::AppendKey(const Slice& key,
84 WritableFileWriter* file,
85 uint64_t* offset, char* meta_bytes_buf,
86 size_t* meta_bytes_buf_size) {
87 ParsedInternalKey parsed_key;
88 if (!ParseInternalKey(key, &parsed_key)) {
89 return IOStatus::Corruption(Slice());
90 }
91
92 Slice key_to_write = key; // Portion of internal key to write out.
93
94 uint32_t user_key_size = static_cast<uint32_t>(key.size() - 8);
95 if (encoding_type_ == kPlain) {
96 if (fixed_user_key_len_ == kPlainTableVariableLength) {
97 // Write key length
98 char key_size_buf[5]; // tmp buffer for key size as varint32
99 char* ptr = EncodeVarint32(key_size_buf, user_key_size);
100 assert(ptr <= key_size_buf + sizeof(key_size_buf));
101 auto len = ptr - key_size_buf;
102 IOStatus io_s = file->Append(Slice(key_size_buf, len));
103 if (!io_s.ok()) {
104 return io_s;
105 }
106 *offset += len;
107 }
108 } else {
109 assert(encoding_type_ == kPrefix);
110 char size_bytes[12];
111 size_t size_bytes_pos = 0;
112
113 Slice prefix =
114 prefix_extractor_->Transform(Slice(key.data(), user_key_size));
115 if (key_count_for_prefix_ == 0 || prefix != pre_prefix_.GetUserKey() ||
116 key_count_for_prefix_ % index_sparseness_ == 0) {
117 key_count_for_prefix_ = 1;
118 pre_prefix_.SetUserKey(prefix);
119 size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes);
120 IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos));
121 if (!io_s.ok()) {
122 return io_s;
123 }
124 *offset += size_bytes_pos;
125 } else {
126 key_count_for_prefix_++;
127 if (key_count_for_prefix_ == 2) {
128 // For second key within a prefix, need to encode prefix length
129 size_bytes_pos +=
130 EncodeSize(kPrefixFromPreviousKey,
131 static_cast<uint32_t>(pre_prefix_.GetUserKey().size()),
132 size_bytes + size_bytes_pos);
133 }
134 uint32_t prefix_len =
135 static_cast<uint32_t>(pre_prefix_.GetUserKey().size());
136 size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len,
137 size_bytes + size_bytes_pos);
138 IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos));
139 if (!io_s.ok()) {
140 return io_s;
141 }
142 *offset += size_bytes_pos;
143 key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len);
144 }
145 }
146
147 // Encode full key
148 // For value size as varint32 (up to 5 bytes).
149 // If the row is of value type with seqId 0, flush the special flag together
150 // in this buffer to safe one file append call, which takes 1 byte.
151 if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
152 IOStatus io_s =
153 file->Append(Slice(key_to_write.data(), key_to_write.size() - 8));
154 if (!io_s.ok()) {
155 return io_s;
156 }
157 *offset += key_to_write.size() - 8;
158 meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0;
159 *meta_bytes_buf_size += 1;
160 } else {
161 IOStatus io_s = file->Append(key_to_write);
162 if (!io_s.ok()) {
163 return io_s;
164 }
165 *offset += key_to_write.size();
166 }
167
168 return IOStatus::OK();
169 }
170
GetFromBuffer(Buffer * buffer,uint32_t file_offset,uint32_t len)171 Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset,
172 uint32_t len) {
173 assert(file_offset + len <= file_info_->data_end_offset);
174 return Slice(buffer->buf.get() + (file_offset - buffer->buf_start_offset),
175 len);
176 }
177
ReadNonMmap(uint32_t file_offset,uint32_t len,Slice * out)178 bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
179 Slice* out) {
180 const uint32_t kPrefetchSize = 256u;
181
182 // Try to read from buffers.
183 for (uint32_t i = 0; i < num_buf_; i++) {
184 Buffer* buffer = buffers_[num_buf_ - 1 - i].get();
185 if (file_offset >= buffer->buf_start_offset &&
186 file_offset + len <= buffer->buf_start_offset + buffer->buf_len) {
187 *out = GetFromBuffer(buffer, file_offset, len);
188 return true;
189 }
190 }
191
192 Buffer* new_buffer;
193 // Data needed is not in any of the buffer. Allocate a new buffer.
194 if (num_buf_ < buffers_.size()) {
195 // Add a new buffer
196 new_buffer = new Buffer();
197 buffers_[num_buf_++].reset(new_buffer);
198 } else {
199 // Now simply replace the last buffer. Can improve the placement policy
200 // if needed.
201 new_buffer = buffers_[num_buf_ - 1].get();
202 }
203
204 assert(file_offset + len <= file_info_->data_end_offset);
205 uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
206 std::max(kPrefetchSize, len));
207 if (size_to_read > new_buffer->buf_capacity) {
208 new_buffer->buf.reset(new char[size_to_read]);
209 new_buffer->buf_capacity = size_to_read;
210 new_buffer->buf_len = 0;
211 }
212 Slice read_result;
213 Status s = file_info_->file->Read(file_offset, size_to_read, &read_result,
214 new_buffer->buf.get(), nullptr);
215 if (!s.ok()) {
216 status_ = s;
217 return false;
218 }
219 new_buffer->buf_start_offset = file_offset;
220 new_buffer->buf_len = size_to_read;
221 *out = GetFromBuffer(new_buffer, file_offset, len);
222 return true;
223 }
224
ReadVarint32(uint32_t offset,uint32_t * out,uint32_t * bytes_read)225 inline bool PlainTableFileReader::ReadVarint32(uint32_t offset, uint32_t* out,
226 uint32_t* bytes_read) {
227 if (file_info_->is_mmap_mode) {
228 const char* start = file_info_->file_data.data() + offset;
229 const char* limit =
230 file_info_->file_data.data() + file_info_->data_end_offset;
231 const char* key_ptr = GetVarint32Ptr(start, limit, out);
232 assert(key_ptr != nullptr);
233 *bytes_read = static_cast<uint32_t>(key_ptr - start);
234 return true;
235 } else {
236 return ReadVarint32NonMmap(offset, out, bytes_read);
237 }
238 }
239
ReadVarint32NonMmap(uint32_t offset,uint32_t * out,uint32_t * bytes_read)240 bool PlainTableFileReader::ReadVarint32NonMmap(uint32_t offset, uint32_t* out,
241 uint32_t* bytes_read) {
242 const char* start;
243 const char* limit;
244 const uint32_t kMaxVarInt32Size = 6u;
245 uint32_t bytes_to_read =
246 std::min(file_info_->data_end_offset - offset, kMaxVarInt32Size);
247 Slice bytes;
248 if (!Read(offset, bytes_to_read, &bytes)) {
249 return false;
250 }
251 start = bytes.data();
252 limit = bytes.data() + bytes.size();
253
254 const char* key_ptr = GetVarint32Ptr(start, limit, out);
255 *bytes_read =
256 (key_ptr != nullptr) ? static_cast<uint32_t>(key_ptr - start) : 0;
257 return true;
258 }
259
ReadInternalKey(uint32_t file_offset,uint32_t user_key_size,ParsedInternalKey * parsed_key,uint32_t * bytes_read,bool * internal_key_valid,Slice * internal_key)260 Status PlainTableKeyDecoder::ReadInternalKey(
261 uint32_t file_offset, uint32_t user_key_size, ParsedInternalKey* parsed_key,
262 uint32_t* bytes_read, bool* internal_key_valid, Slice* internal_key) {
263 Slice tmp_slice;
264 bool success = file_reader_.Read(file_offset, user_key_size + 1, &tmp_slice);
265 if (!success) {
266 return file_reader_.status();
267 }
268 if (tmp_slice[user_key_size] == PlainTableFactory::kValueTypeSeqId0) {
269 // Special encoding for the row with seqID=0
270 parsed_key->user_key = Slice(tmp_slice.data(), user_key_size);
271 parsed_key->sequence = 0;
272 parsed_key->type = kTypeValue;
273 *bytes_read += user_key_size + 1;
274 *internal_key_valid = false;
275 } else {
276 success = file_reader_.Read(file_offset, user_key_size + 8, internal_key);
277 if (!success) {
278 return file_reader_.status();
279 }
280 *internal_key_valid = true;
281 if (!ParseInternalKey(*internal_key, parsed_key)) {
282 return Status::Corruption(
283 Slice("Incorrect value type found when reading the next key"));
284 }
285 *bytes_read += user_key_size + 8;
286 }
287 return Status::OK();
288 }
289
NextPlainEncodingKey(uint32_t start_offset,ParsedInternalKey * parsed_key,Slice * internal_key,uint32_t * bytes_read,bool *)290 Status PlainTableKeyDecoder::NextPlainEncodingKey(uint32_t start_offset,
291 ParsedInternalKey* parsed_key,
292 Slice* internal_key,
293 uint32_t* bytes_read,
294 bool* /*seekable*/) {
295 uint32_t user_key_size = 0;
296 Status s;
297 if (fixed_user_key_len_ != kPlainTableVariableLength) {
298 user_key_size = fixed_user_key_len_;
299 } else {
300 uint32_t tmp_size = 0;
301 uint32_t tmp_read;
302 bool success =
303 file_reader_.ReadVarint32(start_offset, &tmp_size, &tmp_read);
304 if (!success) {
305 return file_reader_.status();
306 }
307 assert(tmp_read > 0);
308 user_key_size = tmp_size;
309 *bytes_read = tmp_read;
310 }
311 // dummy initial value to avoid compiler complain
312 bool decoded_internal_key_valid = true;
313 Slice decoded_internal_key;
314 s = ReadInternalKey(start_offset + *bytes_read, user_key_size, parsed_key,
315 bytes_read, &decoded_internal_key_valid,
316 &decoded_internal_key);
317 if (!s.ok()) {
318 return s;
319 }
320 if (!file_reader_.file_info()->is_mmap_mode) {
321 cur_key_.SetInternalKey(*parsed_key);
322 parsed_key->user_key =
323 Slice(cur_key_.GetInternalKey().data(), user_key_size);
324 if (internal_key != nullptr) {
325 *internal_key = cur_key_.GetInternalKey();
326 }
327 } else if (internal_key != nullptr) {
328 if (decoded_internal_key_valid) {
329 *internal_key = decoded_internal_key;
330 } else {
331 // Need to copy out the internal key
332 cur_key_.SetInternalKey(*parsed_key);
333 *internal_key = cur_key_.GetInternalKey();
334 }
335 }
336 return Status::OK();
337 }
338
NextPrefixEncodingKey(uint32_t start_offset,ParsedInternalKey * parsed_key,Slice * internal_key,uint32_t * bytes_read,bool * seekable)339 Status PlainTableKeyDecoder::NextPrefixEncodingKey(
340 uint32_t start_offset, ParsedInternalKey* parsed_key, Slice* internal_key,
341 uint32_t* bytes_read, bool* seekable) {
342 PlainTableEntryType entry_type;
343
344 bool expect_suffix = false;
345 Status s;
346 do {
347 uint32_t size = 0;
348 // dummy initial value to avoid compiler complain
349 bool decoded_internal_key_valid = true;
350 uint32_t my_bytes_read = 0;
351 s = DecodeSize(start_offset + *bytes_read, &entry_type, &size,
352 &my_bytes_read);
353 if (!s.ok()) {
354 return s;
355 }
356 if (my_bytes_read == 0) {
357 return Status::Corruption("Unexpected EOF when reading size of the key");
358 }
359 *bytes_read += my_bytes_read;
360
361 switch (entry_type) {
362 case kFullKey: {
363 expect_suffix = false;
364 Slice decoded_internal_key;
365 s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
366 bytes_read, &decoded_internal_key_valid,
367 &decoded_internal_key);
368 if (!s.ok()) {
369 return s;
370 }
371 if (!file_reader_.file_info()->is_mmap_mode ||
372 (internal_key != nullptr && !decoded_internal_key_valid)) {
373 // In non-mmap mode, always need to make a copy of keys returned to
374 // users, because after reading value for the key, the key might
375 // be invalid.
376 cur_key_.SetInternalKey(*parsed_key);
377 saved_user_key_ = cur_key_.GetUserKey();
378 if (!file_reader_.file_info()->is_mmap_mode) {
379 parsed_key->user_key =
380 Slice(cur_key_.GetInternalKey().data(), size);
381 }
382 if (internal_key != nullptr) {
383 *internal_key = cur_key_.GetInternalKey();
384 }
385 } else {
386 if (internal_key != nullptr) {
387 *internal_key = decoded_internal_key;
388 }
389 saved_user_key_ = parsed_key->user_key;
390 }
391 break;
392 }
393 case kPrefixFromPreviousKey: {
394 if (seekable != nullptr) {
395 *seekable = false;
396 }
397 prefix_len_ = size;
398 assert(prefix_extractor_ == nullptr ||
399 prefix_extractor_->Transform(saved_user_key_).size() ==
400 prefix_len_);
401 // Need read another size flag for suffix
402 expect_suffix = true;
403 break;
404 }
405 case kKeySuffix: {
406 expect_suffix = false;
407 if (seekable != nullptr) {
408 *seekable = false;
409 }
410
411 Slice tmp_slice;
412 s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
413 bytes_read, &decoded_internal_key_valid,
414 &tmp_slice);
415 if (!s.ok()) {
416 return s;
417 }
418 if (!file_reader_.file_info()->is_mmap_mode) {
419 // In non-mmap mode, we need to make a copy of keys returned to
420 // users, because after reading value for the key, the key might
421 // be invalid.
422 // saved_user_key_ points to cur_key_. We are making a copy of
423 // the prefix part to another string, and construct the current
424 // key from the prefix part and the suffix part back to cur_key_.
425 std::string tmp =
426 Slice(saved_user_key_.data(), prefix_len_).ToString();
427 cur_key_.Reserve(prefix_len_ + size);
428 cur_key_.SetInternalKey(tmp, *parsed_key);
429 parsed_key->user_key =
430 Slice(cur_key_.GetInternalKey().data(), prefix_len_ + size);
431 saved_user_key_ = cur_key_.GetUserKey();
432 } else {
433 cur_key_.Reserve(prefix_len_ + size);
434 cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_),
435 *parsed_key);
436 }
437 parsed_key->user_key = cur_key_.GetUserKey();
438 if (internal_key != nullptr) {
439 *internal_key = cur_key_.GetInternalKey();
440 }
441 break;
442 }
443 default:
444 return Status::Corruption("Un-identified size flag.");
445 }
446 } while (expect_suffix); // Another round if suffix is expected.
447 return Status::OK();
448 }
449
NextKey(uint32_t start_offset,ParsedInternalKey * parsed_key,Slice * internal_key,Slice * value,uint32_t * bytes_read,bool * seekable)450 Status PlainTableKeyDecoder::NextKey(uint32_t start_offset,
451 ParsedInternalKey* parsed_key,
452 Slice* internal_key, Slice* value,
453 uint32_t* bytes_read, bool* seekable) {
454 assert(value != nullptr);
455 Status s = NextKeyNoValue(start_offset, parsed_key, internal_key, bytes_read,
456 seekable);
457 if (s.ok()) {
458 assert(bytes_read != nullptr);
459 uint32_t value_size;
460 uint32_t value_size_bytes;
461 bool success = file_reader_.ReadVarint32(start_offset + *bytes_read,
462 &value_size, &value_size_bytes);
463 if (!success) {
464 return file_reader_.status();
465 }
466 if (value_size_bytes == 0) {
467 return Status::Corruption(
468 "Unexpected EOF when reading the next value's size.");
469 }
470 *bytes_read += value_size_bytes;
471 success = file_reader_.Read(start_offset + *bytes_read, value_size, value);
472 if (!success) {
473 return file_reader_.status();
474 }
475 *bytes_read += value_size;
476 }
477 return s;
478 }
479
NextKeyNoValue(uint32_t start_offset,ParsedInternalKey * parsed_key,Slice * internal_key,uint32_t * bytes_read,bool * seekable)480 Status PlainTableKeyDecoder::NextKeyNoValue(uint32_t start_offset,
481 ParsedInternalKey* parsed_key,
482 Slice* internal_key,
483 uint32_t* bytes_read,
484 bool* seekable) {
485 *bytes_read = 0;
486 if (seekable != nullptr) {
487 *seekable = true;
488 }
489 Status s;
490 if (encoding_type_ == kPlain) {
491 return NextPlainEncodingKey(start_offset, parsed_key, internal_key,
492 bytes_read, seekable);
493 } else {
494 assert(encoding_type_ == kPrefix);
495 return NextPrefixEncodingKey(start_offset, parsed_key, internal_key,
496 bytes_read, seekable);
497 }
498 }
499
500 } // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE
502