1 // The MIT License (MIT)
2 //
3 // 	Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev
4 //
5 // 	Permission is hereby granted, free of charge, to any person obtaining a copy
6 // 	of this software and associated documentation files (the "Software"), to deal
7 // 	in the Software without restriction, including without limitation the rights
8 // 	to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // 	copies of the Software, and to permit persons to whom the Software is
10 // 	furnished to do so, subject to the following conditions:
11 //
12 //  The above copyright notice and this permission notice shall be included in
13 // 	all copies or substantial portions of the Software.
14 //
15 // 	THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // 	IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // 	FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // 	AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // 	LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // 	OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // 	THE SOFTWARE.
22 
23 #pragma once
24 
25 #include <MTTools.h>
26 
27 namespace MT
28 {
29 	/// \class ConcurrentQueueLIFO
30 	/// \brief Lock-Free Multi-Producer Multi-Consumer Queue
31 	///
32 	/// based on Bounded MPMC queue article by Dmitry Vyukov
33 	/// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
34 	///
35 	template<typename T, uint32 CAPACITY>
36 	class LockFreeQueueMPMC
37 	{
38 		static const int32 ALIGNMENT = 16;
39 		static const int32 ALIGNMENT_MASK = (ALIGNMENT-1);
40 		static const uint32 MASK = (CAPACITY - 1);
41 
42 		struct Cell
43 		{
44 			Atomic32<uint32> sequence;
45 			T data;
46 		};
47 
48 		// Raw memory buffer
49 		byte rawMemory[ sizeof(Cell) * CAPACITY + ALIGNMENT ];
50 
51 		// Prevent false sharing between threads
52 		uint8 cacheline0[64];
53 
54 		Cell* const buffer;
55 
56 		// Prevent false sharing between threads
57 		uint8 cacheline1[64];
58 
59 		Atomic32<uint32> enqueuePos;
60 
61 		// Prevent false sharing between threads
62 		uint8 cacheline2[64];
63 
64 		Atomic32<uint32> dequeuePos;
65 
66 		inline void MoveCtor(T* element, T && val)
67 		{
68 			new(element) T(std::move(val));
69 		}
70 
71 
72 	public:
73 
74 		MT_NOCOPYABLE(LockFreeQueueMPMC);
75 
76 		LockFreeQueueMPMC()
77 			: buffer( (Cell*)( ( (uintptr_t)&rawMemory[0] + ALIGNMENT_MASK ) & ~(uintptr_t)ALIGNMENT_MASK ) )
78 		{
79 			static_assert( MT::StaticIsPow2<CAPACITY>::result, "LockFreeQueueMPMC capacity must be power of 2");
80 
81 			for (uint32 i = 0; i < CAPACITY; i++)
82 			{
83 				buffer[i].sequence.StoreRelaxed(i);
84 			}
85 
86 			enqueuePos.StoreRelaxed(0);
87 			dequeuePos.StoreRelaxed(0);
88 		}
89 
90 		bool TryPush(T && data)
91 		{
92 			Cell* cell = nullptr;
93 
94 			uint32 pos = enqueuePos.LoadRelaxed();
95 			for(;;)
96 			{
97 				cell = &buffer[pos & MASK];
98 
99 				uint32 seq = cell->sequence.Load();
100 				int32 dif = (int32)seq - (int32)pos;
101 
102 				if (dif == 0)
103 				{
104 					uint32 nowPos = enqueuePos.CompareAndSwap(pos, pos + 1);
105 					if (nowPos == pos)
106 					{
107 						break;
108 					} else
109 					{
110 						pos = nowPos;
111 					}
112 				} else
113 				{
114 					if (dif < 0)
115 					{
116 						return false;
117 					} else
118 					{
119 						pos = enqueuePos.LoadRelaxed();
120 					}
121 				}
122 			}
123 
124 			// successfully found a cell
125 			MoveCtor( &cell->data, std::move(data) );
126 			cell->sequence.Store(pos + 1);
127 			return true;
128 		}
129 
130 
131 		bool TryPop(T& data)
132 		{
133 			Cell* cell = nullptr;
134 			uint32 pos = dequeuePos.LoadRelaxed();
135 
136 			for (;;)
137 			{
138 				cell = &buffer[pos & MASK];
139 
140 				uint32 seq = cell->sequence.Load();
141 				int32 dif = (int32)seq - (int32)(pos + 1);
142 
143 				if (dif == 0)
144 				{
145 					uint32 nowPos = dequeuePos.CompareAndSwap(pos, pos + 1);
146 					if (nowPos == pos)
147 					{
148 						break;
149 					} else
150 					{
151 						pos = nowPos;
152 					}
153 				} else
154 				{
155 					if (dif < 0)
156 					{
157 						return false;
158 					} else
159 					{
160 						pos = dequeuePos.LoadRelaxed();
161 					}
162 				}
163 			}
164 
165 			// successfully found a cell
166 			MoveCtor( &data, std::move(cell->data) );
167 			cell->sequence.Store(pos + MASK + 1);
168 			return true;
169 		}
170 
171 	};
172 
173 
174 }