1 // Copyright (c) 2013, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 #pragma once 6 7 #include <list> 8 #include <memory> 9 #include <string> 10 11 #include "rocksdb/comparator.h" 12 #include "memory/arena.h" 13 #include "util/mutexlock.h" 14 15 namespace ROCKSDB_NAMESPACE { 16 17 // 18 // CacheWriteBuffer 19 // 20 // Buffer abstraction that can be manipulated via append 21 // (not thread safe) 22 class CacheWriteBuffer { 23 public: CacheWriteBuffer(const size_t size)24 explicit CacheWriteBuffer(const size_t size) : size_(size), pos_(0) { 25 buf_.reset(new char[size_]); 26 assert(!pos_); 27 assert(size_); 28 } 29 ~CacheWriteBuffer()30 virtual ~CacheWriteBuffer() {} 31 Append(const char * buf,const size_t size)32 void Append(const char* buf, const size_t size) { 33 assert(pos_ + size <= size_); 34 memcpy(buf_.get() + pos_, buf, size); 35 pos_ += size; 36 assert(pos_ <= size_); 37 } 38 FillTrailingZeros()39 void FillTrailingZeros() { 40 assert(pos_ <= size_); 41 memset(buf_.get() + pos_, '0', size_ - pos_); 42 pos_ = size_; 43 } 44 Reset()45 void Reset() { pos_ = 0; } Free()46 size_t Free() const { return size_ - pos_; } Capacity()47 size_t Capacity() const { return size_; } Used()48 size_t Used() const { return pos_; } Data()49 char* Data() const { return buf_.get(); } 50 51 private: 52 std::unique_ptr<char[]> buf_; 53 const size_t size_; 54 size_t pos_; 55 }; 56 57 // 58 // CacheWriteBufferAllocator 59 // 60 // Buffer pool abstraction(not thread safe) 61 // 62 class CacheWriteBufferAllocator { 63 public: CacheWriteBufferAllocator(const size_t buffer_size,const size_t buffer_count)64 explicit CacheWriteBufferAllocator(const size_t buffer_size, 65 const size_t buffer_count) 66 : cond_empty_(&lock_), buffer_size_(buffer_size) { 67 MutexLock _(&lock_); 68 buffer_size_ = buffer_size; 69 for (uint32_t i = 0; i < buffer_count; i++) { 70 auto* buf = new CacheWriteBuffer(buffer_size_); 71 assert(buf); 72 if (buf) { 73 bufs_.push_back(buf); 74 cond_empty_.Signal(); 75 } 76 } 77 } 78 ~CacheWriteBufferAllocator()79 virtual ~CacheWriteBufferAllocator() { 80 MutexLock _(&lock_); 81 assert(bufs_.size() * buffer_size_ == Capacity()); 82 for (auto* buf : bufs_) { 83 delete buf; 84 } 85 bufs_.clear(); 86 } 87 Allocate()88 CacheWriteBuffer* Allocate() { 89 MutexLock _(&lock_); 90 if (bufs_.empty()) { 91 return nullptr; 92 } 93 94 assert(!bufs_.empty()); 95 CacheWriteBuffer* const buf = bufs_.front(); 96 bufs_.pop_front(); 97 return buf; 98 } 99 Deallocate(CacheWriteBuffer * const buf)100 void Deallocate(CacheWriteBuffer* const buf) { 101 assert(buf); 102 MutexLock _(&lock_); 103 buf->Reset(); 104 bufs_.push_back(buf); 105 cond_empty_.Signal(); 106 } 107 WaitUntilUsable()108 void WaitUntilUsable() { 109 // We are asked to wait till we have buffers available 110 MutexLock _(&lock_); 111 while (bufs_.empty()) { 112 cond_empty_.Wait(); 113 } 114 } 115 Capacity()116 size_t Capacity() const { return bufs_.size() * buffer_size_; } Free()117 size_t Free() const { return bufs_.size() * buffer_size_; } BufferSize()118 size_t BufferSize() const { return buffer_size_; } 119 120 private: 121 port::Mutex lock_; // Sync lock 122 port::CondVar cond_empty_; // Condition var for empty buffers 123 size_t buffer_size_; // Size of each buffer 124 std::list<CacheWriteBuffer*> bufs_; // Buffer stash 125 }; 126 127 } // namespace ROCKSDB_NAMESPACE 128