SocketInitiator.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "SocketInitiator.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030
00031 namespace FIX
00032 {
00033 SocketInitiator::SocketInitiator( Application& application,
00034 MessageStoreFactory& factory,
00035 const SessionSettings& settings )
00036 throw( ConfigError )
00037 : Initiator( application, factory, settings ),
00038 m_connector( 1 ), m_lastConnect( 0 ),
00039 m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
00040 m_rcvBufSize( 0 )
00041 {
00042 }
00043
00044 SocketInitiator::SocketInitiator( Application& application,
00045 MessageStoreFactory& factory,
00046 const SessionSettings& settings,
00047 LogFactory& logFactory )
00048 throw( ConfigError )
00049 : Initiator( application, factory, settings, logFactory ),
00050 m_connector( 1 ), m_lastConnect( 0 ),
00051 m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
00052 m_rcvBufSize( 0 )
00053 {
00054 }
00055
00056 SocketInitiator::~SocketInitiator()
00057 {
00058 SocketConnections::iterator i;
00059 for (i = m_connections.begin();
00060 i != m_connections.end(); ++i)
00061 delete i->second;
00062
00063 for (i = m_pendingConnections.begin();
00064 i != m_pendingConnections.end(); ++i)
00065 delete i->second;
00066 }
00067
00068 void SocketInitiator::onConfigure( const SessionSettings& s )
00069 throw ( ConfigError )
00070 { QF_STACK_PUSH(SocketInitiator::onConfigure)
00071
00072 try { m_reconnectInterval = s.get().getLong( RECONNECT_INTERVAL ); }
00073 catch ( std::exception& ) {}
00074 if( s.get().has( SOCKET_NODELAY ) )
00075 m_noDelay = s.get().getBool( SOCKET_NODELAY );
00076 if( s.get().has( SOCKET_SEND_BUFFER_SIZE ) )
00077 m_sendBufSize = s.get().getLong( SOCKET_SEND_BUFFER_SIZE );
00078 if( s.get().has( SOCKET_RECEIVE_BUFFER_SIZE ) )
00079 m_rcvBufSize = s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE );
00080
00081 QF_STACK_POP
00082 }
00083
00084 void SocketInitiator::onInitialize( const SessionSettings& s )
00085 throw ( RuntimeError )
00086 { QF_STACK_PUSH(SocketInitiator::onInitialize)
00087 QF_STACK_POP
00088 }
00089
00090 void SocketInitiator::onStart()
00091 { QF_STACK_PUSH(SocketInitiator::onStart)
00092
00093 connect();
00094
00095 while ( !isStopped() )
00096 m_connector.block( *this );
00097
00098 time_t start = 0;
00099 time_t now = 0;
00100
00101 ::time( &start );
00102 while ( isLoggedOn() )
00103 {
00104 m_connector.block( *this );
00105 if( ::time(&now) -5 >= start )
00106 break;
00107 }
00108
00109 QF_STACK_POP
00110 }
00111
00112 bool SocketInitiator::onPoll( double timeout )
00113 { QF_STACK_PUSH(SocketInitiator::onPoll)
00114
00115 time_t start = 0;
00116 time_t now = 0;
00117
00118 if( isStopped() )
00119 {
00120 if( start == 0 )
00121 ::time( &start );
00122 if( !isLoggedOn() )
00123 return false;
00124 if( ::time(&now) - 5 >= start )
00125 return false;
00126 }
00127
00128 m_connector.block( *this, true, timeout );
00129 return true;
00130
00131 QF_STACK_POP
00132 }
00133
00134 void SocketInitiator::onStop()
00135 { QF_STACK_PUSH(SocketInitiator::onStop)
00136 QF_STACK_POP
00137 }
00138
00139 void SocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
00140 { QF_STACK_PUSH(SocketInitiator::doConnect)
00141
00142 try
00143 {
00144 std::string address;
00145 short port = 0;
00146 Session* session = Session::lookupSession( s );
00147 if( !session->isSessionTime() ) return;
00148
00149 Log* log = session->getLog();
00150
00151 getHost( s, d, address, port );
00152
00153 log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00154 int result = m_connector.connect( address, port, m_noDelay, m_sendBufSize, m_rcvBufSize );
00155 setPending( s );
00156
00157 m_pendingConnections[ result ]
00158 = new SocketConnection( *this, s, result, &m_connector.getMonitor() );
00159 }
00160 catch ( std::exception& ) {}
00161
00162 QF_STACK_POP
00163 }
00164
00165 void SocketInitiator::onConnect( SocketConnector&, int s )
00166 { QF_STACK_PUSH(SocketInitiator::onConnect)
00167
00168 SocketConnections::iterator i = m_pendingConnections.find( s );
00169 if( i == m_pendingConnections.end() ) return;
00170 SocketConnection* pSocketConnection = i->second;
00171
00172 m_connections[s] = pSocketConnection;
00173 m_pendingConnections.erase( i );
00174 setConnected( pSocketConnection->getSession()->getSessionID() );
00175 pSocketConnection->onTimeout();
00176
00177 QF_STACK_POP
00178 }
00179
00180 void SocketInitiator::onWrite( SocketConnector& connector, int s )
00181 { QF_STACK_PUSH(SocketInitiator::onWrite)
00182
00183 SocketConnections::iterator i = m_connections.find( s );
00184 if ( i == m_connections.end() ) return ;
00185 SocketConnection* pSocketConnection = i->second;
00186 if( pSocketConnection->processQueue() )
00187 pSocketConnection->unsignal();
00188
00189 QF_STACK_POP
00190 }
00191
00192 bool SocketInitiator::onData( SocketConnector& connector, int s )
00193 { QF_STACK_PUSH(SocketInitiator::onData)
00194
00195 SocketConnections::iterator i = m_connections.find( s );
00196 if ( i == m_connections.end() ) return false;
00197 SocketConnection* pSocketConnection = i->second;
00198 return pSocketConnection->read( connector );
00199
00200 QF_STACK_POP
00201 }
00202
00203 void SocketInitiator::onDisconnect( SocketConnector&, int s )
00204 { QF_STACK_PUSH(SocketInitiator::onDisconnect)
00205
00206 SocketConnections::iterator i = m_connections.find( s );
00207 SocketConnections::iterator j = m_pendingConnections.find( s );
00208
00209 SocketConnection* pSocketConnection = 0;
00210 if( i != m_connections.end() )
00211 pSocketConnection = i->second;
00212 if( j != m_pendingConnections.end() )
00213 pSocketConnection = j->second;
00214 if( !pSocketConnection )
00215 return;
00216
00217 setDisconnected( pSocketConnection->getSession()->getSessionID() );
00218
00219 Session* pSession = pSocketConnection->getSession();
00220 if ( pSession )
00221 {
00222 pSession->disconnect();
00223 setDisconnected( pSession->getSessionID() );
00224 }
00225
00226 delete pSocketConnection;
00227 m_connections.erase( s );
00228 m_pendingConnections.erase( s );
00229
00230 QF_STACK_POP
00231 }
00232
00233 void SocketInitiator::onError( SocketConnector& connector )
00234 { QF_STACK_PUSH(SocketInitiator::onError)
00235 onTimeout( connector );
00236 QF_STACK_POP
00237 }
00238
00239 void SocketInitiator::onTimeout( SocketConnector& )
00240 { QF_STACK_PUSH(SocketInitiator::onTimeout)
00241
00242 time_t now;
00243 ::time( &now );
00244
00245 if ( (now - m_lastConnect) >= m_reconnectInterval )
00246 {
00247 connect();
00248 m_lastConnect = now;
00249 }
00250
00251 SocketConnections::iterator i;
00252 for ( i = m_connections.begin(); i != m_connections.end(); ++i )
00253 i->second->onTimeout();
00254
00255 QF_STACK_POP
00256 }
00257
00258 void SocketInitiator::getHost( const SessionID& s, const Dictionary& d,
00259 std::string& address, short& port )
00260 { QF_STACK_PUSH(SocketInitiator::getHost)
00261
00262 int num = 0;
00263 SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00264 if ( i != m_sessionToHostNum.end() ) num = i->second;
00265
00266 std::stringstream hostStream;
00267 hostStream << SOCKET_CONNECT_HOST << num;
00268 std::string hostString = hostStream.str();
00269
00270 std::stringstream portStream;
00271 std::string portString = portStream.str();
00272 portStream << SOCKET_CONNECT_PORT << num;
00273
00274 if( d.has(hostString) && d.has(portString) )
00275 {
00276 address = d.getString( hostString );
00277 port = ( short ) d.getLong( portString );
00278 }
00279 else
00280 {
00281 num = 0;
00282 address = d.getString( SOCKET_CONNECT_HOST );
00283 port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00284 }
00285
00286 m_sessionToHostNum[ s ] = ++num;
00287
00288 QF_STACK_POP
00289 }
00290 }