1 // The MIT License (MIT) 2 // 3 // Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev 4 // 5 // Permission is hereby granted, free of charge, to any person obtaining a copy 6 // of this software and associated documentation files (the "Software"), to deal 7 // in the Software without restriction, including without limitation the rights 8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 // copies of the Software, and to permit persons to whom the Software is 10 // furnished to do so, subject to the following conditions: 11 // 12 // The above copyright notice and this permission notice shall be included in 13 // all copies or substantial portions of the Software. 14 // 15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21 // THE SOFTWARE. 22 23 #pragma once 24 25 #include <MTTools.h> 26 #include <utility> 27 28 namespace MT 29 { 30 /// \class ConcurrentQueueLIFO 31 /// \brief Lock-Free Multi-Producer Multi-Consumer Queue with fixed capacity. 32 /// 33 /// based on Bounded MPMC queue article by Dmitry Vyukov 34 /// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue 35 /// 36 template<typename T, uint32 CAPACITY> 37 class LockFreeQueueMPMC 38 { 39 static const int32 ALIGNMENT = 16; 40 static const int32 ALIGNMENT_MASK = (ALIGNMENT-1); 41 static const uint32 MASK = (CAPACITY - 1); 42 43 struct Cell 44 { 45 Atomic32<uint32> sequence; 46 T data; 47 }; 48 49 // Raw memory buffer 50 byte rawMemory[ sizeof(Cell) * CAPACITY + ALIGNMENT ]; 51 52 // Prevent false sharing between threads 53 uint8 cacheline0[64]; 54 55 Cell* const buffer; 56 57 // Prevent false sharing between threads 58 uint8 cacheline1[64]; 59 60 Atomic32<uint32> enqueuePos; 61 62 // Prevent false sharing between threads 63 uint8 cacheline2[64]; 64 65 Atomic32<uint32> dequeuePos; 66 MoveCtor(T * element,T && val)67 inline void MoveCtor(T* element, T && val) 68 { 69 new(element) T(std::move(val)); 70 } 71 72 73 public: 74 75 MT_NOCOPYABLE(LockFreeQueueMPMC); 76 LockFreeQueueMPMC()77 LockFreeQueueMPMC() 78 : buffer( (Cell*)( ( (uintptr_t)&rawMemory[0] + ALIGNMENT_MASK ) & ~(uintptr_t)ALIGNMENT_MASK ) ) 79 { 80 static_assert( MT::StaticIsPow2<CAPACITY>::result, "LockFreeQueueMPMC capacity must be power of 2"); 81 82 for (uint32 i = 0; i < CAPACITY; i++) 83 { 84 buffer[i].sequence.StoreRelaxed(i); 85 } 86 87 enqueuePos.StoreRelaxed(0); 88 dequeuePos.StoreRelaxed(0); 89 } 90 TryPush(T && data)91 bool TryPush(T && data) 92 { 93 Cell* cell = nullptr; 94 95 uint32 pos = enqueuePos.LoadRelaxed(); 96 for(;;) 97 { 98 cell = &buffer[pos & MASK]; 99 100 uint32 seq = cell->sequence.Load(); 101 int32 dif = (int32)seq - (int32)pos; 102 103 if (dif == 0) 104 { 105 uint32 nowPos = enqueuePos.CompareAndSwap(pos, pos + 1); 106 if (nowPos == pos) 107 { 108 break; 109 } else 110 { 111 pos = nowPos; 112 } 113 } else 114 { 115 if (dif < 0) 116 { 117 return false; 118 } else 119 { 120 pos = enqueuePos.LoadRelaxed(); 121 } 122 } 123 } 124 125 // successfully found a cell 126 MoveCtor( &cell->data, std::move(data) ); 127 cell->sequence.Store(pos + 1); 128 return true; 129 } 130 131 TryPop(T & data)132 bool TryPop(T& data) 133 { 134 Cell* cell = nullptr; 135 uint32 pos = dequeuePos.LoadRelaxed(); 136 137 for (;;) 138 { 139 cell = &buffer[pos & MASK]; 140 141 uint32 seq = cell->sequence.Load(); 142 int32 dif = (int32)seq - (int32)(pos + 1); 143 144 if (dif == 0) 145 { 146 uint32 nowPos = dequeuePos.CompareAndSwap(pos, pos + 1); 147 if (nowPos == pos) 148 { 149 break; 150 } else 151 { 152 pos = nowPos; 153 } 154 } else 155 { 156 if (dif < 0) 157 { 158 return false; 159 } else 160 { 161 pos = dequeuePos.LoadRelaxed(); 162 } 163 } 164 } 165 166 // successfully found a cell 167 MoveCtor( &data, std::move(cell->data) ); 168 cell->sequence.Store(pos + MASK + 1); 169 return true; 170 } 171 172 }; 173 174 175 }