Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy   
 

FIX::ThreadedSocketAcceptor Class Reference

Threaded Socket implementation of Acceptor. More...

#include <ThreadedSocketAcceptor.h>

Inheritance diagram for FIX::ThreadedSocketAcceptor:

Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketAcceptor:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
virtual ~ThreadedSocketAcceptor ()

Private Types

typedef std::set< int > Sockets
typedef std::set< SessionIDSessions
typedef std::map< int, SessionsPortToSessions
typedef std::map< int, int > SocketToPort
typedef std::map< int, int > SocketToThread

Private Member Functions

bool readSettings (const SessionSettings &)
void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure acceptor.

void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize acceptor.

void onStart ()
 Implemented to start listening for connections.

bool onPoll ()
 Implemented to connect and poll for events.

void onStop ()
 Implemented to stop a running acceptor.

void addThread (int s, int t)
void removeThread (int s)

Static Private Member Functions

THREAD_PROC socketAcceptorThread (void *p)
THREAD_PROC socketConnectionThread (void *p)

Private Attributes

Sockets m_sockets
PortToSessions m_portToSessions
SocketToPort m_socketToPort
SocketToThread m_threads
Mutex m_mutex

Detailed Description

Threaded Socket implementation of Acceptor.

Definition at line 36 of file ThreadedSocketAcceptor.h.


Member Typedef Documentation

typedef std::map< int, Sessions > FIX::ThreadedSocketAcceptor::PortToSessions [private]
 

Definition at line 73 of file ThreadedSocketAcceptor.h.

typedef std::set< SessionID > FIX::ThreadedSocketAcceptor::Sessions [private]
 

Reimplemented from FIX::Acceptor.

Definition at line 72 of file ThreadedSocketAcceptor.h.

typedef std::set< int > FIX::ThreadedSocketAcceptor::Sockets [private]
 

Definition at line 71 of file ThreadedSocketAcceptor.h.

typedef std::map< int, int > FIX::ThreadedSocketAcceptor::SocketToPort [private]
 

Definition at line 74 of file ThreadedSocketAcceptor.h.

typedef std::map< int, int > FIX::ThreadedSocketAcceptor::SocketToThread [private]
 

Definition at line 75 of file ThreadedSocketAcceptor.h.

Referenced by onStop().


Constructor & Destructor Documentation

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor Application ,
MessageStoreFactory ,
const SessionSettings
throw ( ConfigError )
 

Definition at line 33 of file ThreadedSocketAcceptor.cpp.

References FIX::socket_init().

00037 : Acceptor( application, factory, settings )
00038 { socket_init(); }

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor Application ,
MessageStoreFactory ,
const SessionSettings ,
LogFactory
throw ( ConfigError )
 

Definition at line 40 of file ThreadedSocketAcceptor.cpp.

References FIX::socket_init().

00045 : Acceptor( application, factory, settings, logFactory )
00046 { socket_init(); }

FIX::ThreadedSocketAcceptor::~ThreadedSocketAcceptor  )  [virtual]
 

Definition at line 48 of file ThreadedSocketAcceptor.cpp.

References FIX::socket_term().

00049 { socket_term(); }


Member Function Documentation

void FIX::ThreadedSocketAcceptor::addThread int  s,
int  t
[private]
 

Definition at line 167 of file ThreadedSocketAcceptor.cpp.

References m_threads, QF_STACK_POP, and QF_STACK_PUSH.

Referenced by onStart(), and socketAcceptorThread().

00168 { QF_STACK_PUSH(ThreadedSocketAcceptor::addThread)
00169 
00170   Locker l(m_mutex);
00171 
00172   m_threads[ s ] = t;
00173 
00174   QF_STACK_POP
00175 }

void FIX::ThreadedSocketAcceptor::onConfigure const SessionSettings s  )  throw ( ConfigError ) [private, virtual]
 

Implemented to configure acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 51 of file ThreadedSocketAcceptor.cpp.

References FIX::Dictionary::getBool(), FIX::Dictionary::getLong(), FIX::Dictionary::has(), QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_ACCEPT_PORT, FIX::SOCKET_NODELAY, and FIX::SOCKET_REUSE_ADDRESS.

00053 { QF_STACK_PUSH(ThreadedSocketAcceptor::onConfigure)
00054 
00055   std::set<SessionID> sessions = s.getSessions();
00056   std::set<SessionID>::iterator i;
00057   for( i = sessions.begin(); i != sessions.end(); ++i )
00058   {
00059     const Dictionary& settings = s.get( *i );
00060     settings.getLong( SOCKET_ACCEPT_PORT );
00061     if( settings.has(SOCKET_REUSE_ADDRESS) )
00062       settings.getBool( SOCKET_REUSE_ADDRESS );
00063     if( settings.has(SOCKET_NODELAY) )
00064       settings.getBool( SOCKET_NODELAY );
00065   }
00066 
00067   QF_STACK_POP
00068 }

