.. _Local_Serializer:

Local Serializer
================


.. container:: section


   .. rubric:: Context
      :class: sectiontitle

   Consider an interactive program. To maximize concurrency and
   responsiveness, operations requested by the user can be implemented
   as tasks. The order of operations can be important. For example,
   suppose the program presents editable text to the user. There might
   be operations to select text and delete selected text. Reversing the
   order of "select" and "delete" operations on the same buffer would be
   bad. However, commuting operations on different buffers might be
   okay. Hence the goal is to establish serial ordering of tasks
   associated with a given object, but not constrain ordering of tasks
   between different objects.

.. container:: section


   .. rubric:: Forces
      :class: sectiontitle

   -  Operations associated with a certain object must be performed in
      serial order.

   -  Serializing with a lock would be wasteful because threads would be
      waiting at the lock when they could be doing useful work
      elsewhere.

.. container:: section


   .. rubric:: Solution
      :class: sectiontitle

   Sequence the work items using a FIFO (first-in first-out) structure.
   Always keep an item in flight if possible. If no item is in flight
   when a work item appears, put the item in flight. Otherwise, push the
   item onto the FIFO. When the current item in flight completes, pop
   another item from the FIFO and put it in flight.

   The logic can be implemented without mutexes, by using
   ``concurrent_queue`` for the FIFO and ``atomic<int>`` to count the
   number of items waiting and in flight. The example explains the
   accounting in detail.

.. container:: section


   .. rubric:: Example
      :class: sectiontitle

   The following example builds on the Non-Preemptive Priorities example
   to implement local serialization in addition to priorities. It
   implements three priority levels and local serializers. The user
   interface for it follows:

   ::

      enum Priority {
         P_High,
         P_Medium,
         P_Low
      };

      template<typename Func>
      void EnqueueWork( Priority p, Func f, Serializer* s=NULL );

   Template function ``EnqueueWork`` causes functor ``f`` to run when
   the three constraints in the following table are met.

   .. container:: tablenoborder


      .. list-table::
         :header-rows: 1

         * -     Constraint
           -     Resolved by class...
         * -     Any prior work for the ``Serializer`` has completed.
           -     \ ``Serializer``
         * -     A thread is available.
           -     \ ``RunWorkItem``
         * -     No higher priority work is ready to run.
           -     \ ``ReadyPileType``

   Constraints on a given functor are resolved from top to bottom in the
   table. The first constraint does not exist when ``s`` is NULL. The
   implementation of ``EnqueueWork`` packages the functor in a
   ``SerializedWorkItem`` and routes it to the class that enforces the
   first relevant constraint between pieces of work.

   ::

      template<typename Func>
      void EnqueueWork( Priority p, Func f, Serializer* s=NULL ) {
         WorkItem* item = new SerializedWorkItem<Func>( p, f, s );
         if( s )
             s->add(item);
         else
             ReadyPile.add(item);
      }

   A ``SerializedWorkItem`` is derived from a ``WorkItem``, which serves
   as a way to pass around a prioritized piece of work without knowing
   further details of the work.

   ::

      // Abstract base class for a prioritized piece of work.
      class WorkItem {
      public:
         WorkItem( Priority p ) : priority(p) {}
         // Derived class defines the actual work.
         virtual void run() = 0;
         const Priority priority;
      };

      template<typename Func>
      class SerializedWorkItem: public WorkItem {
         Serializer* serializer;
         Func f;
         /*override*/ void run() {
             f();
             Serializer* s = serializer;
             // Destroy f before running Serializer's next functor.
             delete this;
             if( s )
                 s->noteCompletion();
         }
      public:
         SerializedWorkItem( Priority p, const Func& f_, Serializer* s ) :
             WorkItem(p), serializer(s), f(f_)
         {}
      };

   Base class ``WorkItem`` is the same as class ``WorkItem`` in the
   example for Non-Preemptive Priorities. The notion of serial
   constraints is completely hidden from the base class, thus permitting
   the framework to be extended with other kinds of constraints, or no
   constraints at all. Class ``SerializedWorkItem`` is essentially
   ``ConcreteWorkItem`` from the example for Non-Preemptive Priorities,
   extended with a ``Serializer`` aspect.

   Virtual method ``run()`` is invoked when it becomes time to run the
   functor. It performs three steps:

   #. Run the functor.

   #. Destroy the functor.

   #. Notify the ``Serializer`` that the functor completed, thus
      unconstraining the next waiting functor.

   Step 3 differs from the operation of ``ConcreteWorkItem::run``.
   Step 2 could be done after step 3 in some contexts to increase
   concurrency slightly. However, the presented order is recommended
   because if step 2 takes non-trivial time, it likely has side effects
   that should complete before the next functor runs.

   Class ``Serializer`` implements the core of the Local Serializer
   pattern:

   ::

      class Serializer {
         oneapi::tbb::concurrent_queue<WorkItem*> queue;
         std::atomic<int> count;         // Count of queued items and in-flight item
         void moveOneItemToReadyPile() { // Transfer item from queue to ReadyPile
             WorkItem* item;
             // The pop cannot fail: callers invoke this method only after
             // observing a count that guarantees a queued item, and items
             // are pushed before count is incremented.
             queue.try_pop(item);
             ReadyPile.add(item);
         }
      public:
         void add( WorkItem* item ) {
             queue.push(item);
             if( ++count==1 )
                 moveOneItemToReadyPile();
         }
         void noteCompletion() {        // Called when WorkItem completes.
             if( --count!=0 )
                 moveOneItemToReadyPile();
         }
      };

   The class maintains two members:

   -  A queue of ``WorkItem`` objects waiting for prior work to
      complete.

   -  A count of queued and in-flight work.

   Mutexes are avoided by using ``concurrent_queue<WorkItem*>`` and
   ``atomic<int>`` along with careful ordering of operations. The
   transitions of ``count`` are the key to understanding how class
   ``Serializer`` works.

   -  If method ``add`` increments ``count`` from 0 to 1, this indicates
      that no other work is in flight and thus the work should be moved
      to the ``ReadyPile``.

   -  If method ``noteCompletion`` decrements ``count`` and it is *not*
      from 1 to 0, then the queue is non-empty and another item in the
      queue should be moved to ``ReadyPile``.
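
   These transitions can be traced in a small single-threaded sketch.
   The following is a hypothetical stand-in, not part of the pattern's
   code: a plain ``std::queue`` replaces ``concurrent_queue`` because
   nothing runs concurrently here, and a vector records the order in
   which items would reach ``ReadyPile``::

      #include <atomic>
      #include <cassert>
      #include <queue>
      #include <string>
      #include <vector>

      // Records the order in which items become ready (ReadyPile stand-in).
      std::vector<std::string> readyOrder;

      // Single-threaded analogue of class Serializer with the same
      // count transitions.
      class MiniSerializer {
         std::queue<std::string> queue;
         std::atomic<int> count{0};      // queued items plus the in-flight item
         void moveOneItemToReadyPile() {
             std::string item = queue.front();
             queue.pop();
             readyOrder.push_back(item); // real code: ReadyPile.add(item)
         }
      public:
         void add( const std::string& item ) {
             queue.push(item);
             if( ++count==1 )            // 0 -> 1: nothing was in flight
                 moveOneItemToReadyPile();
         }
         void noteCompletion() {
             if( --count!=0 )            // queue still holds a waiting item
                 moveOneItemToReadyPile();
         }
      };

      int main() {
         MiniSerializer s;
         s.add("select");       // count 0->1: goes straight to ReadyPile
         s.add("delete");       // count 1->2: waits in the FIFO
         s.noteCompletion();    // "select" done; count 2->1: release "delete"
         s.noteCompletion();    // "delete" done; count 1->0: nothing waiting
         assert( readyOrder.size()==2 );
         assert( readyOrder[0]=="select" && readyOrder[1]=="delete" );
         return 0;
      }

   The real pattern needs ``concurrent_queue`` and atomic updates
   because ``add`` and ``noteCompletion`` may race on different threads;
   the transitions of ``count`` are the same in both versions.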

   Class ``ReadyPile`` is explained in the example for Non-Preemptive
   Priorities.

   If priorities are not necessary, there are two variations on method
   ``moveOneItemToReadyPile``, with different implications.

   -  Method ``moveOneItemToReadyPile`` could directly invoke
      ``item->run()``. This approach has relatively low overhead and
      high thread locality for a given ``Serializer``. But it is unfair.
      If the ``Serializer`` has a continual stream of tasks, the thread
      operating on it will keep servicing those tasks to the exclusion
      of others.

   -  Method ``moveOneItemToReadyPile`` could invoke ``task::enqueue``
      to enqueue a task that invokes ``item->run()``. Doing so
      introduces higher overhead and less locality than the first
      approach, but avoids starvation.

   The conflict between fairness and maximum locality is fundamental.
   The best resolution depends upon circumstance.

   The pattern generalizes to constraints on work items that are more
   general than those maintained by class ``Serializer``. A generalized
   ``Serializer::add`` determines if a work item is unconstrained, and
   if so, runs it immediately. A generalized
   ``Serializer::noteCompletion`` runs all previously constrained items
   that have become unconstrained by the completion of the current work
   item. Here "run" means to run the work immediately, or, if there are
   more constraints, to forward the work to the next constraint
   resolver.
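
   A minimal sketch of this generalization, under the assumption that
   each constraint is a completed predecessor. The names
   ``ConstrainedItem``, ``unmetCount``, and ``successors`` are
   hypothetical, and running an item directly stands in for forwarding
   it to the next constraint resolver::

      #include <atomic>
      #include <cassert>
      #include <string>
      #include <vector>

      std::vector<std::string> runOrder;  // records execution order

      // Hypothetical generalized item: runs once all constraints resolve.
      struct ConstrainedItem {
         std::string name;
         std::atomic<int> unmetCount;              // unresolved constraints
         std::vector<ConstrainedItem*> successors; // items this one constrains

         ConstrainedItem( const std::string& n, int constraints ) :
             name(n), unmetCount(constraints) {}

         // Generalized Serializer::add: run immediately only if
         // unconstrained.
         void submit() {
             if( unmetCount==0 )
                 run();
         }
         // Generalized Serializer::noteCompletion: release every
         // successor whose last constraint this completion resolved.
         void run() {
             runOrder.push_back(name);
             for( ConstrainedItem* s: successors )
                 if( --s->unmetCount==0 )
                     s->run();
         }
      };

      int main() {
         ConstrainedItem a("a",0), b("b",1), c("c",2);
         a.successors = {&b, &c};
         b.successors = {&c};
         c.submit();    // constrained by a and b: does not run yet
         b.submit();    // constrained by a: does not run yet
         a.submit();    // unconstrained: runs, releasing b, then c
         assert( runOrder.size()==3 );
         assert( runOrder[0]=="a" && runOrder[1]=="b" && runOrder[2]=="c" );
         return 0;
      }

   Class ``Serializer`` is the special case where every item has exactly
   one constraint, namely the completion of the item ahead of it in the
   FIFO.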