24 #include <core/exceptions/software.h>
25 #include <core/exceptions/system.h>
26 #include <core/threading/barrier.h>
27 #include <core/threading/mutex.h>
28 #include <core/threading/mutex_locker.h>
29 #include <core/threading/read_write_lock.h>
30 #include <core/threading/thread.h>
31 #include <core/threading/thread_finalizer.h>
32 #include <core/threading/thread_loop_listener.h>
33 #include <core/threading/thread_notification_listener.h>
34 #include <core/threading/wait_condition.h>
35 #include <core/utils/lock_list.h>
37 #if defined(__gnu_linux__) && !defined(_GNU_SOURCE)
/** Mutex locked around the check-and-create of THREAD_KEY in init_thread_key(). */
188 pthread_mutex_t Thread::thread_key_mutex_ = PTHREAD_MUTEX_INITIALIZER;
/** Key for the thread-specific data slot that stores the Thread instance pointer.
 * Starts out as PTHREAD_KEYS_MAX; code in this file compares against that value
 * to detect that the key has not been created yet.
 */
191 pthread_key_t Thread::THREAD_KEY = PTHREAD_KEYS_MAX;
/** Name passed to the Thread wrapper constructed for the main thread; a thread's
 * name() is also compared against it elsewhere in this file.
 */
