1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 package org.rocksdb;
7 
8 import java.util.List;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.TimeUnit;
12 
13 /**
14  * <p>Helper class to collect DB statistics periodically at a period specified in
15  * constructor. Callback function (provided in constructor) is called with
16  * every statistics collection.</p>
17  *
18  * <p>Caller should call start() to start statistics collection. Shutdown() should
19  * be called to stop stats collection and should be called before statistics (
20  * provided in constructor) reference has been disposed.</p>
21  */
22 public class StatisticsCollector {
23   private final List<StatsCollectorInput> _statsCollectorInputList;
24   private final ExecutorService _executorService;
25   private final int _statsCollectionInterval;
26   private volatile boolean _isRunning = true;
27 
28   /**
29    * Constructor for statistics collector.
30    *
31    * @param statsCollectorInputList List of statistics collector input.
32    * @param statsCollectionIntervalInMilliSeconds Statistics collection time
33    *        period (specified in milliseconds).
34    */
StatisticsCollector( final List<StatsCollectorInput> statsCollectorInputList, final int statsCollectionIntervalInMilliSeconds)35   public StatisticsCollector(
36       final List<StatsCollectorInput> statsCollectorInputList,
37       final int statsCollectionIntervalInMilliSeconds) {
38     _statsCollectorInputList = statsCollectorInputList;
39     _statsCollectionInterval = statsCollectionIntervalInMilliSeconds;
40 
41     _executorService = Executors.newSingleThreadExecutor();
42   }
43 
start()44   public void start() {
45     _executorService.submit(collectStatistics());
46   }
47 
48   /**
49    * Shuts down statistics collector.
50    *
51    * @param shutdownTimeout Time in milli-seconds to wait for shutdown before
52    *        killing the collection process.
53    * @throws java.lang.InterruptedException thrown if Threads are interrupted.
54    */
shutDown(final int shutdownTimeout)55   public void shutDown(final int shutdownTimeout) throws InterruptedException {
56     _isRunning = false;
57 
58     _executorService.shutdownNow();
59     // Wait for collectStatistics runnable to finish so that disposal of
60     // statistics does not cause any exceptions to be thrown.
61     _executorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS);
62   }
63 
collectStatistics()64   private Runnable collectStatistics() {
65     return new Runnable() {
66 
67       @Override
68       public void run() {
69         while (_isRunning) {
70           try {
71             if(Thread.currentThread().isInterrupted()) {
72               break;
73             }
74             for(final StatsCollectorInput statsCollectorInput :
75                 _statsCollectorInputList) {
76               Statistics statistics = statsCollectorInput.getStatistics();
77               StatisticsCollectorCallback statsCallback =
78                   statsCollectorInput.getCallback();
79 
80               // Collect ticker data
81               for(final TickerType ticker : TickerType.values()) {
82                 if(ticker != TickerType.TICKER_ENUM_MAX) {
83                   final long tickerValue = statistics.getTickerCount(ticker);
84                   statsCallback.tickerCallback(ticker, tickerValue);
85                 }
86               }
87 
88               // Collect histogram data
89               for(final HistogramType histogramType : HistogramType.values()) {
90                 if(histogramType != HistogramType.HISTOGRAM_ENUM_MAX) {
91                   final HistogramData histogramData =
92                           statistics.getHistogramData(histogramType);
93                   statsCallback.histogramCallback(histogramType, histogramData);
94                 }
95               }
96             }
97 
98             Thread.sleep(_statsCollectionInterval);
99           }
100           catch (final InterruptedException e) {
101             Thread.currentThread().interrupt();
102             break;
103           }
104           catch (final Exception e) {
105             throw new RuntimeException("Error while calculating statistics", e);
106           }
107         }
108       }
109     };
110   }
111 }
112