1 //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 
7 #include <list>
8 #include <memory>
9 #include <string>
10 
11 #include "rocksdb/comparator.h"
12 #include "memory/arena.h"
13 #include "util/mutexlock.h"
14 
15 namespace ROCKSDB_NAMESPACE {
16 
17 //
18 // CacheWriteBuffer
19 //
20 // Buffer abstraction that can be manipulated via append
21 // (not thread safe)
22 class CacheWriteBuffer {
23  public:
CacheWriteBuffer(const size_t size)24   explicit CacheWriteBuffer(const size_t size) : size_(size), pos_(0) {
25     buf_.reset(new char[size_]);
26     assert(!pos_);
27     assert(size_);
28   }
29 
~CacheWriteBuffer()30   virtual ~CacheWriteBuffer() {}
31 
Append(const char * buf,const size_t size)32   void Append(const char* buf, const size_t size) {
33     assert(pos_ + size <= size_);
34     memcpy(buf_.get() + pos_, buf, size);
35     pos_ += size;
36     assert(pos_ <= size_);
37   }
38 
FillTrailingZeros()39   void FillTrailingZeros() {
40     assert(pos_ <= size_);
41     memset(buf_.get() + pos_, '0', size_ - pos_);
42     pos_ = size_;
43   }
44 
Reset()45   void Reset() { pos_ = 0; }
Free()46   size_t Free() const { return size_ - pos_; }
Capacity()47   size_t Capacity() const { return size_; }
Used()48   size_t Used() const { return pos_; }
Data()49   char* Data() const { return buf_.get(); }
50 
51  private:
52   std::unique_ptr<char[]> buf_;
53   const size_t size_;
54   size_t pos_;
55 };
56 
57 //
58 // CacheWriteBufferAllocator
59 //
60 // Buffer pool abstraction(not thread safe)
61 //
62 class CacheWriteBufferAllocator {
63  public:
CacheWriteBufferAllocator(const size_t buffer_size,const size_t buffer_count)64   explicit CacheWriteBufferAllocator(const size_t buffer_size,
65                                      const size_t buffer_count)
66       : cond_empty_(&lock_), buffer_size_(buffer_size) {
67     MutexLock _(&lock_);
68     buffer_size_ = buffer_size;
69     for (uint32_t i = 0; i < buffer_count; i++) {
70       auto* buf = new CacheWriteBuffer(buffer_size_);
71       assert(buf);
72       if (buf) {
73         bufs_.push_back(buf);
74         cond_empty_.Signal();
75       }
76     }
77   }
78 
~CacheWriteBufferAllocator()79   virtual ~CacheWriteBufferAllocator() {
80     MutexLock _(&lock_);
81     assert(bufs_.size() * buffer_size_ == Capacity());
82     for (auto* buf : bufs_) {
83       delete buf;
84     }
85     bufs_.clear();
86   }
87 
Allocate()88   CacheWriteBuffer* Allocate() {
89     MutexLock _(&lock_);
90     if (bufs_.empty()) {
91       return nullptr;
92     }
93 
94     assert(!bufs_.empty());
95     CacheWriteBuffer* const buf = bufs_.front();
96     bufs_.pop_front();
97     return buf;
98   }
99 
Deallocate(CacheWriteBuffer * const buf)100   void Deallocate(CacheWriteBuffer* const buf) {
101     assert(buf);
102     MutexLock _(&lock_);
103     buf->Reset();
104     bufs_.push_back(buf);
105     cond_empty_.Signal();
106   }
107 
WaitUntilUsable()108   void WaitUntilUsable() {
109     // We are asked to wait till we have buffers available
110     MutexLock _(&lock_);
111     while (bufs_.empty()) {
112       cond_empty_.Wait();
113     }
114   }
115 
Capacity()116   size_t Capacity() const { return bufs_.size() * buffer_size_; }
Free()117   size_t Free() const { return bufs_.size() * buffer_size_; }
BufferSize()118   size_t BufferSize() const { return buffer_size_; }
119 
120  private:
121   port::Mutex lock_;                   // Sync lock
122   port::CondVar cond_empty_;           // Condition var for empty buffers
123   size_t buffer_size_;                 // Size of each buffer
124   std::list<CacheWriteBuffer*> bufs_;  // Buffer stash
125 };
126 
127 }  // namespace ROCKSDB_NAMESPACE
128