ML Reference
mlMultiThreadedPageRequestProcessor.h
Go to the documentation of this file.
1/*************************************************************************************
2**
3** Copyright 2009, MeVis Medical Solutions AG
4**
5** The user may use this file in accordance with the license agreement provided with
6** the Software or, alternatively, in accordance with the terms contained in a
7** written agreement between the user and MeVis Medical Solutions AG.
8**
9** For further information use the contact form at https://www.mevislab.de/contact
10**
11**************************************************************************************/
12
13#ifndef ML_MULTI_THREADED_PAGE_REQUEST_PROCESSOR_H
14#define ML_MULTI_THREADED_PAGE_REQUEST_PROCESSOR_H
15
16#include "mlInitSystemML.h"
17
19#include "mlAtomicCounter.h"
21
22#include "ThirdPartyWarningsDisable.h"
23#include <boost/thread.hpp>
24#include "ThirdPartyWarningsRestore.h"
25#include <mlMutex.h>
26#include <mlWaitCondition.h>
27
29#define ML_HOST_START_SINGLETHREADED_TIMEOUT 0.0005
31#define ML_HOST_PAGEREQUEST_QUEUE_LIMIT 2
33#define ML_HOST_PAGEREQUEST_PRODUCER_BATCH_SIZE 2
35#define ML_HOST_START_SINGLETHREADED_PAGEREQUEST_LIMIT 10
37#define ML_HOST_MINIMAL_GUITHREAD_SLEEP_MS 10
38
39ML_START_NAMESPACE
40
41//-------------------------------------------------------------------------------------------
44//-------------------------------------------------------------------------------------------
46{
47public:
50
52 bool supportsMultiThreading() override { return true; }
53
55 void setNumWorkerThreads(std::size_t threads) override;
56
58 void addRootTileRequest(TileRequest* tileRequest) override;
59
61 void processAll() override;
62
64 void process(double timeBudget) override;
65
68
70 bool needsProcessing() override;
71
73 void append(PageRequest* request) override;
74
76 void enterProcessingScope() override;
78 void leaveProcessingScope() override;
79
81 bool processingWasSingleThreaded() const override { return _processingWasSingleThreaded; }
82
84 void setAllowLateMultiThreadingStart(bool flag) { _allowLateMultiThreadingStart = flag; }
85
86private:
87 struct WorkerThreadState;
88
90 bool processNextGUIRequest(int millisecondsToWait);
91
93 void processNextWorkerRequest(WorkerThreadState* state, PerThreadStorage& perThreadStorage);
94
96 MLErrorCode processRequest(PageRequest* request, bool isGUIThread, PerThreadStorage& perThreadStorage);
97
99 static void rootTileRequestFinishedCB(void* data, TileRequest* request);
100
102 void rootTileRequestFinished(TileRequest* request);
103
105 void runPageRequestProducerLoop();
106
108 void addNewCursors();
109
111 void runWorkerThreadLoop(WorkerThreadState* state);
112
114 void notifyAllThreads();
115
117 void launchWorkerThreads();
118
120 int getCurrentNumberOfPageRequestsInQueues();
121
123 void wakeupProducerIfWaitingForQueue();
124
126 void handleErrorsInGUIThread() override;
127
129 void stopProcessingOnAllThreads();
130
132 void moveRequestsFromGUIQueuesToOtherQueues();
133
135 void getAllPageRequestQueues(std::vector< std::deque<PageRequest*>* >& queues) override;
136
138 void startCreatingWorkSingleThreaded();
139
141 bool _shutdown;
142
144 bool _producerWorking;
146 int _workerThreadsWorking;
149 int _maxPageRequestsInQueues;
152 bool _producerWaitingForQueue;
153
155 bool _processingAllowed;
156
157 bool _useMultiThreading;
158 bool _processingWasSingleThreaded;
159 bool _allowLateMultiThreadingStart;
160
162 WaitCondition _stoppedProcessing;
163
165 WaitCondition _guiThreadWakeup;
166 WaitCondition _producerWakeup;
167 WaitCondition _workerWakeup;
168 WaitCondition _ioThreadWakeup;
169
171 Mutex _mutex;
172
174 std::vector<PageRequestCursor*> _newCursors;
175
177 std::deque<PageRequest*> _parallelWorkQueue;
178 std::deque<PageRequest*> _ioWorkQueue;
179
181 struct WorkerThreadState
182 {
183 WorkerThreadState();
184 ~WorkerThreadState();
185
187 boost::thread* _thread;
188
190 bool _working;
192 int _workerThreadId;
195
197 WaitCondition* _wakeup;
199 std::deque<PageRequest*>* _queue;
200
202 bool _shutdown;
203
205 TimeCounter _profilingTimer;
206 };
207
209 std::size_t _numDesiredWorkerThreads;
210
212 std::vector<WorkerThreadState*> _workerThreads;
213
215 WorkerThreadState* _ioThread;
216
218 boost::thread* _producerThread;
219
221 ProcessingTimeLine::TimeLine* _producerThreadTimeLine;
222
224 struct ProfilingInfo {
225 ProfilingInfo() {
226 count = 0;
227 elapsedTime = 0;
228 }
229
231 int count;
232
234 double elapsedTime;
235 };
236
238 std::map<PagedImage*, ProfilingInfo> _profilingData;
239
241 void addThreadTimesToMLProfiling();
242
243};
244
245ML_END_NAMESPACE
246
247#endif
248
The Host is the central image processing class in the ML.
Definition mlHost.h:115
A multi-threaded processor that takes one or even multiple tile requests and can process them iterati...
void leaveProcessingScope() override
Leaves a recursive processing scope (always called from main thread!)
bool needsProcessing() override
Returns if the tile request needs some more processing.
void enterProcessingScope() override
Enters a recursive processing scope (always called from main thread!)
bool supportsMultiThreading() override
Returns if multi-threading is supported.
void process(double timeBudget) override
Process the requests for the given timeBudget given in seconds.
void processAll() override
Processes all requests until the cursors have traversed the whole tree and the queue is empty.
MultiThreadedPageRequestProcessor(Host *host=nullptr)
void setAllowLateMultiThreadingStart(bool flag)
This can be set to false to debug single threaded part of this class...
void addRootTileRequest(TileRequest *tileRequest) override
Adds the root tileRequest that should be processed (the ownership stays with the caller).
bool processingWasSingleThreaded() const override
Processing is always single-threaded, only implemented for testing purposes.
void launchMultiThreading()
Launch the multi-threading.
void setNumWorkerThreads(std::size_t threads) override
Set number of worker threads.
void append(PageRequest *request) override
Implements PageRequestQueueInterface interface.
Base class for single and multi-threaded processor.
virtual void handleErrorsInGUIThread()
handle all errors that have been accumulated
virtual void getAllPageRequestQueues(std::vector< std::deque< PageRequest * > * > &queues)
get all page request queues (to be derived when there are more than the GUI queues)
A PageRequest represents the request for the calculation of a single page of a PagedImage.
Timeline for a single thread.
A TileRequest either represents the input sub image that is needed by a PageRequest or if it is a roo...
Class to measure precise time intervals.
WaitCondition implements a wait condition for thread synchronization.
MLint32 MLErrorCode
Type of an ML Error code.
Definition mlTypeDefs.h:716
boost::mutex Mutex
Defines a non-recursive mutex.
Definition mlMutex.h:39
#define MLEXPORT
To export symbols from a dll/shared object, we need to mark them with the MLEXPORT symbol.