Fawkes API  Fawkes Development Version
thread_manager.cpp
1 
2 /***************************************************************************
3  * thread_manager.cpp - Thread manager
4  *
5  * Created: Thu Nov 3 19:11:31 2006 (on train to Cologne)
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <aspect/blocked_timing.h>
25 #include <baseapp/thread_manager.h>
26 #include <core/exceptions/software.h>
27 #include <core/exceptions/system.h>
28 #include <core/threading/mutex_locker.h>
29 #include <core/threading/thread.h>
30 #include <core/threading/thread_finalizer.h>
31 #include <core/threading/thread_initializer.h>
32 #include <core/threading/wait_condition.h>
33 
34 namespace fawkes {
35 
36 /** @class ThreadManager <baseapp/thread_manager.h>
37  * Base application thread manager.
38  * This class provides a manager for the threads. Threads are memorized by
39  * their wakeup hook. When the thread manager is deleted, all threads are
40  * appropriately cancelled, joined and deleted. Thus the thread manager
41  * can be used for "garbage collection" of threads.
42  *
43  * The thread manager allows easy wakeup of threads of a given wakeup hook.
44  *
45  * The thread manager needs a thread initializer. Each thread that is added
46  * to the thread manager is initialized with this. The runtime type information
47  * (RTTI) supplied by C++ can be used to initialize threads if appropriate
48  * (if the thread has certain aspects that need special treatment).
49  *
50  * @author Tim Niemueller
51  */
52 
53 /** Constructor.
54  * @param parent_manager parent thread manager
55  */
56 ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(
57  ThreadManager *parent_manager)
58 {
59  parent_manager_ = parent_manager;
60 }
61 
62 void
63 ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl)
64 {
65  BlockedTimingAspect *timed_thread;
66 
67  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
68  if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL) {
69  throw IllegalArgumentException(
70  "ThreadProducerAspect may not add threads with BlockedTimingAspect");
71  }
72  }
73 
74  parent_manager_->add_maybelocked(tl, /* lock */ false);
75 }
76 
77 void
78 ThreadManager::ThreadManagerAspectCollector::add(Thread *t)
79 {
80  BlockedTimingAspect *timed_thread;
81 
82  if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
83  throw IllegalArgumentException(
84  "ThreadProducerAspect may not add threads with BlockedTimingAspect");
85  }
86 
87  parent_manager_->add_maybelocked(t, /* lock */ false);
88 }
89 
90 void
91 ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl)
92 {
93  BlockedTimingAspect *timed_thread;
94 
95  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
96  if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL) {
97  throw IllegalArgumentException(
98  "ThreadProducerAspect may not remove threads with BlockedTimingAspect");
99  }
100  }
101 
102  parent_manager_->remove_maybelocked(tl, /* lock */ false);
103 }
104 
105 void
106 ThreadManager::ThreadManagerAspectCollector::remove(Thread *t)
107 {
108  BlockedTimingAspect *timed_thread;
109 
110  if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
111  throw IllegalArgumentException(
112  "ThreadProducerAspect may not remove threads with BlockedTimingAspect");
113  }
114 
115  parent_manager_->remove_maybelocked(t, /* lock */ false);
116 }
117 
118 void
119 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl)
120 {
121  throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
122 }
123 
124 void
125 ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::Thread *t)
126 {
127  throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
128 }
129 
130 /** Constructor.
131  * When using this constructor you need to make sure to call set_inifin()
132  * before any thread is added.
133  */
135 {
136  initializer_ = NULL;
137  finalizer_ = NULL;
138  threads_.clear();
139  waitcond_timedthreads_ = new WaitCondition();
140  interrupt_timed_thread_wait_ = false;
141  aspect_collector_ = new ThreadManagerAspectCollector(this);
142 }
143 
144 /** Constructor.
145  * This contsructor is equivalent to the one without parameters followed
146  * by a call to set_inifins().
147  * @param initializer thread initializer
148  * @param finalizer thread finalizer
149  */
151 {
152  initializer_ = NULL;
153  finalizer_ = NULL;
154  threads_.clear();
155  waitcond_timedthreads_ = new WaitCondition();
156  interrupt_timed_thread_wait_ = false;
157  aspect_collector_ = new ThreadManagerAspectCollector(this);
158  set_inifin(initializer, finalizer);
159 }
160 
161 /** Destructor. */
163 {
164  // stop all threads, we call finalize, and we run through it as long as there are
165  // still running threads, after that, we force the thread's death.
166  for (tit_ = threads_.begin(); tit_ != threads_.end(); ++tit_) {
167  try {
168  tit_->second.force_stop(finalizer_);
169  } catch (Exception &e) {
170  } // ignore
171  }
172  try {
173  untimed_threads_.force_stop(finalizer_);
174  } catch (Exception &e) {
175  } // ignore
176  threads_.clear();
177 
178  delete waitcond_timedthreads_;
179  delete aspect_collector_;
180 }
181 
182 /** Set initializer/finalizer.
183  * This method has to be called before any thread is added/removed.
184  * @param initializer thread initializer
185  * @param finalizer thread finalizer
186  */
187 void
189 {
190  initializer_ = initializer;
191  finalizer_ = finalizer;
192 }
193 
194 /** Remove the given thread from internal structures.
195  * Thread is removed from the internal structures. If the thread has the
196  * BlockedTimingAspect then the hook is added to the changed list.
197  *
198  * @param t thread to remove
199  * @param changed list of changed hooks, appropriate hook is added if necessary
200  */
201 void
202 ThreadManager::internal_remove_thread(Thread *t)
203 {
204  BlockedTimingAspect *timed_thread;
205 
206  if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
207  // find thread and remove
209  if (threads_.find(hook) != threads_.end()) {
210  threads_[hook].remove_locked(t);
211  if (threads_[hook].empty())
212  threads_.erase(hook);
213  }
214  } else {
215  untimed_threads_.remove_locked(t);
216  }
217 }
218 
219 /** Add the given thread to internal structures.
220  * Thread is added to the internal structures. If the thread has the
221  * BlockedTimingAspect then the hook is added to the changed list.
222  *
223  * @param t thread to add
224  * @param changed list of changed hooks, appropriate hook is added if necessary
225  */
226 void
227 ThreadManager::internal_add_thread(Thread *t)
228 {
229  BlockedTimingAspect *timed_thread;
230  if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
231  BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
232 
233  if (threads_.find(hook) == threads_.end()) {
234  threads_[hook].set_name("ThreadManagerList Hook %i", hook);
235  threads_[hook].set_maintain_barrier(true);
236  }
237  threads_[hook].push_back_locked(t);
238 
239  waitcond_timedthreads_->wake_all();
240  } else {
241  untimed_threads_.push_back_locked(t);
242  }
243 }
244 
245 /** Add threads.
246  * Add the given threads to the thread manager. The threads are initialised
247  * as appropriate and started. See the class documentation for supported
248  * specialisations of threads and the performed initialisation steps.
249  * If the thread initializer cannot initalize one or more threads no thread
250  * is added. In this regard the operation is atomic, either all threads are
251  * added or none.
252  * @param tl thread list with threads to add
253  * @exception CannotInitializeThreadException thrown if at least one of the
254  * threads could not be initialised
255  */
256 void
257 ThreadManager::add_maybelocked(ThreadList &tl, bool lock)
258 {
259  if (!(initializer_ && finalizer_)) {
260  throw NullPointerException("ThreadManager: initializer/finalizer not set");
261  }
262 
263  if (tl.sealed()) {
264  throw Exception("Not accepting new threads from list that is not fresh, "
265  "list '%s' already sealed",
266  tl.name());
267  }
268 
269  tl.lock();
270 
271  // Try to initialise all threads
272  try {
273  tl.init(initializer_, finalizer_);
274  } catch (Exception &e) {
275  tl.unlock();
276  throw;
277  }
278 
279  tl.seal();
280  tl.start();
281 
282  // All thread initialized, now add threads to internal structure
283  MutexLocker locker(threads_.mutex(), lock);
284  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
285  internal_add_thread(*i);
286  }
287 
288  tl.unlock();
289 }
290 
291 /** Add one thread.
292  * Add the given thread to the thread manager. The thread is initialized
293  * as appropriate and started. See the class documentation for supported
294  * specialisations of threads and the performed initialisation steps.
295  * If the thread initializer cannot initalize the thread it is not added.
296  * @param thread thread to add
297  * @param lock if true the environment is locked before adding the thread
298  * @exception CannotInitializeThreadException thrown if at least the
299  * thread could not be initialised
300  */
301 void
302 ThreadManager::add_maybelocked(Thread *thread, bool lock)
303 {
304  if (thread == NULL) {
305  throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread");
306  }
307 
308  if (!(initializer_ && finalizer_)) {
309  throw NullPointerException("ThreadManager: initializer/finalizer not set");
310  }
311 
312  try {
313  initializer_->init(thread);
314  } catch (CannotInitializeThreadException &e) {
315  thread->notify_of_failed_init();
316  e.append("Adding thread in ThreadManager failed");
317  throw;
318  }
319 
320  // if the thread's init() method fails, we need to finalize that very
321  // thread only with the finalizer, already initialized threads muts be
322  // fully finalized
323  try {
324  thread->init();
325  } catch (CannotInitializeThreadException &e) {
326  thread->notify_of_failed_init();
327  finalizer_->finalize(thread);
328  throw;
329  } catch (Exception &e) {
330  thread->notify_of_failed_init();
331  CannotInitializeThreadException cite(e);
332  cite.append("Could not initialize thread '%s' (ThreadManager)", thread->name());
333  finalizer_->finalize(thread);
334  throw cite;
335  } catch (std::exception &e) {
336  thread->notify_of_failed_init();
337  CannotInitializeThreadException cite;
338  cite.append("Caught std::exception: %s", e.what());
339  cite.append("Could not initialize thread '%s' (ThreadManager)", thread->name());
340  finalizer_->finalize(thread);
341  throw cite;
342  } catch (...) {
343  thread->notify_of_failed_init();
344  CannotInitializeThreadException cite("Could not initialize thread '%s' (ThreadManager)",
345  thread->name());
346  cite.append("Unknown exception caught");
347  finalizer_->finalize(thread);
348  throw cite;
349  }
350 
351  thread->start();
352  MutexLocker locker(threads_.mutex(), lock);
353  internal_add_thread(thread);
354 }
355 
356 /** Remove the given threads.
357  * The thread manager tries to finalize and stop the threads and then removes the
358  * threads from the internal structures.
359  *
360  * This may fail if at least one thread of the given list cannot be finalized, for
361  * example if prepare_finalize() returns false or if the thread finalizer cannot
362  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
363  *
364  * @param tl threads to remove.
365  * @exception CannotFinalizeThreadException At least one thread cannot be safely
366  * finalized
367  * @exception ThreadListNotSealedException if the given thread lits tl is not
368  * sealed the thread manager will refuse to remove it
369  */
370 void
371 ThreadManager::remove_maybelocked(ThreadList &tl, bool lock)
372 {
373  if (!(initializer_ && finalizer_)) {
374  throw NullPointerException("ThreadManager: initializer/finalizer not set");
375  }
376 
377  if (!tl.sealed()) {
378  throw ThreadListNotSealedException("(ThreadManager) Cannot remove unsealed thread "
379  "list. Not accepting unsealed list '%s' for removal",
380  tl.name());
381  }
382 
383  tl.lock();
384  MutexLocker locker(threads_.mutex(), lock);
385 
386  try {
387  if (!tl.prepare_finalize(finalizer_)) {
388  tl.cancel_finalize();
389  tl.unlock();
390  throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be "
391  "finalized",
392  tl.name());
393  }
394  } catch (CannotFinalizeThreadException &e) {
395  tl.unlock();
396  throw;
397  } catch (Exception &e) {
398  tl.unlock();
399  e.append("One or more threads in list '%s' cannot be finalized", tl.name());
400  throw CannotFinalizeThreadException(e);
401  }
402 
403  tl.stop();
404  try {
405  tl.finalize(finalizer_);
406  } catch (Exception &e) {
407  tl.unlock();
408  throw;
409  }
410 
411  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
412  internal_remove_thread(*i);
413  }
414 
415  tl.unlock();
416 }
417 
418 /** Remove the given thread.
419  * The thread manager tries to finalize and stop the thread and then removes the
420  * thread from the internal structures.
421  *
422  * This may fail if the thread cannot be finalized, for
423  * example if prepare_finalize() returns false or if the thread finalizer cannot
424  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
425  *
426  * @param thread thread to remove.
427  * @exception CannotFinalizeThreadException At least one thread cannot be safely
428  * finalized
429  */
430 void
431 ThreadManager::remove_maybelocked(Thread *thread, bool lock)
432 {
433  if (thread == NULL)
434  return;
435 
436  if (!(initializer_ && finalizer_)) {
437  throw NullPointerException("ThreadManager: initializer/finalizer not set");
438  }
439 
440  MutexLocker locker(threads_.mutex(), lock);
441  try {
442  if (!thread->prepare_finalize()) {
443  thread->cancel_finalize();
444  throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name());
445  }
446  } catch (CannotFinalizeThreadException &e) {
447  e.append("ThreadManager cannot stop thread '%s'", thread->name());
448  thread->cancel_finalize();
449  throw;
450  }
451 
452  thread->cancel();
453  thread->join();
454  thread->finalize();
455  finalizer_->finalize(thread);
456 
457  internal_remove_thread(thread);
458 }
459 
460 /** Force removal of the given threads.
461  * The thread manager tries to finalize and stop the threads and then removes the
462  * threads from the internal structures.
463  *
464  * This will succeed even if a thread of the given list cannot be finalized, for
465  * example if prepare_finalize() returns false or if the thread finalizer cannot
466  * finalize the thread.
467  *
468  * <b>Caution, using this function may damage your robot.</b>
469  *
470  * @param tl threads to remove.
471  * @exception ThreadListNotSealedException if the given thread lits tl is not
472  * sealed the thread manager will refuse to remove it
473  * The threads are removed from thread manager control. The threads will be stopped
474  * before they are removed (may cause unpredictable results otherwise).
475  */
476 void
478 {
479  if (!tl.sealed()) {
480  throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal", tl.name());
481  }
482 
483  tl.lock();
484  threads_.mutex()->stopby();
485  bool caught_exception = false;
486  Exception exc("Forced removal of thread list %s failed", tl.name());
487  try {
488  tl.force_stop(finalizer_);
489  } catch (Exception &e) {
490  caught_exception = true;
491  exc = e;
492  }
493 
494  for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
495  internal_remove_thread(*i);
496  }
497 
498  tl.unlock();
499 
500  if (caught_exception) {
501  throw exc;
502  }
503 }
504 
505 /** Force removal of the given thread.
506  * The thread manager tries to finalize and stop the thread and then removes the
507  * thread from the internal structures.
508  *
509  * This will succeed even if the thread cannot be finalized, for
510  * example if prepare_finalize() returns false or if the thread finalizer cannot
511  * finalize the thread.
512  *
513  * <b>Caution, using this function may damage your robot.</b>
514  *
515  * @param thread thread to remove.
516  * @exception ThreadListNotSealedException if the given thread lits tl is not
517  * sealed the thread manager will refuse to remove it
518  * The threads are removed from thread manager control. The threads will be stopped
519  * before they are removed (may cause unpredictable results otherwise).
520  */
521 void
523 {
524  MutexLocker lock(threads_.mutex());
525  try {
526  thread->prepare_finalize();
527  } catch (Exception &e) {
528  // ignore
529  }
530 
531  thread->cancel();
532  thread->join();
533  thread->finalize();
534  if (finalizer_)
535  finalizer_->finalize(thread);
536 
537  internal_remove_thread(thread);
538 }
539 
540 void
542 {
543  MutexLocker lock(threads_.mutex());
544 
545  unsigned int timeout_sec = 0;
546  if (timeout_usec >= 1000000) {
547  timeout_sec = timeout_usec / 1000000;
548  timeout_usec -= timeout_sec * 1000000;
549  }
550 
551  // Note that the following lines might throw an exception, we just pass it on
552  if (threads_.find(hook) != threads_.end()) {
553  threads_[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
554  }
555 }
556 
557 void
559 {
560  MutexLocker lock(threads_.mutex());
561 
562  if (threads_.find(hook) != threads_.end()) {
563  if (barrier) {
564  threads_[hook].wakeup(barrier);
565  } else {
566  threads_[hook].wakeup();
567  }
568  if (threads_[hook].size() == 0) {
569  threads_.erase(hook);
570  }
571  }
572 }
573 
574 void
575 ThreadManager::try_recover(std::list<std::string> &recovered_threads)
576 {
577  threads_.lock();
578  for (tit_ = threads_.begin(); tit_ != threads_.end(); ++tit_) {
579  tit_->second.try_recover(recovered_threads);
580  }
581  threads_.unlock();
582 }
583 
584 bool
586 {
587  return (threads_.size() > 0);
588 }
589 
590 void
592 {
593  interrupt_timed_thread_wait_ = false;
594  waitcond_timedthreads_->wait();
595  if (interrupt_timed_thread_wait_) {
596  interrupt_timed_thread_wait_ = false;
597  throw InterruptedException("Waiting for timed threads was interrupted");
598  }
599 }
600 
601 void
603 {
604  interrupt_timed_thread_wait_ = true;
605  waitcond_timedthreads_->wake_all();
606 }
607 
608 /** Get a thread collector to be used for an aspect initializer.
609  * @return thread collector instance to use for ThreadProducerAspect.
610  */
613 {
614  return aspect_collector_;
615 }
616 
617 } // end namespace fawkes
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
Thread aspect to use blocked timing.
WakeupHook blockedTimingAspectHook() const
Get the wakeup hook.
WakeupHook
Type to define at which hook the thread is woken up.
Base class for exceptions in Fawkes.
Definition: exception.h:36
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
Mutex locking helper.
Definition: mutex_locker.h:34
Thread finalizer interface.
virtual void finalize(Thread *thread)=0
Finalize a thread.
Thread initializer interface.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
Thread list not sealed exception.
Definition: thread_list.h:50
List of threads.
Definition: thread_list.h:56
void remove_locked(Thread *thread)
Remove with lock protection.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
const char * name()
Name of the thread list.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
bool sealed()
Check if list is sealed.
virtual void interrupt_timed_thread_wait()
Interrupt any currently running wait_for_timed_threads() and cause it to throw an InterruptedExceptio...
virtual ~ThreadManager()
Destructor.
virtual void try_recover(std::list< std::string > &recovered_threads)
Try to recover threads.
virtual void wait_for_timed_threads()
Wait for timed threads.
virtual void wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, unsigned int timeout_usec=0)
Wakeup thread for given hook and wait for completion.
virtual void wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier=0)
Wakeup thread for given hook.
ThreadCollector * aspect_collector() const
Get a thread collector to be used for an aspect initializer.
virtual bool timed_threads_exist()
Check if any timed threads exist.
ThreadManager()
Constructor.
virtual void force_remove(ThreadList &tl)
Force removal of the given threads.
void set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Set initializer/finalizer.
Thread class encapsulation of pthreads.
Definition: thread.h:46
void join()
Join the thread.
Definition: thread.cpp:597
bool prepare_finalize()
Prepare finalization.
Definition: thread.cpp:375
void cancel()
Cancel a thread.
Definition: thread.cpp:646
virtual void finalize()
Finalize the thread.
Definition: thread.cpp:463
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
Fawkes library namespace.