1 /*
2 Copyright (c) 2020-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20
21 #include "conformance_flowgraph.h"
22
23 //! \file conformance_graph.cpp
24 //! \brief Test for [flow_graph.graph] specification
25
test_continue_node_rf_reset_protocol()26 void test_continue_node_rf_reset_protocol(){
27 using namespace oneapi::tbb::flow;
28 graph g;
29
30 std::atomic<bool> flag = {false};
31 continue_node<int> source(g, 2, [&](const continue_msg&){ flag = true; return 1;});
32
33 source.try_put(continue_msg());
34 g.wait_for_all();
35
36 CHECK_MESSAGE((flag == false), "Should be false");
37
38 g.reset(rf_reset_protocol);
39
40 source.try_put(continue_msg());
41 g.wait_for_all();
42 CHECK_MESSAGE((flag == false), "Internal number of predecessors reinitialized");
43
44 source.try_put(continue_msg());
45 g.wait_for_all();
46 CHECK_MESSAGE((flag == true), "Should be true");
47 }
48
test_input_node_rf_reset_protocol()49 void test_input_node_rf_reset_protocol(){
50 oneapi::tbb::flow::graph g;
51
52 conformance::copy_counting_object<int> fun;
53
54 oneapi::tbb::flow::input_node<int> node(g, fun);
55 oneapi::tbb::flow::limiter_node<int> rejecter(g, 0);
56
57 oneapi::tbb::flow::make_edge(node, rejecter);
58
59 node.activate();
60 g.wait_for_all();
61
62 g.reset(oneapi::tbb::flow::rf_reset_protocol);
63
64 int tmp = -1;
65 CHECK_MESSAGE((node.try_get(tmp) == false), "Should be false");
66 }
67
68 template<typename Node>
test_functional_nodes_rf_reset_protocol()69 void test_functional_nodes_rf_reset_protocol(){
70 oneapi::tbb::flow::graph g;
71 size_t concurrency_limit = 1;
72 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit);
73
74 conformance::counting_functor<int> counting_body;
75 Node f(g, oneapi::tbb::flow::serial, counting_body);
76
77 f.try_put(0);
78 f.try_put(0);
79 CHECK_MESSAGE((counting_body.execute_count == 0), "Body should not be executed");
80 g.reset(oneapi::tbb::flow::rf_reset_protocol);
81
82 g.wait_for_all();
83 CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed");
84 }
85
86 template<typename Node, typename ...Args>
test_buffering_nodes_rf_reset_protocol(Args...node_body)87 void test_buffering_nodes_rf_reset_protocol(Args... node_body){
88 oneapi::tbb::flow::graph g;
89 Node testing_node(g, node_body...);
90
91 int tmp = -1;
92 CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed");
93 CHECK_MESSAGE((tmp == -1), "Value should not be updated");
94
95 testing_node.try_put(1);
96 g.wait_for_all();
97 g.reset(oneapi::tbb::flow::rf_reset_protocol);
98
99 tmp = -1;
100 CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed");
101 CHECK_MESSAGE((tmp == -1), "Value should not be updated");
102 g.wait_for_all();
103 }
104
105 template<typename Node, typename InputType, typename ...Args>
test_nodes_with_body_rf_reset_bodies(Args...node_args)106 void test_nodes_with_body_rf_reset_bodies(Args... node_args){
107 oneapi::tbb::flow::graph g;
108 conformance::counting_functor<int> counting_body(5);
109 Node testing_node(g, node_args..., counting_body);
110
111 testing_node.try_put(InputType());
112 g.wait_for_all();
113
114 CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed");
115
116 g.reset(oneapi::tbb::flow::rf_reset_bodies);
117 testing_node.try_put(InputType());
118 g.wait_for_all();
119
120 CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be replaced with a copy of the body");
121 }
122
test_limiter_node_rf_reset_protocol()123 void test_limiter_node_rf_reset_protocol(){
124 oneapi::tbb::flow::graph g;
125
126 constexpr int limit = 5;
127 oneapi::tbb::flow::limiter_node<int> testing_node(g, limit);
128 conformance::test_push_receiver<int> suc_node(g);
129
130 oneapi::tbb::flow::make_edge(testing_node, suc_node);
131
132 for(int i = 0; i < limit * 2; ++i)
133 testing_node.try_put(1);
134 g.wait_for_all();
135
136 CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages");
137
138 g.reset(oneapi::tbb::flow::rf_reset_protocol);
139
140 for(int i = 0; i < limit * 2; ++i)
141 testing_node.try_put(1);
142 g.wait_for_all();
143
144 CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages");
145 }
146
test_join_node_rf_reset_protocol()147 void test_join_node_rf_reset_protocol(){
148 oneapi::tbb::flow::graph g;
149
150 oneapi::tbb::flow::join_node<std::tuple<int>, oneapi::tbb::flow::queueing> testing_node(g);
151
152 oneapi::tbb::flow::input_port<0>(testing_node).try_put(1);
153
154 g.wait_for_all();
155 g.reset(oneapi::tbb::flow::rf_reset_protocol);
156
157 std::tuple<int> tmp(0);
158 CHECK_MESSAGE((!testing_node.try_get(tmp)), "All buffers must be emptied");
159 }
160
161 //! Graph reset
162 //! \brief \ref requirement
163 TEST_CASE("graph reset with rf_reset_protocol") {
164 using namespace oneapi::tbb::flow;
165 test_continue_node_rf_reset_protocol();
166 test_input_node_rf_reset_protocol();
167 test_functional_nodes_rf_reset_protocol<function_node<int, int, queueing>>();
168 test_functional_nodes_rf_reset_protocol<multifunction_node<int, std::tuple<int>, queueing>>();
169 test_functional_nodes_rf_reset_protocol<async_node<int, int, queueing>>();
170
171 test_buffering_nodes_rf_reset_protocol<buffer_node<int>>();
172 test_buffering_nodes_rf_reset_protocol<queue_node<int>>();
173 test_buffering_nodes_rf_reset_protocol<overwrite_node<int>>();
174 test_buffering_nodes_rf_reset_protocol<write_once_node<int>>();
175 test_buffering_nodes_rf_reset_protocol<priority_queue_node<int>>();
176 conformance::sequencer_functor<int> sequencer;
177 test_buffering_nodes_rf_reset_protocol<sequencer_node<int>>(sequencer);
178
179 test_limiter_node_rf_reset_protocol();
180 test_join_node_rf_reset_protocol();
181 }
182
183 //! Graph reset rf_clear_edges
184 //! \brief \ref requirement
185 TEST_CASE("graph reset with rf_clear_edges") {
186 oneapi::tbb::flow::graph g;
187 using body = conformance::dummy_functor<int>;
188
189 oneapi::tbb::flow::queue_node<int> successor(g);
190 oneapi::tbb::flow::queue_node<std::tuple<int>> successor2(g);
191 oneapi::tbb::flow::queue_node<oneapi::tbb::flow::indexer_node<int>::output_type> successor3(g);
192
193 //node types
194 oneapi::tbb::flow::continue_node<int> ct(g, body());
195 oneapi::tbb::flow::split_node< std::tuple<int> > s(g);
196 oneapi::tbb::flow::input_node<int> src(g, body());
197 oneapi::tbb::flow::function_node<int, int> fxn(g, oneapi::tbb::flow::unlimited, body());
198 oneapi::tbb::flow::multifunction_node<int, std::tuple<int, int> > m_fxn(g, oneapi::tbb::flow::unlimited, body());
199 oneapi::tbb::flow::broadcast_node<int> bc(g);
200 oneapi::tbb::flow::limiter_node<int> lim(g, 2);
201 oneapi::tbb::flow::indexer_node<int> ind(g);
202 oneapi::tbb::flow::join_node< std::tuple< int >, oneapi::tbb::flow::queueing > j(g);
203 oneapi::tbb::flow::buffer_node<int> bf(g);
204 oneapi::tbb::flow::priority_queue_node<int> pq(g);
205 oneapi::tbb::flow::write_once_node<int> wo(g);
206 oneapi::tbb::flow::overwrite_node<int> ovw(g);
207 oneapi::tbb::flow::sequencer_node<int> seq(g, conformance::sequencer_functor<int>());
208
209 oneapi::tbb::flow::make_edge(ct, successor);
210 oneapi::tbb::flow::make_edge(s, successor);
211 oneapi::tbb::flow::make_edge(src, successor);
212 oneapi::tbb::flow::make_edge(fxn, successor);
213 oneapi::tbb::flow::make_edge(m_fxn, successor);
214 oneapi::tbb::flow::make_edge(bc, successor);
215 oneapi::tbb::flow::make_edge(lim, successor);
216 oneapi::tbb::flow::make_edge(ind, successor3);
217 oneapi::tbb::flow::make_edge(j, successor2);
218 oneapi::tbb::flow::make_edge(bf, successor);
219 oneapi::tbb::flow::make_edge(pq, successor);
220 oneapi::tbb::flow::make_edge(wo, successor);
221 oneapi::tbb::flow::make_edge(ovw, successor);
222 oneapi::tbb::flow::make_edge(seq, successor);
223
224 g.wait_for_all();
225 g.reset(oneapi::tbb::flow::rf_clear_edges);
226
227 ct.try_put(oneapi::tbb::flow::continue_msg());
228 s.try_put(std::tuple<int>{1});
229 src.activate();
230 fxn.try_put(1);
231 m_fxn.try_put(1);
232 bc.try_put(1);
233 lim.try_put(1);
234 oneapi::tbb::flow::input_port<0>(ind).try_put(1);
235 oneapi::tbb::flow::input_port<0>(j).try_put(1);
236 bf.try_put(1);
237 pq.try_put(1);
238 wo.try_put(1);
239 ovw.try_put(1);
240 seq.try_put(0);
241
242 g.wait_for_all();
243
244 CHECK_MESSAGE((conformance::get_values(successor).size() == 0), "Message should not pass when edges doesn't exist");
245 CHECK_MESSAGE((conformance::get_values(successor2).size() == 0), "Message should not pass when edge doesn't exist");
246 CHECK_MESSAGE((conformance::get_values(successor3).size() == 0), "Message should not pass when edge doesn't exist");
247 }
248
249 //! Graph reset rf_reset_bodies
250 //! \brief \ref requirement
251 TEST_CASE("graph reset with rf_reset_bodies") {
252 using namespace oneapi::tbb::flow;
253 test_nodes_with_body_rf_reset_bodies<continue_node<int>, continue_msg>(serial);
254 test_nodes_with_body_rf_reset_bodies<function_node<int, int>, int>(serial);
255 test_nodes_with_body_rf_reset_bodies<multifunction_node<int, std::tuple<int>>, int>(serial);
256 test_nodes_with_body_rf_reset_bodies<async_node<int, int>, int>(serial);
257
258 graph g;
259 conformance::counting_functor<int> counting_body(1);
260 input_node<int> testing_node(g,counting_body);
261 queue_node<int> q_node(g);
262
263 make_edge(testing_node, q_node);
264
265 testing_node.activate();
266 g.wait_for_all();
267
268 CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be executed");
269
270 g.reset(rf_reset_bodies);
271 testing_node.activate();
272 g.wait_for_all();
273
274 CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be replaced with a copy of the body");
275 }
276
277 //! Graph cancel
278 //! \brief \ref requirement
279 TEST_CASE("graph cancel") {
280 oneapi::tbb::flow::graph g;
281 CHECK_MESSAGE(!g.is_cancelled(), "Freshly created graph should not be cancelled." );
282
283 g.cancel();
284 CHECK_MESSAGE(!g.is_cancelled(), "Cancelled status should appear only after the wait_for_all() call." );
285
286 g.wait_for_all();
287 CHECK_MESSAGE(g.is_cancelled(), "Waiting should allow checking the cancellation status." );
288
289 g.reset();
290 CHECK_MESSAGE(!g.is_cancelled(), "Resetting must reset the cancellation status." );
291
292 std::atomic<bool> cancelled(false);
293 std::atomic<unsigned> executed(0);
__anon8ef3c1da0202(int) 294 oneapi::tbb::flow::function_node<int> f(g, oneapi::tbb::flow::serial, [&](int) {
295 ++executed;
296 while( !cancelled.load(std::memory_order_relaxed) )
297 std::this_thread::sleep_for(std::chrono::milliseconds(1));
298 });
299
300 const unsigned N = 10;
301 for( unsigned i = 0; i < N; ++i )
302 f.try_put(0);
303
__anon8ef3c1da0302null304 std::thread thr([&] {
305 while( !executed )
306 std::this_thread::sleep_for(std::chrono::milliseconds(1));
307 g.cancel();
308 cancelled.store(true, std::memory_order_relaxed);
309 });
310 g.wait_for_all();
311 thr.join();
312 CHECK_MESSAGE(g.is_cancelled(), "Wait for all should not change the cancellation status." );
313 CHECK_MESSAGE(1 == executed, "Buffered messages should be dropped by the cancelled graph." );
314 }
315