00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "FileStore.h"
00028 #include "SessionID.h"
00029 #include "Parser.h"
00030 #include "Utility.h"
00031 #include <fstream>
00032
00033 namespace FIX
00034 {
00035 FileStore::FileStore( std::string path, const SessionID& s )
00036 : m_msgFile( 0 ), m_headerFile( 0 ), m_seqNumsFile( 0 ), m_sessionFile( 0 )
00037 {
00038 file_mkdir( path.c_str() );
00039
00040 if ( path.empty() ) path = ".";
00041 const std::string& begin =
00042 s.getBeginString().getString();
00043 const std::string& sender =
00044 s.getSenderCompID().getString();
00045 const std::string& target =
00046 s.getTargetCompID().getString();
00047 const std::string& qualifier =
00048 s.getSessionQualifier();
00049
00050 std::string sessionid = begin + "-" + sender + "-" + target;
00051 if( qualifier.size() )
00052 sessionid += "-" + qualifier;
00053
00054 std::string prefix
00055 = file_appendpath(path, sessionid + ".");
00056
00057 m_msgFileName = prefix + "body";
00058 m_headerFileName = prefix + "header";
00059 m_seqNumsFileName = prefix + "seqnums";
00060 m_sessionFileName = prefix + "session";
00061
00062 try
00063 {
00064 open( false );
00065 }
00066 catch ( IOException & e )
00067 {
00068 throw ConfigError( e.what() );
00069 }
00070 }
00071
00072 FileStore::~FileStore()
00073 {
00074 fclose( m_msgFile );
00075 fclose( m_headerFile );
00076 fclose( m_seqNumsFile );
00077 fclose( m_sessionFile );
00078 }
00079
00080 void FileStore::open( bool deleteFile )
00081 { QF_STACK_PUSH(FileStore::open)
00082
00083 if ( m_msgFile ) fclose( m_msgFile );
00084 if ( m_headerFile ) fclose( m_headerFile );
00085 if ( m_seqNumsFile ) fclose( m_seqNumsFile );
00086 if ( m_sessionFile ) fclose( m_sessionFile );
00087
00088 if ( deleteFile )
00089 {
00090 file_unlink( m_msgFileName.c_str() );
00091 file_unlink( m_headerFileName.c_str() );
00092 file_unlink( m_seqNumsFileName.c_str() );
00093 file_unlink( m_sessionFileName.c_str() );
00094 }
00095
00096 populateCache();
00097
00098 m_msgFile = file_fopen( m_msgFileName.c_str(), "r+" );
00099 if ( !m_msgFile ) m_msgFile = file_fopen( m_msgFileName.c_str(), "w+" );
00100 if ( !m_msgFile ) throw ConfigError( "Could not open body file" );
00101
00102 m_headerFile = file_fopen( m_headerFileName.c_str(), "r+" );
00103 if ( !m_headerFile ) m_headerFile = file_fopen( m_headerFileName.c_str(), "w+" );
00104 if ( !m_headerFile ) throw ConfigError( "Could not open header file" );
00105
00106 m_seqNumsFile = file_fopen( m_seqNumsFileName.c_str(), "r+" );
00107 if ( !m_seqNumsFile ) m_seqNumsFile = file_fopen( m_seqNumsFileName.c_str(), "w+" );
00108 if ( !m_seqNumsFile ) throw ConfigError( "Could not open seqnums file" );
00109
00110 bool setCreationTime = false;
00111 m_sessionFile = file_fopen( m_sessionFileName.c_str(), "r" );
00112 if ( !m_sessionFile ) setCreationTime = true;
00113 else fclose( m_sessionFile );
00114
00115 m_sessionFile = file_fopen( m_sessionFileName.c_str(), "r+" );
00116 if ( !m_sessionFile ) m_sessionFile = file_fopen( m_sessionFileName.c_str(), "w+" );
00117 if ( !m_sessionFile ) throw ConfigError( "Could not open session file" );
00118 if ( setCreationTime ) setSession();
00119
00120 setNextSenderMsgSeqNum( getNextSenderMsgSeqNum() );
00121 setNextTargetMsgSeqNum( getNextTargetMsgSeqNum() );
00122
00123 QF_STACK_POP
00124 }
00125
00126 void FileStore::populateCache()
00127 { QF_STACK_PUSH(FileStore::populateCache)
00128
00129 std::string msg;
00130 Message message;
00131
00132 FILE* headerFile;
00133 headerFile = file_fopen( m_headerFileName.c_str(), "r+" );
00134 if ( headerFile )
00135 {
00136 int num, offset, size;
00137 while ( FILE_FSCANF( headerFile, "%d,%d,%d ", &num, &offset, &size ) == 3 )
00138 m_offsets[ num ] = std::make_pair( offset, size );
00139 fclose( headerFile );
00140 }
00141
00142 FILE* seqNumsFile;
00143 seqNumsFile = file_fopen( m_seqNumsFileName.c_str(), "r+" );
00144 if ( seqNumsFile )
00145 {
00146 int sender, target;
00147 if ( FILE_FSCANF( seqNumsFile, "%d : %d", &sender, &target ) == 2 )
00148 {
00149 m_cache.setNextSenderMsgSeqNum( sender );
00150 m_cache.setNextTargetMsgSeqNum( target );
00151 }
00152 fclose( seqNumsFile );
00153 }
00154
00155 FILE* sessionFile;
00156 sessionFile = file_fopen( m_sessionFileName.c_str(), "r+" );
00157 if ( sessionFile )
00158 {
00159 char time[ 22 ];
00160 #ifdef HAVE_FSCANF_S
00161 int result = FILE_FSCANF( sessionFile, "%s", time, 22 );
00162 #else
00163 int result = FILE_FSCANF( sessionFile, "%s", time );
00164 #endif
00165 if( result == 1 )
00166 {
00167 m_cache.setCreationTime( UtcTimeStampConvertor::convert( time, true ) );
00168 }
00169 fclose( sessionFile );
00170 }
00171
00172 QF_STACK_POP
00173 }
00174
00175 MessageStore* FileStoreFactory::create( const SessionID& s )
00176 { QF_STACK_PUSH(FileStoreFactory::create)
00177
00178 if ( m_path.size() ) return new FileStore( m_path, s );
00179
00180 std::string path;
00181 Dictionary settings = m_settings.get( s );
00182 path = settings.getString( FILE_STORE_PATH );
00183 return new FileStore( path, s );
00184
00185 QF_STACK_POP
00186 }
00187
00188 void FileStoreFactory::destroy( MessageStore* pStore )
00189 { QF_STACK_PUSH(FileStoreFactory::destroy)
00190 delete pStore;
00191 QF_STACK_POP
00192 }
00193
00194 bool FileStore::set( int msgSeqNum, const std::string& msg )
00195 throw ( IOException )
00196 { QF_STACK_PUSH(FileStore::set)
00197
00198 if ( fseek( m_msgFile, 0, SEEK_END ) )
00199 throw IOException( "Cannot seek to end of " + m_msgFileName );
00200 if ( fseek( m_headerFile, 0, SEEK_END ) )
00201 throw IOException( "Cannot seek to end of " + m_headerFileName );
00202
00203 int offset = ftell( m_msgFile );
00204 if ( offset < 0 )
00205 throw IOException( "Unable to get file pointer position from " + m_msgFileName );
00206 int size = msg.size();
00207
00208 if ( fprintf( m_headerFile, "%d,%d,%d ", msgSeqNum, offset, size ) < 0 )
00209 throw IOException( "Unable to write to file " + m_headerFileName );
00210 m_offsets[ msgSeqNum ] = std::make_pair( offset, size );
00211 fwrite( msg.c_str(), sizeof( char ), msg.size(), m_msgFile );
00212 if ( ferror( m_msgFile ) )
00213 throw IOException( "Unable to write to file " + m_msgFileName );
00214 if ( fflush( m_msgFile ) == EOF )
00215 throw IOException( "Unable to flush file " + m_msgFileName );
00216 if ( fflush( m_headerFile ) == EOF )
00217 throw IOException( "Unable to flush file " + m_headerFileName );
00218 return true;
00219
00220 QF_STACK_POP
00221 }
00222
00223 void FileStore::get( int begin, int end,
00224 std::vector < std::string > & result ) const
00225 throw ( IOException )
00226 { QF_STACK_PUSH(FileStore::get)
00227
00228 result.clear();
00229 std::string msg;
00230 for ( int i = begin; i <= end; ++i )
00231 {
00232 if ( get( i, msg ) )
00233 result.push_back( msg );
00234 }
00235
00236 QF_STACK_POP
00237 }
00238
00239 int FileStore::getNextSenderMsgSeqNum() const throw ( IOException )
00240 { QF_STACK_PUSH(FileStore::getNextSenderMsgSeqNum)
00241 return m_cache.getNextSenderMsgSeqNum();
00242 QF_STACK_POP
00243 }
00244
00245 int FileStore::getNextTargetMsgSeqNum() const throw ( IOException )
00246 { QF_STACK_PUSH(FileStore::getNextTargetMsgSeqNum)
00247 return m_cache.getNextTargetMsgSeqNum();
00248 QF_STACK_POP
00249 }
00250
00251 void FileStore::setNextSenderMsgSeqNum( int value ) throw ( IOException )
00252 { QF_STACK_PUSH(FileStore::setNextSenderMsgSeqNum)
00253 m_cache.setNextSenderMsgSeqNum( value );
00254 setSeqNum();
00255 QF_STACK_POP
00256 }
00257
00258 void FileStore::setNextTargetMsgSeqNum( int value ) throw ( IOException )
00259 { QF_STACK_PUSH(FileStore::setNextTargetMsgSeqNum)
00260 m_cache.setNextTargetMsgSeqNum( value );
00261 setSeqNum();
00262 QF_STACK_POP
00263 }
00264
00265 void FileStore::incrNextSenderMsgSeqNum() throw ( IOException )
00266 { QF_STACK_PUSH(FileStore::incrNextSenderMsgSeqNum)
00267 m_cache.incrNextSenderMsgSeqNum();
00268 setSeqNum();
00269 QF_STACK_POP
00270 }
00271
00272 void FileStore::incrNextTargetMsgSeqNum() throw ( IOException )
00273 { QF_STACK_PUSH(FileStore::incrNextTargetMsgSeqNum)
00274 m_cache.incrNextTargetMsgSeqNum();
00275 setSeqNum();
00276 QF_STACK_POP
00277 }
00278
00279 UtcTimeStamp FileStore::getCreationTime() const throw ( IOException )
00280 { QF_STACK_PUSH(FileStore::getCreationTime)
00281 return m_cache.getCreationTime();
00282 QF_STACK_POP
00283 }
00284
00285 void FileStore::reset() throw ( IOException )
00286 { QF_STACK_PUSH(FileStore::reset)
00287
00288 m_cache.reset();
00289 open( true );
00290 setSession();
00291
00292 QF_STACK_POP
00293 }
00294
00295 void FileStore::refresh() throw ( IOException )
00296 { QF_STACK_PUSH(FileStore::refresh)
00297
00298 m_cache.reset();
00299 open( false );
00300
00301 QF_STACK_POP
00302 }
00303
00304 void FileStore::setSeqNum()
00305 { QF_STACK_PUSH(FileStore::setSeqNum)
00306
00307 rewind( m_seqNumsFile );
00308 fprintf( m_seqNumsFile, "%10.10d : %10.10d",
00309 getNextSenderMsgSeqNum(), getNextTargetMsgSeqNum() );
00310 if ( ferror( m_seqNumsFile ) )
00311 throw IOException( "Unable to write to file " + m_seqNumsFileName );
00312 if ( fflush( m_seqNumsFile ) )
00313 throw IOException( "Unable to flush file " + m_seqNumsFileName );
00314
00315 QF_STACK_POP
00316 }
00317
00318 void FileStore::setSession()
00319 { QF_STACK_PUSH(FileStore::setSession)
00320
00321 rewind( m_sessionFile );
00322 fprintf( m_sessionFile, "%s",
00323 UtcTimeStampConvertor::convert( m_cache.getCreationTime() ).c_str() );
00324 if ( ferror( m_sessionFile ) )
00325 throw IOException( "Unable to write to file " + m_sessionFileName );
00326 if ( fflush( m_sessionFile ) )
00327 throw IOException( "Unable to flush file " + m_sessionFileName );
00328
00329 QF_STACK_POP
00330 }
00331
00332 bool FileStore::get( int msgSeqNum, std::string& msg ) const
00333 throw ( IOException )
00334 { QF_STACK_PUSH(FileStore::get)
00335
00336 NumToOffset::const_iterator find = m_offsets.find( msgSeqNum );
00337 if ( find == m_offsets.end() ) return false;
00338 const OffsetSize& offset = find->second;
00339 if ( fseek( m_msgFile, offset.first, SEEK_SET ) )
00340 throw IOException( "Unable to seek in file " + m_msgFileName );
00341 char* buffer = new char[ offset.second + 1 ];
00342 fread( buffer, sizeof( char ), offset.second, m_msgFile );
00343 if ( ferror( m_msgFile ) )
00344 throw IOException( "Unable to read from file " + m_msgFileName );
00345 buffer[ offset.second ] = 0;
00346 msg = buffer;
00347 delete [] buffer;
00348 return true;
00349
00350 QF_STACK_POP
00351 }
00352
00353 }