00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "BankStream_AMOS.hh"
00011 using namespace AMOS;
00012 using namespace std;
00013
00014
00015
00016
00017
// Out-of-class definitions of BankStream_t's static constants.
// NOTE(review): neither constant is referenced in the code visible here, so
// the meanings below are inferred from the names only -- confirm in the header.
// Presumably the default capacity of the stream's internal buffer (units unverified).
00018 const Size_t BankStream_t::DEFAULT_BUFFER_SIZE = 1024;
// Presumably the cap on partitions kept open simultaneously by a stream.
00019 const Size_t BankStream_t::MAX_OPEN_PARTITIONS = 2;
00020
00021
00022
00023 void BankStream_t::assignEID (ID_t iid, const string & eid)
00024 {
00025 ID_t bid = lookupBID (iid);
00026 string peid (idmap_m.lookupEID (iid));
00027 idmap_m.remove (iid);
00028
00029 try {
00030 triples_m [bid] = idmap_m.insert (iid, eid, bid);
00031 }
00032 catch (Exception_t) {
00033 triples_m [bid] = idmap_m.insert (iid, peid, bid);
00034 throw;
00035 }
00036 }
00037
00038
00039
00040 void BankStream_t::assignIID (const string & eid, ID_t iid)
00041 {
00042 ID_t bid = lookupBID (eid);
00043 ID_t piid = idmap_m.lookupIID (eid);
00044 idmap_m.remove (eid);
00045
00046 try {
00047 triples_m [bid] = idmap_m.insert (iid, eid, bid);
00048 }
00049 catch (Exception_t) {
00050 triples_m [bid] = idmap_m.insert (piid, eid, bid);
00051 throw;
00052 }
00053 }
00054
00055
00056
00057 void BankStream_t::clean()
00058 {
00059 Bank_t::clean();
00060
00061 init();
00062
00063 const IDMap_t::HashTriple_t * tp = NULL;
00064 triples_m.resize (last_bid_m + 1, tp);
00065 for ( IDMap_t::const_iterator idmi = getIDMap().begin();
00066 idmi != getIDMap().end(); ++ idmi )
00067 triples_m [idmi->bid] = idmi;
00068 }
00069
00070
00071
00072 void BankStream_t::concat (BankStream_t & s)
00073 {
00074 Bank_t::concat (s);
00075
00076 eof_m = !inrange();
00077
00078 const IDMap_t::HashTriple_t * tp = NULL;
00079 triples_m.resize (last_bid_m + 1, tp);
00080 for ( IDMap_t::const_iterator idmi = getIDMap().begin();
00081 idmi != getIDMap().end(); ++ idmi )
00082 triples_m [idmi->bid] = idmi;
00083
00084 oldPartition_m = NULL;
00085 }
00086
00087
00088
00089 BankStream_t & BankStream_t::ignore (bankstreamoff n)
00090 {
00091 if ( ! is_open_m || ! (mode_m & B_READ) )
00092 AMOS_THROW_IO ("Cannot ignore: bank not open for reading");
00093
00094 ID_t lid;
00095 BankFlags_t bf;
00096 bankstreamoff off;
00097 BankPartition_t * partition;
00098 Size_t skip = fix_size_m - sizeof (bankstreamoff) - sizeof (BankFlags_t);
00099
00100 while ( n > 0 && inrange() )
00101 {
00102 lid = curr_bid_m;
00103 partition = localizeBID (lid);
00104 off = lid * fix_size_m;
00105
00106 partition->fix.seekg (off);
00107
00108 partition->fix.ignore (sizeof (bankstreamoff));
00109 readLE (partition->fix, &bf);
00110 partition->fix.ignore (skip);
00111
00112 if ( ! bf.is_removed )
00113 -- n;
00114 ++ curr_bid_m;
00115 }
00116
00117 oldPartition_m = NULL;
00118
00119 eof_m = !inrange();
00120 return *this;
00121 }
00122
00123
00124
00125 void BankStream_t::open (const std::string & dir, BankMode_t mode)
00126 {
00127 Bank_t::open (dir, mode);
00128
00129 init();
00130
00131 const IDMap_t::HashTriple_t * tp = NULL;
00132 triples_m.resize (last_bid_m + 1, tp);
00133 for ( IDMap_t::const_iterator idmi = getIDMap().begin();
00134 idmi != getIDMap().end(); ++ idmi )
00135 triples_m [idmi->bid] = idmi;
00136 }
00137
00138
00139
00140 BankStream_t & BankStream_t::operator>> (IBankable_t & obj)
00141 {
00142 if ( ! is_open_m || ! (mode_m & B_READ) )
00143 AMOS_THROW_IO ("Cannot stream fetch: bank not open for reading");
00144 if ( banktype_m != obj.getNCode() )
00145 AMOS_THROW_ARGUMENT ("Cannot stream fetch: incompatible object type");
00146 if ( eof() )
00147 AMOS_THROW_ARGUMENT ("Cannot stream fetch: beyond end of stream");
00148
00149 ID_t lid;
00150 BankFlags_t flags;
00151 bankstreamoff off;
00152 bankstreamoff vpos;
00153 BankPartition_t * partition;
00154 Size_t skip = fix_size_m - sizeof (bankstreamoff) - sizeof (BankFlags_t);
00155
00156
00157 flags.is_removed = true;
00158 while ( flags.is_removed )
00159 {
00160 if ( !inrange() )
00161 {
00162 eof_m = true;
00163 return *this;
00164 }
00165
00166 lid = curr_bid_m;
00167 partition = localizeBID (lid);
00168
00169 if (partition != oldPartition_m)
00170 {
00171 off = lid * fix_size_m;
00172 partition->fix.seekg (off);
00173 oldPartition_m = partition;
00174 }
00175
00176 readLE (partition->fix, &vpos);
00177 readLE (partition->fix, &flags);
00178 if ( flags.is_removed )
00179 partition->fix.ignore (skip);
00180
00181 ++ curr_bid_m;
00182 }
00183
00184
00185 const IDMap_t::HashTriple_t * trip = triples_m [curr_bid_m - 1];
00186 if ( trip == NULL )
00187 {
00188 obj.iid_m = NULL_ID;
00189 obj.eid_m.erase();
00190 }
00191 else
00192 {
00193 obj.iid_m = trip->iid;
00194 obj.eid_m.assign (trip->eid);
00195 }
00196
00197 obj.flags_m = flags;
00198
00199 if (fixed_store_only_m)
00200 {
00201 obj.readRecordFix (partition->fix);
00202 }
00203 else
00204 {
00205 partition->var.seekg (vpos);
00206 obj.readRecord (partition->fix, partition->var);
00207
00208 if ( partition->var.fail() )
00209 AMOS_THROW_IO ("Unknown file read error in variable stream fetch, bank corrupted");
00210 }
00211
00212 partition->fix.ignore (sizeof (Size_t));
00213
00214 if ( partition->fix.fail() )
00215 AMOS_THROW_IO ("Unknown file read error in fixed stream fetch, bank corrupted");
00216
00217 return *this;
00218 }
00219
00220
00221
00222 BankStream_t & BankStream_t::operator<< (IBankable_t & obj)
00223 {
00224 oldPartition_m = NULL;
00225
00226 if ( ! is_open_m || ! (mode_m & B_WRITE) )
00227 AMOS_THROW_IO ("Cannot stream append: bank not open for writing");
00228 if ( banktype_m != obj.getNCode() )
00229 AMOS_THROW_ARGUMENT ("Cannot stream append: incompatible object type");
00230
00231
00232 triples_m.push_back
00233 (idmap_m.insert (obj.iid_m, obj.eid_m, last_bid_m + 1));
00234
00235 try {
00236
00237 if ( last_bid_m == max_bid_m )
00238 addPartition (true);
00239
00240 BankPartition_t * partition = getLastPartition();
00241
00242
00243 obj.flags_m.is_removed = false;
00244 obj.flags_m.is_modified = false;
00245
00246 if ( !ate_m )
00247 {
00248 partition->fix.seekp (0, ios::end);
00249 partition->var.seekp (0, ios::end);
00250 ate_m = true;
00251 }
00252
00253
00254
00255
00256 bankstreamoff fpos = (std::streamoff)partition->fix.tellp();
00257 bankstreamoff vpos = (std::streamoff)partition->var.tellp();
00258 writeLE (partition->fix, &vpos);
00259 writeLE (partition->fix, &(obj.flags_m));
00260 obj.writeRecord (partition->fix, partition->var);
00261 Size_t vsize = (std::streamoff)partition->var.tellp() - vpos;
00262 writeLE (partition->fix, &vsize);
00263
00264
00265 Size_t fsize = (std::streamoff)partition->fix.tellp() - fpos;
00266 if ( fix_size_m == 0 )
00267 fix_size_m = fsize;
00268
00269 if ( fix_size_m != fsize ||
00270 partition->fix.fail() ||
00271 partition->var.fail() )
00272 AMOS_THROW_IO
00273 ("Unknown file write error in stream append, bank corrupted");
00274
00275 ++ nbids_m;
00276 ++ last_bid_m;
00277 }
00278 catch (Exception_t) {
00279 triples_m.pop_back();
00280 idmap_m.remove (obj.iid_m);
00281 idmap_m.remove (obj.eid_m);
00282 throw;
00283 }
00284
00285 return *this;
00286 }
00287
00288
00289
00290 void BankStream_t::replace (ID_t iid, IBankable_t & obj)
00291 {
00292 ate_m = false;
00293 oldPartition_m = NULL;
00294
00295 ID_t bid = lookupBID (iid);
00296 string peid (idmap_m.lookupEID (iid));
00297 idmap_m.remove (iid);
00298
00299 try {
00300 triples_m [bid] = idmap_m.insert (obj.iid_m, obj.eid_m, bid);
00301 }
00302 catch (Exception_t) {
00303 triples_m [bid] = idmap_m.insert (iid, peid, bid);
00304 throw;
00305 }
00306
00307 try {
00308 replaceBID (bid, obj);
00309 }
00310 catch (Exception_t) {
00311 idmap_m.remove (obj.iid_m);
00312 triples_m [bid] = idmap_m.insert (iid, peid, bid);
00313 throw;
00314 }
00315 }
00316
00317
00318
00319 void BankStream_t::replace (const string & eid, IBankable_t & obj)
00320 {
00321 ate_m = false;
00322 oldPartition_m = NULL;
00323
00324 ID_t bid = lookupBID (eid);
00325 ID_t piid = idmap_m.lookupIID (eid);
00326 idmap_m.remove (eid);
00327
00328 try {
00329 triples_m [bid] = idmap_m.insert (obj.iid_m, obj.eid_m, bid);
00330 }
00331 catch (Exception_t) {
00332 triples_m [bid] = idmap_m.insert (piid, eid, bid);
00333 throw;
00334 }
00335
00336 try {
00337 replaceBID (bid, obj);
00338 }
00339 catch (Exception_t) {
00340 idmap_m.remove (obj.eid_m);
00341 triples_m [bid] = idmap_m.insert (piid, eid, bid);
00342 throw;
00343 }
00344 }
00345
00346
00347
00348 void BankStream_t::replaceByBID(ID_t bid, IBankable_t & obj)
00349 {
00350 if (bid < 0 || bid > last_bid_m)
00351 {
00352 AMOS_THROW_IO ("Cannot replaceByBID: outside valid bid range");
00353 }
00354
00355 oldPartition_m = NULL;
00356
00357 if (triples_m[bid])
00358 {
00359 if (triples_m[bid]->iid)
00360 {
00361 replace(triples_m[bid]->iid, obj);
00362 }
00363 else if (!triples_m[bid]->eid.empty())
00364 {
00365 replace(triples_m[bid]->eid, obj);
00366 }
00367 else
00368 {
00369 cerr << "WTF???" << endl;
00370 }
00371 }
00372 else
00373 {
00374 ate_m = false;
00375 replaceBID(bid, obj);
00376 triples_m[bid] = idmap_m.insert(obj.iid_m, obj.eid_m, bid);
00377 }
00378 }
00379