Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy   
 

FIX::ThreadedSocketConnection Class Reference

Encapsulates a socket file descriptor (multi-threaded). More...

#include <ThreadedSocketConnection.h>

Inheritance diagram for FIX::ThreadedSocketConnection:

Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketConnection:

Collaboration graph
[legend]
List of all members.

Public Types

typedef std::set< SessionID > Sessions

Public Member Functions

 ThreadedSocketConnection (int s, Sessions sessions, Application &application)
 ThreadedSocketConnection (const SessionID &, int s, Application &)
virtual ~ThreadedSocketConnection ()
Session * getSession () const
int getSocket () const
bool read ()

Private Member Functions

bool readMessage (std::string &msg) throw ( SocketRecvFailed )
void processStream ()
bool send (const std::string &)
void disconnect ()
bool setSession (const std::string &msg)

Private Attributes

int m_socket
char m_buffer [BUFSIZ]
Application & m_application
Parser m_parser
Sessions m_sessions
Session * m_pSession
bool m_disconnect
fd_set m_fds

Detailed Description

Encapsulates a socket file descriptor (multi-threaded).

Definition at line 43 of file ThreadedSocketConnection.h.


Member Typedef Documentation

typedef std::set<SessionID> FIX::ThreadedSocketConnection::Sessions
 

Definition at line 46 of file ThreadedSocketConnection.h.


Constructor & Destructor Documentation

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( int s, Sessions sessions, Application & application )
 

Definition at line 35 of file ThreadedSocketConnection.cpp.

00036 : m_socket( s ), m_application( application ),
00037   m_sessions( sessions ), m_pSession( 0 ) 
00038 {
00039   FD_ZERO( &m_fds );
00040   FD_SET( m_socket, &m_fds );
00041 }

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( const SessionID &, int s, Application & )
 

Definition at line 43 of file ThreadedSocketConnection.cpp.

References FIX::Session::setResponder().

00046 : m_socket( s ), m_application( application ),
00047   m_pSession( Session::lookupSession( sessionID ) )
00048 {
00049   FD_ZERO( &m_fds );
00050   FD_SET( m_socket, &m_fds );
00051   if ( m_pSession ) m_pSession->setResponder( this );
00052 }

FIX::ThreadedSocketConnection::~ThreadedSocketConnection ( )  [virtual]
 

Definition at line 54 of file ThreadedSocketConnection.cpp.

References FIX::Session::getSessionID(), and FIX::Session::setResponder().

00055 {
00056   if ( m_pSession )
00057   {
00058     m_pSession->setResponder( 0 );
00059     Session::unregisterSession( m_pSession->getSessionID() );
00060   }
00061 }


Member Function Documentation

void FIX::ThreadedSocketConnection::disconnect ( )  [private, virtual]
 

Implements FIX::Responder.

Definition at line 69 of file ThreadedSocketConnection.cpp.

References m_disconnect, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_close().

Referenced by processStream(), and read().

00070 { QF_STACK_PUSH(ThreadedSocketConnection::disconnect)
00071   
00072   m_disconnect = true;
00073   socket_close( m_socket );
00074 
00075   QF_STACK_POP
00076 }

Session* FIX::ThreadedSocketConnection::getSession ( )  const [inline]
 

Definition at line 52 of file ThreadedSocketConnection.h.

Referenced by FIX::ThreadedSocketInitiator::socketThread().

00052 { return m_pSession; }

int FIX::ThreadedSocketConnection::getSocket ( )  const [inline]
 

Definition at line 53 of file ThreadedSocketConnection.h.

Referenced by FIX::ThreadedSocketAcceptor::socketConnectionThread(), and FIX::ThreadedSocketInitiator::socketThread().

00053 { return m_socket; }

void FIX::ThreadedSocketConnection::processStream ( )  [private]
 

Definition at line 143 of file ThreadedSocketConnection.cpp.

References disconnect(), FIX::Session::isLoggedOn(), FIX::Session::next(), QF_STACK_POP, QF_STACK_PUSH, readMessage(), and setSession().

Referenced by read().

00144 { QF_STACK_PUSH(ThreadedSocketConnection::processStream)
00145 
00146   std::string msg;
00147   while( readMessage(msg) )
00148   {
00149     if ( !m_pSession )
00150     {
00151       if ( !setSession( msg ) )
00152       { disconnect(); continue; }
00153     }
00154     try
00155     {
00156       m_pSession->next( msg );
00157     }
00158     catch( InvalidMessage& )
00159     {
00160       if( !m_pSession->isLoggedOn() )
00161       {
00162         disconnect();
00163         return;
00164       }
00165     }
00166   }
00167 
00168   QF_STACK_POP
00169 }

bool FIX::ThreadedSocketConnection::read ( )
 

Definition at line 78 of file ThreadedSocketConnection.cpp.

References FIX::Parser::addToStream(), disconnect(), FIX::Session::disconnect(), FIX::Session::getLog(), m_disconnect, FIX::Session::next(), FIX::Log::onEvent(), processStream(), QF_STACK_POP, and QF_STACK_PUSH.

