Fawkes API  Fawkes Development Version
server_client_thread.cpp
1 
2 /***************************************************************************
3  * server_client_thread.cpp - Thread handling Fawkes network client
4  *
5  * Created: Fri Nov 17 17:23:24 2006
6  * Copyright 2006-2007 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/exceptions/system.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/wait_condition.h>
27 #include <netcomm/fawkes/message_queue.h>
28 #include <netcomm/fawkes/server_client_thread.h>
29 #include <netcomm/fawkes/server_thread.h>
30 #include <netcomm/fawkes/transceiver.h>
31 #include <netcomm/socket/stream.h>
32 #include <netcomm/utils/exceptions.h>
33 
34 #include <unistd.h>
35 
36 namespace fawkes {
37 
38 /** @class FawkesNetworkServerClientSendThread <netcomm/fawkes/server_client_thread.h>
39  * Sending thread for a Fawkes client connected to the server.
40  * This thread is spawned for each client connected to the server to handle the
41  * server-side sending
42  * @ingroup NetComm
43  * @author Tim Niemueller
44  */
45 
47 {
48 public:
49  /** Constructor.
50  * @param s client stream socket
51  * @param parent parent FawkesNetworkServerClientThread instance
52  */
54  : Thread("FawkesNetworkServerClientSendThread", Thread::OPMODE_WAITFORWAKEUP)
55  {
56  s_ = s;
57  parent_ = parent;
58  outbound_mutex_ = new Mutex();
59  outbound_msgqs_[0] = new FawkesNetworkMessageQueue();
60  outbound_msgqs_[1] = new FawkesNetworkMessageQueue();
61  outbound_active_ = 0;
62  outbound_msgq_ = outbound_msgqs_[0];
63  }
64 
65  /** Destructor. */
67  {
68  for (unsigned int i = 0; i < 2; ++i) {
69  while (!outbound_msgqs_[i]->empty()) {
70  FawkesNetworkMessage *m = outbound_msgqs_[i]->front();
71  m->unref();
72  outbound_msgqs_[i]->pop();
73  }
74  }
75  delete outbound_msgqs_[0];
76  delete outbound_msgqs_[1];
77  delete outbound_mutex_;
78  }
79 
80  virtual void
81  loop()
82  {
83  if (!parent_->alive())
84  return;
85 
86  while (outbound_havemore_) {
87  outbound_mutex_->lock();
88  outbound_havemore_ = false;
89  FawkesNetworkMessageQueue *q = outbound_msgq_;
90  outbound_active_ = 1 - outbound_active_;
91  outbound_msgq_ = outbound_msgqs_[outbound_active_];
92  outbound_mutex_->unlock();
93 
94  if (!q->empty()) {
95  try {
97  } catch (ConnectionDiedException &e) {
98  parent_->connection_died();
99  exit();
100  }
101  }
102  }
103  }
104 
105  /** Enqueue message to outbound queue.
106  * This enqueues the given message to the outbound queue. The message will
107  * be sent in the next loop iteration. This method takes ownership of the
108  * transmitted message. If you want to use the message after enqueuing you
109  * must reference it explicitly.
110  * @param msg message to enqueue
111  */
112  void
114  {
115  outbound_mutex_->lock();
116  outbound_msgq_->push(msg);
117  outbound_havemore_ = true;
118  outbound_mutex_->unlock();
119  wakeup();
120  }
121 
122  /** Wait until all data has been sent. */
123  void
125  {
126  loop_mutex->lock();
127  loop_mutex->unlock();
128  }
129 
130  /** Stub to see name in backtrace for easier debugging. @see Thread::run() */
131 protected:
132  virtual void
133  run()
134  {
135  Thread::run();
136  }
137 
138 private:
139  StreamSocket * s_;
141 
142  Mutex * outbound_mutex_;
143  unsigned int outbound_active_;
144  bool outbound_havemore_;
145  FawkesNetworkMessageQueue *outbound_msgq_;
146  FawkesNetworkMessageQueue *outbound_msgqs_[2];
147 };
148 
149 /** @class FawkesNetworkServerClientThread netcomm/fawkes/server_client_thread.h
150  * Fawkes Network Client Thread for server.
151  * The FawkesNetworkServerThread spawns an instance of this class for every incoming
152  * connection. It is then used to handle the client.
153  * The thread will start another thread, an instance of
154  * FawkesNetworkServerClientSendThread. This will be used to handle all outgoing
155  * traffic.
156  *
157  * @ingroup NetComm
158  * @author Tim Niemueller
159  */
160 
161 /** Constructor.
162  * @param s socket to client
163  * @param parent parent network thread
164  */
167 : Thread("FawkesNetworkServerClientThread")
168 {
169  _s = s;
170  _parent = parent;
171  _alive = true;
172  _clid = 0;
173  _inbound_queue = new FawkesNetworkMessageQueue();
174 
175  _send_slave = new FawkesNetworkServerClientSendThread(_s, this);
176 
177  set_prepfin_conc_loop(true);
178 }
179 
180 /** Destructor. */
182 {
183  _send_slave->cancel();
184  _send_slave->join();
185  delete _send_slave;
186  delete _s;
187  delete _inbound_queue;
188 }
189 
190 /** Get client ID.
191  * The client ID can be used to send replies.
192  * @return client ID
193  */
194 unsigned int
196 {
197  return _clid;
198 }
199 
200 /** Set client ID.
201  * @param client_id new client ID
202  */
203 void
205 {
206  _clid = client_id;
207 }
208 
209 /** Receive data.
210  * Receives data from the network if there is any and then dispatches all
211  * inbound messages via the parent FawkesNetworkThread::dispatch()
212  */
213 void
214 FawkesNetworkServerClientThread::recv()
215 {
216  try {
217  FawkesNetworkTransceiver::recv(_s, _inbound_queue);
218 
219  _inbound_queue->lock();
220  while (!_inbound_queue->empty()) {
221  FawkesNetworkMessage *m = _inbound_queue->front();
222  m->set_client_id(_clid);
223  _parent->dispatch(m);
224  m->unref();
225  _inbound_queue->pop();
226  }
227  _parent->wakeup();
228  _inbound_queue->unlock();
229 
230  } catch (ConnectionDiedException &e) {
231  _alive = false;
232  _s->close();
233  _parent->wakeup();
234  }
235 }
236 
237 void
239 {
240  _send_slave->start();
241 }
242 
243 /** Thread loop.
244  * The client thread loop polls on the socket for 10 ms (wait for events
245  * on the socket like closed connection or data that can be read). If any
246  * event occurs it is processed. If the connection died or any other
247  * error occured the thread is cancelled and the parent FawkesNetworkThread
248  * is woken up to carry out any action that is needed when a client dies.
249  * If data is available for reading thedata is received and dispatched
250  * via recv().
251  * Afterwards the outbound message queue is processed and alle messages are
252  * sent. This is also done if the operation could block (POLL_OUT is not
253  * honored).
254  */
255 void
257 {
258  if (!_alive) {
259  usleep(1000000);
260  return;
261  }
262 
263  short p = 0;
264  try {
265  p = _s->poll(); // block until we got a message
266  } catch (InterruptedException &e) {
267  // we just ignore this and try it again
268  return;
269  }
270 
271  if ((p & Socket::POLL_ERR) || (p & Socket::POLL_HUP) || (p & Socket::POLL_RDHUP)) {
272  _alive = false;
273  _parent->wakeup();
274  } else if (p & Socket::POLL_IN) {
275  // Data can be read
276  recv();
277  }
278 }
279 
280 /** Enqueue message to outbound queue.
281  * This enqueues the given message to the outbound queue. The message will be send
282  * in the next loop iteration.
283  * @param msg message to enqueue
284  */
285 void
287 {
288  _send_slave->enqueue(msg);
289 }
290 
291 /** Check aliveness of connection.
292  * @return true if connection is still alive, false otherwise.
293  */
294 bool
296 {
297  return _alive;
298 }
299 
300 /** Force sending of all pending outbound messages.
301  * This is a blocking operation. The current poll will be interrupted by sending
302  * a signal to this thread (and ignoring it) and then wait for the sending to
303  * finish.
304  */
305 void
307 {
308  _send_slave->wait_for_all_sent();
309 }
310 
311 /** Connection died notification.
312  * To be called only be the send slave thread.
313  */
314 void
316 {
317  _alive = false;
318  _parent->wakeup();
319 }
320 
321 } // end namespace fawkes
Thrown if the connection died during an operation.
Definition: exceptions.h:32
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
Definition: message_queue.h:33
Representation of a message that is sent over the network.
Definition: message.h:77
void set_client_id(unsigned int clid)
Set client ID.
Definition: message.cpp:330
Sending thread for a Fawkes client connected to the server.
virtual void loop()
Code to execute in the thread.
FawkesNetworkServerClientSendThread(StreamSocket *s, FawkesNetworkServerClientThread *parent)
Constructor.
void wait_for_all_sent()
Wait until all data has been sent.
void enqueue(FawkesNetworkMessage *msg)
Enqueue message to outbound queue.
virtual void run()
Stub to see name in backtrace for easier debugging.
Fawkes Network Client Thread for server.
unsigned int clid() const
Get client ID.
void force_send()
Force sending of all pending outbound messages.
bool alive() const
Check aliveness of connection.
void set_clid(unsigned int client_id)
Set client ID.
void connection_died()
Connection died notification.
void enqueue(FawkesNetworkMessage *msg)
Enqueue message to outbound queue.
FawkesNetworkServerClientThread(StreamSocket *s, FawkesNetworkServerThread *parent)
Constructor.
virtual void once()
Execute an action exactly once.
Fawkes Network Thread.
Definition: server_thread.h:49
void dispatch(FawkesNetworkMessage *msg)
Dispatch messages.
static void send(StreamSocket *s, FawkesNetworkMessageQueue *msgq)
Send messages.
Definition: transceiver.cpp:51
static void recv(StreamSocket *s, FawkesNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
Definition: transceiver.cpp:85
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
void lock() const
Lock queue.
Definition: lock_queue.h:114
void unlock() const
Unlock list.
Definition: lock_queue.h:128
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
static const short POLL_RDHUP
Stream socket peer closed connection, or shut down writing half of connection.
Definition: socket.h:69
static const short POLL_HUP
Hang up.
Definition: socket.h:71
static const short POLL_IN
Data can be read.
Definition: socket.h:66
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:685
static const short POLL_ERR
Error condition.
Definition: socket.h:70
virtual void close()
Close socket.
Definition: socket.cpp:311
TCP stream socket over IP.
Definition: stream.h:32
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
Mutex * loop_mutex
Mutex that is used to protect a call to loop().
Definition: thread.h:152
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:499
void join()
Join the thread.
Definition: thread.cpp:597
void exit()
Exit the thread.
Definition: thread.cpp:582
void wakeup()
Wake up thread.
Definition: thread.cpp:995
void cancel()
Cancel a thread.
Definition: thread.cpp:646
virtual void run()
Code to execute in the thread.
Definition: thread.cpp:918
@ OPMODE_WAITFORWAKEUP
operate in wait-for-wakeup mode
Definition: thread.h:58
Fawkes library namespace.