1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "utilities/trace/file_trace_reader_writer.h"
7 
8 #include "env/composite_env_wrapper.h"
9 #include "file/random_access_file_reader.h"
10 #include "file/writable_file_writer.h"
11 #include "trace_replay/trace_replay.h"
12 #include "util/coding.h"
13 
14 namespace ROCKSDB_NAMESPACE {
15 
// Size, in bytes, of the scratch buffer that FileTraceReader allocates in its
// constructor; Read() never requests more than this many payload bytes per
// call to the underlying file reader.
const unsigned int FileTraceReader::kBufferSize = 1024;  // 1KB
17 
FileTraceReader(std::unique_ptr<RandomAccessFileReader> && reader)18 FileTraceReader::FileTraceReader(
19     std::unique_ptr<RandomAccessFileReader>&& reader)
20     : file_reader_(std::move(reader)),
21       offset_(0),
22       buffer_(new char[kBufferSize]) {}
23 
~FileTraceReader()24 FileTraceReader::~FileTraceReader() {
25   Close();
26   delete[] buffer_;
27 }
28 
Close()29 Status FileTraceReader::Close() {
30   file_reader_.reset();
31   return Status::OK();
32 }
33 
Read(std::string * data)34 Status FileTraceReader::Read(std::string* data) {
35   assert(file_reader_ != nullptr);
36   Status s = file_reader_->Read(offset_, kTraceMetadataSize, &result_, buffer_,
37                                 nullptr);
38   if (!s.ok()) {
39     return s;
40   }
41   if (result_.size() == 0) {
42     // No more data to read
43     // Todo: Come up with a better way to indicate end of data. May be this
44     // could be avoided once footer is introduced.
45     return Status::Incomplete();
46   }
47   if (result_.size() < kTraceMetadataSize) {
48     return Status::Corruption("Corrupted trace file.");
49   }
50   *data = result_.ToString();
51   offset_ += kTraceMetadataSize;
52 
53   uint32_t payload_len =
54       DecodeFixed32(&buffer_[kTraceTimestampSize + kTraceTypeSize]);
55 
56   // Read Payload
57   unsigned int bytes_to_read = payload_len;
58   unsigned int to_read =
59       bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
60   while (to_read > 0) {
61     s = file_reader_->Read(offset_, to_read, &result_, buffer_, nullptr);
62     if (!s.ok()) {
63       return s;
64     }
65     if (result_.size() < to_read) {
66       return Status::Corruption("Corrupted trace file.");
67     }
68     data->append(result_.data(), result_.size());
69 
70     offset_ += to_read;
71     bytes_to_read -= to_read;
72     to_read = bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
73   }
74 
75   return s;
76 }
77 
~FileTraceWriter()78 FileTraceWriter::~FileTraceWriter() { Close(); }
79 
Close()80 Status FileTraceWriter::Close() {
81   file_writer_.reset();
82   return Status::OK();
83 }
84 
Write(const Slice & data)85 Status FileTraceWriter::Write(const Slice& data) {
86   return file_writer_->Append(data);
87 }
88 
GetFileSize()89 uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); }
90 
NewFileTraceReader(Env * env,const EnvOptions & env_options,const std::string & trace_filename,std::unique_ptr<TraceReader> * trace_reader)91 Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
92                           const std::string& trace_filename,
93                           std::unique_ptr<TraceReader>* trace_reader) {
94   std::unique_ptr<RandomAccessFile> trace_file;
95   Status s = env->NewRandomAccessFile(trace_filename, &trace_file, env_options);
96   if (!s.ok()) {
97     return s;
98   }
99 
100   std::unique_ptr<RandomAccessFileReader> file_reader;
101   file_reader.reset(new RandomAccessFileReader(
102       NewLegacyRandomAccessFileWrapper(trace_file), trace_filename));
103   trace_reader->reset(new FileTraceReader(std::move(file_reader)));
104   return s;
105 }
106 
NewFileTraceWriter(Env * env,const EnvOptions & env_options,const std::string & trace_filename,std::unique_ptr<TraceWriter> * trace_writer)107 Status NewFileTraceWriter(Env* env, const EnvOptions& env_options,
108                           const std::string& trace_filename,
109                           std::unique_ptr<TraceWriter>* trace_writer) {
110   std::unique_ptr<WritableFile> trace_file;
111   Status s = env->NewWritableFile(trace_filename, &trace_file, env_options);
112   if (!s.ok()) {
113     return s;
114   }
115 
116   std::unique_ptr<WritableFileWriter> file_writer;
117   file_writer.reset(new WritableFileWriter(
118       NewLegacyWritableFileWrapper(std::move(trace_file)), trace_filename,
119       env_options));
120   trace_writer->reset(new FileTraceWriter(std::move(file_writer)));
121   return s;
122 }
123 
124 }  // namespace ROCKSDB_NAMESPACE
125