00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "FileStore.h"
00028 #include "SessionID.h"
00029 #include "Parser.h"
00030 #include "Utility.h"
00031 #include <fstream>
00032
00033 namespace FIX
00034 {
00035 FileStore::FileStore( std::string path, const SessionID& s )
00036 : m_msgFile( 0 ), m_headerFile( 0 ), m_seqNumsFile( 0 ), m_sessionFile( 0 )
00037 {
00038 file_mkdir( path.c_str() );
00039
00040 if ( path.empty() ) path = ".";
00041 const std::string& begin =
00042 s.getBeginString().getString();
00043 const std::string& sender =
00044 s.getSenderCompID().getString();
00045 const std::string& target =
00046 s.getTargetCompID().getString();
00047 const std::string& qualifier =
00048 s.getSessionQualifier();
00049
00050 std::string sessionid = begin + "-" + sender + "-" + target;
00051 if( qualifier.size() )
00052 sessionid += "-" + qualifier;
00053
00054 std::string prefix
00055 = file_appendpath(path, sessionid + ".");
00056
00057 m_msgFileName = prefix + "body";
00058 m_headerFileName = prefix + "header";
00059 m_seqNumsFileName = prefix + "seqnums";
00060 m_sessionFileName = prefix + "session";
00061
00062 try
00063 {
00064 open( false );
00065 }
00066 catch ( IOException & e )
00067 {
00068 throw ConfigError( e.what() );
00069 }
00070 }
00071
00072 FileStore::~FileStore()
00073 {
00074 fclose( m_msgFile );
00075 fclose( m_headerFile );
00076 fclose( m_seqNumsFile );
00077 fclose( m_sessionFile );
00078 }
00079
00080 void FileStore::open( bool deleteFile )
00081 { QF_STACK_PUSH(FileStore::open)
00082
00083 if ( m_msgFile ) fclose( m_msgFile );
00084 if ( m_headerFile ) fclose( m_headerFile );
00085 if ( m_seqNumsFile ) fclose( m_seqNumsFile );
00086 if ( m_sessionFile ) fclose( m_sessionFile );
00087
00088 if ( deleteFile )
00089 {
00090 file_unlink( m_msgFileName.c_str() );
00091 file_unlink( m_headerFileName.c_str() );
00092 file_unlink( m_seqNumsFileName.c_str() );
00093 file_unlink( m_sessionFileName.c_str() );
00094 }
00095
00096 populateCache();
00097 m_msgFile = file_fopen( m_msgFileName.c_str(), "r+" );
00098 if ( !m_msgFile ) m_msgFile = file_fopen( m_msgFileName.c_str(), "w+" );
00099 if ( !m_msgFile ) throw ConfigError( "Could not open body file: " + m_msgFileName );
00100
00101 m_headerFile = file_fopen( m_headerFileName.c_str(), "r+" );
00102 if ( !m_headerFile ) m_headerFile = file_fopen( m_headerFileName.c_str(), "w+" );
00103 if ( !m_headerFile ) throw ConfigError( "Could not open header file: " + m_headerFileName );
00104
00105 m_seqNumsFile = file_fopen( m_seqNumsFileName.c_str(), "r+" );
00106 if ( !m_seqNumsFile ) m_seqNumsFile = file_fopen( m_seqNumsFileName.c_str(), "w+" );
00107 if ( !m_seqNumsFile ) throw ConfigError( "Could not open seqnums file: " + m_seqNumsFileName );
00108
00109 bool setCreationTime = false;
00110 m_sessionFile = file_fopen( m_sessionFileName.c_str(), "r" );
00111 if ( !m_sessionFile ) setCreationTime = true;
00112 else fclose( m_sessionFile );
00113
00114 m_sessionFile = file_fopen( m_sessionFileName.c_str(), "r+" );
00115 if ( !m_sessionFile ) m_sessionFile = file_fopen( m_sessionFileName.c_str(), "w+" );
00116 if ( !m_sessionFile ) throw ConfigError( "Could not open session file" );
00117 if ( setCreationTime ) setSession();
00118
00119 setNextSenderMsgSeqNum( getNextSenderMsgSeqNum() );
00120 setNextTargetMsgSeqNum( getNextTargetMsgSeqNum() );
00121
00122 QF_STACK_POP
00123 }
00124
00125 void FileStore::populateCache()
00126 { QF_STACK_PUSH(FileStore::populateCache)
00127
00128 std::string msg;
00129 Message message;
00130
00131 FILE* headerFile;
00132 headerFile = file_fopen( m_headerFileName.c_str(), "r+" );
00133 if ( headerFile )
00134 {
00135 int num, offset, size;
00136 while ( FILE_FSCANF( headerFile, "%d,%d,%d ", &num, &offset, &size ) == 3 )
00137 m_offsets[ num ] = std::make_pair( offset, size );
00138 fclose( headerFile );
00139 }
00140
00141 FILE* seqNumsFile;
00142 seqNumsFile = file_fopen( m_seqNumsFileName.c_str(), "r+" );
00143 if ( seqNumsFile )
00144 {
00145 int sender, target;
00146 if ( FILE_FSCANF( seqNumsFile, "%d : %d", &sender, &target ) == 2 )
00147 {
00148 m_cache.setNextSenderMsgSeqNum( sender );
00149 m_cache.setNextTargetMsgSeqNum( target );
00150 }
00151 fclose( seqNumsFile );
00152 }
00153
00154 FILE* sessionFile;
00155 sessionFile = file_fopen( m_sessionFileName.c_str(), "r+" );
00156 if ( sessionFile )
00157 {
00158 char time[ 22 ];
00159 #ifdef HAVE_FSCANF_S
00160 int result = FILE_FSCANF( sessionFile, "%s", time, 22 );
00161 #else
00162 int result = FILE_FSCANF( sessionFile, "%s", time );
00163 #endif
00164 if( result == 1 )
00165 {
00166 m_cache.setCreationTime( UtcTimeStampConvertor::convert( time, true ) );
00167 }
00168 fclose( sessionFile );
00169 }
00170
00171 QF_STACK_POP
00172 }
00173
00174 MessageStore* FileStoreFactory::create( const SessionID& s )
00175 { QF_STACK_PUSH(FileStoreFactory::create)
00176
00177 if ( m_path.size() ) return new FileStore( m_path, s );
00178
00179 std::string path;
00180 Dictionary settings = m_settings.get( s );
00181 path = settings.getString( FILE_STORE_PATH );
00182 return new FileStore( path, s );
00183
00184 QF_STACK_POP
00185 }
00186
00187 void FileStoreFactory::destroy( MessageStore* pStore )
00188 { QF_STACK_PUSH(FileStoreFactory::destroy)
00189 delete pStore;
00190 QF_STACK_POP
00191 }
00192
00193 bool FileStore::set( int msgSeqNum, const std::string& msg )
00194 throw ( IOException )
00195 { QF_STACK_PUSH(FileStore::set)
00196
00197 if ( fseek( m_msgFile, 0, SEEK_END ) )
00198 throw IOException( "Cannot seek to end of " + m_msgFileName );
00199 if ( fseek( m_headerFile, 0, SEEK_END ) )
00200 throw IOException( "Cannot seek to end of " + m_headerFileName );
00201
00202 int offset = ftell( m_msgFile );
00203 if ( offset < 0 )
00204 throw IOException( "Unable to get file pointer position from " + m_msgFileName );
00205 int size = msg.size();
00206
00207 if ( fprintf( m_headerFile, "%d,%d,%d ", msgSeqNum, offset, size ) < 0 )
00208 throw IOException( "Unable to write to file " + m_headerFileName );
00209 m_offsets[ msgSeqNum ] = std::make_pair( offset, size );
00210 fwrite( msg.c_str(), sizeof( char ), msg.size(), m_msgFile );
00211 if ( ferror( m_msgFile ) )
00212 throw IOException( "Unable to write to file " + m_msgFileName );
00213 if ( fflush( m_msgFile ) == EOF )
00214 throw IOException( "Unable to flush file " + m_msgFileName );
00215 if ( fflush( m_headerFile ) == EOF )
00216 throw IOException( "Unable to flush file " + m_headerFileName );
00217 return true;
00218
00219 QF_STACK_POP
00220 }
00221
00222 void FileStore::get( int begin, int end,
00223 std::vector < std::string > & result ) const
00224 throw ( IOException )
00225 { QF_STACK_PUSH(FileStore::get)
00226
00227 result.clear();
00228 std::string msg;
00229 for ( int i = begin; i <= end; ++i )
00230 {
00231 if ( get( i, msg ) )
00232 result.push_back( msg );
00233 }
00234
00235 QF_STACK_POP
00236 }
00237
00238 int FileStore::getNextSenderMsgSeqNum() const throw ( IOException )
00239 { QF_STACK_PUSH(FileStore::getNextSenderMsgSeqNum)
00240 return m_cache.getNextSenderMsgSeqNum();
00241 QF_STACK_POP
00242 }
00243
00244 int FileStore::getNextTargetMsgSeqNum() const throw ( IOException )
00245 { QF_STACK_PUSH(FileStore::getNextTargetMsgSeqNum)
00246 return m_cache.getNextTargetMsgSeqNum();
00247 QF_STACK_POP
00248 }
00249
00250 void FileStore::setNextSenderMsgSeqNum( int value ) throw ( IOException )
00251 { QF_STACK_PUSH(FileStore::setNextSenderMsgSeqNum)
00252 m_cache.setNextSenderMsgSeqNum( value );
00253 setSeqNum();
00254 QF_STACK_POP
00255 }
00256
00257 void FileStore::setNextTargetMsgSeqNum( int value ) throw ( IOException )
00258 { QF_STACK_PUSH(FileStore::setNextTargetMsgSeqNum)
00259 m_cache.setNextTargetMsgSeqNum( value );
00260 setSeqNum();
00261 QF_STACK_POP
00262 }
00263
00264 void FileStore::incrNextSenderMsgSeqNum() throw ( IOException )
00265 { QF_STACK_PUSH(FileStore::incrNextSenderMsgSeqNum)
00266 m_cache.incrNextSenderMsgSeqNum();
00267 setSeqNum();
00268 QF_STACK_POP
00269 }
00270
00271 void FileStore::incrNextTargetMsgSeqNum() throw ( IOException )
00272 { QF_STACK_PUSH(FileStore::incrNextTargetMsgSeqNum)
00273 m_cache.incrNextTargetMsgSeqNum();
00274 setSeqNum();
00275 QF_STACK_POP
00276 }
00277
00278 UtcTimeStamp FileStore::getCreationTime() const throw ( IOException )
00279 { QF_STACK_PUSH(FileStore::getCreationTime)
00280 return m_cache.getCreationTime();
00281 QF_STACK_POP
00282 }
00283
00284 void FileStore::reset() throw ( IOException )
00285 { QF_STACK_PUSH(FileStore::reset)
00286
00287 m_cache.reset();
00288 open( true );
00289 setSession();
00290
00291 QF_STACK_POP
00292 }
00293
00294 void FileStore::refresh() throw ( IOException )
00295 { QF_STACK_PUSH(FileStore::refresh)
00296
00297 m_cache.reset();
00298 open( false );
00299
00300 QF_STACK_POP
00301 }
00302
00303 void FileStore::setSeqNum()
00304 { QF_STACK_PUSH(FileStore::setSeqNum)
00305
00306 rewind( m_seqNumsFile );
00307 fprintf( m_seqNumsFile, "%10.10d : %10.10d",
00308 getNextSenderMsgSeqNum(), getNextTargetMsgSeqNum() );
00309 if ( ferror( m_seqNumsFile ) )
00310 throw IOException( "Unable to write to file " + m_seqNumsFileName );
00311 if ( fflush( m_seqNumsFile ) )
00312 throw IOException( "Unable to flush file " + m_seqNumsFileName );
00313
00314 QF_STACK_POP
00315 }
00316
00317 void FileStore::setSession()
00318 { QF_STACK_PUSH(FileStore::setSession)
00319
00320 rewind( m_sessionFile );
00321 fprintf( m_sessionFile, "%s",
00322 UtcTimeStampConvertor::convert( m_cache.getCreationTime() ).c_str() );
00323 if ( ferror( m_sessionFile ) )
00324 throw IOException( "Unable to write to file " + m_sessionFileName );
00325 if ( fflush( m_sessionFile ) )
00326 throw IOException( "Unable to flush file " + m_sessionFileName );
00327
00328 QF_STACK_POP
00329 }
00330
00331 bool FileStore::get( int msgSeqNum, std::string& msg ) const
00332 throw ( IOException )
00333 { QF_STACK_PUSH(FileStore::get)
00334
00335 NumToOffset::const_iterator find = m_offsets.find( msgSeqNum );
00336 if ( find == m_offsets.end() ) return false;
00337 const OffsetSize& offset = find->second;
00338 if ( fseek( m_msgFile, offset.first, SEEK_SET ) )
00339 throw IOException( "Unable to seek in file " + m_msgFileName );
00340 char* buffer = new char[ offset.second + 1 ];
00341 fread( buffer, sizeof( char ), offset.second, m_msgFile );
00342 if ( ferror( m_msgFile ) )
00343 throw IOException( "Unable to read from file " + m_msgFileName );
00344 buffer[ offset.second ] = 0;
00345 msg = buffer;
00346 delete [] buffer;
00347 return true;
00348
00349 QF_STACK_POP
00350 }
00351
00352 }