void FIX::ThreadedSocketAcceptor::onInitialize const SessionSettings s  )  throw ( RuntimeError ) [private, virtual]
 

Implemented to initialize acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 70 of file ThreadedSocketAcceptor.cpp.

References FIX::Dictionary::getLong(), FIX::Dictionary::has(), QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_ACCEPT_PORT, FIX::socket_close(), FIX::socket_createAcceptor(), FIX::SOCKET_NODELAY, FIX::SOCKET_REUSE_ADDRESS, and FIX::socket_setsockopt().

00072 { QF_STACK_PUSH(ThreadedSocketAcceptor::onInitialize)
00073 
00074   short port = 0;
00075   bool reuseAddress = false;
00076   bool noDelay = false;
00077 
00078   std::set<int> ports;
00079 
00080   std::set<SessionID> sessions = s.getSessions();
00081   std::set<SessionID>::iterator i = sessions.begin();
00082   for( ; i != sessions.end(); ++i )
00083   {
00084     Dictionary settings = s.get( *i );
00085     port = (short)settings.getLong( SOCKET_ACCEPT_PORT );
00086 
00087     m_portToSessions[port].insert( *i );
00088 
00089     if( ports.find(port) != ports.end() )
00090       continue;
00091     ports.insert( port );
00092 
00093     if( settings.has( SOCKET_REUSE_ADDRESS ) )
00094       reuseAddress = s.get().getBool( SOCKET_REUSE_ADDRESS );
00095     if( settings.has( SOCKET_NODELAY ) )
00096       noDelay = s.get().getBool( SOCKET_NODELAY );
00097 
00098     int socket = socket_createAcceptor( port, reuseAddress );
00099     if( socket < 0 )
00100     {
00101       socket_close( socket );
00102       throw RuntimeError( "Unable to create, bind, or listen to port " + IntConvertor::convert( (unsigned short)port ) );
00103     }
00104     if( noDelay )
00105       socket_setsockopt( socket, TCP_NODELAY );
00106 
00107     m_socketToPort[socket] = port;
00108     m_sockets.insert( socket );
00109   }    
00110 
00111   QF_STACK_POP
00112 }

bool FIX::ThreadedSocketAcceptor::onPoll  )  [private, virtual]
 

Implemented to connect and poll for events.

Implements FIX::Acceptor.

Definition at line 131 of file ThreadedSocketAcceptor.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

00132 { QF_STACK_PUSH(ThreadedSocketAcceptor::onPoll)
00133 
00134   return false;
00135 
00136   QF_STACK_POP
00137 }

void FIX::ThreadedSocketAcceptor::onStart  )  [private, virtual]
 

Implemented to start listening for connections.

Implements FIX::Acceptor.

Definition at line 114 of file ThreadedSocketAcceptor.cpp.

References addThread(), m_socketToPort, QF_STACK_POP, QF_STACK_PUSH, socketAcceptorThread(), and FIX::thread_spawn().

00115 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStart)
00116 
00117   Sockets::iterator i;
00118   for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
00119   {
00120     Locker l( m_mutex );
00121     int port = m_socketToPort[*i];
00122     AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
00123     unsigned thread;
00124     thread_spawn( &socketAcceptorThread, info, thread );
00125     addThread( *i, thread );
00126   }
00127 
00128   QF_STACK_POP
00129 }

void FIX::ThreadedSocketAcceptor::onStop  )  [private, virtual]
 

Implemented to stop a running acceptor.

Implements FIX::Acceptor.

Definition at line 139 of file ThreadedSocketAcceptor.cpp.

References FIX::Acceptor::isLoggedOn(), m_threads, QF_STACK_POP, QF_STACK_PUSH, FIX::socket_close(), SocketToThread, and FIX::thread_join().

00140 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStop)
00141 
00142   Locker l(m_mutex);
00143 
00144   time_t start = 0;
00145   time_t now = 0;
00146 
00147   ::time( &start );
00148   while ( isLoggedOn() )
00149   {
00150     if( ::time(&now) -5 >= start )
00151       break;
00152   }
00153 
00154   SocketToThread threads;
00155   threads = m_threads;
00156 
00157   SocketToThread::iterator i;
00158   for ( i = threads.begin(); i != threads.end(); ++i )
00159     socket_close( i->first );
00160   for ( i = threads.begin(); i != threads.end(); ++i )
00161     thread_join( i->second );
00162   threads.clear();
00163 
00164   QF_STACK_POP
00165 }

bool FIX::ThreadedSocketAcceptor::readSettings const SessionSettings  )  [private]
 

void FIX::ThreadedSocketAcceptor::removeThread int  s  )  [private]
 

Definition at line 177 of file ThreadedSocketAcceptor.cpp.

