1 /*
2     Copyright (c) 2020-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/test.h"
22 
23 #include "common/utils.h"
24 #include "common/graph_utils.h"
25 
26 #include "oneapi/tbb/flow_graph.h"
27 #include "oneapi/tbb/task_arena.h"
28 #include "oneapi/tbb/global_control.h"
29 
30 #include "conformance_flowgraph.h"
31 
32 //! \file conformance_graph.cpp
33 //! \brief Test for [flow_graph.graph] specification
34 
35 using namespace oneapi::tbb::flow;
36 using namespace std;
37 
38 //! Graph reset
39 //! \brief \ref requirement
40 TEST_CASE("graph reset") {
41     graph g;
42     size_t concurrency_limit = 1;
43     oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit);
44 
45     // Functional nodes
46     // TODO: Check input_node, multifunction_node, async_node similarly
47 
48     // continue_node
49     bool flag = false;
50     continue_node<int> source(g, 2, [&](const continue_msg&){ flag = true; return 1;});
51 
52     source.try_put(continue_msg());
53     g.wait_for_all();
54 
55     CHECK_MESSAGE( (flag == false), "Should still be false");
56 
57     g.reset(rf_reset_protocol);
58 
59     source.try_put(continue_msg());
60     g.wait_for_all();
61     CHECK_MESSAGE( (flag == false), "Should still be false");
62 
63     source.try_put(continue_msg());
64     g.wait_for_all();
65     CHECK_MESSAGE( (flag == true), "Should be true");
66 
67     // functional_node
68     int flag_fun = 0;
69     function_node<int, int, queueing> f(g, serial, [&](const int& v){ flag_fun++; return v;});
70 
71     f.try_put(0);
72     f.try_put(0);
73 
74     CHECK_MESSAGE( (flag_fun == 0), "Should not be updated");
75     g.reset(rf_reset_protocol);
76 
77     g.wait_for_all();
78     CHECK_MESSAGE( (flag_fun == 1), "Should be updated");
79 
80     // Buffering nodes
81     // TODO: Check overwrite_node write_once_node priority_queue_node sequencer_node similarly
82 
83     // buffer_node
84     buffer_node<int> buff(g);
85 
86     int tmp = -1;
87     CHECK_MESSAGE( (buff.try_get(tmp) == false), "try_get should not succeed");
88     CHECK_MESSAGE( (tmp == -1), "Value should not be updated");
89 
90     buff.try_put(1);
91 
92     g.reset();
93 
94     tmp = -1;
95     CHECK_MESSAGE( (buff.try_get(tmp) == false), "try_get should not succeed");
96     CHECK_MESSAGE( (tmp == -1), "Value should not be updated");
97     g.wait_for_all();
98 
99     // queue_node
100     queue_node<int> q(g);
101 
102     tmp = -1;
103     CHECK_MESSAGE( (q.try_get(tmp) == false), "try_get should not succeed");
104     CHECK_MESSAGE( (tmp == -1), "Value should not be updated");
105 
106     q.try_put(1);
107 
108     g.reset();
109 
110     tmp = -1;
111     CHECK_MESSAGE( (q.try_get(tmp) == false), "try_get should not succeed");
112     CHECK_MESSAGE( (tmp == -1), "Value should not be updated");
113     g.wait_for_all();
114 
115     // Check rf_clear_edges
116     continue_node<int> src(g, [&](const continue_msg&){ return 1;});
117     queue_node<int> dest(g);
118     make_edge(src, dest);
119 
120     src.try_put(continue_msg());
121     g.wait_for_all();
122 
123     tmp = -1;
124     CHECK_MESSAGE( (dest.try_get(tmp)== true), "Message should pass when edge exists");
125     CHECK_MESSAGE( (tmp == 1 ), "Message should pass when edge exists");
126     CHECK_MESSAGE( (dest.try_get(tmp)== false), "Message should not pass after item is consumed");
127 
128     g.reset(rf_clear_edges);
129 
130     tmp = -1;
131     src.try_put(continue_msg());
132     g.wait_for_all();
133 
134     CHECK_MESSAGE( (dest.try_get(tmp)== false), "Message should not pass when edge doesn't exist");
135     CHECK_MESSAGE( (tmp == -1), "Value should not be altered");
136 
137     // TODO: Add check that default invocaiton is the same as with rf_reset_protocol
138     // TODO: See if specification for broadcast_node and other service nodes is sufficient for reset checks
139 }
140 
141 //! Graph cancel
142 //! \brief \ref requirement
143 TEST_CASE("graph cancel") {
144     graph g;
145     CHECK_MESSAGE( !g.is_cancelled(), "Freshly created graph should not be cancelled." );
146 
147     g.cancel();
148     CHECK_MESSAGE( !g.is_cancelled(), "Cancelled status should appear only after the wait_for_all() call." );
149 
150     g.wait_for_all();
151     CHECK_MESSAGE( g.is_cancelled(), "Waiting should allow checking the cancellation status." );
152 
153     g.reset();
154     CHECK_MESSAGE( !g.is_cancelled(), "Resetting must reset the cancellation status." );
155 
156     std::atomic<bool> cancelled(false);
157     std::atomic<unsigned> executed(0);
158     function_node<int> f(g, serial, [&](int) {
159         ++executed;
160         while( !cancelled.load(std::memory_order_relaxed) )
161             std::this_thread::sleep_for(std::chrono::milliseconds(1));
162     });
163 
164     const unsigned N = 10;
165     for( unsigned i = 0; i < N; ++i )
166         f.try_put(0);
167 
168     std::thread thr([&] {
169         while( !executed )
170             std::this_thread::sleep_for(std::chrono::milliseconds(1));
171         g.cancel();
172         cancelled.store(true, std::memory_order_relaxed);
173     });
174     g.wait_for_all();
175     thr.join();
176     CHECK_MESSAGE( g.is_cancelled(), "Wait for all should not change the cancellation status." );
177     CHECK_MESSAGE( 1 == executed, "Buffered messages should be dropped by the cancelled graph." );
178 
179 }
180