.. _Local_Serializer:

Local Serializer
================


.. container:: section


   .. rubric:: Context
      :class: sectiontitle

   Consider an interactive program. To maximize concurrency and
   responsiveness, operations requested by the user can be implemented
   as tasks. The order of operations can be important. For example,
   suppose the program presents editable text to the user. There might
   be operations to select text and delete selected text. Reversing the
   order of "select" and "delete" operations on the same buffer would be
   bad. However, commuting operations on different buffers might be
   okay. Hence the goal is to establish a serial ordering of tasks
   associated with a given object, but not to constrain the ordering of
   tasks between different objects.


.. container:: section


   .. rubric:: Forces
      :class: sectiontitle

   - Operations associated with a certain object must be performed in
     serial order.


   - Serializing with a lock would be wasteful because threads would be
     waiting at the lock when they could be doing useful work
     elsewhere.


.. container:: section


   .. rubric:: Solution
      :class: sectiontitle

   Sequence the work items using a FIFO (first-in first-out) structure.
   Always keep an item in flight if possible. If no item is in flight
   when a work item appears, put the item in flight. Otherwise, push the
   item onto the FIFO. When the current item in flight completes, pop
   another item from the FIFO and put it in flight.


   The logic can be implemented without mutexes by using
   ``concurrent_queue`` for the FIFO and ``atomic<int>`` to count the
   number of items waiting and in flight. The example explains the
   accounting in detail.


.. container:: section


   .. rubric:: Example
      :class: sectiontitle

   The following example builds on the Non-Preemptive Priorities example
   to implement local serialization in addition to priorities. It
   implements three priority levels and local serializers. The user
   interface for it follows:


   ::


      enum Priority {
         P_High,
         P_Medium,
         P_Low
      };


      template<typename Func>
      void EnqueueWork( Priority p, Func f, Serializer* s=NULL );


   Template function ``EnqueueWork`` causes functor ``f`` to run when
   the three constraints in the following table are met.


   .. container:: tablenoborder


      .. list-table::
         :header-rows: 1

         * - Constraint
           - Resolved by class...
         * - Any prior work for the ``Serializer`` has completed.
           - \ ``Serializer``
         * - A thread is available.
           - \ ``RunWorkItem``
         * - No higher priority work is ready to run.
           - \ ``ReadyPileType``


   Constraints on a given functor are resolved from top to bottom in the
   table. The first constraint does not apply when ``s`` is NULL. The
   implementation of ``EnqueueWork`` packages the functor in a
   ``SerializedWorkItem`` and routes it to the class that enforces the
   first relevant constraint between pieces of work.


   ::


      template<typename Func>
      void EnqueueWork( Priority p, Func f, Serializer* s=NULL ) {
         WorkItem* item = new SerializedWorkItem<Func>( p, f, s );
         if( s )
            s->add(item);
         else
            ReadyPile.add(item);
      }
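

   For illustration, the following sketch (not part of the original
   example) shows how a client might use this interface for two
   independent text buffers. The lambdas and buffer names are
   hypothetical; any copyable callable works as the functor.


   ::


      Serializer bufferA, bufferB;   // one Serializer per editable text buffer

      // Operations on the same buffer are serialized in FIFO order, while
      // operations on different buffers may run concurrently.
      EnqueueWork( P_Medium, []{ /* select text in buffer A */ }, &bufferA );
      EnqueueWork( P_Medium, []{ /* delete the selection in buffer A */ }, &bufferA );
      EnqueueWork( P_Low,    []{ /* recount words in buffer B */ }, &bufferB );

      // Work that is not tied to any particular object needs no Serializer.
      EnqueueWork( P_High, []{ /* update the status bar */ } );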


   A ``SerializedWorkItem`` is derived from a ``WorkItem``, which serves
   as a way to pass around a prioritized piece of work without knowing
   further details of the work.


   ::


      // Abstract base class for a prioritized piece of work.
      class WorkItem {
      public:
         WorkItem( Priority p ) : priority(p) {}
         // Derived class defines the actual work.
         virtual void run() = 0;
         const Priority priority;
      };


      template<typename Func>
      class SerializedWorkItem: public WorkItem {
         Serializer* serializer;
         Func f;
         /*override*/ void run() {
            f();
            Serializer* s = serializer;
            // Destroy f before running the Serializer's next functor.
            delete this;
            if( s )
               s->noteCompletion();
         }
      public:
         SerializedWorkItem( Priority p, const Func& f_, Serializer* s ) :
            WorkItem(p), serializer(s), f(f_)
         {}
      };


   Base class ``WorkItem`` is the same as class ``WorkItem`` in the
   example for Non-Preemptive Priorities. The notion of a serial
   constraint is completely hidden from the base class, thus permitting
   the framework to be extended to other kinds of constraints, or to no
   constraint at all. Class ``SerializedWorkItem`` is essentially
   ``ConcreteWorkItem`` from the example for Non-Preemptive Priorities,
   extended with a ``Serializer`` aspect.


   Virtual method ``run()`` is invoked when it becomes time to run the
   functor. It performs three steps:


   #. Run the functor.


   #. Destroy the functor.


   #. Notify the ``Serializer`` that the functor completed, thus
      unconstraining the next waiting functor.


   Step 3 is the difference from the operation of
   ``ConcreteWorkItem::run``. Step 2 could be done after step 3 in some
   contexts to increase concurrency slightly. However, the presented
   order is recommended because if step 2 takes non-trivial time, it
   likely has side effects that should complete before the next functor
   runs.


   Class ``Serializer`` implements the core of the Local Serializer
   pattern:


   ::


      class Serializer {
         oneapi::tbb::concurrent_queue<WorkItem*> queue;
         std::atomic<int> count{0};      // Count of queued items plus the in-flight item
         void moveOneItemToReadyPile() { // Transfer item from queue to ReadyPile
            WorkItem* item;
            queue.try_pop(item);         // Cannot fail: count>0 guarantees an item is present
            ReadyPile.add(item);
         }
      public:
         void add( WorkItem* item ) {
            queue.push(item);
            if( ++count==1 )
               moveOneItemToReadyPile();
         }
         void noteCompletion() {         // Called when a WorkItem completes.
            if( --count!=0 )
               moveOneItemToReadyPile();
         }
      };


   The class maintains two members:


   - A queue of ``WorkItem`` objects waiting for prior work to complete.


   - A count of queued or in-flight work.


   Mutexes are avoided by using ``concurrent_queue<WorkItem*>`` and
   ``atomic<int>`` along with careful ordering of operations. The
   transitions of ``count`` are the key to understanding how class
   ``Serializer`` works; a short trace follows the list below.


   - If method ``add`` increments ``count`` from 0 to 1, this indicates
     that no other work is in flight and thus the work should be moved
     to the ``ReadyPile``.


   - If method ``noteCompletion`` decrements ``count`` and it is *not*
     from 1 to 0, then the queue is non-empty and another item in the
     queue should be moved to ``ReadyPile``.
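

   As an illustration of this accounting, the following sketch (not part
   of the original example) traces two items ``A`` and ``B``, pointers to
   work items created for Serializer ``s``, arriving while ``s`` is idle.
   In the real code ``noteCompletion`` is called from
   ``SerializedWorkItem::run`` rather than directly by the client; the
   explicit calls below only make the transitions visible.


   ::


      Serializer s;
      s.add(A);            // count: 0 -> 1, so A moves to ReadyPile and is put in flight
      s.add(B);            // count: 1 -> 2, so B waits in the queue behind A
      // ... A runs to completion, and its run() calls ...
      s.noteCompletion();  // count: 2 -> 1, so B moves to ReadyPile and is put in flight
      // ... B runs to completion, and its run() calls ...
      s.noteCompletion();  // count: 1 -> 0, the queue is empty, nothing more to do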


   Class ``ReadyPile`` is explained in the example for Non-Preemptive
   Priorities.


   If priorities are not necessary, there are two variations on method
   ``moveOneItemToReadyPile``, with different implications.


   - Method ``moveOneItemToReadyPile`` could directly invoke
     ``item->run()``. This approach has relatively low overhead and high
     thread locality for a given ``Serializer``. But it is unfair. If the
     ``Serializer`` has a continual stream of tasks, the thread operating
     on it will keep servicing those tasks to the exclusion of others.


   - Method ``moveOneItemToReadyPile`` could invoke ``task::enqueue`` to
     enqueue a task that invokes ``item->run()``. Doing so introduces
     higher overhead and less locality than the first approach, but
     avoids starvation.


   The conflict between fairness and maximum locality is fundamental.
   The best resolution depends upon circumstance.


   The pattern generalizes to constraints on work items more general
   than those maintained by class ``Serializer``. A generalized
   ``Serializer::add`` determines whether a work item is unconstrained,
   and if so, runs it immediately. A generalized
   ``Serializer::noteCompletion`` runs all previously constrained items
   that have become unconstrained by the completion of the current work
   item. Here "run" means to run the work immediately or, if there are
   further constraints, to forward the work to the next constraint
   resolver.
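

   A possible skeleton of such a generalized resolver is sketched below.
   It is not part of the original example, and the helper names
   (``isUnconstrained``, ``remember``, ``releasedBy``) are hypothetical
   placeholders whose implementation depends on the kind of constraint
   being enforced.


   ::


      // Hypothetical skeleton of a generalized constraint resolver.
      class ConstraintResolver {
      public:
         void add( WorkItem* item ) {
            if( isUnconstrained(item) )
               run(item);        // no prior work blocks this item
            else
               remember(item);   // record the item as waiting
         }
         void noteCompletion( WorkItem* finished ) {
            // Run every item that the completion of 'finished' unblocked.
            for( WorkItem* item : releasedBy(finished) )
               run(item);
         }
      private:
         bool isUnconstrained( WorkItem* item );
         void remember( WorkItem* item );
         std::vector<WorkItem*> releasedBy( WorkItem* finished );
         // Run the work now, or forward it to the next constraint resolver.
         void run( WorkItem* item );
      };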