Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
flow_graph.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_flow_graph_H
18#define __TBB_flow_graph_H
19
20#define __TBB_flow_graph_H_include_area
22
23#include "tbb_stddef.h"
24#include "atomic.h"
25#include "spin_mutex.h"
26#include "null_mutex.h"
27#include "spin_rw_mutex.h"
28#include "null_rw_mutex.h"
29#include "task.h"
31#include "tbb_exception.h"
32#include "pipeline.h"
36#include "tbb_profiling.h"
37#include "task_arena.h"
38
39#if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
40 #if __INTEL_COMPILER
41 // Disabled warning "routine is both inline and noinline"
42 #pragma warning (push)
43 #pragma warning( disable: 2196 )
44 #endif
45 #define __TBB_NOINLINE_SYM __attribute__((noinline))
46#else
47 #define __TBB_NOINLINE_SYM
48#endif
49
50#if __TBB_PREVIEW_ASYNC_MSG
51#include <vector> // std::vector in internal::async_storage
52#include <memory> // std::shared_ptr in async_msg
53#endif
54
55#if __TBB_PREVIEW_STREAMING_NODE
56// For streaming_node
57#include <array> // std::array
58#include <unordered_map> // std::unordered_map
59#include <type_traits> // std::decay, std::true_type, std::false_type
60#endif // __TBB_PREVIEW_STREAMING_NODE
61
62#if TBB_DEPRECATED_FLOW_ENQUEUE
63#define FLOW_SPAWN(a) tbb::task::enqueue((a))
64#else
65#define FLOW_SPAWN(a) tbb::task::spawn((a))
66#endif
67
68#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
69#define __TBB_DEFAULT_NODE_ALLOCATOR(T) cache_aligned_allocator<T>
70#else
71#define __TBB_DEFAULT_NODE_ALLOCATOR(T) null_type
72#endif
73
74// use the VC10 or gcc version of tuple if it is available.
75#if __TBB_CPP11_TUPLE_PRESENT
76 #include <tuple>
77namespace tbb {
78 namespace flow {
79 using std::tuple;
80 using std::tuple_size;
81 using std::tuple_element;
82 using std::get;
83 }
84}
85#else
86 #include "compat/tuple"
87#endif
88
89#include<list>
90#include<queue>
91
102namespace tbb {
103namespace flow {
104
// Named concurrency limits: `unlimited` has the value 0 and `serial` has the value 1.
106enum concurrency { unlimited = 0, serial = 1 };
107
108namespace interface11 {
109
// Empty tag type with no members. __TBB_DEFAULT_NODE_ALLOCATOR(T) expands to this
// type unless TBB_DEPRECATED_FLOW_NODE_ALLOCATOR is set.
111struct null_type {};
112
115
117template< typename T > class sender;
118template< typename T > class receiver;
120
121template< typename T, typename U > class limiter_node; // needed for resetting decrementer
122
123template< typename R, typename B > class run_and_put_task;
124
125namespace internal {
126
127template<typename T, typename M> class successor_cache;
128template<typename T, typename M> class broadcast_cache;
129template<typename T, typename M> class round_robin_cache;
130template<typename T, typename M> class predecessor_cache;
131template<typename T, typename M> class reservable_predecessor_cache;
132
133#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
134namespace order {
135struct following;
136struct preceding;
137}
138template<typename Order, typename... Args> struct node_set;
139#endif
140
141#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
142// Holder of edges both for caches and for those nodes which do not have predecessor caches.
143// C == receiver< ... > or sender< ... >, depending.
144template<typename C>
145class edge_container {
146
147public:
148 typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
149
150 void add_edge(C &s) {
151 built_edges.push_back(&s);
152 }
153
154 void delete_edge(C &s) {
155 for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
156 if (*i == &s) {
157 (void)built_edges.erase(i);
158 return; // only remove one predecessor per request
159 }
160 }
161 }
162
163 void copy_edges(edge_list_type &v) {
164 v = built_edges;
165 }
166
167 size_t edge_count() {
168 return (size_t)(built_edges.size());
169 }
170
171 void clear() {
172 built_edges.clear();
173 }
174
175 // methods remove the statement from all predecessors/successors liste in the edge
176 // container.
177 template< typename S > void sender_extract(S &s);
178 template< typename R > void receiver_extract(R &r);
179
180private:
181 edge_list_type built_edges;
182}; // class edge_container
183#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
184
185} // namespace internal
186
187} // namespace interfaceX
188} // namespace flow
189} // namespace tbb
190
193
194namespace tbb {
195namespace flow {
196namespace interface11 {
197
198// enqueue left task if necessary. Returns the non-enqueued task if there is one.
// Selection rules, in the order the checks run:
//   - right == NULL                               -> left is returned unchanged;
//   - left == NULL or left == SUCCESSFULLY_ENQUEUED -> right is returned;
//   - both left and right are real tasks          -> right is returned;
//   - left is a real task, right == SUCCESSFULLY_ENQUEUED -> left is returned.
// NOTE(review): listing line 208 is absent from this extract. As shown, nothing is
// done with `left` in the "both are valid tasks" branch and `g` is unused; the
// statement that handles `left` presumably sat on the missing line - confirm
// against the original header before relying on this text.
199static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
200 // if no RHS task, don't change left.
201 if (right == NULL) return left;
202 // right != NULL
203 if (left == NULL) return right;
204 if (left == SUCCESSFULLY_ENQUEUED) return right;
205 // left contains a task
206 if (right != SUCCESSFULLY_ENQUEUED) {
207 // both are valid tasks
209 return right;
210 }
211 return left;
212}
213
214#if __TBB_PREVIEW_ASYNC_MSG
215
216template < typename T > class __TBB_DEPRECATED async_msg;
217
218namespace internal {
219
220template < typename T > class async_storage;
221
// General (non-async) case of the async_helpers traits: T is treated as a plain
// message type, so is_async_type is false and filtered_type is T itself.
// NOTE(review): listing line 223 (presumably the `struct async_helpers {` head) is
// absent from this extract; the name is taken from the specialization that follows.
222template< typename T, typename = void >
224 typedef async_msg<T> async_type;
225 typedef T filtered_type;
226
227 static const bool is_async_type = false;
228
 // Type-erase a message: returns the address of `t` as a const void pointer.
229 static const void* to_void_ptr(const T& t) {
230 return static_cast<const void*>(&t);
231 }
232
 // Non-const overload of the above.
233 static void* to_void_ptr(T& t) {
234 return static_cast<void*>(&t);
235 }
236
 // Inverse of to_void_ptr: reinterprets `p` as pointing at a T. The caller must
 // pass a pointer that really addresses a T; nothing is checked here.
237 static const T& from_void_ptr(const void* p) {
238 return *static_cast<const T*>(p);
239 }
240
 // Non-const overload of the above.
241 static T& from_void_ptr(void* p) {
242 return *static_cast<T*>(p);
243 }
244
 // Delivers the type-erased message `p` to `this_recv`.
 //   is_async == true : `p` is read as an async_msg<filtered_type>; the receiver is
 //                      subscribed to the message's storage, the message is finalized,
 //                      and the task returned by subscribe() is passed back.
 //   is_async == false: `p` is read as a T and forwarded to try_put_task().
245 static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
246 if (is_async) {
247 // This (T) is NOT async and incoming 'A<X> t' IS async
248 // Get data from async_msg
249 const async_msg<filtered_type>& msg = async_helpers< async_msg<filtered_type> >::from_void_ptr(p);
250 task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
251 // finalize() must be called after subscribe() because set() can be called in finalize()
252 // and 'this_recv' client must be subscribed by this moment
253 msg.finalize();
254 return new_task;
255 }
256 else {
257 // Incoming 't' is NOT async
258 return this_recv->try_put_task(from_void_ptr(p));
259 }
260 }
261};
262
// Specialization of async_helpers selected when T derives from (or is)
// async_msg<typename T::async_msg_data_type>. Here is_async_type is true and
// filtered_type is the payload type carried by the async message.
263template< typename T >
264struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
265 typedef T async_type;
266 typedef typename T::async_msg_data_type filtered_type;
267
268 static const bool is_async_type = true;
269
270 // Receiver-classes use const interfaces
 // The pointer handed out addresses the async_msg<filtered_type> base sub-object
 // of `t` (the inner static_cast performs the derived-to-base adjustment).
271 static const void* to_void_ptr(const T& t) {
272 return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
273 }
274
275 static void* to_void_ptr(T& t) {
276 return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
277 }
278
279 // Sender-classes use non-const interfaces
 // Inverse of to_void_ptr: `p` is first read as an async_msg<filtered_type>* and
 // then cast down to T, mirroring the base adjustment done on the way in.
280 static const T& from_void_ptr(const void* p) {
281 return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
282 }
283
284 static T& from_void_ptr(void* p) {
285 return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
286 }
287
288 // Used in receiver<T> class
 // Delivers the type-erased message `p` to `this_recv`.
 //   is_async == true : `p` is read as a T and forwarded to try_put_task().
 //   is_async == false: a const T named `msg` is constructed from `t` and forwarded.
 // NOTE(review): listing line 297 is absent from this extract, so the definition
 // of `t` used below is not visible here - presumably it is obtained from `p`;
 // confirm against the original header.
289 static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
290 if (is_async) {
291 // Both are async
292 return this_recv->try_put_task(from_void_ptr(p));
293 }
294 else {
295 // This (T) is async and incoming 'X t' is NOT async
296 // Create async_msg for X
298 const T msg(t);
299 return this_recv->try_put_task(msg);
300 }
301 }
302};
303
304class untyped_receiver;
305
307 template< typename, typename > friend class internal::predecessor_cache;
308 template< typename, typename > friend class internal::reservable_predecessor_cache;
309public:
312
313 virtual ~untyped_sender() {}
314
315 // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
316
317 // TODO: Prevent untyped successor registration
318
320 virtual bool register_successor( successor_type &r ) = 0;
321
323 virtual bool remove_successor( successor_type &r ) = 0;
324
326 virtual bool try_release( ) { return false; }
327
329 virtual bool try_consume( ) { return false; }
330
331#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
333 typedef internal::edge_container<successor_type> built_successors_type;
334 typedef built_successors_type::edge_list_type successor_list_type;
335 virtual built_successors_type &built_successors() = 0;
336 virtual void internal_add_built_successor( successor_type & ) = 0;
337 virtual void internal_delete_built_successor( successor_type & ) = 0;
338 virtual void copy_successors( successor_list_type &) = 0;
339 virtual size_t successor_count() = 0;
340#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
341protected:
343 template< typename X >
344 bool try_get( X &t ) {
346 }
347
349 template< typename X >
350 bool try_reserve( X &t ) {
352 }
353
354 virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
355 virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
356};
357
359 template< typename, typename > friend class run_and_put_task;
360
361 template< typename, typename > friend class internal::broadcast_cache;
362 template< typename, typename > friend class internal::round_robin_cache;
363 template< typename, typename > friend class internal::successor_cache;
364
365#if __TBB_PREVIEW_OPENCL_NODE
366 template< typename, typename > friend class proxy_dependency_receiver;
367#endif /* __TBB_PREVIEW_OPENCL_NODE */
368public:
371
373 virtual ~untyped_receiver() {}
374
376 template<typename X>
377 bool try_put(const X& t) {
378 task *res = try_put_task(t);
379 if (!res) return false;
381 return true;
382 }
383
384 // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
385
386 // TODO: Prevent untyped predecessor registration
387
389 virtual bool register_predecessor( predecessor_type & ) { return false; }
390
392 virtual bool remove_predecessor( predecessor_type & ) { return false; }
393
394#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
395 typedef internal::edge_container<predecessor_type> built_predecessors_type;
396 typedef built_predecessors_type::edge_list_type predecessor_list_type;
397 virtual built_predecessors_type &built_predecessors() = 0;
398 virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
399 virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
400 virtual void copy_predecessors( predecessor_list_type & ) = 0;
401 virtual size_t predecessor_count() = 0;
402#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
403protected:
404 template<typename X>
405 task *try_put_task(const X& t) {
407 }
408
409 virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
410
411 virtual graph& graph_reference() const = 0;
412
413 // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
414
417
418 virtual bool is_continue_receiver() { return false; }
419};
420
421} // namespace internal
422
424template< typename T >
426public:
429
431
433 virtual bool try_get( T & ) { return false; }
434
436 virtual bool try_reserve( T & ) { return false; }
437
438protected:
439 virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
440 // Both async OR both are NOT async
443 }
444 // Else: this (T) is async OR incoming 't' is async
445 __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
446 return false;
447 }
448
449 virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
450 // Both async OR both are NOT async
453 }
454 // Else: this (T) is async OR incoming 't' is async
455 __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
456 return false;
457 }
458}; // class sender<T>
459
461template< typename T >
463 template< typename > friend class internal::async_storage;
464 template< typename, typename > friend struct internal::async_helpers;
465public:
468
470
474 }
475
478 }
479
480protected:
481 virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
483 }
484
486 virtual task *try_put_task(const T& t) = 0;
487
488}; // class receiver<T>
489
490#else // __TBB_PREVIEW_ASYNC_MSG
491
// Abstract interface of a node that produces messages of type T (variant compiled
// when __TBB_PREVIEW_ASYNC_MSG is off).
// NOTE(review): several listing lines are absent from this extract (496-497,
// 499-500, 506-507, 509, ...). The declaration of `successor_type` and any member
// that preceded remove_successor() are therefore not visible here.
493template< typename T >
494class sender {
495public:
498
501
502 virtual ~sender() {}
503
504 // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
505
508
 // Pure virtual: implementers remove the successor `r`.
510 __TBB_DEPRECATED virtual bool remove_successor( successor_type &r ) = 0;
511
 // Default implementation: returns false and leaves the argument untouched.
513 virtual bool try_get( T & ) { return false; }
514
 // Default implementation: returns false and leaves the argument untouched.
516 virtual bool try_reserve( T & ) { return false; }
517
 // Default implementation: returns false.
519 virtual bool try_release( ) { return false; }
520
 // Default implementation: returns false.
522 virtual bool try_consume( ) { return false; }
523
524#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
 // Deprecated bookkeeping of built successor edges, stored in an
 // internal::edge_container<successor_type>. All operations are pure virtual.
526 __TBB_DEPRECATED typedef typename internal::edge_container<successor_type> built_successors_type;
527 __TBB_DEPRECATED typedef typename built_successors_type::edge_list_type successor_list_type;
528 __TBB_DEPRECATED virtual built_successors_type &built_successors() = 0;
529 __TBB_DEPRECATED virtual void internal_add_built_successor( successor_type & ) = 0;
530 __TBB_DEPRECATED virtual void internal_delete_built_successor( successor_type & ) = 0;
531 __TBB_DEPRECATED virtual void copy_successors( successor_list_type &) = 0;
532 __TBB_DEPRECATED virtual size_t successor_count() = 0;
533#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
534}; // class sender<T>
535
// Abstract interface of a node that accepts messages of type T (variant compiled
// when __TBB_PREVIEW_ASYNC_MSG is off).
// NOTE(review): some listing lines are absent from this extract (540-541, 543,
// 546, 549, 553, 557, ...), so members declared on them are not visible here.
537template< typename T >
538class receiver {
539public:
542
 // A predecessor of a receiver<T> is a sender<T>.
544 __TBB_DEPRECATED typedef sender<T> predecessor_type;
545
547 virtual ~receiver() {}
548
 // Offers `t` to this receiver via try_put_task(). Returns false when
 // try_put_task() yields a null task pointer, true otherwise.
 // NOTE(review): listing line 553 is absent; whatever is done with a non-null
 // `res` before returning true is not visible here - confirm against the header.
550 bool try_put( const T& t ) {
551 task *res = try_put_task(t);
552 if (!res) return false;
554 return true;
555 }
556
558protected:
559 template< typename R, typename B > friend class run_and_put_task;
560 template< typename X, typename Y > friend class internal::broadcast_cache;
561 template< typename X, typename Y > friend class internal::round_robin_cache;
 // Pure virtual delivery hook used by try_put(); a null return is reported by
 // try_put() as failure.
562 virtual task *try_put_task(const T& t) = 0;
 // Pure virtual accessor for the graph this receiver refers to.
563 virtual graph& graph_reference() const = 0;
564public:
565 // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
566
 // Default implementation: returns false and ignores the argument.
568 __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
569
 // Default implementation: returns false and ignores the argument.
571 __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
572
573#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
 // Deprecated bookkeeping of built predecessor edges, stored in an
 // internal::edge_container<predecessor_type>. All operations are pure virtual.
574 __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
575 __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
576 __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors() = 0;
577 __TBB_DEPRECATED virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
578 __TBB_DEPRECATED virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
579 __TBB_DEPRECATED virtual void copy_predecessors( predecessor_list_type & ) = 0;
580 __TBB_DEPRECATED virtual size_t predecessor_count() = 0;
581#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
582
583protected:
 // Pure virtual reset hook; `f` defaults to rf_reset_protocol.
585 virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
586
587 template<typename TT, typename M> friend class internal::successor_cache;
 // Default implementation: returns false.
588 virtual bool is_continue_receiver() { return false; }
589
590#if __TBB_PREVIEW_OPENCL_NODE
591 template< typename, typename > friend class proxy_dependency_receiver;
592#endif /* __TBB_PREVIEW_OPENCL_NODE */
593}; // class receiver<T>
594
595#endif // __TBB_PREVIEW_ASYNC_MSG
596
598
599class continue_receiver : public receiver< continue_msg > {
600public:
601
604
607
610 __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
611 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
613 __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
614 }
615
620 __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
621 }
622
627 return true;
628 }
629
631
637 return true;
638 }
639
640#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
641 __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
642 __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
643 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
644
645 __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
647 my_built_predecessors.add_edge( s );
648 }
649
650 __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
652 my_built_predecessors.delete_edge(s);
653 }
654
655 __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
657 my_built_predecessors.copy_edges(v);
658 }
659
660 __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
662 return my_built_predecessors.edge_count();
663 }
664
665#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
666
667protected:
668 template< typename R, typename B > friend class run_and_put_task;
669 template<typename X, typename Y> friend class internal::broadcast_cache;
670 template<typename X, typename Y> friend class internal::round_robin_cache;
671 // execute body is supposed to be too small to create a task for.
673 {
677 else
679 }
680 task * res = execute();
681 return res? res : SUCCESSFULLY_ENQUEUED;
682 }
683
684#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
685 // continue_receiver must contain its own built_predecessors because it does
686 // not have a node_cache.
687 built_predecessors_type my_built_predecessors;
688#endif
694 // the friend declaration in the base class did not eliminate the "protected class"
695 // error in gcc 4.1.2
696 template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
697
700 if (f & rf_clear_edges) {
701#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
702 my_built_predecessors.clear();
703#endif
705 }
706 }
707
709
711 virtual task * execute() = 0;
712 template<typename TT, typename M> friend class internal::successor_cache;
713 bool is_continue_receiver() __TBB_override { return true; }
714
715}; // class continue_receiver
716
717} // interfaceX
718
719#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
720 template <typename K, typename T>
721 K key_from_message( const T &t ) {
722 return t.key();
723 }
724#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
725
726 using interface11::sender;
727 using interface11::receiver;
728 using interface11::continue_receiver;
729} // flow
730} // tbb
731
734
735namespace tbb {
736namespace flow {
737namespace interface11 {
738
742#if __TBB_PREVIEW_ASYNC_MSG
744#endif
746
747template <typename C, typename N>
748graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
749{
750 if (begin) current_node = my_graph->my_nodes;
751 //else it is an end iterator by default
752}
753
754template <typename C, typename N>
756 __TBB_ASSERT(current_node, "graph_iterator at end");
757 return *operator->();
758}
759
760template <typename C, typename N>
762 return current_node;
763}
764
765template <typename C, typename N>
767 if (current_node) current_node = current_node->next;
768}
769
770} // namespace interfaceX
771
772namespace interface10 {
// Default constructor. Starts with an empty node list and no task arena, allocates
// a new task_group_context tagged tbb::internal::FLOW_TASKS, and records ownership
// of it via own_context = true. The graph is marked active on exit.
// NOTE(review): listing lines 775 and 780-782 are absent from this extract, so any
// further initialization done on them is not visible here.
774inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
776 own_context = true;
777 cancelled = false;
778 caught_exception = false;
779 my_context = new task_group_context(tbb::internal::FLOW_TASKS);
783 my_is_active = true;
784}
785
// Constructor that uses a caller-supplied task_group_context. The context's address
// is stored and own_context is set to false; the node list starts empty and no task
// arena is attached. The graph is marked active on exit.
// NOTE(review): listing lines 788 and 792-794 are absent from this extract, so any
// further initialization done on them is not visible here.
786inline graph::graph(task_group_context& use_this_context) :
787 my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
789 own_context = false;
790 cancelled = false;
791 caught_exception = false;
795 my_is_active = true;
796}
797
799 wait_for_all();
801 tbb::task::destroy(*my_root_task);
802 if (own_context) delete my_context;
803 delete my_task_arena;
804}
805
// Acts only when my_root_task is non-null.
// NOTE(review): listing lines 808-809, the statements inside the guard, are absent
// from this extract, so the actual effect is not visible here.
806inline void graph::reserve_wait() {
807 if (my_root_task) {
810 }
811}
812
// Acts only when my_root_task is non-null.
// NOTE(review): listing lines 815-816, the statements inside the guard, are absent
// from this extract, so the actual effect is not visible here.
813inline void graph::release_wait() {
814 if (my_root_task) {
817 }
818}
819
821 n->next = NULL;
822 {
824 n->prev = my_nodes_last;
826 my_nodes_last = n;
827 if (!my_nodes) my_nodes = n;
828 }
829}
830
832 {
834 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
835 if (n->prev) n->prev->next = n->next;
836 if (n->next) n->next->prev = n->prev;
837 if (my_nodes_last == n) my_nodes_last = n->prev;
838 if (my_nodes == n) my_nodes = n->next;
839 }
840 n->prev = n->next = NULL;
841}
842
844 // reset context
846
848 cancelled = false;
849 caught_exception = false;
850 // reset all the nodes comprising the graph
851 for(iterator ii = begin(); ii != end(); ++ii) {
853 my_p->reset_node(f);
854 }
855 // Reattach the arena. Might be useful to run the graph in a particular task_arena
856 // while not limiting graph lifetime to a single task_arena::execute() call.
857 prepare_task_arena( /*reinit=*/true );
859 // now spawn the tasks necessary to start the graph
860 for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
862 }
863 my_reset_task_list.clear();
864}
865
866inline graph::iterator graph::begin() { return iterator(this, true); }
867
868inline graph::iterator graph::end() { return iterator(this, false); }
869
870inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
871
872inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
873
874inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
875
876inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
877
878#if TBB_PREVIEW_FLOW_GRAPH_TRACE
879inline void graph::set_name(const char *name) {
881}
882#endif
883
884} // namespace interface10
885
886namespace interface11 {
887
888inline graph_node::graph_node(graph& g) : my_graph(g) {
890}
891
893 my_graph.remove_node(this);
894}
895
897
898#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
899using internal::node_set;
900#endif
901
903template < typename Output >
904class input_node : public graph_node, public sender< Output > {
905public:
907 typedef Output output_type;
908
911
912 //Source node has no input type
914
915#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
916 typedef typename sender<output_type>::built_successors_type built_successors_type;
917 typedef typename sender<output_type>::successor_list_type successor_list_type;
918#endif
919
921 template< typename Body >
923 : graph_node(g), my_active(false),
924 my_body( new internal::input_body_leaf< output_type, Body>(body) ),
925 my_init_body( new internal::input_body_leaf< output_type, Body>(body) ),
926 my_reserved(false), my_has_cached_item(false)
927 {
928 my_successors.set_owner(this);
929 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
930 static_cast<sender<output_type> *>(this), this->my_body );
931 }
932
933#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
934 template <typename Body, typename... Successors>
935 input_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
936 : input_node(successors.graph_reference(), body) {
937 make_edges(*this, successors);
938 }
939#endif
940
943 graph_node(src.my_graph), sender<Output>(),
944 my_active(false),
945 my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
946 my_reserved(false), my_has_cached_item(false)
947 {
948 my_successors.set_owner(this);
949 tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
950 static_cast<sender<output_type> *>(this), this->my_body );
951 }
952
954 ~input_node() { delete my_body; delete my_init_body; }
955
956#if TBB_PREVIEW_FLOW_GRAPH_TRACE
957 void set_name( const char *name ) __TBB_override {
959 }
960#endif
961
965 my_successors.register_successor(r);
966 if ( my_active )
967 spawn_put();
968 return true;
969 }
970
974 my_successors.remove_successor(r);
975 return true;
976 }
977
978#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
979
980 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
981
982 void internal_add_built_successor( successor_type &r) __TBB_override {
984 my_successors.internal_add_built_successor(r);
985 }
986
987 void internal_delete_built_successor( successor_type &r) __TBB_override {
989 my_successors.internal_delete_built_successor(r);
990 }
991
992 size_t successor_count() __TBB_override {
994 return my_successors.successor_count();
995 }
996
997 void copy_successors(successor_list_type &v) __TBB_override {
999 my_successors.copy_successors(v);
1000 }
1001#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1002
1006 if ( my_reserved )
1007 return false;
1008
1009 if ( my_has_cached_item ) {
1010 v = my_cached_item;
1011 my_has_cached_item = false;
1012 return true;
1013 }
1014 // we've been asked to provide an item, but we have none. enqueue a task to
1015 // provide one.
1016 if ( my_active )
1017 spawn_put();
1018 return false;
1019 }
1020
1024 if ( my_reserved ) {
1025 return false;
1026 }
1027
1028 if ( my_has_cached_item ) {
1029 v = my_cached_item;
1030 my_reserved = true;
1031 return true;
1032 } else {
1033 return false;
1034 }
1035 }
1036
1038
1041 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1042 my_reserved = false;
1043 if(!my_successors.empty())
1044 spawn_put();
1045 return true;
1046 }
1047
1051 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1052 my_reserved = false;
1053 my_has_cached_item = false;
1054 if ( !my_successors.empty() ) {
1055 spawn_put();
1056 }
1057 return true;
1058 }
1059
1061 void activate() {
1063 my_active = true;
1064 if (!my_successors.empty())
1065 spawn_put();
1066 }
1067
1068 template<typename Body>
1070 internal::input_body<output_type> &body_ref = *this->my_body;
1071 return dynamic_cast< internal::input_body_leaf<output_type, Body> & >(body_ref).get_body();
1072 }
1073
1074#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1075 void extract( ) __TBB_override {
1076 my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1077 my_active = false;
1078 my_reserved = false;
1080 }
1081#endif
1082
1083protected:
1084
1087 my_active = false;
1088 my_reserved = false;
1089 my_has_cached_item = false;
1090
1091 if(f & rf_clear_edges) my_successors.clear();
1092 if(f & rf_reset_bodies) {
1094 delete my_body;
1095 my_body = tmp;
1096 }
1097 }
1098
1099private:
1108
1109 // used by apply_body_bypass, can invoke body of node.
1112 if ( my_reserved ) {
1113 return false;
1114 }
1115 if ( !my_has_cached_item ) {
1117
1118#if TBB_DEPRECATED_INPUT_NODE_BODY
1119 bool r = (*my_body)(my_cached_item);
1120 if (r) {
1121 my_has_cached_item = true;
1122 }
1123#else
1124 flow_control control;
1125 my_cached_item = (*my_body)(control);
1127#endif
1129 }
1130 if ( my_has_cached_item ) {
1131 v = my_cached_item;
1132 my_reserved = true;
1133 return true;
1134 } else {
1135 return false;
1136 }
1137 }
1138
1140 return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1142 }
1143
1145 void spawn_put( ) {
1148 }
1149 }
1150
1154 output_type v;
1155 if ( !try_reserve_apply_body(v) )
1156 return NULL;
1157
1158 task *last_task = my_successors.try_put_task(v);
1159 if ( last_task )
1160 try_consume();
1161 else
1162 try_release();
1163 return last_task;
1164 }
1165}; // class input_node
1166
1167#if TBB_USE_SOURCE_NODE_AS_ALIAS
1168template < typename Output >
1169class source_node : public input_node <Output> {
1170public:
1172 template< typename Body >
1173 __TBB_NOINLINE_SYM source_node( graph &g, Body body )
1174 : input_node<Output>(g, body)
1175 {
1176 }
1177
1178#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1179 template <typename Body, typename... Successors>
1180 source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
1181 : input_node<Output>(successors, body) {
1182 }
1183#endif
1184};
1185#else // TBB_USE_SOURCE_NODE_AS_ALIAS
1187template < typename Output > class
1188__TBB_DEPRECATED_MSG("TBB Warning: tbb::flow::source_node is deprecated, use tbb::flow::input_node." )
1189source_node : public graph_node, public sender< Output > {
1190public:
1192 typedef Output output_type;
1193
1196
1197 //Source node has no input type
1199
1200#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1201 typedef typename sender<output_type>::built_successors_type built_successors_type;
1202 typedef typename sender<output_type>::successor_list_type successor_list_type;
1203#endif
1204
1206 template< typename Body >
1207 __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
1208 : graph_node(g), my_active(is_active), init_my_active(is_active),
1209 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
1210 my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
1211 my_reserved(false), my_has_cached_item(false)
1212 {
1213 my_successors.set_owner(this);
1214 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1215 static_cast<sender<output_type> *>(this), this->my_body );
1216 }
1217
1218#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1219 template <typename Body, typename... Successors>
1220 source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
1221 : source_node(successors.graph_reference(), body, is_active) {
1222 make_edges(*this, successors);
1223 }
1224#endif
1225
1228 graph_node(src.my_graph), sender<Output>(),
1229 my_active(src.init_my_active),
1230 init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
1231 my_reserved(false), my_has_cached_item(false)
1232 {
1233 my_successors.set_owner(this);
1234 tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1235 static_cast<sender<output_type> *>(this), this->my_body );
1236 }
1237
1239 ~source_node() { delete my_body; delete my_init_body; }
1240
1241#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1242 void set_name( const char *name ) __TBB_override {
1244 }
1245#endif
1246
1249 spin_mutex::scoped_lock lock(my_mutex);
1250 my_successors.register_successor(r);
1251 if ( my_active )
1252 spawn_put();
1253 return true;
1254 }
1255
1258 spin_mutex::scoped_lock lock(my_mutex);
1259 my_successors.remove_successor(r);
1260 return true;
1261 }
1262
1263#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1264
1265 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1266
1267 void internal_add_built_successor( successor_type &r) __TBB_override {
1268 spin_mutex::scoped_lock lock(my_mutex);
1269 my_successors.internal_add_built_successor(r);
1270 }
1271
1272 void internal_delete_built_successor( successor_type &r) __TBB_override {
1273 spin_mutex::scoped_lock lock(my_mutex);
1274 my_successors.internal_delete_built_successor(r);
1275 }
1276
1277 size_t successor_count() __TBB_override {
1278 spin_mutex::scoped_lock lock(my_mutex);
1279 return my_successors.successor_count();
1280 }
1281
// Asks my_successors to copy its successors into v while holding my_mutex.
1282 void copy_successors(successor_list_type &v) __TBB_override {
1283 spin_mutex::scoped_lock l(my_mutex);
1284 my_successors.copy_successors(v);
1285 }
1286#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1287
1290 spin_mutex::scoped_lock lock(my_mutex);
1291 if ( my_reserved )
1292 return false;
1293
1294 if ( my_has_cached_item ) {
1295 v = my_cached_item;
1296 my_has_cached_item = false;
1297 return true;
1298 }
1299 // we've been asked to provide an item, but we have none. enqueue a task to
1300 // provide one.
1301 spawn_put();
1302 return false;
1303 }
1304
1307 spin_mutex::scoped_lock lock(my_mutex);
1308 if ( my_reserved ) {
1309 return false;
1310 }
1311
1312 if ( my_has_cached_item ) {
1313 v = my_cached_item;
1314 my_reserved = true;
1315 return true;
1316 } else {
1317 return false;
1318 }
1319 }
1320
1322
1324 spin_mutex::scoped_lock lock(my_mutex);
1325 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1326 my_reserved = false;
1327 if(!my_successors.empty())
1328 spawn_put();
1329 return true;
1330 }
1331
1334 spin_mutex::scoped_lock lock(my_mutex);
1335 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1336 my_reserved = false;
1337 my_has_cached_item = false;
1338 if ( !my_successors.empty() ) {
1339 spawn_put();
1340 }
1341 return true;
1342 }
1343
// Marks the node active under my_mutex and, if it already has at least one
// successor, calls spawn_put() to schedule a put task.
1345 void activate() {
1346 spin_mutex::scoped_lock lock(my_mutex);
1347 my_active = true;
1348 if (!my_successors.empty())
1349 spawn_put();
1350 }
1351
1352 template<typename Body>
1354 internal::source_body<output_type> &body_ref = *this->my_body;
1355 return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1356 }
1357
1358#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Detaches this node from its built successors and restores its initial state:
// my_active goes back to init_my_active, and any reservation or cached item is dropped.
// NOTE(review): unlike the other mutators in this class, this does not take
// my_mutex -- presumably it is only called while the graph is idle; confirm.
1359 void extract( ) __TBB_override {
1360 my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1361 my_active = init_my_active;
1362 my_reserved = false;
1363 if(my_has_cached_item) my_has_cached_item = false;
1364 }
1365#endif
1366
1367protected:
1368
1371 my_active = init_my_active;
1372 my_reserved =false;
1373 if(my_has_cached_item) {
1374 my_has_cached_item = false;
1375 }
1376 if(f & rf_clear_edges) my_successors.clear();
1377 if(f & rf_reset_bodies) {
1378 internal::source_body<output_type> *tmp = my_init_body->clone();
1379 delete my_body;
1380 my_body = tmp;
1381 }
1382 if(my_active)
1383 internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
1384 }
1385
1386private:
1396
1397 // used by apply_body_bypass, can invoke body of node.
1399 spin_mutex::scoped_lock lock(my_mutex);
1400 if ( my_reserved ) {
1401 return false;
1402 }
1403 if ( !my_has_cached_item ) {
1405 bool r = (*my_body)(my_cached_item);
1406 tbb::internal::fgt_end_body( my_body );
1407 if (r) {
1408 my_has_cached_item = true;
1409 }
1410 }
1411 if ( my_has_cached_item ) {
1412 v = my_cached_item;
1413 my_reserved = true;
1414 return true;
1415 } else {
1416 return false;
1417 }
1418 }
1419
1420 // when resetting, and if the source_node was created with my_active == true, then
1421 // when we reset the node we must store a task to run the node, and spawn it only
1422 // after the reset is complete and is_active() is again true. This is why we don't
1423 // test for is_active() here.
1425 return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1427 }
1428
// Creates a put task and spawns it in the graph's arena, but only while the
// graph is active; otherwise does nothing.
1430 void spawn_put( ) {
1431 if(internal::is_graph_active(this->my_graph)) {
1432 internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1433 }
1434 }
1435
1436 friend class internal::source_task_bypass< source_node< output_type > >;
1439 output_type v;
1440 if ( !try_reserve_apply_body(v) )
1441 return NULL;
1442
1443 task *last_task = my_successors.try_put_task(v);
1444 if ( last_task )
1445 try_consume();
1446 else
1447 try_release();
1448 return last_task;
1449 }
1450}; // class source_node
1451#endif // TBB_USE_SOURCE_NODE_AS_ALIAS
1452
1454template<typename Input, typename Output = continue_msg, typename Policy = queueing,
1455 typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1457 : public graph_node
1458#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1459 , public internal::function_input< Input, Output, Policy, Allocator >
1460#else
1461 , public internal::function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
1462#endif
1463 , public internal::function_output<Output> {
1464
1465#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1466 typedef Allocator internals_allocator;
1467#else
1469
1472 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1473 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1474 );
1475#endif
1476
1477public:
1478 typedef Input input_type;
1479 typedef Output output_type;
1485#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1486 typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1487 typedef typename fOutput_type::successor_list_type successor_list_type;
1488#endif
1490
1492 // input_queue_type is allocated here, but destroyed in the function_input_base.
1493 // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1494 // be done in one place. This would be an interface-breaking change.
1495 template< typename Body >
1498 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1499#else
1501#endif
1503 fOutput_type(g) {
1504 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1505 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1506 }
1507
1508#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
// Convenience constructor that takes a priority without a policy argument;
// delegates to the full constructor with a default-constructed Policy.
1509 template <typename Body>
1510 function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
1511 : function_node(g, concurrency, body, Policy(), priority) {}
1512#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1513
1514#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1515 template <typename Body, typename... Args>
1516 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
1519 make_edges_in_order(nodes, *this);
1520 }
1521
1522#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
// node_set overload taking a priority without a policy argument; delegates to
// the node_set constructor with a default-constructed Policy.
1523 template <typename Body, typename... Args>
1524 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
1525 : function_node(nodes, concurrency, body, Policy(), priority) {}
1526#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1527#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1528
1531 graph_node(src.my_graph),
1532 input_impl_type(src),
1533 fOutput_type(src.my_graph) {
1534 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1535 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1536 }
1537
1538#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1539 void set_name( const char *name ) __TBB_override {
1541 }
1542#endif
1543
1544#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Detaches this node from both its built predecessors and its built successors.
1545 void extract( ) __TBB_override {
1546 my_predecessors.built_predecessors().receiver_extract(*this);
1547 successors().built_successors().sender_extract(*this);
1548 }
1549#endif
1550
1551protected:
1552 template< typename R, typename B > friend class run_and_put_task;
1553 template<typename X, typename Y> friend class internal::broadcast_cache;
1554 template<typename X, typename Y> friend class internal::round_robin_cache;
1556
1558
1561 // TODO: use clear() instead.
1562 if(f & rf_clear_edges) {
1563 successors().clear();
1565 }
1566 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1567 __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1568 }
1569
1570}; // class function_node
1571
1573// Output is a tuple of output types.
1574template<typename Input, typename Output, typename Policy = queueing,
1575 typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1577 public graph_node,
1579 <
1580 Input,
1581 typename internal::wrap_tuple_elements<
1582 tbb::flow::tuple_size<Output>::value, // #elements in tuple
1583 internal::multifunction_output, // wrap this around each element
1584 Output // the tuple providing the types
1585 >::type,
1586 Policy,
1587#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1588 Allocator
1589#else
1590 cache_aligned_allocator<Input>
1591#endif
1592 > {
1593#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1594 typedef Allocator internals_allocator;
1595#else
1597
1600 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1601 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1602 );
1603#endif
1604
1605protected:
1607public:
1608 typedef Input input_type;
1614private:
1616public:
1617 template<typename Body>
1619 graph &g, size_t concurrency,
1621 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1622#else
1624#endif
1626 tbb::internal::fgt_multioutput_node_with_body<N>(
1627 CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1628 &this->my_graph, static_cast<receiver<input_type> *>(this),
1629 this->output_ports(), this->my_body
1630 );
1631 }
1632
1633#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1634 template <typename Body>
1636 : multifunction_node(g, concurrency, body, Policy(), priority) {}
1637#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1638
1639#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1640 template <typename Body, typename... Args>
1641 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1644 make_edges_in_order(nodes, *this);
1645 }
1646
1647#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
// node_set overload taking a priority without a policy argument; delegates to
// the node_set constructor with a default-constructed Policy.
1648 template <typename Body, typename... Args>
1649 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
1650 : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
1651#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1652#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1653
1655 graph_node(other.my_graph), input_impl_type(other) {
1656 tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1657 &this->my_graph, static_cast<receiver<input_type> *>(this),
1658 this->output_ports(), this->my_body );
1659 }
1660
1661#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1662 void set_name( const char *name ) __TBB_override {
1664 }
1665#endif
1666
1667#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Detaches this node from its built predecessors, then calls
// input_impl_type::extract() for the remaining extraction work.
1668 void extract( ) __TBB_override {
1669 my_predecessors.built_predecessors().receiver_extract(*this);
1670 input_impl_type::extract();
1671 }
1672#endif
1673 // all the guts are in multifunction_input...
1674protected:
1676}; // multifunction_node
1677
1679// successors. The node has unlimited concurrency, so it does not reject inputs.
1680template<typename TupleType, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(TupleType)>
1681class split_node : public graph_node, public receiver<TupleType> {
1684public:
1685 typedef TupleType input_type;
1686#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1687 typedef Allocator allocator_type;
1688#else
1691 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1692 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1693 );
1694#endif
1695#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1697 typedef typename base_type::predecessor_list_type predecessor_list_type;
1698 typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1699 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1700#endif
1701
1702 typedef typename internal::wrap_tuple_elements<
1703 N, // #elements in tuple
1704 internal::multifunction_output, // wrap this around each element
1705 TupleType // the tuple providing the types
1707
1709 : graph_node(g),
1711 {
1712 tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1713 static_cast<receiver<input_type> *>(this), this->output_ports());
1714 }
1715
1716#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Constructs a split_node in the graph referenced by the node_set, then calls
// make_edges_in_order to connect it with the nodes in that set.
1717 template <typename... Args>
1718 __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1719 make_edges_in_order(nodes, *this);
1720 }
1721#endif
1722
1724 : graph_node(other.my_graph), base_type(other),
1726 {
1727 tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1728 static_cast<receiver<input_type> *>(this), this->output_ports());
1729 }
1730
1731#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1732 void set_name( const char *name ) __TBB_override {
1734 }
1735#endif
1736
1738
1739protected:
1740 task *try_put_task(const TupleType& t) __TBB_override {
1741 // Sending split messages in parallel is not justified, as overheads would prevail.
1742 // Also, we do not have successors here. So we just tell the task returned here is successful.
1744 }
1746 if (f & rf_clear_edges)
1748
1750 }
1753 return my_graph;
1754 }
1755#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1756private:
// Intentionally empty override: split_node performs no work on extract().
1757 void extract() __TBB_override {}
1758
// Empty override: the predecessor argument is ignored and nothing is recorded.
1760 void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1761
// Empty override: the predecessor argument is ignored and nothing is removed.
1763 void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1764
// Always reports zero predecessors.
1765 size_t predecessor_count() __TBB_override { return 0; }
1766
// Empty override: leaves the supplied list untouched.
1767 void copy_predecessors(predecessor_list_type&) __TBB_override {}
1768
// Returns the my_predessors member (sic -- the identifier is spelled this way in the class).
1769 built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1770
1772 built_predecessors_type my_predessors;
1773#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1774
1775private:
1777};
1778
1780template <typename Output, typename Policy = internal::Policy<void> >
1781class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1782 public internal::function_output<Output> {
1783public:
1785 typedef Output output_type;
1788 typedef typename input_impl_type::predecessor_type predecessor_type;
1790
1792 template <typename Body >
1794 graph &g,
1796 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1797#else
1799#endif
1800 ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
1801 fOutput_type(g) {
1802 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1803
1804 static_cast<receiver<input_type> *>(this),
1805 static_cast<sender<output_type> *>(this), this->my_body );
1806 }
1807
1808#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
// Convenience constructor that takes a priority without a policy argument;
// delegates to the full constructor with a default-constructed Policy.
1809 template <typename Body>
1810 continue_node( graph& g, Body body, node_priority_t priority )
1811 : continue_node(g, body, Policy(), priority) {}
1812#endif
1813
1814#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1815 template <typename Body, typename... Args>
1816 continue_node( const node_set<Args...>& nodes, Body body,
1818 : continue_node(nodes.graph_reference(), body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority) ) {
1819 make_edges_in_order(nodes, *this);
1820 }
1821#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
// node_set overload taking a priority without a policy argument; delegates to
// the node_set constructor with a default-constructed Policy.
1822 template <typename Body, typename... Args>
1823 continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
1824 : continue_node(nodes, body, Policy(), priority) {}
1825#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1826#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1827
1829 template <typename Body >
1831 graph &g, int number_of_predecessors,
1833 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1834#else
1836#endif
1837 ) : graph_node(g)
1838 , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1839 fOutput_type(g) {
1840 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1841 static_cast<receiver<input_type> *>(this),
1842 static_cast<sender<output_type> *>(this), this->my_body );
1843 }
1844
1845#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
// Same as the (graph, number_of_predecessors, body, policy, priority) constructor,
// but with the policy argument omitted: a default-constructed Policy is passed on.
1846 template <typename Body>
1847 continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
1848 : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
1849#endif
1850
1851#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Constructs a continue_node with an explicit number_of_predecessors in the
// graph referenced by the node_set (delegating to the graph-based constructor),
// then calls make_edges_in_order to connect it with the nodes in that set.
1852 template <typename Body, typename... Args>
1853 continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1854 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1855 : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1856 make_edges_in_order(nodes, *this);
1857 }
1858
1859#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
// node_set + number_of_predecessors overload taking a priority without a policy
// argument; delegates to the node_set constructor with a default-constructed Policy.
1860 template <typename Body, typename... Args>
1861 continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1862 Body body, node_priority_t priority )
1863 : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
1864#endif
1865#endif
1866
1870 internal::function_output<Output>(src.my_graph) {
1871 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1872 static_cast<receiver<input_type> *>(this),
1873 static_cast<sender<output_type> *>(this), this->my_body );
1874 }
1875
1876#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1877 void set_name( const char *name ) __TBB_override {
1879 }
1880#endif
1881
1882#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Detaches this node from the built predecessors kept by input_impl_type and
// from its built successors.
1883 void extract() __TBB_override {
1884 input_impl_type::my_built_predecessors.receiver_extract(*this);
1885 successors().built_successors().sender_extract(*this);
1886 }
1887#endif
1888
1889protected:
1890 template< typename R, typename B > friend class run_and_put_task;
1891 template<typename X, typename Y> friend class internal::broadcast_cache;
1892 template<typename X, typename Y> friend class internal::round_robin_cache;
1893 using input_impl_type::try_put_task;
1895
1898 if(f & rf_clear_edges)successors().clear();
1899 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1900 }
1901}; // continue_node
1902
1904template <typename T>
1905class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1906public:
1907 typedef T input_type;
1908 typedef T output_type;
1911#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1912 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1913 typedef typename sender<output_type>::successor_list_type successor_list_type;
1914#endif
1915private:
1917#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1918 internal::edge_container<predecessor_type> my_built_predecessors;
1919 spin_mutex pred_mutex; // serialize accesses on edge_container
1920#endif
1921public:
1922
1924 my_successors.set_owner( this );
1925 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1926 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1927 }
1928
1929#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Constructs a broadcast_node in the graph referenced by the node_set, then
// calls make_edges_in_order to connect it with the nodes in that set.
1930 template <typename... Args>
1931 broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1932 make_edges_in_order(nodes, *this);
1933 }
1934#endif
1935
1936 // Copy constructor
1938 graph_node(src.my_graph), receiver<T>(), sender<T>()
1939 {
1940 my_successors.set_owner( this );
1941 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1942 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1943 }
1944
1945#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1946 void set_name( const char *name ) __TBB_override {
1948 }
1949#endif
1950
1953 my_successors.register_successor( r );
1954 return true;
1955 }
1956
1959 my_successors.remove_successor( r );
1960 return true;
1961 }
1962
1963#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1964 typedef typename sender<T>::built_successors_type built_successors_type;
1965
// Returns a reference to the built-successors container held by my_successors.
1966 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1967
// Records r as a built successor by forwarding to my_successors (no lock is taken here).
1968 void internal_add_built_successor(successor_type &r) __TBB_override {
1969 my_successors.internal_add_built_successor(r);
1970 }
1971
// Removes r from the built successors by forwarding to my_successors (no lock is taken here).
1972 void internal_delete_built_successor(successor_type &r) __TBB_override {
1973 my_successors.internal_delete_built_successor(r);
1974 }
1975
// Returns the successor count reported by my_successors.
1976 size_t successor_count() __TBB_override {
1977 return my_successors.successor_count();
1978 }
1979
// Asks my_successors to copy its successors into v.
1980 void copy_successors(successor_list_type &v) __TBB_override {
1981 my_successors.copy_successors(v);
1982 }
1983
1984 typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1985
// Returns a reference to the my_built_predecessors edge container.
1986 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1987
// Adds p to my_built_predecessors while holding pred_mutex.
1988 void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1989 spin_mutex::scoped_lock l(pred_mutex);
1990 my_built_predecessors.add_edge(p);
1991 }
1992
// Deletes p from my_built_predecessors while holding pred_mutex.
1993 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1994 spin_mutex::scoped_lock l(pred_mutex);
1995 my_built_predecessors.delete_edge(p);
1996 }
1997
// Returns the number of edges in my_built_predecessors, read while holding pred_mutex.
1998 size_t predecessor_count() __TBB_override {
1999 spin_mutex::scoped_lock l(pred_mutex);
2000 return my_built_predecessors.edge_count();
2001 }
2002
// Copies the edges of my_built_predecessors into v while holding pred_mutex.
2003 void copy_predecessors(predecessor_list_type &v) __TBB_override {
2004 spin_mutex::scoped_lock l(pred_mutex);
2005 my_built_predecessors.copy_edges(v);
2006 }
2007
// Detaches this node from its built predecessors and its built successors.
// NOTE(review): pred_mutex is not taken here, unlike the other accessors of
// my_built_predecessors -- presumably only called while the graph is idle; confirm.
2008 void extract() __TBB_override {
2009 my_built_predecessors.receiver_extract(*this);
2010 my_successors.built_successors().sender_extract(*this);
2011 }
2012#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2013
2014protected:
2015 template< typename R, typename B > friend class run_and_put_task;
2016 template<typename X, typename Y> friend class internal::broadcast_cache;
2017 template<typename X, typename Y> friend class internal::round_robin_cache;
2020 task *new_task = my_successors.try_put_task(t);
2021 if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
2022 return new_task;
2023 }
2024
2026 return my_graph;
2027 }
2028
2030
2032 if (f&rf_clear_edges) {
2033 my_successors.clear();
2034#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2035 my_built_predecessors.clear();
2036#endif
2037 }
2038 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
2039 }
2040}; // broadcast_node
2041
2043template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2045 : public graph_node
2046#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2047 , public internal::reservable_item_buffer< T, Allocator >
2048#else
2049 , public internal::reservable_item_buffer< T, cache_aligned_allocator<T> >
2050#endif
2051 , public receiver<T>, public sender<T> {
2052#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2053 typedef Allocator internals_allocator;
2054#else
2056#endif
2057public:
2058 typedef T input_type;
2059 typedef T output_type;
2063#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2064 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2065 typedef typename sender<output_type>::successor_list_type successor_list_type;
2066#endif
2067#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2070 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2071 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2072 );
2073#endif
2074
2075protected:
2076 typedef size_t size_type;
2078
2079#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2080 internal::edge_container<predecessor_type> my_built_predecessors;
2081#endif
2082
2084
2086#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2087 , add_blt_succ, del_blt_succ,
2088 add_blt_pred, del_blt_pred,
2089 blt_succ_cnt, blt_pred_cnt,
2090 blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
2091#endif
2093
2094 // implements the aggregator_operation concept
2095 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
2096 public:
2097 char type;
2098#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2099 task * ltask;
2100 union {
2104 size_t cnt_val;
2105 successor_list_type *svec;
2106 predecessor_list_type *pvec;
2107 };
2108#else
2112#endif
2113 buffer_operation(const T& e, op_type t) : type(char(t))
2114
2115#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2116 , ltask(NULL), elem(const_cast<T*>(&e))
2117#else
2118 , elem(const_cast<T*>(&e)) , ltask(NULL)
2119#endif
2120 {}
2121 buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
2122 };
2123
2128
// Virtual hook that processes a list of pending operations by delegating to
// handle_operations_impl with `this` as the derived object.
2129 virtual void handle_operations(buffer_operation *op_list) {
2130 handle_operations_impl(op_list, this);
2131 }
2132
2133 template<typename derived_type>
2134 void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
2135 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2136
2137 buffer_operation *tmp = NULL;
2138 bool try_forwarding = false;
2139 while (op_list) {
2140 tmp = op_list;
2141 op_list = op_list->next;
2142 switch (tmp->type) {
2143 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
2144 case rem_succ: internal_rem_succ(tmp); break;
2145 case req_item: internal_pop(tmp); break;
2146 case res_item: internal_reserve(tmp); break;
2147 case rel_res: internal_release(tmp); try_forwarding = true; break;
2148 case con_res: internal_consume(tmp); try_forwarding = true; break;
2149 case put_item: try_forwarding = internal_push(tmp); break;
2150 case try_fwd_task: internal_forward_task(tmp); break;
2151#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2152 // edge recording
2153 case add_blt_succ: internal_add_built_succ(tmp); break;
2154 case del_blt_succ: internal_del_built_succ(tmp); break;
2155 case add_blt_pred: internal_add_built_pred(tmp); break;
2156 case del_blt_pred: internal_del_built_pred(tmp); break;
2157 case blt_succ_cnt: internal_succ_cnt(tmp); break;
2158 case blt_pred_cnt: internal_pred_cnt(tmp); break;
2159 case blt_succ_cpy: internal_copy_succs(tmp); break;
2160 case blt_pred_cpy: internal_copy_preds(tmp); break;
2161#endif
2162 }
2163 }
2164
2165 derived->order();
2166
2167 if (try_forwarding && !forwarder_busy) {
2169 forwarder_busy = true;
2170 task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
2171 forward_task_bypass<class_type>(*this);
2172 // tmp should point to the last item handled by the aggregator. This is the operation
2173 // the handling thread enqueued. So modifying that record will be okay.
2174 // workaround for icc bug
2175 tbb::task *z = tmp->ltask;
2176 graph &g = this->my_graph;
2177 tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
2178 }
2179 }
2180 } // handle_operations
2181
2183 return op_data.ltask;
2184 }
2185
2187 task *ft = grab_forwarding_task(op_data);
2188 if(ft) {
2190 return true;
2191 }
2192 return false;
2193 }
2194
2196 virtual task *forward_task() {
2198 task *last_task = NULL;
2199 do {
2200 op_data.status = internal::WAIT;
2201 op_data.ltask = NULL;
2202 my_aggregator.execute(&op_data);
2203
2204 // workaround for icc bug
2205 tbb::task *xtask = op_data.ltask;
2206 graph& g = this->my_graph;
2207 last_task = combine_tasks(g, last_task, xtask);
2208 } while (op_data.status ==internal::SUCCEEDED);
2209 return last_task;
2210 }
2211
2214 my_successors.register_successor(*(op->r));
2216 }
2217
2220 my_successors.remove_successor(*(op->r));
2222 }
2223
2224#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2225 typedef typename sender<T>::built_successors_type built_successors_type;
2226
// Returns a reference to the built-successors container held by my_successors.
2227 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2228
2229 virtual void internal_add_built_succ(buffer_operation *op) {
2230 my_successors.internal_add_built_successor(*(op->r));
2232 }
2233
2234 virtual void internal_del_built_succ(buffer_operation *op) {
2235 my_successors.internal_delete_built_successor(*(op->r));
2237 }
2238
2239 typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
2240
// Returns a reference to the my_built_predecessors edge container.
2241 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
2242
2243 virtual void internal_add_built_pred(buffer_operation *op) {
2244 my_built_predecessors.add_edge(*(op->p));
2246 }
2247
2248 virtual void internal_del_built_pred(buffer_operation *op) {
2249 my_built_predecessors.delete_edge(*(op->p));
2251 }
2252
2253 virtual void internal_succ_cnt(buffer_operation *op) {
2254 op->cnt_val = my_successors.successor_count();
2256 }
2257
2258 virtual void internal_pred_cnt(buffer_operation *op) {
2259 op->cnt_val = my_built_predecessors.edge_count();
2261 }
2262
2263 virtual void internal_copy_succs(buffer_operation *op) {
2264 my_successors.copy_successors(*(op->svec));
2266 }
2267
2268 virtual void internal_copy_preds(buffer_operation *op) {
2269 my_built_predecessors.copy_edges(*(op->pvec));
2271 }
2272
2273#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2274
2275private:
// No-op ordering hook. handle_operations_impl calls derived->order() after it
// has processed a batch of operations (presumably so that derived buffer types
// can supply their own ordering step -- confirm against the derived classes).
2276 void order() {}
2277
2279 return this->my_item_valid(this->my_tail - 1);
2280 }
2281
// Offers the item returned by this->back() to the successors. If that yields a
// task, the task is merged into last_task via combine_tasks and the back item
// is destroyed; if no task is returned, nothing else is changed.
2282 void try_put_and_add_task(task*& last_task) {
2283 task *new_task = my_successors.try_put_task(this->back());
2284 if (new_task) {
2285 // workaround for icc bug
2286 graph& g = this->my_graph;
2287 last_task = combine_tasks(g, last_task, new_task);
2288 this->destroy_back();
2289 }
2290 }
2291
2292protected:
2296 }
2297
2298 template<typename derived_type>
2299 void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
2300 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2301
2302 if (this->my_reserved || !derived->is_item_valid()) {
2304 this->forwarder_busy = false;
2305 return;
2306 }
2307 // Try forwarding, giving each successor a chance
2308 task * last_task = NULL;
2309 size_type counter = my_successors.size();
2310 for (; counter > 0 && derived->is_item_valid(); --counter)
2311 derived->try_put_and_add_task(last_task);
2312
2313 op->ltask = last_task; // return task
2314 if (last_task && !counter) {
2316 }
2317 else {
2319 forwarder_busy = false;
2320 }
2321 }
2322
2324 this->push_back(*(op->elem));
2326 return true;
2327 }
2328
2329 virtual void internal_pop(buffer_operation *op) {
2330 if(this->pop_back(*(op->elem))) {
2332 }
2333 else {
2335 }
2336 }
2337
2339 if(this->reserve_front(*(op->elem))) {
2341 }
2342 else {
2344 }
2345 }
2346
2348 this->consume_front();
2350 }
2351
2353 this->release_front();
2355 }
2356
2357public:
2361 sender<T>(), forwarder_busy(false)
2362 {
2363 my_successors.set_owner(this);
2364 my_aggregator.initialize_handler(handler_type(this));
2365 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2366 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2367 }
2368
2369#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Constructs a buffer_node in the graph referenced by the node_set, then calls
// make_edges_in_order to connect it with the nodes in that set.
2370 template <typename... Args>
2371 buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
2372 make_edges_in_order(nodes, *this);
2373 }
2374#endif
2375
2379 receiver<T>(), sender<T>(), forwarder_busy(false)
2380 {
2381 my_successors.set_owner(this);
2382 my_aggregator.initialize_handler(handler_type(this));
2383 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2384 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2385 }
2386
2387#if TBB_PREVIEW_FLOW_GRAPH_TRACE
2388 void set_name( const char *name ) __TBB_override {
2390 }
2391#endif
2392
2393 //
2394 // message sender implementation
2395 //
2396
2398
2400 buffer_operation op_data(reg_succ);
2401 op_data.r = &r;
2402 my_aggregator.execute(&op_data);
2403 (void)enqueue_forwarding_task(op_data);
2404 return true;
2405 }
2406
2407#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Runs an add_blt_succ operation carrying &r through my_aggregator, which
// dispatches it to internal_add_built_succ.
2408 void internal_add_built_successor( successor_type &r) __TBB_override {
2409 buffer_operation op_data(add_blt_succ);
2410 op_data.r = &r;
2411 my_aggregator.execute(&op_data);
2412 }
2413
2414 void internal_delete_built_successor( successor_type &r) __TBB_override {
2415 buffer_operation op_data(del_blt_succ);
2416 op_data.r = &r;
2417 my_aggregator.execute(&op_data);
2418 }
2419
2420 void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
2421 buffer_operation op_data(add_blt_pred);
2422 op_data.p = &p;
2423 my_aggregator.execute(&op_data);
2424 }
2425
2426 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
2427 buffer_operation op_data(del_blt_pred);
2428 op_data.p = &p;
2429 my_aggregator.execute(&op_data);
2430 }
2431
2432 size_t predecessor_count() __TBB_override {
2433 buffer_operation op_data(blt_pred_cnt);
2434 my_aggregator.execute(&op_data);
2435 return op_data.cnt_val;
2436 }
2437
2438 size_t successor_count() __TBB_override {
2439 buffer_operation op_data(blt_succ_cnt);
2440 my_aggregator.execute(&op_data);
2441 return op_data.cnt_val;
2442 }
2443
2444 void copy_predecessors( predecessor_list_type &v ) __TBB_override {
2445 buffer_operation op_data(blt_pred_cpy);
2446 op_data.pvec = &v;
2447 my_aggregator.execute(&op_data);
2448 }
2449
2450 void copy_successors( successor_list_type &v ) __TBB_override {
2451 buffer_operation op_data(blt_succ_cpy);
2452 op_data.svec = &v;
2453 my_aggregator.execute(&op_data);
2454 }
2455
2456#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2457
2459
2462 r.remove_predecessor(*this);
2463 buffer_operation op_data(rem_succ);
2464 op_data.r = &r;
2465 my_aggregator.execute(&op_data);
2466 // even though this operation does not cause a forward, if we are the handler, and
2467 // a forward is scheduled, we may be the first to reach this point after the aggregator,
2468 // and so should check for the task.
2469 (void)enqueue_forwarding_task(op_data);
2470 return true;
2471 }
2472
2474
2477 buffer_operation op_data(req_item);
2478 op_data.elem = &v;
2479 my_aggregator.execute(&op_data);
2480 (void)enqueue_forwarding_task(op_data);
2481 return (op_data.status==internal::SUCCEEDED);
2482 }
2483
2485
2488 buffer_operation op_data(res_item);
2489 op_data.elem = &v;
2490 my_aggregator.execute(&op_data);
2491 (void)enqueue_forwarding_task(op_data);
2492 return (op_data.status==internal::SUCCEEDED);
2493 }
2494
2496
2498 buffer_operation op_data(rel_res);
2499 my_aggregator.execute(&op_data);
2500 (void)enqueue_forwarding_task(op_data);
2501 return true;
2502 }
2503
2505
2507 buffer_operation op_data(con_res);
2508 my_aggregator.execute(&op_data);
2509 (void)enqueue_forwarding_task(op_data);
2510 return true;
2511 }
2512
2513protected:
2514
2515 template< typename R, typename B > friend class run_and_put_task;
2516 template<typename X, typename Y> friend class internal::broadcast_cache;
2517 template<typename X, typename Y> friend class internal::round_robin_cache;
2520 buffer_operation op_data(t, put_item);
2521 my_aggregator.execute(&op_data);
2522 task *ft = grab_forwarding_task(op_data);
2523 // sequencer_nodes can return failure (if an item has been previously inserted)
2524 // We have to spawn the returned task if our own operation fails.
2525
2526 if(ft && op_data.status ==internal::FAILED) {
2527 // we haven't succeeded queueing the item, but for some reason the
2528 // call returned a task (if another request resulted in a successful
2529 // forward this could happen.) Queue the task and reset the pointer.
2531 }
2532 else if(!ft && op_data.status ==internal::SUCCEEDED) {
2534 }
2535 return ft;
2536 }
2537
2539 return my_graph;
2540 }
2541
2543
2544#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2545public:
2546 void extract() __TBB_override {
2547 my_built_predecessors.receiver_extract(*this);
2548 my_successors.built_successors().sender_extract(*this);
2549 }
2550#endif
2551
2552protected:
2555 // TODO: just clear structures
2556 if (f&rf_clear_edges) {
2557 my_successors.clear();
2558#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2559 my_built_predecessors.clear();
2560#endif
2561 }
2562 forwarder_busy = false;
2563 }
2564}; // buffer_node
2565
2567template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2568class queue_node : public buffer_node<T, Allocator> {
2569#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2572 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2573 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2574 );
2575#endif
2576protected:
2581
2582private:
2583 template<typename, typename> friend class buffer_node;
2584
2586 return this->my_item_valid(this->my_head);
2587 }
2588
2589 void try_put_and_add_task(task*& last_task) {
2590 task *new_task = this->my_successors.try_put_task(this->front());
2591 if (new_task) {
2592 // workaround for icc bug
2593 graph& graph_ref = this->graph_reference();
2594 last_task = combine_tasks(graph_ref, last_task, new_task);
2595 this->destroy_front();
2596 }
2597 }
2598
2599protected:
2600 void internal_forward_task(queue_operation *op) __TBB_override {
2601 this->internal_forward_task_impl(op, this);
2602 }
2603
2605 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2607 }
2608 else {
2609 this->pop_front(*(op->elem));
2611 }
2612 }
2614 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2616 }
2617 else {
2618 this->reserve_front(*(op->elem));
2620 }
2621 }
2623 this->consume_front();
2625 }
2626
2627public:
2628 typedef T input_type;
2629 typedef T output_type;
2632
2635 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2636 static_cast<receiver<input_type> *>(this),
2637 static_cast<sender<output_type> *>(this) );
2638 }
2639
2640#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2641 template <typename... Args>
2642 queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
2643 make_edges_in_order(nodes, *this);
2644 }
2645#endif
2646
2649 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2650 static_cast<receiver<input_type> *>(this),
2651 static_cast<sender<output_type> *>(this) );
2652 }
2653
2654#if TBB_PREVIEW_FLOW_GRAPH_TRACE
2655 void set_name( const char *name ) __TBB_override {
2657 }
2658#endif
2659
2660protected:
2663 }
2664}; // queue_node
2665
2667template< typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2668class sequencer_node : public queue_node<T, Allocator> {
2670 // my_sequencer should be a benign function and must be callable
2671 // from a parallel context. Does this mean it needn't be reset?
2672public:
2673#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2676 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2677 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2678 );
2679#endif
2680 typedef T input_type;
2681 typedef T output_type;
2684
2686 template< typename Sequencer >
2687 __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, Allocator>(g),
2688 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2689 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2690 static_cast<receiver<input_type> *>(this),
2691 static_cast<sender<output_type> *>(this) );
2692 }
2693
2694#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2695 template <typename Sequencer, typename... Args>
2696 sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
2697 : sequencer_node(nodes.graph_reference(), s) {
2698 make_edges_in_order(nodes, *this);
2699 }
2700#endif
2701
2704 my_sequencer( src.my_sequencer->clone() ) {
2705 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2706 static_cast<receiver<input_type> *>(this),
2707 static_cast<sender<output_type> *>(this) );
2708 }
2709
2712
2713#if TBB_PREVIEW_FLOW_GRAPH_TRACE
2714 void set_name( const char *name ) __TBB_override {
2716 }
2717#endif
2718
2719protected:
2722
2723private:
2725 size_type tag = (*my_sequencer)(*(op->elem));
2726#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2727 if (tag < this->my_head) {
2728 // have already emitted a message with this tag
2730 return false;
2731 }
2732#endif
2733 // cannot modify this->my_tail now; the buffer would be inconsistent.
2734 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2735
2736 if (this->size(new_tail) > this->capacity()) {
2737 this->grow_my_array(this->size(new_tail));
2738 }
2739 this->my_tail = new_tail;
2740
2741 const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2742 __TBB_store_with_release(op->status, res);
2743 return res ==internal::SUCCEEDED;
2744 }
2745}; // sequencer_node
2746
2748template<typename T, typename Compare = std::less<T>, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T)>
2749class priority_queue_node : public buffer_node<T, Allocator> {
2750public:
2751#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2754 "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
2755 "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
2756 );
2757#endif
2758 typedef T input_type;
2759 typedef T output_type;
2764
2766 __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2767 : buffer_node<T, Allocator>(g), compare(comp), mark(0) {
2768 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2769 static_cast<receiver<input_type> *>(this),
2770 static_cast<sender<output_type> *>(this) );
2771 }
2772
2773#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2774 template <typename... Args>
2775 priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2776 : priority_queue_node(nodes.graph_reference(), comp) {
2777 make_edges_in_order(nodes, *this);
2778 }
2779#endif
2780
2783 : buffer_node<T, Allocator>(src), mark(0)
2784 {
2785 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2786 static_cast<receiver<input_type> *>(this),
2787 static_cast<sender<output_type> *>(this) );
2788 }
2789
2790#if TBB_PREVIEW_FLOW_GRAPH_TRACE
2791 void set_name( const char *name ) __TBB_override {
2793 }
2794#endif
2795
2796protected:
2797
2799 mark = 0;
2801 }
2802
2806
2809 this->internal_forward_task_impl(op, this);
2810 }
2811
2813 this->handle_operations_impl(op_list, this);
2814 }
2815
2817 prio_push(*(op->elem));
2819 return true;
2820 }
2821
2823 // if empty or already reserved, don't pop
2824 if ( this->my_reserved == true || this->my_tail == 0 ) {
2826 return;
2827 }
2828
2829 *(op->elem) = prio();
2831 prio_pop();
2832
2833 }
2834
2835 // pops the highest-priority item, saves copy
2837 if (this->my_reserved == true || this->my_tail == 0) {
2839 return;
2840 }
2841 this->my_reserved = true;
2842 *(op->elem) = prio();
2843 reserved_item = *(op->elem);
2845 prio_pop();
2846 }
2847
2850 this->my_reserved = false;
2852 }
2853
2854 void internal_release(prio_operation *op) __TBB_override {
2857 this->my_reserved = false;
2859 }
2860
2861private:
2862 template<typename, typename> friend class buffer_node;
2863
2864 void order() {
2865 if (mark < this->my_tail) heapify();
2866 __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2867 }
2868
2870 return this->my_tail > 0;
2871 }
2872
2873 void try_put_and_add_task(task*& last_task) {
2874 task * new_task = this->my_successors.try_put_task(this->prio());
2875 if (new_task) {
2876 // workaround for icc bug
2877 graph& graph_ref = this->graph_reference();
2878 last_task = combine_tasks(graph_ref, last_task, new_task);
2879 prio_pop();
2880 }
2881 }
2882
2883private:
2884 Compare compare;
2886
2888
2889 // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2891 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2892 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2893 }
2894
2895 // prio_push: checks that the item will fit, expand array if necessary, put at end
2896 void prio_push(const T &src) {
2897 if ( this->my_tail >= this->my_array_size )
2898 this->grow_my_array( this->my_tail + 1 );
2899 (void) this->place_item(this->my_tail, src);
2900 ++(this->my_tail);
2901 __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2902 }
2903
2904 // prio_pop: deletes highest priority item from the array, and if it is item
2905 // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
2906 // and mark. Assumes the array has already been tested for emptiness; no failure.
2907 void prio_pop() {
2908 if (prio_use_tail()) {
2909 // there are newly pushed elements; last one higher than top
2910 // copy the data
2911 this->destroy_item(this->my_tail-1);
2912 --(this->my_tail);
2913 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2914 return;
2915 }
2916 this->destroy_item(0);
2917 if(this->my_tail > 1) {
2918 // push the last element down heap
2919 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2920 this->move_item(0,this->my_tail - 1);
2921 }
2922 --(this->my_tail);
2923 if(mark > this->my_tail) --mark;
2924 if (this->my_tail > 1) // don't reheap for heap of size 1
2925 reheap();
2926 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2927 }
2928
2929 const T& prio() {
2930 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2931 }
2932
2933 // turn array into heap
2934 void heapify() {
2935 if(this->my_tail == 0) {
2936 mark = 0;
2937 return;
2938 }
2939 if (!mark) mark = 1;
2940 for (; mark<this->my_tail; ++mark) { // for each unheaped element
2941 size_type cur_pos = mark;
2942 input_type to_place;
2943 this->fetch_item(mark,to_place);
2944 do { // push to_place up the heap
2945 size_type parent = (cur_pos-1)>>1;
2946 if (!compare(this->get_my_item(parent), to_place))
2947 break;
2948 this->move_item(cur_pos, parent);
2949 cur_pos = parent;
2950 } while( cur_pos );
2951 (void) this->place_item(cur_pos, to_place);
2952 }
2953 }
2954
2955 // otherwise heapified array with new root element; rearrange to heap
2956 void reheap() {
2957 size_type cur_pos=0, child=1;
2958 while (child < mark) {
2959 size_type target = child;
2960 if (child+1<mark &&
2961 compare(this->get_my_item(child),
2962 this->get_my_item(child+1)))
2963 ++target;
2964 // target now has the higher priority child
2965 if (compare(this->get_my_item(target),
2966 this->get_my_item(cur_pos)))
2967 break;
2968 // swap
2969 this->swap_items(cur_pos, target);
2970 cur_pos = target;
2971 child = (cur_pos<<1)+1;
2972 }
2973 }
2974}; // priority_queue_node
2975
2976} // interfaceX
2977
2978namespace interface11 {
2979
2981
2984template< typename T, typename DecrementType=continue_msg >
2985class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2986public:
2987 typedef T input_type;
2988 typedef T output_type;
2991#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2992 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2993 typedef typename sender<output_type>::built_successors_type built_successors_type;
2994 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2995 typedef typename sender<output_type>::successor_list_type successor_list_type;
2996#endif
2997 //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2998
2999private:
3001 size_t my_count; //number of successful puts
3002 size_t my_tries; //number of active put attempts
3006 __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
3007
3009
3010 // Let decrementer call decrement_counter()
3011 friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
3012
3013 bool check_conditions() { // always called under lock
3014 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
3015 }
3016
3017 // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
3019 input_type v;
3020 task *rval = NULL;
3021 bool reserved = false;
3022 {
3024 if ( check_conditions() )
3025 ++my_tries;
3026 else
3027 return NULL;
3028 }
3029
3030 //SUCCESS
3031 // if we can reserve and can put, we consume the reservation
3032 // we increment the count and decrement the tries
3033 if ( (my_predecessors.try_reserve(v)) == true ){
3034 reserved=true;
3035 if ( (rval = my_successors.try_put_task(v)) != NULL ){
3036 {
3038 ++my_count;
3039 --my_tries;
3040 my_predecessors.try_consume();
3041 if ( check_conditions() ) {
3042 if ( internal::is_graph_active(this->my_graph) ) {
3043 task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3046 }
3047 }
3048 }
3049 return rval;
3050 }
3051 }
3052 //FAILURE
3053 //if we can't reserve, we decrement the tries
3054 //if we can reserve but can't put, we decrement the tries and release the reservation
3055 {
3057 --my_tries;
3058 if (reserved) my_predecessors.try_release();
3059 if ( check_conditions() ) {
3060 if ( internal::is_graph_active(this->my_graph) ) {
3061 task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3063 __TBB_ASSERT(!rval, "Have two tasks to handle");
3064 return rtask;
3065 }
3066 }
3067 return rval;
3068 }
3069 }
3070
3071 void forward() {
3072 __TBB_ASSERT(false, "Should never be called");
3073 return;
3074 }
3075
// Adjusts my_count according to delta inside an inner scope, then returns the result
// of forward_task().
//   - delta > 0 and larger than my_count: my_count is set to 0.
//   - otherwise (third branch): size_t(delta) is subtracted from my_count.
// NOTE(review): listing lines 3078 and 3082 are absent from this view, so the first
// statement of the inner scope and the body of the "else if" branch cannot be seen here.
3076 task* decrement_counter( long long delta ) {
3077 {
3079 if( delta > 0 && size_t(delta) > my_count )
3080 my_count = 0;
// NOTE(review): for delta < 0, size_t(delta) converts the negative value to a very
// large unsigned number, so this comparison holds for practically every negative
// delta. The trailing comment on the last branch speaks of the absolute value of
// delta; if the magnitude was intended here, size_t(-delta) looks like the right
// operand -- confirm against the intended semantics before changing.
3081 else if( delta < 0 && size_t(delta) > my_threshold - my_count )
3083 else
3084 my_count -= size_t(delta); // absolute value of delta is sufficiently small
3085 }
3086 return forward_task();
3087 }
3088
3089 void initialize() {
3090 my_predecessors.set_owner(this);
3091 my_successors.set_owner(this);
3092 decrement.set_owner(this);
3094 CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
3095 static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
3096 static_cast<sender<output_type> *>(this)
3097 );
3098 }
3099public:
3102
3103#if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
3105 "Deprecated interface of the limiter node can be used only in conjunction "
3106 "with continue_msg as the type of DecrementType template parameter." );
3107#endif // Check for incompatible interface
3108
3111 __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
3112 : graph_node(g), my_threshold(threshold), my_count(0),
3114 my_tries(0), decrement(),
3115 init_decrement_predecessors(num_decrement_predecessors),
3116 decrement(num_decrement_predecessors)) {
3117 initialize();
3118 }
3119
3120#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3121 template <typename... Args>
3122 limiter_node(const node_set<Args...>& nodes, size_t threshold)
3123 : limiter_node(nodes.graph_reference(), threshold) {
3124 make_edges_in_order(nodes, *this);
3125 }
3126#endif
3127
3130 graph_node(src.my_graph), receiver<T>(), sender<T>(),
3133 my_tries(0), decrement(),
3134 init_decrement_predecessors(src.init_decrement_predecessors),
3135 decrement(src.init_decrement_predecessors)) {
3136 initialize();
3137 }
3138
3139#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3140 void set_name( const char *name ) __TBB_override {
3142 }
3143#endif
3144
3148 bool was_empty = my_successors.empty();
3149 my_successors.register_successor(r);
3150 //spawn a forward task if this is the only successor
3151 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
3152 if ( internal::is_graph_active(this->my_graph) ) {
3153 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3156 }
3157 }
3158 return true;
3159 }
3160
3162
3164 r.remove_predecessor(*this);
3165 my_successors.remove_successor(r);
3166 return true;
3167 }
3168
3169#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3170 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
3171 built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
3172
3173 void internal_add_built_successor(successor_type &src) __TBB_override {
3174 my_successors.internal_add_built_successor(src);
3175 }
3176
3177 void internal_delete_built_successor(successor_type &src) __TBB_override {
3178 my_successors.internal_delete_built_successor(src);
3179 }
3180
3181 size_t successor_count() __TBB_override { return my_successors.successor_count(); }
3182
3183 void copy_successors(successor_list_type &v) __TBB_override {
3184 my_successors.copy_successors(v);
3185 }
3186
3187 void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
3188 my_predecessors.internal_add_built_predecessor(src);
3189 }
3190
3191 void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
3192 my_predecessors.internal_delete_built_predecessor(src);
3193 }
3194
3195 size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
3196
3197 void copy_predecessors(predecessor_list_type &v) __TBB_override {
3198 my_predecessors.copy_predecessors(v);
3199 }
3200
3201 void extract() __TBB_override {
3202 my_count = 0;
3203 my_successors.built_successors().sender_extract(*this);
3204 my_predecessors.built_predecessors().receiver_extract(*this);
3205 decrement.built_predecessors().receiver_extract(decrement);
3206 }
3207#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3208
3212 my_predecessors.add( src );
3213 if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
3214 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3217 }
3218 return true;
3219 }
3220
3223 my_predecessors.remove( src );
3224 return true;
3225 }
3226
3227protected:
3228
3229 template< typename R, typename B > friend class run_and_put_task;
3230 template<typename X, typename Y> friend class internal::broadcast_cache;
3231 template<typename X, typename Y> friend class internal::round_robin_cache;
3234 {
3236 if ( my_count + my_tries >= my_threshold )
3237 return NULL;
3238 else
3239 ++my_tries;
3240 }
3241
3242 task * rtask = my_successors.try_put_task(t);
3243
3244 if ( !rtask ) { // try_put_task failed.
3246 --my_tries;
3248 rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3250 }
3251 }
3252 else {
3254 ++my_count;
3255 --my_tries;
3256 }
3257 return rtask;
3258 }
3259
3261
3263 __TBB_ASSERT(false,NULL); // should never be called
3264 }
3265
3267 my_count = 0;
3268 if(f & rf_clear_edges) {
3269 my_predecessors.clear();
3270 my_successors.clear();
3271 }
3272 else
3273 {
3274 my_predecessors.reset( );
3275 }
3276 decrement.reset_receiver(f);
3277 }
3278}; // limiter_node
3279
3281
3287
3288template<typename OutputTuple, typename JP=queueing> class join_node;
3289
3290template<typename OutputTuple>
3291class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
3292private:
3295public:
3296 typedef OutputTuple output_type;
3299 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3300 this->input_ports(), static_cast< sender< output_type > *>(this) );
3301 }
3302
3303#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3304 template <typename... Args>
3305 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
3306 make_edges_in_order(nodes, *this);
3307 }
3308#endif
3309
3311 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3312 this->input_ports(), static_cast< sender< output_type > *>(this) );
3313 }
3314
3315#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3316 void set_name( const char *name ) __TBB_override {
3318 }
3319#endif
3320
3321};
3322
3323template<typename OutputTuple>
3324class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
3325private:
3328public:
3329 typedef OutputTuple output_type;
3332 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3333 this->input_ports(), static_cast< sender< output_type > *>(this) );
3334 }
3335
3336#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3337 template <typename... Args>
3338 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
3339 make_edges_in_order(nodes, *this);
3340 }
3341#endif
3342
3344 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3345 this->input_ports(), static_cast< sender< output_type > *>(this) );
3346 }
3347
3348#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3349 void set_name( const char *name ) __TBB_override {
3351 }
3352#endif
3353
3354};
3355
3356// template for key_matching join_node
3357// tag_matching join_node is a specialization of key_matching, and is source-compatible.
3358template<typename OutputTuple, typename K, typename KHash>
3359class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
3360 key_matching_port, OutputTuple, key_matching<K,KHash> > {
3361private:
3364public:
3365 typedef OutputTuple output_type;
3367
3368#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
3370
3371#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3372 template <typename... Args>
3373 join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
3374 : join_node(nodes.graph_reference()) {
3375 make_edges_in_order(nodes, *this);
3376 }
3377#endif
3378
3379#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
3380
3381 template<typename __TBB_B0, typename __TBB_B1>
3382 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
3383 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3384 this->input_ports(), static_cast< sender< output_type > *>(this) );
3385 }
3386 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
3387 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3388 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3389 this->input_ports(), static_cast< sender< output_type > *>(this) );
3390 }
3391 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
3392 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3393 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3394 this->input_ports(), static_cast< sender< output_type > *>(this) );
3395 }
3396 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
3397 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3398 unfolded_type(g, b0, b1, b2, b3, b4) {
3399 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3400 this->input_ports(), static_cast< sender< output_type > *>(this) );
3401 }
3402#if __TBB_VARIADIC_MAX >= 6
3403 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3404 typename __TBB_B5>
3405 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3406 unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3407 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3408 this->input_ports(), static_cast< sender< output_type > *>(this) );
3409 }
3410#endif
3411#if __TBB_VARIADIC_MAX >= 7
3412 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3413 typename __TBB_B5, typename __TBB_B6>
3414 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3415 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3416 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3417 this->input_ports(), static_cast< sender< output_type > *>(this) );
3418 }
3419#endif
3420#if __TBB_VARIADIC_MAX >= 8
3421 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3422 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
3423 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3424 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3425 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3426 this->input_ports(), static_cast< sender< output_type > *>(this) );
3427 }
3428#endif
3429#if __TBB_VARIADIC_MAX >= 9
3430 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3431 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
3432 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3433 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3434 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3435 this->input_ports(), static_cast< sender< output_type > *>(this) );
3436 }
3437#endif
3438#if __TBB_VARIADIC_MAX >= 10
3439 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3440 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
3441 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3442 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3443 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3444 this->input_ports(), static_cast< sender< output_type > *>(this) );
3445 }
3446#endif
3447
3448#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3449 template <typename... Args, typename... Bodies>
3450 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
3451 : join_node(nodes.graph_reference(), bodies...) {
3452 make_edges_in_order(nodes, *this);
3453 }
3454#endif
3455
3457 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3458 this->input_ports(), static_cast< sender< output_type > *>(this) );
3459 }
3460
3461#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3462 void set_name( const char *name ) __TBB_override {
3464 }
3465#endif
3466
3467};
3468
3469// indexer node
3471
3472// TODO: Implement interface with variadic template or tuple
3473template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3474 typename T4=null_type, typename T5=null_type, typename T6=null_type,
3475 typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3476
3477//indexer node specializations
// indexer_node specialization for exactly one input type: InputTuple is tuple<T0>, N is 1.
// It derives from internal::unfolded_indexer_node; the members visible here are
// constructors that call tbb::internal::fgt_multiinput_node and a set_name override that
// exists only when TBB_PREVIEW_FLOW_GRAPH_TRACE is enabled.
// NOTE(review): this listing omits some original lines (further typedefs, two constructor
// signatures, the set_name body) - confirm those details against the real header.
3478template<typename T0>
3479class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3480private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3481 static const int N = 1;
3482public:
3483 typedef tuple<T0> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3487 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3488 this->input_ports(), static_cast< sender< output_type > *>(this) );
3489 }
3490
3491#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3492 template <typename... Args>
3493 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3494 make_edges_in_order(nodes, *this);
3495 }
3496#endif
3497
3498 // Copy constructor
 // NOTE(review): its signature line is not shown; the body repeats the
 // fgt_multiinput_node call for the new object.
