00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "ThreadedSocketAcceptor.h"
00028 #include "Settings.h"
00029 #include "Utility.h"
00030
00031 namespace FIX
00032 {
00033 ThreadedSocketAcceptor::ThreadedSocketAcceptor(
00034 Application& application,
00035 MessageStoreFactory& factory,
00036 const SessionSettings& settings ) throw( ConfigError )
00037 : Acceptor( application, factory, settings )
00038 { socket_init(); }
00039
00040 ThreadedSocketAcceptor::ThreadedSocketAcceptor(
00041 Application& application,
00042 MessageStoreFactory& factory,
00043 const SessionSettings& settings,
00044 LogFactory& logFactory ) throw( ConfigError )
00045 : Acceptor( application, factory, settings, logFactory )
00046 { socket_init(); }
00047
00048 ThreadedSocketAcceptor::~ThreadedSocketAcceptor()
00049 { socket_term(); }
00050
00051 void ThreadedSocketAcceptor::onConfigure( const SessionSettings& s )
00052 throw ( ConfigError )
00053 { QF_STACK_PUSH(ThreadedSocketAcceptor::onConfigure)
00054
00055 std::set<SessionID> sessions = s.getSessions();
00056 std::set<SessionID>::iterator i;
00057 for( i = sessions.begin(); i != sessions.end(); ++i )
00058 {
00059 const Dictionary& settings = s.get( *i );
00060 settings.getLong( SOCKET_ACCEPT_PORT );
00061 if( settings.has(SOCKET_REUSE_ADDRESS) )
00062 settings.getBool( SOCKET_REUSE_ADDRESS );
00063 if( settings.has(SOCKET_NODELAY) )
00064 settings.getBool( SOCKET_NODELAY );
00065 }
00066
00067 QF_STACK_POP
00068 }
00069
00070 void ThreadedSocketAcceptor::onInitialize( const SessionSettings& s )
00071 throw ( RuntimeError )
00072 { QF_STACK_PUSH(ThreadedSocketAcceptor::onInitialize)
00073
00074 short port = 0;
00075 bool reuseAddress = false;
00076 bool noDelay = false;
00077
00078 std::set<int> ports;
00079
00080 std::set<SessionID> sessions = s.getSessions();
00081 std::set<SessionID>::iterator i = sessions.begin();
00082 for( ; i != sessions.end(); ++i )
00083 {
00084 Dictionary settings = s.get( *i );
00085 port = (short)settings.getLong( SOCKET_ACCEPT_PORT );
00086
00087 m_portToSessions[port].insert( *i );
00088
00089 if( ports.find(port) != ports.end() )
00090 continue;
00091 ports.insert( port );
00092
00093 if( settings.has( SOCKET_REUSE_ADDRESS ) )
00094 reuseAddress = s.get().getBool( SOCKET_REUSE_ADDRESS );
00095 if( settings.has( SOCKET_NODELAY ) )
00096 noDelay = s.get().getBool( SOCKET_NODELAY );
00097
00098 int socket = socket_createAcceptor( port, reuseAddress );
00099 if( socket < 0 )
00100 {
00101 socket_close( socket );
00102 throw RuntimeError( "Unable to create, bind, or listen to port " + IntConvertor::convert( (unsigned short)port ) );
00103 }
00104 if( noDelay )
00105 socket_setsockopt( socket, TCP_NODELAY );
00106
00107 m_socketToPort[socket] = port;
00108 m_sockets.insert( socket );
00109 }
00110
00111 QF_STACK_POP
00112 }
00113
00114 void ThreadedSocketAcceptor::onStart()
00115 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStart)
00116
00117 Sockets::iterator i;
00118 for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
00119 {
00120 Locker l( m_mutex );
00121 int port = m_socketToPort[*i];
00122 AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
00123 unsigned thread;
00124 thread_spawn( &socketAcceptorThread, info, thread );
00125 addThread( *i, thread );
00126 }
00127
00128 QF_STACK_POP
00129 }
00130
00131 bool ThreadedSocketAcceptor::onPoll()
00132 { QF_STACK_PUSH(ThreadedSocketAcceptor::onPoll)
00133
00134 return false;
00135
00136 QF_STACK_POP
00137 }
00138
00139 void ThreadedSocketAcceptor::onStop()
00140 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStop)
00141
00142 Locker l(m_mutex);
00143
00144 time_t start = 0;
00145 time_t now = 0;
00146
00147 ::time( &start );
00148 while ( isLoggedOn() )
00149 {
00150 if( ::time(&now) -5 >= start )
00151 break;
00152 }
00153
00154 SocketToThread threads;
00155 threads = m_threads;
00156
00157 SocketToThread::iterator i;
00158 for ( i = threads.begin(); i != threads.end(); ++i )
00159 socket_close( i->first );
00160 for ( i = threads.begin(); i != threads.end(); ++i )
00161 thread_join( i->second );
00162 threads.clear();
00163
00164 QF_STACK_POP
00165 }
00166
00167 void ThreadedSocketAcceptor::addThread( int s, int t )
00168 { QF_STACK_PUSH(ThreadedSocketAcceptor::addThread)
00169
00170 Locker l(m_mutex);
00171
00172 m_threads[ s ] = t;
00173
00174 QF_STACK_POP
00175 }
00176
00177 void ThreadedSocketAcceptor::removeThread( int s )
00178 { QF_STACK_PUSH(ThreadedSocketAcceptor::removeThread)
00179
00180 Locker l(m_mutex);
00181 SocketToThread::iterator i = m_threads.find( s );
00182 if ( i != m_threads.end() )
00183 {
00184 thread_detach( i->second );
00185 m_threads.erase( i );
00186 }
00187
00188 QF_STACK_POP
00189 }
00190
00191 THREAD_PROC ThreadedSocketAcceptor::socketAcceptorThread( void* p )
00192 { QF_STACK_TRY
00193 QF_STACK_PUSH(ThreadedSocketAcceptor::socketAcceptorThread)
00194
00195 AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
00196
00197 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00198 int s = info->m_socket;
00199 int port = info->m_port;
00200 delete info;
00201
00202 int noDelay = 0;
00203 socket_getsockopt( s, TCP_NODELAY, noDelay );
00204
00205 int socket = 0;
00206 while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
00207 {
00208 if( noDelay )
00209 socket_setsockopt( socket, TCP_NODELAY );
00210
00211 Sessions sessions = pAcceptor->m_portToSessions[port];
00212
00213 ThreadedSocketConnection * pConnection =
00214 new ThreadedSocketConnection( socket, sessions, pAcceptor->getApplication() );
00215
00216 ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
00217
00218 {
00219 Locker l( pAcceptor->m_mutex );
00220
00221 std::stringstream stream;
00222 stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
00223 pAcceptor->log( stream.str() );
00224
00225 unsigned thread;
00226 if ( !thread_spawn( &socketConnectionThread, info, thread ) )
00227 delete info;
00228 pAcceptor->addThread( socket, thread );
00229 }
00230 }
00231
00232 if( !pAcceptor->isStopped() )
00233 pAcceptor->removeThread( s );
00234 return 0;
00235
00236 QF_STACK_POP
00237 QF_STACK_CATCH
00238 }
00239
00240 THREAD_PROC ThreadedSocketAcceptor::socketConnectionThread( void* p )
00241 { QF_STACK_TRY
00242 QF_STACK_PUSH(ThreadedSocketAcceptor::socketConnectionThread)
00243
00244 ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
00245
00246 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00247 ThreadedSocketConnection* pConnection = info->m_pConnection;
00248 delete info;
00249
00250 int socket = pConnection->getSocket();
00251
00252 while ( pConnection->read() ) {}
00253 delete pConnection;
00254 if( !pAcceptor->isStopped() )
00255 pAcceptor->removeThread( socket );
00256 return 0;
00257
00258 QF_STACK_POP
00259 QF_STACK_CATCH
00260 }
00261 }