151c0b2f7Stbbdev /*
2*a088cfa0SKonstantin Boyarinov     Copyright (c) 2020-2023 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "conformance_flowgraph.h"
22*a088cfa0SKonstantin Boyarinov #include "common/test_invoke.h"
2351c0b2f7Stbbdev 
2451c0b2f7Stbbdev //! \file conformance_async_node.cpp
2551c0b2f7Stbbdev //! \brief Test for [flow_graph.async_node] specification
2651c0b2f7Stbbdev 
27de0109beSIlya Mishin using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true, /*copy_assign*/false>;
28de0109beSIlya Mishin using output_msg = conformance::message</*default_ctor*/false, /*copy_ctor*/false, /*copy_assign*/false>;
2951c0b2f7Stbbdev 
30de0109beSIlya Mishin //! Test async_node constructors
3151c0b2f7Stbbdev //! \brief \ref requirement
32de0109beSIlya Mishin TEST_CASE("async_node constructors"){
33de0109beSIlya Mishin     using namespace oneapi::tbb::flow;
34de0109beSIlya Mishin     graph g;
3551c0b2f7Stbbdev 
36de0109beSIlya Mishin     conformance::dummy_functor<int> fun;
37de0109beSIlya Mishin 
38de0109beSIlya Mishin     async_node<int, int> fn1(g, unlimited, fun);
39de0109beSIlya Mishin     async_node<int, int> fn2(g, unlimited, fun, oneapi::tbb::flow::node_priority_t(1));
40de0109beSIlya Mishin 
41de0109beSIlya Mishin     async_node<int, int, lightweight> lw_node1(g, serial, fun, lightweight());
42de0109beSIlya Mishin     async_node<int, int, lightweight> lw_node2(g, serial, fun, lightweight(), oneapi::tbb::flow::node_priority_t(1));
4351c0b2f7Stbbdev }
4451c0b2f7Stbbdev 
45de0109beSIlya Mishin //! Test buffering property
46de0109beSIlya Mishin //! \brief \ref requirement
47de0109beSIlya Mishin TEST_CASE("async_node buffering") {
48de0109beSIlya Mishin     conformance::dummy_functor<int> fun;
49de0109beSIlya Mishin     conformance::test_buffering<oneapi::tbb::flow::async_node<input_msg, int>, input_msg>(oneapi::tbb::flow::unlimited, fun);
5051c0b2f7Stbbdev }
5151c0b2f7Stbbdev 
52de0109beSIlya Mishin //! Test priorities work in single-threaded configuration
53de0109beSIlya Mishin //! \brief \ref requirement
54de0109beSIlya Mishin TEST_CASE("async_node priority support"){
55de0109beSIlya Mishin     conformance::test_priority<oneapi::tbb::flow::async_node<input_msg, int>, input_msg>(oneapi::tbb::flow::unlimited);
56de0109beSIlya Mishin }
57de0109beSIlya Mishin 
58de0109beSIlya Mishin //! The node that is constructed has a reference to the same graph object as src, has a copy of the initial body used by src, and has the same concurrency threshold as src.
59de0109beSIlya Mishin //! The predecessors and successors of src are not copied.
60de0109beSIlya Mishin //! \brief \ref requirement
61de0109beSIlya Mishin TEST_CASE("async_node copy constructor"){
62de0109beSIlya Mishin     conformance::test_copy_ctor<oneapi::tbb::flow::async_node<int, int>>();
6351c0b2f7Stbbdev }
6451c0b2f7Stbbdev 
6551c0b2f7Stbbdev //! Test calling async body
6651c0b2f7Stbbdev //! \brief \ref interface \ref requirement
6751c0b2f7Stbbdev TEST_CASE("Test async_node body") {
68de0109beSIlya Mishin     conformance::test_body_exec<oneapi::tbb::flow::async_node<input_msg, output_msg>, input_msg, output_msg>(oneapi::tbb::flow::unlimited);
6951c0b2f7Stbbdev }
7051c0b2f7Stbbdev 
7151c0b2f7Stbbdev //! Test async_node inheritance relations
7251c0b2f7Stbbdev //! \brief \ref interface
7351c0b2f7Stbbdev TEST_CASE("async_node superclasses"){
74de0109beSIlya Mishin     conformance::test_inheritance<oneapi::tbb::flow::async_node<int, int>, int, int>();
75de0109beSIlya Mishin     conformance::test_inheritance<oneapi::tbb::flow::async_node<void*, float>, void*, float>();
76de0109beSIlya Mishin     conformance::test_inheritance<oneapi::tbb::flow::async_node<input_msg, output_msg>, input_msg, output_msg>();
77de0109beSIlya Mishin }
78de0109beSIlya Mishin 
79de0109beSIlya Mishin //! Test node broadcast messages to successors
80de0109beSIlya Mishin //! \brief \ref requirement
81de0109beSIlya Mishin TEST_CASE("async_node broadcast"){
82de0109beSIlya Mishin     conformance::counting_functor<int> fun(conformance::expected);
83de0109beSIlya Mishin     conformance::test_forwarding<oneapi::tbb::flow::async_node<input_msg, int>, input_msg, int>(1, oneapi::tbb::flow::unlimited, fun);
84de0109beSIlya Mishin }
85de0109beSIlya Mishin 
86de0109beSIlya Mishin //! Test async_node has a user-settable concurrency limit. It can be set to one of predefined values.
87de0109beSIlya Mishin //! The user can also provide a value of type std::size_t to limit concurrency.
88de0109beSIlya Mishin //! Test that not more than limited threads works in parallel.
89de0109beSIlya Mishin //! \brief \ref requirement
90de0109beSIlya Mishin TEST_CASE("concurrency follows set limits"){
91de0109beSIlya Mishin     conformance::test_concurrency<oneapi::tbb::flow::async_node<int, int>>();
92de0109beSIlya Mishin }
93de0109beSIlya Mishin 
94de0109beSIlya Mishin //! Test body copying and copy_body logic
95de0109beSIlya Mishin //! Test the body object passed to a node is copied
96de0109beSIlya Mishin //! \brief \ref interface
97de0109beSIlya Mishin TEST_CASE("async_node body copying"){
98de0109beSIlya Mishin     conformance::test_copy_body_function<oneapi::tbb::flow::async_node<int, int>, conformance::copy_counting_object<int>>(oneapi::tbb::flow::unlimited);
99de0109beSIlya Mishin }
100de0109beSIlya Mishin 
101de0109beSIlya Mishin //! Test node reject the incoming message if the concurrency limit achieved.
102de0109beSIlya Mishin //! \brief \ref interface
103de0109beSIlya Mishin TEST_CASE("async_node with rejecting policy"){
104de0109beSIlya Mishin     conformance::test_rejecting<oneapi::tbb::flow::async_node<int, int, oneapi::tbb::flow::rejecting>>();
105de0109beSIlya Mishin }
106de0109beSIlya Mishin 
107de0109beSIlya Mishin //! Test node Input class meet the DefaultConstructible and CopyConstructible requirements and Output class meet the CopyConstructible requirements.
108de0109beSIlya Mishin //! \brief \ref interface \ref requirement
109de0109beSIlya Mishin TEST_CASE("Test async_node Output and Input class") {
110de0109beSIlya Mishin     using Body = conformance::copy_counting_object<int>;
111de0109beSIlya Mishin     conformance::test_output_input_class<oneapi::tbb::flow::async_node<Body, Body>, Body>();
112de0109beSIlya Mishin }
113de0109beSIlya Mishin 
114de0109beSIlya Mishin //! Test the body of assync_node typically submits the messages to an external activity for processing outside of the graph.
115de0109beSIlya Mishin //! \brief \ref interface
116de0109beSIlya Mishin TEST_CASE("async_node with rejecting policy"){
117de0109beSIlya Mishin     using async_node_type = tbb::flow::async_node<int, int>;
118de0109beSIlya Mishin     using gateway_type = async_node_type::gateway_type;
119de0109beSIlya Mishin 
120de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
121de0109beSIlya Mishin     std::atomic<bool> flag{false};
122de0109beSIlya Mishin     std::thread thr;
123de0109beSIlya Mishin     async_node_type testing_node{
124de0109beSIlya Mishin       g, tbb::flow::unlimited,
__anon83a565ab0102() 125de0109beSIlya Mishin       [&](const int& input, gateway_type& gateway) {
126de0109beSIlya Mishin           gateway.reserve_wait();
127de0109beSIlya Mishin           thr = std::thread{[&]{
128de0109beSIlya Mishin               flag = true;
129de0109beSIlya Mishin               gateway.try_put(input);
130de0109beSIlya Mishin               gateway.release_wait();
131de0109beSIlya Mishin           }};
132de0109beSIlya Mishin       }
133de0109beSIlya Mishin     };
134de0109beSIlya Mishin 
135de0109beSIlya Mishin     testing_node.try_put(1);
136de0109beSIlya Mishin     g.wait_for_all();
137de0109beSIlya Mishin     CHECK_MESSAGE((flag.load()), "The body of assync_node must submits the messages to an external activity for processing outside of the graph");
138de0109beSIlya Mishin     thr.join();
13951c0b2f7Stbbdev }
140*a088cfa0SKonstantin Boyarinov 
141*a088cfa0SKonstantin Boyarinov #if __TBB_CPP17_INVOKE_PRESENT
142*a088cfa0SKonstantin Boyarinov //! Test that async_node uses std::invoke to run the body
143*a088cfa0SKonstantin Boyarinov //! \brief \ref requirement
144*a088cfa0SKonstantin Boyarinov TEST_CASE("async_node and std::invoke") {
145*a088cfa0SKonstantin Boyarinov     using namespace oneapi::tbb::flow;
146*a088cfa0SKonstantin Boyarinov 
147*a088cfa0SKonstantin Boyarinov     using start_node_type = function_node<std::size_t, test_invoke::SmartID<std::size_t>>;
148*a088cfa0SKonstantin Boyarinov     using async_node_type = async_node<test_invoke::SmartID<std::size_t>, std::size_t>;
149*a088cfa0SKonstantin Boyarinov 
150*a088cfa0SKonstantin Boyarinov     auto async_body = &test_invoke::SmartID<std::size_t>::template send_id_to_gateway<typename async_node_type::gateway_type>;
151*a088cfa0SKonstantin Boyarinov 
152*a088cfa0SKonstantin Boyarinov     graph g;
__anon83a565ab0302(std::size_t i) 153*a088cfa0SKonstantin Boyarinov     start_node_type starter(g, serial, [](std::size_t i) -> test_invoke::SmartID<std::size_t> { return {i}; });
154*a088cfa0SKonstantin Boyarinov     async_node_type activity_submitter(g, serial, async_body);
155*a088cfa0SKonstantin Boyarinov     buffer_node<std::size_t> buf(g);
156*a088cfa0SKonstantin Boyarinov 
157*a088cfa0SKonstantin Boyarinov     make_edge(starter, activity_submitter);
158*a088cfa0SKonstantin Boyarinov     make_edge(activity_submitter, buf);
159*a088cfa0SKonstantin Boyarinov 
160*a088cfa0SKonstantin Boyarinov     starter.try_put(1);
161*a088cfa0SKonstantin Boyarinov 
162*a088cfa0SKonstantin Boyarinov     g.wait_for_all();
163*a088cfa0SKonstantin Boyarinov     std::size_t result = 0;
164*a088cfa0SKonstantin Boyarinov     CHECK(buf.try_get(result));
165*a088cfa0SKonstantin Boyarinov     CHECK(result == 1);
166*a088cfa0SKonstantin Boyarinov     CHECK(!buf.try_get(result));
167*a088cfa0SKonstantin Boyarinov }
168*a088cfa0SKonstantin Boyarinov #endif
169