1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 
7 #ifndef ROCKSDB_LITE
8 
9 #ifdef GFLAGS
10 #ifdef NUMA
11 #include <numa.h>
12 #include <numaif.h>
13 #endif
14 #ifndef OS_WIN
15 #include <unistd.h>
16 #endif
17 
#include <cinttypes>
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <limits>
#include <memory>
#include <sstream>
#include <stdexcept>
25 
26 #include "db/db_impl/db_impl.h"
27 #include "db/memtable.h"
28 #include "db/write_batch_internal.h"
29 #include "env/composite_env_wrapper.h"
30 #include "file/read_write_util.h"
31 #include "file/writable_file_writer.h"
32 #include "options/cf_options.h"
33 #include "rocksdb/db.h"
34 #include "rocksdb/env.h"
35 #include "rocksdb/iterator.h"
36 #include "rocksdb/slice.h"
37 #include "rocksdb/slice_transform.h"
38 #include "rocksdb/status.h"
39 #include "rocksdb/table_properties.h"
40 #include "rocksdb/utilities/ldb_cmd.h"
41 #include "rocksdb/write_batch.h"
42 #include "table/meta_blocks.h"
43 #include "table/plain/plain_table_factory.h"
44 #include "table/table_reader.h"
45 #include "tools/trace_analyzer_tool.h"
46 #include "trace_replay/trace_replay.h"
47 #include "util/coding.h"
48 #include "util/compression.h"
49 #include "util/gflags_compat.h"
50 #include "util/random.h"
51 #include "util/string_util.h"
52 
53 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
54 using GFLAGS_NAMESPACE::RegisterFlagValidator;
55 using GFLAGS_NAMESPACE::SetUsageMessage;
56 
// ---- Input trace and output location ----
// trace_path: the trace file to analyze.
DEFINE_string(trace_path, "", "The trace file path.");
// output_dir: directory that receives every generated result file.
DEFINE_string(output_dir, "", "The directory to store the output files.");
// output_prefix: leading component of every generated file name
// (defaults to "trace").
DEFINE_string(output_prefix, "trace",
              "The prefix used for all the output files.");
61 DEFINE_bool(output_key_stats, false,
62             "Output the key access count statistics to file\n"
63             "for accessed keys:\n"
64             "file name: <prefix>-<query_type>-<cf_id>-accessed_key_stats.txt\n"
65             "Format:[cf_id value_size access_keyid access_count]\n"
66             "for the whole key space keys:\n"
67             "File name: <prefix>-<query_type>-<cf_id>-whole_key_stats.txt\n"
68             "Format:[whole_key_space_keyid access_count]");
// ---- Optional per-(query type, column family) output files ----
// output_access_count_stats: write the access-count distribution file.
DEFINE_bool(output_access_count_stats, false,
            "Output the access count distribution statistics to file.\n"
            "File name:  <prefix>-<query_type>-<cf_id>-accessed_"
            "key_count_distribution.txt \n"
            "Format:[access_count number_of_access_count]");
// output_time_series: write the per-key access time series. When set, the
// time series start is taken from the trace header timestamp.
DEFINE_bool(output_time_series, false,
            "Output the access time in second of each key, "
            "such that we can have the time series data of the queries \n"
            "File name: <prefix>-<query_type>-<cf_id>-time_series.txt\n"
            "Format:[type_id time_in_sec access_keyid].");
// try_process_corrupted_trace: analyze whatever was read before a corrupted
// record instead of exiting.
DEFINE_bool(try_process_corrupted_trace, false,
            "In default, trace_analyzer will exit if the trace file is "
            "corrupted due to the unexpected tracing cases. If this option "
            "is enabled, trace_analyzer will stop reading the trace file, "
            "and start analyzing the read-in data.");
// output_prefix_cut: prefix length in bytes used to group keys; a value
// greater than 0 enables the prefix-based outputs.
DEFINE_int32(output_prefix_cut, 0,
             "The number of bytes as prefix to cut the keys.\n"
             "If it is enabled, it will generate the following:\n"
             "For accessed keys:\n"
             "File name: <prefix>-<query_type>-<cf_id>-"
             "accessed_key_prefix_cut.txt \n"
             "Format:[acessed_keyid access_count_of_prefix "
             "number_of_keys_in_prefix average_key_access "
             "prefix_succ_ratio prefix]\n"
             "For whole key space keys:\n"
             "File name: <prefix>-<query_type>-<cf_id>"
             "-whole_key_prefix_cut.txt\n"
             "Format:[start_keyid_in_whole_keyspace prefix]\n"
             "if 'output_qps_stats' and 'top_k' are enabled, it will output:\n"
             "File name: <prefix>-<query_type>-<cf_id>"
             "-accessed_top_k_qps_prefix_cut.txt\n"
             "Format:[the_top_ith_qps_time QPS], [prefix qps_of_this_second].");
101 DEFINE_bool(convert_to_human_readable_trace, false,
102             "Convert the binary trace file to a human readable txt file "
103             "for further processing. "
104             "This file will be extremely large "
105             "(similar size as the original binary trace file). "
106             "You can specify 'no_key' to reduce the size, if key is not "
107             "needed in the next step.\n"
108             "File name: <prefix>_human_readable_trace.txt\n"
109             "Format:[type_id cf_id value_size time_in_micorsec <key>].");
110 DEFINE_bool(output_qps_stats, false,
111             "Output the query per second(qps) statistics \n"
112             "For the overall qps, it will contain all qps of each query type. "
113             "The time is started from the first trace record\n"
114             "File name: <prefix>_qps_stats.txt\n"
115             "Format: [qps_type_1 qps_type_2 ...... overall_qps]\n"
116             "For each cf and query, it will have its own qps output.\n"
117             "File name: <prefix>-<query_type>-<cf_id>_qps_stats.txt \n"
118             "Format:[query_count_in_this_second].");
119 DEFINE_bool(no_print, false, "Do not print out any result");
120 DEFINE_string(
121     print_correlation, "",
122     "intput format: [correlation pairs][.,.]\n"
123     "Output the query correlations between the pairs of query types "
124     "listed in the parameter, input should select the operations from:\n"
125     "get, put, delete, single_delete, rangle_delete, merge. No space "
126     "between the pairs separated by commar. Example: =[get,get]... "
127     "It will print out the number of pairs of 'A after B' and "
128     "the average time interval between the two query.");
// ---- Whole key space input ----
// key_space_dir: directory holding one "<column family id>.txt" file per
// column family.
DEFINE_string(key_space_dir, "",
              "<the directory stores full key space files> \n"
              "The key space files should be: <column family id>.txt");

// ---- Which query types to analyze (all off by default) ----
DEFINE_bool(analyze_get, false, "Analyze the Get query.");
DEFINE_bool(analyze_put, false, "Analyze the Put query.");
DEFINE_bool(analyze_delete, false, "Analyze the Delete query.");
DEFINE_bool(analyze_single_delete, false, "Analyze the SingleDelete query.");
DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query.");
DEFINE_bool(analyze_merge, false, "Analyze the Merge query.");
// A single switch enables both the iterator_Seek and the
// iterator_SeekForPrev query types.
DEFINE_bool(analyze_iterator, false,
            " Analyze the iterate query like seek() and seekForPrev().");

// ---- Output shaping ----
DEFINE_bool(no_key, false,
            " Does not output the key to the result files to make smaller.");
DEFINE_bool(print_overall_stats, true,
            " Print the stats of the whole trace, "
            "like total requests, keys, and etc.");
DEFINE_bool(output_key_distribution, false, "Print the key size distribution.");
DEFINE_bool(
    output_value_distribution, false,
    "Out put the value size distribution, only available for Put and Merge.\n"
    "File name: <prefix>-<query_type>-<cf_id>"
    "-accessed_value_size_distribution.txt\n"
    "Format:[Number_of_value_size_between x and "
    "x+value_interval is: <the count>]");
// print_top_k_access: K used by the top-K bookkeeping (accessed prefixes,
// QPS peaks).
DEFINE_int32(print_top_k_access, 1,
             "<top K of the variables to be printed> "
             "Print the top k accessed keys, top k accessed prefix "
             "and etc.");
// output_ignore_count: keys whose access count is <= this threshold are
// left out of the distribution statistics.
DEFINE_int32(output_ignore_count, 0,
             "<threshold>, ignores the access count <= this value, "
             "it will shorter the output.");
// value_interval: bucket width used for the value size distribution.
DEFINE_int32(value_interval, 8,
             "To output the value distribution, we need to set the value "
             "intervals and make the statistic of the value size distribution "
             "in different intervals. The default is 8.");
// sample_ratio: the analyzer derives its sampling interval as
// 1 / sample_ratio; values outside (0, 1.0] fall back to an interval of 1.
DEFINE_double(sample_ratio, 1.0,
              "If the trace size is extremely huge or user want to sample "
              "the trace when analyzing, sample ratio can be set (0, 1.0]");
