1 /*
2     Copyright (c) 2020-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "conformance_flowgraph.h"
22 
23 //! \file conformance_graph.cpp
24 //! \brief Test for [flow_graph.graph] specification
25 
test_continue_node_rf_reset_protocol()26 void test_continue_node_rf_reset_protocol(){
27     using namespace oneapi::tbb::flow;
28     graph g;
29 
30     std::atomic<bool> flag = {false};
31     continue_node<int> source(g, 2, [&](const continue_msg&){ flag = true; return 1;});
32 
33     source.try_put(continue_msg());
34     g.wait_for_all();
35 
36     CHECK_MESSAGE((flag == false), "Should be false");
37 
38     g.reset(rf_reset_protocol);
39 
40     source.try_put(continue_msg());
41     g.wait_for_all();
42     CHECK_MESSAGE((flag == false), "Internal number of predecessors reinitialized");
43 
44     source.try_put(continue_msg());
45     g.wait_for_all();
46     CHECK_MESSAGE((flag == true), "Should be true");
47 }
48 
test_input_node_rf_reset_protocol()49 void test_input_node_rf_reset_protocol(){
50     oneapi::tbb::flow::graph g;
51 
52     conformance::copy_counting_object<int> fun;
53 
54     oneapi::tbb::flow::input_node<int> node(g, fun);
55     oneapi::tbb::flow::limiter_node<int> rejecter(g, 0);
56 
57     oneapi::tbb::flow::make_edge(node, rejecter);
58 
59     node.activate();
60     g.wait_for_all();
61 
62     g.reset(oneapi::tbb::flow::rf_reset_protocol);
63 
64     int tmp = -1;
65     CHECK_MESSAGE((node.try_get(tmp) == false), "Should be false");
66 }
67 
68 template<typename Node>
test_functional_nodes_rf_reset_protocol()69 void test_functional_nodes_rf_reset_protocol(){
70     oneapi::tbb::flow::graph g;
71     size_t concurrency_limit = 1;
72     oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit);
73 
74     conformance::counting_functor<int> counting_body;
75     Node f(g, oneapi::tbb::flow::serial, counting_body);
76 
77     f.try_put(0);
78     f.try_put(0);
79     CHECK_MESSAGE((counting_body.execute_count == 0), "Body should not be executed");
80     g.reset(oneapi::tbb::flow::rf_reset_protocol);
81 
82     g.wait_for_all();
83     CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed");
84 }
85 
86 template<typename Node, typename ...Args>
test_buffering_nodes_rf_reset_protocol(Args...node_body)87 void test_buffering_nodes_rf_reset_protocol(Args... node_body){
88     oneapi::tbb::flow::graph g;
89     Node testing_node(g, node_body...);
90 
91     int tmp = -1;
92     CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed");
93     CHECK_MESSAGE((tmp == -1), "Value should not be updated");
94 
95     testing_node.try_put(1);
96     g.wait_for_all();
97     g.reset(oneapi::tbb::flow::rf_reset_protocol);
98 
99     tmp = -1;
100     CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed");
101     CHECK_MESSAGE((tmp == -1), "Value should not be updated");
102     g.wait_for_all();
103 }
104 
105 template<typename Node, typename InputType, typename ...Args>
test_nodes_with_body_rf_reset_bodies(Args...node_args)106 void test_nodes_with_body_rf_reset_bodies(Args... node_args){
107     oneapi::tbb::flow::graph g;
108     conformance::counting_functor<int> counting_body(5);
109     Node testing_node(g, node_args..., counting_body);
110 
111     testing_node.try_put(InputType());
112     g.wait_for_all();
113 
114     CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed");
115 
116     g.reset(oneapi::tbb::flow::rf_reset_bodies);
117     testing_node.try_put(InputType());
118     g.wait_for_all();
119 
120     CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be replaced with a copy of the body");
121 }
122 
test_limiter_node_rf_reset_protocol()123 void test_limiter_node_rf_reset_protocol(){
124     oneapi::tbb::flow::graph g;
125 
126     constexpr int limit = 5;
127     oneapi::tbb::flow::limiter_node<int> testing_node(g, limit);
128     conformance::test_push_receiver<int> suc_node(g);
129 
130     oneapi::tbb::flow::make_edge(testing_node, suc_node);
131 
132     for(int i = 0; i < limit * 2; ++i)
133         testing_node.try_put(1);
134     g.wait_for_all();
135 
136     CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages");
137 
138     g.reset(oneapi::tbb::flow::rf_reset_protocol);
139 
140     for(int i = 0; i < limit * 2; ++i)
141         testing_node.try_put(1);
142     g.wait_for_all();
143 
144     CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages");
145 }
146 
test_join_node_rf_reset_protocol()147 void test_join_node_rf_reset_protocol(){
148     oneapi::tbb::flow::graph g;
149 
150     oneapi::tbb::flow::join_node<std::tuple<int>, oneapi::tbb::flow::queueing> testing_node(g);
151 
152     oneapi::tbb::flow::input_port<0>(testing_node).try_put(1);
153 
154     g.wait_for_all();
155     g.reset(oneapi::tbb::flow::rf_reset_protocol);
156 
157     std::tuple<int> tmp(0);
158     CHECK_MESSAGE((!testing_node.try_get(tmp)), "All buffers must be emptied");
159 }
160 
161 //! Graph reset
162 //! \brief \ref requirement
163 TEST_CASE("graph reset with rf_reset_protocol") {
164     using namespace oneapi::tbb::flow;
165     test_continue_node_rf_reset_protocol();
166     test_input_node_rf_reset_protocol();
167     test_functional_nodes_rf_reset_protocol<function_node<int, int, queueing>>();
168     test_functional_nodes_rf_reset_protocol<multifunction_node<int, std::tuple<int>, queueing>>();
169     test_functional_nodes_rf_reset_protocol<async_node<int, int, queueing>>();
170 
171     test_buffering_nodes_rf_reset_protocol<buffer_node<int>>();
172     test_buffering_nodes_rf_reset_protocol<queue_node<int>>();
173     test_buffering_nodes_rf_reset_protocol<overwrite_node<int>>();
174     test_buffering_nodes_rf_reset_protocol<write_once_node<int>>();
175     test_buffering_nodes_rf_reset_protocol<priority_queue_node<int>>();
176     conformance::sequencer_functor<int> sequencer;
177     test_buffering_nodes_rf_reset_protocol<sequencer_node<int>>(sequencer);
178 
179     test_limiter_node_rf_reset_protocol();
180     test_join_node_rf_reset_protocol();
181 }
182 
183 //! Graph reset rf_clear_edges
184 //! \brief \ref requirement
185 TEST_CASE("graph reset with rf_clear_edges") {
186     oneapi::tbb::flow::graph g;
187     using body = conformance::dummy_functor<int>;
188 
189     oneapi::tbb::flow::queue_node<int> successor(g);
190     oneapi::tbb::flow::queue_node<std::tuple<int>> successor2(g);
191     oneapi::tbb::flow::queue_node<oneapi::tbb::flow::indexer_node<int>::output_type> successor3(g);
192 
193     //node types
194     oneapi::tbb::flow::continue_node<int> ct(g, body());
195     oneapi::tbb::flow::split_node< std::tuple<int> > s(g);
196     oneapi::tbb::flow::input_node<int> src(g, body());
197     oneapi::tbb::flow::function_node<int, int> fxn(g, oneapi::tbb::flow::unlimited, body());
198     oneapi::tbb::flow::multifunction_node<int, std::tuple<int, int> > m_fxn(g, oneapi::tbb::flow::unlimited, body());
199     oneapi::tbb::flow::broadcast_node<int> bc(g);
200     oneapi::tbb::flow::limiter_node<int> lim(g, 2);
201     oneapi::tbb::flow::indexer_node<int> ind(g);
202     oneapi::tbb::flow::join_node< std::tuple< int >, oneapi::tbb::flow::queueing > j(g);
203     oneapi::tbb::flow::buffer_node<int> bf(g);
204     oneapi::tbb::flow::priority_queue_node<int> pq(g);
205     oneapi::tbb::flow::write_once_node<int> wo(g);
206     oneapi::tbb::flow::overwrite_node<int> ovw(g);
207     oneapi::tbb::flow::sequencer_node<int> seq(g, conformance::sequencer_functor<int>());
208 
209     oneapi::tbb::flow::make_edge(ct, successor);
210     oneapi::tbb::flow::make_edge(s, successor);
211     oneapi::tbb::flow::make_edge(src, successor);
212     oneapi::tbb::flow::make_edge(fxn, successor);
213     oneapi::tbb::flow::make_edge(m_fxn, successor);
214     oneapi::tbb::flow::make_edge(bc, successor);
215     oneapi::tbb::flow::make_edge(lim, successor);
216     oneapi::tbb::flow::make_edge(ind, successor3);
217     oneapi::tbb::flow::make_edge(j, successor2);
218     oneapi::tbb::flow::make_edge(bf, successor);
219     oneapi::tbb::flow::make_edge(pq, successor);
220     oneapi::tbb::flow::make_edge(wo, successor);
221     oneapi::tbb::flow::make_edge(ovw, successor);
222     oneapi::tbb::flow::make_edge(seq, successor);
223 
224     g.wait_for_all();
225     g.reset(oneapi::tbb::flow::rf_clear_edges);
226 
227     ct.try_put(oneapi::tbb::flow::continue_msg());
228     s.try_put(std::tuple<int>{1});
229     src.activate();
230     fxn.try_put(1);
231     m_fxn.try_put(1);
232     bc.try_put(1);
233     lim.try_put(1);
234     oneapi::tbb::flow::input_port<0>(ind).try_put(1);
235     oneapi::tbb::flow::input_port<0>(j).try_put(1);
236     bf.try_put(1);
237     pq.try_put(1);
238     wo.try_put(1);
239     ovw.try_put(1);
240     seq.try_put(0);
241 
242     g.wait_for_all();
243 
244     CHECK_MESSAGE((conformance::get_values(successor).size() == 0), "Message should not pass when edges doesn't exist");
245     CHECK_MESSAGE((conformance::get_values(successor2).size() == 0), "Message should not pass when edge doesn't exist");
246     CHECK_MESSAGE((conformance::get_values(successor3).size() == 0), "Message should not pass when edge doesn't exist");
247 }
248 
249 //! Graph reset rf_reset_bodies
250 //! \brief \ref requirement
251 TEST_CASE("graph reset with rf_reset_bodies") {
252     using namespace oneapi::tbb::flow;
253     test_nodes_with_body_rf_reset_bodies<continue_node<int>, continue_msg>(serial);
254     test_nodes_with_body_rf_reset_bodies<function_node<int, int>, int>(serial);
255     test_nodes_with_body_rf_reset_bodies<multifunction_node<int, std::tuple<int>>, int>(serial);
256     test_nodes_with_body_rf_reset_bodies<async_node<int, int>, int>(serial);
257 
258     graph g;
259     conformance::counting_functor<int> counting_body(1);
260     input_node<int> testing_node(g,counting_body);
261     queue_node<int> q_node(g);
262 
263     make_edge(testing_node, q_node);
264 
265     testing_node.activate();
266     g.wait_for_all();
267 
268     CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be executed");
269 
270     g.reset(rf_reset_bodies);
271     testing_node.activate();
272     g.wait_for_all();
273 
274     CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be replaced with a copy of the body");
275 }
276 
277 //! Graph cancel
278 //! \brief \ref requirement
279 TEST_CASE("graph cancel") {
280     oneapi::tbb::flow::graph g;
281     CHECK_MESSAGE(!g.is_cancelled(), "Freshly created graph should not be cancelled." );
282 
283     g.cancel();
284     CHECK_MESSAGE(!g.is_cancelled(), "Cancelled status should appear only after the wait_for_all() call." );
285 
286     g.wait_for_all();
287     CHECK_MESSAGE(g.is_cancelled(), "Waiting should allow checking the cancellation status." );
288 
289     g.reset();
290     CHECK_MESSAGE(!g.is_cancelled(), "Resetting must reset the cancellation status." );
291 
292     std::atomic<bool> cancelled(false);
293     std::atomic<unsigned> executed(0);
__anon8ef3c1da0202(int) 294     oneapi::tbb::flow::function_node<int> f(g, oneapi::tbb::flow::serial, [&](int) {
295         ++executed;
296         while( !cancelled.load(std::memory_order_relaxed) )
297             std::this_thread::sleep_for(std::chrono::milliseconds(1));
298     });
299 
300     const unsigned N = 10;
301     for( unsigned i = 0; i < N; ++i )
302         f.try_put(0);
303 
__anon8ef3c1da0302null304     std::thread thr([&] {
305         while( !executed )
306             std::this_thread::sleep_for(std::chrono::milliseconds(1));
307         g.cancel();
308         cancelled.store(true, std::memory_order_relaxed);
309     });
310     g.wait_for_all();
311     thr.join();
312     CHECK_MESSAGE(g.is_cancelled(), "Wait for all should not change the cancellation status." );
313     CHECK_MESSAGE(1 == executed, "Buffered messages should be dropped by the cancelled graph." );
314 }
315