1 /*
2     Copyright (c) 2020-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "conformance_flowgraph.h"
22 
23 //! \file conformance_async_node.cpp
24 //! \brief Test for [flow_graph.async_node] specification
25 
26 using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true, /*copy_assign*/false>;
27 using output_msg = conformance::message</*default_ctor*/false, /*copy_ctor*/false, /*copy_assign*/false>;
28 
29 //! Test async_node constructors
30 //! \brief \ref requirement
31 TEST_CASE("async_node constructors"){
32     using namespace oneapi::tbb::flow;
33     graph g;
34 
35     conformance::dummy_functor<int> fun;
36 
37     async_node<int, int> fn1(g, unlimited, fun);
38     async_node<int, int> fn2(g, unlimited, fun, oneapi::tbb::flow::node_priority_t(1));
39 
40     async_node<int, int, lightweight> lw_node1(g, serial, fun, lightweight());
41     async_node<int, int, lightweight> lw_node2(g, serial, fun, lightweight(), oneapi::tbb::flow::node_priority_t(1));
42 }
43 
44 //! Test buffering property
45 //! \brief \ref requirement
46 TEST_CASE("async_node buffering") {
47     conformance::dummy_functor<int> fun;
48     conformance::test_buffering<oneapi::tbb::flow::async_node<input_msg, int>, input_msg>(oneapi::tbb::flow::unlimited, fun);
49 }
50 
51 //! Test priorities work in single-threaded configuration
52 //! \brief \ref requirement
53 TEST_CASE("async_node priority support"){
54     conformance::test_priority<oneapi::tbb::flow::async_node<input_msg, int>, input_msg>(oneapi::tbb::flow::unlimited);
55 }
56 
57 //! The node that is constructed has a reference to the same graph object as src, has a copy of the initial body used by src, and has the same concurrency threshold as src.
58 //! The predecessors and successors of src are not copied.
59 //! \brief \ref requirement
60 TEST_CASE("async_node copy constructor"){
61     conformance::test_copy_ctor<oneapi::tbb::flow::async_node<int, int>>();
62 }
63 
64 //! Test calling async body
65 //! \brief \ref interface \ref requirement
66 TEST_CASE("Test async_node body") {
67     conformance::test_body_exec<oneapi::tbb::flow::async_node<input_msg, output_msg>, input_msg, output_msg>(oneapi::tbb::flow::unlimited);
68 }
69 
70 //! Test async_node inheritance relations
71 //! \brief \ref interface
72 TEST_CASE("async_node superclasses"){
73     conformance::test_inheritance<oneapi::tbb::flow::async_node<int, int>, int, int>();
74     conformance::test_inheritance<oneapi::tbb::flow::async_node<void*, float>, void*, float>();
75     conformance::test_inheritance<oneapi::tbb::flow::async_node<input_msg, output_msg>, input_msg, output_msg>();
76 }
77 
78 //! Test node broadcast messages to successors
79 //! \brief \ref requirement
80 TEST_CASE("async_node broadcast"){
81     conformance::counting_functor<int> fun(conformance::expected);
82     conformance::test_forwarding<oneapi::tbb::flow::async_node<input_msg, int>, input_msg, int>(1, oneapi::tbb::flow::unlimited, fun);
83 }
84 
85 //! Test async_node has a user-settable concurrency limit. It can be set to one of predefined values.
86 //! The user can also provide a value of type std::size_t to limit concurrency.
87 //! Test that not more than limited threads works in parallel.
88 //! \brief \ref requirement
89 TEST_CASE("concurrency follows set limits"){
90     conformance::test_concurrency<oneapi::tbb::flow::async_node<int, int>>();
91 }
92 
93 //! Test body copying and copy_body logic
94 //! Test the body object passed to a node is copied
95 //! \brief \ref interface
96 TEST_CASE("async_node body copying"){
97     conformance::test_copy_body_function<oneapi::tbb::flow::async_node<int, int>, conformance::copy_counting_object<int>>(oneapi::tbb::flow::unlimited);
98 }
99 
100 //! Test node reject the incoming message if the concurrency limit achieved.
101 //! \brief \ref interface
102 TEST_CASE("async_node with rejecting policy"){
103     conformance::test_rejecting<oneapi::tbb::flow::async_node<int, int, oneapi::tbb::flow::rejecting>>();
104 }
105 
106 //! Test node Input class meet the DefaultConstructible and CopyConstructible requirements and Output class meet the CopyConstructible requirements.
107 //! \brief \ref interface \ref requirement
108 TEST_CASE("Test async_node Output and Input class") {
109     using Body = conformance::copy_counting_object<int>;
110     conformance::test_output_input_class<oneapi::tbb::flow::async_node<Body, Body>, Body>();
111 }
112 
113 //! Test the body of assync_node typically submits the messages to an external activity for processing outside of the graph.
114 //! \brief \ref interface
115 TEST_CASE("async_node with rejecting policy"){
116     using async_node_type = tbb::flow::async_node<int, int>;
117     using gateway_type = async_node_type::gateway_type;
118 
119     oneapi::tbb::flow::graph g;
120     std::atomic<bool> flag{false};
121     std::thread thr;
122     async_node_type testing_node{
123       g, tbb::flow::unlimited,
124       [&](const int& input, gateway_type& gateway) {
125           gateway.reserve_wait();
126           thr = std::thread{[&]{
127               flag = true;
128               gateway.try_put(input);
129               gateway.release_wait();
130           }};
131       }
132     };
133 
134     testing_node.try_put(1);
135     g.wait_for_all();
136     CHECK_MESSAGE((flag.load()), "The body of assync_node must submits the messages to an external activity for processing outside of the graph");
137     thr.join();
138 }
139