1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifndef OS_WIN
11 #include <sys/ioctl.h>
12 #endif
13
14 #include <sys/types.h>
15
16 #include <iostream>
17 #include <unordered_set>
18 #include <atomic>
19 #include <list>
20
21 #ifdef OS_LINUX
22 #include <fcntl.h>
23 #include <linux/fs.h>
24 #include <stdlib.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #endif
28
29 #ifdef ROCKSDB_FALLOCATE_PRESENT
30 #include <errno.h>
31 #endif
32
33 #include "env/env_chroot.h"
34 #include "logging/log_buffer.h"
35 #include "port/malloc.h"
36 #include "port/port.h"
37 #include "rocksdb/env.h"
38 #include "test_util/fault_injection_test_env.h"
39 #include "test_util/fault_injection_test_fs.h"
40 #include "test_util/sync_point.h"
41 #include "test_util/testharness.h"
42 #include "test_util/testutil.h"
43 #include "util/coding.h"
44 #include "util/mutexlock.h"
45 #include "util/string_util.h"
46
47 namespace ROCKSDB_NAMESPACE {
48
using port::kPageSize;

// Polling interval (0.1 s) used by tests that wait for background work.
static const int kDelayMicros = 100000;
52
// Function-pointer deleter for std::unique_ptr, so the aligned allocations
// below can be released with the matching platform free routine.
struct Deleter {
  explicit Deleter(void (*fn)(void*)) : fn_(fn) {}

  void operator()(void* ptr) {
    assert(fn_ != nullptr);
    assert(ptr != nullptr);
    fn_(ptr);
  }

  void (*fn_)(void*);  // release routine; must match the allocator used
};
64
NewAligned(const size_t size,const char ch)65 std::unique_ptr<char, Deleter> NewAligned(const size_t size, const char ch) {
66 char* ptr = nullptr;
67 #ifdef OS_WIN
68 if (nullptr == (ptr = reinterpret_cast<char*>(_aligned_malloc(size, kPageSize)))) {
69 return std::unique_ptr<char, Deleter>(nullptr, Deleter(_aligned_free));
70 }
71 std::unique_ptr<char, Deleter> uptr(ptr, Deleter(_aligned_free));
72 #else
73 if (posix_memalign(reinterpret_cast<void**>(&ptr), kPageSize, size) != 0) {
74 return std::unique_ptr<char, Deleter>(nullptr, Deleter(free));
75 }
76 std::unique_ptr<char, Deleter> uptr(ptr, Deleter(free));
77 #endif
78 memset(uptr.get(), ch, size);
79 return uptr;
80 }
81
// Base fixture for Env tests: provides the default Env and a flag that
// derived fixtures may set to request direct (unbuffered) I/O.
class EnvPosixTest : public testing::Test {
 private:
  // NOTE(review): mu_ and events_ appear unused in this view of the file —
  // confirm against the rest of the file before removing.
  port::Mutex mu_;
  std::string events_;

 public:
  Env* env_;        // Env under test; defaults to Env::Default()
  bool direct_io_;  // whether tests should open files with direct I/O
  EnvPosixTest() : env_(Env::Default()), direct_io_(false) {}
};
92
// Parameterized fixture: the parameter supplies the Env to exercise and
// whether direct (unbuffered) I/O should be used.
class EnvPosixTestWithParam
    : public EnvPosixTest,
      public ::testing::WithParamInterface<std::pair<Env*, bool>> {
 public:
  EnvPosixTestWithParam() {
    std::pair<Env*, bool> param_pair = GetParam();
    env_ = param_pair.first;
    direct_io_ = param_pair.second;
  }

  // Polls until both the LOW and HIGH priority thread pool queues drain.
  // Note: only waits for queued (not yet dispatched) work to be picked up;
  // jobs already running may still be in flight when this returns.
  void WaitThreadPoolsEmpty() {
    // Wait until the thread pools are empty.
    while (env_->GetThreadPoolQueueLen(Env::Priority::LOW) != 0) {
      Env::Default()->SleepForMicroseconds(kDelayMicros);
    }
    while (env_->GetThreadPoolQueueLen(Env::Priority::HIGH) != 0) {
      Env::Default()->SleepForMicroseconds(kDelayMicros);
    }
  }

  // Drain queues before destruction so scheduled jobs cannot outlive the
  // test's stack-allocated state.
  ~EnvPosixTestWithParam() override { WaitThreadPoolsEmpty(); }
};
115
// Background-task callback: sets the std::atomic<bool> that `ptr` points at.
static void SetBool(void* ptr) {
  auto* flag = static_cast<std::atomic<bool>*>(ptr);
  flag->store(true);
}
119
// Schedules one job in each priority pool and expects it to have run after a
// short sleep. Disabled: relies on wall-clock timing, so inherently flaky.
TEST_F(EnvPosixTest, DISABLED_RunImmediately) {
  for (int pri = Env::BOTTOM; pri < Env::TOTAL; ++pri) {
    std::atomic<bool> called(false);
    env_->SetBackgroundThreads(1, static_cast<Env::Priority>(pri));
    env_->Schedule(&SetBool, &called, static_cast<Env::Priority>(pri));
    Env::Default()->SleepForMicroseconds(kDelayMicros);
    ASSERT_TRUE(called.load());
  }
}
129
// StartThread + WaitForJoin must run the supplied function to completion.
TEST_F(EnvPosixTest, RunEventually) {
  std::atomic<bool> called(false);
  env_->StartThread(&SetBool, &called);
  env_->WaitForJoin();
  ASSERT_TRUE(called.load());
}
136
137 #ifdef OS_WIN
// Verifies Env::AreFilesSame() reports that a file and a hard link to it
// refer to the same underlying file. Skips when the Env does not support
// AreFilesSame().
TEST_F(EnvPosixTest, AreFilesSame) {
  {
    bool tmp;
    if (env_->AreFilesSame("", "", &tmp).IsNotSupported()) {
      fprintf(stderr,
              "skipping EnvBasicTestWithParam.AreFilesSame due to "
              "unsupported Env::AreFilesSame\n");
      return;
    }
  }

  const EnvOptions soptions;
  auto* env = Env::Default();
  std::string same_file_name = test::PerThreadDBPath(env, "same_file");
  std::string same_file_link_name = same_file_name + "_link";

  std::unique_ptr<WritableFile> same_file;
  ASSERT_OK(env->NewWritableFile(same_file_name,
                                 &same_file, soptions));
  // Fix: the Append() status was previously discarded; a failed write would
  // have gone unnoticed and produced a confusing failure later.
  ASSERT_OK(same_file->Append("random_data"));
  ASSERT_OK(same_file->Flush());
  same_file.reset();

  ASSERT_OK(env->LinkFile(same_file_name, same_file_link_name));
  bool result = false;
  ASSERT_OK(env->AreFilesSame(same_file_name, same_file_link_name, &result));
  ASSERT_TRUE(result);
}
166 #endif
167
168 #ifdef OS_LINUX
// Checks permission bits of files created by the Env: 0644 by default, then
// 0600 after SetAllowNonOwnerAccess(false). Linux-only (uses ::stat modes).
TEST_F(EnvPosixTest, DISABLED_FilePermission) {
  // Only works for Linux environment
  if (env_ == Env::Default()) {
    EnvOptions soptions;
    std::vector<std::string> fileNames{
        test::PerThreadDBPath(env_, "testfile"),
        test::PerThreadDBPath(env_, "testfile1")};
    std::unique_ptr<WritableFile> wfile;
    ASSERT_OK(env_->NewWritableFile(fileNames[0], &wfile, soptions));
    ASSERT_OK(env_->NewWritableFile(fileNames[1], &wfile, soptions));
    wfile.reset();
    std::unique_ptr<RandomRWFile> rwfile;
    ASSERT_OK(env_->NewRandomRWFile(fileNames[1], &rwfile, soptions));

    // Default: files should be world-readable (0644).
    struct stat sb;
    for (const auto& filename : fileNames) {
      if (::stat(filename.c_str(), &sb) == 0) {
        ASSERT_EQ(sb.st_mode & 0777, 0644);
      }
      env_->DeleteFile(filename);
    }

    env_->SetAllowNonOwnerAccess(false);
    ASSERT_OK(env_->NewWritableFile(fileNames[0], &wfile, soptions));
    ASSERT_OK(env_->NewWritableFile(fileNames[1], &wfile, soptions));
    wfile.reset();
    ASSERT_OK(env_->NewRandomRWFile(fileNames[1], &rwfile, soptions));

    // With non-owner access disallowed, new files must be owner-only (0600).
    for (const auto& filename : fileNames) {
      if (::stat(filename.c_str(), &sb) == 0) {
        ASSERT_EQ(sb.st_mode & 0777, 0600);
      }
      env_->DeleteFile(filename);
    }
  }
}
205 #endif
206
// Writes 32 KB of random data, maps the file via NewMemoryMappedFileBuffer,
// and verifies the mapped contents match what was written.
TEST_F(EnvPosixTest, MemoryMappedFileBuffer) {
  const int kFileBytes = 1 << 15;  // 32 KB
  std::string expected_data;
  std::string fname = test::PerThreadDBPath(env_, "testfile");
  {
    std::unique_ptr<WritableFile> wfile;
    const EnvOptions soptions;
    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));

    Random rnd(301);
    test::RandomString(&rnd, kFileBytes, &expected_data);
    ASSERT_OK(wfile->Append(expected_data));
    // wfile leaving scope closes the file before it is mapped below.
  }

  std::unique_ptr<MemoryMappedFileBuffer> mmap_buffer;
  Status status = env_->NewMemoryMappedFileBuffer(fname, &mmap_buffer);
  // it should be supported at least on linux
#if !defined(OS_LINUX)
  if (status.IsNotSupported()) {
    fprintf(stderr,
            "skipping EnvPosixTest.MemoryMappedFileBuffer due to "
            "unsupported Env::NewMemoryMappedFileBuffer\n");
    return;
  }
