Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

SocketMonitor.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "SocketMonitor.h"
00028 #include "Utility.h"
00029 #include <exception>
00030 #include <set>
00031 #include <algorithm>
00032 #include <iostream>
00033 
00034 namespace FIX
00035 {
00036 SocketMonitor::SocketMonitor( int timeout )
00037 : m_timeout( timeout )
00038 {
00039   socket_init();
00040 
00041   std::pair<int, int> sockets = socket_createpair();
00042   m_signal = sockets.first;
00043   m_interrupt = sockets.second;
00044   socket_setnonblock( m_signal );
00045   socket_setnonblock( m_interrupt );
00046   m_readSockets.insert( m_interrupt );
00047 
00048   m_timeval.tv_sec = 0;
00049   m_timeval.tv_usec = 0;
00050 #ifndef SELECT_DECREMENTS_TIME
00051   m_ticks = clock();
00052 #endif
00053 }
00054 
00055 SocketMonitor::~SocketMonitor()
00056 {
00057   Sockets::iterator i;
00058   for ( i = m_readSockets.begin(); i != m_readSockets.end(); ++i )
00059     socket_close( *i );
00060   socket_close( m_signal );
00061   socket_close( m_interrupt );
00062   socket_term();
00063 }
00064 
00065 bool SocketMonitor::addConnect( int s )
00066 { QF_STACK_PUSH(SocketMonitor::addConnect)
00067 
00068   socket_setnonblock( s );
00069   Sockets::iterator i = m_connectSockets.find( s );
00070   if( i != m_connectSockets.end() ) return false;
00071 
00072   m_connectSockets.insert( s );
00073   return true;
00074 
00075   QF_STACK_POP
00076 }
00077 
00078 bool SocketMonitor::addRead( int s )
00079 { QF_STACK_PUSH(SocketMonitor::addRead)
00080 
00081   socket_setnonblock( s );
00082   Sockets::iterator i = m_readSockets.find( s );
00083   if( i != m_readSockets.end() ) return false;
00084 
00085   m_readSockets.insert( s );
00086   return true;
00087 
00088   QF_STACK_POP
00089 }
00090 
00091 bool SocketMonitor::addWrite( int s )
00092 { QF_STACK_PUSH(SocketMonitor::addWrite)
00093 
00094   if( m_readSockets.find(s) == m_readSockets.end() )
00095     return false;
00096 
00097   socket_setnonblock( s );
00098   Sockets::iterator i = m_writeSockets.find( s );
00099   if( i != m_writeSockets.end() ) return false;
00100 
00101   m_writeSockets.insert( s );
00102   return true;
00103 
00104   QF_STACK_POP
00105 }
00106 
00107 bool SocketMonitor::drop( int s )
00108 { QF_STACK_PUSH(SocketMonitor::drop)
00109 
00110   Sockets::iterator i = m_readSockets.find( s );
00111   Sockets::iterator j = m_writeSockets.find( s );
00112   Sockets::iterator k = m_connectSockets.find( s );
00113 
00114   if ( i != m_readSockets.end() || 
00115        j != m_writeSockets.end() ||
00116        k != m_connectSockets.end() )
00117   {
00118     socket_close( s );
00119     m_readSockets.erase( s );
00120     m_writeSockets.erase( s );
00121     m_connectSockets.erase( s );
00122     m_dropped.push( s );
00123     return true;
00124   }
00125   return false;
00126 
00127   QF_STACK_POP
00128 }
00129 
00130 inline timeval* SocketMonitor::getTimeval( bool poll, double timeout )
00131 { QF_STACK_PUSH(SocketMonitor::getTimeval)
00132 
00133   if ( poll )
00134   {
00135     m_timeval.tv_sec = 0;
00136     m_timeval.tv_usec = 0;
00137     return &m_timeval;
00138   }
00139 
00140   timeout = m_timeout;
00141 
00142   if ( !timeout )
00143     return 0;
00144 #ifdef SELECT_MODIFIES_TIMEVAL
00145   if ( !m_timeval.tv_sec && !m_timeval.tv_usec && timeout )
00146     m_timeval.tv_sec = timeout;
00147   return &m_timeval;
00148 #else
00149 double elapsed = ( double ) ( clock() - m_ticks ) / ( double ) CLOCKS_PER_SEC;
00150   if ( elapsed >= timeout || elapsed == 0.0 )
00151   {
00152     m_ticks = clock();
00153     m_timeval.tv_sec = 0;
00154     m_timeval.tv_usec = (long)(timeout * 1000000);
00155   }
00156   else
00157   {
00158     m_timeval.tv_sec = 0;
00159     m_timeval.tv_usec = ( long ) ( ( timeout - elapsed ) * 1000000 );
00160   }
00161   return &m_timeval;
00162 #endif
00163 
00164   QF_STACK_POP
00165 }
00166 
00167 bool SocketMonitor::sleepIfEmpty( bool poll )
00168 { QF_STACK_PUSH(SocketMonitor::sleepIfEmpty)
00169 
00170   if( poll )
00171     return false;
00172 
00173   if ( m_readSockets.empty() && 
00174        m_writeSockets.empty() &&
00175        m_connectSockets.empty() )
00176   {
00177     process_sleep( m_timeout );
00178     return true;
00179   }
00180   else
00181     return false;
00182 
00183   QF_STACK_POP
00184 }
00185 
00186 void SocketMonitor::signal( int socket )
00187 { QF_STACK_PUSH(SocketMonitor::signal)
00188   socket_send( m_signal, (char*)&socket, sizeof(socket) );
00189   QF_STACK_POP
00190 }
00191 
00192 void SocketMonitor::unsignal( int s )
00193 { QF_STACK_PUSH(SocketMonitor::unsignal)
00194 
00195   Sockets::iterator i = m_writeSockets.find( s );
00196   if( i == m_writeSockets.end() ) return;
00197 
00198   m_writeSockets.erase( s );
00199 
00200   QF_STACK_POP
00201 }
00202 
00203 void SocketMonitor::block( Strategy& strategy, bool poll, double timeout )
00204 { QF_STACK_PUSH(SocketMonitor::block)
00205 
00206   while ( m_dropped.size() )
00207   {
00208     strategy.onError( *this, m_dropped.front() );
00209     m_dropped.pop();
00210     if ( m_dropped.size() == 0 )
00211       return ;
00212   }
00213 
00214   fd_set readSet;
00215   FD_ZERO( &readSet );
00216   buildSet( m_readSockets, readSet );
00217   fd_set writeSet;
00218   FD_ZERO( &writeSet );
00219   buildSet( m_connectSockets, writeSet );
00220   buildSet( m_writeSockets, writeSet );
00221   fd_set exceptSet;
00222   FD_ZERO( &exceptSet );
00223   buildSet( m_connectSockets, exceptSet );
00224 
00225   if ( sleepIfEmpty(poll) )
00226   {
00227     strategy.onTimeout( *this );
00228     return;
00229   }
00230 
00231   int result = select( FD_SETSIZE, &readSet, &writeSet, &exceptSet, getTimeval(poll, timeout) );
00232 
00233   if ( result == 0 )
00234   {
00235     strategy.onTimeout( *this );
00236     return;
00237   }
00238   else if ( result > 0 )
00239   {
00240     processExceptSet( strategy, exceptSet );
00241     processWriteSet( strategy, writeSet );
00242     processReadSet( strategy, readSet );
00243   }
00244   else
00245   {
00246     strategy.onError( *this );
00247   }
00248 
00249   QF_STACK_POP
00250 }
00251 
00252 void SocketMonitor::processReadSet( Strategy& strategy, fd_set& readSet )
00253 { QF_STACK_PUSH(SocketMonitor::processReadSet)
00254 
00255 #ifdef _MSC_VER
00256   for ( unsigned i = 0; i < readSet.fd_count; ++i )
00257   {
00258     int s = readSet.fd_array[ i ];
00259     if( s == m_interrupt )
00260     {
00261       int socket = 0;
00262       recv( s, (char*)&socket, sizeof(socket), 0 );
00263       addWrite( socket );
00264     }
00265     else
00266     {
00267       strategy.onEvent( *this, s );
00268     }
00269   }
00270 #else
00271     Sockets::iterator i;
00272     Sockets sockets = m_readSockets;
00273     for ( i = sockets.begin(); i != sockets.end(); ++i )
00274     {
00275       int s = *i;
00276       if ( !FD_ISSET( *i, &readSet ) )
00277         continue;
00278       if( s == m_interrupt )
00279       {
00280         int socket = 0;
00281         recv( s, (char*)&socket, sizeof(socket), 0 );
00282         addWrite( socket );
00283       }
00284       else
00285       {
00286         strategy.onEvent( *this, s );
00287       }
00288     }
00289 #endif
00290 
00291   QF_STACK_POP
00292 }
00293 
00294 void SocketMonitor::processWriteSet( Strategy& strategy, fd_set& writeSet )
00295 { QF_STACK_PUSH(SocketMonitor::processWriteSet)
00296 
00297 #ifdef _MSC_VER
00298   for ( unsigned i = 0; i < writeSet.fd_count; ++i )
00299   {
00300     int s = writeSet.fd_array[ i ];
00301     if( m_connectSockets.find(s) != m_connectSockets.end() )
00302     {
00303       m_connectSockets.erase( s );
00304       m_readSockets.insert( s );
00305       strategy.onConnect( *this, s );
00306     }
00307     else
00308     {
00309       strategy.onWrite( *this, s );
00310     }
00311   }
00312 #else
00313   Sockets::iterator i;
00314   Sockets sockets = m_connectSockets;
00315   for( i = sockets.begin(); i != sockets.end(); ++i )
00316   {
00317     int s = *i;
00318     if ( !FD_ISSET( *i, &writeSet ) )
00319       continue;
00320     m_connectSockets.erase( s );
00321     m_readSockets.insert( s );
00322     strategy.onConnect( *this, s );
00323   }
00324 
00325   sockets = m_writeSockets;
00326   for( i = sockets.begin(); i != sockets.end(); ++i )
00327   {
00328     int s = *i;
00329     if ( !FD_ISSET( *i, &writeSet ) )
00330       continue;
00331     strategy.onWrite( *this, s );
00332   }
00333 #endif
00334 
00335   QF_STACK_POP
00336 }
00337 
00338 void SocketMonitor::processExceptSet( Strategy& strategy, fd_set& exceptSet )
00339 { QF_STACK_PUSH(SocketMonitor::processExceptSet)
00340 
00341 #ifdef _MSC_VER
00342   for ( unsigned i = 0; i < exceptSet.fd_count; ++i )
00343   {
00344     int s = exceptSet.fd_array[ i ];
00345     strategy.onError( *this, s );
00346   }
00347 #else
00348     Sockets::iterator i;
00349     Sockets sockets = m_connectSockets;
00350     for ( i = sockets.begin(); i != sockets.end(); ++i )
00351     {
00352       int s = *i;
00353       if ( !FD_ISSET( *i, &exceptSet ) )
00354         continue;
00355       strategy.onError( *this, s );
00356     }
00357 #endif
00358 
00359   QF_STACK_POP
00360 }
00361 
00362 void SocketMonitor::buildSet( const Sockets& sockets, fd_set& watchSet )
00363 { QF_STACK_PUSH(SocketMonitor::buildSet)
00364 
00365   Sockets::const_iterator iter;
00366   for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) {
00367     FD_SET( *iter, &watchSet );
00368   }
00369   QF_STACK_POP
00370 }
00371 }

Generated on Mon Mar 1 13:41:38 2010 for QuickFIX by doxygen 1.5.8 written by Dimitri van Heesch, © 1997-2001