00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "Acceptor.h"
00028 #include "Utility.h"
00029 #include "Session.h"
00030 #include "SessionFactory.h"
00031 #include <algorithm>
00032 #include <fstream>
00033
00034 namespace FIX
00035 {
00036 Acceptor::Acceptor( Application& application,
00037 MessageStoreFactory& messageStoreFactory,
00038 const SessionSettings& settings )
00039 throw( ConfigError )
00040 : m_threadid( 0 ),
00041 m_application( application ),
00042 m_messageStoreFactory( messageStoreFactory ),
00043 m_settings( settings ),
00044 m_pLogFactory( 0 ),
00045 m_pLog( 0 ),
00046 m_firstPoll( true ),
00047 m_stop( false )
00048 {
00049 initialize();
00050 }
00051
00052 Acceptor::Acceptor( Application& application,
00053 MessageStoreFactory& messageStoreFactory,
00054 const SessionSettings& settings,
00055 LogFactory& logFactory )
00056 throw( ConfigError )
00057 : m_threadid( 0 ),
00058 m_application( application ),
00059 m_messageStoreFactory( messageStoreFactory ),
00060 m_settings( settings ),
00061 m_pLogFactory( &logFactory ),
00062 m_pLog( logFactory.create() ),
00063 m_firstPoll( true ),
00064 m_stop( false )
00065 {
00066 initialize();
00067 }
00068
00069 void Acceptor::initialize() throw ( ConfigError )
00070 { QF_STACK_PUSH( Acceptor::initialize )
00071
00072 std::set < SessionID > sessions = m_settings.getSessions();
00073 std::set < SessionID > ::iterator i;
00074
00075 if ( !sessions.size() )
00076 throw ConfigError( "No sessions defined" );
00077
00078 SessionFactory factory( m_application, m_messageStoreFactory,
00079 m_pLogFactory );
00080
00081 for ( i = sessions.begin(); i != sessions.end(); ++i )
00082 {
00083 if ( m_settings.get( *i ).getString( CONNECTION_TYPE ) == "acceptor" )
00084 {
00085 m_sessionIDs.insert( *i );
00086 m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
00087 }
00088 }
00089
00090 if ( !m_sessions.size() )
00091 throw ConfigError( "No sessions defined for acceptor" );
00092
00093 QF_STACK_POP
00094 }
00095
00096 Acceptor::~Acceptor()
00097 { QF_STACK_IGNORE_BEGIN
00098
00099 Sessions::iterator i;
00100 for ( i = m_sessions.begin(); i != m_sessions.end(); ++i )
00101 delete i->second;
00102
00103 if( m_pLogFactory && m_pLog )
00104 m_pLogFactory->destroy( m_pLog );
00105
00106 QF_STACK_IGNORE_END
00107 }
00108
00109 Session* Acceptor::getSession
00110 ( const std::string& msg, Responder& responder )
00111 { QF_STACK_PUSH( Acceptor::getSession )
00112
00113 Message message;
00114 if ( !message.setStringHeader( msg ) )
00115 return 0;
00116
00117 BeginString beginString;
00118 SenderCompID clSenderCompID;
00119 TargetCompID clTargetCompID;
00120 MsgType msgType;
00121 try
00122 {
00123 message.getHeader().getField( beginString );
00124 message.getHeader().getField( clSenderCompID );
00125 message.getHeader().getField( clTargetCompID );
00126 message.getHeader().getField( msgType );
00127 if ( msgType != "A" ) return 0;
00128
00129 SenderCompID senderCompID( clTargetCompID );
00130 TargetCompID targetCompID( clSenderCompID );
00131 SessionID sessionID( beginString, senderCompID, targetCompID );
00132
00133 Sessions::iterator i = m_sessions.find( sessionID );
00134 if ( i != m_sessions.end() )
00135 {
00136 i->second->setResponder( &responder );
00137 return i->second;
00138 }
00139 }
00140 catch ( FieldNotFound& ) {}
00141 return 0;
00142
00143 QF_STACK_POP
00144 }
00145
00146 void Acceptor::start() throw ( ConfigError, RuntimeError )
00147 { QF_STACK_PUSH( Acceptor::start )
00148
00149 m_stop = false;
00150 onConfigure( m_settings );
00151 onInitialize( m_settings );
00152
00153 if( !thread_spawn( &startThread, this, m_threadid ) )
00154 throw RuntimeError("Unable to spawn thread");
00155
00156 QF_STACK_POP
00157 }
00158
00159 void Acceptor::block() throw ( ConfigError, RuntimeError )
00160 { QF_STACK_PUSH( Acceptor::start )
00161
00162 onConfigure( m_settings );
00163 onInitialize( m_settings );
00164
00165 startThread(this);
00166
00167 QF_STACK_POP
00168 }
00169
00170 bool Acceptor::poll() throw ( ConfigError, RuntimeError )
00171 { QF_STACK_PUSH( Acceptor::poll )
00172
00173 if( m_firstPoll )
00174 {
00175 onConfigure( m_settings );
00176 onInitialize( m_settings );
00177 m_firstPoll = false;
00178 }
00179
00180 return onPoll();
00181
00182 QF_STACK_POP
00183 }
00184
00185 void Acceptor::stop( bool force )
00186 { QF_STACK_PUSH( Acceptor::stop )
00187
00188 if( isStopped() ) return;
00189
00190 std::vector<Session*> enabledSessions;
00191
00192 Sessions sessions = m_sessions;
00193 Sessions::iterator i = sessions.begin();
00194 for ( ; i != sessions.end(); ++i )
00195 {
00196 Session* pSession = Session::lookupSession(i->first);
00197 if( pSession->isEnabled() )
00198 {
00199 enabledSessions.push_back( pSession );
00200 pSession->logout();
00201 Session::unregisterSession( pSession->getSessionID() );
00202 }
00203 }
00204
00205 if( !force )
00206 {
00207 for ( int second = 1; second <= 10 && isLoggedOn(); ++second )
00208 process_sleep( 1 );
00209 }
00210
00211 m_stop = true;
00212 onStop();
00213 if( m_threadid )
00214 thread_join( m_threadid );
00215 m_threadid = 0;
00216
00217 std::vector<Session*>::iterator session = enabledSessions.begin();
00218 for( ; session != enabledSessions.end(); ++session )
00219 (*session)->logon();
00220
00221 QF_STACK_POP
00222 }
00223
00224 bool Acceptor::isLoggedOn()
00225 { QF_STACK_PUSH(Acceptor::isLoggedOn)
00226
00227 Sessions sessions = m_sessions;
00228 Sessions::iterator i = sessions.begin();
00229 for ( ; i != sessions.end(); ++i )
00230 {
00231 if( i->second->isLoggedOn() )
00232 return true;
00233 }
00234 return false;
00235
00236 QF_STACK_POP
00237 }
00238
00239 THREAD_PROC Acceptor::startThread( void* p )
00240 { QF_STACK_TRY
00241 QF_STACK_PUSH( Acceptor::startThread )
00242
00243 Acceptor * pAcceptor = static_cast < Acceptor* > ( p );
00244 pAcceptor->onStart();
00245 return 0;
00246
00247 QF_STACK_POP
00248 QF_STACK_CATCH
00249 }
00250 }