// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
IOStatus WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  IOStatus s;
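  // From here on there is appended data that has not yet been synced.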
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
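  // Illustrative example (hypothetical numbers): with an empty 64 KB buffer,
  // a 200 KB append and max_buffer_size_ == 1 MB, the candidate capacity
  // doubles to 128 KB and then 256 KB, at which point the data fits and a
  // 256 KB buffer is allocated.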
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }

  // Flush only when buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O we never write directly to disk from here: data always
  // goes through the aligned buffer. With buffered I/O we use the buffer for
  // its original purpose of accumulating many small chunks, as long as the
  // data fits into it.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to the file, bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
    CalculateFileChecksum(data);
  }
  return s;
}

IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_ capacity. So we always
  // use buf_ rather than writing directly to the file, as Append() does in
  // certain cases.
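  // Illustrative example (hypothetical numbers): padding 100 bytes with only
  // 60 bytes of free buffer space first pads 60 zero bytes, flushes, and then
  // pads the remaining 40 bytes.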
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      IOStatus s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return IOStatus::OK();
}

IOStatus WritableFileWriter::Close() {
  // Do not quit immediately on failure: the file MUST be closed.
  IOStatus s;

  // It is now possible to close the file twice because we MUST close it in
  // the destructor; simply flushing is not enough. On Windows, pre-allocation
  // does not fill with zeros, and with unbuffered access we also need to set
  // the end of the data.
  if (!writable_file_) {
    return s;
  }

  s = Flush();  // flush cache to OS

  IOStatus interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
    if (interim.ok()) {
      interim = writable_file_->Fsync(IOOptions(), nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  interim = writable_file_->Close(IOOptions(), nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  if (s.ok() && checksum_generator_ != nullptr && !checksum_finalized_) {
    checksum_generator_->Finalize();
    checksum_finalized_ = true;
  }

  return s;
}

// Write out the cached data to the OS cache, or to storage if direct I/O is
// enabled.
IOStatus WritableFileWriter::Flush() {
  IOStatus s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  s = writable_file_->Flush(IOOptions(), nullptr);

  if (!s.ok()) {
    return s;
  }

  // Sync the OS cache to disk for every bytes_per_sync_ bytes written.
  // TODO: give log files and SST files different options (log
  // files could potentially be cached in the OS for their whole
  // lifetime, so we might not want to flush at all).

  // We try to avoid syncing the last 1MB of data, for two reasons:
  // (1) avoid rewriting the same page that is modified later;
  // (2) on older versions of the OS, the write can block while writing out
  //     the page.
  // XFS does neighbor-page flushing outside of the specified ranges, so we
  // need to make sure the sync range is far from the write offset.
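  // Illustrative example (hypothetical numbers): with filesize_ == 5 MB,
  // offset_sync_to is 4 MB (already 4 KB aligned); if last_sync_size_ == 2 MB
  // and bytes_per_sync_ == 1 MB, RangeSync(2 MB, 2 MB) is issued and
  // last_sync_size_ advances to 4 MB.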
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;  // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}

std::string WritableFileWriter::GetFileChecksum() {
  if (checksum_generator_ != nullptr) {
    return checksum_generator_->GetChecksum();
  } else {
    return kUnknownFileChecksum;
  }
}

const char* WritableFileWriter::GetFileChecksumFuncName() const {
  if (checksum_generator_ != nullptr) {
    return checksum_generator_->Name();
  } else {
    return kUnknownFileChecksumFuncName.c_str();
  }
}

IOStatus WritableFileWriter::Sync(bool use_fsync) {
  IOStatus s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return IOStatus::OK();
}

IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return IOStatus::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  IOStatus s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}

IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
  IOStatus s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
  SetPerfLevel(prev_perf_level);
  return s;
}

IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// This method writes the specified data to disk and makes use of the rate
// limiter if available.
IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  IOStatus s;
  assert(!use_direct_io());
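  // Note: with direct I/O enabled, writes go through WriteDirect() instead;
  // this path is only used for buffered I/O.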
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");

#ifndef ROCKSDB_LITE
      FileOperationInfo::TimePoint start_ts;
      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
        old_size = next_write_offset_;
      }
#endif
      {
        auto prev_perf_level = GetPerfLevel();
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
        SetPerfLevel(prev_perf_level);
      }
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
      }
#endif
      if (!s.ok()) {
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, allowed);
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);

    left -= allowed;
    src += allowed;
  }
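  // All data has been handed to the file (or an error was returned above);
  // logically reset the buffer to empty.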
  buf_.Size(0);
  return s;
}

void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
  if (checksum_generator_ != nullptr) {
    checksum_generator_->Update(data.data(), data.size());
  }
}

// This flushes the accumulated data in the buffer. We pad the data with zeros
// to a whole page if necessary (during automatic flushes padding is not
// needed). We always use the RateLimiter if available. Any buffer bytes left
// over beyond a whole number of pages are moved (refit) to the front of the
// buffer to be written again on the next flush, because we can only write at
// aligned offsets.
#ifndef ROCKSDB_LITE
IOStatus WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  IOStatus s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate the whole-page file advance to apply if all writes succeed.
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail. We write it here padded with zeros, BUT we
  // will write it again in the future, either on Close() OR when the current
  // whole page fills up.
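  // Illustrative example (hypothetical numbers): with a 4 KB alignment and
  // 10 KB in the buffer, file_advance == 8 KB and leftover_tail == 2 KB; the
  // 2 KB tail is zero-padded and written now, then refit to the front of the
  // buffer and rewritten on the next flush.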
  size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::TimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer.
    // This never happens during normal Append but rather during
    // an explicit call to Flush()/Sync() or Close().
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be
    // the actual file size on disk. They match if the buffer size
    // is a multiple of whole pages; otherwise filesize_ is leftover_tail
    // behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE
}  // namespace ROCKSDB_NAMESPACE