00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "ThreadedSocketConnection.h"
00028 #include "ThreadedSocketAcceptor.h"
00029 #include "ThreadedSocketInitiator.h"
00030 #include "Session.h"
00031 #include "Utility.h"
00032
00033 namespace FIX
00034 {
00035 ThreadedSocketConnection::ThreadedSocketConnection( int s, Sessions sessions, Application& application )
00036 : m_socket( s ), m_application( application ),
00037 m_sessions( sessions ), m_pSession( 0 )
00038 {
00039 FD_ZERO( &m_fds );
00040 FD_SET( m_socket, &m_fds );
00041 }
00042
00043 ThreadedSocketConnection::ThreadedSocketConnection( const SessionID& sessionID, int s,
00044 Application& application )
00045
00046 : m_socket( s ), m_application( application ),
00047 m_pSession( Session::lookupSession( sessionID ) )
00048 {
00049 FD_ZERO( &m_fds );
00050 FD_SET( m_socket, &m_fds );
00051 if ( m_pSession ) m_pSession->setResponder( this );
00052 }
00053
00054 ThreadedSocketConnection::~ThreadedSocketConnection()
00055 {
00056 if ( m_pSession )
00057 {
00058 m_pSession->setResponder( 0 );
00059 Session::unregisterSession( m_pSession->getSessionID() );
00060 }
00061 }
00062
00063 bool ThreadedSocketConnection::send( const std::string& msg )
00064 { QF_STACK_PUSH(ThreadedSocketConnection::send)
00065 return socket_send( m_socket, msg.c_str(), msg.length() ) >= 0;
00066 QF_STACK_POP
00067 }
00068
00069 void ThreadedSocketConnection::disconnect()
00070 { QF_STACK_PUSH(ThreadedSocketConnection::disconnect)
00071
00072 m_disconnect = true;
00073 socket_close( m_socket );
00074
00075 QF_STACK_POP
00076 }
00077
00078 bool ThreadedSocketConnection::read()
00079 { QF_STACK_PUSH(ThreadedSocketConnection::read)
00080
00081 struct timeval timeout = { 1, 0 };
00082 fd_set readset = m_fds;
00083
00084 try
00085 {
00086
00087 int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
00088
00089 if( result > 0 )
00090 {
00091
00092 int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
00093 if ( size <= 0 ) { throw SocketRecvFailed( result ); }
00094 m_parser.addToStream( m_buffer, size );
00095 }
00096 else if( result == 0 && m_pSession )
00097 {
00098 m_pSession->next();
00099 }
00100 else if( result < 0 )
00101 {
00102 throw SocketRecvFailed( result );
00103 }
00104
00105 processStream();
00106 return true;
00107 }
00108 catch ( SocketRecvFailed& e )
00109 {
00110 if( m_disconnect )
00111 return false;
00112
00113 if( m_pSession )
00114 {
00115 m_pSession->getLog()->onEvent( e.what() );
00116 m_pSession->disconnect();
00117 }
00118 else
00119 {
00120 disconnect();
00121 }
00122
00123 return false;
00124 }
00125
00126 QF_STACK_POP
00127 }
00128
00129 bool ThreadedSocketConnection::readMessage( std::string& msg )
00130 throw( SocketRecvFailed )
00131 { QF_STACK_PUSH(ThreadedSocketConnection::readMessage)
00132
00133 try
00134 {
00135 return m_parser.readFixMessage( msg );
00136 }
00137 catch ( MessageParseError& ) {}
00138 return true;
00139
00140 QF_STACK_POP
00141 }
00142
00143 void ThreadedSocketConnection::processStream()
00144 { QF_STACK_PUSH(ThreadedSocketConnection::processStream)
00145
00146 std::string msg;
00147 while( readMessage(msg) )
00148 {
00149 if ( !m_pSession )
00150 {
00151 if ( !setSession( msg ) )
00152 { disconnect(); continue; }
00153 }
00154 try
00155 {
00156 m_pSession->next( msg );
00157 }
00158 catch( InvalidMessage& )
00159 {
00160 if( !m_pSession->isLoggedOn() )
00161 {
00162 disconnect();
00163 return;
00164 }
00165 }
00166 }
00167
00168 QF_STACK_POP
00169 }
00170
00171 bool ThreadedSocketConnection::setSession( const std::string& msg )
00172 { QF_STACK_PUSH(ThreadedSocketConnection::setSession)
00173
00174 m_pSession = Session::lookupSession( msg, true );
00175 if ( !m_pSession ) return false;
00176 SessionID sessionID = m_pSession->getSessionID();
00177 m_pSession = 0;
00178
00179
00180 for( int i = 1; i <= 5; i++ )
00181 {
00182 if( !Session::isSessionRegistered( sessionID ) )
00183 m_pSession = Session::registerSession( sessionID );
00184 if( m_pSession ) break;
00185 process_sleep( 1 );
00186 }
00187
00188 if ( !m_pSession )
00189 return false;
00190 if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
00191 return false;
00192
00193 m_pSession->setResponder( this );
00194 return true;
00195
00196 QF_STACK_POP
00197 }
00198
00199 }