Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy   
 

FIX::ThreadedSocketInitiator Class Reference
[User]

Threaded Socket implementation of Initiator. More...

#include <ThreadedSocketInitiator.h>

Inheritance diagram for FIX::ThreadedSocketInitiator:

Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketInitiator:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
virtual ~ThreadedSocketInitiator ()

Private Types

typedef std::map< int, int > SocketToThread
typedef std::map< SessionID,
int > 
SessionToHostNum
typedef std::pair< ThreadedSocketInitiator *,
ThreadedSocketConnection * > 
ThreadPair

Private Member Functions

void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure acceptor.

void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize initiator.

void onStart ()
 Implemented to start connecting to targets.

bool onPoll ()
 Implemented to connect and poll for events.

void onStop ()
 Implemented to stop a running initiator.

bool doConnect (const SessionID &s, const Dictionary &d)
 Implemented to connect a session to its target.

void addThread (int s, int t)
void removeThread (int s)
void getHost (const SessionID &, const Dictionary &, std::string &, short &)

Static Private Member Functions

THREAD_PROC socketThread (void *p)

Private Attributes

SessionSettings m_settings
SessionToHostNum m_sessionToHostNum
time_t m_lastConnect
int m_reconnectInterval
bool m_noDelay
bool m_stop
SocketToThread m_threads
Mutex m_mutex

Detailed Description

Threaded Socket implementation of Initiator.

Definition at line 39 of file ThreadedSocketInitiator.h.


Member Typedef Documentation

typedef std::map< SessionID, int > FIX::ThreadedSocketInitiator::SessionToHostNum [private]
 

Definition at line 52 of file ThreadedSocketInitiator.h.

typedef std::map< int, int > FIX::ThreadedSocketInitiator::SocketToThread [private]
 

Definition at line 51 of file ThreadedSocketInitiator.h.

typedef std::pair< ThreadedSocketInitiator*, ThreadedSocketConnection* > FIX::ThreadedSocketInitiator::ThreadPair [private]
 

Definition at line 53 of file ThreadedSocketInitiator.h.

Referenced by doConnect(), and socketThread().


Constructor & Destructor Documentation

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator Application ,
MessageStoreFactory ,
const SessionSettings
throw ( ConfigError )
 

Definition at line 33 of file ThreadedSocketInitiator.cpp.

References FIX::socket_init().

00037 : Initiator( application, factory, settings ),
00038   m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false )
00039 { socket_init(); }

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator Application ,
MessageStoreFactory ,
const SessionSettings ,
LogFactory
throw ( ConfigError )
 

Definition at line 41 of file ThreadedSocketInitiator.cpp.

References FIX::socket_init().

00046 : Initiator( application, factory, settings, logFactory ),
00047   m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false )
00048 { socket_init(); }

FIX::ThreadedSocketInitiator::~ThreadedSocketInitiator  )  [virtual]
 

Definition at line 50 of file ThreadedSocketInitiator.cpp.

References FIX::socket_term().

00051 { socket_term(); }


Member Function Documentation

void FIX::ThreadedSocketInitiator::addThread int  s,
int  t
[private]
 

Definition at line 173 of file ThreadedSocketInitiator.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

Referenced by doConnect().

00174 { QF_STACK_PUSH(ThreadedSocketInitiator::addThread)
00175 
00176   Locker l(m_mutex);
00177 
00178   m_threads[ s ] = t;
00179   QF_STACK_POP
00180 }

bool FIX::ThreadedSocketInitiator::doConnect const SessionID s,
const Dictionary d
[private, virtual]
 

Implemented to connect a session to its target.

Implements FIX::Initiator.

Definition at line 130 of file ThreadedSocketInitiator.cpp.

References addThread(), FIX::Initiator::getApplication(), getHost(), FIX::Session::getLog(), FIX::Session::isSessionTime(), FIX::Log::onEvent(), QF_STACK_POP, QF_STACK_PUSH, FIX::socket_connect(), FIX::socket_createConnector(), socketThread(), FIX::thread_spawn(), and ThreadPair.

