Fawkes API  Fawkes Development Version
communicator.cpp
1 
2 /***************************************************************************
3  * communicator.cpp - protobuf network communication for CLIPS
4  *
5  * Created: Tue Apr 16 13:51:14 2013
6  * Copyright 2013 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include <core/threading/mutex_locker.h>
38 #include <google/protobuf/descriptor.h>
39 #include <logging/logger.h>
40 #include <protobuf_clips/communicator.h>
41 #include <protobuf_comm/client.h>
42 #include <protobuf_comm/peer.h>
43 #include <protobuf_comm/server.h>
44 
45 #include <boost/format.hpp>
46 
47 using namespace google::protobuf;
48 using namespace protobuf_comm;
49 using namespace boost::placeholders;
50 
51 namespace protobuf_clips {
52 
53 /** @class ClipsProtobufCommunicator <protobuf_clips/communicator.h>
54  * CLIPS protobuf integration class.
55  * This class adds functionality related to protobuf to a given CLIPS
56  * environment. It supports the creation of communication channels
57  * through protobuf_comm. An instance maintains its own message register
58  * shared among server, peer, and clients.
59  * @author Tim Niemueller
60  */
61 
62 /** Constructor.
63  * @param env CLIPS environment to which to provide the protobuf functionality
64  * @param env_mutex mutex to lock when operating on the CLIPS environment.
65  * @param logger optional logger for informational output
66  */
67 ClipsProtobufCommunicator::ClipsProtobufCommunicator(CLIPS::Environment *env,
68  fawkes::Mutex & env_mutex,
69  fawkes::Logger * logger)
70 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
71 {
72  message_register_ = new MessageRegister();
73  setup_clips();
74 }
75 
76 /** Constructor.
77  * @param env CLIPS environment to which to provide the protobuf functionality
78  * @param env_mutex mutex to lock when operating on the CLIPS environment.
79  * @param proto_path proto path passed to a newly instantiated message register
80  * @param logger optional logger for informational output
81  */
83  fawkes::Mutex & env_mutex,
84  std::vector<std::string> &proto_path,
85  fawkes::Logger * logger)
86 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
87 {
88  message_register_ = new MessageRegister(proto_path);
89  setup_clips();
90 }
91 
92 /** Destructor. */
94 {
95  {
96  fawkes::MutexLocker lock(&clips_mutex_);
97 
98  for (auto f : functions_) {
99  clips_->remove_function(f);
100  }
101  functions_.clear();
102  }
103 
104  for (auto c : clients_) {
105  delete c.second;
106  }
107  clients_.clear();
108 
109  delete message_register_;
110  delete server_;
111 }
112 
// Register slot s as CLIPS function n and remember the name for removal on
// destruction. Wrapped in do/while(0) so that the macro expands to a single
// statement and is safe inside unbraced if/else branches.
#define ADD_FUNCTION(n, s)             \
	do {                               \
		clips_->add_function((n), (s)); \
		functions_.push_back((n));      \
	} while (0)
