//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
IOStatus WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  IOStatus s;
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }

  // Flush only when buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O on we never write directly to disk here; otherwise we
  // use the buffer for its original purpose: to accumulate many small
  // chunks before writing them out.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to file bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
    CalculateFileChecksum(data);
  }
  return s;
}

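// Append pad_bytes zero bytes to the file, going through the internal
// buffer and flushing it whenever it fills up.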
IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_'s capacity, so we always go
  // through buf_ rather than writing directly to the file, as Append() does
  // in certain cases.
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      IOStatus s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return IOStatus::OK();
}

IOStatus WritableFileWriter::Close() {
  // Do not quit immediately on failure; the file MUST be closed.
  IOStatus s;

  // It is now possible to close the file twice, since we MUST close it in
  // the destructor; simply flushing is not enough. On Windows
  // pre-allocation does not fill with zeros, and with unbuffered access we
  // also need to set the end of data.
  if (!writable_file_) {
    return s;
  }

  s = Flush();  // flush cache to OS

  IOStatus interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
    if (interim.ok()) {
      interim = writable_file_->Fsync(IOOptions(), nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  interim = writable_file_->Close(IOOptions(), nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  if (s.ok() && checksum_generator_ != nullptr && !checksum_finalized_) {
    checksum_generator_->Finalize();
    checksum_finalized_ = true;
  }

  return s;
}

// Write out the cached data to the OS page cache, or to storage if direct
// I/O is enabled.
IOStatus WritableFileWriter::Flush() {
  IOStatus s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  s = writable_file_->Flush(IOOptions(), nullptr);

  if (!s.ok()) {
    return s;
  }

  // Sync the OS cache to disk for every bytes_per_sync_.
  // TODO: give log files and SST files different options (log
  // files could potentially be cached in the OS for their whole
  // lifetime, thus we might not want to flush at all).

  // We try to avoid syncing the last 1MB of data, for two reasons:
  // (1) avoid rewriting the same page if it is modified later;
  // (2) on older OS versions, a write can block while the page is
  //     being written out.
  // XFS flushes neighboring pages outside of the specified range, so we
  // need to make sure the sync range is far from the write offset.
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;                                // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}

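// Return the checksum computed by the configured generator, or
// kUnknownFileChecksum if no generator is set.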
std::string WritableFileWriter::GetFileChecksum() {
  if (checksum_generator_ != nullptr) {
    return checksum_generator_->GetChecksum();
  } else {
    return kUnknownFileChecksum;
  }
}

const char* WritableFileWriter::GetFileChecksumFuncName() const {
  if (checksum_generator_ != nullptr) {
    return checksum_generator_->Name();
  } else {
    return kUnknownFileChecksumFuncName.c_str();
  }
}

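// Flush any buffered data to the file and, when there are pending writes
// under buffered I/O, sync them to storage via SyncInternal().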
IOStatus WritableFileWriter::Sync(bool use_fsync) {
  IOStatus s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return IOStatus::OK();
}

IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return IOStatus::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  IOStatus s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}

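// Sync the underlying file to storage, using Fsync() when use_fsync is set
// and Sync() otherwise, charging the time to fsync_nanos.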
IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
  IOStatus s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
  SetPerfLevel(prev_perf_level);
  return s;
}

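// Ask the file system to sync the given byte range to storage. Used by
// Flush() to spread the cost of syncing over the life of the file.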
IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// This method writes the specified data to disk and makes use of the rate
// limiter, if one is available.
IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  IOStatus s;
  assert(!use_direct_io());
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");

#ifndef ROCKSDB_LITE
      FileOperationInfo::TimePoint start_ts;
      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
        old_size = next_write_offset_;
      }
#endif
      {
        auto prev_perf_level = GetPerfLevel();
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
        SetPerfLevel(prev_perf_level);
      }
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
      }
#endif
      if (!s.ok()) {
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, allowed);
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);

    left -= allowed;
    src += allowed;
  }
  buf_.Size(0);
  return s;
}

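// Feed newly appended data into the checksum generator, if one is
// configured.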
void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
  if (checksum_generator_ != nullptr) {
    checksum_generator_->Update(data.data(), data.size());
  }
}

// This flushes the accumulated data in the buffer. If necessary, we pad the
// data with zeros to a whole page. During automatic flushes, however,
// padding is not necessary. We always use the RateLimiter if available. Any
// buffer bytes left over beyond a whole number of pages are moved (refitted)
// to the front of the buffer, to be written again on the next flush, because
// we can only write at aligned offsets.
#ifndef ROCKSDB_LITE
IOStatus WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  IOStatus s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate the whole-page file advance, applied if all writes succeed
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail. We write it here padded with zeros, BUT we
  // will write it again in the future, either on Close() OR when the
  // current whole page fills out.
  size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::TimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer. This never happens
    // during normal Append but rather during an explicit call to
    // Flush()/Sync() or Close().
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be
    // the actual file size on disk. They match if the buffer size is a
    // multiple of whole pages; otherwise filesize_ is leftover_tail behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE
}  // namespace ROCKSDB_NAMESPACE