1 /*
2     Copyright (c) 2020-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 
22 #include "common/test.h"
23 
24 #include "common/utils.h"
25 #include "common/graph_utils.h"
26 
27 #include "oneapi/tbb/flow_graph.h"
28 #include "oneapi/tbb/task_arena.h"
29 #include "oneapi/tbb/global_control.h"
30 
31 #include "conformance_flowgraph.h"
32 
33 //! \file conformance_multifunction_node.cpp
34 //! \brief Test for [flow_graph.function_node] specification
35 
36 /*
37 TODO: implement missing conformance tests for multifunction_node:
38   - [ ] Implement test_forwarding that checks messages are broadcast to all the successors connected
39     to the output port the message is being sent to. And check that the value passed is the
40     actual one received.
41   - [ ] Explicit test for copy constructor of the node.
42   - [ ] Constructor with explicitly passed Policy parameter: `template<typename Body>
43     multifunction_node( graph &g, size_t concurrency, Body body, Policy(), node_priority_t priority = no_priority )'.
44   - [ ] Concurrency testing of the node: make a loop over possible concurrency levels. It is
45     important to test at least on five values: 1, oneapi::tbb::flow::serial, `max_allowed_parallelism'
46     obtained from `oneapi::tbb::global_control', `oneapi::tbb::flow::unlimited', and, if `max allowed
47     parallelism' is > 2, use something in the middle of the [1, max_allowed_parallelism]
48     interval. Use `utils::ExactConcurrencyLevel' entity (extending it if necessary).
49   - [ ] make `test_rejecting' deterministic, i.e. avoid dependency on OS scheduling of the threads;
50     add check that `try_put()' returns `false'
51   - [ ] The `copy_body' function copies altered body (e.g. after successful `try_put()' call).
52   - [ ] `output_ports_type' is defined and accessible by the user.
53   - [ ] Explicit test on `mfn::output_ports()' method.
54   - [ ] The copy constructor and copy assignment are called for the node's input and output types.
55   - [ ] Add CTAD test.
56 */
57 
58 template< typename OutputType >
59 struct mf_functor {
60 
61     std::atomic<std::size_t>& local_execute_count;
62 
63     mf_functor(std::atomic<std::size_t>& execute_count ) :
64         local_execute_count (execute_count)
65     {  }
66 
67     mf_functor( const mf_functor &f ) : local_execute_count(f.local_execute_count) { }
68     void operator=(const mf_functor &f) { local_execute_count = std::size_t(f.local_execute_count); }
69 
70     void operator()( const int& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ) {
71        ++local_execute_count;
72        std::get<0>(op).try_put(argument);
73     }
74 
75 };
76 
77 template<typename I, typename O>
78 void test_inheritance(){
79     using namespace oneapi::tbb::flow;
80 
81     CHECK_MESSAGE( (std::is_base_of<graph_node, multifunction_node<I, O>>::value), "multifunction_node should be derived from graph_node");
82     CHECK_MESSAGE( (std::is_base_of<receiver<I>, multifunction_node<I, O>>::value), "multifunction_node should be derived from receiver<Input>");
83 }
84 
85 void test_multifunc_body(){
86     oneapi::tbb::flow::graph g;
87     std::atomic<size_t> local_count(0);
88     mf_functor<std::tuple<int>> fun(local_count);
89 
90     oneapi::tbb::flow::multifunction_node<int, std::tuple<int>, oneapi::tbb::flow::rejecting> node1(g, oneapi::tbb::flow::unlimited, fun);
91 
92     const size_t n = 10;
93     for(size_t i = 0; i < n; ++i) {
94         CHECK_MESSAGE((node1.try_put(1) == true), "try_put needs to return true");
95     }
96     g.wait_for_all();
97 
98     CHECK_MESSAGE( (local_count == n), "Body of the node needs to be executed N times");
99 }
100 
101 template<typename I, typename O>
102 struct CopyCounterBody{
103     size_t copy_count;
104 
105     CopyCounterBody():
106         copy_count(0) {}
107 
108     CopyCounterBody(const CopyCounterBody<I, O>& other):
109         copy_count(other.copy_count + 1) {}
110 
111     CopyCounterBody& operator=(const CopyCounterBody<I, O>& other)
112     { copy_count = other.copy_count + 1; return *this;}
113 
114     void operator()( const I& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ) {
115        std::get<0>(op).try_put(argument);
116     }
117 };
118 
119 void test_copies(){
120      using namespace oneapi::tbb::flow;
121 
122      CopyCounterBody<int, std::tuple<int>> b;
123 
124      graph g;
125      multifunction_node<int, std::tuple<int>> fn(g, unlimited, b);
126 
127      CopyCounterBody<int, std::tuple<int>> b2 = copy_body<CopyCounterBody<int, std::tuple<int>>,
128                                                           multifunction_node<int, std::tuple<int>>>(fn);
129 
130      CHECK_MESSAGE( (b.copy_count + 2 <= b2.copy_count), "copy_body and constructor should copy bodies");
131 }
132 
133 template< typename OutputType >
134 struct id_functor {
135     void operator()( const int& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ) {
136        std::get<0>(op).try_put(argument);
137     }
138 };
139 
140 void test_forwarding(){
141     oneapi::tbb::flow::graph g;
142     id_functor<int> fun;
143 
144     oneapi::tbb::flow::multifunction_node<int, std::tuple<int>> node1(g, oneapi::tbb::flow::unlimited, fun);
145     test_push_receiver<int> node2(g);
146     test_push_receiver<int> node3(g);
147 
148     oneapi::tbb::flow::make_edge(node1, node2);
149     oneapi::tbb::flow::make_edge(node1, node3);
150 
151     node1.try_put(1);
152     g.wait_for_all();
153 
154     CHECK_MESSAGE( (get_count(node3) == 1), "Descendant of the node must receive one message.");
155     CHECK_MESSAGE( (get_count(node2) == 1), "Descendant of the node must receive one message.");
156 }
157 
158 void test_rejecting_buffering(){
159     oneapi::tbb::flow::graph g;
160     id_functor<int> fun;
161 
162     oneapi::tbb::flow::multifunction_node<int, std::tuple<int>, oneapi::tbb::flow::rejecting> node(g, oneapi::tbb::flow::unlimited, fun);
163     oneapi::tbb::flow::limiter_node<int> rejecter(g, 0);
164 
165     oneapi::tbb::flow::make_edge(node, rejecter);
166     node.try_put(1);
167 
168     int tmp = -1;
169     CHECK_MESSAGE( (std::get<0>(node.output_ports()).try_get(tmp) == false), "try_get after rejection should not succeed");
170     CHECK_MESSAGE( (tmp == -1), "try_get after rejection should alter passed value");
171     g.wait_for_all();
172 }
173 
174 void test_policy_ctors(){
175     using namespace oneapi::tbb::flow;
176     graph g;
177 
178     id_functor<int> fun;
179 
180     multifunction_node<int, std::tuple<int>, lightweight> lw_node(g, oneapi::tbb::flow::serial, fun);
181     multifunction_node<int, std::tuple<int>, queueing_lightweight> qlw_node(g, oneapi::tbb::flow::serial, fun);
182     multifunction_node<int, std::tuple<int>, rejecting_lightweight> rlw_node(g, oneapi::tbb::flow::serial, fun);
183 
184 }
185 
186 std::atomic<size_t> my_concurrency;
187 std::atomic<size_t> my_max_concurrency;
188 
189 struct concurrency_functor {
190     void operator()( const int& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ) {
191         ++my_concurrency;
192 
193         size_t old_value = my_max_concurrency;
194         while(my_max_concurrency < my_concurrency &&
195               !my_max_concurrency.compare_exchange_weak(old_value, my_concurrency))
196             ;
197 
198         size_t ms = 1000;
199         std::chrono::milliseconds sleep_time( ms );
200         std::this_thread::sleep_for( sleep_time );
201 
202         --my_concurrency;
203         std::get<0>(op).try_put(argument);
204     }
205 
206 };
207 
208 void test_node_concurrency(){
209     my_concurrency = 0;
210     my_max_concurrency = 0;
211 
212     oneapi::tbb::flow::graph g;
213 
214     concurrency_functor counter;
215     oneapi::tbb::flow::multifunction_node <int, std::tuple<int>> fnode(g, oneapi::tbb::flow::serial, counter);
216 
217     test_push_receiver<int> sink(g);
218 
219     make_edge(std::get<0>(fnode.output_ports()), sink);
220 
221     for(int i = 0; i < 10; ++i){
222         fnode.try_put(i);
223     }
224 
225     g.wait_for_all();
226     CHECK_MESSAGE( ( my_max_concurrency.load() == 1), "Measured parallelism over limit");
227 }
228 
229 
230 void test_priority(){
231     size_t concurrency_limit = 1;
232     oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit);
233 
234     oneapi::tbb::flow::graph g;
235 
236     oneapi::tbb::flow::continue_node<int> source(g,
237                                          [](oneapi::tbb::flow::continue_msg){ return 1;});
238     source.try_put(oneapi::tbb::flow::continue_msg());
239 
240     first_functor<int>::first_id = -1;
241     first_functor<int> low_functor(1);
242     first_functor<int> high_functor(2);
243 
244     oneapi::tbb::flow::multifunction_node<int, std::tuple<int>> high(g, oneapi::tbb::flow::unlimited, high_functor, oneapi::tbb::flow::node_priority_t(1));
245     oneapi::tbb::flow::multifunction_node<int, std::tuple<int>> low(g, oneapi::tbb::flow::unlimited, low_functor);
246 
247     make_edge(source, low);
248     make_edge(source, high);
249 
250     g.wait_for_all();
251 
252     CHECK_MESSAGE( (first_functor<int>::first_id == 2), "High priority node should execute first");
253 }
254 
255 void test_rejecting(){
256     oneapi::tbb::flow::graph g;
257     oneapi::tbb::flow::multifunction_node <int, std::tuple<int>, oneapi::tbb::flow::rejecting> fnode(g, oneapi::tbb::flow::serial,
258                                                                     [&](const int& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ){
259                                                                         size_t ms = 50;
260                                                                         std::chrono::milliseconds sleep_time( ms );
261                                                                         std::this_thread::sleep_for( sleep_time );
262                                                                         std::get<0>(op).try_put(argument);
263                                                                     });
264 
265     test_push_receiver<int> sink(g);
266 
267     make_edge(std::get<0>(fnode.output_ports()), sink);
268 
269     for(int i = 0; i < 10; ++i){
270         fnode.try_put(i);
271     }
272 
273     g.wait_for_all();
274     CHECK_MESSAGE( (get_count(sink) == 1), "Messages should be rejected while the first is being processed");
275 }
276 
277 //! Test multifunction_node with rejecting policy
278 //! \brief \ref interface
279 TEST_CASE("multifunction_node with rejecting policy"){
280     test_rejecting();
281 }
282 
283 //! Test priorities
284 //! \brief \ref interface
285 TEST_CASE("multifunction_node priority"){
286     test_priority();
287 }
288 
289 //! Test concurrency
290 //! \brief \ref interface
291 TEST_CASE("multifunction_node concurrency"){
292     test_node_concurrency();
293 }
294 
295 //! Test constructors
296 //! \brief \ref interface
297 TEST_CASE("multifunction_node constructors"){
298     test_policy_ctors();
299 }
300 
301 //! Test function_node buffering
302 //! \brief \ref requirement
303 TEST_CASE("multifunction_node buffering"){
304     test_rejecting_buffering();
305 }
306 
307 //! Test function_node broadcasting
308 //! \brief \ref requirement
309 TEST_CASE("multifunction_node broadcast"){
310     test_forwarding();
311 }
312 
313 //! Test body copying and copy_body logic
314 //! \brief \ref interface
315 TEST_CASE("multifunction_node constructors"){
316     test_copies();
317 }
318 
319 //! Test calling function body
320 //! \brief \ref interface \ref requirement
321 TEST_CASE("multifunction_node body") {
322     test_multifunc_body();
323 }
324 
325 //! Test inheritance relations
326 //! \brief \ref interface
327 TEST_CASE("multifunction_node superclasses"){
328     test_inheritance<int, std::tuple<int>>();
329     test_inheritance<void*, std::tuple<float>>();
330 }
331