xref: /oneTBB/test/tbb/test_function_node.cpp (revision 51c0b2f7)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
// TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having these
// parts in all tests means testing a build of the product that differs from what is actually
// released.
22 #define __TBB_EXTRA_DEBUG 1
23 #include "tbb/flow_graph.h"
24 #include "tbb/spin_rw_mutex.h"
25 #include "tbb/global_control.h"
26 
27 #include "common/test.h"
28 #include "common/utils.h"
29 #include "common/graph_utils.h"
30 #include "common/test_follows_and_precedes_api.h"
31 
32 
33 //! \file test_function_node.cpp
34 //! \brief Test for [flow_graph.function_node] specification
35 
36 
37 #define N 100
38 #define MAX_NODES 4
39 
//! Performs test on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) that all of this happens even if there are multiple predecessors and successors
*/
47 
//! Identity functor: forwards its argument unchanged.
//! Serves as a trivial body for function_node instances in these tests.
template<typename IO>
struct pass_through {
    IO operator()(const IO& item) { return item; }
};
52 
//! Core driver for the buffered (default queueing) function_node tests.
/** For each concurrency limit lc in [1, concurrency], builds two identical
    pass_thru -> exe_node chains and, for every combination of 1..MAX_NODES
    receivers and senders, pushes N items from each sender concurrently and
    validates that no item is lost and the concurrency limit is respected
    (the limit check itself lives in harness_graph_executor's functor). */
template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

   // Do for lc = 1 to concurrency level
   for ( size_t lc = 1; lc <= concurrency; ++lc ) {
   tbb::flow::graph g;

   // Set the execute_counter back to zero in the harness
   harness_graph_executor<InputType, OutputType>::execute_count = 0;
   // Set the number of current executors to zero.
   harness_graph_executor<InputType, OutputType>::current_executors = 0;
   // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
   harness_graph_executor<InputType, OutputType>::max_executors = lc;

   // Create the function_node with the appropriate concurrency level, and use default buffering
   tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
   tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

   // Create a vector of identical exe_nodes and pass_thrus
   std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
   std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
   // Attach each pass_thru to its corresponding exe_node
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
       tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
   }

   // TODO: why the test is executed serially for the node pairs, not concurrently?
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
   // For num_receivers = 1 to MAX_NODES
       for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
           // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
           std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
           for (size_t i = 0; i < num_receivers; i++) {
               receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
           }

           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
           }

           // Do the test with varying numbers of senders
           std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
           for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
               // Create num_senders senders, set their message limit each to N, and connect them to
               // pass_thru_vec[node_idx]
               senders.clear();
               for (size_t s = 0; s < num_senders; ++s ) {
                   senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                   senders.back()->my_limit = N;
                   senders.back()->register_successor(pass_thru_vec[node_idx] );
               }

               // Initialize the receivers so they know how many senders and messages to check for
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->initialize_map( N, num_senders );
               }

               // Do the test
               utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
               g.wait_for_all();

               // confirm that each sender was requested from N times
               for (size_t s = 0; s < num_senders; ++s ) {
                   size_t n = senders[s]->my_received;
                   CHECK( n == N );
                   CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] );
               }
               // validate the receivers
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->validate();
               }
           }
           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
           }
           // With all receivers detached, a put must still succeed thanks to default buffering.
           CHECK( exe_vec[node_idx].try_put( InputType() ) == true );
           g.wait_for_all();
           for (size_t r = 0; r < num_receivers; ++r ) {
               // since it's detached, nothing should have changed
               receivers[r]->validate();
           }

       } // for num_receivers
    } // for node_idx
    } // for concurrency level lc
}
139 
// Offset biases the execution counters so that a counter that was never
// initialized (i.e. stayed zero) cannot accidentally pass the checks below.
const size_t Offset = 123;
// Total number of inc_functor invocations across all copies of the body.
std::atomic<size_t> global_execute_count;
142 
143 struct inc_functor {
144 
145     std::atomic<size_t> local_execute_count;
146     inc_functor( ) { local_execute_count = 0; }
147     inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
148     void operator=( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
149 
150     int operator()( int i ) {
151        ++global_execute_count;
152        ++local_execute_count;
153        return i;
154     }
155 
156 };
157 
//! Buffered-levels test with a stateful, copyable body (inc_functor).
/** In addition to the loss/limit checks done by buffered_levels, this verifies
    that tbb::flow::copy_body returns an up-to-date copy of the node's body and
    that graph::reset(rf_reset_bodies) restores the body to its initial state. */
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Start both counters from Offset so an untouched (zeroed) counter is detectable.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                // Tell the receivers how many senders and messages to expect.
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                // One native thread per sender pushes until each sender's limit is hit.
                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // Confirm that each sender was requested from N times.
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            // Detached from all receivers, a put must still succeed (default buffering).
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK(global_count == expected_count);
        CHECK(global_count == inc_count );
        // Resetting with rf_reset_bodies must restore the body's initial (Offset) state.
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}
230 
//! Runs the buffered-levels test with each supported body kind:
//! lambda, function pointer, user-defined functor, and the stateful-copy variant.
template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
    buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    buffered_levels_with_copy<InputType,OutputType>( c );
}
238 
239 
//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) the nodes will receive puts from multiple predecessors simultaneously,
    and 4) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
