xref: /oneTBB/test/common/graph_utils.h (revision 0a2b3987)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
/** @file graph_utils.h
    Common helper classes and functions for testing flow graph nodes.
**/
20 
21 #ifndef __TBB_harness_graph_H
22 #define __TBB_harness_graph_H
23 
#include "config.h"

#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/null_rw_mutex.h"
#include "oneapi/tbb/concurrent_unordered_set.h"

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <vector>

#include "common/spin_barrier.h"
36 
37 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
38 
39 // Needed conversion to and from continue_msg, but didn't want to add
40 // conversion operators to the class, since we don't want it in general,
41 // only in these tests.
42 template<typename InputType, typename OutputType>
43 struct converter {
44     static OutputType convert_value(const InputType &i) {
45         return OutputType(i);
46     }
47 };
48 
49 template<typename InputType>
50 struct converter<InputType,tbb::flow::continue_msg> {
51     static tbb::flow::continue_msg convert_value(const InputType &/*i*/) {
52         return tbb::flow::continue_msg();
53     }
54 };
55 
56 template<typename OutputType>
57 struct converter<tbb::flow::continue_msg,OutputType> {
58     static OutputType convert_value(const tbb::flow::continue_msg &/*i*/) {
59         return OutputType();
60     }
61 };
62 
63 // helper for multifunction_node tests.
64 template<size_t N>
65 struct mof_helper {
66     template<typename InputType, typename ports_type>
67     static inline void output_converted_value(const InputType &i, ports_type &p) {
68         (void)std::get<N-1>(p).try_put(converter<InputType,typename std::tuple_element<N-1,ports_type>::type::output_type>::convert_value(i));
69         output_converted_value<N-1>(i, p);
70     }
71 };
72 
73 template<>
74 struct mof_helper<1> {
75     template<typename InputType, typename ports_type>
76     static inline void output_converted_value(const InputType &i, ports_type &p) {
77         // just emit a default-constructed object
78         (void)std::get<0>(p).try_put(converter<InputType,typename std::tuple_element<0,ports_type>::type::output_type>::convert_value(i));
79     }
80 };
81 
82 template< typename InputType, typename OutputType >
83 struct harness_graph_default_functor {
84     static OutputType construct( InputType v ) {
85         return OutputType(v);
86     }
87 };
88 
89 template< typename OutputType >
90 struct harness_graph_default_functor< tbb::flow::continue_msg, OutputType > {
91     static OutputType construct( tbb::flow::continue_msg ) {
92         return OutputType();
93     }
94 };
95 
96 template< typename InputType >
97 struct harness_graph_default_functor< InputType, tbb::flow::continue_msg > {
98     static tbb::flow::continue_msg construct( InputType ) {
99         return tbb::flow::continue_msg();
100     }
101 };
102 
103 template< >
104 struct harness_graph_default_functor< tbb::flow::continue_msg, tbb::flow::continue_msg > {
105     static tbb::flow::continue_msg construct( tbb::flow::continue_msg ) {
106         return tbb::flow::continue_msg();
107     }
108 };
109 
110 template<typename InputType, typename OutputSet>
111 struct harness_graph_default_multifunction_functor {
112     static const int N = std::tuple_size<OutputSet>::value;
113     typedef typename tbb::flow::multifunction_node<InputType,OutputSet>::output_ports_type ports_type;
114     static void construct(const InputType &i, ports_type &p) {
115         mof_helper<N>::output_converted_value(i, p);
116     }
117 };
118 
119 //! An executor that accepts InputType and generates OutputType
120 template< typename InputType, typename OutputType >
121 struct harness_graph_executor {
122 
123     typedef OutputType (*function_ptr_type)( InputType v );
124 
125     template<typename RW>
126     struct mutex_holder { static RW mutex; };
127 
128     static function_ptr_type fptr;
129     static std::atomic<size_t> execute_count;
130     static std::atomic<size_t> current_executors;
131     static size_t max_executors;
132 
133     static inline OutputType func( InputType v ) {
134         size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture
135         c = current_executors++;
136         if (max_executors != 0) {
137             CHECK(c <= max_executors);
138         }
139         ++execute_count;
140         OutputType v2 = (*fptr)(v);
141         --current_executors;
142         return v2;
143     }
144 
145     template< typename RW >
146     static inline OutputType tfunc( InputType v ) {
147         // Invocations allowed to be concurrent, the lock is acquired in shared ("read") mode.
148         // A test can take it exclusively, thus creating a barrier for invocations.
149         typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false );
150         return func(v);
151     }
152 
153     template< typename RW >
154     struct tfunctor {
155         std::atomic<size_t> my_execute_count;
156         tfunctor() { my_execute_count = 0; }
157         tfunctor( const tfunctor &f ) { my_execute_count = static_cast<size_t>(f.my_execute_count); }
158         OutputType operator()( InputType i ) {
159            typename RW::scoped_lock l( harness_graph_executor::mutex_holder<RW>::mutex, /*write=*/false );
160            ++my_execute_count;
161            return harness_graph_executor::func(i);
162         }
163     };
164     typedef tfunctor<tbb::null_rw_mutex> functor;
165 
166 };
167 
168 //! A multifunction executor that accepts InputType and has only one Output of OutputType.
169 template< typename InputType, typename OutputTuple >
170 struct harness_graph_multifunction_executor {
171     typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type ports_type;
172     typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
173 
174     typedef void (*mfunction_ptr_type)( const InputType& v, ports_type &p );
175 
176     template<typename RW>
177     struct mutex_holder { static RW mutex; };
178 
179     static mfunction_ptr_type fptr;
180     static std::atomic<size_t> execute_count;
181     static std::atomic<size_t> current_executors;
182     static size_t max_executors;
183 
184     static inline void empty_func( const InputType&, ports_type& ) {
185     }
186 
187     static inline void func( const InputType &v, ports_type &p ) {
188         size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture
189         c = current_executors++;
190         CHECK( (max_executors == 0 || c <= max_executors) );
191         CHECK( (std::tuple_size<OutputTuple>::value == 1) );
192         ++execute_count;
193         (*fptr)(v,p);
194         --current_executors;
195     }
196 
197     template< typename RW >
198     static inline void tfunc( const InputType& v, ports_type &p ) {
199         // Shared lock in invocations, exclusive in a test; see a comment in harness_graph_executor.
200         typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false );
201         func(v,p);
202     }
203 
204     template< typename RW >
205     struct tfunctor {
206         std::atomic<size_t> my_execute_count;
207         tfunctor() { my_execute_count = 0; }
208         tfunctor( const tfunctor &f ) { my_execute_count = static_cast<size_t>(f.my_execute_count); }
209         void operator()( const InputType &i, ports_type &p ) {
210            typename RW::scoped_lock l( harness_graph_multifunction_executor::mutex_holder<RW>::mutex, /*write=*/false );
211            ++my_execute_count;
212            harness_graph_multifunction_executor::func(i,p);
213         }
214     };
215     typedef tfunctor<tbb::null_rw_mutex> functor;
216 
217 };
218 
// Out-of-class definitions of harness_graph_executor's static members (function_node tests).
// Each definition is shared by every user of one <InputType, OutputType> instantiation.

