xref: /oneTBB/test/common/graph_utils.h (revision c21e688a)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
/** @file graph_utils.h
    This contains common helper classes and functions for testing graph nodes
**/
20 
21 #ifndef __TBB_harness_graph_H
22 #define __TBB_harness_graph_H
23 
24 #include "config.h"
25 
26 #include "oneapi/tbb/flow_graph.h"
27 #include "oneapi/tbb/task.h"
28 #include "oneapi/tbb/null_rw_mutex.h"
29 #include "oneapi/tbb/concurrent_unordered_set.h"
30 
31 #include <atomic>
32 #include <thread>
33 #include <mutex>
34 #include <condition_variable>
35 
36 #include "common/spin_barrier.h"
37 
38 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
39 
40 // Needed conversion to and from continue_msg, but didn't want to add
41 // conversion operators to the class, since we don't want it in general,
42 // only in these tests.
//! Generic case: build the output by direct construction from the input value.
//! (The specializations below handle tbb::flow::continue_msg, which carries no
//! payload and has no value-preserving constructor.)
template<typename InputType, typename OutputType>
struct converter {
    static OutputType convert_value(const InputType &i) {
        return OutputType(i);
    }
};
49 
50 template<typename InputType>
51 struct converter<InputType,tbb::flow::continue_msg> {
52     static tbb::flow::continue_msg convert_value(const InputType &/*i*/) {
53         return tbb::flow::continue_msg();
54     }
55 };
56 
57 template<typename OutputType>
58 struct converter<tbb::flow::continue_msg,OutputType> {
59     static OutputType convert_value(const tbb::flow::continue_msg &/*i*/) {
60         return OutputType();
61     }
62 };
63 
64 // helper for multifunction_node tests.
65 template<size_t N>
66 struct mof_helper {
67     template<typename InputType, typename ports_type>
68     static inline void output_converted_value(const InputType &i, ports_type &p) {
69         (void)std::get<N-1>(p).try_put(converter<InputType,typename std::tuple_element<N-1,ports_type>::type::output_type>::convert_value(i));
70         output_converted_value<N-1>(i, p);
71     }
72 };
73 
//! Recursion terminator: handles the first (index 0) output port.
template<>
struct mof_helper<1> {
    template<typename InputType, typename ports_type>
    static inline void output_converted_value(const InputType &i, ports_type &p) {
        // emit the converted value to port 0; the try_put result is ignored
        (void)std::get<0>(p).try_put(converter<InputType,typename std::tuple_element<0,ports_type>::type::output_type>::convert_value(i));
    }
};
82 
//! Default node body: builds the output directly from the input value.
template< typename InputType, typename OutputType >
struct harness_graph_default_functor {
    static OutputType construct( InputType v ) {
        OutputType out(v);
        return out;
    }
};
89 
90 template< typename OutputType >
91 struct harness_graph_default_functor< tbb::flow::continue_msg, OutputType > {
92     static OutputType construct( tbb::flow::continue_msg ) {
93         return OutputType();
94     }
95 };
96 
97 template< typename InputType >
98 struct harness_graph_default_functor< InputType, tbb::flow::continue_msg > {
99     static tbb::flow::continue_msg construct( InputType ) {
100         return tbb::flow::continue_msg();
101     }
102 };
103 
104 template< >
105 struct harness_graph_default_functor< tbb::flow::continue_msg, tbb::flow::continue_msg > {
106     static tbb::flow::continue_msg construct( tbb::flow::continue_msg ) {
107         return tbb::flow::continue_msg();
108     }
109 };
110 
//! Default body for multifunction_node tests: fans the input value out to
//! every output port of the node, converted to each port's output type.
template<typename InputType, typename OutputSet>
struct harness_graph_default_multifunction_functor {
    static const int N = std::tuple_size<OutputSet>::value;
    typedef typename tbb::flow::multifunction_node<InputType,OutputSet>::output_ports_type ports_type;
    static void construct(const InputType &i, ports_type &p) {
        mof_helper<N>::output_converted_value(i, p);
    }
};
119 
//! An executor that accepts InputType and generates OutputType.
//! All counters are static, i.e. shared by every node using the same
//! <InputType, OutputType> instantiation; tests reset them between runs.
template< typename InputType, typename OutputType >
struct harness_graph_executor {

    typedef OutputType (*function_ptr_type)( InputType v );

    // One shared mutex per read-write mutex type; a test can lock it
    // exclusively to stall all concurrent invocations of tfunc/tfunctor.
    template<typename RW>
    struct mutex_holder { static RW mutex; };

