MeVisLab Toolbox Reference
ObservableConnection.h
Go to the documentation of this file.
1 // Copyright (c) Fraunhofer MEVIS, Germany. All rights reserved.
2 // **InsertLicense** code
3 
4 #pragma once
5 
6 #include <asio/Connection.h>
7 #include <asio/Observable.h>
8 #include <asio/Processable.h>
9 #include <asio/ExecutionPolicy.h>
12 
13 #include <ThirdPartyWarningsDisable.h>
14 #include <memory>
15 #include <utility>
16 #include <tuple>
17 #include <array>
18 #include <cstddef>
19 #include <ThirdPartyWarningsRestore.h>
20 
21 
// Local stand-ins for the compile-time integer sequence helpers that the C++
// standard library provides as std::integer_sequence and friends. The names
// mirror the standard ones; detail::apply() relies on them to unpack a tuple.
namespace std14
{
  /// Compile-time list of values of type T, encoded purely in the type.
  template<typename T, T... Ints>
  struct integer_sequence
  {
    using value_type = T;
    /// Number of values in the sequence.
    static constexpr std::size_t size() { return sizeof...(Ints); }
  };

  /// An integer_sequence whose values are std::size_t indices.
  template<std::size_t... Ints>
  using index_sequence = integer_sequence<std::size_t, Ints...>;

  // Recursive step: prepend N - 1 to the values collected so far and continue
  // with N - 1, so the values end up in ascending order 0, 1, ..., N - 1.
  template<typename T, std::size_t N, T... Is>
  struct make_integer_sequence : make_integer_sequence<T, N - 1, N - 1, Is...> {};

  // Recursion end: nothing left to prepend. Note that, unlike the standard
  // alias, this is a type that *derives* from integer_sequence<T, Is...>;
  // callers take the sequence as a base-class parameter to deduce the values.
  template<typename T, T... Is>
  struct make_integer_sequence<T, 0, Is...> : integer_sequence<T, Is...> {};

  /// Type derived from index_sequence<0, 1, ..., N - 1>.
  template<std::size_t N>
  using make_index_sequence = make_integer_sequence<std::size_t, N>;

