00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "Initiator.h"
00028 #include "Utility.h"
00029 #include "Session.h"
00030 #include "SessionFactory.h"
00031 #include <algorithm>
00032 #include <fstream>
00033
00034 namespace FIX
00035 {
00036 Initiator::Initiator( Application& application,
00037 MessageStoreFactory& messageStoreFactory,
00038 const SessionSettings& settings ) throw( ConfigError )
00039 : m_threadid( 0 ),
00040 m_application( application ),
00041 m_messageStoreFactory( messageStoreFactory ),
00042 m_settings( settings ),
00043 m_pLogFactory( 0 ),
00044 m_stop( false )
00045 { initialize(); }
00046
00047 Initiator::Initiator( Application& application,
00048 MessageStoreFactory& messageStoreFactory,
00049 const SessionSettings& settings,
00050 LogFactory& logFactory ) throw( ConfigError )
00051 : m_threadid( 0 ),
00052 m_application( application ),
00053 m_messageStoreFactory( messageStoreFactory ),
00054 m_settings( settings ),
00055 m_pLogFactory( &logFactory ),
00056 m_stop( false )
00057 { initialize(); }
00058
00059 void Initiator::initialize() throw ( ConfigError )
00060 { QF_STACK_PUSH(Initiator::initialize)
00061
00062 std::set < SessionID > sessions = m_settings.getSessions();
00063 std::set < SessionID > ::iterator i;
00064
00065 if ( !sessions.size() )
00066 throw ConfigError( "No sessions defined" );
00067
00068 SessionFactory factory( m_application, m_messageStoreFactory,
00069 m_pLogFactory );
00070
00071 for ( i = sessions.begin(); i != sessions.end(); ++i )
00072 {
00073 if ( m_settings.get( *i ).getString( "ConnectionType" ) == "initiator" )
00074 {
00075 m_sessionIDs.insert( *i );
00076 m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
00077 setDisconnected( *i );
00078 }
00079 }
00080
00081 if ( !m_sessions.size() )
00082 throw ConfigError( "No sessions defined for initiator" );
00083
00084 QF_STACK_POP
00085 }
00086
00087 Initiator::~Initiator()
00088 { QF_STACK_IGNORE_BEGIN
00089
00090 Sessions::iterator i;
00091 for ( i = m_sessions.begin(); i != m_sessions.end(); ++i )
00092 delete i->second;
00093
00094 QF_STACK_IGNORE_END
00095 }
00096
00097 Session* Initiator::getSession( const SessionID& sessionID,
00098 Responder& responder )
00099 { QF_STACK_PUSH(Initiator::getSession)
00100
00101 Sessions::iterator i = m_sessions.find( sessionID );
00102 if ( i != m_sessions.end() )
00103 {
00104 i->second->setResponder( &responder );
00105 return i->second;
00106 }
00107 return 0;
00108
00109 QF_STACK_POP
00110 }
00111
00112 void Initiator::connect()
00113 { QF_STACK_PUSH(Initiator::connect)
00114
00115 Locker l(m_mutex);
00116
00117 SessionIDs disconnected = m_disconnected;
00118 SessionIDs::iterator i = disconnected.begin();
00119 for ( ; i != disconnected.end(); ++i )
00120 {
00121 Session* pSession = Session::lookupSession( *i );
00122 if ( pSession->isEnabled() && pSession->isSessionTime() )
00123 doConnect( *i, m_settings.get( *i ));
00124 }
00125
00126 QF_STACK_POP
00127 }
00128
00129 void Initiator::setPending( const SessionID& sessionID )
00130 { QF_STACK_PUSH(Initiator::setPending)
00131
00132 Locker l(m_mutex);
00133
00134 m_pending.insert( sessionID );
00135 m_connected.erase( sessionID );
00136 m_disconnected.erase( sessionID );
00137
00138 QF_STACK_POP
00139 }
00140
00141 void Initiator::setConnected( const SessionID& sessionID )
00142 { QF_STACK_PUSH(Initiator::setConnected)
00143
00144 Locker l(m_mutex);
00145
00146 m_pending.erase( sessionID );
00147 m_connected.insert( sessionID );
00148 m_disconnected.erase( sessionID );
00149
00150 QF_STACK_POP
00151 }
00152
00153 void Initiator::setDisconnected( const SessionID& sessionID )
00154 { QF_STACK_PUSH(Initiator::setDisconnected)
00155
00156 Locker l(m_mutex);
00157
00158 m_pending.erase( sessionID );
00159 m_connected.erase( sessionID );
00160 m_disconnected.insert( sessionID );
00161
00162 QF_STACK_POP
00163 }
00164
00165 bool Initiator::isPending( const SessionID& sessionID )
00166 { QF_STACK_PUSH(Initiator::isPending)
00167
00168 Locker l(m_mutex);
00169 return m_pending.find( sessionID ) != m_pending.end();
00170
00171 QF_STACK_POP
00172 }
00173
00174 bool Initiator::isConnected( const SessionID& sessionID )
00175 { QF_STACK_PUSH(Initiator::isConnected)
00176
00177 Locker l(m_mutex);
00178 return m_connected.find( sessionID ) != m_connected.end();
00179
00180 QF_STACK_POP
00181 }
00182
00183 bool Initiator::isDisconnected( const SessionID& sessionID )
00184 { QF_STACK_PUSH(Initiator::isDisconnected)
00185
00186 Locker l(m_mutex);
00187 return m_disconnected.find( sessionID ) != m_disconnected.end();
00188
00189 QF_STACK_POP
00190 }
00191
00192 void Initiator::start() throw ( ConfigError, RuntimeError )
00193 { QF_STACK_PUSH(Initiator::start)
00194
00195 m_stop = false;
00196 onConfigure( m_settings );
00197 onInitialize( m_settings );
00198
00199 if( !thread_spawn( &startThread, this, m_threadid ) )
00200 throw RuntimeError("Unable to spawn thread");
00201
00202 QF_STACK_POP
00203 }
00204
00205
00206 void Initiator::block() throw ( ConfigError, RuntimeError )
00207 { QF_STACK_PUSH(Initiator::block)
00208
00209 onConfigure( m_settings );
00210 onInitialize( m_settings );
00211
00212 startThread(this);
00213
00214 QF_STACK_POP
00215 }
00216
00217 bool Initiator::poll() throw ( ConfigError, RuntimeError )
00218 { QF_STACK_PUSH(Initiator::poll)
00219
00220 if( m_firstPoll )
00221 {
00222 onConfigure( m_settings );
00223 onInitialize( m_settings );
00224 m_firstPoll = false;
00225 }
00226
00227 return onPoll();
00228
00229 QF_STACK_POP
00230 }
00231
00232 void Initiator::stop( bool force )
00233 { QF_STACK_PUSH(Initiator::stop)
00234
00235 if( isStopped() ) return;
00236
00237 std::vector<Session*> enabledSessions;
00238
00239 SessionIDs connected = m_connected;
00240 SessionIDs::iterator i = connected.begin();
00241 for ( ; i != connected.end(); ++i )
00242 {
00243 Session* pSession = Session::lookupSession(*i);
00244 if( pSession->isEnabled() )
00245 {
00246 enabledSessions.push_back( pSession );
00247 pSession->logout();
00248 }
00249 }
00250
00251 if( !force )
00252 {
00253 for ( int second = 1; second <= 10 && isLoggedOn(); ++second )
00254 process_sleep( 1 );
00255 }
00256
00257 {
00258 Locker l(m_mutex);
00259 for ( i = connected.begin(); i != connected.end(); ++i )
00260 setDisconnected( Session::lookupSession(*i)->getSessionID() );
00261 }
00262
00263 m_stop = true;
00264 onStop();
00265 if( m_threadid )
00266 thread_join( m_threadid );
00267 m_threadid = 0;
00268
00269 std::vector<Session*>::iterator session = enabledSessions.begin();
00270 for( ; session != enabledSessions.end(); ++session )
00271 (*session)->logon();
00272
00273 QF_STACK_POP
00274 }
00275
00276 bool Initiator::isLoggedOn()
00277 { QF_STACK_PUSH(Initiator::isLoggedOn)
00278
00279 Locker l(m_mutex);
00280
00281 SessionIDs connected = m_connected;
00282 SessionIDs::iterator i = connected.begin();
00283 for ( ; i != connected.end(); ++i )
00284 {
00285 if( Session::lookupSession(*i)->isLoggedOn() )
00286 return true;
00287 }
00288 return false;
00289
00290 QF_STACK_POP
00291 }
00292
00293 THREAD_PROC Initiator::startThread( void* p )
00294 { QF_STACK_TRY
00295 QF_STACK_PUSH(Initiator::startThread)
00296
00297 Initiator * pInitiator = static_cast < Initiator* > ( p );
00298 pInitiator->onStart();
00299 return 0;
00300
00301 QF_STACK_POP
00302 QF_STACK_CATCH
00303 }
00304 }