xref: /oneTBB/examples/graph/fgbzip2/fgbzip2.cpp (revision b15aabb3)
/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
16d86ed7fbStbbdev 
#include <cstddef>
#include <exception>
#include <fstream>
#include <functional>
#include <iostream>
#include <memory>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>

#include "bzlib.hpp"

#include "common/utility/utility.hpp"

#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/tick_count.h"
#include "oneapi/tbb/concurrent_queue.h"
31d86ed7fbStbbdev 
// TODO: change memory allocation/deallocation to be managed in constructor/destructor
// Plain (pointer, length) pair. It does not own its storage: copies are shallow and
// the array is allocated/freed explicitly through the BufferMsg helpers below.
struct Buffer {
    std::size_t len = 0; // number of bytes available/valid at b
    char* b = nullptr; // start of the byte array, or nullptr when nothing is attached
};

// Message passed between the graph nodes: one input chunk, the buffer that receives
// its compressed form, and the sequence number used to restore the original order.
struct BufferMsg {
    // Creates an empty, non-terminal message with sequence number 0 and no buffers.
    BufferMsg() : seqId(0), inputBuffer(), outputBuffer(), isLast(false) {}

    // Wraps already allocated buffers. The Buffer objects are copied shallowly, so the
    // message refers to the same arrays as the arguments.
    BufferMsg(Buffer& inputBuffer, Buffer& outputBuffer, std::size_t seqId, bool isLast = false)
            : seqId(seqId),
              inputBuffer(inputBuffer),
              outputBuffer(outputBuffer),
              isLast(isLast) {}

    // Allocates an input buffer of chunkSize bytes and an output buffer large enough
    // for the compressed chunk. Release both with destroyBufferMsg().
    static BufferMsg createBufferMsg(std::size_t seqId, std::size_t chunkSize) {
        const std::size_t compressedChunkSize =
            static_cast<std::size_t>(chunkSize * 1.01 + 600); // compression overhead

        // Hold both arrays in smart pointers until both allocations have succeeded,
        // so that a failure of the second one does not leak the first.
        std::unique_ptr<char[]> inputStorage(new char[chunkSize]);
        std::unique_ptr<char[]> outputStorage(new char[compressedChunkSize]);

        Buffer inputBuffer;
        inputBuffer.len = chunkSize;
        inputBuffer.b = inputStorage.release();

        Buffer outputBuffer;
        outputBuffer.len = compressedChunkSize;
        outputBuffer.b = outputStorage.release();

        return BufferMsg(inputBuffer, outputBuffer, seqId);
    }

    // Frees the arrays referenced by destroyMsg. Every other copy of the message is
    // left with dangling pointers, so call this exactly once per created message.
    static void destroyBufferMsg(const BufferMsg& destroyMsg) {
        delete[] destroyMsg.inputBuffer.b;
        delete[] destroyMsg.outputBuffer.b;
    }

    // Turns this message into the terminating marker carrying sequence number lastId.
    void markLast(std::size_t lastId) {
        isLast = true;
        seqId = lastId;
    }