116 
117 /** Setup CLIPS environment. */
118 void
119 ClipsProtobufCommunicator::setup_clips()
120 {
121  fawkes::MutexLocker lock(&clips_mutex_);
122 
123  ADD_FUNCTION("pb-register-type",
124  (sigc::slot<CLIPS::Value, std::string>(
125  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_register_type))));
126  ADD_FUNCTION("pb-field-names",
127  (sigc::slot<CLIPS::Values, void *>(
128  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_names))));
129  ADD_FUNCTION("pb-field-type",
130  (sigc::slot<CLIPS::Value, void *, std::string>(
131  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_type))));
132  ADD_FUNCTION("pb-has-field",
133  (sigc::slot<CLIPS::Value, void *, std::string>(
134  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_has_field))));
135  ADD_FUNCTION("pb-field-label",
136  (sigc::slot<CLIPS::Value, void *, std::string>(
137  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_label))));
138  ADD_FUNCTION("pb-field-value",
139  (sigc::slot<CLIPS::Value, void *, std::string>(
140  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_value))));
141  ADD_FUNCTION("pb-field-list",
142  (sigc::slot<CLIPS::Values, void *, std::string>(
143  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_list))));
144  ADD_FUNCTION("pb-field-is-list",
145  (sigc::slot<CLIPS::Value, void *, std::string>(
146  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_is_list))));
147  ADD_FUNCTION("pb-create",
148  (sigc::slot<CLIPS::Value, std::string>(
149  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_create))));
150  ADD_FUNCTION("pb-destroy",
151  (sigc::slot<void, void *>(
152  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_destroy))));
153  ADD_FUNCTION("pb-ref",
154  (sigc::slot<CLIPS::Value, void *>(
155  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_ref))));
156  ADD_FUNCTION("pb-set-field",
157  (sigc::slot<void, void *, std::string, CLIPS::Value>(
158  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_set_field))));
159  ADD_FUNCTION("pb-add-list",
160  (sigc::slot<void, void *, std::string, CLIPS::Value>(
161  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_add_list))));
162  ADD_FUNCTION("pb-send",
163  (sigc::slot<void, long int, void *>(
164  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_send))));
165  ADD_FUNCTION("pb-tostring",
166  (sigc::slot<std::string, void *>(
167  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_tostring))));
168  ADD_FUNCTION("pb-server-enable",
169  (sigc::slot<void, int>(
170  sigc::mem_fun(*this, &ClipsProtobufCommunicator::enable_server))));
171  ADD_FUNCTION("pb-server-disable",
172  (sigc::slot<void>(
173  sigc::mem_fun(*this, &ClipsProtobufCommunicator::disable_server))));
174  ADD_FUNCTION("pb-peer-create",
175  (sigc::slot<long int, std::string, int>(
176  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create))));
177  ADD_FUNCTION("pb-peer-create-local",
178  (sigc::slot<long int, std::string, int, int>(
179  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create_local))));
180  ADD_FUNCTION("pb-peer-create-crypto",
181  (sigc::slot<long int, std::string, int, std::string, std::string>(
182  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create_crypto))));
183  ADD_FUNCTION("pb-peer-create-local-crypto",
184  (sigc::slot<long int, std::string, int, int, std::string, std::string>(sigc::mem_fun(
185  *this, &ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto))));
186  ADD_FUNCTION("pb-peer-destroy",
187  (sigc::slot<void, long int>(
188  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_destroy))));
189  ADD_FUNCTION("pb-peer-setup-crypto",
190  (sigc::slot<void, long int, std::string, std::string>(
191  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_setup_crypto))));
192  ADD_FUNCTION("pb-broadcast",
193  (sigc::slot<void, long int, void *>(
194  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_broadcast))));
195  ADD_FUNCTION("pb-connect",
196  (sigc::slot<long int, std::string, int>(
197  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_client_connect))));
198  ADD_FUNCTION("pb-disconnect",
199  (sigc::slot<void, long int>(
200  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_disconnect))));
201 }
202 
203 /** Enable protobuf stream server.
204  * @param port TCP port to listen on for connections
205  */
206 void
208 {
209  if ((port > 0) && !server_) {
210  server_ = new protobuf_comm::ProtobufStreamServer(port, message_register_);
211 
212  server_->signal_connected().connect(
213  boost::bind(&ClipsProtobufCommunicator::handle_server_client_connected, this, _1, _2));
214  server_->signal_disconnected().connect(
215  boost::bind(&ClipsProtobufCommunicator::handle_server_client_disconnected, this, _1, _2));
216  server_->signal_received().connect(
217  boost::bind(&ClipsProtobufCommunicator::handle_server_client_msg, this, _1, _2, _3, _4));
218  server_->signal_receive_failed().connect(
219  boost::bind(&ClipsProtobufCommunicator::handle_server_client_fail, this, _1, _2, _3, _4));
220  }
221 }
222 
223 /** Disable protobu stream server. */
224 void
226 {
227  delete server_;
228  server_ = NULL;
229 }
230 
231 /** Enable protobuf peer.
232  * @param address IP address to send messages to
233  * @param send_port UDP port to send messages to
234  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
235  * @param crypto_key encryption key
236  * @param cipher cipher suite, see BufferEncryptor for supported types
237  * @return peer identifier
238  */
239 long int
240 ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto(std::string address,
241  int send_port,
242  int recv_port,
243  std::string crypto_key,
244  std::string cipher)
245 {
246  if (recv_port <= 0)
247  recv_port = send_port;
248 
249  if (send_port > 0) {
250  protobuf_comm::ProtobufBroadcastPeer *peer = new protobuf_comm::ProtobufBroadcastPeer(
251  address, send_port, recv_port, message_register_, crypto_key, cipher);
252 
253  long int peer_id;
254  {
255  fawkes::MutexLocker lock(&map_mutex_);
256  peer_id = ++next_client_id_;
257  peers_[peer_id] = peer;
258  }
259 
260  peer->signal_received().connect(
261  boost::bind(&ClipsProtobufCommunicator::handle_peer_msg, this, peer_id, _1, _2, _3, _4));
262  peer->signal_recv_error().connect(
263  boost::bind(&ClipsProtobufCommunicator::handle_peer_recv_error, this, peer_id, _1, _2));
264  peer->signal_send_error().connect(
265  boost::bind(&ClipsProtobufCommunicator::handle_peer_send_error, this, peer_id, _1));
266 
267  return peer_id;
268  } else {
269  return 0;
270  }
271 }
272 
273 /** Enable protobuf peer.
274  * @param address IP address to send messages to
275  * @param port UDP port to send and receive messages
276  * @param crypto_key encryption key
277  * @param cipher cipher suite, see BufferEncryptor for supported types
278  * @return peer identifier
279  */
280 long int
281 ClipsProtobufCommunicator::clips_pb_peer_create_crypto(std::string address,
282  int port,
283  std::string crypto_key,
284  std::string cipher)
285 {
286  return clips_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
287 }
288 
289 /** Enable protobuf peer.
290  * @param address IP address to send messages to
291  * @param port UDP port to send and receive messages
292  * @return peer identifier
293  */
294 long int
295 ClipsProtobufCommunicator::clips_pb_peer_create(std::string address, int port)
296 {
297  return clips_pb_peer_create_local_crypto(address, port, port);
298 }
299 
300 /** Enable protobuf peer.
301  * @param address IP address to send messages to
302  * @param send_port UDP port to send messages to
303  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
304  * @return peer identifier
305  */
306 long int
307 ClipsProtobufCommunicator::clips_pb_peer_create_local(std::string address,
308  int send_port,
309  int recv_port)
310 {
311  return clips_pb_peer_create_local_crypto(address, send_port, recv_port);
312 }
313 
314 /** Disable peer.
315  * @param peer_id ID of the peer to destroy
316  */
317 void
318 ClipsProtobufCommunicator::clips_pb_peer_destroy(long int peer_id)
319 {
320  if (peers_.find(peer_id) != peers_.end()) {
321  delete peers_[peer_id];
322  peers_.erase(peer_id);
323  }
324 }
325 
326 /** Setup crypto for peer.
327  * @param peer_id ID of the peer to destroy
328  * @param crypto_key encryption key
329  * @param cipher cipher suite, see BufferEncryptor for supported types
330  */
331 void
332 ClipsProtobufCommunicator::clips_pb_peer_setup_crypto(long int peer_id,
333  std::string crypto_key,
334  std::string cipher)
335 {
336  if (peers_.find(peer_id) != peers_.end()) {
337  peers_[peer_id]->setup_crypto(crypto_key, cipher);
338  }
339 }
340 
341 /** Register a new message type.
342  * @param full_name full name of type to register
343  * @return true if the type was successfully registered, false otherwise
344  */
345 CLIPS::Value
346 ClipsProtobufCommunicator::clips_pb_register_type(std::string full_name)
347 {
348  try {
349  message_register_->add_message_type(full_name);
350  return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
351  } catch (std::runtime_error &e) {
352  if (logger_) {
353  logger_->log_error("CLIPS-Protobuf",
354  "Registering type %s failed: %s",
355  full_name.c_str(),
356  e.what());
357  }
358  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
359  }
360 }
361 
362 CLIPS::Value
363 ClipsProtobufCommunicator::clips_pb_create(std::string full_name)
364 {
365  try {
366  std::shared_ptr<google::protobuf::Message> m = message_register_->new_message_for(full_name);
367  return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>(m));
368  } catch (std::runtime_error &e) {
369  if (logger_) {
370  logger_->log_warn("CLIPS-Protobuf",
371  "Cannot create message of type %s: %s",
372  full_name.c_str(),
373  e.what());
374  }
375  return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>());
376  }
377 }
378 
379 CLIPS::Value
380 ClipsProtobufCommunicator::clips_pb_ref(void *msgptr)
381 {
382  std::shared_ptr<google::protobuf::Message> *m =
383  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
384  if (!*m)
385  return new std::shared_ptr<google::protobuf::Message>();
386 
387  return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>(*m));
388 }
389 
390 void
391 ClipsProtobufCommunicator::clips_pb_destroy(void *msgptr)
392 {
393  std::shared_ptr<google::protobuf::Message> *m =
394  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
395  if (!*m)
396  return;
397 
398  delete m;
399 }
400 
401 CLIPS::Values
402 ClipsProtobufCommunicator::clips_pb_field_names(void *msgptr)
403 {
404  std::shared_ptr<google::protobuf::Message> *m =
405  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
406  if (!*m)
407  return CLIPS::Values();
408 
409  const Descriptor *desc = (*m)->GetDescriptor();
410  const int field_count = desc->field_count();
411  CLIPS::Values field_names(field_count);
412  for (int i = 0; i < field_count; ++i) {
413  field_names[i].set(desc->field(i)->name(), true);
414  }
415  return field_names;
416 }
417 
418 CLIPS::Value
419 ClipsProtobufCommunicator::clips_pb_field_type(void *msgptr, std::string field_name)
420 {
421  std::shared_ptr<google::protobuf::Message> *m =
422  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
423  if (!*m)
424  return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
425 
426  const Descriptor * desc = (*m)->GetDescriptor();
427  const FieldDescriptor *field = desc->FindFieldByName(field_name);
428  if (!field) {
429  return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
430  }
431  switch (field->type()) {
432  case FieldDescriptor::TYPE_DOUBLE: return CLIPS::Value("DOUBLE", CLIPS::TYPE_SYMBOL);
433  case FieldDescriptor::TYPE_FLOAT: return CLIPS::Value("FLOAT", CLIPS::TYPE_SYMBOL);
434  case FieldDescriptor::TYPE_INT64: return CLIPS::Value("INT64", CLIPS::TYPE_SYMBOL);
435  case FieldDescriptor::TYPE_UINT64: return CLIPS::Value("UINT64", CLIPS::TYPE_SYMBOL);
436  case FieldDescriptor::TYPE_INT32: return CLIPS::Value("INT32", CLIPS::TYPE_SYMBOL);
437  case FieldDescriptor::TYPE_FIXED64: return CLIPS::Value("FIXED64", CLIPS::TYPE_SYMBOL);
438  case FieldDescriptor::TYPE_FIXED32: return CLIPS::Value("FIXED32", CLIPS::TYPE_SYMBOL);
439  case FieldDescriptor::TYPE_BOOL: return CLIPS::Value("BOOL", CLIPS::TYPE_SYMBOL);
440  case FieldDescriptor::TYPE_STRING: return CLIPS::Value("STRING", CLIPS::TYPE_SYMBOL);
441  case FieldDescriptor::TYPE_MESSAGE: return CLIPS::Value("MESSAGE", CLIPS::TYPE_SYMBOL);
442  case FieldDescriptor::TYPE_BYTES: return CLIPS::Value("BYTES", CLIPS::TYPE_SYMBOL);
443  case FieldDescriptor::TYPE_UINT32: return CLIPS::Value("UINT32", CLIPS::TYPE_SYMBOL);
444  case FieldDescriptor::TYPE_ENUM: return CLIPS::Value("ENUM", CLIPS::TYPE_SYMBOL);
445  case FieldDescriptor::TYPE_SFIXED32: return CLIPS::Value("SFIXED32", CLIPS::TYPE_SYMBOL);
446  case FieldDescriptor::TYPE_SFIXED64: return CLIPS::Value("SFIXED64", CLIPS::TYPE_SYMBOL);
447  case FieldDescriptor::TYPE_SINT32: return CLIPS::Value("SINT32", CLIPS::TYPE_SYMBOL);
448  case FieldDescriptor::TYPE_SINT64: return CLIPS::Value("SINT64", CLIPS::TYPE_SYMBOL);
449  default: return CLIPS::Value("UNKNOWN", CLIPS::TYPE_SYMBOL);
450  }
451 }
452 
453 CLIPS::Value
454 ClipsProtobufCommunicator::clips_pb_has_field(void *msgptr, std::string field_name)
455 {
456  std::shared_ptr<google::protobuf::Message> *m =
457  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
458  if (!*m)
459  return false;
460 
461  const Descriptor * desc = (*m)->GetDescriptor();
462  const FieldDescriptor *field = desc->FindFieldByName(field_name);
463  if (!field)
464  return false;
465 
466  const Reflection *refl = (*m)->GetReflection();
467 
468  if (field->is_repeated()) {
469  return CLIPS::Value((refl->FieldSize(**m, field) > 0) ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
470  } else if (field->is_optional()) {
471  return CLIPS::Value(refl->HasField(**m, field) ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
472  } else {
473  return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
474  }
475 }
476 
477 CLIPS::Value
478 ClipsProtobufCommunicator::clips_pb_field_label(void *msgptr, std::string field_name)
479 {
480  std::shared_ptr<google::protobuf::Message> *m =
481  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
482  if (!*m)
483  return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
484 
485  const Descriptor * desc = (*m)->GetDescriptor();
486  const FieldDescriptor *field = desc->FindFieldByName(field_name);
487  if (!field) {
488  return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
489  }
490  switch (field->label()) {
491  case FieldDescriptor::LABEL_OPTIONAL: return CLIPS::Value("OPTIONAL", CLIPS::TYPE_SYMBOL);
492  case FieldDescriptor::LABEL_REQUIRED: return CLIPS::Value("REQUIRED", CLIPS::TYPE_SYMBOL);
493  case FieldDescriptor::LABEL_REPEATED: return CLIPS::Value("REPEATED", CLIPS::TYPE_SYMBOL);
494  default: return CLIPS::Value("UNKNOWN", CLIPS::TYPE_SYMBOL);
495  }
496 }
497 
498 CLIPS::Value
499 ClipsProtobufCommunicator::clips_pb_field_value(void *msgptr, std::string field_name)
500 {
501  std::shared_ptr<google::protobuf::Message> *m =
502  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
503  if (!(m && *m)) {
504  if (logger_) {
505  logger_->log_warn("CLIPS-Protobuf", "Invalid message when setting %s", field_name.c_str());
506  }
507  return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
508  }
509 
510  const Descriptor * desc = (*m)->GetDescriptor();
511  const FieldDescriptor *field = desc->FindFieldByName(field_name);
512  if (!field) {
513  if (logger_) {
514  logger_->log_warn("CLIPS-Protobuf",
515  "Field %s of %s does not exist",
516  field_name.c_str(),
517  (*m)->GetTypeName().c_str());
518  }
519  return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
520  }
521  const Reflection *refl = (*m)->GetReflection();
522  if (field->type() != FieldDescriptor::TYPE_MESSAGE && !refl->HasField(**m, field)) {
523  if (logger_) {
524  logger_->log_warn("CLIPS-Protobuf",
525  "Field %s of %s not set",
526  field_name.c_str(),
527  (*m)->GetTypeName().c_str());
528  }
529  return CLIPS::Value("NOT-SET", CLIPS::TYPE_SYMBOL);
530  }
531  switch (field->type()) {
532  case FieldDescriptor::TYPE_DOUBLE: return CLIPS::Value(refl->GetDouble(**m, field));
533  case FieldDescriptor::TYPE_FLOAT: return CLIPS::Value(refl->GetFloat(**m, field));
534  case FieldDescriptor::TYPE_INT64: return CLIPS::Value(refl->GetInt64(**m, field));
535  case FieldDescriptor::TYPE_UINT64: return CLIPS::Value((long int)refl->GetUInt64(**m, field));
536  case FieldDescriptor::TYPE_INT32: return CLIPS::Value(refl->GetInt32(**m, field));
537  case FieldDescriptor::TYPE_FIXED64: return CLIPS::Value((long int)refl->GetUInt64(**m, field));
538  case FieldDescriptor::TYPE_FIXED32: return CLIPS::Value(refl->GetUInt32(**m, field));
539  case FieldDescriptor::TYPE_BOOL:
540  //Booleans are represented as Symbols in CLIPS
541  if (refl->GetBool(**m, field)) {
542  return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
543  } else {
544  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
545  }
546  case FieldDescriptor::TYPE_STRING: return CLIPS::Value(refl->GetString(**m, field));
547  case FieldDescriptor::TYPE_MESSAGE: {
548  const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
549  google::protobuf::Message * mcopy = mfield.New();
550  mcopy->CopyFrom(mfield);
551  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
552  return CLIPS::Value(ptr);
553  }
554  case FieldDescriptor::TYPE_BYTES: return CLIPS::Value((char *)"bytes");
555  case FieldDescriptor::TYPE_UINT32: return CLIPS::Value(refl->GetUInt32(**m, field));
556  case FieldDescriptor::TYPE_ENUM:
557  return CLIPS::Value(refl->GetEnum(**m, field)->name(), CLIPS::TYPE_SYMBOL);
558  case FieldDescriptor::TYPE_SFIXED32: return CLIPS::Value(refl->GetInt32(**m, field));
559  case FieldDescriptor::TYPE_SFIXED64: return CLIPS::Value(refl->GetInt64(**m, field));
560  case FieldDescriptor::TYPE_SINT32: return CLIPS::Value(refl->GetInt32(**m, field));
561  case FieldDescriptor::TYPE_SINT64: return CLIPS::Value(refl->GetInt64(**m, field));
562  default: throw std::logic_error("Unknown protobuf field type encountered");
563  }
564 }
565 
566 void
567 ClipsProtobufCommunicator::clips_pb_set_field(void * msgptr,
568  std::string field_name,
569  CLIPS::Value value)
570 {
571  std::shared_ptr<google::protobuf::Message> *m =
572  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
573  if (!(m && *m))
574  return;
575 
576  const Descriptor * desc = (*m)->GetDescriptor();
577  const FieldDescriptor *field = desc->FindFieldByName(field_name);
578  if (!field) {
579  if (logger_) {
580  logger_->log_warn("CLIPS-Protobuf", "Could not find field %s", field_name.c_str());
581  }
582  return;
583  }
584  const Reflection *refl = (*m)->GetReflection();
585 
586  try {
587  switch (field->type()) {
588  case FieldDescriptor::TYPE_DOUBLE: refl->SetDouble(m->get(), field, value.as_float()); break;
589  case FieldDescriptor::TYPE_FLOAT: refl->SetFloat(m->get(), field, value.as_float()); break;
590  case FieldDescriptor::TYPE_SFIXED64:
591  case FieldDescriptor::TYPE_SINT64:
592  case FieldDescriptor::TYPE_INT64: refl->SetInt64(m->get(), field, value.as_integer()); break;
593  case FieldDescriptor::TYPE_FIXED64:
594  case FieldDescriptor::TYPE_UINT64: refl->SetUInt64(m->get(), field, value.as_integer()); break;
595  case FieldDescriptor::TYPE_SFIXED32:
596  case FieldDescriptor::TYPE_SINT32:
597  case FieldDescriptor::TYPE_INT32: refl->SetInt32(m->get(), field, value.as_integer()); break;
598  case FieldDescriptor::TYPE_BOOL: refl->SetBool(m->get(), field, (value == "TRUE")); break;
599  case FieldDescriptor::TYPE_STRING: refl->SetString(m->get(), field, value.as_string()); break;
600  case FieldDescriptor::TYPE_MESSAGE: {
601  std::shared_ptr<google::protobuf::Message> *mfrom =
602  static_cast<std::shared_ptr<google::protobuf::Message> *>(value.as_address());
603  Message *mut_msg = refl->MutableMessage(m->get(), field);
604  mut_msg->CopyFrom(**mfrom);
605  delete mfrom;
606  } break;
607  case FieldDescriptor::TYPE_BYTES: break;
608  case FieldDescriptor::TYPE_FIXED32:
609  case FieldDescriptor::TYPE_UINT32: refl->SetUInt32(m->get(), field, value.as_integer()); break;
610  case FieldDescriptor::TYPE_ENUM: {
611  const EnumDescriptor * enumdesc = field->enum_type();
612  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
613  if (enumval) {
614  refl->SetEnum(m->get(), field, enumval);
615  } else {
616  if (logger_) {
617  logger_->log_warn("CLIPS-Protobuf",
618  "%s: cannot set invalid "
619  "enum value '%s' on '%s'",
620  (*m)->GetTypeName().c_str(),
621  value.as_string().c_str(),
622  field_name.c_str());
623  }
624  }
625  } break;
626  default: throw std::logic_error("Unknown protobuf field type encountered");
627  }
628  } catch (std::logic_error &e) {
629  if (logger_) {
630  logger_->log_warn("CLIPS-Protobuf",
631  "Failed to set field %s of %s: %s "
632  "(type %d, as string %s)",
633  field_name.c_str(),
634  (*m)->GetTypeName().c_str(),
635  e.what(),
636  value.type(),
637  to_string(value).c_str());
638  }
639  }
640 }
641 
642 void
643 ClipsProtobufCommunicator::clips_pb_add_list(void * msgptr,
644  std::string field_name,
645  CLIPS::Value value)
646 {
647  std::shared_ptr<google::protobuf::Message> *m =
648  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
649  if (!(m && *m))
650  return;
651 
652  const Descriptor * desc = (*m)->GetDescriptor();
653  const FieldDescriptor *field = desc->FindFieldByName(field_name);
654  if (!field) {
655  if (logger_) {
656  logger_->log_warn("CLIPS-Protobuf", "Could not find field %s", field_name.c_str());
657  }
658  return;
659  }
660  const Reflection *refl = (*m)->GetReflection();
661 
662  try {
663  switch (field->type()) {
664  case FieldDescriptor::TYPE_DOUBLE: refl->AddDouble(m->get(), field, value); break;
665  case FieldDescriptor::TYPE_FLOAT: refl->AddFloat(m->get(), field, value); break;
666  case FieldDescriptor::TYPE_SFIXED64:
667  case FieldDescriptor::TYPE_SINT64:
668  case FieldDescriptor::TYPE_INT64: refl->AddInt64(m->get(), field, value); break;
669  case FieldDescriptor::TYPE_FIXED64:
670  case FieldDescriptor::TYPE_UINT64: refl->AddUInt64(m->get(), field, (long int)value); break;
671  case FieldDescriptor::TYPE_SFIXED32:
672  case FieldDescriptor::TYPE_SINT32:
673  case FieldDescriptor::TYPE_INT32: refl->AddInt32(m->get(), field, value); break;
674  case FieldDescriptor::TYPE_BOOL: refl->AddBool(m->get(), field, (value == "TRUE")); break;
675  case FieldDescriptor::TYPE_STRING: refl->AddString(m->get(), field, value); break;
676  case FieldDescriptor::TYPE_MESSAGE: {
677  std::shared_ptr<google::protobuf::Message> *mfrom =
678  static_cast<std::shared_ptr<google::protobuf::Message> *>(value.as_address());
679  Message *new_msg = refl->AddMessage(m->get(), field);
680  new_msg->CopyFrom(**mfrom);
681  delete mfrom;
682  } break;
683  case FieldDescriptor::TYPE_BYTES: break;
684  case FieldDescriptor::TYPE_FIXED32:
685  case FieldDescriptor::TYPE_UINT32: refl->AddUInt32(m->get(), field, value); break;
686  case FieldDescriptor::TYPE_ENUM: {
687  const EnumDescriptor * enumdesc = field->enum_type();
688  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
689  if (enumval)
690  refl->AddEnum(m->get(), field, enumval);
691  } break;
692  default: throw std::logic_error("Unknown protobuf field type encountered");
693  }
694  } catch (std::logic_error &e) {
695  if (logger_) {
696  logger_->log_warn("CLIPS-Protobuf",
697  "Failed to add field %s of %s: %s",
698  field_name.c_str(),
699  (*m)->GetTypeName().c_str(),
700  e.what());
701  }
702  }
703 }
704 
705 long int
706 ClipsProtobufCommunicator::clips_pb_client_connect(std::string host, int port)
707 {
708  if (port <= 0)
709  return false;
710 
711  ProtobufStreamClient *client = new ProtobufStreamClient(message_register_);
712 
713  long int client_id;
714  {
715  fawkes::MutexLocker lock(&map_mutex_);
716  client_id = ++next_client_id_;
717  clients_[client_id] = client;
718  }
719 
720  client->signal_connected().connect(
721  boost::bind(&ClipsProtobufCommunicator::handle_client_connected, this, client_id));
722  client->signal_disconnected().connect(
723  boost::bind(&ClipsProtobufCommunicator::handle_client_disconnected,
724  this,
725  client_id,
726  boost::asio::placeholders::error));
727  client->signal_received().connect(
728  boost::bind(&ClipsProtobufCommunicator::handle_client_msg, this, client_id, _1, _2, _3));
729  client->signal_receive_failed().connect(boost::bind(
730  &ClipsProtobufCommunicator::handle_client_receive_fail, this, client_id, _1, _2, _3));
731 
732  client->async_connect(host.c_str(), port);
733  return CLIPS::Value(client_id);
734 }
735 
736 void
737 ClipsProtobufCommunicator::clips_pb_send(long int client_id, void *msgptr)
738 {
739  std::shared_ptr<google::protobuf::Message> *m =
740  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
741  if (!(m && *m)) {
742  if (logger_) {
743  logger_->log_warn("CLIPS-Protobuf", "Cannot send to %li: invalid message", client_id);
744  }
745  return;
746  }
747 
748  try {
749  fawkes::MutexLocker lock(&map_mutex_);
750 
751  if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
752  //printf("***** SENDING via SERVER\n");
753  server_->send(server_clients_[client_id], *m);
754  sig_server_sent_(server_clients_[client_id], *m);
755  } else if (clients_.find(client_id) != clients_.end()) {
756  //printf("***** SENDING via CLIENT\n");
757  clients_[client_id]->send(*m);
758  std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
759  sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
760  } else if (peers_.find(client_id) != peers_.end()) {
761  //printf("***** SENDING via CLIENT\n");
762  peers_[client_id]->send(*m);
763  sig_peer_sent_(client_id, *m);
764  } else {
765  //printf("Client ID %li is unknown, cannot send message of type %s\n",
766  // client_id, (*m)->GetTypeName().c_str());
767  }
768  } catch (google::protobuf::FatalException &e) {
769  if (logger_) {
770  logger_->log_warn("CLIPS-Profobuf",
771  "Failed to send message of type %s: %s",
772  (*m)->GetTypeName().c_str(),
773  e.what());
774  }
775  } catch (fawkes::Exception &e) {
776  if (logger_) {
777  logger_->log_warn("CLIPS-Protobuf",
778  "Failed to send message of type %s: %s",
779  (*m)->GetTypeName().c_str(),
780  e.what_no_backtrace());
781  }
782  } catch (std::runtime_error &e) {
783  if (logger_) {
784  logger_->log_warn("CLIPS-Protobuf",
785  "Failed to send message of type %s: %s",
786  (*m)->GetTypeName().c_str(),
787  e.what());
788  }
789  }
790 }
791 
792 std::string
793 ClipsProtobufCommunicator::clips_pb_tostring(void *msgptr)
794 {
795  std::shared_ptr<google::protobuf::Message> *m =
796  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
797  if (!(m && *m)) {
798  if (logger_) {
799  logger_->log_warn("CLIPS-Protobuf", "Cannot convert message to string: invalid message");
800  }
801  return "";
802  }
803 
804  return (*m)->DebugString();
805 }
806 
807 void
808 ClipsProtobufCommunicator::clips_pb_broadcast(long int peer_id, void *msgptr)
809 {
810  std::shared_ptr<google::protobuf::Message> *m =
811  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
812  if (!(m && *m)) {
813  if (logger_) {
814  logger_->log_warn("CLIPS-Protobuf", "Cannot send broadcast: invalid message");
815  }
816  return;
817  }
818 
819  fawkes::MutexLocker lock(&map_mutex_);
820  if (peers_.find(peer_id) == peers_.end())
821  return;
822 
823  //logger_->log_info("CLIPS-Protobuf", "Broadcasting %s", (*m)->GetTypeName().c_str());
824  try {
825  peers_[peer_id]->send(*m);
826  } catch (google::protobuf::FatalException &e) {
827  if (logger_) {
828  logger_->log_warn("CLIPS-Protobuf",
829  "Failed to broadcast message of type %s: %s",
830  (*m)->GetTypeName().c_str(),
831  e.what());
832  }
833  } catch (fawkes::Exception &e) {
834  if (logger_) {
835  logger_->log_warn("CLIPS-Protobuf",
836  "Failed to broadcast message of type %s: %s",
837  (*m)->GetTypeName().c_str(),
838  e.what_no_backtrace());
839  }
840  } catch (std::runtime_error &e) {
841  if (logger_) {
842  logger_->log_warn("CLIPS-Protobuf",
843  "Failed to broadcast message of type %s: %s",
844  (*m)->GetTypeName().c_str(),
845  e.what());
846  }
847  }
848 
849  sig_peer_sent_(peer_id, *m);
850 }
851 
852 void
853 ClipsProtobufCommunicator::clips_pb_disconnect(long int client_id)
854 {
855  //logger_->log_info("CLIPS-Protobuf", "Disconnecting client %li", client_id);
856 
857  try {
858  fawkes::MutexLocker lock(&map_mutex_);
859 
860  if (server_clients_.find(client_id) != server_clients_.end()) {
861  protobuf_comm::ProtobufStreamServer::ClientID srv_client = server_clients_[client_id];
862  server_->disconnect(srv_client);
863  server_clients_.erase(client_id);
864  rev_server_clients_.erase(srv_client);
865  } else if (clients_.find(client_id) != clients_.end()) {
866  delete clients_[client_id];
867  clients_.erase(client_id);
868  }
869  } catch (std::runtime_error &e) {
870  if (logger_) {
871  logger_->log_warn("CLIPS-Protobuf",
872  "Failed to disconnect from client %li: %s",
873  client_id,
874  e.what());
875  }
876  }
877 }
878 
879 CLIPS::Values
880 ClipsProtobufCommunicator::clips_pb_field_list(void *msgptr, std::string field_name)
881 {
882  std::shared_ptr<google::protobuf::Message> *m =
883  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
884  if (!(m && *m))
885  return CLIPS::Values(1, CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL));
886 
887  const Descriptor * desc = (*m)->GetDescriptor();
888  const FieldDescriptor *field = desc->FindFieldByName(field_name);
889  if (!field) {
890  return CLIPS::Values(1, CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL));
891  }
892  if (field->label() == FieldDescriptor::LABEL_REQUIRED
893  || field->label() == FieldDescriptor::LABEL_OPTIONAL) {
894  CLIPS::Values rv(1, clips_pb_field_value(msgptr, field_name));
895  return rv;
896  }
897 
898  const Reflection *refl = (*m)->GetReflection();
899  int field_size = refl->FieldSize(**m, field);
900  CLIPS::Values rv(field_size);
901  for (int i = 0; i < field_size; ++i) {
902  switch (field->type()) {
903  case FieldDescriptor::TYPE_DOUBLE:
904  rv[i] = CLIPS::Value(refl->GetRepeatedDouble(**m, field, i));
905  break;
906  case FieldDescriptor::TYPE_FLOAT:
907  rv[i] = CLIPS::Value(refl->GetRepeatedFloat(**m, field, i));
908  break;
909  break;
910  case FieldDescriptor::TYPE_UINT64:
911  case FieldDescriptor::TYPE_FIXED64:
912  rv[i] = CLIPS::Value((long int)refl->GetRepeatedUInt64(**m, field, i));
913  break;
914  case FieldDescriptor::TYPE_UINT32:
915  case FieldDescriptor::TYPE_FIXED32:
916  rv[i] = CLIPS::Value(refl->GetRepeatedUInt32(**m, field, i));
917  break;
918  case FieldDescriptor::TYPE_BOOL:
919  //Booleans are represented as Symbols in CLIPS
920  if (refl->GetRepeatedBool(**m, field, i)) {
921  rv[i] = CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
922  } else {
923  rv[i] = CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
924  }
925  break;
926  case FieldDescriptor::TYPE_STRING:
927  rv[i] = CLIPS::Value(refl->GetRepeatedString(**m, field, i));
928  break;
929  case FieldDescriptor::TYPE_MESSAGE: {
930  const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
931  google::protobuf::Message * mcopy = msg.New();
932  mcopy->CopyFrom(msg);
933  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
934  rv[i] = CLIPS::Value(ptr);
935  } break;
936  case FieldDescriptor::TYPE_BYTES:
937  rv[i] = CLIPS::Value((char *)"BYTES", CLIPS::TYPE_SYMBOL);
938  break;
939  case FieldDescriptor::TYPE_ENUM:
940  rv[i] = CLIPS::Value(refl->GetRepeatedEnum(**m, field, i)->name(), CLIPS::TYPE_SYMBOL);
941  break;
942  case FieldDescriptor::TYPE_SFIXED32:
943  case FieldDescriptor::TYPE_INT32:
944  case FieldDescriptor::TYPE_SINT32:
945  rv[i] = CLIPS::Value(refl->GetRepeatedInt32(**m, field, i));
946  break;
947  case FieldDescriptor::TYPE_SFIXED64:
948  case FieldDescriptor::TYPE_SINT64:
949  case FieldDescriptor::TYPE_INT64:
950  rv[i] = CLIPS::Value(refl->GetRepeatedInt64(**m, field, i));
951  break;
952  default: throw std::logic_error("Unknown protobuf field type encountered");
953  }
954  }
955 
956  return rv;
957 }
958 
959 CLIPS::Value
960 ClipsProtobufCommunicator::clips_pb_field_is_list(void *msgptr, std::string field_name)
961 {
962  std::shared_ptr<google::protobuf::Message> *m =
963  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
964  if (!(m && *m))
965  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
966 
967  const Descriptor * desc = (*m)->GetDescriptor();
968  const FieldDescriptor *field = desc->FindFieldByName(field_name);
969  if (!field)
970  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
971  return CLIPS::Value(field->is_repeated() ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
972 }
973 
974 void
975 ClipsProtobufCommunicator::clips_assert_message(std::pair<std::string, unsigned short> &endpoint,
976  uint16_t comp_id,
977  uint16_t msg_type,
978  std::shared_ptr<google::protobuf::Message> &msg,
979  ClipsProtobufCommunicator::ClientType ct,
980  long int client_id)
981 {
982  CLIPS::Template::pointer temp = clips_->get_template("protobuf-msg");
983  if (temp) {
984  struct timeval tv;
985  gettimeofday(&tv, 0);
986  void * ptr = new std::shared_ptr<google::protobuf::Message>(msg);
987  CLIPS::Fact::pointer fact = CLIPS::Fact::create(*clips_, temp);
988  fact->set_slot("type", msg->GetTypeName());
989  fact->set_slot("comp-id", comp_id);
990  fact->set_slot("msg-type", msg_type);
991  fact->set_slot("rcvd-via",
992  CLIPS::Value((ct == CT_PEER) ? "BROADCAST" : "STREAM", CLIPS::TYPE_SYMBOL));
993  CLIPS::Values rcvd_at(2, CLIPS::Value(CLIPS::TYPE_INTEGER));
994  rcvd_at[0] = tv.tv_sec;
995  rcvd_at[1] = tv.tv_usec;
996  fact->set_slot("rcvd-at", rcvd_at);
997  CLIPS::Values host_port(2, CLIPS::Value(CLIPS::TYPE_STRING));
998  host_port[0] = endpoint.first;
999  host_port[1] = CLIPS::Value(endpoint.second);
1000  fact->set_slot("rcvd-from", host_port);
1001  fact->set_slot("client-type",
1002  CLIPS::Value(ct == CT_CLIENT ? "CLIENT" : (ct == CT_SERVER ? "SERVER" : "PEER"),
1003  CLIPS::TYPE_SYMBOL));
1004  fact->set_slot("client-id", client_id);
1005  fact->set_slot("ptr", CLIPS::Value(ptr));
1006  CLIPS::Fact::pointer new_fact = clips_->assert_fact(fact);
1007 
1008  if (!new_fact) {
1009  if (logger_) {
1010  logger_->log_warn("CLIPS-Protobuf", "Asserting protobuf-msg fact failed");
1011  }
1012  delete static_cast<std::shared_ptr<google::protobuf::Message> *>(ptr);
1013  }
1014  } else {
1015  if (logger_) {
1016  logger_->log_warn("CLIPS-Protobuf", "Did not get template, did you load protobuf.clp?");
1017  }
1018  }
1019 }
1020 
1021 void
1022 ClipsProtobufCommunicator::handle_server_client_connected(ProtobufStreamServer::ClientID client,
1023  boost::asio::ip::tcp::endpoint &endpoint)
1024 {
1025  long int client_id = -1;
1026  {
1027  fawkes::MutexLocker lock(&map_mutex_);
1028  client_id = ++next_client_id_;
1029  client_endpoints_[client_id] = std::make_pair(endpoint.address().to_string(), endpoint.port());
1030  server_clients_[client_id] = client;
1031  rev_server_clients_[client] = client_id;
1032  }
1033 
1034  fawkes::MutexLocker lock(&clips_mutex_);
1035  clips_->assert_fact_f("(protobuf-server-client-connected %li %s %u)",
1036  client_id,
1037  endpoint.address().to_string().c_str(),
1038  endpoint.port());
1039 }
1040 
1041 void
1042 ClipsProtobufCommunicator::handle_server_client_disconnected(ProtobufStreamServer::ClientID client,
1043  const boost::system::error_code &error)
1044 {
1045  long int client_id = -1;
1046  {
1047  fawkes::MutexLocker lock(&map_mutex_);
1048  RevServerClientMap::iterator c;
1049  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1050  client_id = c->second;
1051  rev_server_clients_.erase(c);
1052  server_clients_.erase(client_id);
1053  }
1054  }
1055 
1056  if (client_id >= 0) {
1057  fawkes::MutexLocker lock(&clips_mutex_);
1058  clips_->assert_fact_f("(protobuf-server-client-disconnected %li)", client_id);
1059  }
1060 }
1061 
1062 /** Handle message that came from a client.
1063  * @param client client ID
1064  * @param component_id component the message was addressed to
1065  * @param msg_type type of the message
1066  * @param msg the message
1067  */
1068 void
1069 ClipsProtobufCommunicator::handle_server_client_msg(ProtobufStreamServer::ClientID client,
1070  uint16_t component_id,
1071  uint16_t msg_type,
1072  std::shared_ptr<google::protobuf::Message> msg)
1073 {
1074  fawkes::MutexLocker lock(&clips_mutex_);
1075  fawkes::MutexLocker lock2(&map_mutex_);
1076  RevServerClientMap::iterator c;
1077  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1078  clips_assert_message(
1079  client_endpoints_[c->second], component_id, msg_type, msg, CT_SERVER, c->second);
1080  }
1081 }
1082 
1083 /** Handle server reception failure
1084  * @param client client ID
1085  * @param component_id component the message was addressed to
1086  * @param msg_type type of the message
1087  * @param msg the message string
1088  */
1089 void
1090 ClipsProtobufCommunicator::handle_server_client_fail(ProtobufStreamServer::ClientID client,
1091  uint16_t component_id,
1092  uint16_t msg_type,
1093  std::string msg)
1094 {
1095  fawkes::MutexLocker lock(&map_mutex_);
1096  RevServerClientMap::iterator c;
1097  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1098  fawkes::MutexLocker lock(&clips_mutex_);
1099  clips_->assert_fact_f("(protobuf-server-receive-failed (comp-id %u) (msg-type %u) "
1100  "(rcvd-via STREAM) (client-id %li) (message \"%s\") "
1101  "(rcvd-from (\"%s\" %u)))",
1102  component_id,
1103  msg_type,
1104  c->second,
1105  msg.c_str(),
1106  client_endpoints_[c->second].first.c_str(),
1107  client_endpoints_[c->second].second);
1108  }
1109 }
1110 
1111 /** Handle message that came from a peer/robot
1112  * @param endpoint the endpoint from which the message was received
1113  * @param component_id component the message was addressed to
1114  * @param msg_type type of the message
1115  * @param msg the message
1116  */
1117 void
1118 ClipsProtobufCommunicator::handle_peer_msg(long int peer_id,
1119  boost::asio::ip::udp::endpoint & endpoint,
1120  uint16_t component_id,
1121  uint16_t msg_type,
1122  std::shared_ptr<google::protobuf::Message> msg)
1123 {
1124  fawkes::MutexLocker lock(&clips_mutex_);
1125  std::pair<std::string, unsigned short> endpp =
1126  std::make_pair(endpoint.address().to_string(), endpoint.port());
1127  clips_assert_message(endpp, component_id, msg_type, msg, CT_PEER, peer_id);
1128 }
1129 
1130 /** Handle error during peer message processing.
1131  * @param endpoint endpoint of incoming message
1132  * @param msg error message
1133  */
1134 void
1135 ClipsProtobufCommunicator::handle_peer_recv_error(long int peer_id,
1136  boost::asio::ip::udp::endpoint &endpoint,
1137  std::string msg)
1138 {
1139  if (logger_) {
1140  logger_->log_warn("CLIPS-Protobuf",
1141  "Failed to receive peer message from %s:%u: %s",
1142  endpoint.address().to_string().c_str(),
1143  endpoint.port(),
1144  msg.c_str());
1145  }
1146 }
1147 
1148 /** Handle error during peer message processing.
1149  * @param msg error message
1150  */
1151 void
1152 ClipsProtobufCommunicator::handle_peer_send_error(long int peer_id, std::string msg)
1153 {
1154  if (logger_) {
1155  logger_->log_warn("CLIPS-Protobuf", "Failed to send peer message: %s", msg.c_str());
1156  }
1157 }
1158 
1159 void
1160 ClipsProtobufCommunicator::handle_client_connected(long int client_id)
1161 {
1162  fawkes::MutexLocker lock(&clips_mutex_);
1163  clips_->assert_fact_f("(protobuf-client-connected %li)", client_id);
1164 }
1165 
1166 void
1167 ClipsProtobufCommunicator::handle_client_disconnected(long int client_id,
1168  const boost::system::error_code &error)
1169 {
1170  fawkes::MutexLocker lock(&clips_mutex_);
1171  clips_->assert_fact_f("(protobuf-client-disconnected %li)", client_id);
1172 }
1173 
1174 void
1175 ClipsProtobufCommunicator::handle_client_msg(long int client_id,
1176  uint16_t comp_id,
1177  uint16_t msg_type,
1178  std::shared_ptr<google::protobuf::Message> msg)
1179 {
1180  fawkes::MutexLocker lock(&clips_mutex_);
1181  std::pair<std::string, unsigned short> endpp = std::make_pair(std::string(), 0);
1182  clips_assert_message(endpp, comp_id, msg_type, msg, CT_CLIENT, client_id);
1183 }
1184 
1185 void
1186 ClipsProtobufCommunicator::handle_client_receive_fail(long int client_id,
1187  uint16_t comp_id,
1188  uint16_t msg_type,
1189  std::string msg)
1190 {
1191  fawkes::MutexLocker lock(&clips_mutex_);
1192  clips_->assert_fact_f("(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) "
1193  "(comp-id %u) (msg-type %u) (message \"%s\"))",
1194  client_id,
1195  comp_id,
1196  msg_type,
1197  msg.c_str());
1198 }
1199 
1200 std::string
1201 ClipsProtobufCommunicator::to_string(const CLIPS::Value &v)
1202 {
1203  switch (v.type()) {
1204  case CLIPS::TYPE_UNKNOWN: return "Unknown Type";
1205  case CLIPS::TYPE_FLOAT: return std::to_string(v.as_float());
1206  case CLIPS::TYPE_INTEGER: return std::to_string(v.as_integer());
1207  case CLIPS::TYPE_SYMBOL:
1208  case CLIPS::TYPE_INSTANCE_NAME:
1209  case CLIPS::TYPE_STRING: return v.as_string();
1210  case CLIPS::TYPE_INSTANCE_ADDRESS:
1211  case CLIPS::TYPE_EXTERNAL_ADDRESS: return boost::str(boost::format("%p") % v.as_address());
1212  }
1213  return "Implicit unknown type";
1214 }
1215 
1216 } // end namespace protobuf_clips
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual const char * what_no_backtrace() const noexcept
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:663
Interface for logging.
Definition: logger.h:42
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void enable_server(int port)
Enable protobuf stream server.
ClipsProtobufCommunicator(CLIPS::Environment *env, fawkes::Mutex &env_mutex, fawkes::Logger *logger=NULL)
Constructor.
void disable_server()
Disable protobuf stream server.