1d86ed7fbStbbdev /*
2*b15aabb3Stbbdev Copyright (c) 2005-2021 Intel Corporation
3d86ed7fbStbbdev
4d86ed7fbStbbdev Licensed under the Apache License, Version 2.0 (the "License");
5d86ed7fbStbbdev you may not use this file except in compliance with the License.
6d86ed7fbStbbdev You may obtain a copy of the License at
7d86ed7fbStbbdev
8d86ed7fbStbbdev http://www.apache.org/licenses/LICENSE-2.0
9d86ed7fbStbbdev
10d86ed7fbStbbdev Unless required by applicable law or agreed to in writing, software
11d86ed7fbStbbdev distributed under the License is distributed on an "AS IS" BASIS,
12d86ed7fbStbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13d86ed7fbStbbdev See the License for the specific language governing permissions and
14d86ed7fbStbbdev limitations under the License.
15d86ed7fbStbbdev */
16d86ed7fbStbbdev
#include <iostream>
#include <fstream>
#include <string>
#include <memory>
#include <queue>
#include <stdexcept>
#include <thread>

#include "bzlib.hpp"

#include "common/utility/utility.hpp"

#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/tick_count.h"
#include "oneapi/tbb/concurrent_queue.h"
31d86ed7fbStbbdev
// TODO: change memory allocation/deallocation to be managed in constructor/destructor
// Plain byte buffer: b points to the storage and len is the number of meaningful bytes.
// The struct does not manage memory itself; storage is allocated by
// BufferMsg::createBufferMsg and released by BufferMsg::destroyBufferMsg.
struct Buffer {
    std::size_t len = 0;
    char* b = nullptr;
};

// One unit of work flowing through the compression graph: a chunk of input, the buffer
// its compressed form is written into, and the chunk's position (seqId) in the input.
// A message with isLast == true carries no data and marks the end of the stream.
struct BufferMsg {
    // Value-initializes every member. Default-constructed messages (the end-of-stream
    // marker, the placeholder returned when reading stops) are copied through the graph,
    // so they must not carry indeterminate values.
    BufferMsg() : seqId(0), inputBuffer(), outputBuffer(), isLast(false) {}

    // Shallow copy: the message refers to the same storage as the given buffers.
    // Initializers are listed in member declaration order.
    BufferMsg(const Buffer& inputBuffer, const Buffer& outputBuffer, std::size_t seqId, bool isLast = false)
            : seqId(seqId),
              inputBuffer(inputBuffer),
              outputBuffer(outputBuffer),
              isLast(isLast) {}

    // Allocates an input buffer of chunkSize bytes and an output buffer 1% larger plus
    // 600 bytes (the headroom bzip2 documents for BZ2_bzBuffToBuffCompress).
    // Both buffers must later be released with destroyBufferMsg().
    static BufferMsg createBufferMsg(std::size_t seqId, std::size_t chunkSize) {
        const std::size_t compressedChunkSize =
            static_cast<std::size_t>(chunkSize * 1.01 + 600); // compression overhead

        // Hold both allocations in unique_ptrs so the first is not leaked if the second throws.
        std::unique_ptr<char[]> input(new char[chunkSize]);
        std::unique_ptr<char[]> output(new char[compressedChunkSize]);

        Buffer inputBuffer;
        inputBuffer.len = chunkSize;
        Buffer outputBuffer;
        outputBuffer.len = compressedChunkSize;

        inputBuffer.b = input.release();
        outputBuffer.b = output.release();
        return BufferMsg(inputBuffer, outputBuffer, seqId);
    }

    // Releases both buffers allocated by createBufferMsg(). Afterwards the pointers inside
    // destroyMsg (and any copy of it) dangle and must not be used. A default-constructed
    // message holds null pointers, for which delete[] is a no-op.
    static void destroyBufferMsg(const BufferMsg& destroyMsg) {
        delete[] destroyMsg.inputBuffer.b;
        delete[] destroyMsg.outputBuffer.b;
    }

