Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

SocketInitiator.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "SocketInitiator.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030 
00031 namespace FIX
00032 {
00033 SocketInitiator::SocketInitiator( Application& application,
00034                                   MessageStoreFactory& factory,
00035                                   const SessionSettings& settings )
00036 throw( ConfigError )
00037 : Initiator( application, factory, settings ),
00038   m_connector( 1 ), m_lastConnect( 0 ),
00039   m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
00040   m_rcvBufSize( 0 ) 
00041 {
00042 }
00043 
00044 SocketInitiator::SocketInitiator( Application& application,
00045                                   MessageStoreFactory& factory,
00046                                   const SessionSettings& settings,
00047                                   LogFactory& logFactory )
00048 throw( ConfigError )
00049 : Initiator( application, factory, settings, logFactory ),
00050   m_connector( 1 ), m_lastConnect( 0 ),
00051   m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
00052   m_rcvBufSize( 0 )
00053 {
00054 }
00055 
00056 SocketInitiator::~SocketInitiator()
00057 {
00058   SocketConnections::iterator i;
00059   for (i = m_connections.begin();
00060        i != m_connections.end(); ++i)
00061     delete i->second;
00062 
00063   for (i = m_pendingConnections.begin();
00064        i != m_pendingConnections.end(); ++i)
00065     delete i->second;
00066 }
00067 
00068 void SocketInitiator::onConfigure( const SessionSettings& s )
00069 throw ( ConfigError )
00070 { QF_STACK_PUSH(SocketInitiator::onConfigure)
00071 
00072   try { m_reconnectInterval = s.get().getLong( RECONNECT_INTERVAL ); }
00073   catch ( std::exception& ) {}
00074   if( s.get().has( SOCKET_NODELAY ) )
00075     m_noDelay = s.get().getBool( SOCKET_NODELAY );
00076   if( s.get().has( SOCKET_SEND_BUFFER_SIZE ) )
00077     m_sendBufSize = s.get().getLong( SOCKET_SEND_BUFFER_SIZE );
00078   if( s.get().has( SOCKET_RECEIVE_BUFFER_SIZE ) )
00079     m_rcvBufSize = s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE );
00080 
00081   QF_STACK_POP
00082 }
00083 
00084 void SocketInitiator::onInitialize( const SessionSettings& s )
00085 throw ( RuntimeError )
00086 { QF_STACK_PUSH(SocketInitiator::onInitialize)
00087   QF_STACK_POP
00088 }
00089 
00090 void SocketInitiator::onStart()
00091 { QF_STACK_PUSH(SocketInitiator::onStart)
00092 
00093   connect();
00094 
00095   while ( !isStopped() )
00096     m_connector.block( *this );
00097 
00098   time_t start = 0;
00099   time_t now = 0;
00100 
00101   ::time( &start );
00102   while ( isLoggedOn() )
00103   {
00104     m_connector.block( *this );
00105     if( ::time(&now) -5 >= start )
00106       break;
00107   }
00108 
00109   QF_STACK_POP
00110 }
00111 
00112 bool SocketInitiator::onPoll( double timeout )
00113 { QF_STACK_PUSH(SocketInitiator::onPoll)
00114 
00115   time_t start = 0;
00116   time_t now = 0;
00117 
00118   if( isStopped() )
00119   {
00120     if( start == 0 )
00121       ::time( &start );
00122     if( !isLoggedOn() )
00123       return false;
00124     if( ::time(&now) - 5 >= start )
00125       return false;
00126   }
00127 
00128   m_connector.block( *this, true, timeout );
00129   return true;
00130 
00131   QF_STACK_POP
00132 }
00133 
00134 void SocketInitiator::onStop()
00135 { QF_STACK_PUSH(SocketInitiator::onStop)
00136   QF_STACK_POP
00137 }
00138 
00139 void SocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
00140 { QF_STACK_PUSH(SocketInitiator::doConnect)
00141 
00142   try
00143   {
00144     std::string address;
00145     short port = 0;
00146     Session* session = Session::lookupSession( s );
00147     if( !session->isSessionTime() ) return;
00148 
00149     Log* log = session->getLog();
00150 
00151     getHost( s, d, address, port );
00152 
00153     log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00154     int result = m_connector.connect( address, port, m_noDelay, m_sendBufSize, m_rcvBufSize );
00155     setPending( s );
00156 
00157     m_pendingConnections[ result ] 
00158       = new SocketConnection( *this, s, result, &m_connector.getMonitor() );
00159   }
00160   catch ( std::exception& ) {}
00161 
00162   QF_STACK_POP
00163 }
00164 
00165 void SocketInitiator::onConnect( SocketConnector&, int s )
00166 { QF_STACK_PUSH(SocketInitiator::onConnect)
00167 
00168   SocketConnections::iterator i = m_pendingConnections.find( s );
00169   if( i == m_pendingConnections.end() ) return;
00170   SocketConnection* pSocketConnection = i->second;
00171   
00172   m_connections[s] = pSocketConnection;
00173   m_pendingConnections.erase( i );
00174   setConnected( pSocketConnection->getSession()->getSessionID() );
00175   pSocketConnection->onTimeout();
00176 
00177   QF_STACK_POP
00178 }
00179 
00180 void SocketInitiator::onWrite( SocketConnector& connector, int s )
00181 { QF_STACK_PUSH(SocketInitiator::onWrite)
00182 
00183   SocketConnections::iterator i = m_connections.find( s );
00184   if ( i == m_connections.end() ) return ;
00185   SocketConnection* pSocketConnection = i->second;
00186   if( pSocketConnection->processQueue() )
00187     pSocketConnection->unsignal();
00188   
00189   QF_STACK_POP
00190 }
00191 
00192 bool SocketInitiator::onData( SocketConnector& connector, int s )
00193 { QF_STACK_PUSH(SocketInitiator::onData)
00194 
00195   SocketConnections::iterator i = m_connections.find( s );
00196   if ( i == m_connections.end() ) return false;
00197   SocketConnection* pSocketConnection = i->second;
00198   return pSocketConnection->read( connector );
00199 
00200   QF_STACK_POP
00201 }
00202 
00203 void SocketInitiator::onDisconnect( SocketConnector&, int s )
00204 { QF_STACK_PUSH(SocketInitiator::onDisconnect)
00205 
00206   SocketConnections::iterator i = m_connections.find( s );
00207   SocketConnections::iterator j = m_pendingConnections.find( s );
00208 
00209   SocketConnection* pSocketConnection = 0;
00210   if( i != m_connections.end() ) 
00211           pSocketConnection = i->second;
00212   if( j != m_pendingConnections.end() )
00213           pSocketConnection = j->second;
00214   if( !pSocketConnection )
00215           return;
00216 
00217   setDisconnected( pSocketConnection->getSession()->getSessionID() );
00218 
00219   Session* pSession = pSocketConnection->getSession();
00220   if ( pSession )
00221   {
00222     pSession->disconnect();
00223     setDisconnected( pSession->getSessionID() );
00224   }
00225 
00226   delete pSocketConnection;
00227   m_connections.erase( s );
00228   m_pendingConnections.erase( s );
00229 
00230   QF_STACK_POP
00231 }
00232 
00233 void SocketInitiator::onError( SocketConnector& connector )
00234 { QF_STACK_PUSH(SocketInitiator::onError)
00235   onTimeout( connector );
00236   QF_STACK_POP
00237 }
00238 
00239 void SocketInitiator::onTimeout( SocketConnector& )
00240 { QF_STACK_PUSH(SocketInitiator::onTimeout)
00241 
00242   time_t now;
00243   ::time( &now );
00244 
00245   if ( (now - m_lastConnect) >= m_reconnectInterval )
00246   {
00247     connect();
00248     m_lastConnect = now;
00249   }
00250 
00251   SocketConnections::iterator i;
00252   for ( i = m_connections.begin(); i != m_connections.end(); ++i )
00253     i->second->onTimeout();
00254 
00255   QF_STACK_POP
00256 }
00257 
00258 void SocketInitiator::getHost( const SessionID& s, const Dictionary& d,
00259                                std::string& address, short& port )
00260 { QF_STACK_PUSH(SocketInitiator::getHost)
00261 
00262   int num = 0;
00263   SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00264   if ( i != m_sessionToHostNum.end() ) num = i->second;
00265 
00266   std::stringstream hostStream;
00267   hostStream << SOCKET_CONNECT_HOST << num;
00268   std::string hostString = hostStream.str();
00269 
00270   std::stringstream portStream;
00271   std::string portString = portStream.str();
00272   portStream << SOCKET_CONNECT_PORT << num;
00273 
00274   if( d.has(hostString) && d.has(portString) )
00275   {
00276     address = d.getString( hostString );
00277     port = ( short ) d.getLong( portString );
00278   }
00279   else
00280   {
00281     num = 0;
00282     address = d.getString( SOCKET_CONNECT_HOST );
00283     port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00284   }
00285 
00286   m_sessionToHostNum[ s ] = ++num;
00287 
00288   QF_STACK_POP
00289 }
00290 }

Generated on Mon Mar 1 13:41:38 2010 for QuickFIX by doxygen 1.5.8 written by Dimitri van Heesch, © 1997-2001