1 // The MIT License (MIT)
2 //
3 // 	Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev
4 //
5 // 	Permission is hereby granted, free of charge, to any person obtaining a copy
6 // 	of this software and associated documentation files (the "Software"), to deal
7 // 	in the Software without restriction, including without limitation the rights
8 // 	to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // 	copies of the Software, and to permit persons to whom the Software is
10 // 	furnished to do so, subject to the following conditions:
11 //
12 //  The above copyright notice and this permission notice shall be included in
13 // 	all copies or substantial portions of the Software.
14 //
15 // 	THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // 	IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // 	FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // 	AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // 	LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // 	OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // 	THE SOFTWARE.
22 
23 #pragma once
24 
25 #include <MTTools.h>
26 #include <utility>
27 
28 namespace MT
29 {
30 	/// \class ConcurrentQueueLIFO
31 	/// \brief Lock-Free Multi-Producer Multi-Consumer Queue with fixed capacity.
32 	///
33 	/// based on Bounded MPMC queue article by Dmitry Vyukov
34 	/// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
35 	///
36 	template<typename T, uint32 CAPACITY>
37 	class LockFreeQueueMPMC
38 	{
39 		static const int32 ALIGNMENT = 16;
40 		static const int32 ALIGNMENT_MASK = (ALIGNMENT-1);
41 		static const uint32 MASK = (CAPACITY - 1);
42 
43 		struct Cell
44 		{
45 			Atomic32<uint32> sequence;
46 			T data;
47 		};
48 
49 		// Raw memory buffer
50 		byte rawMemory[ sizeof(Cell) * CAPACITY + ALIGNMENT ];
51 
52 		// Prevent false sharing between threads
53 		uint8 cacheline0[64];
54 
55 		Cell* const buffer;
56 
57 		// Prevent false sharing between threads
58 		uint8 cacheline1[64];
59 
60 		Atomic32<uint32> enqueuePos;
61 
62 		// Prevent false sharing between threads
63 		uint8 cacheline2[64];
64 
65 		Atomic32<uint32> dequeuePos;
66 
67 		inline void MoveCtor(T* element, T && val)
68 		{
69 			new(element) T(std::move(val));
70 		}
71 
72 
73 	public:
74 
75 		MT_NOCOPYABLE(LockFreeQueueMPMC);
76 
77 		LockFreeQueueMPMC()
78 			: buffer( (Cell*)( ( (uintptr_t)&rawMemory[0] + ALIGNMENT_MASK ) & ~(uintptr_t)ALIGNMENT_MASK ) )
79 		{
80 			static_assert( MT::StaticIsPow2<CAPACITY>::result, "LockFreeQueueMPMC capacity must be power of 2");
81 
82 			for (uint32 i = 0; i < CAPACITY; i++)
83 			{
84 				buffer[i].sequence.StoreRelaxed(i);
85 			}
86 
87 			enqueuePos.StoreRelaxed(0);
88 			dequeuePos.StoreRelaxed(0);
89 		}
90 
91 		bool TryPush(T && data)
92 		{
93 			Cell* cell = nullptr;
94 
95 			uint32 pos = enqueuePos.LoadRelaxed();
96 			for(;;)
97 			{
98 				cell = &buffer[pos & MASK];
99 
100 				uint32 seq = cell->sequence.Load();
101 				int32 dif = (int32)seq - (int32)pos;
102 
103 				if (dif == 0)
104 				{
105 					uint32 nowPos = enqueuePos.CompareAndSwap(pos, pos + 1);
106 					if (nowPos == pos)
107 					{
108 						break;
109 					} else
110 					{
111 						pos = nowPos;
112 					}
113 				} else
114 				{
115 					if (dif < 0)
116 					{
117 						return false;
118 					} else
119 					{
120 						pos = enqueuePos.LoadRelaxed();
121 					}
122 				}
123 			}
124 
125 			// successfully found a cell
126 			MoveCtor( &cell->data, std::move(data) );
127 			cell->sequence.Store(pos + 1);
128 			return true;
129 		}
130 
131 
132 		bool TryPop(T& data)
133 		{
134 			Cell* cell = nullptr;
135 			uint32 pos = dequeuePos.LoadRelaxed();
136 
137 			for (;;)
138 			{
139 				cell = &buffer[pos & MASK];
140 
141 				uint32 seq = cell->sequence.Load();
142 				int32 dif = (int32)seq - (int32)(pos + 1);
143 
144 				if (dif == 0)
145 				{
146 					uint32 nowPos = dequeuePos.CompareAndSwap(pos, pos + 1);
147 					if (nowPos == pos)
148 					{
149 						break;
150 					} else
151 					{
152 						pos = nowPos;
153 					}
154 				} else
155 				{
156 					if (dif < 0)
157 					{
158 						return false;
159 					} else
160 					{
161 						pos = dequeuePos.LoadRelaxed();
162 					}
163 				}
164 			}
165 
166 			// successfully found a cell
167 			MoveCtor( &data, std::move(cell->data) );
168 			cell->sequence.Store(pos + MASK + 1);
169 			return true;
170 		}
171 
172 	};
173 
174 
175 }