3500 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3501 this->input_ports(), static_cast< sender< output_type > *>(this) );
3502 }
3503
3504#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3505 void set_name( const char *name ) __TBB_override {
3507 }
3508#endif
3509};
3510
// indexer_node specialization for two input types: InputTuple is tuple<T0, T1>, N is 2.
// It derives from internal::unfolded_indexer_node; the members visible here are
// constructors that call tbb::internal::fgt_multiinput_node and a set_name override that
// exists only when TBB_PREVIEW_FLOW_GRAPH_TRACE is enabled.
// NOTE(review): this listing omits some original lines (further typedefs, two constructor
// signatures, the set_name body) - confirm those details against the real header.
3511template<typename T0, typename T1>
3512class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3513private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3514 static const int N = 2;
3515public:
3516 typedef tuple<T0, T1> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3520 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3521 this->input_ports(), static_cast< sender< output_type > *>(this) );
3522 }
3523
3524#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3525 template <typename... Args>
3526 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3527 make_edges_in_order(nodes, *this);
3528 }
3529#endif
3530
3531 // Copy constructor
 // NOTE(review): its signature line is not shown; the body repeats the
 // fgt_multiinput_node call for the new object.
3533 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3534 this->input_ports(), static_cast< sender< output_type > *>(this) );
3535 }
3536
3537#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3538 void set_name( const char *name ) __TBB_override {
3540 }
3541#endif
3542};
3543
// indexer_node specialization for three input types: InputTuple is tuple<T0, T1, T2>,
// N is 3. It derives from internal::unfolded_indexer_node; the members visible here are
// constructors that call tbb::internal::fgt_multiinput_node and a set_name override that
// exists only when TBB_PREVIEW_FLOW_GRAPH_TRACE is enabled.
// NOTE(review): this listing omits some original lines (further typedefs, two constructor
// signatures, the set_name body) - confirm those details against the real header.
3544template<typename T0, typename T1, typename T2>
3545class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3546private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3547 static const int N = 3;
3548public:
3549 typedef tuple<T0, T1, T2> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3553 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3554 this->input_ports(), static_cast< sender< output_type > *>(this) );
3555 }
3556
3557#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3558 template <typename... Args>
3559 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3560 make_edges_in_order(nodes, *this);
3561 }
3562#endif
3563
3564 // Copy constructor
 // NOTE(review): its signature line is not shown; the body repeats the
 // fgt_multiinput_node call for the new object.