    static function_ptr_type fptr;                // the actual conversion body
    static std::atomic<size_t> execute_count;     // total invocations
    static std::atomic<size_t> current_executors; // concurrently running invocations
    static size_t max_executors;                  // concurrency cap checked by func (0 = unchecked)

    //! Count the invocation, check the concurrency cap, and run fptr.
    static inline OutputType func( InputType v ) {
        size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture
        c = current_executors++;
        if (max_executors != 0) {
            CHECK(c <= max_executors);
        }
        ++execute_count;
        OutputType v2 = (*fptr)(v);
        --current_executors;
        return v2;
    }

    template< typename RW >
    static inline OutputType tfunc( InputType v ) {
        // Invocations allowed to be concurrent, the lock is acquired in shared ("read") mode.
        // A test can take it exclusively, thus creating a barrier for invocations.
        typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false );
        return func(v);
    }

    //! Functor flavor of tfunc that additionally keeps a per-instance count.
    template< typename RW >
    struct tfunctor {
        std::atomic<size_t> my_execute_count;
        tfunctor() { my_execute_count = 0; }
        tfunctor( const tfunctor &f ) { my_execute_count = static_cast<size_t>(f.my_execute_count); }
        OutputType operator()( InputType i ) {
           typename RW::scoped_lock l( harness_graph_executor::mutex_holder<RW>::mutex, /*write=*/false );
           ++my_execute_count;
           return harness_graph_executor::func(i);
        }
    };
    typedef tfunctor<tbb::null_rw_mutex> functor;  // default flavor: no locking at all

};
168 
//! A multifunction executor that accepts InputType and has only one Output of OutputType.
//! Mirrors harness_graph_executor, but the body emits through output ports.
template< typename InputType, typename OutputTuple >
struct harness_graph_multifunction_executor {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type ports_type;
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;

    typedef void (*mfunction_ptr_type)( const InputType& v, ports_type &p );

    // Shared mutex per RW type; see harness_graph_executor::mutex_holder.
    template<typename RW>
    struct mutex_holder { static RW mutex; };

    static mfunction_ptr_type fptr;               // the actual body
    static std::atomic<size_t> execute_count;     // total invocations
    static std::atomic<size_t> current_executors; // concurrently running invocations
    static size_t max_executors;                  // concurrency cap (0 = unchecked)

    //! A body that emits nothing.
    static inline void empty_func( const InputType&, ports_type& ) {
    }

    //! Count the invocation, check invariants, and run fptr.
    static inline void func( const InputType &v, ports_type &p ) {
        size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture
        c = current_executors++;
        CHECK( (max_executors == 0 || c <= max_executors) );
        // These helpers are only used with single-port multifunction nodes.
        CHECK( (std::tuple_size<OutputTuple>::value == 1) );
        ++execute_count;
        (*fptr)(v,p);
        --current_executors;
    }

    template< typename RW >
    static inline void tfunc( const InputType& v, ports_type &p ) {
        // Shared lock in invocations, exclusive in a test; see a comment in harness_graph_executor.
        typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false );
        func(v,p);
    }

    //! Functor flavor of tfunc with a per-instance invocation count.
    template< typename RW >
    struct tfunctor {
        std::atomic<size_t> my_execute_count;
        tfunctor() { my_execute_count = 0; }
        tfunctor( const tfunctor &f ) { my_execute_count = static_cast<size_t>(f.my_execute_count); }
        void operator()( const InputType &i, ports_type &p ) {
           typename RW::scoped_lock l( harness_graph_multifunction_executor::mutex_holder<RW>::mutex, /*write=*/false );
           ++my_execute_count;
           harness_graph_multifunction_executor::func(i,p);
        }
    };
    typedef tfunctor<tbb::null_rw_mutex> functor;  // default flavor: no locking at all

};
219 
// static vars for function_node tests: out-of-class definitions of the
// harness_graph_executor static members (one set per instantiation).
template< typename InputType, typename OutputType >
template< typename RW >
RW harness_graph_executor<InputType, OutputType>::mutex_holder<RW>::mutex;

template< typename InputType, typename OutputType >
std::atomic<size_t> harness_graph_executor<InputType, OutputType>::execute_count;

// By default the executor forwards values with the plain conversion body.
template< typename InputType, typename OutputType >
typename harness_graph_executor<InputType, OutputType>::function_ptr_type harness_graph_executor<InputType, OutputType>::fptr
    = harness_graph_default_functor< InputType, OutputType >::construct;

template< typename InputType, typename OutputType >
std::atomic<size_t> harness_graph_executor<InputType, OutputType>::current_executors;

