24 #include <aspect/blocked_timing.h>
25 #include <baseapp/thread_manager.h>
26 #include <core/exceptions/software.h>
27 #include <core/exceptions/system.h>
28 #include <core/threading/mutex_locker.h>
29 #include <core/threading/thread.h>
30 #include <core/threading/thread_finalizer.h>
31 #include <core/threading/thread_initializer.h>
32 #include <core/threading/wait_condition.h>
56 ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(
57 ThreadManager *parent_manager)
59 parent_manager_ = parent_manager;
63 ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl)
65 BlockedTimingAspect *timed_thread;
67 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
68 if ((timed_thread =
dynamic_cast<BlockedTimingAspect *
>(*i)) != NULL) {
69 throw IllegalArgumentException(
70 "ThreadProducerAspect may not add threads with BlockedTimingAspect");
74 parent_manager_->add_maybelocked(tl,
false);
78 ThreadManager::ThreadManagerAspectCollector::add(Thread *t)
80 BlockedTimingAspect *timed_thread;
82 if ((timed_thread =
dynamic_cast<BlockedTimingAspect *
>(t)) != NULL) {
83 throw IllegalArgumentException(
84 "ThreadProducerAspect may not add threads with BlockedTimingAspect");
87 parent_manager_->add_maybelocked(t,
false);
91 ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl)
93 BlockedTimingAspect *timed_thread;
95 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
96 if ((timed_thread =
dynamic_cast<BlockedTimingAspect *
>(*i)) != NULL) {
97 throw IllegalArgumentException(
98 "ThreadProducerAspect may not remove threads with BlockedTimingAspect");
102 parent_manager_->remove_maybelocked(tl,
false);
106 ThreadManager::ThreadManagerAspectCollector::remove(Thread *t)
108 BlockedTimingAspect *timed_thread;
110 if ((timed_thread =
dynamic_cast<BlockedTimingAspect *
>(t)) != NULL) {
111 throw IllegalArgumentException(
112 "ThreadProducerAspect may not remove threads with BlockedTimingAspect");
115 parent_manager_->remove_maybelocked(t,
false);
121 throw AccessViolationException(
"ThreadManagerAspect threads may not force removal of threads");
125 ThreadManager::ThreadManagerAspectCollector::force_remove(
fawkes::Thread *t)
127 throw AccessViolationException(
"ThreadManagerAspect threads may not force removal of threads");
140 interrupt_timed_thread_wait_ =
false;
141 aspect_collector_ =
new ThreadManagerAspectCollector(
this);
156 interrupt_timed_thread_wait_ =
false;
157 aspect_collector_ =
new ThreadManagerAspectCollector(
this);
166 for (tit_ = threads_.begin(); tit_ != threads_.end(); ++tit_) {
168 tit_->second.force_stop(finalizer_);
178 delete waitcond_timedthreads_;
179 delete aspect_collector_;
190 initializer_ = initializer;
191 finalizer_ = finalizer;
202 ThreadManager::internal_remove_thread(
Thread *t)
209 if (threads_.find(hook) != threads_.end()) {
210 threads_[hook].remove_locked(t);
211 if (threads_[hook].empty())
212 threads_.erase(hook);
227 ThreadManager::internal_add_thread(Thread *t)
229 BlockedTimingAspect *timed_thread;
230 if ((timed_thread =
dynamic_cast<BlockedTimingAspect *
>(t)) != NULL) {
233 if (threads_.find(hook) == threads_.end()) {
234 threads_[hook].set_name(
"ThreadManagerList Hook %i", hook);
235 threads_[hook].set_maintain_barrier(
true);
237 threads_[hook].push_back_locked(t);
257 ThreadManager::add_maybelocked(ThreadList &tl,
bool lock)
259 if (!(initializer_ && finalizer_)) {
260 throw NullPointerException(
"ThreadManager: initializer/finalizer not set");
264 throw Exception(
"Not accepting new threads from list that is not fresh, "
265 "list '%s' already sealed",
273 tl.init(initializer_, finalizer_);
274 }
catch (Exception &e) {
283 MutexLocker locker(threads_.mutex(), lock);
284 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
285 internal_add_thread(*i);
302 ThreadManager::add_maybelocked(Thread *thread,
bool lock)
304 if (thread == NULL) {
305 throw NullPointerException(
"FawkesThreadMananger: cannot add NULL as thread");
308 if (!(initializer_ && finalizer_)) {
309 throw NullPointerException(
"ThreadManager: initializer/finalizer not set");
313 initializer_->
init(thread);
314 }
catch (CannotInitializeThreadException &e) {
315 thread->notify_of_failed_init();
316 e.append(
"Adding thread in ThreadManager failed");
325 }
catch (CannotInitializeThreadException &e) {
326 thread->notify_of_failed_init();
329 }
catch (Exception &e) {
330 thread->notify_of_failed_init();
331 CannotInitializeThreadException cite(e);
332 cite.append(
"Could not initialize thread '%s' (ThreadManager)", thread->name());
335 }
catch (std::exception &e) {
336 thread->notify_of_failed_init();
337 CannotInitializeThreadException cite;
338 cite.append(
"Caught std::exception: %s", e.what());
339 cite.append(
"Could not initialize thread '%s' (ThreadManager)", thread->name());
343 thread->notify_of_failed_init();
344 CannotInitializeThreadException cite(
"Could not initialize thread '%s' (ThreadManager)",
346 cite.append(
"Unknown exception caught");
352 MutexLocker locker(threads_.mutex(), lock);
353 internal_add_thread(thread);
371 ThreadManager::remove_maybelocked(ThreadList &tl,
bool lock)
373 if (!(initializer_ && finalizer_)) {
374 throw NullPointerException(
"ThreadManager: initializer/finalizer not set");
378 throw ThreadListNotSealedException(
"(ThreadManager) Cannot remove unsealed thread "
379 "list. Not accepting unsealed list '%s' for removal",
384 MutexLocker locker(threads_.mutex(), lock);
387 if (!tl.prepare_finalize(finalizer_)) {
388 tl.cancel_finalize();
390 throw CannotFinalizeThreadException(
"One or more threads in list '%s' cannot be "
394 }
catch (CannotFinalizeThreadException &e) {
397 }
catch (Exception &e) {
399 e.append(
"One or more threads in list '%s' cannot be finalized", tl.name());
400 throw CannotFinalizeThreadException(e);
405 tl.finalize(finalizer_);
406 }
catch (Exception &e) {
411 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
412 internal_remove_thread(*i);
431 ThreadManager::remove_maybelocked(Thread *thread,
bool lock)
436 if (!(initializer_ && finalizer_)) {
437 throw NullPointerException(
"ThreadManager: initializer/finalizer not set");
440 MutexLocker locker(threads_.mutex(), lock);
442 if (!thread->prepare_finalize()) {
443 thread->cancel_finalize();
444 throw CannotFinalizeThreadException(
"Thread '%s'cannot be finalized", thread->name());
446 }
catch (CannotFinalizeThreadException &e) {
447 e.append(
"ThreadManager cannot stop thread '%s'", thread->name());
448 thread->cancel_finalize();
457 internal_remove_thread(thread);
484 threads_.mutex()->stopby();
485 bool caught_exception =
false;
486 Exception exc(
"Forced removal of thread list %s failed", tl.
name());
490 caught_exception =
true;
494 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
495 internal_remove_thread(*i);
500 if (caught_exception) {
537 internal_remove_thread(thread);
545 unsigned int timeout_sec = 0;
546 if (timeout_usec >= 1000000) {
547 timeout_sec = timeout_usec / 1000000;
548 timeout_usec -= timeout_sec * 1000000;
552 if (threads_.find(hook) != threads_.end()) {
553 threads_[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
562 if (threads_.find(hook) != threads_.end()) {
564 threads_[hook].wakeup(barrier);
566 threads_[hook].wakeup();
568 if (threads_[hook].size() == 0) {
569 threads_.erase(hook);
578 for (tit_ = threads_.begin(); tit_ != threads_.end(); ++tit_) {
579 tit_->second.try_recover(recovered_threads);
587 return (threads_.size() > 0);
593 interrupt_timed_thread_wait_ =
false;
594 waitcond_timedthreads_->
wait();
595 if (interrupt_timed_thread_wait_) {
596 interrupt_timed_thread_wait_ =
false;
604 interrupt_timed_thread_wait_ =
true;
614 return aspect_collector_;
A barrier is a synchronization tool which blocks until a given number of threads have reached the barrier.
Thread aspect to use blocked timing.
WakeupHook blockedTimingAspectHook() const
Get the wakeup hook.
WakeupHook
Type to define at which hook the thread is woken up.
Base class for exceptions in Fawkes.
The current system call has been interrupted (for instance by a signal).
Thread finalizer interface.
virtual void finalize(Thread *thread)=0
Finalize a thread.
Thread initializer interface.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
Thread list not sealed exception.
void remove_locked(Thread *thread)
Remove with lock protection.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
const char * name()
Name of the thread list.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
bool sealed()
Check if list is sealed.
virtual void interrupt_timed_thread_wait()
Interrupt any currently running wait_for_timed_threads() and cause it to throw an InterruptedException.
virtual ~ThreadManager()
Destructor.
virtual void try_recover(std::list< std::string > &recovered_threads)
Try to recover threads.
virtual void wait_for_timed_threads()
Wait for timed threads.
virtual void wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, unsigned int timeout_usec=0)
Wakeup thread for given hook and wait for completion.
virtual void wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier=0)
Wakeup thread for given hook.
ThreadCollector * aspect_collector() const
Get a thread collector to be used for an aspect initializer.
virtual bool timed_threads_exist()
Check if any timed threads exist.
ThreadManager()
Constructor.
virtual void force_remove(ThreadList &tl)
Force removal of the given threads.
void set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Set initializer/finalizer.
Thread class encapsulation of pthreads.
void join()
Join the thread.
bool prepare_finalize()
Prepare finalization.
void cancel()
Cancel a thread.
virtual void finalize()
Finalize the thread.
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
Fawkes library namespace.