17 #ifndef _CONNECTION_HH_    18 #define _CONNECTION_HH_    21 #include <google/protobuf/message.h>    23 #include <boost/asio.hpp>    24 #include <boost/bind.hpp>    25 #include <boost/function.hpp>    26 #include <boost/thread.hpp>    27 #include <boost/tuple/tuple.hpp>    41 #define HEADER_LENGTH 8    56     class GZ_TRANSPORT_VISIBLE ConnectionReadTask : 
public tbb::task
    /// Constructor: receives the callback to run (_func) and the string
    /// payload (_data) that will be handed to it. The constructor body is
    /// not visible in this listing.
    62       public: ConnectionReadTask(
    63                   boost::function<
void (
const std::string &)> _func,
    64                   const std::string &_data)
    /// Task entry point: invokes the stored callback with the stored data.
    /// The value returned to the scheduler is not visible in this listing.
    72       public: tbb::task *execute()
    74                 this->func(this->data);
    /// Callback invoked by execute().
    79       private: boost::function<void (const std::string &)> func;
    /// Payload passed to the callback by execute().
    82       private: std::string data;
   101       public boost::enable_shared_from_this<Connection>
   /// Connect to a remote endpoint given as host string and port number.
   /// Returns bool; presumably true on success -- confirm in the .cc file.
   113       public: 
bool Connect(
const std::string &_host, 
unsigned int _port);
   /// Listen on _port. _acceptCB has the AcceptCallback signature
   /// (boost::function<void(const ConnectionPtr &)>).
   122       public: 
void Listen(
unsigned int _port, 
const AcceptCallback &_acceptCB);
   /// Start reading; _cb has the ReadCallback signature
   /// (boost::function<void(const std::string &_data)>).
   130       public: 
void StartRead(
const ReadCallback &_cb);
   /// Counterpart of StartRead(). Implementation not visible here.
   133       public: 
void StopRead();
   /// Shut the connection down. Implementation not visible here.
   136       public: 
void Shutdown();
   /// Query whether the connection is open. Does not modify the object
   /// (const member).
   140       public: 
bool IsOpen() 
const;
   /// Private close operation. Implementation not visible here.
   143       private: 
void Close();
   /// Cancel operation. NOTE(review): what exactly is cancelled cannot be
   /// told from this declaration -- confirm in the .cc file.
   146       public: 
void Cancel();
   /// Read into the caller-supplied string _data (output parameter).
   /// Returns bool; presumably true when data was read -- confirm.
   151       public: 
bool Read(std::string &_data);
   /// Queue _buffer for sending, with a completion-style callback _cb that
   /// takes a uint32_t and an associated _id. _force defaults to false.
   /// NOTE(review): when _cb fires and what _force overrides is not visible
   /// in this listing.
   160       public: 
void EnqueueMsg(
const std::string &_buffer,
   161                   boost::function<
void(uint32_t)> _cb, uint32_t _id,
   162                   bool _force = 
false);
   /// Overload of EnqueueMsg without a callback. _force defaults to false.
   168       public: 
void EnqueueMsg(
const std::string &_buffer, 
bool _force = 
false);
   /// Local URI as a string (const accessor).
   172       public: std::string GetLocalURI() 
const;
   /// Remote URI as a string (const accessor).
   176       public: std::string GetRemoteURI() 
const;
   /// Local address as a string (const accessor).
   180       public: std::string GetLocalAddress() 
const;
   /// Local port number (const accessor).
   184       public: 
unsigned int GetLocalPort() 
const;
   /// Remote address as a string (const accessor).
   188       public: std::string GetRemoteAddress() 
const;
   /// Remote port number (const accessor).
   192       public: 
unsigned int GetRemotePort() 
const;
   /// Remote hostname as a string (const accessor).
   196       public: std::string GetRemoteHostname() 
const;
   /// Local hostname as a string. Static: needs no Connection instance.
   200       public: 
static std::string GetLocalHostname();
   /// Templated asynchronous read. Handler is the type of the callback
   /// (_handler) that is forwarded, wrapped in a boost::tuple, to
   /// OnReadHeader. The function signature line and several body lines are
   /// missing from this listing.
   204       public: 
template<
typename Handler>
   // Hold socketMutex for the rest of the scope.
   207                 boost::mutex::scoped_lock lock(this->socketMutex);
   // Error log for a closed socket; the guarding condition is not visible
   // in this listing.
   210                   gzerr << 
"AsyncRead on a closed socket\n";
   // Pointer to the OnReadHeader<Handler> member function; spelling out the
   // full pointer type selects the template instantiation explicitly.
   214                 void (
Connection::*f)(
const boost::system::error_code &,
   215                     boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
   // Start an async read from the socket into inboundHeader. The completion
   // handler receives the error code and the tuple-wrapped _handler (the
   // bind expression itself is on a line missing from this listing).
   218                 boost::asio::async_read(*this->socket,
   219                     boost::asio::buffer(this->inboundHeader),
   221                                 boost::asio::placeholders::error,
   222                                 boost::make_tuple(_handler)));
   /// Completion handler for the header read: parses the header and, when
   /// it announces a non-zero payload size, starts the asynchronous read of
   /// the payload with OnReadData as its completion handler.
   /// _e is the error code of the finished read; _handler is the
   /// tuple-wrapped user callback. Braces, else-branches and some
   /// statements are missing from this listing.
   232       private: 
