Fawkes API  Fawkes Development Version
protobuf_thread.h
1 
2 /***************************************************************************
3  * Protoboard plugin template
4  * - Header for the ProtoBuf thread
5  *
6  * Copyright 2019 Victor MatarĂ©
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #ifndef MESSAGE_HANDLER_H
23 #define MESSAGE_HANDLER_H
24 
25 #include <aspect/blackboard.h>
26 #include <aspect/configurable.h>
27 #include <aspect/logging.h>
28 #include <core/threading/mutex.h>
29 #include <core/threading/thread.h>
30 #include <protobuf_comm/server.h>
31 
32 #include <list>
33 #include <map>
34 #include <queue>
35 
36 #define CFG_PREFIX "/plugins/protoboard"
37 
38 namespace protobuf_comm {
39 class ProtobufStreamClient;
40 class ProtobufBroadcastPeer;
41 } // namespace protobuf_comm
42 
43 namespace protoboard {
44 
45 class BlackboardManager;
46 
47 /**
48  * Receive incoming ProtoBuf messages and pass them on to the @a BlackboardManager
49  * for publication to the appropriate blackboard interface.
50  */
52  public fawkes::LoggingAspect,
55 {
56 public:
57  /// Empty-initialization constructor
58  ProtobufThead();
59  /// Destructor
60  virtual ~ProtobufThead() override;
61 
62  /// @return whether incoming ProtoBuf messages are in the queue
63  bool pb_queue_incoming();
64 
65  /// Wrapper for a ProtoBuf message and its metadata
67  {
68  /// The ProtoBuf peer ID that received this message
69  long int peer_id;
70  /// The boost::asio UDP endpoint used by the receiving peer
71  boost::asio::ip::udp::endpoint endpoint;
72  /// The ProtoBuf component ID
73  uint16_t component_id;
74  /// The ProtoBuf type ID
75  uint16_t msg_type;
76  /// The message itself
77  std::shared_ptr<google::protobuf::Message> msg;
78  };
79 
80  /// @return The head of the incoming ProtoBuf message queue (popped)
82 
83  long int peer_create(const std::string &host, int port);
84  long int peer_create_local(const std::string &host, int send_to_port, int recv_on_port);
85  long int peer_create_crypto(const std::string &host,
86  int port,
87  const std::string &crypto_key = "",
88  const std::string &cipher = "");
89  long int peer_create_local_crypto(const std::string &host,
90  int send_to_port,
91  int recv_on_port,
92  const std::string &crypto_key = "",
93  const std::string &cipher = "");
94  void peer_destroy(long int peer_id);
95 
96  /**
97  * Send a ProtoBuf message to the given peer
98  * @param peer_id The peer to send to
99  * @param msg The message
100  */
101  void send(long int peer_id, std::shared_ptr<google::protobuf::Message> msg);
102 
103  /**
104  * Deferred initialization of the pointer to the BlackboardManager
105  * @param bb_manager the BlackboardManager to use
106  */
107  void
109  {
110  bb_manager_ = bb_manager;
111  }
112 
113  /**
114  * Helper to give ProtoBuf converters access to the BlackBoard instance in use
115  * @return A ready-to-use pointer to the BlackBoard
116  */
119  {
120  return blackboard;
121  }
122 
123 protected:
124  virtual void init() override;
125 
126 private:
127  /** Get protobuf_comm peers.
128  * @return protobuf_comm peer */
129  const std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> &
130  peers() const
131  {
132  return peers_;
133  }
134 
135  /** Signal invoked for a message that has been sent via broadcast.
136  * @return signal
137  */
138  boost::signals2::signal<void(long, std::shared_ptr<google::protobuf::Message>)> &
139  signal_peer_sent()
140  {
141  return sig_peer_sent_;
142  }
143 
144  void
145  peer_setup_crypto(long int peer_id, const std::string &crypto_key, const std::string &cipher);
146 
147  void handle_peer_msg(long int peer_id,
148  boost::asio::ip::udp::endpoint & endpoint,
149  uint16_t component_id,
150  uint16_t msg_type,
151  std::shared_ptr<google::protobuf::Message> msg);
152  void handle_peer_recv_error(long int peer_id,
153  boost::asio::ip::udp::endpoint &endpoint,
154  std::string msg);
155  void handle_peer_send_error(long int peer_id, std::string msg);
156 
157  protobuf_comm::MessageRegister *message_register_;
158 
159  boost::signals2::signal<void(long int, std::shared_ptr<google::protobuf::Message>)>
160  sig_peer_sent_;
161 
162  fawkes::Mutex map_mutex_;
163  fawkes::Mutex msgq_mutex_;
164  long int next_client_id_;
165 
166  std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> peers_;
167 
168  BlackboardManager * bb_manager_;
169  std::queue<incoming_message> pb_queue_;
170 };
171 
172 } // namespace protoboard
173 
174 #endif // MESSAGE_HANDLER_H
Thread aspect to access to BlackBoard.
Definition: blackboard.h:34
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
The BlackBoard abstract class.
Definition: blackboard.h:46
Thread aspect to access configuration data.
Definition: configurable.h:33
Thread aspect to log output.
Definition: logging.h:33
Mutex mutual exclusion lock.
Definition: mutex.h:33
Thread class encapsulation of pthreads.
Definition: thread.h:46
The main thread that is woken each time a message arrives on any of the interfaces watched by a bb_if...
Receive incoming ProtoBuf messages and pass them on to the BlackboardManager for publication to the a...
incoming_message pb_queue_pop()
void peer_destroy(long int peer_id)
Disable peer.
long int peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
fawkes::BlackBoard * get_blackboard()
Helper to give ProtoBuf converters access to the BlackBoard instance in use.
virtual void init() override
Initialize the thread.
long int peer_create(const std::string &host, int port)
Enable protobuf peer.
long int peer_create_local(const std::string &host, int send_to_port, int recv_on_port)
Enable protobuf peer.
ProtobufThead()
Empty-initialization constructor.
void set_bb_manager(BlackboardManager *bb_manager)
Deferred initialization of the pointer to the BlackboardManager.
long int peer_create_local_crypto(const std::string &host, int send_to_port, int recv_on_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
void send(long int peer_id, std::shared_ptr< google::protobuf::Message > msg)
Send a ProtoBuf message to the given peer.
virtual ~ProtobufThead() override
Destructor.
Wrapper for a ProtoBuf message and its metadata.
std::shared_ptr< google::protobuf::Message > msg
The message itself.
long int peer_id
The ProtoBuf peer ID that received this message.
boost::asio::ip::udp::endpoint endpoint
The boost::asio UDP endpoint used by the receiving peer.
uint16_t component_id
The ProtoBuf component ID.
uint16_t msg_type
The ProtoBuf type ID.