MeVisLab Toolbox Reference
mlMultiThreadedPageRequestProcessor.h
Go to the documentation of this file.
1 /*************************************************************************************
2 **
3 ** Copyright 2009, MeVis Medical Solutions AG
4 **
5 ** The user may use this file in accordance with the license agreement provided with
6 ** the Software or, alternatively, in accordance with the terms contained in a
7 ** written agreement between the user and MeVis Medical Solutions AG.
8 **
9 ** For further information use the contact form at https://www.mevislab.de/contact
10 **
11 **************************************************************************************/
12 
13 #ifndef ML_MULTI_THREADED_PAGE_REQUEST_PROCESSOR_H
14 #define ML_MULTI_THREADED_PAGE_REQUEST_PROCESSOR_H
15 
16 #include "mlInitSystemML.h"
17 
19 #include "mlAtomicCounter.h"
20 #include "mlProcessingTimeLine.h"
21 
22 #include "ThirdPartyWarningsDisable.h"
23 #include <boost/thread.hpp>
24 #include "ThirdPartyWarningsRestore.h"
25 #include <mlMutex.h>
26 #include <mlWaitCondition.h>
27 
29 #define ML_HOST_START_SINGLETHREADED_TIMEOUT 0.0005
31 #define ML_HOST_PAGEREQUEST_QUEUE_LIMIT 2
33 #define ML_HOST_PAGEREQUEST_PRODUCER_BATCH_SIZE 2
35 #define ML_HOST_START_SINGLETHREADED_PAGEREQUEST_LIMIT 10
37 #define ML_HOST_MINIMAL_GUITHREAD_SLEEP_MS 10
38 
39 ML_START_NAMESPACE
40 
41 //-------------------------------------------------------------------------------------------
44 //-------------------------------------------------------------------------------------------
46 {
47 public:
50 
52  bool supportsMultiThreading() override { return true; }
53 
55  void setNumWorkerThreads(std::size_t threads) override;
56 
58  void addRootTileRequest(TileRequest* tileRequest) override;
59 
61  void processAll() override;
62 
64  void process(double timeBudget) override;
65 
68 
70  bool needsProcessing() override;
71 
73  void append(PageRequest* request) override;
74 
76  void enterProcessingScope() override;
78  void leaveProcessingScope() override;
79 
81  bool processingWasSingleThreaded() const override { return _processingWasSingleThreaded; }
82 
84  void setAllowLateMultiThreadingStart(bool flag) { _allowLateMultiThreadingStart = flag; }
85 
86 private:
87  struct WorkerThreadState;
88 
90  bool processNextGUIRequest(int millisecondsToWait);
91 
93  void processNextWorkerRequest(WorkerThreadState* state, PerThreadStorage& perThreadStorage);
94 
96  MLErrorCode processRequest(PageRequest* request, bool isGUIThread, PerThreadStorage& perThreadStorage);
97 
99  static void rootTileRequestFinishedCB(void* data, TileRequest* request);
100 
102  void rootTileRequestFinished(TileRequest* request);
103 
105  void runPageRequestProducerLoop();
106 
108  void addNewCursors();
109 
111  void runWorkerThreadLoop(WorkerThreadState* state);
112 
114  void notifyAllThreads();
115 
117  void launchWorkerThreads();
118 
120  int getCurrentNumberOfPageRequestsInQueues();
121 
123  void wakeupProducerIfWaitingForQueue();
124 
126  void handleErrorsInGUIThread() override;
127 
129  void stopProcessingOnAllThreads();
130 
132  void moveRequestsFromGUIQueuesToOtherQueues();
133 
135  void getAllPageRequestQueues(std::vector< std::deque<PageRequest*>* >& queues) override;
136 
138  void startCreatingWorkSingleThreaded();
139 
141  bool _shutdown;
142 
144  bool _producerWorking;
146  int _workerThreadsWorking;
149  int _maxPageRequestsInQueues;
152  bool _producerWaitingForQueue;
153 
155  bool _processingAllowed;
156 
157  bool _useMultiThreading;
158  bool _processingWasSingleThreaded;
159  bool _allowLateMultiThreadingStart;
160 
162  WaitCondition _stoppedProcessing;
163 
165  WaitCondition _guiThreadWakeup;
166  WaitCondition _producerWakeup;
167  WaitCondition _workerWakeup;
168  WaitCondition _ioThreadWakeup;
169 
171  Mutex _mutex;
172 
174  std::vector<PageRequestCursor*> _newCursors;
175 
177  std::deque<PageRequest*> _parallelWorkQueue;
178  std::deque<PageRequest*> _ioWorkQueue;
179 
181  struct WorkerThreadState
182  {
183  WorkerThreadState();
184  ~WorkerThreadState();
185 
187  boost::thread* _thread;
188 
190  bool _working;
192  int _workerThreadId;
194  ProcessingTimeLine::TimeLine* _timeLine;
195 
197  WaitCondition* _wakeup;
199  std::deque<PageRequest*>* _queue;
200 
202  bool _shutdown;
203 
205  TimeCounter _profilingTimer;
206  };
207 
209  std::size_t _numDesiredWorkerThreads;
210 
212  std::vector<WorkerThreadState*> _workerThreads;
213 
215  WorkerThreadState* _ioThread;
216 
218  boost::thread* _producerThread;
219 
221  ProcessingTimeLine::TimeLine* _producerThreadTimeLine;
222 
224  struct ProfilingInfo {
225  ProfilingInfo() {
226  count = 0;
227  elapsedTime = 0;
228  }
229 
231  int count;
232 
234  double elapsedTime;
235  };
236 
238  std::map<PagedImage*, ProfilingInfo> _profilingData;
239 
241  void addThreadTimesToMLProfiling();
242 
243 };
244 
245 ML_END_NAMESPACE
246 
247 #endif
248 
The Host is the central image processing class in the ML.
Definition: mlHost.h:115
A multi-threaded processor that takes one or even multiple tile requests and can process them iterati...
void leaveProcessingScope() override
Leaves a recursive processing scope (always called from main thread!)
bool needsProcessing() override
Returns if the tile request needs some more processing.
void enterProcessingScope() override
Enters a recursive processing scope (always called from main thread!)
bool supportsMultiThreading() override
Returns if multi-threading is supported.
void process(double timeBudget) override
Process the requests for the given timeBudget given in seconds.
void processAll() override
Processes all requests until the cursors have traversed the whole tree and the queue is empty.
MultiThreadedPageRequestProcessor(Host *host=nullptr)
void setAllowLateMultiThreadingStart(bool flag)
This can be set to false to debug single threaded part of this class...
void addRootTileRequest(TileRequest *tileRequest) override
Adds the root tileRequest that should be processed (the ownership stays with the caller).
bool processingWasSingleThreaded() const override
Processing is always single-threaded, only implemented for testing purposes.
void launchMultiThreading()
Launch the multi-threading.
void setNumWorkerThreads(std::size_t threads) override
Set number of worker threads.
void append(PageRequest *request) override
Implements PageRequestQueueInterface interface.
Base class for single and multi-threaded processor.
virtual void handleErrorsInGUIThread()
handle all errors that have been accumulated
virtual void getAllPageRequestQueues(std::vector< std::deque< PageRequest * > * > &queues)
get all page request queues (to be derived when there are more than the GUI queues)
A PageRequest represents the request for the calculation of a single page of a PagedImage.
Definition: mlPageRequest.h:32
Timeline for a single thread.
A TileRequest either represents the input sub image that is needed by a PageRequest or if it is a roo...
Definition: mlTileRequest.h:50
Class to measure precise time intervals.
Definition: mlTimeCounter.h:26
WaitCondition implements a wait condition for thread synchronization.
MLint32 MLErrorCode
Type of an ML Error code.
Definition: mlTypeDefs.h:818
boost::mutex Mutex
Defines a non-recursive mutex.
Definition: mlMutex.h:39
#define MLEXPORT
To export symbols from a dll/shared object, we need to mark them with the MLEXPORT symbol.