3566 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3567 this->input_ports(), static_cast< sender< output_type > *>(this) );
3568 }
3569
3570#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3571 void set_name( const char *name ) __TBB_override {
3573 }
3574#endif
3575};
3576
// indexer_node specialization for four input types: InputTuple is tuple<T0, T1, T2, T3>,
// N is 4. It derives from internal::unfolded_indexer_node; the members visible here are
// constructors that call tbb::internal::fgt_multiinput_node and a set_name override that
// exists only when TBB_PREVIEW_FLOW_GRAPH_TRACE is enabled.
// NOTE(review): this listing omits some original lines (further typedefs, two constructor
// signatures, the set_name body) - confirm those details against the real header.
3577template<typename T0, typename T1, typename T2, typename T3>
3578class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3579private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3580 static const int N = 4;
3581public:
3582 typedef tuple<T0, T1, T2, T3> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3586 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3587 this->input_ports(), static_cast< sender< output_type > *>(this) );
3588 }
3589
3590#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3591 template <typename... Args>
3592 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3593 make_edges_in_order(nodes, *this);
3594 }
3595#endif
3596
3597 // Copy constructor
 // NOTE(review): its signature line is not shown; the body repeats the
 // fgt_multiinput_node call for the new object.
3599 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3600 this->input_ports(), static_cast< sender< output_type > *>(this) );
3601 }
3602
3603#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3604 void set_name( const char *name ) __TBB_override {
3606 }
3607#endif
3608};
3609
// indexer_node specialization for five input types: InputTuple is
// tuple<T0, T1, T2, T3, T4>, N is 5. It derives from internal::unfolded_indexer_node; the
// members visible here are constructors that call tbb::internal::fgt_multiinput_node and
// a set_name override that exists only when TBB_PREVIEW_FLOW_GRAPH_TRACE is enabled.
// NOTE(review): this listing omits some original lines (further typedefs, two constructor
// signatures, the set_name body) - confirm those details against the real header.
3610template<typename T0, typename T1, typename T2, typename T3, typename T4>
3611class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3612private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3613 static const int N = 5;
3614public:
3615 typedef tuple<T0, T1, T2, T3, T4> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3619 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3620 this->input_ports(), static_cast< sender< output_type > *>(this) );
3621 }
3622
3623#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3624 template <typename... Args>
3625 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3626 make_edges_in_order(nodes, *this);
3627 }
3628#endif
3629
3630 // Copy constructor
 // NOTE(review): its signature line is not shown; the body repeats the
 // fgt_multiinput_node call for the new object.