167 
168 namespace ROCKSDB_NAMESPACE {
169 
// Query-type name -> analyzer type index. These names are the ones accepted
// in the correlation input and used as the per-type names of the analyzer.
std::map<std::string, int> taOptToIndex = {
    {"get", 0},
    {"put", 1},
    {"delete", 2},
    {"single_delete", 3},
    {"range_delete", 4},
    {"merge", 5},
    {"iterator_Seek", 6},
    {"iterator_SeekForPrev", 7},
};

// Inverse of taOptToIndex: analyzer type index -> query-type name.
std::map<int, std::string> taIndexToOpt = {
    {0, "get"},
    {1, "put"},
    {2, "delete"},
    {3, "single_delete"},
    {4, "range_delete"},
    {5, "merge"},
    {6, "iterator_Seek"},
    {7, "iterator_SeekForPrev"},
};
181 
182 namespace {
183 
// Multiplies two unsigned 64-bit values without wrapping around.
//
// Returns 0 when either operand is 0 and op1 * op2 when the product fits in
// 64 bits. When the product would overflow, the result is NOT saturated:
// op1 is returned unchanged, and callers get that value as the "product".
uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) {
  if (op1 == 0 || op2 == 0) {
    return 0;
  }
  // op1 * op2 overflows exactly when op2 exceeds floor(max / op1).
  if (op2 > std::numeric_limits<uint64_t>::max() / op1) {
    return op1;
  }
  return op1 * op2;
}
193 
DecodeCFAndKeyFromString(std::string & buffer,uint32_t * cf_id,Slice * key)194 void DecodeCFAndKeyFromString(std::string& buffer, uint32_t* cf_id, Slice* key) {
195   Slice buf(buffer);
196   GetFixed32(&buf, cf_id);
197   GetLengthPrefixedSlice(&buf, key);
198 }
199 
200 }  // namespace
201 
202 // The default constructor of AnalyzerOptions
AnalyzerOptions()203 AnalyzerOptions::AnalyzerOptions()
204     : correlation_map(kTaTypeNum, std::vector<int>(kTaTypeNum, -1)) {}
205 
~AnalyzerOptions()206 AnalyzerOptions::~AnalyzerOptions() {}
207 
SparseCorrelationInput(const std::string & in_str)208 void AnalyzerOptions::SparseCorrelationInput(const std::string& in_str) {
209   std::string cur = in_str;
210   if (cur.size() == 0) {
211     return;
212   }
213   while (!cur.empty()) {
214     if (cur.compare(0, 1, "[") != 0) {
215       fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
216       exit(1);
217     }
218     std::string opt1, opt2;
219     std::size_t split = cur.find_first_of(",");
220     if (split != std::string::npos) {
221       opt1 = cur.substr(1, split - 1);
222     } else {
223       fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
224       exit(1);
225     }
226     std::size_t end = cur.find_first_of("]");
227     if (end != std::string::npos) {
228       opt2 = cur.substr(split + 1, end - split - 1);
229     } else {
230       fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
231       exit(1);
232     }
233     cur = cur.substr(end + 1);
234 
235     if (taOptToIndex.find(opt1) != taOptToIndex.end() &&
236         taOptToIndex.find(opt2) != taOptToIndex.end()) {
237       correlation_list.push_back(
238           std::make_pair(taOptToIndex[opt1], taOptToIndex[opt2]));
239     } else {
240       fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
241       exit(1);
242     }
243   }
244 
245   int sequence = 0;
246   for (auto& it : correlation_list) {
247     correlation_map[it.first][it.second] = sequence;
248     sequence++;
249   }
250   return;
251 }
252 
253 // The trace statistic struct constructor
TraceStats()254 TraceStats::TraceStats() {
255   cf_id = 0;
256   cf_name = "0";
257   a_count = 0;
258   a_key_id = 0;
259   a_key_size_sqsum = 0;
260   a_key_size_sum = 0;
261   a_key_mid = 0;
262   a_value_size_sqsum = 0;
263   a_value_size_sum = 0;
264   a_value_mid = 0;
265   a_peak_qps = 0;
266   a_ave_qps = 0.0;
267 }
268 
~TraceStats()269 TraceStats::~TraceStats() {}
270 
271 // The trace analyzer constructor
TraceAnalyzer(std::string & trace_path,std::string & output_path,AnalyzerOptions _analyzer_opts)272 TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path,
273                              AnalyzerOptions _analyzer_opts)
274     : trace_name_(trace_path),
275       output_path_(output_path),
276       analyzer_opts_(_analyzer_opts) {
277   ROCKSDB_NAMESPACE::EnvOptions env_options;
278   env_ = ROCKSDB_NAMESPACE::Env::Default();
279   offset_ = 0;
280   c_time_ = 0;
281   total_requests_ = 0;
282   total_access_keys_ = 0;
283   total_gets_ = 0;
284   total_writes_ = 0;
285   trace_create_time_ = 0;
286   begin_time_ = 0;
287   end_time_ = 0;
288   time_series_start_ = 0;
289   cur_time_sec_ = 0;
290   if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) {
291     sample_max_ = 1;
292   } else {
293     sample_max_ = static_cast<uint32_t>(1.0 / FLAGS_sample_ratio);
294   }
295 
296   ta_.resize(kTaTypeNum);
297   ta_[0].type_name = "get";
298   if (FLAGS_analyze_get) {
299     ta_[0].enabled = true;
300   } else {
301     ta_[0].enabled = false;
302   }
303   ta_[1].type_name = "put";
304   if (FLAGS_analyze_put) {
305     ta_[1].enabled = true;
306   } else {
307     ta_[1].enabled = false;
308   }
309   ta_[2].type_name = "delete";
310   if (FLAGS_analyze_delete) {
311     ta_[2].enabled = true;
312   } else {
313     ta_[2].enabled = false;
314   }
315   ta_[3].type_name = "single_delete";
316   if (FLAGS_analyze_single_delete) {
317     ta_[3].enabled = true;
318   } else {
319     ta_[3].enabled = false;
320   }
321   ta_[4].type_name = "range_delete";
322   if (FLAGS_analyze_range_delete) {
323     ta_[4].enabled = true;
324   } else {
325     ta_[4].enabled = false;
326   }
327   ta_[5].type_name = "merge";
328   if (FLAGS_analyze_merge) {
329     ta_[5].enabled = true;
330   } else {
331     ta_[5].enabled = false;
332   }
333   ta_[6].type_name = "iterator_Seek";
334   if (FLAGS_analyze_iterator) {
335     ta_[6].enabled = true;
336   } else {
337     ta_[6].enabled = false;
338   }
339   ta_[7].type_name = "iterator_SeekForPrev";
340   if (FLAGS_analyze_iterator) {
341     ta_[7].enabled = true;
342   } else {
343     ta_[7].enabled = false;
344   }
345   for (int i = 0; i < kTaTypeNum; i++) {
346     ta_[i].sample_count = 0;
347   }
348 }
349 
~TraceAnalyzer()350 TraceAnalyzer::~TraceAnalyzer() {}
351 
352 // Prepare the processing
353 // Initiate the global trace reader and writer here
PrepareProcessing()354 Status TraceAnalyzer::PrepareProcessing() {
355   Status s;
356   // Prepare the trace reader
357   s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
358   if (!s.ok()) {
359     return s;
360   }
361 
362   // Prepare and open the trace sequence file writer if needed
363   if (FLAGS_convert_to_human_readable_trace) {
364     std::string trace_sequence_name;
365     trace_sequence_name =
366         output_path_ + "/" + FLAGS_output_prefix + "-human_readable_trace.txt";
367     s = env_->NewWritableFile(trace_sequence_name, &trace_sequence_f_,
368                               env_options_);
369     if (!s.ok()) {
370       return s;
371     }
372   }
373 
374   // prepare the general QPS file writer
375   if (FLAGS_output_qps_stats) {
376     std::string qps_stats_name;
377     qps_stats_name =
378         output_path_ + "/" + FLAGS_output_prefix + "-qps_stats.txt";
379     s = env_->NewWritableFile(qps_stats_name, &qps_f_, env_options_);
380     if (!s.ok()) {
381       return s;
382     }
383 
384     qps_stats_name =
385         output_path_ + "/" + FLAGS_output_prefix + "-cf_qps_stats.txt";
386     s = env_->NewWritableFile(qps_stats_name, &cf_qps_f_, env_options_);
387     if (!s.ok()) {
388       return s;
389     }
390   }
391   return Status::OK();
392 }
393 
ReadTraceHeader(Trace * header)394 Status TraceAnalyzer::ReadTraceHeader(Trace* header) {
395   assert(header != nullptr);
396   Status s = ReadTraceRecord(header);
397   if (!s.ok()) {
398     return s;
399   }
400   if (header->type != kTraceBegin) {
401     return Status::Corruption("Corrupted trace file. Incorrect header.");
402   }
403   if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
404     return Status::Corruption("Corrupted trace file. Incorrect magic.");
405   }
406 
407   return s;
408 }
409 
ReadTraceFooter(Trace * footer)410 Status TraceAnalyzer::ReadTraceFooter(Trace* footer) {
411   assert(footer != nullptr);
412   Status s = ReadTraceRecord(footer);
413   if (!s.ok()) {
414     return s;
415   }
416   if (footer->type != kTraceEnd) {
417     return Status::Corruption("Corrupted trace file. Incorrect footer.");
418   }
419   return s;
420 }
421 
ReadTraceRecord(Trace * trace)422 Status TraceAnalyzer::ReadTraceRecord(Trace* trace) {
423   assert(trace != nullptr);
424   std::string encoded_trace;
425   Status s = trace_reader_->Read(&encoded_trace);
426   if (!s.ok()) {
427     return s;
428   }
429 
430   Slice enc_slice = Slice(encoded_trace);
431   GetFixed64(&enc_slice, &trace->ts);
432   trace->type = static_cast<TraceType>(enc_slice[0]);
433   enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
434   trace->payload = enc_slice.ToString();
435   return s;
436 }
437 
438 // process the trace itself and redirect the trace content
439 // to different operation type handler. With different race
440 // format, this function can be changed
StartProcessing()441 Status TraceAnalyzer::StartProcessing() {
442   Status s;
443   Trace header;
444   s = ReadTraceHeader(&header);
445   if (!s.ok()) {
446     fprintf(stderr, "Cannot read the header\n");
447     return s;
448   }
449   trace_create_time_ = header.ts;
450   if (FLAGS_output_time_series) {
451     time_series_start_ = header.ts;
452   }
453 
454   Trace trace;
455   while (s.ok()) {
456     trace.reset();
457     s = ReadTraceRecord(&trace);
458     if (!s.ok()) {
459       break;
460     }
461 
462     total_requests_++;
463     end_time_ = trace.ts;
464     if (trace.type == kTraceWrite) {
465       total_writes_++;
466       c_time_ = trace.ts;
467       WriteBatch batch(trace.payload);
468 
469       // Note that, if the write happens in a transaction,
470       // 'Write' will be called twice, one for Prepare, one for
471       // Commit. Thus, in the trace, for the same WriteBatch, there
472       // will be two reords if it is in a transaction. Here, we only
473       // process the reord that is committed. If write is non-transaction,
474       // HasBeginPrepare()==false, so we process it normally.
475       if (batch.HasBeginPrepare() && !batch.HasCommit()) {
476         continue;
477       }
478       TraceWriteHandler write_handler(this);
479       s = batch.Iterate(&write_handler);
480       if (!s.ok()) {
481         fprintf(stderr, "Cannot process the write batch in the trace\n");
482         return s;
483       }
484     } else if (trace.type == kTraceGet) {
485       uint32_t cf_id = 0;
486       Slice key;
487       DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
488       total_gets_++;
489 
490       s = HandleGet(cf_id, key.ToString(), trace.ts, 1);
491       if (!s.ok()) {
492         fprintf(stderr, "Cannot process the get in the trace\n");
493         return s;
494       }
495     } else if (trace.type == kTraceIteratorSeek ||
496                trace.type == kTraceIteratorSeekForPrev) {
497       uint32_t cf_id = 0;
498       Slice key;
499       DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
500       s = HandleIter(cf_id, key.ToString(), trace.ts, trace.type);
501       if (!s.ok()) {
502         fprintf(stderr, "Cannot process the iterator in the trace\n");
503         return s;
504       }
505     } else if (trace.type == kTraceEnd) {
506       break;
507     }
508   }
509   if (s.IsIncomplete()) {
510     // Fix it: Reaching eof returns Incomplete status at the moment.
511     //
512     return Status::OK();
513   }
514   return s;
515 }
516 
517 // After the trace is processed by StartProcessing, the statistic data
518 // is stored in the map or other in memory data structures. To get the
519 // other statistic result such as key size distribution, value size
520 // distribution, these data structures are re-processed here.
MakeStatistics()521 Status TraceAnalyzer::MakeStatistics() {
522   int ret;
523   Status s;
524   for (int type = 0; type < kTaTypeNum; type++) {
525     if (!ta_[type].enabled) {
526       continue;
527     }
528     for (auto& stat : ta_[type].stats) {
529       stat.second.a_key_id = 0;
530       for (auto& record : stat.second.a_key_stats) {
531         record.second.key_id = stat.second.a_key_id;
532         stat.second.a_key_id++;
533         if (record.second.access_count <=
534             static_cast<uint64_t>(FLAGS_output_ignore_count)) {
535           continue;
536         }
537 
538         // Generate the key access count distribution data
539         if (FLAGS_output_access_count_stats) {
540           if (stat.second.a_count_stats.find(record.second.access_count) ==
541               stat.second.a_count_stats.end()) {
542             stat.second.a_count_stats[record.second.access_count] = 1;
543           } else {
544             stat.second.a_count_stats[record.second.access_count]++;
545           }
546         }
547 
548         // Generate the key size distribution data
549         if (FLAGS_output_key_distribution) {
550           if (stat.second.a_key_size_stats.find(record.first.size()) ==
551               stat.second.a_key_size_stats.end()) {
552             stat.second.a_key_size_stats[record.first.size()] = 1;
553           } else {
554             stat.second.a_key_size_stats[record.first.size()]++;
555           }
556         }
557 
558         if (!FLAGS_print_correlation.empty()) {
559           s = MakeStatisticCorrelation(stat.second, record.second);
560           if (!s.ok()) {
561             return s;
562           }
563         }
564       }
565 
566       // Output the prefix cut or the whole content of the accessed key space
567       if (FLAGS_output_key_stats || FLAGS_output_prefix_cut > 0) {
568         s = MakeStatisticKeyStatsOrPrefix(stat.second);
569         if (!s.ok()) {
570           return s;
571         }
572       }
573 
574       // output the access count distribution
575       if (FLAGS_output_access_count_stats && stat.second.a_count_dist_f) {
576         for (auto& record : stat.second.a_count_stats) {
577           ret = snprintf(buffer_, sizeof(buffer_),
578                          "access_count: %" PRIu64 " num: %" PRIu64 "\n",
579                          record.first, record.second);
580           if (ret < 0) {
581             return Status::IOError("Format the output failed");
582           }
583           std::string printout(buffer_);
584           s = stat.second.a_count_dist_f->Append(printout);
585           if (!s.ok()) {
586             fprintf(stderr, "Write access count distribution file failed\n");
587             return s;
588           }
589         }
590       }
591 
592       // find the medium of the key size
593       uint64_t k_count = 0;
594       bool get_mid = false;
595       for (auto& record : stat.second.a_key_size_stats) {
596         k_count += record.second;
597         if (!get_mid && k_count >= stat.second.a_key_mid) {
598           stat.second.a_key_mid = record.first;
599           get_mid = true;
600         }
601         if (FLAGS_output_key_distribution && stat.second.a_key_size_f) {
602           ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %" PRIu64 "\n",
603                          record.first, record.second);
604           if (ret < 0) {
605             return Status::IOError("Format output failed");
606           }
607           std::string printout(buffer_);
608           s = stat.second.a_key_size_f->Append(printout);
609           if (!s.ok()) {
610             fprintf(stderr, "Write key size distribution file failed\n");
611             return s;
612           }
613         }
614       }
615 
616       // output the value size distribution
617       uint64_t v_begin = 0, v_end = 0, v_count = 0;
618       get_mid = false;
619       for (auto& record : stat.second.a_value_size_stats) {
620         v_begin = v_end;
621         v_end = (record.first + 1) * FLAGS_value_interval;
622         v_count += record.second;
623         if (!get_mid && v_count >= stat.second.a_count / 2) {
624           stat.second.a_value_mid = (v_begin + v_end) / 2;
625           get_mid = true;
626         }
627         if (FLAGS_output_value_distribution && stat.second.a_value_size_f &&
628             (type == TraceOperationType::kPut ||
629              type == TraceOperationType::kMerge)) {
630           ret = snprintf(buffer_, sizeof(buffer_),
631                          "Number_of_value_size_between %" PRIu64 " and %" PRIu64
632                          " is: %" PRIu64 "\n",
633                          v_begin, v_end, record.second);
634           if (ret < 0) {
635             return Status::IOError("Format output failed");
636           }
637           std::string printout(buffer_);
638           s = stat.second.a_value_size_f->Append(printout);
639           if (!s.ok()) {
640             fprintf(stderr, "Write value size distribution file failed\n");
641             return s;
642           }
643         }
644       }
645     }
646   }
647 
648   // Make the QPS statistics
649   if (FLAGS_output_qps_stats) {
650     s = MakeStatisticQPS();
651     if (!s.ok()) {
652       return s;
653     }
654   }
655 
656   return Status::OK();
657 }
658 
659 // Process the statistics of the key access and
660 // prefix of the accessed keys if required
MakeStatisticKeyStatsOrPrefix(TraceStats & stats)661 Status TraceAnalyzer::MakeStatisticKeyStatsOrPrefix(TraceStats& stats) {
662   int ret;
663   Status s;
664   std::string prefix = "0";
665   uint64_t prefix_access = 0;
666   uint64_t prefix_count = 0;
667   uint64_t prefix_succ_access = 0;
668   double prefix_ave_access = 0.0;
669   stats.a_succ_count = 0;
670   for (auto& record : stats.a_key_stats) {
671     // write the key access statistic file
672     if (!stats.a_key_f) {
673       return Status::IOError("Failed to open accessed_key_stats file.");
674     }
675     stats.a_succ_count += record.second.succ_count;
676     double succ_ratio = 0.0;
677     if (record.second.access_count > 0) {
678       succ_ratio = (static_cast<double>(record.second.succ_count)) /
679                    record.second.access_count;
680     }
681     ret = snprintf(buffer_, sizeof(buffer_),
682                    "%u %zu %" PRIu64 " %" PRIu64 " %f\n", record.second.cf_id,
683                    record.second.value_size, record.second.key_id,
684                    record.second.access_count, succ_ratio);
685     if (ret < 0) {
686       return Status::IOError("Format output failed");
687     }
688     std::string printout(buffer_);
689     s = stats.a_key_f->Append(printout);
690     if (!s.ok()) {
691       fprintf(stderr, "Write key access file failed\n");
692       return s;
693     }
694 
695     // write the prefix cut of the accessed keys
696     if (FLAGS_output_prefix_cut > 0 && stats.a_prefix_cut_f) {
697       if (record.first.compare(0, FLAGS_output_prefix_cut, prefix) != 0) {
698         std::string prefix_out =
699             ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix);
700         if (prefix_count == 0) {
701           prefix_ave_access = 0.0;
702         } else {
703           prefix_ave_access =
704               (static_cast<double>(prefix_access)) / prefix_count;
705         }
706         double prefix_succ_ratio = 0.0;
707         if (prefix_access > 0) {
708           prefix_succ_ratio =
709               (static_cast<double>(prefix_succ_access)) / prefix_access;
710         }
711         ret =
712             snprintf(buffer_, sizeof(buffer_),
713                      "%" PRIu64 " %" PRIu64 " %" PRIu64 " %f %f %s\n",
714                      record.second.key_id, prefix_access, prefix_count,
715                      prefix_ave_access, prefix_succ_ratio, prefix_out.c_str());
716         if (ret < 0) {
717           return Status::IOError("Format output failed");
718         }
719         std::string pout(buffer_);
720         s = stats.a_prefix_cut_f->Append(pout);
721         if (!s.ok()) {
722           fprintf(stderr, "Write accessed key prefix file failed\n");
723           return s;
724         }
725 
726         // make the top k statistic for the prefix
727         if (static_cast<int32_t>(stats.top_k_prefix_access.size()) <
728             FLAGS_print_top_k_access) {
729           stats.top_k_prefix_access.push(
730               std::make_pair(prefix_access, prefix_out));
731         } else {
732           if (prefix_access > stats.top_k_prefix_access.top().first) {
733             stats.top_k_prefix_access.pop();
734             stats.top_k_prefix_access.push(
735                 std::make_pair(prefix_access, prefix_out));
736           }
737         }
738 
739         if (static_cast<int32_t>(stats.top_k_prefix_ave.size()) <
740             FLAGS_print_top_k_access) {
741           stats.top_k_prefix_ave.push(
742               std::make_pair(prefix_ave_access, prefix_out));
743         } else {
744           if (prefix_ave_access > stats.top_k_prefix_ave.top().first) {
745             stats.top_k_prefix_ave.pop();
746             stats.top_k_prefix_ave.push(
747                 std::make_pair(prefix_ave_access, prefix_out));
748           }
749         }
750 
751         prefix = record.first.substr(0, FLAGS_output_prefix_cut);
752         prefix_access = 0;
753         prefix_count = 0;
754         prefix_succ_access = 0;
755       }
756       prefix_access += record.second.access_count;
757       prefix_count += 1;
758       prefix_succ_access += record.second.succ_count;
759     }
760   }
761   return Status::OK();
762 }
763 
764 // Process the statistics of different query type
765 // correlations
MakeStatisticCorrelation(TraceStats & stats,StatsUnit & unit)766 Status TraceAnalyzer::MakeStatisticCorrelation(TraceStats& stats,
767                                                StatsUnit& unit) {
768   if (stats.correlation_output.size() !=
769       analyzer_opts_.correlation_list.size()) {
770     return Status::Corruption("Cannot make the statistic of correlation.");
771   }
772 
773   for (int i = 0; i < static_cast<int>(analyzer_opts_.correlation_list.size());
774        i++) {
775     if (i >= static_cast<int>(stats.correlation_output.size()) ||
776         i >= static_cast<int>(unit.v_correlation.size())) {
777       break;
778     }
779     stats.correlation_output[i].first += unit.v_correlation[i].count;
780     stats.correlation_output[i].second += unit.v_correlation[i].total_ts;
781   }
782   return Status::OK();
783 }
784 
785 // Process the statistics of QPS
MakeStatisticQPS()786 Status TraceAnalyzer::MakeStatisticQPS() {
787   if(begin_time_ == 0) {
788     begin_time_ = trace_create_time_;
789   }
790   uint32_t duration =
791       static_cast<uint32_t>((end_time_ - begin_time_) / 1000000);
792   int ret;
793   Status s;
794   std::vector<std::vector<uint32_t>> type_qps(
795       duration, std::vector<uint32_t>(kTaTypeNum + 1, 0));
796   std::vector<uint64_t> qps_sum(kTaTypeNum + 1, 0);
797   std::vector<uint32_t> qps_peak(kTaTypeNum + 1, 0);
798   qps_ave_.resize(kTaTypeNum + 1);
799 
800   for (int type = 0; type < kTaTypeNum; type++) {
801     if (!ta_[type].enabled) {
802       continue;
803     }
804     for (auto& stat : ta_[type].stats) {
805       uint32_t time_line = 0;
806       uint64_t cf_qps_sum = 0;
807       for (auto& time_it : stat.second.a_qps_stats) {
808         if (time_it.first >= duration) {
809           continue;
810         }
811         type_qps[time_it.first][kTaTypeNum] += time_it.second;
812         type_qps[time_it.first][type] += time_it.second;
813         cf_qps_sum += time_it.second;
814         if (time_it.second > stat.second.a_peak_qps) {
815           stat.second.a_peak_qps = time_it.second;
816         }
817         if (stat.second.a_qps_f) {
818           while (time_line < time_it.first) {
819             ret = snprintf(buffer_, sizeof(buffer_), "%u\n", 0);
820             if (ret < 0) {
821               return Status::IOError("Format the output failed");
822             }
823             std::string printout(buffer_);
824             s = stat.second.a_qps_f->Append(printout);
825             if (!s.ok()) {
826               fprintf(stderr, "Write QPS file failed\n");
827               return s;
828             }
829             time_line++;
830           }
831           ret = snprintf(buffer_, sizeof(buffer_), "%u\n", time_it.second);
832           if (ret < 0) {
833             return Status::IOError("Format the output failed");
834           }
835           std::string printout(buffer_);
836           s = stat.second.a_qps_f->Append(printout);
837           if (!s.ok()) {
838             fprintf(stderr, "Write QPS file failed\n");
839             return s;
840           }
841           if (time_line == time_it.first) {
842             time_line++;
843           }
844         }
845 
846         // Process the top k QPS peaks
847         if (FLAGS_output_prefix_cut > 0) {
848           if (static_cast<int32_t>(stat.second.top_k_qps_sec.size()) <
849               FLAGS_print_top_k_access) {
850             stat.second.top_k_qps_sec.push(
851                 std::make_pair(time_it.second, time_it.first));
852           } else {
853             if (stat.second.top_k_qps_sec.size() > 0 &&
854                 stat.second.top_k_qps_sec.top().first < time_it.second) {
855               stat.second.top_k_qps_sec.pop();
856               stat.second.top_k_qps_sec.push(
857                   std::make_pair(time_it.second, time_it.first));
858             }
859           }
860         }
861       }
862       if (duration == 0) {
863         stat.second.a_ave_qps = 0;
864       } else {
865         stat.second.a_ave_qps = (static_cast<double>(cf_qps_sum)) / duration;
866       }
867 
868       // Output the accessed unique key number change overtime
869       if (stat.second.a_key_num_f) {
870         uint64_t cur_uni_key =
871             static_cast<uint64_t>(stat.second.a_key_stats.size());
872         double cur_ratio = 0.0;
873         uint64_t cur_num = 0;
874         for (uint32_t i = 0; i < duration; i++) {
875           auto find_time = stat.second.uni_key_num.find(i);
876           if (find_time != stat.second.uni_key_num.end()) {
877             cur_ratio = (static_cast<double>(find_time->second)) / cur_uni_key;
878             cur_num = find_time->second;
879           }
880           ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %.12f\n",
881                          cur_num, cur_ratio);
882           if (ret < 0) {
883             return Status::IOError("Format the output failed");
884           }
885           std::string printout(buffer_);
886           s = stat.second.a_key_num_f->Append(printout);
887           if (!s.ok()) {
888             fprintf(stderr,
889                     "Write accessed unique key number change file failed\n");
890             return s;
891           }
892         }
893       }
894 
895       // output the prefix of top k access peak
896       if (FLAGS_output_prefix_cut > 0 && stat.second.a_top_qps_prefix_f) {
897         while (!stat.second.top_k_qps_sec.empty()) {
898           ret = snprintf(buffer_, sizeof(buffer_), "At time: %u with QPS: %u\n",
899                          stat.second.top_k_qps_sec.top().second,
900                          stat.second.top_k_qps_sec.top().first);
901           if (ret < 0) {
902             return Status::IOError("Format the output failed");
903           }
904           std::string printout(buffer_);
905           s = stat.second.a_top_qps_prefix_f->Append(printout);
906           if (!s.ok()) {
907             fprintf(stderr, "Write prefix QPS top K file failed\n");
908             return s;
909           }
910           uint32_t qps_time = stat.second.top_k_qps_sec.top().second;
911           stat.second.top_k_qps_sec.pop();
912           if (stat.second.a_qps_prefix_stats.find(qps_time) !=
913               stat.second.a_qps_prefix_stats.end()) {
914             for (auto& qps_prefix : stat.second.a_qps_prefix_stats[qps_time]) {
915               std::string qps_prefix_out =
916                   ROCKSDB_NAMESPACE::LDBCommand::StringToHex(qps_prefix.first);
917               ret = snprintf(buffer_, sizeof(buffer_),
918                              "The prefix: %s Access count: %u\n",
919                              qps_prefix_out.c_str(), qps_prefix.second);
920               if (ret < 0) {
921                 return Status::IOError("Format the output failed");
922               }
923               std::string pout(buffer_);
924               s = stat.second.a_top_qps_prefix_f->Append(pout);
925               if (!s.ok()) {
926                 fprintf(stderr, "Write prefix QPS top K file failed\n");
927                 return s;
928               }
929             }
930           }
931         }
932       }
933     }
934   }
935 
936   if (qps_f_) {
937     for (uint32_t i = 0; i < duration; i++) {
938       for (int type = 0; type <= kTaTypeNum; type++) {
939         if (type < kTaTypeNum) {
940           ret = snprintf(buffer_, sizeof(buffer_), "%u ", type_qps[i][type]);
941         } else {
942           ret = snprintf(buffer_, sizeof(buffer_), "%u\n", type_qps[i][type]);
943         }
944         if (ret < 0) {
945           return Status::IOError("Format the output failed");
946         }
947         std::string printout(buffer_);
948         s = qps_f_->Append(printout);
949         if (!s.ok()) {
950           return s;
951         }
952         qps_sum[type] += type_qps[i][type];
953         if (type_qps[i][type] > qps_peak[type]) {
954           qps_peak[type] = type_qps[i][type];
955         }
956       }
957     }
958   }
959 
960   if (cf_qps_f_) {
961     int cfs_size = static_cast<uint32_t>(cfs_.size());
962     uint32_t v;
963     for (uint32_t i = 0; i < duration; i++) {
964       for (int cf = 0; cf < cfs_size; cf++) {
965         if (cfs_[cf].cf_qps.find(i) != cfs_[cf].cf_qps.end()) {
966           v = cfs_[cf].cf_qps[i];
967         } else {
968           v = 0;
969         }
970         if (cf < cfs_size - 1) {
971           ret = snprintf(buffer_, sizeof(buffer_), "%u ", v);
972         } else {
973           ret = snprintf(buffer_, sizeof(buffer_), "%u\n", v);
974         }
975         if (ret < 0) {
976           return Status::IOError("Format the output failed");
977         }
978         std::string printout(buffer_);
979         s = cf_qps_f_->Append(printout);
980         if (!s.ok()) {
981           return s;
982         }
983       }
984     }
985   }
986 
987   qps_peak_ = qps_peak;
988   for (int type = 0; type <= kTaTypeNum; type++) {
989     if (duration == 0) {
990       qps_ave_[type] = 0;
991     } else {
992       qps_ave_[type] = (static_cast<double>(qps_sum[type])) / duration;
993     }
994   }
995 
996   return Status::OK();
997 }
998 
999 // In reprocessing, if we have the whole key space
1000 // we can output the access count of all keys in a cf
1001 // we can make some statistics of the whole key space
1002 // also, we output the top k accessed keys here
ReProcessing()1003 Status TraceAnalyzer::ReProcessing() {
1004   int ret;
1005   Status s;
1006   for (auto& cf_it : cfs_) {
1007     uint32_t cf_id = cf_it.first;
1008 
1009     // output the time series;
1010     if (FLAGS_output_time_series) {
1011       for (int type = 0; type < kTaTypeNum; type++) {
1012         if (!ta_[type].enabled ||
1013             ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
1014           continue;
1015         }
1016         TraceStats& stat = ta_[type].stats[cf_id];
1017         if (!stat.time_series_f) {
1018           fprintf(stderr, "Cannot write time_series of '%s' in '%u'\n",
1019                   ta_[type].type_name.c_str(), cf_id);
1020           continue;
1021         }
1022         while (!stat.time_series.empty()) {
1023           uint64_t key_id = 0;
1024           auto found = stat.a_key_stats.find(stat.time_series.front().key);
1025           if (found != stat.a_key_stats.end()) {
1026             key_id = found->second.key_id;
1027           }
1028           ret =
1029               snprintf(buffer_, sizeof(buffer_), "%u %" PRIu64 " %" PRIu64 "\n",
1030                        stat.time_series.front().type,
1031                        stat.time_series.front().ts, key_id);
1032           if (ret < 0) {
1033             return Status::IOError("Format the output failed");
1034           }
1035           std::string printout(buffer_);
1036           s = stat.time_series_f->Append(printout);
1037           if (!s.ok()) {
1038             fprintf(stderr, "Write time series file failed\n");
1039             return s;
1040           }
1041           stat.time_series.pop_front();
1042         }
1043       }
1044     }
1045 
1046     // process the whole key space if needed
1047     if (!FLAGS_key_space_dir.empty()) {
1048       std::string whole_key_path =
1049           FLAGS_key_space_dir + "/" + std::to_string(cf_id) + ".txt";
1050       std::string input_key, get_key;
1051       std::vector<std::string> prefix(kTaTypeNum);
1052       std::istringstream iss;
1053       bool has_data = true;
1054       std::unique_ptr<SequentialFile> wkey_input_f;
1055 
1056       s = env_->NewSequentialFile(whole_key_path, &wkey_input_f, env_options_);
1057       if (!s.ok()) {
1058         fprintf(stderr, "Cannot open the whole key space file of CF: %u\n",
1059                 cf_id);
1060         wkey_input_f.reset();
1061       }
1062 
1063       if (wkey_input_f) {
1064         std::unique_ptr<FSSequentialFile> file;
1065         file = NewLegacySequentialFileWrapper(wkey_input_f);
1066         size_t kTraceFileReadaheadSize = 2 * 1024 * 1024;
1067         SequentialFileReader sf_reader(
1068             std::move(file), whole_key_path,
1069             kTraceFileReadaheadSize /* filereadahead_size */);
1070         for (cfs_[cf_id].w_count = 0;
1071              ReadOneLine(&iss, &sf_reader, &get_key, &has_data, &s);
1072              ++cfs_[cf_id].w_count) {
1073           if (!s.ok()) {
1074             fprintf(stderr, "Read whole key space file failed\n");
1075             return s;
1076           }
1077 
1078           input_key = ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key);
1079           for (int type = 0; type < kTaTypeNum; type++) {
1080             if (!ta_[type].enabled) {
1081               continue;
1082             }
1083             TraceStats& stat = ta_[type].stats[cf_id];
1084             if (stat.w_key_f) {
1085               if (stat.a_key_stats.find(input_key) != stat.a_key_stats.end()) {
1086                 ret = snprintf(buffer_, sizeof(buffer_),
1087                                "%" PRIu64 " %" PRIu64 "\n", cfs_[cf_id].w_count,
1088                                stat.a_key_stats[input_key].access_count);
1089                 if (ret < 0) {
1090                   return Status::IOError("Format the output failed");
1091                 }
1092                 std::string printout(buffer_);
1093                 s = stat.w_key_f->Append(printout);
1094                 if (!s.ok()) {
1095                   fprintf(stderr, "Write whole key space access file failed\n");
1096                   return s;
1097                 }
1098               }
1099             }
1100 
1101             // Output the prefix cut file of the whole key space
1102             if (FLAGS_output_prefix_cut > 0 && stat.w_prefix_cut_f) {
1103               if (input_key.compare(0, FLAGS_output_prefix_cut, prefix[type]) !=
1104                   0) {
1105                 prefix[type] = input_key.substr(0, FLAGS_output_prefix_cut);
1106                 std::string prefix_out =
1107                     ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix[type]);
1108                 ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %s\n",
1109                                cfs_[cf_id].w_count, prefix_out.c_str());
1110                 if (ret < 0) {
1111                   return Status::IOError("Format the output failed");
1112                 }
1113                 std::string printout(buffer_);
1114                 s = stat.w_prefix_cut_f->Append(printout);
1115                 if (!s.ok()) {
1116                   fprintf(stderr,
1117                           "Write whole key space prefix cut file failed\n");
1118                   return s;
1119                 }
1120               }
1121             }
1122           }
1123 
1124           // Make the statistics fo the key size distribution
1125           if (FLAGS_output_key_distribution) {
1126             if (cfs_[cf_id].w_key_size_stats.find(input_key.size()) ==
1127                 cfs_[cf_id].w_key_size_stats.end()) {
1128               cfs_[cf_id].w_key_size_stats[input_key.size()] = 1;
1129             } else {
1130               cfs_[cf_id].w_key_size_stats[input_key.size()]++;
1131             }
1132           }
1133         }
1134       }
1135     }
1136 
1137     // process the top k accessed keys
1138     if (FLAGS_print_top_k_access > 0) {
1139       for (int type = 0; type < kTaTypeNum; type++) {
1140         if (!ta_[type].enabled ||
1141             ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
1142           continue;
1143         }
1144         TraceStats& stat = ta_[type].stats[cf_id];
1145         for (auto& record : stat.a_key_stats) {
1146           if (static_cast<int32_t>(stat.top_k_queue.size()) <
1147               FLAGS_print_top_k_access) {
1148             stat.top_k_queue.push(
1149                 std::make_pair(record.second.access_count, record.first));
1150           } else {
1151             if (record.second.access_count > stat.top_k_queue.top().first) {
1152               stat.top_k_queue.pop();
1153               stat.top_k_queue.push(
1154                   std::make_pair(record.second.access_count, record.first));
1155             }
1156           }
1157         }
1158       }
1159     }
1160   }
1161   return Status::OK();
1162 }
1163 
1164 // End the processing, print the requested results
EndProcessing()1165 Status TraceAnalyzer::EndProcessing() {
1166   if (trace_sequence_f_) {
1167     trace_sequence_f_->Close();
1168   }
1169   if (FLAGS_no_print) {
1170     return Status::OK();
1171   }
1172   PrintStatistics();
1173   CloseOutputFiles();
1174   return Status::OK();
1175 }
1176 
1177 // Insert the corresponding key statistics to the correct type
1178 // and correct CF, output the time-series file if needed
KeyStatsInsertion(const uint32_t & type,const uint32_t & cf_id,const std::string & key,const size_t value_size,const uint64_t ts)1179 Status TraceAnalyzer::KeyStatsInsertion(const uint32_t& type,
1180                                         const uint32_t& cf_id,
1181                                         const std::string& key,
1182                                         const size_t value_size,
1183                                         const uint64_t ts) {
1184   Status s;
1185   StatsUnit unit;
1186   unit.key_id = 0;
1187   unit.cf_id = cf_id;
1188   unit.value_size = value_size;
1189   unit.access_count = 1;
1190   unit.latest_ts = ts;
1191   if (type != TraceOperationType::kGet || value_size > 0) {
1192     unit.succ_count = 1;
1193   } else {
1194     unit.succ_count = 0;
1195   }
1196   unit.v_correlation.resize(analyzer_opts_.correlation_list.size());
1197   for (int i = 0;
1198        i < (static_cast<int>(analyzer_opts_.correlation_list.size())); i++) {
1199     unit.v_correlation[i].count = 0;
1200     unit.v_correlation[i].total_ts = 0;
1201   }
1202   std::string prefix;
1203   if (FLAGS_output_prefix_cut > 0) {
1204     prefix = key.substr(0, FLAGS_output_prefix_cut);
1205   }
1206 
1207   if (begin_time_ == 0) {
1208     begin_time_ = ts;
1209   }
1210   uint32_t time_in_sec;
1211   if (ts < begin_time_) {
1212     time_in_sec = 0;
1213   } else {
1214     time_in_sec = static_cast<uint32_t>((ts - begin_time_) / 1000000);
1215   }
1216 
1217   uint64_t dist_value_size = value_size / FLAGS_value_interval;
1218   auto found_stats = ta_[type].stats.find(cf_id);
1219   if (found_stats == ta_[type].stats.end()) {
1220     ta_[type].stats[cf_id].cf_id = cf_id;
1221     ta_[type].stats[cf_id].cf_name = std::to_string(cf_id);
1222     ta_[type].stats[cf_id].a_count = 1;
1223     ta_[type].stats[cf_id].a_key_id = 0;
1224     ta_[type].stats[cf_id].a_key_size_sqsum = MultiplyCheckOverflow(
1225         static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
1226     ta_[type].stats[cf_id].a_key_size_sum = key.size();
1227     ta_[type].stats[cf_id].a_value_size_sqsum = MultiplyCheckOverflow(
1228         static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
1229     ta_[type].stats[cf_id].a_value_size_sum = value_size;
1230     s = OpenStatsOutputFiles(ta_[type].type_name, ta_[type].stats[cf_id]);
1231     if (!FLAGS_print_correlation.empty()) {
1232       s = StatsUnitCorrelationUpdate(unit, type, ts, key);
1233     }
1234     ta_[type].stats[cf_id].a_key_stats[key] = unit;
1235     ta_[type].stats[cf_id].a_value_size_stats[dist_value_size] = 1;
1236     ta_[type].stats[cf_id].a_qps_stats[time_in_sec] = 1;
1237     ta_[type].stats[cf_id].correlation_output.resize(
1238         analyzer_opts_.correlation_list.size());
1239     if (FLAGS_output_prefix_cut > 0) {
1240       std::map<std::string, uint32_t> tmp_qps_map;
1241       tmp_qps_map[prefix] = 1;
1242       ta_[type].stats[cf_id].a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
1243     }
1244     if (time_in_sec != cur_time_sec_) {
1245       ta_[type].stats[cf_id].uni_key_num[cur_time_sec_] =
1246           static_cast<uint64_t>(ta_[type].stats[cf_id].a_key_stats.size());
1247       cur_time_sec_ = time_in_sec;
1248     }
1249   } else {
1250     found_stats->second.a_count++;
1251     found_stats->second.a_key_size_sqsum += MultiplyCheckOverflow(
1252         static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
1253     found_stats->second.a_key_size_sum += key.size();
1254     found_stats->second.a_value_size_sqsum += MultiplyCheckOverflow(
1255         static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
1256     found_stats->second.a_value_size_sum += value_size;
1257     auto found_key = found_stats->second.a_key_stats.find(key);
1258     if (found_key == found_stats->second.a_key_stats.end()) {
1259       found_stats->second.a_key_stats[key] = unit;
1260     } else {
1261       found_key->second.access_count++;
1262       if (type != TraceOperationType::kGet || value_size > 0) {
1263         found_key->second.succ_count++;
1264       }
1265       if (!FLAGS_print_correlation.empty()) {
1266         s = StatsUnitCorrelationUpdate(found_key->second, type, ts, key);
1267       }
1268     }
1269     if (time_in_sec != cur_time_sec_) {
1270       found_stats->second.uni_key_num[cur_time_sec_] =
1271           static_cast<uint64_t>(found_stats->second.a_key_stats.size());
1272       cur_time_sec_ = time_in_sec;
1273     }
1274 
1275     auto found_value =
1276         found_stats->second.a_value_size_stats.find(dist_value_size);
1277     if (found_value == found_stats->second.a_value_size_stats.end()) {
1278       found_stats->second.a_value_size_stats[dist_value_size] = 1;
1279     } else {
1280       found_value->second++;
1281     }
1282 
1283     auto found_qps = found_stats->second.a_qps_stats.find(time_in_sec);
1284     if (found_qps == found_stats->second.a_qps_stats.end()) {
1285       found_stats->second.a_qps_stats[time_in_sec] = 1;
1286     } else {
1287       found_qps->second++;
1288     }
1289 
1290     if (FLAGS_output_prefix_cut > 0) {
1291       auto found_qps_prefix =
1292           found_stats->second.a_qps_prefix_stats.find(time_in_sec);
1293       if (found_qps_prefix == found_stats->second.a_qps_prefix_stats.end()) {
1294         std::map<std::string, uint32_t> tmp_qps_map;
1295         found_stats->second.a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
1296       }
1297       if (found_stats->second.a_qps_prefix_stats[time_in_sec].find(prefix) ==
1298           found_stats->second.a_qps_prefix_stats[time_in_sec].end()) {
1299         found_stats->second.a_qps_prefix_stats[time_in_sec][prefix] = 1;
1300       } else {
1301         found_stats->second.a_qps_prefix_stats[time_in_sec][prefix]++;
1302       }
1303     }
1304   }
1305 
1306   if (cfs_.find(cf_id) == cfs_.end()) {
1307     CfUnit cf_unit;
1308     cf_unit.cf_id = cf_id;
1309     cf_unit.w_count = 0;
1310     cf_unit.a_count = 0;
1311     cfs_[cf_id] = cf_unit;
1312   }
1313 
1314   if (FLAGS_output_qps_stats) {
1315     cfs_[cf_id].cf_qps[time_in_sec]++;
1316   }
1317 
1318   if (FLAGS_output_time_series) {
1319     TraceUnit trace_u;
1320     trace_u.type = type;
1321     trace_u.key = key;
1322     trace_u.value_size = value_size;
1323     trace_u.ts = (ts - time_series_start_) / 1000000;
1324     trace_u.cf_id = cf_id;
1325     ta_[type].stats[cf_id].time_series.push_back(trace_u);
1326   }
1327 
1328   return Status::OK();
1329 }
1330 
1331 // Update the correlation unit of each key if enabled
StatsUnitCorrelationUpdate(StatsUnit & unit,const uint32_t & type_second,const uint64_t & ts,const std::string & key)1332 Status TraceAnalyzer::StatsUnitCorrelationUpdate(StatsUnit& unit,
1333                                                  const uint32_t& type_second,
1334                                                  const uint64_t& ts,
1335                                                  const std::string& key) {
1336   if (type_second >= kTaTypeNum) {
1337     fprintf(stderr, "Unknown Type Id: %u\n", type_second);
1338     return Status::NotFound();
1339   }
1340 
1341   for (int type_first = 0; type_first < kTaTypeNum; type_first++) {
1342     if (type_first >= static_cast<int>(ta_.size()) ||
1343         type_first >= static_cast<int>(analyzer_opts_.correlation_map.size())) {
1344       break;
1345     }
1346     if (analyzer_opts_.correlation_map[type_first][type_second] < 0 ||
1347         ta_[type_first].stats.find(unit.cf_id) == ta_[type_first].stats.end() ||
1348         ta_[type_first].stats[unit.cf_id].a_key_stats.find(key) ==
1349             ta_[type_first].stats[unit.cf_id].a_key_stats.end() ||
1350         ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts == ts) {
1351       continue;
1352     }
1353 
1354     int correlation_id =
1355         analyzer_opts_.correlation_map[type_first][type_second];
1356 
1357     // after get the x-y operation time or x, update;
1358     if (correlation_id < 0 ||
1359         correlation_id >= static_cast<int>(unit.v_correlation.size())) {
1360       continue;
1361     }
1362     unit.v_correlation[correlation_id].count++;
1363     unit.v_correlation[correlation_id].total_ts +=
1364         (ts - ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts);
1365   }
1366 
1367   unit.latest_ts = ts;
1368   return Status::OK();
1369 }
1370 
1371 // when a new trace statistic is created, the file handler
1372 // pointers should be initiated if needed according to
1373 // the trace analyzer options
OpenStatsOutputFiles(const std::string & type,TraceStats & new_stats)1374 Status TraceAnalyzer::OpenStatsOutputFiles(const std::string& type,
1375                                            TraceStats& new_stats) {
1376   Status s;
1377   if (FLAGS_output_key_stats) {
1378     s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_stats.txt",
1379                          &new_stats.a_key_f);
1380     s = CreateOutputFile(type, new_stats.cf_name,
1381                          "accessed_unique_key_num_change.txt",
1382                          &new_stats.a_key_num_f);
1383     if (!FLAGS_key_space_dir.empty()) {
1384       s = CreateOutputFile(type, new_stats.cf_name, "whole_key_stats.txt",
1385                            &new_stats.w_key_f);
1386     }
1387   }
1388 
1389   if (FLAGS_output_access_count_stats) {
1390     s = CreateOutputFile(type, new_stats.cf_name,
1391                          "accessed_key_count_distribution.txt",
1392                          &new_stats.a_count_dist_f);
1393   }
1394 
1395   if (FLAGS_output_prefix_cut > 0) {
1396     s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_prefix_cut.txt",
1397                          &new_stats.a_prefix_cut_f);
1398     if (!FLAGS_key_space_dir.empty()) {
1399       s = CreateOutputFile(type, new_stats.cf_name, "whole_key_prefix_cut.txt",
1400                            &new_stats.w_prefix_cut_f);
1401     }
1402 
1403     if (FLAGS_output_qps_stats) {
1404       s = CreateOutputFile(type, new_stats.cf_name,
1405                            "accessed_top_k_qps_prefix_cut.txt",
1406                            &new_stats.a_top_qps_prefix_f);
1407     }
1408   }
1409 
1410   if (FLAGS_output_time_series) {
1411     s = CreateOutputFile(type, new_stats.cf_name, "time_series.txt",
1412                          &new_stats.time_series_f);
1413   }
1414 
1415   if (FLAGS_output_value_distribution) {
1416     s = CreateOutputFile(type, new_stats.cf_name,
1417                          "accessed_value_size_distribution.txt",
1418                          &new_stats.a_value_size_f);
1419   }
1420 
1421   if (FLAGS_output_key_distribution) {
1422     s = CreateOutputFile(type, new_stats.cf_name,
1423                          "accessed_key_size_distribution.txt",
1424                          &new_stats.a_key_size_f);
1425   }
1426 
1427   if (FLAGS_output_qps_stats) {
1428     s = CreateOutputFile(type, new_stats.cf_name, "qps_stats.txt",
1429                          &new_stats.a_qps_f);
1430   }
1431 
1432   return Status::OK();
1433 }
1434 
1435 // create the output path of the files to be opened
CreateOutputFile(const std::string & type,const std::string & cf_name,const std::string & ending,std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> * f_ptr)1436 Status TraceAnalyzer::CreateOutputFile(
1437     const std::string& type, const std::string& cf_name,
1438     const std::string& ending,
1439     std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>* f_ptr) {
1440   std::string path;
1441   path = output_path_ + "/" + FLAGS_output_prefix + "-" + type + "-" + cf_name +
1442          "-" + ending;
1443   Status s;
1444   s = env_->NewWritableFile(path, f_ptr, env_options_);
1445   if (!s.ok()) {
1446     fprintf(stderr, "Cannot open file: %s\n", path.c_str());
1447     exit(1);
1448   }
1449   return Status::OK();
1450 }
1451 
1452 // Close the output files in the TraceStats if they are opened
CloseOutputFiles()1453 void TraceAnalyzer::CloseOutputFiles() {
1454   for (int type = 0; type < kTaTypeNum; type++) {
1455     if (!ta_[type].enabled) {
1456       continue;
1457     }
1458     for (auto& stat : ta_[type].stats) {
1459       if (stat.second.time_series_f) {
1460         stat.second.time_series_f->Close();
1461       }
1462 
1463       if (stat.second.a_key_f) {
1464         stat.second.a_key_f->Close();
1465       }
1466 
1467       if (stat.second.a_key_num_f) {
1468         stat.second.a_key_num_f->Close();
1469       }
1470 
1471       if (stat.second.a_count_dist_f) {
1472         stat.second.a_count_dist_f->Close();
1473       }
1474 
1475       if (stat.second.a_prefix_cut_f) {
1476         stat.second.a_prefix_cut_f->Close();
1477       }
1478 
1479       if (stat.second.a_value_size_f) {
1480         stat.second.a_value_size_f->Close();
1481       }
1482 
1483       if (stat.second.a_key_size_f) {
1484         stat.second.a_key_size_f->Close();
1485       }
1486 
1487       if (stat.second.a_qps_f) {
1488         stat.second.a_qps_f->Close();
1489       }
1490 
1491       if (stat.second.a_top_qps_prefix_f) {
1492         stat.second.a_top_qps_prefix_f->Close();
1493       }
1494 
1495       if (stat.second.w_key_f) {
1496         stat.second.w_key_f->Close();
1497       }
1498       if (stat.second.w_prefix_cut_f) {
1499         stat.second.w_prefix_cut_f->Close();
1500       }
1501     }
1502   }
1503   return;
1504 }
1505 
1506 // Handle the Get request in the trace
HandleGet(uint32_t column_family_id,const std::string & key,const uint64_t & ts,const uint32_t & get_ret)1507 Status TraceAnalyzer::HandleGet(uint32_t column_family_id,
1508                                 const std::string& key, const uint64_t& ts,
1509                                 const uint32_t& get_ret) {
1510   Status s;
1511   size_t value_size = 0;
1512   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1513     s = WriteTraceSequence(TraceOperationType::kGet, column_family_id, key,
1514                            value_size, ts);
1515     if (!s.ok()) {
1516       return Status::Corruption("Failed to write the trace sequence to file");
1517     }
1518   }
1519 
1520   if (ta_[TraceOperationType::kGet].sample_count >= sample_max_) {
1521     ta_[TraceOperationType::kGet].sample_count = 0;
1522   }
1523   if (ta_[TraceOperationType::kGet].sample_count > 0) {
1524     ta_[TraceOperationType::kGet].sample_count++;
1525     return Status::OK();
1526   }
1527   ta_[TraceOperationType::kGet].sample_count++;
1528 
1529   if (!ta_[TraceOperationType::kGet].enabled) {
1530     return Status::OK();
1531   }
1532   if (get_ret == 1) {
1533     value_size = 10;
1534   }
1535   s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key,
1536                         value_size, ts);
1537   if (!s.ok()) {
1538     return Status::Corruption("Failed to insert key statistics");
1539   }
1540   return s;
1541 }
1542 
1543 // Handle the Put request in the write batch of the trace
HandlePut(uint32_t column_family_id,const Slice & key,const Slice & value)1544 Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key,
1545                                 const Slice& value) {
1546   Status s;
1547   size_t value_size = value.ToString().size();
1548   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1549     s = WriteTraceSequence(TraceOperationType::kPut, column_family_id,
1550                            key.ToString(), value_size, c_time_);
1551     if (!s.ok()) {
1552       return Status::Corruption("Failed to write the trace sequence to file");
1553     }
1554   }
1555 
1556   if (ta_[TraceOperationType::kPut].sample_count >= sample_max_) {
1557     ta_[TraceOperationType::kPut].sample_count = 0;
1558   }
1559   if (ta_[TraceOperationType::kPut].sample_count > 0) {
1560     ta_[TraceOperationType::kPut].sample_count++;
1561     return Status::OK();
1562   }
1563   ta_[TraceOperationType::kPut].sample_count++;
1564 
1565   if (!ta_[TraceOperationType::kPut].enabled) {
1566     return Status::OK();
1567   }
1568   s = KeyStatsInsertion(TraceOperationType::kPut, column_family_id,
1569                         key.ToString(), value_size, c_time_);
1570   if (!s.ok()) {
1571     return Status::Corruption("Failed to insert key statistics");
1572   }
1573   return s;
1574 }
1575 
1576 // Handle the Delete request in the write batch of the trace
HandleDelete(uint32_t column_family_id,const Slice & key)1577 Status TraceAnalyzer::HandleDelete(uint32_t column_family_id,
1578                                    const Slice& key) {
1579   Status s;
1580   size_t value_size = 0;
1581   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1582     s = WriteTraceSequence(TraceOperationType::kDelete, column_family_id,
1583                            key.ToString(), value_size, c_time_);
1584     if (!s.ok()) {
1585       return Status::Corruption("Failed to write the trace sequence to file");
1586     }
1587   }
1588 
1589   if (ta_[TraceOperationType::kDelete].sample_count >= sample_max_) {
1590     ta_[TraceOperationType::kDelete].sample_count = 0;
1591   }
1592   if (ta_[TraceOperationType::kDelete].sample_count > 0) {
1593     ta_[TraceOperationType::kDelete].sample_count++;
1594     return Status::OK();
1595   }
1596   ta_[TraceOperationType::kDelete].sample_count++;
1597 
1598   if (!ta_[TraceOperationType::kDelete].enabled) {
1599     return Status::OK();
1600   }
1601   s = KeyStatsInsertion(TraceOperationType::kDelete, column_family_id,
1602                         key.ToString(), value_size, c_time_);
1603   if (!s.ok()) {
1604     return Status::Corruption("Failed to insert key statistics");
1605   }
1606   return s;
1607 }
1608 
1609 // Handle the SingleDelete request in the write batch of the trace
HandleSingleDelete(uint32_t column_family_id,const Slice & key)1610 Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id,
1611                                          const Slice& key) {
1612   Status s;
1613   size_t value_size = 0;
1614   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1615     s = WriteTraceSequence(TraceOperationType::kSingleDelete, column_family_id,
1616                            key.ToString(), value_size, c_time_);
1617     if (!s.ok()) {
1618       return Status::Corruption("Failed to write the trace sequence to file");
1619     }
1620   }
1621 
1622   if (ta_[TraceOperationType::kSingleDelete].sample_count >= sample_max_) {
1623     ta_[TraceOperationType::kSingleDelete].sample_count = 0;
1624   }
1625   if (ta_[TraceOperationType::kSingleDelete].sample_count > 0) {
1626     ta_[TraceOperationType::kSingleDelete].sample_count++;
1627     return Status::OK();
1628   }
1629   ta_[TraceOperationType::kSingleDelete].sample_count++;
1630 
1631   if (!ta_[TraceOperationType::kSingleDelete].enabled) {
1632     return Status::OK();
1633   }
1634   s = KeyStatsInsertion(TraceOperationType::kSingleDelete, column_family_id,
1635                         key.ToString(), value_size, c_time_);
1636   if (!s.ok()) {
1637     return Status::Corruption("Failed to insert key statistics");
1638   }
1639   return s;
1640 }
1641 
1642 // Handle the DeleteRange request in the write batch of the trace
HandleDeleteRange(uint32_t column_family_id,const Slice & begin_key,const Slice & end_key)1643 Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id,
1644                                         const Slice& begin_key,
1645                                         const Slice& end_key) {
1646   Status s;
1647   size_t value_size = 0;
1648   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1649     s = WriteTraceSequence(TraceOperationType::kRangeDelete, column_family_id,
1650                            begin_key.ToString(), value_size, c_time_);
1651     if (!s.ok()) {
1652       return Status::Corruption("Failed to write the trace sequence to file");
1653     }
1654   }
1655 
1656   if (ta_[TraceOperationType::kRangeDelete].sample_count >= sample_max_) {
1657     ta_[TraceOperationType::kRangeDelete].sample_count = 0;
1658   }
1659   if (ta_[TraceOperationType::kRangeDelete].sample_count > 0) {
1660     ta_[TraceOperationType::kRangeDelete].sample_count++;
1661     return Status::OK();
1662   }
1663   ta_[TraceOperationType::kRangeDelete].sample_count++;
1664 
1665   if (!ta_[TraceOperationType::kRangeDelete].enabled) {
1666     return Status::OK();
1667   }
1668   s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
1669                         begin_key.ToString(), value_size, c_time_);
1670   s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
1671                         end_key.ToString(), value_size, c_time_);
1672   if (!s.ok()) {
1673     return Status::Corruption("Failed to insert key statistics");
1674   }
1675   return s;
1676 }
1677 
1678 // Handle the Merge request in the write batch of the trace
HandleMerge(uint32_t column_family_id,const Slice & key,const Slice & value)1679 Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
1680                                   const Slice& value) {
1681   Status s;
1682   size_t value_size = value.ToString().size();
1683   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1684     s = WriteTraceSequence(TraceOperationType::kMerge, column_family_id,
1685                            key.ToString(), value_size, c_time_);
1686     if (!s.ok()) {
1687       return Status::Corruption("Failed to write the trace sequence to file");
1688     }
1689   }
1690 
1691   if (ta_[TraceOperationType::kMerge].sample_count >= sample_max_) {
1692     ta_[TraceOperationType::kMerge].sample_count = 0;
1693   }
1694   if (ta_[TraceOperationType::kMerge].sample_count > 0) {
1695     ta_[TraceOperationType::kMerge].sample_count++;
1696     return Status::OK();
1697   }
1698   ta_[TraceOperationType::kMerge].sample_count++;
1699 
1700   if (!ta_[TraceOperationType::kMerge].enabled) {
1701     return Status::OK();
1702   }
1703   s = KeyStatsInsertion(TraceOperationType::kMerge, column_family_id,
1704                         key.ToString(), value_size, c_time_);
1705   if (!s.ok()) {
1706     return Status::Corruption("Failed to insert key statistics");
1707   }
1708   return s;
1709 }
1710 
1711 // Handle the Iterator request in the trace
HandleIter(uint32_t column_family_id,const std::string & key,const uint64_t & ts,TraceType & trace_type)1712 Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
1713                                  const std::string& key, const uint64_t& ts,
1714                                  TraceType& trace_type) {
1715   Status s;
1716   size_t value_size = 0;
1717   int type = -1;
1718   if (trace_type == kTraceIteratorSeek) {
1719     type = TraceOperationType::kIteratorSeek;
1720   } else if (trace_type == kTraceIteratorSeekForPrev) {
1721     type = TraceOperationType::kIteratorSeekForPrev;
1722   } else {
1723     return s;
1724   }
1725   if (type == -1) {
1726     return s;
1727   }
1728 
1729   if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1730     s = WriteTraceSequence(type, column_family_id, key, value_size, ts);
1731     if (!s.ok()) {
1732       return Status::Corruption("Failed to write the trace sequence to file");
1733     }
1734   }
1735 
1736   if (ta_[type].sample_count >= sample_max_) {
1737     ta_[type].sample_count = 0;
1738   }
1739   if (ta_[type].sample_count > 0) {
1740     ta_[type].sample_count++;
1741     return Status::OK();
1742   }
1743   ta_[type].sample_count++;
1744 
1745   if (!ta_[type].enabled) {
1746     return Status::OK();
1747   }
1748   s = KeyStatsInsertion(type, column_family_id, key, value_size, ts);
1749   if (!s.ok()) {
1750     return Status::Corruption("Failed to insert key statistics");
1751   }
1752   return s;
1753 }
1754 
1755 // Before the analyzer is closed, the requested general statistic results are
1756 // printed out here. In current stage, these information are not output to
1757 // the files.
1758 // -----type
1759 //          |__cf_id
1760 //                |_statistics
PrintStatistics()1761 void TraceAnalyzer::PrintStatistics() {
1762   for (int type = 0; type < kTaTypeNum; type++) {
1763     if (!ta_[type].enabled) {
1764       continue;
1765     }
1766     ta_[type].total_keys = 0;
1767     ta_[type].total_access = 0;
1768     ta_[type].total_succ_access = 0;
1769     printf("\n################# Operation Type: %s #####################\n",
1770            ta_[type].type_name.c_str());
1771     if (qps_ave_.size() == kTaTypeNum + 1) {
1772       printf("Peak QPS is: %u Average QPS is: %f\n", qps_peak_[type],
1773              qps_ave_[type]);
1774     }
1775     for (auto& stat_it : ta_[type].stats) {
1776       if (stat_it.second.a_count == 0) {
1777         continue;
1778       }
1779       TraceStats& stat = stat_it.second;
1780       uint64_t total_a_keys = static_cast<uint64_t>(stat.a_key_stats.size());
1781       double key_size_ave = 0.0;
1782       double value_size_ave = 0.0;
1783       double key_size_vari = 0.0;
1784       double value_size_vari = 0.0;
1785       if (stat.a_count > 0) {
1786         key_size_ave =
1787             (static_cast<double>(stat.a_key_size_sum)) / stat.a_count;
1788         value_size_ave =
1789             (static_cast<double>(stat.a_value_size_sum)) / stat.a_count;
1790         key_size_vari = std::sqrt((static_cast<double>(stat.a_key_size_sqsum)) /
1791                                       stat.a_count -
1792                                   key_size_ave * key_size_ave);
1793         value_size_vari = std::sqrt(
1794             (static_cast<double>(stat.a_value_size_sqsum)) / stat.a_count -
1795             value_size_ave * value_size_ave);
1796       }
1797       if (value_size_ave == 0.0) {
1798         stat.a_value_mid = 0;
1799       }
1800       cfs_[stat.cf_id].a_count += total_a_keys;
1801       ta_[type].total_keys += total_a_keys;
1802       ta_[type].total_access += stat.a_count;
1803       ta_[type].total_succ_access += stat.a_succ_count;
1804       printf("*********************************************************\n");
1805       printf("colume family id: %u\n", stat.cf_id);
1806       printf("Total number of queries to this cf by %s: %" PRIu64 "\n",
1807              ta_[type].type_name.c_str(), stat.a_count);
1808       printf("Total unique keys in this cf: %" PRIu64 "\n", total_a_keys);
1809       printf("Average key size: %f key size medium: %" PRIu64
1810              " Key size Variation: %f\n",
1811              key_size_ave, stat.a_key_mid, key_size_vari);
1812       if (type == kPut || type == kMerge) {
1813         printf("Average value size: %f Value size medium: %" PRIu64
1814                " Value size variation: %f\n",
1815                value_size_ave, stat.a_value_mid, value_size_vari);
1816       }
1817       printf("Peak QPS is: %u Average QPS is: %f\n", stat.a_peak_qps,
1818              stat.a_ave_qps);
1819 
1820       // print the top k accessed key and its access count
1821       if (FLAGS_print_top_k_access > 0) {
1822         printf("The Top %d keys that are accessed:\n",
1823                FLAGS_print_top_k_access);
1824         while (!stat.top_k_queue.empty()) {
1825           std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(
1826               stat.top_k_queue.top().second);
1827           printf("Access_count: %" PRIu64 " %s\n", stat.top_k_queue.top().first,
1828                  hex_key.c_str());
1829           stat.top_k_queue.pop();
1830         }
1831       }
1832 
1833       // print the top k access prefix range and
1834       // top k prefix range with highest average access per key
1835       if (FLAGS_output_prefix_cut > 0) {
1836         printf("The Top %d accessed prefix range:\n", FLAGS_print_top_k_access);
1837         while (!stat.top_k_prefix_access.empty()) {
1838           printf("Prefix: %s Access count: %" PRIu64 "\n",
1839                  stat.top_k_prefix_access.top().second.c_str(),
1840                  stat.top_k_prefix_access.top().first);
1841           stat.top_k_prefix_access.pop();
1842         }
1843 
1844         printf("The Top %d prefix with highest access per key:\n",
1845                FLAGS_print_top_k_access);
1846         while (!stat.top_k_prefix_ave.empty()) {
1847           printf("Prefix: %s access per key: %f\n",
1848                  stat.top_k_prefix_ave.top().second.c_str(),
1849                  stat.top_k_prefix_ave.top().first);
1850           stat.top_k_prefix_ave.pop();
1851         }
1852       }
1853 
1854       // print the operation correlations
1855       if (!FLAGS_print_correlation.empty()) {
1856         for (int correlation = 0;
1857              correlation <
1858              static_cast<int>(analyzer_opts_.correlation_list.size());
1859              correlation++) {
1860           printf(
1861               "The correlation statistics of '%s' after '%s' is:",
1862               taIndexToOpt[analyzer_opts_.correlation_list[correlation].second]
1863                   .c_str(),
1864               taIndexToOpt[analyzer_opts_.correlation_list[correlation].first]
1865                   .c_str());
1866           double correlation_ave = 0.0;
1867           if (stat.correlation_output[correlation].first > 0) {
1868             correlation_ave =
1869                 (static_cast<double>(
1870                     stat.correlation_output[correlation].second)) /
1871                 (stat.correlation_output[correlation].first * 1000);
1872           }
1873           printf(" total numbers: %" PRIu64 " average time: %f(ms)\n",
1874                  stat.correlation_output[correlation].first, correlation_ave);
1875         }
1876       }
1877     }
1878     printf("*********************************************************\n");
1879     printf("Total keys of '%s' is: %" PRIu64 "\n", ta_[type].type_name.c_str(),
1880            ta_[type].total_keys);
1881     printf("Total access is: %" PRIu64 "\n", ta_[type].total_access);
1882     total_access_keys_ += ta_[type].total_keys;
1883   }
1884 
1885   // Print the overall statistic information of the trace
1886   printf("\n*********************************************************\n");
1887   printf("*********************************************************\n");
1888   printf("The column family based statistics\n");
1889   for (auto& cf : cfs_) {
1890     printf("The column family id: %u\n", cf.first);
1891     printf("The whole key space key numbers: %" PRIu64 "\n", cf.second.w_count);
1892     printf("The accessed key space key numbers: %" PRIu64 "\n",
1893            cf.second.a_count);
1894   }
1895 
1896   if (FLAGS_print_overall_stats) {
1897     printf("\n*********************************************************\n");
1898     printf("*********************************************************\n");
1899     if (qps_peak_.size() == kTaTypeNum + 1) {
1900       printf("Average QPS per second: %f Peak QPS: %u\n", qps_ave_[kTaTypeNum],
1901              qps_peak_[kTaTypeNum]);
1902     }
1903     printf("The statistics related to query number need to times: %u\n",
1904            sample_max_);
1905     printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64
1906            " Total_gets: %" PRIu64 " Total_write_batch: %" PRIu64 "\n",
1907            total_requests_, total_access_keys_, total_gets_, total_writes_);
1908     for (int type = 0; type < kTaTypeNum; type++) {
1909       if (!ta_[type].enabled) {
1910         continue;
1911       }
1912       printf("Operation: '%s' has: %" PRIu64 "\n", ta_[type].type_name.c_str(),
1913              ta_[type].total_access);
1914     }
1915   }
1916 }
1917 
1918 // Write the trace sequence to file
WriteTraceSequence(const uint32_t & type,const uint32_t & cf_id,const std::string & key,const size_t value_size,const uint64_t ts)1919 Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type,
1920                                          const uint32_t& cf_id,
1921                                          const std::string& key,
1922                                          const size_t value_size,
1923                                          const uint64_t ts) {
1924   std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key);
1925   int ret;
1926   ret = snprintf(buffer_, sizeof(buffer_), "%u %u %zu %" PRIu64 "\n", type,
1927                  cf_id, value_size, ts);
1928   if (ret < 0) {
1929     return Status::IOError("failed to format the output");
1930   }
1931   std::string printout(buffer_);
1932   if (!FLAGS_no_key) {
1933     printout = hex_key + " " + printout;
1934   }
1935   return trace_sequence_f_->Append(printout);
1936 }
1937 
1938 // The entrance function of Trace_Analyzer
trace_analyzer_tool(int argc,char ** argv)1939 int trace_analyzer_tool(int argc, char** argv) {
1940   std::string trace_path;
1941   std::string output_path;
1942 
1943   AnalyzerOptions analyzer_opts;
1944 
1945   ParseCommandLineFlags(&argc, &argv, true);
1946 
1947   if (!FLAGS_print_correlation.empty()) {
1948     analyzer_opts.SparseCorrelationInput(FLAGS_print_correlation);
1949   }
1950 
1951   std::unique_ptr<TraceAnalyzer> analyzer(
1952       new TraceAnalyzer(FLAGS_trace_path, FLAGS_output_dir, analyzer_opts));
1953 
1954   if (!analyzer) {
1955     fprintf(stderr, "Cannot initiate the trace analyzer\n");
1956     exit(1);
1957   }
1958 
1959   ROCKSDB_NAMESPACE::Status s = analyzer->PrepareProcessing();
1960   if (!s.ok()) {
1961     fprintf(stderr, "%s\n", s.getState());
1962     fprintf(stderr, "Cannot initiate the trace reader\n");
1963     exit(1);
1964   }
1965 
1966   s = analyzer->StartProcessing();
1967   if (!s.ok() && !FLAGS_try_process_corrupted_trace) {
1968     fprintf(stderr, "%s\n", s.getState());
1969     fprintf(stderr, "Cannot processing the trace\n");
1970     exit(1);
1971   }
1972 
1973   s = analyzer->MakeStatistics();
1974   if (!s.ok()) {
1975     fprintf(stderr, "%s\n", s.getState());
1976     analyzer->EndProcessing();
1977     fprintf(stderr, "Cannot make the statistics\n");
1978     exit(1);
1979   }
1980 
1981   s = analyzer->ReProcessing();
1982   if (!s.ok()) {
1983     fprintf(stderr, "%s\n", s.getState());
1984     fprintf(stderr, "Cannot re-process the trace for more statistics\n");
1985     analyzer->EndProcessing();
1986     exit(1);
1987   }
1988 
1989   s = analyzer->EndProcessing();
1990   if (!s.ok()) {
1991     fprintf(stderr, "%s\n", s.getState());
1992     fprintf(stderr, "Cannot close the trace analyzer\n");
1993     exit(1);
1994   }
1995 
1996   return 0;
1997 }
1998 }  // namespace ROCKSDB_NAMESPACE
1999 
2000 #endif  // Endif of Gflag
2001 #endif  // RocksDB LITE
2002