Bank_AMOS.cc

Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 #include "Bank_AMOS.hh"
00011 #include "Message_AMOS.hh"
00012 #include <sstream>
00013 #include <sys/types.h>
00014 #include <sys/stat.h>
00015 #include <fcntl.h>
00016 #include <unistd.h>
00017 #include <cstdlib>
00018 #include <cstdio>
00019 #include <ctime>
00020 #include <cstring>
00021 #include <sstream>
00022 #include <iostream>
00023 using namespace AMOS;
00024 using namespace std;
00025 
//-- Name of the stream-buffer "set buffer" member function: pubsetbuf in
//   general, setbuf when compiling with a GCC major version below 3
#if defined(__GNUC__) && __GNUC__ < 3
# define _PUBSETBUF_ setbuf
#else
# define _PUBSETBUF_ pubsetbuf
#endif

//-- Number of lock attempts made by lockIFO (one second apart)
#define LOCK_TIME 5
//-- Permission bits passed to mkdir for bank directories
#define DIR_MODE  00755
//-- Permission bits passed to touchFile for bank files
#define FILE_MODE 00644
00037 
00038 
00039 
00040 
00041 //================================================ Bank_t ======================
00042 const Size_t Bank_t::DEFAULT_BUFFER_SIZE    = 1024;
00043 const Size_t Bank_t::DEFAULT_PARTITION_SIZE = 1000000;
00044 const Size_t Bank_t::MAX_OPEN_PARTITIONS    = 20;
00045 
00046 const string Bank_t::BANK_VERSION     =  "2.8";
00047 
00048 const string Bank_t::FIX_STORE_SUFFIX = ".fix";
00049 const string Bank_t::IFO_STORE_SUFFIX = ".ifo";
00050 const string Bank_t::LCK_STORE_SUFFIX = ".lck";
00051 const string Bank_t::VAR_STORE_SUFFIX = ".var";
00052 const string Bank_t::MAP_STORE_SUFFIX = ".map";
00053 const string Bank_t::TMP_STORE_SUFFIX = ".tmp";
00054 
00055 const char Bank_t::WRITE_LOCK_CHAR    = 'w';
00056 const char Bank_t::READ_LOCK_CHAR     = 'r';
00057 
00058 
00059 //----------------------------------------------------- addPartition -----------
00060 void Bank_t::addPartition (bool create)
00061 {
00062   //-- Allocate the new partition and add it to the list
00063   BankPartition_t * partition = new BankPartition_t (buffer_size_m);
00064   partitions_m.push_back (partition);
00065 
00066   try {
00067     ostringstream ss;
00068 
00069     ss << store_pfx_m << '.' << npartitions_m << FIX_STORE_SUFFIX;
00070     partition->fix_name = ss.str();
00071     ss.str (NULL_STRING);
00072 
00073     ss << store_pfx_m << '.' << npartitions_m << VAR_STORE_SUFFIX;
00074     partition->var_name = ss.str();
00075     ss.str (NULL_STRING);
00076 
00077     //-- Try to create/open the FIX and VAR partition files
00078     touchFile (partition->fix_name, FILE_MODE, create);
00079     touchFile (partition->var_name, FILE_MODE, create);
00080   }
00081   catch (Exception_t) {
00082     partitions_m.pop_back();
00083     delete partition;
00084     throw;
00085   }
00086 
00087   //-- If partition size is unset, use the default
00088   if ( partition_size_m == 0 )
00089     partition_size_m = DEFAULT_PARTITION_SIZE;
00090 
00091   //-- New partition, so new max index and number of partitions
00092   max_bid_m = ++ npartitions_m * partition_size_m;
00093 }
00094 
00095 
00096 //----------------------------------------------------- append -----------------
00097 void Bank_t::append (IBankable_t & obj)
00098 {
00099   //-- Insert the ID triple into the map (may throw exception)
00100   idmap_m.insert (obj.iid_m, obj.eid_m, last_bid_m + 1);
00101 
00102   try {
00103     appendBID (obj);
00104   }
00105   catch (Exception_t) {
00106     idmap_m.remove (obj.iid_m);
00107     idmap_m.remove (obj.eid_m);
00108     throw;
00109   }
00110 }
00111 
00112 
00113 //----------------------------------------------------- appendBID --------------
00114 void Bank_t::appendBID (IBankable_t & obj)
00115 {
00116   if ( ! is_open_m  ||  ! (mode_m & B_WRITE) )
00117     AMOS_THROW_IO ("Cannot append, bank not open for writing");
00118   if ( banktype_m != obj.getNCode() )
00119     AMOS_THROW_ARGUMENT ("Cannot append, incompatible object type");
00120 
00121   //-- Add another partition if necessary
00122   if ( last_bid_m == max_bid_m )
00123     addPartition (true);
00124 
00125   BankPartition_t * partition = getLastPartition();
00126 
00127   //-- Prepare the object for append
00128   obj.flags_m.is_removed  = false;
00129   obj.flags_m.is_modified = false;
00130 
00131   //-- data is written in the following order to the FIX and VAR streams
00132   //   FIX = [VAR streampos] [BankableFlags] [OBJECT FIX] [VAR size]
00133   //   VAR = [OBJECT VAR]
00134   partition->fix.seekp (0, ios::end);
00135   partition->var.seekp (0, ios::end);
00136   bankstreamoff fpos = partition->fix.tellp();
00137   bankstreamoff vpos = partition->var.tellp();
00138   writeLE (partition->fix, &vpos);
00139   writeLE (partition->fix, &(obj.flags_m));
00140   obj.writeRecord (partition->fix, partition->var);
00141   Size_t vsize = (std::streamoff)partition->var.tellp() - vpos;
00142   writeLE (partition->fix, &vsize);
00143 
00144   //-- If fix_size is not yet known, calculate it
00145   Size_t fsize = (std::streamoff)partition->fix.tellp() - fpos;
00146   if ( fix_size_m == 0 )
00147     fix_size_m = fsize;
00148 
00149   if ( fix_size_m != fsize  ||
00150        partition->fix.fail()  ||
00151        partition->var.fail() )
00152     AMOS_THROW_IO ("Unknown file write error in append, bank corrupted");
00153 
00154   ++ nbids_m;
00155   ++ last_bid_m;
00156 }
00157 
00158 
00159 //----------------------------------------------------- assignEID --------------
00160 void Bank_t::assignEID (ID_t iid, const string & eid)
00161 {
00162   ID_t bid = lookupBID (iid);
00163   string peid (idmap_m.lookupEID (iid));
00164   idmap_m.remove (iid);
00165 
00166   try {
00167     idmap_m.insert (iid, eid, bid);
00168   }
00169   catch (Exception_t) {
00170     idmap_m.insert (iid, peid, bid);
00171     throw;
00172   }
00173 }
00174 
00175 
00176 //----------------------------------------------------- assignIID --------------
00177 void Bank_t::assignIID (const string & eid, ID_t iid)
00178 {
00179   ID_t bid = lookupBID (eid);
00180   ID_t piid = idmap_m.lookupIID (eid);
00181   idmap_m.remove (eid);
00182 
00183   try {
00184     idmap_m.insert (iid, eid, bid);
00185   }
00186   catch (Exception_t) {
00187     idmap_m.insert (piid, eid, bid);
00188     throw;
00189   }
00190 }
00191 
00192 
00193 //----------------------------------------------------- clean ------------------
00194 void Bank_t::clean()
00195 {
00196   if ( ! is_open_m  ||  ! (mode_m & B_READ  &&  mode_m & B_WRITE) )
00197     AMOS_THROW_IO ("Cannot clean, bank not open for reading and writing");
00198 
00199   //-- Create a temporary bank of similar type and concat this bank to it
00200   Bank_t tmpbnk (banktype_m);
00201 
00202   try {
00203     //-- Concat this bank to a temporary bank (cleans as a side effect)
00204     string tname = store_pfx_m + TMP_STORE_SUFFIX;
00205     mkdir (tname.c_str(), DIR_MODE);
00206     tmpbnk.create (tname);
00207     tmpbnk.concat (*this);
00208 
00209     //-- Reset this bank
00210     clear();
00211     fix_size_m       = tmpbnk.fix_size_m;
00212     partition_size_m = tmpbnk.partition_size_m;
00213     last_bid_m       = tmpbnk.last_bid_m;
00214     nbids_m          = tmpbnk.nbids_m;
00215     idmap_m          = tmpbnk.idmap_m;
00216     
00217     //-- Link back the now cleaned partitions
00218     for ( Size_t i = 0; i != tmpbnk.npartitions_m; ++ i )
00219       {
00220         while ( i >= npartitions_m )
00221           addPartition (true);
00222 
00223         unlink (partitions_m [i]->fix_name.c_str());
00224         unlink (partitions_m [i]->var_name.c_str());
00225         if ( link (tmpbnk.partitions_m [i]->fix_name.c_str(),
00226                             partitions_m [i]->fix_name.c_str())  ||
00227              link (tmpbnk.partitions_m [i]->var_name.c_str(),
00228                             partitions_m [i]->var_name.c_str()) )
00229           AMOS_THROW_IO ("Unknown file link error in clean, bank corrupted");
00230       }
00231   }
00232   catch (Exception_t) {
00233     if ( tmpbnk.isOpen() )
00234       tmpbnk.destroy();
00235     throw;
00236   }
00237 
00238   //-- Destroy the temporary bank
00239   tmpbnk.destroy();
00240 }
00241 
00242 
00243 //----------------------------------------------------- clear ------------------
00244 void Bank_t::clear()
00245 {
00246   if ( ! is_open_m ) return;
00247   if ( ! (mode_m & B_WRITE) )
00248     AMOS_THROW_IO ("Cannot clear, bank not open for writing");
00249 
00250   //-- Close, unlink and free the partition files
00251   for ( Size_t i = 0; i != npartitions_m; ++ i )
00252     {
00253       partitions_m [i]->fix.close();
00254       partitions_m [i]->var.close();
00255       unlink (partitions_m [i]->fix_name.c_str());
00256       unlink (partitions_m [i]->var_name.c_str());
00257       delete (partitions_m [i]);
00258     }
00259 
00260   last_bid_m    = NULL_ID;
00261   max_bid_m     = NULL_ID;
00262   nbids_m       = NULL_ID;
00263   npartitions_m    = 0;
00264   partition_size_m = 0;
00265   opened_m    .clear();
00266   partitions_m.clear();  
00267   idmap_m     .clear();
00268   idmap_m     .setType (banktype_m);
00269 }
00270 
00271 
00272 //----------------------------------------------------- close ------------------
00273 void Bank_t::close()
00274 {
00275   if ( ! is_open_m ) return;
00276 
00277   if ( (mode_m & B_WRITE) )
00278     {
00279       //-- Flush MAP partition
00280       string map_path (store_pfx_m + MAP_STORE_SUFFIX);
00281       ofstream map_stream (map_path.c_str());
00282       if ( ! map_stream.is_open() )
00283         AMOS_THROW_IO ("Could not open bank partition, " + map_path);
00284       
00285       idmap_m.write (map_stream);
00286       
00287       if ( map_stream.fail() )
00288         AMOS_THROW_IO ("Unknown file write error in close, bank corrupted");
00289       map_stream.close();
00290     }
00291   
00292   //-- Close/free the partitions
00293   for ( Size_t i = 0; i < npartitions_m; i ++ )
00294     delete (partitions_m [i]);
00295 
00296   //-- Sync the IFO partition
00297   syncIFO (I_CLOSE);
00298 
00299   //-- Reset
00300   init();
00301 }
00302 
00303 
00304 //----------------------------------------------------- concat -----------------
00305 void Bank_t::concat (Bank_t & s)
00306 {
00307   if ( ! is_open_m  ||  ! (mode_m & B_WRITE) )
00308     AMOS_THROW_IO ("Cannot concat, bank not open for writing");
00309   if ( ! s.is_open_m  ||  ! (s.mode_m & B_READ) )
00310     AMOS_THROW_IO ("Cannot concat, source bank not open for reading");
00311   if ( banktype_m != s.banktype_m )
00312     AMOS_THROW_ARGUMENT ("Cannot concat, incompatible bank type");
00313 
00314   Size_t size;
00315   Size_t tail = s.fix_size_m - sizeof (bankstreamoff) - sizeof (BankFlags_t);
00316   BankFlags_t flags;
00317 
00318   Size_t buffer_size = s.fix_size_m;
00319   char * buffer = (char *) SafeMalloc (buffer_size);
00320 
00321   bankstreamoff vpos;
00322   BankPartition_t * sp;
00323   BankPartition_t * tp = getLastPartition();
00324 
00325   //-- Set up the BID lookup table
00326   const IDMap_t::HashTriple_t * stp = NULL;
00327   vector<const IDMap_t::HashTriple_t *> striples (s.last_bid_m + 1, stp);
00328   for ( IDMap_t::const_iterator idmi = s.getIDMap().begin();
00329         idmi != s.getIDMap().end(); ++ idmi )
00330     striples [idmi->bid] = idmi;
00331 
00332   //-- Seek to the end of current bank
00333   tp->fix.seekp (0, ios::end);
00334   tp->var.seekp (0, ios::end);
00335 
00336   //-- For each source partition
00337   ID_t sbid = 0;
00338   for ( Size_t i = 0; i != s.npartitions_m; ++ i )
00339     {
00340       //-- Seek to the beginning of source partition
00341       sp = s.getPartition (i);
00342 
00343       sp->fix.seekg (0);
00344 
00345       while ( true )
00346         {
00347           //-- Read vpos and Bankable flags, break on EOF
00348           readLE (sp->fix, &vpos);
00349           readLE (sp->fix, &flags);
00350           if ( sp->fix.eof() )
00351             break;
00352           ++ sbid;
00353 
00354           //-- Ignore record if deleted flag is set
00355           if ( flags.is_removed )
00356             {
00357               sp->fix.ignore (tail);
00358               continue;
00359             }
00360           //-- Skip to the data
00361           sp->var.seekg (vpos);
00362 
00363           //-- Get the source triple and add it to the new bank
00364           if ( (stp = striples [sbid]) != NULL )
00365             idmap_m.insert (stp->iid, stp->eid, last_bid_m + 1);
00366 
00367           //-- Add new partition if necessary
00368           if ( last_bid_m == max_bid_m )
00369             {
00370               try {
00371                 addPartition (true);
00372                 tp = getLastPartition();
00373               }
00374               catch (Exception_t) {
00375                 if ( stp != NULL )
00376                   {
00377                     idmap_m.remove (stp->iid);
00378                     idmap_m.remove (stp->eid);
00379                   }
00380                 throw;
00381               }
00382             }
00383 
00384           //-- Write new vpos and copy Bankable flags
00385           vpos = (std::streamoff)tp->var.tellp();
00386           writeLE (tp->fix, &vpos);
00387           writeLE (tp->fix, &flags);
00388 
00389           //-- Copy object FIX data
00390           sp->fix.read (buffer, tail - sizeof (Size_t));
00391           readLE (sp->fix, &size);
00392           tp->fix.write (buffer, tail - sizeof (Size_t));
00393           writeLE (tp->fix, &size);
00394 
00395           //-- Make sure buffer is big enough for VAR data, realloc if needed
00396           while ( size > buffer_size )
00397             {
00398               buffer_size <<= 1;
00399               buffer = (char *) SafeRealloc (buffer, buffer_size);
00400             }
00401 
00402           //-- Copy object VAR data
00403           sp->var.read (buffer, size);
00404           tp->var.write (buffer, size);
00405 
00406           //-- Check the streams
00407           if ( sp->fix.fail()  ||  sp->var.fail() )
00408             AMOS_THROW_IO("Unknown file read error in concat, bank corrupted");
00409           if ( tp->fix.fail()  ||  tp->var.fail() )
00410             AMOS_THROW_IO("Unknown file write error in concat, bank corrupted");
00411 
00412           ++ nbids_m;
00413           ++ last_bid_m;
00414         }
00415     }
00416 
00417   //-- Update fix_size if needed and flush new bank info
00418   if ( fix_size_m == 0 )
00419     fix_size_m = s.fix_size_m;
00420 
00421   free (buffer);
00422 }
00423 
00424 
00425 //----------------------------------------------------- create -----------------
00426 void Bank_t::create (const string & dir, BankMode_t mode)
00427 {
00428   if ( ! (mode & B_WRITE) )
00429     AMOS_THROW_IO ("Cannot create, bank not opened for writing");
00430   setMode (mode);
00431 
00432   if ( is_open_m ) close();
00433 
00434   //-- Destroy any pre-existing bank
00435   if ( exists (dir) )
00436     {
00437       open (dir, mode); // side effect: resets the mode
00438       destroy();
00439     }
00440 
00441   //TODO eliminate race conditions
00442 
00443   try {
00444     //-- Initialize the bank
00445     is_open_m = true;
00446     store_dir_m = dir;
00447     store_pfx_m = dir + '/' + Decode (banktype_m);
00448     mkdir (store_dir_m.c_str(), DIR_MODE);
00449 
00450     //-- Try to create the IFO and MAP partition files
00451     touchFile (store_pfx_m + IFO_STORE_SUFFIX, FILE_MODE, true);
00452     touchFile (store_pfx_m + MAP_STORE_SUFFIX, FILE_MODE, true);
00453 
00454     //-- Create the IFO partition
00455     syncIFO (I_CREATE);
00456 
00457     //-- Try to create the first partition
00458     addPartition (true);
00459   }
00460   catch (Exception_t) {
00461     destroy();
00462     throw;
00463   }
00464 }
00465 
00466 
00467 //----------------------------------------------------- destroy ----------------
00468 void Bank_t::destroy()
00469 {
00470   if ( ! is_open_m  ||  ! (mode_m & B_WRITE) )
00471     AMOS_THROW_IO ("Cannot destroy, bank not open for writing");
00472 
00473   //-- Unlink the partitions
00474   clear();
00475 
00476   //-- Unlink the IFO and MAP partitions
00477   unlink ((store_pfx_m + MAP_STORE_SUFFIX).c_str());
00478   unlink ((store_pfx_m + IFO_STORE_SUFFIX).c_str());
00479   unlink ((store_pfx_m + LCK_STORE_SUFFIX).c_str());
00480 
00481   //-- Remove the dir if empty
00482   rmdir (store_dir_m.c_str());
00483 
00484   init();
00485 }
00486 
00487 
00488 //----------------------------------------------------- exists -----------------
00489 bool Bank_t::exists (const string & dir) const
00490 {
00491   //-- Return false if insufficient permissions or absent IFO partition
00492   string ifo_path (dir + '/' + Decode (banktype_m) + IFO_STORE_SUFFIX);
00493   return ( ! access (dir.c_str(), R_OK | X_OK)
00494            &&
00495            ! access (ifo_path.c_str(), R_OK) );
00496 }
00497 
00498 
00499 //----------------------------------------------------- fetchBID ---------------
00500 void Bank_t::fetchBID (ID_t bid, IBankable_t & obj)
00501 {
00502   if ( ! is_open_m  ||  ! (mode_m & B_READ) )
00503     AMOS_THROW_IO ("Cannot fetch, bank not open for reading");
00504   if (banktype_m != obj.getNCode())
00505     AMOS_THROW_ARGUMENT ("Cannot fetch, incompatible object type");
00506 
00507   //-- Seek to the record and read the data
00508   BankPartition_t * partition = localizeBID (bid);
00509 
00510   bankstreamoff vpos;
00511   bankstreamoff off = bid * fix_size_m;
00512   partition->fix.seekg (off);
00513   readLE (partition->fix, &vpos);
00514   readLE (partition->fix, &(obj.flags_m));
00515   partition->var.seekg (vpos);
00516   obj.readRecord (partition->fix, partition->var);
00517   partition->fix.ignore (sizeof (Size_t));
00518 
00519   if ( partition->fix.fail()  ||  partition->var.fail() )
00520     AMOS_THROW_IO ("Unknown file read error in fetch, bank corrupted");
00521 }
00522 
00523 
00524 void Bank_t::fetchBIDFix(ID_t bid, IBankable_t & obj)
00525 {
00526   if ( ! is_open_m  ||  ! (mode_m & B_READ) )
00527     AMOS_THROW_IO ("Cannot fetch, bank not open for reading");
00528   if (banktype_m != obj.getNCode())
00529     AMOS_THROW_ARGUMENT ("Cannot fetch, incompatible object type");
00530 
00531   //-- Seek to the record and read the data
00532   BankPartition_t * partition = localizeBID (bid);
00533 
00534   bankstreamoff vpos;
00535   bankstreamoff off = bid * fix_size_m;
00536   partition->fix.seekg (off);
00537   readLE (partition->fix, &vpos);
00538   readLE (partition->fix, &(obj.flags_m));
00539   obj.readRecordFix (partition->fix);
00540   partition->fix.ignore (sizeof (Size_t));
00541 
00542   if ( partition->fix.fail())
00543     AMOS_THROW_IO ("Unknown file read error in fetch, bank corrupted");
00544 }
00545 
00546 
00547 //----------------------------------------------------- getMaxIID --------------
00548 ID_t Bank_t::getMaxIID() const
00549 {
00550   ID_t max = NULL_ID;
00551   IDMap_t::const_iterator i;
00552 
00553   for ( i = idmap_m.begin(); i != idmap_m.end(); ++ i )
00554     if ( i->iid > max ) max = i->iid;
00555 
00556   return max;
00557 }
00558 
00559 //----------------------------------------------------- getMaxBID --------------
00560 ID_t Bank_t::getMaxBID() const
00561 {
00562   ID_t max = NULL_ID;
00563   IDMap_t::const_iterator i;
00564 
00565   for ( i = idmap_m.begin(); i != idmap_m.end(); ++ i )
00566     if ( i->bid > max ) max = i->bid;
00567 
00568   return max;
00569 }
00570 
00571 
00572 //----------------------------------------------------- lookupBID --------------
00573 ID_t Bank_t::lookupBID (const string & eid) const
00574 {
00575   ID_t bid = idmap_m.lookupBID (eid);
00576   if ( bid == NULL_ID || bid > last_bid_m )
00577     {
00578       stringstream ss;
00579       ss << "EID '" << eid << "' does not exist in bank";
00580       AMOS_THROW_ARGUMENT (ss.str());
00581     }
00582   return bid;
00583 }
00584 
00585 
00586 //----------------------------------------------------- lookupBID --------------
00587 ID_t Bank_t::lookupBID (ID_t iid) const
00588 {
00589   ID_t bid = idmap_m.lookupBID (iid);
00590   if ( bid == NULL_ID || bid > last_bid_m )
00591     {
00592       stringstream ss;
00593       ss << "IID '" << iid << "' does not exist in bank";
00594       AMOS_THROW_ARGUMENT (ss.str());
00595     }
00596   return bid;
00597 }
00598 
00599 
00600 //----------------------------------------------------- init -------------------
00601 void Bank_t::init()
00602 {
00603   fix_size_m       = 0;
00604   is_open_m        = false;
00605   last_bid_m       = NULL_ID;
00606   max_bid_m        = NULL_ID;
00607   nbids_m          = NULL_ID;
00608   npartitions_m    = 0;
00609   partition_size_m = 0;
00610   opened_m    .clear();
00611   partitions_m.clear();
00612   store_dir_m .erase();
00613   store_pfx_m .erase();
00614   idmap_m     .clear();
00615   idmap_m     .setType (banktype_m);
00616 }
00617 
00618 
00619 //----------------------------------------------------- lockIFO ----------------
00620 void Bank_t::lockIFO()
00621 {
00622   if ( (mode_m & B_SPY) ) return;
00623 
00624   //-- Attempt to obtain the lock once every second for LOCK_TIME seconds
00625   string ifo_path (store_pfx_m + IFO_STORE_SUFFIX);
00626   string lck_path (store_pfx_m + LCK_STORE_SUFFIX);
00627   for ( int i = 0; i < LOCK_TIME; sleep(1), i ++ )
00628     if ( ! link (ifo_path.c_str(), lck_path.c_str()) )
00629       return;
00630 
00631   AMOS_THROW_IO
00632     ((string)"Failed to obtain bank IFO file lock, " + strerror (errno));
00633 }
00634 
00635 
00636 //----------------------------------------------------- open -------------------
00637 void Bank_t::open (const string & dir, BankMode_t mode)
00638 {
00639   if ( is_open_m ) close();
00640 
00641   try {
00642     //-- Initialize the bank
00643     is_open_m   = true;
00644     setMode (mode);
00645     store_dir_m = dir;
00646     store_pfx_m = dir + '/' + Decode (banktype_m);
00647 
00648     //-- Try to open the IFO and MAP partition files
00649     touchFile (store_pfx_m + IFO_STORE_SUFFIX, FILE_MODE, false);
00650     touchFile (store_pfx_m + MAP_STORE_SUFFIX, FILE_MODE, false);
00651 
00652     //-- Read the IFO partition
00653     syncIFO (I_OPEN);
00654 
00655     //-- Read the MAP partition
00656     string map_path (store_pfx_m + MAP_STORE_SUFFIX);
00657 
00658     idmap_m.read(map_path);
00659 
00660     //-- Make sure nothing smells fishy
00661     if ( idmap_m.getType() != banktype_m  ||
00662          idmap_m.getSize() > nbids_m  ||
00663          nbids_m > last_bid_m  ||
00664          last_bid_m > max_bid_m  ||
00665          partitions_m.size() != npartitions_m )
00666       AMOS_THROW_IO ("Unknown file read error in open, bank corrupted");
00667   }
00668   catch (Exception_t) {
00669     init();
00670     throw;
00671   }
00672 }
00673 
00674 
00675 //----------------------------------------------------- openPartition ----------
00676 Bank_t::BankPartition_t * Bank_t::openPartition (ID_t id)
00677 {
00678   BankPartition_t * partition = partitions_m [id];
00679 
00680   //-- If already open, return it
00681   if ( partition->fix.is_open() )
00682     return partition;
00683 
00684   try {
00685     //-- Open the FIX and VAR partition files
00686     ios::openmode mode = ios::binary | ios::ate | ios::in;
00687     if ( (mode_m & B_WRITE) )
00688       mode |= ios::out;
00689 
00690     partition->fix.open (partition->fix_name.c_str(), mode);
00691     if ( ! partition->fix.is_open() )
00692       AMOS_THROW_IO ("Could not open bank partition, " + partition->fix_name);
00693     partition->var.open (partition->var_name.c_str(), mode);
00694     if ( ! partition->var.is_open() )
00695       AMOS_THROW_IO ("Could not open bank partition, " + partition->var_name);
00696   }
00697   catch (Exception_t) {
00698     partition->fix.close();
00699     partition->var.close();
00700     throw;
00701   }
00702 
00703   //-- Add it to the open queue, making room if necessary
00704   while ( (Size_t)opened_m.size() >= max_partitions_m )
00705     {
00706       opened_m.front()->fix.close();
00707       opened_m.front()->var.close();
00708       opened_m.pop_front();
00709     }
00710   opened_m.push_back (partition);
00711 
00712   return partition;
00713 }
00714 
00715 
00716 //----------------------------------------------------- removeBID --------------
00717 void Bank_t::removeBID (ID_t bid)
00718 {
00719   if ( ! is_open_m  ||  ! (mode_m & B_READ  &&  mode_m & B_WRITE) )
00720     AMOS_THROW_IO ("Cannot remove, bank not open for reading and writing");
00721 
00722   //-- Seek to FIX record and rewrite
00723   BankPartition_t * partition = localizeBID (bid);
00724 
00725   BankFlags_t flags;
00726   bankstreamoff off = bid * fix_size_m + sizeof (bankstreamoff);
00727   partition->fix.seekg (off);
00728   readLE (partition->fix, &flags);
00729   flags.is_removed = true;
00730   partition->fix.seekp (off);
00731   writeLE (partition->fix, &flags);
00732 
00733   if ( partition->fix.fail()  ||  partition->var.fail() )
00734     AMOS_THROW_IO ("Unknown file error in remove, bank corrupted");
00735 
00736   -- nbids_m;
00737 }
00738 
00739 
00740 //----------------------------------------------------- replace ----------------
00741 void Bank_t::replace (ID_t iid, IBankable_t & obj)
00742 {
00743   ID_t bid = lookupBID (iid);
00744   string peid (idmap_m.lookupEID (iid));
00745   idmap_m.remove (iid);
00746 
00747   try {
00748     idmap_m.insert (obj.iid_m, obj.eid_m, bid);
00749   }
00750   catch (Exception_t) {
00751     idmap_m.insert (iid, peid, bid);
00752     throw;
00753   }
00754 
00755   try {
00756     replaceBID (bid, obj);
00757   }
00758   catch (Exception_t) {
00759     idmap_m.remove (obj.iid_m);
00760     idmap_m.insert (iid, peid, bid);
00761     throw;
00762   }
00763 }
00764 
00765 
00766 //----------------------------------------------------- replace ----------------
00767 void Bank_t::replace (const string & eid, IBankable_t & obj)
00768 {
00769   ID_t bid = lookupBID (eid);
00770   ID_t piid = idmap_m.lookupIID (eid);
00771   idmap_m.remove (eid);
00772 
00773   try {
00774     idmap_m.insert (obj.iid_m, obj.eid_m, bid);
00775   }
00776   catch (Exception_t) {
00777     idmap_m.insert (piid, eid, bid);
00778     throw;
00779   }
00780 
00781   try {
00782     replaceBID (bid, obj);
00783   }
00784   catch (Exception_t) {
00785     idmap_m.remove (obj.eid_m);
00786     idmap_m.insert (piid, eid, bid);
00787     throw;
00788   }
00789 }
00790 
00791 
00792 //----------------------------------------------------- replaceBID -------------
00793 void Bank_t::replaceBID (ID_t bid, IBankable_t & obj)
00794 {
00795   if ( ! is_open_m  ||  ! (mode_m & B_READ  &&  mode_m & B_WRITE) )
00796     AMOS_THROW_IO ("Cannot replace, bank not open for reading and writing");
00797   if ( banktype_m != obj.getNCode() )
00798     AMOS_THROW_ARGUMENT ("Cannot replace, incompatible object type");
00799 
00800   //-- Set the modified flag
00801   obj.flags_m.is_removed = false;
00802   obj.flags_m.is_modified = true;
00803 
00804   //-- Seek to and write new record
00805   BankPartition_t * partition = localizeBID (bid);
00806 
00807   bankstreamoff off = bid * fix_size_m;
00808   partition->fix.seekp (off);
00809   partition->var.seekp (0, ios::end);
00810   bankstreamoff vpos = partition->var.tellp();
00811   writeLE (partition->fix, &vpos);
00812   writeLE (partition->fix, &(obj.flags_m));
00813   obj.writeRecord (partition->fix, partition->var);
00814   Size_t vsize = (std::streamoff)partition->var.tellp() - vpos;
00815   writeLE (partition->fix, &vsize);
00816 
00817   if ( partition->fix.fail()  ||  partition->var.fail() )
00818     AMOS_THROW_IO ("Unknown file error in replace, bank corrupted");
00819 }
00820 
00821 
00822 //----------------------------------------------------- syncIFO ----------------
00823 void Bank_t::syncIFO (IFOMode_t mode)
00824 {
00825   //-- B_SPY sneak out
00826   if ( (mode_m & B_SPY)  &&  mode != I_OPEN )
00827     return;
00828 
00829   //-- Generate lock string
00830   ostringstream ss;
00831   char * user = getenv ("USER");
00832   ss << ((mode_m & B_WRITE) ? WRITE_LOCK_CHAR : READ_LOCK_CHAR)
00833      << ' ' << getuid() << ' ' << (user == NULL ? "null" : user);
00834   string lock (ss.str());
00835 
00836   
00837   string line;
00838   NCode_t banktype;
00839   ID_t nbids, last_bid;
00840   Size_t fix_size, npartitions, partition_size;
00841   vector<string> locks;
00842   vector<string>::iterator vi;
00843 
00844   //-- Obtain a lock on the IFO store
00845   lockIFO();
00846 
00847   try {
00848 
00849     //-- Read IFO partition 
00850     if ( mode == I_OPEN  ||  mode == I_CLOSE )
00851       {
00852         string ifo_path (store_pfx_m + IFO_STORE_SUFFIX);
00853         ifstream ifo_stream (ifo_path.c_str());
00854         if ( ! ifo_stream.is_open() )
00855           AMOS_THROW_IO ("Could not open bank partition, " + ifo_path);
00856 
00857         getline (ifo_stream, line, '=');
00858         ifo_stream >> line;                // bank version
00859         if ( line != BANK_VERSION )
00860           AMOS_THROW_IO
00861             ("Could not read bank, expected version: "
00862              + BANK_VERSION + ", saw version: " + line);
00863         getline (ifo_stream, line, '=');
00864         ifo_stream >> banktype;            // bank type
00865         if ( banktype != banktype_m )
00866           AMOS_THROW_IO
00867             ("Could not read bank, incompatible type " + Decode (banktype));
00868 
00869         getline (ifo_stream, line, '=');
00870         ifo_stream >> nbids;               // number of objects
00871         getline (ifo_stream, line, '=');
00872         ifo_stream >> last_bid;            // last index
00873         getline (ifo_stream, line, '=');
00874         ifo_stream >> fix_size;            // index size (in bytes)
00875         getline (ifo_stream, line, '=');
00876         ifo_stream >> npartitions;         // number of partitions
00877         getline (ifo_stream, line, '=');
00878         ifo_stream >> partition_size;      // partition size (in indices)
00879         getline (ifo_stream, line, '=');   // "locks ="
00880         getline (ifo_stream, line);
00881 
00882         if ( ifo_stream.fail() )
00883           AMOS_THROW_IO ("Unknown file read error in sync, bank corrupted");
00884 
00885         //-- Read existing bank locks
00886         bool noskip = (mode == I_OPEN);
00887         getline (ifo_stream, line);
00888         while ( ifo_stream.good() )
00889           {
00890             if ( noskip  ||  line != lock )
00891               locks.push_back (line);        // add bank lock
00892             else
00893               noskip = true;                   // skipped self lock
00894             getline (ifo_stream, line);
00895           }
00896         ifo_stream.close();
00897 
00898         //-- If seeing this for the first time
00899         if ( mode == I_OPEN )
00900           {
00901             nbids_m = nbids;
00902             last_bid_m = last_bid;
00903             fix_size_m = fix_size;
00904             partition_size_m = partition_size;
00905 
00906             //-- Update the partition list
00907             try {
00908               while ( npartitions > npartitions_m )
00909                 addPartition (false);
00910             }
00911             catch (Exception_t) {
00912               for ( Size_t i = 0; i != npartitions_m; ++ i )
00913                 delete (partitions_m [i]);
00914               npartitions_m = 0;
00915               throw;
00916             }
00917           }
00918       }
00919 
00920 
00921     //-- Validate existing locks
00922     char ltype = NULL_CHAR;
00923     vector<string>::iterator vj;
00924     for ( vi = locks.begin(); vi != locks.end(); ++ vi )
00925       {
00926         vj = vi;
00927         ltype = vi->empty() ? NULL_CHAR : (*vi)[0];
00928         switch ( ltype )
00929           {
00930           case READ_LOCK_CHAR:
00931             break;
00932           case WRITE_LOCK_CHAR:
00933             if ( locks.size() == 1 )
00934               break;
00935             // fall-through
00936           default:
00937             AMOS_THROW_IO ("Invalid bank partition lock, bank corrupted");
00938           }
00939       }
00940 
00941 
00942     //-- B_SPY sneak out
00943     if ( mode_m & B_SPY )
00944       {
00945         if ( ltype == WRITE_LOCK_CHAR )
00946           cerr << "WARNING: Disregarding '" << Decode (banktype_m)
00947                << "' bank lock, locked by '" + *vj + "'" << endl;
00948         return;
00949       }
00950 
00951 
00952     //-- Check existing locks
00953     if ( (mode_m & B_READ)   &&  ltype == WRITE_LOCK_CHAR )
00954       AMOS_THROW_IO
00955         ("Could not open bank for reading, locked by '" + *vj + "'");
00956     if ( (mode_m & B_WRITE)  &&  ltype == WRITE_LOCK_CHAR )
00957       AMOS_THROW_IO
00958         ("Could not open bank for writing, locked by '" + *vj + "'");
00959     if ( (mode_m & B_WRITE)  &&  ltype == READ_LOCK_CHAR )
00960       AMOS_THROW_IO
00961         ("Could not open bank for writing, locked by '" + *vj + "'");
00962 
00963 
00964     //-- Add new lock
00965     if ( mode != I_CLOSE )
00966       locks.push_back (lock);
00967 
00968 
00969     //-- Dump memory if writing
00970     if ( (mode_m & B_WRITE) )
00971       {
00972         nbids = nbids_m;
00973         last_bid = last_bid_m;
00974         fix_size = fix_size_m;
00975         npartitions = npartitions_m;
00976         partition_size = partition_size_m;
00977       }
00978 
00979 
00980     //-- Write IFO partition
00981     string ifo_path (store_pfx_m + IFO_STORE_SUFFIX);
00982     ofstream ifo_stream (ifo_path.c_str());
00983     if ( ! ifo_stream.is_open() )
00984       AMOS_THROW_IO ("Could not open bank partition, " + ifo_path);
00985 
00986     ifo_stream
00987       << "____" << Decode (banktype_m) << " BANK INFORMATION____" << endl
00988       << "bank version = "      << BANK_VERSION         << endl
00989       << "bank type = "         << banktype_m           << endl
00990       << "objects = "           << nbids                << endl
00991       << "indices = "           << last_bid             << endl
00992       << "bytes/index = "       << fix_size             << endl
00993       << "partitions = "        << npartitions          << endl
00994       << "indices/partition = " << partition_size       << endl
00995       << "locks = " << endl;
00996 
00997     //-- Write updated locks
00998     for ( vi = locks.begin(); vi != locks.end(); ++ vi )
00999       ifo_stream << *vi << endl;
01000 
01001     if ( ifo_stream.fail() )
01002       AMOS_THROW_IO ("Unknown file write error in sync, bank corrupted");
01003     ifo_stream.close();
01004 
01005   }
01006   catch (Exception_t) {
01007     unlockIFO();
01008     throw;
01009   }
01010 
01011   //-- Release lock on the IFO store
01012   unlockIFO();
01013 }
01014 
01015 
01016 //----------------------------------------------------- touchFile --------------
01017 void Bank_t::touchFile (const string & path, int mode, bool create)
01018 {
01019   int fd, flags;
01020 
01021   if ( (mode_m & B_SPY) )
01022     flags = O_RDONLY;
01023   else // need both read/write for file locks
01024     flags = O_RDWR;
01025 
01026   if ( create )
01027     flags |= O_CREAT | O_TRUNC;
01028 
01029   fd = ::open (path.c_str(), flags, mode);
01030   if ( fd == -1 )
01031     {
01032     if ( create )
01033       AMOS_THROW_IO
01034         ("Could not create bank file, " + path + ", " + strerror (errno));
01035     else
01036       AMOS_THROW_IO
01037         ("Could not open bank file, "   + path + ", " + strerror (errno));
01038     }
01039 
01040   fd = ::close (fd);
01041   if ( fd == -1 )
01042     AMOS_THROW_IO
01043       ("Could not close bank file, "    + path + ", " + strerror (errno));
01044 }
01045 
01046 
01047 //----------------------------------------------------- unlockIFO --------------
01048 void Bank_t::unlockIFO()
01049 {
01050   if ( (mode_m & B_SPY) ) return;
01051 
01052   //-- Attempt to release the lock
01053   string lck_path (store_pfx_m + LCK_STORE_SUFFIX);
01054   if ( unlink (lck_path.c_str()) )
01055     AMOS_THROW_IO
01056       ((string)"Failed to release bank IFO file lock, " + strerror (errno));
01057 }
01058 
01059 
01060 //--------------------------------------------------- BankExists ---------------
01061 bool AMOS::BankExists (NCode_t ncode, const string & dir)
01062 {
01063   //-- Return false if insufficient permissions or absent IFO partition
01064   string ifo_path (dir + '/' + Decode (ncode) + Bank_t::IFO_STORE_SUFFIX);
01065   return ( ! access (dir.c_str(), R_OK | X_OK)
01066            &&
01067            ! access (ifo_path.c_str(), R_OK) );
01068 }
01069 
01070 
01071 //--------------------------------------------------- PrintBankVersion ---------
01072 void AMOS::PrintBankVersion (const char * s)
01073 {
01074   cerr << s << " compiled for bank version " << Bank_t::BANK_VERSION << endl;
01075 }
01076 
01077 
01078 
01079 //================================================ BankPartition_t =============
01080 //----------------------------------------------------- BankPartition_t --------
01081 Bank_t::BankPartition_t::BankPartition_t (Size_t buffer_size)
01082 {
01083   fix_buff = (char *) SafeMalloc (buffer_size);
01084   var_buff = (char *) SafeMalloc (buffer_size);
01085 
01086   fix.rdbuf()->_PUBSETBUF_ (fix_buff, buffer_size);
01087   var.rdbuf()->_PUBSETBUF_ (var_buff, buffer_size);
01088 }
01089 
01090 
01091 //----------------------------------------------------- ~BankPartition_t -------
01092 Bank_t::BankPartition_t::~BankPartition_t()
01093 {
01094   fix.close();
01095   var.close();
01096 
01097   free (fix_buff);
01098   free (var_buff);
01099 }

Generated on Mon Feb 22 17:36:27 2010 for libAMOS by  doxygen 1.4.7