3632 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3633 this->input_ports(), static_cast< sender< output_type > *>(this) );
3634 }
3635
3636#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3637 void set_name( const char *name ) __TBB_override {
3639 }
3640#endif
3641};
3642
3643#if __TBB_VARIADIC_MAX >= 6
// indexer_node specialization for six input types (compiled only when
// __TBB_VARIADIC_MAX >= 6): InputTuple is tuple<T0, ..., T5>, N is 6. It derives from
// internal::unfolded_indexer_node; the members visible here are constructors that call
// tbb::internal::fgt_multiinput_node and a trace-only set_name override.
// NOTE(review): this listing omits some original lines (further typedefs, two constructor
// signatures, the set_name body) - confirm those details against the real header.
3644template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3645class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3646private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3647 static const int N = 6;
3648public:
3649 typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3653 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3654 this->input_ports(), static_cast< sender< output_type > *>(this) );
3655 }
3656
3657#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3658 template <typename... Args>
3659 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3660 make_edges_in_order(nodes, *this);
3661 }
3662#endif
3663
3664 // Copy constructor
 // NOTE(review): its signature line is not shown; the body repeats the
 // fgt_multiinput_node call for the new object.
3666 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3667 this->input_ports(), static_cast< sender< output_type > *>(this) );
3668 }
3669
3670#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3671 void set_name( const char *name ) __TBB_override {
3673 }
3674#endif
3675};
3676#endif //variadic max 6
3677
3678#if __TBB_VARIADIC_MAX >= 7
// indexer_node specialization for seven input types (compiled only when
// __TBB_VARIADIC_MAX >= 7): InputTuple is tuple<T0, ..., T6>, N is 7. It derives from
// internal::unfolded_indexer_node; the members visible here are constructors that call
// tbb::internal::fgt_multiinput_node and a trace-only set_name override.
// NOTE(review): this listing omits some original lines (further typedefs, two constructor
// signatures, the set_name body) - confirm those details against the real header.
3679template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3680 typename T6>
3681class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3682private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3683 static const int N = 7;
3684public:
3685 typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3689 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3690 this->input_ports(), static_cast< sender< output_type > *>(this) );
3691 }
3692
3693#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3694 template <typename... Args>
3695 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3696 make_edges_in_order(nodes, *this);
3697 }
3698#endif
3699
3700 // Copy constructor
 // NOTE(review): its signature line is not shown; the body repeats the
 // fgt_multiinput_node call for the new object.
