xref: /oneTBB/test/tbb/test_function_node.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
// TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having these
// parts in all of the tests may mean testing a build of the product that differs from the one that
// is actually released.
26 #define __TBB_EXTRA_DEBUG 1
27 #include "tbb/flow_graph.h"
28 #include "tbb/spin_rw_mutex.h"
29 #include "tbb/global_control.h"
30 
31 #include "common/test.h"
32 #include "common/utils.h"
33 #include "common/graph_utils.h"
34 #include "common/test_follows_and_precedes_api.h"
35 
36 
37 //! \file test_function_node.cpp
38 //! \brief Test for [flow_graph.function_node] specification
39 
40 
41 #define N 100
42 #define MAX_NODES 4
43 
//! Performs test on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit
    2) that the node never rejects
    3) that no items are lost
    and 4) that all of this happens even if there are multiple predecessors and successors
*/
51 
//! Identity body: forwards every message unchanged.
template<typename IO>
struct pass_through {
    IO operator()( const IO& item ) { return item; }
};
56 
//! Drives the buffered (default-policy) function_node test described above.
//! For every concurrency level lc in [1, concurrency], two pass_thru->exe_node
//! chains are built and exercised with 1..MAX_NODES senders and receivers each.
//! NOTE(review): the harness counters (execute_count/current_executors/max_executors)
//! appear to be shared statics per <InputType, OutputType> pair — confirm in
//! common/graph_utils.h before running node pairs concurrently.
template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

   // Do for lc = 1 to concurrency level
   for ( size_t lc = 1; lc <= concurrency; ++lc ) {
   tbb::flow::graph g;

   // Set the execute_counter back to zero in the harness
   harness_graph_executor<InputType, OutputType>::execute_count = 0;
   // Set the number of current executors to zero.
   harness_graph_executor<InputType, OutputType>::current_executors = 0;
   // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
   harness_graph_executor<InputType, OutputType>::max_executors = lc;

   // Create the function_node with the appropriate concurrency level, and use default buffering
   tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
   tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

   // Create a vector of identical exe_nodes and pass_thrus
   std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
   std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
   // Attach each pass_thru to its corresponding exe_node
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
       tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
   }

   // TODO: why the test is executed serially for the node pairs, not concurrently?
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
   // For num_receivers = 1 to MAX_NODES
       for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
           // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
           std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
           for (size_t i = 0; i < num_receivers; i++) {
               receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
           }

           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
           }

           // Do the test with varying numbers of senders
           std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
           for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
               // Create num_senders senders, set their message limit each to N, and connect them to
               // pass_thru_vec[node_idx]
               senders.clear();
               for (size_t s = 0; s < num_senders; ++s ) {
                   senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                   senders.back()->my_limit = N;
                   senders.back()->register_successor(pass_thru_vec[node_idx] );
               }

               // Initialize the receivers so they know how many senders and messages to check for
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->initialize_map( N, num_senders );
               }

               // Do the test: each sender pushes until its limit N is reached
               utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
               g.wait_for_all();

               // confirm that each sender was requested from N times
               for (size_t s = 0; s < num_senders; ++s ) {
                   size_t n = senders[s]->my_received;
                   CHECK( n == N );
                   CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] );
               }
               // validate the receivers
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->validate();
               }
           }
           // Detach the receivers before the final sanity put below.
           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
           }
           CHECK( exe_vec[node_idx].try_put( InputType() ) == true );
           g.wait_for_all();
           for (size_t r = 0; r < num_receivers; ++r ) {
               // since it's detached, nothing should have changed
               receivers[r]->validate();
           }

       } // for num_receivers
    } // for node_idx
    } // for concurrency level lc
}
143 
// Baseline added to both counters so that a reset back to the initial state
// can be told apart from a counter that was simply never incremented.
const size_t Offset = 123;
// Total number of invocations across every copy of inc_functor.
std::atomic<size_t> global_execute_count;

//! Pass-through body that counts invocations both per-copy and globally.
struct inc_functor {
    //! Invocations observed by this particular copy of the body.
    std::atomic<size_t> local_execute_count;

    inc_functor() { local_execute_count = 0; }
    inc_functor( const inc_functor& other ) {
        local_execute_count = size_t(other.local_execute_count);
    }
    void operator=( const inc_functor& other ) {
        local_execute_count = size_t(other.local_execute_count);
    }