  /// Index sequence with one index per type in the pack T.
  template<typename... T>
  using index_sequence_for = make_index_sequence<sizeof...(T)>;
}
46 
47 
48 namespace detail {
49 
50  template <typename Fnc, typename... Args, std::size_t... Indices>
51  void apply_impl(Fnc &&fnc, const std::tuple<Args...> &tuple, std14::index_sequence<Indices...>)
52  {
53  std::forward<Fnc>(fnc)(std::get<Indices>(tuple)...);
54  }
55 
56  template <typename Fnc, typename... Args>
57  void apply(Fnc &&fnc, const std::tuple<Args...> &tuple)
58  {
59  apply_impl(std::forward<Fnc>(fnc), tuple, std14::index_sequence_for<Args...>());
60  }
61 
62 }
63 
64 
65 namespace asio {
66 
67 
82  template<typename... Args>
84  {
85  public:
86 
87  std::shared_ptr<Observable<Args...>> observable;
90  };
91 
92 
141  template<typename Result, typename... Args>
143  {
144  class State final
145  {
146  public:
147  State(
148  std::shared_ptr<Observable<Result>> target_,
149  std::shared_ptr<Processable<Result, Args...>> processable_,
150  std::unique_ptr<ExecutionPolicy> policy_,
151  bool muteUntilTargetSignalEmitted_)
152  : target(target_)
153  , processable(processable_)
154  , policy(std::move(policy_))
155  , isSynchronousExecutionPolicy(dynamic_cast<asio::SynchronousExecution*>(policy.get()) != nullptr)
156  , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
157  , isCurrentlyMuted(false)
158  {
159  validities.fill(false);
160  }
161 
162  std::tuple<Args...> values;
163  std::array<bool, sizeof...(Args)> validities;
164  std::array<bool, sizeof...(Args)> invalidations;
165  std::shared_ptr<Observable<Result>> target;
166  std::shared_ptr<Processable<Result, Args...>> processable;
167  std::unique_ptr<ExecutionPolicy> policy;
168  bool isSynchronousExecutionPolicy;
169  bool muteUntilTargetSignalEmitted;
170  bool isCurrentlyMuted;
171  };
172 
173  public:
174 
175  ObservableConnection() = default;
176 
179  std::shared_ptr<Observable<Result>> target,
180  std::shared_ptr<Processable<Result, Args...>> processable,
181  std::unique_ptr<ExecutionPolicy> policy,
182  bool muteUntilTargetSignalEmitted)
183  {
184  // We need a state object as we can't use the this pointer
185  // as capture inside the connection lambda down below due to
186  // vtable issues (because we're inside a constructor).
187  // An alternative would be to move the connection into a
188  // public method, but this would uglify the API - the connection
189  // should be well established once the object is constructed.
190  auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
191  observe(state, inputs...);
192  }
193 
194  void suspend() override
195  {
196  for (std::size_t i = 0; i < blocks.size(); ++i) {
197  blocks[i] = SharedSignalConnectionBlock(connections[i]);
198  }
199  }
200 
201  void resume() override
202  {
203  for (std::size_t i = 0; i < blocks.size(); ++i) {
204  blocks[i] = SharedSignalConnectionBlock();
205  }
206  }
207 
208  private:
209 
210  template<typename HeadType, typename... TailTypes>
211  void observe(std::shared_ptr<State> state, const ObservableConnectionInput<HeadType>& head, const ObservableConnectionInput<TailTypes>&... tail)
212  {
213  do_observe<HeadType, sizeof...(Args) - (sizeof...(TailTypes) + 1)>(state, head);
214  observe(state, tail...);
215  }
216 
217  template<typename LastType>
218  void observe(std::shared_ptr<State> state, const ObservableConnectionInput<LastType>& last)
219  {
220  do_observe<LastType, sizeof...(Args) - 1>(state, last);
221  }
222 
223  template<typename Type, std::size_t Position>
224  void do_observe(std::shared_ptr<State> state, const ObservableConnectionInput<Type>& input)
225  {
226  state->invalidations[Position] = input.shouldBeInvalidatedAfterProcessing;
227  auto shouldTrigger = input.shouldTriggerProcessing;
228  auto handle = guard.handle();
229  connections[Position] = input.observable->signal.connect([handle, state, shouldTrigger](const Type& value) {
230  auto lock = handle.lock();
231  if (lock) {
232  std::get<Position>(state->values) = value;
233  state->validities[Position] = true;
234  if (shouldTrigger && !state->isCurrentlyMuted) {
235  if (std::all_of(state->validities.begin(), state->validities.end(), [](bool v) { return v; })) {
236  if (state->isSynchronousExecutionPolicy) {
237  detail::apply([&state](const Args&... args) {
238  state->target->signal(state->processable->process(args...));
239  }, state->values);
240  }
241  else {
242  if (state->muteUntilTargetSignalEmitted) {
243  state->isCurrentlyMuted = true;
244  }
245  auto shouldUnmute = state->muteUntilTargetSignalEmitted;
246 
247  // For non-synchronous execution policies we need to capture
248  // the values as they are at this very timepoint in the execution
249  // lambda, because following input changes will overwrite them in
250  // the shared state. We use a reference here and by-value capture to
251  // avoid multiple copies.
252  auto &values = state->values;
253  state->policy->execute([handle, values, state, shouldUnmute] {
254  detail::apply([&state](const Args&... args) {
255  state->target->signal(state->processable->process(args...));
256  }, values);
257  if (shouldUnmute) {
258  auto innerLock = handle.lock();
259  if (innerLock) {
260  state->isCurrentlyMuted = false;
261  }
262  }
263  });
264  }
265  for (std::size_t i = 0; i < state->invalidations.size(); ++i) {
266  if (state->invalidations[i]) {
267  state->validities[i] = false;
268  }
269  }
270  }
271  }
272  }
273  });
274  }
275 
276  std::array<ScopedSignalConnection, sizeof...(Args)> connections;
277  std::array<SharedSignalConnectionBlock, sizeof...(Args)> blocks;
278 
280  };
281 
282 
283  // Specialization for parameterless processing with a trigger, i.e., asio::Observable<>:
284  template<typename Result>
285  class ObservableConnection<Result> : public Connection
286  {
287  class State final
288  {
289  public:
290  State(
291  std::shared_ptr<Observable<Result>> target_,
292  std::shared_ptr<Processable<Result>> processable_,
293  std::unique_ptr<ExecutionPolicy> policy_,
294  bool muteUntilTargetSignalEmitted_)
295  : target(target_)
296  , processable(processable_)
297  , policy(std::move(policy_))
298  , isSynchronousExecutionPolicy(dynamic_cast<asio::SynchronousExecution*>(policy.get()) != nullptr)
299  , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
300  , isCurrentlyMuted(false)
301  {
302  }
303 
304  std::shared_ptr<Observable<Result>> target;
305  std::shared_ptr<Processable<Result>> processable;
306  std::unique_ptr<ExecutionPolicy> policy;
307  bool isSynchronousExecutionPolicy;
308  bool muteUntilTargetSignalEmitted;
309  bool isCurrentlyMuted;
310  };
311 
312  public:
313 
314  ObservableConnection() = default;
315 
318  std::shared_ptr<Observable<Result>> target,
319  std::shared_ptr<Processable<Result>> processable,
320  std::unique_ptr<ExecutionPolicy> policy,
321  bool muteUntilTargetSignalEmitted)
322  {
323  auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
324  auto handle = guard.handle();
325  connection = input.observable->signal.connect([handle, state] {
326  auto lock = handle.lock();
327  if (lock) {
328  if (!state->isCurrentlyMuted) {
329  if (state->isSynchronousExecutionPolicy) {
330  state->target->signal(state->processable->process());
331  }
332  else {
333  if (state->muteUntilTargetSignalEmitted) {
334  state->isCurrentlyMuted = true;
335  }
336  auto shouldUnmute = state->muteUntilTargetSignalEmitted;
337  state->policy->execute([handle, state, shouldUnmute]{
338  state->target->signal(state->processable->process());
339  if (shouldUnmute) {
340  auto innerLock = handle.lock();
341  if (innerLock) {
342  state->isCurrentlyMuted = false;
343  }
344  }
345  });
346  }
347  }
348  }
349  });
350  }
351 
352  void suspend() override
353  {
354  block = SharedSignalConnectionBlock(connection);
355  }
356 
357  void resume() override
358  {
359  block = SharedSignalConnectionBlock();
360  }
361 
362  private:
363 
364  ScopedSignalConnection connection;
366 
368  };
369 
370  // Specialization for connecting two triggers with a void processable without parameters
371  template<>
372  class ObservableConnection<void> : public Connection
373  {
374  class State final
375  {
376  public:
377  State(
378  std::shared_ptr<Observable<>> target_,
379  std::shared_ptr<Processable<void>> processable_,
380  std::unique_ptr<ExecutionPolicy> policy_,
381  bool muteUntilTargetSignalEmitted_)
382  : target(target_)
383  , processable(processable_)
384  , policy(std::move(policy_))
385  , isSynchronousExecutionPolicy(dynamic_cast<asio::SynchronousExecution*>(policy.get()) != nullptr)
386  , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
387  , isCurrentlyMuted(false)
388  {
389  }
390 
391  std::shared_ptr<Observable<>> target;
392  std::shared_ptr<Processable<void>> processable;
393  std::unique_ptr<ExecutionPolicy> policy;
394  bool isSynchronousExecutionPolicy;
395  bool muteUntilTargetSignalEmitted;
396  bool isCurrentlyMuted;
397  };
398 
399  public:
400 
401  ObservableConnection() = default;
402 
405  std::shared_ptr<Observable<>> target,
406  std::shared_ptr<Processable<void>> processable,
407  std::unique_ptr<ExecutionPolicy> policy,
408  bool muteUntilTargetSignalEmitted)
409  {
410  auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
411  auto handle = guard.handle();
412  connection = input.observable->signal.connect([state, handle] {
413  auto lock = handle.lock();
414  if (lock) {
415  if (!state->isCurrentlyMuted) {
416  if (state->isSynchronousExecutionPolicy) {
417  state->processable->process();
418  state->target->signal();
419  }
420  else {
421  if (state->muteUntilTargetSignalEmitted) {
422  state->isCurrentlyMuted = true;
423  }
424  auto shouldUnmute = state->muteUntilTargetSignalEmitted;
425  state->policy->execute([state, handle, shouldUnmute] {
426  state->processable->process();
427  state->target->signal();
428  if (shouldUnmute) {
429  auto innerLock = handle.lock();
430  if (innerLock) {
431  state->isCurrentlyMuted = false;
432  }
433  }
434  });
435  }
436  }
437  }
438  });
439  }
440 
441  void suspend() override
442  {
443  block = SharedSignalConnectionBlock(connection);
444  }
445 
446  void resume() override
447  {
448  block = SharedSignalConnectionBlock();
449  }
450 
451  private:
452 
453  ScopedSignalConnection connection;
455 
457  };
458 
459 
460  // Specialization for void types
461  template<typename... Args>
462  class ObservableConnection<void, Args...> : public Connection
463  {
464  class State final
465  {
466  public:
467  State(
468  std::shared_ptr<Observable<>> target_,
469  std::shared_ptr<Processable<void, Args...>> processable_,
470  std::unique_ptr<ExecutionPolicy> policy_,
471  bool muteUntilTargetSignalEmitted_)
472  : target(target_)
473  , processable(processable_)
474  , policy(std::move(policy_))
475  , isSynchronousExecutionPolicy(dynamic_cast<asio::SynchronousExecution*>(policy.get()) != nullptr)
476  , muteUntilTargetSignalEmitted(muteUntilTargetSignalEmitted_)
477  , isCurrentlyMuted(false)
478  {
479  validities.fill(false);
480  }
481 
482  std::tuple<Args...> values;
483  std::array<bool, sizeof...(Args)> validities;
484  std::array<bool, sizeof...(Args)> invalidations;
485  std::shared_ptr<Observable<>> target;
486  std::shared_ptr<Processable<void, Args...>> processable;
487  std::unique_ptr<ExecutionPolicy> policy;
488  bool isSynchronousExecutionPolicy;
489  bool muteUntilTargetSignalEmitted;
490  bool isCurrentlyMuted;
491  };
492 
493  public:
494 
495  ObservableConnection() = default;
496 
499  std::shared_ptr<Observable<>> target,
500  std::shared_ptr<Processable<void, Args...>> processable,
501  std::unique_ptr<ExecutionPolicy> policy,
502  bool muteUntilTargetSignalEmitted)
503  {
504  auto state = std::make_shared<State>(target, processable, std::move(policy), muteUntilTargetSignalEmitted);
505  observe(state, inputs...);
506  }
507 
508  void suspend() override
509  {
510  for (std::size_t i = 0; i < blocks.size(); ++i) {
511  blocks[i] = SharedSignalConnectionBlock(connections[i]);
512  }
513  }
514 
515  void resume() override
516  {
517  for (std::size_t i = 0; i < blocks.size(); ++i) {
518  blocks[i] = SharedSignalConnectionBlock();
519  }
520  }
521 
522  private:
523 
524  template<typename HeadType, typename... TailTypes>
525  void observe(std::shared_ptr<State> state, const ObservableConnectionInput<HeadType>& head, const ObservableConnectionInput<TailTypes>&... tail)
526  {
527  do_observe<HeadType, sizeof...(Args) - (sizeof...(TailTypes) + 1)>(state, head);
528  observe(state, tail...);
529  }
530 
531  template<typename LastType>
532  void observe(std::shared_ptr<State> state, const ObservableConnectionInput<LastType>& last)
533  {
534  do_observe<LastType, sizeof...(Args) - 1>(state, last);
535  }
536 
537  template<typename Type, std::size_t Position>
538  void do_observe(std::shared_ptr<State> state, const ObservableConnectionInput<Type>& input)
539  {
540  state->invalidations[Position] = input.shouldBeInvalidatedAfterProcessing;
541  auto shouldTrigger = input.shouldTriggerProcessing;
542  auto handle = guard.handle();
543  connections[Position] = input.observable->signal.connect([state, handle, shouldTrigger](const Type& value) {
544  auto lock = handle.lock();
545  if (lock) {
546  std::get<Position>(state->values) = value;
547  state->validities[Position] = true;
548  if (shouldTrigger && !state->isCurrentlyMuted) {
549  if (std::all_of(state->validities.begin(), state->validities.end(), [](bool v) { return v; })) {
550  if (state->isSynchronousExecutionPolicy) {
551  detail::apply([&state](const Args&... args) {
552  state->processable->process(args...);
553  state->target->signal();
554  }, state->values);
555  }
556  else {
557  if (state->muteUntilTargetSignalEmitted) {
558  state->isCurrentlyMuted = true;
559  }
560  auto shouldUnmute = state->muteUntilTargetSignalEmitted;
561  auto& values = state->values;
562  state->policy->execute([state, values, handle, shouldUnmute] {
563  detail::apply([&state](const Args&... args) {
564  state->processable->process(args...);
565  state->target->signal();
566  }, values);
567  if (shouldUnmute) {
568  auto innerLock = handle.lock();
569  if (innerLock) {
570  state->isCurrentlyMuted = false;
571  }
572  }
573  });
574  }
575  for (std::size_t i = 0; i < state->invalidations.size(); ++i) {
576  if (state->invalidations[i]) {
577  state->validities[i] = false;
578  }
579  }
580  }
581  }
582  }
583  });
584  }
585 
586  std::array<ScopedSignalConnection, sizeof...(Args)> connections;
587  std::array<SharedSignalConnectionBlock, sizeof...(Args)> blocks;
588 
590  };
591 
592 }
@ T
Definition: SoKeyGrabber.h:71
@ N
Definition: SoKeyGrabber.h:65
Abstract base class for a connection between two I/O processors.
Definition: Connection.h:16
Templated input configuration class for the asio::ObservableConnection.
std::shared_ptr< Observable< Args... > > observable
ObservableConnection(ObservableConnectionInput<> input, std::shared_ptr< Observable< Result >> target, std::shared_ptr< Processable< Result >> processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
ObservableConnection(ObservableConnectionInput< Args >... inputs, std::shared_ptr< Observable<>> target, std::shared_ptr< Processable< void, Args... >> processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
ObservableConnection(ObservableConnectionInput<> input, std::shared_ptr< Observable<>> target, std::shared_ptr< Processable< void >> processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
Templated implementation of the asio::Connection interface for multi-input asynchronous pipelines usi...
ObservableConnection(ObservableConnectionInput< Args >... inputs, std::shared_ptr< Observable< Result >> target, std::shared_ptr< Processable< Result, Args... >> processable, std::unique_ptr< ExecutionPolicy > policy, bool muteUntilTargetSignalEmitted)
A guard that protects resources of a given class from being destroyed if another thread still works wi...
A synchronous execution policy.
boost::signals2::scoped_connection ScopedSignalConnection
Definition: Signal.h:25
boost::signals2::shared_connection_block SharedSignalConnectionBlock
Definition: Signal.h:27
boost::graph_traits< ml_graph_ptr >::vertex_descriptor target(graph_traits< ml_graph_ptr >::edge_descriptor e, const ml_graph_ptr)
Returns the vertex descriptor for v of the edge (u,v) represented by e.
void apply(Fnc &&fnc, const std::tuple< Args... > &tuple)
void apply_impl(Fnc &&fnc, const std::tuple< Args... > &tuple, std14::index_sequence< Indices... >)
static constexpr std::size_t size()