3702 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3703 this->input_ports(), static_cast< sender< output_type > *>(this) );
3704 }
3705
3706#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3707 void set_name( const char *name ) __TBB_override {
3709 }
3710#endif
3711};
3712#endif //variadic max 7
3713
3714#if __TBB_VARIADIC_MAX >= 8
// indexer_node specialization for eight input types (compiled only when
// __TBB_VARIADIC_MAX >= 8): InputTuple is tuple<T0, ..., T7>, N is 8. It derives from
// internal::unfolded_indexer_node; the members visible here are constructors that call
// tbb::internal::fgt_multiinput_node and a trace-only set_name override.
// NOTE(review): this listing omits some original lines (further typedefs, the graph
// constructor signature, the set_name body) - confirm against the real header.
3715template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3716 typename T6, typename T7>
3717class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3718private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3719 static const int N = 8;
3720public:
3721 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3725 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3726 this->input_ports(), static_cast< sender< output_type > *>(this) );
3727 }
3728
3729#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3730 template <typename... Args>
3731 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3732 make_edges_in_order(nodes, *this);
3733 }
3734#endif
3735
3736 // Copy constructor
 // Copy-constructs the unfolded_type base from `other`, then issues the same
 // fgt_multiinput_node call for the new object.
3737 indexer_node( const indexer_node& other ) : unfolded_type(other) {
3738 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3739 this->input_ports(), static_cast< sender< output_type > *>(this) );
3740 }
3741
3742#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3743 void set_name( const char *name ) __TBB_override {
3745 }
3746#endif
3747};
3748#endif //variadic max 8
3749
3750#if __TBB_VARIADIC_MAX >= 9
// indexer_node specialization for nine input types (compiled only when
// __TBB_VARIADIC_MAX >= 9): InputTuple is tuple<T0, ..., T8>, N is 9. It derives from
// internal::unfolded_indexer_node; the members visible here are constructors that call
// tbb::internal::fgt_multiinput_node and a trace-only set_name override.
// NOTE(review): this listing omits some original lines (further typedefs, two constructor
// signatures, the set_name body) - confirm those details against the real header.
3751template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3752 typename T6, typename T7, typename T8>
3753class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3754private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3755 static const int N = 9;
3756public:
3757 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3761 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3762 this->input_ports(), static_cast< sender< output_type > *>(this) );
3763 }
3764
3765#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3766 template <typename... Args>
3767 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3768 make_edges_in_order(nodes, *this);
3769 }
3770#endif
3771
3772 // Copy constructor
 // NOTE(review): its signature line is not shown; the body repeats the
 // fgt_multiinput_node call for the new object.
