00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "SocketConnection.h"
00028 #include "SocketAcceptor.h"
00029 #include "SocketConnector.h"
00030 #include "SocketInitiator.h"
00031 #include "Session.h"
00032 #include "Utility.h"
00033
00034 namespace FIX
00035 {
00036 SocketConnection::SocketConnection( int s, Sessions sessions,
00037 SocketMonitor* pMonitor )
00038 : m_socket( s ), m_sendLength( 0 ),
00039 m_sessions(sessions), m_pSession( 0 ), m_pMonitor( pMonitor )
00040 {
00041 FD_ZERO( &m_fds );
00042 FD_SET( m_socket, &m_fds );
00043 }
00044
00045 SocketConnection::SocketConnection( SocketInitiator& i,
00046 const SessionID& sessionID, int s,
00047 SocketMonitor* pMonitor )
00048 : m_socket( s ), m_sendLength( 0 ),
00049 m_pSession( i.getSession( sessionID, *this ) ),
00050 m_pMonitor( pMonitor )
00051 {
00052 FD_ZERO( &m_fds );
00053 FD_SET( m_socket, &m_fds );
00054 m_sessions.insert( sessionID );
00055 }
00056
00057 SocketConnection::~SocketConnection()
00058 {
00059 if ( m_pSession )
00060 Session::unregisterSession( m_pSession->getSessionID() );
00061 }
00062
00063 bool SocketConnection::send( const std::string& msg )
00064 { QF_STACK_PUSH(SocketConnection::send)
00065
00066 Locker l( m_mutex );
00067
00068 m_sendQueue.push_back( msg );
00069 processQueue();
00070 signal();
00071 return true;
00072
00073 QF_STACK_POP
00074 }
00075
00076 bool SocketConnection::processQueue()
00077 { QF_STACK_PUSH(SocketConnection::processQueue)
00078
00079 Locker l( m_mutex );
00080
00081 if( !m_sendQueue.size() ) return true;
00082
00083 struct timeval timeout = { 0, 0 };
00084 fd_set writeset = m_fds;
00085 if( select( 1 + m_socket, 0, &writeset, 0, &timeout ) <= 0 )
00086 return false;
00087
00088 const std::string& msg = m_sendQueue.front();
00089
00090 int result = socket_send
00091 ( m_socket, msg.c_str() + m_sendLength, msg.length() - m_sendLength );
00092
00093 if( result > 0 )
00094 m_sendLength += result;
00095
00096 if( m_sendLength == msg.length() )
00097 {
00098 m_sendLength = 0;
00099 m_sendQueue.pop_front();
00100 }
00101
00102 return !m_sendQueue.size();
00103
00104 QF_STACK_POP
00105 }
00106
00107 void SocketConnection::disconnect()
00108 { QF_STACK_PUSH(SocketConnection::disconnect)
00109
00110 if ( m_pMonitor )
00111 m_pMonitor->drop( m_socket );
00112
00113 QF_STACK_POP
00114 }
00115
00116 bool SocketConnection::read( SocketConnector& s )
00117 { QF_STACK_PUSH(SocketConnection::read)
00118
00119 if ( !m_pSession ) return false;
00120
00121 try
00122 {
00123 readFromSocket();
00124 readMessages( s.getMonitor() );
00125 }
00126 catch( SocketRecvFailed& e )
00127 {
00128 m_pSession->getLog()->onEvent( e.what() );
00129 s.getMonitor().drop( m_socket );
00130 }
00131 return true;
00132
00133 QF_STACK_POP
00134 }
00135
00136 bool SocketConnection::read( SocketAcceptor& a, SocketServer& s )
00137 { QF_STACK_PUSH(SocketConnection::read)
00138
00139 std::string msg;
00140 try
00141 {
00142 readFromSocket();
00143
00144 if ( !m_pSession )
00145 {
00146 if ( !readMessage( msg ) ) return false;
00147 m_pSession = Session::lookupSession( msg, true );
00148 if( !isValidSession() )
00149 m_pSession = 0;
00150 if( m_pSession )
00151 m_pSession = a.getSession( msg, *this );
00152 if( m_pSession )
00153 m_pSession->next( msg );
00154 if( !m_pSession )
00155 {
00156 s.getMonitor().drop( m_socket );
00157 return false;
00158 }
00159
00160 Session::registerSession( m_pSession->getSessionID() );
00161 }
00162
00163 readMessages( s.getMonitor() );
00164 return true;
00165 }
00166 catch ( SocketRecvFailed& e )
00167 {
00168 if( m_pSession )
00169 m_pSession->getLog()->onEvent( e.what() );
00170 s.getMonitor().drop( m_socket );
00171 }
00172 catch ( InvalidMessage& )
00173 {
00174 s.getMonitor().drop( m_socket );
00175 }
00176 return false;
00177
00178 QF_STACK_POP
00179 }
00180
00181 bool SocketConnection::isValidSession()
00182 { QF_STACK_PUSH(SocketConnection::isValidSession)
00183
00184 if( m_pSession == 0 )
00185 return false;
00186 SessionID sessionID = m_pSession->getSessionID();
00187 if( Session::isSessionRegistered(sessionID) )
00188 return false;
00189 return !( m_sessions.find(sessionID) == m_sessions.end() );
00190
00191 QF_STACK_POP
00192 }
00193
00194 void SocketConnection::readFromSocket()
00195 throw( SocketRecvFailed )
00196 { QF_STACK_PUSH(SocketConnection::readFromSocket)
00197
00198 int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
00199 if( size <= 0 ) throw SocketRecvFailed( size );
00200 m_parser.addToStream( m_buffer, size );
00201
00202 QF_STACK_POP
00203 }
00204
00205 bool SocketConnection::readMessage( std::string& msg )
00206 { QF_STACK_PUSH(SocketConnection::readMessage)
00207
00208 try
00209 {
00210 return m_parser.readFixMessage( msg );
00211 }
00212 catch ( MessageParseError& ) {}
00213 return true;
00214
00215 QF_STACK_POP
00216 }
00217
00218 void SocketConnection::readMessages( SocketMonitor& s )
00219 {
00220 if( !m_pSession ) return;
00221
00222 std::string msg;
00223 while( readMessage( msg ) )
00224 {
00225 try
00226 {
00227 m_pSession->next( msg );
00228 }
00229 catch ( InvalidMessage& )
00230 {
00231 if( !m_pSession->isLoggedOn() )
00232 s.drop( m_socket );
00233 }
00234 }
00235 }
00236
00237 void SocketConnection::onTimeout()
00238 { QF_STACK_PUSH(SocketConnection::onTimeout)
00239 if ( m_pSession ) m_pSession->next();
00240 QF_STACK_POP
00241 }
00242 }