00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "SocketMonitor.h"
00028 #include "Utility.h"
00029 #include <exception>
00030 #include <set>
00031 #include <algorithm>
00032 #include <iostream>
00033
00034 namespace FIX
00035 {
00036 SocketMonitor::SocketMonitor( int timeout )
00037 : m_timeout( timeout )
00038 {
00039 socket_init();
00040
00041 std::pair<int, int> sockets = socket_createpair();
00042 m_signal = sockets.first;
00043 m_interrupt = sockets.second;
00044 m_readSockets.insert( m_interrupt );
00045
00046 m_timeval.tv_sec = 0;
00047 m_timeval.tv_usec = 0;
00048 #ifndef SELECT_DECREMENTS_TIME
00049 m_ticks = clock();
00050 #endif
00051 }
00052
00053 SocketMonitor::~SocketMonitor()
00054 {
00055 Sockets::iterator i;
00056 for ( i = m_readSockets.begin(); i != m_readSockets.end(); ++i )
00057 socket_close( *i );
00058 socket_close( m_signal );
00059 socket_close( m_interrupt );
00060 socket_term();
00061 }
00062
00063 bool SocketMonitor::addConnect( int s )
00064 { QF_STACK_PUSH(SocketMonitor::addConnect)
00065
00066 socket_setnonblock( s );
00067 Sockets::iterator i = m_connectSockets.find( s );
00068 if( i != m_connectSockets.end() ) return false;
00069
00070 m_connectSockets.insert( s );
00071 return true;
00072
00073 QF_STACK_POP
00074 }
00075
00076 bool SocketMonitor::addRead( int s )
00077 { QF_STACK_PUSH(SocketMonitor::addRead)
00078
00079 socket_setnonblock( s );
00080 Sockets::iterator i = m_readSockets.find( s );
00081 if( i != m_readSockets.end() ) return false;
00082
00083 m_readSockets.insert( s );
00084 return true;
00085
00086 QF_STACK_POP
00087 }
00088
00089 bool SocketMonitor::addWrite( int s )
00090 { QF_STACK_PUSH(SocketMonitor::addWrite)
00091
00092 socket_setnonblock( s );
00093 Sockets::iterator i = m_writeSockets.find( s );
00094 if( i != m_writeSockets.end() ) return false;
00095
00096 m_writeSockets.insert( s );
00097 return true;
00098
00099 QF_STACK_POP
00100 }
00101
00102 bool SocketMonitor::drop( int s )
00103 { QF_STACK_PUSH(SocketMonitor::drop)
00104
00105 Sockets::iterator i = m_readSockets.find( s );
00106 Sockets::iterator j = m_writeSockets.find( s );
00107 Sockets::iterator k = m_connectSockets.find( s );
00108
00109 if ( i != m_readSockets.end() ||
00110 j != m_writeSockets.end() ||
00111 k != m_connectSockets.end() )
00112 {
00113 socket_close( s );
00114 m_readSockets.erase( s );
00115 m_writeSockets.erase( s );
00116 m_connectSockets.erase( s );
00117 m_dropped.push( s );
00118 return true;
00119 }
00120 return false;
00121
00122 QF_STACK_POP
00123 }
00124
00125 inline timeval* SocketMonitor::getTimeval( bool poll )
00126 { QF_STACK_PUSH(SocketMonitor::getTimeval)
00127
00128 if ( poll )
00129 {
00130 m_timeval.tv_sec = 0;
00131 m_timeval.tv_usec = 0;
00132 return &m_timeval;
00133 }
00134
00135 if ( !m_timeout )
00136 return 0;
00137 #ifdef SELECT_MODIFIES_TIMEVAL
00138 if ( !m_timeval.tv_sec && !m_timeval.tv_usec && m_timeout )
00139 m_timeval.tv_sec = m_timeout;
00140 return &m_timeval;
00141 #else
00142 double elapsed = ( double ) ( clock() - m_ticks ) / ( double ) CLOCKS_PER_SEC;
00143 if ( elapsed >= m_timeout || elapsed == 0.0 )
00144 {
00145 m_ticks = clock();
00146 m_timeval.tv_sec = 0;
00147 m_timeval.tv_usec = m_timeout * 1000000;
00148 }
00149 else
00150 {
00151 m_timeval.tv_sec = 0;
00152 m_timeval.tv_usec = ( long ) ( ( m_timeout - elapsed ) * 1000000 );
00153 }
00154 return &m_timeval;
00155 #endif
00156
00157 QF_STACK_POP
00158 }
00159
00160 bool SocketMonitor::sleepIfEmpty( bool poll )
00161 { QF_STACK_PUSH(SocketMonitor::sleepIfEmpty)
00162
00163 if( poll )
00164 return false;
00165
00166 if ( m_readSockets.empty() &&
00167 m_writeSockets.empty() &&
00168 m_connectSockets.empty() )
00169 {
00170 process_sleep( m_timeout );
00171 return true;
00172 }
00173 else
00174 return false;
00175
00176 QF_STACK_POP
00177 }
00178
00179 void SocketMonitor::signal( int socket )
00180 { QF_STACK_PUSH(SocketMonitor::signal)
00181 socket_send( m_signal, (char*)&socket, sizeof(socket) );
00182 QF_STACK_POP
00183 }
00184
00185 void SocketMonitor::unsignal( int s )
00186 { QF_STACK_PUSH(SocketMonitor::unsignal)
00187
00188 Sockets::iterator i = m_writeSockets.find( s );
00189 if( i == m_writeSockets.end() ) return;
00190
00191 m_writeSockets.erase( s );
00192
00193 QF_STACK_POP
00194 }
00195
00196 void SocketMonitor::block( Strategy& strategy, bool poll )
00197 { QF_STACK_PUSH(SocketMonitor::block)
00198
00199 while ( m_dropped.size() )
00200 {
00201 strategy.onError( *this, m_dropped.front() );
00202 m_dropped.pop();
00203 if ( m_dropped.size() == 0 )
00204 return ;
00205 }
00206
00207 fd_set readSet;
00208 FD_ZERO( &readSet );
00209 buildSet( m_readSockets, readSet );
00210 fd_set writeSet;
00211 FD_ZERO( &writeSet );
00212 buildSet( m_connectSockets, writeSet );
00213 buildSet( m_writeSockets, writeSet );
00214
00215 if ( sleepIfEmpty(poll) )
00216 {
00217 strategy.onTimeout( *this );
00218 return ;
00219 }
00220
00221 int result = select( FD_SETSIZE, &readSet, &writeSet, 0, getTimeval(poll) );
00222
00223 if ( result == 0 )
00224 {
00225 strategy.onTimeout( *this );
00226 return;
00227 }
00228 else if ( result > 0 )
00229 {
00230 processWriteSet( strategy, writeSet );
00231 processReadSet( strategy, readSet );
00232 }
00233 #ifndef _MSC_VER
00234 else if ( errno == EBADF )
00235 {
00236 Sockets::iterator i;
00237 Sockets sockets = m_readSockets;
00238 for ( i = sockets.begin(); i != sockets.end(); ++i )
00239 {
00240 if ( socket_isBad( *i ) )
00241 {
00242 m_readSockets.erase( *i );
00243 strategy.onError( *this, *i );
00244 }
00245 }
00246 return ;
00247 }
00248 #endif
00249 else
00250 strategy.onError( *this );
00251
00252 QF_STACK_POP
00253 }
00254
00255 void SocketMonitor::processReadSet( Strategy& strategy, fd_set& readSet )
00256 { QF_STACK_PUSH(SocketMonitor::processReadSet)
00257
00258 #ifdef _MSC_VER
00259 for ( unsigned i = 0; i < readSet.fd_count; ++i )
00260 {
00261 int s = readSet.fd_array[ i ];
00262 if( s == m_interrupt )
00263 {
00264 int socket = 0;
00265 recv( s, (char*)&socket, sizeof(socket), 0 );
00266 addWrite( socket );
00267 }
00268 else
00269 {
00270 strategy.onEvent( *this, s );
00271 }
00272 }
00273 #else
00274 Sockets::iterator i;
00275 Sockets sockets = m_readSockets;
00276 for ( i = sockets.begin(); i != sockets.end(); ++i )
00277 {
00278 int s = *i;
00279 if ( !FD_ISSET( *i, &readSet ) )
00280 continue;
00281 if( s == m_interrupt )
00282 {
00283 int socket = 0;
00284 recv( s, (char*)&socket, sizeof(socket), 0 );
00285 addWrite( socket );
00286 }
00287 else
00288 {
00289 strategy.onEvent( *this, s );
00290 }
00291 }
00292 #endif
00293
00294 QF_STACK_POP
00295 }
00296
00297 void SocketMonitor::processWriteSet( Strategy& strategy, fd_set& writeSet )
00298 { QF_STACK_PUSH(SocketMonitor::processWriteSet)
00299
00300 #ifdef _MSC_VER
00301 for ( unsigned i = 0; i < writeSet.fd_count; ++i )
00302 {
00303 int s = writeSet.fd_array[ i ];
00304 if( m_connectSockets.find(s) != m_connectSockets.end() )
00305 {
00306 m_connectSockets.erase( s );
00307 m_readSockets.insert( s );
00308 strategy.onConnect( *this, s );
00309 }
00310 else
00311 {
00312 strategy.onWrite( *this, s );
00313 }
00314 }
00315 #else
00316 Sockets::iterator i;
00317 Sockets sockets = m_connectSockets;
00318 for( i = sockets.begin(); i != sockets.end(); ++i )
00319 {
00320 int s = *i;
00321 if ( !FD_ISSET( *i, &writeSet ) )
00322 continue;
00323 m_connectSockets.erase( s );
00324 m_readSockets.insert( s );
00325 strategy.onConnect( *this, s );
00326 }
00327
00328 sockets = m_writeSockets;
00329 for( i = sockets.begin(); i != sockets.end(); ++i )
00330 {
00331 int s = *i;
00332 if ( !FD_ISSET( *i, &writeSet ) )
00333 continue;
00334 strategy.onWrite( *this, s );
00335 }
00336 #endif
00337
00338 QF_STACK_POP
00339 }
00340
00341 void SocketMonitor::buildSet( const Sockets& sockets, fd_set& watchSet )
00342 { QF_STACK_PUSH(SocketMonitor::buildSet)
00343
00344 Sockets::const_iterator iter;
00345 for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) {
00346 FD_SET( *iter, &watchSet );
00347 }
00348
00349 QF_STACK_POP
00350 }
00351 }