#endif  // !defined(OS_LINUX)

  ASSERT_OK(status);
  ASSERT_NE(nullptr, mmap_buffer.get());
  ASSERT_NE(nullptr, mmap_buffer->GetBase());
  ASSERT_EQ(kFileBytes, mmap_buffer->GetLen());
  std::string actual_data(reinterpret_cast<const char*>(mmap_buffer->GetBase()),
                          mmap_buffer->GetLen());
  ASSERT_EQ(expected_data, actual_data);
}
241
242 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
// Dynamic library loading: a bogus name must fail; if a rocksdb shared
// library can be found, loading it and resolving a known exported C symbol
// must succeed while resolving an unknown symbol must fail.
TEST_F(EnvPosixTest, LoadRocksDBLibrary) {
  std::shared_ptr<DynamicLibrary> library;
  std::function<void*(void*, const char*)> function;
  Status status = env_->LoadLibrary("no-such-library", "", &library);
  ASSERT_NOK(status);
  ASSERT_EQ(nullptr, library.get());
  status = env_->LoadLibrary("rocksdb", "", &library);
  if (status.ok()) {  // If we have can find a rocksdb shared library
    ASSERT_NE(nullptr, library.get());
    ASSERT_OK(library->LoadFunction("rocksdb_create_default_env",
                                    &function));  // from C definition
    ASSERT_NE(nullptr, function);
    ASSERT_NOK(library->LoadFunction("no-such-method", &function));
    // A failed lookup must also reset the out-parameter.
    ASSERT_EQ(nullptr, function);
    // Re-loading by the library's resolved name must also succeed.
    ASSERT_OK(env_->LoadLibrary(library->Name(), "", &library));
  } else {
    ASSERT_EQ(nullptr, library.get());
  }
}
262 #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
263
264 #if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION)
// Same as LoadRocksDBLibrary but exercising the search-path argument:
// libraries must not be found outside the given paths, and must be found
// when the correct directory (e.g. the CWD) is in the path list.
TEST_F(EnvPosixTest, LoadRocksDBLibraryWithSearchPath) {
  std::shared_ptr<DynamicLibrary> library;
  std::function<void*(void*, const char*)> function;
  ASSERT_NOK(env_->LoadLibrary("no-such-library", "/tmp", &library));
  ASSERT_EQ(nullptr, library.get());
  // "dl" exists on the default search path but not under /tmp, so a
  // restricted search path must fail to find it.
  ASSERT_NOK(env_->LoadLibrary("dl", "/tmp", &library));
  ASSERT_EQ(nullptr, library.get());
  Status status = env_->LoadLibrary("rocksdb", "/tmp:./", &library);
  if (status.ok()) {
    ASSERT_NE(nullptr, library.get());
    ASSERT_OK(env_->LoadLibrary(library->Name(), "", &library));
  }
  char buff[1024];
  std::string cwd = getcwd(buff, sizeof(buff));

  status = env_->LoadLibrary("rocksdb", "/tmp:" + cwd, &library);
  if (status.ok()) {
    ASSERT_NE(nullptr, library.get());
    ASSERT_OK(env_->LoadLibrary(library->Name(), "", &library));
  }
}
286 #endif // !OS_WIN && !ROCKSDB_NO_DYNAMIC_EXTENSION
287
// UnSchedule must remove only queued jobs whose tag matches, and must not
// affect a job that is already running.
TEST_P(EnvPosixTestWithParam, UnSchedule) {
  std::atomic<bool> called(false);
  env_->SetBackgroundThreads(1, Env::LOW);

  /* Block the low priority queue */
  test::SleepingBackgroundTask sleeping_task, sleeping_task1;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
                 Env::Priority::LOW);

  /* Schedule another task */
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task1,
                 Env::Priority::LOW, &sleeping_task1);

  /* Remove it with a different tag */
  ASSERT_EQ(0, env_->UnSchedule(&called, Env::Priority::LOW));

  /* Remove it from the queue with the right tag */
  ASSERT_EQ(1, env_->UnSchedule(&sleeping_task1, Env::Priority::LOW));

  // Unblock background thread
  sleeping_task.WakeUp();

  /* Schedule another task */
  env_->Schedule(&SetBool, &called);
  // Poll (up to ~kDelayMicros microseconds total) for the callback to run.
  for (int i = 0; i < kDelayMicros; i++) {
    if (called.load()) {
      break;
    }
    Env::Default()->SleepForMicroseconds(1);
  }
  ASSERT_TRUE(called.load());

  ASSERT_TRUE(!sleeping_task.IsSleeping() && !sleeping_task1.IsSleeping());
  WaitThreadPoolsEmpty();
}
323
324 // This tests assumes that the last scheduled
325 // task will run last. In fact, in the allotted
326 // sleeping time nothing may actually run or they may
327 // run in any order. The purpose of the test is unclear.
328 #ifndef OS_WIN
// Schedules four callbacks and asserts they execute in scheduling order
// (see the caveat in the comment above: the check is timing-based).
TEST_P(EnvPosixTestWithParam, RunMany) {
  std::atomic<int> last_id(0);

  struct CB {
    std::atomic<int>* last_id_ptr;  // Pointer to shared slot
    int id;                         // Order# for the execution of this callback

    CB(std::atomic<int>* p, int i) : last_id_ptr(p), id(i) {}

    static void Run(void* v) {
      CB* cb = reinterpret_cast<CB*>(v);
      // Each callback must observe the previous callback's id, i.e. they
      // must have run in scheduling order.
      int cur = cb->last_id_ptr->load();
      ASSERT_EQ(cb->id - 1, cur);
      cb->last_id_ptr->store(cb->id);
    }
  };

  // Schedule in different order than start time
  CB cb1(&last_id, 1);
  CB cb2(&last_id, 2);
  CB cb3(&last_id, 3);
  CB cb4(&last_id, 4);
  env_->Schedule(&CB::Run, &cb1);
  env_->Schedule(&CB::Run, &cb2);
  env_->Schedule(&CB::Run, &cb3);
  env_->Schedule(&CB::Run, &cb4);

  Env::Default()->SleepForMicroseconds(kDelayMicros);
  int cur = last_id.load(std::memory_order_acquire);
  ASSERT_EQ(4, cur);
  WaitThreadPoolsEmpty();
}
361 #endif
362
// Shared state for the StartThread test below; all fields guarded by `mu`.
struct State {
  port::Mutex mu;
  int val;          // number of threads that have incremented so far
  int num_running;  // threads started but not yet finished
};
368
ThreadBody(void * arg)369 static void ThreadBody(void* arg) {
370 State* s = reinterpret_cast<State*>(arg);
371 s->mu.Lock();
372 s->val += 1;
373 s->num_running -= 1;
374 s->mu.Unlock();
375 }
376
// Starts three threads running ThreadBody and polls until all finish; each
// thread must increment `val` exactly once.
TEST_P(EnvPosixTestWithParam, StartThread) {
  State state;
  state.val = 0;
  state.num_running = 3;
  for (int i = 0; i < 3; i++) {
    env_->StartThread(&ThreadBody, &state);
  }
  // Poll under the mutex until every thread has reported completion.
  while (true) {
    state.mu.Lock();
    int num = state.num_running;
    state.mu.Unlock();
    if (num == 0) {
      break;
    }
    Env::Default()->SleepForMicroseconds(kDelayMicros);
  }
  ASSERT_EQ(state.val, 3);
  WaitThreadPoolsEmpty();
}
396
// Exercises two independent thread pools (LOW and HIGH priority): verifies
// queue lengths while jobs are held back, releases them, then resizes the
// pools via IncBackgroundThreadsIfNeeded and repeats the round.
TEST_P(EnvPosixTestWithParam, TwoPools) {
  // Data structures to signal tasks to run.
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  bool should_start = false;

  // A job that records concurrency (never more than the configured pool
  // size) and blocks until `should_start` is signaled by the test body.
  class CB {
   public:
    CB(const std::string& pool_name, int pool_size, port::Mutex* trigger_mu,
       port::CondVar* trigger_cv, bool* _should_start)
        : mu_(),
          num_running_(0),
          num_finished_(0),
          pool_size_(pool_size),
          pool_name_(pool_name),
          trigger_mu_(trigger_mu),
          trigger_cv_(trigger_cv),
          should_start_(_should_start) {}

    static void Run(void* v) {
      CB* cb = reinterpret_cast<CB*>(v);
      cb->Run();
    }

    void Run() {
      {
        MutexLock l(&mu_);
        num_running_++;
        // make sure we don't have more than pool_size_ jobs running.
        ASSERT_LE(num_running_, pool_size_.load());
      }

      {
        // Block here until the test signals all jobs to proceed.
        MutexLock l(trigger_mu_);
        while (!(*should_start_)) {
          trigger_cv_->Wait();
        }
      }

      {
        MutexLock l(&mu_);
        num_running_--;
        num_finished_++;
      }
    }

    int NumFinished() {
      MutexLock l(&mu_);
      return num_finished_;
    }

    // Prepare for another round with a (possibly different) pool size.
    void Reset(int pool_size) {
      pool_size_.store(pool_size);
      num_finished_ = 0;
    }

   private:
    port::Mutex mu_;
    int num_running_;
    int num_finished_;
    std::atomic<int> pool_size_;
    std::string pool_name_;
    port::Mutex* trigger_mu_;
    port::CondVar* trigger_cv_;
    bool* should_start_;
  };

  const int kLowPoolSize = 2;
  const int kHighPoolSize = 4;
  const int kJobs = 8;

  CB low_pool_job("low", kLowPoolSize, &mutex, &cv, &should_start);
  CB high_pool_job("high", kHighPoolSize, &mutex, &cv, &should_start);

  env_->SetBackgroundThreads(kLowPoolSize);
  env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH);

  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));

  // schedule same number of jobs in each pool
  for (int i = 0; i < kJobs; i++) {
    env_->Schedule(&CB::Run, &low_pool_job);
    env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH);
  }
  // Wait a short while for the jobs to be dispatched.
  int sleep_count = 0;
  while ((unsigned int)(kJobs - kLowPoolSize) !=
             env_->GetThreadPoolQueueLen(Env::Priority::LOW) ||
         (unsigned int)(kJobs - kHighPoolSize) !=
             env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) {
    env_->SleepForMicroseconds(kDelayMicros);
    if (++sleep_count > 100) {
      break;
    }
  }

  // Each pool should now be running pool_size jobs; the rest stay queued.
  ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
            env_->GetThreadPoolQueueLen());
  ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
            env_->GetThreadPoolQueueLen(Env::Priority::LOW));
  ASSERT_EQ((unsigned int)(kJobs - kHighPoolSize),
            env_->GetThreadPoolQueueLen(Env::Priority::HIGH));

  // Trigger jobs to run.
  {
    MutexLock l(&mutex);
    should_start = true;
    cv.SignalAll();
  }

  // wait for all jobs to finish
  while (low_pool_job.NumFinished() < kJobs ||
         high_pool_job.NumFinished() < kJobs) {
    env_->SleepForMicroseconds(kDelayMicros);
  }

  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));

  // Hold jobs to schedule;
  should_start = false;

  // call IncBackgroundThreadsIfNeeded to two pools. One increasing and
  // the other decreasing
  env_->IncBackgroundThreadsIfNeeded(kLowPoolSize - 1, Env::Priority::LOW);
  env_->IncBackgroundThreadsIfNeeded(kHighPoolSize + 1, Env::Priority::HIGH);
  high_pool_job.Reset(kHighPoolSize + 1);
  low_pool_job.Reset(kLowPoolSize);

  // schedule same number of jobs in each pool
  for (int i = 0; i < kJobs; i++) {
    env_->Schedule(&CB::Run, &low_pool_job);
    env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH);
  }
  // Wait a short while for the jobs to be dispatched.
  sleep_count = 0;
  while ((unsigned int)(kJobs - kLowPoolSize) !=
             env_->GetThreadPoolQueueLen(Env::Priority::LOW) ||
         (unsigned int)(kJobs - (kHighPoolSize + 1)) !=
             env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) {
    env_->SleepForMicroseconds(kDelayMicros);
    if (++sleep_count > 100) {
      break;
    }
  }
  ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
            env_->GetThreadPoolQueueLen());
  ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
            env_->GetThreadPoolQueueLen(Env::Priority::LOW));
  ASSERT_EQ((unsigned int)(kJobs - (kHighPoolSize + 1)),
            env_->GetThreadPoolQueueLen(Env::Priority::HIGH));

  // Trigger jobs to run.
  {
    MutexLock l(&mutex);
    should_start = true;
    cv.SignalAll();
  }

  // wait for all jobs to finish
  while (low_pool_job.NumFinished() < kJobs ||
         high_pool_job.NumFinished() < kJobs) {
    env_->SleepForMicroseconds(kDelayMicros);
  }

  // Restore the HIGH pool to its original size before cleanup.
  env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH);
  WaitThreadPoolsEmpty();
}
566
// Exercises SetBackgroundThreads() growing and shrinking the HIGH-priority
// pool while SleepingBackgroundTasks occupy threads: shrinking must not stop
// already-running tasks, and growing must pick up queued ones.
TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
  constexpr int kWaitMicros = 60000000;  // 1min

  std::vector<test::SleepingBackgroundTask> tasks(10);

  // Set number of thread to 1 first.
  env_->SetBackgroundThreads(1, Env::Priority::HIGH);

  // Schedule 3 tasks. 0 running; Task 1, 2 waiting.
  for (size_t i = 0; i < 3; i++) {
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i],
                   Env::Priority::HIGH);
  }
  ASSERT_FALSE(tasks[0].TimedWaitUntilSleeping(kWaitMicros));
  ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  ASSERT_TRUE(tasks[0].IsSleeping());
  ASSERT_TRUE(!tasks[1].IsSleeping());
  ASSERT_TRUE(!tasks[2].IsSleeping());

  // Increase to 2 threads. Task 0, 1 running; 2 waiting
  env_->SetBackgroundThreads(2, Env::Priority::HIGH);
  ASSERT_FALSE(tasks[1].TimedWaitUntilSleeping(kWaitMicros));
  ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  ASSERT_TRUE(tasks[0].IsSleeping());
  ASSERT_TRUE(tasks[1].IsSleeping());
  ASSERT_TRUE(!tasks[2].IsSleeping());

  // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting
  env_->SetBackgroundThreads(1, Env::Priority::HIGH);
  Env::Default()->SleepForMicroseconds(kDelayMicros);
  ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  ASSERT_TRUE(tasks[0].IsSleeping());
  ASSERT_TRUE(tasks[1].IsSleeping());
  ASSERT_TRUE(!tasks[2].IsSleeping());

  // The last task finishes. Task 0 running, 2 waiting.
  tasks[1].WakeUp();
  ASSERT_FALSE(tasks[1].TimedWaitUntilDone(kWaitMicros));
  ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  ASSERT_TRUE(tasks[0].IsSleeping());
  ASSERT_TRUE(!tasks[1].IsSleeping());
  ASSERT_TRUE(!tasks[2].IsSleeping());

  // Increase to 5 threads. Task 0 and 2 running.
  env_->SetBackgroundThreads(5, Env::Priority::HIGH);
  ASSERT_FALSE(tasks[2].TimedWaitUntilSleeping(kWaitMicros));
  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  ASSERT_TRUE(tasks[0].IsSleeping());
  ASSERT_TRUE(!tasks[1].IsSleeping());
  ASSERT_TRUE(tasks[2].IsSleeping());

  // Change number of threads a couple of times while there is no sufficient
  // tasks.
  env_->SetBackgroundThreads(7, Env::Priority::HIGH);
  tasks[2].WakeUp();
  ASSERT_FALSE(tasks[2].TimedWaitUntilDone(kWaitMicros));
  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  env_->SetBackgroundThreads(3, Env::Priority::HIGH);
  Env::Default()->SleepForMicroseconds(kDelayMicros);
  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  env_->SetBackgroundThreads(4, Env::Priority::HIGH);
  Env::Default()->SleepForMicroseconds(kDelayMicros);
  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  env_->SetBackgroundThreads(5, Env::Priority::HIGH);
  Env::Default()->SleepForMicroseconds(kDelayMicros);
  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  env_->SetBackgroundThreads(4, Env::Priority::HIGH);
  Env::Default()->SleepForMicroseconds(kDelayMicros);
  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));

  // Give the pool ample time to settle at 4 threads before the next round.
  Env::Default()->SleepForMicroseconds(kDelayMicros * 50);

  // Enqueue 5 more tasks. Thread pool size now is 4.
  // Task 0, 3, 4, 5 running;6, 7 waiting.
  for (size_t i = 3; i < 8; i++) {
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i],
                   Env::Priority::HIGH);
  }
  for (size_t i = 3; i <= 5; i++) {
    ASSERT_FALSE(tasks[i].TimedWaitUntilSleeping(kWaitMicros));
  }
  ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  ASSERT_TRUE(tasks[0].IsSleeping());
  ASSERT_TRUE(!tasks[1].IsSleeping());
  ASSERT_TRUE(!tasks[2].IsSleeping());
  ASSERT_TRUE(tasks[3].IsSleeping());
  ASSERT_TRUE(tasks[4].IsSleeping());
  ASSERT_TRUE(tasks[5].IsSleeping());
  ASSERT_TRUE(!tasks[6].IsSleeping());
  ASSERT_TRUE(!tasks[7].IsSleeping());

  // Wake up task 0, 3 and 4. Task 5, 6, 7 running.
  tasks[0].WakeUp();
  tasks[3].WakeUp();
  tasks[4].WakeUp();

  for (size_t i = 5; i < 8; i++) {
    ASSERT_FALSE(tasks[i].TimedWaitUntilSleeping(kWaitMicros));
  }
  ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  for (size_t i = 5; i < 8; i++) {
    ASSERT_TRUE(tasks[i].IsSleeping());
  }

  // Shrink back to 1 thread. Still task 5, 6, 7 running
  env_->SetBackgroundThreads(1, Env::Priority::HIGH);
  Env::Default()->SleepForMicroseconds(kDelayMicros);
  ASSERT_TRUE(tasks[5].IsSleeping());
  ASSERT_TRUE(tasks[6].IsSleeping());
  ASSERT_TRUE(tasks[7].IsSleeping());

  // Wake up task 6. Task 5, 7 running
  tasks[6].WakeUp();
  ASSERT_FALSE(tasks[6].TimedWaitUntilDone(kWaitMicros));
  ASSERT_TRUE(tasks[5].IsSleeping());
  ASSERT_TRUE(!tasks[6].IsSleeping());
  ASSERT_TRUE(tasks[7].IsSleeping());

  // Wake up threads 7. Task 5 running
  tasks[7].WakeUp();
  ASSERT_FALSE(tasks[7].TimedWaitUntilDone(kWaitMicros));
  ASSERT_TRUE(!tasks[7].IsSleeping());

  // Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running.
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[8],
                 Env::Priority::HIGH);
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[9],
                 Env::Priority::HIGH);
  Env::Default()->SleepForMicroseconds(kDelayMicros);
  ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), (unsigned int)0);
  ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping());

  // Increase to 4 threads. Task 5, 8, 9 running.
  env_->SetBackgroundThreads(4, Env::Priority::HIGH);
  Env::Default()->SleepForMicroseconds(kDelayMicros);
  ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
  ASSERT_TRUE(tasks[8].IsSleeping());
  ASSERT_TRUE(tasks[9].IsSleeping());

  // Shrink to 1 thread
  env_->SetBackgroundThreads(1, Env::Priority::HIGH);

  // Wake up thread 9.
  tasks[9].WakeUp();
  ASSERT_FALSE(tasks[9].TimedWaitUntilDone(kWaitMicros));
  ASSERT_TRUE(!tasks[9].IsSleeping());
  ASSERT_TRUE(tasks[8].IsSleeping());

  // Wake up thread 8
  tasks[8].WakeUp();
  ASSERT_FALSE(tasks[8].TimedWaitUntilDone(kWaitMicros));
  ASSERT_TRUE(!tasks[8].IsSleeping());

  // Wake up the last thread
  tasks[5].WakeUp();
  ASSERT_FALSE(tasks[5].TimedWaitUntilDone(kWaitMicros));
  WaitThreadPoolsEmpty();
}
725
726 #if (defined OS_LINUX || defined OS_WIN)
727 // Travis doesn't support fallocate or getting unique ID from files for whatever
728 // reason.
729 #ifndef TRAVIS
730
731 namespace {
IsSingleVarint(const std::string & s)732 bool IsSingleVarint(const std::string& s) {
733 Slice slice(s);
734
735 uint64_t v;
736 if (!GetVarint64(&slice, &v)) {
737 return false;
738 }
739
740 return slice.size() == 0;
741 }
742
IsUniqueIDValid(const std::string & s)743 bool IsUniqueIDValid(const std::string& s) {
744 return !s.empty() && !IsSingleVarint(s);
745 }
746
// Scratch buffer for GetUniqueId() results, shared by the tests below.
// NOTE(review): not thread-safe; assumes gtest runs these tests serially.
const size_t MAX_ID_SIZE = 100;
char temp_id[MAX_ID_SIZE];
749
750
751 } // namespace
752
753 // Determine whether we can use the FS_IOC_GETVERSION ioctl
754 // on a file in directory DIR. Create a temporary file therein,
755 // try to apply the ioctl (save that result), cleanup and
756 // return the result. Return true if it is supported, and
757 // false if anything fails.
758 // Note that this function "knows" that dir has just been created
759 // and is empty, so we create a simply-named test file: "f".
ioctl_support__FS_IOC_GETVERSION(const std::string & dir)760 bool ioctl_support__FS_IOC_GETVERSION(const std::string& dir) {
761 #ifdef OS_WIN
762 return true;
763 #else
764 const std::string file = dir + "/f";
765 int fd;
766 do {
767 fd = open(file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
768 } while (fd < 0 && errno == EINTR);
769 long int version;
770 bool ok = (fd >= 0 && ioctl(fd, FS_IOC_GETVERSION, &version) >= 0);
771
772 close(fd);
773 unlink(file.c_str());
774
775 return ok;
776 #endif
777 }
778
779 // To ensure that Env::GetUniqueId-related tests work correctly, the files
780 // should be stored in regular storage like "hard disk" or "flash device",
781 // and not on a tmpfs file system (like /dev/shm and /tmp on some systems).
782 // Otherwise we cannot get the correct id.
783 //
784 // This function serves as the replacement for test::TmpDir(), which may be
785 // customized to be on a file system that doesn't work with GetUniqueId().
786
// Creates (and removes on destruction) a temporary directory on a file
// system where the FS_IOC_GETVERSION ioctl works, as required by the
// unique-ID tests. Candidates, in order: $TEST_IOCTL_FRIENDLY_TMPDIR,
// /var/tmp, /tmp. Aborts the process if no suitable directory is found.
class IoctlFriendlyTmpdir {
 public:
  explicit IoctlFriendlyTmpdir() {
    char dir_buf[100];

    const char *fmt = "%s/rocksdb.XXXXXX";
    const char *tmp = getenv("TEST_IOCTL_FRIENDLY_TMPDIR");

#ifdef OS_WIN
#define rmdir _rmdir
    if(tmp == nullptr) {
      tmp = getenv("TMP");
    }

    snprintf(dir_buf, sizeof dir_buf, fmt, tmp);
    auto result = _mktemp(dir_buf);
    assert(result != nullptr);
    BOOL ret = CreateDirectory(dir_buf, NULL);
    assert(ret == TRUE);
    dir_ = dir_buf;
#else
    std::list<std::string> candidate_dir_list = {"/var/tmp", "/tmp"};

    // If $TEST_IOCTL_FRIENDLY_TMPDIR/rocksdb.XXXXXX fits, use
    // $TEST_IOCTL_FRIENDLY_TMPDIR; subtract 2 for the "%s", and
    // add 1 for the trailing NUL byte.
    if (tmp && strlen(tmp) + strlen(fmt) - 2 + 1 <= sizeof dir_buf) {
      // use $TEST_IOCTL_FRIENDLY_TMPDIR value
      candidate_dir_list.push_front(tmp);
    }

    // Try each candidate until one both accepts mkdtemp and supports the
    // ioctl; remove any directory that turns out to be unsuitable.
    for (const std::string& d : candidate_dir_list) {
      snprintf(dir_buf, sizeof dir_buf, fmt, d.c_str());
      if (mkdtemp(dir_buf)) {
        if (ioctl_support__FS_IOC_GETVERSION(dir_buf)) {
          dir_ = dir_buf;
          return;
        } else {
          // Diagnose ioctl-related failure only if this is the
          // directory specified via that envvar.
          if (tmp && tmp == d) {
            fprintf(stderr, "TEST_IOCTL_FRIENDLY_TMPDIR-specified directory is "
                    "not suitable: %s\n", d.c_str());
          }
          rmdir(dir_buf);  // ignore failure
        }
      } else {
        // mkdtemp failed: diagnose it, but don't give up.
        fprintf(stderr, "mkdtemp(%s/...) failed: %s\n", d.c_str(),
                strerror(errno));
      }
    }

    fprintf(stderr, "failed to find an ioctl-friendly temporary directory;"
            " specify one via the TEST_IOCTL_FRIENDLY_TMPDIR envvar\n");
    std::abort();
#endif
  }

  // Best-effort cleanup; assumes the test removed any files it created
  // inside the directory (rmdir fails on a non-empty directory).
  ~IoctlFriendlyTmpdir() {
    rmdir(dir_.c_str());
  }

  const std::string& name() const {
    return dir_;
  }

 private:
  std::string dir_;  // absolute path of the created temporary directory
};
857
858 #ifndef ROCKSDB_LITE
// PositionedAppend with direct (unbuffered) I/O: writes a page of 'a' at
// offset 0, then a page of 'b' at offset kBlockSize (overwriting the tail
// of the first write), and verifies the resulting layout on read-back.
TEST_F(EnvPosixTest, PositionedAppend) {
  std::unique_ptr<WritableFile> writable_file;
  EnvOptions options;
  options.use_direct_writes = true;
  options.use_mmap_writes = false;
  IoctlFriendlyTmpdir ift;
  ASSERT_OK(env_->NewWritableFile(ift.name() + "/f", &writable_file, options));
  const size_t kBlockSize = 4096;
  const size_t kDataSize = kPageSize;
  // Write a page worth of 'a'
  auto data_ptr = NewAligned(kDataSize, 'a');
  Slice data_a(data_ptr.get(), kDataSize);
  ASSERT_OK(writable_file->PositionedAppend(data_a, 0U));
  // Write a page worth of 'b' right after the first sector
  data_ptr = NewAligned(kDataSize, 'b');
  Slice data_b(data_ptr.get(), kDataSize);
  ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize));
  ASSERT_OK(writable_file->Close());
  // The file now has 1 sector worth of a followed by a page worth of b

  // Verify the above
  std::unique_ptr<SequentialFile> seq_file;
  ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options));
  size_t scratch_len = kPageSize * 2;
  std::unique_ptr<char[]> scratch(new char[scratch_len]);
  Slice result;
  ASSERT_OK(seq_file->Read(scratch_len, &result, scratch.get()));
  // Expected total size: kBlockSize bytes of 'a' then a full page of 'b'.
  ASSERT_EQ(kPageSize + kBlockSize, result.size());
  ASSERT_EQ('a', result[kBlockSize - 1]);
  ASSERT_EQ('b', result[kBlockSize]);
}
890 #endif // !ROCKSDB_LITE
891
892 // `GetUniqueId()` temporarily returns zero on Windows. `BlockBasedTable` can
893 // handle a return value of zero but this test case cannot.
894 #ifndef OS_WIN
// Verifies RandomAccessFile::GetUniqueId() returns a stable, valid ID for
// the same underlying file: identical across reopens and after a delay.
// Requires a file system where unique IDs work (see IoctlFriendlyTmpdir).
TEST_P(EnvPosixTestWithParam, RandomAccessUniqueID) {
  // Create file.
  if (env_ == Env::Default()) {
    EnvOptions soptions;
    soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
    IoctlFriendlyTmpdir ift;
    std::string fname = ift.name() + "/testfile";
    std::unique_ptr<WritableFile> wfile;
    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));

    std::unique_ptr<RandomAccessFile> file;

    // Get Unique ID
    ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
    size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
    ASSERT_TRUE(id_size > 0);
    std::string unique_id1(temp_id, id_size);
    ASSERT_TRUE(IsUniqueIDValid(unique_id1));

    // Get Unique ID again
    ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
    id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
    ASSERT_TRUE(id_size > 0);
    std::string unique_id2(temp_id, id_size);
    ASSERT_TRUE(IsUniqueIDValid(unique_id2));

    // Get Unique ID again after waiting some time.
    env_->SleepForMicroseconds(1000000);
    ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
    id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
    ASSERT_TRUE(id_size > 0);
    std::string unique_id3(temp_id, id_size);
    ASSERT_TRUE(IsUniqueIDValid(unique_id3));

    // Check IDs are the same.
    ASSERT_EQ(unique_id1, unique_id2);
    ASSERT_EQ(unique_id2, unique_id3);

    // Delete the file. Fix: the DeleteFile() status was previously ignored;
    // a cleanup failure would also leave the tmpdir non-empty so its
    // destructor's rmdir would silently fail.
    ASSERT_OK(env_->DeleteFile(fname));
  }
}
937 #endif // !defined(OS_WIN)
938
939 // only works in linux platforms
940 #ifdef ROCKSDB_FALLOCATE_PRESENT
// Verify that SetPreallocationBlockSize() causes blocks to be preallocated
// (via fallocate) and that the extra blocks are released when the file is
// closed. Skipped when the filesystem does not support fallocate.
TEST_P(EnvPosixTestWithParam, AllocateTest) {
  if (env_ == Env::Default()) {
    IoctlFriendlyTmpdir ift;
    std::string fname = ift.name() + "/preallocate_testfile";

    // Try fallocate in a file to see whether the target file system supports
    // it.
    // Skip the test if fallocate is not supported.
    std::string fname_test_fallocate = ift.name() + "/preallocate_testfile_2";
    int fd = -1;
    do {
      fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
    } while (fd < 0 && errno == EINTR);
    ASSERT_GT(fd, 0);

    int alloc_status = fallocate(fd, 0, 0, 1);

    int err_number = 0;
    if (alloc_status != 0) {
      err_number = errno;
      fprintf(stderr, "Warning: fallocate() fails, %s\n", strerror(err_number));
    }
    close(fd);
    ASSERT_OK(env_->DeleteFile(fname_test_fallocate));
    if (alloc_status != 0 && err_number == EOPNOTSUPP) {
      // The filesystem containing the file does not support fallocate
      return;
    }

    EnvOptions soptions;
    soptions.use_mmap_writes = false;
    soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
    std::unique_ptr<WritableFile> wfile;
    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));

    // allocate 100 MB
    size_t kPreallocateSize = 100 * 1024 * 1024;
    size_t kBlockSize = 512;
    size_t kDataSize = 1024 * 1024;
    auto data_ptr = NewAligned(kDataSize, 'A');
    Slice data(data_ptr.get(), kDataSize);
    wfile->SetPreallocationBlockSize(kPreallocateSize);
    wfile->PrepareWrite(wfile->GetFileSize(), kDataSize);
    ASSERT_OK(wfile->Append(data));
    ASSERT_OK(wfile->Flush());

    struct stat f_stat;
    ASSERT_EQ(stat(fname.c_str(), &f_stat), 0);
    ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size);
    // verify that blocks are preallocated
    // Note here that we don't check the exact number of blocks preallocated --
    // we only require that number of allocated blocks is at least what we
    // expect.
    // It looks like some FS give us more blocks that we asked for. That's fine.
    // It might be worth investigating further.
    ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks);

    // close the file, should deallocate the blocks
    wfile.reset();

    // Check this stat() call too; previously its return value was ignored,
    // unlike the one above.
    ASSERT_EQ(stat(fname.c_str(), &f_stat), 0);
    ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size);
    // verify that preallocated blocks were deallocated on file close
    // Because the FS might give us more blocks, we add a full page to the size
    // and expect the number of blocks to be less or equal to that.
    ASSERT_GE((f_stat.st_size + kPageSize + kBlockSize - 1) / kBlockSize,
              (unsigned int)f_stat.st_blocks);
  }
}
1010 #endif // ROCKSDB_FALLOCATE_PRESENT
1011
1012 // Returns true if any of the strings in ss are the prefix of another string.
// Returns true if any of the strings in ss are the prefix of another string.
// An empty string is treated as a prefix, so any set containing "" reports
// true.
bool HasPrefix(const std::unordered_set<std::string>& ss) {
  for (const std::string& str : ss) {
    if (str.empty()) {
      return true;
    }
    // Probe every proper prefix of str (longest first) against the set.
    for (size_t len = str.size() - 1; len > 0; --len) {
      if (ss.count(str.substr(0, len)) != 0) {
        return true;
      }
    }
  }
  return false;
}
1026
1027 // `GetUniqueId()` temporarily returns zero on Windows. `BlockBasedTable` can
1028 // handle a return value of zero but this test case cannot.
1029 #ifndef OS_WIN
// Create many files that exist at the same time and verify each reports a
// distinct, valid unique ID, with no ID being a prefix of another.
TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDConcurrent) {
  if (env_ == Env::Default()) {
    // Check whether a bunch of concurrently existing files have unique IDs.
    EnvOptions soptions;
    soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;

    // Create the files
    IoctlFriendlyTmpdir ift;
    std::vector<std::string> fnames;
    for (int i = 0; i < 1000; ++i) {
      fnames.push_back(ift.name() + "/" + "testfile" + ToString(i));

      // Create file.
      std::unique_ptr<WritableFile> wfile;
      ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions));
    }

    // Collect and check whether the IDs are unique.
    std::unordered_set<std::string> ids;
    // Iterate by const reference; the previous by-value form copied every
    // file name on each iteration.
    for (const std::string& fname : fnames) {
      std::unique_ptr<RandomAccessFile> file;
      std::string unique_id;
      ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
      size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
      ASSERT_TRUE(id_size > 0);
      unique_id = std::string(temp_id, id_size);
      ASSERT_TRUE(IsUniqueIDValid(unique_id));

      ASSERT_TRUE(ids.count(unique_id) == 0);
      ids.insert(unique_id);
    }

    // Delete the files
    for (const std::string& fname : fnames) {
      ASSERT_OK(env_->DeleteFile(fname));
    }

    ASSERT_TRUE(!HasPrefix(ids));
  }
}
1070
// Verify that deleting and recreating a file never resurrects a unique ID
// that was seen before.
TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDDeletes) {
  if (env_ != Env::Default()) {
    return;
  }
  EnvOptions soptions;
  soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;

  IoctlFriendlyTmpdir ift;
  std::string fname = ift.name() + "/" + "testfile";

  // Check that after file is deleted we don't get same ID again in a new
  // file.
  std::unordered_set<std::string> ids;
  for (int round = 0; round < 1000; ++round) {
    // Create an empty file at fname.
    {
      std::unique_ptr<WritableFile> wfile;
      ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
    }

    // Read back its unique ID.
    std::string id;
    {
      std::unique_ptr<RandomAccessFile> rafile;
      ASSERT_OK(env_->NewRandomAccessFile(fname, &rafile, soptions));
      size_t sz = rafile->GetUniqueId(temp_id, MAX_ID_SIZE);
      ASSERT_TRUE(sz > 0);
      id.assign(temp_id, sz);
    }

    ASSERT_TRUE(IsUniqueIDValid(id));
    ASSERT_TRUE(ids.count(id) == 0);
    ids.insert(id);

    // Remove the file so the next round starts fresh.
    ASSERT_OK(env_->DeleteFile(fname));
  }

  ASSERT_TRUE(!HasPrefix(ids));
}
1110 #endif // !defined(OS_WIN)
1111
// Write 8 distinct sectors to a file, then MultiRead three non-contiguous
// sectors and check the contents. A sync point randomly injects short or
// empty io_uring results (after the first attempt) to exercise the
// partial-read retry path.
TEST_P(EnvPosixTestWithParam, MultiRead) {
  EnvOptions soptions;
  soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
  std::string fname = test::PerThreadDBPath(env_, "testfile");

  const size_t kSectorSize = 4096;
  const size_t kNumSectors = 8;

  // Create file.
  {
    std::unique_ptr<WritableFile> wfile;
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
    !defined(OS_AIX)
    if (soptions.use_direct_writes) {
      soptions.use_direct_writes = false;
    }
#endif
    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
    for (size_t i = 0; i < kNumSectors; ++i) {
      // Sector i is filled with the byte value i + 1.
      auto data = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
      Slice slice(data.get(), kSectorSize);
      ASSERT_OK(wfile->Append(slice));
    }
    ASSERT_OK(wfile->Close());
  }

  // More attempts to simulate more partial result sequences.
  for (uint32_t attempt = 0; attempt < 20; attempt++) {
    // Random Read
    Random rnd(301 + attempt);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "PosixRandomAccessFile::MultiRead:io_uring_result", [&](void* arg) {
          if (attempt > 0) {
            // No failure in the first attempt.
            size_t& bytes_read = *static_cast<size_t*>(arg);
            if (rnd.OneIn(4)) {
              bytes_read = 0;
            } else if (rnd.OneIn(3)) {
              bytes_read = static_cast<size_t>(
                  rnd.Uniform(static_cast<int>(bytes_read)));
            }
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    std::unique_ptr<RandomAccessFile> file;
    std::vector<ReadRequest> reqs(3);
    std::vector<std::unique_ptr<char, Deleter>> data;
    uint64_t offset = 0;
    // Read every other sector: requests target sectors 0, 2 and 4.
    for (size_t i = 0; i < reqs.size(); ++i) {
      reqs[i].offset = offset;
      offset += 2 * kSectorSize;
      reqs[i].len = kSectorSize;
      data.emplace_back(NewAligned(kSectorSize, 0));
      reqs[i].scratch = data.back().get();
    }
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
    !defined(OS_AIX)
    if (soptions.use_direct_reads) {
      soptions.use_direct_reads = false;
    }
#endif
    ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
    ASSERT_OK(file->MultiRead(reqs.data(), reqs.size()));
    for (size_t i = 0; i < reqs.size(); ++i) {
      // Request i targeted sector 2*i, whose fill byte is 2*i + 1.
      auto buf = NewAligned(kSectorSize * 8, static_cast<char>(i * 2 + 1));
      ASSERT_OK(reqs[i].status);
      ASSERT_EQ(memcmp(reqs[i].scratch, buf.get(), kSectorSize), 0);
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
1184
// Issue a large, randomized batch of non-aligned, non-overlapping MultiRead
// requests and validate the results against the known file contents, while a
// sync point injects empty/short io_uring results to exercise retry handling.
TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
  // In this test we don't do aligned read, so it doesn't work for
  // direct I/O case.
  EnvOptions soptions;
  soptions.use_direct_reads = soptions.use_direct_writes = false;
  std::string fname = test::PerThreadDBPath(env_, "testfile");

  const size_t kTotalSize = 81920;
  std::string expected_data;
  Random rnd(301);
  test::RandomString(&rnd, kTotalSize, &expected_data);

  // Create file.
  {
    std::unique_ptr<WritableFile> wfile;
    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
    ASSERT_OK(wfile->Append(expected_data));
    ASSERT_OK(wfile->Close());
  }

  // More attempts to simulate more partial result sequences.
  for (uint32_t attempt = 0; attempt < 25; attempt++) {
    // Right now kIoUringDepth is hard coded as 256, so we need very large
    // number of keys to cover the case of multiple rounds of submissions.
    // Right now the test latency is still acceptable. If it ends up with
    // too long, we can modify the io uring depth with SyncPoint here.
    const int num_reads = rnd.Uniform(512) + 1;

    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "PosixRandomAccessFile::MultiRead:io_uring_result", [&](void* arg) {
          if (attempt > 5) {
            // Improve partial result rates in second half of the run to
            // cover the case of repeated partial results.
            int odd = (attempt < 15) ? num_reads / 2 : 4;
            // No failure in first several attempts.
            size_t& bytes_read = *static_cast<size_t*>(arg);
            if (rnd.OneIn(odd)) {
              bytes_read = 0;
            } else if (rnd.OneIn(odd / 2)) {
              bytes_read = static_cast<size_t>(
                  rnd.Uniform(static_cast<int>(bytes_read)));
            }
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // Generate (offset, len) pairs
    std::set<int> start_offsets;
    for (int i = 0; i < num_reads; i++) {
      int rnd_off;
      // No repeat offsets. Draw from [0, kTotalSize) instead of repeating
      // the magic constant, so the range stays in sync with the file size.
      while (start_offsets.find(rnd_off = rnd.Uniform(static_cast<int>(
                 kTotalSize))) != start_offsets.end()) {
      }
      start_offsets.insert(rnd_off);
    }
    std::vector<size_t> offsets;
    std::vector<size_t> lens;
    // std::set already sorted the offsets.
    for (int so : start_offsets) {
      offsets.push_back(so);
    }
    // Each request length fits in the gap to the next offset (or to the end
    // of the file for the last one), so requests never overlap.
    for (size_t i = 0; i < offsets.size() - 1; i++) {
      lens.push_back(static_cast<size_t>(
          rnd.Uniform(static_cast<int>(offsets[i + 1] - offsets[i])) + 1));
    }
    lens.push_back(static_cast<size_t>(
        rnd.Uniform(static_cast<int>(kTotalSize - offsets.back())) + 1));
    ASSERT_EQ(static_cast<size_t>(num_reads), lens.size());

    // Create requests
    std::vector<std::string> scratches;
    scratches.reserve(num_reads);
    std::vector<ReadRequest> reqs(num_reads);
    for (size_t i = 0; i < reqs.size(); ++i) {
      reqs[i].offset = offsets[i];
      reqs[i].len = lens[i];
      scratches.emplace_back(reqs[i].len, ' ');
      reqs[i].scratch = const_cast<char*>(scratches.back().data());
    }

    // Query the data
    std::unique_ptr<RandomAccessFile> file;
    ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
    ASSERT_OK(file->MultiRead(reqs.data(), reqs.size()));

    // Validate results
    for (int i = 0; i < num_reads; ++i) {
      ASSERT_OK(reqs[i].status);
      ASSERT_EQ(
          Slice(expected_data.data() + offsets[i], lens[i]).ToString(true),
          reqs[i].result.ToString(true));
    }

    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
1277
// Only works on Linux platforms; the test is disabled on Windows.
#ifdef OS_WIN
// Disabled on Windows (note the DISABLED_ prefix below).
TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) {
#else
TEST_P(EnvPosixTestWithParam, InvalidateCache) {
#endif
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  EnvOptions soptions;
  soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
  std::string fname = test::PerThreadDBPath(env_, "testfile");

  const size_t kSectorSize = 512;
  auto data = NewAligned(kSectorSize, 0);
  Slice slice(data.get(), kSectorSize);

  // Create file.
  {
    std::unique_ptr<WritableFile> wfile;
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
    if (soptions.use_direct_writes) {
      soptions.use_direct_writes = false;
    }
#endif
    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
    ASSERT_OK(wfile->Append(slice));
    // InvalidateCache must succeed both on a sub-range and on the whole
    // file (length 0).
    ASSERT_OK(wfile->InvalidateCache(0, 0));
    ASSERT_OK(wfile->Close());
  }

  // Random Read
  {
    std::unique_ptr<RandomAccessFile> file;
    auto scratch = NewAligned(kSectorSize, 0);
    Slice result;
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
    if (soptions.use_direct_reads) {
      soptions.use_direct_reads = false;
    }
#endif
    ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
    ASSERT_OK(file->Read(0, kSectorSize, &result, scratch.get()));
    ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0);
    ASSERT_OK(file->InvalidateCache(0, 11));
    ASSERT_OK(file->InvalidateCache(0, 0));
  }

  // Sequential Read
  {
    std::unique_ptr<SequentialFile> file;
    auto scratch = NewAligned(kSectorSize, 0);
    Slice result;
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
    if (soptions.use_direct_reads) {
      soptions.use_direct_reads = false;
    }
#endif
    ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions));
    // Direct I/O sequential files require the positioned-read path.
    if (file->use_direct_io()) {
      ASSERT_OK(file->PositionedRead(0, kSectorSize, &result, scratch.get()));
    } else {
      ASSERT_OK(file->Read(kSectorSize, &result, scratch.get()));
    }
    ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0);
    ASSERT_OK(file->InvalidateCache(0, 11));
    ASSERT_OK(file->InvalidateCache(0, 0));
  }
  // Delete the file
  ASSERT_OK(env_->DeleteFile(fname));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
}
1348 #endif // not TRAVIS
1349 #endif // OS_LINUX || OS_WIN
1350
1351 class TestLogger : public Logger {
1352 public:
1353 using Logger::Logv;
1354 void Logv(const char* format, va_list ap) override {
1355 log_count++;
1356
1357 char new_format[550];
1358 std::fill_n(new_format, sizeof(new_format), '2');
1359 {
1360 va_list backup_ap;
1361 va_copy(backup_ap, ap);
1362 int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap);
1363 // 48 bytes for extra information + bytes allocated
1364
1365 // When we have n == -1 there is not a terminating zero expected
1366 #ifdef OS_WIN
1367 if (n < 0) {
1368 char_0_count++;
1369 }
1370 #endif
1371
1372 if (new_format[0] == '[') {
1373 // "[DEBUG] "
1374 ASSERT_TRUE(n <= 56 + (512 - static_cast<int>(sizeof(struct timeval))));
1375 } else {
1376 ASSERT_TRUE(n <= 48 + (512 - static_cast<int>(sizeof(struct timeval))));
1377 }
1378 va_end(backup_ap);
1379 }
1380
1381 for (size_t i = 0; i < sizeof(new_format); i++) {
1382 if (new_format[i] == 'x') {
1383 char_x_count++;
1384 } else if (new_format[i] == '\0') {
1385 char_0_count++;
1386 }
1387 }
1388 }
1389 int log_count;
1390 int char_x_count;
1391 int char_0_count;
1392 };
1393
// Exercise LogBuffer with payloads of various sizes and log levels, then
// check TestLogger's counters after flushing.
TEST_P(EnvPosixTestWithParam, LogBufferTest) {
  TestLogger test_logger;
  test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL);
  test_logger.log_count = 0;
  test_logger.char_x_count = 0;
  test_logger.char_0_count = 0;
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger);
  LogBuffer log_buffer_debug(DEBUG_LEVEL, &test_logger);

  // Payload strings filled with '1' and NUL-terminated, in three sizes.
  char bytes200[200];
  std::fill_n(bytes200, sizeof(bytes200), '1');
  bytes200[sizeof(bytes200) - 1] = '\0';
  char bytes600[600];
  std::fill_n(bytes600, sizeof(bytes600), '1');
  bytes600[sizeof(bytes600) - 1] = '\0';
  char bytes9000[9000];
  std::fill_n(bytes9000, sizeof(bytes9000), '1');
  bytes9000[sizeof(bytes9000) - 1] = '\0';

  // Five INFO-level entries; payloads are bracketed with 'x' markers so
  // TestLogger can count what survives formatting/truncation.
  ROCKS_LOG_BUFFER(&log_buffer, "x%sx", bytes200);
  ROCKS_LOG_BUFFER(&log_buffer, "x%sx", bytes600);
  ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx%sx", bytes200, bytes200, bytes200);
  ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx", bytes200, bytes600);
  ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx", bytes600, bytes9000);

  // One DEBUG entry while the logger is still at INFO level, one more after
  // lowering the logger to DEBUG.
  ROCKS_LOG_BUFFER(&log_buffer_debug, "x%sx", bytes200);
  test_logger.SetInfoLogLevel(DEBUG_LEVEL);
  ROCKS_LOG_BUFFER(&log_buffer_debug, "x%sx%sx%sx", bytes600, bytes9000,
                   bytes200);

  // Nothing reaches the logger until the buffers are flushed.
  ASSERT_EQ(0, test_logger.log_count);
  log_buffer.FlushBufferToLog();
  log_buffer_debug.FlushBufferToLog();
  // 6 delivered entries in total — apparently one of the DEBUG messages is
  // dropped; NOTE(review): presumably the one buffered while the logger was
  // still at INFO level — confirm against LogBuffer semantics.
  ASSERT_EQ(6, test_logger.log_count);
  ASSERT_EQ(6, test_logger.char_0_count);
  ASSERT_EQ(10, test_logger.char_x_count);
}
1431
// Logger used by LogBufferMaxSizeTest: asserts that every formatted message
// fits within the max_log_size limit passed at construction (plus header
// slack) while also being long enough to prove truncation actually occurred.
class TestLogger2 : public Logger {
 public:
  explicit TestLogger2(size_t max_log_size) : max_log_size_(max_log_size) {}
  using Logger::Logv;
  void Logv(const char* format, va_list ap) override {
    char new_format[2000];
    std::fill_n(new_format, sizeof(new_format), '2');
    {
      va_list backup_ap;
      va_copy(backup_ap, ap);
      int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap);
      // 48 bytes for extra information + bytes allocated
      ASSERT_TRUE(
          n <= 48 + static_cast<int>(max_log_size_ - sizeof(struct timeval)));
      // Lower bound: the message must have consumed (nearly) the whole
      // allowance, i.e. it was truncated rather than short to begin with.
      ASSERT_TRUE(n > static_cast<int>(max_log_size_ - sizeof(struct timeval)));
      va_end(backup_ap);
    }
  }
  // The per-message size limit this logger validates against.
  size_t max_log_size_;
};
1452
1453 TEST_P(EnvPosixTestWithParam, LogBufferMaxSizeTest) {
1454 char bytes9000[9000];
1455 std::fill_n(bytes9000, sizeof(bytes9000), '1');
1456 bytes9000[sizeof(bytes9000) - 1] = '\0';
1457
1458 for (size_t max_log_size = 256; max_log_size <= 1024;
1459 max_log_size += 1024 - 256) {
1460 TestLogger2 test_logger(max_log_size);
1461 test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL);
1462 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger);
1463 ROCKS_LOG_BUFFER_MAX_SZ(&log_buffer, max_log_size, "%s", bytes9000);
1464 log_buffer.FlushBufferToLog();
1465 }
1466 }
1467
1468 TEST_P(EnvPosixTestWithParam, Preallocation) {
1469 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1470 const std::string src = test::PerThreadDBPath(env_, "testfile");
1471 std::unique_ptr<WritableFile> srcfile;
1472 EnvOptions soptions;
1473 soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
1474 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD) && !defined(OS_FREEBSD)
1475 if (soptions.use_direct_writes) {
1476 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1477 "NewWritableFile:O_DIRECT", [&](void* arg) {
1478 int* val = static_cast<int*>(arg);
1479 *val &= ~O_DIRECT;
1480 });
1481 }
1482 #endif
1483 ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions));
1484 srcfile->SetPreallocationBlockSize(1024 * 1024);
1485
1486 // No writes should mean no preallocation
1487 size_t block_size, last_allocated_block;
1488 srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
1489 ASSERT_EQ(last_allocated_block, 0UL);
1490
1491 // Small write should preallocate one block
1492 size_t kStrSize = 4096;
1493 auto data = NewAligned(kStrSize, 'A');
1494 Slice str(data.get(), kStrSize);
1495 srcfile->PrepareWrite(srcfile->GetFileSize(), kStrSize);
1496 srcfile->Append(str);
1497 srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
1498 ASSERT_EQ(last_allocated_block, 1UL);
1499
1500 // Write an entire preallocation block, make sure we increased by two.
1501 {
1502 auto buf_ptr = NewAligned(block_size, ' ');
1503 Slice buf(buf_ptr.get(), block_size);
1504 srcfile->PrepareWrite(srcfile->GetFileSize(), block_size);
1505 srcfile->Append(buf);
1506 srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
1507 ASSERT_EQ(last_allocated_block, 2UL);
1508 }
1509
1510 // Write five more blocks at once, ensure we're where we need to be.
1511 {
1512 auto buf_ptr = NewAligned(block_size * 5, ' ');
1513 Slice buf = Slice(buf_ptr.get(), block_size * 5);
1514 srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size());
1515 srcfile->Append(buf);
1516 srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
1517 ASSERT_EQ(last_allocated_block, 7UL);
1518 }
1519 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
1520 }
1521
1522 // Test that the two ways to get children file attributes (in bulk or
1523 // individually) behave consistently.
1524 TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
1525 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1526 EnvOptions soptions;
1527 soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
1528 const int kNumChildren = 10;
1529
1530 std::string data;
1531 for (int i = 0; i < kNumChildren; ++i) {
1532 const std::string path =
1533 test::TmpDir(env_) + "/" + "testfile_" + std::to_string(i);
1534 std::unique_ptr<WritableFile> file;
1535 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD) && !defined(OS_FREEBSD)
1536 if (soptions.use_direct_writes) {
1537 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1538 "NewWritableFile:O_DIRECT", [&](void* arg) {
1539 int* val = static_cast<int*>(arg);
1540 *val &= ~O_DIRECT;
1541 });
1542 }
1543 #endif
1544 ASSERT_OK(env_->NewWritableFile(path, &file, soptions));
1545 auto buf_ptr = NewAligned(data.size(), 'T');
1546 Slice buf(buf_ptr.get(), data.size());
1547 file->Append(buf);
1548 data.append(std::string(4096, 'T'));
1549 }
1550
1551 std::vector<Env::FileAttributes> file_attrs;
1552 ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs));
1553 for (int i = 0; i < kNumChildren; ++i) {
1554 const std::string name = "testfile_" + std::to_string(i);
1555 const std::string path = test::TmpDir(env_) + "/" + name;
1556
1557 auto file_attrs_iter = std::find_if(
1558 file_attrs.begin(), file_attrs.end(),
1559 [&name](const Env::FileAttributes& fm) { return fm.name == name; });
1560 ASSERT_TRUE(file_attrs_iter != file_attrs.end());
1561 uint64_t size;
1562 ASSERT_OK(env_->GetFileSize(path, &size));
1563 ASSERT_EQ(size, 4096 * i);
1564 ASSERT_EQ(size, file_attrs_iter->size_bytes);
1565 }
1566 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
1567 }
1568
1569 // Test that all WritableFileWrapper forwards all calls to WritableFile.
// Test that all WritableFileWrapper forwards all calls to WritableFile.
TEST_P(EnvPosixTestWithParam, WritableFileWrapper) {
  // Base asserts, via inc(N), that each of its methods is the N-th call made
  // on the object. Driving the fixed call sequence below through a
  // WritableFileWrapper therefore proves every virtual is forwarded exactly
  // once and in order; the final step count (24) covers ctor through dtor.
  class Base : public WritableFile {
   public:
    mutable int *step_;

    void inc(int x) const {
      EXPECT_EQ(x, (*step_)++);
    }

    explicit Base(int* step) : step_(step) {
      inc(0);
    }

    Status Append(const Slice& /*data*/) override {
      inc(1);
      return Status::OK();
    }

    Status PositionedAppend(const Slice& /*data*/,
                            uint64_t /*offset*/) override {
      inc(2);
      return Status::OK();
    }

    Status Truncate(uint64_t /*size*/) override {
      inc(3);
      return Status::OK();
    }

    Status Close() override {
      inc(4);
      return Status::OK();
    }

    Status Flush() override {
      inc(5);
      return Status::OK();
    }

    Status Sync() override {
      inc(6);
      return Status::OK();
    }

    Status Fsync() override {
      inc(7);
      return Status::OK();
    }

    bool IsSyncThreadSafe() const override {
      inc(8);
      return true;
    }

    bool use_direct_io() const override {
      inc(9);
      return true;
    }

    size_t GetRequiredBufferAlignment() const override {
      inc(10);
      return 0;
    }

    void SetIOPriority(Env::IOPriority /*pri*/) override { inc(11); }

    Env::IOPriority GetIOPriority() override {
      inc(12);
      return Env::IOPriority::IO_LOW;
    }

    void SetWriteLifeTimeHint(Env::WriteLifeTimeHint /*hint*/) override {
      inc(13);
    }

    Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
      inc(14);
      return Env::WriteLifeTimeHint::WLTH_NOT_SET;
    }

    uint64_t GetFileSize() override {
      inc(15);
      return 0;
    }

    void SetPreallocationBlockSize(size_t /*size*/) override { inc(16); }

    void GetPreallocationStatus(size_t* /*block_size*/,
                                size_t* /*last_allocated_block*/) override {
      inc(17);
    }

    size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
      inc(18);
      return 0;
    }

    Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
      inc(19);
      return Status::OK();
    }

    Status RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/) override {
      inc(20);
      return Status::OK();
    }

    void PrepareWrite(size_t /*offset*/, size_t /*len*/) override { inc(21); }

    Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override {
      inc(22);
      return Status::OK();
    }

   public:
    // Destruction is the last expected step.
    ~Base() override { inc(23); }
  };

  // Wrapper adds nothing of its own, so every call below must reach Base
  // through WritableFileWrapper's forwarding implementations.
  class Wrapper : public WritableFileWrapper {
   public:
    explicit Wrapper(WritableFile* target) : WritableFileWrapper(target) {}
  };

  int step = 0;

  {
    Base b(&step);
    Wrapper w(&b);
    w.Append(Slice());
    w.PositionedAppend(Slice(), 0);
    w.Truncate(0);
    w.Close();
    w.Flush();
    w.Sync();
    w.Fsync();
    w.IsSyncThreadSafe();
    w.use_direct_io();
    w.GetRequiredBufferAlignment();
    w.SetIOPriority(Env::IOPriority::IO_HIGH);
    w.GetIOPriority();
    w.SetWriteLifeTimeHint(Env::WriteLifeTimeHint::WLTH_NOT_SET);
    w.GetWriteLifeTimeHint();
    w.GetFileSize();
    w.SetPreallocationBlockSize(0);
    w.GetPreallocationStatus(nullptr, nullptr);
    w.GetUniqueId(nullptr, 0);
    w.InvalidateCache(0, 0);
    w.RangeSync(0, 0);
    w.PrepareWrite(0, 0);
    w.Allocate(0, 0);
  }

  // 0 (ctor) + 22 forwarded calls + dtor == 24 steps.
  EXPECT_EQ(24, step);
}
1724
1725 TEST_P(EnvPosixTestWithParam, PosixRandomRWFile) {
1726 const std::string path = test::PerThreadDBPath(env_, "random_rw_file");
1727
1728 env_->DeleteFile(path);
1729
1730 std::unique_ptr<RandomRWFile> file;
1731
1732 // Cannot open non-existing file.
1733 ASSERT_NOK(env_->NewRandomRWFile(path, &file, EnvOptions()));
1734
1735 // Create the file using WriteableFile
1736 {
1737 std::unique_ptr<WritableFile> wf;
1738 ASSERT_OK(env_->NewWritableFile(path, &wf, EnvOptions()));
1739 }
1740
1741 ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
1742
1743 char buf[10000];
1744 Slice read_res;
1745
1746 ASSERT_OK(file->Write(0, "ABCD"));
1747 ASSERT_OK(file->Read(0, 10, &read_res, buf));
1748 ASSERT_EQ(read_res.ToString(), "ABCD");
1749
1750 ASSERT_OK(file->Write(2, "XXXX"));
1751 ASSERT_OK(file->Read(0, 10, &read_res, buf));
1752 ASSERT_EQ(read_res.ToString(), "ABXXXX");
1753
1754 ASSERT_OK(file->Write(10, "ZZZ"));
1755 ASSERT_OK(file->Read(10, 10, &read_res, buf));
1756 ASSERT_EQ(read_res.ToString(), "ZZZ");
1757
1758 ASSERT_OK(file->Write(11, "Y"));
1759 ASSERT_OK(file->Read(10, 10, &read_res, buf));
1760 ASSERT_EQ(read_res.ToString(), "ZYZ");
1761
1762 ASSERT_OK(file->Write(200, "FFFFF"));
1763 ASSERT_OK(file->Read(200, 10, &read_res, buf));
1764 ASSERT_EQ(read_res.ToString(), "FFFFF");
1765
1766 ASSERT_OK(file->Write(205, "XXXX"));
1767 ASSERT_OK(file->Read(200, 10, &read_res, buf));
1768 ASSERT_EQ(read_res.ToString(), "FFFFFXXXX");
1769
1770 ASSERT_OK(file->Write(5, "QQQQ"));
1771 ASSERT_OK(file->Read(0, 9, &read_res, buf));
1772 ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ");
1773
1774 ASSERT_OK(file->Read(2, 4, &read_res, buf));
1775 ASSERT_EQ(read_res.ToString(), "XXXQ");
1776
1777 // Close file and reopen it
1778 file->Close();
1779 ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
1780
1781 ASSERT_OK(file->Read(0, 9, &read_res, buf));
1782 ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ");
1783
1784 ASSERT_OK(file->Read(10, 3, &read_res, buf));
1785 ASSERT_EQ(read_res.ToString(), "ZYZ");
1786
1787 ASSERT_OK(file->Read(200, 9, &read_res, buf));
1788 ASSERT_EQ(read_res.ToString(), "FFFFFXXXX");
1789
1790 ASSERT_OK(file->Write(4, "TTTTTTTTTTTTTTTT"));
1791 ASSERT_OK(file->Read(0, 10, &read_res, buf));
1792 ASSERT_EQ(read_res.ToString(), "ABXXTTTTTT");
1793
1794 // Clean up
1795 env_->DeleteFile(path);
1796 }
1797
// Wraps a RandomRWFile and mirrors every write into an in-memory string so
// that reads can be validated against the expected contents. Because the
// mirror is zero-filled when it grows, both the mirror and the file data are
// truncated at the first NUL byte before comparison.
class RandomRWFileWithMirrorString {
 public:
  explicit RandomRWFileWithMirrorString(RandomRWFile* _file) : file_(_file) {}

