Main Page | Namespace List | Class Hierarchy | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

Bank_AMOS.cc

Go to the documentation of this file.
00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 #include "Bank_AMOS.hh" 00011 #include "Message_AMOS.hh" 00012 #include <sstream> 00013 #include <sys/types.h> 00014 #include <sys/stat.h> 00015 #include <fcntl.h> 00016 #include <unistd.h> 00017 #include <cstdlib> 00018 #include <cstdio> 00019 #include <ctime> 00020 #include <cstring> 00021 #include <sstream> 00022 #include <iostream> 00023 using namespace AMOS; 00024 using namespace std; 00025 00026 #define _PUBSETBUF_ pubsetbuf 00027 #ifdef __GNUC__ 00028 # if __GNUC__ < 3 00029 # undef _PUBSETBUF_ 00030 # define _PUBSETBUF_ setbuf 00031 # endif 00032 #endif 00033 00034 #define LOCK_TIME 5 00035 #define DIR_MODE 00755 00036 #define FILE_MODE 00644 00037 00038 00039 00040 00041 //================================================ Bank_t ====================== 00042 const Size_t Bank_t::DEFAULT_BUFFER_SIZE = 1024; 00043 const Size_t Bank_t::DEFAULT_PARTITION_SIZE = 1000000; 00044 const Size_t Bank_t::MAX_OPEN_PARTITIONS = 20; 00045 00046 const string Bank_t::BANK_VERSION = "2.8"; 00047 00048 const string Bank_t::FIX_STORE_SUFFIX = ".fix"; 00049 const string Bank_t::IFO_STORE_SUFFIX = ".ifo"; 00050 const string Bank_t::LCK_STORE_SUFFIX = ".lck"; 00051 const string Bank_t::VAR_STORE_SUFFIX = ".var"; 00052 const string Bank_t::MAP_STORE_SUFFIX = ".map"; 00053 const string Bank_t::TMP_STORE_SUFFIX = ".tmp"; 00054 00055 const char Bank_t::WRITE_LOCK_CHAR = 'w'; 00056 const char Bank_t::READ_LOCK_CHAR = 'r'; 00057 00058 00059 //----------------------------------------------------- addPartition ----------- 00060 void Bank_t::addPartition (bool create) 00061 { 00062 //-- Allocate the new partition and add it to the list 00063 BankPartition_t * partition = new BankPartition_t (buffer_size_m); 00064 partitions_m . push_back (partition); 00065 00066 try { 00067 ostringstream ss; 00068 00069 ss << store_pfx_m << '.' << npartitions_m << FIX_STORE_SUFFIX; 00070 partition -> fix_name = ss . str( ); 00071 ss . str (NULL_STRING); 00072 00073 ss << store_pfx_m << '.' << npartitions_m << VAR_STORE_SUFFIX; 00074 partition -> var_name = ss . str( ); 00075 ss . str (NULL_STRING); 00076 00077 //-- Try to create/open the FIX and VAR partition files 00078 touchFile (partition -> fix_name, FILE_MODE, create); 00079 touchFile (partition -> var_name, FILE_MODE, create); 00080 } 00081 catch (Exception_t) { 00082 partitions_m . pop_back( ); 00083 delete partition; 00084 throw; 00085 } 00086 00087 //-- If partition size is unset, use the default 00088 if ( partition_size_m == 0 ) 00089 partition_size_m = DEFAULT_PARTITION_SIZE; 00090 00091 //-- New partition, so new max index and number of partitions 00092 max_bid_m = ++ npartitions_m * partition_size_m; 00093 } 00094 00095 00096 //----------------------------------------------------- append ----------------- 00097 void Bank_t::append (IBankable_t & obj) 00098 { 00099 //-- Insert the ID triple into the map (may throw exception) 00100 idmap_m . insert (obj . iid_m, obj . eid_m, last_bid_m + 1); 00101 00102 try { 00103 appendBID (obj); 00104 } 00105 catch (Exception_t) { 00106 idmap_m . remove (obj . iid_m); 00107 idmap_m . remove (obj . eid_m); 00108 throw; 00109 } 00110 } 00111 00112 00113 //----------------------------------------------------- appendBID -------------- 00114 void Bank_t::appendBID (IBankable_t & obj) 00115 { 00116 if ( ! is_open_m || ! (mode_m & B_WRITE) ) 00117 AMOS_THROW_IO ("Cannot append, bank not open for writing"); 00118 if ( banktype_m != obj.getNCode( ) ) 00119 AMOS_THROW_ARGUMENT ("Cannot append, incompatible object type"); 00120 00121 //-- Add another partition if necessary 00122 if ( last_bid_m == max_bid_m ) 00123 addPartition (true); 00124 00125 BankPartition_t * partition = getLastPartition( ); 00126 00127 //-- Prepare the object for append 00128 obj . flags_m . is_removed = false; 00129 obj . flags_m . is_modified = false; 00130 00131 //-- data is written in the following order to the FIX and VAR streams 00132 // FIX = [VAR streampos] [BankableFlags] [OBJECT FIX] [VAR size] 00133 // VAR = [OBJECT VAR] 00134 partition -> fix . seekp (0, ios::end); 00135 partition -> var . seekp (0, ios::end); 00136 bankstreamoff fpos = partition -> fix . tellp( ); 00137 bankstreamoff vpos = partition -> var . tellp( ); 00138 writeLE (partition -> fix, &vpos); 00139 writeLE (partition -> fix, &(obj . flags_m)); 00140 obj . writeRecord (partition -> fix, partition -> var); 00141 Size_t vsize = (std::streamoff)partition -> var . tellp( ) - vpos; 00142 writeLE (partition -> fix, &vsize); 00143 00144 //-- If fix_size is not yet known, calculate it 00145 Size_t fsize = (std::streamoff)partition -> fix . tellp( ) - fpos; 00146 if ( fix_size_m == 0 ) 00147 fix_size_m = fsize; 00148 00149 if ( fix_size_m != fsize || 00150 ! partition -> fix . good( ) || 00151 ! partition -> var . good( ) ) 00152 AMOS_THROW_IO ("Unknown file write error in append, bank corrupted"); 00153 00154 ++ nbids_m; 00155 ++ last_bid_m; 00156 } 00157 00158 00159 //----------------------------------------------------- assignEID -------------- 00160 void Bank_t::assignEID (ID_t iid, const string & eid) 00161 { 00162 ID_t bid = lookupBID (iid); 00163 string peid (idmap_m . lookupEID (iid)); 00164 idmap_m . remove (iid); 00165 00166 try { 00167 idmap_m . insert (iid, eid, bid); 00168 } 00169 catch (Exception_t) { 00170 idmap_m . insert (iid, peid, bid); 00171 throw; 00172 } 00173 } 00174 00175 00176 //----------------------------------------------------- assignIID -------------- 00177 void Bank_t::assignIID (const string & eid, ID_t iid) 00178 { 00179 ID_t bid = lookupBID (eid); 00180 ID_t piid = idmap_m . lookupIID (eid); 00181 idmap_m . remove (eid); 00182 00183 try { 00184 idmap_m . insert (iid, eid, bid); 00185 } 00186 catch (Exception_t) { 00187 idmap_m . insert (piid, eid, bid); 00188 throw; 00189 } 00190 } 00191 00192 00193 //----------------------------------------------------- clean ------------------ 00194 void Bank_t::clean ( ) 00195 { 00196 if ( ! is_open_m || ! (mode_m & B_READ && mode_m & B_WRITE) ) 00197 AMOS_THROW_IO ("Cannot clean, bank not open for reading and writing"); 00198 00199 //-- Create a temporary bank of similar type and concat this bank to it 00200 Bank_t tmpbnk (banktype_m); 00201 00202 try { 00203 //-- Concat this bank to a temporary bank (cleans as a side effect) 00204 string tname = store_pfx_m + TMP_STORE_SUFFIX; 00205 mkdir (tname . c_str( ), DIR_MODE); 00206 tmpbnk . create (tname); 00207 tmpbnk . concat (*this); 00208 00209 //-- Reset this bank 00210 clear( ); 00211 fix_size_m = tmpbnk . fix_size_m; 00212 partition_size_m = tmpbnk . partition_size_m; 00213 last_bid_m = tmpbnk . last_bid_m; 00214 nbids_m = tmpbnk . nbids_m; 00215 idmap_m = tmpbnk . idmap_m; 00216 00217 //-- Link back the now cleaned partitions 00218 for ( Size_t i = 0; i != tmpbnk . npartitions_m; ++ i ) 00219 { 00220 while ( i >= npartitions_m ) 00221 addPartition (true); 00222 00223 unlink (partitions_m [i] -> fix_name . c_str( )); 00224 unlink (partitions_m [i] -> var_name . c_str( )); 00225 if ( link (tmpbnk . partitions_m [i] -> fix_name . c_str( ), 00226 partitions_m [i] -> fix_name . c_str( )) || 00227 link (tmpbnk . partitions_m [i] -> var_name . c_str( ), 00228 partitions_m [i] -> var_name . c_str( )) ) 00229 AMOS_THROW_IO ("Unknown file link error in clean, bank corrupted"); 00230 } 00231 } 00232 catch (Exception_t) { 00233 if ( tmpbnk . isOpen( ) ) 00234 tmpbnk . destroy( ); 00235 throw; 00236 } 00237 00238 //-- Destroy the temporary bank 00239 tmpbnk . destroy( ); 00240 } 00241 00242 00243 //----------------------------------------------------- clear ------------------ 00244 void Bank_t::clear ( ) 00245 { 00246 if ( ! is_open_m ) return; 00247 if ( ! (mode_m & B_WRITE) ) 00248 AMOS_THROW_IO ("Cannot clear, bank not open for writing"); 00249 00250 //-- Close, unlink and free the partition files 00251 for ( Size_t i = 0; i != npartitions_m; ++ i ) 00252 { 00253 partitions_m [i] -> fix . close( ); 00254 partitions_m [i] -> var . close( ); 00255 unlink (partitions_m [i] -> fix_name . c_str( )); 00256 unlink (partitions_m [i] -> var_name . c_str( )); 00257 delete (partitions_m [i]); 00258 } 00259 00260 last_bid_m = NULL_ID; 00261 max_bid_m = NULL_ID; 00262 nbids_m = NULL_ID; 00263 npartitions_m = 0; 00264 partition_size_m = 0; 00265 opened_m . clear( ); 00266 partitions_m . clear( ); 00267 idmap_m . clear( ); 00268 idmap_m . setType (banktype_m); 00269 } 00270 00271 00272 //----------------------------------------------------- close ------------------ 00273 void Bank_t::close ( ) 00274 { 00275 if ( ! is_open_m ) return; 00276 00277 if ( (mode_m & B_WRITE) ) 00278 { 00279 //-- Flush MAP partition 00280 string map_path (store_pfx_m + MAP_STORE_SUFFIX); 00281 ofstream map_stream (map_path . c_str( )); 00282 if ( ! map_stream . is_open( ) ) 00283 AMOS_THROW_IO ("Could not open bank partition, " + map_path); 00284 00285 idmap_m . write (map_stream); 00286 00287 if ( ! map_stream . good( ) ) 00288 AMOS_THROW_IO ("Unknown file write error in close, bank corrupted"); 00289 map_stream . close( ); 00290 } 00291 00292 //-- Close/free the partitions 00293 for ( Size_t i = 0; i < npartitions_m; i ++ ) 00294 delete (partitions_m [i]); 00295 00296 //-- Sync the IFO partition 00297 syncIFO (I_CLOSE); 00298 00299 //-- Reset 00300 init( ); 00301 } 00302 00303 00304 //----------------------------------------------------- concat ----------------- 00305 void Bank_t::concat (Bank_t & s) 00306 { 00307 if ( ! is_open_m || ! (mode_m & B_WRITE) ) 00308 AMOS_THROW_IO ("Cannot concat, bank not open for writing"); 00309 if ( ! s . is_open_m || ! (s . mode_m & B_READ) ) 00310 AMOS_THROW_IO ("Cannot concat, source bank not open for reading"); 00311 if ( banktype_m != s . banktype_m ) 00312 AMOS_THROW_ARGUMENT ("Cannot concat, incompatible bank type"); 00313 00314 Size_t size; 00315 Size_t tail = s . fix_size_m - sizeof (bankstreamoff) - sizeof (BankFlags_t); 00316 BankFlags_t flags; 00317 00318 Size_t buffer_size = s . fix_size_m; 00319 char * buffer = (char *) SafeMalloc (buffer_size); 00320 00321 bankstreamoff vpos; 00322 BankPartition_t * sp; 00323 BankPartition_t * tp = getLastPartition( ); 00324 00325 //-- Set up the BID lookup table 00326 const IDMap_t::HashTriple_t * stp = NULL; 00327 vector<const IDMap_t::HashTriple_t *> striples (s . last_bid_m + 1, stp); 00328 for ( IDMap_t::const_iterator idmi = s . getIDMap( ) . begin( ); 00329 idmi != s . getIDMap( ) . end( ); ++ idmi ) 00330 striples [idmi -> bid] = idmi; 00331 00332 //-- Seek to the end of current bank 00333 tp -> fix . seekp (0, ios::end); 00334 tp -> var . seekp (0, ios::end); 00335 00336 //-- For each source partition 00337 ID_t sbid = 0; 00338 for ( Size_t i = 0; i != s . npartitions_m; ++ i ) 00339 { 00340 //-- Seek to the beginning of source partition 00341 sp = s . getPartition (i); 00342 sp -> fix . seekg (0, ios::beg); 00343 00344 while ( true ) 00345 { 00346 //-- Read vpos and Bankable flags, break on EOF 00347 readLE (sp -> fix, &vpos); 00348 readLE (sp -> fix, &flags); 00349 if ( sp -> fix . eof( ) ) 00350 break; 00351 ++ sbid; 00352 00353 //-- Ignore record if deleted flag is set 00354 if ( flags . is_removed ) 00355 { 00356 sp -> fix . ignore (tail); 00357 continue; 00358 } 00359 //-- Skip to the data 00360 sp -> var . seekg (vpos); 00361 00362 //-- Get the source triple and add it to the new bank 00363 if ( (stp = striples [sbid]) != NULL ) 00364 idmap_m . insert (stp -> iid, stp -> eid, last_bid_m + 1); 00365 00366 //-- Add new partition if necessary 00367 if ( last_bid_m == max_bid_m ) 00368 { 00369 try { 00370 addPartition (true); 00371 tp = getLastPartition( ); 00372 } 00373 catch (Exception_t) { 00374 if ( stp != NULL ) 00375 { 00376 idmap_m . remove (stp -> iid); 00377 idmap_m . remove (stp -> eid); 00378 } 00379 throw; 00380 } 00381 } 00382 00383 //-- Write new vpos and copy Bankable flags 00384 vpos = (std::streamoff)tp -> var . tellp( ); 00385 writeLE (tp -> fix, &vpos); 00386 writeLE (tp -> fix, &flags); 00387 00388 //-- Copy object FIX data 00389 sp -> fix . read (buffer, tail - sizeof (Size_t)); 00390 readLE (sp -> fix, &size); 00391 tp -> fix . write (buffer, tail - sizeof (Size_t)); 00392 writeLE (tp -> fix, &size); 00393 00394 //-- Make sure buffer is big enough for VAR data, realloc if needed 00395 while ( size > buffer_size ) 00396 { 00397 buffer_size <<= 1; 00398 buffer = (char *) SafeRealloc (buffer, buffer_size); 00399 } 00400 00401 //-- Copy object VAR data 00402 sp -> var . read (buffer, size); 00403 tp -> var . write (buffer, size); 00404 00405 //-- Check the streams 00406 if ( ! sp -> fix . good( ) || ! sp -> var . good( ) ) 00407 AMOS_THROW_IO("Unknown file read error in concat, bank corrupted"); 00408 if ( ! tp -> fix . good( ) || ! tp -> var . good( ) ) 00409 AMOS_THROW_IO("Unknown file write error in concat, bank corrupted"); 00410 00411 ++ nbids_m; 00412 ++ last_bid_m; 00413 } 00414 } 00415 00416 //-- Update fix_size if needed and flush new bank info 00417 if ( fix_size_m == 0 ) 00418 fix_size_m = s . fix_size_m; 00419 00420 free (buffer); 00421 } 00422 00423 00424 //----------------------------------------------------- create ----------------- 00425 void Bank_t::create (const string & dir, BankMode_t mode) 00426 { 00427 if ( ! (mode & B_WRITE) ) 00428 AMOS_THROW_IO ("Cannot create, bank not opened for writing"); 00429 setMode (mode); 00430 00431 if ( is_open_m ) close( ); 00432 00433 //-- Destroy any pre-existing bank 00434 if ( exists (dir) ) 00435 { 00436 open (dir, mode); // side effect: resets the mode 00437 destroy( ); 00438 } 00439 00440 //TODO eliminate race conditions 00441 00442 try { 00443 //-- Initialize the bank 00444 is_open_m = true; 00445 store_dir_m = dir; 00446 store_pfx_m = dir + '/' + Decode (banktype_m); 00447 mkdir (store_dir_m . c_str( ), DIR_MODE); 00448 00449 //-- Try to create the IFO and MAP partition files 00450 touchFile (store_pfx_m + IFO_STORE_SUFFIX, FILE_MODE, true); 00451 touchFile (store_pfx_m + MAP_STORE_SUFFIX, FILE_MODE, true); 00452 00453 //-- Create the IFO partition 00454 syncIFO (I_CREATE); 00455 00456 //-- Try to create the first partition 00457 addPartition (true); 00458 } 00459 catch (Exception_t) { 00460 destroy( ); 00461 throw; 00462 } 00463 } 00464 00465 00466 //----------------------------------------------------- destroy ---------------- 00467 void Bank_t::destroy ( ) 00468 { 00469 if ( ! is_open_m || ! (mode_m & B_WRITE) ) 00470 AMOS_THROW_IO ("Cannot destroy, bank not open for writing"); 00471 00472 //-- Unlink the partitions 00473 clear( ); 00474 00475 //-- Unlink the IFO and MAP partitions 00476 unlink ((store_pfx_m + MAP_STORE_SUFFIX) . c_str( )); 00477 unlink ((store_pfx_m + IFO_STORE_SUFFIX) . c_str( )); 00478 unlink ((store_pfx_m + LCK_STORE_SUFFIX) . c_str( )); 00479 00480 //-- Remove the dir if empty 00481 rmdir (store_dir_m . c_str( )); 00482 00483 init( ); 00484 } 00485 00486 00487 //----------------------------------------------------- exists ----------------- 00488 bool Bank_t::exists (const string & dir) const 00489 { 00490 //-- Return false if insufficient permissions or absent IFO partition 00491 string ifo_path (dir + '/' + Decode (banktype_m) + IFO_STORE_SUFFIX); 00492 return ( ! access (dir . c_str( ), R_OK | X_OK) 00493 && 00494 ! access (ifo_path . c_str( ), R_OK) ); 00495 } 00496 00497 00498 //----------------------------------------------------- fetchBID --------------- 00499 void Bank_t::fetchBID (ID_t bid, IBankable_t & obj) 00500 { 00501 if ( ! is_open_m || ! (mode_m & B_READ) ) 00502 AMOS_THROW_IO ("Cannot fetch, bank not open for reading"); 00503 if (banktype_m != obj.getNCode( )) 00504 AMOS_THROW_ARGUMENT ("Cannot fetch, incompatible object type"); 00505 00506 //-- Seek to the record and read the data 00507 BankPartition_t * partition = localizeBID (bid); 00508 00509 bankstreamoff vpos; 00510 bankstreamoff off = bid * fix_size_m; 00511 partition -> fix . seekg (off, ios::beg); 00512 readLE (partition -> fix, &vpos); 00513 readLE (partition -> fix, &(obj . flags_m)); 00514 partition -> var . seekg (vpos); 00515 obj . readRecord (partition -> fix, partition -> var); 00516 00517 if ( ! partition -> fix . good( ) || ! partition -> var . good( ) ) 00518 AMOS_THROW_IO ("Unknown file read error in fetch, bank corrupted"); 00519 } 00520 00521 00522 //----------------------------------------------------- getMaxIID -------------- 00523 ID_t Bank_t::getMaxIID ( ) const 00524 { 00525 ID_t max = NULL_ID; 00526 IDMap_t::const_iterator i; 00527 00528 for ( i = idmap_m . begin( ); i != idmap_m . end( ); ++ i ) 00529 if ( i -> iid > max ) max = i -> iid; 00530 00531 return max; 00532 } 00533 00534 00535 //----------------------------------------------------- lookupBID -------------- 00536 ID_t Bank_t::lookupBID (const string & eid) const 00537 { 00538 ID_t bid = idmap_m . lookupBID (eid); 00539 if ( bid == NULL_ID || bid > last_bid_m ) 00540 { 00541 stringstream ss; 00542 ss << "EID '" << eid << "' does not exist in bank"; 00543 AMOS_THROW_ARGUMENT (ss . str( )); 00544 } 00545 return bid; 00546 } 00547 00548 00549 //----------------------------------------------------- lookupBID -------------- 00550 ID_t Bank_t::lookupBID (ID_t iid) const 00551 { 00552 ID_t bid = idmap_m . lookupBID (iid); 00553 if ( bid == NULL_ID || bid > last_bid_m ) 00554 { 00555 stringstream ss; 00556 ss << "IID '" << iid << "' does not exist in bank"; 00557 AMOS_THROW_ARGUMENT (ss . str( )); 00558 } 00559 return bid; 00560 } 00561 00562 00563 //----------------------------------------------------- init ------------------- 00564 void Bank_t::init ( ) 00565 { 00566 fix_size_m = 0; 00567 is_open_m = false; 00568 last_bid_m = NULL_ID; 00569 max_bid_m = NULL_ID; 00570 nbids_m = NULL_ID; 00571 npartitions_m = 0; 00572 partition_size_m = 0; 00573 opened_m . clear( ); 00574 partitions_m . clear( ); 00575 store_dir_m . erase( ); 00576 store_pfx_m . erase( ); 00577 idmap_m . clear( ); 00578 idmap_m . setType (banktype_m); 00579 } 00580 00581 00582 //----------------------------------------------------- lockIFO ---------------- 00583 void Bank_t::lockIFO ( ) 00584 { 00585 if ( (mode_m & B_SPY) ) return; 00586 00587 //-- Attempt to obtain the lock once every second for LOCK_TIME seconds 00588 string ifo_path (store_pfx_m + IFO_STORE_SUFFIX); 00589 string lck_path (store_pfx_m + LCK_STORE_SUFFIX); 00590 for ( int i = 0; i < LOCK_TIME; sleep(1), i ++ ) 00591 if ( ! link (ifo_path.c_str( ), lck_path.c_str( )) ) 00592 return; 00593 00594 AMOS_THROW_IO 00595 ((string)"Failed to obtain bank IFO file lock, " + strerror (errno)); 00596 } 00597 00598 00599 //----------------------------------------------------- open ------------------- 00600 void Bank_t::open (const string & dir, BankMode_t mode) 00601 { 00602 if ( is_open_m ) close( ); 00603 00604 try { 00605 //-- Initialize the bank 00606 is_open_m = true; 00607 setMode (mode); 00608 store_dir_m = dir; 00609 store_pfx_m = dir + '/' + Decode (banktype_m); 00610 00611 //-- Try to open the IFO and MAP partition files 00612 touchFile (store_pfx_m + IFO_STORE_SUFFIX, FILE_MODE, false); 00613 touchFile (store_pfx_m + MAP_STORE_SUFFIX, FILE_MODE, false); 00614 00615 //-- Read the IFO partition 00616 syncIFO (I_OPEN); 00617 00618 //-- Read the MAP partition 00619 string map_path (store_pfx_m + MAP_STORE_SUFFIX); 00620 ifstream map_stream (map_path . c_str( )); 00621 if ( ! map_stream . is_open( ) ) 00622 AMOS_THROW_IO ("Could not open bank partition, " + map_path); 00623 00624 idmap_m . read (map_stream); 00625 00626 if ( ! map_stream . good( ) ) 00627 AMOS_THROW_IO ("Unknown file read error in open, bank corrupted"); 00628 map_stream . close( ); 00629 00630 //-- Make sure nothing smells fishy 00631 if ( idmap_m . getType( ) != banktype_m || 00632 idmap_m . getSize( ) > nbids_m || 00633 nbids_m > last_bid_m || 00634 last_bid_m > max_bid_m || 00635 partitions_m . size( ) != npartitions_m ) 00636 AMOS_THROW_IO ("Unknown file read error in open, bank corrupted"); 00637 } 00638 catch (Exception_t) { 00639 init( ); 00640 throw; 00641 } 00642 } 00643 00644 00645 //----------------------------------------------------- openPartition ---------- 00646 Bank_t::BankPartition_t * Bank_t::openPartition (ID_t id) 00647 { 00648 BankPartition_t * partition = partitions_m [id]; 00649 00650 //-- If already open, return it 00651 if ( partition -> fix . is_open( ) ) 00652 return partition; 00653 00654 try { 00655 //-- Open the FIX and VAR partition files 00656 ios::openmode mode = ios::binary | ios::ate | ios::in; 00657 if ( (mode_m & B_WRITE) ) 00658 mode |= ios::out; 00659 00660 partition -> fix . open (partition -> fix_name . c_str( ), mode); 00661 if ( ! partition -> fix . is_open( ) ) 00662 AMOS_THROW_IO ("Could not open bank partition, " + partition -> fix_name); 00663 partition -> var . open (partition -> var_name . c_str( ), mode); 00664 if ( ! partition -> var . is_open( ) ) 00665 AMOS_THROW_IO ("Could not open bank partition, " + partition -> var_name); 00666 } 00667 catch (Exception_t) { 00668 partition -> fix . close( ); 00669 partition -> var . close( ); 00670 throw; 00671 } 00672 00673 //-- Add it to the open queue, making room if necessary 00674 while ( (Size_t)opened_m . size( ) >= max_partitions_m ) 00675 { 00676 opened_m . front( ) -> fix . close( ); 00677 opened_m . front( ) -> var . close( ); 00678 opened_m . pop_front( ); 00679 } 00680 opened_m . push_back (partition); 00681 00682 return partition; 00683 } 00684 00685 00686 //----------------------------------------------------- removeBID -------------- 00687 void Bank_t::removeBID (ID_t bid) 00688 { 00689 if ( ! is_open_m || ! (mode_m & B_READ && mode_m & B_WRITE) ) 00690 AMOS_THROW_IO ("Cannot remove, bank not open for reading and writing"); 00691 00692 //-- Seek to FIX record and rewrite 00693 BankPartition_t * partition = localizeBID (bid); 00694 00695 BankFlags_t flags; 00696 bankstreamoff off = bid * fix_size_m + sizeof (bankstreamoff); 00697 partition -> fix . seekg (off, ios::beg); 00698 readLE (partition -> fix, &flags); 00699 flags . is_removed = true; 00700 partition -> fix . seekp (off, ios::beg); 00701 writeLE (partition -> fix, &flags); 00702 00703 if ( ! partition -> fix . good( ) || ! partition -> var . good( ) ) 00704 AMOS_THROW_IO ("Unknown file error in remove, bank corrupted"); 00705 00706 -- nbids_m; 00707 } 00708 00709 00710 //----------------------------------------------------- replace ---------------- 00711 void Bank_t::replace (ID_t iid, IBankable_t & obj) 00712 { 00713 ID_t bid = lookupBID (iid); 00714 string peid (idmap_m . lookupEID (iid)); 00715 idmap_m . remove (iid); 00716 00717 try { 00718 idmap_m . insert (obj . iid_m, obj . eid_m, bid); 00719 } 00720 catch (Exception_t) { 00721 idmap_m . insert (iid, peid, bid); 00722 throw; 00723 } 00724 00725 try { 00726 replaceBID (bid, obj); 00727 } 00728 catch (Exception_t) { 00729 idmap_m . remove (obj . iid_m); 00730 idmap_m . insert (iid, peid, bid); 00731 throw; 00732 } 00733 } 00734 00735 00736 //----------------------------------------------------- replace ---------------- 00737 void Bank_t::replace (const string & eid, IBankable_t & obj) 00738 { 00739 ID_t bid = lookupBID (eid); 00740 ID_t piid = idmap_m . lookupIID (eid); 00741 idmap_m . remove (eid); 00742 00743 try { 00744 idmap_m . insert (obj . iid_m, obj . eid_m, bid); 00745 } 00746 catch (Exception_t) { 00747 idmap_m . insert (piid, eid, bid); 00748 throw; 00749 } 00750 00751 try { 00752 replaceBID (bid, obj); 00753 } 00754 catch (Exception_t) { 00755 idmap_m . remove (obj . eid_m); 00756 idmap_m . insert (piid, eid, bid); 00757 throw; 00758 } 00759 } 00760 00761 00762 //----------------------------------------------------- replaceBID ------------- 00763 void Bank_t::replaceBID (ID_t bid, IBankable_t & obj) 00764 { 00765 if ( ! is_open_m || ! (mode_m & B_READ && mode_m & B_WRITE) ) 00766 AMOS_THROW_IO ("Cannot replace, bank not open for reading and writing"); 00767 if ( banktype_m != obj.getNCode( ) ) 00768 AMOS_THROW_ARGUMENT ("Cannot replace, incompatible object type"); 00769 00770 //-- Set the modified flag 00771 obj . flags_m . is_removed = false; 00772 obj . flags_m . is_modified = true; 00773 00774 //-- Seek to and write new record 00775 BankPartition_t * partition = localizeBID (bid); 00776 00777 bankstreamoff off = bid * fix_size_m; 00778 partition -> fix . seekp (off, ios::beg); 00779 partition -> var . seekp (0, ios::end); 00780 bankstreamoff vpos = partition -> var . tellp( ); 00781 writeLE (partition -> fix, &vpos); 00782 writeLE (partition -> fix, &(obj . flags_m)); 00783 obj . writeRecord (partition -> fix, partition -> var); 00784 Size_t vsize = (std::streamoff)partition -> var . tellp( ) - vpos; 00785 writeLE (partition -> fix, &vsize); 00786 00787 if ( ! partition -> fix . good( ) || ! partition -> var . good( ) ) 00788 AMOS_THROW_IO ("Unknown file error in replace, bank corrupted"); 00789 } 00790 00791 00792 //----------------------------------------------------- syncIFO ---------------- 00793 void Bank_t::syncIFO (IFOMode_t mode) 00794 { 00795 //-- B_SPY sneak out 00796 if ( (mode_m & B_SPY) && mode != I_OPEN ) 00797 return; 00798 00799 //-- Generate lock string 00800 ostringstream ss; 00801 char * user = getenv ("USER"); 00802 ss << ((mode_m & B_WRITE) ? WRITE_LOCK_CHAR : READ_LOCK_CHAR) 00803 << ' ' << getuid( ) << ' ' << (user == NULL ? "null" : user); 00804 string lock (ss . str( )); 00805 00806 00807 string line; 00808 NCode_t banktype; 00809 ID_t nbids, last_bid; 00810 Size_t fix_size, npartitions, partition_size; 00811 vector<string> locks; 00812 vector<string>::iterator vi; 00813 00814 //-- Obtain a lock on the IFO store 00815 lockIFO( ); 00816 00817 try { 00818 00819 //-- Read IFO partition 00820 if ( mode == I_OPEN || mode == I_CLOSE ) 00821 { 00822 string ifo_path (store_pfx_m + IFO_STORE_SUFFIX); 00823 ifstream ifo_stream (ifo_path . c_str( )); 00824 if ( ! ifo_stream . is_open( ) ) 00825 AMOS_THROW_IO ("Could not open bank partition, " + ifo_path); 00826 00827 getline (ifo_stream, line, '='); 00828 ifo_stream >> line; // bank version 00829 if ( line != BANK_VERSION ) 00830 AMOS_THROW_IO 00831 ("Could not read bank, incompatible bank version " + line); 00832 getline (ifo_stream, line, '='); 00833 ifo_stream >> banktype; // bank type 00834 if ( banktype != banktype_m ) 00835 AMOS_THROW_IO 00836 ("Could not read bank, incompatible type " + Decode (banktype)); 00837 00838 getline (ifo_stream, line, '='); 00839 ifo_stream >> nbids; // number of objects 00840 getline (ifo_stream, line, '='); 00841 ifo_stream >> last_bid; // last index 00842 getline (ifo_stream, line, '='); 00843 ifo_stream >> fix_size; // index size (in bytes) 00844 getline (ifo_stream, line, '='); 00845 ifo_stream >> npartitions; // number of partitions 00846 getline (ifo_stream, line, '='); 00847 ifo_stream >> partition_size; // partition size (in indices) 00848 getline (ifo_stream, line, '='); // "locks =" 00849 getline (ifo_stream, line); 00850 00851 if ( ! ifo_stream . good( ) ) 00852 AMOS_THROW_IO ("Unknown file read error in sync, bank corrupted"); 00853 00854 //-- Read existing bank locks 00855 bool noskip = (mode == I_OPEN); 00856 getline (ifo_stream, line); 00857 while ( ifo_stream . good( ) ) 00858 { 00859 if ( noskip || line != lock ) 00860 locks . push_back (line); // add bank lock 00861 else 00862 noskip = true; // skipped self lock 00863 getline (ifo_stream, line); 00864 } 00865 ifo_stream . close( ); 00866 00867 //-- If seeing this for the first time 00868 if ( mode == I_OPEN ) 00869 { 00870 nbids_m = nbids; 00871 last_bid_m = last_bid; 00872 fix_size_m = fix_size; 00873 partition_size_m = partition_size; 00874 00875 //-- Update the partition list 00876 try { 00877 while ( npartitions > npartitions_m ) 00878 addPartition (false); 00879 } 00880 catch (Exception_t) { 00881 for ( Size_t i = 0; i != npartitions_m; ++ i ) 00882 delete (partitions_m [i]); 00883 npartitions_m = 0; 00884 throw; 00885 } 00886 } 00887 } 00888 00889 00890 //-- Validate existing locks 00891 char ltype = NULL_CHAR; 00892 vector<string>::iterator vj; 00893 for ( vi = locks . begin( ); vi != locks . end( ); ++ vi ) 00894 { 00895 vj = vi; 00896 ltype = vi -> empty( ) ? NULL_CHAR : (*vi)[0]; 00897 switch ( ltype ) 00898 { 00899 case READ_LOCK_CHAR: 00900 break; 00901 case WRITE_LOCK_CHAR: 00902 if ( locks . size( ) == 1 ) 00903 break; 00904 // fall-through 00905 default: 00906 AMOS_THROW_IO ("Invalid bank partition lock, bank corrupted"); 00907 } 00908 } 00909 00910 00911 //-- B_SPY sneak out 00912 if ( mode_m & B_SPY ) 00913 { 00914 if ( ltype == WRITE_LOCK_CHAR ) 00915 cerr << "WARNING: Disregarding '" << Decode (banktype_m) 00916 << "' bank lock, locked by '" + *vj + "'" << endl; 00917 return; 00918 } 00919 00920 00921 //-- Check existing locks 00922 if ( (mode_m & B_READ) && ltype == WRITE_LOCK_CHAR ) 00923 AMOS_THROW_IO 00924 ("Could not open bank for reading, locked by '" + *vj + "'"); 00925 if ( (mode_m & B_WRITE) && ltype == WRITE_LOCK_CHAR ) 00926 AMOS_THROW_IO 00927 ("Could not open bank for writing, locked by '" + *vj + "'"); 00928 if ( (mode_m & B_WRITE) && ltype == READ_LOCK_CHAR ) 00929 AMOS_THROW_IO 00930 ("Could not open bank for writing, locked by '" + *vj + "'"); 00931 00932 00933 //-- Add new lock 00934 if ( mode != I_CLOSE ) 00935 locks . push_back (lock); 00936 00937 00938 //-- Dump memory if writing 00939 if ( (mode_m & B_WRITE) ) 00940 { 00941 nbids = nbids_m; 00942 last_bid = last_bid_m; 00943 fix_size = fix_size_m; 00944 npartitions = npartitions_m; 00945 partition_size = partition_size_m; 00946 } 00947 00948 00949 //-- Write IFO partition 00950 string ifo_path (store_pfx_m + IFO_STORE_SUFFIX); 00951 ofstream ifo_stream (ifo_path . c_str( )); 00952 if ( ! ifo_stream . is_open( ) ) 00953 AMOS_THROW_IO ("Could not open bank partition, " + ifo_path); 00954 00955 ifo_stream 00956 << "____" << Decode (banktype_m) << " BANK INFORMATION____" << endl 00957 << "bank version = " << BANK_VERSION << endl 00958 << "bank type = " << banktype_m << endl 00959 << "objects = " << nbids << endl 00960 << "indices = " << last_bid << endl 00961 << "bytes/index = " << fix_size << endl 00962 << "partitions = " << npartitions << endl 00963 << "indices/partition = " << partition_size << endl 00964 << "locks = " << endl; 00965 00966 //-- Write updated locks 00967 for ( vi = locks . begin( ); vi != locks . end( ); ++ vi ) 00968 ifo_stream << *vi << endl; 00969 00970 if ( ! ifo_stream . good( ) ) 00971 AMOS_THROW_IO ("Unknown file write error in sync, bank corrupted"); 00972 ifo_stream . close( ); 00973 00974 } 00975 catch (Exception_t) { 00976 unlockIFO( ); 00977 throw; 00978 } 00979 00980 //-- Release lock on the IFO store 00981 unlockIFO( ); 00982 } 00983 00984 00985 //----------------------------------------------------- touchFile -------------- 00986 void Bank_t::touchFile (const string & path, int mode, bool create) 00987 { 00988 int fd, flags; 00989 00990 if ( (mode_m & B_SPY) ) 00991 flags = O_RDONLY; 00992 else // need both read/write for file locks 00993 flags = O_RDWR; 00994 00995 if ( create ) 00996 flags |= O_CREAT | O_TRUNC; 00997 00998 fd = ::open (path . c_str( ), flags, mode); 00999 if ( fd == -1 ) 01000 { 01001 if ( create ) 01002 AMOS_THROW_IO 01003 ("Could not create bank file, " + path + ", " + strerror (errno)); 01004 else 01005 AMOS_THROW_IO 01006 ("Could not open bank file, " + path + ", " + strerror (errno)); 01007 } 01008 01009 fd = ::close (fd); 01010 if ( fd == -1 ) 01011 AMOS_THROW_IO 01012 ("Could not close bank file, " + path + ", " + strerror (errno)); 01013 } 01014 01015 01016 //----------------------------------------------------- unlockIFO -------------- 01017 void Bank_t::unlockIFO ( ) 01018 { 01019 if ( (mode_m & B_SPY) ) return; 01020 01021 //-- Attempt to release the lock 01022 string lck_path (store_pfx_m + LCK_STORE_SUFFIX); 01023 if ( unlink (lck_path . c_str( )) ) 01024 AMOS_THROW_IO 01025 ((string)"Failed to release bank IFO file lock, " + strerror (errno)); 01026 } 01027 01028 01029 //--------------------------------------------------- BankExists --------------- 01030 bool AMOS::BankExists (NCode_t ncode, const string & dir) 01031 { 01032 //-- Return false if insufficient permissions or absent IFO partition 01033 string ifo_path (dir + '/' + Decode (ncode) + Bank_t::IFO_STORE_SUFFIX); 01034 return ( ! access (dir . c_str( ), R_OK | X_OK) 01035 && 01036 ! access (ifo_path . c_str( ), R_OK) ); 01037 } 01038 01039 01040 //--------------------------------------------------- PrintBankVersion --------- 01041 void AMOS::PrintBankVersion (const char * s) 01042 { 01043 cerr << s << " compiled for bank version " << Bank_t::BANK_VERSION << endl; 01044 } 01045 01046 01047 01048 //================================================ BankPartition_t ============= 01049 //----------------------------------------------------- BankPartition_t -------- 01050 Bank_t::BankPartition_t::BankPartition_t (Size_t buffer_size) 01051 { 01052 fix_buff = (char *) SafeMalloc (buffer_size); 01053 var_buff = (char *) SafeMalloc (buffer_size); 01054 01055 fix . rdbuf( ) -> _PUBSETBUF_ (fix_buff, buffer_size); 01056 var . rdbuf( ) -> _PUBSETBUF_ (var_buff, buffer_size); 01057 } 01058 01059 01060 //----------------------------------------------------- ~BankPartition_t ------- 01061 Bank_t::BankPartition_t::~BankPartition_t ( ) 01062 { 01063 fix . close( ); 01064 var . close( ); 01065 01066 free (fix_buff); 01067 free (var_buff); 01068 }

Generated on Tue May 17 15:19:01 2005 for libAMOS by doxygen 1.3.8