// 0 means "no concurrency limit is checked" in harness_graph_executor::func.
template< typename InputType, typename OutputType >
size_t harness_graph_executor<InputType, OutputType>::max_executors = 0;

// static vars for multifunction_node tests: same pattern as above.
template< typename InputType, typename OutputTuple >
template< typename RW >
RW harness_graph_multifunction_executor<InputType, OutputTuple>::mutex_holder<RW>::mutex;

template< typename InputType, typename OutputTuple >
std::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;

template< typename InputType, typename OutputTuple >
typename harness_graph_multifunction_executor<InputType, OutputTuple>::mfunction_ptr_type harness_graph_multifunction_executor<InputType, OutputTuple>::fptr
    = harness_graph_default_multifunction_functor< InputType, OutputTuple >::construct;

template< typename InputType, typename OutputTuple >
std::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors;

template< typename InputType, typename OutputTuple >
size_t harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
255 
256 //! Counts the number of puts received
257 template< typename T >
258 struct harness_counting_receiver : public tbb::flow::receiver<T> {
259     harness_counting_receiver& operator=(const harness_counting_receiver&) = delete;
260 
261     std::atomic< size_t > my_count;
262     T max_value;
263     size_t num_copies;
264     tbb::flow::graph& my_graph;
265 
266     harness_counting_receiver(tbb::flow::graph& g) : num_copies(1), my_graph(g) {
267        my_count = 0;
268     }
269 
270     void initialize_map( const T& m, size_t c ) {
271        my_count = 0;
272        max_value = m;
273        num_copies = c;
274     }
275 
276     tbb::flow::graph& graph_reference() const override {
277         return my_graph;
278     }
279 
280     tbb::detail::d1::graph_task *try_put_task( const T & ) override {
281       ++my_count;
282       return const_cast<tbb::detail::d1::graph_task*>(SUCCESSFULLY_ENQUEUED);
283     }
284 
285     void validate() {
286         size_t n = my_count;
287         CHECK( n == num_copies*max_value );
288     }
289  };
290 
291 //! Counts the number of puts received
292 template< typename T >
293 struct harness_mapped_receiver : public tbb::flow::receiver<T> {
294     harness_mapped_receiver(const harness_mapped_receiver&) = delete;
295     harness_mapped_receiver& operator=(const harness_mapped_receiver&) = delete;
296 
297     std::atomic< size_t > my_count;
298     T max_value;
299     size_t num_copies;
300     typedef tbb::concurrent_unordered_multiset<T> multiset_type;
301     multiset_type *my_multiset;
302     tbb::flow::graph& my_graph;
303 
304     harness_mapped_receiver(tbb::flow::graph& g) : my_multiset(nullptr), my_graph(g) {
305        my_count = 0;
306     }
307 
308 #if __INTEL_COMPILER <= 2021
309     // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
310     // class while the parent class has the virtual keyword for the destrocutor.
311     virtual
312 #endif
313     ~harness_mapped_receiver() {
314         delete my_multiset;
315         my_multiset = nullptr;
316     }
317 
318     void initialize_map( const T& m, size_t c ) {
319        my_count = 0;
320        max_value = m;
321        num_copies = c;
322        delete my_multiset;
323        my_multiset = new multiset_type;
324     }
325 
326     tbb::detail::d1::graph_task* try_put_task( const T &t ) override {
327       if ( my_multiset ) {
328           (*my_multiset).emplace( t );
329       } else {
330           ++my_count;
331       }
332       return const_cast<tbb::detail::d1::graph_task*>(SUCCESSFULLY_ENQUEUED);
333     }
334 
335     tbb::flow::graph& graph_reference() const override {
336         return my_graph;
337     }
338 
339     void validate() {
340         if ( my_multiset ) {
341             for ( size_t i = 0; i < (size_t)max_value; ++i ) {
342                 auto it = (*my_multiset).find((int)i);
343                 CHECK_MESSAGE( it != my_multiset->end(), "Expected element in the map." );
344                 size_t n = (*my_multiset).count(int(i));
345                 CHECK( n == num_copies );
346             }
347         } else {
348             size_t n = my_count;
349             CHECK( n == num_copies*max_value );
350         }
351     }
352 
353     void reset_receiver(tbb::flow::reset_flags /*f*/) {
354         my_count = 0;
355         if(my_multiset) delete my_multiset;
356         my_multiset = new multiset_type;
357     }
358 
359 };
360 
//! Counts the number of values generated and the number actually accepted by
//! the (single) registered successor.
template< typename T >
struct harness_counting_sender : public tbb::flow::sender<T> {
    harness_counting_sender(const harness_counting_sender&) = delete;
    harness_counting_sender& operator=(const harness_counting_sender&) = delete;