  // Writes `data` at `offset` to both the mirror and the real file; fails
  // the test if the file write fails.
  void Write(size_t offset, const std::string& data) {
    // Write to mirror string
    StringWrite(offset, data);

    // Write to file
    Status s = file_->Write(offset, data);
    ASSERT_OK(s) << s.ToString();
  }

  // Reads up to `n` bytes at `offset` from the file and asserts the result
  // matches the mirror, with both sides NUL-truncated.
  void Read(size_t offset = 0, size_t n = 1000000) {
    // Expected bytes from the mirror (empty if offset is past mirror end).
    Slice str_res(nullptr, 0);
    if (offset < file_mirror_.size()) {
      size_t str_res_sz = std::min(file_mirror_.size() - offset, n);
      str_res = Slice(file_mirror_.data() + offset, str_res_sz);
      StopSliceAtNull(&str_res);
    }

    Slice file_res;
    Status s = file_->Read(offset, n, &file_res, buf_);
    ASSERT_OK(s) << s.ToString();
    StopSliceAtNull(&file_res);

    ASSERT_EQ(str_res.ToString(), file_res.ToString()) << offset << " " << n;
  }

  // Points the wrapper at a new file handle (e.g. after a reopen); the
  // mirror contents are kept.
  void SetFile(RandomRWFile* _file) { file_ = _file; }

