Fawkes API  Fawkes Development Version
thread_manager.cpp
00001 
00002 /***************************************************************************
00003  *  thread_manager.cpp - Thread manager
00004  *
00005  *  Created: Thu Nov  3 19:11:31 2006 (on train to Cologne)
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <mainapp/thread_manager.h>
00025 #include <core/threading/thread.h>
00026 #include <core/threading/mutex_locker.h>
00027 #include <core/threading/wait_condition.h>
00028 #include <core/threading/thread_initializer.h>
00029 #include <core/threading/thread_finalizer.h>
00030 #include <core/exceptions/software.h>
00031 #include <core/exceptions/system.h>
00032 
00033 #include <aspect/blocked_timing.h>
00034 
00035 using namespace fawkes;
00036 
00037 /** @class FawkesThreadManager mainapp/thread_manager.h
00038  * Thread Manager.
00039  * This class provides a manager for the threads. Threads are memorized by
00040  * their wakeup hook. When the thread manager is deleted, all threads are
00041  * appropriately cancelled, joined and deleted. Thus the thread manager
00042  * can be used for "garbage collection" of threads.
00043  *
00044  * The thread manager allows easy wakeup of threads of a given wakeup hook.
00045  *
00046  * The thread manager needs a thread initializer. Each thread that is added
00047  * to the thread manager is initialized with this. The runtime type information
00048  * (RTTI) supplied by C++ can be used to initialize threads if appropriate
00049  * (if the thread has certain aspects that need special treatment).
00050  *
00051  * @author Tim Niemueller
00052  */
00053 
00054 FawkesThreadManager::FawkesThreadManagerAspectCollector::FawkesThreadManagerAspectCollector(FawkesThreadManager *parent_manager)
00055 {
00056   __parent_manager = parent_manager;
00057 }
00058 
00059 
00060 void
00061 FawkesThreadManager::FawkesThreadManagerAspectCollector::add(ThreadList &tl)
00062 {
00063   BlockedTimingAspect *timed_thread;
00064 
00065   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00066     if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
00067       throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
00068     }
00069   }
00070 
00071   __parent_manager->add_maybelocked(tl, /* lock */ false);
00072 }
00073 
00074 
00075 void
00076 FawkesThreadManager::FawkesThreadManagerAspectCollector::add(Thread *t)
00077 {
00078   BlockedTimingAspect *timed_thread;
00079 
00080   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00081     throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
00082   }
00083 
00084   __parent_manager->add_maybelocked(t, /* lock */ false);
00085 }
00086 
00087 
00088 void
00089 FawkesThreadManager::FawkesThreadManagerAspectCollector::remove(ThreadList &tl)
00090 {
00091   BlockedTimingAspect *timed_thread;
00092 
00093   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00094     if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
00095       throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
00096     }
00097   }
00098 
00099   __parent_manager->remove_maybelocked(tl, /* lock */ false);
00100 }
00101 
00102 
00103 void
00104 FawkesThreadManager::FawkesThreadManagerAspectCollector::remove(Thread *t)
00105 {
00106   BlockedTimingAspect *timed_thread;
00107 
00108   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00109     throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
00110   }
00111 
00112   __parent_manager->remove_maybelocked(t, /* lock */ false);
00113 }
00114 
00115 
00116 void
00117 FawkesThreadManager::FawkesThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl)
00118 {
00119   throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
00120 }
00121 
00122 void
00123 FawkesThreadManager::FawkesThreadManagerAspectCollector::force_remove(fawkes::Thread *t)
00124 {
00125   throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
00126 }
00127 
00128 
00129 /** Constructor.
00130  */
00131 FawkesThreadManager::FawkesThreadManager()
00132 {
00133   initializer = NULL;
00134   finalizer   = NULL;
00135   threads.clear();
00136   waitcond_timedthreads = new WaitCondition();
00137   __interrupt_timed_thread_wait = false;
00138   __aspect_collector = new FawkesThreadManagerAspectCollector(this);
00139 }
00140 
00141 
00142 /** Destructor. */
00143 FawkesThreadManager::~FawkesThreadManager()
00144 {
00145   // stop all threads, we call finalize, and we run through it as long as there are
00146   // still running threads, after that, we force the thread's death.
00147   for (tit = threads.begin(); tit != threads.end(); ++tit) {
00148     (*tit).second.force_stop(finalizer);
00149   }
00150   untimed_threads.force_stop(finalizer);
00151   threads.clear();
00152 
00153   delete waitcond_timedthreads;
00154   delete __aspect_collector;
00155 }
00156 
00157 
00158 /** Set initializer/finalizer.
00159  * This method has to be called before any thread is added/removed.
00160  * @param initializer thread initializer
00161  * @param finalizer thread finalizer
00162  */
00163 void
00164 FawkesThreadManager::set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
00165 {
00166   this->initializer = initializer;
00167   this->finalizer   = finalizer;
00168 }
00169 
00170 
00171 /** Remove the given thread from internal structures.
00172  * Thread is removed from the internal structures. If the thread has the
00173  * BlockedTimingAspect then the hook is added to the changed list.
00174  *
00175  * @param t thread to remove
00176  * @param changed list of changed hooks, appropriate hook is added if necessary
00177  */
00178 void
00179 FawkesThreadManager::internal_remove_thread(Thread *t)
00180 {
00181   BlockedTimingAspect *timed_thread;
00182 
00183   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00184     // find thread and remove
00185     BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
00186     if ( threads.find(hook) != threads.end() ) {
00187       threads[hook].remove_locked(t);
00188       if (threads[hook].empty())  threads.erase(hook);
00189     }
00190   } else {
00191     untimed_threads.remove_locked(t);
00192   }
00193 }
00194 
00195 
00196 /** Add the given thread to internal structures.
00197  * Thread is added to the internal structures. If the thread has the
00198  * BlockedTimingAspect then the hook is added to the changed list.
00199  *
00200  * @param t thread to add
00201  * @param changed list of changed hooks, appropriate hook is added if necessary
00202  */
00203 void
00204 FawkesThreadManager::internal_add_thread(Thread *t)
00205 {
00206   BlockedTimingAspect *timed_thread;
00207   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00208     BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
00209 
00210     if ( threads.find(hook) == threads.end() ) {
00211       threads[hook].set_name("FawkesThreadManagerList Hook %i", hook);
00212       threads[hook].set_maintain_barrier(true);
00213     }
00214     threads[hook].push_back_locked(t);
00215 
00216     waitcond_timedthreads->wake_all();
00217   } else {
00218     untimed_threads.push_back_locked(t);
00219   }
00220 }
00221 
00222 
00223 /** Add threads.
00224  * Add the given threads to the thread manager. The threads are initialised
00225  * as appropriate and started. See the class documentation for supported
00226  * specialisations of threads and the performed initialisation steps.
00227  * If the thread initializer cannot initalize one or more threads no thread
00228  * is added. In this regard the operation is atomic, either all threads are
00229  * added or none.
00230  * @param tl thread list with threads to add
00231  * @exception CannotInitializeThreadException thrown if at least one of the
00232  * threads could not be initialised
00233  */
00234 void
00235 FawkesThreadManager::add_maybelocked(ThreadList &tl, bool lock)
00236 {
00237   if ( ! (initializer && finalizer) ) {
00238     throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00239   }
00240 
00241   if ( tl.sealed() ) {
00242     throw Exception("Not accepting new threads from list that is not fresh, "
00243                     "list '%s' already sealed", tl.name());
00244   }
00245 
00246   tl.lock();
00247 
00248   // Try to initialise all threads
00249   try {
00250     tl.init(initializer, finalizer);
00251   } catch (Exception &e) {
00252     tl.unlock();
00253     throw;
00254   }
00255 
00256   tl.seal();
00257   tl.start();
00258 
00259   // All thread initialized, now add threads to internal structure
00260   MutexLocker locker(threads.mutex(), lock);
00261   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00262     internal_add_thread(*i);
00263   }
00264 
00265   tl.unlock();
00266 }
00267 
00268 
00269 /** Add one thread.
00270  * Add the given thread to the thread manager. The thread is initialized
00271  * as appropriate and started. See the class documentation for supported
00272  * specialisations of threads and the performed initialisation steps.
00273  * If the thread initializer cannot initalize the thread it is not added.
00274  * @param thread thread to add
00275  * @param lock if true the environment is locked before adding the thread
00276  * @exception CannotInitializeThreadException thrown if at least the
00277  * thread could not be initialised
00278  */
00279 void
00280 FawkesThreadManager::add_maybelocked(Thread *thread, bool lock)
00281 {
00282   if ( thread == NULL ) {
00283     throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread");
00284   }
00285 
00286   if ( ! (initializer && finalizer) ) {
00287     throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00288   }
00289 
00290   try {
00291     initializer->init(thread);
00292   } catch (CannotInitializeThreadException &e) {
00293     e.append("Adding thread in FawkesThreadManager failed");
00294     throw;
00295   }
00296 
00297   thread->start();
00298   MutexLocker locker(threads.mutex(), lock);
00299   internal_add_thread(thread);
00300 }
00301 
00302 
00303 /** Remove the given threads.
00304  * The thread manager tries to finalize and stop the threads and then removes the
00305  * threads from the internal structures.
00306  *
00307  * This may fail if at least one thread of the given list cannot be finalized, for
00308  * example if prepare_finalize() returns false or if the thread finalizer cannot
00309  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
00310  *
00311  * @param tl threads to remove.
00312  * @exception CannotFinalizeThreadException At least one thread cannot be safely
00313  * finalized
00314  * @exception ThreadListNotSealedException if the given thread lits tl is not
00315  * sealed the thread manager will refuse to remove it
00316  */
00317 void
00318 FawkesThreadManager::remove_maybelocked(ThreadList &tl, bool lock)
00319 {
00320   if ( ! (initializer && finalizer) ) {
00321     throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00322   }
00323 
00324 
00325   if ( ! tl.sealed() ) {
00326     throw ThreadListNotSealedException("(FawkesThreadManager) Cannot remove unsealed thread "
00327                                        "list. Not accepting unsealed list '%s' for removal",
00328                                        tl.name());
00329   }
00330 
00331   tl.lock();
00332   MutexLocker locker(threads.mutex(), lock);
00333 
00334   try {
00335     if ( ! tl.prepare_finalize(finalizer) ) {
00336       tl.cancel_finalize();
00337       tl.unlock();
00338       throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be "
00339                                           "finalized", tl.name());
00340     }
00341   } catch (CannotFinalizeThreadException &e) {
00342     tl.unlock();
00343     throw;
00344   } catch (Exception &e) {
00345     tl.unlock();
00346     e.append("One or more threads in list '%s' cannot be finalized", tl.name());
00347     throw CannotFinalizeThreadException(e);
00348   }
00349 
00350   tl.stop();
00351   tl.finalize(finalizer);
00352 
00353   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00354     internal_remove_thread(*i);
00355   }
00356 
00357   tl.unlock();
00358 }
00359 
00360 
00361 /** Remove the given thread.
00362  * The thread manager tries to finalize and stop the thread and then removes the
00363  * thread from the internal structures.
00364  *
00365  * This may fail if the thread cannot be finalized, for
00366  * example if prepare_finalize() returns false or if the thread finalizer cannot
00367  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
00368  *
00369  * @param thread thread to remove.
00370  * @exception CannotFinalizeThreadException At least one thread cannot be safely
00371  * finalized
00372  */
00373 void
00374 FawkesThreadManager::remove_maybelocked(Thread *thread, bool lock)
00375 {
00376   if ( thread == NULL ) return;
00377 
00378   if ( ! (initializer && finalizer) ) {
00379     throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00380   }
00381 
00382   MutexLocker locker(threads.mutex(), lock);
00383   try {
00384     if ( ! thread->prepare_finalize() ) {
00385       thread->cancel_finalize();
00386       throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name());
00387     }
00388   } catch (CannotFinalizeThreadException &e) {
00389     e.append("FawkesThreadManager cannot stop thread '%s'", thread->name());
00390     thread->cancel_finalize();
00391     throw;
00392   }
00393 
00394   thread->cancel();
00395   thread->join();
00396   finalizer->finalize(thread);
00397   thread->finalize();
00398 
00399   internal_remove_thread(thread);
00400 }
00401 
00402 
00403 
00404 
00405 /** Force removal of the given threads.
00406  * The thread manager tries to finalize and stop the threads and then removes the
00407  * threads from the internal structures.
00408  *
00409  * This will succeed even if a thread of the given list cannot be finalized, for
00410  * example if prepare_finalize() returns false or if the thread finalizer cannot
00411  * finalize the thread.
00412  *
00413  * <b>Caution, using this function may damage your robot.</b>
00414  *
00415  * @param tl threads to remove.
00416  * @exception ThreadListNotSealedException if the given thread lits tl is not
00417  * sealed the thread manager will refuse to remove it
00418  * The threads are removed from thread manager control. The threads will be stopped
00419  * before they are removed (may cause unpredictable results otherwise).
00420  */
00421 void
00422 FawkesThreadManager::force_remove(ThreadList &tl)
00423 {
00424   if ( ! tl.sealed() ) {
00425     throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal",
00426                                        tl.name());
00427   }
00428 
00429   tl.lock();
00430   threads.mutex()->stopby();
00431   tl.force_stop(finalizer);
00432 
00433   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00434     internal_remove_thread(*i);
00435   }
00436 
00437   tl.unlock();
00438 }
00439 
00440 
00441 /** Force removal of the given thread.
00442  * The thread manager tries to finalize and stop the thread and then removes the
00443  * thread from the internal structures.
00444  *
00445  * This will succeed even if the thread cannot be finalized, for
00446  * example if prepare_finalize() returns false or if the thread finalizer cannot
00447  * finalize the thread.
00448  *
00449  * <b>Caution, using this function may damage your robot.</b>
00450  *
00451  * @param thread thread to remove.
00452  * @exception ThreadListNotSealedException if the given thread lits tl is not
00453  * sealed the thread manager will refuse to remove it
00454  * The threads are removed from thread manager control. The threads will be stopped
00455  * before they are removed (may cause unpredictable results otherwise).
00456  */
00457 void
00458 FawkesThreadManager::force_remove(fawkes::Thread *thread)
00459 {
00460   MutexLocker lock(threads.mutex());
00461   try {
00462     thread->prepare_finalize();
00463   } catch (Exception &e) {
00464     // ignore
00465   }
00466 
00467   thread->cancel();
00468   thread->join();
00469   if (finalizer) finalizer->finalize(thread);
00470   thread->finalize();
00471 
00472   internal_remove_thread(thread);
00473 }
00474 
00475 
00476 void
00477 FawkesThreadManager::wakeup_and_wait(BlockedTimingAspect::WakeupHook hook,
00478                                      unsigned int timeout_usec)
00479 {
00480   MutexLocker lock(threads.mutex());
00481 
00482   unsigned int timeout_sec = 0;
00483   if (timeout_usec >= 1000000) {
00484     timeout_sec   = timeout_usec / 1000000;
00485     timeout_usec -= timeout_sec  * 1000000;
00486   }
00487 
00488   // Note that the following lines might throw an exception, we just pass it on
00489   if ( threads.find(hook) != threads.end() ) {
00490     threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
00491   }
00492 }
00493 
00494 
00495 void
00496 FawkesThreadManager::wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier)
00497 {
00498   MutexLocker lock(threads.mutex());
00499 
00500   if ( threads.find(hook) != threads.end() ) {
00501     if ( barrier ) {
00502       threads[hook].wakeup(barrier);
00503     } else {
00504       threads[hook].wakeup();
00505     }
00506     if ( threads[hook].size() == 0 ) {
00507       threads.erase(hook);
00508     }
00509   }
00510 }
00511 
00512 
00513 void
00514 FawkesThreadManager::try_recover(std::list<std::string> &recovered_threads)
00515 {
00516   threads.lock();
00517   for (tit = threads.begin(); tit != threads.end(); ++tit) {
00518     tit->second.try_recover(recovered_threads);
00519   }
00520   threads.unlock();
00521 }
00522 
00523 
00524 bool
00525 FawkesThreadManager::timed_threads_exist()
00526 {
00527   return (threads.size() > 0);
00528 }
00529 
00530 
00531 void
00532 FawkesThreadManager::wait_for_timed_threads()
00533 {
00534   __interrupt_timed_thread_wait = false;
00535   waitcond_timedthreads->wait();
00536   if ( __interrupt_timed_thread_wait ) {
00537     __interrupt_timed_thread_wait = false;
00538     throw InterruptedException("Waiting for timed threads was interrupted");
00539   }
00540 }
00541 
00542 void
00543 FawkesThreadManager::interrupt_timed_thread_wait()
00544 {
00545   __interrupt_timed_thread_wait = true;
00546   waitcond_timedthreads->wake_all();
00547 }
00548 
00549 
00550 
00551 /** Get a thread collector to be used for an aspect initializer.
00552  * @return thread collector instance to use for ThreadProducerAspect.
00553  */
00554 ThreadCollector *
00555 FawkesThreadManager::aspect_collector() const
00556 {
00557   return __aspect_collector;
00558 }