Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy   
 

/home/orenmnero/autobuild/quickfix/src/C++/ThreadedSocketInitiator.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "ThreadedSocketInitiator.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030 
00031 namespace FIX
00032 {
00033 ThreadedSocketInitiator::ThreadedSocketInitiator(
00034   Application& application,
00035   MessageStoreFactory& factory,
00036   const SessionSettings& settings ) throw( ConfigError )
00037 : Initiator( application, factory, settings ),
00038   m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false )
00039 { socket_init(); }
00040 
00041 ThreadedSocketInitiator::ThreadedSocketInitiator(
00042   Application& application,
00043   MessageStoreFactory& factory,
00044   const SessionSettings& settings,
00045   LogFactory& logFactory ) throw( ConfigError )
00046 : Initiator( application, factory, settings, logFactory ),
00047   m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false )
00048 { socket_init(); }
00049 
00050 ThreadedSocketInitiator::~ThreadedSocketInitiator()
00051 { socket_term(); }
00052 
00053 void ThreadedSocketInitiator::onConfigure( const SessionSettings& s )
00054 throw ( ConfigError )
00055 { QF_STACK_PUSH(ThreadedSocketInitiator::onConfigure)
00056 
00057   try { m_reconnectInterval = s.get().getLong( "ReconnectInterval" ); }
00058   catch ( std::exception& ) {}
00059   if( s.get().has( SOCKET_NODELAY ) )
00060     m_noDelay = s.get().getBool( SOCKET_NODELAY );
00061 
00062   QF_STACK_POP
00063 }
00064 
00065 void ThreadedSocketInitiator::onInitialize( const SessionSettings& s )
00066 throw ( RuntimeError )
00067 { QF_STACK_PUSH(ThreadedSocketInitiator::onInitialize)
00068   QF_STACK_POP
00069 }
00070 
00071 void ThreadedSocketInitiator::onStart()
00072 { QF_STACK_PUSH(ThreadedSocketInitiator::onStart)
00073 
00074   while ( !isStopped() )
00075   {
00076     time_t now;
00077     ::time( &now );
00078 
00079     if ( (now - m_lastConnect) >= m_reconnectInterval )
00080     {
00081       Locker l( m_mutex );
00082       connect();
00083       m_lastConnect = now;
00084     }
00085 
00086     process_sleep( 1 );
00087   }
00088 
00089   QF_STACK_POP
00090 }
00091 
00092 bool ThreadedSocketInitiator::onPoll()
00093 { QF_STACK_PUSH(ThreadedSocketInitiator::onPoll)
00094 
00095   return false;
00096 
00097   QF_STACK_POP
00098 }
00099 
00100 void ThreadedSocketInitiator::onStop()
00101 { QF_STACK_PUSH(ThreadedSocketInitiator::onStop)
00102 
00103   SocketToThread threads;
00104   SocketToThread::iterator i;
00105   
00106   Locker l(m_mutex);
00107 
00108   time_t start = 0;
00109   time_t now = 0;
00110 
00111   ::time( &start );
00112   while ( isLoggedOn() )
00113   {
00114     if( ::time(&now) -5 >= start )
00115       break;
00116   }
00117 
00118   threads = m_threads;
00119 
00120   for ( i = threads.begin(); i != threads.end(); ++i )
00121     socket_close( i->first );
00122   
00123   for ( i = threads.begin(); i != threads.end(); ++i )
00124     thread_join( i->second );
00125   threads.clear();
00126 
00127   QF_STACK_POP
00128 }
00129 
00130 bool ThreadedSocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
00131 { QF_STACK_PUSH(ThreadedSocketInitiator::doConnect)
00132 
00133   try
00134   {
00135     Session* session = Session::lookupSession( s );
00136     if( !session->isSessionTime() ) return false;
00137 
00138     std::string address;
00139     short port = 0;
00140     getHost( s, d, address, port );
00141 
00142     Log* log = session->getLog();
00143     log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00144     int socket = socket_createConnector();
00145 
00146     if( socket_connect(socket, address.c_str(), port) < 0 )
00147     {
00148       log->onEvent( "Connection failed" );
00149       return false;
00150     }
00151 
00152     log->onEvent( "Connection succeeded" );
00153 
00154     ThreadedSocketConnection* pConnection =
00155       new ThreadedSocketConnection( s, socket, getApplication() );
00156 
00157     ThreadPair* pair = new ThreadPair( this, pConnection );
00158 
00159     {
00160       Locker l( m_mutex );
00161       unsigned thread;
00162       if ( !thread_spawn( &socketThread, pair, thread ) )
00163         delete pair;
00164       addThread( socket, thread );
00165     }
00166     return true;
00167   }
00168   catch ( std::exception& ) { return false; }
00169 
00170   QF_STACK_POP
00171 }
00172 
00173 void ThreadedSocketInitiator::addThread( int s, int t )
00174 { QF_STACK_PUSH(ThreadedSocketInitiator::addThread)
00175 
00176   Locker l(m_mutex);
00177 
00178   m_threads[ s ] = t;
00179   QF_STACK_POP
00180 }
00181 
00182 void ThreadedSocketInitiator::removeThread( int s )
00183 { QF_STACK_PUSH(ThreadedSocketInitiator::removeThread)
00184 
00185   Locker l(m_mutex);
00186   SocketToThread::iterator i = m_threads.find( s );
00187 
00188   if ( i != m_threads.end() )
00189   {
00190     thread_detach( i->second );
00191     m_threads.erase( i );
00192   }
00193 
00194   QF_STACK_POP
00195 }
00196 
00197 THREAD_PROC ThreadedSocketInitiator::socketThread( void* p )
00198 { QF_STACK_TRY
00199   QF_STACK_PUSH(ThreadedSocketInitiator::socketThread)
00200 
00201   ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
00202 
00203   ThreadedSocketInitiator* pInitiator = pair->first;
00204   ThreadedSocketConnection* pConnection = pair->second;
00205   FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
00206   delete pair;
00207 
00208   pInitiator->setConnected( sessionID );
00209   int socket = pConnection->getSocket();
00210 
00211   while ( pConnection->read() ) {}
00212 
00213   delete pConnection;
00214   if( !pInitiator->isStopped() )
00215     pInitiator->removeThread( socket );
00216   
00217   pInitiator->setDisconnected( sessionID );
00218   return 0;
00219 
00220   QF_STACK_POP
00221   QF_STACK_CATCH
00222 }
00223 
00224 void ThreadedSocketInitiator::getHost( const SessionID& s, const Dictionary& d,
00225                                        std::string& address, short& port )
00226 { QF_STACK_PUSH(ThreadedSocketInitiator::getHost)
00227 
00228   int num = 0;
00229   SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00230   if ( i != m_sessionToHostNum.end() ) num = i->second;
00231 
00232   try
00233   {
00234     std::stringstream hostStream;
00235     hostStream << SOCKET_CONNECT_HOST << num;
00236     address = d.getString( hostStream.str() );
00237 
00238     std::stringstream portStream;
00239     portStream << SOCKET_CONNECT_PORT << num;
00240     port = ( short ) d.getLong( portStream.str() );
00241   }
00242   catch ( ConfigError& )
00243   {
00244     num = 0;
00245     address = d.getString( SOCKET_CONNECT_HOST );
00246     port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00247   }
00248   m_sessionToHostNum[ s ] = ++num;
00249 
00250   QF_STACK_POP
00251 }
00252 
00253 }

Generated on Mon Jul 24 19:36:31 2006 for QuickFIX by doxygen 1.3.6-20040222 written by Dimitri van Heesch, © 1997-2001