 private:
  // Copies `src` into the mirror at `offset`, growing it (zero-filled) as
  // needed.
  void StringWrite(size_t offset, const std::string& src) {
    if (offset + src.size() > file_mirror_.size()) {
      file_mirror_.resize(offset + src.size(), '\0');
    }

    char* pos = const_cast<char*>(file_mirror_.data() + offset);
    memcpy(pos, src.data(), src.size());
  }

  // Truncates the slice at its first NUL byte, if any.
  void StopSliceAtNull(Slice* slc) {
    for (size_t i = 0; i < slc->size(); i++) {
      if ((*slc)[i] == '\0') {
        *slc = Slice(slc->data(), i);
        break;
      }
    }
  }

  char buf_[10000];          // scratch buffer for file reads
  RandomRWFile* file_;       // not owned
  std::string file_mirror_;  // in-memory copy of the file contents
};
1852
// Randomized stress test: mirror every write in an in-memory string and
// verify that RandomRWFile reads always match the mirror, across periodic
// file reopens.
TEST_P(EnvPosixTestWithParam, PosixRandomRWFileRandomized) {
  const std::string path = test::PerThreadDBPath(env_, "random_rw_file_rand");
  // Best-effort cleanup; the file may not exist, so the status is ignored.
  env_->DeleteFile(path);

  std::unique_ptr<RandomRWFile> file;

#ifdef OS_LINUX
  // Cannot open non-existing file.
  ASSERT_NOK(env_->NewRandomRWFile(path, &file, EnvOptions()));
#endif

  // Create the file using WriteableFile
  {
    std::unique_ptr<WritableFile> wf;
    ASSERT_OK(env_->NewWritableFile(path, &wf, EnvOptions()));
  }

  ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
  RandomRWFileWithMirrorString file_with_mirror(file.get());

  Random rnd(301);
  std::string buf;
  for (int i = 0; i < 10000; i++) {
    // Generate random data
    test::RandomString(&rnd, 10, &buf);

    // Pick random offset for write
    size_t write_off = rnd.Next() % 1000;
    file_with_mirror.Write(write_off, buf);

    // Pick random offset for read
    size_t read_off = rnd.Next() % 1000;
    size_t read_sz = rnd.Next() % 20;
    file_with_mirror.Read(read_off, read_sz);

    if (i % 500 == 0) {
      // Reopen the file every 500 iters
      ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
      file_with_mirror.SetFile(file.get());
    }
  }

  // clean up
  env_->DeleteFile(path);
}
1898
1899 class TestEnv : public EnvWrapper {
1900 public:
1901 explicit TestEnv() : EnvWrapper(Env::Default()),
1902 close_count(0) { }
1903
1904 class TestLogger : public Logger {
1905 public:
1906 using Logger::Logv;
1907 TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
1908 ~TestLogger() override {
1909 if (!closed_) {
1910 CloseHelper();
1911 }
1912 }
1913 void Logv(const char* /*format*/, va_list /*ap*/) override{};
1914
1915 protected:
1916 Status CloseImpl() override { return CloseHelper(); }
1917
1918 private:
1919 Status CloseHelper() {
1920 env->CloseCountInc();;
1921 return Status::OK();
1922 }
1923 TestEnv* env;
1924 };
1925
1926 void CloseCountInc() { close_count++; }
1927
1928 int GetCloseCount() { return close_count; }
1929
1930 Status NewLogger(const std::string& /*fname*/,
1931 std::shared_ptr<Logger>* result) override {
1932 result->reset(new TestLogger(this));
1933 return Status::OK();
1934 }
1935
1936 private:
1937 int close_count;
1938 };
1939
// Plain fixture for Env tests that do not need the posix parameterization.
class EnvTest : public testing::Test {};
1941
1942 TEST_F(EnvTest, Close) {
1943 TestEnv* env = new TestEnv();
1944 std::shared_ptr<Logger> logger;
1945 Status s;
1946
1947 s = env->NewLogger("", &logger);
1948 ASSERT_EQ(s, Status::OK());
1949 logger.get()->Close();
1950 ASSERT_EQ(env->GetCloseCount(), 1);
1951 // Call Close() again. CloseHelper() should not be called again
1952 logger.get()->Close();
1953 ASSERT_EQ(env->GetCloseCount(), 1);
1954 logger.reset();
1955 ASSERT_EQ(env->GetCloseCount(), 1);
1956
1957 s = env->NewLogger("", &logger);
1958 ASSERT_EQ(s, Status::OK());
1959 logger.reset();
1960 ASSERT_EQ(env->GetCloseCount(), 2);
1961
1962 delete env;
1963 }
1964
// Instantiate the parameterized posix Env tests; each instantiation supplies
// a (Env*, use_direct_io) pair.
INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam,
                        ::testing::Values(std::pair<Env*, bool>(Env::Default(),
                                                                false)));
