1 /*
2     Copyright (c) 2020-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #define SEQUENCER_NODE
22 
23 #include "conformance_flowgraph.h"
24 #include "common/test_invoke.h"
25 
26 //! \file conformance_sequencer_node.cpp
27 //! \brief Test for [flow_graph.sequencer_node] specification
28 
29 
30 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
31 template <typename Body>
test_deduction_guides_common(Body body)32 void test_deduction_guides_common(Body body) {
33     using namespace tbb::flow;
34     graph g;
35     broadcast_node<int> br(g);
36 
37     sequencer_node s1(g, body);
38     static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>);
39 
40 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
41     sequencer_node s2(follows(br), body);
42     static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>);
43 #endif
44 
45     sequencer_node s3(s1);
46     static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>);
47 }
48 
// Free-function sequencer body for the deduction guide test; returns 1 for any input.
std::size_t sequencer_body_f(const int& /*unused*/) {
    return std::size_t{1};
}
50 
test_deduction_guides()51 void test_deduction_guides() {
52     test_deduction_guides_common([](const int&)->std::size_t { return 1; });
53     test_deduction_guides_common([](const int&) mutable ->std::size_t { return 1; });
54     test_deduction_guides_common(sequencer_body_f);
55 }
56 #endif
57 
58 //! Test deduction guides
59 //! \brief \ref interface \ref requirement
60 TEST_CASE("Deduction guides"){
61 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
62     test_deduction_guides();
63 #endif
64 }
65 
66 //! Test sequencer_node single_push
67 //! \brief \ref requirement
68 TEST_CASE("sequencer_node single_push"){
69     conformance::sequencer_functor<int> sequencer;
70     conformance::test_forwarding_single_push<oneapi::tbb::flow::sequencer_node<int>>(sequencer);
71 }
72 
//! Test sequencer_node buffering
74 //! \brief \ref requirement
75 TEST_CASE("sequencer_node buffering"){
76     conformance::sequencer_functor<int> sequencer;
77     conformance::test_buffering<oneapi::tbb::flow::sequencer_node<int>, int>(sequencer);
78 }
79 
80 //! Constructs an empty sequencer_node that belongs to the same graph g as src.
81 //! Any intermediate state of src, including its links to predecessors and successors, is not copied.
82 //! \brief \ref requirement
83 TEST_CASE("sequencer_node copy constructor"){
84     conformance::sequencer_functor<int> sequencer;
85     conformance::test_copy_ctor_for_buffering_nodes<oneapi::tbb::flow::sequencer_node<int>>(sequencer);
86 }
87 
88 //! Test inheritance relations
89 //! \brief \ref interface
90 TEST_CASE("sequencer_node superclasses"){
91     conformance::test_inheritance<oneapi::tbb::flow::sequencer_node<int>, int, int>();
92     conformance::test_inheritance<oneapi::tbb::flow::sequencer_node<void*>, void*, void*>();
93 }
94 
//! Test that the sequencer_node rejects duplicate sequencer numbers
96 //! \brief \ref interface
97 TEST_CASE("sequencer_node rejects duplicate"){
98     oneapi::tbb::flow::graph g;
99     conformance::sequencer_functor<int> sequencer;
100 
101     oneapi::tbb::flow::sequencer_node<int> node(g, sequencer);
102 
103     node.try_put(1);
104 
105     CHECK_MESSAGE((node.try_put(1) == false), "sequencer_node must rejects duplicate sequencer numbers");
106     g.wait_for_all();
107 }
108 
109 //! Test queue_node node `try_put()` and `try_get()`
110 //! \brief \ref requirement
111 TEST_CASE("queue_node methods"){
112     oneapi::tbb::flow::graph g;
113     conformance::sequencer_functor<int> sequencer;
114 
115     oneapi::tbb::flow::sequencer_node<int> node(g, sequencer);
116 
117     node.try_put(1);
118     node.try_put(0);
119     node.try_put(1);
120     g.wait_for_all();
121 
122     int tmp = -1;
123     CHECK_MESSAGE((node.try_get(tmp) == true), "Getting from sequencer should succeed");
124     CHECK_MESSAGE((tmp == 0), "Received value should be correct");
125 
126     tmp = -1;
127     CHECK_MESSAGE((node.try_get(tmp) == true), "Getting from sequencer should succeed");
128     CHECK_MESSAGE((tmp == 1), "Received value should be correct");
129 
130     tmp = -1;
131     CHECK_MESSAGE((node.try_get(tmp) == false), "Getting from sequencer should not succeed");
132 }
133 
134 //! The example demonstrates ordering capabilities of the sequencer_node.
135 //! While being processed in parallel, the data is passed to the successor node in the exact same order it was read.
136 //! \brief \ref requirement
137 TEST_CASE("sequencer_node ordering"){
138     using namespace oneapi::tbb::flow;
139     using message = conformance::sequencer_functor<int>::seq_message;
140     graph g;
141 
142     // Due to parallelism the node can push messages to its successors in any order
__anona3025c380302(message msg) 143     function_node<message, message> process(g, unlimited, [] (message msg) {
144         msg.data++;
145         return msg;
146     });
147 
148     sequencer_node<message> ordering(g, conformance::sequencer_functor<int>());
149 
150     std::atomic<std::size_t> counter{0};
__anona3025c380402(const message& msg) 151     function_node<message> writer(g, tbb::flow::serial, [&] (const message& msg) {
152         CHECK_MESSAGE((msg.id == counter++), "The data is passed to the successor node in the exact same order it was read");
153     });
154 
155     tbb::flow::make_edge(process, ordering);
156     tbb::flow::make_edge(ordering, writer);
157 
158     for (std::size_t i = 0; i < 100; ++i) {
159         message msg = {i, 0};
160         process.try_put(msg);
161     }
162 
163     g.wait_for_all();
164 }
165 
166 #if __TBB_CPP17_INVOKE_PRESENT
167 //! Test that sequencer node uses std::invoke to execute the body
168 //! \brief \ref requirement
169 TEST_CASE("sequencer_node and std::invoke") {
170     using namespace oneapi::tbb::flow;
171 
172     graph g;
173 
__anona3025c380502(std::size_t x) 174     function_node<std::size_t, test_invoke::SmartID<std::size_t>> starter(g, unlimited, [](std::size_t x) { return test_invoke::SmartID(x); });
175     sequencer_node<test_invoke::SmartID<std::size_t>> seq1(g, &test_invoke::SmartID<std::size_t>::get_id); // Member function
176     sequencer_node<test_invoke::SmartID<std::size_t>> seq2(g, &test_invoke::SmartID<std::size_t>::id); // Member object
177 
178     std::size_t expected_item = 0;
179 
__anona3025c380602(const test_invoke::SmartID<std::size_t>& x) 180     function_node<test_invoke::SmartID<std::size_t>, std::size_t> check(g, serial, [&](const test_invoke::SmartID<std::size_t>& x) {
181         CHECK(x.id == expected_item);
182         ++expected_item;
183         return x.id;
184     });
185 
186     // Build the first graph
187     make_edge(starter, seq1);
188     make_edge(seq1, check);
189 
190     std::size_t objects_count = 10;
191     for (std::size_t i = 0; i < objects_count; ++i) {
192         starter.try_put(objects_count - i - 1);
193     }
194 
195     g.wait_for_all();
196 
197     CHECK(expected_item == objects_count);
198 
199     // Rebuild the graph
200     g.reset(reset_flags::rf_clear_edges);
201     make_edge(starter, seq2);
202     make_edge(seq2, check);
203     expected_item = 0;
204 
205     for (std::size_t i = 0; i < objects_count; ++i) {
206         starter.try_put(objects_count - i - 1);
207     }
208 
209     g.wait_for_all();
210 
211     CHECK(expected_item == objects_count);
212 }
213 
214 #endif // __TBB_CPP17_INVOKE_PRESENT
215