00131 { QF_STACK_PUSH(ThreadedSocketInitiator::doConnect)
00132 
00133   try
00134   {
00135     Session* session = Session::lookupSession( s );
00136     if( !session->isSessionTime() ) return false;
00137 
00138     std::string address;
00139     short port = 0;
00140     getHost( s, d, address, port );
00141 
00142     Log* log = session->getLog();
00143     log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00144     int socket = socket_createConnector();
00145 
00146     if( socket_connect(socket, address.c_str(), port) < 0 )
00147     {
00148       log->onEvent( "Connection failed" );
00149       return false;
00150     }
00151 
00152     log->onEvent( "Connection succeeded" );
00153 
00154     ThreadedSocketConnection* pConnection =
00155       new ThreadedSocketConnection( s, socket, getApplication() );
00156 
00157     ThreadPair* pair = new ThreadPair( this, pConnection );
00158 
00159     {
00160       Locker l( m_mutex );
00161       unsigned thread;
00162       if ( !thread_spawn( &socketThread, pair, thread ) )
00163         delete pair;
00164       addThread( socket, thread );
00165     }
00166     return true;
00167   }
00168   catch ( std::exception& ) { return false; }
00169 
00170   QF_STACK_POP
00171 }

void FIX::ThreadedSocketInitiator::getHost const SessionID ,
const Dictionary ,
std::string &  ,
short & 
[private]
 

Definition at line 224 of file ThreadedSocketInitiator.cpp.

References FIX::Dictionary::getLong(), FIX::Dictionary::getString(), QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_CONNECT_HOST, and FIX::SOCKET_CONNECT_PORT.

Referenced by doConnect().

00226 { QF_STACK_PUSH(ThreadedSocketInitiator::getHost)
00227 
00228   int num = 0;
00229   SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00230   if ( i != m_sessionToHostNum.end() ) num = i->second;
00231 
00232   try
00233   {
00234     std::stringstream hostStream;
00235     hostStream << SOCKET_CONNECT_HOST << num;
00236     address = d.getString( hostStream.str() );
00237 
00238     std::stringstream portStream;
00239     portStream << SOCKET_CONNECT_PORT << num;
00240     port = ( short ) d.getLong( portStream.str() );
00241   }
00242   catch ( ConfigError& )
00243   {
00244     num = 0;
00245     address = d.getString( SOCKET_CONNECT_HOST );
00246     port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00247   }
00248   m_sessionToHostNum[ s ] = ++num;
00249 
00250   QF_STACK_POP
00251 }

void FIX::ThreadedSocketInitiator::onConfigure const SessionSettings s  )  throw ( ConfigError ) [private, virtual]
 

Implemented to configure acceptor.

Reimplemented from FIX::Initiator.

Definition at line 53 of file ThreadedSocketInitiator.cpp.

References QF_STACK_POP, QF_STACK_PUSH, and FIX::SOCKET_NODELAY.

00055 { QF_STACK_PUSH(ThreadedSocketInitiator::onConfigure)
00056 
00057   try { m_reconnectInterval = s.get().getLong( "ReconnectInterval" ); }
00058   catch ( std::exception& ) {}
00059   if( s.get().has( SOCKET_NODELAY ) )
00060     m_noDelay = s.get().getBool( SOCKET_NODELAY );
00061 
00062   QF_STACK_POP
00063 }

void FIX::ThreadedSocketInitiator::onInitialize const SessionSettings s  )  throw ( RuntimeError ) [private, virtual]
 

Implemented to initialize initiator.

Reimplemented from FIX::Initiator.

Definition at line 65 of file ThreadedSocketInitiator.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

00067 { QF_STACK_PUSH(ThreadedSocketInitiator::onInitialize)
00068   QF_STACK_POP
00069 }

bool FIX::ThreadedSocketInitiator::onPoll  )  [private, virtual]
 

Implemented to connect and poll for events.

Implements FIX::Initiator.

Definition at line 92 of file ThreadedSocketInitiator.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

00093 { QF_STACK_PUSH(ThreadedSocketInitiator::onPoll)
00094 
00095   return false;
00096 
00097   QF_STACK_POP
00098 }

void FIX::ThreadedSocketInitiator::onStart  )  [private, virtual]
 

Implemented to start connecting to targets.

Implements FIX::Initiator.

Definition at line 71 of file ThreadedSocketInitiator.cpp.

References FIX::Initiator::connect(), FIX::Initiator::isStopped(), FIX::process_sleep(), QF_STACK_POP, and QF_STACK_PUSH.

00072 { QF_STACK_PUSH(ThreadedSocketInitiator::onStart)
00073 
00074   while ( !isStopped() )
00075   {
00076     time_t now;
00077     ::time( &now );
00078 
00079     if ( (now - m_lastConnect) >= m_reconnectInterval )
00080     {
00081       Locker l( m_mutex );
00082       connect();
00083       m_lastConnect = now;
00084     }
00085 
00086     process_sleep( 1 );
00087   }
00088 
00089   QF_STACK_POP
00090 }

void FIX::ThreadedSocketInitiator::onStop  )  [private, virtual]
 

Implemented to stop a running initiator.

Implements FIX::Initiator.