    //! Bumps both counters and forwards the input unchanged.
    int operator()( int i ) {
        ++global_execute_count;
        ++local_execute_count;
        return i;
    }
};
161 
//! Variant of buffered_levels that uses a stateful counting functor so that
//! body copying (tbb::flow::copy_body) and body reset (rf_reset_bodies) can be
//! validated in addition to the message-delivery checks.
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Seed both counters with Offset so a later reset back to Offset is observable.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        // The node stores its own copy of cf; the local_execute_count of that
        // copy is what copy_body must report below.
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            // Drive the node with 1..MAX_NODES senders, each limited to N messages.
            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // Each sender must have been drained exactly N times by exe_node.
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            // Detach receivers; a put after detachment must still be accepted
            // but must not be observed by any (now disconnected) receiver.
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct.
        // Per receiver-count iteration (MAX_NODES of them) the sender loop executes
        // sum_{num_senders=1..MAX_NODES} N*num_senders = N*MAX_NODES*(MAX_NODES+1)/2
        // messages, plus one detached try_put; add the Offset baseline:
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK(global_count == expected_count);
        CHECK(global_count == inc_count );
        // rf_reset_bodies must restore the node's body to the originally supplied
        // functor state, i.e. a local count of exactly Offset.
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}
234 
//! Runs the buffered-levels test with every supported kind of body:
//! a lambda, a plain function pointer, a functor object, and finally the
//! stateful counting functor variant (buffered_levels_with_copy).
template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
    buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    buffered_levels_with_copy<InputType,OutputType>( c );
}
242 
243 
//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) the nodes will receive puts from multiple predecessors simultaneously,
    and 4) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
252 
//! Drives the rejecting function_node test described above. The body routes
//! through a mutex-guarded harness function, so holding the mutex keeps all
//! in-flight executions blocked and makes the concurrency limit observable.
template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // rejecting policy: puts beyond the concurrency limit must fail instead
        // of being buffered.
        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK( exe_node.try_put( InputType() ) == true );
                    }
                    // it only accepts to lc level
                    CHECK( exe_node.try_put( InputType() ) == false );

                    // Register senders while the node is saturated; it will pull
                    // from them once the mutex is released.
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        // register a sender
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                // confirm that each receiver got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK( n == num_senders*N+lc );
                    receivers[r]->my_count = 0;
                }
            }
            // After detaching, a put is still accepted but no receiver sees it.
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK( int(receivers[r]->my_count) == 0 );
            }
        }
    }
}
332 
333 
//! Runs the limited-concurrency test with each supported body kind
//! (lambda, function pointer, functor object); every body routes through the
//! spin_rw_mutex-guarded harness function so the test can block executions.
template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
    concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
    concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}