3774 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3775 this->input_ports(), static_cast< sender< output_type > *>(this) );
3776 }
3777
3778#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3779 void set_name( const char *name ) __TBB_override {
3781 }
3782#endif
3783};
3784#endif //variadic max 9
3785
3786#if __TBB_VARIADIC_MAX >= 10
// Definition of the primary indexer_node template (marked /*default*/), used when all ten
// type parameters are real types; compiled only when __TBB_VARIADIC_MAX >= 10.
// InputTuple is tuple<T0, ..., T9> and N is 10. It derives from
// internal::unfolded_indexer_node; the members visible here are constructors that call
// tbb::internal::fgt_multiinput_node and a trace-only set_name override.
// NOTE(review): this listing omits some original lines (further typedefs, two constructor
// signatures, the set_name body) - confirm those details against the real header.
3787template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3788 typename T6, typename T7, typename T8, typename T9>
3789class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3790private:
 // Port count; supplied as the template argument of fgt_multiinput_node below.
3791 static const int N = 10;
3792public:
3793 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
 // NOTE(review): the signature of the constructor that owns the next body is not shown.
 // The body passes the graph, the input ports and this node (cast to
 // sender<output_type>*) to fgt_multiinput_node under the FLOW_INDEXER_NODE tag.
3797 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3798 this->input_ports(), static_cast< sender< output_type > *>(this) );
3799 }
3800
3801#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // Delegates to the constructor taking nodes.graph_reference(), then calls
 // make_edges_in_order(nodes, *this).
3802 template <typename... Args>
3803 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3804 make_edges_in_order(nodes, *this);
3805 }
3806#endif
3807
3808 // Copy constructor
 // NOTE(review): its signature line is not shown; the body repeats the
 // fgt_multiinput_node call for the new object.
3810 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3811 this->input_ports(), static_cast< sender< output_type > *>(this) );
3812 }
3813
3814#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
3815 void set_name( const char *name ) __TBB_override {
3817 }
3818#endif
3819};
3820#endif //variadic max 10
3821
// internal_make_edge: connects sender p to receiver s by calling p.register_successor(s).
// When TBB_DEPRECATED_FLOW_NODE_EXTRACTION is enabled, the edge is first recorded on both
// endpoints (s.internal_add_built_predecessor / p.internal_add_built_successor).
// NOTE(review): the alternative signature used under __TBB_PREVIEW_ASYNC_MSG and one line
// after register_successor are not shown in this listing - consult the real header.
3822#if __TBB_PREVIEW_ASYNC_MSG
3824#else
3825template< typename T >
3826inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3827#endif
3828#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3829 s.internal_add_built_predecessor(p);
3830 p.internal_add_built_successor(s);
3831#endif
3832 p.register_successor( s );
3834}
3835
// make_edge overload for a sender<T> / receiver<T> pair of the same message type T.
// NOTE(review): the body line is not shown in this listing; presumably it forwards to
// internal_make_edge( p, s ) - confirm against the real header.
3837template< typename T >
3838inline void make_edge( sender<T> &p, receiver<T> &s ) {
3840}
3841
3842#if __TBB_PREVIEW_ASYNC_MSG
// make_edge overloads that exist only when __TBB_PREVIEW_ASYNC_MSG is enabled.
// NOTE(review): this listing drops most of their lines (the rest of the first template
// header, every body, and the signatures of the second and third overloads), so only the
// generic make_edge( TS &p, TR &s ) signature can be read here - consult the real header.
3843template< typename TS, typename TR,
3846inline void make_edge( TS &p, TR &s ) {
3848}
3849
3850template< typename T >
3853}
3854
3855template< typename T >
3858}
3859
3860#endif // __TBB_PREVIEW_ASYNC_MSG
3861
3862#if __TBB_FLOW_GRAPH_CPP11_FEATURES
3863//Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
// The two unnamed, defaulted template parameters make this overload viable only when T
// declares a nested output_ports_type and V declares a nested input_ports_type.
// The call forwards to make_edge on get<0>(output.output_ports()) and
// get<0>(input.input_ports()).
3864template< typename T, typename V,
3865 typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3866inline void make_edge( T& output, V& input) {
3867 make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3868}
3869
3870//Makes an edge from port 0 of a multi-output predecessor to a receiver.
// The unnamed, defaulted template parameter makes this overload viable only when T
// declares a nested output_ports_type. The call forwards to make_edge on
// get<0>(output.output_ports()) and the receiver passed in.
3871template< typename T, typename R,
3872 typename = typename T::output_ports_type >
3873inline void make_edge( T& output, receiver<R>& input) {
3874 make_edge(get<0>(output.output_ports()), input);
3875}
3876
3877//Makes an edge from a sender to port 0 of a multi-input successor.
// The unnamed, defaulted template parameter makes this overload viable only when V
// declares a nested input_ports_type. The call forwards to make_edge on the sender
// passed in and get<0>(input.input_ports()).
3878template< typename S, typename V,
3879 typename = typename V::input_ports_type >
3880inline void make_edge( sender<S>& output, V& input) {
3881 make_edge(output, get<0>(input.input_ports()));
3882}
3883#endif
3884
// internal_remove_edge: disconnects receiver s from sender p by calling
// p.remove_successor(s). When TBB_DEPRECATED_FLOW_NODE_EXTRACTION is enabled, the edge
// record is then deleted on both endpoints (p.internal_delete_built_successor and
// s.internal_delete_built_predecessor).
// NOTE(review): the alternative signature used under __TBB_PREVIEW_ASYNC_MSG and one line
// before the closing brace are not shown in this listing - consult the real header.
3885#if __TBB_PREVIEW_ASYNC_MSG
3887#else
3888template< typename T >
3889inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3890#endif
3891 p.remove_successor( s );
3892#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3893 // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3894 p.internal_delete_built_successor(s);
3895 s.internal_delete_built_predecessor(p);
3896#endif
3898}
3899
// NOTE(review): only the template header and closing brace of this function template are
// shown in this listing; given its position it is presumably the
// remove_edge( sender<T>&, receiver<T>& ) overload - confirm against the real header.
3901template< typename T >
3904}
3905
3906#if __TBB_PREVIEW_ASYNC_MSG
// remove_edge overloads that exist only when __TBB_PREVIEW_ASYNC_MSG is enabled.
// NOTE(review): this listing drops most of their lines (the rest of the first template
// header, every body, and the signatures of the second and third overloads), so only the
// generic remove_edge( TS &p, TR &s ) signature can be read here - consult the real header.
3907template< typename TS, typename TR,
3910inline void remove_edge( TS &p, TR &s ) {
3912}
3913
3914template< typename T >
3917}
3918
3919template< typename T >
3922}
3923#endif // __TBB_PREVIEW_ASYNC_MSG
3924
3925#if __TBB_FLOW_GRAPH_CPP11_FEATURES
3926//Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
// The two unnamed, defaulted template parameters make this overload viable only when T
// declares a nested output_ports_type and V declares a nested input_ports_type.
// The call forwards to remove_edge on get<0>(output.output_ports()) and
// get<0>(input.input_ports()).
3927template< typename T, typename V,
3928 typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3929inline void remove_edge( T& output, V& input) {
3930 remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3931}
3932
3933//Removes an edge between port 0 of a multi-output predecessor and a receiver.
// The unnamed, defaulted template parameter makes this overload viable only when T
// declares a nested output_ports_type. The call forwards to remove_edge on
// get<0>(output.output_ports()) and the receiver passed in.
3934template< typename T, typename R,
3935 typename = typename T::output_ports_type >
3936inline void remove_edge( T& output, receiver<R>& input) {
3937 remove_edge(get<0>(output.output_ports()), input);
3938}
3939//Removes an edge between a sender and port 0 of a multi-input successor.
// The unnamed, defaulted template parameter makes this overload viable only when V
// declares a nested input_ports_type. The call forwards to remove_edge on the sender
// passed in and get<0>(input.input_ports()).
3940template< typename S, typename V,
3941 typename = typename V::input_ports_type >
3942inline void remove_edge( sender<S>& output, V& input) {
3943 remove_edge(output, get<0>(input.input_ports()));
3944}
3945#endif
3946
3947#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// edge_container<C>::sender_extract: calls remove_edge(s, **i) for every entry i of
// built_edges, i.e. with s as the first (sender-side) argument.
// The loop walks a local copy `e` of built_edges rather than built_edges itself;
// presumably remove_edge modifies built_edges while the loop runs - confirm.
3948template<typename C >
3949template< typename S >
3950void internal::edge_container<C>::sender_extract( S &s ) {
3951 edge_list_type e = built_edges;
3952 for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3953 remove_edge(s, **i);
3954 }
3955}
3956
// edge_container<C>::receiver_extract: calls remove_edge(**i, r) for every entry i of
// built_edges, i.e. with r as the second (receiver-side) argument.
// The loop walks a local copy `e` of built_edges rather than built_edges itself;
// presumably remove_edge modifies built_edges while the loop runs - confirm.
3957template<typename C >
3958template< typename R >
3959void internal::edge_container<C>::receiver_extract( R &r ) {
3960 edge_list_type e = built_edges;
3961 for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3962 remove_edge(**i, r);
3963 }
3964}
3965#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3966
// copy_body: returns, by value, the result of n.copy_function_object<Body>().
// Body cannot be deduced from the argument, so callers must name it explicitly:
// copy_body<MyBody>( node ). Node must provide a copy_function_object member template.
3968template< typename Body, typename Node >
3969Body copy_body( Node &n ) {
3970 return n.template copy_function_object<Body>();
3971}
3972
3973#if __TBB_FLOW_GRAPH_CPP11_FEATURES
3974
3975//composite_node
// Primary template declaration only; the partial specializations that follow match
// tbb::flow::tuple arguments for the input and output type lists.
3976template< typename InputTuple, typename OutputTuple > class composite_node;
3977
// composite_node specialization with both input and output ports.
// The node itself owns two tuples of references, one to receivers (inputs) and one to
// senders (outputs); they are installed by set_external_ports and read back through the
// accessors further down.
// NOTE(review): this listing omits some original lines (a protected member, one
// constructor signature, the accessor signatures, parts of set_external_ports and the
// set_name body) - confirm those details against the real header.
3978template< typename... InputTypes, typename... OutputTypes>
3979class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3980
3981public:
 // Tuples of references: one receiver<T>& per input type, one sender<T>& per output type.
3982 typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3983 typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3984
3985private:
 // Empty until set_external_ports is called; the accessors assert on that.
3986 std::unique_ptr<input_ports_type> my_input_ports;
3987 std::unique_ptr<output_ports_type> my_output_ports;
3988
3989 static const size_t NUM_INPUTS = sizeof...(InputTypes);
3990 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3991
3992protected:
3994
3995public:
3996#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace build: constructor additionally accepts a type_name string.
 // NOTE(review): one body line is not shown here, so the use of type_name is not visible.
3997 composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3998 tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
4000 }
4001#else
 // NOTE(review): the signature of the non-trace constructor is not shown in this listing.
4003 tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
4004 }
4005#endif
4006
 // Stores the given tuples as this node's external input and output ports, replacing any
 // previously stored ones (the unique_ptr members are reassigned).
 // NOTE(review): both static assertions compare NUM_INPUTS / NUM_OUTPUTS with the sizes of
 // input_ports_type / output_ports_type, which are built from the same parameter packs,
 // so they do not inspect T1 or T2 - confirm this is intended.
 // NOTE(review): lines after the two assignments are not shown in this listing.
4007 template<typename T1, typename T2>
4008 void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
4009 __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4010 __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4011 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
4012 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
4013
4016 }
4017
 // Forwards the nodes to internal::add_nodes_impl with `true` as the second argument.
4018 template< typename... NodeTypes >
4019 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4020
 // Forwards the nodes to internal::add_nodes_impl with `false` as the second argument.
4021 template< typename... NodeTypes >
4022 void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4023
4024#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
4025 void set_name( const char *name ) __TBB_override {
4027 }
4028#endif
4029
 // NOTE(review): signature not shown (presumably input_ports()). Asserts that the input
 // ports were set, then returns the stored tuple.
4031 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4032 return *my_input_ports;
4033 }
4034
 // NOTE(review): signature not shown (presumably output_ports()). Asserts that the output
 // ports were set, then returns the stored tuple.
4036 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4037 return *my_output_ports;
4038 }
4039
4040#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
 // Not supported for composite_node: the body is an assertion that always fails.
4041 void extract() __TBB_override {
4042 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4043 }
4044#endif
4045}; // class composite_node
4046
4047//composite_node with only input ports
// Specialization selected when the output tuple is empty: it stores a single tuple of
// receiver references, installed by set_external_ports.
// NOTE(review): this listing omits some original lines (a protected member, the
// constructor bodies, the non-trace constructor signature, the accessor signature and the
// set_name body) - confirm those details against the real header.
4048template< typename... InputTypes>
4049class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
4050public:
 // Tuple of references: one receiver<T>& per input type.
4051 typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
4052
4053private:
 // Empty until set_external_ports is called; the accessor asserts on that.
4054 std::unique_ptr<input_ports_type> my_input_ports;
4055 static const size_t NUM_INPUTS = sizeof...(InputTypes);
4056
4057protected:
4059
4060public:
4061#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace build: constructor additionally accepts a type_name string.
 // NOTE(review): the body lines are not shown in this listing.
4062 composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4065 }
4066#else
 // NOTE(review): the non-trace constructor's signature and body are not shown here.
4069 }
4070#endif
4071
 // Stores the given tuple as this node's external input ports (replacing any earlier one)
 // and then passes the same tuple to fgt_internal_input_alias_helper::alias_port.
 // NOTE(review): the static assertion compares NUM_INPUTS with the size of
 // input_ports_type, both built from InputTypes..., so it does not inspect T - confirm
 // this is intended.
4072 template<typename T>
4073 void set_external_ports(T&& input_ports_tuple) {
4074 __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4075
4076 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
4077
4078 tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
4079 }
4080
 // Forwards the nodes to internal::add_nodes_impl with `true` as the second argument.
4081 template< typename... NodeTypes >
4082 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4083
 // Forwards the nodes to internal::add_nodes_impl with `false` as the second argument.
4084 template< typename... NodeTypes >
4085 void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4086
4087#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
4088 void set_name( const char *name ) __TBB_override {
4090 }
4091#endif
4092
 // NOTE(review): signature not shown (presumably input_ports()). Asserts that the input
 // ports were set, then returns the stored tuple.
4094 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4095 return *my_input_ports;
4096 }
4097
4098#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
 // Not supported for composite_node: the body is an assertion that always fails.
4099 void extract() __TBB_override {
4100 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4101 }
4102#endif
4103
4104}; // class composite_node
4105
4106//composite_nodes with only output_ports
// Specialization selected when the input tuple is empty: it stores a single tuple of
// sender references, installed by set_external_ports.
// NOTE(review): this listing omits some original lines (a protected member, the
// constructor bodies, the non-trace constructor signature, the accessor signature and the
// set_name body) - confirm those details against the real header.
4107template<typename... OutputTypes>
4108class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
4109public:
 // Tuple of references: one sender<T>& per output type.
4110 typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
4111
4112private:
 // Empty until set_external_ports is called; the accessor asserts on that.
4113 std::unique_ptr<output_ports_type> my_output_ports;
4114 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
4115
4116protected:
4118
4119public:
4120#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace build: constructor additionally accepts a type_name string.
 // NOTE(review): the body lines are not shown in this listing.
4121 __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4124 }
4125#else
 // NOTE(review): the non-trace constructor's signature and body are not shown here.
4128 }
4129#endif
4130
 // Stores the given tuple as this node's external output ports (replacing any earlier
 // one) and then passes the same tuple to fgt_internal_output_alias_helper::alias_port.
 // NOTE(review): the static assertion compares NUM_OUTPUTS with the size of
 // output_ports_type, both built from OutputTypes..., so it does not inspect T - confirm
 // this is intended.
4131 template<typename T>
4132 void set_external_ports(T&& output_ports_tuple) {
4133 __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4134
4135 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
4136
4137 tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
4138 }
4139
 // Forwards the nodes to internal::add_nodes_impl with `true` as the second argument.
4140 template<typename... NodeTypes >
4141 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4142
 // Forwards the nodes to internal::add_nodes_impl with `false` as the second argument.
4143 template<typename... NodeTypes >
4144 void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4145
4146#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
4147 void set_name( const char *name ) __TBB_override {
4149 }
4150#endif
4151
 // NOTE(review): signature not shown (presumably output_ports()). Asserts that the output
 // ports were set, then returns the stored tuple.
4153 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4154 return *my_output_ports;
4155 }
4156
4157#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
 // Not supported for composite_node: the body is an assertion that always fails.
4158 void extract() __TBB_override {
4159 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4160 }
4161#endif
4162
4163}; // class composite_node
4164
4165#endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
4166
4167namespace internal {
4168
// Class template parameterised on a Gateway type that exposes gateway_type and lets the
// stored gateway pointer be replaced through set_gateway.
// NOTE(review): the class-head line, one public member and the protected member
// declaration are not shown in this listing; the name is presumably async_body_base (the
// base named by async_body) and the protected member presumably my_gateway - confirm.
4169template<typename Gateway>
4171public:
4172 typedef Gateway gateway_type;
4173
 // Replaces the stored gateway pointer; the pointee is not copied.
4175 void set_gateway(gateway_type *gateway) {
4176 my_gateway = gateway;
4177 }
4178
4179protected:
4181};
4182
// Callable wrapper around a user Body: it accepts the (input, ports) argument pair but
// invokes the wrapped body with the input and the gateway held by async_body_base.
// NOTE(review): the base_type typedef and the private my_body declaration are not shown
// in this listing.
4183template<typename Input, typename Ports, typename Gateway, typename Body>
4184class async_body: public async_body_base<Gateway> {
4185public:
4187 typedef Gateway gateway_type;
4188
 // Copies `body` into my_body and hands the gateway pointer to the base class.
4189 async_body(const Body &body, gateway_type *gateway)
4190 : base_type(gateway), my_body(body) { }
4191
 // The Ports argument is deliberately unnamed and unused; the body receives the gateway
 // instead.
4192 void operator()( const Input &v, Ports & ) {
4193 my_body(v, *this->my_gateway);
4194 }
4195
 // Returns a copy of the wrapped body.
4196 Body get_body() { return my_body; }
4197
4198private:
4200};
4201
4202} // namespace internal
4203
4204} // namespace interfaceX
4205namespace interface11 {
4206
// async_node: a node built on multifunction_node< Input, tuple< Output > > that is also a
// sender< Output >. The user body is wrapped in internal::async_body so that it is called
// with the input and a gateway object; values handed to the gateway's try_put are pushed
// to the successors of output port 0 (see try_put_impl).
// NOTE(review): this listing omits many original lines (the class-head line carrying the
// name, several typedefs, member declarations and method signatures); every spot where
// that matters is marked below - confirm details against the real header.
4208template < typename Input, typename Output,
4209 typename Policy = queueing_lightweight,
4210 typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input) >
4212 : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output >
4213{
 // NOTE(review): the opening of the assertion that owns the message below is not shown.
 // The message text has grammar slips ("will removed", "To temporary enable"); it is a
 // string literal and is left untouched here.
4214#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
4217 "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
4218 "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
4219 );
4220#endif
4223
4224public:
4225 typedef Input input_type;
4226 typedef Output output_type;
4233
4234private:
 // Fragment of a helper functor (try_put_functor; its head and some members are not
 // shown). It keeps pointers to a port and a value and, when invoked, stores the outcome
 // of port->try_put(*value) in `result`.
