1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6
7 #ifndef ROCKSDB_LITE
8
9 #ifdef GFLAGS
10 #ifdef NUMA
11 #include <numa.h>
12 #include <numaif.h>
13 #endif
14 #ifndef OS_WIN
15 #include <unistd.h>
16 #endif
17
18 #include <cinttypes>
19 #include <cmath>
20 #include <cstdio>
21 #include <cstdlib>
22 #include <memory>
23 #include <sstream>
24 #include <stdexcept>
25
26 #include "db/db_impl/db_impl.h"
27 #include "db/memtable.h"
28 #include "db/write_batch_internal.h"
29 #include "env/composite_env_wrapper.h"
30 #include "file/read_write_util.h"
31 #include "file/writable_file_writer.h"
32 #include "options/cf_options.h"
33 #include "rocksdb/db.h"
34 #include "rocksdb/env.h"
35 #include "rocksdb/iterator.h"
36 #include "rocksdb/slice.h"
37 #include "rocksdb/slice_transform.h"
38 #include "rocksdb/status.h"
39 #include "rocksdb/table_properties.h"
40 #include "rocksdb/utilities/ldb_cmd.h"
41 #include "rocksdb/write_batch.h"
42 #include "table/meta_blocks.h"
43 #include "table/plain/plain_table_factory.h"
44 #include "table/table_reader.h"
45 #include "tools/trace_analyzer_tool.h"
46 #include "trace_replay/trace_replay.h"
47 #include "util/coding.h"
48 #include "util/compression.h"
49 #include "util/gflags_compat.h"
50 #include "util/random.h"
51 #include "util/string_util.h"
52
53 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
54 using GFLAGS_NAMESPACE::RegisterFlagValidator;
55 using GFLAGS_NAMESPACE::SetUsageMessage;
56
57 DEFINE_string(trace_path, "", "The trace file path.");
58 DEFINE_string(output_dir, "", "The directory to store the output files.");
59 DEFINE_string(output_prefix, "trace",
60 "The prefix used for all the output files.");
61 DEFINE_bool(output_key_stats, false,
62 "Output the key access count statistics to file\n"
63 "for accessed keys:\n"
64 "file name: <prefix>-<query_type>-<cf_id>-accessed_key_stats.txt\n"
65 "Format:[cf_id value_size access_keyid access_count]\n"
66 "for the whole key space keys:\n"
67 "File name: <prefix>-<query_type>-<cf_id>-whole_key_stats.txt\n"
68 "Format:[whole_key_space_keyid access_count]");
69 DEFINE_bool(output_access_count_stats, false,
70 "Output the access count distribution statistics to file.\n"
71 "File name: <prefix>-<query_type>-<cf_id>-accessed_"
72 "key_count_distribution.txt \n"
73 "Format:[access_count number_of_access_count]");
74 DEFINE_bool(output_time_series, false,
75 "Output the access time in second of each key, "
76 "such that we can have the time series data of the queries \n"
77 "File name: <prefix>-<query_type>-<cf_id>-time_series.txt\n"
78 "Format:[type_id time_in_sec access_keyid].");
79 DEFINE_bool(try_process_corrupted_trace, false,
80 "In default, trace_analyzer will exit if the trace file is "
81 "corrupted due to the unexpected tracing cases. If this option "
82 "is enabled, trace_analyzer will stop reading the trace file, "
83 "and start analyzing the read-in data.");
84 DEFINE_int32(output_prefix_cut, 0,
85 "The number of bytes as prefix to cut the keys.\n"
86 "If it is enabled, it will generate the following:\n"
87 "For accessed keys:\n"
88 "File name: <prefix>-<query_type>-<cf_id>-"
89 "accessed_key_prefix_cut.txt \n"
90 "Format:[acessed_keyid access_count_of_prefix "
91 "number_of_keys_in_prefix average_key_access "
92 "prefix_succ_ratio prefix]\n"
93 "For whole key space keys:\n"
94 "File name: <prefix>-<query_type>-<cf_id>"
95 "-whole_key_prefix_cut.txt\n"
96 "Format:[start_keyid_in_whole_keyspace prefix]\n"
97 "if 'output_qps_stats' and 'top_k' are enabled, it will output:\n"
98 "File name: <prefix>-<query_type>-<cf_id>"
99 "-accessed_top_k_qps_prefix_cut.txt\n"
100 "Format:[the_top_ith_qps_time QPS], [prefix qps_of_this_second].");
101 DEFINE_bool(convert_to_human_readable_trace, false,
102 "Convert the binary trace file to a human readable txt file "
103 "for further processing. "
104 "This file will be extremely large "
105 "(similar size as the original binary trace file). "
106 "You can specify 'no_key' to reduce the size, if key is not "
107 "needed in the next step.\n"
108 "File name: <prefix>_human_readable_trace.txt\n"
109 "Format:[type_id cf_id value_size time_in_micorsec <key>].");
110 DEFINE_bool(output_qps_stats, false,
111 "Output the query per second(qps) statistics \n"
112 "For the overall qps, it will contain all qps of each query type. "
113 "The time is started from the first trace record\n"
114 "File name: <prefix>_qps_stats.txt\n"
115 "Format: [qps_type_1 qps_type_2 ...... overall_qps]\n"
116 "For each cf and query, it will have its own qps output.\n"
117 "File name: <prefix>-<query_type>-<cf_id>_qps_stats.txt \n"
118 "Format:[query_count_in_this_second].");
119 DEFINE_bool(no_print, false, "Do not print out any result");
120 DEFINE_string(
121 print_correlation, "",
122 "intput format: [correlation pairs][.,.]\n"
123 "Output the query correlations between the pairs of query types "
124 "listed in the parameter, input should select the operations from:\n"
125 "get, put, delete, single_delete, rangle_delete, merge. No space "
126 "between the pairs separated by commar. Example: =[get,get]... "
127 "It will print out the number of pairs of 'A after B' and "
128 "the average time interval between the two query.");
129 DEFINE_string(key_space_dir, "",
130 "<the directory stores full key space files> \n"
131 "The key space files should be: <column family id>.txt");
132 DEFINE_bool(analyze_get, false, "Analyze the Get query.");
133 DEFINE_bool(analyze_put, false, "Analyze the Put query.");
134 DEFINE_bool(analyze_delete, false, "Analyze the Delete query.");
135 DEFINE_bool(analyze_single_delete, false, "Analyze the SingleDelete query.");
136 DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query.");
137 DEFINE_bool(analyze_merge, false, "Analyze the Merge query.");
138 DEFINE_bool(analyze_iterator, false,
139 " Analyze the iterate query like seek() and seekForPrev().");
140 DEFINE_bool(no_key, false,
141 " Does not output the key to the result files to make smaller.");
142 DEFINE_bool(print_overall_stats, true,
143 " Print the stats of the whole trace, "
144 "like total requests, keys, and etc.");
145 DEFINE_bool(output_key_distribution, false, "Print the key size distribution.");
146 DEFINE_bool(
147 output_value_distribution, false,
148 "Out put the value size distribution, only available for Put and Merge.\n"
149 "File name: <prefix>-<query_type>-<cf_id>"
150 "-accessed_value_size_distribution.txt\n"
151 "Format:[Number_of_value_size_between x and "
152 "x+value_interval is: <the count>]");
153 DEFINE_int32(print_top_k_access, 1,
154 "<top K of the variables to be printed> "
155 "Print the top k accessed keys, top k accessed prefix "
156 "and etc.");
157 DEFINE_int32(output_ignore_count, 0,
158 "<threshold>, ignores the access count <= this value, "
159 "it will shorter the output.");
160 DEFINE_int32(value_interval, 8,
161 "To output the value distribution, we need to set the value "
162 "intervals and make the statistic of the value size distribution "
163 "in different intervals. The default is 8.");
164 DEFINE_double(sample_ratio, 1.0,
165 "If the trace size is extremely huge or user want to sample "
166 "the trace when analyzing, sample ratio can be set (0, 1.0]");
167
168 namespace ROCKSDB_NAMESPACE {
169
170 std::map<std::string, int> taOptToIndex = {
171 {"get", 0}, {"put", 1},
172 {"delete", 2}, {"single_delete", 3},
173 {"range_delete", 4}, {"merge", 5},
174 {"iterator_Seek", 6}, {"iterator_SeekForPrev", 7}};
175
176 std::map<int, std::string> taIndexToOpt = {
177 {0, "get"}, {1, "put"},
178 {2, "delete"}, {3, "single_delete"},
179 {4, "range_delete"}, {5, "merge"},
180 {6, "iterator_Seek"}, {7, "iterator_SeekForPrev"}};
181
182 namespace {
183
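// Multiply two unsigned 64-bit integers for the squared-sum statistics kept
// below; if the product would overflow, fall back to returning op1 instead of
// wrapping around.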
uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) {
185 if (op1 == 0 || op2 == 0) {
186 return 0;
187 }
188 if (port::kMaxUint64 / op1 < op2) {
189 return op1;
190 }
191 return (op1 * op2);
192 }
193
void DecodeCFAndKeyFromString(std::string& buffer, uint32_t* cf_id,
                              Slice* key) {
195 Slice buf(buffer);
196 GetFixed32(&buf, cf_id);
197 GetLengthPrefixedSlice(&buf, key);
198 }
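// The payload decoded above is expected to be laid out as
//   [fixed32 column family id][length-prefixed key]
// which is what the GetFixed32/GetLengthPrefixedSlice calls assume.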
199
200 } // namespace
201
202 // The default constructor of AnalyzerOptions
AnalyzerOptions::AnalyzerOptions()
204 : correlation_map(kTaTypeNum, std::vector<int>(kTaTypeNum, -1)) {}
205
AnalyzerOptions::~AnalyzerOptions() {}
207
void AnalyzerOptions::SparseCorrelationInput(const std::string& in_str) {
209 std::string cur = in_str;
210 if (cur.size() == 0) {
211 return;
212 }
213 while (!cur.empty()) {
214 if (cur.compare(0, 1, "[") != 0) {
215 fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
216 exit(1);
217 }
218 std::string opt1, opt2;
219 std::size_t split = cur.find_first_of(",");
220 if (split != std::string::npos) {
221 opt1 = cur.substr(1, split - 1);
222 } else {
223 fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
224 exit(1);
225 }
226 std::size_t end = cur.find_first_of("]");
227 if (end != std::string::npos) {
228 opt2 = cur.substr(split + 1, end - split - 1);
229 } else {
230 fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
231 exit(1);
232 }
233 cur = cur.substr(end + 1);
234
235 if (taOptToIndex.find(opt1) != taOptToIndex.end() &&
236 taOptToIndex.find(opt2) != taOptToIndex.end()) {
237 correlation_list.push_back(
238 std::make_pair(taOptToIndex[opt1], taOptToIndex[opt2]));
239 } else {
240 fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
241 exit(1);
242 }
243 }
244
245 int sequence = 0;
246 for (auto& it : correlation_list) {
247 correlation_map[it.first][it.second] = sequence;
248 sequence++;
249 }
250 return;
251 }
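// Illustrative walk-through (not executed anywhere in this file): the input
// "[get,put][put,delete]" is parsed into correlation_list = {(0, 1), (1, 2)}
// via taOptToIndex, and correlation_map[0][1] = 0, correlation_map[1][2] = 1
// record the position of each pair within correlation_list.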
252
253 // The trace statistic struct constructor
TraceStats::TraceStats() {
255 cf_id = 0;
256 cf_name = "0";
257 a_count = 0;
258 a_key_id = 0;
259 a_key_size_sqsum = 0;
260 a_key_size_sum = 0;
261 a_key_mid = 0;
262 a_value_size_sqsum = 0;
263 a_value_size_sum = 0;
264 a_value_mid = 0;
265 a_peak_qps = 0;
266 a_ave_qps = 0.0;
267 }
268
TraceStats::~TraceStats() {}
270
271 // The trace analyzer constructor
TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path,
273 AnalyzerOptions _analyzer_opts)
274 : trace_name_(trace_path),
275 output_path_(output_path),
276 analyzer_opts_(_analyzer_opts) {
277 ROCKSDB_NAMESPACE::EnvOptions env_options;
278 env_ = ROCKSDB_NAMESPACE::Env::Default();
279 offset_ = 0;
280 c_time_ = 0;
281 total_requests_ = 0;
282 total_access_keys_ = 0;
283 total_gets_ = 0;
284 total_writes_ = 0;
285 trace_create_time_ = 0;
286 begin_time_ = 0;
287 end_time_ = 0;
288 time_series_start_ = 0;
289 cur_time_sec_ = 0;
290 if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) {
291 sample_max_ = 1;
292 } else {
293 sample_max_ = static_cast<uint32_t>(1.0 / FLAGS_sample_ratio);
294 }
295
  ta_.resize(kTaTypeNum);
  ta_[0].type_name = "get";
  ta_[0].enabled = FLAGS_analyze_get;
  ta_[1].type_name = "put";
  ta_[1].enabled = FLAGS_analyze_put;
  ta_[2].type_name = "delete";
  ta_[2].enabled = FLAGS_analyze_delete;
  ta_[3].type_name = "single_delete";
  ta_[3].enabled = FLAGS_analyze_single_delete;
  ta_[4].type_name = "range_delete";
  ta_[4].enabled = FLAGS_analyze_range_delete;
  ta_[5].type_name = "merge";
  ta_[5].enabled = FLAGS_analyze_merge;
  ta_[6].type_name = "iterator_Seek";
  ta_[6].enabled = FLAGS_analyze_iterator;
  ta_[7].type_name = "iterator_SeekForPrev";
  ta_[7].enabled = FLAGS_analyze_iterator;
  for (int i = 0; i < kTaTypeNum; i++) {
    ta_[i].sample_count = 0;
  }
348 }
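// Sampling note: sample_max_ is the reciprocal of -sample_ratio, so with
// sample_ratio=0.25 roughly one out of every four requests of each query type
// is analyzed (see the sample_count checks in the Handle* functions below).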
349
TraceAnalyzer::~TraceAnalyzer() {}
351
352 // Prepare the processing
// Initialize the global trace reader and writer here
Status TraceAnalyzer::PrepareProcessing() {
355 Status s;
356 // Prepare the trace reader
357 s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
358 if (!s.ok()) {
359 return s;
360 }
361
362 // Prepare and open the trace sequence file writer if needed
363 if (FLAGS_convert_to_human_readable_trace) {
364 std::string trace_sequence_name;
365 trace_sequence_name =
366 output_path_ + "/" + FLAGS_output_prefix + "-human_readable_trace.txt";
367 s = env_->NewWritableFile(trace_sequence_name, &trace_sequence_f_,
368 env_options_);
369 if (!s.ok()) {
370 return s;
371 }
372 }
373
374 // prepare the general QPS file writer
375 if (FLAGS_output_qps_stats) {
376 std::string qps_stats_name;
377 qps_stats_name =
378 output_path_ + "/" + FLAGS_output_prefix + "-qps_stats.txt";
379 s = env_->NewWritableFile(qps_stats_name, &qps_f_, env_options_);
380 if (!s.ok()) {
381 return s;
382 }
383
384 qps_stats_name =
385 output_path_ + "/" + FLAGS_output_prefix + "-cf_qps_stats.txt";
386 s = env_->NewWritableFile(qps_stats_name, &cf_qps_f_, env_options_);
387 if (!s.ok()) {
388 return s;
389 }
390 }
391 return Status::OK();
392 }
393
Status TraceAnalyzer::ReadTraceHeader(Trace* header) {
395 assert(header != nullptr);
396 Status s = ReadTraceRecord(header);
397 if (!s.ok()) {
398 return s;
399 }
400 if (header->type != kTraceBegin) {
401 return Status::Corruption("Corrupted trace file. Incorrect header.");
402 }
403 if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
404 return Status::Corruption("Corrupted trace file. Incorrect magic.");
405 }
406
407 return s;
408 }
409
Status TraceAnalyzer::ReadTraceFooter(Trace* footer) {
411 assert(footer != nullptr);
412 Status s = ReadTraceRecord(footer);
413 if (!s.ok()) {
414 return s;
415 }
416 if (footer->type != kTraceEnd) {
417 return Status::Corruption("Corrupted trace file. Incorrect footer.");
418 }
419 return s;
420 }
421
Status TraceAnalyzer::ReadTraceRecord(Trace* trace) {
423 assert(trace != nullptr);
424 std::string encoded_trace;
425 Status s = trace_reader_->Read(&encoded_trace);
426 if (!s.ok()) {
427 return s;
428 }
429
430 Slice enc_slice = Slice(encoded_trace);
431 GetFixed64(&enc_slice, &trace->ts);
432 trace->type = static_cast<TraceType>(enc_slice[0]);
433 enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
434 trace->payload = enc_slice.ToString();
435 return s;
436 }
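// Based on the decoding above, each encoded trace record is assumed to be
// laid out as:
//   [fixed64 timestamp][1-byte trace type][fixed32 payload length][payload]
// where kTraceTypeSize and kTracePayloadLengthSize cover the type and length
// fields.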
437
// Process the trace itself and redirect the trace content
// to the handler of each operation type. With a different trace
// format, this function can be changed.
Status TraceAnalyzer::StartProcessing() {
442 Status s;
443 Trace header;
444 s = ReadTraceHeader(&header);
445 if (!s.ok()) {
446 fprintf(stderr, "Cannot read the header\n");
447 return s;
448 }
449 trace_create_time_ = header.ts;
450 if (FLAGS_output_time_series) {
451 time_series_start_ = header.ts;
452 }
453
454 Trace trace;
455 while (s.ok()) {
456 trace.reset();
457 s = ReadTraceRecord(&trace);
458 if (!s.ok()) {
459 break;
460 }
461
462 total_requests_++;
463 end_time_ = trace.ts;
464 if (trace.type == kTraceWrite) {
465 total_writes_++;
466 c_time_ = trace.ts;
467 WriteBatch batch(trace.payload);
468
469 // Note that, if the write happens in a transaction,
470 // 'Write' will be called twice, one for Prepare, one for
      // Commit. Thus, in the trace, for the same WriteBatch, there
      // will be two records if it is in a transaction. Here, we only
      // process the record that is committed. If the write is not in a
      // transaction, HasBeginPrepare()==false, so we process it normally.
475 if (batch.HasBeginPrepare() && !batch.HasCommit()) {
476 continue;
477 }
478 TraceWriteHandler write_handler(this);
479 s = batch.Iterate(&write_handler);
480 if (!s.ok()) {
481 fprintf(stderr, "Cannot process the write batch in the trace\n");
482 return s;
483 }
484 } else if (trace.type == kTraceGet) {
485 uint32_t cf_id = 0;
486 Slice key;
487 DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
488 total_gets_++;
489
490 s = HandleGet(cf_id, key.ToString(), trace.ts, 1);
491 if (!s.ok()) {
492 fprintf(stderr, "Cannot process the get in the trace\n");
493 return s;
494 }
495 } else if (trace.type == kTraceIteratorSeek ||
496 trace.type == kTraceIteratorSeekForPrev) {
497 uint32_t cf_id = 0;
498 Slice key;
499 DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
500 s = HandleIter(cf_id, key.ToString(), trace.ts, trace.type);
501 if (!s.ok()) {
502 fprintf(stderr, "Cannot process the iterator in the trace\n");
503 return s;
504 }
505 } else if (trace.type == kTraceEnd) {
506 break;
507 }
508 }
509 if (s.IsIncomplete()) {
    // Fix it: reaching EOF returns an Incomplete status at the moment.
512 return Status::OK();
513 }
514 return s;
515 }
516
// After the trace is processed by StartProcessing, the statistic data
// is stored in the map or other in-memory data structures. To get the
// other statistic results such as the key size distribution and value size
// distribution, these data structures are re-processed here.
Status TraceAnalyzer::MakeStatistics() {
522 int ret;
523 Status s;
524 for (int type = 0; type < kTaTypeNum; type++) {
525 if (!ta_[type].enabled) {
526 continue;
527 }
528 for (auto& stat : ta_[type].stats) {
529 stat.second.a_key_id = 0;
530 for (auto& record : stat.second.a_key_stats) {
531 record.second.key_id = stat.second.a_key_id;
532 stat.second.a_key_id++;
533 if (record.second.access_count <=
534 static_cast<uint64_t>(FLAGS_output_ignore_count)) {
535 continue;
536 }
537
538 // Generate the key access count distribution data
539 if (FLAGS_output_access_count_stats) {
540 if (stat.second.a_count_stats.find(record.second.access_count) ==
541 stat.second.a_count_stats.end()) {
542 stat.second.a_count_stats[record.second.access_count] = 1;
543 } else {
544 stat.second.a_count_stats[record.second.access_count]++;
545 }
546 }
547
548 // Generate the key size distribution data
549 if (FLAGS_output_key_distribution) {
550 if (stat.second.a_key_size_stats.find(record.first.size()) ==
551 stat.second.a_key_size_stats.end()) {
552 stat.second.a_key_size_stats[record.first.size()] = 1;
553 } else {
554 stat.second.a_key_size_stats[record.first.size()]++;
555 }
556 }
557
558 if (!FLAGS_print_correlation.empty()) {
559 s = MakeStatisticCorrelation(stat.second, record.second);
560 if (!s.ok()) {
561 return s;
562 }
563 }
564 }
565
566 // Output the prefix cut or the whole content of the accessed key space
567 if (FLAGS_output_key_stats || FLAGS_output_prefix_cut > 0) {
568 s = MakeStatisticKeyStatsOrPrefix(stat.second);
569 if (!s.ok()) {
570 return s;
571 }
572 }
573
574 // output the access count distribution
575 if (FLAGS_output_access_count_stats && stat.second.a_count_dist_f) {
576 for (auto& record : stat.second.a_count_stats) {
577 ret = snprintf(buffer_, sizeof(buffer_),
578 "access_count: %" PRIu64 " num: %" PRIu64 "\n",
579 record.first, record.second);
580 if (ret < 0) {
581 return Status::IOError("Format the output failed");
582 }
583 std::string printout(buffer_);
584 s = stat.second.a_count_dist_f->Append(printout);
585 if (!s.ok()) {
586 fprintf(stderr, "Write access count distribution file failed\n");
587 return s;
588 }
589 }
590 }
591
      // find the median of the key size
593 uint64_t k_count = 0;
594 bool get_mid = false;
595 for (auto& record : stat.second.a_key_size_stats) {
596 k_count += record.second;
597 if (!get_mid && k_count >= stat.second.a_key_mid) {
598 stat.second.a_key_mid = record.first;
599 get_mid = true;
600 }
601 if (FLAGS_output_key_distribution && stat.second.a_key_size_f) {
602 ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %" PRIu64 "\n",
603 record.first, record.second);
604 if (ret < 0) {
605 return Status::IOError("Format output failed");
606 }
607 std::string printout(buffer_);
608 s = stat.second.a_key_size_f->Append(printout);
609 if (!s.ok()) {
610 fprintf(stderr, "Write key size distribution file failed\n");
611 return s;
612 }
613 }
614 }
615
616 // output the value size distribution
617 uint64_t v_begin = 0, v_end = 0, v_count = 0;
618 get_mid = false;
619 for (auto& record : stat.second.a_value_size_stats) {
620 v_begin = v_end;
621 v_end = (record.first + 1) * FLAGS_value_interval;
622 v_count += record.second;
623 if (!get_mid && v_count >= stat.second.a_count / 2) {
624 stat.second.a_value_mid = (v_begin + v_end) / 2;
625 get_mid = true;
626 }
627 if (FLAGS_output_value_distribution && stat.second.a_value_size_f &&
628 (type == TraceOperationType::kPut ||
629 type == TraceOperationType::kMerge)) {
630 ret = snprintf(buffer_, sizeof(buffer_),
631 "Number_of_value_size_between %" PRIu64 " and %" PRIu64
632 " is: %" PRIu64 "\n",
633 v_begin, v_end, record.second);
634 if (ret < 0) {
635 return Status::IOError("Format output failed");
636 }
637 std::string printout(buffer_);
638 s = stat.second.a_value_size_f->Append(printout);
639 if (!s.ok()) {
640 fprintf(stderr, "Write value size distribution file failed\n");
641 return s;
642 }
643 }
644 }
645 }
646 }
647
648 // Make the QPS statistics
649 if (FLAGS_output_qps_stats) {
650 s = MakeStatisticQPS();
651 if (!s.ok()) {
652 return s;
653 }
654 }
655
656 return Status::OK();
657 }
658
659 // Process the statistics of the key access and
660 // prefix of the accessed keys if required
Status TraceAnalyzer::MakeStatisticKeyStatsOrPrefix(TraceStats& stats) {
662 int ret;
663 Status s;
664 std::string prefix = "0";
665 uint64_t prefix_access = 0;
666 uint64_t prefix_count = 0;
667 uint64_t prefix_succ_access = 0;
668 double prefix_ave_access = 0.0;
669 stats.a_succ_count = 0;
670 for (auto& record : stats.a_key_stats) {
671 // write the key access statistic file
672 if (!stats.a_key_f) {
673 return Status::IOError("Failed to open accessed_key_stats file.");
674 }
675 stats.a_succ_count += record.second.succ_count;
676 double succ_ratio = 0.0;
677 if (record.second.access_count > 0) {
678 succ_ratio = (static_cast<double>(record.second.succ_count)) /
679 record.second.access_count;
680 }
681 ret = snprintf(buffer_, sizeof(buffer_),
682 "%u %zu %" PRIu64 " %" PRIu64 " %f\n", record.second.cf_id,
683 record.second.value_size, record.second.key_id,
684 record.second.access_count, succ_ratio);
685 if (ret < 0) {
686 return Status::IOError("Format output failed");
687 }
688 std::string printout(buffer_);
689 s = stats.a_key_f->Append(printout);
690 if (!s.ok()) {
691 fprintf(stderr, "Write key access file failed\n");
692 return s;
693 }
694
695 // write the prefix cut of the accessed keys
696 if (FLAGS_output_prefix_cut > 0 && stats.a_prefix_cut_f) {
697 if (record.first.compare(0, FLAGS_output_prefix_cut, prefix) != 0) {
698 std::string prefix_out =
699 ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix);
700 if (prefix_count == 0) {
701 prefix_ave_access = 0.0;
702 } else {
703 prefix_ave_access =
704 (static_cast<double>(prefix_access)) / prefix_count;
705 }
706 double prefix_succ_ratio = 0.0;
707 if (prefix_access > 0) {
708 prefix_succ_ratio =
709 (static_cast<double>(prefix_succ_access)) / prefix_access;
710 }
711 ret =
712 snprintf(buffer_, sizeof(buffer_),
713 "%" PRIu64 " %" PRIu64 " %" PRIu64 " %f %f %s\n",
714 record.second.key_id, prefix_access, prefix_count,
715 prefix_ave_access, prefix_succ_ratio, prefix_out.c_str());
716 if (ret < 0) {
717 return Status::IOError("Format output failed");
718 }
719 std::string pout(buffer_);
720 s = stats.a_prefix_cut_f->Append(pout);
721 if (!s.ok()) {
722 fprintf(stderr, "Write accessed key prefix file failed\n");
723 return s;
724 }
725
726 // make the top k statistic for the prefix
727 if (static_cast<int32_t>(stats.top_k_prefix_access.size()) <
728 FLAGS_print_top_k_access) {
729 stats.top_k_prefix_access.push(
730 std::make_pair(prefix_access, prefix_out));
731 } else {
732 if (prefix_access > stats.top_k_prefix_access.top().first) {
733 stats.top_k_prefix_access.pop();
734 stats.top_k_prefix_access.push(
735 std::make_pair(prefix_access, prefix_out));
736 }
737 }
738
739 if (static_cast<int32_t>(stats.top_k_prefix_ave.size()) <
740 FLAGS_print_top_k_access) {
741 stats.top_k_prefix_ave.push(
742 std::make_pair(prefix_ave_access, prefix_out));
743 } else {
744 if (prefix_ave_access > stats.top_k_prefix_ave.top().first) {
745 stats.top_k_prefix_ave.pop();
746 stats.top_k_prefix_ave.push(
747 std::make_pair(prefix_ave_access, prefix_out));
748 }
749 }
750
751 prefix = record.first.substr(0, FLAGS_output_prefix_cut);
752 prefix_access = 0;
753 prefix_count = 0;
754 prefix_succ_access = 0;
755 }
756 prefix_access += record.second.access_count;
757 prefix_count += 1;
758 prefix_succ_access += record.second.succ_count;
759 }
760 }
761 return Status::OK();
762 }
763
764 // Process the statistics of different query type
765 // correlations
Status TraceAnalyzer::MakeStatisticCorrelation(TraceStats& stats,
767 StatsUnit& unit) {
768 if (stats.correlation_output.size() !=
769 analyzer_opts_.correlation_list.size()) {
770 return Status::Corruption("Cannot make the statistic of correlation.");
771 }
772
773 for (int i = 0; i < static_cast<int>(analyzer_opts_.correlation_list.size());
774 i++) {
775 if (i >= static_cast<int>(stats.correlation_output.size()) ||
776 i >= static_cast<int>(unit.v_correlation.size())) {
777 break;
778 }
779 stats.correlation_output[i].first += unit.v_correlation[i].count;
780 stats.correlation_output[i].second += unit.v_correlation[i].total_ts;
781 }
782 return Status::OK();
783 }
784
785 // Process the statistics of QPS
Status TraceAnalyzer::MakeStatisticQPS() {
  if (begin_time_ == 0) {
788 begin_time_ = trace_create_time_;
789 }
790 uint32_t duration =
791 static_cast<uint32_t>((end_time_ - begin_time_) / 1000000);
792 int ret;
793 Status s;
794 std::vector<std::vector<uint32_t>> type_qps(
795 duration, std::vector<uint32_t>(kTaTypeNum + 1, 0));
796 std::vector<uint64_t> qps_sum(kTaTypeNum + 1, 0);
797 std::vector<uint32_t> qps_peak(kTaTypeNum + 1, 0);
798 qps_ave_.resize(kTaTypeNum + 1);
799
800 for (int type = 0; type < kTaTypeNum; type++) {
801 if (!ta_[type].enabled) {
802 continue;
803 }
804 for (auto& stat : ta_[type].stats) {
805 uint32_t time_line = 0;
806 uint64_t cf_qps_sum = 0;
807 for (auto& time_it : stat.second.a_qps_stats) {
808 if (time_it.first >= duration) {
809 continue;
810 }
811 type_qps[time_it.first][kTaTypeNum] += time_it.second;
812 type_qps[time_it.first][type] += time_it.second;
813 cf_qps_sum += time_it.second;
814 if (time_it.second > stat.second.a_peak_qps) {
815 stat.second.a_peak_qps = time_it.second;
816 }
817 if (stat.second.a_qps_f) {
818 while (time_line < time_it.first) {
819 ret = snprintf(buffer_, sizeof(buffer_), "%u\n", 0);
820 if (ret < 0) {
821 return Status::IOError("Format the output failed");
822 }
823 std::string printout(buffer_);
824 s = stat.second.a_qps_f->Append(printout);
825 if (!s.ok()) {
826 fprintf(stderr, "Write QPS file failed\n");
827 return s;
828 }
829 time_line++;
830 }
831 ret = snprintf(buffer_, sizeof(buffer_), "%u\n", time_it.second);
832 if (ret < 0) {
833 return Status::IOError("Format the output failed");
834 }
835 std::string printout(buffer_);
836 s = stat.second.a_qps_f->Append(printout);
837 if (!s.ok()) {
838 fprintf(stderr, "Write QPS file failed\n");
839 return s;
840 }
841 if (time_line == time_it.first) {
842 time_line++;
843 }
844 }
845
846 // Process the top k QPS peaks
847 if (FLAGS_output_prefix_cut > 0) {
848 if (static_cast<int32_t>(stat.second.top_k_qps_sec.size()) <
849 FLAGS_print_top_k_access) {
850 stat.second.top_k_qps_sec.push(
851 std::make_pair(time_it.second, time_it.first));
852 } else {
853 if (stat.second.top_k_qps_sec.size() > 0 &&
854 stat.second.top_k_qps_sec.top().first < time_it.second) {
855 stat.second.top_k_qps_sec.pop();
856 stat.second.top_k_qps_sec.push(
857 std::make_pair(time_it.second, time_it.first));
858 }
859 }
860 }
861 }
862 if (duration == 0) {
863 stat.second.a_ave_qps = 0;
864 } else {
865 stat.second.a_ave_qps = (static_cast<double>(cf_qps_sum)) / duration;
866 }
867
      // Output the accessed unique key number change over time
869 if (stat.second.a_key_num_f) {
870 uint64_t cur_uni_key =
871 static_cast<uint64_t>(stat.second.a_key_stats.size());
872 double cur_ratio = 0.0;
873 uint64_t cur_num = 0;
874 for (uint32_t i = 0; i < duration; i++) {
875 auto find_time = stat.second.uni_key_num.find(i);
876 if (find_time != stat.second.uni_key_num.end()) {
877 cur_ratio = (static_cast<double>(find_time->second)) / cur_uni_key;
878 cur_num = find_time->second;
879 }
880 ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %.12f\n",
881 cur_num, cur_ratio);
882 if (ret < 0) {
883 return Status::IOError("Format the output failed");
884 }
885 std::string printout(buffer_);
886 s = stat.second.a_key_num_f->Append(printout);
887 if (!s.ok()) {
888 fprintf(stderr,
889 "Write accessed unique key number change file failed\n");
890 return s;
891 }
892 }
893 }
894
895 // output the prefix of top k access peak
896 if (FLAGS_output_prefix_cut > 0 && stat.second.a_top_qps_prefix_f) {
897 while (!stat.second.top_k_qps_sec.empty()) {
898 ret = snprintf(buffer_, sizeof(buffer_), "At time: %u with QPS: %u\n",
899 stat.second.top_k_qps_sec.top().second,
900 stat.second.top_k_qps_sec.top().first);
901 if (ret < 0) {
902 return Status::IOError("Format the output failed");
903 }
904 std::string printout(buffer_);
905 s = stat.second.a_top_qps_prefix_f->Append(printout);
906 if (!s.ok()) {
907 fprintf(stderr, "Write prefix QPS top K file failed\n");
908 return s;
909 }
910 uint32_t qps_time = stat.second.top_k_qps_sec.top().second;
911 stat.second.top_k_qps_sec.pop();
912 if (stat.second.a_qps_prefix_stats.find(qps_time) !=
913 stat.second.a_qps_prefix_stats.end()) {
914 for (auto& qps_prefix : stat.second.a_qps_prefix_stats[qps_time]) {
915 std::string qps_prefix_out =
916 ROCKSDB_NAMESPACE::LDBCommand::StringToHex(qps_prefix.first);
917 ret = snprintf(buffer_, sizeof(buffer_),
918 "The prefix: %s Access count: %u\n",
919 qps_prefix_out.c_str(), qps_prefix.second);
920 if (ret < 0) {
921 return Status::IOError("Format the output failed");
922 }
923 std::string pout(buffer_);
924 s = stat.second.a_top_qps_prefix_f->Append(pout);
925 if (!s.ok()) {
926 fprintf(stderr, "Write prefix QPS top K file failed\n");
927 return s;
928 }
929 }
930 }
931 }
932 }
933 }
934 }
935
936 if (qps_f_) {
937 for (uint32_t i = 0; i < duration; i++) {
938 for (int type = 0; type <= kTaTypeNum; type++) {
939 if (type < kTaTypeNum) {
940 ret = snprintf(buffer_, sizeof(buffer_), "%u ", type_qps[i][type]);
941 } else {
942 ret = snprintf(buffer_, sizeof(buffer_), "%u\n", type_qps[i][type]);
943 }
944 if (ret < 0) {
945 return Status::IOError("Format the output failed");
946 }
947 std::string printout(buffer_);
948 s = qps_f_->Append(printout);
949 if (!s.ok()) {
950 return s;
951 }
952 qps_sum[type] += type_qps[i][type];
953 if (type_qps[i][type] > qps_peak[type]) {
954 qps_peak[type] = type_qps[i][type];
955 }
956 }
957 }
958 }
959
960 if (cf_qps_f_) {
    int cfs_size = static_cast<int>(cfs_.size());
962 uint32_t v;
963 for (uint32_t i = 0; i < duration; i++) {
964 for (int cf = 0; cf < cfs_size; cf++) {
965 if (cfs_[cf].cf_qps.find(i) != cfs_[cf].cf_qps.end()) {
966 v = cfs_[cf].cf_qps[i];
967 } else {
968 v = 0;
969 }
970 if (cf < cfs_size - 1) {
971 ret = snprintf(buffer_, sizeof(buffer_), "%u ", v);
972 } else {
973 ret = snprintf(buffer_, sizeof(buffer_), "%u\n", v);
974 }
975 if (ret < 0) {
976 return Status::IOError("Format the output failed");
977 }
978 std::string printout(buffer_);
979 s = cf_qps_f_->Append(printout);
980 if (!s.ok()) {
981 return s;
982 }
983 }
984 }
985 }
986
987 qps_peak_ = qps_peak;
988 for (int type = 0; type <= kTaTypeNum; type++) {
989 if (duration == 0) {
990 qps_ave_[type] = 0;
991 } else {
992 qps_ave_[type] = (static_cast<double>(qps_sum[type])) / duration;
993 }
994 }
995
996 return Status::OK();
997 }
998
// In reprocessing, if we have the whole key space, we can output the
// access count of all keys in a cf and make some statistics of the
// whole key space. Also, we output the top k accessed keys here.
Status TraceAnalyzer::ReProcessing() {
1004 int ret;
1005 Status s;
1006 for (auto& cf_it : cfs_) {
1007 uint32_t cf_id = cf_it.first;
1008
1009 // output the time series;
1010 if (FLAGS_output_time_series) {
1011 for (int type = 0; type < kTaTypeNum; type++) {
1012 if (!ta_[type].enabled ||
1013 ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
1014 continue;
1015 }
1016 TraceStats& stat = ta_[type].stats[cf_id];
1017 if (!stat.time_series_f) {
1018 fprintf(stderr, "Cannot write time_series of '%s' in '%u'\n",
1019 ta_[type].type_name.c_str(), cf_id);
1020 continue;
1021 }
1022 while (!stat.time_series.empty()) {
1023 uint64_t key_id = 0;
1024 auto found = stat.a_key_stats.find(stat.time_series.front().key);
1025 if (found != stat.a_key_stats.end()) {
1026 key_id = found->second.key_id;
1027 }
1028 ret =
1029 snprintf(buffer_, sizeof(buffer_), "%u %" PRIu64 " %" PRIu64 "\n",
1030 stat.time_series.front().type,
1031 stat.time_series.front().ts, key_id);
1032 if (ret < 0) {
1033 return Status::IOError("Format the output failed");
1034 }
1035 std::string printout(buffer_);
1036 s = stat.time_series_f->Append(printout);
1037 if (!s.ok()) {
1038 fprintf(stderr, "Write time series file failed\n");
1039 return s;
1040 }
1041 stat.time_series.pop_front();
1042 }
1043 }
1044 }
1045
1046 // process the whole key space if needed
1047 if (!FLAGS_key_space_dir.empty()) {
1048 std::string whole_key_path =
1049 FLAGS_key_space_dir + "/" + std::to_string(cf_id) + ".txt";
1050 std::string input_key, get_key;
1051 std::vector<std::string> prefix(kTaTypeNum);
1052 std::istringstream iss;
1053 bool has_data = true;
1054 std::unique_ptr<SequentialFile> wkey_input_f;
1055
1056 s = env_->NewSequentialFile(whole_key_path, &wkey_input_f, env_options_);
1057 if (!s.ok()) {
1058 fprintf(stderr, "Cannot open the whole key space file of CF: %u\n",
1059 cf_id);
1060 wkey_input_f.reset();
1061 }
1062
1063 if (wkey_input_f) {
1064 std::unique_ptr<FSSequentialFile> file;
1065 file = NewLegacySequentialFileWrapper(wkey_input_f);
1066 size_t kTraceFileReadaheadSize = 2 * 1024 * 1024;
1067 SequentialFileReader sf_reader(
1068 std::move(file), whole_key_path,
1069 kTraceFileReadaheadSize /* filereadahead_size */);
1070 for (cfs_[cf_id].w_count = 0;
1071 ReadOneLine(&iss, &sf_reader, &get_key, &has_data, &s);
1072 ++cfs_[cf_id].w_count) {
1073 if (!s.ok()) {
1074 fprintf(stderr, "Read whole key space file failed\n");
1075 return s;
1076 }
1077
1078 input_key = ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key);
1079 for (int type = 0; type < kTaTypeNum; type++) {
1080 if (!ta_[type].enabled) {
1081 continue;
1082 }
1083 TraceStats& stat = ta_[type].stats[cf_id];
1084 if (stat.w_key_f) {
1085 if (stat.a_key_stats.find(input_key) != stat.a_key_stats.end()) {
1086 ret = snprintf(buffer_, sizeof(buffer_),
1087 "%" PRIu64 " %" PRIu64 "\n", cfs_[cf_id].w_count,
1088 stat.a_key_stats[input_key].access_count);
1089 if (ret < 0) {
1090 return Status::IOError("Format the output failed");
1091 }
1092 std::string printout(buffer_);
1093 s = stat.w_key_f->Append(printout);
1094 if (!s.ok()) {
1095 fprintf(stderr, "Write whole key space access file failed\n");
1096 return s;
1097 }
1098 }
1099 }
1100
1101 // Output the prefix cut file of the whole key space
1102 if (FLAGS_output_prefix_cut > 0 && stat.w_prefix_cut_f) {
1103 if (input_key.compare(0, FLAGS_output_prefix_cut, prefix[type]) !=
1104 0) {
1105 prefix[type] = input_key.substr(0, FLAGS_output_prefix_cut);
1106 std::string prefix_out =
1107 ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix[type]);
1108 ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %s\n",
1109 cfs_[cf_id].w_count, prefix_out.c_str());
1110 if (ret < 0) {
1111 return Status::IOError("Format the output failed");
1112 }
1113 std::string printout(buffer_);
1114 s = stat.w_prefix_cut_f->Append(printout);
1115 if (!s.ok()) {
1116 fprintf(stderr,
1117 "Write whole key space prefix cut file failed\n");
1118 return s;
1119 }
1120 }
1121 }
1122 }
1123
          // Make the statistics of the key size distribution
1125 if (FLAGS_output_key_distribution) {
1126 if (cfs_[cf_id].w_key_size_stats.find(input_key.size()) ==
1127 cfs_[cf_id].w_key_size_stats.end()) {
1128 cfs_[cf_id].w_key_size_stats[input_key.size()] = 1;
1129 } else {
1130 cfs_[cf_id].w_key_size_stats[input_key.size()]++;
1131 }
1132 }
1133 }
1134 }
1135 }
1136
1137 // process the top k accessed keys
1138 if (FLAGS_print_top_k_access > 0) {
1139 for (int type = 0; type < kTaTypeNum; type++) {
1140 if (!ta_[type].enabled ||
1141 ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
1142 continue;
1143 }
1144 TraceStats& stat = ta_[type].stats[cf_id];
1145 for (auto& record : stat.a_key_stats) {
1146 if (static_cast<int32_t>(stat.top_k_queue.size()) <
1147 FLAGS_print_top_k_access) {
1148 stat.top_k_queue.push(
1149 std::make_pair(record.second.access_count, record.first));
1150 } else {
1151 if (record.second.access_count > stat.top_k_queue.top().first) {
1152 stat.top_k_queue.pop();
1153 stat.top_k_queue.push(
1154 std::make_pair(record.second.access_count, record.first));
1155 }
1156 }
1157 }
1158 }
1159 }
1160 }
1161 return Status::OK();
1162 }
1163
1164 // End the processing, print the requested results
Status TraceAnalyzer::EndProcessing() {
1166 if (trace_sequence_f_) {
1167 trace_sequence_f_->Close();
1168 }
1169 if (FLAGS_no_print) {
1170 return Status::OK();
1171 }
1172 PrintStatistics();
1173 CloseOutputFiles();
1174 return Status::OK();
1175 }
1176
1177 // Insert the corresponding key statistics to the correct type
1178 // and correct CF, output the time-series file if needed
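// Note: a Get whose value_size is 0 is counted as an unsuccessful (not-found)
// access; all other cases increment succ_count.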
Status TraceAnalyzer::KeyStatsInsertion(const uint32_t& type,
1180 const uint32_t& cf_id,
1181 const std::string& key,
1182 const size_t value_size,
1183 const uint64_t ts) {
1184 Status s;
1185 StatsUnit unit;
1186 unit.key_id = 0;
1187 unit.cf_id = cf_id;
1188 unit.value_size = value_size;
1189 unit.access_count = 1;
1190 unit.latest_ts = ts;
1191 if (type != TraceOperationType::kGet || value_size > 0) {
1192 unit.succ_count = 1;
1193 } else {
1194 unit.succ_count = 0;
1195 }
1196 unit.v_correlation.resize(analyzer_opts_.correlation_list.size());
1197 for (int i = 0;
1198 i < (static_cast<int>(analyzer_opts_.correlation_list.size())); i++) {
1199 unit.v_correlation[i].count = 0;
1200 unit.v_correlation[i].total_ts = 0;
1201 }
1202 std::string prefix;
1203 if (FLAGS_output_prefix_cut > 0) {
1204 prefix = key.substr(0, FLAGS_output_prefix_cut);
1205 }
1206
1207 if (begin_time_ == 0) {
1208 begin_time_ = ts;
1209 }
1210 uint32_t time_in_sec;
1211 if (ts < begin_time_) {
1212 time_in_sec = 0;
1213 } else {
1214 time_in_sec = static_cast<uint32_t>((ts - begin_time_) / 1000000);
1215 }
1216
1217 uint64_t dist_value_size = value_size / FLAGS_value_interval;
1218 auto found_stats = ta_[type].stats.find(cf_id);
1219 if (found_stats == ta_[type].stats.end()) {
1220 ta_[type].stats[cf_id].cf_id = cf_id;
1221 ta_[type].stats[cf_id].cf_name = std::to_string(cf_id);
1222 ta_[type].stats[cf_id].a_count = 1;
1223 ta_[type].stats[cf_id].a_key_id = 0;
1224 ta_[type].stats[cf_id].a_key_size_sqsum = MultiplyCheckOverflow(
1225 static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
1226 ta_[type].stats[cf_id].a_key_size_sum = key.size();
1227 ta_[type].stats[cf_id].a_value_size_sqsum = MultiplyCheckOverflow(
1228 static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
1229 ta_[type].stats[cf_id].a_value_size_sum = value_size;
1230 s = OpenStatsOutputFiles(ta_[type].type_name, ta_[type].stats[cf_id]);
1231 if (!FLAGS_print_correlation.empty()) {
1232 s = StatsUnitCorrelationUpdate(unit, type, ts, key);
1233 }
1234 ta_[type].stats[cf_id].a_key_stats[key] = unit;
1235 ta_[type].stats[cf_id].a_value_size_stats[dist_value_size] = 1;
1236 ta_[type].stats[cf_id].a_qps_stats[time_in_sec] = 1;
1237 ta_[type].stats[cf_id].correlation_output.resize(
1238 analyzer_opts_.correlation_list.size());
1239 if (FLAGS_output_prefix_cut > 0) {
1240 std::map<std::string, uint32_t> tmp_qps_map;
1241 tmp_qps_map[prefix] = 1;
1242 ta_[type].stats[cf_id].a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
1243 }
1244 if (time_in_sec != cur_time_sec_) {
1245 ta_[type].stats[cf_id].uni_key_num[cur_time_sec_] =
1246 static_cast<uint64_t>(ta_[type].stats[cf_id].a_key_stats.size());
1247 cur_time_sec_ = time_in_sec;
1248 }
1249 } else {
1250 found_stats->second.a_count++;
1251 found_stats->second.a_key_size_sqsum += MultiplyCheckOverflow(
1252 static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
1253 found_stats->second.a_key_size_sum += key.size();
1254 found_stats->second.a_value_size_sqsum += MultiplyCheckOverflow(
1255 static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
1256 found_stats->second.a_value_size_sum += value_size;
1257 auto found_key = found_stats->second.a_key_stats.find(key);
1258 if (found_key == found_stats->second.a_key_stats.end()) {
1259 found_stats->second.a_key_stats[key] = unit;
1260 } else {
1261 found_key->second.access_count++;
1262 if (type != TraceOperationType::kGet || value_size > 0) {
1263 found_key->second.succ_count++;
1264 }
1265 if (!FLAGS_print_correlation.empty()) {
1266 s = StatsUnitCorrelationUpdate(found_key->second, type, ts, key);
1267 }
1268 }
1269 if (time_in_sec != cur_time_sec_) {
1270 found_stats->second.uni_key_num[cur_time_sec_] =
1271 static_cast<uint64_t>(found_stats->second.a_key_stats.size());
1272 cur_time_sec_ = time_in_sec;
1273 }
1274
1275 auto found_value =
1276 found_stats->second.a_value_size_stats.find(dist_value_size);
1277 if (found_value == found_stats->second.a_value_size_stats.end()) {
1278 found_stats->second.a_value_size_stats[dist_value_size] = 1;
1279 } else {
1280 found_value->second++;
1281 }
1282
1283 auto found_qps = found_stats->second.a_qps_stats.find(time_in_sec);
1284 if (found_qps == found_stats->second.a_qps_stats.end()) {
1285 found_stats->second.a_qps_stats[time_in_sec] = 1;
1286 } else {
1287 found_qps->second++;
1288 }
1289
1290 if (FLAGS_output_prefix_cut > 0) {
1291 auto found_qps_prefix =
1292 found_stats->second.a_qps_prefix_stats.find(time_in_sec);
1293 if (found_qps_prefix == found_stats->second.a_qps_prefix_stats.end()) {
1294 std::map<std::string, uint32_t> tmp_qps_map;
1295 found_stats->second.a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
1296 }
1297 if (found_stats->second.a_qps_prefix_stats[time_in_sec].find(prefix) ==
1298 found_stats->second.a_qps_prefix_stats[time_in_sec].end()) {
1299 found_stats->second.a_qps_prefix_stats[time_in_sec][prefix] = 1;
1300 } else {
1301 found_stats->second.a_qps_prefix_stats[time_in_sec][prefix]++;
1302 }
1303 }
1304 }
1305
1306 if (cfs_.find(cf_id) == cfs_.end()) {
1307 CfUnit cf_unit;
1308 cf_unit.cf_id = cf_id;
1309 cf_unit.w_count = 0;
1310 cf_unit.a_count = 0;
1311 cfs_[cf_id] = cf_unit;
1312 }
1313
1314 if (FLAGS_output_qps_stats) {
1315 cfs_[cf_id].cf_qps[time_in_sec]++;
1316 }
1317
1318 if (FLAGS_output_time_series) {
1319 TraceUnit trace_u;
1320 trace_u.type = type;
1321 trace_u.key = key;
1322 trace_u.value_size = value_size;
1323 trace_u.ts = (ts - time_series_start_) / 1000000;
1324 trace_u.cf_id = cf_id;
1325 ta_[type].stats[cf_id].time_series.push_back(trace_u);
1326 }
1327
1328 return Status::OK();
1329 }
1330
1331 // Update the correlation unit of each key if enabled
Status TraceAnalyzer::StatsUnitCorrelationUpdate(StatsUnit& unit,
1333 const uint32_t& type_second,
1334 const uint64_t& ts,
1335 const std::string& key) {
1336 if (type_second >= kTaTypeNum) {
1337 fprintf(stderr, "Unknown Type Id: %u\n", type_second);
1338 return Status::NotFound();
1339 }
1340
1341 for (int type_first = 0; type_first < kTaTypeNum; type_first++) {
1342 if (type_first >= static_cast<int>(ta_.size()) ||
1343 type_first >= static_cast<int>(analyzer_opts_.correlation_map.size())) {
1344 break;
1345 }
1346 if (analyzer_opts_.correlation_map[type_first][type_second] < 0 ||
1347 ta_[type_first].stats.find(unit.cf_id) == ta_[type_first].stats.end() ||
1348 ta_[type_first].stats[unit.cf_id].a_key_stats.find(key) ==
1349 ta_[type_first].stats[unit.cf_id].a_key_stats.end() ||
1350 ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts == ts) {
1351 continue;
1352 }
1353
1354 int correlation_id =
1355 analyzer_opts_.correlation_map[type_first][type_second];
1356
    // After getting the x->y operation pair and its time interval, update
    // the correlation unit below.
1358 if (correlation_id < 0 ||
1359 correlation_id >= static_cast<int>(unit.v_correlation.size())) {
1360 continue;
1361 }
1362 unit.v_correlation[correlation_id].count++;
1363 unit.v_correlation[correlation_id].total_ts +=
1364 (ts - ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts);
1365 }
1366
1367 unit.latest_ts = ts;
1368 return Status::OK();
1369 }
1370
1371 // when a new trace statistic is created, the file handler
// pointers should be initialized if needed according to
1373 // the trace analyzer options
Status TraceAnalyzer::OpenStatsOutputFiles(const std::string& type,
1375 TraceStats& new_stats) {
1376 Status s;
1377 if (FLAGS_output_key_stats) {
1378 s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_stats.txt",
1379 &new_stats.a_key_f);
1380 s = CreateOutputFile(type, new_stats.cf_name,
1381 "accessed_unique_key_num_change.txt",
1382 &new_stats.a_key_num_f);
1383 if (!FLAGS_key_space_dir.empty()) {
1384 s = CreateOutputFile(type, new_stats.cf_name, "whole_key_stats.txt",
1385 &new_stats.w_key_f);
1386 }
1387 }
1388
1389 if (FLAGS_output_access_count_stats) {
1390 s = CreateOutputFile(type, new_stats.cf_name,
1391 "accessed_key_count_distribution.txt",
1392 &new_stats.a_count_dist_f);
1393 }
1394
1395 if (FLAGS_output_prefix_cut > 0) {
1396 s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_prefix_cut.txt",
1397 &new_stats.a_prefix_cut_f);
1398 if (!FLAGS_key_space_dir.empty()) {
1399 s = CreateOutputFile(type, new_stats.cf_name, "whole_key_prefix_cut.txt",
1400 &new_stats.w_prefix_cut_f);
1401 }
1402
1403 if (FLAGS_output_qps_stats) {
1404 s = CreateOutputFile(type, new_stats.cf_name,
1405 "accessed_top_k_qps_prefix_cut.txt",
1406 &new_stats.a_top_qps_prefix_f);
1407 }
1408 }
1409
1410 if (FLAGS_output_time_series) {
1411 s = CreateOutputFile(type, new_stats.cf_name, "time_series.txt",
1412 &new_stats.time_series_f);
1413 }
1414
1415 if (FLAGS_output_value_distribution) {
1416 s = CreateOutputFile(type, new_stats.cf_name,
1417 "accessed_value_size_distribution.txt",
1418 &new_stats.a_value_size_f);
1419 }
1420
1421 if (FLAGS_output_key_distribution) {
1422 s = CreateOutputFile(type, new_stats.cf_name,
1423 "accessed_key_size_distribution.txt",
1424 &new_stats.a_key_size_f);
1425 }
1426
1427 if (FLAGS_output_qps_stats) {
1428 s = CreateOutputFile(type, new_stats.cf_name, "qps_stats.txt",
1429 &new_stats.a_qps_f);
1430 }
1431
1432 return Status::OK();
1433 }
1434
1435 // create the output path of the files to be opened
Status TraceAnalyzer::CreateOutputFile(
1437 const std::string& type, const std::string& cf_name,
1438 const std::string& ending,
1439 std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>* f_ptr) {
1440 std::string path;
1441 path = output_path_ + "/" + FLAGS_output_prefix + "-" + type + "-" + cf_name +
1442 "-" + ending;
1443 Status s;
1444 s = env_->NewWritableFile(path, f_ptr, env_options_);
1445 if (!s.ok()) {
1446 fprintf(stderr, "Cannot open file: %s\n", path.c_str());
1447 exit(1);
1448 }
1449 return Status::OK();
1450 }
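// For example (illustrative values), with -output_prefix=trace the accessed
// key statistics of the Get queries on column family 0 are written to
//   <output_dir>/trace-get-0-accessed_key_stats.txt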
1451
1452 // Close the output files in the TraceStats if they are opened
void TraceAnalyzer::CloseOutputFiles() {
1454 for (int type = 0; type < kTaTypeNum; type++) {
1455 if (!ta_[type].enabled) {
1456 continue;
1457 }
1458 for (auto& stat : ta_[type].stats) {
1459 if (stat.second.time_series_f) {
1460 stat.second.time_series_f->Close();
1461 }
1462
1463 if (stat.second.a_key_f) {
1464 stat.second.a_key_f->Close();
1465 }
1466
1467 if (stat.second.a_key_num_f) {
1468 stat.second.a_key_num_f->Close();
1469 }
1470
1471 if (stat.second.a_count_dist_f) {
1472 stat.second.a_count_dist_f->Close();
1473 }
1474
1475 if (stat.second.a_prefix_cut_f) {
1476 stat.second.a_prefix_cut_f->Close();
1477 }
1478
1479 if (stat.second.a_value_size_f) {
1480 stat.second.a_value_size_f->Close();
1481 }
1482
1483 if (stat.second.a_key_size_f) {
1484 stat.second.a_key_size_f->Close();
1485 }
1486
1487 if (stat.second.a_qps_f) {
1488 stat.second.a_qps_f->Close();
1489 }
1490
1491 if (stat.second.a_top_qps_prefix_f) {
1492 stat.second.a_top_qps_prefix_f->Close();
1493 }
1494
1495 if (stat.second.w_key_f) {
1496 stat.second.w_key_f->Close();
1497 }
1498 if (stat.second.w_prefix_cut_f) {
1499 stat.second.w_prefix_cut_f->Close();
1500 }
1501 }
1502 }
1503 return;
1504 }
1505
1506 // Handle the Get request in the trace
Status TraceAnalyzer::HandleGet(uint32_t column_family_id,
1508 const std::string& key, const uint64_t& ts,
1509 const uint32_t& get_ret) {
1510 Status s;
1511 size_t value_size = 0;
1512 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1513 s = WriteTraceSequence(TraceOperationType::kGet, column_family_id, key,
1514 value_size, ts);
1515 if (!s.ok()) {
1516 return Status::Corruption("Failed to write the trace sequence to file");
1517 }
1518 }
1519
1520 if (ta_[TraceOperationType::kGet].sample_count >= sample_max_) {
1521 ta_[TraceOperationType::kGet].sample_count = 0;
1522 }
1523 if (ta_[TraceOperationType::kGet].sample_count > 0) {
1524 ta_[TraceOperationType::kGet].sample_count++;
1525 return Status::OK();
1526 }
1527 ta_[TraceOperationType::kGet].sample_count++;
1528
1529 if (!ta_[TraceOperationType::kGet].enabled) {
1530 return Status::OK();
1531 }
1532 if (get_ret == 1) {
1533 value_size = 10;
1534 }
1535 s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key,
1536 value_size, ts);
1537 if (!s.ok()) {
1538 return Status::Corruption("Failed to insert key statistics");
1539 }
1540 return s;
1541 }
1542
1543 // Handle the Put request in the write batch of the trace
Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key,
1545 const Slice& value) {
1546 Status s;
1547 size_t value_size = value.ToString().size();
1548 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1549 s = WriteTraceSequence(TraceOperationType::kPut, column_family_id,
1550 key.ToString(), value_size, c_time_);
1551 if (!s.ok()) {
1552 return Status::Corruption("Failed to write the trace sequence to file");
1553 }
1554 }
1555
1556 if (ta_[TraceOperationType::kPut].sample_count >= sample_max_) {
1557 ta_[TraceOperationType::kPut].sample_count = 0;
1558 }
1559 if (ta_[TraceOperationType::kPut].sample_count > 0) {
1560 ta_[TraceOperationType::kPut].sample_count++;
1561 return Status::OK();
1562 }
1563 ta_[TraceOperationType::kPut].sample_count++;
1564
1565 if (!ta_[TraceOperationType::kPut].enabled) {
1566 return Status::OK();
1567 }
1568 s = KeyStatsInsertion(TraceOperationType::kPut, column_family_id,
1569 key.ToString(), value_size, c_time_);
1570 if (!s.ok()) {
1571 return Status::Corruption("Failed to insert key statistics");
1572 }
1573 return s;
1574 }
1575
1576 // Handle the Delete request in the write batch of the trace
Status TraceAnalyzer::HandleDelete(uint32_t column_family_id,
1578 const Slice& key) {
1579 Status s;
1580 size_t value_size = 0;
1581 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1582 s = WriteTraceSequence(TraceOperationType::kDelete, column_family_id,
1583 key.ToString(), value_size, c_time_);
1584 if (!s.ok()) {
1585 return Status::Corruption("Failed to write the trace sequence to file");
1586 }
1587 }
1588
1589 if (ta_[TraceOperationType::kDelete].sample_count >= sample_max_) {
1590 ta_[TraceOperationType::kDelete].sample_count = 0;
1591 }
1592 if (ta_[TraceOperationType::kDelete].sample_count > 0) {
1593 ta_[TraceOperationType::kDelete].sample_count++;
1594 return Status::OK();
1595 }
1596 ta_[TraceOperationType::kDelete].sample_count++;
1597
1598 if (!ta_[TraceOperationType::kDelete].enabled) {
1599 return Status::OK();
1600 }
1601 s = KeyStatsInsertion(TraceOperationType::kDelete, column_family_id,
1602 key.ToString(), value_size, c_time_);
1603 if (!s.ok()) {
1604 return Status::Corruption("Failed to insert key statistics");
1605 }
1606 return s;
1607 }
1608
1609 // Handle the SingleDelete request in the write batch of the trace
Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id,
1611 const Slice& key) {
1612 Status s;
1613 size_t value_size = 0;
1614 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1615 s = WriteTraceSequence(TraceOperationType::kSingleDelete, column_family_id,
1616 key.ToString(), value_size, c_time_);
1617 if (!s.ok()) {
1618 return Status::Corruption("Failed to write the trace sequence to file");
1619 }
1620 }
1621
1622 if (ta_[TraceOperationType::kSingleDelete].sample_count >= sample_max_) {
1623 ta_[TraceOperationType::kSingleDelete].sample_count = 0;
1624 }
1625 if (ta_[TraceOperationType::kSingleDelete].sample_count > 0) {
1626 ta_[TraceOperationType::kSingleDelete].sample_count++;
1627 return Status::OK();
1628 }
1629 ta_[TraceOperationType::kSingleDelete].sample_count++;
1630
1631 if (!ta_[TraceOperationType::kSingleDelete].enabled) {
1632 return Status::OK();
1633 }
1634 s = KeyStatsInsertion(TraceOperationType::kSingleDelete, column_family_id,
1635 key.ToString(), value_size, c_time_);
1636 if (!s.ok()) {
1637 return Status::Corruption("Failed to insert key statistics");
1638 }
1639 return s;
1640 }
1641
1642 // Handle the DeleteRange request in the write batch of the trace
Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id,
1644 const Slice& begin_key,
1645 const Slice& end_key) {
1646 Status s;
1647 size_t value_size = 0;
1648 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1649 s = WriteTraceSequence(TraceOperationType::kRangeDelete, column_family_id,
1650 begin_key.ToString(), value_size, c_time_);
1651 if (!s.ok()) {
1652 return Status::Corruption("Failed to write the trace sequence to file");
1653 }
1654 }
1655
1656 if (ta_[TraceOperationType::kRangeDelete].sample_count >= sample_max_) {
1657 ta_[TraceOperationType::kRangeDelete].sample_count = 0;
1658 }
1659 if (ta_[TraceOperationType::kRangeDelete].sample_count > 0) {
1660 ta_[TraceOperationType::kRangeDelete].sample_count++;
1661 return Status::OK();
1662 }
1663 ta_[TraceOperationType::kRangeDelete].sample_count++;
1664
1665 if (!ta_[TraceOperationType::kRangeDelete].enabled) {
1666 return Status::OK();
1667 }
  s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
                        begin_key.ToString(), value_size, c_time_);
  if (!s.ok()) {
    return Status::Corruption("Failed to insert key statistics");
  }
  s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
                        end_key.ToString(), value_size, c_time_);
  if (!s.ok()) {
    return Status::Corruption("Failed to insert key statistics");
  }
1675 return s;
1676 }
1677
1678 // Handle the Merge request in the write batch of the trace
1679 Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
1680 const Slice& value) {
1681 Status s;
1682 size_t value_size = value.size();
1683 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1684 s = WriteTraceSequence(TraceOperationType::kMerge, column_family_id,
1685 key.ToString(), value_size, c_time_);
1686 if (!s.ok()) {
1687 return Status::Corruption("Failed to write the trace sequence to file");
1688 }
1689 }
1690
1691 if (ta_[TraceOperationType::kMerge].sample_count >= sample_max_) {
1692 ta_[TraceOperationType::kMerge].sample_count = 0;
1693 }
1694 if (ta_[TraceOperationType::kMerge].sample_count > 0) {
1695 ta_[TraceOperationType::kMerge].sample_count++;
1696 return Status::OK();
1697 }
1698 ta_[TraceOperationType::kMerge].sample_count++;
1699
1700 if (!ta_[TraceOperationType::kMerge].enabled) {
1701 return Status::OK();
1702 }
1703 s = KeyStatsInsertion(TraceOperationType::kMerge, column_family_id,
1704 key.ToString(), value_size, c_time_);
1705 if (!s.ok()) {
1706 return Status::Corruption("Failed to insert key statistics");
1707 }
1708 return s;
1709 }
1710
1711 // Handle the Iterator request in the trace
1712 Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
1713 const std::string& key, const uint64_t& ts,
1714 TraceType& trace_type) {
1715 Status s;
1716 size_t value_size = 0;
1717 int type = -1;
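// Only Seek and SeekForPrev iterator entries are analyzed; any other
// iterator trace type is ignored and the request returns OK without
// being processed.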
1718 if (trace_type == kTraceIteratorSeek) {
1719 type = TraceOperationType::kIteratorSeek;
1720 } else if (trace_type == kTraceIteratorSeekForPrev) {
1721 type = TraceOperationType::kIteratorSeekForPrev;
1722 } else {
1723 return s;
1724 }
1725 if (type == -1) {
1726 return s;
1727 }
1728
1729 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1730 s = WriteTraceSequence(type, column_family_id, key, value_size, ts);
1731 if (!s.ok()) {
1732 return Status::Corruption("Failed to write the trace sequence to file");
1733 }
1734 }
1735
1736 if (ta_[type].sample_count >= sample_max_) {
1737 ta_[type].sample_count = 0;
1738 }
1739 if (ta_[type].sample_count > 0) {
1740 ta_[type].sample_count++;
1741 return Status::OK();
1742 }
1743 ta_[type].sample_count++;
1744
1745 if (!ta_[type].enabled) {
1746 return Status::OK();
1747 }
1748 s = KeyStatsInsertion(type, column_family_id, key, value_size, ts);
1749 if (!s.ok()) {
1750 return Status::Corruption("Failed to insert key statistics");
1751 }
1752 return s;
1753 }
1754
1755 // Before the analyzer is closed, the requested general statistics are
1756 // printed out here. At the current stage, this information is not written
1757 // to the output files.
1758 // -----type
1759 // |__cf_id
1760 // |_statistics
1761 void TraceAnalyzer::PrintStatistics() {
1762 for (int type = 0; type < kTaTypeNum; type++) {
1763 if (!ta_[type].enabled) {
1764 continue;
1765 }
1766 ta_[type].total_keys = 0;
1767 ta_[type].total_access = 0;
1768 ta_[type].total_succ_access = 0;
1769 printf("\n################# Operation Type: %s #####################\n",
1770 ta_[type].type_name.c_str());
1771 if (qps_ave_.size() == kTaTypeNum + 1) {
1772 printf("Peak QPS is: %u Average QPS is: %f\n", qps_peak_[type],
1773 qps_ave_[type]);
1774 }
1775 for (auto& stat_it : ta_[type].stats) {
1776 if (stat_it.second.a_count == 0) {
1777 continue;
1778 }
1779 TraceStats& stat = stat_it.second;
1780 uint64_t total_a_keys = static_cast<uint64_t>(stat.a_key_stats.size());
1781 double key_size_ave = 0.0;
1782 double value_size_ave = 0.0;
1783 double key_size_vari = 0.0;
1784 double value_size_vari = 0.0;
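// The "variation" reported below is the standard deviation, computed from
// the accumulated sums and squared sums as sqrt(E[x^2] - E[x]^2).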
1785 if (stat.a_count > 0) {
1786 key_size_ave =
1787 (static_cast<double>(stat.a_key_size_sum)) / stat.a_count;
1788 value_size_ave =
1789 (static_cast<double>(stat.a_value_size_sum)) / stat.a_count;
1790 key_size_vari = std::sqrt((static_cast<double>(stat.a_key_size_sqsum)) /
1791 stat.a_count -
1792 key_size_ave * key_size_ave);
1793 value_size_vari = std::sqrt(
1794 (static_cast<double>(stat.a_value_size_sqsum)) / stat.a_count -
1795 value_size_ave * value_size_ave);
1796 }
1797 if (value_size_ave == 0.0) {
1798 stat.a_value_mid = 0;
1799 }
1800 cfs_[stat.cf_id].a_count += total_a_keys;
1801 ta_[type].total_keys += total_a_keys;
1802 ta_[type].total_access += stat.a_count;
1803 ta_[type].total_succ_access += stat.a_succ_count;
1804 printf("*********************************************************\n");
1805 printf("colume family id: %u\n", stat.cf_id);
1806 printf("Total number of queries to this cf by %s: %" PRIu64 "\n",
1807 ta_[type].type_name.c_str(), stat.a_count);
1808 printf("Total unique keys in this cf: %" PRIu64 "\n", total_a_keys);
1809 printf("Average key size: %f key size medium: %" PRIu64
1810 " Key size Variation: %f\n",
1811 key_size_ave, stat.a_key_mid, key_size_vari);
1812 if (type == kPut || type == kMerge) {
1813 printf("Average value size: %f Value size medium: %" PRIu64
1814 " Value size variation: %f\n",
1815 value_size_ave, stat.a_value_mid, value_size_vari);
1816 }
1817 printf("Peak QPS is: %u Average QPS is: %f\n", stat.a_peak_qps,
1818 stat.a_ave_qps);
1819
1820 // Print the top-k accessed keys and their access counts
1821 if (FLAGS_print_top_k_access > 0) {
1822 printf("The Top %d keys that are accessed:\n",
1823 FLAGS_print_top_k_access);
1824 while (!stat.top_k_queue.empty()) {
1825 std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(
1826 stat.top_k_queue.top().second);
1827 printf("Access_count: %" PRIu64 " %s\n", stat.top_k_queue.top().first,
1828 hex_key.c_str());
1829 stat.top_k_queue.pop();
1830 }
1831 }
1832
1833 // Print the top-k most accessed prefix ranges and the
1834 // top-k prefix ranges with the highest average accesses per key
1835 if (FLAGS_output_prefix_cut > 0) {
1836 printf("The Top %d accessed prefix range:\n", FLAGS_print_top_k_access);
1837 while (!stat.top_k_prefix_access.empty()) {
1838 printf("Prefix: %s Access count: %" PRIu64 "\n",
1839 stat.top_k_prefix_access.top().second.c_str(),
1840 stat.top_k_prefix_access.top().first);
1841 stat.top_k_prefix_access.pop();
1842 }
1843
1844 printf("The Top %d prefix with highest access per key:\n",
1845 FLAGS_print_top_k_access);
1846 while (!stat.top_k_prefix_ave.empty()) {
1847 printf("Prefix: %s access per key: %f\n",
1848 stat.top_k_prefix_ave.top().second.c_str(),
1849 stat.top_k_prefix_ave.top().first);
1850 stat.top_k_prefix_ave.pop();
1851 }
1852 }
1853
1854 // print the operation correlations
1855 if (!FLAGS_print_correlation.empty()) {
1856 for (int correlation = 0;
1857 correlation <
1858 static_cast<int>(analyzer_opts_.correlation_list.size());
1859 correlation++) {
1860 printf(
1861 "The correlation statistics of '%s' after '%s' is:",
1862 taIndexToOpt[analyzer_opts_.correlation_list[correlation].second]
1863 .c_str(),
1864 taIndexToOpt[analyzer_opts_.correlation_list[correlation].first]
1865 .c_str());
1866 double correlation_ave = 0.0;
1867 if (stat.correlation_output[correlation].first > 0) {
1868 correlation_ave =
1869 (static_cast<double>(
1870 stat.correlation_output[correlation].second)) /
1871 (stat.correlation_output[correlation].first * 1000);
1872 }
1873 printf(" total numbers: %" PRIu64 " average time: %f(ms)\n",
1874 stat.correlation_output[correlation].first, correlation_ave);
1875 }
1876 }
1877 }
1878 printf("*********************************************************\n");
1879 printf("Total keys of '%s' is: %" PRIu64 "\n", ta_[type].type_name.c_str(),
1880 ta_[type].total_keys);
1881 printf("Total access is: %" PRIu64 "\n", ta_[type].total_access);
1882 total_access_keys_ += ta_[type].total_keys;
1883 }
1884
1885 // Print the overall statistic information of the trace
1886 printf("\n*********************************************************\n");
1887 printf("*********************************************************\n");
1888 printf("The column family based statistics\n");
1889 for (auto& cf : cfs_) {
1890 printf("The column family id: %u\n", cf.first);
1891 printf("The whole key space key numbers: %" PRIu64 "\n", cf.second.w_count);
1892 printf("The accessed key space key numbers: %" PRIu64 "\n",
1893 cf.second.a_count);
1894 }
1895
1896 if (FLAGS_print_overall_stats) {
1897 printf("\n*********************************************************\n");
1898 printf("*********************************************************\n");
1899 if (qps_peak_.size() == kTaTypeNum + 1) {
1900 printf("Average QPS per second: %f Peak QPS: %u\n", qps_ave_[kTaTypeNum],
1901 qps_peak_[kTaTypeNum]);
1902 }
1903 printf("The statistics related to query number need to times: %u\n",
1904 sample_max_);
1905 printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64
1906 " Total_gets: %" PRIu64 " Total_write_batch: %" PRIu64 "\n",
1907 total_requests_, total_access_keys_, total_gets_, total_writes_);
1908 for (int type = 0; type < kTaTypeNum; type++) {
1909 if (!ta_[type].enabled) {
1910 continue;
1911 }
1912 printf("Operation: '%s' has: %" PRIu64 "\n", ta_[type].type_name.c_str(),
1913 ta_[type].total_access);
1914 }
1915 }
1916 }
1917
1918 // Write the trace sequence to file
WriteTraceSequence(const uint32_t & type,const uint32_t & cf_id,const std::string & key,const size_t value_size,const uint64_t ts)1919 Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type,
1920 const uint32_t& cf_id,
1921 const std::string& key,
1922 const size_t value_size,
1923 const uint64_t ts) {
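// Each human-readable record is a single line of the form
// [<key in hex> ]<type> <cf_id> <value_size> <timestamp>
// where the key column is omitted when --no_key is set.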
1924 std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key);
1925 int ret;
1926 ret = snprintf(buffer_, sizeof(buffer_), "%u %u %zu %" PRIu64 "\n", type,
1927 cf_id, value_size, ts);
1928 if (ret < 0) {
1929 return Status::IOError("failed to format the output");
1930 }
1931 std::string printout(buffer_);
1932 if (!FLAGS_no_key) {
1933 printout = hex_key + " " + printout;
1934 }
1935 return trace_sequence_f_->Append(printout);
1936 }
1937
1938 // The entry point of the trace analyzer tool
1939 int trace_analyzer_tool(int argc, char** argv) {
1940 std::string trace_path;
1941 std::string output_path;
1942
1943 AnalyzerOptions analyzer_opts;
1944
1945 ParseCommandLineFlags(&argc, &argv, true);
1946
1947 if (!FLAGS_print_correlation.empty()) {
1948 analyzer_opts.SparseCorrelationInput(FLAGS_print_correlation);
1949 }
1950
1951 std::unique_ptr<TraceAnalyzer> analyzer(
1952 new TraceAnalyzer(FLAGS_trace_path, FLAGS_output_dir, analyzer_opts));
1953
1954 if (!analyzer) {
1955 fprintf(stderr, "Cannot initiate the trace analyzer\n");
1956 exit(1);
1957 }
1958
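// The analysis runs as a fixed pipeline: PrepareProcessing ->
// StartProcessing -> MakeStatistics -> ReProcessing -> EndProcessing.
// Each step must succeed before the next one runs; on failure the status
// message is printed and the tool exits with a non-zero code.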
1959 ROCKSDB_NAMESPACE::Status s = analyzer->PrepareProcessing();
1960 if (!s.ok()) {
1961 fprintf(stderr, "%s\n", s.getState());
1962 fprintf(stderr, "Cannot initiate the trace reader\n");
1963 exit(1);
1964 }
1965
1966 s = analyzer->StartProcessing();
1967 if (!s.ok() && !FLAGS_try_process_corrupted_trace) {
1968 fprintf(stderr, "%s\n", s.getState());
1969 fprintf(stderr, "Cannot processing the trace\n");
1970 exit(1);
1971 }
1972
1973 s = analyzer->MakeStatistics();
1974 if (!s.ok()) {
1975 fprintf(stderr, "%s\n", s.getState());
1976 analyzer->EndProcessing();
1977 fprintf(stderr, "Cannot make the statistics\n");
1978 exit(1);
1979 }
1980
1981 s = analyzer->ReProcessing();
1982 if (!s.ok()) {
1983 fprintf(stderr, "%s\n", s.getState());
1984 fprintf(stderr, "Cannot re-process the trace for more statistics\n");
1985 analyzer->EndProcessing();
1986 exit(1);
1987 }
1988
1989 s = analyzer->EndProcessing();
1990 if (!s.ok()) {
1991 fprintf(stderr, "%s\n", s.getState());
1992 fprintf(stderr, "Cannot close the trace analyzer\n");
1993 exit(1);
1994 }
1995
1996 return 0;
1997 }
1998 } // namespace ROCKSDB_NAMESPACE
1999
2000 #endif  // GFLAGS
2001 #endif  // ROCKSDB_LITE
2002