BankStream_AMOS.cc

Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 #include "BankStream_AMOS.hh"
00011 using namespace AMOS;
00012 using namespace std;
00013 
00014 
00015 
00016 
00017 //================================================ BankStream_t ================
00018 const Size_t BankStream_t::DEFAULT_BUFFER_SIZE = 1024;
00019 const Size_t BankStream_t::MAX_OPEN_PARTITIONS = 2;
00020 
00021 
00022 //----------------------------------------------------- assignEID --------------
00023 void BankStream_t::assignEID (ID_t iid, const string & eid)
00024 {
00025   ID_t bid = lookupBID (iid);
00026   string peid (idmap_m.lookupEID (iid));
00027   idmap_m.remove (iid);
00028 
00029   try {
00030     triples_m [bid] = idmap_m.insert (iid, eid, bid);
00031   }
00032   catch (Exception_t) {
00033     triples_m [bid] = idmap_m.insert (iid, peid, bid);
00034     throw;
00035   }
00036 }
00037 
00038 
00039 //----------------------------------------------------- assignIID --------------
00040 void BankStream_t::assignIID (const string & eid, ID_t iid)
00041 {
00042   ID_t bid = lookupBID (eid);
00043   ID_t piid = idmap_m.lookupIID (eid);
00044   idmap_m.remove (eid);
00045 
00046   try {
00047     triples_m [bid] = idmap_m.insert (iid, eid, bid);
00048   }
00049   catch (Exception_t) {
00050     triples_m [bid] = idmap_m.insert (piid, eid, bid);
00051     throw;
00052   }
00053 }
00054 
00055 
00056 //----------------------------------------------------- clean ------------------
00057 void BankStream_t::clean()
00058 {
00059   Bank_t::clean();
00060 
00061   init();
00062 
00063   const IDMap_t::HashTriple_t * tp = NULL;
00064   triples_m.resize (last_bid_m + 1, tp);
00065   for ( IDMap_t::const_iterator idmi = getIDMap().begin();
00066         idmi != getIDMap().end(); ++ idmi )
00067     triples_m [idmi->bid] = idmi;
00068 }
00069 
00070 
00071 //----------------------------------------------------- concat -----------------
00072 void BankStream_t::concat (BankStream_t & s)
00073 {
00074   Bank_t::concat (s);
00075 
00076   eof_m = !inrange();
00077 
00078   const IDMap_t::HashTriple_t * tp = NULL;
00079   triples_m.resize (last_bid_m + 1, tp);
00080   for ( IDMap_t::const_iterator idmi = getIDMap().begin();
00081         idmi != getIDMap().end(); ++ idmi )
00082     triples_m [idmi->bid] = idmi;
00083 
00084   oldPartition_m = NULL;
00085 }
00086 
00087 
00088 //----------------------------------------------------- ignore -----------------
00089 BankStream_t & BankStream_t::ignore (bankstreamoff n)
00090 {
00091   if ( ! is_open_m  ||  ! (mode_m & B_READ) )
00092     AMOS_THROW_IO ("Cannot ignore: bank not open for reading");
00093 
00094   ID_t lid;
00095   BankFlags_t bf;
00096   bankstreamoff off;
00097   BankPartition_t * partition;
00098   Size_t skip = fix_size_m - sizeof (bankstreamoff) - sizeof (BankFlags_t);
00099 
00100   while ( n > 0  &&  inrange() )
00101     {
00102       lid = curr_bid_m;
00103       partition = localizeBID (lid);
00104       off = lid * fix_size_m;
00105 
00106       partition->fix.seekg (off);
00107  
00108       partition->fix.ignore (sizeof (bankstreamoff));
00109       readLE (partition->fix, &bf);
00110       partition->fix.ignore (skip);
00111 
00112       if ( ! bf.is_removed )
00113         -- n;
00114       ++ curr_bid_m;
00115     }
00116 
00117   oldPartition_m = NULL;
00118 
00119   eof_m = !inrange();
00120   return *this;
00121 }
00122 
00123 
00124 //----------------------------------------------------- open -------------------
00125 void BankStream_t::open (const std::string & dir, BankMode_t mode)
00126 {
00127   Bank_t::open (dir, mode);
00128 
00129   init();
00130   
00131   const IDMap_t::HashTriple_t * tp = NULL;
00132   triples_m.resize (last_bid_m + 1, tp);
00133   for ( IDMap_t::const_iterator idmi = getIDMap().begin();
00134         idmi != getIDMap().end(); ++ idmi )
00135     triples_m [idmi->bid] = idmi;
00136 }
00137 
00138 
00139 //----------------------------------------------------- operator>> -------------
00140 BankStream_t & BankStream_t::operator>> (IBankable_t & obj)
00141 {
00142   if ( ! is_open_m  ||  ! (mode_m & B_READ) )
00143     AMOS_THROW_IO ("Cannot stream fetch: bank not open for reading");
00144   if ( banktype_m != obj.getNCode() )
00145     AMOS_THROW_ARGUMENT ("Cannot stream fetch: incompatible object type");
00146   if ( eof() )
00147     AMOS_THROW_ARGUMENT ("Cannot stream fetch: beyond end of stream");
00148 
00149   ID_t lid;
00150   BankFlags_t flags;
00151   bankstreamoff off;
00152   bankstreamoff vpos;
00153   BankPartition_t * partition;
00154   Size_t skip = fix_size_m - sizeof (bankstreamoff) - sizeof (BankFlags_t);
00155 
00156   //-- Seek to the record and read the data
00157   flags.is_removed = true;
00158   while ( flags.is_removed )
00159     {
00160       if ( !inrange() )
00161         {
00162           eof_m = true;
00163           return *this;
00164         }
00165 
00166       lid = curr_bid_m;
00167       partition = localizeBID (lid);
00168 
00169       if (partition != oldPartition_m)
00170       {
00171         off = lid * fix_size_m;
00172         partition->fix.seekg (off);
00173         oldPartition_m = partition;
00174       }
00175 
00176       readLE (partition->fix, &vpos);
00177       readLE (partition->fix, &flags);
00178       if ( flags.is_removed )
00179         partition->fix.ignore (skip);
00180 
00181       ++ curr_bid_m;
00182     }
00183 
00184 
00185   const IDMap_t::HashTriple_t * trip = triples_m [curr_bid_m - 1];
00186   if ( trip == NULL )
00187     {
00188       obj.iid_m = NULL_ID;
00189       obj.eid_m.erase();
00190     }
00191   else
00192     {
00193       obj.iid_m = trip->iid;
00194       obj.eid_m.assign (trip->eid);
00195     }
00196 
00197   obj.flags_m = flags;
00198 
00199   if (fixed_store_only_m)
00200   {
00201     obj.readRecordFix (partition->fix);
00202   }
00203   else
00204   {
00205     partition->var.seekg (vpos);
00206     obj.readRecord (partition->fix, partition->var);
00207 
00208     if ( partition->var.fail() )
00209       AMOS_THROW_IO ("Unknown file read error in variable stream fetch, bank corrupted");
00210   }
00211 
00212   partition->fix.ignore (sizeof (Size_t));
00213 
00214   if ( partition->fix.fail() )
00215     AMOS_THROW_IO ("Unknown file read error in fixed stream fetch, bank corrupted");
00216 
00217   return *this;
00218 }
00219 
00220 
00221 //--------------------------------------------------- operator<< -------------
00222 BankStream_t & BankStream_t::operator<< (IBankable_t & obj)
00223 {
00224   oldPartition_m = NULL;
00225 
00226   if ( ! is_open_m  ||  ! (mode_m & B_WRITE) )
00227     AMOS_THROW_IO ("Cannot stream append: bank not open for writing");
00228   if ( banktype_m != obj.getNCode() )
00229     AMOS_THROW_ARGUMENT ("Cannot stream append: incompatible object type");
00230 
00231   //-- Insert the ID triple into the map (may throw exception)
00232   triples_m.push_back
00233     (idmap_m.insert (obj.iid_m, obj.eid_m, last_bid_m + 1));
00234 
00235   try {
00236     //-- Add another partition if necessary
00237     if ( last_bid_m == max_bid_m )
00238       addPartition (true);
00239 
00240     BankPartition_t * partition = getLastPartition();
00241 
00242     //-- Prepare the object for append
00243     obj.flags_m.is_removed  = false;
00244     obj.flags_m.is_modified = false;
00245 
00246     if ( !ate_m )
00247       {
00248         partition->fix.seekp (0, ios::end);
00249         partition->var.seekp (0, ios::end);
00250         ate_m = true;
00251       }
00252 
00253     //-- data is written in the following order to the FIX and VAR streams
00254     //   FIX = [VAR streampos] [BankableFlags] [OBJECT FIX] [VAR size]
00255     //   VAR = [OBJECT VAR]
00256     bankstreamoff fpos = (std::streamoff)partition->fix.tellp();
00257     bankstreamoff vpos = (std::streamoff)partition->var.tellp();
00258     writeLE (partition->fix, &vpos);
00259     writeLE (partition->fix, &(obj.flags_m));
00260     obj.writeRecord (partition->fix, partition->var);
00261     Size_t vsize = (std::streamoff)partition->var.tellp() - vpos;
00262     writeLE (partition->fix, &vsize);
00263 
00264     //-- If fix_size is not yet known, calculate it
00265     Size_t fsize = (std::streamoff)partition->fix.tellp() - fpos;
00266     if ( fix_size_m == 0 )
00267       fix_size_m = fsize;
00268 
00269     if ( fix_size_m != fsize  ||
00270          partition->fix.fail()  ||
00271          partition->var.fail() )
00272       AMOS_THROW_IO
00273         ("Unknown file write error in stream append, bank corrupted");
00274 
00275     ++ nbids_m;
00276     ++ last_bid_m;
00277   }
00278   catch (Exception_t) {
00279     triples_m.pop_back();
00280     idmap_m.remove (obj.iid_m);
00281     idmap_m.remove (obj.eid_m);
00282     throw;
00283   }
00284 
00285   return *this;
00286 }
00287 
00288 
00289 //----------------------------------------------------- replace ----------------
00290 void BankStream_t::replace (ID_t iid, IBankable_t & obj)
00291 {
00292   ate_m = false;
00293   oldPartition_m = NULL;
00294 
00295   ID_t bid = lookupBID (iid);
00296   string peid (idmap_m.lookupEID (iid));
00297   idmap_m.remove (iid);
00298 
00299   try {
00300     triples_m [bid] = idmap_m.insert (obj.iid_m, obj.eid_m, bid);
00301   }
00302   catch (Exception_t) {
00303     triples_m [bid] = idmap_m.insert (iid, peid, bid);
00304     throw;
00305   }
00306 
00307   try {
00308     replaceBID (bid, obj);
00309   }
00310   catch (Exception_t) {
00311     idmap_m.remove (obj.iid_m);
00312     triples_m [bid] = idmap_m.insert (iid, peid, bid);
00313     throw;
00314   }
00315 }
00316 
00317 
00318 //----------------------------------------------------- replace ----------------
00319 void BankStream_t::replace (const string & eid, IBankable_t & obj)
00320 {
00321   ate_m = false;
00322   oldPartition_m = NULL;
00323 
00324   ID_t bid = lookupBID (eid);
00325   ID_t piid = idmap_m.lookupIID (eid);
00326   idmap_m.remove (eid);
00327 
00328   try {
00329     triples_m [bid] = idmap_m.insert (obj.iid_m, obj.eid_m, bid);
00330   }
00331   catch (Exception_t) {
00332     triples_m [bid] = idmap_m.insert (piid, eid, bid);
00333     throw;
00334   }
00335 
00336   try {
00337     replaceBID (bid, obj);
00338   }
00339   catch (Exception_t) {
00340     idmap_m.remove (obj.eid_m);
00341     triples_m [bid] = idmap_m.insert (piid, eid, bid);
00342     throw;
00343   }
00344 }
00345 
00346 
00347 //--------------------------------------------------- replaceByBID -----------
00348 void BankStream_t::replaceByBID(ID_t bid, IBankable_t & obj)
00349 {
00350   if (bid < 0 || bid > last_bid_m)
00351   {
00352     AMOS_THROW_IO ("Cannot replaceByBID: outside valid bid range");
00353   }
00354 
00355   oldPartition_m = NULL;
00356 
00357   if (triples_m[bid])
00358   {
00359     if (triples_m[bid]->iid) 
00360     { 
00361       replace(triples_m[bid]->iid, obj);
00362     }
00363     else if (!triples_m[bid]->eid.empty()) 
00364     { 
00365       replace(triples_m[bid]->eid, obj);
00366     }
00367     else
00368     {
00369       cerr << "WTF???" << endl;
00370     }
00371   }
00372   else
00373   {
00374     ate_m = false;
00375     replaceBID(bid, obj);
00376     triples_m[bid] = idmap_m.insert(obj.iid_m, obj.eid_m, bid);
00377   }
00378 }
00379 

Generated on Mon Feb 22 17:36:27 2010 for libAMOS by  doxygen 1.4.7