1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include <iostream> 18 #include <fstream> 19 #include <string> 20 #include <memory> 21 #include <queue> 22 #include <thread> 23 24 #include "bzlib.hpp" 25 26 #include "common/utility/utility.hpp" 27 28 #include "oneapi/tbb/flow_graph.h" 29 #include "oneapi/tbb/tick_count.h" 30 #include "oneapi/tbb/concurrent_queue.h" 31 32 // TODO: change memory allocation/deallocation to be managed in constructor/destructor 33 struct Buffer { 34 std::size_t len; 35 char* b; 36 }; 37 38 struct BufferMsg { 39 BufferMsg() {} 40 BufferMsg(Buffer& inputBuffer, Buffer& outputBuffer, std::size_t seqId, bool isLast = false) 41 : inputBuffer(inputBuffer), 42 outputBuffer(outputBuffer), 43 seqId(seqId), 44 isLast(isLast) {} 45 46 static BufferMsg createBufferMsg(std::size_t seqId, std::size_t chunkSize) { 47 Buffer inputBuffer; 48 inputBuffer.b = new char[chunkSize]; 49 inputBuffer.len = chunkSize; 50 51 Buffer outputBuffer; 52 std::size_t compressedChunkSize = chunkSize * 1.01 + 600; // compression overhead 53 outputBuffer.b = new char[compressedChunkSize]; 54 outputBuffer.len = compressedChunkSize; 55 56 return BufferMsg(inputBuffer, outputBuffer, seqId); 57 } 58 59 static void destroyBufferMsg(const BufferMsg& destroyMsg) { 60 delete[] destroyMsg.inputBuffer.b; 61 delete[] destroyMsg.outputBuffer.b; 62 } 63 64 void markLast(std::size_t lastId) { 65 isLast = true; 66 seqId = lastId; 67 } 68 69 std::size_t seqId; 70 Buffer inputBuffer; 71 Buffer outputBuffer; 72 bool isLast; 73 }; 74 75 class BufferCompressor { 76 public: 77 BufferCompressor(int blockSizeIn100KB) : m_blockSize(blockSizeIn100KB) {} 78 79 BufferMsg operator()(BufferMsg buffer) const { 80 if (!buffer.isLast) { 81 unsigned int outSize = buffer.outputBuffer.len; 82 BZ2_bzBuffToBuffCompress(buffer.outputBuffer.b, 83 &outSize, 84 buffer.inputBuffer.b, 85 buffer.inputBuffer.len, 86 m_blockSize, 87 0, 88 30); 89 buffer.outputBuffer.len = outSize; 90 } 91 return buffer; 92 } 93 94 private: 95 int m_blockSize; 96 }; 97 98 class IOOperations { 99 public: 100 IOOperations(std::ifstream& inputStream, std::ofstream& outputStream, std::size_t chunkSize) 101 : m_inputStream(inputStream), 102 m_outputStream(outputStream), 103 m_chunkSize(chunkSize), 104 m_chunksRead(0) {} 105 106 void readChunk(Buffer& buffer) { 107 m_inputStream.read(buffer.b, m_chunkSize); 108 buffer.len = static_cast<std::size_t>(m_inputStream.gcount()); 109 m_chunksRead++; 110 } 111 112 void writeChunk(const Buffer& buffer) { 113 m_outputStream.write(buffer.b, buffer.len); 114 } 115 116 std::size_t chunksRead() const { 117 return m_chunksRead; 118 } 119 120 std::size_t chunkSize() const { 121 return m_chunkSize; 122 } 123 124 bool hasDataToRead() const { 125 return m_inputStream.is_open() && !m_inputStream.eof(); 126 } 127 128 private: 129 std::ifstream& m_inputStream; 130 std::ofstream& m_outputStream; 131 132 std::size_t m_chunkSize; 133 std::size_t m_chunksRead; 134 }; 135 136 //----------------------------------------------------------------------------------------------------------------------- 137 //---------------------------------------Compression example based on async_node----------------------------------------- 138 //----------------------------------------------------------------------------------------------------------------------- 139 140 typedef oneapi::tbb::flow::async_node<oneapi::tbb::flow::continue_msg, BufferMsg> 141 async_file_reader_node; 142 typedef oneapi::tbb::flow::async_node<BufferMsg, oneapi::tbb::flow::continue_msg> 143 async_file_writer_node; 144 145 class AsyncNodeActivity { 146 public: 147 AsyncNodeActivity(IOOperations& io) 148 : m_io(io), 149 m_fileWriterThread(&AsyncNodeActivity::writingLoop, this) {} 150 151 ~AsyncNodeActivity() { 152 m_fileReaderThread.join(); 153 m_fileWriterThread.join(); 154 } 155 156 void submitRead(async_file_reader_node::gateway_type& gateway) { 157 gateway.reserve_wait(); 158 std::thread(&AsyncNodeActivity::readingLoop, this, std::ref(gateway)) 159 .swap(m_fileReaderThread); 160 } 161 162 void submitWrite(const BufferMsg& bufferMsg) { 163 m_writeQueue.push(bufferMsg); 164 } 165 166 private: 167 void readingLoop(async_file_reader_node::gateway_type& gateway) { 168 while (m_io.hasDataToRead()) { 169 BufferMsg bufferMsg = BufferMsg::createBufferMsg(m_io.chunksRead(), m_io.chunkSize()); 170 m_io.readChunk(bufferMsg.inputBuffer); 171 gateway.try_put(bufferMsg); 172 } 173 sendLastMessage(gateway); 174 gateway.release_wait(); 175 } 176 177 void writingLoop() { 178 BufferMsg buffer; 179 m_writeQueue.pop(buffer); 180 while (!buffer.isLast) { 181 m_io.writeChunk(buffer.outputBuffer); 182 m_writeQueue.pop(buffer); 183 } 184 } 185 186 void sendLastMessage(async_file_reader_node::gateway_type& gateway) { 187 BufferMsg lastMsg; 188 lastMsg.markLast(m_io.chunksRead()); 189 gateway.try_put(lastMsg); 190 } 191 192 IOOperations& m_io; 193 194 oneapi::tbb::concurrent_bounded_queue<BufferMsg> m_writeQueue; 195 196 std::thread m_fileReaderThread; 197 std::thread m_fileWriterThread; 198 }; 199 200 void fgCompressionAsyncNode(IOOperations& io, int blockSizeIn100KB) { 201 oneapi::tbb::flow::graph g; 202 203 AsyncNodeActivity asyncNodeActivity(io); 204 205 async_file_reader_node file_reader( 206 g, 207 oneapi::tbb::flow::unlimited, 208 [&asyncNodeActivity](const oneapi::tbb::flow::continue_msg& msg, 209 async_file_reader_node::gateway_type& gateway) { 210 asyncNodeActivity.submitRead(gateway); 211 }); 212 213 oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor( 214 g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB)); 215 216 oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g, 217 [](const BufferMsg& bufferMsg) -> size_t { 218 return bufferMsg.seqId; 219 }); 220 221 // The node is serial to preserve the right order of buffers set by the preceding sequencer_node 222 async_file_writer_node output_writer( 223 g, 224 oneapi::tbb::flow::serial, 225 [&asyncNodeActivity](const BufferMsg& bufferMsg, 226 async_file_writer_node::gateway_type& gateway) { 227 asyncNodeActivity.submitWrite(bufferMsg); 228 }); 229 230 make_edge(file_reader, compressor); 231 make_edge(compressor, ordering); 232 make_edge(ordering, output_writer); 233 234 file_reader.try_put(oneapi::tbb::flow::continue_msg()); 235 236 g.wait_for_all(); 237 } 238 239 //----------------------------------------------------------------------------------------------------------------------- 240 //---------------------------------------------Simple compression example------------------------------------------------ 241 //----------------------------------------------------------------------------------------------------------------------- 242 243 void fgCompression(IOOperations& io, int blockSizeIn100KB) { 244 oneapi::tbb::flow::graph g; 245 246 oneapi::tbb::flow::input_node<BufferMsg> file_reader( 247 g, [&io](oneapi::tbb::flow_control& fc) -> BufferMsg { 248 if (io.hasDataToRead()) { 249 BufferMsg bufferMsg = BufferMsg::createBufferMsg(io.chunksRead(), io.chunkSize()); 250 io.readChunk(bufferMsg.inputBuffer); 251 return bufferMsg; 252 } 253 fc.stop(); 254 return BufferMsg{}; 255 }); 256 file_reader.activate(); 257 258 oneapi::tbb::flow::function_node<BufferMsg, BufferMsg> compressor( 259 g, oneapi::tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB)); 260 261 oneapi::tbb::flow::sequencer_node<BufferMsg> ordering(g, [](const BufferMsg& buffer) -> size_t { 262 return buffer.seqId; 263 }); 264 265 oneapi::tbb::flow::function_node<BufferMsg> output_writer( 266 g, oneapi::tbb::flow::serial, [&io](const BufferMsg& bufferMsg) { 267 io.writeChunk(bufferMsg.outputBuffer); 268 BufferMsg::destroyBufferMsg(bufferMsg); 269 }); 270 271 make_edge(file_reader, compressor); 272 make_edge(compressor, ordering); 273 make_edge(ordering, output_writer); 274 275 g.wait_for_all(); 276 } 277 278 //----------------------------------------------------------------------------------------------------------------------- 279 280 bool endsWith(const std::string& str, const std::string& suffix) { 281 return str.find(suffix, str.length() - suffix.length()) != std::string::npos; 282 } 283 284 //----------------------------------------------------------------------------------------------------------------------- 285 286 int main(int argc, char* argv[]) { 287 oneapi::tbb::tick_count mainStartTime = oneapi::tbb::tick_count::now(); 288 289 const std::string archiveExtension = ".bz2"; 290 bool verbose = false; 291 bool asyncType; 292 std::string inputFileName; 293 int blockSizeIn100KB = 1; // block size in 100KB chunks 294 std::size_t memoryLimitIn1MB = 1; // memory limit for compression in megabytes granularity 295 296 utility::parse_cli_arguments( 297 argc, 298 argv, 299 utility::cli_argument_pack() 300 //"-h" option for displaying help is present implicitly 301 .arg(blockSizeIn100KB, "-b", "\t block size in 100KB chunks, [1 .. 9]") 302 .arg(verbose, "-v", "verbose mode") 303 .arg(memoryLimitIn1MB, 304 "-l", 305 "used memory limit for compression algorithm in 1MB (minimum) granularity") 306 .arg(asyncType, "-async", "use graph async_node-based implementation") 307 .positional_arg(inputFileName, "filename", "input file name")); 308 309 if (inputFileName.empty()) { 310 throw std::invalid_argument( 311 "Input file name is not specified. Try 'fgbzip2 -h' for more information."); 312 } 313 314 if (blockSizeIn100KB < 1 || blockSizeIn100KB > 9) { 315 throw std::invalid_argument("Incorrect block size. Try 'fgbzip2 -h' for more information."); 316 } 317 318 if (memoryLimitIn1MB < 1) { 319 throw std::invalid_argument( 320 "Incorrect memory limit size. Try 'fgbzip2 -h' for more information."); 321 } 322 323 if (verbose) 324 std::cout << "Input file name: " << inputFileName << "\n"; 325 if (endsWith(inputFileName, archiveExtension)) { 326 throw std::invalid_argument("Input file already have " + archiveExtension + " extension."); 327 } 328 329 std::ifstream inputStream(inputFileName.c_str(), std::ios::in | std::ios::binary); 330 if (!inputStream.is_open()) { 331 throw std::invalid_argument("Cannot open " + inputFileName + " file."); 332 } 333 334 std::string outputFileName(inputFileName + archiveExtension); 335 336 std::ofstream outputStream(outputFileName.c_str(), 337 std::ios::out | std::ios::binary | std::ios::trunc); 338 if (!outputStream.is_open()) { 339 throw std::invalid_argument("Cannot open " + outputFileName + " file."); 340 } 341 342 // General interface to work with I/O buffers operations 343 std::size_t chunkSize = blockSizeIn100KB * 100 * 1024; 344 IOOperations io(inputStream, outputStream, chunkSize); 345 346 if (asyncType) { 347 if (verbose) 348 std::cout 349 << "Running flow graph based compression algorithm with async_node based asynchronous IO operations." 350 << "\n"; 351 fgCompressionAsyncNode(io, blockSizeIn100KB); 352 } 353 else { 354 if (verbose) 355 std::cout << "Running flow graph based compression algorithm." 356 << "\n"; 357 fgCompression(io, blockSizeIn100KB); 358 } 359 360 inputStream.close(); 361 outputStream.close(); 362 363 utility::report_elapsed_time((oneapi::tbb::tick_count::now() - mainStartTime).seconds()); 364 365 return 0; 366 } 367