00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "ThreadedSocketInitiator.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030
00031 namespace FIX
00032 {
00033 ThreadedSocketInitiator::ThreadedSocketInitiator(
00034 Application& application,
00035 MessageStoreFactory& factory,
00036 const SessionSettings& settings ) throw( ConfigError )
00037 : Initiator( application, factory, settings ),
00038 m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false )
00039 { socket_init(); }
00040
00041 ThreadedSocketInitiator::ThreadedSocketInitiator(
00042 Application& application,
00043 MessageStoreFactory& factory,
00044 const SessionSettings& settings,
00045 LogFactory& logFactory ) throw( ConfigError )
00046 : Initiator( application, factory, settings, logFactory ),
00047 m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false )
00048 { socket_init(); }
00049
00050 ThreadedSocketInitiator::~ThreadedSocketInitiator()
00051 { socket_term(); }
00052
00053 void ThreadedSocketInitiator::onConfigure( const SessionSettings& s )
00054 throw ( ConfigError )
00055 { QF_STACK_PUSH(ThreadedSocketInitiator::onConfigure)
00056
00057 try { m_reconnectInterval = s.get().getLong( "ReconnectInterval" ); }
00058 catch ( std::exception& ) {}
00059 if( s.get().has( SOCKET_NODELAY ) )
00060 m_noDelay = s.get().getBool( SOCKET_NODELAY );
00061
00062 QF_STACK_POP
00063 }
00064
00065 void ThreadedSocketInitiator::onInitialize( const SessionSettings& s )
00066 throw ( RuntimeError )
00067 { QF_STACK_PUSH(ThreadedSocketInitiator::onInitialize)
00068 QF_STACK_POP
00069 }
00070
00071 void ThreadedSocketInitiator::onStart()
00072 { QF_STACK_PUSH(ThreadedSocketInitiator::onStart)
00073
00074 while ( !isStopped() )
00075 {
00076 time_t now;
00077 ::time( &now );
00078
00079 if ( (now - m_lastConnect) >= m_reconnectInterval )
00080 {
00081 Locker l( m_mutex );
00082 connect();
00083 m_lastConnect = now;
00084 }
00085
00086 process_sleep( 1 );
00087 }
00088
00089 QF_STACK_POP
00090 }
00091
00092 bool ThreadedSocketInitiator::onPoll()
00093 { QF_STACK_PUSH(ThreadedSocketInitiator::onPoll)
00094
00095 return false;
00096
00097 QF_STACK_POP
00098 }
00099
00100 void ThreadedSocketInitiator::onStop()
00101 { QF_STACK_PUSH(ThreadedSocketInitiator::onStop)
00102
00103 SocketToThread threads;
00104 SocketToThread::iterator i;
00105
00106 Locker l(m_mutex);
00107
00108 time_t start = 0;
00109 time_t now = 0;
00110
00111 ::time( &start );
00112 while ( isLoggedOn() )
00113 {
00114 if( ::time(&now) -5 >= start )
00115 break;
00116 }
00117
00118 threads = m_threads;
00119
00120 for ( i = threads.begin(); i != threads.end(); ++i )
00121 socket_close( i->first );
00122
00123 for ( i = threads.begin(); i != threads.end(); ++i )
00124 thread_join( i->second );
00125 threads.clear();
00126
00127 QF_STACK_POP
00128 }
00129
00130 bool ThreadedSocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
00131 { QF_STACK_PUSH(ThreadedSocketInitiator::doConnect)
00132
00133 try
00134 {
00135 Session* session = Session::lookupSession( s );
00136 if( !session->isSessionTime() ) return false;
00137
00138 std::string address;
00139 short port = 0;
00140 getHost( s, d, address, port );
00141
00142 Log* log = session->getLog();
00143 log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00144 int socket = socket_createConnector();
00145
00146 if( socket_connect(socket, address.c_str(), port) < 0 )
00147 {
00148 log->onEvent( "Connection failed" );
00149 return false;
00150 }
00151
00152 log->onEvent( "Connection succeeded" );
00153
00154 ThreadedSocketConnection* pConnection =
00155 new ThreadedSocketConnection( s, socket, getApplication() );
00156
00157 ThreadPair* pair = new ThreadPair( this, pConnection );
00158
00159 {
00160 Locker l( m_mutex );
00161 unsigned thread;
00162 if ( !thread_spawn( &socketThread, pair, thread ) )
00163 delete pair;
00164 addThread( socket, thread );
00165 }
00166 return true;
00167 }
00168 catch ( std::exception& ) { return false; }
00169
00170 QF_STACK_POP
00171 }
00172
00173 void ThreadedSocketInitiator::addThread( int s, int t )
00174 { QF_STACK_PUSH(ThreadedSocketInitiator::addThread)
00175
00176 Locker l(m_mutex);
00177
00178 m_threads[ s ] = t;
00179 QF_STACK_POP
00180 }
00181
00182 void ThreadedSocketInitiator::removeThread( int s )
00183 { QF_STACK_PUSH(ThreadedSocketInitiator::removeThread)
00184
00185 Locker l(m_mutex);
00186 SocketToThread::iterator i = m_threads.find( s );
00187
00188 if ( i != m_threads.end() )
00189 {
00190 thread_detach( i->second );
00191 m_threads.erase( i );
00192 }
00193
00194 QF_STACK_POP
00195 }
00196
00197 THREAD_PROC ThreadedSocketInitiator::socketThread( void* p )
00198 { QF_STACK_TRY
00199 QF_STACK_PUSH(ThreadedSocketInitiator::socketThread)
00200
00201 ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
00202
00203 ThreadedSocketInitiator* pInitiator = pair->first;
00204 ThreadedSocketConnection* pConnection = pair->second;
00205 FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
00206 delete pair;
00207
00208 pInitiator->setConnected( sessionID );
00209 int socket = pConnection->getSocket();
00210
00211 while ( pConnection->read() ) {}
00212
00213 delete pConnection;
00214 if( !pInitiator->isStopped() )
00215 pInitiator->removeThread( socket );
00216
00217 pInitiator->setDisconnected( sessionID );
00218 return 0;
00219
00220 QF_STACK_POP
00221 QF_STACK_CATCH
00222 }
00223
00224 void ThreadedSocketInitiator::getHost( const SessionID& s, const Dictionary& d,
00225 std::string& address, short& port )
00226 { QF_STACK_PUSH(ThreadedSocketInitiator::getHost)
00227
00228 int num = 0;
00229 SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00230 if ( i != m_sessionToHostNum.end() ) num = i->second;
00231
00232 try
00233 {
00234 std::stringstream hostStream;
00235 hostStream << SOCKET_CONNECT_HOST << num;
00236 address = d.getString( hostStream.str() );
00237
00238 std::stringstream portStream;
00239 portStream << SOCKET_CONNECT_PORT << num;
00240 port = ( short ) d.getLong( portStream.str() );
00241 }
00242 catch ( ConfigError& )
00243 {
00244 num = 0;
00245 address = d.getString( SOCKET_CONNECT_HOST );
00246 port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00247 }
00248 m_sessionToHostNum[ s ] = ++num;
00249
00250 QF_STACK_POP
00251 }
00252
00253 }