/*
    Copyright (c) 2020-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
1651c0b2f7Stbbdev
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
2051c0b2f7Stbbdev
2151c0b2f7Stbbdev #include "conformance_flowgraph.h"
2251c0b2f7Stbbdev
//! \file conformance_graph.cpp
//! \brief Test for [flow_graph.graph] specification
2551c0b2f7Stbbdev
test_continue_node_rf_reset_protocol()26*de0109beSIlya Mishin void test_continue_node_rf_reset_protocol(){
2749e08aacStbbdev using namespace oneapi::tbb::flow;
2851c0b2f7Stbbdev graph g;
2951c0b2f7Stbbdev
30*de0109beSIlya Mishin std::atomic<bool> flag = {false};
3151c0b2f7Stbbdev continue_node<int> source(g, 2, [&](const continue_msg&){ flag = true; return 1;});
3251c0b2f7Stbbdev
3351c0b2f7Stbbdev source.try_put(continue_msg());
3451c0b2f7Stbbdev g.wait_for_all();
3551c0b2f7Stbbdev
36*de0109beSIlya Mishin CHECK_MESSAGE((flag == false), "Should be false");
3751c0b2f7Stbbdev
3851c0b2f7Stbbdev g.reset(rf_reset_protocol);
3951c0b2f7Stbbdev
4051c0b2f7Stbbdev source.try_put(continue_msg());
4151c0b2f7Stbbdev g.wait_for_all();
42*de0109beSIlya Mishin CHECK_MESSAGE((flag == false), "Internal number of predecessors reinitialized");
4351c0b2f7Stbbdev
4451c0b2f7Stbbdev source.try_put(continue_msg());
4551c0b2f7Stbbdev g.wait_for_all();
4651c0b2f7Stbbdev CHECK_MESSAGE((flag == true), "Should be true");
47*de0109beSIlya Mishin }
4851c0b2f7Stbbdev
test_input_node_rf_reset_protocol()49*de0109beSIlya Mishin void test_input_node_rf_reset_protocol(){
50*de0109beSIlya Mishin oneapi::tbb::flow::graph g;
5151c0b2f7Stbbdev
52*de0109beSIlya Mishin conformance::copy_counting_object<int> fun;
5351c0b2f7Stbbdev
54*de0109beSIlya Mishin oneapi::tbb::flow::input_node<int> node(g, fun);
55*de0109beSIlya Mishin oneapi::tbb::flow::limiter_node<int> rejecter(g, 0);
5651c0b2f7Stbbdev
57*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node, rejecter);
58*de0109beSIlya Mishin
59*de0109beSIlya Mishin node.activate();
6051c0b2f7Stbbdev g.wait_for_all();
6151c0b2f7Stbbdev
62*de0109beSIlya Mishin g.reset(oneapi::tbb::flow::rf_reset_protocol);
6351c0b2f7Stbbdev
6451c0b2f7Stbbdev int tmp = -1;
65*de0109beSIlya Mishin CHECK_MESSAGE((node.try_get(tmp) == false), "Should be false");
66*de0109beSIlya Mishin }
67*de0109beSIlya Mishin
68*de0109beSIlya Mishin template<typename Node>
test_functional_nodes_rf_reset_protocol()69*de0109beSIlya Mishin void test_functional_nodes_rf_reset_protocol(){
70*de0109beSIlya Mishin oneapi::tbb::flow::graph g;
71*de0109beSIlya Mishin size_t concurrency_limit = 1;
72*de0109beSIlya Mishin oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit);
73*de0109beSIlya Mishin
74*de0109beSIlya Mishin conformance::counting_functor<int> counting_body;
75*de0109beSIlya Mishin Node f(g, oneapi::tbb::flow::serial, counting_body);
76*de0109beSIlya Mishin
77*de0109beSIlya Mishin f.try_put(0);
78*de0109beSIlya Mishin f.try_put(0);
79*de0109beSIlya Mishin CHECK_MESSAGE((counting_body.execute_count == 0), "Body should not be executed");
80*de0109beSIlya Mishin g.reset(oneapi::tbb::flow::rf_reset_protocol);
81*de0109beSIlya Mishin
82*de0109beSIlya Mishin g.wait_for_all();
83*de0109beSIlya Mishin CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed");
84*de0109beSIlya Mishin }
85*de0109beSIlya Mishin
86*de0109beSIlya Mishin template<typename Node, typename ...Args>
test_buffering_nodes_rf_reset_protocol(Args...node_body)87*de0109beSIlya Mishin void test_buffering_nodes_rf_reset_protocol(Args... node_body){
88*de0109beSIlya Mishin oneapi::tbb::flow::graph g;
89*de0109beSIlya Mishin Node testing_node(g, node_body...);
90*de0109beSIlya Mishin
91*de0109beSIlya Mishin int tmp = -1;
92*de0109beSIlya Mishin CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed");
9351c0b2f7Stbbdev CHECK_MESSAGE((tmp == -1), "Value should not be updated");
9451c0b2f7Stbbdev
95*de0109beSIlya Mishin testing_node.try_put(1);
96*de0109beSIlya Mishin g.wait_for_all();
97*de0109beSIlya Mishin g.reset(oneapi::tbb::flow::rf_reset_protocol);
9851c0b2f7Stbbdev
9951c0b2f7Stbbdev tmp = -1;
100*de0109beSIlya Mishin CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed");
10151c0b2f7Stbbdev CHECK_MESSAGE((tmp == -1), "Value should not be updated");
10251c0b2f7Stbbdev g.wait_for_all();
103*de0109beSIlya Mishin }
10451c0b2f7Stbbdev
105*de0109beSIlya Mishin template<typename Node, typename InputType, typename ...Args>
test_nodes_with_body_rf_reset_bodies(Args...node_args)106*de0109beSIlya Mishin void test_nodes_with_body_rf_reset_bodies(Args... node_args){
107*de0109beSIlya Mishin oneapi::tbb::flow::graph g;
108*de0109beSIlya Mishin conformance::counting_functor<int> counting_body(5);
109*de0109beSIlya Mishin Node testing_node(g, node_args..., counting_body);
11051c0b2f7Stbbdev
111*de0109beSIlya Mishin testing_node.try_put(InputType());
11251c0b2f7Stbbdev g.wait_for_all();
11351c0b2f7Stbbdev
114*de0109beSIlya Mishin CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed");
11551c0b2f7Stbbdev
116*de0109beSIlya Mishin g.reset(oneapi::tbb::flow::rf_reset_bodies);
117*de0109beSIlya Mishin testing_node.try_put(InputType());
11851c0b2f7Stbbdev g.wait_for_all();
11951c0b2f7Stbbdev
120*de0109beSIlya Mishin CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be replaced with a copy of the body");
121*de0109beSIlya Mishin }
12251c0b2f7Stbbdev
test_limiter_node_rf_reset_protocol()123*de0109beSIlya Mishin void test_limiter_node_rf_reset_protocol(){
124*de0109beSIlya Mishin oneapi::tbb::flow::graph g;
12551c0b2f7Stbbdev
126*de0109beSIlya Mishin constexpr int limit = 5;
127*de0109beSIlya Mishin oneapi::tbb::flow::limiter_node<int> testing_node(g, limit);
128*de0109beSIlya Mishin conformance::test_push_receiver<int> suc_node(g);
129*de0109beSIlya Mishin
130*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(testing_node, suc_node);
131*de0109beSIlya Mishin
132*de0109beSIlya Mishin for(int i = 0; i < limit * 2; ++i)
133*de0109beSIlya Mishin testing_node.try_put(1);
13451c0b2f7Stbbdev g.wait_for_all();
13551c0b2f7Stbbdev
136*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages");
13751c0b2f7Stbbdev
138*de0109beSIlya Mishin g.reset(oneapi::tbb::flow::rf_reset_protocol);
139*de0109beSIlya Mishin
140*de0109beSIlya Mishin for(int i = 0; i < limit * 2; ++i)
141*de0109beSIlya Mishin testing_node.try_put(1);
142*de0109beSIlya Mishin g.wait_for_all();
143*de0109beSIlya Mishin
144*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages");
145*de0109beSIlya Mishin }
146*de0109beSIlya Mishin
test_join_node_rf_reset_protocol()147*de0109beSIlya Mishin void test_join_node_rf_reset_protocol(){
148*de0109beSIlya Mishin oneapi::tbb::flow::graph g;
149*de0109beSIlya Mishin
150*de0109beSIlya Mishin oneapi::tbb::flow::join_node<std::tuple<int>, oneapi::tbb::flow::queueing> testing_node(g);
151*de0109beSIlya Mishin
152*de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(testing_node).try_put(1);
153*de0109beSIlya Mishin
154*de0109beSIlya Mishin g.wait_for_all();
155*de0109beSIlya Mishin g.reset(oneapi::tbb::flow::rf_reset_protocol);
156*de0109beSIlya Mishin
157*de0109beSIlya Mishin std::tuple<int> tmp(0);
158*de0109beSIlya Mishin CHECK_MESSAGE((!testing_node.try_get(tmp)), "All buffers must be emptied");
159*de0109beSIlya Mishin }
160*de0109beSIlya Mishin
161*de0109beSIlya Mishin //! Graph reset
162*de0109beSIlya Mishin //! \brief \ref requirement
163*de0109beSIlya Mishin TEST_CASE("graph reset with rf_reset_protocol") {
164*de0109beSIlya Mishin using namespace oneapi::tbb::flow;
165*de0109beSIlya Mishin test_continue_node_rf_reset_protocol();
166*de0109beSIlya Mishin test_input_node_rf_reset_protocol();
167*de0109beSIlya Mishin test_functional_nodes_rf_reset_protocol<function_node<int, int, queueing>>();
168*de0109beSIlya Mishin test_functional_nodes_rf_reset_protocol<multifunction_node<int, std::tuple<int>, queueing>>();
169*de0109beSIlya Mishin test_functional_nodes_rf_reset_protocol<async_node<int, int, queueing>>();
170*de0109beSIlya Mishin
171*de0109beSIlya Mishin test_buffering_nodes_rf_reset_protocol<buffer_node<int>>();
172*de0109beSIlya Mishin test_buffering_nodes_rf_reset_protocol<queue_node<int>>();
173*de0109beSIlya Mishin test_buffering_nodes_rf_reset_protocol<overwrite_node<int>>();
174*de0109beSIlya Mishin test_buffering_nodes_rf_reset_protocol<write_once_node<int>>();
175*de0109beSIlya Mishin test_buffering_nodes_rf_reset_protocol<priority_queue_node<int>>();
176*de0109beSIlya Mishin conformance::sequencer_functor<int> sequencer;
177*de0109beSIlya Mishin test_buffering_nodes_rf_reset_protocol<sequencer_node<int>>(sequencer);
178*de0109beSIlya Mishin
179*de0109beSIlya Mishin test_limiter_node_rf_reset_protocol();
180*de0109beSIlya Mishin test_join_node_rf_reset_protocol();
181*de0109beSIlya Mishin }
182*de0109beSIlya Mishin
183*de0109beSIlya Mishin //! Graph reset rf_clear_edges
184*de0109beSIlya Mishin //! \brief \ref requirement
185*de0109beSIlya Mishin TEST_CASE("graph reset with rf_clear_edges") {
186*de0109beSIlya Mishin oneapi::tbb::flow::graph g;
187*de0109beSIlya Mishin using body = conformance::dummy_functor<int>;
188*de0109beSIlya Mishin
189*de0109beSIlya Mishin oneapi::tbb::flow::queue_node<int> successor(g);
190*de0109beSIlya Mishin oneapi::tbb::flow::queue_node<std::tuple<int>> successor2(g);
191*de0109beSIlya Mishin oneapi::tbb::flow::queue_node<oneapi::tbb::flow::indexer_node<int>::output_type> successor3(g);
192*de0109beSIlya Mishin
193*de0109beSIlya Mishin //node types
194*de0109beSIlya Mishin oneapi::tbb::flow::continue_node<int> ct(g, body());
195*de0109beSIlya Mishin oneapi::tbb::flow::split_node< std::tuple<int> > s(g);
196*de0109beSIlya Mishin oneapi::tbb::flow::input_node<int> src(g, body());
197*de0109beSIlya Mishin oneapi::tbb::flow::function_node<int, int> fxn(g, oneapi::tbb::flow::unlimited, body());
198*de0109beSIlya Mishin oneapi::tbb::flow::multifunction_node<int, std::tuple<int, int> > m_fxn(g, oneapi::tbb::flow::unlimited, body());
199*de0109beSIlya Mishin oneapi::tbb::flow::broadcast_node<int> bc(g);
200*de0109beSIlya Mishin oneapi::tbb::flow::limiter_node<int> lim(g, 2);
201*de0109beSIlya Mishin oneapi::tbb::flow::indexer_node<int> ind(g);
202*de0109beSIlya Mishin oneapi::tbb::flow::join_node< std::tuple< int >, oneapi::tbb::flow::queueing > j(g);
203*de0109beSIlya Mishin oneapi::tbb::flow::buffer_node<int> bf(g);
204*de0109beSIlya Mishin oneapi::tbb::flow::priority_queue_node<int> pq(g);
205*de0109beSIlya Mishin oneapi::tbb::flow::write_once_node<int> wo(g);
206*de0109beSIlya Mishin oneapi::tbb::flow::overwrite_node<int> ovw(g);
207*de0109beSIlya Mishin oneapi::tbb::flow::sequencer_node<int> seq(g, conformance::sequencer_functor<int>());
208*de0109beSIlya Mishin
209*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(ct, successor);
210*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(s, successor);
211*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(src, successor);
212*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(fxn, successor);
213*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(m_fxn, successor);
214*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(bc, successor);
215*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(lim, successor);
216*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(ind, successor3);
217*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(j, successor2);
218*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(bf, successor);
219*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(pq, successor);
220*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(wo, successor);
221*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(ovw, successor);
222*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(seq, successor);
223*de0109beSIlya Mishin
224*de0109beSIlya Mishin g.wait_for_all();
225*de0109beSIlya Mishin g.reset(oneapi::tbb::flow::rf_clear_edges);
226*de0109beSIlya Mishin
227*de0109beSIlya Mishin ct.try_put(oneapi::tbb::flow::continue_msg());
228*de0109beSIlya Mishin s.try_put(std::tuple<int>{1});
229*de0109beSIlya Mishin src.activate();
230*de0109beSIlya Mishin fxn.try_put(1);
231*de0109beSIlya Mishin m_fxn.try_put(1);
232*de0109beSIlya Mishin bc.try_put(1);
233*de0109beSIlya Mishin lim.try_put(1);
234*de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(ind).try_put(1);
235*de0109beSIlya Mishin oneapi::tbb::flow::input_port<0>(j).try_put(1);
236*de0109beSIlya Mishin bf.try_put(1);
237*de0109beSIlya Mishin pq.try_put(1);
238*de0109beSIlya Mishin wo.try_put(1);
239*de0109beSIlya Mishin ovw.try_put(1);
240*de0109beSIlya Mishin seq.try_put(0);
241*de0109beSIlya Mishin
242*de0109beSIlya Mishin g.wait_for_all();
243*de0109beSIlya Mishin
244*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(successor).size() == 0), "Message should not pass when edges doesn't exist");
245*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(successor2).size() == 0), "Message should not pass when edge doesn't exist");
246*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(successor3).size() == 0), "Message should not pass when edge doesn't exist");
247*de0109beSIlya Mishin }
248*de0109beSIlya Mishin
249*de0109beSIlya Mishin //! Graph reset rf_reset_bodies
250*de0109beSIlya Mishin //! \brief \ref requirement
251*de0109beSIlya Mishin TEST_CASE("graph reset with rf_reset_bodies") {
252*de0109beSIlya Mishin using namespace oneapi::tbb::flow;
253*de0109beSIlya Mishin test_nodes_with_body_rf_reset_bodies<continue_node<int>, continue_msg>(serial);
254*de0109beSIlya Mishin test_nodes_with_body_rf_reset_bodies<function_node<int, int>, int>(serial);
255*de0109beSIlya Mishin test_nodes_with_body_rf_reset_bodies<multifunction_node<int, std::tuple<int>>, int>(serial);
256*de0109beSIlya Mishin test_nodes_with_body_rf_reset_bodies<async_node<int, int>, int>(serial);
257*de0109beSIlya Mishin
258*de0109beSIlya Mishin graph g;
259*de0109beSIlya Mishin conformance::counting_functor<int> counting_body(1);
260*de0109beSIlya Mishin input_node<int> testing_node(g,counting_body);
261*de0109beSIlya Mishin queue_node<int> q_node(g);
262*de0109beSIlya Mishin
263*de0109beSIlya Mishin make_edge(testing_node, q_node);
264*de0109beSIlya Mishin
265*de0109beSIlya Mishin testing_node.activate();
266*de0109beSIlya Mishin g.wait_for_all();
267*de0109beSIlya Mishin
268*de0109beSIlya Mishin CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be executed");
269*de0109beSIlya Mishin
270*de0109beSIlya Mishin g.reset(rf_reset_bodies);
271*de0109beSIlya Mishin testing_node.activate();
272*de0109beSIlya Mishin g.wait_for_all();
273*de0109beSIlya Mishin
274*de0109beSIlya Mishin CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be replaced with a copy of the body");
27551c0b2f7Stbbdev }
27649e08aacStbbdev
27749e08aacStbbdev //! Graph cancel
27849e08aacStbbdev //! \brief \ref requirement
27949e08aacStbbdev TEST_CASE("graph cancel") {
280*de0109beSIlya Mishin oneapi::tbb::flow::graph g;
28149e08aacStbbdev CHECK_MESSAGE(!g.is_cancelled(), "Freshly created graph should not be cancelled." );
28249e08aacStbbdev
28349e08aacStbbdev g.cancel();
28449e08aacStbbdev CHECK_MESSAGE(!g.is_cancelled(), "Cancelled status should appear only after the wait_for_all() call." );
28549e08aacStbbdev
28649e08aacStbbdev g.wait_for_all();
28749e08aacStbbdev CHECK_MESSAGE(g.is_cancelled(), "Waiting should allow checking the cancellation status." );
28849e08aacStbbdev
28949e08aacStbbdev g.reset();
29049e08aacStbbdev CHECK_MESSAGE(!g.is_cancelled(), "Resetting must reset the cancellation status." );
29149e08aacStbbdev
29249e08aacStbbdev std::atomic<bool> cancelled(false);
29349e08aacStbbdev std::atomic<unsigned> executed(0);
__anon8ef3c1da0202(int) 294*de0109beSIlya Mishin oneapi::tbb::flow::function_node<int> f(g, oneapi::tbb::flow::serial, [&](int) {
29549e08aacStbbdev ++executed;
29649e08aacStbbdev while( !cancelled.load(std::memory_order_relaxed) )
29749e08aacStbbdev std::this_thread::sleep_for(std::chrono::milliseconds(1));
29849e08aacStbbdev });
29949e08aacStbbdev
30049e08aacStbbdev const unsigned N = 10;
30149e08aacStbbdev for( unsigned i = 0; i < N; ++i )
30249e08aacStbbdev f.try_put(0);
30349e08aacStbbdev
__anon8ef3c1da0302null30449e08aacStbbdev std::thread thr([&] {
30549e08aacStbbdev while( !executed )
30649e08aacStbbdev std::this_thread::sleep_for(std::chrono::milliseconds(1));
30749e08aacStbbdev g.cancel();
30849e08aacStbbdev cancelled.store(true, std::memory_order_relaxed);
30949e08aacStbbdev });
31049e08aacStbbdev g.wait_for_all();
31149e08aacStbbdev thr.join();
31249e08aacStbbdev CHECK_MESSAGE(g.is_cancelled(), "Wait for all should not change the cancellation status." );
31349e08aacStbbdev CHECK_MESSAGE(1 == executed, "Buffered messages should be dropped by the cancelled graph." );
31449e08aacStbbdev }
315