//! Per-RW-type mutex that tfunc/tfunctor lock in shared mode.
template< typename InputType, typename OutputType >
template< typename RW >
RW harness_graph_executor<InputType, OutputType>::mutex_holder<RW>::mutex;

//! Incremented by every call to func(); static storage, so it starts at zero.
template< typename InputType, typename OutputType >
std::atomic<size_t> harness_graph_executor<InputType, OutputType>::execute_count;

//! Function called by func(); defaults to harness_graph_default_functor::construct.
template< typename InputType, typename OutputType >
typename harness_graph_executor<InputType, OutputType>::function_ptr_type harness_graph_executor<InputType, OutputType>::fptr
    = harness_graph_default_functor< InputType, OutputType >::construct;

//! Number of calls currently inside func(); static storage, so it starts at zero.
template< typename InputType, typename OutputType >
std::atomic<size_t> harness_graph_executor<InputType, OutputType>::current_executors;

//! Concurrency limit checked by func(); 0 disables the check.
template< typename InputType, typename OutputType >
size_t harness_graph_executor<InputType, OutputType>::max_executors = 0;
236 
// Out-of-class definitions of harness_graph_multifunction_executor's static members
// (multifunction_node tests); shared by every user of one <InputType, OutputTuple> instantiation.

//! Per-RW-type mutex that tfunc/tfunctor lock in shared mode.
template< typename InputType, typename OutputTuple >
template< typename RW >
RW harness_graph_multifunction_executor<InputType, OutputTuple>::mutex_holder<RW>::mutex;

