1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include <iostream>
18 #include <fstream>
19 #include <string>
20 #include <memory>
21 #include <queue>
22 #include <thread>
23
24 #include "bzlib.hpp"
25
26 #include "common/utility/utility.hpp"
27
28 #include "oneapi/tbb/flow_graph.h"
29 #include "oneapi/tbb/tick_count.h"
30 #include "oneapi/tbb/concurrent_queue.h"
31
// TODO: change memory allocation/deallocation to be managed in constructor/destructor
// Plain pointer-plus-length description of a byte array. The struct has no
// constructor or destructor logic of its own: it neither allocates nor frees the
// storage, and copying a Buffer copies only the pointer and the length.
// Both members have default initializers so that a default-constructed Buffer is
// a well-defined empty {0, nullptr} pair rather than holding indeterminate values.
struct Buffer {
    std::size_t len = 0; // number of bytes described by `b`
    char* b = nullptr; // first byte of the storage, or nullptr when empty
};
37
38 struct BufferMsg {
BufferMsgBufferMsg39 BufferMsg() {}
BufferMsgBufferMsg40 BufferMsg(Buffer& inputBuffer, Buffer& outputBuffer, std::size_t seqId, bool isLast = false)
41 : inputBuffer(inputBuffer),
42 outputBuffer(outputBuffer),
43 seqId(seqId),
44 isLast(isLast) {}
45
createBufferMsgBufferMsg46 static BufferMsg createBufferMsg(std::size_t seqId, std::size_t chunkSize) {
47 Buffer inputBuffer;
48 inputBuffer.b = new char[chunkSize];
49 inputBuffer.len = chunkSize;
50
51 Buffer outputBuffer;
52 std::size_t compressedChunkSize = chunkSize * 1.01 + 600; // compression overhead
53 outputBuffer.b = new char[compressedChunkSize];
54 outputBuffer.len = compressedChunkSize;
55
56 return BufferMsg(inputBuffer, outputBuffer, seqId);
57 }
58
destroyBufferMsgBufferMsg59 static void destroyBufferMsg(const BufferMsg& destroyMsg) {
60 delete[] destroyMsg.inputBuffer.b;
61 delete[] destroyMsg.outputBuffer.b;
62 }
63
markLastBufferMsg64 void markLast(std::size_t lastId) {
65 isLast = true;
66 seqId = lastId;
67 }
68
69 std::size_t seqId;
70 Buffer inputBuffer;
71 Buffer outputBuffer;
72 bool isLast;
73 };
74
75 class BufferCompressor {
76 public:
BufferCompressor(int blockSizeIn100KB)77 BufferCompressor(int blockSizeIn100KB) : m_blockSize(blockSizeIn100KB) {}
78
operator ()(BufferMsg buffer) const79 BufferMsg operator()(BufferMsg buffer) const {
80 if (!buffer.isLast) {
81 unsigned int outSize = buffer.outputBuffer.len;
82 BZ2_bzBuffToBuffCompress(buffer.outputBuffer.b,
83 &outSize,
84 buffer.inputBuffer.b,
85 buffer.inputBuffer.len,
86 m_blockSize,
87 0,
88 30);
89 buffer.outputBuffer.len = outSize;
90 }
91 return buffer;
92 }
93
94 private:
95 int m_blockSize;
96 };
97
98 class IOOperations {
99 public:
IOOperations(std::ifstream & inputStream,std::ofstream & outputStream,std::size_t chunkSize)100 IOOperations(std::ifstream& inputStream, std::ofstream& outputStream, std::size_t chunkSize)
101 : m_inputStream(inputStream),
102 m_outputStream(outputStream),
103 m_chunkSize(chunkSize),
104 m_chunksRead(0) {}
105
readChunk(Buffer & buffer)106 void readChunk(Buffer& buffer) {
107 m_inputStream.read(buffer.b, m_chunkSize);
108 buffer.len = static_cast<std::size_t>(m_inputStream.gcount());
109 m_chunksRead++;
110 }
111
writeChunk(const Buffer & buffer)112 void writeChunk(const Buffer& buffer) {
113 m_outputStream.write(buffer.b, buffer.len);
114 }
115
chunksRead() const116 std::size_t chunksRead() const {
117 return m_chunksRead;
118 }
119
chunkSize() const120 std::size_t chunkSize() const {
121 return m_chunkSize;
122 }
123
hasDataToRead() const124 bool hasDataToRead() const {
125 return m_inputStream.is_open() && !m_inputStream.eof();
126 }
127
128 private:
129 std::ifstream& m_inputStream;
130 std::ofstream& m_outputStream;
131
132 std::size_t m_chunkSize;
133 std::size_t m_chunksRead;
134 };
135
136 //-----------------------------------------------------------------------------------------------------------------------
137 //---------------------------------------Compression example based on async_node-----------------------------------------
138 //-----------------------------------------------------------------------------------------------------------------------
139
140 typedef oneapi::tbb::flow::async_node<oneapi::tbb::flow::continue_msg, BufferMsg>
141 async_file_reader_node;
142 typedef oneapi::tbb::flow::async_node<BufferMsg, oneapi::tbb::flow::continue_msg>
143 async_file_writer_node;
144
145 class AsyncNodeActivity {
146 public:
AsyncNodeActivity(IOOperations & io)147 AsyncNodeActivity(IOOperations& io)
148 : m_io(io),
149 m_fileWriterThread(&AsyncNodeActivity::writingLoop, this) {}
150
~AsyncNodeActivity()151 ~AsyncNodeActivity() {
152 m_fileReaderThread.join();
153 m_fileWriterThread.join();
154 }
155
submitRead(async_file_reader_node::gateway_type & gateway)156 void submitRead(async_file_reader_node::gateway_type& gateway) {
157 gateway.reserve_wait();
158 std::thread(&AsyncNodeActivity::readingLoop, this, std::ref(gateway))
159 .swap(m_fileReaderThread);
160 }
161
submitWrite(const BufferMsg & bufferMsg)162 void submitWrite(const BufferMsg& bufferMsg) {
163 m_writeQueue.push(bufferMsg);
164 }
165
166 private:
readingLoop(async_file_reader_node::gateway_type & gateway)167 void readingLoop(async_file_reader_node::gateway_type& gateway) {
168 while (m_io.hasDataToRead()) {
169 BufferMsg bufferMsg = BufferMsg::createBufferMsg(m_io.chunksRead(), m_io.chunkSize());
170 m_io.readChunk(bufferMsg.inputBuffer);
171 gateway.try_put(bufferMsg);
172 }
173 sendLastMessage(gateway);
174 gateway.release_wait();
175 }
176
writingLoop()177 void writingLoop() {
178 BufferMsg buffer;
179 m_writeQueue.pop(buffer);
180 while (!buffer.isLast) {
181 m_io.writeChunk(buffer.outputBuffer);
182 m_writeQueue.pop(buffer);
183 }
184 }
185
sendLastMessage(async_file_reader_node::gateway_type & gateway)186 void sendLastMessage(async_file_reader_node::gateway_type& gateway) {
187 BufferMsg lastMsg;
188 lastMsg.markLast(m_io.chunksRead());
189 gateway.try_put(lastMsg);
190 }
191
192 IOOperations& m_io;
193
194 oneapi::tbb::concurrent_bounded_queue<BufferMsg> m_writeQueue;
195
196 std::thread m_fileReaderThread;
197 std::thread m_fileWriterThread;
198 };
199
fgCompressionAsyncNode(IOOperations & io,int blockSizeIn100KB)200 void fgCompressionAsyncNode(IOOperations& io, int blockSizeIn100KB) {
201 oneapi::tbb::flow::graph g;
202
203 AsyncNodeActivity asyncNodeActivity(io);
204
205 async_file_reader_node file_reader(
206 g,
207 oneapi::tbb::flow::unlimited,
208 [&asyncNodeActivity](const oneapi::tbb::flow::continue_msg& msg,
209 async_file_reader_node::gateway_type& gateway) {
210 asyncNodeActivity.submitRead(gateway);
211 });
212
213 oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor(
214 g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
215
216 oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g,
217 [](const BufferMsg& bufferMsg) -> size_t {
218 return bufferMsg.seqId;
219 });
220
221 // The node is serial to preserve the right order of buffers set by the preceding sequencer_node
222 async_file_writer_node output_writer(
223 g,
224 oneapi::tbb::flow::serial,
225 [&asyncNodeActivity](const BufferMsg& bufferMsg,
226 async_file_writer_node::gateway_type& gateway) {
227 asyncNodeActivity.submitWrite(bufferMsg);
228 });
229
230 make_edge(file_reader, compressor);
231 make_edge(compressor, ordering);
232 make_edge(ordering, output_writer);
233
234 file_reader.try_put(oneapi::tbb::flow::continue_msg());
235
236 g.wait_for_all();
237 }
238
239 //-----------------------------------------------------------------------------------------------------------------------
240 //---------------------------------------------Simple compression example------------------------------------------------
241 //-----------------------------------------------------------------------------------------------------------------------
242
fgCompression(IOOperations & io,int blockSizeIn100KB)243 void fgCompression(IOOperations& io, int blockSizeIn100KB) {
244 oneapi::tbb::flow::graph g;
245
246 oneapi::tbb::flow::input_node<BufferMsg> file_reader(
247 g, [&io](oneapi::tbb::flow_control& fc) -> BufferMsg {
248 if (io.hasDataToRead()) {
249 BufferMsg bufferMsg = BufferMsg::createBufferMsg(io.chunksRead(), io.chunkSize());
250 io.readChunk(bufferMsg.inputBuffer);
251 return bufferMsg;
252 }
253 fc.stop();
254 return BufferMsg{};
255 });
256 file_reader.activate();
257
258 oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor(
259 g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
260
261 oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g, [](const BufferMsg& buffer) -> size_t {
262 return buffer.seqId;
263 });
264
265 oneapi::tbb::flow::function_node<BufferMsg> output_writer(
266 g, oneapi::tbb::flow::serial, [&io](const BufferMsg& bufferMsg) {
267 io.writeChunk(bufferMsg.outputBuffer);
268 BufferMsg::destroyBufferMsg(bufferMsg);
269 });
270
271 make_edge(file_reader, compressor);
272 make_edge(compressor, ordering);
273 make_edge(ordering, output_writer);
274
275 g.wait_for_all();
276 }
277
278 //-----------------------------------------------------------------------------------------------------------------------
279
// Returns true when `str` ends with `suffix`. An empty suffix matches every
// string; a suffix longer than `str` never matches.
bool endsWith(const std::string& str, const std::string& suffix) {
    const std::string::size_type strLen = str.length();
    const std::string::size_type suffixLen = suffix.length();
    if (suffixLen > strLen) {
        return false;
    }
    // Compare only the tail of `str` that has the same length as the suffix
    return str.compare(strLen - suffixLen, suffixLen, suffix) == 0;
}
283
284 //-----------------------------------------------------------------------------------------------------------------------
285
main(int argc,char * argv[])286 int main(int argc, char* argv[]) {
287 oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now();
288
289 const std::string archiveExtension = ".bz2";
290 bool verbose = false;
291 bool asyncType;
292 std::string inputFileName;
293 int blockSizeIn100KB = 1; // block size in 100KB chunks
294 std::size_t memoryLimitIn1MB = 1; // memory limit for compression in megabytes granularity
295
296 utility::parse_cli_arguments(
297 argc,
298 argv,
299 utility::cli_argument_pack()
300 //"-h" option for displaying help is present implicitly
301 .arg(blockSizeIn100KB, "-b", "\t block size in 100KB chunks, [1 .. 9]")
302 .arg(verbose, "-v", "verbose mode")
303 .arg(memoryLimitIn1MB,
304 "-l",
305 "used memory limit for compression algorithm in 1MB (minimum) granularity")
306 .arg(asyncType, "-async", "use graph async_node-based implementation")
307 .positional_arg(inputFileName, "filename", "input file name"));
308
309 if (inputFileName.empty()) {
310 throw std::invalid_argument(
311 "Input file name is not specified. Try 'fgbzip2 -h' for more information.");
312 }
313
314 if (blockSizeIn100KB < 1 || blockSizeIn100KB > 9) {
315 throw std::invalid_argument("Incorrect block size. Try 'fgbzip2 -h' for more information.");
316 }
317
318 if (memoryLimitIn1MB < 1) {
319 throw std::invalid_argument(
320 "Incorrect memory limit size. Try 'fgbzip2 -h' for more information.");
321 }
322
323 if (verbose)
324 std::cout << "Input file name: " << inputFileName << "\n";
325 if (endsWith(inputFileName, archiveExtension)) {
326 throw std::invalid_argument("Input file already have " + archiveExtension + " extension.");
327 }
328
329 std::ifstream inputStream(inputFileName.c_str(), std::ios::in | std::ios::binary);
330 if (!inputStream.is_open()) {
331 throw std::invalid_argument("Cannot open " + inputFileName + " file.");
332 }
333
334 std::string outputFileName(inputFileName + archiveExtension);
335
336 std::ofstream outputStream(outputFileName.c_str(),
337 std::ios::out | std::ios::binary | std::ios::trunc);
338 if (!outputStream.is_open()) {
339 throw std::invalid_argument("Cannot open " + outputFileName + " file.");
340 }
341
342 // General interface to work with I/O buffers operations
343 std::size_t chunkSize = blockSizeIn100KB * 100 * 1024;
344 IOOperations io(inputStream, outputStream, chunkSize);
345
346 if (asyncType) {
347 if (verbose)
348 std::cout
349 << "Running flow graph based compression algorithm with async_node based asynchronous IO operations."
350 << "\n";
351 fgCompressionAsyncNode(io, blockSizeIn100KB);
352 }
353 else {
354 if (verbose)
355 std::cout << "Running flow graph based compression algorithm."
356 << "\n";
357 fgCompression(io, blockSizeIn100KB);
358 }
359
360 inputStream.close();
361 outputStream.close();
362
363 utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds());
364
365 return 0;
366 }
367