13 #include <FMEThirdPartyWarningsDisable.h>
19 #include <FMEThirdPartyWarningsRestore.h>
24 template<
typename T,
T... Ints>
28 static constexpr std::size_t
size() {
return sizeof...(Ints); }
31 template<std::size_t... Ints>
34 template<
typename T, std::size_t
N,
T... Is>
37 template<
typename T,
T... Is>
40 template<std::
size_t N>
43 template<
typename...
T>
50 template <
typename Fnc,
typename... Args, std::size_t... Indices>
53 std::forward<Fnc>(fnc)(std::get<Indices>(tuple)...);
56 template <
typename Fnc,
typename... Args>
57 void apply(Fnc &&fnc,
const std::tuple<Args...> &tuple)
82 template<
typename... Args>
141 template<
typename Result,
typename... Args>
150 std::unique_ptr<ExecutionPolicy> policy_,
151 bool muteUntilTargetSignalEmitted_)
153 , processable(processable_)
154 , policy(std::move(policy_))
156 , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
157 , isCurrentlyMuted(
false)
159 validities.fill(
false);
162 std::tuple<Args...> values;
163 std::array<bool,
sizeof...(Args)> validities;
164 std::array<bool,
sizeof...(Args)> invalidations;
165 std::shared_ptr<Observable<Result>>
target;
166 std::shared_ptr<
Processable<Result, Args...>> processable;
167 std::unique_ptr<ExecutionPolicy> policy;
168 bool isSynchronousExecutionPolicy;
169 bool muteUntilTargetSignalEmitted;
170 bool isCurrentlyMuted;
181 std::unique_ptr<ExecutionPolicy> policy,
182 bool muteUntilTargetSignalEmitted)
190 auto state = std::make_shared<State>(
target, processable, std::move(policy), muteUntilTargetSignalEmitted);
191 observe(state, inputs...);
196 for (std::size_t i = 0; i < blocks.size(); ++i) {
203 for (std::size_t i = 0; i < blocks.size(); ++i) {
210 template<
typename HeadType,
typename... TailTypes>
213 do_observe<HeadType,
sizeof...(Args) - (
sizeof...(TailTypes) + 1)>(state, head);
214 observe(state, tail...);
217 template<
typename LastType>
218 void observe(std::shared_ptr<State> state,
const ObservableConnectionInput<LastType>& last)
220 do_observe<LastType,
sizeof...(Args) - 1>(state, last);
223 template<
typename Type, std::
size_t Position>
224 void do_observe(std::shared_ptr<State> state,
const ObservableConnectionInput<Type>& input)
226 state->invalidations[
Position] = input.shouldBeInvalidatedAfterProcessing;
227 auto shouldTrigger = input.shouldTriggerProcessing;
228 auto handle = guard.
handle();
229 connections[
Position] = input.observable->signal.connect([handle, state, shouldTrigger](
const Type& value) {
230 auto lock = handle.lock();
232 std::get<Position>(state->values) = value;
234 if (shouldTrigger && !state->isCurrentlyMuted) {
235 if (std::all_of(state->validities.begin(), state->validities.end(), [](
bool v) { return v; })) {
236 if (state->isSynchronousExecutionPolicy) {
238 state->target->signal(state->processable->process(args...));
242 if (state->muteUntilTargetSignalEmitted) {
243 state->isCurrentlyMuted =
true;
245 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
252 auto &values = state->values;
253 state->policy->execute([handle, values, state, shouldUnmute] {
255 state->target->signal(state->processable->process(args...));
258 auto innerLock = handle.lock();
260 state->isCurrentlyMuted =
false;
265 for (std::size_t i = 0; i < state->invalidations.size(); ++i) {
266 if (state->invalidations[i]) {
267 state->validities[i] =
false;
284 template<
typename Result>
293 std::unique_ptr<ExecutionPolicy> policy_,
294 bool muteUntilTargetSignalEmitted_)
296 , processable(processable_)
297 , policy(std::move(policy_))
299 , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
300 , isCurrentlyMuted(
false)
304 std::shared_ptr<Observable<Result>>
target;
305 std::shared_ptr<Processable<Result>> processable;
306 std::unique_ptr<ExecutionPolicy> policy;
307 bool isSynchronousExecutionPolicy;
308 bool muteUntilTargetSignalEmitted;
309 bool isCurrentlyMuted;
320 std::unique_ptr<ExecutionPolicy> policy,
321 bool muteUntilTargetSignalEmitted)
323 auto state = std::make_shared<State>(
target, processable, std::move(policy), muteUntilTargetSignalEmitted);
324 auto handle = guard.
handle();
325 connection = input.
observable->signal.connect([handle, state] {
326 auto lock = handle.lock();
328 if (!state->isCurrentlyMuted) {
329 if (state->isSynchronousExecutionPolicy) {
330 state->target->signal(state->processable->process());
333 if (state->muteUntilTargetSignalEmitted) {
334 state->isCurrentlyMuted = true;
336 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
337 state->policy->execute([handle, state, shouldUnmute]{
338 state->target->signal(state->processable->process());
340 auto innerLock = handle.lock();
342 state->isCurrentlyMuted = false;
380 std::unique_ptr<ExecutionPolicy> policy_,
381 bool muteUntilTargetSignalEmitted_)
383 , processable(processable_)
384 , policy(std::move(policy_))
386 , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
387 , isCurrentlyMuted(
false)
391 std::shared_ptr<Observable<>>
target;
392 std::shared_ptr<Processable<void>> processable;
393 std::unique_ptr<ExecutionPolicy> policy;
394 bool isSynchronousExecutionPolicy;
395 bool muteUntilTargetSignalEmitted;
396 bool isCurrentlyMuted;
407 std::unique_ptr<ExecutionPolicy> policy,
408 bool muteUntilTargetSignalEmitted)
410 auto state = std::make_shared<State>(
target, processable, std::move(policy), muteUntilTargetSignalEmitted);
411 auto handle = guard.handle();
412 connection = input.
observable->signal.connect([state, handle] {
413 auto lock = handle.lock();
415 if (!state->isCurrentlyMuted) {
416 if (state->isSynchronousExecutionPolicy) {
417 state->processable->process();
418 state->target->signal();
421 if (state->muteUntilTargetSignalEmitted) {
422 state->isCurrentlyMuted = true;
424 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
425 state->policy->execute([state, handle, shouldUnmute] {
426 state->processable->process();
427 state->target->signal();
429 auto innerLock = handle.lock();
431 state->isCurrentlyMuted = false;
461 template<
typename... Args>
470 std::unique_ptr<ExecutionPolicy> policy_,
471 bool muteUntilTargetSignalEmitted_)
473 , processable(processable_)
474 , policy(std::move(policy_))
476 , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
477 , isCurrentlyMuted(
false)
479 validities.fill(
false);
482 std::tuple<Args...> values;
483 std::array<bool,
sizeof...(Args)> validities;
484 std::array<bool,
sizeof...(Args)> invalidations;
485 std::shared_ptr<Observable<>>
target;
486 std::shared_ptr<
Processable<void, Args...>> processable;
487 std::unique_ptr<ExecutionPolicy> policy;
488 bool isSynchronousExecutionPolicy;
489 bool muteUntilTargetSignalEmitted;
490 bool isCurrentlyMuted;
501 std::unique_ptr<ExecutionPolicy> policy,
502 bool muteUntilTargetSignalEmitted)
504 auto state = std::make_shared<State>(
target, processable, std::move(policy), muteUntilTargetSignalEmitted);
505 observe(state, inputs...);
510 for (std::size_t i = 0; i < blocks.size(); ++i) {
517 for (std::size_t i = 0; i < blocks.size(); ++i) {
524 template<
typename HeadType,
typename... TailTypes>
527 do_observe<HeadType,
sizeof...(Args) - (
sizeof...(TailTypes) + 1)>(state, head);
528 observe(state, tail...);
531 template<
typename LastType>
532 void observe(std::shared_ptr<State> state,
const ObservableConnectionInput<LastType>& last)
534 do_observe<LastType,
sizeof...(Args) - 1>(state, last);
537 template<
typename Type, std::
size_t Position>
538 void do_observe(std::shared_ptr<State> state,
const ObservableConnectionInput<Type>& input)
540 state->invalidations[
Position] = input.shouldBeInvalidatedAfterProcessing;
541 auto shouldTrigger = input.shouldTriggerProcessing;
542 auto handle = guard.handle();
543 connections[
Position] = input.observable->signal.connect([state, handle, shouldTrigger](
const Type& value) {
544 auto lock = handle.lock();
546 std::get<Position>(state->values) = value;
548 if (shouldTrigger && !state->isCurrentlyMuted) {
549 if (std::all_of(state->validities.begin(), state->validities.end(), [](
bool v) { return v; })) {
550 if (state->isSynchronousExecutionPolicy) {
552 state->processable->process(args...);
553 state->target->signal();
557 if (state->muteUntilTargetSignalEmitted) {
558 state->isCurrentlyMuted =
true;
560 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
561 auto& values = state->values;
562 state->policy->execute([state, values, handle, shouldUnmute] {
564 state->processable->process(args...);
565 state->target->signal();
568 auto innerLock = handle.lock();
570 state->isCurrentlyMuted =
false;
575 for (std::size_t i = 0; i < state->invalidations.size(); ++i) {
576 if (state->invalidations[i]) {
577 state->validities[i] =
false;
Abstract base class for a connection between two I/O processors.
ObservableConnection()=default
ObservableConnection(ObservableConnectionInput<> input, std::shared_ptr< Observable< Result >> target, std::shared_ptr< Processable< Result >> processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
ObservableConnection()=default
ObservableConnection(ObservableConnectionInput< Args >... inputs, std::shared_ptr< Observable<>> target, std::shared_ptr< Processable< void, Args... >> processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
ObservableConnection()=default
ObservableConnection(ObservableConnectionInput<> input, std::shared_ptr< Observable<>> target, std::shared_ptr< Processable< void >> processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
Templated implementation of the asio::Connection interface for multi-input asynchronous pipelines usi...
ObservableConnection()=default
ObservableConnection(ObservableConnectionInput< Args >... inputs, std::shared_ptr< Observable< Result >> target, std::shared_ptr< Processable< Result, Args... >> processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
A guard that protects resources of a given class from being destroyed if another thread still works wi...
WeakHandle handle() const
A synchronous execution policy.
boost::signals2::scoped_connection ScopedSignalConnection
boost::signals2::shared_connection_block SharedSignalConnectionBlock
boost::graph_traits< ml_graph_ptr >::vertex_descriptor target(graph_traits< ml_graph_ptr >::edge_descriptor e, const ml_graph_ptr)
Returns the vertex descriptor for v of the edge (u,v) represented by e.
void apply(Fnc &&fnc, const std::tuple< Args... > &tuple)
void apply_impl(Fnc &&fnc, const std::tuple< Args... > &tuple, std14::index_sequence< Indices... >)
static constexpr std::size_t size()