248 
//! Core driver for the limited-concurrency (rejecting) function_node tests.
/** Takes the harness mutex exclusively so that in-flight bodies block inside
    the node; this proves the node accepts exactly lc puts and rejects the
    (lc+1)-th, then releases the lock and checks the totals delivered to every
    receiver. */
template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK( exe_node.try_put( InputType() ) == true );
                    }
                    // it only accepts to lc level
                    CHECK( exe_node.try_put( InputType() ) == false );

                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        // register a sender
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                // confirm that each receivers got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK( n == num_senders*N+lc );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            // A rejecting node with no successors still accepts a put when idle;
            // the result is dropped, so the receiver counts must stay zero.
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK( int(receivers[r]->my_count) == 0 );
            }
        }
    }
}
328 
329 
//! Runs the limited-concurrency test with each supported body kind:
//! lambda, function pointer, and user-defined functor (all blocking on tbb::spin_rw_mutex).
template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
    concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
    concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}
336 
337 
//! Message type convertible to and from int; exercises function_node with a
//! user-defined, non-trivial message type. The int payload is ignored.
struct empty_no_assign {
   empty_no_assign() = default;
   empty_no_assign( int ) {}
   operator int() { return 0; }
};
343 
344 template< typename InputType >
345 struct parallel_puts : private utils::NoAssign {
346 
347     tbb::flow::receiver< InputType > * const my_exe_node;
348 
349     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
350 
351     void operator()( int ) const  {
352         for ( int i = 0; i < N; ++i ) {
353             // the nodes will accept all puts
354             CHECK( my_exe_node->try_put( InputType() ) == true );
355         }
356     }
357 
358 };
359 
360 //! Performs test on executable nodes with unlimited concurrency
361 /** These tests check:
362     1) that the nodes will accept all puts
363     2) the nodes will receive puts from multiple predecessors simultaneously,
364     and 3) the nodes will send to multiple successors.
365     There is no checking of the contents of the messages for corruption.
366 */
367 
//! Core driver for the unlimited-concurrency function_node tests.
/** For each thread count p, every one of p threads puts N items; every put
    must be accepted, the body must run exactly p*N times, and each attached
    receiver must observe all p*N messages. */
template< typename InputType, typename OutputType, typename Body >
void unlimited_concurrency( Body body ) {

    for (unsigned p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_executor<InputType, OutputType>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            // p native threads, each performing N puts; all must be accepted.
            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
            CHECK( ec == p*N );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK( c == p*N );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            }
        }
    }