340 
341 
//! Minimal message type: default- or int-constructible (the int is ignored)
//! and convertible back to int, always yielding 0.
struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int /*ignored*/ ) {}
    operator int() { return 0; }
};
347 
348 template< typename InputType >
349 struct parallel_puts : private utils::NoAssign {
350 
351     tbb::flow::receiver< InputType > * const my_exe_node;
352 
353     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
354 
355     void operator()( int ) const  {
356         for ( int i = 0; i < N; ++i ) {
357             // the nodes will accept all puts
358             CHECK( my_exe_node->try_put( InputType() ) == true );
359         }
360     }
361 
362 };
363 
364 //! Performs test on executable nodes with unlimited concurrency
365 /** These tests check:
366     1) that the nodes will accept all puts
367     2) the nodes will receive puts from multiple predecessors simultaneously,
368     and 3) the nodes will send to multiple successors.
369     There is no checking of the contents of the messages for corruption.
370 */
371 
372 template< typename InputType, typename OutputType, typename Body >
373 void unlimited_concurrency( Body body ) {
374 
375     for (unsigned p = 1; p < 2*utils::MaxThread; ++p) {
376         tbb::flow::graph g;
377         tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );
378 
379         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
380 
381             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
382             for (size_t i = 0; i < num_receivers; ++i) {
383                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
384             }
385 
386             harness_graph_executor<InputType, OutputType>::execute_count = 0;
387 
388             for (size_t r = 0; r < num_receivers; ++r ) {
389                 tbb::flow::make_edge( exe_node, *receivers[r] );
390             }
391 
392             utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
393             g.wait_for_all();
394 
395             // 2) the nodes will receive puts from multiple predecessors simultaneously,
396             size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
397             CHECK( ec == p*N );
398             for (size_t r = 0; r < num_receivers; ++r ) {
399                 size_t c = receivers[r]->my_count;
400                 // 3) the nodes will send to multiple successors.
401                 CHECK( c == p*N );
402             }
403             for (size_t r = 0; r < num_receivers; ++r ) {
404                 tbb::flow::remove_edge( exe_node, *receivers[r] );
405             }
406             }
407         }
408     }
409 
//! Runs the unlimited-concurrency test with each supported body kind
//! (lambda, function pointer, functor object).
template< typename InputType, typename OutputType >
void run_unlimited_concurrency() {
    // max_executors = 0: presumably disables the concurrency-limit check in the
    // harness functor — confirm against harness_graph_executor in graph_utils.h.
    harness_graph_executor<InputType, OutputType>::max_executors = 0;
    unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
    unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
}
417 
418 struct continue_msg_to_int {
419     int my_int;
420     continue_msg_to_int(int x) : my_int(x) {}
421     int operator()(tbb::flow::continue_msg) { return my_int; }
422 };
423 
424 void test_function_node_with_continue_msg_as_input() {
425     // If this function terminates, then this test is successful
426     tbb::flow::graph g;
427 
428     tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g);
429 
430     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42));
431     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43));
432 
433     tbb::flow::make_edge( Start, FN1 );
434     tbb::flow::make_edge( Start, FN2 );
435 
436     Start.try_put( tbb::flow::continue_msg() );
437     g.wait_for_all();
438 }
439 
//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    // Cap the scheduler at num_threads workers for the duration of this test.
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads);
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    // Unlimited-concurrency runs cover several input/output type combinations,
    // including a non-assignable message type and continue_msg outputs.
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}
454 
455 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
456 #include <array>
457 #include <vector>
//! Exercises the preview follows()/precedes() constructors of function_node
//! using continue_msg payloads and an identity (pass_through) body.
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;

    std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
    std::vector<msg_t> messages_for_precedes = { msg_t() };

    pass_through<msg_t> pass_msg;

    follows_and_precedes_testing::test_follows
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_follows, tbb::flow::unlimited, pass_msg);
    // The precedes case additionally passes a node priority to cover that
    // constructor overload.
    follows_and_precedes_testing::test_precedes
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1));
}
473 #endif
474 
475 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
476 
// Free function used to check class-template argument deduction from a plain
// function-pointer body; always returns 1.
int function_body_f(const int& /*unused*/) { return 1; }
478 
//! Checks that CTAD deduces function_node<int, int[, Policy]> from each
//! constructor form: plain, with policy, with priority, with both, the
//! preview follows()-based forms, and copy construction.
template <typename Body>
void test_deduction_guides_common(Body body) {
    using namespace tbb::flow;
    graph g;

    function_node f1(g, unlimited, body);
    static_assert(std::is_same_v<decltype(f1), function_node<int, int>>);

    function_node f2(g, unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f2), function_node<int, int, rejecting>>);

    // A priority argument alone must not change the deduced (default) policy.
    function_node f3(g, unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f3), function_node<int, int>>);

    function_node f4(g, unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f4), function_node<int, int, rejecting>>);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    function_node f5(follows(f2), unlimited, body);
    static_assert(std::is_same_v<decltype(f5), function_node<int, int>>);

    function_node f6(follows(f5), unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f6), function_node<int, int, rejecting>>);

    function_node f7(follows(f6), unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f7), function_node<int, int>>);

    function_node f8(follows(f7), unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f8), function_node<int, int, rejecting>>);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    // Copy construction deduces the same specialization as the source node.
    function_node f9(f1);
    static_assert(std::is_same_v<decltype(f9), function_node<int, int>>);
}
513 
//! Instantiates the deduction-guide checks for each body kind:
//! non-mutable lambda, mutable lambda, and plain function.
void test_deduction_guides() {
    test_deduction_guides_common([](const int&)->int { return 1; });
    test_deduction_guides_common([](const int&) mutable ->int { return 1; });
    test_deduction_guides_common(function_body_f);
}
519 
520 #endif
521 
//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test") {
    // Run the full concurrency suite at every thread count in [MinThread, MaxThread].
    for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}
529 
//! Test function_node with the lightweight policy (10 work items)
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
   lightweight_testing::test<tbb::flow::function_node>(10);
}
535 
536 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API (preview feature)
//! \brief \ref error_guessing
TEST_CASE("Flowgraph node set test"){
     test_follows_and_precedes_api();
}
542 #endif
543 
544 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE("Deduction guides test"){
     test_deduction_guides();
}
550 #endif
551 
//! try_release and try_consume test
//! \brief \ref error_guessing
TEST_CASE("try_release try_consume"){
    tbb::flow::graph g;

    // Identity node; no message is ever put, so the node should hold nothing
    // to release or consume.
    tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int&v){return v;});

    CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node");
    CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node");
}
562