4238 // TODO: pass value by copy since we do not want to block asynchronous thread.
4239 const Output *value;
4241 try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
4242 void operator()() {
4243 result = port->try_put(*value);
4244 }
4245 };
4246
 // Gateway implementation (class head not shown) holding a pointer to the owning node.
4248 public:
4249 receiver_gateway_impl(async_node* node): my_node(node) {}
 // NOTE(review): signature not shown. Calls fgt_async_reserve, then
 // my_graph.reserve_wait() on the owning node's graph.
4251 tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4252 my_node->my_graph.reserve_wait();
4253 }
4254
 // NOTE(review): signature not shown. Calls my_graph.release_wait(), then
 // fgt_async_commit - the mirror image of the method above.
4256 my_node->my_graph.release_wait();
4257 tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4258 }
4259
 // Forwards the value to the owning node's try_put_impl and returns its result.
4261 bool try_put(const Output &i) __TBB_override {
4262 return my_node->try_put_impl(i);
4263 }
4264
4265 private:
4267 } my_gateway;
4268
4269 //The substitute of 'this' for member construction, to prevent compiler warnings
4270 async_node* self() { return this; }
4271
 // Offers `i` to the successors of output port 0: gather_successful_try_puts collects the
 // resulting tasks into a task_list, each task is then handed to
 // internal::enqueue_in_graph_arena, and the function returns whether at least one
 // successor accepted the value.
 // NOTE(review): two lines of this body are not shown in this listing.
4273 bool try_put_impl(const Output &i) {
4274 internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
4275 internal::broadcast_cache<output_type>& port_successors = port_0.successors();
4277 task_list tasks;
4278 bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
4279 __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
4280 "Return status is inconsistent with the method operation." );
4281
4282 while( !tasks.empty() ) {
4283 internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
4284 }
4286 return is_at_least_one_put_successful;
4287 }
4288
4289public:
 // Main constructor (several signature lines are not shown). The user body is wrapped in
 // internal::async_body together with &my_gateway and passed to the base class;
 // my_gateway is initialised with self(), and the node is then reported through
 // fgt_multioutput_node_with_body.
4290 template<typename Body>
4292 graph &g, size_t concurrency,
4295#else
4297#endif
4298 ) : base_type(
4299 g, concurrency,
4300 internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
4301 (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
4302 tbb::internal::fgt_multioutput_node_with_body<1>(
4303 CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4304 &this->my_graph, static_cast<receiver<input_type> *>(this),
4305 this->output_ports(), this->my_body
4306 );
4307 }
4308
4309#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
 // Convenience overload taking a priority without a policy object: delegates with a
 // default-constructed Policy. The Args parameter pack is not used in this signature.
4310 template <typename Body, typename... Args>
4311 __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
4312 : async_node(g, concurrency, body, Policy(), priority) {}
4313#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4314
4315#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
 // node_set constructor (some signature lines are not shown): delegates using
 // nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
4316 template <typename Body, typename... Args>
4318 const node_set<Args...>& nodes, size_t concurrency, Body body,
4320 ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
4321 make_edges_in_order(nodes, *this);
4322 }
4323
4324#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
 // node_set overload taking a priority without a policy object: delegates with a
 // default-constructed Policy.
4325 template <typename Body, typename... Args>
4326 __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
4327 : async_node(nodes, concurrency, body, Policy(), priority) {}
4328#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4329#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4330
 // Copy constructor: copies the base from `other`, gives the new node its own gateway,
 // and points both my_body and my_init_body at that gateway - presumably because the
 // copied bodies would otherwise still refer to other's gateway (confirm).
4331 __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
4332 static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
4333 static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
4334
4335 tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4336 &this->my_graph, static_cast<receiver<input_type> *>(this),
4337 this->output_ports(), this->my_body );
4338 }
4339
 // NOTE(review): signature not shown (presumably gateway()). Returns this node's gateway.
4341 return my_gateway;
4342 }
4343
4344#if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Trace-only override. NOTE(review): the body line is not shown in this listing.
4345 void set_name( const char *name ) __TBB_override {
4347 }
4348#endif
4349
4350 // Define sender< Output >
4351
 // NOTE(review): signature not shown. Forwards register_successor to output port 0.
4354 return internal::output_port<0>(*this).register_successor(r);
4355 }
4356
 // NOTE(review): signature not shown. Forwards remove_successor to output port 0.
4359 return internal::output_port<0>(*this).remove_successor(r);
4360 }
4361
 // NOTE(review): signature and local typedefs not shown. Recovers the async_body wrapper
 // stored in my_body (the dynamic_cast is to a reference type) and returns a copy of the
 // user body it wraps.
4362 template<typename Body>
4366 mfn_body_type &body_ref = *this->my_body;
4367 async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
4368 return ab.get_body();
4369 }
4370
4371#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
 // Deprecated extraction interface: every member below forwards to output port 0.
4373 typedef typename internal::edge_container<successor_type> built_successors_type;
4374 typedef typename built_successors_type::edge_list_type successor_list_type;
4375 built_successors_type &built_successors() __TBB_override {
4376 return internal::output_port<0>(*this).built_successors();
4377 }
4378
4379 void internal_add_built_successor( successor_type &r ) __TBB_override {
4380 internal::output_port<0>(*this).internal_add_built_successor(r);
4381 }
4382
4383 void internal_delete_built_successor( successor_type &r ) __TBB_override {
4384 internal::output_port<0>(*this).internal_delete_built_successor(r);
4385 }
4386
4387 void copy_successors( successor_list_type &l ) __TBB_override {
4388 internal::output_port<0>(*this).copy_successors(l);
4389 }
4390
4391 size_t successor_count() __TBB_override {
4392 return internal::output_port<0>(*this).successor_count();
4393 }
4394#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4395
4396protected:
4397
 // NOTE(review): signature not shown (presumably reset_node). Delegates to the base class.
