OpenShot Library | OpenShotAudio  0.2.2
juce_ThreadPool.cpp
1 /*
2  ==============================================================================
3 
4  This file is part of the JUCE library.
5  Copyright (c) 2017 - ROLI Ltd.
6 
7  JUCE is an open source library subject to commercial or open-source
8  licensing.
9 
10  The code included in this file is provided under the terms of the ISC license
11  http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12  To use, copy, modify, and/or distribute this software for any purpose with or
13  without fee is hereby granted provided that the above copyright notice and
14  this permission notice appear in all copies.
15 
16  JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17  EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18  DISCLAIMED.
19 
20  ==============================================================================
21 */
22 
23 namespace juce
24 {
25 
27 {
28  ThreadPoolThread (ThreadPool& p, size_t stackSize)
29  : Thread ("Pool", stackSize), pool (p)
30  {
31  }
32 
33  void run() override
34  {
35  while (! threadShouldExit())
36  if (! pool.runNextJob (*this))
37  wait (500);
38  }
39 
40  std::atomic<ThreadPoolJob*> currentJob { nullptr };
41  ThreadPool& pool;
42 
43  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
44 };
45 
46 //==============================================================================
47 ThreadPoolJob::ThreadPoolJob (const String& name) : jobName (name)
48 {
49 }
50 
52 {
53  // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
54  // to remove it first!
55  jassert (pool == nullptr || ! pool->contains (this));
56 }
57 
59 {
60  return jobName;
61 }
62 
63 void ThreadPoolJob::setJobName (const String& newName)
64 {
65  jobName = newName;
66 }
67 
69 {
70  shouldStop = true;
71  listeners.call ([] (Thread::Listener& l) { l.exitSignalSent(); });
72 }
73 
75 {
76  listeners.add (listener);
77 }
78 
80 {
81  listeners.remove (listener);
82 }
83 
85 {
86  if (auto* t = dynamic_cast<ThreadPool::ThreadPoolThread*> (Thread::getCurrentThread()))
87  return t->currentJob.load();
88 
89  return nullptr;
90 }
91 
92 //==============================================================================
93 ThreadPool::ThreadPool (int numThreads, size_t threadStackSize)
94 {
95  jassert (numThreads > 0); // not much point having a pool without any threads!
96 
97  createThreads (numThreads, threadStackSize);
98 }
99 
101 {
102  createThreads (SystemStats::getNumCpus());
103 }
104 
106 {
107  removeAllJobs (true, 5000);
108  stopThreads();
109 }
110 
111 void ThreadPool::createThreads (int numThreads, size_t threadStackSize)
112 {
113  for (int i = jmax (1, numThreads); --i >= 0;)
114  threads.add (new ThreadPoolThread (*this, threadStackSize));
115 
116  for (auto* t : threads)
117  t->startThread();
118 }
119 
120 void ThreadPool::stopThreads()
121 {
122  for (auto* t : threads)
123  t->signalThreadShouldExit();
124 
125  for (auto* t : threads)
126  t->stopThread (500);
127 }
128 
129 void ThreadPool::addJob (ThreadPoolJob* job, bool deleteJobWhenFinished)
130 {
131  jassert (job != nullptr);
132  jassert (job->pool == nullptr);
133 
134  if (job->pool == nullptr)
135  {
136  job->pool = this;
137  job->shouldStop = false;
138  job->isActive = false;
139  job->shouldBeDeleted = deleteJobWhenFinished;
140 
141  {
142  const ScopedLock sl (lock);
143  jobs.add (job);
144  }
145 
146  for (auto* t : threads)
147  t->notify();
148  }
149 }
150 
151 void ThreadPool::addJob (std::function<ThreadPoolJob::JobStatus()> jobToRun)
152 {
153  struct LambdaJobWrapper : public ThreadPoolJob
154  {
155  LambdaJobWrapper (std::function<ThreadPoolJob::JobStatus()> j) : ThreadPoolJob ("lambda"), job (j) {}
156  JobStatus runJob() override { return job(); }
157 
158  std::function<ThreadPoolJob::JobStatus()> job;
159  };
160 
161  addJob (new LambdaJobWrapper (jobToRun), true);
162 }
163 
164 void ThreadPool::addJob (std::function<void()> jobToRun)
165 {
166  struct LambdaJobWrapper : public ThreadPoolJob
167  {
168  LambdaJobWrapper (std::function<void()> j) : ThreadPoolJob ("lambda"), job (j) {}
169  JobStatus runJob() override { job(); return ThreadPoolJob::jobHasFinished; }
170 
171  std::function<void()> job;
172  };
173 
174  addJob (new LambdaJobWrapper (jobToRun), true);
175 }
176 
177 int ThreadPool::getNumJobs() const noexcept
178 {
179  const ScopedLock sl (lock);
180  return jobs.size();
181 }
182 
183 int ThreadPool::getNumThreads() const noexcept
184 {
185  return threads.size();
186 }
187 
188 ThreadPoolJob* ThreadPool::getJob (int index) const noexcept
189 {
190  const ScopedLock sl (lock);
191  return jobs [index];
192 }
193 
194 bool ThreadPool::contains (const ThreadPoolJob* job) const noexcept
195 {
196  const ScopedLock sl (lock);
197  return jobs.contains (const_cast<ThreadPoolJob*> (job));
198 }
199 
200 bool ThreadPool::isJobRunning (const ThreadPoolJob* job) const noexcept
201 {
202  const ScopedLock sl (lock);
203  return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
204 }
205 
206 void ThreadPool::moveJobToFront (const ThreadPoolJob* job) noexcept
207 {
208  const ScopedLock sl (lock);
209 
210  auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
211 
212  if (index > 0 && ! job->isActive)
213  jobs.move (index, 0);
214 }
215 
216 bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* job, int timeOutMs) const
217 {
218  if (job != nullptr)
219  {
220  auto start = Time::getMillisecondCounter();
221 
222  while (contains (job))
223  {
224  if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
225  return false;
226 
227  jobFinishedSignal.wait (2);
228  }
229  }
230 
231  return true;
232 }
233 
234 bool ThreadPool::removeJob (ThreadPoolJob* job, bool interruptIfRunning, int timeOutMs)
235 {
236  bool dontWait = true;
237  OwnedArray<ThreadPoolJob> deletionList;
238 
239  if (job != nullptr)
240  {
241  const ScopedLock sl (lock);
242 
243  if (jobs.contains (job))
244  {
245  if (job->isActive)
246  {
247  if (interruptIfRunning)
248  job->signalJobShouldExit();
249 
250  dontWait = false;
251  }
252  else
253  {
254  jobs.removeFirstMatchingValue (job);
255  addToDeleteList (deletionList, job);
256  }
257  }
258  }
259 
260  return dontWait || waitForJobToFinish (job, timeOutMs);
261 }
262 
263 bool ThreadPool::removeAllJobs (bool interruptRunningJobs, int timeOutMs,
264  ThreadPool::JobSelector* selectedJobsToRemove)
265 {
266  Array<ThreadPoolJob*> jobsToWaitFor;
267 
268  {
269  OwnedArray<ThreadPoolJob> deletionList;
270 
271  {
272  const ScopedLock sl (lock);
273 
274  for (int i = jobs.size(); --i >= 0;)
275  {
276  auto* job = jobs.getUnchecked(i);
277 
278  if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
279  {
280  if (job->isActive)
281  {
282  jobsToWaitFor.add (job);
283 
284  if (interruptRunningJobs)
285  job->signalJobShouldExit();
286  }
287  else
288  {
289  jobs.remove (i);
290  addToDeleteList (deletionList, job);
291  }
292  }
293  }
294  }
295  }
296 
297  auto start = Time::getMillisecondCounter();
298 
299  for (;;)
300  {
301  for (int i = jobsToWaitFor.size(); --i >= 0;)
302  {
303  auto* job = jobsToWaitFor.getUnchecked (i);
304 
305  if (! isJobRunning (job))
306  jobsToWaitFor.remove (i);
307  }
308 
309  if (jobsToWaitFor.size() == 0)
310  break;
311 
312  if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
313  return false;
314 
315  jobFinishedSignal.wait (20);
316  }
317 
318  return true;
319 }
320 
321 StringArray ThreadPool::getNamesOfAllJobs (bool onlyReturnActiveJobs) const
322 {
323  StringArray s;
324  const ScopedLock sl (lock);
325 
326  for (auto* job : jobs)
327  if (job->isActive || ! onlyReturnActiveJobs)
328  s.add (job->getJobName());
329 
330  return s;
331 }
332 
333 bool ThreadPool::setThreadPriorities (int newPriority)
334 {
335  bool ok = true;
336 
337  for (auto* t : threads)
338  if (! t->setPriority (newPriority))
339  ok = false;
340 
341  return ok;
342 }
343 
344 ThreadPoolJob* ThreadPool::pickNextJobToRun()
345 {
346  OwnedArray<ThreadPoolJob> deletionList;
347 
348  {
349  const ScopedLock sl (lock);
350 
351  for (int i = 0; i < jobs.size(); ++i)
352  {
353  if (auto* job = jobs[i])
354  {
355  if (! job->isActive)
356  {
357  if (job->shouldStop)
358  {
359  jobs.remove (i);
360  addToDeleteList (deletionList, job);
361  --i;
362  continue;
363  }
364 
365  job->isActive = true;
366  return job;
367  }
368  }
369  }
370  }
371 
372  return nullptr;
373 }
374 
375 bool ThreadPool::runNextJob (ThreadPoolThread& thread)
376 {
377  if (auto* job = pickNextJobToRun())
378  {
379  auto result = ThreadPoolJob::jobHasFinished;
380  thread.currentJob = job;
381 
382  try
383  {
384  result = job->runJob();
385  }
386  catch (...)
387  {
388  jassertfalse; // Your runJob() method mustn't throw any exceptions!
389  }
390 
391  thread.currentJob = nullptr;
392 
393  OwnedArray<ThreadPoolJob> deletionList;
394 
395  {
396  const ScopedLock sl (lock);
397 
398  if (jobs.contains (job))
399  {
400  job->isActive = false;
401 
402  if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
403  {
404  jobs.removeFirstMatchingValue (job);
405  addToDeleteList (deletionList, job);
406 
407  jobFinishedSignal.signal();
408  }
409  else
410  {
411  // move the job to the end of the queue if it wants another go
412  jobs.move (jobs.indexOf (job), -1);
413  }
414  }
415  }
416 
417  return true;
418  }
419 
420  return false;
421 }
422 
423 void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* job) const
424 {
425  job->shouldStop = true;
426  job->pool = nullptr;
427 
428  if (job->shouldBeDeleted)
429  deletionList.add (job);
430 }
431 
432 } // namespace juce
Holds a resizable array of primitive or copy-by-value objects.
Definition: juce_Array.h:60
ElementType getUnchecked(int index) const
Returns one of the elements in the array, without checking the index passed in.
Definition: juce_Array.h:256
int size() const noexcept
Returns the current number of elements in the array.
Definition: juce_Array.h:219
void remove(int indexToRemove)
Removes an element from the array.
Definition: juce_Array.h:771
void add(const ElementType &newElement)
Appends a new element at the end of the array.
Definition: juce_Array.h:422
Automatically locks and unlocks a mutex object.
An array designed for holding objects.
A special array for holding a list of strings.
void add(String stringToAdd)
Appends a string at the end of the array.
The JUCE String class!
Definition: juce_String.h:43
static int getNumCpus() noexcept
Returns the number of logical CPU cores.
A task that is executed by a ThreadPool object.
void signalJobShouldExit()
Calling this will cause the shouldExit() method to return true, and the job should (if it's been implemented correctly) stop as soon as possible.
JobStatus
These are the values that can be returned by the runJob() method.
@ jobHasFinished
indicates that the job has finished and can be removed from the pool.
@ jobNeedsRunningAgain
indicates that the job would like to be called again when a thread is free.
void setJobName(const String &newName)
Changes the job's name.
String getJobName() const
Returns the name of this job.
void addListener(Thread::Listener *)
Adds a listener to this thread job, which will receive a callback when signalJobShouldExit() is called on this job.
virtual ~ThreadPoolJob()
Destructor.
static ThreadPoolJob * getCurrentThreadPoolJob()
If the calling thread is being invoked inside a runJob() method, this will return the ThreadPoolJob that it belongs to.
void removeListener(Thread::Listener *)
Removes a listener added with addListener.
ThreadPoolJob(const String &name)
Creates a thread pool job object.
A callback class used when you need to select which ThreadPoolJob objects are suitable for some kind of operation.
virtual bool isJobSuitable(ThreadPoolJob *job)=0
Should return true if the specified job matches your criteria for whatever operation this object is being used for.
A set of threads that will run a list of jobs.
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
If the given job is in the queue, this will move it to the front so that it is the next one to be executed.
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
Adds a job to the queue.
int getNumThreads() const noexcept
Returns the number of threads assigned to this thread pool.
ThreadPoolJob * getJob(int index) const noexcept
Returns one of the jobs in the queue.
int getNumJobs() const noexcept
Returns the number of jobs currently running or queued.
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
Tries to remove all jobs from the pool.
~ThreadPool()
Destructor.
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
Returns a list of the names of all the jobs currently running or queued.
bool isJobRunning(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently being run by a thread.
bool setThreadPriorities(int newPriority)
Changes the priority of all the threads.
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
Tries to remove a job from the pool.
bool contains(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently queued or running.
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
Waits until a job has finished running and has been removed from the pool.
ThreadPool()
Creates a thread pool with one thread per CPU core.
Used to receive callbacks for thread exit calls.
Definition: juce_Thread.h:185
virtual void exitSignalSent()=0
Called if Thread::signalThreadShouldExit was called.
Encapsulates a thread.
Definition: juce_Thread.h:47
static Thread *JUCE_CALLTYPE getCurrentThread()
Finds the thread object that is currently running.
bool wait(int timeOutMilliseconds) const
Suspends the execution of this thread until either the specified timeout period has elapsed, or another thread calls the notify() method to wake it up.
Thread(const String &threadName, size_t threadStackSize=0)
Creates a thread.
Definition: juce_Thread.cpp:26
bool threadShouldExit() const
Checks whether the thread has been told to stop running.
static uint32 getMillisecondCounter() noexcept
Returns the number of millisecs since a fixed event (usually system startup).
Definition: juce_Time.cpp:226
bool wait(int timeOutMilliseconds=-1) const
Suspends the calling thread until the event has been signalled.
void signal() const
Wakes up any threads that are currently waiting on this object.
void run() override
Must be implemented to perform the thread's actual code.