Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy   
 

/home/orenmnero/autobuild/quickfix/src/C++/ThreadedSocketAcceptor.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "ThreadedSocketAcceptor.h"
00028 #include "Settings.h"
00029 #include "Utility.h"
00030 
00031 namespace FIX
00032 {
00033 ThreadedSocketAcceptor::ThreadedSocketAcceptor(
00034   Application& application,
00035   MessageStoreFactory& factory,
00036   const SessionSettings& settings ) throw( ConfigError )
00037 : Acceptor( application, factory, settings )
00038 { socket_init(); }
00039 
00040 ThreadedSocketAcceptor::ThreadedSocketAcceptor(
00041   Application& application,
00042   MessageStoreFactory& factory,
00043   const SessionSettings& settings,
00044   LogFactory& logFactory ) throw( ConfigError )
00045 : Acceptor( application, factory, settings, logFactory )
00046 { socket_init(); }
00047 
00048 ThreadedSocketAcceptor::~ThreadedSocketAcceptor()
00049 { socket_term(); }
00050 
00051 void ThreadedSocketAcceptor::onConfigure( const SessionSettings& s )
00052 throw ( ConfigError )
00053 { QF_STACK_PUSH(ThreadedSocketAcceptor::onConfigure)
00054 
00055   std::set<SessionID> sessions = s.getSessions();
00056   std::set<SessionID>::iterator i;
00057   for( i = sessions.begin(); i != sessions.end(); ++i )
00058   {
00059     const Dictionary& settings = s.get( *i );
00060     settings.getLong( SOCKET_ACCEPT_PORT );
00061     if( settings.has(SOCKET_REUSE_ADDRESS) )
00062       settings.getBool( SOCKET_REUSE_ADDRESS );
00063     if( settings.has(SOCKET_NODELAY) )
00064       settings.getBool( SOCKET_NODELAY );
00065   }
00066 
00067   QF_STACK_POP
00068 }
00069 
00070 void ThreadedSocketAcceptor::onInitialize( const SessionSettings& s )
00071 throw ( RuntimeError )
00072 { QF_STACK_PUSH(ThreadedSocketAcceptor::onInitialize)
00073 
00074   short port = 0;
00075   bool reuseAddress = false;
00076   bool noDelay = false;
00077 
00078   std::set<int> ports;
00079 
00080   std::set<SessionID> sessions = s.getSessions();
00081   std::set<SessionID>::iterator i = sessions.begin();
00082   for( ; i != sessions.end(); ++i )
00083   {
00084     Dictionary settings = s.get( *i );
00085     port = (short)settings.getLong( SOCKET_ACCEPT_PORT );
00086 
00087     m_portToSessions[port].insert( *i );
00088 
00089     if( ports.find(port) != ports.end() )
00090       continue;
00091     ports.insert( port );
00092 
00093     if( settings.has( SOCKET_REUSE_ADDRESS ) )
00094       reuseAddress = s.get().getBool( SOCKET_REUSE_ADDRESS );
00095     if( settings.has( SOCKET_NODELAY ) )
00096       noDelay = s.get().getBool( SOCKET_NODELAY );
00097 
00098     int socket = socket_createAcceptor( port, reuseAddress );
00099     if( socket < 0 )
00100     {
00101       socket_close( socket );
00102       throw RuntimeError( "Unable to create, bind, or listen to port " + IntConvertor::convert( (unsigned short)port ) );
00103     }
00104     if( noDelay )
00105       socket_setsockopt( socket, TCP_NODELAY );
00106 
00107     m_socketToPort[socket] = port;
00108     m_sockets.insert( socket );
00109   }    
00110 
00111   QF_STACK_POP
00112 }
00113 
00114 void ThreadedSocketAcceptor::onStart()
00115 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStart)
00116 
00117   Sockets::iterator i;
00118   for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
00119   {
00120     Locker l( m_mutex );
00121     int port = m_socketToPort[*i];
00122     AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
00123     unsigned thread;
00124     thread_spawn( &socketAcceptorThread, info, thread );
00125     addThread( *i, thread );
00126   }
00127 
00128   QF_STACK_POP
00129 }
00130 
00131 bool ThreadedSocketAcceptor::onPoll()
00132 { QF_STACK_PUSH(ThreadedSocketAcceptor::onPoll)
00133 
00134   return false;
00135 
00136   QF_STACK_POP
00137 }
00138 
00139 void ThreadedSocketAcceptor::onStop()
00140 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStop)
00141 
00142   Locker l(m_mutex);
00143 
00144   time_t start = 0;
00145   time_t now = 0;
00146 
00147   ::time( &start );
00148   while ( isLoggedOn() )
00149   {
00150     if( ::time(&now) -5 >= start )
00151       break;
00152   }
00153 
00154   SocketToThread threads;
00155   threads = m_threads;
00156 
00157   SocketToThread::iterator i;
00158   for ( i = threads.begin(); i != threads.end(); ++i )
00159     socket_close( i->first );
00160   for ( i = threads.begin(); i != threads.end(); ++i )
00161     thread_join( i->second );
00162   threads.clear();
00163 
00164   QF_STACK_POP
00165 }
00166 
00167 void ThreadedSocketAcceptor::addThread( int s, int t )
00168 { QF_STACK_PUSH(ThreadedSocketAcceptor::addThread)
00169 
00170   Locker l(m_mutex);
00171 
00172   m_threads[ s ] = t;
00173 
00174   QF_STACK_POP
00175 }
00176 
00177 void ThreadedSocketAcceptor::removeThread( int s )
00178 { QF_STACK_PUSH(ThreadedSocketAcceptor::removeThread)
00179 
00180   Locker l(m_mutex);
00181   SocketToThread::iterator i = m_threads.find( s );
00182   if ( i != m_threads.end() )
00183   {
00184     thread_detach( i->second );
00185     m_threads.erase( i );
00186   }
00187 
00188   QF_STACK_POP
00189 }
00190 
00191 THREAD_PROC ThreadedSocketAcceptor::socketAcceptorThread( void* p )
00192 { QF_STACK_TRY
00193   QF_STACK_PUSH(ThreadedSocketAcceptor::socketAcceptorThread)
00194 
00195   AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
00196 
00197   ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00198   int s = info->m_socket;
00199   int port = info->m_port;
00200   delete info;
00201 
00202   int noDelay = 0;
00203   socket_getsockopt( s, TCP_NODELAY, noDelay );
00204 
00205   int socket = 0;
00206   while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
00207   {
00208     if( noDelay )
00209       socket_setsockopt( socket, TCP_NODELAY );
00210 
00211     Sessions sessions = pAcceptor->m_portToSessions[port];
00212 
00213     ThreadedSocketConnection * pConnection =
00214       new ThreadedSocketConnection( socket, sessions, pAcceptor->getApplication() );
00215 
00216     ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
00217 
00218     {
00219       Locker l( pAcceptor->m_mutex );
00220 
00221       std::stringstream stream;
00222       stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
00223       pAcceptor->log( stream.str() );
00224 
00225       unsigned thread;
00226       if ( !thread_spawn( &socketConnectionThread, info, thread ) )
00227         delete info;
00228       pAcceptor->addThread( socket, thread );
00229     }
00230   }
00231 
00232   if( !pAcceptor->isStopped() )
00233     pAcceptor->removeThread( s );
00234   return 0;
00235 
00236   QF_STACK_POP
00237   QF_STACK_CATCH
00238 }
00239 
00240 THREAD_PROC ThreadedSocketAcceptor::socketConnectionThread( void* p )
00241 { QF_STACK_TRY
00242   QF_STACK_PUSH(ThreadedSocketAcceptor::socketConnectionThread)
00243 
00244   ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
00245 
00246   ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00247   ThreadedSocketConnection* pConnection = info->m_pConnection;
00248   delete info;
00249 
00250   int socket = pConnection->getSocket();
00251 
00252   while ( pConnection->read() ) {}
00253   delete pConnection;
00254   if( !pAcceptor->isStopped() )
00255     pAcceptor->removeThread( socket );
00256   return 0;
00257 
00258   QF_STACK_POP
00259   QF_STACK_CATCH
00260 }
00261 }

Generated on Mon Jul 24 19:36:30 2006 for QuickFIX by doxygen 1.3.6-20040222 written by Dimitri van Heesch, © 1997-2001