    typedef typename tbb::flow::sender<T>::successor_type successor_type;
    std::atomic< successor_type * > my_receiver;  // the registered successor (nullptr if none)
    std::atomic< size_t > my_count;               // values generated so far
    std::atomic< size_t > my_received;            // values accepted by the successor
    size_t my_limit;                              // stop producing after this many values

    harness_counting_sender( ) : my_limit(~size_t(0)) {
       my_receiver = nullptr;
       my_count = 0;
       my_received = 0;
    }

    harness_counting_sender( size_t limit ) : my_limit(limit) {
       my_receiver = nullptr;
       my_count = 0;
       my_received = 0;
    }

    //! Remember the successor; only one successor at a time is supported.
    bool register_successor( successor_type &r ) override {
        my_receiver = &r;
        return true;
    }

    //! Forget the successor; checks it is the one registered earlier.
    bool remove_successor( successor_type &r ) override {
        successor_type *s = my_receiver.exchange( nullptr );
        CHECK( s == &r );
        return true;
    }

    //! Produce the next value (its own sequence number) until my_limit is hit.
    bool try_get( T & v ) override {
        size_t i = my_count++;
        if ( i < my_limit ) {
           v = T( i );
           ++my_received;
           return true;
        } else {
           return false;
        }
    }

    //! Push a single value to the successor; returns whether it was accepted.
    bool try_put_once() {
        successor_type *s = my_receiver;
        size_t i = my_count++;
        if ( s->try_put( T(i) ) ) {
            ++my_received;
            return true;
        } else {
            return false;
        }
    }

    //! Keep pushing consecutive values until the successor rejects one.
    void try_put_until_false() {
        successor_type *s = my_receiver;
        size_t i = my_count++;

        while ( s->try_put( T(i) ) ) {
            ++my_received;
            i = my_count++;
        }
    }