    // Turns this message into the end-of-stream marker with sequence number lastId.
    void markLast(std::size_t lastId) {
        isLast = true;
        seqId = lastId;
    }

    std::size_t seqId;   // position of the chunk in the input; ordering key for the sequencer
    Buffer inputBuffer;  // uncompressed chunk
    Buffer outputBuffer; // compressed chunk (len is updated after compression)
    bool isLast;         // true only for the end-of-stream marker
};
74d86ed7fbStbbdev
75d86ed7fbStbbdev class BufferCompressor {
76d86ed7fbStbbdev public:
BufferCompressor(int blockSizeIn100KB)77d86ed7fbStbbdev BufferCompressor(int blockSizeIn100KB) : m_blockSize(blockSizeIn100KB) {}
78d86ed7fbStbbdev
operator ()(BufferMsg buffer) const79d86ed7fbStbbdev BufferMsg operator()(BufferMsg buffer) const {
80d86ed7fbStbbdev if (!buffer.isLast) {
81d86ed7fbStbbdev unsigned int outSize = buffer.outputBuffer.len;
82d86ed7fbStbbdev BZ2_bzBuffToBuffCompress(buffer.outputBuffer.b,
83d86ed7fbStbbdev &outSize,
84d86ed7fbStbbdev buffer.inputBuffer.b,
85d86ed7fbStbbdev buffer.inputBuffer.len,
86d86ed7fbStbbdev m_blockSize,
87d86ed7fbStbbdev 0,
88d86ed7fbStbbdev 30);
89d86ed7fbStbbdev buffer.outputBuffer.len = outSize;
90d86ed7fbStbbdev }
91d86ed7fbStbbdev return buffer;
92d86ed7fbStbbdev }
93d86ed7fbStbbdev
94d86ed7fbStbbdev private:
95d86ed7fbStbbdev int m_blockSize;
96d86ed7fbStbbdev };
97d86ed7fbStbbdev
98d86ed7fbStbbdev class IOOperations {
99d86ed7fbStbbdev public:
IOOperations(std::ifstream & inputStream,std::ofstream & outputStream,std::size_t chunkSize)100d86ed7fbStbbdev IOOperations(std::ifstream& inputStream, std::ofstream& outputStream, std::size_t chunkSize)
101d86ed7fbStbbdev : m_inputStream(inputStream),
102d86ed7fbStbbdev m_outputStream(outputStream),
103d86ed7fbStbbdev m_chunkSize(chunkSize),
104d86ed7fbStbbdev m_chunksRead(0) {}
105d86ed7fbStbbdev
readChunk(Buffer & buffer)106d86ed7fbStbbdev void readChunk(Buffer& buffer) {
107d86ed7fbStbbdev m_inputStream.read(buffer.b, m_chunkSize);
108d86ed7fbStbbdev buffer.len = static_cast<std::size_t>(m_inputStream.gcount());
109d86ed7fbStbbdev m_chunksRead++;
110d86ed7fbStbbdev }
111d86ed7fbStbbdev
writeChunk(const Buffer & buffer)112d86ed7fbStbbdev void writeChunk(const Buffer& buffer) {
113d86ed7fbStbbdev m_outputStream.write(buffer.b, buffer.len);
114d86ed7fbStbbdev }
115d86ed7fbStbbdev
chunksRead() const116d86ed7fbStbbdev std::size_t chunksRead() const {
117d86ed7fbStbbdev return m_chunksRead;
118d86ed7fbStbbdev }
119d86ed7fbStbbdev
chunkSize() const120d86ed7fbStbbdev std::size_t chunkSize() const {
121d86ed7fbStbbdev return m_chunkSize;
122d86ed7fbStbbdev }
123d86ed7fbStbbdev
hasDataToRead() const124d86ed7fbStbbdev bool hasDataToRead() const {
125d86ed7fbStbbdev return m_inputStream.is_open() && !m_inputStream.eof();
126d86ed7fbStbbdev }
127d86ed7fbStbbdev
128d86ed7fbStbbdev private:
129d86ed7fbStbbdev std::ifstream& m_inputStream;
130d86ed7fbStbbdev std::ofstream& m_outputStream;
131d86ed7fbStbbdev
132d86ed7fbStbbdev std::size_t m_chunkSize;
133d86ed7fbStbbdev std::size_t m_chunksRead;
134d86ed7fbStbbdev };
135d86ed7fbStbbdev
136d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
137d86ed7fbStbbdev //---------------------------------------Compression example based on async_node-----------------------------------------
138d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
139d86ed7fbStbbdev
140d86ed7fbStbbdev typedef oneapi::tbb::flow::async_node<oneapi::tbb::flow::continue_msg, BufferMsg>
141d86ed7fbStbbdev async_file_reader_node;
142d86ed7fbStbbdev typedef oneapi::tbb::flow::async_node<BufferMsg, oneapi::tbb::flow::continue_msg>
143d86ed7fbStbbdev async_file_writer_node;
144d86ed7fbStbbdev
145d86ed7fbStbbdev class AsyncNodeActivity {
146d86ed7fbStbbdev public:
AsyncNodeActivity(IOOperations & io)147d86ed7fbStbbdev AsyncNodeActivity(IOOperations& io)
148d86ed7fbStbbdev : m_io(io),
149d86ed7fbStbbdev m_fileWriterThread(&AsyncNodeActivity::writingLoop, this) {}
150d86ed7fbStbbdev
~AsyncNodeActivity()151d86ed7fbStbbdev ~AsyncNodeActivity() {
152d86ed7fbStbbdev m_fileReaderThread.join();
153d86ed7fbStbbdev m_fileWriterThread.join();
154d86ed7fbStbbdev }
155d86ed7fbStbbdev
submitRead(async_file_reader_node::gateway_type & gateway)156d86ed7fbStbbdev void submitRead(async_file_reader_node::gateway_type& gateway) {
157d86ed7fbStbbdev gateway.reserve_wait();
158d86ed7fbStbbdev std::thread(&AsyncNodeActivity::readingLoop, this, std::ref(gateway))
159d86ed7fbStbbdev .swap(m_fileReaderThread);
160d86ed7fbStbbdev }
161d86ed7fbStbbdev
submitWrite(const BufferMsg & bufferMsg)162d86ed7fbStbbdev void submitWrite(const BufferMsg& bufferMsg) {
163d86ed7fbStbbdev m_writeQueue.push(bufferMsg);
164d86ed7fbStbbdev }
165d86ed7fbStbbdev
166d86ed7fbStbbdev private:
readingLoop(async_file_reader_node::gateway_type & gateway)167d86ed7fbStbbdev void readingLoop(async_file_reader_node::gateway_type& gateway) {
168d86ed7fbStbbdev while (m_io.hasDataToRead()) {
169d86ed7fbStbbdev BufferMsg bufferMsg = BufferMsg::createBufferMsg(m_io.chunksRead(), m_io.chunkSize());
170d86ed7fbStbbdev m_io.readChunk(bufferMsg.inputBuffer);
171d86ed7fbStbbdev gateway.try_put(bufferMsg);
172d86ed7fbStbbdev }
173d86ed7fbStbbdev sendLastMessage(gateway);
174d86ed7fbStbbdev gateway.release_wait();
175d86ed7fbStbbdev }
176d86ed7fbStbbdev
writingLoop()177d86ed7fbStbbdev void writingLoop() {
178d86ed7fbStbbdev BufferMsg buffer;
179d86ed7fbStbbdev m_writeQueue.pop(buffer);
180d86ed7fbStbbdev while (!buffer.isLast) {
181d86ed7fbStbbdev m_io.writeChunk(buffer.outputBuffer);
182d86ed7fbStbbdev m_writeQueue.pop(buffer);
183d86ed7fbStbbdev }
184d86ed7fbStbbdev }
185d86ed7fbStbbdev
sendLastMessage(async_file_reader_node::gateway_type & gateway)186d86ed7fbStbbdev void sendLastMessage(async_file_reader_node::gateway_type& gateway) {
187d86ed7fbStbbdev BufferMsg lastMsg;
188d86ed7fbStbbdev lastMsg.markLast(m_io.chunksRead());
189d86ed7fbStbbdev gateway.try_put(lastMsg);
190d86ed7fbStbbdev }
191d86ed7fbStbbdev
192d86ed7fbStbbdev IOOperations& m_io;
193d86ed7fbStbbdev
194d86ed7fbStbbdev oneapi::tbb::concurrent_bounded_queue<BufferMsg> m_writeQueue;
195d86ed7fbStbbdev
196d86ed7fbStbbdev std::thread m_fileReaderThread;
197d86ed7fbStbbdev std::thread m_fileWriterThread;
198d86ed7fbStbbdev };
199d86ed7fbStbbdev
fgCompressionAsyncNode(IOOperations & io,int blockSizeIn100KB)200d86ed7fbStbbdev void fgCompressionAsyncNode(IOOperations& io, int blockSizeIn100KB) {
201d86ed7fbStbbdev oneapi::tbb::flow::graph g;
202d86ed7fbStbbdev
203d86ed7fbStbbdev AsyncNodeActivity asyncNodeActivity(io);
204d86ed7fbStbbdev
205d86ed7fbStbbdev async_file_reader_node file_reader(
206d86ed7fbStbbdev g,
207d86ed7fbStbbdev oneapi::tbb::flow::unlimited,
208d86ed7fbStbbdev [&asyncNodeActivity](const oneapi::tbb::flow::continue_msg& msg,
209d86ed7fbStbbdev async_file_reader_node::gateway_type& gateway) {
210d86ed7fbStbbdev asyncNodeActivity.submitRead(gateway);
211d86ed7fbStbbdev });
212d86ed7fbStbbdev
213d86ed7fbStbbdev oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor(
214d86ed7fbStbbdev g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
215d86ed7fbStbbdev
216d86ed7fbStbbdev oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g,
217d86ed7fbStbbdev [](const BufferMsg& bufferMsg) -> size_t {
218d86ed7fbStbbdev return bufferMsg.seqId;
219d86ed7fbStbbdev });
220d86ed7fbStbbdev
221d86ed7fbStbbdev // The node is serial to preserve the right order of buffers set by the preceding sequencer_node
222d86ed7fbStbbdev async_file_writer_node output_writer(
223d86ed7fbStbbdev g,
224d86ed7fbStbbdev oneapi::tbb::flow::serial,
225d86ed7fbStbbdev [&asyncNodeActivity](const BufferMsg& bufferMsg,
226d86ed7fbStbbdev async_file_writer_node::gateway_type& gateway) {
227d86ed7fbStbbdev asyncNodeActivity.submitWrite(bufferMsg);
228d86ed7fbStbbdev });
229d86ed7fbStbbdev
230d86ed7fbStbbdev make_edge(file_reader, compressor);
231d86ed7fbStbbdev make_edge(compressor, ordering);
232d86ed7fbStbbdev make_edge(ordering, output_writer);
233d86ed7fbStbbdev
234d86ed7fbStbbdev file_reader.try_put(oneapi::tbb::flow::continue_msg());
235d86ed7fbStbbdev
236d86ed7fbStbbdev g.wait_for_all();
237d86ed7fbStbbdev }
238d86ed7fbStbbdev
239d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
240d86ed7fbStbbdev //---------------------------------------------Simple compression example------------------------------------------------
241d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
242d86ed7fbStbbdev
fgCompression(IOOperations & io,int blockSizeIn100KB)243d86ed7fbStbbdev void fgCompression(IOOperations& io, int blockSizeIn100KB) {
244d86ed7fbStbbdev oneapi::tbb::flow::graph g;
245d86ed7fbStbbdev
246d86ed7fbStbbdev oneapi::tbb::flow::input_node<BufferMsg> file_reader(
247d86ed7fbStbbdev g, [&io](oneapi::tbb::flow_control& fc) -> BufferMsg {
248d86ed7fbStbbdev if (io.hasDataToRead()) {
249d86ed7fbStbbdev BufferMsg bufferMsg = BufferMsg::createBufferMsg(io.chunksRead(), io.chunkSize());
250d86ed7fbStbbdev io.readChunk(bufferMsg.inputBuffer);
251d86ed7fbStbbdev return bufferMsg;
252d86ed7fbStbbdev }
253d86ed7fbStbbdev fc.stop();
254d86ed7fbStbbdev return BufferMsg{};
255d86ed7fbStbbdev });
256d86ed7fbStbbdev file_reader.activate();
257d86ed7fbStbbdev
258d86ed7fbStbbdev oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor(
259d86ed7fbStbbdev g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
260d86ed7fbStbbdev
261d86ed7fbStbbdev oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g, [](const BufferMsg& buffer) -> size_t {
262d86ed7fbStbbdev return buffer.seqId;
263d86ed7fbStbbdev });
264d86ed7fbStbbdev
265d86ed7fbStbbdev oneapi::tbb::flow::function_node<BufferMsg> output_writer(
266d86ed7fbStbbdev g, oneapi::tbb::flow::serial, [&io](const BufferMsg& bufferMsg) {
267d86ed7fbStbbdev io.writeChunk(bufferMsg.outputBuffer);
268d86ed7fbStbbdev BufferMsg::destroyBufferMsg(bufferMsg);
269d86ed7fbStbbdev });
270d86ed7fbStbbdev
271d86ed7fbStbbdev make_edge(file_reader, compressor);
272d86ed7fbStbbdev make_edge(compressor, ordering);
273d86ed7fbStbbdev make_edge(ordering, output_writer);
274d86ed7fbStbbdev
275d86ed7fbStbbdev g.wait_for_all();
276d86ed7fbStbbdev }
277d86ed7fbStbbdev
278d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
279d86ed7fbStbbdev
// Returns true when str ends with suffix; an empty suffix always matches.
bool endsWith(const std::string& str, const std::string& suffix) {
    if (suffix.size() > str.size()) {
        return false;
    }
    const std::size_t tailStart = str.size() - suffix.size();
    return str.compare(tailStart, suffix.size(), suffix) == 0;
}
283d86ed7fbStbbdev
284d86ed7fbStbbdev //-----------------------------------------------------------------------------------------------------------------------
285d86ed7fbStbbdev
main(int argc,char * argv[])286d86ed7fbStbbdev int main(int argc, char* argv[]) {
287d86ed7fbStbbdev oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now();
288d86ed7fbStbbdev
289d86ed7fbStbbdev const std::string archiveExtension = ".bz2";
290d86ed7fbStbbdev bool verbose = false;
291d86ed7fbStbbdev bool asyncType;
292d86ed7fbStbbdev std::string inputFileName;
293d86ed7fbStbbdev int blockSizeIn100KB = 1; // block size in 100KB chunks
294d86ed7fbStbbdev std::size_t memoryLimitIn1MB = 1; // memory limit for compression in megabytes granularity
295d86ed7fbStbbdev
296d86ed7fbStbbdev utility::parse_cli_arguments(
297d86ed7fbStbbdev argc,
298d86ed7fbStbbdev argv,
299d86ed7fbStbbdev utility::cli_argument_pack()
300d86ed7fbStbbdev //"-h" option for displaying help is present implicitly
301d86ed7fbStbbdev .arg(blockSizeIn100KB, "-b", "\t block size in 100KB chunks, [1 .. 9]")
302d86ed7fbStbbdev .arg(verbose, "-v", "verbose mode")
303d86ed7fbStbbdev .arg(memoryLimitIn1MB,
304d86ed7fbStbbdev "-l",
305d86ed7fbStbbdev "used memory limit for compression algorithm in 1MB (minimum) granularity")
306d86ed7fbStbbdev .arg(asyncType, "-async", "use graph async_node-based implementation")
307d86ed7fbStbbdev .positional_arg(inputFileName, "filename", "input file name"));
308d86ed7fbStbbdev
309d86ed7fbStbbdev if (inputFileName.empty()) {
310d86ed7fbStbbdev throw std::invalid_argument(
311d86ed7fbStbbdev "Input file name is not specified. Try 'fgbzip2 -h' for more information.");
312d86ed7fbStbbdev }
313d86ed7fbStbbdev
314d86ed7fbStbbdev if (blockSizeIn100KB < 1 || blockSizeIn100KB > 9) {
315d86ed7fbStbbdev throw std::invalid_argument("Incorrect block size. Try 'fgbzip2 -h' for more information.");
316d86ed7fbStbbdev }
317d86ed7fbStbbdev
318d86ed7fbStbbdev if (memoryLimitIn1MB < 1) {
319d86ed7fbStbbdev throw std::invalid_argument(
320d86ed7fbStbbdev "Incorrect memory limit size. Try 'fgbzip2 -h' for more information.");
321d86ed7fbStbbdev }
322d86ed7fbStbbdev
323d86ed7fbStbbdev if (verbose)
324d86ed7fbStbbdev std::cout << "Input file name: " << inputFileName << "\n";
325d86ed7fbStbbdev if (endsWith(inputFileName, archiveExtension)) {
326d86ed7fbStbbdev throw std::invalid_argument("Input file already have " + archiveExtension + " extension.");
327d86ed7fbStbbdev }
328d86ed7fbStbbdev
329d86ed7fbStbbdev std::ifstream inputStream(inputFileName.c_str(), std::ios::in | std::ios::binary);
330d86ed7fbStbbdev if (!inputStream.is_open()) {
331d86ed7fbStbbdev throw std::invalid_argument("Cannot open " + inputFileName + " file.");
332d86ed7fbStbbdev }
333d86ed7fbStbbdev
334d86ed7fbStbbdev std::string outputFileName(inputFileName + archiveExtension);
335d86ed7fbStbbdev
336d86ed7fbStbbdev std::ofstream outputStream(outputFileName.c_str(),
337d86ed7fbStbbdev std::ios::out | std::ios::binary | std::ios::trunc);
338d86ed7fbStbbdev if (!outputStream.is_open()) {
339d86ed7fbStbbdev throw std::invalid_argument("Cannot open " + outputFileName + " file.");
340d86ed7fbStbbdev }
341d86ed7fbStbbdev
342d86ed7fbStbbdev // General interface to work with I/O buffers operations
343d86ed7fbStbbdev std::size_t chunkSize = blockSizeIn100KB * 100 * 1024;
344d86ed7fbStbbdev IOOperations io(inputStream, outputStream, chunkSize);
345d86ed7fbStbbdev
346d86ed7fbStbbdev if (asyncType) {
347d86ed7fbStbbdev if (verbose)
348d86ed7fbStbbdev std::cout
349d86ed7fbStbbdev << "Running flow graph based compression algorithm with async_node based asynchronous IO operations."
350d86ed7fbStbbdev << "\n";
351d86ed7fbStbbdev fgCompressionAsyncNode(io, blockSizeIn100KB);
352d86ed7fbStbbdev }
353d86ed7fbStbbdev else {
354d86ed7fbStbbdev if (verbose)
355d86ed7fbStbbdev std::cout << "Running flow graph based compression algorithm."
356d86ed7fbStbbdev << "\n";
357d86ed7fbStbbdev fgCompression(io, blockSizeIn100KB);
358d86ed7fbStbbdev }
359d86ed7fbStbbdev
360d86ed7fbStbbdev inputStream.close();
361d86ed7fbStbbdev outputStream.close();
362d86ed7fbStbbdev
363d86ed7fbStbbdev utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds());
364d86ed7fbStbbdev
365d86ed7fbStbbdev return 0;
366d86ed7fbStbbdev }
367