24 #include <core/exception.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/mutex_locker.h>
27 #include <core/threading/thread_collector.h>
28 #include <netcomm/fawkes/handler.h>
29 #include <netcomm/fawkes/message.h>
30 #include <netcomm/fawkes/message_content.h>
31 #include <netcomm/fawkes/message_queue.h>
32 #include <netcomm/fawkes/server_client_thread.h>
33 #include <netcomm/fawkes/server_thread.h>
34 #include <netcomm/utils/acceptor_thread.h>
61 const std::string &listen_ipv4,
62 const std::string &listen_ipv6,
63 unsigned int fawkes_port,
65 :
Thread(
"FawkesNetworkServerThread",
Thread::OPMODE_WAITFORWAKEUP)
67 this->thread_collector = thread_collector;
74 this,
Socket::IPv4, listen_ipv4, fawkes_port,
"FawkesNetworkAcceptorThread"));
78 this,
Socket::IPv6, listen_ipv6, fawkes_port,
"FawkesNetworkAcceptorThread"));
81 if (thread_collector) {
82 for (
size_t i = 0; i < acceptor_threads.size(); ++i) {
83 thread_collector->
add(acceptor_threads[i]);
86 for (
size_t i = 0; i < acceptor_threads.size(); ++i) {
87 acceptor_threads[i]->start();
95 for (cit = clients.begin(); cit != clients.end(); ++cit) {
96 if (thread_collector) {
97 thread_collector->
remove((*cit).second);
99 (*cit).second->cancel();
100 (*cit).second->join();
102 delete (*cit).second;
104 for (
size_t i = 0; i < acceptor_threads.size(); ++i) {
105 if (thread_collector) {
106 thread_collector->
remove(acceptor_threads[i]);
108 acceptor_threads[i]->cancel();
109 acceptor_threads[i]->join();
111 delete acceptor_threads[i];
113 acceptor_threads.clear();
115 delete inbound_messages;
129 if (thread_collector) {
130 thread_collector->add(client);
134 unsigned int cid = next_client_id++;
135 clients[cid] = client;
139 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
140 (*hit).second->client_connected(cid);
154 if (handlers.find(handler->
id()) != handlers.end()) {
155 throw Exception(
"Handler already registered");
157 handlers[handler->
id()] = handler;
167 if (handlers.find(handler->
id()) != handlers.end()) {
168 handlers.erase(handler->
id());
181 std::list<unsigned int> dead_clients;
184 for (cit = clients.begin(); cit != clients.end(); ++cit) {
185 if (!cit->second->alive()) {
186 dead_clients.push_back(cit->first);
191 std::list<unsigned int>::iterator dci;
192 for (dci = dead_clients.begin(); dci != dead_clients.end(); ++dci) {
193 const unsigned int clid = *dci;
197 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
198 (*hit).second->client_disconnected(clid);
204 if (thread_collector) {
205 thread_collector->
remove(clients[clid]);
207 clients[clid]->cancel();
208 clients[clid]->join();
211 delete clients[clid];
217 inbound_messages->
lock();
218 while (!inbound_messages->empty()) {
222 if (handlers.find(m->
cid()) != handlers.end()) {
223 handlers[m->
cid()]->handle_network_message(m);
227 inbound_messages->pop();
229 inbound_messages->
unlock();
237 for (cit = clients.begin(); cit != clients.end(); ++cit) {
238 (*cit).second->force_send();
253 for (cit = clients.begin(); cit != clients.end(); ++cit) {
254 if ((*cit).second->alive()) {
256 (*cit).second->enqueue(msg);
273 unsigned short int msg_id,
275 unsigned int payload_size)
307 unsigned int clid = msg->
clid();
308 if (clients.find(clid) != clients.end()) {
309 if (clients[clid]->alive()) {
310 clients[clid]->enqueue(msg);
326 unsigned short int component_id,
327 unsigned short int msg_id,
329 unsigned int payload_size)
346 unsigned short int component_id,
347 unsigned short int msg_id,
364 unsigned short int component_id,
365 unsigned short int msg_id)
Base class for exceptions in Fawkes.
Network handler abstract base class.
unsigned short int id() const
Get the component ID for this handler.
Fawkes network message content.
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
Representation of a message that is sent over the network.
unsigned short int cid() const
Get component ID.
unsigned int clid() const
Get client ID.
Fawkes Network Client Thread for server.
void set_clid(unsigned int client_id)
Set client ID.
virtual void broadcast(FawkesNetworkMessage *msg)
Broadcast a message.
void add_connection(StreamSocket *s) noexcept
Add a new connection.
void dispatch(FawkesNetworkMessage *msg)
Dispatch messages.
virtual void add_handler(FawkesNetworkHandler *handler)
Add a handler.
virtual void loop()
Fawkes network thread loop.
virtual ~FawkesNetworkServerThread()
Destructor.
void force_send()
Force sending of all pending messages.
virtual void remove_handler(FawkesNetworkHandler *handler)
Remove handler.
virtual void send(FawkesNetworkMessage *msg)
Send a message.
FawkesNetworkServerThread(bool enable_ipv4, bool enable_ipv6, const std::string &listen_ipv4, const std::string &listen_ipv6, unsigned int fawkes_port, ThreadCollector *thread_collector=0)
Constructor.
void push_locked(const Type &x)
Push element to queue with lock protection.
void lock() const
Lock queue.
void unlock() const
Unlock queue.
void unlock()
Unlock the mutex.
void unref()
Decrement reference count and conditionally delete this instance.
void ref()
Increment reference count.
TCP stream socket over IP.
virtual void add(ThreadList &tl)=0
Add multiple threads.
virtual void remove(ThreadList &tl)=0
Remove multiple threads.
Thread class encapsulation of pthreads.
void start(bool wait=true)
Call this method to start the thread.
Fawkes library namespace.