13#include <FMEThirdPartyWarningsDisable.h>
19#include <FMEThirdPartyWarningsRestore.h>
24 template<
typename T,
T...
Ints>
/// Returns the number of values in the template parameter pack `Ints`,
/// evaluated at compile time via `sizeof...`.
28 static constexpr std::size_t
size() {
return sizeof...(Ints); }
31 template<std::size_t...
Ints>
34 template<
typename T, std::size_t
N,
T... Is>
37 template<
typename T,
T... Is>
40 template<std::
size_t N>
43 template<
typename...
T>
53 std::forward<Fnc>(
fnc)(std::get<Indices>(
tuple)...);
56 template <
typename Fnc,
typename...
Args>
82 template<
typename...
Args>
141 template<
typename Result,
typename...
Args>
150 std::unique_ptr<ExecutionPolicy>
policy_,
157 , isCurrentlyMuted(
false)
159 validities.fill(
false);
162 std::tuple<
Args...> values;
163 std::array<
bool,
sizeof...(Args)> validities;
164 std::array<
bool,
sizeof...(Args)> invalidations;
165 std::shared_ptr<Observable<Result>> target;
167 std::unique_ptr<ExecutionPolicy> policy;
168 bool isSynchronousExecutionPolicy;
169 bool muteUntilTargetSignalEmitted;
170 bool isCurrentlyMuted;
181 std::unique_ptr<ExecutionPolicy> policy,
182 bool muteUntilTargetSignalEmitted)
190 auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
191 observe(state,
inputs...);
196 for (std::size_t
i = 0;
i < blocks.size(); ++
i) {
203 for (std::size_t
i = 0;
i < blocks.size(); ++
i) {
214 observe(state,
tail...);
217 template<
typename LastType>
220 do_observe<
LastType,
sizeof...(Args) - 1>(state, last);
223 template<
typename Type, std::
size_t Position>
226 state->invalidations[
Position] =
input.shouldBeInvalidatedAfterProcessing;
228 auto handle = guard.
handle();
230 auto lock = handle.lock();
232 std::get<Position>(state->values) = value;
235 if (std::all_of(state->validities.begin(), state->validities.end(), [](
bool v) { return v; })) {
236 if (state->isSynchronousExecutionPolicy) {
238 state->target->signal(state->processable->process(
args...));
242 if (state->muteUntilTargetSignalEmitted) {
243 state->isCurrentlyMuted =
true;
245 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
252 auto &values = state->values;
253 state->policy->execute([handle, values, state,
shouldUnmute] {
255 state->target->signal(state->processable->process(
args...));
260 state->isCurrentlyMuted =
false;
265 for (std::size_t
i = 0;
i < state->invalidations.size(); ++
i) {
266 if (state->invalidations[
i]) {
267 state->validities[
i] =
false;
284 template<
typename Result>
293 std::unique_ptr<ExecutionPolicy>
policy_,
300 , isCurrentlyMuted(
false)
304 std::shared_ptr<Observable<Result>> target;
305 std::shared_ptr<Processable<Result>> processable;
306 std::unique_ptr<ExecutionPolicy> policy;
307 bool isSynchronousExecutionPolicy;
308 bool muteUntilTargetSignalEmitted;
309 bool isCurrentlyMuted;
320 std::unique_ptr<ExecutionPolicy> policy,
321 bool muteUntilTargetSignalEmitted)
323 auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
324 auto handle = guard.
handle();
325 connection =
input.observable->signal.connect([handle, state] {
326 auto lock = handle.lock();
328 if (!state->isCurrentlyMuted) {
329 if (state->isSynchronousExecutionPolicy) {
330 state->target->signal(state->processable->process());
333 if (state->muteUntilTargetSignalEmitted) {
334 state->isCurrentlyMuted = true;
336 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
338 state->target->signal(state->processable->process());
342 state->isCurrentlyMuted =
false;
380 std::unique_ptr<ExecutionPolicy>
policy_,
387 , isCurrentlyMuted(
false)
391 std::shared_ptr<Observable<>> target;
392 std::shared_ptr<Processable<void>> processable;
393 std::unique_ptr<ExecutionPolicy> policy;
394 bool isSynchronousExecutionPolicy;
395 bool muteUntilTargetSignalEmitted;
396 bool isCurrentlyMuted;
407 std::unique_ptr<ExecutionPolicy> policy,
408 bool muteUntilTargetSignalEmitted)
410 auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
411 auto handle = guard.handle();
412 connection =
input.observable->signal.connect([state, handle] {
413 auto lock = handle.lock();
415 if (!state->isCurrentlyMuted) {
416 if (state->isSynchronousExecutionPolicy) {
417 state->processable->process();
418 state->target->signal();
421 if (state->muteUntilTargetSignalEmitted) {
422 state->isCurrentlyMuted = true;
424 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
426 state->processable->process();
427 state->target->signal();
431 state->isCurrentlyMuted =
false;
461 template<
typename...
Args>
470 std::unique_ptr<ExecutionPolicy>
policy_,
477 , isCurrentlyMuted(
false)
479 validities.fill(
false);
482 std::tuple<
Args...> values;
483 std::array<
bool,
sizeof...(Args)> validities;
484 std::array<
bool,
sizeof...(Args)> invalidations;
485 std::shared_ptr<Observable<>> target;
487 std::unique_ptr<ExecutionPolicy> policy;
488 bool isSynchronousExecutionPolicy;
489 bool muteUntilTargetSignalEmitted;
490 bool isCurrentlyMuted;
501 std::unique_ptr<ExecutionPolicy> policy,
502 bool muteUntilTargetSignalEmitted)
504 auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
505 observe(state,
inputs...);
510 for (std::size_t
i = 0;
i < blocks.size(); ++
i) {
517 for (std::size_t
i = 0;
i < blocks.size(); ++
i) {
528 observe(state,
tail...);
531 template<
typename LastType>
534 do_observe<
LastType,
sizeof...(Args) - 1>(state, last);
537 template<
typename Type, std::
size_t Position>
540 state->invalidations[
Position] =
input.shouldBeInvalidatedAfterProcessing;
542 auto handle = guard.handle();
544 auto lock = handle.lock();
546 std::get<Position>(state->values) = value;
549 if (std::all_of(state->validities.begin(), state->validities.end(), [](
bool v) { return v; })) {
550 if (state->isSynchronousExecutionPolicy) {
552 state->processable->process(
args...);
553 state->target->signal();
557 if (state->muteUntilTargetSignalEmitted) {
558 state->isCurrentlyMuted =
true;
560 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
561 auto& values = state->values;
562 state->policy->execute([state, values, handle,
shouldUnmute] {
564 state->processable->process(
args...);
565 state->target->signal();
570 state->isCurrentlyMuted =
false;
575 for (std::size_t
i = 0;
i < state->invalidations.size(); ++
i) {
576 if (state->invalidations[
i]) {
577 state->validities[
i] =
false;
Abstract base class for a connection between two I/O processors.
ObservableConnection()=default
ObservableConnection(ObservableConnectionInput<> input, std::shared_ptr< Observable< Result > > target, std::shared_ptr< Processable< Result > > processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
ObservableConnection()=default
ObservableConnection(ObservableConnectionInput< Args >... inputs, std::shared_ptr< Observable<> > target, std::shared_ptr< Processable< void, Args... > > processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
ObservableConnection(ObservableConnectionInput<> input, std::shared_ptr< Observable<> > target, std::shared_ptr< Processable< void > > processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
ObservableConnection()=default
Templated implementation of the asio::Connection interface for multi-input asynchronous pipelines usi...
ObservableConnection()=default
ObservableConnection(ObservableConnectionInput< Args >... inputs, std::shared_ptr< Observable< Result > > target, std::shared_ptr< Processable< Result, Args... > > processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
A guard that protects resources of a given class from being destroyed if another thread still works wi...
WeakHandle handle() const
A synchronous execution policy.
Target mlrange_cast(Source arg)
Generic version of checked ML casts.
boost::signals2::shared_connection_block SharedSignalConnectionBlock
boost::signals2::scoped_connection ScopedSignalConnection
void apply(Fnc &&fnc, const std::tuple< Args... > &tuple)
void apply_impl(Fnc &&fnc, const std::tuple< Args... > &tuple, std14::index_sequence< Indices... >)
static constexpr std::size_t size()