    //! Push exactly my_limit values; every put is expected to be accepted.
    void try_put_until_limit() {
        successor_type *s = my_receiver;

        for ( int i = 0; i < (int)my_limit; ++i ) {
            CHECK( s->try_put( T(i) ) );
            ++my_received;
        }
        CHECK( my_received == my_limit );
    }

};
439 
440 template< typename InputType >
441 struct parallel_put_until_limit {
442     parallel_put_until_limit& operator=(const parallel_put_until_limit&) = delete;
443 
444     typedef std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders_t;
445 
446     senders_t& my_senders;
447 
448     parallel_put_until_limit( senders_t& senders ) : my_senders(senders) {}
449 
450     void operator()( int i ) const  { my_senders[i]->try_put_until_limit(); }
451 };
452 
// test for resets of buffer-type nodes.
// Shared flags used by serial_fn_body / serial_continue_body to rendezvous
// with the test thread: 0 = body not started, 1 = body running and blocked,
// any other value = body released.
std::atomic<int> serial_fn_state0;
std::atomic<int> serial_fn_state1;
std::atomic<int> serial_continue_state0;
457 
458 template<typename T>
459 struct serial_fn_body {
460     std::atomic<int>& my_flag;
461     serial_fn_body(std::atomic<int>& flag) : my_flag(flag) { }
462     T operator()(const T& in) {
463         if (my_flag == 0) {
464             my_flag = 1;
465 
466             // wait until we are released
467             utils::SpinWaitWhileEq(my_flag, 1);
468         }
469         return in;
470     }
471 };
472 
473 template<typename T>
474 struct serial_continue_body {
475     std::atomic<int>& my_flag;
476     serial_continue_body(std::atomic<int> &flag) : my_flag(flag) {}
477     T operator()(const tbb::flow::continue_msg& /*in*/) {
478         // signal we have received a value
479         my_flag = 1;
480         // wait until we are released
481         utils::SpinWaitWhileEq(my_flag, 1);
482         return (T)1;
483     }
484 };
485 
//! Exercises graph::reset() against a buffer-type node (BufferType):
//!  - reset() empties the buffer,
//!  - reset() preserves edges,
//!  - reset() restores an edge reversed by a rejecting serial function_node,
//!  - reset(rf_clear_edges) removes both forward and reversed edges.
//! The statement order here is deliberate and synchronization-sensitive.
template<typename T, typename BufferType>
void test_resets() {
    const int NN = 3;
    bool nFound[NN];
    tbb::task_arena arena{4};
    arena.execute(
        [&] {
            tbb::task_group_context   tgc;
            tbb::flow::graph          g(tgc);
            BufferType                b0(g);
            tbb::flow::queue_node<T>  q0(g);
            T j{};

            // reset empties buffer
            for(T i = 0; i < NN; ++i) {
                b0.try_put(i);
                nFound[(int)i] = false;
            }
            g.wait_for_all();
            g.reset();
            CHECK_MESSAGE(!b0.try_get(j), "reset did not empty buffer");

            // reset doesn't delete edge

            tbb::flow::make_edge(b0,q0);
            g.wait_for_all(); // TODO: investigate why make_edge to buffer_node always creates a forwarding task
            g.reset();
            for(T i = 0; i < NN; ++i) {
                b0.try_put(i);
            }

            g.wait_for_all();
            // every value must flow through the preserved edge exactly once
            for( T i = 0; i < NN; ++i) {
                CHECK_MESSAGE(q0.try_get(j), "Missing value from buffer");
                CHECK_MESSAGE(!nFound[(int)j], "Duplicate value found");
                nFound[(int)j] = true;
            }

            for(int ii = 0; ii < NN; ++ii) {
                CHECK_MESSAGE(nFound[ii], "missing value");
            }
            CHECK_MESSAGE(!q0.try_get(j), "Extra values in output");

            // reset reverses a reversed edge.
            // we will use a serial rejecting node to get the edge to reverse.
            tbb::flow::function_node<T, T, tbb::flow::rejecting> sfn(g, tbb::flow::serial, serial_fn_body<T>(serial_fn_state0));
            tbb::flow::queue_node<T> outq(g);
            tbb::flow::remove_edge(b0,q0);
            tbb::flow::make_edge(b0, sfn);
            tbb::flow::make_edge(sfn,outq);
            g.wait_for_all();  // wait for all the tasks started by building the graph are done.
            serial_fn_state0 = 0;

            // b0 ------> sfn ------> outq
            for(int icnt = 0; icnt < 2; ++icnt) {
                g.wait_for_all();
                serial_fn_state0 = 0;
                std::thread t([&] {
                    b0.try_put((T)0);  // will start sfn
                    g.wait_for_all();  // wait for all the tasks to complete.
                });
                // wait until function_node starts
                utils::SpinWaitWhileEq(serial_fn_state0, 0);
                // now the function_node is executing.
                // this will start a task to forward the second item
                // to the serial function node
                b0.try_put((T)1);  // first item will be consumed by task completing the execution
                b0.try_put((T)2);  // second item will remain after cancellation
                // now wait for the task that attempts to forward the buffer item to
                // complete.
                // now cancel the graph.
                CHECK_MESSAGE(tgc.cancel_group_execution(), "task group already cancelled");
                serial_fn_state0 = 0;  // release the function_node.
                t.join();
                // check that at most one output reached the queue_node
                T outt;
                T outt2;
                bool got_item1 = outq.try_get(outt);
                bool got_item2 = outq.try_get(outt2);
                // either the output queue was empty (if the function_node tested for cancellation before putting the
                // result to the queue) or there was one element in the queue (the 0).
                bool is_successful_operation = got_item1 && (int)outt == 0 && !got_item2;
                CHECK_MESSAGE( is_successful_operation, "incorrect output from function_node");
                // the edge between the buffer and the function_node should be reversed, and the last
                // message we put in the buffer should still be there.  We can't directly test for the
                // edge reversal.
                got_item1 = b0.try_get(outt);
                CHECK_MESSAGE(got_item1, " buffer lost a message");
                is_successful_operation = (2 == (int)outt || 1 == (int)outt);
                CHECK_MESSAGE(is_successful_operation, " buffer had incorrect message");  // the one not consumed by the node.
                CHECK_MESSAGE(g.is_cancelled(), "Graph was not cancelled");
                g.reset();
            }  // icnt

            // reset with remove_edge removes edge.  (icnt ==0 => forward edge, 1 => reversed edge
            for(int icnt = 0; icnt < 2; ++icnt) {
                if(icnt == 1) {
                    // set up reversed edge
                    tbb::flow::make_edge(b0, sfn);
                    tbb::flow::make_edge(sfn,outq);
                    serial_fn_state0 = 0;
                    std::thread t([&] {
                        b0.try_put((T)0);  // starts up the function node
                        b0.try_put((T)1);  // should reverse the edge
                        g.wait_for_all();  // wait for all the tasks to complete.
                    });
                    utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for edge reversal
                    CHECK_MESSAGE(tgc.cancel_group_execution(), "task group already cancelled");
                    serial_fn_state0 = 0;  // release the function_node.
                    t.join();
                }
                g.reset(tbb::flow::rf_clear_edges);
                // test that no one is a successor to the buffer now.
                serial_fn_state0 = 1;  // let the function_node go if it gets an input message
                b0.try_put((T)23);
                g.wait_for_all();
                CHECK_MESSAGE((int)serial_fn_state0 == 1, "function_node executed when it shouldn't");
                T outt;
                bool is_successful_operation = b0.try_get(outt) && (T)23 == outt && !outq.try_get(outt);
                CHECK_MESSAGE(is_successful_operation, "node lost its input");
            }
        }
    );                          // arena.execute()
}
610 
//! Verify input_ports() yields the same object on every call, i.e. it
//! returns a reference rather than a copy.
template<typename NodeType>
void test_input_ports_return_ref(NodeType& mip_node) {
    typename NodeType::input_ports_type& first_call  = mip_node.input_ports();
    typename NodeType::input_ports_type& second_call = mip_node.input_ports();
    CHECK_MESSAGE(&first_call == &second_call, "input_ports() should return reference");
}
617 
//! Verify output_ports() yields the same object on every call, i.e. it
//! returns a reference rather than a copy.
template<typename NodeType>
void test_output_ports_return_ref(NodeType& mop_node) {
    typename NodeType::output_ports_type& first_call  = mop_node.output_ports();
    typename NodeType::output_ports_type& second_call = mop_node.output_ports();
    CHECK_MESSAGE(&first_call == &second_call, "output_ports() should return reference");
}
624 
//! Body for NativeParallelFor that hammers a reserving node (and the
//! buffer_node feeding the same join) with concurrent puts; when DoClear is
//! set it also clears the reserving node to exercise the clear/reserve race.
template< template <typename> class ReservingNodeType, typename DataType, bool DoClear >
class harness_reserving_body {
    harness_reserving_body& operator=(const harness_reserving_body&) = delete;
    ReservingNodeType<DataType> &my_reserving_node;
    tbb::flow::buffer_node<DataType> &my_buffer_node;
public:
    harness_reserving_body(ReservingNodeType<DataType> &reserving_node, tbb::flow::buffer_node<DataType> &bn) : my_reserving_node(reserving_node), my_buffer_node(bn) {}
    void operator()(DataType i) const {
        my_reserving_node.try_put(i);
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if (DoClear) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            my_reserving_node.clear();
        }
        // The second pair of puts races with other threads (and with clear()
        // above) to provoke reservation-related bugs in the node under test.
        my_buffer_node.try_put(i);
        my_reserving_node.try_put(i);
    }
};
648 
//! Stress a reserving node attached to a reserving join: N concurrent put
//! rounds (first without, then with clear() calls) must each produce N joined
//! tuples at the end receiver, and the second round must not hang.
template< template <typename> class ReservingNodeType, typename DataType >
void test_reserving_nodes() {
#if TBB_TEST_LOW_WORKLOAD
    const int N = 30;
#else
    const int N = 300;
#endif

    tbb::flow::graph g;

    ReservingNodeType<DataType> reserving_n(g);

    tbb::flow::buffer_node<DataType> buffering_n(g);
    tbb::flow::join_node< std::tuple<DataType, DataType>, tbb::flow::reserving > join_n(g);
    harness_counting_receiver< std::tuple<DataType, DataType> > end_receiver(g);

    // reserving_n -> port 0, buffering_n -> port 1, join -> counting receiver
    tbb::flow::make_edge(reserving_n, tbb::flow::input_port<0>(join_n));
    tbb::flow::make_edge(buffering_n, tbb::flow::input_port<1>(join_n));
    tbb::flow::make_edge(join_n, end_receiver);

    utils::NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, false>(reserving_n, buffering_n));
    g.wait_for_all();

    CHECK(end_receiver.my_count == N);

    // Should not hang
    utils::NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, true>(reserving_n, buffering_n));
    g.wait_for_all();

    CHECK(end_receiver.my_count == 2 * N);
}
680 
681 namespace lightweight_testing {
682 
683 typedef std::tuple<int, int> output_tuple_type;
684 
//! NativeParallelFor body: each calling thread feeds its own id to the node,
//! so the node body can later check which thread it runs in.
template<typename NodeType>
class native_loop_body {
    native_loop_body& operator=(const native_loop_body&) = delete;
    NodeType& my_node;
public:
    native_loop_body(NodeType& node) : my_node(node) {}

    void operator()(int) const noexcept {
        const std::thread::id caller_id = std::this_thread::get_id();
        my_node.try_put(caller_id);
    }
};
697 
// Global counter of body invocations across the lightweight tests below.
std::atomic<unsigned> g_body_count;

//! Body that checks it is executed "lightweight", i.e. inline in the thread
//! that submitted the message rather than inside a worker task.
class concurrency_checker_body {
public:
    concurrency_checker_body() { g_body_count = 0; }

    // multifunction/async flavor
    template<typename gateway_type>
    void operator()(const std::thread::id& input, gateway_type&) noexcept { increase_and_check(input); }

    // function_node flavor
    output_tuple_type operator()(const std::thread::id& input) noexcept {
        increase_and_check(input);
        return output_tuple_type();
    }

private:
    void increase_and_check(const std::thread::id& input) {
        ++g_body_count;
        // input is the id of the thread that called try_put; a lightweight
        // execution must run in that same thread.
        std::thread::id body_thread_id = std::this_thread::get_id();
        CHECK_MESSAGE(input == body_thread_id, "Body executed as not lightweight");
    }
};
719 
720 template<typename NodeType>
721 void test_unlimited_lightweight_execution(unsigned N) {
722     tbb::flow::graph g;
723     NodeType node(g, tbb::flow::unlimited, concurrency_checker_body());
724 
725     utils::NativeParallelFor(N, native_loop_body<NodeType>(node));
726     g.wait_for_all();
727 
728     CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
729 }
730 
// Synchronization used to hold a lightweight body inside its execution until
// all producer threads have submitted their messages (see
// native_loop_limited_body and limited_lightweight_checker_body below).
std::mutex m;
std::condition_variable lightweight_condition;
std::atomic<bool> work_submitted;             // set once producers pass the barrier
std::atomic<bool> lightweight_work_processed; // set once a lightweight body finishes waiting
735 
//! Producer body: each thread puts its id into the node; threads running
//! before any lightweight body completed also rendezvous on the barrier and
//! then release the blocked lightweight body via the condition variable.
template<typename NodeType>
class native_loop_limited_body {
    native_loop_limited_body& operator=(const native_loop_limited_body&) = delete;
    NodeType& my_node;
    utils::SpinBarrier& my_barrier;
public:
    native_loop_limited_body(NodeType& node, utils::SpinBarrier& barrier):
        my_node(node), my_barrier(barrier) {}
    void operator()(int) const noexcept {
        std::thread::id this_id = std::this_thread::get_id();
        my_node.try_put(this_id);
        if(!lightweight_work_processed) {
            // Wait for the other producers, then signal the blocked
            // lightweight body that all work has been submitted.
            my_barrier.wait();
            work_submitted = true;
            lightweight_condition.notify_all();
        }
    }
};
754 
755 struct condition_predicate {
756     bool operator()() {
757         return work_submitted;
758     }
759 };
760 
std::atomic<unsigned> g_lightweight_count; // executions done inline (lightweight)
std::atomic<unsigned> g_task_count;        // executions done inside a oneTBB task

//! Body that classifies each execution as lightweight or task-based.
//! A lightweight execution blocks until all producers have submitted work,
//! forcing the remaining messages into the node's internal buffer.
template <bool NoExcept>
class limited_lightweight_checker_body {
public:
    limited_lightweight_checker_body() {
        g_body_count = 0;
        g_lightweight_count = 0;
        g_task_count = 0;
    }
private:
    void increase_and_check(const std::thread::id& /*input*/) {
        ++g_body_count;

        // Inside a oneTBB task there is a current task-group context; a
        // lightweight execution runs directly in the caller, outside any task.
        bool is_inside_task = oneapi::tbb::task::current_context() != nullptr;

        if(is_inside_task) {
            ++g_task_count;
        } else {
            // Block until the producers release us, then record the
            // lightweight execution.
            std::unique_lock<std::mutex> lock(m);
            lightweight_condition.wait(lock, condition_predicate());
            ++g_lightweight_count;
            lightweight_work_processed = true;
        }
    }
public:
    template<typename gateway_type>
    void operator()(const std::thread::id& input, gateway_type&) noexcept(NoExcept) {
        increase_and_check(input);
    }
    output_tuple_type operator()(const std::thread::id& input) noexcept(NoExcept) {
        increase_and_check(input);
        return output_tuple_type();
    }
};
797 
798 template<typename NodeType>
799 void test_limited_lightweight_execution(unsigned N, unsigned concurrency) {
800     CHECK_MESSAGE(concurrency != tbb::flow::unlimited,
801                   "Test for limited concurrency cannot be called with unlimited concurrency argument");
802     tbb::flow::graph g;
803     NodeType node(g, concurrency, limited_lightweight_checker_body</*NoExcept*/true>());
804     // Execute first body as lightweight, then wait for all other threads to fill internal buffer.
805     // Then unblock the lightweight thread and check if other body executions are inside oneTBB task.
806     utils::SpinBarrier barrier(N - concurrency);
807     utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
808     g.wait_for_all();
809     CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
810     CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once");
811     CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times");
812     work_submitted = false;
813     lightweight_work_processed = false;
814 }
815 
//! Check that a body that is NOT noexcept is never run via the lightweight
//! (in-thread) path: all N executions must go through oneTBB tasks.
template<typename NodeType>
void test_limited_lightweight_execution_with_throwing_body(unsigned N, unsigned concurrency) {
    CHECK_MESSAGE(concurrency != tbb::flow::unlimited,
                  "Test for limited concurrency cannot be called with unlimited concurrency argument");
    tbb::flow::graph g;
    NodeType node(g, concurrency, limited_lightweight_checker_body</*NoExcept*/false>());
    // The body is not noexcept; in this case it must be executed as tasks,
    // instead of lightweight (in-thread) execution.
    utils::SpinBarrier barrier(N);
    utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
    g.wait_for_all();
    // Every execution must have taken the task path, none the lightweight path.
    CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
    CHECK_MESSAGE(g_lightweight_count == 0, "Body needs to be executed with queueing policy");
    CHECK_MESSAGE(g_task_count == N, "Body needs to be executed as task N times");
    // Reset shared test state for subsequent tests.
    work_submitted = false;
    lightweight_work_processed = false;
}
832 
833 template <int Threshold>
834 struct throwing_body{
835     std::atomic<int>& my_counter;
836 
837     throwing_body(std::atomic<int>& counter) : my_counter(counter) {}
838 
839     template<typename input_type, typename gateway_type>
840     void operator()(const input_type&, gateway_type&) {
841         ++my_counter;
842         if(my_counter == Threshold)
843             throw Threshold;
844     }
845 
846     template<typename input_type>
847     output_tuple_type operator()(const input_type&) {
848         ++my_counter;
849         if(my_counter == Threshold)
850             throw Threshold;
851         return output_tuple_type();
852     }
853 };
854 
#if TBB_USE_EXCEPTIONS
//! Test that an exception thrown in a node with lightweight policy is rethrown by the graph
template<template<typename, typename, typename> class NodeType>
void test_exception_ligthweight_policy(){
    constexpr int threshold = 10;
    std::atomic<int> counter {0};

    using IndexerNodeType = oneapi::tbb::flow::indexer_node<int, int>;
    using FuncNodeType = NodeType<IndexerNodeType::output_type, output_tuple_type, tbb::flow::lightweight>;
    oneapi::tbb::flow::graph g;

    // indexer -> serial tested node whose body throws once counter hits threshold.
    IndexerNodeType indexer(g);
    FuncNodeType tested_node(g, oneapi::tbb::flow::serial, throwing_body<threshold>(counter));
    oneapi::tbb::flow::make_edge(indexer, tested_node);

    // Feed both indexer input ports from multiple native threads.
    utils::NativeParallelFor( threshold * 2, [&](int idx){
        if(idx % 2)
            std::get<1>(indexer.input_ports()).try_put(1);
        else
            std::get<0>(indexer.input_ports()).try_put(0);
    });

    bool exception_observed = false;
    try {
        g.wait_for_all();
    } catch (const int& exc) {
        exception_observed = true;
        CHECK_MESSAGE( exc == threshold, "graph.wait_for_all() rethrow current exception" );
    }
    CHECK_MESSAGE( exception_observed, "The exception must be thrown from graph.wait_for_all()" );
    CHECK_MESSAGE( counter == threshold, "Graph must cancel all tasks after exception" );
}
#endif /* TBB_USE_EXCEPTIONS */
891 
892 template<typename NodeType>
893 void test_lightweight(unsigned N) {
894     test_unlimited_lightweight_execution<NodeType>(N);
895     test_limited_lightweight_execution<NodeType>(N, tbb::flow::serial);
896     test_limited_lightweight_execution<NodeType>(N, (std::min)(std::thread::hardware_concurrency() / 2, N/2));
897 
898     test_limited_lightweight_execution_with_throwing_body<NodeType>(N, tbb::flow::serial);
899 }
900 
901 template<template<typename, typename, typename> class NodeType>
902 void test(unsigned N) {
903     typedef std::thread::id input_type;
904     typedef NodeType<input_type, output_tuple_type, tbb::flow::queueing_lightweight> node_type;
905     test_lightweight<node_type>(N);
906 
907 #if TBB_USE_EXCEPTIONS
908     test_exception_ligthweight_policy<NodeType>();
909 #endif /* TBB_USE_EXCEPTIONS */
910 }
911 
912 } // namespace lightweight_testing
913 
914 #endif  // __TBB_harness_graph_H
915