References m_threads, QF_STACK_POP, QF_STACK_PUSH, and FIX::thread_detach().

Referenced by socketAcceptorThread(), and socketConnectionThread().

00178 { QF_STACK_PUSH(ThreadedSocketAcceptor::removeThread)
00179 
00180   Locker l(m_mutex);
00181   SocketToThread::iterator i = m_threads.find( s );
00182   if ( i != m_threads.end() )
00183   {
00184     thread_detach( i->second );
00185     m_threads.erase( i );
00186   }
00187 
00188   QF_STACK_POP
00189 }

THREAD_PROC FIX::ThreadedSocketAcceptor::socketAcceptorThread void *  p  )  [static, private]
 

Definition at line 191 of file ThreadedSocketAcceptor.cpp.

References addThread(), FIX::Acceptor::getApplication(), FIX::Acceptor::isStopped(), FIX::Acceptor::log(), m_mutex, m_portToSessions, QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, removeThread(), FIX::socket_accept(), FIX::socket_getsockopt(), FIX::socket_peername(), FIX::socket_setsockopt(), socketConnectionThread(), THREAD_PROC, and FIX::thread_spawn().

Referenced by onStart().

00192 { QF_STACK_TRY
00193   QF_STACK_PUSH(ThreadedSocketAcceptor::socketAcceptorThread)
00194 
00195   AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
00196 
00197   ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00198   int s = info->m_socket;
00199   int port = info->m_port;
00200   delete info;
00201 
00202   int noDelay = 0;
00203   socket_getsockopt( s, TCP_NODELAY, noDelay );
00204 
00205   int socket = 0;
00206   while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
00207   {
00208     if( noDelay )
00209       socket_setsockopt( socket, TCP_NODELAY );
00210 
00211     Sessions sessions = pAcceptor->m_portToSessions[port];
00212 
00213     ThreadedSocketConnection * pConnection =
00214       new ThreadedSocketConnection( socket, sessions, pAcceptor->getApplication() );
00215 
00216     ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
00217 
00218     {
00219       Locker l( pAcceptor->m_mutex );
00220 
00221       std::stringstream stream;
00222       stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
00223       pAcceptor->log( stream.str() );
00224 
00225       unsigned thread;
00226       if ( !thread_spawn( &socketConnectionThread, info, thread ) )
00227         delete info;
00228       pAcceptor->addThread( socket, thread );
00229     }
00230   }
00231 
00232   if( !pAcceptor->isStopped() )
00233     pAcceptor->removeThread( s );
00234   return 0;
00235 
00236   QF_STACK_POP
00237   QF_STACK_CATCH
00238 }

THREAD_PROC FIX::ThreadedSocketAcceptor::socketConnectionThread void *  p  )  [static, private]
 

Definition at line 240 of file ThreadedSocketAcceptor.cpp.

References FIX::ThreadedSocketConnection::getSocket(), FIX::Acceptor::isStopped(), QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, FIX::ThreadedSocketConnection::read(), removeThread(), and THREAD_PROC.

Referenced by socketAcceptorThread().

00241 { QF_STACK_TRY
00242   QF_STACK_PUSH(ThreadedSocketAcceptor::socketConnectionThread)
00243 
00244   ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
00245 
00246   ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00247   ThreadedSocketConnection* pConnection = info->m_pConnection;
00248   delete info;
00249 
00250   int socket = pConnection->getSocket();
00251 
00252   while ( pConnection->read() ) {}
00253   delete pConnection;
00254   if( !pAcceptor->isStopped() )
00255     pAcceptor->removeThread( socket );
00256   return 0;
00257 
00258   QF_STACK_POP
00259   QF_STACK_CATCH
00260 }


Member Data Documentation

Mutex FIX::ThreadedSocketAcceptor::m_mutex [private]
 

Definition at line 93 of file ThreadedSocketAcceptor.h.

Referenced by socketAcceptorThread().

PortToSessions FIX::ThreadedSocketAcceptor::m_portToSessions [private]
 

Definition at line 90 of file ThreadedSocketAcceptor.h.

Referenced by socketAcceptorThread().

Sockets FIX::ThreadedSocketAcceptor::m_sockets [private]
 

Definition at line 89 of file ThreadedSocketAcceptor.h.

SocketToPort FIX::ThreadedSocketAcceptor::m_socketToPort [private]
 

Definition at line 91 of file ThreadedSocketAcceptor.h.

Referenced by onStart().

SocketToThread FIX::ThreadedSocketAcceptor::m_threads [private]
 

Definition at line 92 of file ThreadedSocketAcceptor.h.

Referenced by addThread(), onStop(), and removeThread().


The documentation for this class was generated from the following files:
Generated on Mon Jul 24 19:36:53 2006 for QuickFIX by doxygen 1.3.6-20040222 written by Dimitri van Heesch, © 1997-2001