405 
//! Runs the unlimited-concurrency test with lambda, function-pointer, and functor bodies.
//! Setting max_executors to 0 disables the harness's concurrency-limit check.
template< typename InputType, typename OutputType >
void run_unlimited_concurrency() {
    harness_graph_executor<InputType, OutputType>::max_executors = 0;
    unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
    unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
}
413 
414 struct continue_msg_to_int {
415     int my_int;
416     continue_msg_to_int(int x) : my_int(x) {}
417     int operator()(tbb::flow::continue_msg) { return my_int; }
418 };
419 
420 void test_function_node_with_continue_msg_as_input() {
421     // If this function terminates, then this test is successful
422     tbb::flow::graph g;
423 
424     tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g);
425 
426     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42));
427     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43));
428 
429     tbb::flow::make_edge( Start, FN1 );
430     tbb::flow::make_edge( Start, FN2 );
431 
432     Start.try_put( tbb::flow::continue_msg() );
433     g.wait_for_all();
434 }
435 
//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    // Limit the scheduler to num_threads workers for the duration of this scope.
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads);
    // Rejecting nodes with a finite concurrency limit.
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    // Default (queueing) buffering with a finite concurrency limit.
    run_buffered_levels<int, int>(num_threads);
    // Unlimited concurrency over several input/output type combinations.
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}
450 
451 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
452 #include <array>
453 #include <vector>
//! Checks the preview follows/precedes constructor API for function_node:
//! the node can be constructed directly from a set of predecessors (follows)
//! or successors (precedes), optionally with a node priority.
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;

    // Three predecessors for the follows case, one successor for precedes.
    std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
    std::vector<msg_t> messages_for_precedes = { msg_t() };

    pass_through<msg_t> pass_msg;

    follows_and_precedes_testing::test_follows
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_follows, tbb::flow::unlimited, pass_msg);
    follows_and_precedes_testing::test_precedes
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1));
}
469 #endif
470 
471 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
472 
// Free-function body used to check deduction guides against a function pointer.
int function_body_f(const int&) { return 1; }
474 
//! Verifies that class template argument deduction maps each constructor form
//! of function_node to the expected specialization.
//! @param body  callable with signature int(const int&)
template <typename Body>
void test_deduction_guides_common(Body body) {
    using namespace tbb::flow;
    graph g;

    // graph + concurrency + body
    function_node f1(g, unlimited, body);
    static_assert(std::is_same_v<decltype(f1), function_node<int, int>>);

    // ... with an explicit buffering policy
    function_node f2(g, unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f2), function_node<int, int, rejecting>>);

    // ... with a node priority (policy stays default)
    function_node f3(g, unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f3), function_node<int, int>>);

    // ... with both policy and priority
    function_node f4(g, unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f4), function_node<int, int, rejecting>>);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Same constructor forms, but built from a follows(...) node set.
    function_node f5(follows(f2), unlimited, body);
    static_assert(std::is_same_v<decltype(f5), function_node<int, int>>);

    function_node f6(follows(f5), unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f6), function_node<int, int, rejecting>>);

    function_node f7(follows(f6), unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f7), function_node<int, int>>);

    function_node f8(follows(f7), unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f8), function_node<int, int, rejecting>>);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    // Copy construction preserves the deduced type.
    function_node f9(f1);
    static_assert(std::is_same_v<decltype(f9), function_node<int, int>>);
}
509 
//! Exercises the deduction guides with a non-mutable lambda, a mutable lambda,
//! and a plain function pointer.
void test_deduction_guides() {
    test_deduction_guides_common([](const int&)->int { return 1; });
    test_deduction_guides_common([](const int&) mutable ->int { return 1; });
    test_deduction_guides_common(function_body_f);
}
515 
516 #endif
517 
518 //! Test various node bodies with concurrency
519 //! \brief \ref error_guessing
TEST_CASE("Concurrency test") {
    // Repeat the full concurrency suite for every thread count in [MinThread, MaxThread].
    for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}
525 
//! Test function_node with the lightweight execution policy
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
   // Delegates to the shared lightweight-policy harness; the argument presumably
   // sets the message count — see the common test headers to confirm.
   lightweight_testing::test<tbb::flow::function_node>(10);
}
531 
532 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
533 //! Test follows and precedes API
534 //! \brief \ref error_guessing
TEST_CASE("Flowgraph node set test"){
     // Covers the preview follows()/precedes() construction API.
     test_follows_and_precedes_api();
}
538 #endif
539 
540 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
542 //! \brief \ref requirement
TEST_CASE("Deduction guides test"){
     // Compile-time checks: all assertions are static_asserts inside the callee.
     test_deduction_guides();
}
546 #endif
547 
548 //! try_release and try_consume test
549 //! \brief \ref error_guessing
550 TEST_CASE("try_release try_consume"){
551     tbb::flow::graph g;
552 
553     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int&v){return v;});
554 
555     CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node");
556     CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node");
557 }
558