#if !defined(ROCKSDB_LITE)
// Direct I/O variant is excluded from ROCKSDB_LITE builds.
INSTANTIATE_TEST_CASE_P(DefaultEnvWithDirectIO, EnvPosixTestWithParam,
                        ::testing::Values(std::pair<Env*, bool>(Env::Default(),
                                                                true)));
#endif  // !defined(ROCKSDB_LITE)

#if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
// Re-run the same tests through a chroot Env confined to a temp directory;
// NewChrootEnv is not available on Windows.
static std::unique_ptr<Env> chroot_env(
    NewChrootEnv(Env::Default(), test::TmpDir(Env::Default())));
INSTANTIATE_TEST_CASE_P(
    ChrootEnvWithoutDirectIO, EnvPosixTestWithParam,
    ::testing::Values(std::pair<Env*, bool>(chroot_env.get(), false)));
INSTANTIATE_TEST_CASE_P(
    ChrootEnvWithDirectIO, EnvPosixTestWithParam,
    ::testing::Values(std::pair<Env*, bool>(chroot_env.get(), true)));
#endif  // !defined(ROCKSDB_LITE) && !defined(OS_WIN)
1984
1985 class EnvFSTestWithParam
1986 : public ::testing::Test,
1987 public ::testing::WithParamInterface<std::tuple<bool, bool, bool>> {
1988 public:
1989 EnvFSTestWithParam() {
1990 bool env_non_null = std::get<0>(GetParam());
1991 bool env_default = std::get<1>(GetParam());
1992 bool fs_default = std::get<2>(GetParam());
1993
1994 env_ = env_non_null ? (env_default ? Env::Default() : nullptr) : nullptr;
1995 fs_ = fs_default
1996 ? FileSystem::Default()
1997 : std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
1998 if (env_non_null && env_default && !fs_default) {
1999 env_ptr_ = NewCompositeEnv(fs_);
2000 }
2001 if (env_non_null && !env_default && fs_default) {
2002 env_ptr_ = std::unique_ptr<Env>(new FaultInjectionTestEnv(Env::Default()));
2003 fs_.reset();
2004 }
2005 if (env_non_null && !env_default && !fs_default) {
2006 env_ptr_.reset(new FaultInjectionTestEnv(Env::Default()));
2007 composite_env_ptr_.reset(new CompositeEnvWrapper(env_ptr_.get(), fs_));
2008 env_ = composite_env_ptr_.get();
2009 } else {
2010 env_ = env_ptr_.get();
2011 }
2012
2013 dbname1_ = test::PerThreadDBPath("env_fs_test1");
2014 dbname2_ = test::PerThreadDBPath("env_fs_test2");
2015 }
2016
2017 ~EnvFSTestWithParam() = default;
2018
2019 Env* env_;
2020 std::unique_ptr<Env> env_ptr_;
2021 std::unique_ptr<Env> composite_env_ptr_;
2022 std::shared_ptr<FileSystem> fs_;
2023 std::string dbname1_;
2024 std::string dbname2_;
2025 };
2026
2027 TEST_P(EnvFSTestWithParam, OptionsTest) {
2028 Options opts;
2029 opts.env = env_;
2030 opts.create_if_missing = true;
2031 std::string dbname = dbname1_;
2032
2033 if (env_) {
2034 if (fs_) {
2035 ASSERT_EQ(fs_.get(), env_->GetFileSystem().get());
2036 } else {
2037 ASSERT_NE(FileSystem::Default().get(), env_->GetFileSystem().get());
2038 }
2039 }
2040 for (int i = 0; i < 2; ++i) {
2041 DB* db;
2042 Status s = DB::Open(opts, dbname, &db);
2043 ASSERT_OK(s);
2044
2045 WriteOptions wo;
2046 db->Put(wo, "a", "a");
2047 db->Flush(FlushOptions());
2048 db->Put(wo, "b", "b");
2049 db->Flush(FlushOptions());
2050 db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
2051
2052 std::string val;
2053 ASSERT_OK(db->Get(ReadOptions(), "a", &val));
2054 ASSERT_EQ("a", val);
2055 ASSERT_OK(db->Get(ReadOptions(), "b", &val));
2056 ASSERT_EQ("b", val);
2057
2058 db->Close();
2059 delete db;
2060 DestroyDB(dbname, opts);
2061
2062 dbname = dbname2_;
2063 }
2064 }
2065
// The parameters are as follows -
// 1. True means Options::env is non-null, false means null
// 2. True means use Env::Default, false means custom
// 3. True means use FileSystem::Default, false means custom
// All eight combinations are generated via Combine.
INSTANTIATE_TEST_CASE_P(
    EnvFSTest, EnvFSTestWithParam,
    ::testing::Combine(::testing::Bool(), ::testing::Bool(),
                       ::testing::Bool()));
2074
2075 // This test ensures that default Env and those allocated by
2076 // NewCompositeEnv() all share the same threadpool
2077 TEST_F(EnvTest, MultipleCompositeEnv) {
2078 std::shared_ptr<FaultInjectionTestFS> fs1 =
2079 std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
2080 std::shared_ptr<FaultInjectionTestFS> fs2 =
2081 std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
2082 std::unique_ptr<Env> env1 = NewCompositeEnv(fs1);
2083 std::unique_ptr<Env> env2 = NewCompositeEnv(fs2);
2084 Env::Default()->SetBackgroundThreads(8, Env::HIGH);
2085 Env::Default()->SetBackgroundThreads(16, Env::LOW);
2086
2087 ASSERT_EQ(env1->GetBackgroundThreads(Env::LOW), 16);
2088 ASSERT_EQ(env1->GetBackgroundThreads(Env::HIGH), 8);
2089 ASSERT_EQ(env2->GetBackgroundThreads(Env::LOW), 16);
2090 ASSERT_EQ(env2->GetBackgroundThreads(Env::HIGH), 8);
2091 }
2092
2093 } // namespace ROCKSDB_NAMESPACE
2094
// Standard gtest entry point: parse gtest flags, then run every registered
// test in this binary.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
2099