193 #define MAIN_THREAD_NAME "MainThread"
217 __constructor(
name, op_mode);
240 Thread::__constructor(
const char *name, OpMode op_mode)
244 prepfin_conc_loop_ =
false;
245 coalesce_wakeups_ =
false;
247 name_ = strdup(
name);
248 notification_listeners_ =
new LockList<ThreadNotificationListener *>();
249 loop_listeners_ =
new LockList<ThreadLoopListener *>();
252 sleep_mutex_ =
new Mutex();
253 sleep_condition_ =
new WaitCondition(sleep_mutex_);
254 waiting_for_wakeup_ =
true;
256 sleep_condition_ = NULL;
258 waiting_for_wakeup_ =
false;
266 delete_on_exit_ =
false;
267 prepfin_hold_ =
false;
268 pending_wakeups_ = 0;
274 loop_done_mutex_ =
new Mutex();
275 loop_done_waitcond_ =
new WaitCondition(loop_done_mutex_);
278 prepfin_hold_mutex_ =
new Mutex();
279 prepfin_hold_waitcond_ =
new WaitCondition(prepfin_hold_mutex_);
280 startup_barrier_ =
new Barrier(2);
289 delete sleep_condition_;
293 delete notification_listeners_;
294 delete loop_listeners_;
296 delete startup_barrier_;
297 delete prepfin_hold_mutex_;
298 delete prepfin_hold_waitcond_;
299 delete loop_done_waitcond_;
300 delete loop_done_mutex_;
312 throw Exception(
"You may not use copy constructor of class Thread");
320 Thread::operator=(
const Thread &t)
322 throw Exception(
"You may not use assignment operator of class Thread");
383 prepfin_hold_mutex_->
lock();
384 while (prepfin_hold_) {
385 prepfin_hold_waitcond_->
wait();
387 if (!prepfin_conc_loop_) {
393 if (!prepfin_conc_loop_) {
397 prepfin_hold_mutex_->
unlock();
503 throw Exception(
"You cannot start the same thread twice!");
511 if ((err = pthread_create(&thread_id_, NULL, Thread::entry,
this)) != 0) {
513 throw Exception(
"Could not start thread", err);
515 #if defined(_GNU_SOURCE) && defined(__GLIBC__) \
516 && ((__GLIBC__ == 2 && __GLIBC_MINOR__ >= 12) || __GLIBC__ > 2)
518 strncpy(tmpname, name_, 15);
520 pthread_setname_np(thread_id_, tmpname);
524 startup_barrier_->
wait();
528 Thread::lock_sleep_mutex()
531 sleep_mutex_->
lock();
541 Thread::entry(
void *pthis)
549 set_tsd_thread_instance(t);
552 t->lock_sleep_mutex();
555 t->notify_of_startup();
559 t->startup_barrier_->wait();
562 t->loop_mutex->lock();
564 t->loop_mutex->unlock();
584 if (delete_on_exit_) {
588 waiting_for_wakeup_ =
false;
601 pthread_join(thread_id_, &dont_care);
604 if (sleep_mutex_ != NULL) {
626 loop_listeners_->try_lock();
627 loop_listeners_->unlock();
639 pthread_detach(thread_id_);
648 if (started_ && !cancelled_) {
649 if (pthread_cancel(thread_id_) == 0) {
650 waiting_for_wakeup_ =
false;
664 pthread_kill(thread_id_, sig);
685 throw Exception(
"Cannot set thread opmode while running");
690 delete sleep_condition_;
692 sleep_condition_ = NULL;
695 sleep_mutex_ =
new Mutex();
718 prepfin_conc_loop_ = concurrent;
733 coalesce_wakeups_ = coalesce;
737 coalesce_wakeups_ = coalesce;
751 va_start(va, format);
752 char *old_name = name_;
753 if (vasprintf(&name_, format, va) == -1) {
760 #if defined(_GNU_SOURCE) && defined(__GLIBC__) \
761 && ((__GLIBC__ == 2 && __GLIBC_MINOR__ >= 12) || __GLIBC__ > 2)
764 strncpy(tmpname, name_, 15);
766 pthread_setname_np(thread_id_, tmpname);
783 prepfin_hold_mutex_->
lock();
785 prepfin_hold_mutex_->
unlock();
786 throw Exception(
"Thread(%s)::set_prepfin_hold: prepare_finalize() has "
787 "been called already()",
790 prepfin_hold_ = hold;
794 prepfin_hold_mutex_->
unlock();
863 return waiting_for_wakeup_;
873 pthread_testcancel();
899 return (pthread_equal(thread_id_, thread.thread_id_) != 0);
923 while (pending_wakeups_ == 0) {
924 waiting_for_wakeup_ =
true;
925 sleep_condition_->
wait();
927 pending_wakeups_ -= 1;
938 loop_listeners_->lock();
940 it != loop_listeners_->end();
942 (*it)->pre_loop(
this);
944 loop_listeners_->unlock();
950 loop_listeners_->lock();
952 it != loop_listeners_->rend();
954 (*it)->post_loop(
this);
956 loop_listeners_->unlock();
959 loop_done_mutex_->
lock();
961 loop_done_mutex_->
unlock();
967 sleep_mutex_->
lock();
974 sleep_mutex_->
lock();
976 sleep_mutex_->
lock();
979 while (pending_wakeups_ == 0) {
980 waiting_for_wakeup_ =
true;
981 sleep_condition_->
wait();
983 pending_wakeups_ -= 1;
1001 throw Exception(
"Thread(%s): wakeup() cannot be called if loop is running "
1002 "with barrier already",
1006 if (coalesce_wakeups_)
1007 pending_wakeups_ = 1;
1009 pending_wakeups_ += 1;
1010 if (waiting_for_wakeup_) {
1012 waiting_for_wakeup_ =
false;
1029 if (barrier == NULL) {
1034 if (!waiting_for_wakeup_ && barrier_) {
1035 throw Exception(
"Thread %s already running with barrier, cannot wakeup %i %p",
1037 waiting_for_wakeup_,
1041 pending_wakeups_ += 1;
1043 if (waiting_for_wakeup_) {
1045 waiting_for_wakeup_ =
false;
1054 loop_done_mutex_->
lock();
1055 while (!loop_done_) {
1056 loop_done_waitcond_->
wait();
1058 loop_done_mutex_->
unlock();
1069 if (delete_on_exit_) {
1098 delete_on_exit_ = del;
1109 return (pending_wakeups_ > 0);
1135 flags_ &= 0xFFFFFFFF ^ flag;
1164 notification_listeners_->push_back_locked(notification_listener);
1173 notification_listeners_->remove_locked(notification_listener);
1183 loop_listeners_->push_back_locked(loop_listener);
1192 loop_listeners_->remove_locked(loop_listener);
1199 Thread::notify_of_startup()
1201 notification_listeners_->lock();
1203 while (i != notification_listeners_->end()) {
1204 if (!(*i)->thread_started(
this)) {
1205 i = notification_listeners_->erase(i);
1210 notification_listeners_->unlock();
1220 notification_listeners_->lock();
1222 while (i != notification_listeners_->end()) {
1223 if (!(*i)->thread_init_failed(
this)) {
1224 i = notification_listeners_->erase(i);
1229 notification_listeners_->unlock();
1236 Thread::init_thread_key()
1238 pthread_mutex_lock(&thread_key_mutex_);
1239 if (THREAD_KEY == PTHREAD_KEYS_MAX) {
1242 if ((err = pthread_key_create(&THREAD_KEY, NULL)) != 0) {
1243 if (ENOMEM == err) {
1245 "specific data (reference to thread)");
1247 throw Exception(
"Thread key for reference to thread could not be created", err);
1251 pthread_mutex_unlock(&thread_key_mutex_);
1260 Thread::set_tsd_thread_instance(Thread *t)
1263 if ((err = pthread_setspecific(THREAD_KEY, t)) != 0) {
1264 if (ENOMEM == err) {
1265 throw OutOfMemoryException(
"Could not set specific data (reference to thread)");
1267 throw Exception(
"Could not set specific data (reference to thread), unknown reason");
1279 Thread *t =
new Thread(MAIN_THREAD_NAME, pthread_self());
1280 set_tsd_thread_instance(t);
1292 if (strcmp(t->
name(), MAIN_THREAD_NAME) == 0) {
1295 throw Exception(
"Main thread can only be destroyed in main thread");
1307 return pthread_self();
1323 #if defined(_GNU_SOURCE) && defined(__GLIBC__) \
1324 && ((__GLIBC__ == 2 && __GLIBC_MINOR__ >= 12) || __GLIBC__ > 2)
1327 if (pthread_getname_np(pthread_self(),
name, 16) == 0) {
1347 return t->
set_name(
"%s", thread_name.c_str());
1348 #if defined(_GNU_SOURCE) && defined(__GLIBC__) \
1349 && ((__GLIBC__ == 2 && __GLIBC_MINOR__ >= 12) || __GLIBC__ > 2)
1351 pthread_setname_np(pthread_self(), thread_name.c_str());
1368 if (THREAD_KEY == PTHREAD_KEYS_MAX) {
1369 throw Exception(
"No thread has been initialized");
1371 return (
Thread *)pthread_getspecific(THREAD_KEY);
1383 if (THREAD_KEY == PTHREAD_KEYS_MAX) {
1386 return (
Thread *)pthread_getspecific(THREAD_KEY);
1398 int oldstate = PTHREAD_CANCEL_ENABLE;
1399 int newstate = PTHREAD_CANCEL_ENABLE;
1401 newstate = PTHREAD_CANCEL_DISABLE;
1404 pthread_setcancelstate(newstate, &oldstate);
1406 if (old_state != NULL) {
1407 if (oldstate == PTHREAD_CANCEL_DISABLE) {
A barrier is a synchronization tool which blocks until a given number of threads have reached the barrier.
virtual void wait()
Wait for other threads.
Thread cannot be finalized.
Base class for exceptions in Fawkes.
Mutex mutual exclusion lock.
bool try_lock()
Tries to lock the mutex.
void lock()
Lock this mutex.
void stopby()
Shortly stop by at the mutex.
void unlock()
Unlock the mutex.
A NULL pointer was supplied where not allowed.
System ran out of memory and desired operation could not be fulfilled.
Thread loop listener interface.
Thread notification listener interface.
Thread class encapsulation of pthreads.
bool finalize_prepared
True if prepare_finalize() has been called and was not stopped with a cancel_finalize(), false otherwise.
void kill(int sig)
Send signal to a thread.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Mutex * loop_mutex
Mutex that is used to protect a call to loop().
void add_loop_listener(ThreadLoopListener *loop_listener)
Add loop listener.
const char * name() const
Get name of thread.
OpMode opmode() const
Get operation mode.
static void destroy_main()
Destroy main thread wrapper instance.
void start(bool wait=true)
Call this method to start the thread.
static void init_main()
Initialize Thread wrapper instance for main thread.
void unset_flag(uint32_t flag)
Unset flag.
static const unsigned int FLAG_BAD
Standard thread flag: "thread is bad".
@ CANCEL_ENABLED
cancellation is possible
@ CANCEL_DISABLED
thread cannot be cancelled
static void set_cancel_state(CancelState new_state, CancelState *old_state=0)
Set the cancel state of the current thread.
virtual ~Thread()
Virtual destructor.
void join()
Join the thread.
static pthread_t current_thread_id()
Get the ID of the currently running thread.
void yield()
Yield the processor to another thread or process.
void set_prepfin_hold(bool hold)
Hold prepare_finalize().
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
void wait_loop_done()
Wait for the current loop iteration to finish.
bool waiting() const
Check if thread is currently waiting for wakeup.
bool running() const
Check if the thread is running.
static Thread * current_thread_noexc() noexcept
Similar to current_thread, but never throws an exception.
bool detached() const
Check if thread has been detached.
bool started() const
Check if thread has been started.
void set_opmode(OpMode op_mode)
Set operation mode.
Thread(const char *name)
Constructor.
void exit()
Exit the thread.
void detach()
Detach the thread.
virtual void once()
Execute an action exactly once.
bool prepare_finalize()
Prepare finalization.
bool flagged_bad() const
Check if FLAG_BAD was set.
virtual void init()
Initialize the thread.
void set_name(const char *format,...)
Set name of thread.
void add_notification_listener(ThreadNotificationListener *notification_listener)
Add notification listener.
void wakeup()
Wake up thread.
virtual void loop()
Code to execute in the thread.
void remove_loop_listener(ThreadLoopListener *loop_listener)
Remove loop listener.
pthread_t thread_id() const
Get ID of thread.
void test_cancel()
Set cancellation point.
void set_flag(uint32_t flag)
Set flag for the thread.
bool wakeup_pending()
Check if wakeups are pending.
void cancel()
Cancel a thread.
void cancel_finalize()
Cancel finalization.
Mutex * loopinterrupt_antistarve_mutex
Mutex to avoid starvation when trying to lock loop_mutex.
static std::string current_thread_name()
Get the name of the current thread.
bool cancelled() const
Check if thread has been cancelled.
static Thread * current_thread()
Get the Thread instance of the currently running thread.
void notify_of_failed_init()
Notify of failed init.
virtual void run()
Code to execute in the thread.
bool operator==(const Thread &thread)
Check if two threads are the same.
void set_delete_on_exit(bool del)
Set whether the thread should be deleted on exit.
void remove_notification_listener(ThreadNotificationListener *notification_listener)
Remove notification listener.
void set_flags(uint32_t flags)
Set all flags in one go.
virtual void finalize()
Finalize the thread.
OpMode
Thread operation mode.
@ OPMODE_CONTINUOUS
operate in continuous mode (default)
@ OPMODE_WAITFORWAKEUP
operate in wait-for-wakeup mode
void set_coalesce_wakeups(bool coalesce=true)
Set wakeup coalescing.
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
Fawkes library namespace.