1 /*
2 Copyright (c) 2020-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20
21 #define SEQUENCER_NODE
22
23 #include "conformance_flowgraph.h"
24 #include "common/test_invoke.h"
25
26 //! \file conformance_sequencer_node.cpp
27 //! \brief Test for [flow_graph.sequencer_node] specification
28
29
30 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
31 template <typename Body>
test_deduction_guides_common(Body body)32 void test_deduction_guides_common(Body body) {
33 using namespace tbb::flow;
34 graph g;
35 broadcast_node<int> br(g);
36
37 sequencer_node s1(g, body);
38 static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>);
39
40 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
41 sequencer_node s2(follows(br), body);
42 static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>);
43 #endif
44
45 sequencer_node s3(s1);
46 static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>);
47 }
48
//! Sequencer body in the form of a free function: yields sequence number 1 for any input.
std::size_t sequencer_body_f(const int& /*item*/) {
    const std::size_t sequence_number = 1;
    return sequence_number;
}
50
test_deduction_guides()51 void test_deduction_guides() {
52 test_deduction_guides_common([](const int&)->std::size_t { return 1; });
53 test_deduction_guides_common([](const int&) mutable ->std::size_t { return 1; });
54 test_deduction_guides_common(sequencer_body_f);
55 }
56 #endif
57
58 //! Test deduction guides
59 //! \brief \ref interface \ref requirement
60 TEST_CASE("Deduction guides"){
61 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
62 test_deduction_guides();
63 #endif
64 }
65
66 //! Test sequencer_node single_push
67 //! \brief \ref requirement
68 TEST_CASE("sequencer_node single_push"){
69 conformance::sequencer_functor<int> sequencer;
70 conformance::test_forwarding_single_push<oneapi::tbb::flow::sequencer_node<int>>(sequencer);
71 }
72
//! Test sequencer_node buffering
74 //! \brief \ref requirement
75 TEST_CASE("sequencer_node buffering"){
76 conformance::sequencer_functor<int> sequencer;
77 conformance::test_buffering<oneapi::tbb::flow::sequencer_node<int>, int>(sequencer);
78 }
79
80 //! Constructs an empty sequencer_node that belongs to the same graph g as src.
81 //! Any intermediate state of src, including its links to predecessors and successors, is not copied.
82 //! \brief \ref requirement
83 TEST_CASE("sequencer_node copy constructor"){
84 conformance::sequencer_functor<int> sequencer;
85 conformance::test_copy_ctor_for_buffering_nodes<oneapi::tbb::flow::sequencer_node<int>>(sequencer);
86 }
87
88 //! Test inheritance relations
89 //! \brief \ref interface
90 TEST_CASE("sequencer_node superclasses"){
91 conformance::test_inheritance<oneapi::tbb::flow::sequencer_node<int>, int, int>();
92 conformance::test_inheritance<oneapi::tbb::flow::sequencer_node<void*>, void*, void*>();
93 }
94
//! Test that sequencer_node rejects duplicate sequence numbers
96 //! \brief \ref interface
97 TEST_CASE("sequencer_node rejects duplicate"){
98 oneapi::tbb::flow::graph g;
99 conformance::sequencer_functor<int> sequencer;
100
101 oneapi::tbb::flow::sequencer_node<int> node(g, sequencer);
102
103 node.try_put(1);
104
105 CHECK_MESSAGE((node.try_put(1) == false), "sequencer_node must rejects duplicate sequencer numbers");
106 g.wait_for_all();
107 }
108
//! Test sequencer_node `try_put()` and `try_get()`
110 //! \brief \ref requirement
111 TEST_CASE("queue_node methods"){
112 oneapi::tbb::flow::graph g;
113 conformance::sequencer_functor<int> sequencer;
114
115 oneapi::tbb::flow::sequencer_node<int> node(g, sequencer);
116
117 node.try_put(1);
118 node.try_put(0);
119 node.try_put(1);
120 g.wait_for_all();
121
122 int tmp = -1;
123 CHECK_MESSAGE((node.try_get(tmp) == true), "Getting from sequencer should succeed");
124 CHECK_MESSAGE((tmp == 0), "Received value should be correct");
125
126 tmp = -1;
127 CHECK_MESSAGE((node.try_get(tmp) == true), "Getting from sequencer should succeed");
128 CHECK_MESSAGE((tmp == 1), "Received value should be correct");
129
130 tmp = -1;
131 CHECK_MESSAGE((node.try_get(tmp) == false), "Getting from sequencer should not succeed");
132 }
133
134 //! The example demonstrates ordering capabilities of the sequencer_node.
135 //! While being processed in parallel, the data is passed to the successor node in the exact same order it was read.
136 //! \brief \ref requirement
137 TEST_CASE("sequencer_node ordering"){
138 using namespace oneapi::tbb::flow;
139 using message = conformance::sequencer_functor<int>::seq_message;
140 graph g;
141
142 // Due to parallelism the node can push messages to its successors in any order
__anona3025c380302(message msg) 143 function_node<message, message> process(g, unlimited, [] (message msg) {
144 msg.data++;
145 return msg;
146 });
147
148 sequencer_node<message> ordering(g, conformance::sequencer_functor<int>());
149
150 std::atomic<std::size_t> counter{0};
__anona3025c380402(const message& msg) 151 function_node<message> writer(g, tbb::flow::serial, [&] (const message& msg) {
152 CHECK_MESSAGE((msg.id == counter++), "The data is passed to the successor node in the exact same order it was read");
153 });
154
155 tbb::flow::make_edge(process, ordering);
156 tbb::flow::make_edge(ordering, writer);
157
158 for (std::size_t i = 0; i < 100; ++i) {
159 message msg = {i, 0};
160 process.try_put(msg);
161 }
162
163 g.wait_for_all();
164 }
165
166 #if __TBB_CPP17_INVOKE_PRESENT
167 //! Test that sequencer node uses std::invoke to execute the body
168 //! \brief \ref requirement
169 TEST_CASE("sequencer_node and std::invoke") {
170 using namespace oneapi::tbb::flow;
171
172 graph g;
173
__anona3025c380502(std::size_t x) 174 function_node<std::size_t, test_invoke::SmartID<std::size_t>> starter(g, unlimited, [](std::size_t x) { return test_invoke::SmartID(x); });
175 sequencer_node<test_invoke::SmartID<std::size_t>> seq1(g, &test_invoke::SmartID<std::size_t>::get_id); // Member function
176 sequencer_node<test_invoke::SmartID<std::size_t>> seq2(g, &test_invoke::SmartID<std::size_t>::id); // Member object
177
178 std::size_t expected_item = 0;
179
__anona3025c380602(const test_invoke::SmartID<std::size_t>& x) 180 function_node<test_invoke::SmartID<std::size_t>, std::size_t> check(g, serial, [&](const test_invoke::SmartID<std::size_t>& x) {
181 CHECK(x.id == expected_item);
182 ++expected_item;
183 return x.id;
184 });
185
186 // Build the first graph
187 make_edge(starter, seq1);
188 make_edge(seq1, check);
189
190 std::size_t objects_count = 10;
191 for (std::size_t i = 0; i < objects_count; ++i) {
192 starter.try_put(objects_count - i - 1);
193 }
194
195 g.wait_for_all();
196
197 CHECK(expected_item == objects_count);
198
199 // Rebuild the graph
200 g.reset(reset_flags::rf_clear_edges);
201 make_edge(starter, seq2);
202 make_edge(seq2, check);
203 expected_item = 0;
204
205 for (std::size_t i = 0; i < objects_count; ++i) {
206 starter.try_put(objects_count - i - 1);
207 }
208
209 g.wait_for_all();
210
211 CHECK(expected_item == objects_count);
212 }
213
214 #endif // __TBB_CPP17_INVOKE_PRESENT
215