template<
typename Handler>
   233                void OnReadHeader(
const boost::system::error_code &_e,
   234                                  boost::tuple<Handler> _handler)
   // Mark the connection closed when the error text is "End of file".
   // NOTE(review): comparing the message text is fragile; comparing the
   // error code against boost::asio::error::eof would be the robust form.
   238                   if (_e.message() == 
"End of file")
   239                     this->isOpen = 
false;
   243                   std::size_t inboundData_size = 0;
   // Copy the received header bytes into a string, then empty the buffer.
   // NOTE(review): &inboundHeader[0] is undefined if the vector is empty;
   // whether that can happen here is not visible in this listing.
   244                   std::string header(&this->inboundHeader[0],
   245                                       this->inboundHeader.size());
   246                   this->inboundHeader.clear();
   // ParseHeader yields the size used below for the payload buffer.
   248                   inboundData_size = this->ParseHeader(header);
   250                  if (inboundData_size > 0)
   // Size the payload buffer to exactly the announced length.
   253                     this->inboundData.resize(inboundData_size);
   // Pointer to the OnReadData<Handler> member function.
   255                     void (
Connection::*f)(
const boost::system::error_code &e,
   256                         boost::tuple<Handler>) =
   257                       &Connection::OnReadData<Handler>;
   // Start the async read of the payload into inboundData.
   259                     boost::asio::async_read(*this->socket,
   260                         boost::asio::buffer(this->inboundData),
   262                                     boost::asio::placeholders::error,
   // Empty-header path (presumably the else-branch of the size check):
   // log the problem and invoke the user callback with an empty string.
   267                     gzerr << 
"Header is empty\n";
   268                     boost::get<0>(_handler)(
"");
   /// Completion handler for the payload read: copies the received bytes
   /// into a string and hands them to the user callback via a
   /// ConnectionReadTask enqueued with TBB.
   /// _e is the error code of the finished read; _handler is the
   /// tuple-wrapped user callback. Braces and guard conditions are missing
   /// from this listing.
   292       private: 
template<
typename Handler>
   293                void OnReadData(
const boost::system::error_code &_e,
   294                               boost::tuple<Handler> _handler)
   // Mark the connection closed when the error text is "End of file".
   // NOTE(review): comparing the message text is fragile; comparing the
   // error code against boost::asio::error::eof would be the robust form.
   298                   if (_e.message() == 
"End of file")
   299                     this->isOpen = 
false;
   // Copy the payload out of the buffer, then empty the buffer.
   // NOTE(review): &inboundData[0] is undefined if the vector is empty, and
   // the log message below suggests empty data is possible -- confirm
   // that a guard exists on the lines missing from this listing.
   303                 std::string data(&this->inboundData[0],
   304                                   this->inboundData.size());
   305                 this->inboundData.clear();
   // Error log for empty data; its guarding condition is not visible here.
   308                   gzerr << 
"OnReadData got empty data!!!\n";
   // Allocate a root TBB task holding the user callback and a copy of the
   // data, and enqueue it; the callback is invoked from the task's
   // execute() rather than directly from this handler.
   312                   ConnectionReadTask *task = 
new(tbb::task::allocate_root())
   313                         ConnectionReadTask(boost::get<0>(_handler), data);
   314                   tbb::task::enqueue(*task);
   // Inline body of the shutdown-subscription function: forwards
   // _subscriber to this->shutdown.Connect and returns its result.
   // The signature line is missing from this listing.
   326               { 
return this->
shutdown.Connect(_subscriber); }
   // Inline body of the matching unsubscribe function: forwards
   // _subscriber to this->shutdown.Disconnect.
   332               {this->
shutdown.Disconnect(_subscriber);}
   /// Process the write queue. _blocking defaults to false.
   /// NOTE(review): presumably selects a synchronous write -- confirm in
   /// the .cc file.
   335       public: 
void ProcessWriteQueue(
bool _blocking = 
false);
   /// Identifier of this connection (const accessor).
   339       public: 
unsigned int GetId() 
const;
   /// Check the string _ip and return a bool verdict. Static: needs no
   /// Connection instance. The validation rules are not visible here.
   344       public: 
static bool ValidateIP(
const std::string &_ip);
   /// IP white list as a string (const accessor).
   349       public: std::string GetIPWhiteList() 
const;
   /// Handler taking the error code of a write. Implementation not
   /// visible here.
   354       private: 
void OnWrite(
const boost::system::error_code &_e);
   /// Handler taking the error code of an accept. Implementation not
   /// visible here.
   358       private: 
void OnAccept(
const boost::system::error_code &_e);
   /// Parse a header string and return a size; OnReadHeader uses the
   /// result as the payload length to read.
   362       private: std::size_t ParseHeader(
const std::string &_header);
   /// Read loop driven with a ReadCallback. Implementation not visible
   /// here.
   365       private: 
void ReadLoop(
const ReadCallback &_cb);
   /// Local TCP endpoint. Static: needs no Connection instance.
   369       private: 
static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
   /// Remote TCP endpoint of this connection (const accessor).
   373       private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() 
const;
   /// Hostname string for the endpoint _ep. Static: needs no Connection
   /// instance.
   377       private: 
static std::string GetHostname(
   378                    boost::asio::ip::tcp::endpoint _ep);
   /// Handler taking the error code of a connect attempt and a resolver
   /// iterator. Implementation not visible here.
   383       private: 
void OnConnect(
const boost::system::error_code &_error,
   384                   boost::asio::ip::tcp::resolver::iterator _endPointIter);
   /// TCP socket (raw pointer); dereferenced by the async_read calls in
   /// this class. Ownership is not visible in this listing.
   387       private: boost::asio::ip::tcp::socket *socket;
   /// TCP acceptor (raw pointer). Ownership is not visible in this listing.
   390       private: boost::asio::ip::tcp::acceptor *acceptor;
   /// Queue of strings; presumably the outgoing messages -- confirm.
   393       private: std::deque<std::string> writeQueue;
   // Tail of a declaration whose first line is missing from this listing;
   // the visible element type pairs a void(uint32_t) callback with a
   // uint32_t value.
   398                std::pair<boost::function<void(uint32_t)>, uint32_t> > callbacks;
   /// Mutex; presumably paired with connectCondition -- confirm.
   401       private: boost::mutex connectMutex;
   /// Recursive mutex; presumably guards the write path -- confirm.
   404       private: boost::recursive_mutex writeMutex;
   /// Recursive mutex; presumably guards the read path -- confirm.
   407       private: boost::recursive_mutex readMutex;
   /// Mutex locked by AsyncRead before touching the socket; mutable so it
   /// can also be locked from const member functions.
   410       private: 
mutable boost::mutex socketMutex;
   /// Condition variable; presumably signals connect completion -- confirm.
   413       private: boost::condition_variable connectCondition;
   /// Stored accept callback (AcceptCallback type).
   416       private: AcceptCallback acceptCB;
   /// Buffer the header is read into; consumed and cleared by OnReadHeader.
   419       private: std::vector<char> inboundHeader;
   /// Buffer the payload is read into; resized by OnReadHeader, consumed
   /// and cleared by OnReadData.
   422       private: std::vector<char> inboundData;
   /// Flag; presumably tells the read loop to stop -- confirm.
   425       private: 
bool readQuit;
   /// Numeric identifier of this connection.
   428       private: 
unsigned int id;
   /// Counter shared by all connections (static); presumably the source of
   /// new id values -- confirm.
   431       private: 
static unsigned int idCounter;
   /// Connection pointer; presumably the connection being accepted while
   /// listening -- confirm.
   434       private: ConnectionPtr acceptConn;
   /// Counter related to writes; exact meaning not visible in this listing.
   443       private: 
unsigned int writeCount;
   /// Stored local URI string.
   446       private: std::string localURI;
   /// Stored local address string.
   449       private: std::string localAddress;
   /// Stored remote URI string.
   452       private: std::string remoteURI;
   /// Stored remote address string.
   455       private: std::string remoteAddress;
   /// Flag; presumably records a failed connect attempt -- confirm.
   458       private: 
bool connectError;
   /// IP white list held as a single string; format not visible here.
   461       private: std::string ipWhiteList;
   /// Raw char buffer pointer; allocation and ownership are not visible in
   /// this listing.
   464       private: 
char *headerBuffer;
   /// Flag; presumably prevents logging a dropped-message warning more than
   /// once -- confirm.
   467       private: 
bool dropMsgLogged;
   /// Index related to the callbacks container; exact meaning not visible
   /// in this listing.
   471       private: 
unsigned int callbackIndex;
   /// Open flag; set to false by OnReadHeader and OnReadData when the read
   /// error text is "End of file".
   474       private: 
bool isOpen;
 boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback. 
Definition: Connection.hh:116
 
Forward declarations for the common classes. 
Definition: Animation.hh:33
 
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback. 
Definition: Connection.hh:125
 
void DisconnectShutdown(event::ConnectionPtr _subscriber)
Unregister a function to be called when the connection is shut down. 
Definition: Connection.hh:331
 
#define HEADER_LENGTH
Definition: Connection.hh:41
 
#define gzerr
Output an error message. 
Definition: Console.hh:50
 
Manages boost::asio IO. 
Definition: IOManager.hh:33
 
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down. 
Definition: Connection.hh:324
 
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:50
 
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation. 
 
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:153
 
#define NULL
Definition: CommonTypes.hh:31
 
void AsyncRead(Handler _handler)
Perform an asynchronous read. \param[in] _handler Callback to invoke on received data. 
Definition: Connection.hh:205
 
bool is_stopped()
Is the transport system stopped? 
 
Single TCP/IP connection manager. 
Definition: Connection.hh:100