//============================================================================= /** * @file Talker.cpp * * This test application tests a wide range of events that can be * demultiplexed using various ACE utilities. Events used include * ^C events, reading from STDIN, vanilla Win32 events, thread * exits, Reactor notifications, proactive reads, and proactive * writes. * * The proactive I/O events are demultiplexed by the ACE_Proactor. * The thread exits, notications, and vanilla Win32 events are * demultiplexed by the ACE_Reactor. To enable a single thread * to run all these events, the Proactor is integrated with the * Reactor. * * The test application prototypes a simple talk program. Two * instances of the application connect. Input from either console * is displayed on the others console also. Because of the evils * of Win32 STDIN, a separate thread is used to read from STDIN. * To test the Proactor and Reactor, I/O between the remote * processes is performed proactively and interactions between the * STDIN thread and the main thread are performed reactively. * * The following description of the test application is in two * parts. The participants section explains the main components * involved in the application. The collaboration section * describes how the partipants interact in response to the * multiple event types which occur. * * The Reactor test application has the following participants: * * . Reactor -- The Reactor demultiplexes Win32 "waitable" * events using WaitForMultipleObjects. * * . Proactor -- The proactor initiates and demultiplexes * overlapped I/O operations. The Proactor registers with the * Reactor so that a single-thread can demultiplex all * application events. * * . STDIN_Handler -- STDIN_Handler is an Active Object which reads * from STDIN and forwards the input to the Peer_Handler. This * runs in a separate thread to make the test more interesting. * However, STDIN is "waitable", so in general it can be waited on * by the ACE Reactor, thanks MicroSlush! * * . Peer_Handler -- The Peer_Handler connects to another instance * of test_reactor. It Proactively reads and writes data to the * peer. When the STDIN_Handler gives it messages, it fowards them * to the remote peer. When it receives messages from the remote * peer, it prints the output to the console. * * The collaborations of the participants are as follows: * * . Initialization * * Peer_Handler -- connects to the remote peer. It then begins * proactively reading from the remote connection. Note that it * will be notified by the Proactor when a read completes. It * also registers a notification strategy with message queue so * that it is notified when the STDIN_Handler posts a message * onto the queue. * * STDIN_Handler -- STDIN_Handler registers a signal handler for * SIGINT. This just captures the exception so that the kernel * doesn't kill our process; We want to exit gracefully. It also * creates an Exit_Hook object which registers the * STDIN_Handler's thread handle with the Reactor. The * Exit_Hook will get called back when the STDIN_Handler thread * exits. After registering these, it blocks reading from STDIN. * * Proactor -- is registered with the Reactor. * * The main thread of control waits in the Reactor. * * . STDIN events -- When the STDIN_Handler thread reads from * STDIN, it puts the message on Peer_Handler's message queue. It * then returns to reading from STDIN. * * . Message enqueue -- The Reactor thread wakes up and calls * Peer_Handler::handle_output. The Peer_Handler then tries to * dequeue a message from its message queue. If it can, the * message is Proactively sent to the remote peer. Note that the * Peer_Handler will be notified with this operation is complete. * The Peer_Handler then falls back into the Reactor event loop. * * . Send complete event -- When a proactive send is complete, the * Proactor is notified by the Reactor. The Proactor, in turn, * notifies the Peer_Handler. The Peer_Handler then checks for * more messages from the message queue. If there are any, it * tries to send them. If there are not, it returns to the * Reactor event loop. * * . Read complete event -- When a proactive read is complete (the * Peer_Handler initiated a proactive read when it connected to the * remote peer), the Proactor is notified by the Reactor. The * Proactor, in turn notifies the Peer_Handler. If the read was * successful the Peer_Handler just displays the received msg to * the console and reinvokes a proactive read from the network * connection. If the read failed (i.e. the remote peer exited), * the Peer_Handler sets a flag to end the event loop and returns. * This will cause the application to exit. * * . ^C events -- When the user types ^C at the console, the * STDIN_Handler's signal handler will be called. It does nothing, * but as a result of the signal, the STDIN_Handler thread will * exit. * * . STDIN_Handler thread exits -- The Exit_Hook will get called * back from the Reactor. Exit_Hook::handle_signal sets a flag * to end the event loop and returns. This will cause the * application to exit. * * To run example, start an instance of the test with an optional * local port argument (as the acceptor). Start the other instance * with -h and -p . Type in either the * client or server windows and your message should show up in the * other window. Control C to exit. * * @author Tim Harrison Irfan Pyarali */ //============================================================================= #include "ace/OS_main.h" #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) #include "ace/Reactor.h" #include "ace/Reactor_Notification_Strategy.h" #include "ace/WIN32_Proactor.h" #include "ace/Proactor.h" #include "ace/SOCK_Connector.h" #include "ace/SOCK_Acceptor.h" #include "ace/Get_Opt.h" #include "ace/Service_Config.h" #include "ace/Task.h" #include "ace/OS_NS_unistd.h" typedef ACE_Task MT_TASK; /** * @class Peer_Handler * * @brief Connect to a server. Receive messages from STDIN_Handler * and forward them to the server using proactive I/O. */ class Peer_Handler : public MT_TASK, public ACE_Handler { public: Peer_Handler (int argc, ACE_TCHAR *argv[]); ~Peer_Handler (void); //FUZZ: disable check_for_lack_ACE_OS /** * This method creates the network connection to the remote peer. * It does blocking connects and accepts depending on whether a * hostname was specified from the command line. *FUZZ: enable check_for_lack_ACE_OS */ int open (void * =0); /** * This method will be called when an asynchronous read completes on a stream. * The remote peer has sent us something. If it succeeded, print * out the message and reinitiate a read. Otherwise, fail. In both * cases, delete the message sent. */ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); /** * This method will be called when an asynchronous write completes on a strea_m. * One of our asynchronous writes to the remote peer has completed. * Make sure it succeeded and then delete the message. */ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); /** * Get the I/O handle used by this . This method will be * called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is * passed to . */ virtual ACE_HANDLE handle () const; /// Set the ACE_HANDLE value for this Handler. void handle (ACE_HANDLE); /// We've been removed from the Reactor. virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); /** * Called when output events should start. Note that this is * automatically invoked by the * . */ virtual int handle_output (ACE_HANDLE fd); private: /// Socket that we have connected to the server. ACE_SOCK_Stream stream_; /// The strategy object that the reactor uses to notify us when /// something is added to the queue. ACE_Reactor_Notification_Strategy strategy_; // = Remote peer info. /// Name of remote host. ACE_TCHAR *host_; /// Port number for remote host. u_short port_; /// Read stream ACE_Asynch_Read_Stream rd_stream_; /// Write stream ACE_Asynch_Write_Stream wr_stream_; /// Message Block for reading from the network ACE_Message_Block mb_; }; /** * @class STDIN_Handler * * @brief Active Object. Reads from STDIN and passes message blocks to * the peer handler. */ class STDIN_Handler : public ACE_Task { public: /// Initialization. STDIN_Handler (MT_TASK &ph); //FUZZ: disable check_for_lack_ACE_OS /// Activate object. virtual int open (void * = 0); /// Shut down. ///FUZZ: enable check_for_lack_ACE_OS virtual int close (u_long = 0); /// Thread runs here as an active object. int svc (void); int handle_close (ACE_HANDLE, ACE_Reactor_Mask); private: /// Handle a ^C. (Do nothing, this just illustrates how we can catch /// signals along with the other things). static void handler (int signum); /// Helper function to register with the Reactor for thread exit. void register_thread_exit_hook (void); /// The STDIN thread has exited. This means the user hit ^C. We can /// end the event loop. virtual int handle_signal (int index, siginfo_t *, ucontext_t *); /// Send all input to ph_. MT_TASK &ph_; /// Handle of our thread. ACE_HANDLE thr_handle_; }; Peer_Handler::Peer_Handler (int argc, ACE_TCHAR *argv[]) : strategy_ (ACE_Reactor::instance (), this, ACE_Event_Handler::WRITE_MASK), host_ (0), port_ (ACE_DEFAULT_SERVER_PORT), mb_ (BUFSIZ) { // This code sets up the message to notify us when a new message is // added to the queue. Actually, the queue notifies Reactor which // then notifies us. this->msg_queue ()->notification_strategy (&this->strategy_); ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("h:p:")); int c; while ((c = get_opt ()) != EOF) { switch (c) { case 'h': host_ = get_opt.opt_arg (); break; case 'p': port_ = ACE_OS::atoi (get_opt.opt_arg ()); break; } } } Peer_Handler::~Peer_Handler (void) { } // This method creates the network connection to the remote peer. It // does blocking connects and accepts depending on whether a hostname // was specified from the command line. int Peer_Handler::open (void *) { if (host_ != 0) // Connector { ACE_INET_Addr addr (port_, host_); ACE_SOCK_Connector connector; // Establish connection with server. if (connector.connect (stream_, addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n")); } else // Acceptor { ACE_SOCK_Acceptor acceptor; ACE_INET_Addr local_addr (port_); if ((acceptor.open (local_addr) == -1) || (acceptor.accept (this->stream_) == -1)) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n")); } int result = this->rd_stream_.open (*this); if (result != 0) return result; result = this->wr_stream_.open (*this); if (result != 0) return result; result = this->rd_stream_.read (this->mb_, this->mb_.size ()); return result; } // One of our asynchronous writes to the remote peer has completed. // Make sure it succeeded and then delete the message. void Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { if (result.bytes_transferred () <= 0) ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed", result.bytes_transferred ())); // This was allocated by the STDIN_Handler, queued, dequeued, passed // to the proactor, and now passed back to us. result.message_block ().release (); } // The remote peer has sent us something. If it succeeded, print // out the message and reinitiate a read. Otherwise, fail. In both // cases, delete the message sent. void Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { if (result.bytes_transferred () > 0 && this->mb_.length () > 0) { this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0'; // Print out the message received from the server. ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ())); } else { // If a read failed, we will assume it's because the remote peer // went away. We will end the event loop. Since we're in the // main thread, we don't need to do a notify. ACE_Reactor::end_event_loop(); return; } // Reset pointers this->mb_.wr_ptr (this->mb_.wr_ptr () - result.bytes_transferred ()); // Start off another read if (this->rd_stream_.read (this->mb_, this->mb_.size ()) == -1) ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler")); } // This is so the Proactor can get our handle. ACE_HANDLE Peer_Handler::handle () const { return this->stream_.get_handle (); } void Peer_Handler::handle (ACE_HANDLE handle) { this->stream_.set_handle (handle); } // We've been removed from the Reactor. int Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) { ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n")); return 0; } // New stuff added to the message queue. Try to dequeue a message. int Peer_Handler::handle_output (ACE_HANDLE) { ACE_Message_Block *mb = 0; ACE_Time_Value tv (ACE_Time_Value::zero); // Forward the message to the remote peer receiver. if (this->getq (mb, &tv) != -1) { if (this->wr_stream_.write (*mb, mb->length ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1); } return 0; } void STDIN_Handler::handler (int signum) { ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum)); } STDIN_Handler::STDIN_Handler (MT_TASK &ph) : ph_ (ph) { // Register for ^C from the console. We just need to catch the // exception so that the kernel doesn't kill our process. // Registering this signal handler just tells the kernel that we // know what we're doing; to leave us alone. ACE_OS::signal (SIGINT, (ACE_SignalHandler) STDIN_Handler::handler); }; // Activate object. int STDIN_Handler::open (void *) { if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1); return 0; } // Shut down. int STDIN_Handler::close (u_long) { ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n")); return 0; } // Thread runs here. int STDIN_Handler::svc (void) { this->register_thread_exit_hook (); for (;;) { ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); // Read from stdin into mb. int read_result = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ()); // If read succeeds, put mb to peer handler, else end the loop. if (read_result > 0) { mb->wr_ptr (read_result); // Note that this call will first enqueue mb onto the peer // handler's message queue, which will then turn around and // notify the Reactor via the Notification_Strategy. This // will subsequently signal the Peer_Handler, which will // react by calling back to its handle_output() method, // which dequeues the message and sends it to the peer // across the network. this->ph_.putq (mb); } else { mb->release (); break; } } // handle_signal will get called on the main proactor thread since // we just exited and the main thread is waiting on our thread exit. return 0; } // Register an exit hook with the reactor. void STDIN_Handler::register_thread_exit_hook (void) { // Get a real handle to our thread. ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_); // Register ourselves to get called back when our thread exits. if (ACE_Reactor::instance ()-> register_handler (this, this->thr_handle_) == -1) ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n")); } // The STDIN thread has exited. This means the user hit ^C. We can // end the event loop and delete ourself. int STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) { if (si != 0) { ACE_TEST_ASSERT (this->thr_handle_ == si->si_handle_); ACE_Reactor::end_event_loop (); } return 0; } int STDIN_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) { delete this; return 0; } int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { // Let the proactor know that it will be used with Reactor // Create specific proactor ACE_WIN32_Proactor win32_proactor (0, 1); // Get the interface proactor ACE_Proactor proactor (&win32_proactor); // Put it as the instance. ACE_Proactor::instance (&proactor); // Open handler for remote peer communications this will run from // the main thread. Peer_Handler peer_handler (argc, argv); if (peer_handler.open () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p open failed, errno = %d.\n", "peer_handler", errno), 0); // Open active object for reading from stdin. STDIN_Handler *stdin_handler = new STDIN_Handler (peer_handler); // Spawn thread. if (stdin_handler->open () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p open failed, errno = %d.\n", "stdin_handler", errno), 0); // Register proactor with Reactor so that we can demultiplex // "waitable" events and I/O operations from a single thread. if (ACE_Reactor::instance ()->register_handler (ACE_Proactor::instance ()->implementation ()) != 0) ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", argv[0]), -1); // Run main event demultiplexor. ACE_Reactor::run_event_loop (); // Remove proactor with Reactor. if (ACE_Reactor::instance ()->remove_handler (ACE_Proactor::instance ()->implementation (), ACE_Event_Handler::DONT_CALL) != 0) ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", argv[0]), -1); return 0; } #else /* !ACE_HAS_WIN32_OVERLAPPED_IO */ int ACE_TMAIN (int , ACE_TCHAR *[]) { return 0; } #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */