00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "SocketInitiator.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030
00031 namespace FIX
00032 {
00033 SocketInitiator::SocketInitiator( Application& application,
00034 MessageStoreFactory& factory,
00035 const SessionSettings& settings )
00036 throw( ConfigError )
00037 : Initiator( application, factory, settings ),
00038 m_connector( 1 ), m_lastConnect( 0 ),
00039 m_reconnectInterval( 30 ), m_noDelay( false ) {}
00040
00041 SocketInitiator::SocketInitiator( Application& application,
00042 MessageStoreFactory& factory,
00043 const SessionSettings& settings,
00044 LogFactory& logFactory )
00045 throw( ConfigError )
00046 : Initiator( application, factory, settings, logFactory ),
00047 m_connector( 1 ), m_lastConnect( 0 ),
00048 m_reconnectInterval( 30 ), m_noDelay( false ) {}
00049
00050 SocketInitiator::~SocketInitiator()
00051 {
00052 SocketConnections::iterator i;
00053 for (i = m_connections.begin();
00054 i != m_connections.end(); ++i)
00055 delete i->second;
00056
00057 for (i = m_pendingConnections.begin();
00058 i != m_pendingConnections.end(); ++i)
00059 delete i->second;
00060
00061 }
00062
00063 void SocketInitiator::onConfigure( const SessionSettings& s )
00064 throw ( ConfigError )
00065 { QF_STACK_PUSH(SocketInitiator::onConfigure)
00066
00067 try { m_reconnectInterval = s.get().getLong( RECONNECT_INTERVAL ); }
00068 catch ( std::exception& ) {}
00069 if( s.get().has( SOCKET_NODELAY ) )
00070 m_noDelay = s.get().getBool( SOCKET_NODELAY );
00071
00072 QF_STACK_POP
00073 }
00074
00075 void SocketInitiator::onInitialize( const SessionSettings& s )
00076 throw ( RuntimeError )
00077 { QF_STACK_PUSH(SocketInitiator::onInitialize)
00078 QF_STACK_POP
00079 }
00080
00081 void SocketInitiator::onStart()
00082 { QF_STACK_PUSH(SocketInitiator::onStart)
00083
00084 connect();
00085 while ( !isStopped() )
00086 m_connector.block( *this );
00087
00088 time_t start = 0;
00089 time_t now = 0;
00090
00091 ::time( &start );
00092 while ( isLoggedOn() )
00093 {
00094 m_connector.block( *this );
00095 if( ::time(&now) -5 >= start )
00096 break;
00097 }
00098
00099 QF_STACK_POP
00100 }
00101
00102 bool SocketInitiator::onPoll()
00103 { QF_STACK_PUSH(SocketInitiator::onPoll)
00104
00105 time_t start = 0;
00106 time_t now = 0;
00107
00108 if( isStopped() )
00109 {
00110 if( start == 0 )
00111 ::time( &start );
00112 if( !isLoggedOn() )
00113 return false;
00114 if( ::time(&now) - 5 >= start )
00115 return false;
00116 }
00117
00118 m_connector.block( *this, true );
00119 return true;
00120
00121 QF_STACK_POP
00122 }
00123
00124 void SocketInitiator::onStop()
00125 { QF_STACK_PUSH(SocketInitiator::onStop)
00126 QF_STACK_POP
00127 }
00128
00129 bool SocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
00130 { QF_STACK_PUSH(SocketInitiator::doConnect)
00131
00132 try
00133 {
00134 std::string address;
00135 short port = 0;
00136 Session* session = Session::lookupSession( s );
00137 if( !session->isSessionTime() ) return false;
00138
00139 Log* log = session->getLog();
00140
00141 getHost( s, d, address, port );
00142
00143 log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00144 int result = m_connector.connect( address, port, m_noDelay );
00145 setPending( s );
00146
00147 m_pendingConnections[ result ]
00148 = new SocketConnection( *this, s, result, &m_connector.getMonitor() );
00149
00150 return true;
00151 }
00152 catch ( std::exception& ) { return false; }
00153
00154 QF_STACK_POP
00155 }
00156
00157 void SocketInitiator::onConnect( SocketConnector&, int s )
00158 { QF_STACK_PUSH(SocketInitiator::onConnect)
00159
00160 SocketConnections::iterator i = m_pendingConnections.find( s );
00161 if( i == m_pendingConnections.end() ) return;
00162 SocketConnection* pSocketConnection = i->second;
00163
00164 m_connections[s] = pSocketConnection;
00165 m_pendingConnections.erase( i );
00166 setConnected( pSocketConnection->getSession()->getSessionID() );
00167
00168 QF_STACK_POP
00169 }
00170
00171 void SocketInitiator::onWrite( SocketConnector& connector, int s )
00172 { QF_STACK_PUSH(SocketInitiator::onWrite)
00173
00174 SocketConnections::iterator i = m_connections.find( s );
00175 if ( i == m_connections.end() ) return ;
00176 SocketConnection* pSocketConnection = i->second;
00177 if( pSocketConnection->processQueue() )
00178 pSocketConnection->unsignal();
00179
00180 QF_STACK_POP
00181 }
00182
00183 void SocketInitiator::onData( SocketConnector& connector, int s )
00184 { QF_STACK_PUSH(SocketInitiator::onData)
00185
00186 SocketConnections::iterator i = m_connections.find( s );
00187 if ( i == m_connections.end() ) return ;
00188 SocketConnection* pSocketConnection = i->second;
00189 pSocketConnection->read( connector );
00190
00191 QF_STACK_POP
00192 }
00193
00194 void SocketInitiator::onDisconnect( SocketConnector&, int s )
00195 { QF_STACK_PUSH(SocketInitiator::onDisconnect)
00196
00197 SocketConnections::iterator i = m_connections.find( s );
00198 SocketConnections::iterator j = m_pendingConnections.find( s );
00199
00200 if ( i == m_connections.end() && j == m_pendingConnections.end() )
00201 return;
00202
00203 SocketConnection* pSocketConnection = i->second;
00204 setDisconnected( pSocketConnection->getSession()->getSessionID() );
00205
00206 Session* pSession = pSocketConnection->getSession();
00207 if ( pSession )
00208 {
00209 pSession->disconnect();
00210 setDisconnected( pSession->getSessionID() );
00211 }
00212
00213 delete pSocketConnection;
00214 m_connections.erase( s );
00215 m_pendingConnections.erase( s );
00216
00217 QF_STACK_POP
00218 }
00219
00220 void SocketInitiator::onError( SocketConnector& connector )
00221 { QF_STACK_PUSH(SocketInitiator::onError)
00222 onTimeout( connector );
00223 QF_STACK_POP
00224 }
00225
00226 void SocketInitiator::onTimeout( SocketConnector& )
00227 { QF_STACK_PUSH(SocketInitiator::onTimeout)
00228
00229 time_t now;
00230 ::time( &now );
00231
00232 if ( (now - m_lastConnect) >= m_reconnectInterval )
00233 {
00234 connect();
00235 m_lastConnect = now;
00236 }
00237
00238 SocketConnections::iterator i;
00239 for ( i = m_connections.begin(); i != m_connections.end(); ++i )
00240 i->second->onTimeout();
00241
00242 QF_STACK_POP
00243 }
00244
00245 void SocketInitiator::getHost( const SessionID& s, const Dictionary& d,
00246 std::string& address, short& port )
00247 { QF_STACK_PUSH(SocketInitiator::getHost)
00248
00249 int num = 0;
00250 SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00251 if ( i != m_sessionToHostNum.end() ) num = i->second;
00252
00253 try
00254 {
00255 std::stringstream hostStream;
00256 hostStream << SOCKET_CONNECT_HOST << num;
00257 address = d.getString( hostStream.str() );
00258
00259 std::stringstream portStream;
00260 portStream << SOCKET_CONNECT_PORT << num;
00261 port = ( short ) d.getLong( portStream.str() );
00262 }
00263 catch ( ConfigError& )
00264 {
00265 num = 0;
00266 address = d.getString( SOCKET_CONNECT_HOST );
00267 port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00268 }
00269 m_sessionToHostNum[ s ] = ++num;
00270
00271 QF_STACK_POP
00272 }
00273 }