Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy   
 

/home/orenmnero/autobuild/quickfix/src/C++/ThreadedSocketConnection.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "ThreadedSocketConnection.h"
00028 #include "ThreadedSocketAcceptor.h"
00029 #include "ThreadedSocketInitiator.h"
00030 #include "Session.h"
00031 #include "Utility.h"
00032 
00033 namespace FIX
00034 {
00035 ThreadedSocketConnection::ThreadedSocketConnection( int s, Sessions sessions, Application& application )
00036 : m_socket( s ), m_application( application ),
00037   m_sessions( sessions ), m_pSession( 0 ) 
00038 {
00039   FD_ZERO( &m_fds );
00040   FD_SET( m_socket, &m_fds );
00041 }
00042 
00043 ThreadedSocketConnection::ThreadedSocketConnection( const SessionID& sessionID, int s,
00044     Application& application )
00045 
00046 : m_socket( s ), m_application( application ),
00047   m_pSession( Session::lookupSession( sessionID ) )
00048 {
00049   FD_ZERO( &m_fds );
00050   FD_SET( m_socket, &m_fds );
00051   if ( m_pSession ) m_pSession->setResponder( this );
00052 }
00053 
00054 ThreadedSocketConnection::~ThreadedSocketConnection()
00055 {
00056   if ( m_pSession )
00057   {
00058     m_pSession->setResponder( 0 );
00059     Session::unregisterSession( m_pSession->getSessionID() );
00060   }
00061 }
00062 
00063 bool ThreadedSocketConnection::send( const std::string& msg )
00064 { QF_STACK_PUSH(ThreadedSocketConnection::send)
00065   return socket_send( m_socket, msg.c_str(), msg.length() ) >= 0;
00066   QF_STACK_POP
00067 }
00068 
00069 void ThreadedSocketConnection::disconnect()
00070 { QF_STACK_PUSH(ThreadedSocketConnection::disconnect)
00071   
00072   m_disconnect = true;
00073   socket_close( m_socket );
00074 
00075   QF_STACK_POP
00076 }
00077 
00078 bool ThreadedSocketConnection::read()
00079 { QF_STACK_PUSH(ThreadedSocketConnection::read)
00080 
00081   struct timeval timeout = { 1, 0 };
00082   fd_set readset = m_fds;
00083 
00084   try
00085   {
00086     // Wait for input (1 second timeout)
00087     int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
00088 
00089     if( result > 0 ) // Something to read
00090     {
00091       // We can read without blocking
00092       int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
00093       if ( size <= 0 ) { throw SocketRecvFailed( result ); }
00094       m_parser.addToStream( m_buffer, size );
00095     }
00096     else if( result == 0 && m_pSession ) // Timeout
00097     {
00098       m_pSession->next();
00099     }
00100     else if( result < 0 ) // Error
00101     {
00102       throw SocketRecvFailed( result );
00103     }
00104 
00105     processStream();
00106     return true;
00107   }
00108   catch ( SocketRecvFailed& e )
00109   {
00110     if( m_disconnect )
00111           return false;
00112 
00113     if( m_pSession )
00114     {
00115       m_pSession->getLog()->onEvent( e.what() );
00116       m_pSession->disconnect();
00117     }
00118     else
00119     {
00120       disconnect();
00121     }
00122 
00123     return false;
00124   }
00125 
00126   QF_STACK_POP
00127 }
00128 
00129 bool ThreadedSocketConnection::readMessage( std::string& msg )
00130 throw( SocketRecvFailed )
00131 { QF_STACK_PUSH(ThreadedSocketConnection::readMessage)
00132 
00133   try
00134   {
00135     return m_parser.readFixMessage( msg );
00136   }
00137   catch ( MessageParseError& ) {}
00138   return true;
00139 
00140   QF_STACK_POP
00141 }
00142 
00143 void ThreadedSocketConnection::processStream()
00144 { QF_STACK_PUSH(ThreadedSocketConnection::processStream)
00145 
00146   std::string msg;
00147   while( readMessage(msg) )
00148   {
00149     if ( !m_pSession )
00150     {
00151       if ( !setSession( msg ) )
00152       { disconnect(); continue; }
00153     }
00154     try
00155     {
00156       m_pSession->next( msg );
00157     }
00158     catch( InvalidMessage& )
00159     {
00160       if( !m_pSession->isLoggedOn() )
00161       {
00162         disconnect();
00163         return;
00164       }
00165     }
00166   }
00167 
00168   QF_STACK_POP
00169 }
00170 
00171 bool ThreadedSocketConnection::setSession( const std::string& msg )
00172 { QF_STACK_PUSH(ThreadedSocketConnection::setSession)
00173 
00174   m_pSession = Session::lookupSession( msg, true );
00175   if ( !m_pSession ) return false;
00176   SessionID sessionID = m_pSession->getSessionID();
00177   m_pSession = 0;
00178 
00179   // see if the session frees up within 5 seconds
00180   for( int i = 1; i <= 5; i++ )
00181   {
00182     if( !Session::isSessionRegistered( sessionID ) )
00183       m_pSession = Session::registerSession( sessionID );
00184     if( m_pSession ) break;
00185     process_sleep( 1 );
00186   }
00187 
00188   if ( !m_pSession ) 
00189     return false;
00190   if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
00191     return false;
00192 
00193   m_pSession->setResponder( this );
00194   return true;
00195 
00196   QF_STACK_POP
00197 }
00198 
00199 } // namespace FIX

Generated on Mon Jul 24 19:36:30 2006 for QuickFIX by doxygen 1.3.6-20040222 written by Dimitri van Heesch, © 1997-2001