151c0b2f7Stbbdev /*
251c0b2f7Stbbdev     Copyright (c) 2020 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev 
1851c0b2f7Stbbdev #include "common/test.h"
1951c0b2f7Stbbdev 
2051c0b2f7Stbbdev #include "common/utils.h"
2151c0b2f7Stbbdev #include "common/graph_utils.h"
2251c0b2f7Stbbdev 
23*49e08aacStbbdev #include "oneapi/tbb/flow_graph.h"
24*49e08aacStbbdev #include "oneapi/tbb/task_arena.h"
25*49e08aacStbbdev #include "oneapi/tbb/global_control.h"
2651c0b2f7Stbbdev 
2751c0b2f7Stbbdev #include "conformance_flowgraph.h"
2851c0b2f7Stbbdev 
2951c0b2f7Stbbdev //! \file conformance_async_node.cpp
3051c0b2f7Stbbdev //! \brief Test for [flow_graph.async_node] specification
3151c0b2f7Stbbdev 
3251c0b2f7Stbbdev /*
3351c0b2f7Stbbdev TODO: implement missing conformance tests for async_node:
3451c0b2f7Stbbdev   - [ ] Write `test_forwarding()'.
3551c0b2f7Stbbdev   - [ ] Improve test of the node's copy-constructor.
3651c0b2f7Stbbdev   - [ ] Write `test_priority'.
3751c0b2f7Stbbdev   - [ ] Rename `test_discarding' to `test_buffering'.
3851c0b2f7Stbbdev   - [ ] Write inheritance test.
3951c0b2f7Stbbdev   - [ ] Constructor with explicitly passed Policy parameter.
4051c0b2f7Stbbdev   - [ ] Concurrency testing of the node: make a loop over possible concurrency levels. It is
41*49e08aacStbbdev     important to test at least on five values: 1, oneapi::tbb::flow::serial, `max_allowed_parallelism'
42*49e08aacStbbdev     obtained from `oneapi::tbb::global_control', `oneapi::tbb::flow::unlimited', and, if `max allowed
4351c0b2f7Stbbdev     parallelism' is > 2, use something in the middle of the [1, max_allowed_parallelism]
4451c0b2f7Stbbdev     interval. Use `utils::ExactConcurrencyLevel' entity (extending it if necessary).
4551c0b2f7Stbbdev   - [ ] Write `test_rejecting', where avoid dependency on OS scheduling of the threads; add check
4651c0b2f7Stbbdev     that `try_put()' returns `false'
4751c0b2f7Stbbdev   - [ ] The `copy_body' function copies altered body (e.g. after successful `try_put()' call).
4851c0b2f7Stbbdev   - [ ] The copy constructor and copy assignment are called for the node's input and output types.
4951c0b2f7Stbbdev   - [ ] Add CTAD test.
5051c0b2f7Stbbdev */
5151c0b2f7Stbbdev 
5251c0b2f7Stbbdev template<typename I, typename O>
5351c0b2f7Stbbdev void test_inheritance(){
54*49e08aacStbbdev     using namespace oneapi::tbb::flow;
5551c0b2f7Stbbdev 
5651c0b2f7Stbbdev     CHECK_MESSAGE( (std::is_base_of<graph_node, async_node<I, O>>::value), "async_node should be derived from graph_node");
5751c0b2f7Stbbdev     CHECK_MESSAGE( (std::is_base_of<receiver<I>, async_node<I, O>>::value), "async_node should be derived from receiver<Input>");
5851c0b2f7Stbbdev     CHECK_MESSAGE( (std::is_base_of<sender<O>, async_node<I, O>>::value), "async_node should be derived from sender<Output>");
5951c0b2f7Stbbdev }
6051c0b2f7Stbbdev 
6151c0b2f7Stbbdev template< typename OutputType >
6251c0b2f7Stbbdev struct as_inc_functor {
6351c0b2f7Stbbdev     std::thread my_thread;
6451c0b2f7Stbbdev 
6551c0b2f7Stbbdev     std::atomic<size_t>& local_execute_count;
6651c0b2f7Stbbdev 
6751c0b2f7Stbbdev     as_inc_functor(std::atomic<size_t>& execute_count ) :
6851c0b2f7Stbbdev         local_execute_count (execute_count)
6951c0b2f7Stbbdev     {  }
7051c0b2f7Stbbdev 
7151c0b2f7Stbbdev     as_inc_functor( const as_inc_functor &f ) : local_execute_count(f.local_execute_count) { }
7251c0b2f7Stbbdev     void operator=(const as_inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }
7351c0b2f7Stbbdev 
74*49e08aacStbbdev     void operator()( int num , oneapi::tbb::flow::async_node<int, int>::gateway_type& g) {
7551c0b2f7Stbbdev         ++local_execute_count;
7651c0b2f7Stbbdev         g.try_put(num);
7751c0b2f7Stbbdev         //    my_thread = std::thread([&](){
7851c0b2f7Stbbdev         //                                g.try_put(num);
7951c0b2f7Stbbdev         //                            });
8051c0b2f7Stbbdev     }
8151c0b2f7Stbbdev 
8251c0b2f7Stbbdev };
8351c0b2f7Stbbdev 
8451c0b2f7Stbbdev void test_async_body(){
85*49e08aacStbbdev     oneapi::tbb::flow::graph g;
8651c0b2f7Stbbdev 
8751c0b2f7Stbbdev     std::atomic<size_t> local_count(0);
8851c0b2f7Stbbdev     as_inc_functor<int> fun(local_count);
8951c0b2f7Stbbdev 
90*49e08aacStbbdev     oneapi::tbb::flow::async_node<int, int> node1(g, oneapi::tbb::flow::unlimited, fun);
9151c0b2f7Stbbdev 
9251c0b2f7Stbbdev     const size_t n = 10;
9351c0b2f7Stbbdev     for(size_t i = 0; i < n; ++i) {
9451c0b2f7Stbbdev         CHECK_MESSAGE((node1.try_put(1) == true), "try_put needs to return true");
9551c0b2f7Stbbdev     }
9651c0b2f7Stbbdev 
9751c0b2f7Stbbdev     //fun.my_thread.join();
9851c0b2f7Stbbdev     g.wait_for_all();
9951c0b2f7Stbbdev 
10051c0b2f7Stbbdev     CHECK_MESSAGE( (fun.local_execute_count.load() == n), "Body of the node needs to be executed N times");
10151c0b2f7Stbbdev }
10251c0b2f7Stbbdev 
10351c0b2f7Stbbdev void test_copy(){
104*49e08aacStbbdev     oneapi::tbb::flow::graph g;
10551c0b2f7Stbbdev     std::atomic<size_t> local_count(0);
10651c0b2f7Stbbdev     as_inc_functor<int> fun(local_count);
10751c0b2f7Stbbdev 
108*49e08aacStbbdev     oneapi::tbb::flow::async_node<int, int> node1(g, oneapi::tbb::flow::unlimited, fun);
109*49e08aacStbbdev     oneapi::tbb::flow::async_node<int, int> node2(node1);
11051c0b2f7Stbbdev }
11151c0b2f7Stbbdev 
11251c0b2f7Stbbdev void test_priority(){
113*49e08aacStbbdev     oneapi::tbb::flow::graph g;
11451c0b2f7Stbbdev     std::atomic<size_t> local_count(0);
11551c0b2f7Stbbdev     as_inc_functor<int> fun(local_count);
11651c0b2f7Stbbdev 
117*49e08aacStbbdev     oneapi::tbb::flow::async_node<int, int> node1(g, oneapi::tbb::flow::unlimited, fun, oneapi::tbb::flow::no_priority);
11851c0b2f7Stbbdev }
11951c0b2f7Stbbdev 
12051c0b2f7Stbbdev void test_discarding(){
121*49e08aacStbbdev     oneapi::tbb::flow::graph g;
12251c0b2f7Stbbdev 
12351c0b2f7Stbbdev     std::atomic<size_t> local_count(0);
12451c0b2f7Stbbdev     as_inc_functor<int> fun(local_count);
12551c0b2f7Stbbdev 
126*49e08aacStbbdev     oneapi::tbb::flow::async_node<int, int> node1(g, oneapi::tbb::flow::unlimited, fun);
12751c0b2f7Stbbdev 
128*49e08aacStbbdev     oneapi::tbb::flow::limiter_node< int > rejecter1( g,0);
129*49e08aacStbbdev     oneapi::tbb::flow::limiter_node< int > rejecter2( g,0);
13051c0b2f7Stbbdev 
13151c0b2f7Stbbdev     make_edge(node1, rejecter2);
13251c0b2f7Stbbdev     make_edge(node1, rejecter1);
13351c0b2f7Stbbdev 
13451c0b2f7Stbbdev     node1.try_put(1);
13551c0b2f7Stbbdev 
13651c0b2f7Stbbdev     int tmp = -1;
13751c0b2f7Stbbdev     CHECK_MESSAGE((node1.try_get(tmp) == false), "Value should be discarded after rejection");
13851c0b2f7Stbbdev 
13951c0b2f7Stbbdev     g.wait_for_all();
14051c0b2f7Stbbdev }
14151c0b2f7Stbbdev 
14251c0b2f7Stbbdev //! Test discarding property
14351c0b2f7Stbbdev //! \brief \ref requirement
14451c0b2f7Stbbdev TEST_CASE("async_node discarding") {
14551c0b2f7Stbbdev     test_discarding();
14651c0b2f7Stbbdev 
14751c0b2f7Stbbdev }
14851c0b2f7Stbbdev 
14951c0b2f7Stbbdev //! Test async_node priority interface
15051c0b2f7Stbbdev //! \brief \ref interface
15151c0b2f7Stbbdev TEST_CASE("async_node priority interface"){
15251c0b2f7Stbbdev     test_priority();
15351c0b2f7Stbbdev }
15451c0b2f7Stbbdev 
15551c0b2f7Stbbdev //! Test async_node copy
15651c0b2f7Stbbdev //! \brief \ref interface
15751c0b2f7Stbbdev TEST_CASE("async_node copy"){
15851c0b2f7Stbbdev     test_copy();
15951c0b2f7Stbbdev }
16051c0b2f7Stbbdev 
16151c0b2f7Stbbdev //! Test calling async body
16251c0b2f7Stbbdev //! \brief \ref interface \ref requirement
16351c0b2f7Stbbdev TEST_CASE("Test async_node body") {
16451c0b2f7Stbbdev     test_async_body();
16551c0b2f7Stbbdev }
16651c0b2f7Stbbdev 
16751c0b2f7Stbbdev //! Test async_node inheritance relations
16851c0b2f7Stbbdev //! \brief \ref interface
16951c0b2f7Stbbdev TEST_CASE("async_node superclasses"){
17051c0b2f7Stbbdev     test_inheritance<int, int>();
17151c0b2f7Stbbdev     test_inheritance<void*, float>();
17251c0b2f7Stbbdev }
173