![]() |
![]() |
|
Index
Source Files
Annotated Class List
Alphabetical Class List
Class Hierarchy
Graphical Class Hierarchy
|
||
![]() |
![]() |
#include <ThreadedSocketAcceptor.h>
Inheritance diagram for FIX::ThreadedSocketAcceptor:


Public Member Functions | |
| ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError ) | |
| ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError ) | |
| virtual | ~ThreadedSocketAcceptor () |
Private Types | |
| typedef std::set< int > | Sockets |
| typedef std::set< SessionID > | Sessions |
| typedef std::map< int, Sessions > | PortToSessions |
| typedef std::map< int, int > | SocketToPort |
| typedef std::map< int, int > | SocketToThread |
Private Member Functions | |
| bool | readSettings (const SessionSettings &) |
| void | onConfigure (const SessionSettings &) throw ( ConfigError ) |
| Implemented to configure acceptor. | |
| void | onInitialize (const SessionSettings &) throw ( RuntimeError ) |
| Implemented to initialize acceptor. | |
| void | onStart () |
| Implemented to start listening for connections. | |
| bool | onPoll () |
| Implemented to connect and poll for events. | |
| void | onStop () |
| Implemented to stop a running acceptor. | |
| void | addThread (int s, int t) |
| void | removeThread (int s) |
Static Private Member Functions | |
| THREAD_PROC | socketAcceptorThread (void *p) |
| THREAD_PROC | socketConnectionThread (void *p) |
Private Attributes | |
| Sockets | m_sockets |
| PortToSessions | m_portToSessions |
| SocketToPort | m_socketToPort |
| SocketToThread | m_threads |
| Mutex | m_mutex |
Definition at line 36 of file ThreadedSocketAcceptor.h.
|
|
Definition at line 73 of file ThreadedSocketAcceptor.h. |
|
|
Reimplemented from FIX::Acceptor. Definition at line 72 of file ThreadedSocketAcceptor.h. |
|
|
Definition at line 71 of file ThreadedSocketAcceptor.h. |
|
|
Definition at line 74 of file ThreadedSocketAcceptor.h. |
|
|
Definition at line 75 of file ThreadedSocketAcceptor.h. Referenced by onStop(). |
|
||||||||||||||||
|
Definition at line 33 of file ThreadedSocketAcceptor.cpp. References FIX::socket_init().
00037 : Acceptor( application, factory, settings ) 00038 { socket_init(); } |
|
||||||||||||||||||||
|
Definition at line 40 of file ThreadedSocketAcceptor.cpp. References FIX::socket_init().
00045 : Acceptor( application, factory, settings, logFactory ) 00046 { socket_init(); } |
|
|
Definition at line 48 of file ThreadedSocketAcceptor.cpp. References FIX::socket_term().
00049 { socket_term(); }
|
|
||||||||||||
|
Definition at line 167 of file ThreadedSocketAcceptor.cpp. References m_threads, QF_STACK_POP, and QF_STACK_PUSH. Referenced by onStart(), and socketAcceptorThread().
00168 { QF_STACK_PUSH(ThreadedSocketAcceptor::addThread)
00169
00170 Locker l(m_mutex);
00171
00172 m_threads[ s ] = t;
00173
00174 QF_STACK_POP
00175 }
|
|
|
Implemented to configure acceptor.
Reimplemented from FIX::Acceptor. Definition at line 51 of file ThreadedSocketAcceptor.cpp. References FIX::Dictionary::getBool(), FIX::Dictionary::getLong(), FIX::Dictionary::has(), QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_ACCEPT_PORT, FIX::SOCKET_NODELAY, and FIX::SOCKET_REUSE_ADDRESS.
00053 { QF_STACK_PUSH(ThreadedSocketAcceptor::onConfigure)
00054
00055 std::set<SessionID> sessions = s.getSessions();
00056 std::set<SessionID>::iterator i;
00057 for( i = sessions.begin(); i != sessions.end(); ++i )
00058 {
00059 const Dictionary& settings = s.get( *i );
00060 settings.getLong( SOCKET_ACCEPT_PORT );
00061 if( settings.has(SOCKET_REUSE_ADDRESS) )
00062 settings.getBool( SOCKET_REUSE_ADDRESS );
00063 if( settings.has(SOCKET_NODELAY) )
00064 settings.getBool( SOCKET_NODELAY );
00065 }
00066
00067 QF_STACK_POP
00068 }
|
|
|
Implemented to initialize acceptor.
Reimplemented from FIX::Acceptor. Definition at line 70 of file ThreadedSocketAcceptor.cpp. References FIX::Dictionary::getLong(), FIX::Dictionary::has(), QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_ACCEPT_PORT, FIX::socket_close(), FIX::socket_createAcceptor(), FIX::SOCKET_NODELAY, FIX::SOCKET_REUSE_ADDRESS, and FIX::socket_setsockopt().
00072 { QF_STACK_PUSH(ThreadedSocketAcceptor::onInitialize)
00073
00074 short port = 0;
00075 bool reuseAddress = false;
00076 bool noDelay = false;
00077
00078 std::set<int> ports;
00079
00080 std::set<SessionID> sessions = s.getSessions();
00081 std::set<SessionID>::iterator i = sessions.begin();
00082 for( ; i != sessions.end(); ++i )
00083 {
00084 Dictionary settings = s.get( *i );
00085 port = (short)settings.getLong( SOCKET_ACCEPT_PORT );
00086
00087 m_portToSessions[port].insert( *i );
00088
00089 if( ports.find(port) != ports.end() )
00090 continue;
00091 ports.insert( port );
00092
00093 if( settings.has( SOCKET_REUSE_ADDRESS ) )
00094 reuseAddress = s.get().getBool( SOCKET_REUSE_ADDRESS );
00095 if( settings.has( SOCKET_NODELAY ) )
00096 noDelay = s.get().getBool( SOCKET_NODELAY );
00097
00098 int socket = socket_createAcceptor( port, reuseAddress );
00099 if( socket < 0 )
00100 {
00101 socket_close( socket );
00102 throw RuntimeError( "Unable to create, bind, or listen to port " + IntConvertor::convert( (unsigned short)port ) );
00103 }
00104 if( noDelay )
00105 socket_setsockopt( socket, TCP_NODELAY );
00106
00107 m_socketToPort[socket] = port;
00108 m_sockets.insert( socket );
00109 }
00110
00111 QF_STACK_POP
00112 }
|
|
|
Implemented to connect and poll for events.
Implements FIX::Acceptor. Definition at line 131 of file ThreadedSocketAcceptor.cpp. References QF_STACK_POP, and QF_STACK_PUSH.
00132 { QF_STACK_PUSH(ThreadedSocketAcceptor::onPoll)
00133
00134 return false;
00135
00136 QF_STACK_POP
00137 }
|
|
|
Implemented to start listening for connections.
Implements FIX::Acceptor. Definition at line 114 of file ThreadedSocketAcceptor.cpp. References addThread(), m_socketToPort, QF_STACK_POP, QF_STACK_PUSH, socketAcceptorThread(), and FIX::thread_spawn().
00115 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStart)
00116
00117 Sockets::iterator i;
00118 for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
00119 {
00120 Locker l( m_mutex );
00121 int port = m_socketToPort[*i];
00122 AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
00123 unsigned thread;
00124 thread_spawn( &socketAcceptorThread, info, thread );
00125 addThread( *i, thread );
00126 }
00127
00128 QF_STACK_POP
00129 }
|
|
|
Implemented to stop a running acceptor.
Implements FIX::Acceptor. Definition at line 139 of file ThreadedSocketAcceptor.cpp. References FIX::Acceptor::isLoggedOn(), m_threads, QF_STACK_POP, QF_STACK_PUSH, FIX::socket_close(), SocketToThread, and FIX::thread_join().
00140 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStop)
00141
00142 Locker l(m_mutex);
00143
00144 time_t start = 0;
00145 time_t now = 0;
00146
00147 ::time( &start );
00148 while ( isLoggedOn() )
00149 {
00150 if( ::time(&now) -5 >= start )
00151 break;
00152 }
00153
00154 SocketToThread threads;
00155 threads = m_threads;
00156
00157 SocketToThread::iterator i;
00158 for ( i = threads.begin(); i != threads.end(); ++i )
00159 socket_close( i->first );
00160 for ( i = threads.begin(); i != threads.end(); ++i )
00161 thread_join( i->second );
00162 threads.clear();
00163
00164 QF_STACK_POP
00165 }
|
|
|
|
|
|
Definition at line 177 of file ThreadedSocketAcceptor.cpp. References m_threads, QF_STACK_POP, QF_STACK_PUSH, and FIX::thread_detach(). Referenced by socketAcceptorThread(), and socketConnectionThread().
00178 { QF_STACK_PUSH(ThreadedSocketAcceptor::removeThread)
00179
00180 Locker l(m_mutex);
00181 SocketToThread::iterator i = m_threads.find( s );
00182 if ( i != m_threads.end() )
00183 {
00184 thread_detach( i->second );
00185 m_threads.erase( i );
00186 }
00187
00188 QF_STACK_POP
00189 }
|
|
|
Definition at line 191 of file ThreadedSocketAcceptor.cpp. References addThread(), FIX::Acceptor::getApplication(), FIX::Acceptor::isStopped(), FIX::Acceptor::log(), m_mutex, m_portToSessions, QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, removeThread(), FIX::socket_accept(), FIX::socket_getsockopt(), FIX::socket_peername(), FIX::socket_setsockopt(), socketConnectionThread(), THREAD_PROC, and FIX::thread_spawn(). Referenced by onStart().
00192 { QF_STACK_TRY
00193 QF_STACK_PUSH(ThreadedSocketAcceptor::socketAcceptorThread)
00194
00195 AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
00196
00197 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00198 int s = info->m_socket;
00199 int port = info->m_port;
00200 delete info;
00201
00202 int noDelay = 0;
00203 socket_getsockopt( s, TCP_NODELAY, noDelay );
00204
00205 int socket = 0;
00206 while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
00207 {
00208 if( noDelay )
00209 socket_setsockopt( socket, TCP_NODELAY );
00210
00211 Sessions sessions = pAcceptor->m_portToSessions[port];
00212
00213 ThreadedSocketConnection * pConnection =
00214 new ThreadedSocketConnection( socket, sessions, pAcceptor->getApplication() );
00215
00216 ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
00217
00218 {
00219 Locker l( pAcceptor->m_mutex );
00220
00221 std::stringstream stream;
00222 stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
00223 pAcceptor->log( stream.str() );
00224
00225 unsigned thread;
00226 if ( !thread_spawn( &socketConnectionThread, info, thread ) )
00227 delete info;
00228 pAcceptor->addThread( socket, thread );
00229 }
00230 }
00231
00232 if( !pAcceptor->isStopped() )
00233 pAcceptor->removeThread( s );
00234 return 0;
00235
00236 QF_STACK_POP
00237 QF_STACK_CATCH
00238 }
|
|
|
Definition at line 240 of file ThreadedSocketAcceptor.cpp. References FIX::ThreadedSocketConnection::getSocket(), FIX::Acceptor::isStopped(), QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, FIX::ThreadedSocketConnection::read(), removeThread(), and THREAD_PROC. Referenced by socketAcceptorThread().
00241 { QF_STACK_TRY
00242 QF_STACK_PUSH(ThreadedSocketAcceptor::socketConnectionThread)
00243
00244 ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
00245
00246 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00247 ThreadedSocketConnection* pConnection = info->m_pConnection;
00248 delete info;
00249
00250 int socket = pConnection->getSocket();
00251
00252 while ( pConnection->read() ) {}
00253 delete pConnection;
00254 if( !pAcceptor->isStopped() )
00255 pAcceptor->removeThread( socket );
00256 return 0;
00257
00258 QF_STACK_POP
00259 QF_STACK_CATCH
00260 }
|
|
|
Definition at line 93 of file ThreadedSocketAcceptor.h. Referenced by socketAcceptorThread(). |
|
|
Definition at line 90 of file ThreadedSocketAcceptor.h. Referenced by socketAcceptorThread(). |
|
|
Definition at line 89 of file ThreadedSocketAcceptor.h. |
|
|
Definition at line 91 of file ThreadedSocketAcceptor.h. Referenced by onStart(). |
|
|
Definition at line 92 of file ThreadedSocketAcceptor.h. Referenced by addThread(), onStop(), and removeThread(). |
1.3.6-20040222 written by Dimitri van Heesch,
© 1997-2001