SocketMonitor.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "SocketMonitor.h"
00028 #include "Utility.h"
00029 #include <exception>
00030 #include <set>
00031 #include <algorithm>
00032 #include <iostream>
00033
00034 namespace FIX
00035 {
00036 SocketMonitor::SocketMonitor( int timeout )
00037 : m_timeout( timeout )
00038 {
00039 socket_init();
00040
00041 std::pair<int, int> sockets = socket_createpair();
00042 m_signal = sockets.first;
00043 m_interrupt = sockets.second;
00044 socket_setnonblock( m_signal );
00045 socket_setnonblock( m_interrupt );
00046 m_readSockets.insert( m_interrupt );
00047
00048 m_timeval.tv_sec = 0;
00049 m_timeval.tv_usec = 0;
00050 #ifndef SELECT_DECREMENTS_TIME
00051 m_ticks = clock();
00052 #endif
00053 }
00054
00055 SocketMonitor::~SocketMonitor()
00056 {
00057 Sockets::iterator i;
00058 for ( i = m_readSockets.begin(); i != m_readSockets.end(); ++i )
00059 socket_close( *i );
00060 socket_close( m_signal );
00061 socket_close( m_interrupt );
00062 socket_term();
00063 }
00064
00065 bool SocketMonitor::addConnect( int s )
00066 { QF_STACK_PUSH(SocketMonitor::addConnect)
00067
00068 socket_setnonblock( s );
00069 Sockets::iterator i = m_connectSockets.find( s );
00070 if( i != m_connectSockets.end() ) return false;
00071
00072 m_connectSockets.insert( s );
00073 return true;
00074
00075 QF_STACK_POP
00076 }
00077
00078 bool SocketMonitor::addRead( int s )
00079 { QF_STACK_PUSH(SocketMonitor::addRead)
00080
00081 socket_setnonblock( s );
00082 Sockets::iterator i = m_readSockets.find( s );
00083 if( i != m_readSockets.end() ) return false;
00084
00085 m_readSockets.insert( s );
00086 return true;
00087
00088 QF_STACK_POP
00089 }
00090
00091 bool SocketMonitor::addWrite( int s )
00092 { QF_STACK_PUSH(SocketMonitor::addWrite)
00093
00094 if( m_readSockets.find(s) == m_readSockets.end() )
00095 return false;
00096
00097 socket_setnonblock( s );
00098 Sockets::iterator i = m_writeSockets.find( s );
00099 if( i != m_writeSockets.end() ) return false;
00100
00101 m_writeSockets.insert( s );
00102 return true;
00103
00104 QF_STACK_POP
00105 }
00106
00107 bool SocketMonitor::drop( int s )
00108 { QF_STACK_PUSH(SocketMonitor::drop)
00109
00110 Sockets::iterator i = m_readSockets.find( s );
00111 Sockets::iterator j = m_writeSockets.find( s );
00112 Sockets::iterator k = m_connectSockets.find( s );
00113
00114 if ( i != m_readSockets.end() ||
00115 j != m_writeSockets.end() ||
00116 k != m_connectSockets.end() )
00117 {
00118 socket_close( s );
00119 m_readSockets.erase( s );
00120 m_writeSockets.erase( s );
00121 m_connectSockets.erase( s );
00122 m_dropped.push( s );
00123 return true;
00124 }
00125 return false;
00126
00127 QF_STACK_POP
00128 }
00129
00130 inline timeval* SocketMonitor::getTimeval( bool poll, double timeout )
00131 { QF_STACK_PUSH(SocketMonitor::getTimeval)
00132
00133 if ( poll )
00134 {
00135 m_timeval.tv_sec = 0;
00136 m_timeval.tv_usec = 0;
00137 return &m_timeval;
00138 }
00139
00140 timeout = m_timeout;
00141
00142 if ( !timeout )
00143 return 0;
00144 #ifdef SELECT_MODIFIES_TIMEVAL
00145 if ( !m_timeval.tv_sec && !m_timeval.tv_usec && timeout )
00146 m_timeval.tv_sec = timeout;
00147 return &m_timeval;
00148 #else
00149 double elapsed = ( double ) ( clock() - m_ticks ) / ( double ) CLOCKS_PER_SEC;
00150 if ( elapsed >= timeout || elapsed == 0.0 )
00151 {
00152 m_ticks = clock();
00153 m_timeval.tv_sec = 0;
00154 m_timeval.tv_usec = (long)(timeout * 1000000);
00155 }
00156 else
00157 {
00158 m_timeval.tv_sec = 0;
00159 m_timeval.tv_usec = ( long ) ( ( timeout - elapsed ) * 1000000 );
00160 }
00161 return &m_timeval;
00162 #endif
00163
00164 QF_STACK_POP
00165 }
00166
00167 bool SocketMonitor::sleepIfEmpty( bool poll )
00168 { QF_STACK_PUSH(SocketMonitor::sleepIfEmpty)
00169
00170 if( poll )
00171 return false;
00172
00173 if ( m_readSockets.empty() &&
00174 m_writeSockets.empty() &&
00175 m_connectSockets.empty() )
00176 {
00177 process_sleep( m_timeout );
00178 return true;
00179 }
00180 else
00181 return false;
00182
00183 QF_STACK_POP
00184 }
00185
00186 void SocketMonitor::signal( int socket )
00187 { QF_STACK_PUSH(SocketMonitor::signal)
00188 socket_send( m_signal, (char*)&socket, sizeof(socket) );
00189 QF_STACK_POP
00190 }
00191
00192 void SocketMonitor::unsignal( int s )
00193 { QF_STACK_PUSH(SocketMonitor::unsignal)
00194
00195 Sockets::iterator i = m_writeSockets.find( s );
00196 if( i == m_writeSockets.end() ) return;
00197
00198 m_writeSockets.erase( s );
00199
00200 QF_STACK_POP
00201 }
00202
00203 void SocketMonitor::block( Strategy& strategy, bool poll, double timeout )
00204 { QF_STACK_PUSH(SocketMonitor::block)
00205
00206 while ( m_dropped.size() )
00207 {
00208 strategy.onError( *this, m_dropped.front() );
00209 m_dropped.pop();
00210 if ( m_dropped.size() == 0 )
00211 return ;
00212 }
00213
00214 fd_set readSet;
00215 FD_ZERO( &readSet );
00216 buildSet( m_readSockets, readSet );
00217 fd_set writeSet;
00218 FD_ZERO( &writeSet );
00219 buildSet( m_connectSockets, writeSet );
00220 buildSet( m_writeSockets, writeSet );
00221 fd_set exceptSet;
00222 FD_ZERO( &exceptSet );
00223 buildSet( m_connectSockets, exceptSet );
00224
00225 if ( sleepIfEmpty(poll) )
00226 {
00227 strategy.onTimeout( *this );
00228 return;
00229 }
00230
00231 int result = select( FD_SETSIZE, &readSet, &writeSet, &exceptSet, getTimeval(poll, timeout) );
00232
00233 if ( result == 0 )
00234 {
00235 strategy.onTimeout( *this );
00236 return;
00237 }
00238 else if ( result > 0 )
00239 {
00240 processExceptSet( strategy, exceptSet );
00241 processWriteSet( strategy, writeSet );
00242 processReadSet( strategy, readSet );
00243 }
00244 else
00245 {
00246 strategy.onError( *this );
00247 }
00248
00249 QF_STACK_POP
00250 }
00251
00252 void SocketMonitor::processReadSet( Strategy& strategy, fd_set& readSet )
00253 { QF_STACK_PUSH(SocketMonitor::processReadSet)
00254
00255 #ifdef _MSC_VER
00256 for ( unsigned i = 0; i < readSet.fd_count; ++i )
00257 {
00258 int s = readSet.fd_array[ i ];
00259 if( s == m_interrupt )
00260 {
00261 int socket = 0;
00262 recv( s, (char*)&socket, sizeof(socket), 0 );
00263 addWrite( socket );
00264 }
00265 else
00266 {
00267 strategy.onEvent( *this, s );
00268 }
00269 }
00270 #else
00271 Sockets::iterator i;
00272 Sockets sockets = m_readSockets;
00273 for ( i = sockets.begin(); i != sockets.end(); ++i )
00274 {
00275 int s = *i;
00276 if ( !FD_ISSET( *i, &readSet ) )
00277 continue;
00278 if( s == m_interrupt )
00279 {
00280 int socket = 0;
00281 recv( s, (char*)&socket, sizeof(socket), 0 );
00282 addWrite( socket );
00283 }
00284 else
00285 {
00286 strategy.onEvent( *this, s );
00287 }
00288 }
00289 #endif
00290
00291 QF_STACK_POP
00292 }
00293
00294 void SocketMonitor::processWriteSet( Strategy& strategy, fd_set& writeSet )
00295 { QF_STACK_PUSH(SocketMonitor::processWriteSet)
00296
00297 #ifdef _MSC_VER
00298 for ( unsigned i = 0; i < writeSet.fd_count; ++i )
00299 {
00300 int s = writeSet.fd_array[ i ];
00301 if( m_connectSockets.find(s) != m_connectSockets.end() )
00302 {
00303 m_connectSockets.erase( s );
00304 m_readSockets.insert( s );
00305 strategy.onConnect( *this, s );
00306 }
00307 else
00308 {
00309 strategy.onWrite( *this, s );
00310 }
00311 }
00312 #else
00313 Sockets::iterator i;
00314 Sockets sockets = m_connectSockets;
00315 for( i = sockets.begin(); i != sockets.end(); ++i )
00316 {
00317 int s = *i;
00318 if ( !FD_ISSET( *i, &writeSet ) )
00319 continue;
00320 m_connectSockets.erase( s );
00321 m_readSockets.insert( s );
00322 strategy.onConnect( *this, s );
00323 }
00324
00325 sockets = m_writeSockets;
00326 for( i = sockets.begin(); i != sockets.end(); ++i )
00327 {
00328 int s = *i;
00329 if ( !FD_ISSET( *i, &writeSet ) )
00330 continue;
00331 strategy.onWrite( *this, s );
00332 }
00333 #endif
00334
00335 QF_STACK_POP
00336 }
00337
00338 void SocketMonitor::processExceptSet( Strategy& strategy, fd_set& exceptSet )
00339 { QF_STACK_PUSH(SocketMonitor::processExceptSet)
00340
00341 #ifdef _MSC_VER
00342 for ( unsigned i = 0; i < exceptSet.fd_count; ++i )
00343 {
00344 int s = exceptSet.fd_array[ i ];
00345 strategy.onError( *this, s );
00346 }
00347 #else
00348 Sockets::iterator i;
00349 Sockets sockets = m_connectSockets;
00350 for ( i = sockets.begin(); i != sockets.end(); ++i )
00351 {
00352 int s = *i;
00353 if ( !FD_ISSET( *i, &exceptSet ) )
00354 continue;
00355 strategy.onError( *this, s );
00356 }
00357 #endif
00358
00359 QF_STACK_POP
00360 }
00361
00362 void SocketMonitor::buildSet( const Sockets& sockets, fd_set& watchSet )
00363 { QF_STACK_PUSH(SocketMonitor::buildSet)
00364
00365 Sockets::const_iterator iter;
00366 for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) {
00367 FD_SET( *iter, &watchSet );
00368 }
00369 QF_STACK_POP
00370 }
00371 }