4399 base_type::reset_node(f);
4400 }
4401};
4402
4403#if __TBB_PREVIEW_STREAMING_NODE
4405#endif // __TBB_PREVIEW_STREAMING_NODE
4406
4408
4409template< typename T >
4410class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
4411public:
4412 typedef T input_type;
4413 typedef T output_type;
4416#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4417 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
4418 typedef typename sender<output_type>::built_successors_type built_successors_type;
4419 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
4420 typedef typename sender<output_type>::successor_list_type successor_list_type;
4421#endif
4422
4423 __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
4424 my_successors.set_owner( this );
4425 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4426 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4427 }
4428
4429#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4430 template <typename... Args>
4431 overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
4432 make_edges_in_order(nodes, *this);
4433 }
4434#endif
4435
4438 graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
4439 {
4440 my_successors.set_owner( this );
4441 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4442 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4443 }
4444
4446
4447#if TBB_PREVIEW_FLOW_GRAPH_TRACE
4448 void set_name( const char *name ) __TBB_override {
4450 }
4451#endif
4452
4454 spin_mutex::scoped_lock l( my_mutex );
4455 if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
4456 // We have a valid value that must be forwarded immediately.
4457 bool ret = s.try_put( my_buffer );
4458 if ( ret ) {
4459 // We add the successor that accepted our put
4460 my_successors.register_successor( s );
4461 } else {
4462 // In case of reservation a race between the moment of reservation and register_successor can appear,
4463 // because failed reserve does not mean that register_successor is not ready to put a message immediately.
4464 // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
4465 // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
4466 task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
4467 register_predecessor_task( *this, s );
4468 internal::spawn_in_graph_arena( my_graph, *rtask );
4469 }
4470 } else {
4471 // No valid value yet, just add as successor
4472 my_successors.register_successor( s );
4473 }
4474 return true;
4475 }
4476
4478 spin_mutex::scoped_lock l( my_mutex );
4479 my_successors.remove_successor(s);
4480 return true;
4481 }
4482
4483#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4484 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
4485 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
4486
4487 void internal_add_built_successor( successor_type &s) __TBB_override {
4488 spin_mutex::scoped_lock l( my_mutex );
4489 my_successors.internal_add_built_successor(s);
4490 }
4491
4492 void internal_delete_built_successor( successor_type &s) __TBB_override {
4493 spin_mutex::scoped_lock l( my_mutex );
4494 my_successors.internal_delete_built_successor(s);
4495 }
4496
//! Returns the value reported by my_successors.successor_count(), read under my_mutex.
4497 size_t successor_count() __TBB_override {
4498 spin_mutex::scoped_lock l( my_mutex );
4499 return my_successors.successor_count();
4500 }
4501
//! Passes the caller-supplied list v to my_successors.copy_successors() while holding my_mutex.
4502 void copy_successors(successor_list_type &v) __TBB_override {
4503 spin_mutex::scoped_lock l( my_mutex );
4504 my_successors.copy_successors(v);
4505 }
4506
//! Adds the edge for predecessor p to my_built_predecessors while holding my_mutex.
4507 void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
4508 spin_mutex::scoped_lock l( my_mutex );
4509 my_built_predecessors.add_edge(p);
4510 }
4511
//! Deletes the edge for predecessor p from my_built_predecessors while holding my_mutex.
4512 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
4513 spin_mutex::scoped_lock l( my_mutex );
4514 my_built_predecessors.delete_edge(p);
4515 }
4516
//! Returns my_built_predecessors.edge_count(), read under my_mutex.
4517 size_t predecessor_count() __TBB_override {
4518 spin_mutex::scoped_lock l( my_mutex );
4519 return my_built_predecessors.edge_count();
4520 }
4521
//! Passes the caller-supplied list v to my_built_predecessors.copy_edges() while holding my_mutex.
4522 void copy_predecessors( predecessor_list_type &v ) __TBB_override {
4523 spin_mutex::scoped_lock l( my_mutex );
4524 my_built_predecessors.copy_edges(v);
4525 }
4526
//! Marks the buffer as invalid, then calls sender_extract(*this) on the built successors
//! and receiver_extract(*this) on the built predecessors.
// NOTE(review): unlike the neighbouring methods, this body does not acquire my_mutex
// before writing my_buffer_is_valid — confirm that callers guarantee exclusive access.
4527 void extract() __TBB_override {
4528 my_buffer_is_valid = false;
4529 built_successors().sender_extract(*this);
4530 built_predecessors().receiver_extract(*this);
4531 }
4532
4533#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4534
4536 spin_mutex::scoped_lock l( my_mutex );
4537 if ( my_buffer_is_valid ) {
4538 v = my_buffer;
4539 return true;
4540 }
4541 return false;
4542 }
4543
4546 return try_get(v);
4547 }
4548
//! Always reports success; this body does not touch the buffer or any other state.
4550 bool try_release() __TBB_override { return true; }
4551
//! Always reports success; this body does not touch the buffer or any other state.
4553 bool try_consume() __TBB_override { return true; }
4554
//! Returns the current value of my_buffer_is_valid, read while holding my_mutex.
4555 bool is_valid() {
4556 spin_mutex::scoped_lock l( my_mutex );
4557 return my_buffer_is_valid;
4558 }
4559
//! Sets my_buffer_is_valid to false while holding my_mutex.
// Only the validity flag is reset; my_buffer itself is not modified here.
4560 void clear() {
4561 spin_mutex::scoped_lock l( my_mutex );
4562 my_buffer_is_valid = false;
4563 }
4564
4565protected:
4566
4567 template< typename R, typename B > friend class run_and_put_task;
4568 template<typename X, typename Y> friend class internal::broadcast_cache;
4569 template<typename X, typename Y> friend class internal::round_robin_cache;
4572 return try_put_task_impl(v);
4573 }
4574
4576 my_buffer = v;
4577 my_buffer_is_valid = true;
4578 task * rtask = my_successors.try_put_task(v);
4579 if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
4580 return rtask;
4581 }
4582
4584 return my_graph;
4585 }
4586
4589
4591 o(owner), s(succ) {};
4592
4594 if (!s.register_predecessor(o)) {
4595 o.register_successor(s);
4596 }
4597 return NULL;
4598 }
4599
4602 };
4603
4606#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4607 internal::edge_container<predecessor_type> my_built_predecessors;
4608#endif
4612
4614 my_buffer_is_valid = false;
4615 if (f&rf_clear_edges) {
4616 my_successors.clear();
4617 }
4618 }
4619}; // overwrite_node
4620
4621template< typename T >
4623public:
4624 typedef T input_type;
4625 typedef T output_type;
4629
4632 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4633 static_cast<receiver<input_type> *>(this),
4634 static_cast<sender<output_type> *>(this) );
4635 }
4636
4637#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Constructs the node from a node_set: delegates to the write_once_node constructor that
//! accepts nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
4638 template <typename... Args>
4639 write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
4640 make_edges_in_order(nodes, *this);
4641 }
4642#endif
4643
4646 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4647 static_cast<receiver<input_type> *>(this),
4648 static_cast<sender<output_type> *>(this) );
4649 }
4650
4651#if TBB_PREVIEW_FLOW_GRAPH_TRACE
4652 void set_name( const char *name ) __TBB_override {
4654 }
4655#endif
4656
4657protected:
4658 template< typename R, typename B > friend class run_and_put_task;
4659 template<typename X, typename Y> friend class internal::broadcast_cache;
4660 template<typename X, typename Y> friend class internal::round_robin_cache;
4662 spin_mutex::scoped_lock l( this->my_mutex );
4663 return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4664 }
4665};
4666
4667} // interface11
4668
4673
4674 using interface11::graph;
4675 using interface11::graph_node;
4676 using interface11::continue_msg;
4677 using interface11::source_node;
4678 using interface11::input_node;
4679 using interface11::function_node;
4680 using interface11::multifunction_node;
4681 using interface11::split_node;
4683 using interface11::indexer_node;
4684 using interface11::internal::tagged_msg;
4687 using interface11::continue_node;
4688 using interface11::overwrite_node;
4689 using interface11::write_once_node;
4690 using interface11::broadcast_node;
4691 using interface11::buffer_node;
4692 using interface11::queue_node;
4693 using interface11::sequencer_node;
4694 using interface11::priority_queue_node;
4695 using interface11::limiter_node;
4696 using namespace interface11::internal::graph_policy_namespace;
4697 using interface11::join_node;
4703#if __TBB_FLOW_GRAPH_CPP11_FEATURES
4704 using interface11::composite_node;
4705#endif
4706 using interface11::async_node;
4707#if __TBB_PREVIEW_ASYNC_MSG
4709#endif
4710#if __TBB_PREVIEW_STREAMING_NODE
4713#endif // __TBB_PREVIEW_STREAMING_NODE
4714#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4717#endif
4718
4719#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4720 using interface11::internal::follows;
4721 using interface11::internal::precedes;
4722 using interface11::internal::make_node_set;
4723 using interface11::internal::make_edges;
4724#endif
4725
4726} // flow
4727} // tbb
4728
4729// Include deduction guides for node classes
4731
4732#undef __TBB_PFG_RESET_ARG
4733#undef __TBB_COMMA
4734#undef __TBB_DEFAULT_NODE_ALLOCATOR
4735
4737#undef __TBB_flow_graph_H_include_area
4738
4739#if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4740 #undef __TBB_NOINLINE_SYM
4741#endif
4742
4743#endif // __TBB_flow_graph_H
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
#define CODEPTR()
__TBB_DEPRECATED internal::port_ref_impl< N1, N2 > port_ref()
class __TBB_DEPRECATED streaming_node
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
#define __TBB_DEPRECATED_LIMITER_ARG2(arg1, arg2)
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
#define __TBB_DEPRECATED_LIMITER_EXPR(expr)
#define __TBB_NOINLINE_SYM
Definition: flow_graph.h:47
#define __TBB_DEFAULT_NODE_ALLOCATOR(T)
Definition: flow_graph.h:71
#define __TBB_CPP11_PRESENT
Definition: tbb_config.h:149
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
void const char const char int ITT_FORMAT __itt_group_sync s
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void const char const char int ITT_FORMAT __itt_group_sync p
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
STL namespace.
The graph class.
class __TBB_DEPRECATED_MSG("tbb::tbb_hash is deprecated, use std::hash") tbb_hash
static void fgt_async_try_put_end(void *, void *)
static void fgt_async_reserve(void *, void *)
static void fgt_multioutput_node_desc(const NodeType *, const char *)
static void fgt_async_try_put_begin(void *, void *)
static void fgt_begin_body(void *)
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
static void fgt_node(void *, string_index, void *, void *)
static void fgt_reserve_wait(void *)
static void fgt_graph(void *)
static void fgt_node_with_body(void *, string_index, void *, void *, void *)
static void fgt_release_wait(void *)
static void fgt_async_commit(void *, void *)
static void fgt_composite(void *, void *, void *)
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
static void fgt_multiinput_multioutput_node(void *, string_index, void *, void *)
static void fgt_remove_edge(void *, void *)
static void fgt_end_body(void *)
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
static void fgt_node_desc(const NodeType *, const char *)
static void fgt_make_edge(void *, void *)
static void fgt_graph_desc(void *, const char *)
K key_from_message(const T &t)
Definition: flow_graph.h:721
concurrency
An enumeration that provides the two most common concurrency levels: unlimited and serial.
Definition: flow_graph.h:106
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3886
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3823
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3838
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:199
class __TBB_DEPRECATED async_msg
Definition: flow_graph.h:216
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3902
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Definition: flow_graph.h:3969
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
void deactivate_graph(tbb::flow::interface10::graph &g)
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
bool is_graph_active(tbb::flow::interface10::graph &g)
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
void activate_graph(tbb::flow::interface10::graph &g)
void add_nodes_impl(CompositeType *, bool)
tbb::internal::uint64_t tag_value
tbb::flow::tuple_element< N, typenameJNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
const V & cast_to(T const &t)
tbb::flow::tuple_element< N, typenameMOP::output_ports_type >::type & output_port(MOP &op)
bool is_a(T const &t)
interface11::internal::Policy< queueing, lightweight > queueing_lightweight
static const node_priority_t no_priority
unsigned int node_priority_t
static tbb::task *const SUCCESSFULLY_ENQUEUED
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
An empty class used for messages that mean "I'm done".
Definition: flow_graph.h:114
Forward declaration section.
Definition: flow_graph.h:425
__TBB_DEPRECATED typedef T output_type
The output type of this sender.
Definition: flow_graph.h:428
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:433
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:439
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:436
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:430
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:449
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:462
__TBB_DEPRECATED typedef T input_type
The input type of this receiver.
Definition: flow_graph.h:467
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:469
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
Definition: flow_graph.h:481
bool try_put(const typename internal::async_helpers< T >::async_type &t)
Definition: flow_graph.h:476
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
Definition: flow_graph.h:472
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
Forwards messages only if the threshold has not been reached.
Definition: flow_graph.h:2985
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3163
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
Definition: flow_graph.h:3146
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
Definition: flow_graph.h:3233
limiter_node(graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
Constructor.
Definition: flow_graph.h:3110
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
Definition: flow_graph.h:3210
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:3005
limiter_node(const limiter_node &src)
Copy constructor.
Definition: flow_graph.h:3129
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:3262
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Definition: flow_graph.h:3222
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:3260
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
Definition: flow_graph.h:3101
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:3003
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2989
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:3266
task * decrement_counter(long long delta)
Definition: flow_graph.h:3076
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2990
static const void * to_void_ptr(const T &t)
Definition: flow_graph.h:229
static const T & from_void_ptr(const void *p)
Definition: flow_graph.h:237
static task * try_put_task_wrapper_impl(receiver< T > *const this_recv, const void *p, bool is_async)
Definition: flow_graph.h:245
virtual bool try_get_wrapper(void *p, bool is_async)=0
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:329
bool try_reserve(X &t)
Reserves an item in the sender.
Definition: flow_graph.h:350
untyped_receiver successor_type
The successor type for this node.
Definition: flow_graph.h:311
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:326
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
bool try_get(X &t)
Request an item from the sender.
Definition: flow_graph.h:344
virtual bool try_reserve_wrapper(void *p, bool is_async)=0
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:389
bool try_put(const X &t)
Put an item to the receiver.
Definition: flow_graph.h:377
virtual void reset_receiver(reset_flags f=rf_reset_protocol)=0
put receiver back in initial state
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:392
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:370
virtual task * try_put_task_wrapper(const void *p, bool is_async)=0
Base class for receivers of completion messages.
Definition: flow_graph.h:599
task * try_put_task(const input_type &) __TBB_override
Definition: flow_graph.h:672
__TBB_DEPRECATED typedef continue_msg input_type
The input type.
Definition: flow_graph.h:603
__TBB_DEPRECATED bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
Definition: flow_graph.h:624
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:698
__TBB_DEPRECATED continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
Definition: flow_graph.h:609
bool is_continue_receiver() __TBB_override
Definition: flow_graph.h:713
__TBB_DEPRECATED continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:617
__TBB_DEPRECATED typedef receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:606
__TBB_DEPRECATED bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
Definition: flow_graph.h:634
virtual task * execute()=0
Does whatever should happen when the threshold is reached.
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:904
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1039
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:972
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1145
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1110
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:963
__TBB_NOINLINE_SYM input_node(const input_node &src)
Copy constructor.
Definition: flow_graph.h:942
internal::input_body< output_type > * my_init_body
Definition: flow_graph.h:1103
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1022
internal::input_body< output_type > * my_body
Definition: flow_graph.h:1102
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1061
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1104
void reset_node(reset_flags f) __TBB_override
resets the input_node to its initial state
Definition: flow_graph.h:1086
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:910
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:907
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1049
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:1004
__TBB_NOINLINE_SYM input_node(graph &g, Body body)
Constructor for a node with a successor.
Definition: flow_graph.h:922
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1153
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:1189
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1333
__TBB_NOINLINE_SYM source_node(const source_node &src)
Copy constructor.
Definition: flow_graph.h:1227
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:1257
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1398
__TBB_NOINLINE_SYM source_node(graph &g, Body body, bool is_active=true)
Constructor for a node with a successor.
Definition: flow_graph.h:1207
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1345
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:1248
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:1192
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
Definition: flow_graph.h:1370
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1438
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1391
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1390
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:1195
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1392
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1323
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:1289
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1306
Implements a function node that supports Input -> Output.
Definition: flow_graph.h:1463
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1557
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1484
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1482
internal::function_input_queue< input_type, internals_allocator > input_queue_type
Definition: flow_graph.h:1481
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1483
cache_aligned_allocator< Input > internals_allocator
Definition: flow_graph.h:1468
__TBB_NOINLINE_SYM function_node(const function_node &src)
Copy constructor.
Definition: flow_graph.h:1530
internal::function_input< input_type, output_type, Policy, internals_allocator > input_impl_type
Definition: flow_graph.h:1480
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
__TBB_NOINLINE_SYM function_node(graph &g, size_t concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority=tbb::flow::internal::no_priority))
Constructor.
Definition: flow_graph.h:1496
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1559
Implements a function node that supports Input -> (set of outputs)
Definition: flow_graph.h:1592
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1675
__TBB_NOINLINE_SYM multifunction_node(const multifunction_node &other)
Definition: flow_graph.h:1654
__TBB_NOINLINE_SYM multifunction_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:1618
internal::multifunction_input< input_type, output_ports_type, Policy, internals_allocator > input_impl_type
Definition: flow_graph.h:1612
internal::function_input_queue< input_type, internals_allocator > input_queue_type
Definition: flow_graph.h:1613
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
Definition: flow_graph.h:1610
cache_aligned_allocator< Input > internals_allocator
Definition: flow_graph.h:1596
split_node: accepts a tuple as input, forwards each element of the tuple to its successors.
Definition: flow_graph.h:1681
output_ports_type & output_ports()
Definition: flow_graph.h:1737
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:1740
__TBB_NOINLINE_SYM split_node(graph &g)
Definition: flow_graph.h:1708
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
Definition: flow_graph.h:1706
receiver< TupleType > base_type
Definition: flow_graph.h:1683
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:1752
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1751
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1745
__TBB_NOINLINE_SYM split_node(const split_node &other)
Definition: flow_graph.h:1723
Implements an executable node that supports continue_msg -> Output.
Definition: flow_graph.h:1782
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1894
internal::continue_input< Output, Policy > input_impl_type
Definition: flow_graph.h:1786
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1787
__TBB_NOINLINE_SYM continue_node(const continue_node &src)
Copy constructor.
Definition: flow_graph.h:1868
__TBB_NOINLINE_SYM continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1793
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1896
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1788
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1789
__TBB_NOINLINE_SYM continue_node(graph &g, int number_of_predecessors,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1830
Forwards messages of type T to all successors.
Definition: flow_graph.h:1905
__TBB_NOINLINE_SYM broadcast_node(graph &g)
Definition: flow_graph.h:1923
internal::broadcast_cache< input_type > my_successors
Definition: flow_graph.h:1916
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2029
__TBB_NOINLINE_SYM broadcast_node(const broadcast_node &src)
Definition: flow_graph.h:1937
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2025
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1910
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2031
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1909
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
Definition: flow_graph.h:2019
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
Definition: flow_graph.h:1952
bool remove_successor(successor_type &r) __TBB_override
Removes r as a successor.
Definition: flow_graph.h:1958
Forwards messages in arbitrary order.
Definition: flow_graph.h:2051
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:2506
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:2487
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2061
virtual bool internal_push(buffer_operation *op)
Definition: flow_graph.h:2323
__TBB_NOINLINE_SYM buffer_node(graph &g)
Constructor.
Definition: flow_graph.h:2359
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2282
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2060
void handle_operations_impl(buffer_operation *op_list, derived_type *derived)
Definition: flow_graph.h:2134
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:2497
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
Definition: flow_graph.h:2219
internal::round_robin_cache< T, null_rw_mutex > my_successors
Definition: flow_graph.h:2077
__TBB_NOINLINE_SYM buffer_node(const buffer_node &src)
Copy constructor.
Definition: flow_graph.h:2377
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
Definition: flow_graph.h:2476
virtual void internal_consume(buffer_operation *op)
Definition: flow_graph.h:2347
virtual void internal_pop(buffer_operation *op)
Definition: flow_graph.h:2329
virtual void internal_reserve(buffer_operation *op)
Definition: flow_graph.h:2338
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2538
internal::aggregator< handler_type, buffer_operation > my_aggregator
Definition: flow_graph.h:2127
internal::aggregating_functor< class_type, buffer_operation > handler_type
Definition: flow_graph.h:2125
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2542
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
buffer_node< T, Allocator > class_type
Definition: flow_graph.h:2062
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2553
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
Definition: flow_graph.h:2399
bool enqueue_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:2186
task * try_put_task(const T &t) __TBB_override
Receives an item; returns a task* if possible.
Definition: flow_graph.h:2519
cache_aligned_allocator< T > internals_allocator
Definition: flow_graph.h:2055
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
Definition: flow_graph.h:2299
virtual void handle_operations(buffer_operation *op_list)
Definition: flow_graph.h:2129
virtual void internal_forward_task(buffer_operation *op)
Tries to forward valid items to successors.
Definition: flow_graph.h:2294
task * grab_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:2182
virtual void internal_reg_succ(buffer_operation *op)
Register successor.
Definition: flow_graph.h:2213
bool remove_successor(successor_type &r) __TBB_override
Removes a successor.
Definition: flow_graph.h:2461
virtual void internal_release(buffer_operation *op)
Definition: flow_graph.h:2352
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
Definition: flow_graph.h:2196
Forwards messages in FIFO order.
Definition: flow_graph.h:2568
__TBB_NOINLINE_SYM queue_node(const queue_node &src)
Copy constructor.
Definition: flow_graph.h:2648
__TBB_NOINLINE_SYM queue_node(graph &g)
Constructor.
Definition: flow_graph.h:2634
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2589
base_type::size_type size_type
Definition: flow_graph.h:2578
void internal_pop(queue_operation *op) __TBB_override
Definition: flow_graph.h:2604
void internal_consume(queue_operation *op) __TBB_override
Definition: flow_graph.h:2622
buffer_node< T, Allocator > base_type
Definition: flow_graph.h:2577
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2630
void internal_forward_task(queue_operation *op) __TBB_override
Definition: flow_graph.h:2600
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2661
void internal_reserve(queue_operation *op) __TBB_override
Definition: flow_graph.h:2613
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
base_type::buffer_operation queue_operation
Definition: flow_graph.h:2579
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2631
Forwards messages in sequence order.
Definition: flow_graph.h:2668
buffer_node< T, Allocator >::size_type size_type
Definition: flow_graph.h:2720
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2682
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
bool internal_push(sequencer_operation *op) __TBB_override
Definition: flow_graph.h:2724
__TBB_NOINLINE_SYM sequencer_node(graph &g, const Sequencer &s)
Constructor.
Definition: flow_graph.h:2687
buffer_node< T, Allocator >::buffer_operation sequencer_operation
Definition: flow_graph.h:2721
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2683
internal::function_body< T, size_t > * my_sequencer
Definition: flow_graph.h:2669
__TBB_NOINLINE_SYM sequencer_node(const sequencer_node &src)
Copy constructor.
Definition: flow_graph.h:2703
Forwards messages in priority order.
Definition: flow_graph.h:2749
bool internal_push(prio_operation *op) __TBB_override
Definition: flow_graph.h:2816
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2873
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2808
void internal_pop(prio_operation *op) __TBB_override
Definition: flow_graph.h:2822
buffer_node< T, Allocator >::item_type item_type
Definition: flow_graph.h:2804
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. " "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR.")
buffer_node< T, Allocator >::buffer_operation prio_operation
Definition: flow_graph.h:2805
buffer_node< T, Allocator >::size_type size_type
Definition: flow_graph.h:2803
buffer_node< T, Allocator > base_type
Definition: flow_graph.h:2760
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2762
void handle_operations(prio_operation *op_list) __TBB_override
Definition: flow_graph.h:2812
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2763
__TBB_NOINLINE_SYM priority_queue_node(const priority_queue_node &src)
Copy constructor.
Definition: flow_graph.h:2782
void internal_reserve(prio_operation *op) __TBB_override
Definition: flow_graph.h:2836
void internal_consume(prio_operation *op) __TBB_override
Definition: flow_graph.h:2848
__TBB_NOINLINE_SYM priority_queue_node(graph &g, const Compare &comp=Compare())
Constructor.
Definition: flow_graph.h:2766
void internal_release(prio_operation *op) __TBB_override
Definition: flow_graph.h:2854
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2798
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:3310
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
Definition: flow_graph.h:3294
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:3343
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
Definition: flow_graph.h:3327
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
Definition: flow_graph.h:3363
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
Definition: flow_graph.h:3387
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1)
Definition: flow_graph.h:3382
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
Definition: flow_graph.h:3414
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
Definition: flow_graph.h:3432
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
Definition: flow_graph.h:3441
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
Definition: flow_graph.h:3397
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
Definition: flow_graph.h:3405
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
Definition: flow_graph.h:3392
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
Definition: flow_graph.h:3423
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3796
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Definition: flow_graph.h:3793
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3795
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
Definition: flow_graph.h:3794
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3809
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3499
internal::tagged_msg< size_t, T0 > output_type
Definition: flow_graph.h:3484
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3486
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3485
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3518
internal::tagged_msg< size_t, T0, T1 > output_type
Definition: flow_graph.h:3517
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3519
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3532
internal::tagged_msg< size_t, T0, T1, T2 > output_type
Definition: flow_graph.h:3550
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3551
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3565
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3584
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3598
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
Definition: flow_graph.h:3583
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3631
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
Definition: flow_graph.h:3616
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3617
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3665
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3651
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
Definition: flow_graph.h:3650
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3701
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
Definition: flow_graph.h:3686
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3687
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3723
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
Definition: flow_graph.h:3722
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3773
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3759
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Definition: flow_graph.h:3758
async_body(const Body &body, gateway_type *gateway)
Definition: flow_graph.h:4189
async_body_base< Gateway > base_type
Definition: flow_graph.h:4186
void operator()(const Input &v, Ports &)
Definition: flow_graph.h:4192
Implements async node.
Definition: flow_graph.h:4213
receiver_type::predecessor_type predecessor_type
Definition: flow_graph.h:4228
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:4273
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
Definition: flow_graph.h:4221
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. " "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR.")
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
Definition: flow_graph.h:4222
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4358
base_type::output_ports_type output_ports_type
Definition: flow_graph.h:4232
internal::async_body_base< gateway_type > async_body_base_type
Definition: flow_graph.h:4231
__TBB_NOINLINE_SYM async_node(const async_node &other)
Definition: flow_graph.h:4331
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4229
receiver< input_type > receiver_type
Definition: flow_graph.h:4227
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4353
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4398
__TBB_NOINLINE_SYM async_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:4291
receiver_gateway< output_type > gateway_type
Definition: flow_graph.h:4230
internal::multifunction_output< Output > output_port_type
Definition: flow_graph.h:4236
try_put_functor(output_port_type &p, const Output &v)
Definition: flow_graph.h:4241
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
Definition: flow_graph.h:4250
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
Definition: flow_graph.h:4255
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:4261
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
Definition: flow_graph.h:4535
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
Definition: flow_graph.h:4605
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4570
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4414
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4453
__TBB_NOINLINE_SYM overwrite_node(const overwrite_node &src)
Copy constructor; doesn't take anything from src; default won't work.
Definition: flow_graph.h:4437
bool try_release() __TBB_override
Releases the reserved item.
Definition: flow_graph.h:4550
__TBB_NOINLINE_SYM overwrite_node(graph &g)
Definition: flow_graph.h:4423
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:4611
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:4583
task * try_put_task_impl(const input_type &v)
Definition: flow_graph.h:4575
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4415
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:4545
bool try_consume() __TBB_override
Consumes the reserved item.
Definition: flow_graph.h:4553
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4613
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4477
Breaks an infinite loop between the node reservation and register_successor call.
Definition: flow_graph.h:4588
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Definition: flow_graph.h:4590
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Definition: flow_graph.h:4593
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4628
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4661
__TBB_NOINLINE_SYM write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
Definition: flow_graph.h:4645
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4627
__TBB_NOINLINE_SYM write_once_node(graph &g)
Constructor.
Definition: flow_graph.h:4631
uintptr_t status
Zero value means "wait" status; all other values are "user"-specified values and are defined in the scope of a class which uses "status".
field of type K being used for matching.
virtual input_body * clone()=0
The leaf for input_body.
virtual source_body * clone()=0
The leaf for source_body.
function_body that takes an Input and a set of output ports
leaf for multifunction. OutputSet can be a std::tuple or a vector.
A task that calls a node's forward_task function.
A task that calls a node's apply_body_bypass function with no input.
An abstract cache of successors.
A cache of successors that are broadcast to.
A cache of successors that are put in a round-robin fashion.
Base class for tasks generated by graph nodes.
void register_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:820
tbb::flow::interface11::graph_iterator< const graph, const tbb::flow::interface11::graph_node > const_iterator
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:874
tbb::task_group_context * my_context
tbb::flow::interface11::graph_node * my_nodes
void reset(tbb::flow::interface11::reset_flags f=tbb::flow::interface11::rf_reset_protocol)
Definition: flow_graph.h:843
iterator end()
end iterator
Definition: flow_graph.h:868
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:774
const_iterator cend() const
end const iterator
Definition: flow_graph.h:876
void remove_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:831
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:806
tbb::flow::interface11::graph_iterator< graph, tbb::flow::interface11::graph_node > iterator
void wait_for_all()
Waits until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
void prepare_task_arena(bool reinit=false)
tbb::flow::interface11::graph_node * my_nodes_last
__TBB_DEPRECATED tbb::task * root_task()
Returns the root task of the graph.
iterator begin()
start iterator
Definition: flow_graph.h:866
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:813
~graph()
Destroys the graph.
Definition: flow_graph.h:798
The base of all graph nodes.
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
void grow_my_array(size_t minimum_size)
Grows the internal array.
item_buffer with reservable front-end. NOTE: if reserving, do not complete the operation with pop_front(); use consume_front().
The two-phase join port.
unfolded_join_node: passes input_ports_type to join_node_base. We build the input port type using tuple_element.
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
Implements methods for a function node that takes a type Input as input and sends a type Output to its successors.
void reset_function_input(reset_flags f)
Implements methods for a function node that takes a type Input as input.
static task * emit_this(graph &g, const T &t, P &p)
Implements methods for an executable node that takes continue_msg as input.
void reset_receiver(reset_flags f) __TBB_override
Implements methods for both executable and function nodes that put Output to their successors.
broadcast_cache_type & successors()
broadcast_cache_type my_successors
sender< output_type >::successor_type successor_type
bool try_put(const output_type &i)
Enables one or the other code branches.
Detects whether two given types are the same.
input_filter control to signal end-of-input for parallel_pipeline
Definition: pipeline.h:316
A lock that occupies a single byte.
Definition: spin_mutex.h:39
friend class scoped_lock
Definition: spin_mutex.h:179
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
Used to form groups of tasks.
Definition: task.h:358
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
Base class for user-defined tasks.
Definition: task.h:615
int decrement_ref_count()
Atomically decrements the reference count and returns its new value.
Definition: task.h:788
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:663
void set_ref_count(int count)
Set reference count.
Definition: task.h:761
void increment_ref_count()
Atomically increment reference count.
Definition: task.h:771
task that does nothing. Useful for synchronization.
Definition: task.h:1042
A list of children.
Definition: task.h:1074
task & pop_front()
Pop the front task from the list.
Definition: task.h:1109
bool empty() const
True if list is empty; false otherwise.
Definition: task.h:1088
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
Class for determining type of std::allocator<T>::value_type.
Definition: tbb_stddef.h:471

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.