22 #include <boost/bind.hpp>    23 #include <boost/enable_shared_from_this.hpp>    39     class GZ_TRANSPORT_VISIBLE PublishTask : 
public tbb::task
    45                   const google::protobuf::Message &_message)
    48         this->msg = _message.New();
    49         this->msg->CopyFrom(_message);
    54       public: tbb::task *execute()
    56                 this->pub->WaitForConnection();
    57                 this->pub->Publish(*this->msg, 
true);
    58                 this->pub->SendMessage();
    68       private: google::protobuf::Message *msg;
    78     class GZ_TRANSPORT_VISIBLE 
Node :
    79       public boost::enable_shared_from_this<Node>
    85       public: 
virtual ~
Node();
    91       public: 
void Init(
const std::string &_space =
"");
    98       public: std::string GetTopicNamespace() 
const;
   103       public: std::string DecodeTopicName(
const std::string &_topic);
   108       public: std::string EncodeTopicName(
const std::string &_topic);
   112       public: 
unsigned int GetId() 
const;
   116       public: 
void ProcessPublishers();
   119       public: 
void ProcessIncoming();
   124       public: 
bool HasLatchedSubscriber(
const std::string &_topic) 
const;
   133       public: 
template<
typename M>
   135                   const google::protobuf::Message &_message)
   138                 PublishTask *task = 
new(tbb::task::allocate_root())
   139                   PublishTask(pub, _message);
   141                 tbb::task::enqueue(*task);
   152       public: 
template<
typename M>
   154                                         unsigned int _queueLimit = 1000,
   157         std::string decodedTopic = this->DecodeTopicName(_topic);
   160               decodedTopic, _queueLimit, _hzRate);
   162         boost::mutex::scoped_lock lock(this->publisherMutex);
   163         publisher->SetNode(shared_from_this());
   164         this->publishers.push_back(publisher);
   176       public: 
template<
typename M, 
typename T>
   178           void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
   179           bool _latching = 
false)
   182         std::string decodedTopic = this->DecodeTopicName(_topic);
   183         ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
   186           boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
   194         result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
   205       public: 
template<
typename M>
   207           void(*_fp)(
const boost::shared_ptr<M const> &),
   208                      bool _latching = 
false)
   211         std::string decodedTopic = this->DecodeTopicName(_topic);
   212         ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
   215           boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
   216           this->callbacks[decodedTopic].push_back(
   223         result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
   237           void(T::*_fp)(
const std::string &), T *_obj,
   238           bool _latching = 
false)
   241         std::string decodedTopic = this->DecodeTopicName(_topic);
   242         ops.
Init(decodedTopic, shared_from_this(), _latching);
   245           boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
   253         result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
   266           void(*_fp)(
const std::string &), 
bool _latching = 
false)
   269         std::string decodedTopic = this->DecodeTopicName(_topic);
   270         ops.
Init(decodedTopic, shared_from_this(), _latching);
   273           boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
   274           this->callbacks[decodedTopic].push_back(
   281         result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
   290       public: 
bool HandleData(
const std::string &_topic,
   291                               const std::string &_msg);
   297       public: 
bool HandleMessage(
const std::string &_topic, 
MessagePtr _msg);
   305       public: 
void InsertLatchedMsg(
const std::string &_topic,
   306                                     const std::string &_msg);
   314       public: 
void InsertLatchedMsg(
const std::string &_topic,
   320       public: std::string GetMsgType(
const std::string &_topic) 
const;
   327       public: 
void RemoveCallback(
const std::string &_topic, 
unsigned int _id);
   329       private: std::string topicNamespace;
   330       private: std::vector<PublisherPtr> publishers;
   331       private: std::vector<PublisherPtr>::iterator publishersIter;
   332       private: 
static unsigned int idCounter;
   333       private: 
unsigned int id;
   335       private: 
typedef std::list<CallbackHelperPtr> Callback_L;
   336       private: 
typedef std::map<std::string, Callback_L> Callback_M;
   337       private: Callback_M callbacks;
   338       private: std::map<std::string, std::list<std::string> > incomingMsgs;
   341       private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
   343       private: boost::mutex publisherMutex;
   344       private: boost::mutex publisherDeleteMutex;
   345       private: boost::recursive_mutex incomingMutex;
   349       private: boost::recursive_mutex processIncomingMutex;
   351       private: 
bool initialized;
 Options for a subscription. 
Definition: SubscribeOptions.hh:35
 
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Adverise a topic. 
Definition: Node.hh:153
 
Forward declarations for the common classes. 
Definition: Animation.hh:33
 
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback. 
Definition: Node.hh:236
 
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
 
Forward declarations for transport. 
 
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
 
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
 
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper 
Definition: CallbackHelper.hh:105
 
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback. 
Definition: Node.hh:177
 
Callback helper Template. 
Definition: CallbackHelper.hh:111
 
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback. 
Definition: Node.hh:206
 
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:78
 
#define NULL
Definition: CommonTypes.hh:31
 
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback. 
Definition: Node.hh:265
 
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message. 
 
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message. 
Definition: Node.hh:134
 
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:177
 
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options. 
Definition: SubscribeOptions.hh:48
 
static TopicManager * Instance()
Get an instance of the singleton. 
Definition: SingletonT.hh:36