// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once

#include <fcntl.h>
#include <cinttypes>

#include <algorithm>
#include <map>
#include <set>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>

#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "memtable/hash_linklist_rep.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksdb/utilities/checkpoint.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain/plain_table_factory.h"
#include "table/scoped_arena_iterator.h"
#include "test_util/mock_time_env.h"
#include "util/compression.h"
#include "util/mutexlock.h"

#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"

namespace ROCKSDB_NAMESPACE {

namespace anon {
class AtomicCounter {
 public:
  explicit AtomicCounter(Env* env = nullptr)
      : env_(env), cond_count_(&mu_), count_(0) {}

  void Increment() {
    MutexLock l(&mu_);
    count_++;
    cond_count_.SignalAll();
  }

  int Read() {
    MutexLock l(&mu_);
    return count_;
  }

  bool WaitFor(int count) {
    MutexLock l(&mu_);

    uint64_t start = env_->NowMicros();
    while (count_ < count) {
      uint64_t now = env_->NowMicros();
      cond_count_.TimedWait(now + /*1s*/ 1 * 1000 * 1000);
      if (env_->NowMicros() - start > /*10s*/ 10 * 1000 * 1000) {
        return false;
      }
      if (count_ < count) {
        GTEST_LOG_(WARNING) << "WaitFor is taking more time than usual";
      }
    }

    return true;
  }

  void Reset() {
    MutexLock l(&mu_);
    count_ = 0;
    cond_count_.SignalAll();
  }

 private:
  Env* env_;
  port::Mutex mu_;
  port::CondVar cond_count_;
  int count_;
};
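
// Illustrative usage of AtomicCounter (a sketch, not part of the original
// header; the sync point name below is hypothetical). A test bumps the
// counter from a callback and blocks until enough events have fired:
//
//   anon::AtomicCounter events(env_);
//   SyncPoint::GetInstance()->SetCallBack(
//       "Hypothetical:BackgroundEvent",
//       [&](void* /*arg*/) { events.Increment(); });
//   SyncPoint::GetInstance()->EnableProcessing();
//   ...
//   ASSERT_TRUE(events.WaitFor(2));  // gives up after roughly 10 seconds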

struct OptionsOverride {
  std::shared_ptr<const FilterPolicy> filter_policy = nullptr;
  // These will be used only if filter_policy is set
  bool partition_filters = false;
  uint64_t metadata_block_size = 1024;

  // Used as a bit mask of SkipPolicy values indicating which XF test points
  // to skip.
  int skip_policy = 0;
};
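
// Illustrative usage of OptionsOverride (a sketch): tests hand it to
// DBTestBase::CurrentOptions() to layer a Bloom filter on top of whatever
// configuration is currently being exercised:
//
//   anon::OptionsOverride options_override;
//   options_override.filter_policy.reset(NewBloomFilterPolicy(10));
//   Options options = CurrentOptions(options_override);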

}  // namespace anon

enum SkipPolicy { kSkipNone = 0, kSkipNoSnapshot = 1, kSkipNoPrefix = 2 };

// A hacky skip list mem table that triggers flush after a configured number
// of entries.
class SpecialMemTableRep : public MemTableRep {
 public:
  explicit SpecialMemTableRep(Allocator* allocator, MemTableRep* memtable,
                              int num_entries_flush)
      : MemTableRep(allocator),
        memtable_(memtable),
        num_entries_flush_(num_entries_flush),
        num_entries_(0) {}

  virtual KeyHandle Allocate(const size_t len, char** buf) override {
    return memtable_->Allocate(len, buf);
  }

  // Insert key into the list.
  // REQUIRES: nothing that compares equal to key is currently in the list.
  virtual void Insert(KeyHandle handle) override {
    num_entries_++;
    memtable_->Insert(handle);
  }

  void InsertConcurrently(KeyHandle handle) override {
    num_entries_++;
    memtable_->Insert(handle);
  }

  // Returns true iff an entry that compares equal to key is in the list.
  virtual bool Contains(const char* key) const override {
    return memtable_->Contains(key);
  }

  virtual size_t ApproximateMemoryUsage() override {
    // Return a high memory usage when the number of entries reaches the
    // threshold, to trigger a flush.
    return (num_entries_ < num_entries_flush_) ? 0 : 1024 * 1024 * 1024;
  }

  virtual void Get(const LookupKey& k, void* callback_args,
                   bool (*callback_func)(void* arg,
                                         const char* entry)) override {
    memtable_->Get(k, callback_args, callback_func);
  }

  uint64_t ApproximateNumEntries(const Slice& start_ikey,
                                 const Slice& end_ikey) override {
    return memtable_->ApproximateNumEntries(start_ikey, end_ikey);
  }

  virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
    return memtable_->GetIterator(arena);
  }

  virtual ~SpecialMemTableRep() override {}

 private:
  std::unique_ptr<MemTableRep> memtable_;
  int num_entries_flush_;
  int num_entries_;
};

// The factory for the hacky skip list mem table that triggers flush after
// the number of entries exceeds a threshold.
class SpecialSkipListFactory : public MemTableRepFactory {
 public:
  // After the number of inserts into a mem table reaches `num_entries_flush`,
  // trigger a flush.
  explicit SpecialSkipListFactory(int num_entries_flush)
      : num_entries_flush_(num_entries_flush) {}

  using MemTableRepFactory::CreateMemTableRep;
  virtual MemTableRep* CreateMemTableRep(
      const MemTableRep::KeyComparator& compare, Allocator* allocator,
      const SliceTransform* transform, Logger* /*logger*/) override {
    return new SpecialMemTableRep(
        allocator,
        factory_.CreateMemTableRep(compare, allocator, transform, nullptr),
        num_entries_flush_);
  }
  virtual const char* Name() const override { return "SkipListFactory"; }

  bool IsInsertConcurrentlySupported() const override {
    return factory_.IsInsertConcurrentlySupported();
  }

 private:
  SkipListFactory factory_;
  int num_entries_flush_;
};
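
// Illustrative usage of SpecialSkipListFactory (a sketch): once installed,
// the memtable reports a huge ApproximateMemoryUsage() as soon as the
// threshold is reached, so the write path schedules a flush automatically:
//
//   Options options = CurrentOptions();
//   options.memtable_factory.reset(new SpecialSkipListFactory(5));
//   Reopen(options);
//   // After the 5th insert, ApproximateMemoryUsage() reports ~1GB and a
//   // flush is scheduled.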

// Special Env used to delay background operations
class SpecialEnv : public EnvWrapper {
 public:
  explicit SpecialEnv(Env* base);

  Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
                         const EnvOptions& soptions) override {
    class SSTableFile : public WritableFile {
     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;

     public:
      SSTableFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& base)
          : env_(env), base_(std::move(base)) {}
      Status Append(const Slice& data) override {
        if (env_->table_write_callback_) {
          (*env_->table_write_callback_)();
        }
        if (env_->drop_writes_.load(std::memory_order_acquire)) {
          // Drop writes on the floor
          return Status::OK();
        } else if (env_->no_space_.load(std::memory_order_acquire)) {
          return Status::NoSpace("No space left on device");
        } else {
          env_->bytes_written_ += data.size();
          return base_->Append(data);
        }
      }
      Status PositionedAppend(const Slice& data, uint64_t offset) override {
        if (env_->table_write_callback_) {
          (*env_->table_write_callback_)();
        }
        if (env_->drop_writes_.load(std::memory_order_acquire)) {
          // Drop writes on the floor
          return Status::OK();
        } else if (env_->no_space_.load(std::memory_order_acquire)) {
          return Status::NoSpace("No space left on device");
        } else {
          env_->bytes_written_ += data.size();
          return base_->PositionedAppend(data, offset);
        }
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status RangeSync(uint64_t offset, uint64_t nbytes) override {
        Status s = base_->RangeSync(offset, nbytes);
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::RangeSync", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      Status Close() override {
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
        // Check preallocation size.
        // Preallocation size is never passed to the base file.
        size_t preallocation_size = preallocation_block_size();
        TEST_SYNC_POINT_CALLBACK("DBTestWritableFile.GetPreallocationStatus",
                                 &preallocation_size);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        Status s = base_->Close();
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Close", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        while (env_->delay_sstable_sync_.load(std::memory_order_acquire)) {
          env_->SleepForMicroseconds(100000);
        }
        Status s = base_->Sync();
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Sync", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      void SetIOPriority(Env::IOPriority pri) override {
        base_->SetIOPriority(pri);
      }
      Env::IOPriority GetIOPriority() override {
        return base_->GetIOPriority();
      }
      bool use_direct_io() const override {
        return base_->use_direct_io();
      }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }
    };
    class ManifestFile : public WritableFile {
     public:
      ManifestFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
          : env_(env), base_(std::move(b)) {}
      Status Append(const Slice& data) override {
        if (env_->manifest_write_error_.load(std::memory_order_acquire)) {
          return Status::IOError("simulated writer error");
        } else {
          return base_->Append(data);
        }
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status Close() override { return base_->Close(); }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        if (env_->manifest_sync_error_.load(std::memory_order_acquire)) {
          return Status::IOError("simulated sync error");
        } else {
          return base_->Sync();
        }
      }
      uint64_t GetFileSize() override { return base_->GetFileSize(); }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }

     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;
    };
    class WalFile : public WritableFile {
     public:
      WalFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
          : env_(env), base_(std::move(b)) {
        env_->num_open_wal_file_.fetch_add(1);
      }
      virtual ~WalFile() { env_->num_open_wal_file_.fetch_add(-1); }
      Status Append(const Slice& data) override {
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1");
#endif
        Status s;
        if (env_->log_write_error_.load(std::memory_order_acquire)) {
          s = Status::IOError("simulated writer error");
        } else {
          int slowdown =
              env_->log_write_slowdown_.load(std::memory_order_acquire);
          if (slowdown > 0) {
            env_->SleepForMicroseconds(slowdown);
          }
          s = base_->Append(data);
        }
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT("SpecialEnv::WalFile::Append:2");
#endif
        return s;
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status Close() override {
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
        // Check preallocation size.
        // Preallocation size is never passed to the base file.
        size_t preallocation_size = preallocation_block_size();
        TEST_SYNC_POINT_CALLBACK("DBTestWalFile.GetPreallocationStatus",
                                 &preallocation_size);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)

        return base_->Close();
      }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        return base_->Sync();
      }
      bool IsSyncThreadSafe() const override {
        return env_->is_wal_sync_thread_safe_.load();
      }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }

     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;
    };

    if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
      uint32_t random_number;
      {
        MutexLock l(&rnd_mutex_);
        random_number = rnd_.Uniform(100);
      }
      if (random_number < non_writeable_rate_.load()) {
        return Status::IOError("simulated random write error");
      }
    }

    new_writable_count_++;

    if (non_writable_count_.load() > 0) {
      non_writable_count_--;
      return Status::IOError("simulated write error");
    }

    EnvOptions optimized = soptions;
    if (strstr(f.c_str(), "MANIFEST") != nullptr ||
        strstr(f.c_str(), "log") != nullptr) {
      optimized.use_mmap_writes = false;
      optimized.use_direct_writes = false;
    }

    Status s = target()->NewWritableFile(f, r, optimized);
    if (s.ok()) {
      if (strstr(f.c_str(), ".sst") != nullptr) {
        r->reset(new SSTableFile(this, std::move(*r)));
      } else if (strstr(f.c_str(), "MANIFEST") != nullptr) {
        r->reset(new ManifestFile(this, std::move(*r)));
      } else if (strstr(f.c_str(), "log") != nullptr) {
        r->reset(new WalFile(this, std::move(*r)));
      }
    }
    return s;
  }

  Status NewRandomAccessFile(const std::string& f,
                             std::unique_ptr<RandomAccessFile>* r,
                             const EnvOptions& soptions) override {
    class CountingFile : public RandomAccessFile {
     public:
      CountingFile(std::unique_ptr<RandomAccessFile>&& target,
                   anon::AtomicCounter* counter,
                   std::atomic<size_t>* bytes_read)
          : target_(std::move(target)),
            counter_(counter),
            bytes_read_(bytes_read) {}
      virtual Status Read(uint64_t offset, size_t n, Slice* result,
                          char* scratch) const override {
        counter_->Increment();
        Status s = target_->Read(offset, n, result, scratch);
        *bytes_read_ += result->size();
        return s;
      }

      virtual Status Prefetch(uint64_t offset, size_t n) override {
        Status s = target_->Prefetch(offset, n);
        *bytes_read_ += n;
        return s;
      }

     private:
      std::unique_ptr<RandomAccessFile> target_;
      anon::AtomicCounter* counter_;
      std::atomic<size_t>* bytes_read_;
    };

    Status s = target()->NewRandomAccessFile(f, r, soptions);
    random_file_open_counter_++;
    if (s.ok() && count_random_reads_) {
      r->reset(new CountingFile(std::move(*r), &random_read_counter_,
                                &random_read_bytes_counter_));
    }
    if (s.ok() && soptions.compaction_readahead_size > 0) {
      compaction_readahead_size_ = soptions.compaction_readahead_size;
    }
    return s;
  }

  virtual Status NewSequentialFile(const std::string& f,
                                   std::unique_ptr<SequentialFile>* r,
                                   const EnvOptions& soptions) override {
    class CountingFile : public SequentialFile {
     public:
      CountingFile(std::unique_ptr<SequentialFile>&& target,
                   anon::AtomicCounter* counter)
          : target_(std::move(target)), counter_(counter) {}
      virtual Status Read(size_t n, Slice* result, char* scratch) override {
        counter_->Increment();
        return target_->Read(n, result, scratch);
      }
      virtual Status Skip(uint64_t n) override { return target_->Skip(n); }

     private:
      std::unique_ptr<SequentialFile> target_;
      anon::AtomicCounter* counter_;
    };

    Status s = target()->NewSequentialFile(f, r, soptions);
    if (s.ok() && count_sequential_reads_) {
      r->reset(new CountingFile(std::move(*r), &sequential_read_counter_));
    }
    return s;
  }

  virtual void SleepForMicroseconds(int micros) override {
    sleep_counter_.Increment();
    if (no_slowdown_ || time_elapse_only_sleep_) {
      addon_time_.fetch_add(micros);
    }
    if (!no_slowdown_) {
      target()->SleepForMicroseconds(micros);
    }
  }

  virtual Status GetCurrentTime(int64_t* unix_time) override {
    Status s;
    if (!time_elapse_only_sleep_) {
      s = target()->GetCurrentTime(unix_time);
    }
    if (s.ok()) {
      *unix_time += addon_time_.load();
    }
    return s;
  }

  virtual uint64_t NowCPUNanos() override {
    now_cpu_count_.fetch_add(1);
    return target()->NowCPUNanos();
  }

  virtual uint64_t NowNanos() override {
    return (time_elapse_only_sleep_ ? 0 : target()->NowNanos()) +
           addon_time_.load() * 1000;
  }

  virtual uint64_t NowMicros() override {
    return (time_elapse_only_sleep_ ? 0 : target()->NowMicros()) +
           addon_time_.load();
  }

  virtual Status DeleteFile(const std::string& fname) override {
    delete_count_.fetch_add(1);
    return target()->DeleteFile(fname);
  }

  Random rnd_;
  port::Mutex rnd_mutex_;  // Lock to protect rnd_

  // sstable Sync() calls are blocked while this flag is true.
  std::atomic<bool> delay_sstable_sync_;

  // Drop writes on the floor while this flag is true.
  std::atomic<bool> drop_writes_;

  // Simulate no-space errors while this flag is true.
  std::atomic<bool> no_space_;

  // Simulate a non-writable file system while this flag is true.
  std::atomic<bool> non_writable_;

  // Force sync of manifest files to fail while this flag is true.
  std::atomic<bool> manifest_sync_error_;

  // Force writes to manifest files to fail while this flag is true.
  std::atomic<bool> manifest_write_error_;

  // Force writes to log files to fail while this flag is true.
  std::atomic<bool> log_write_error_;

  // Slow down every log write, in microseconds.
  std::atomic<int> log_write_slowdown_;

  // Number of WAL files that are still open for write.
  std::atomic<int> num_open_wal_file_;

  bool count_random_reads_;
  anon::AtomicCounter random_read_counter_;
  std::atomic<size_t> random_read_bytes_counter_;
  std::atomic<int> random_file_open_counter_;

  bool count_sequential_reads_;
  anon::AtomicCounter sequential_read_counter_;

  anon::AtomicCounter sleep_counter_;

  std::atomic<int64_t> bytes_written_;

  std::atomic<int> sync_counter_;

  std::atomic<uint32_t> non_writeable_rate_;

  std::atomic<uint32_t> new_writable_count_;

  std::atomic<uint32_t> non_writable_count_;

  std::function<void()>* table_write_callback_;

  std::atomic<int64_t> addon_time_;

  std::atomic<int> now_cpu_count_;

  std::atomic<int> delete_count_;

  std::atomic<bool> time_elapse_only_sleep_;

  bool no_slowdown_;

  std::atomic<bool> is_wal_sync_thread_safe_{true};

  std::atomic<size_t> compaction_readahead_size_{};
};
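
// Illustrative usage of SpecialEnv (a sketch): DBTestBase already owns one in
// env_, so tests usually just toggle its knobs. For example, simulating a
// full filesystem should make the next table write, and hence the flush,
// fail:
//
//   env_->no_space_.store(true, std::memory_order_release);
//   ASSERT_OK(Put("key", "value"));
//   ASSERT_NOK(Flush());  // SSTableFile::Append() returns Status::NoSpace
//   env_->no_space_.store(false, std::memory_order_release);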

#ifndef ROCKSDB_LITE
class OnFileDeletionListener : public EventListener {
 public:
  OnFileDeletionListener() : matched_count_(0), expected_file_name_("") {}

  void SetExpectedFileName(const std::string& file_name) {
    expected_file_name_ = file_name;
  }

  void VerifyMatchedCount(size_t expected_value) {
    ASSERT_EQ(matched_count_, expected_value);
  }

  void OnTableFileDeleted(const TableFileDeletionInfo& info) override {
    if (expected_file_name_ != "") {
      ASSERT_EQ(expected_file_name_, info.file_path);
      expected_file_name_ = "";
      matched_count_++;
    }
  }

 private:
  size_t matched_count_;
  std::string expected_file_name_;
};
#endif
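
// Illustrative usage of OnFileDeletionListener (a sketch; the file name below
// is hypothetical): register the listener before opening the DB, arm it with
// a file expected to be deleted, then verify after the deletion happened:
//
//   auto listener = std::make_shared<OnFileDeletionListener>();
//   options.listeners.push_back(listener);
//   Reopen(options);
//   listener->SetExpectedFileName(dbname_ + "/000005.sst");
//   // ... trigger a compaction or DeleteFile() ...
//   listener->VerifyMatchedCount(1);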

// A test merge operator that mimics Put, but fails if any of the merge
// operands is "corrupted".
class TestPutOperator : public MergeOperator {
 public:
  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                           MergeOperationOutput* merge_out) const override {
    if (merge_in.existing_value != nullptr &&
        *(merge_in.existing_value) == "corrupted") {
      return false;
    }
    for (const auto& value : merge_in.operand_list) {
      if (value == "corrupted") {
        return false;
      }
    }
    merge_out->existing_operand = merge_in.operand_list.back();
    return true;
  }

  virtual const char* Name() const override { return "TestPutOperator"; }
};
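
// Illustrative usage of TestPutOperator (a sketch): with the operator
// installed, Merge() behaves like Put() as long as no operand is "corrupted":
//
//   Options options = CurrentOptions();
//   options.merge_operator = std::make_shared<TestPutOperator>();
//   Reopen(options);
//   ASSERT_OK(Merge("k", "v1"));
//   ASSERT_OK(Merge("k", "v2"));
//   ASSERT_EQ("v2", Get("k"));  // the last operand wins, as with Put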

class DBTestBase : public testing::Test {
 public:
  // Sequence of option configurations to try
  enum OptionConfig : int {
    kDefault = 0,
    kBlockBasedTableWithPrefixHashIndex = 1,
    kBlockBasedTableWithWholeKeyHashIndex = 2,
    kPlainTableFirstBytePrefix = 3,
    kPlainTableCappedPrefix = 4,
    kPlainTableCappedPrefixNonMmap = 5,
    kPlainTableAllBytesPrefix = 6,
    kVectorRep = 7,
    kHashLinkList = 8,
    kMergePut = 9,
    kFilter = 10,
    kFullFilterWithNewTableReaderForCompactions = 11,
    kUncompressed = 12,
    kNumLevel_3 = 13,
    kDBLogDir = 14,
    kWalDirAndMmapReads = 15,
    kManifestFileSize = 16,
    kPerfOptions = 17,
    kHashSkipList = 18,
    kUniversalCompaction = 19,
    kUniversalCompactionMultiLevel = 20,
    kCompressedBlockCache = 21,
    kInfiniteMaxOpenFiles = 22,
    kxxHashChecksum = 23,
    kFIFOCompaction = 24,
    kOptimizeFiltersForHits = 25,
    kRowCache = 26,
    kRecycleLogFiles = 27,
    kConcurrentSkipList = 28,
    kPipelinedWrite = 29,
    kConcurrentWALWrites = 30,
    kDirectIO,
    kLevelSubcompactions,
    kBlockBasedTableWithIndexRestartInterval,
    kBlockBasedTableWithPartitionedIndex,
    kBlockBasedTableWithPartitionedIndexFormat4,
    kPartitionedFilterWithNewTableReaderForCompactions,
    kUniversalSubcompactions,
    kxxHash64Checksum,
    kUnorderedWrite,
    // This must be the last line
    kEnd,
  };

 public:
  std::string dbname_;
  std::string alternative_wal_dir_;
  std::string alternative_db_log_dir_;
  MockEnv* mem_env_;
  Env* encrypted_env_;
  SpecialEnv* env_;
  std::shared_ptr<Env> env_guard_;
  DB* db_;
  std::vector<ColumnFamilyHandle*> handles_;

  int option_config_;
  Options last_options_;

  // Skip some options, as they may not be applicable to a specific test.
  // To add more skip constants, use the next unused power of two.
  enum OptionSkip {
    kNoSkip = 0,
    kSkipDeletesFilterFirst = 1,
    kSkipUniversalCompaction = 2,
    kSkipMergePut = 4,
    kSkipPlainTable = 8,
    kSkipHashIndex = 16,
    kSkipNoSeekToLast = 32,
    kSkipFIFOCompaction = 128,
    kSkipMmapReads = 256,
  };

  const int kRangeDelSkipConfigs =
      // Plain tables do not support range deletions.
      kSkipPlainTable |
      // MmapReads disables the iterator pinning that RangeDelAggregator
      // requires.
      kSkipMmapReads;

  explicit DBTestBase(const std::string path);

  ~DBTestBase();

  static std::string RandomString(Random* rnd, int len) {
    std::string r;
    test::RandomString(rnd, len, &r);
    return r;
  }

  static std::string Key(int i) {
    char buf[100];
    snprintf(buf, sizeof(buf), "key%06d", i);
    return std::string(buf);
  }

  static bool ShouldSkipOptions(int option_config, int skip_mask = kNoSkip);

  // Switch to a fresh database with the next option configuration to
  // test.  Return false if there are no more configurations to test.
  bool ChangeOptions(int skip_mask = kNoSkip);
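
  // Illustrative usage (a sketch): the canonical pattern runs the test body
  // once per configuration until the configurations are exhausted:
  //
  //   do {
  //     Options options = CurrentOptions();
  //     DestroyAndReopen(options);
  //     // ... test body ...
  //   } while (ChangeOptions());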

  // Switch between different compaction styles.
  bool ChangeCompactOptions();

  // Switch between different WAL-related options.
  bool ChangeWalOptions();

  // Switch between different filter policies.
  // Jumps from kDefault to kFilter to kFullFilter.
  bool ChangeFilterOptions();

  // Switch between different DB options for file ingestion tests.
  bool ChangeOptionsForFileIngestionTest();

  // Return the current option configuration.
  Options CurrentOptions(const anon::OptionsOverride& options_override =
                             anon::OptionsOverride()) const;

  Options CurrentOptions(const Options& default_options,
                         const anon::OptionsOverride& options_override =
                             anon::OptionsOverride()) const;

  static Options GetDefaultOptions();

  Options GetOptions(int option_config,
                     const Options& default_options = GetDefaultOptions(),
                     const anon::OptionsOverride& options_override =
                         anon::OptionsOverride()) const;

  DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_); }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options);

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options);

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options);

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options);

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const std::vector<Options>& options);

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options);

  void Reopen(const Options& options);

  void Close();

  void DestroyAndReopen(const Options& options);

  void Destroy(const Options& options, bool delete_cf_paths = false);

  Status ReadOnlyReopen(const Options& options);

  Status TryReopen(const Options& options);

  bool IsDirectIOSupported();

  bool IsMemoryMappedAccessSupported() const;

  Status Flush(int cf = 0);

  Status Flush(const std::vector<int>& cf_ids);

  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions());

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions());

  Status Merge(const Slice& k, const Slice& v,
               WriteOptions wo = WriteOptions());

  Status Merge(int cf, const Slice& k, const Slice& v,
               WriteOptions wo = WriteOptions());

  Status Delete(const std::string& k);

  Status Delete(int cf, const std::string& k);

  Status SingleDelete(const std::string& k);

  Status SingleDelete(int cf, const std::string& k);

  bool SetPreserveDeletesSequenceNumber(SequenceNumber sn);

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr);

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr);

  Status Get(const std::string& k, PinnableSlice* v);

  std::vector<std::string> MultiGet(std::vector<int> cfs,
                                    const std::vector<std::string>& k,
                                    const Snapshot* snapshot,
                                    const bool batched);

  std::vector<std::string> MultiGet(const std::vector<std::string>& k,
                                    const Snapshot* snapshot = nullptr);

  uint64_t GetNumSnapshots();

  uint64_t GetTimeOldestSnapshots();

  uint64_t GetSequenceOldestSnapshots();

  // Return a string that contains all key,value pairs in order,
  // formatted like "(k1->v1)(k2->v2)".
  std::string Contents(int cf = 0);

  std::string AllEntriesFor(const Slice& user_key, int cf = 0);

#ifndef ROCKSDB_LITE
  int NumSortedRuns(int cf = 0);

  uint64_t TotalSize(int cf = 0);

  uint64_t SizeAtLevel(int level);

  size_t TotalLiveFiles(int cf = 0);

  size_t CountLiveFiles();

  int NumTableFilesAtLevel(int level, int cf = 0);

  double CompressionRatioAtLevel(int level, int cf = 0);

  int TotalTableFiles(int cf = 0, int levels = -1);
#endif  // ROCKSDB_LITE

  // Return spread of files per level
  std::string FilesPerLevel(int cf = 0);

  size_t CountFiles();

  uint64_t Size(const Slice& start, const Slice& limit, int cf = 0);

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id);

  void Compact(int cf, const Slice& start, const Slice& limit);

  void Compact(const Slice& start, const Slice& limit);

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0);

  // Prevent pushing of new sstables into deeper levels by adding
  // tables that cover a specified range to all levels.
  void FillLevels(const std::string& smallest, const std::string& largest,
                  int cf);

  void MoveFilesToLevel(int level, int cf = 0);

#ifndef ROCKSDB_LITE
  void DumpFileCounts(const char* label);
#endif  // ROCKSDB_LITE

  std::string DumpSSTableList();

  static void GetSstFiles(Env* env, std::string path,
                          std::vector<std::string>* files);

  int GetSstFileCount(std::string path);

  // This will generate non-overlapping files since it keeps increasing
  // key_idx.
  void GenerateNewFile(Random* rnd, int* key_idx, bool nowait = false);

  void GenerateNewFile(int fd, Random* rnd, int* key_idx, bool nowait = false);

  static const int kNumKeysByGenerateNewRandomFile;
  static const int KNumKeysByGenerateNewFile = 100;

  void GenerateNewRandomFile(Random* rnd, bool nowait = false);

  std::string IterStatus(Iterator* iter);

  Options OptionsForLogIterTest();

  std::string DummyString(size_t len, char c = 'a');

  void VerifyIterLast(std::string expected_key, int cf = 0);

  // Used to test InplaceUpdate

  // If the previous value is nullptr or delta is larger than the previous
  //   value, sets newValue to delta.
  // If the previous value is not empty,
  //   overwrites it with a string of 'b's of length (previous size - 1).
  static UpdateStatus updateInPlaceSmallerSize(char* prevValue,
                                               uint32_t* prevSize, Slice delta,
                                               std::string* newValue);

  static UpdateStatus updateInPlaceSmallerVarintSize(char* prevValue,
                                                     uint32_t* prevSize,
                                                     Slice delta,
                                                     std::string* newValue);

  static UpdateStatus updateInPlaceLargerSize(char* prevValue,
                                              uint32_t* prevSize, Slice delta,
                                              std::string* newValue);

  static UpdateStatus updateInPlaceNoAction(char* prevValue, uint32_t* prevSize,
                                            Slice delta, std::string* newValue);

  // Utility method to test InplaceUpdate
  void validateNumberOfEntries(int numValues, int cf = 0);

  void CopyFile(const std::string& source, const std::string& destination,
                uint64_t size = 0);

  std::unordered_map<std::string, uint64_t> GetAllSSTFiles(
      uint64_t* total_size = nullptr);

  std::vector<std::uint64_t> ListTableFiles(Env* env, const std::string& path);

  void VerifyDBFromMap(
      std::map<std::string, std::string> true_data,
      size_t* total_reads_res = nullptr, bool tailing_iter = false,
      std::map<std::string, Status> status = std::map<std::string, Status>());

  void VerifyDBInternal(
      std::vector<std::pair<std::string, std::string>> true_data);

#ifndef ROCKSDB_LITE
  uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,
                                              std::string column_family_name);
#endif  // ROCKSDB_LITE

  uint64_t TestGetTickerCount(const Options& options, Tickers ticker_type) {
    return options.statistics->getTickerCount(ticker_type);
  }

  uint64_t TestGetAndResetTickerCount(const Options& options,
                                      Tickers ticker_type) {
    return options.statistics->getAndResetTickerCount(ticker_type);
  }
};

}  // namespace ROCKSDB_NAMESPACE