// xref: /oneTBB/examples/graph/fgbzip2/fgbzip2.cpp (revision b15aabb3)
/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
16 
17 #include <iostream>
18 #include <fstream>
19 #include <string>
20 #include <memory>
21 #include <queue>
22 #include <thread>
23 
24 #include "bzlib.hpp"
25 
26 #include "common/utility/utility.hpp"
27 
28 #include "oneapi/tbb/flow_graph.h"
29 #include "oneapi/tbb/tick_count.h"
30 #include "oneapi/tbb/concurrent_queue.h"
31 
// Pointer/length pair describing a raw byte array.
// The struct has no destructor, so whoever allocates `b` is responsible for releasing it.
// A default-constructed Buffer is empty (null pointer, zero length).
// TODO: change memory allocation/deallocation to be managed in constructor/destructor
struct Buffer {
    std::size_t len = 0; // number of bytes reachable through `b`
    char* b = nullptr; // start of the byte array
};
37 
38 struct BufferMsg {
BufferMsgBufferMsg39     BufferMsg() {}
BufferMsgBufferMsg40     BufferMsg(Buffer& inputBuffer, Buffer& outputBuffer, std::size_t seqId, bool isLast = false)
41             : inputBuffer(inputBuffer),
42               outputBuffer(outputBuffer),
43               seqId(seqId),
44               isLast(isLast) {}
45 
createBufferMsgBufferMsg46     static BufferMsg createBufferMsg(std::size_t seqId, std::size_t chunkSize) {
47         Buffer inputBuffer;
48         inputBuffer.b = new char[chunkSize];
49         inputBuffer.len = chunkSize;
50 
51         Buffer outputBuffer;
52         std::size_t compressedChunkSize = chunkSize * 1.01 + 600; // compression overhead
53         outputBuffer.b = new char[compressedChunkSize];
54         outputBuffer.len = compressedChunkSize;
55 
56         return BufferMsg(inputBuffer, outputBuffer, seqId);
57     }
58 
destroyBufferMsgBufferMsg59     static void destroyBufferMsg(const BufferMsg& destroyMsg) {
60         delete[] destroyMsg.inputBuffer.b;
61         delete[] destroyMsg.outputBuffer.b;
62     }
63 
markLastBufferMsg64     void markLast(std::size_t lastId) {
65         isLast = true;
66         seqId = lastId;
67     }
68 
69     std::size_t seqId;
70     Buffer inputBuffer;
71     Buffer outputBuffer;
72     bool isLast;
73 };
74 
75 class BufferCompressor {
76 public:
BufferCompressor(int blockSizeIn100KB)77     BufferCompressor(int blockSizeIn100KB) : m_blockSize(blockSizeIn100KB) {}
78 
operator ()(BufferMsg buffer) const79     BufferMsg operator()(BufferMsg buffer) const {
80         if (!buffer.isLast) {
81             unsigned int outSize = buffer.outputBuffer.len;
82             BZ2_bzBuffToBuffCompress(buffer.outputBuffer.b,
83                                      &outSize,
84                                      buffer.inputBuffer.b,
85                                      buffer.inputBuffer.len,
86                                      m_blockSize,
87                                      0,
88                                      30);
89             buffer.outputBuffer.len = outSize;
90         }
91         return buffer;
92     }
93 
94 private:
95     int m_blockSize;
96 };
97 
98 class IOOperations {
99 public:
IOOperations(std::ifstream & inputStream,std::ofstream & outputStream,std::size_t chunkSize)100     IOOperations(std::ifstream& inputStream, std::ofstream& outputStream, std::size_t chunkSize)
101             : m_inputStream(inputStream),
102               m_outputStream(outputStream),
103               m_chunkSize(chunkSize),
104               m_chunksRead(0) {}
105 
readChunk(Buffer & buffer)106     void readChunk(Buffer& buffer) {
107         m_inputStream.read(buffer.b, m_chunkSize);
108         buffer.len = static_cast<std::size_t>(m_inputStream.gcount());
109         m_chunksRead++;
110     }
111 
writeChunk(const Buffer & buffer)112     void writeChunk(const Buffer& buffer) {
113         m_outputStream.write(buffer.b, buffer.len);
114     }
115 
chunksRead() const116     std::size_t chunksRead() const {
117         return m_chunksRead;
118     }
119 
chunkSize() const120     std::size_t chunkSize() const {
121         return m_chunkSize;
122     }
123 
hasDataToRead() const124     bool hasDataToRead() const {
125         return m_inputStream.is_open() && !m_inputStream.eof();
126     }
127 
128 private:
129     std::ifstream& m_inputStream;
130     std::ofstream& m_outputStream;
131 
132     std::size_t m_chunkSize;
133     std::size_t m_chunksRead;
134 };
135 
//-----------------------------------------------------------------------------------------------------------------------
//---------------------------------------Compression example based on async_node-----------------------------------------
//-----------------------------------------------------------------------------------------------------------------------
139 
140 typedef oneapi::tbb::flow::async_node<oneapi::tbb::flow::continue_msg, BufferMsg>
141     async_file_reader_node;
142 typedef oneapi::tbb::flow::async_node<BufferMsg, oneapi::tbb::flow::continue_msg>
143     async_file_writer_node;
144 
145 class AsyncNodeActivity {
146 public:
AsyncNodeActivity(IOOperations & io)147     AsyncNodeActivity(IOOperations& io)
148             : m_io(io),
149               m_fileWriterThread(&AsyncNodeActivity::writingLoop, this) {}
150 
~AsyncNodeActivity()151     ~AsyncNodeActivity() {
152         m_fileReaderThread.join();
153         m_fileWriterThread.join();
154     }
155 
submitRead(async_file_reader_node::gateway_type & gateway)156     void submitRead(async_file_reader_node::gateway_type& gateway) {
157         gateway.reserve_wait();
158         std::thread(&AsyncNodeActivity::readingLoop, this, std::ref(gateway))
159             .swap(m_fileReaderThread);
160     }
161 
submitWrite(const BufferMsg & bufferMsg)162     void submitWrite(const BufferMsg& bufferMsg) {
163         m_writeQueue.push(bufferMsg);
164     }
165 
166 private:
readingLoop(async_file_reader_node::gateway_type & gateway)167     void readingLoop(async_file_reader_node::gateway_type& gateway) {
168         while (m_io.hasDataToRead()) {
169             BufferMsg bufferMsg = BufferMsg::createBufferMsg(m_io.chunksRead(), m_io.chunkSize());
170             m_io.readChunk(bufferMsg.inputBuffer);
171             gateway.try_put(bufferMsg);
172         }
173         sendLastMessage(gateway);
174         gateway.release_wait();
175     }
176 
writingLoop()177     void writingLoop() {
178         BufferMsg buffer;
179         m_writeQueue.pop(buffer);
180         while (!buffer.isLast) {
181             m_io.writeChunk(buffer.outputBuffer);
182             m_writeQueue.pop(buffer);
183         }
184     }
185 
sendLastMessage(async_file_reader_node::gateway_type & gateway)186     void sendLastMessage(async_file_reader_node::gateway_type& gateway) {
187         BufferMsg lastMsg;
188         lastMsg.markLast(m_io.chunksRead());
189         gateway.try_put(lastMsg);
190     }
191 
192     IOOperations& m_io;
193 
194     oneapi::tbb::concurrent_bounded_queue<BufferMsg> m_writeQueue;
195 
196     std::thread m_fileReaderThread;
197     std::thread m_fileWriterThread;
198 };
199 
fgCompressionAsyncNode(IOOperations & io,int blockSizeIn100KB)200 void fgCompressionAsyncNode(IOOperations& io, int blockSizeIn100KB) {
201     oneapi::tbb::flow::graph g;
202 
203     AsyncNodeActivity asyncNodeActivity(io);
204 
205     async_file_reader_node file_reader(
206         g,
207         oneapi::tbb::flow::unlimited,
208         [&asyncNodeActivity](const oneapi::tbb::flow::continue_msg& msg,
209                              async_file_reader_node::gateway_type& gateway) {
210             asyncNodeActivity.submitRead(gateway);
211         });
212 
213     oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor(
214         g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
215 
216     oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g,
217                                                           [](const BufferMsg& bufferMsg) -> size_t {
218                                                               return bufferMsg.seqId;
219                                                           });
220 
221     // The node is serial to preserve the right order of buffers set by the preceding sequencer_node
222     async_file_writer_node output_writer(
223         g,
224         oneapi::tbb::flow::serial,
225         [&asyncNodeActivity](const BufferMsg& bufferMsg,
226                              async_file_writer_node::gateway_type& gateway) {
227             asyncNodeActivity.submitWrite(bufferMsg);
228         });
229 
230     make_edge(file_reader, compressor);
231     make_edge(compressor, ordering);
232     make_edge(ordering, output_writer);
233 
234     file_reader.try_put(oneapi::tbb::flow::continue_msg());
235 
236     g.wait_for_all();
237 }
238 
//-----------------------------------------------------------------------------------------------------------------------
//---------------------------------------------Simple compression example------------------------------------------------
//-----------------------------------------------------------------------------------------------------------------------
242 
fgCompression(IOOperations & io,int blockSizeIn100KB)243 void fgCompression(IOOperations& io, int blockSizeIn100KB) {
244     oneapi::tbb::flow::graph g;
245 
246     oneapi::tbb::flow::input_node<BufferMsg> file_reader(
247         g, [&io](oneapi::tbb::flow_control& fc) -> BufferMsg {
248             if (io.hasDataToRead()) {
249                 BufferMsg bufferMsg = BufferMsg::createBufferMsg(io.chunksRead(), io.chunkSize());
250                 io.readChunk(bufferMsg.inputBuffer);
251                 return bufferMsg;
252             }
253             fc.stop();
254             return BufferMsg{};
255         });
256     file_reader.activate();
257 
258     oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor(
259         g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
260 
261     oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g, [](const BufferMsg& buffer) -> size_t {
262         return buffer.seqId;
263     });
264 
265     oneapi::tbb::flow::function_node<BufferMsg> output_writer(
266         g, oneapi::tbb::flow::serial, [&io](const BufferMsg& bufferMsg) {
267             io.writeChunk(bufferMsg.outputBuffer);
268             BufferMsg::destroyBufferMsg(bufferMsg);
269         });
270 
271     make_edge(file_reader, compressor);
272     make_edge(compressor, ordering);
273     make_edge(ordering, output_writer);
274 
275     g.wait_for_all();
276 }
277 
//-----------------------------------------------------------------------------------------------------------------------
279 
// Returns true when `str` ends with `suffix`. An empty suffix matches every string.
bool endsWith(const std::string& str, const std::string& suffix) {
    // Checked explicitly so the start offset below cannot wrap around.
    if (suffix.length() > str.length()) {
        return false;
    }
    return str.compare(str.length() - suffix.length(), suffix.length(), suffix) == 0;
}
283 
//-----------------------------------------------------------------------------------------------------------------------
285 
main(int argc,char * argv[])286 int main(int argc, char* argv[]) {
287     oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now();
288 
289     const std::string archiveExtension = ".bz2";
290     bool verbose = false;
291     bool asyncType;
292     std::string inputFileName;
293     int blockSizeIn100KB = 1; // block size in 100KB chunks
294     std::size_t memoryLimitIn1MB = 1; // memory limit for compression in megabytes granularity
295 
296     utility::parse_cli_arguments(
297         argc,
298         argv,
299         utility::cli_argument_pack()
300             //"-h" option for displaying help is present implicitly
301             .arg(blockSizeIn100KB, "-b", "\t block size in 100KB chunks, [1 .. 9]")
302             .arg(verbose, "-v", "verbose mode")
303             .arg(memoryLimitIn1MB,
304                  "-l",
305                  "used memory limit for compression algorithm in 1MB (minimum) granularity")
306             .arg(asyncType, "-async", "use graph async_node-based implementation")
307             .positional_arg(inputFileName, "filename", "input file name"));
308 
309     if (inputFileName.empty()) {
310         throw std::invalid_argument(
311             "Input file name is not specified. Try 'fgbzip2 -h' for more information.");
312     }
313 
314     if (blockSizeIn100KB < 1 || blockSizeIn100KB > 9) {
315         throw std::invalid_argument("Incorrect block size. Try 'fgbzip2 -h' for more information.");
316     }
317 
318     if (memoryLimitIn1MB < 1) {
319         throw std::invalid_argument(
320             "Incorrect memory limit size. Try 'fgbzip2 -h' for more information.");
321     }
322 
323     if (verbose)
324         std::cout << "Input file name: " << inputFileName << "\n";
325     if (endsWith(inputFileName, archiveExtension)) {
326         throw std::invalid_argument("Input file already have " + archiveExtension + " extension.");
327     }
328 
329     std::ifstream inputStream(inputFileName.c_str(), std::ios::in | std::ios::binary);
330     if (!inputStream.is_open()) {
331         throw std::invalid_argument("Cannot open " + inputFileName + " file.");
332     }
333 
334     std::string outputFileName(inputFileName + archiveExtension);
335 
336     std::ofstream outputStream(outputFileName.c_str(),
337                                std::ios::out | std::ios::binary | std::ios::trunc);
338     if (!outputStream.is_open()) {
339         throw std::invalid_argument("Cannot open " + outputFileName + " file.");
340     }
341 
342     // General interface to work with I/O buffers operations
343     std::size_t chunkSize = blockSizeIn100KB * 100 * 1024;
344     IOOperations io(inputStream, outputStream, chunkSize);
345 
346     if (asyncType) {
347         if (verbose)
348             std::cout
349                 << "Running flow graph based compression algorithm with async_node based asynchronous IO operations."
350                 << "\n";
351         fgCompressionAsyncNode(io, blockSizeIn100KB);
352     }
353     else {
354         if (verbose)
355             std::cout << "Running flow graph based compression algorithm."
356                       << "\n";
357         fgCompression(io, blockSizeIn100KB);
358     }
359 
360     inputStream.close();
361     outputStream.close();
362 
363     utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds());
364 
365     return 0;
366 }
367