151c0b2f7Stbbdev /*
2b15aabb3Stbbdev     Copyright (c) 2020-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "conformance_flowgraph.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev //! \file conformance_graph.cpp
2451c0b2f7Stbbdev //! \brief Test for [flow_graph.graph] specification
2551c0b2f7Stbbdev 
test_continue_node_rf_reset_protocol()26*de0109beSIlya Mishin void test_continue_node_rf_reset_protocol(){
2749e08aacStbbdev     using namespace oneapi::tbb::flow;
2851c0b2f7Stbbdev     graph g;
2951c0b2f7Stbbdev 
30*de0109beSIlya Mishin     std::atomic<bool> flag = {false};
3151c0b2f7Stbbdev     continue_node<int> source(g, 2, [&](const continue_msg&){ flag = true; return 1;});
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev     source.try_put(continue_msg());
3451c0b2f7Stbbdev     g.wait_for_all();
3551c0b2f7Stbbdev 
36*de0109beSIlya Mishin     CHECK_MESSAGE((flag == false), "Should be false");
3751c0b2f7Stbbdev 
3851c0b2f7Stbbdev     g.reset(rf_reset_protocol);
3951c0b2f7Stbbdev 
4051c0b2f7Stbbdev     source.try_put(continue_msg());
4151c0b2f7Stbbdev     g.wait_for_all();
42*de0109beSIlya Mishin     CHECK_MESSAGE((flag == false), "Internal number of predecessors reinitialized");
4351c0b2f7Stbbdev 
4451c0b2f7Stbbdev     source.try_put(continue_msg());
4551c0b2f7Stbbdev     g.wait_for_all();
4651c0b2f7Stbbdev     CHECK_MESSAGE((flag == true), "Should be true");
47*de0109beSIlya Mishin }
4851c0b2f7Stbbdev 
test_input_node_rf_reset_protocol()49*de0109beSIlya Mishin void test_input_node_rf_reset_protocol(){
50*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
5151c0b2f7Stbbdev 
52*de0109beSIlya Mishin     conformance::copy_counting_object<int> fun;
5351c0b2f7Stbbdev 
54*de0109beSIlya Mishin     oneapi::tbb::flow::input_node<int> node(g, fun);
55*de0109beSIlya Mishin     oneapi::tbb::flow::limiter_node<int> rejecter(g, 0);
5651c0b2f7Stbbdev 
57*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node, rejecter);
58*de0109beSIlya Mishin 
59*de0109beSIlya Mishin     node.activate();
6051c0b2f7Stbbdev     g.wait_for_all();
6151c0b2f7Stbbdev 
62*de0109beSIlya Mishin     g.reset(oneapi::tbb::flow::rf_reset_protocol);
6351c0b2f7Stbbdev 
6451c0b2f7Stbbdev     int tmp = -1;
65*de0109beSIlya Mishin     CHECK_MESSAGE((node.try_get(tmp) == false), "Should be false");
66*de0109beSIlya Mishin }
67*de0109beSIlya Mishin 
68*de0109beSIlya Mishin template<typename Node>
test_functional_nodes_rf_reset_protocol()69*de0109beSIlya Mishin void test_functional_nodes_rf_reset_protocol(){
70*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
71*de0109beSIlya Mishin     size_t concurrency_limit = 1;
72*de0109beSIlya Mishin     oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit);
73*de0109beSIlya Mishin 
74*de0109beSIlya Mishin     conformance::counting_functor<int> counting_body;
75*de0109beSIlya Mishin     Node f(g, oneapi::tbb::flow::serial, counting_body);
76*de0109beSIlya Mishin 
77*de0109beSIlya Mishin     f.try_put(0);
78*de0109beSIlya Mishin     f.try_put(0);
79*de0109beSIlya Mishin     CHECK_MESSAGE((counting_body.execute_count == 0), "Body should not be executed");
80*de0109beSIlya Mishin     g.reset(oneapi::tbb::flow::rf_reset_protocol);
81*de0109beSIlya Mishin 
82*de0109beSIlya Mishin     g.wait_for_all();
83*de0109beSIlya Mishin     CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed");
84*de0109beSIlya Mishin }
85*de0109beSIlya Mishin 
86*de0109beSIlya Mishin template<typename Node, typename ...Args>
test_buffering_nodes_rf_reset_protocol(Args...node_body)87*de0109beSIlya Mishin void test_buffering_nodes_rf_reset_protocol(Args... node_body){
88*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
89*de0109beSIlya Mishin     Node testing_node(g, node_body...);
90*de0109beSIlya Mishin 
91*de0109beSIlya Mishin     int tmp = -1;
92*de0109beSIlya Mishin     CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed");
9351c0b2f7Stbbdev     CHECK_MESSAGE((tmp == -1), "Value should not be updated");
9451c0b2f7Stbbdev 
95*de0109beSIlya Mishin     testing_node.try_put(1);
96*de0109beSIlya Mishin     g.wait_for_all();
97*de0109beSIlya Mishin     g.reset(oneapi::tbb::flow::rf_reset_protocol);
9851c0b2f7Stbbdev 
9951c0b2f7Stbbdev     tmp = -1;
100*de0109beSIlya Mishin     CHECK_MESSAGE((testing_node.try_get(tmp) == false), "try_get should not succeed");
10151c0b2f7Stbbdev     CHECK_MESSAGE((tmp == -1), "Value should not be updated");
10251c0b2f7Stbbdev     g.wait_for_all();
103*de0109beSIlya Mishin }
10451c0b2f7Stbbdev 
105*de0109beSIlya Mishin template<typename Node, typename InputType, typename ...Args>
test_nodes_with_body_rf_reset_bodies(Args...node_args)106*de0109beSIlya Mishin void test_nodes_with_body_rf_reset_bodies(Args... node_args){
107*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
108*de0109beSIlya Mishin     conformance::counting_functor<int> counting_body(5);
109*de0109beSIlya Mishin     Node testing_node(g, node_args..., counting_body);
11051c0b2f7Stbbdev 
111*de0109beSIlya Mishin     testing_node.try_put(InputType());
11251c0b2f7Stbbdev     g.wait_for_all();
11351c0b2f7Stbbdev 
114*de0109beSIlya Mishin     CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be executed");
11551c0b2f7Stbbdev 
116*de0109beSIlya Mishin     g.reset(oneapi::tbb::flow::rf_reset_bodies);
117*de0109beSIlya Mishin     testing_node.try_put(InputType());
11851c0b2f7Stbbdev     g.wait_for_all();
11951c0b2f7Stbbdev 
120*de0109beSIlya Mishin     CHECK_MESSAGE((counting_body.execute_count == 1), "Body should be replaced with a copy of the body");
121*de0109beSIlya Mishin }
12251c0b2f7Stbbdev 
test_limiter_node_rf_reset_protocol()123*de0109beSIlya Mishin void test_limiter_node_rf_reset_protocol(){
124*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
12551c0b2f7Stbbdev 
126*de0109beSIlya Mishin     constexpr int limit = 5;
127*de0109beSIlya Mishin     oneapi::tbb::flow::limiter_node<int> testing_node(g, limit);
128*de0109beSIlya Mishin     conformance::test_push_receiver<int> suc_node(g);
129*de0109beSIlya Mishin 
130*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(testing_node, suc_node);
131*de0109beSIlya Mishin 
132*de0109beSIlya Mishin     for(int i = 0; i < limit * 2; ++i)
133*de0109beSIlya Mishin         testing_node.try_put(1);
13451c0b2f7Stbbdev     g.wait_for_all();
13551c0b2f7Stbbdev 
136*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages");
13751c0b2f7Stbbdev 
138*de0109beSIlya Mishin     g.reset(oneapi::tbb::flow::rf_reset_protocol);
139*de0109beSIlya Mishin 
140*de0109beSIlya Mishin     for(int i = 0; i < limit * 2; ++i)
141*de0109beSIlya Mishin         testing_node.try_put(1);
142*de0109beSIlya Mishin     g.wait_for_all();
143*de0109beSIlya Mishin 
144*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(suc_node).size() == limit), "Descendant of the node needs be receive limited number of messages");
145*de0109beSIlya Mishin }
146*de0109beSIlya Mishin 
test_join_node_rf_reset_protocol()147*de0109beSIlya Mishin void test_join_node_rf_reset_protocol(){
148*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
149*de0109beSIlya Mishin 
150*de0109beSIlya Mishin     oneapi::tbb::flow::join_node<std::tuple<int>, oneapi::tbb::flow::queueing> testing_node(g);
151*de0109beSIlya Mishin 
152*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(testing_node).try_put(1);
153*de0109beSIlya Mishin 
154*de0109beSIlya Mishin     g.wait_for_all();
155*de0109beSIlya Mishin     g.reset(oneapi::tbb::flow::rf_reset_protocol);
156*de0109beSIlya Mishin 
157*de0109beSIlya Mishin     std::tuple<int> tmp(0);
158*de0109beSIlya Mishin     CHECK_MESSAGE((!testing_node.try_get(tmp)), "All buffers must be emptied");
159*de0109beSIlya Mishin }
160*de0109beSIlya Mishin 
161*de0109beSIlya Mishin //! Graph reset
162*de0109beSIlya Mishin //! \brief \ref requirement
163*de0109beSIlya Mishin TEST_CASE("graph reset with rf_reset_protocol") {
164*de0109beSIlya Mishin     using namespace oneapi::tbb::flow;
165*de0109beSIlya Mishin     test_continue_node_rf_reset_protocol();
166*de0109beSIlya Mishin     test_input_node_rf_reset_protocol();
167*de0109beSIlya Mishin     test_functional_nodes_rf_reset_protocol<function_node<int, int, queueing>>();
168*de0109beSIlya Mishin     test_functional_nodes_rf_reset_protocol<multifunction_node<int, std::tuple<int>, queueing>>();
169*de0109beSIlya Mishin     test_functional_nodes_rf_reset_protocol<async_node<int, int, queueing>>();
170*de0109beSIlya Mishin 
171*de0109beSIlya Mishin     test_buffering_nodes_rf_reset_protocol<buffer_node<int>>();
172*de0109beSIlya Mishin     test_buffering_nodes_rf_reset_protocol<queue_node<int>>();
173*de0109beSIlya Mishin     test_buffering_nodes_rf_reset_protocol<overwrite_node<int>>();
174*de0109beSIlya Mishin     test_buffering_nodes_rf_reset_protocol<write_once_node<int>>();
175*de0109beSIlya Mishin     test_buffering_nodes_rf_reset_protocol<priority_queue_node<int>>();
176*de0109beSIlya Mishin     conformance::sequencer_functor<int> sequencer;
177*de0109beSIlya Mishin     test_buffering_nodes_rf_reset_protocol<sequencer_node<int>>(sequencer);
178*de0109beSIlya Mishin 
179*de0109beSIlya Mishin     test_limiter_node_rf_reset_protocol();
180*de0109beSIlya Mishin     test_join_node_rf_reset_protocol();
181*de0109beSIlya Mishin }
182*de0109beSIlya Mishin 
183*de0109beSIlya Mishin //! Graph reset rf_clear_edges
184*de0109beSIlya Mishin //! \brief \ref requirement
185*de0109beSIlya Mishin TEST_CASE("graph reset with rf_clear_edges") {
186*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
187*de0109beSIlya Mishin     using body = conformance::dummy_functor<int>;
188*de0109beSIlya Mishin 
189*de0109beSIlya Mishin     oneapi::tbb::flow::queue_node<int> successor(g);
190*de0109beSIlya Mishin     oneapi::tbb::flow::queue_node<std::tuple<int>> successor2(g);
191*de0109beSIlya Mishin     oneapi::tbb::flow::queue_node<oneapi::tbb::flow::indexer_node<int>::output_type> successor3(g);
192*de0109beSIlya Mishin 
193*de0109beSIlya Mishin     //node types
194*de0109beSIlya Mishin     oneapi::tbb::flow::continue_node<int> ct(g, body());
195*de0109beSIlya Mishin     oneapi::tbb::flow::split_node< std::tuple<int> > s(g);
196*de0109beSIlya Mishin     oneapi::tbb::flow::input_node<int> src(g, body());
197*de0109beSIlya Mishin     oneapi::tbb::flow::function_node<int, int> fxn(g, oneapi::tbb::flow::unlimited, body());
198*de0109beSIlya Mishin     oneapi::tbb::flow::multifunction_node<int, std::tuple<int, int> > m_fxn(g, oneapi::tbb::flow::unlimited, body());
199*de0109beSIlya Mishin     oneapi::tbb::flow::broadcast_node<int> bc(g);
200*de0109beSIlya Mishin     oneapi::tbb::flow::limiter_node<int> lim(g, 2);
201*de0109beSIlya Mishin     oneapi::tbb::flow::indexer_node<int> ind(g);
202*de0109beSIlya Mishin     oneapi::tbb::flow::join_node< std::tuple< int >, oneapi::tbb::flow::queueing > j(g);
203*de0109beSIlya Mishin     oneapi::tbb::flow::buffer_node<int> bf(g);
204*de0109beSIlya Mishin     oneapi::tbb::flow::priority_queue_node<int> pq(g);
205*de0109beSIlya Mishin     oneapi::tbb::flow::write_once_node<int> wo(g);
206*de0109beSIlya Mishin     oneapi::tbb::flow::overwrite_node<int> ovw(g);
207*de0109beSIlya Mishin     oneapi::tbb::flow::sequencer_node<int> seq(g, conformance::sequencer_functor<int>());
208*de0109beSIlya Mishin 
209*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(ct, successor);
210*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(s, successor);
211*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(src, successor);
212*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(fxn, successor);
213*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(m_fxn, successor);
214*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(bc, successor);
215*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(lim, successor);
216*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(ind, successor3);
217*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(j, successor2);
218*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(bf, successor);
219*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(pq, successor);
220*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(wo, successor);
221*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(ovw, successor);
222*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(seq, successor);
223*de0109beSIlya Mishin 
224*de0109beSIlya Mishin     g.wait_for_all();
225*de0109beSIlya Mishin     g.reset(oneapi::tbb::flow::rf_clear_edges);
226*de0109beSIlya Mishin 
227*de0109beSIlya Mishin     ct.try_put(oneapi::tbb::flow::continue_msg());
228*de0109beSIlya Mishin     s.try_put(std::tuple<int>{1});
229*de0109beSIlya Mishin     src.activate();
230*de0109beSIlya Mishin     fxn.try_put(1);
231*de0109beSIlya Mishin     m_fxn.try_put(1);
232*de0109beSIlya Mishin     bc.try_put(1);
233*de0109beSIlya Mishin     lim.try_put(1);
234*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(ind).try_put(1);
235*de0109beSIlya Mishin     oneapi::tbb::flow::input_port<0>(j).try_put(1);
236*de0109beSIlya Mishin     bf.try_put(1);
237*de0109beSIlya Mishin     pq.try_put(1);
238*de0109beSIlya Mishin     wo.try_put(1);
239*de0109beSIlya Mishin     ovw.try_put(1);
240*de0109beSIlya Mishin     seq.try_put(0);
241*de0109beSIlya Mishin 
242*de0109beSIlya Mishin     g.wait_for_all();
243*de0109beSIlya Mishin 
244*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(successor).size() == 0), "Message should not pass when edges doesn't exist");
245*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(successor2).size() == 0), "Message should not pass when edge doesn't exist");
246*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(successor3).size() == 0), "Message should not pass when edge doesn't exist");
247*de0109beSIlya Mishin }
248*de0109beSIlya Mishin 
249*de0109beSIlya Mishin //! Graph reset rf_reset_bodies
250*de0109beSIlya Mishin //! \brief \ref requirement
251*de0109beSIlya Mishin TEST_CASE("graph reset with rf_reset_bodies") {
252*de0109beSIlya Mishin     using namespace oneapi::tbb::flow;
253*de0109beSIlya Mishin     test_nodes_with_body_rf_reset_bodies<continue_node<int>, continue_msg>(serial);
254*de0109beSIlya Mishin     test_nodes_with_body_rf_reset_bodies<function_node<int, int>, int>(serial);
255*de0109beSIlya Mishin     test_nodes_with_body_rf_reset_bodies<multifunction_node<int, std::tuple<int>>, int>(serial);
256*de0109beSIlya Mishin     test_nodes_with_body_rf_reset_bodies<async_node<int, int>, int>(serial);
257*de0109beSIlya Mishin 
258*de0109beSIlya Mishin     graph g;
259*de0109beSIlya Mishin     conformance::counting_functor<int> counting_body(1);
260*de0109beSIlya Mishin     input_node<int> testing_node(g,counting_body);
261*de0109beSIlya Mishin     queue_node<int> q_node(g);
262*de0109beSIlya Mishin 
263*de0109beSIlya Mishin     make_edge(testing_node, q_node);
264*de0109beSIlya Mishin 
265*de0109beSIlya Mishin     testing_node.activate();
266*de0109beSIlya Mishin     g.wait_for_all();
267*de0109beSIlya Mishin 
268*de0109beSIlya Mishin     CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be executed");
269*de0109beSIlya Mishin 
270*de0109beSIlya Mishin     g.reset(rf_reset_bodies);
271*de0109beSIlya Mishin     testing_node.activate();
272*de0109beSIlya Mishin     g.wait_for_all();
273*de0109beSIlya Mishin 
274*de0109beSIlya Mishin     CHECK_MESSAGE((counting_body.execute_count == 2), "Body should be replaced with a copy of the body");
27551c0b2f7Stbbdev }
27649e08aacStbbdev 
27749e08aacStbbdev //! Graph cancel
27849e08aacStbbdev //! \brief \ref requirement
27949e08aacStbbdev TEST_CASE("graph cancel") {
280*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
28149e08aacStbbdev     CHECK_MESSAGE(!g.is_cancelled(), "Freshly created graph should not be cancelled." );
28249e08aacStbbdev 
28349e08aacStbbdev     g.cancel();
28449e08aacStbbdev     CHECK_MESSAGE(!g.is_cancelled(), "Cancelled status should appear only after the wait_for_all() call." );
28549e08aacStbbdev 
28649e08aacStbbdev     g.wait_for_all();
28749e08aacStbbdev     CHECK_MESSAGE(g.is_cancelled(), "Waiting should allow checking the cancellation status." );
28849e08aacStbbdev 
28949e08aacStbbdev     g.reset();
29049e08aacStbbdev     CHECK_MESSAGE(!g.is_cancelled(), "Resetting must reset the cancellation status." );
29149e08aacStbbdev 
29249e08aacStbbdev     std::atomic<bool> cancelled(false);
29349e08aacStbbdev     std::atomic<unsigned> executed(0);
__anon8ef3c1da0202(int) 294*de0109beSIlya Mishin     oneapi::tbb::flow::function_node<int> f(g, oneapi::tbb::flow::serial, [&](int) {
29549e08aacStbbdev         ++executed;
29649e08aacStbbdev         while( !cancelled.load(std::memory_order_relaxed) )
29749e08aacStbbdev             std::this_thread::sleep_for(std::chrono::milliseconds(1));
29849e08aacStbbdev     });
29949e08aacStbbdev 
30049e08aacStbbdev     const unsigned N = 10;
30149e08aacStbbdev     for( unsigned i = 0; i < N; ++i )
30249e08aacStbbdev         f.try_put(0);
30349e08aacStbbdev 
__anon8ef3c1da0302null30449e08aacStbbdev     std::thread thr([&] {
30549e08aacStbbdev         while( !executed )
30649e08aacStbbdev             std::this_thread::sleep_for(std::chrono::milliseconds(1));
30749e08aacStbbdev         g.cancel();
30849e08aacStbbdev         cancelled.store(true, std::memory_order_relaxed);
30949e08aacStbbdev     });
31049e08aacStbbdev     g.wait_for_all();
31149e08aacStbbdev     thr.join();
31249e08aacStbbdev     CHECK_MESSAGE(g.is_cancelled(), "Wait for all should not change the cancellation status." );
31349e08aacStbbdev     CHECK_MESSAGE(1 == executed, "Buffered messages should be dropped by the cancelled graph." );
31449e08aacStbbdev }
315