Referenced by FIX::ThreadedSocketAcceptor::socketConnectionThread(), and FIX::ThreadedSocketInitiator::socketThread().

00079 { QF_STACK_PUSH(ThreadedSocketConnection::read)
00080 
00081   struct timeval timeout = { 1, 0 };
00082   fd_set readset = m_fds;
00083 
00084   try
00085   {
00086     // Wait for input (1 second timeout)
00087     int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
00088 
00089     if( result > 0 ) // Something to read
00090     {
00091       // We can read without blocking
00092       int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
00093       if ( size <= 0 ) { throw SocketRecvFailed( result ); }
00094       m_parser.addToStream( m_buffer, size );
00095     }
00096     else if( result == 0 && m_pSession ) // Timeout
00097     {
00098       m_pSession->next();
00099     }
00100     else if( result < 0 ) // Error
00101     {
00102       throw SocketRecvFailed( result );
00103     }
00104 
00105     processStream();
00106     return true;
00107   }
00108   catch ( SocketRecvFailed& e )
00109   {
00110     if( m_disconnect )
00111           return false;
00112 
00113     if( m_pSession )
00114     {
00115       m_pSession->getLog()->onEvent( e.what() );
00116       m_pSession->disconnect();
00117     }
00118     else
00119     {
00120       disconnect();
00121     }
00122 
00123     return false;
00124   }
00125 
00126   QF_STACK_POP
00127 }

bool FIX::ThreadedSocketConnection::readMessage ( std::string & msg )  throw ( SocketRecvFailed ) [private]
 

Definition at line 129 of file ThreadedSocketConnection.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

Referenced by processStream().

00131 { QF_STACK_PUSH(ThreadedSocketConnection::readMessage)
00132 
00133   try
00134   {
00135     return m_parser.readFixMessage( msg );
00136   }
00137   catch ( MessageParseError& ) {}
00138   return true;
00139 
00140   QF_STACK_POP
00141 }

bool FIX::ThreadedSocketConnection::send ( const std::string & )  [private, virtual]
 

Implements FIX::Responder.

Definition at line 63 of file ThreadedSocketConnection.cpp.

References QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_send().

00064 { QF_STACK_PUSH(ThreadedSocketConnection::send)
00065   return socket_send( m_socket, msg.c_str(), msg.length() ) >= 0;
00066   QF_STACK_POP
00067 }

bool FIX::ThreadedSocketConnection::setSession ( const std::string & msg )  [private]
 

Definition at line 171 of file ThreadedSocketConnection.cpp.

References FIX::Session::getSessionID(), FIX::process_sleep(), QF_STACK_POP, QF_STACK_PUSH, and FIX::Session::setResponder().

Referenced by processStream().

00172 { QF_STACK_PUSH(ThreadedSocketConnection::setSession)
00173 
00174   m_pSession = Session::lookupSession( msg, true );
00175   if ( !m_pSession ) return false;
00176   SessionID sessionID = m_pSession->getSessionID();
00177   m_pSession = 0;
00178 
00179   // see if the session frees up within 5 seconds
00180   for( int i = 1; i <= 5; i++ )
00181   {
00182     if( !Session::isSessionRegistered( sessionID ) )
00183       m_pSession = Session::registerSession( sessionID );
00184     if( m_pSession ) break;
00185     process_sleep( 1 );
00186   }
00187 
00188   if ( !m_pSession ) 
00189     return false;
00190   if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
00191     return false;
00192 
00193   m_pSession->setResponder( this );
00194   return true;
00195 
00196   QF_STACK_POP
00197 }


Member Data Documentation

Application& FIX::ThreadedSocketConnection::m_application [private]
 

Definition at line 66 of file ThreadedSocketConnection.h.

char FIX::ThreadedSocketConnection::m_buffer[BUFSIZ] [private]
 

Definition at line 64 of file ThreadedSocketConnection.h.

bool FIX::ThreadedSocketConnection::m_disconnect [private]
 

Definition at line 70 of file ThreadedSocketConnection.h.

Referenced by disconnect(), and read().

fd_set FIX::ThreadedSocketConnection::m_fds [private]
 

Definition at line 71 of file ThreadedSocketConnection.h.

Parser FIX::ThreadedSocketConnection::m_parser [private]
 

Definition at line 67 of file ThreadedSocketConnection.h.

Session* FIX::ThreadedSocketConnection::m_pSession [private]
 

Definition at line 69 of file ThreadedSocketConnection.h.

Sessions FIX::ThreadedSocketConnection::m_sessions [private]
 

Definition at line 68 of file ThreadedSocketConnection.h.

int FIX::ThreadedSocketConnection::m_socket [private]
 

Definition at line 63 of file ThreadedSocketConnection.h.


The documentation for this class was generated from the following files: ThreadedSocketConnection.h and ThreadedSocketConnection.cpp.
Generated on Mon Jul 24 19:36:55 2006 for QuickFIX by doxygen 1.3.6-20040222 written by Dimitri van Heesch, © 1997-2001