1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 #ifndef GFLAGS
8 #include <cstdio>
main()9 int main() {
10   fprintf(stderr, "Please install gflags to run rocksdb tools\n");
11   return 1;
12 }
13 #else
14 
15 #include <atomic>
16 #include <cstdio>
17 
18 #include "db/write_batch_internal.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/types.h"
21 #include "test_util/testutil.h"
22 #include "util/gflags_compat.h"
23 
24 // Run a thread to perform Put's.
25 // Another thread uses GetUpdatesSince API to keep getting the updates.
26 // options :
27 // --num_inserts = the num of inserts the first thread should perform.
28 // --wal_ttl = the wal ttl for the run.
29 
30 using namespace ROCKSDB_NAMESPACE;
31 
32 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
33 using GFLAGS_NAMESPACE::SetUsageMessage;
34 
35 struct DataPumpThread {
36   size_t no_records;
37   DB* db;  // Assumption DB is Open'ed already.
38 };
39 
RandomString(Random * rnd,int len)40 static std::string RandomString(Random* rnd, int len) {
41   std::string r;
42   test::RandomString(rnd, len, &r);
43   return r;
44 }
45 
DataPumpThreadBody(void * arg)46 static void DataPumpThreadBody(void* arg) {
47   DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
48   DB* db = t->db;
49   Random rnd(301);
50   size_t i = 0;
51   while (i++ < t->no_records) {
52     if (!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)),
53                  Slice(RandomString(&rnd, 500)))
54              .ok()) {
55       fprintf(stderr, "Error in put\n");
56       exit(1);
57     }
58   }
59 }
60 
61 struct ReplicationThread {
62   std::atomic<bool> stop;
63   DB* db;
64   volatile size_t no_read;
65 };
66 
ReplicationThreadBody(void * arg)67 static void ReplicationThreadBody(void* arg) {
68   ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
69   DB* db = t->db;
70   std::unique_ptr<TransactionLogIterator> iter;
71   SequenceNumber currentSeqNum = 1;
72   while (!t->stop.load(std::memory_order_acquire)) {
73     iter.reset();
74     Status s;
75     while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
76       if (t->stop.load(std::memory_order_acquire)) {
77         return;
78       }
79     }
80     fprintf(stderr, "Refreshing iterator\n");
81     for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
82       BatchResult res = iter->GetBatch();
83       if (res.sequence != currentSeqNum) {
84         fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
85                 (long)currentSeqNum, (long)res.sequence);
86         exit(1);
87       }
88     }
89   }
90 }
91 
92 DEFINE_uint64(num_inserts, 1000,
93               "the num of inserts the first thread should"
94               " perform.");
95 DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
96 DEFINE_uint64(wal_size_limit_MB, 10,
97               "the wal size limit for the run"
98               "(in MB)");
99 
main(int argc,const char ** argv)100 int main(int argc, const char** argv) {
101   SetUsageMessage(
102       std::string("\nUSAGE:\n") + std::string(argv[0]) +
103       " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
104       " --wal_size_limit_MB=<WAL_size_limit_MB>");
105   ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
106 
107   Env* env = Env::Default();
108   std::string default_db_path;
109   env->GetTestDirectory(&default_db_path);
110   default_db_path += "db_repl_stress";
111   Options options;
112   options.create_if_missing = true;
113   options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
114   options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
115   DB* db;
116   DestroyDB(default_db_path, options);
117 
118   Status s = DB::Open(options, default_db_path, &db);
119 
120   if (!s.ok()) {
121     fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
122     exit(1);
123   }
124 
125   DataPumpThread dataPump;
126   dataPump.no_records = FLAGS_num_inserts;
127   dataPump.db = db;
128   env->StartThread(DataPumpThreadBody, &dataPump);
129 
130   ReplicationThread replThread;
131   replThread.db = db;
132   replThread.no_read = 0;
133   replThread.stop.store(false, std::memory_order_release);
134 
135   env->StartThread(ReplicationThreadBody, &replThread);
136   while (replThread.no_read < FLAGS_num_inserts)
137     ;
138   replThread.stop.store(true, std::memory_order_release);
139   if (replThread.no_read < dataPump.no_records) {
140     // no. read should be => than inserted.
141     fprintf(stderr,
142             "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
143             " Written : %" ROCKSDB_PRIszt "\n",
144             replThread.no_read, dataPump.no_records);
145     exit(1);
146   }
147   fprintf(stderr, "Successful!\n");
148   exit(0);
149 }
150 
151 #endif  // GFLAGS
152 
153 #else  // ROCKSDB_LITE
154 #include <stdio.h>
main(int,char **)155 int main(int /*argc*/, char** /*argv*/) {
156   fprintf(stderr, "Not supported in lite mode.\n");
157   return 1;
158 }
159 #endif  // ROCKSDB_LITE
160