Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy   
 

/home/orenmnero/autobuild/quickfix/src/C++/SocketMonitor.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "SocketMonitor.h"
00028 #include "Utility.h"
00029 #include <exception>
00030 #include <set>
00031 #include <algorithm>
00032 #include <iostream>
00033 
00034 namespace FIX
00035 {
00036 SocketMonitor::SocketMonitor( int timeout )
00037 : m_timeout( timeout )
00038 {
00039   socket_init();
00040 
00041   std::pair<int, int> sockets = socket_createpair();
00042   m_signal = sockets.first;
00043   m_interrupt = sockets.second;
00044   m_readSockets.insert( m_interrupt );
00045 
00046   m_timeval.tv_sec = 0;
00047   m_timeval.tv_usec = 0;
00048 #ifndef SELECT_DECREMENTS_TIME
00049   m_ticks = clock();
00050 #endif
00051 }
00052 
00053 SocketMonitor::~SocketMonitor()
00054 {
00055   Sockets::iterator i;
00056   for ( i = m_readSockets.begin(); i != m_readSockets.end(); ++i )
00057     socket_close( *i );
00058   socket_close( m_signal );
00059   socket_close( m_interrupt );
00060   socket_term();
00061 }
00062 
00063 bool SocketMonitor::addConnect( int s )
00064 { QF_STACK_PUSH(SocketMonitor::addConnect)
00065 
00066   socket_setnonblock( s );
00067   Sockets::iterator i = m_connectSockets.find( s );
00068   if( i != m_connectSockets.end() ) return false;
00069 
00070   m_connectSockets.insert( s );
00071   return true;
00072 
00073   QF_STACK_POP
00074 }
00075 
00076 bool SocketMonitor::addRead( int s )
00077 { QF_STACK_PUSH(SocketMonitor::addRead)
00078 
00079   socket_setnonblock( s );
00080   Sockets::iterator i = m_readSockets.find( s );
00081   if( i != m_readSockets.end() ) return false;
00082 
00083   m_readSockets.insert( s );
00084   return true;
00085 
00086   QF_STACK_POP
00087 }
00088 
00089 bool SocketMonitor::addWrite( int s )
00090 { QF_STACK_PUSH(SocketMonitor::addWrite)
00091 
00092   socket_setnonblock( s );
00093   Sockets::iterator i = m_writeSockets.find( s );
00094   if( i != m_writeSockets.end() ) return false;
00095 
00096   m_writeSockets.insert( s );
00097   return true;
00098 
00099   QF_STACK_POP
00100 }
00101 
00102 bool SocketMonitor::drop( int s )
00103 { QF_STACK_PUSH(SocketMonitor::drop)
00104 
00105   Sockets::iterator i = m_readSockets.find( s );
00106   Sockets::iterator j = m_writeSockets.find( s );
00107   Sockets::iterator k = m_connectSockets.find( s );
00108 
00109   if ( i != m_readSockets.end() || 
00110        j != m_writeSockets.end() ||
00111        k != m_connectSockets.end() )
00112   {
00113     socket_close( s );
00114     m_readSockets.erase( s );
00115     m_writeSockets.erase( s );
00116     m_connectSockets.erase( s );
00117     m_dropped.push( s );
00118     return true;
00119   }
00120   return false;
00121 
00122   QF_STACK_POP
00123 }
00124 
00125 inline timeval* SocketMonitor::getTimeval( bool poll )
00126 { QF_STACK_PUSH(SocketMonitor::getTimeval)
00127 
00128   if ( poll )
00129   {
00130     m_timeval.tv_sec = 0;
00131     m_timeval.tv_usec = 0;
00132     return &m_timeval;
00133   }
00134 
00135   if ( !m_timeout )
00136     return 0;
00137 #ifdef SELECT_MODIFIES_TIMEVAL
00138   if ( !m_timeval.tv_sec && !m_timeval.tv_usec && m_timeout )
00139     m_timeval.tv_sec = m_timeout;
00140   return &m_timeval;
00141 #else
00142 double elapsed = ( double ) ( clock() - m_ticks ) / ( double ) CLOCKS_PER_SEC;
00143   if ( elapsed >= m_timeout || elapsed == 0.0 )
00144   {
00145     m_ticks = clock();
00146     m_timeval.tv_sec = 0;
00147     m_timeval.tv_usec = m_timeout * 1000000;
00148   }
00149   else
00150   {
00151     m_timeval.tv_sec = 0;
00152     m_timeval.tv_usec = ( long ) ( ( m_timeout - elapsed ) * 1000000 );
00153   }
00154   return &m_timeval;
00155 #endif
00156 
00157   QF_STACK_POP
00158 }
00159 
00160 bool SocketMonitor::sleepIfEmpty( bool poll )
00161 { QF_STACK_PUSH(SocketMonitor::sleepIfEmpty)
00162 
00163   if( poll )
00164     return false;
00165 
00166   if ( m_readSockets.empty() && 
00167        m_writeSockets.empty() &&
00168        m_connectSockets.empty() )
00169   {
00170     process_sleep( m_timeout );
00171     return true;
00172   }
00173   else
00174     return false;
00175 
00176   QF_STACK_POP
00177 }
00178 
00179 void SocketMonitor::signal( int socket )
00180 { QF_STACK_PUSH(SocketMonitor::signal)
00181   socket_send( m_signal, (char*)&socket, sizeof(socket) );
00182   QF_STACK_POP
00183 }
00184 
00185 void SocketMonitor::unsignal( int s )
00186 { QF_STACK_PUSH(SocketMonitor::unsignal)
00187 
00188   Sockets::iterator i = m_writeSockets.find( s );
00189   if( i == m_writeSockets.end() ) return;
00190 
00191   m_writeSockets.erase( s );
00192 
00193   QF_STACK_POP
00194 }
00195 
00196 void SocketMonitor::block( Strategy& strategy, bool poll )
00197 { QF_STACK_PUSH(SocketMonitor::block)
00198 
00199   while ( m_dropped.size() )
00200   {
00201     strategy.onError( *this, m_dropped.front() );
00202     m_dropped.pop();
00203     if ( m_dropped.size() == 0 )
00204       return ;
00205   }
00206 
00207   fd_set readSet;
00208   FD_ZERO( &readSet );
00209   buildSet( m_readSockets, readSet );
00210   fd_set writeSet;
00211   FD_ZERO( &writeSet );
00212   buildSet( m_connectSockets, writeSet );
00213   buildSet( m_writeSockets, writeSet );
00214 
00215   if ( sleepIfEmpty(poll) )
00216   {
00217     strategy.onTimeout( *this );
00218     return ;
00219   }
00220 
00221   int result = select( FD_SETSIZE, &readSet, &writeSet, 0, getTimeval(poll) );
00222 
00223   if ( result == 0 )
00224   {
00225     strategy.onTimeout( *this );
00226     return;
00227   }
00228   else if ( result > 0 )
00229   {
00230     processWriteSet( strategy, writeSet );
00231     processReadSet( strategy, readSet );
00232   }
00233 #ifndef _MSC_VER
00234   else if ( errno == EBADF )
00235   {
00236     Sockets::iterator i;
00237     Sockets sockets = m_readSockets;
00238     for ( i = sockets.begin(); i != sockets.end(); ++i )
00239     {
00240       if ( socket_isBad( *i ) )
00241       {
00242         m_readSockets.erase( *i );
00243         strategy.onError( *this, *i );
00244       }
00245     }
00246     return ;
00247   }
00248 #endif
00249   else
00250     strategy.onError( *this );
00251 
00252   QF_STACK_POP
00253 }
00254 
00255 void SocketMonitor::processReadSet( Strategy& strategy, fd_set& readSet )
00256 { QF_STACK_PUSH(SocketMonitor::processReadSet)
00257 
00258 #ifdef _MSC_VER
00259   for ( unsigned i = 0; i < readSet.fd_count; ++i )
00260   {
00261     int s = readSet.fd_array[ i ];
00262     if( s == m_interrupt )
00263     {
00264       int socket = 0;
00265       recv( s, (char*)&socket, sizeof(socket), 0 );
00266       addWrite( socket );
00267     }
00268     else
00269     {
00270       strategy.onEvent( *this, s );
00271     }
00272   }
00273 #else
00274     Sockets::iterator i;
00275     Sockets sockets = m_readSockets;
00276     for ( i = sockets.begin(); i != sockets.end(); ++i )
00277     {
00278       int s = *i;
00279       if ( !FD_ISSET( *i, &readSet ) )
00280         continue;
00281       if( s == m_interrupt )
00282       {
00283         int socket = 0;
00284         recv( s, (char*)&socket, sizeof(socket), 0 );
00285         addWrite( socket );
00286       }
00287       else
00288       {
00289         strategy.onEvent( *this, s );
00290       }
00291     }
00292 #endif
00293 
00294   QF_STACK_POP
00295 }
00296 
00297 void SocketMonitor::processWriteSet( Strategy& strategy, fd_set& writeSet )
00298 { QF_STACK_PUSH(SocketMonitor::processWriteSet)
00299 
00300 #ifdef _MSC_VER
00301   for ( unsigned i = 0; i < writeSet.fd_count; ++i )
00302   {
00303     int s = writeSet.fd_array[ i ];
00304     if( m_connectSockets.find(s) != m_connectSockets.end() )
00305     {
00306       m_connectSockets.erase( s );
00307       m_readSockets.insert( s );
00308       strategy.onConnect( *this, s );
00309     }
00310     else
00311     {
00312       strategy.onWrite( *this, s );
00313     }
00314   }
00315 #else
00316   Sockets::iterator i;
00317   Sockets sockets = m_connectSockets;
00318   for( i = sockets.begin(); i != sockets.end(); ++i )
00319   {
00320     int s = *i;
00321     if ( !FD_ISSET( *i, &writeSet ) )
00322       continue;
00323     m_connectSockets.erase( s );
00324     m_readSockets.insert( s );
00325     strategy.onConnect( *this, s );
00326   }
00327 
00328   sockets = m_writeSockets;
00329   for( i = sockets.begin(); i != sockets.end(); ++i )
00330   {
00331     int s = *i;
00332     if ( !FD_ISSET( *i, &writeSet ) )
00333       continue;
00334     strategy.onWrite( *this, s );
00335   }
00336 #endif
00337 
00338   QF_STACK_POP
00339 }
00340 
00341 void SocketMonitor::buildSet( const Sockets& sockets, fd_set& watchSet )
00342 { QF_STACK_PUSH(SocketMonitor::buildSet)
00343 
00344   Sockets::const_iterator iter;
00345   for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) {
00346     FD_SET( *iter, &watchSet );
00347   }
00348 
00349   QF_STACK_POP
00350 }
00351 }

Generated on Mon Jul 24 19:36:30 2006 for QuickFIX by doxygen 1.3.6-20040222 written by Dimitri van Heesch, © 1997-2001