//! Incremented by every call to func(); static storage, so it starts at zero.
template< typename InputType, typename OutputTuple >
std::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;

//! Function called by func(); defaults to harness_graph_default_multifunction_functor::construct.
template< typename InputType, typename OutputTuple >
typename harness_graph_multifunction_executor<InputType, OutputTuple>::mfunction_ptr_type harness_graph_multifunction_executor<InputType, OutputTuple>::fptr
    = harness_graph_default_multifunction_functor< InputType, OutputTuple >::construct;

//! Number of calls currently inside func(); static storage, so it starts at zero.
template< typename InputType, typename OutputTuple >
std::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors;

//! Concurrency limit checked by func(); 0 disables the check.
template< typename InputType, typename OutputTuple >
size_t harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
254 
255 //! Counts the number of puts received
256 template< typename T >
257 struct harness_counting_receiver : public tbb::flow::receiver<T> {
258     harness_counting_receiver& operator=(const harness_counting_receiver&) = delete;
259 
260     std::atomic< size_t > my_count;
261     T max_value;
262     size_t num_copies;
263     tbb::flow::graph& my_graph;
264 
265     harness_counting_receiver(tbb::flow::graph& g) : num_copies(1), my_graph(g) {
266        my_count = 0;
267     }
268 
269     void initialize_map( const T& m, size_t c ) {
270        my_count = 0;
271        max_value = m;
272        num_copies = c;
273     }
274 
275     tbb::flow::graph& graph_reference() const override {
276         return my_graph;
277     }
278 
279     tbb::detail::d1::graph_task *try_put_task( const T & ) override {
280       ++my_count;
281       return const_cast<tbb::detail::d1::graph_task*>(SUCCESSFULLY_ENQUEUED);
282     }
283 
284     void validate() {
285         size_t n = my_count;
286         CHECK( n == num_copies*max_value );
287     }
288  };
289 
290 //! Counts the number of puts received
291 template< typename T >
292 struct harness_mapped_receiver : public tbb::flow::receiver<T> {
293     harness_mapped_receiver(const harness_mapped_receiver&) = delete;
294     harness_mapped_receiver& operator=(const harness_mapped_receiver&) = delete;
295 
296     std::atomic< size_t > my_count;
297     T max_value;
298     size_t num_copies;
299     typedef tbb::concurrent_unordered_multiset<T> multiset_type;
300     multiset_type *my_multiset;
301     tbb::flow::graph& my_graph;
302 
303     harness_mapped_receiver(tbb::flow::graph& g) : my_multiset(NULL), my_graph(g) {
304        my_count = 0;
305     }
306 
307 #if __INTEL_COMPILER <= 2021
308     // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
309     // class while the parent class has the virtual keyword for the destrocutor.
310     virtual
311 #endif
312     ~harness_mapped_receiver() {
313         if ( my_multiset ) delete my_multiset;
314     }
315 
316     void initialize_map( const T& m, size_t c ) {
317        my_count = 0;
318        max_value = m;
319        num_copies = c;
320        if ( my_multiset ) delete my_multiset;
321        my_multiset = new multiset_type;
322     }
323 
324     tbb::detail::d1::graph_task* try_put_task( const T &t ) override {
325       if ( my_multiset ) {
326           (*my_multiset).emplace( t );
327       } else {
328           ++my_count;
329       }
330       return const_cast<tbb::detail::d1::graph_task*>(SUCCESSFULLY_ENQUEUED);
331     }
332 
333     tbb::flow::graph& graph_reference() const override {
334         return my_graph;
335     }
336 
337     void validate() {
338         if ( my_multiset ) {
339             for ( size_t i = 0; i < (size_t)max_value; ++i ) {
340                 auto it = (*my_multiset).find((int)i);
341                 CHECK_MESSAGE( it != my_multiset->end(), "Expected element in the map." );
342                 size_t n = (*my_multiset).count(int(i));
343                 CHECK( n == num_copies );
344             }
345         } else {
346             size_t n = my_count;
347             CHECK( n == num_copies*max_value );
348         }
349     }
350 
351     void reset_receiver(tbb::flow::reset_flags /*f*/) {
352         my_count = 0;
353         if(my_multiset) delete my_multiset;
354         my_multiset = new multiset_type;
355     }
356 
357 };
358 
359 //! Counts the number of puts received
360 template< typename T >
361 struct harness_counting_sender : public tbb::flow::sender<T> {
362     harness_counting_sender(const harness_counting_sender&) = delete;
363     harness_counting_sender& operator=(const harness_counting_sender&) = delete;
364 
365     typedef typename tbb::flow::sender<T>::successor_type successor_type;
366     std::atomic< successor_type * > my_receiver;
367     std::atomic< size_t > my_count;
368     std::atomic< size_t > my_received;
369     size_t my_limit;
370 
371     harness_counting_sender( ) : my_limit(~size_t(0)) {
372        my_receiver = NULL;
373        my_count = 0;
374        my_received = 0;
375     }
376 
377     harness_counting_sender( size_t limit ) : my_limit(limit) {
378        my_receiver = NULL;
379        my_count = 0;
380        my_received = 0;
381     }
382 
383     bool register_successor( successor_type &r ) override {
384         my_receiver = &r;
385         return true;
386     }
387 
388     bool remove_successor( successor_type &r ) override {
389         successor_type *s = my_receiver.exchange( NULL );
390         CHECK( s == &r );
391         return true;
392     }
393 
394     bool try_get( T & v ) override {
395         size_t i = my_count++;
396         if ( i < my_limit ) {
397            v = T( i );
398            ++my_received;
399            return true;
400         } else {
401            return false;
402         }
403     }
404 
405     bool try_put_once() {
406         successor_type *s = my_receiver;
407         size_t i = my_count++;
408         if ( s->try_put( T(i) ) ) {
409             ++my_received;
410             return true;
411         } else {
412             return false;
413         }
414     }
415 
416     void try_put_until_false() {
417         successor_type *s = my_receiver;
418         size_t i = my_count++;
419 
420         while ( s->try_put( T(i) ) ) {
421             ++my_received;
422             i = my_count++;
423         }
424     }
425 
426     void try_put_until_limit() {
427         successor_type *s = my_receiver;
428 
429         for ( int i = 0; i < (int)my_limit; ++i ) {
430             CHECK( s->try_put( T(i) ) );
431             ++my_received;
432         }
433         CHECK( my_received == my_limit );
434     }
435 
436 };
437 
438 template< typename InputType >
439 struct parallel_put_until_limit {
440     parallel_put_until_limit& operator=(const parallel_put_until_limit&) = delete;
441 
442     typedef std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders_t;
443 
444     senders_t& my_senders;
445 
446     parallel_put_until_limit( senders_t& senders ) : my_senders(senders) {}
447 
448     void operator()( int i ) const  { my_senders[i]->try_put_until_limit(); }
449 };
450 
// Flags for test_resets() and the blocking serial_* bodies below: a body stores 1 into its flag
// and then waits (utils::SpinWaitWhileEq) while the flag is still 1, so the test thread releases
// it by storing a different value. Only serial_fn_state0 is referenced in this header; presumably
// the other two are used by individual test files.
// NOTE(review): these are non-inline variables defined in a header; including the header in more
// than one translation unit of the same binary would produce multiple definitions.
std::atomic<int> serial_fn_state0;
std::atomic<int> serial_fn_state1;
std::atomic<int> serial_continue_state0;
455 
456 template<typename T>
457 struct serial_fn_body {
458     std::atomic<int>& my_flag;
459     serial_fn_body(std::atomic<int>& flag) : my_flag(flag) { }
460     T operator()(const T& in) {
461         if (my_flag == 0) {
462             my_flag = 1;
463 
464             // wait until we are released
465             utils::SpinWaitWhileEq(my_flag, 1);
466         }
467         return in;
468     }
469 };
470 
471 template<typename T>
472 struct serial_continue_body {
473     std::atomic<int>& my_flag;
474     serial_continue_body(std::atomic<int> &flag) : my_flag(flag) {}
475     T operator()(const tbb::flow::continue_msg& /*in*/) {
476         // signal we have received a value
477         my_flag = 1;
478         // wait until we are released
479         utils::SpinWaitWhileEq(my_flag, 1);
480         return (T)1;
481     }
482 };
483 
//! Checks how graph::reset() treats a buffering node of type BufferType carrying T values:
//!  - reset() empties the buffer but keeps its edges;
//!  - after a cancelled run, reset() restores the edge from the buffer to a serial rejecting
//!    function_node (the edge may have been reversed when the busy node rejected a message);
//!  - reset(tbb::flow::rf_clear_edges) removes both a forward and a reversed edge.
//! Runs inside a 4-thread task_arena and holds serial_fn_body inside its body via serial_fn_state0.
template<typename T, typename BufferType>
void test_resets() {
    const int NN = 3;
    bool nFound[NN];
    tbb::task_arena arena{4};
    arena.execute(
        [&] {
            tbb::task_group_context   tgc;
            tbb::flow::graph          g(tgc);
            BufferType                b0(g);
            tbb::flow::queue_node<T>  q0(g);
            T j{};

            // reset empties buffer
            for(T i = 0; i < NN; ++i) {
                b0.try_put(i);
                nFound[(int)i] = false;
            }
            g.wait_for_all();
            g.reset();
            CHECK_MESSAGE(!b0.try_get(j), "reset did not empty buffer");

            // reset doesn't delete edge

            tbb::flow::make_edge(b0,q0);
            g.wait_for_all(); // TODO: investigate why make_edge to buffer_node always creates a forwarding task
            g.reset();
            for(T i = 0; i < NN; ++i) {
                b0.try_put(i);
            }

            g.wait_for_all();
            // every value 0..NN-1 must have been forwarded to q0 exactly once
            for( T i = 0; i < NN; ++i) {
                CHECK_MESSAGE(q0.try_get(j), "Missing value from buffer");
                CHECK_MESSAGE(!nFound[(int)j], "Duplicate value found");
                nFound[(int)j] = true;
            }

            for(int ii = 0; ii < NN; ++ii) {
                CHECK_MESSAGE(nFound[ii], "missing value");
            }
            CHECK_MESSAGE(!q0.try_get(j), "Extra values in output");

            // reset restores an edge that was reversed.
            // we will use a serial rejecting node to get the edge to reverse.
            tbb::flow::function_node<T, T, tbb::flow::rejecting> sfn(g, tbb::flow::serial, serial_fn_body<T>(serial_fn_state0));
            tbb::flow::queue_node<T> outq(g);
            tbb::flow::remove_edge(b0,q0);
            tbb::flow::make_edge(b0, sfn);
            tbb::flow::make_edge(sfn,outq);
            g.wait_for_all();  // wait until all the tasks started while building the graph are done.
            serial_fn_state0 = 0;

            // b0 ------> sfn ------> outq
            for(int icnt = 0; icnt < 2; ++icnt) {
                g.wait_for_all();
                serial_fn_state0 = 0;
                std::thread t([&] {
                    b0.try_put((T)0);  // will start sfn
                    g.wait_for_all();  // wait for all the tasks to complete.
                });
                // wait until function_node starts (serial_fn_body stores 1 into the flag)
                utils::SpinWaitWhileEq(serial_fn_state0, 0);
                // now the function_node is executing.
                // this will start a task to forward the second item
                // to the serial function node
                b0.try_put((T)1);  // first item will be consumed by task completing the execution
                b0.try_put((T)2);  // second item will remain after cancellation
                // now wait for the task that attempts to forward the buffer item to
                // complete.
                // now cancel the graph.
                CHECK_MESSAGE(tgc.cancel_group_execution(), "task group already cancelled");
                serial_fn_state0 = 0;  // release the function_node.
                t.join();
                // check that at most one output reached the queue_node
                T outt;
                T outt2;
                bool got_item1 = outq.try_get(outt);
                bool got_item2 = outq.try_get(outt2);
                // either the output queue was empty (if the function_node tested for cancellation before putting the
                // result to the queue) or there was one element in the queue (the 0).
                // NOTE(review): the check below requires exactly one item (the 0); an empty queue would
                // fail it, so the "empty" case described above is not actually accepted.
                bool is_successful_operation = got_item1 && (int)outt == 0 && !got_item2;
                CHECK_MESSAGE( is_successful_operation, "incorrect output from function_node");
                // the edge between the buffer and the function_node should be reversed, and the last
                // message we put in the buffer should still be there.  We can't directly test for the
                // edge reversal.
                got_item1 = b0.try_get(outt);
                CHECK_MESSAGE(got_item1, " buffer lost a message");
                is_successful_operation = (2 == (int)outt || 1 == (int)outt);
                CHECK_MESSAGE(is_successful_operation, " buffer had incorrect message");  // the one not consumed by the node.
                CHECK_MESSAGE(g.is_cancelled(), "Graph was not cancelled");
                g.reset();
            }  // icnt

            // reset(rf_clear_edges) removes edges.  (icnt == 0 => forward edge, 1 => reversed edge)
            for(int icnt = 0; icnt < 2; ++icnt) {
                if(icnt == 1) {
                    // set up reversed edge
                    tbb::flow::make_edge(b0, sfn);
                    tbb::flow::make_edge(sfn,outq);
                    serial_fn_state0 = 0;
                    std::thread t([&] {
                        b0.try_put((T)0);  // starts up the function node
                        b0.try_put((T)1);  // should reverse the edge
                        g.wait_for_all();  // wait for all the tasks to complete.
                    });
                    // wait until sfn has started on the first item; the second put is what reverses the edge
                    utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for edge reversal
                    CHECK_MESSAGE(tgc.cancel_group_execution(), "task group already cancelled");
                    serial_fn_state0 = 0;  // release the function_node.
                    t.join();
                }
                g.reset(tbb::flow::rf_clear_edges);
                // test that no one is a successor to the buffer now.
                serial_fn_state0 = 1;  // let the function_node go if it gets an input message
                b0.try_put((T)23);
                g.wait_for_all();
                // serial_fn_body only changes a flag that is 0, so a still-1 flag alone cannot prove sfn
                // did not run; the buffer still holding 23 and outq being empty confirm it below.
                CHECK_MESSAGE((int)serial_fn_state0 == 1, "function_node executed when it shouldn't");
                T outt;
                bool is_successful_operation = b0.try_get(outt) && (T)23 == outt && !outq.try_get(outt);
                CHECK_MESSAGE(is_successful_operation, "node lost its input");
            }
        }
    );                          // arena.execute()
}
608 
//! Checks that input_ports() hands back a reference to the same object on every call.
template<typename NodeType>
void test_input_ports_return_ref(NodeType& mip_node) {
    typedef typename NodeType::input_ports_type ports_t;
    ports_t& first_call = mip_node.input_ports();
    ports_t& second_call = mip_node.input_ports();
    CHECK_MESSAGE(&first_call == &second_call, "input_ports() should return reference");
}
615 
//! Checks that output_ports() hands back a reference to the same object on every call.
template<typename NodeType>
void test_output_ports_return_ref(NodeType& mop_node) {
    typedef typename NodeType::output_ports_type ports_t;
    ports_t& first_call = mop_node.output_ports();
    ports_t& second_call = mop_node.output_ports();
    CHECK_MESSAGE(&first_call == &second_call, "output_ports() should return reference");
}
622 
623 template< template <typename> class ReservingNodeType, typename DataType, bool DoClear >
624 class harness_reserving_body {
625     harness_reserving_body& operator=(const harness_reserving_body&) = delete;
626     ReservingNodeType<DataType> &my_reserving_node;
627     tbb::flow::buffer_node<DataType> &my_buffer_node;
628 public:
629     harness_reserving_body(ReservingNodeType<DataType> &reserving_node, tbb::flow::buffer_node<DataType> &bn) : my_reserving_node(reserving_node), my_buffer_node(bn) {}
630     void operator()(DataType i) const {
631         my_reserving_node.try_put(i);
632 #if _MSC_VER && !__INTEL_COMPILER
633 #pragma warning (push)
634 #pragma warning (disable: 4127)  /* suppress conditional expression is constant */
635 #endif
636         if (DoClear) {
637 #if _MSC_VER && !__INTEL_COMPILER
638 #pragma warning (pop)
639 #endif
640             my_reserving_node.clear();
641         }
642         my_buffer_node.try_put(i);
643         my_reserving_node.try_put(i);
644     }
645 };
646 
647 template< template <typename> class ReservingNodeType, typename DataType >
648 void test_reserving_nodes() {
649 #if TBB_TEST_LOW_WORKLOAD
650     const int N = 30;
651 #else
652     const int N = 300;
653 #endif
654 
655     tbb::flow::graph g;
656 
657     ReservingNodeType<DataType> reserving_n(g);
658 
659     tbb::flow::buffer_node<DataType> buffering_n(g);
660     tbb::flow::join_node< std::tuple<DataType, DataType>, tbb::flow::reserving > join_n(g);
661     harness_counting_receiver< std::tuple<DataType, DataType> > end_receiver(g);
662 
663     tbb::flow::make_edge(reserving_n, tbb::flow::input_port<0>(join_n));
664     tbb::flow::make_edge(buffering_n, tbb::flow::input_port<1>(join_n));
665     tbb::flow::make_edge(join_n, end_receiver);
666 
667     utils::NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, false>(reserving_n, buffering_n));
668     g.wait_for_all();
669 
670     CHECK(end_receiver.my_count == N);
671 
672     // Should not hang
673     utils::NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, true>(reserving_n, buffering_n));
674     g.wait_for_all();
675 
676     CHECK(end_receiver.my_count == 2 * N);
677 }
678 
679 namespace lightweight_testing {
680 
681 typedef std::tuple<int, int> output_tuple_type;
682 
683 template<typename NodeType>
684 class native_loop_body {
685     native_loop_body& operator=(const native_loop_body&) = delete;
686     NodeType& my_node;
687 public:
688     native_loop_body(NodeType& node) : my_node(node) {}
689 
690     void operator()(int) const {
691         std::thread::id this_id = std::this_thread::get_id();
692         my_node.try_put(this_id);
693     }
694 };
695 
696 std::atomic<unsigned> g_body_count;
697 
698 class concurrency_checker_body {
699 public:
700     concurrency_checker_body() { g_body_count = 0; }
701 
702     template<typename gateway_type>
703     void operator()(const std::thread::id& input, gateway_type&) { increase_and_check(input); }
704 
705     output_tuple_type operator()(const std::thread::id& input) {
706         increase_and_check(input);
707         return output_tuple_type();
708     }
709 
710 private:
711     void increase_and_check(const std::thread::id& input) {
712         ++g_body_count;
713         std::thread::id body_thread_id = std::this_thread::get_id();
714         CHECK_MESSAGE(input == body_thread_id, "Body executed as not lightweight");
715     }
716 };
717 
718 template<typename NodeType>
719 void test_unlimited_lightweight_execution(unsigned N) {
720     tbb::flow::graph g;
721     NodeType node(g, tbb::flow::unlimited, concurrency_checker_body());
722 
723     utils::NativeParallelFor(N, native_loop_body<NodeType>(node));
724     g.wait_for_all();
725 
726     CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
727 }
728 
729 std::mutex m;
730 std::condition_variable lightweight_condition;
731 std::atomic<bool> work_submitted;
732 std::atomic<bool> lightweight_work_processed;
733 
734 template<typename NodeType>
735 class native_loop_limited_body {
736     native_loop_limited_body& operator=(const native_loop_limited_body&) = delete;
737     NodeType& my_node;
738     utils::SpinBarrier& my_barrier;
739 public:
740     native_loop_limited_body(NodeType& node, utils::SpinBarrier& barrier):
741         my_node(node), my_barrier(barrier) {}
742     void operator()(int) const {
743         std::thread::id this_id = std::this_thread::get_id();
744         my_node.try_put(this_id);
745         if(!lightweight_work_processed) {
746             my_barrier.wait();
747             work_submitted = true;
748             lightweight_condition.notify_all();
749         }
750     }
751 };
752 
753 struct condition_predicate {
754     bool operator()() {
755         return work_submitted;
756     }
757 };
758 
759 std::atomic<unsigned> g_lightweight_count;
760 std::atomic<unsigned> g_task_count;
761 
762 class limited_lightweight_checker_body {
763 public:
764     limited_lightweight_checker_body() {
765         g_body_count = 0;
766         g_lightweight_count = 0;
767         g_task_count = 0;
768     }
769 private:
770     void increase_and_check(const std::thread::id& /*input*/) {
771         ++g_body_count;
772         // TODO revamp: in order not to rely on scheduler functionality anymore add
773         // __TBB_EXTRA_DEBUG for counting the number of tasks actually created by the flow graph,
774         // hence consider moving lightweight testing into whitebox test for the flow graph.
775         bool is_inside_task = false;/*tbb::task::self().state() == tbb::task::executing;*/
776         if(is_inside_task) {
777             ++g_task_count;
778         } else {
779             std::unique_lock<std::mutex> lock(m);
780             lightweight_condition.wait(lock, condition_predicate());
781             ++g_lightweight_count;
782             lightweight_work_processed = true;
783         }
784     }
785 public:
786     template<typename gateway_type>
787     void operator()(const std::thread::id& input, gateway_type&) {
788         increase_and_check(input);
789     }
790     output_tuple_type operator()(const std::thread::id& input) {
791         increase_and_check(input);
792         return output_tuple_type();
793     }
794 };
795 
796 template<typename NodeType>
797 void test_limited_lightweight_execution(unsigned N, unsigned concurrency) {
798     CHECK_MESSAGE(concurrency != tbb::flow::unlimited,
799                   "Test for limited concurrency cannot be called with unlimited concurrency argument");
800     tbb::flow::graph g;
801     NodeType node(g, concurrency, limited_lightweight_checker_body());
802     // Execute first body as lightweight, then wait for all other threads to fill internal buffer.
803     // Then unblock the lightweight thread and check if other body executions are inside oneTBB task.
804     utils::SpinBarrier barrier(N - concurrency);
805     utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
806     g.wait_for_all();
807     CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
808     // TODO revamp: enable the following checks once whitebox flow graph testing is ready for it.
809     // CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once");
810     // CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times");
811     work_submitted = false;
812     lightweight_work_processed = false;
813 }
814 
815 template<typename NodeType>
816 void test_lightweight(unsigned N) {
817     test_unlimited_lightweight_execution<NodeType>(N);
818     test_limited_lightweight_execution<NodeType>(N, tbb::flow::serial);
819     test_limited_lightweight_execution<NodeType>(N, (std::min)(std::thread::hardware_concurrency() / 2, N/2));
820 }
821 
822 template<template<typename, typename, typename> class NodeType>
823 void test(unsigned N) {
824     typedef std::thread::id input_type;
825     typedef NodeType<input_type, output_tuple_type, tbb::flow::queueing_lightweight> node_type;
826     test_lightweight<node_type>(N);
827 }
828 
829 } // namespace lightweight_testing
830 
831 #endif  // __TBB_harness_graph_H
832