    std::size_t seqId; // position of the chunk in the input file
    Buffer inputBuffer; // uncompressed data
    Buffer outputBuffer; // compressed data
    bool isLast; // true for the terminating marker, which carries no data
};
74d86ed7fbStbbdev 
75d86ed7fbStbbdev class BufferCompressor {
76d86ed7fbStbbdev public:
BufferCompressor(int blockSizeIn100KB)77d86ed7fbStbbdev     BufferCompressor(int blockSizeIn100KB) : m_blockSize(blockSizeIn100KB) {}
78d86ed7fbStbbdev 
operator ()(BufferMsg buffer) const79d86ed7fbStbbdev     BufferMsg operator()(BufferMsg buffer) const {
80d86ed7fbStbbdev         if (!buffer.isLast) {
81d86ed7fbStbbdev             unsigned int outSize = buffer.outputBuffer.len;
82d86ed7fbStbbdev             BZ2_bzBuffToBuffCompress(buffer.outputBuffer.b,
83d86ed7fbStbbdev                                      &outSize,
84d86ed7fbStbbdev                                      buffer.inputBuffer.b,
85d86ed7fbStbbdev                                      buffer.inputBuffer.len,
86d86ed7fbStbbdev                                      m_blockSize,
87d86ed7fbStbbdev                                      0,
88d86ed7fbStbbdev                                      30);
89d86ed7fbStbbdev             buffer.outputBuffer.len = outSize;
90d86ed7fbStbbdev         }
91d86ed7fbStbbdev         return buffer;
92d86ed7fbStbbdev     }
93d86ed7fbStbbdev 
94d86ed7fbStbbdev private:
95d86ed7fbStbbdev     int m_blockSize;
96d86ed7fbStbbdev };
97d86ed7fbStbbdev 
98d86ed7fbStbbdev class IOOperations {
99d86ed7fbStbbdev public:
IOOperations(std::ifstream & inputStream,std::ofstream & outputStream,std::size_t chunkSize)100d86ed7fbStbbdev     IOOperations(std::ifstream& inputStream, std::ofstream& outputStream, std::size_t chunkSize)
101d86ed7fbStbbdev             : m_inputStream(inputStream),
102d86ed7fbStbbdev               m_outputStream(outputStream),
103d86ed7fbStbbdev               m_chunkSize(chunkSize),
104d86ed7fbStbbdev               m_chunksRead(0) {}
105d86ed7fbStbbdev 
readChunk(Buffer & buffer)106d86ed7fbStbbdev     void readChunk(Buffer& buffer) {
107d86ed7fbStbbdev         m_inputStream.read(buffer.b, m_chunkSize);
108d86ed7fbStbbdev         buffer.len = static_cast<std::size_t>(m_inputStream.gcount());
109d86ed7fbStbbdev         m_chunksRead++;
110d86ed7fbStbbdev     }
111d86ed7fbStbbdev 
writeChunk(const Buffer & buffer)112d86ed7fbStbbdev     void writeChunk(const Buffer& buffer) {
113d86ed7fbStbbdev         m_outputStream.write(buffer.b, buffer.len);
114d86ed7fbStbbdev     }
115d86ed7fbStbbdev 
chunksRead() const116d86ed7fbStbbdev     std::size_t chunksRead() const {
117d86ed7fbStbbdev         return m_chunksRead;
118d86ed7fbStbbdev     }
119d86ed7fbStbbdev 
chunkSize() const120d86ed7fbStbbdev     std::size_t chunkSize() const {
121d86ed7fbStbbdev         return m_chunkSize;
122d86ed7fbStbbdev     }
123d86ed7fbStbbdev 
hasDataToRead() const124d86ed7fbStbbdev     bool hasDataToRead() const {
125d86ed7fbStbbdev         return m_inputStream.is_open() && !m_inputStream.eof();
126d86ed7fbStbbdev     }
127d86ed7fbStbbdev 
128d86ed7fbStbbdev private:
129d86ed7fbStbbdev     std::ifstream& m_inputStream;
130d86ed7fbStbbdev     std::ofstream& m_outputStream;
131d86ed7fbStbbdev 
132d86ed7fbStbbdev     std::size_t m_chunkSize;
133d86ed7fbStbbdev     std::size_t m_chunksRead;
134d86ed7fbStbbdev };
135d86ed7fbStbbdev 
136d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
137d86ed7fbStbbdev //---------------------------------------Compression example based on async_node-----------------------------------------
138d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
139d86ed7fbStbbdev 
140d86ed7fbStbbdev typedef oneapi::tbb::flow::async_node<oneapi::tbb::flow::continue_msg, BufferMsg>
141d86ed7fbStbbdev     async_file_reader_node;
142d86ed7fbStbbdev typedef oneapi::tbb::flow::async_node<BufferMsg, oneapi::tbb::flow::continue_msg>
143d86ed7fbStbbdev     async_file_writer_node;
144d86ed7fbStbbdev 
145d86ed7fbStbbdev class AsyncNodeActivity {
146d86ed7fbStbbdev public:
AsyncNodeActivity(IOOperations & io)147d86ed7fbStbbdev     AsyncNodeActivity(IOOperations& io)
148d86ed7fbStbbdev             : m_io(io),
149d86ed7fbStbbdev               m_fileWriterThread(&AsyncNodeActivity::writingLoop, this) {}
150d86ed7fbStbbdev 
~AsyncNodeActivity()151d86ed7fbStbbdev     ~AsyncNodeActivity() {
152d86ed7fbStbbdev         m_fileReaderThread.join();
153d86ed7fbStbbdev         m_fileWriterThread.join();
154d86ed7fbStbbdev     }
155d86ed7fbStbbdev 
submitRead(async_file_reader_node::gateway_type & gateway)156d86ed7fbStbbdev     void submitRead(async_file_reader_node::gateway_type& gateway) {
157d86ed7fbStbbdev         gateway.reserve_wait();
158d86ed7fbStbbdev         std::thread(&AsyncNodeActivity::readingLoop, this, std::ref(gateway))
159d86ed7fbStbbdev             .swap(m_fileReaderThread);
160d86ed7fbStbbdev     }
161d86ed7fbStbbdev 
submitWrite(const BufferMsg & bufferMsg)162d86ed7fbStbbdev     void submitWrite(const BufferMsg& bufferMsg) {
163d86ed7fbStbbdev         m_writeQueue.push(bufferMsg);
164d86ed7fbStbbdev     }
165d86ed7fbStbbdev 
166d86ed7fbStbbdev private:
readingLoop(async_file_reader_node::gateway_type & gateway)167d86ed7fbStbbdev     void readingLoop(async_file_reader_node::gateway_type& gateway) {
168d86ed7fbStbbdev         while (m_io.hasDataToRead()) {
169d86ed7fbStbbdev             BufferMsg bufferMsg = BufferMsg::createBufferMsg(m_io.chunksRead(), m_io.chunkSize());
170d86ed7fbStbbdev             m_io.readChunk(bufferMsg.inputBuffer);
171d86ed7fbStbbdev             gateway.try_put(bufferMsg);
172d86ed7fbStbbdev         }
173d86ed7fbStbbdev         sendLastMessage(gateway);
174d86ed7fbStbbdev         gateway.release_wait();
175d86ed7fbStbbdev     }
176d86ed7fbStbbdev 
writingLoop()177d86ed7fbStbbdev     void writingLoop() {
178d86ed7fbStbbdev         BufferMsg buffer;
179d86ed7fbStbbdev         m_writeQueue.pop(buffer);
180d86ed7fbStbbdev         while (!buffer.isLast) {
181d86ed7fbStbbdev             m_io.writeChunk(buffer.outputBuffer);
182d86ed7fbStbbdev             m_writeQueue.pop(buffer);
183d86ed7fbStbbdev         }
184d86ed7fbStbbdev     }
185d86ed7fbStbbdev 
sendLastMessage(async_file_reader_node::gateway_type & gateway)186d86ed7fbStbbdev     void sendLastMessage(async_file_reader_node::gateway_type& gateway) {
187d86ed7fbStbbdev         BufferMsg lastMsg;
188d86ed7fbStbbdev         lastMsg.markLast(m_io.chunksRead());
189d86ed7fbStbbdev         gateway.try_put(lastMsg);
190d86ed7fbStbbdev     }
191d86ed7fbStbbdev 
192d86ed7fbStbbdev     IOOperations& m_io;
193d86ed7fbStbbdev 
194d86ed7fbStbbdev     oneapi::tbb::concurrent_bounded_queue<BufferMsg> m_writeQueue;
195d86ed7fbStbbdev 
196d86ed7fbStbbdev     std::thread m_fileReaderThread;
197d86ed7fbStbbdev     std::thread m_fileWriterThread;
198d86ed7fbStbbdev };
199d86ed7fbStbbdev 
fgCompressionAsyncNode(IOOperations & io,int blockSizeIn100KB)200d86ed7fbStbbdev void fgCompressionAsyncNode(IOOperations& io, int blockSizeIn100KB) {
201d86ed7fbStbbdev     oneapi::tbb::flow::graph g;
202d86ed7fbStbbdev 
203d86ed7fbStbbdev     AsyncNodeActivity asyncNodeActivity(io);
204d86ed7fbStbbdev 
205d86ed7fbStbbdev     async_file_reader_node file_reader(
206d86ed7fbStbbdev         g,
207d86ed7fbStbbdev         oneapi::tbb::flow::unlimited,
208d86ed7fbStbbdev         [&asyncNodeActivity](const oneapi::tbb::flow::continue_msg& msg,
209d86ed7fbStbbdev                              async_file_reader_node::gateway_type& gateway) {
210d86ed7fbStbbdev             asyncNodeActivity.submitRead(gateway);
211d86ed7fbStbbdev         });
212d86ed7fbStbbdev 
213d86ed7fbStbbdev     oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor(
214d86ed7fbStbbdev         g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
215d86ed7fbStbbdev 
216d86ed7fbStbbdev     oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g,
217d86ed7fbStbbdev                                                           [](const BufferMsg& bufferMsg) -> size_t {
218d86ed7fbStbbdev                                                               return bufferMsg.seqId;
219d86ed7fbStbbdev                                                           });
220d86ed7fbStbbdev 
221d86ed7fbStbbdev     // The node is serial to preserve the right order of buffers set by the preceding sequencer_node
222d86ed7fbStbbdev     async_file_writer_node output_writer(
223d86ed7fbStbbdev         g,
224d86ed7fbStbbdev         oneapi::tbb::flow::serial,
225d86ed7fbStbbdev         [&asyncNodeActivity](const BufferMsg& bufferMsg,
226d86ed7fbStbbdev                              async_file_writer_node::gateway_type& gateway) {
227d86ed7fbStbbdev             asyncNodeActivity.submitWrite(bufferMsg);
228d86ed7fbStbbdev         });
229d86ed7fbStbbdev 
230d86ed7fbStbbdev     make_edge(file_reader, compressor);
231d86ed7fbStbbdev     make_edge(compressor, ordering);
232d86ed7fbStbbdev     make_edge(ordering, output_writer);
233d86ed7fbStbbdev 
234d86ed7fbStbbdev     file_reader.try_put(oneapi::tbb::flow::continue_msg());
235d86ed7fbStbbdev 
236d86ed7fbStbbdev     g.wait_for_all();
237d86ed7fbStbbdev }
238d86ed7fbStbbdev 
239d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
240d86ed7fbStbbdev //---------------------------------------------Simple compression example------------------------------------------------
241d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
242d86ed7fbStbbdev 
fgCompression(IOOperations & io,int blockSizeIn100KB)243d86ed7fbStbbdev void fgCompression(IOOperations& io, int blockSizeIn100KB) {
244d86ed7fbStbbdev     oneapi::tbb::flow::graph g;
245d86ed7fbStbbdev 
246d86ed7fbStbbdev     oneapi::tbb::flow::input_node<BufferMsg> file_reader(
247d86ed7fbStbbdev         g, [&io](oneapi::tbb::flow_control& fc) -> BufferMsg {
248d86ed7fbStbbdev             if (io.hasDataToRead()) {
249d86ed7fbStbbdev                 BufferMsg bufferMsg = BufferMsg::createBufferMsg(io.chunksRead(), io.chunkSize());
250d86ed7fbStbbdev                 io.readChunk(bufferMsg.inputBuffer);
251d86ed7fbStbbdev                 return bufferMsg;
252d86ed7fbStbbdev             }
253d86ed7fbStbbdev             fc.stop();
254d86ed7fbStbbdev             return BufferMsg{};
255d86ed7fbStbbdev         });
256d86ed7fbStbbdev     file_reader.activate();
257d86ed7fbStbbdev 
258d86ed7fbStbbdev     oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor(
259d86ed7fbStbbdev         g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
260d86ed7fbStbbdev 
261d86ed7fbStbbdev     oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g, [](const BufferMsg& buffer) -> size_t {
262d86ed7fbStbbdev         return buffer.seqId;
263d86ed7fbStbbdev     });
264d86ed7fbStbbdev 
265d86ed7fbStbbdev     oneapi::tbb::flow::function_node<BufferMsg> output_writer(
266d86ed7fbStbbdev         g, oneapi::tbb::flow::serial, [&io](const BufferMsg& bufferMsg) {
267d86ed7fbStbbdev             io.writeChunk(bufferMsg.outputBuffer);
268d86ed7fbStbbdev             BufferMsg::destroyBufferMsg(bufferMsg);
269d86ed7fbStbbdev         });
270d86ed7fbStbbdev 
271d86ed7fbStbbdev     make_edge(file_reader, compressor);
272d86ed7fbStbbdev     make_edge(compressor, ordering);
273d86ed7fbStbbdev     make_edge(ordering, output_writer);
274d86ed7fbStbbdev 
275d86ed7fbStbbdev     g.wait_for_all();
276d86ed7fbStbbdev }
277d86ed7fbStbbdev 
//-----------------------------------------------------------------------------------------------------------------------
279d86ed7fbStbbdev 
// Returns true when str ends with suffix. An empty suffix matches every string.
bool endsWith(const std::string& str, const std::string& suffix) {
    // Checked explicitly so that the subtraction below can never wrap around.
    if (suffix.length() > str.length()) {
        return false;
    }
    return str.compare(str.length() - suffix.length(), suffix.length(), suffix) == 0;
}
283d86ed7fbStbbdev 
//-----------------------------------------------------------------------------------------------------------------------
285d86ed7fbStbbdev 
main(int argc,char * argv[])286d86ed7fbStbbdev int main(int argc, char* argv[]) {
287d86ed7fbStbbdev     oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now();
288d86ed7fbStbbdev 
289d86ed7fbStbbdev     const std::string archiveExtension = ".bz2";
290d86ed7fbStbbdev     bool verbose = false;
291d86ed7fbStbbdev     bool asyncType;
292d86ed7fbStbbdev     std::string inputFileName;
293d86ed7fbStbbdev     int blockSizeIn100KB = 1; // block size in 100KB chunks
294d86ed7fbStbbdev     std::size_t memoryLimitIn1MB = 1; // memory limit for compression in megabytes granularity
295d86ed7fbStbbdev 
296d86ed7fbStbbdev     utility::parse_cli_arguments(
297d86ed7fbStbbdev         argc,
298d86ed7fbStbbdev         argv,
299d86ed7fbStbbdev         utility::cli_argument_pack()
300d86ed7fbStbbdev             //"-h" option for displaying help is present implicitly
301d86ed7fbStbbdev             .arg(blockSizeIn100KB, "-b", "\t block size in 100KB chunks, [1 .. 9]")
302d86ed7fbStbbdev             .arg(verbose, "-v", "verbose mode")
303d86ed7fbStbbdev             .arg(memoryLimitIn1MB,
304d86ed7fbStbbdev                  "-l",
305d86ed7fbStbbdev                  "used memory limit for compression algorithm in 1MB (minimum) granularity")
306d86ed7fbStbbdev             .arg(asyncType, "-async", "use graph async_node-based implementation")
307d86ed7fbStbbdev             .positional_arg(inputFileName, "filename", "input file name"));
308d86ed7fbStbbdev 
309d86ed7fbStbbdev     if (inputFileName.empty()) {
310d86ed7fbStbbdev         throw std::invalid_argument(
311d86ed7fbStbbdev             "Input file name is not specified. Try 'fgbzip2 -h' for more information.");
312d86ed7fbStbbdev     }
313d86ed7fbStbbdev 
314d86ed7fbStbbdev     if (blockSizeIn100KB < 1 || blockSizeIn100KB > 9) {
315d86ed7fbStbbdev         throw std::invalid_argument("Incorrect block size. Try 'fgbzip2 -h' for more information.");
316d86ed7fbStbbdev     }
317d86ed7fbStbbdev 
318d86ed7fbStbbdev     if (memoryLimitIn1MB < 1) {
319d86ed7fbStbbdev         throw std::invalid_argument(
320d86ed7fbStbbdev             "Incorrect memory limit size. Try 'fgbzip2 -h' for more information.");
321d86ed7fbStbbdev     }
322d86ed7fbStbbdev 
323d86ed7fbStbbdev     if (verbose)
324d86ed7fbStbbdev         std::cout << "Input file name: " << inputFileName << "\n";
325d86ed7fbStbbdev     if (endsWith(inputFileName, archiveExtension)) {
326d86ed7fbStbbdev         throw std::invalid_argument("Input file already have " + archiveExtension + " extension.");
327d86ed7fbStbbdev     }
328d86ed7fbStbbdev 
329d86ed7fbStbbdev     std::ifstream inputStream(inputFileName.c_str(), std::ios::in | std::ios::binary);
330d86ed7fbStbbdev     if (!inputStream.is_open()) {
331d86ed7fbStbbdev         throw std::invalid_argument("Cannot open " + inputFileName + " file.");
332d86ed7fbStbbdev     }
333d86ed7fbStbbdev 
334d86ed7fbStbbdev     std::string outputFileName(inputFileName + archiveExtension);
335d86ed7fbStbbdev 
336d86ed7fbStbbdev     std::ofstream outputStream(outputFileName.c_str(),
337d86ed7fbStbbdev                                std::ios::out | std::ios::binary | std::ios::trunc);
338d86ed7fbStbbdev     if (!outputStream.is_open()) {
339d86ed7fbStbbdev         throw std::invalid_argument("Cannot open " + outputFileName + " file.");
340d86ed7fbStbbdev     }
341d86ed7fbStbbdev 
342d86ed7fbStbbdev     // General interface to work with I/O buffers operations
343d86ed7fbStbbdev     std::size_t chunkSize = blockSizeIn100KB * 100 * 1024;
344d86ed7fbStbbdev     IOOperations io(inputStream, outputStream, chunkSize);
345d86ed7fbStbbdev 
346d86ed7fbStbbdev     if (asyncType) {
347d86ed7fbStbbdev         if (verbose)
348d86ed7fbStbbdev             std::cout
349d86ed7fbStbbdev                 << "Running flow graph based compression algorithm with async_node based asynchronous IO operations."
350d86ed7fbStbbdev                 << "\n";
351d86ed7fbStbbdev         fgCompressionAsyncNode(io, blockSizeIn100KB);
352d86ed7fbStbbdev     }
353d86ed7fbStbbdev     else {
354d86ed7fbStbbdev         if (verbose)
355d86ed7fbStbbdev             std::cout << "Running flow graph based compression algorithm."
356d86ed7fbStbbdev                       << "\n";
357d86ed7fbStbbdev         fgCompression(io, blockSizeIn100KB);
358d86ed7fbStbbdev     }
359d86ed7fbStbbdev 
360d86ed7fbStbbdev     inputStream.close();
361d86ed7fbStbbdev     outputStream.close();
362d86ed7fbStbbdev 
363d86ed7fbStbbdev     utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds());
364d86ed7fbStbbdev 
365d86ed7fbStbbdev     return 0;
366d86ed7fbStbbdev }
367