Definition at line 100 of file ThreadedSocketInitiator.cpp.

References FIX::Initiator::isLoggedOn(), QF_STACK_POP, QF_STACK_PUSH, FIX::socket_close(), and FIX::thread_join().

00101 { QF_STACK_PUSH(ThreadedSocketInitiator::onStop)
00102 
00103   SocketToThread threads;
00104   SocketToThread::iterator i;
00105   
00106   Locker l(m_mutex);
00107 
00108   time_t start = 0;
00109   time_t now = 0;
00110 
00111   ::time( &start );
00112   while ( isLoggedOn() )
00113   {
00114     if( ::time(&now) -5 >= start )
00115       break;
00116   }
00117 
00118   threads = m_threads;
00119 
00120   for ( i = threads.begin(); i != threads.end(); ++i )
00121     socket_close( i->first );
00122   
00123   for ( i = threads.begin(); i != threads.end(); ++i )
00124     thread_join( i->second );
00125   threads.clear();
00126 
00127   QF_STACK_POP
00128 }

void FIX::ThreadedSocketInitiator::removeThread int  s  )  [private]
 

Definition at line 182 of file ThreadedSocketInitiator.cpp.

References QF_STACK_POP, QF_STACK_PUSH, and FIX::thread_detach().

Referenced by socketThread().

00183 { QF_STACK_PUSH(ThreadedSocketInitiator::removeThread)
00184 
00185   Locker l(m_mutex);
00186   SocketToThread::iterator i = m_threads.find( s );
00187 
00188   if ( i != m_threads.end() )
00189   {
00190     thread_detach( i->second );
00191     m_threads.erase( i );
00192   }
00193 
00194   QF_STACK_POP
00195 }

THREAD_PROC FIX::ThreadedSocketInitiator::socketThread void *  p  )  [static, private]
 

Definition at line 197 of file ThreadedSocketInitiator.cpp.

References FIX::ThreadedSocketConnection::getSession(), FIX::Session::getSessionID(), FIX::ThreadedSocketConnection::getSocket(), FIX::Initiator::isStopped(), QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, FIX::ThreadedSocketConnection::read(), removeThread(), FIX::Initiator::setConnected(), FIX::Initiator::setDisconnected(), THREAD_PROC, and ThreadPair.

Referenced by doConnect().

00198 { QF_STACK_TRY
00199   QF_STACK_PUSH(ThreadedSocketInitiator::socketThread)
00200 
00201   ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
00202 
00203   ThreadedSocketInitiator* pInitiator = pair->first;
00204   ThreadedSocketConnection* pConnection = pair->second;
00205   FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
00206   delete pair;
00207 
00208   pInitiator->setConnected( sessionID );
00209   int socket = pConnection->getSocket();
00210 
00211   while ( pConnection->read() ) {}
00212 
00213   delete pConnection;
00214   if( !pInitiator->isStopped() )
00215     pInitiator->removeThread( socket );
00216   
00217   pInitiator->setDisconnected( sessionID );
00218   return 0;
00219 
00220   QF_STACK_POP
00221   QF_STACK_CATCH
00222 }


Member Data Documentation

time_t FIX::ThreadedSocketInitiator::m_lastConnect [private]
 

Definition at line 72 of file ThreadedSocketInitiator.h.

Mutex FIX::ThreadedSocketInitiator::m_mutex [private]
 

Reimplemented from FIX::Initiator.

Definition at line 77 of file ThreadedSocketInitiator.h.

bool FIX::ThreadedSocketInitiator::m_noDelay [private]
 

Definition at line 74 of file ThreadedSocketInitiator.h.

int FIX::ThreadedSocketInitiator::m_reconnectInterval [private]
 

Definition at line 73 of file ThreadedSocketInitiator.h.

SessionToHostNum FIX::ThreadedSocketInitiator::m_sessionToHostNum [private]
 

Definition at line 71 of file ThreadedSocketInitiator.h.

SessionSettings FIX::ThreadedSocketInitiator::m_settings [private]
 

Reimplemented from FIX::Initiator.

Definition at line 70 of file ThreadedSocketInitiator.h.

bool FIX::ThreadedSocketInitiator::m_stop [private]
 

Reimplemented from FIX::Initiator.

Definition at line 75 of file ThreadedSocketInitiator.h.

SocketToThread FIX::ThreadedSocketInitiator::m_threads [private]
 

Definition at line 76 of file ThreadedSocketInitiator.h.


The documentation for this class was generated from the following files:
Generated on Mon Jul 24 19:36:55 2006 for QuickFIX by doxygen 1.3.6-20040222 written by Dimitri van Heesch, © 1997-2001