MeVisLab Toolbox Reference
ObservableConnection.h
Go to the documentation of this file.
1// Copyright (c) Fraunhofer MEVIS, Germany. All rights reserved.
2// **InsertLicense** code
3
4#pragma once
5
6#include <asio/Connection.h>
7#include <asio/Observable.h>
8#include <asio/Processable.h>
12
13#include <FMEThirdPartyWarningsDisable.h>
14#include <memory>
15#include <utility>
16#include <tuple>
17#include <array>
18#include <cstddef>
19#include <FMEThirdPartyWarningsRestore.h>
20
21
22namespace std14
23{
24 template<typename T, T... Ints>
26 {
27 using value_type = T;
28 static constexpr std::size_t size() { return sizeof...(Ints); }
29 };
30
// Shorthand for an integer_sequence whose element type is std::size_t.
31 template<std::size_t... Ints>
32 using index_sequence = integer_sequence<std::size_t, Ints...>;
33
// Recursive generator for the pack 0, 1, ..., N - 1.
// Every instantiation with N > 0 derives from the instantiation for N - 1 and
// prepends the value N - 1 to the accumulated pack Is.
34 template<typename T, std::size_t N, T... Is>
35 struct make_integer_sequence : make_integer_sequence<T, N - 1, N - 1, Is...> {};
36
// End of the recursion (N == 0): derives from integer_sequence<T, Is...>, which
// carries the pack accumulated by the recursive steps.
37 template<typename T, T... Is>
38 struct make_integer_sequence<T, 0, Is...> : integer_sequence<T, Is...> {};
39
40 template<std::size_t N>
42
43 template<typename... T>
45}
46
47
48namespace detail {
49
// Invokes fnc with elements of tuple passed as separate arguments.
// The Indices pack of the third (unnamed, tag-only) parameter selects which
// elements are passed and in which order: one std::get<Indices>(tuple) per index.
// fnc is perfectly forwarded; the tuple is only read (const reference).
50 template <typename Fnc, typename... Args, std::size_t... Indices>
51 void apply_impl(Fnc &&fnc, const std::tuple<Args...> &tuple, std14::index_sequence<Indices...>)
52 {
53 std::forward<Fnc>(fnc)(std::get<Indices>(tuple)...);
54 }
55
56 template <typename Fnc, typename... Args>
57 void apply(Fnc &&fnc, const std::tuple<Args...> &tuple)
58 {
60 }
61
62}
63
64
65namespace asio {
66
67
82 template<typename... Args>
91
92
141 template<typename Result, typename... Args>
143 {
144 class State final
145 {
146 public:
147 State(
148 std::shared_ptr<Observable<Result>> target_,
150 std::unique_ptr<ExecutionPolicy> policy_,
152 : target(target_)
153 , processable(processable_)
154 , policy(std::move(policy_))
155 , isSynchronousExecutionPolicy(dynamic_cast<asio::SynchronousExecution*>(policy.get()) != nullptr)
156 , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
157 , isCurrentlyMuted(false)
158 {
159 validities.fill(false);
160 }
161
162 std::tuple<Args...> values;
163 std::array<bool, sizeof...(Args)> validities;
164 std::array<bool, sizeof...(Args)> invalidations;
165 std::shared_ptr<Observable<Result>> target;
166 std::shared_ptr<Processable<Result, Args...>> processable;
167 std::unique_ptr<ExecutionPolicy> policy;
168 bool isSynchronousExecutionPolicy;
169 bool muteUntilTargetSignalEmitted;
170 bool isCurrentlyMuted;
171 };
172
173 public:
174
176
179 std::shared_ptr<Observable<Result>> target,
180 std::shared_ptr<Processable<Result, Args...>> processable,
181 std::unique_ptr<ExecutionPolicy> policy,
182 bool muteUntilTargetSignalEmitted)
183 {
184 // We need a state object as we can't use the this pointer
185 // as capture inside the connection lambda down below due to
186 // vtable issues (because we're inside a constructor).
187 // An alternative would be to move the connection into a
188 // public method, but this would uglify the API - the connection
189 // should be well established once the object is constructed.
190 auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
191 observe(state, inputs...);
192 }
193
// Stores, for every input slot i, a SharedSignalConnectionBlock constructed from
// connections[i] in blocks[i].
// NOTE(review): relies on boost::signals2::shared_connection_block blocking the
// connection as soon as it is constructed (Boost.Signals2 default) - confirm.
194 void suspend() override
195 {
196 for (std::size_t i = 0; i < blocks.size(); ++i) {
197 blocks[i] = SharedSignalConnectionBlock(connections[i]);
198 }
199 }
200
// Overwrites every entry of blocks with a default-constructed
// SharedSignalConnectionBlock, i.e. one that is not tied to any of the connections.
// NOTE(review): presumably this releases the blocks installed by suspend() - confirm
// against the Boost.Signals2 shared_connection_block documentation.
201 void resume() override
202 {
203 for (std::size_t i = 0; i < blocks.size(); ++i) {
204 blocks[i] = SharedSignalConnectionBlock();
205 }
206 }
207
208 private:
209
// Recursive step of the input registration: registers the first input (head) via
// do_observe and then continues with the remaining inputs (tail).
// The position handed to do_observe is sizeof...(Args) minus the number of inputs
// still pending (head included), which is the index of head within Args as long as
// the outermost call received exactly sizeof...(Args) inputs.
210 template<typename HeadType, typename... TailTypes>
211 void observe(std::shared_ptr<State> state, const ObservableConnectionInput<HeadType>& head, const ObservableConnectionInput<TailTypes>&... tail)
212 {
213 do_observe<HeadType, sizeof...(Args) - (sizeof...(TailTypes) + 1)>(state, head);
214 observe(state, tail...);
215 }
216
// End of the registration recursion: a single input is left and it is registered
// at the last position, sizeof...(Args) - 1.
217 template<typename LastType>
218 void observe(std::shared_ptr<State> state, const ObservableConnectionInput<LastType>& last)
219 {
220 do_observe<LastType, sizeof...(Args) - 1>(state, last);
221 }
222
// Connects a slot to the signal of one input observable and keeps the resulting
// connection in connections[Position].
// Type is the value type delivered by that input, Position its index within Args.
// The slot stores every received value in the shared State. If the input is a
// triggering one, the connection is not muted and all inputs hold a valid value,
// it runs the processable and emits the result on the target signal - in place
// for the synchronous policy, via policy->execute() otherwise.
223 template<typename Type, std::size_t Position>
224 void do_observe(std::shared_ptr<State> state, const ObservableConnectionInput<Type>& input)
225 {
// Remember whether this input's value has to be marked invalid again after a run.
226 state->invalidations[Position] = input.shouldBeInvalidatedAfterProcessing;
227 auto shouldTrigger = input.shouldTriggerProcessing;
228 auto handle = guard.handle();
229 connections[Position] = input.observable->signal.connect([handle, state, shouldTrigger](const Type& value) {
// The slot does nothing unless the guard handle can be locked.
// NOTE(review): presumably the lock fails once the owning connection is gone - confirm against the guard class.
230 auto lock = handle.lock();
231 if (lock) {
// The newest value is always recorded, even when no processing is triggered.
232 std::get<Position>(state->values) = value;
233 state->validities[Position] = true;
234 if (shouldTrigger && !state->isCurrentlyMuted) {
// Process only when every input currently holds a valid value.
235 if (std::all_of(state->validities.begin(), state->validities.end(), [](bool v) { return v; })) {
236 if (state->isSynchronousExecutionPolicy) {
// Synchronous policy: process right here, reading the values directly from the shared state.
237 detail::apply([&state](const Args&... args) {
238 state->target->signal(state->processable->process(args...));
239 }, state->values);
240 }
241 else {
// If requested, ignore further triggers until the task handed to the policy has emitted the target signal.
242 if (state->muteUntilTargetSignalEmitted) {
243 state->isCurrentlyMuted = true;
244 }
245 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
246
247 // For non-synchronous execution policies we need to capture
248 // the values as they are at this very timepoint in the execution
249 // lambda, because following input changes will overwrite them in
250 // the shared state. We use a reference here and by-value capture to
251 // avoid multiple copies.
252 auto &values = state->values;
253 state->policy->execute([handle, values, state, shouldUnmute] {
254 detail::apply([&state](const Args&... args) {
255 state->target->signal(state->processable->process(args...));
256 }, values);
// The mute flag is only cleared if the guard handle can still be locked at this point.
257 if (shouldUnmute) {
258 auto innerLock = handle.lock();
259 if (innerLock) {
260 state->isCurrentlyMuted = false;
261 }
262 }
263 });
264 }
// Inputs flagged for invalidation need a fresh value before the next run.
// This happens as soon as the synchronous run or the call to policy->execute() has returned.
265 for (std::size_t i = 0; i < state->invalidations.size(); ++i) {
266 if (state->invalidations[i]) {
267 state->validities[i] = false;
268 }
269 }
270 }
271 }
272 }
273 });
274 }
275
276 std::array<ScopedSignalConnection, sizeof...(Args)> connections;
277 std::array<SharedSignalConnectionBlock, sizeof...(Args)> blocks;
278
280 };
281
282
283 // Specialization for parameterless processing with a trigger, i.e., asio::Observable<>:
284 template<typename Result>
285 class ObservableConnection<Result> : public Connection
286 {
287 class State final
288 {
289 public:
290 State(
291 std::shared_ptr<Observable<Result>> target_,
292 std::shared_ptr<Processable<Result>> processable_,
293 std::unique_ptr<ExecutionPolicy> policy_,
295 : target(target_)
296 , processable(processable_)
297 , policy(std::move(policy_))
298 , isSynchronousExecutionPolicy(dynamic_cast<asio::SynchronousExecution*>(policy.get()) != nullptr)
299 , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
300 , isCurrentlyMuted(false)
301 {
302 }
303
304 std::shared_ptr<Observable<Result>> target;
305 std::shared_ptr<Processable<Result>> processable;
306 std::unique_ptr<ExecutionPolicy> policy;
307 bool isSynchronousExecutionPolicy;
308 bool muteUntilTargetSignalEmitted;
309 bool isCurrentlyMuted;
310 };
311
312 public:
313
315
318 std::shared_ptr<Observable<Result>> target,
319 std::shared_ptr<Processable<Result>> processable,
320 std::unique_ptr<ExecutionPolicy> policy,
321 bool muteUntilTargetSignalEmitted)
322 {
323 auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
324 auto handle = guard.handle();
325 connection = input.observable->signal.connect([handle, state] {
326 auto lock = handle.lock();
327 if (lock) {
328 if (!state->isCurrentlyMuted) {
329 if (state->isSynchronousExecutionPolicy) {
330 state->target->signal(state->processable->process());
331 }
332 else {
333 if (state->muteUntilTargetSignalEmitted) {
334 state->isCurrentlyMuted = true;
335 }
336 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
337 state->policy->execute([handle, state, shouldUnmute]{
338 state->target->signal(state->processable->process());
339 if (shouldUnmute) {
340 auto innerLock = handle.lock();
341 if (innerLock) {
342 state->isCurrentlyMuted = false;
343 }
344 }
345 });
346 }
347 }
348 }
349 });
350 }
351
// Assigns to block a SharedSignalConnectionBlock constructed from the single
// input connection.
// NOTE(review): relies on boost::signals2::shared_connection_block blocking the
// connection as soon as it is constructed (Boost.Signals2 default) - confirm.
352 void suspend() override
353 {
354 block = SharedSignalConnectionBlock(connection);
355 }
356
357 void resume() override
358 {
360 }
361
362 private:
363
364 ScopedSignalConnection connection;
366
368 };
369
370 // Specialization for connecting two triggers with a void processable without parameters
371 template<>
373 {
374 class State final
375 {
376 public:
377 State(
378 std::shared_ptr<Observable<>> target_,
379 std::shared_ptr<Processable<void>> processable_,
380 std::unique_ptr<ExecutionPolicy> policy_,
382 : target(target_)
383 , processable(processable_)
384 , policy(std::move(policy_))
385 , isSynchronousExecutionPolicy(dynamic_cast<asio::SynchronousExecution*>(policy.get()) != nullptr)
386 , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
387 , isCurrentlyMuted(false)
388 {
389 }
390
391 std::shared_ptr<Observable<>> target;
392 std::shared_ptr<Processable<void>> processable;
393 std::unique_ptr<ExecutionPolicy> policy;
394 bool isSynchronousExecutionPolicy;
395 bool muteUntilTargetSignalEmitted;
396 bool isCurrentlyMuted;
397 };
398
399 public:
400
402
405 std::shared_ptr<Observable<>> target,
406 std::shared_ptr<Processable<void>> processable,
407 std::unique_ptr<ExecutionPolicy> policy,
408 bool muteUntilTargetSignalEmitted)
409 {
410 auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
411 auto handle = guard.handle();
412 connection = input.observable->signal.connect([state, handle] {
413 auto lock = handle.lock();
414 if (lock) {
415 if (!state->isCurrentlyMuted) {
416 if (state->isSynchronousExecutionPolicy) {
417 state->processable->process();
418 state->target->signal();
419 }
420 else {
421 if (state->muteUntilTargetSignalEmitted) {
422 state->isCurrentlyMuted = true;
423 }
424 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
425 state->policy->execute([state, handle, shouldUnmute] {
426 state->processable->process();
427 state->target->signal();
428 if (shouldUnmute) {
429 auto innerLock = handle.lock();
430 if (innerLock) {
431 state->isCurrentlyMuted = false;
432 }
433 }
434 });
435 }
436 }
437 }
438 });
439 }
440
// Assigns to block a SharedSignalConnectionBlock constructed from the single
// trigger connection.
// NOTE(review): relies on boost::signals2::shared_connection_block blocking the
// connection as soon as it is constructed (Boost.Signals2 default) - confirm.
441 void suspend() override
442 {
443 block = SharedSignalConnectionBlock(connection);
444 }
445
446 void resume() override
447 {
449 }
450
451 private:
452
453 ScopedSignalConnection connection;
455
457 };
458
459
460 // Specialization for void types
461 template<typename... Args>
463 {
464 class State final
465 {
466 public:
467 State(
468 std::shared_ptr<Observable<>> target_,
470 std::unique_ptr<ExecutionPolicy> policy_,
472 : target(target_)
473 , processable(processable_)
474 , policy(std::move(policy_))
475 , isSynchronousExecutionPolicy(dynamic_cast<asio::SynchronousExecution*>(policy.get()) != nullptr)
476 , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
477 , isCurrentlyMuted(false)
478 {
479 validities.fill(false);
480 }
481
482 std::tuple<Args...> values;
483 std::array<bool, sizeof...(Args)> validities;
484 std::array<bool, sizeof...(Args)> invalidations;
485 std::shared_ptr<Observable<>> target;
486 std::shared_ptr<Processable<void, Args...>> processable;
487 std::unique_ptr<ExecutionPolicy> policy;
488 bool isSynchronousExecutionPolicy;
489 bool muteUntilTargetSignalEmitted;
490 bool isCurrentlyMuted;
491 };
492
493 public:
494
496
499 std::shared_ptr<Observable<>> target,
500 std::shared_ptr<Processable<void, Args...>> processable,
501 std::unique_ptr<ExecutionPolicy> policy,
502 bool muteUntilTargetSignalEmitted)
503 {
504 auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
505 observe(state, inputs...);
506 }
507
// Stores, for every input slot i, a SharedSignalConnectionBlock constructed from
// connections[i] in blocks[i].
// NOTE(review): relies on boost::signals2::shared_connection_block blocking the
// connection as soon as it is constructed (Boost.Signals2 default) - confirm.
508 void suspend() override
509 {
510 for (std::size_t i = 0; i < blocks.size(); ++i) {
511 blocks[i] = SharedSignalConnectionBlock(connections[i]);
512 }
513 }
514
// Overwrites every entry of blocks with a default-constructed
// SharedSignalConnectionBlock, i.e. one that is not tied to any of the connections.
// NOTE(review): presumably this releases the blocks installed by suspend() - confirm
// against the Boost.Signals2 shared_connection_block documentation.
515 void resume() override
516 {
517 for (std::size_t i = 0; i < blocks.size(); ++i) {
518 blocks[i] = SharedSignalConnectionBlock();
519 }
520 }
521
522 private:
523
// Recursive step of the input registration: registers the first input (head) via
// do_observe and then continues with the remaining inputs (tail).
// The position handed to do_observe is sizeof...(Args) minus the number of inputs
// still pending (head included), which is the index of head within Args as long as
// the outermost call received exactly sizeof...(Args) inputs.
524 template<typename HeadType, typename... TailTypes>
525 void observe(std::shared_ptr<State> state, const ObservableConnectionInput<HeadType>& head, const ObservableConnectionInput<TailTypes>&... tail)
526 {
527 do_observe<HeadType, sizeof...(Args) - (sizeof...(TailTypes) + 1)>(state, head);
528 observe(state, tail...);
529 }
530
// End of the registration recursion: a single input is left and it is registered
// at the last position, sizeof...(Args) - 1.
531 template<typename LastType>
532 void observe(std::shared_ptr<State> state, const ObservableConnectionInput<LastType>& last)
533 {
534 do_observe<LastType, sizeof...(Args) - 1>(state, last);
535 }
536
// Connects a slot to the signal of one input observable and keeps the resulting
// connection in connections[Position].
// Type is the value type delivered by that input, Position its index within Args.
// The slot stores every received value in the shared State. If the input is a
// triggering one, the connection is not muted and all inputs hold a valid value,
// it calls the (void) processable with the values and then emits the parameterless
// target signal - in place for the synchronous policy, via policy->execute() otherwise.
537 template<typename Type, std::size_t Position>
538 void do_observe(std::shared_ptr<State> state, const ObservableConnectionInput<Type>& input)
539 {
// Remember whether this input's value has to be marked invalid again after a run.
540 state->invalidations[Position] = input.shouldBeInvalidatedAfterProcessing;
541 auto shouldTrigger = input.shouldTriggerProcessing;
542 auto handle = guard.handle();
543 connections[Position] = input.observable->signal.connect([state, handle, shouldTrigger](const Type& value) {
// The slot does nothing unless the guard handle can be locked.
// NOTE(review): presumably the lock fails once the owning connection is gone - confirm against the guard class.
544 auto lock = handle.lock();
545 if (lock) {
// The newest value is always recorded, even when no processing is triggered.
546 std::get<Position>(state->values) = value;
547 state->validities[Position] = true;
548 if (shouldTrigger && !state->isCurrentlyMuted) {
// Process only when every input currently holds a valid value.
549 if (std::all_of(state->validities.begin(), state->validities.end(), [](bool v) { return v; })) {
550 if (state->isSynchronousExecutionPolicy) {
// Synchronous policy: process right here, reading the values directly from the shared state.
551 detail::apply([&state](const Args&... args) {
552 state->processable->process(args...);
553 state->target->signal();
554 }, state->values);
555 }
556 else {
// If requested, ignore further triggers until the task handed to the policy has emitted the target signal.
557 if (state->muteUntilTargetSignalEmitted) {
558 state->isCurrentlyMuted = true;
559 }
560 auto shouldUnmute = state->muteUntilTargetSignalEmitted;
// values is captured by value below, so the task works on a snapshot of the inputs taken here
// rather than on the shared state, which later input changes overwrite.
561 auto& values = state->values;
562 state->policy->execute([state, values, handle, shouldUnmute] {
563 detail::apply([&state](const Args&... args) {
564 state->processable->process(args...);
565 state->target->signal();
566 }, values);
// The mute flag is only cleared if the guard handle can still be locked at this point.
567 if (shouldUnmute) {
568 auto innerLock = handle.lock();
569 if (innerLock) {
570 state->isCurrentlyMuted = false;
571 }
572 }
573 });
574 }
// Inputs flagged for invalidation need a fresh value before the next run.
// This happens as soon as the synchronous run or the call to policy->execute() has returned.
575 for (std::size_t i = 0; i < state->invalidations.size(); ++i) {
576 if (state->invalidations[i]) {
577 state->validities[i] = false;
578 }
579 }
580 }
581 }
582 }
583 });
584 }
585
586 std::array<ScopedSignalConnection, sizeof...(Args)> connections;
587 std::array<SharedSignalConnectionBlock, sizeof...(Args)> blocks;
588
590 };
591
592}
@ T
@ N
Abstract base class for a connection between two I/O processors.
Definition Connection.h:16
Templated input configuration class for the asio::ObservableConnection.
std::shared_ptr< Observable< Args... > > observable
ObservableConnection(ObservableConnectionInput<> input, std::shared_ptr< Observable< Result > > target, std::shared_ptr< Processable< Result > > processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
ObservableConnection(ObservableConnectionInput< Args >... inputs, std::shared_ptr< Observable<> > target, std::shared_ptr< Processable< void, Args... > > processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
ObservableConnection(ObservableConnectionInput<> input, std::shared_ptr< Observable<> > target, std::shared_ptr< Processable< void > > processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
Templated implementation of the asio::Connection interface for multi-input asynchronous pipelines usi...
ObservableConnection(ObservableConnectionInput< Args >... inputs, std::shared_ptr< Observable< Result > > target, std::shared_ptr< Processable< Result, Args... > > processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
A guard that protects resources of a given class from being destroyed if another thread still works wi...
A synchronous execution policy.
Target mlrange_cast(Source arg)
Generic version of checked ML casts.
boost::signals2::shared_connection_block SharedSignalConnectionBlock
Definition Signal.h:27
boost::signals2::scoped_connection ScopedSignalConnection
Definition Signal.h:25
void apply(Fnc &&fnc, const std::tuple< Args... > &tuple)
void apply_impl(Fnc &&fnc, const std::tuple< Args... > &tuple, std14::index_sequence< Indices... >)
static constexpr std::size_t size()