00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "Bank_AMOS.hh"
00011 #include "Message_AMOS.hh"
00012 #include <sstream>
00013 #include <sys/types.h>
00014 #include <sys/stat.h>
00015 #include <fcntl.h>
00016 #include <unistd.h>
00017 #include <cstdlib>
00018 #include <cstdio>
00019 #include <ctime>
00020 #include <cstring>
00021 #include <sstream>
00022 #include <iostream>
00023 using namespace AMOS;
00024 using namespace std;
00025
00026 #define _PUBSETBUF_ pubsetbuf
00027 #ifdef __GNUC__
00028 # if __GNUC__ < 3
00029 # undef _PUBSETBUF_
00030 # define _PUBSETBUF_ setbuf
00031 # endif
00032 #endif
00033
00034 #define LOCK_TIME 5
00035 #define DIR_MODE 00755
00036 #define FILE_MODE 00644
00037
00038
00039
00040
00041
00042 const Size_t Bank_t::DEFAULT_BUFFER_SIZE = 1024;
00043 const Size_t Bank_t::DEFAULT_PARTITION_SIZE = 1000000;
00044 const Size_t Bank_t::MAX_OPEN_PARTITIONS = 20;
00045
00046 const string Bank_t::BANK_VERSION = "2.8";
00047
00048 const string Bank_t::FIX_STORE_SUFFIX = ".fix";
00049 const string Bank_t::IFO_STORE_SUFFIX = ".ifo";
00050 const string Bank_t::LCK_STORE_SUFFIX = ".lck";
00051 const string Bank_t::VAR_STORE_SUFFIX = ".var";
00052 const string Bank_t::MAP_STORE_SUFFIX = ".map";
00053 const string Bank_t::TMP_STORE_SUFFIX = ".tmp";
00054
00055 const char Bank_t::WRITE_LOCK_CHAR = 'w';
00056 const char Bank_t::READ_LOCK_CHAR = 'r';
00057
00058
00059
00060 void Bank_t::addPartition (bool create)
00061 {
00062
00063 BankPartition_t * partition = new BankPartition_t (buffer_size_m);
00064 partitions_m.push_back (partition);
00065
00066 try {
00067 ostringstream ss;
00068
00069 ss << store_pfx_m << '.' << npartitions_m << FIX_STORE_SUFFIX;
00070 partition->fix_name = ss.str();
00071 ss.str (NULL_STRING);
00072
00073 ss << store_pfx_m << '.' << npartitions_m << VAR_STORE_SUFFIX;
00074 partition->var_name = ss.str();
00075 ss.str (NULL_STRING);
00076
00077
00078 touchFile (partition->fix_name, FILE_MODE, create);
00079 touchFile (partition->var_name, FILE_MODE, create);
00080 }
00081 catch (Exception_t) {
00082 partitions_m.pop_back();
00083 delete partition;
00084 throw;
00085 }
00086
00087
00088 if ( partition_size_m == 0 )
00089 partition_size_m = DEFAULT_PARTITION_SIZE;
00090
00091
00092 max_bid_m = ++ npartitions_m * partition_size_m;
00093 }
00094
00095
00096
00097 void Bank_t::append (IBankable_t & obj)
00098 {
00099
00100 idmap_m.insert (obj.iid_m, obj.eid_m, last_bid_m + 1);
00101
00102 try {
00103 appendBID (obj);
00104 }
00105 catch (Exception_t) {
00106 idmap_m.remove (obj.iid_m);
00107 idmap_m.remove (obj.eid_m);
00108 throw;
00109 }
00110 }
00111
00112
00113
00114 void Bank_t::appendBID (IBankable_t & obj)
00115 {
00116 if ( ! is_open_m || ! (mode_m & B_WRITE) )
00117 AMOS_THROW_IO ("Cannot append, bank not open for writing");
00118 if ( banktype_m != obj.getNCode() )
00119 AMOS_THROW_ARGUMENT ("Cannot append, incompatible object type");
00120
00121
00122 if ( last_bid_m == max_bid_m )
00123 addPartition (true);
00124
00125 BankPartition_t * partition = getLastPartition();
00126
00127
00128 obj.flags_m.is_removed = false;
00129 obj.flags_m.is_modified = false;
00130
00131
00132
00133
00134 partition->fix.seekp (0, ios::end);
00135 partition->var.seekp (0, ios::end);
00136 bankstreamoff fpos = partition->fix.tellp();
00137 bankstreamoff vpos = partition->var.tellp();
00138 writeLE (partition->fix, &vpos);
00139 writeLE (partition->fix, &(obj.flags_m));
00140 obj.writeRecord (partition->fix, partition->var);
00141 Size_t vsize = (std::streamoff)partition->var.tellp() - vpos;
00142 writeLE (partition->fix, &vsize);
00143
00144
00145 Size_t fsize = (std::streamoff)partition->fix.tellp() - fpos;
00146 if ( fix_size_m == 0 )
00147 fix_size_m = fsize;
00148
00149 if ( fix_size_m != fsize ||
00150 partition->fix.fail() ||
00151 partition->var.fail() )
00152 AMOS_THROW_IO ("Unknown file write error in append, bank corrupted");
00153
00154 ++ nbids_m;
00155 ++ last_bid_m;
00156 }
00157
00158
00159
00160 void Bank_t::assignEID (ID_t iid, const string & eid)
00161 {
00162 ID_t bid = lookupBID (iid);
00163 string peid (idmap_m.lookupEID (iid));
00164 idmap_m.remove (iid);
00165
00166 try {
00167 idmap_m.insert (iid, eid, bid);
00168 }
00169 catch (Exception_t) {
00170 idmap_m.insert (iid, peid, bid);
00171 throw;
00172 }
00173 }
00174
00175
00176
00177 void Bank_t::assignIID (const string & eid, ID_t iid)
00178 {
00179 ID_t bid = lookupBID (eid);
00180 ID_t piid = idmap_m.lookupIID (eid);
00181 idmap_m.remove (eid);
00182
00183 try {
00184 idmap_m.insert (iid, eid, bid);
00185 }
00186 catch (Exception_t) {
00187 idmap_m.insert (piid, eid, bid);
00188 throw;
00189 }
00190 }
00191
00192
00193
00194 void Bank_t::clean()
00195 {
00196 if ( ! is_open_m || ! (mode_m & B_READ && mode_m & B_WRITE) )
00197 AMOS_THROW_IO ("Cannot clean, bank not open for reading and writing");
00198
00199
00200 Bank_t tmpbnk (banktype_m);
00201
00202 try {
00203
00204 string tname = store_pfx_m + TMP_STORE_SUFFIX;
00205 mkdir (tname.c_str(), DIR_MODE);
00206 tmpbnk.create (tname);
00207 tmpbnk.concat (*this);
00208
00209
00210 clear();
00211 fix_size_m = tmpbnk.fix_size_m;
00212 partition_size_m = tmpbnk.partition_size_m;
00213 last_bid_m = tmpbnk.last_bid_m;
00214 nbids_m = tmpbnk.nbids_m;
00215 idmap_m = tmpbnk.idmap_m;
00216
00217
00218 for ( Size_t i = 0; i != tmpbnk.npartitions_m; ++ i )
00219 {
00220 while ( i >= npartitions_m )
00221 addPartition (true);
00222
00223 unlink (partitions_m [i]->fix_name.c_str());
00224 unlink (partitions_m [i]->var_name.c_str());
00225 if ( link (tmpbnk.partitions_m [i]->fix_name.c_str(),
00226 partitions_m [i]->fix_name.c_str()) ||
00227 link (tmpbnk.partitions_m [i]->var_name.c_str(),
00228 partitions_m [i]->var_name.c_str()) )
00229 AMOS_THROW_IO ("Unknown file link error in clean, bank corrupted");
00230 }
00231 }
00232 catch (Exception_t) {
00233 if ( tmpbnk.isOpen() )
00234 tmpbnk.destroy();
00235 throw;
00236 }
00237
00238
00239 tmpbnk.destroy();
00240 }
00241
00242
00243
00244 void Bank_t::clear()
00245 {
00246 if ( ! is_open_m ) return;
00247 if ( ! (mode_m & B_WRITE) )
00248 AMOS_THROW_IO ("Cannot clear, bank not open for writing");
00249
00250
00251 for ( Size_t i = 0; i != npartitions_m; ++ i )
00252 {
00253 partitions_m [i]->fix.close();
00254 partitions_m [i]->var.close();
00255 unlink (partitions_m [i]->fix_name.c_str());
00256 unlink (partitions_m [i]->var_name.c_str());
00257 delete (partitions_m [i]);
00258 }
00259
00260 last_bid_m = NULL_ID;
00261 max_bid_m = NULL_ID;
00262 nbids_m = NULL_ID;
00263 npartitions_m = 0;
00264 partition_size_m = 0;
00265 opened_m .clear();
00266 partitions_m.clear();
00267 idmap_m .clear();
00268 idmap_m .setType (banktype_m);
00269 }
00270
00271
00272
00273 void Bank_t::close()
00274 {
00275 if ( ! is_open_m ) return;
00276
00277 if ( (mode_m & B_WRITE) )
00278 {
00279
00280 string map_path (store_pfx_m + MAP_STORE_SUFFIX);
00281 ofstream map_stream (map_path.c_str());
00282 if ( ! map_stream.is_open() )
00283 AMOS_THROW_IO ("Could not open bank partition, " + map_path);
00284
00285 idmap_m.write (map_stream);
00286
00287 if ( map_stream.fail() )
00288 AMOS_THROW_IO ("Unknown file write error in close, bank corrupted");
00289 map_stream.close();
00290 }
00291
00292
00293 for ( Size_t i = 0; i < npartitions_m; i ++ )
00294 delete (partitions_m [i]);
00295
00296
00297 syncIFO (I_CLOSE);
00298
00299
00300 init();
00301 }
00302
00303
00304
00305 void Bank_t::concat (Bank_t & s)
00306 {
00307 if ( ! is_open_m || ! (mode_m & B_WRITE) )
00308 AMOS_THROW_IO ("Cannot concat, bank not open for writing");
00309 if ( ! s.is_open_m || ! (s.mode_m & B_READ) )
00310 AMOS_THROW_IO ("Cannot concat, source bank not open for reading");
00311 if ( banktype_m != s.banktype_m )
00312 AMOS_THROW_ARGUMENT ("Cannot concat, incompatible bank type");
00313
00314 Size_t size;
00315 Size_t tail = s.fix_size_m - sizeof (bankstreamoff) - sizeof (BankFlags_t);
00316 BankFlags_t flags;
00317
00318 Size_t buffer_size = s.fix_size_m;
00319 char * buffer = (char *) SafeMalloc (buffer_size);
00320
00321 bankstreamoff vpos;
00322 BankPartition_t * sp;
00323 BankPartition_t * tp = getLastPartition();
00324
00325
00326 const IDMap_t::HashTriple_t * stp = NULL;
00327 vector<const IDMap_t::HashTriple_t *> striples (s.last_bid_m + 1, stp);
00328 for ( IDMap_t::const_iterator idmi = s.getIDMap().begin();
00329 idmi != s.getIDMap().end(); ++ idmi )
00330 striples [idmi->bid] = idmi;
00331
00332
00333 tp->fix.seekp (0, ios::end);
00334 tp->var.seekp (0, ios::end);
00335
00336
00337 ID_t sbid = 0;
00338 for ( Size_t i = 0; i != s.npartitions_m; ++ i )
00339 {
00340
00341 sp = s.getPartition (i);
00342
00343 sp->fix.seekg (0);
00344
00345 while ( true )
00346 {
00347
00348 readLE (sp->fix, &vpos);
00349 readLE (sp->fix, &flags);
00350 if ( sp->fix.eof() )
00351 break;
00352 ++ sbid;
00353
00354
00355 if ( flags.is_removed )
00356 {
00357 sp->fix.ignore (tail);
00358 continue;
00359 }
00360
00361 sp->var.seekg (vpos);
00362
00363
00364 if ( (stp = striples [sbid]) != NULL )
00365 idmap_m.insert (stp->iid, stp->eid, last_bid_m + 1);
00366
00367
00368 if ( last_bid_m == max_bid_m )
00369 {
00370 try {
00371 addPartition (true);
00372 tp = getLastPartition();
00373 }
00374 catch (Exception_t) {
00375 if ( stp != NULL )
00376 {
00377 idmap_m.remove (stp->iid);
00378 idmap_m.remove (stp->eid);
00379 }
00380 throw;
00381 }
00382 }
00383
00384
00385 vpos = (std::streamoff)tp->var.tellp();
00386 writeLE (tp->fix, &vpos);
00387 writeLE (tp->fix, &flags);
00388
00389
00390 sp->fix.read (buffer, tail - sizeof (Size_t));
00391 readLE (sp->fix, &size);
00392 tp->fix.write (buffer, tail - sizeof (Size_t));
00393 writeLE (tp->fix, &size);
00394
00395
00396 while ( size > buffer_size )
00397 {
00398 buffer_size <<= 1;
00399 buffer = (char *) SafeRealloc (buffer, buffer_size);
00400 }
00401
00402
00403 sp->var.read (buffer, size);
00404 tp->var.write (buffer, size);
00405
00406
00407 if ( sp->fix.fail() || sp->var.fail() )
00408 AMOS_THROW_IO("Unknown file read error in concat, bank corrupted");
00409 if ( tp->fix.fail() || tp->var.fail() )
00410 AMOS_THROW_IO("Unknown file write error in concat, bank corrupted");
00411
00412 ++ nbids_m;
00413 ++ last_bid_m;
00414 }
00415 }
00416
00417
00418 if ( fix_size_m == 0 )
00419 fix_size_m = s.fix_size_m;
00420
00421 free (buffer);
00422 }
00423
00424
00425
00426 void Bank_t::create (const string & dir, BankMode_t mode)
00427 {
00428 if ( ! (mode & B_WRITE) )
00429 AMOS_THROW_IO ("Cannot create, bank not opened for writing");
00430 setMode (mode);
00431
00432 if ( is_open_m ) close();
00433
00434
00435 if ( exists (dir) )
00436 {
00437 open (dir, mode);
00438 destroy();
00439 }
00440
00441
00442
00443 try {
00444
00445 is_open_m = true;
00446 store_dir_m = dir;
00447 store_pfx_m = dir + '/' + Decode (banktype_m);
00448 mkdir (store_dir_m.c_str(), DIR_MODE);
00449
00450
00451 touchFile (store_pfx_m + IFO_STORE_SUFFIX, FILE_MODE, true);
00452 touchFile (store_pfx_m + MAP_STORE_SUFFIX, FILE_MODE, true);
00453
00454
00455 syncIFO (I_CREATE);
00456
00457
00458 addPartition (true);
00459 }
00460 catch (Exception_t) {
00461 destroy();
00462 throw;
00463 }
00464 }
00465
00466
00467
00468 void Bank_t::destroy()
00469 {
00470 if ( ! is_open_m || ! (mode_m & B_WRITE) )
00471 AMOS_THROW_IO ("Cannot destroy, bank not open for writing");
00472
00473
00474 clear();
00475
00476
00477 unlink ((store_pfx_m + MAP_STORE_SUFFIX).c_str());
00478 unlink ((store_pfx_m + IFO_STORE_SUFFIX).c_str());
00479 unlink ((store_pfx_m + LCK_STORE_SUFFIX).c_str());
00480
00481
00482 rmdir (store_dir_m.c_str());
00483
00484 init();
00485 }
00486
00487
00488
00489 bool Bank_t::exists (const string & dir) const
00490 {
00491
00492 string ifo_path (dir + '/' + Decode (banktype_m) + IFO_STORE_SUFFIX);
00493 return ( ! access (dir.c_str(), R_OK | X_OK)
00494 &&
00495 ! access (ifo_path.c_str(), R_OK) );
00496 }
00497
00498
00499
00500 void Bank_t::fetchBID (ID_t bid, IBankable_t & obj)
00501 {
00502 if ( ! is_open_m || ! (mode_m & B_READ) )
00503 AMOS_THROW_IO ("Cannot fetch, bank not open for reading");
00504 if (banktype_m != obj.getNCode())
00505 AMOS_THROW_ARGUMENT ("Cannot fetch, incompatible object type");
00506
00507
00508 BankPartition_t * partition = localizeBID (bid);
00509
00510 bankstreamoff vpos;
00511 bankstreamoff off = bid * fix_size_m;
00512 partition->fix.seekg (off);
00513 readLE (partition->fix, &vpos);
00514 readLE (partition->fix, &(obj.flags_m));
00515 partition->var.seekg (vpos);
00516 obj.readRecord (partition->fix, partition->var);
00517 partition->fix.ignore (sizeof (Size_t));
00518
00519 if ( partition->fix.fail() || partition->var.fail() )
00520 AMOS_THROW_IO ("Unknown file read error in fetch, bank corrupted");
00521 }
00522
00523
00524 void Bank_t::fetchBIDFix(ID_t bid, IBankable_t & obj)
00525 {
00526 if ( ! is_open_m || ! (mode_m & B_READ) )
00527 AMOS_THROW_IO ("Cannot fetch, bank not open for reading");
00528 if (banktype_m != obj.getNCode())
00529 AMOS_THROW_ARGUMENT ("Cannot fetch, incompatible object type");
00530
00531
00532 BankPartition_t * partition = localizeBID (bid);
00533
00534 bankstreamoff vpos;
00535 bankstreamoff off = bid * fix_size_m;
00536 partition->fix.seekg (off);
00537 readLE (partition->fix, &vpos);
00538 readLE (partition->fix, &(obj.flags_m));
00539 obj.readRecordFix (partition->fix);
00540 partition->fix.ignore (sizeof (Size_t));
00541
00542 if ( partition->fix.fail())
00543 AMOS_THROW_IO ("Unknown file read error in fetch, bank corrupted");
00544 }
00545
00546
00547
00548 ID_t Bank_t::getMaxIID() const
00549 {
00550 ID_t max = NULL_ID;
00551 IDMap_t::const_iterator i;
00552
00553 for ( i = idmap_m.begin(); i != idmap_m.end(); ++ i )
00554 if ( i->iid > max ) max = i->iid;
00555
00556 return max;
00557 }
00558
00559
00560 ID_t Bank_t::getMaxBID() const
00561 {
00562 ID_t max = NULL_ID;
00563 IDMap_t::const_iterator i;
00564
00565 for ( i = idmap_m.begin(); i != idmap_m.end(); ++ i )
00566 if ( i->bid > max ) max = i->bid;
00567
00568 return max;
00569 }
00570
00571
00572
00573 ID_t Bank_t::lookupBID (const string & eid) const
00574 {
00575 ID_t bid = idmap_m.lookupBID (eid);
00576 if ( bid == NULL_ID || bid > last_bid_m )
00577 {
00578 stringstream ss;
00579 ss << "EID '" << eid << "' does not exist in bank";
00580 AMOS_THROW_ARGUMENT (ss.str());
00581 }
00582 return bid;
00583 }
00584
00585
00586
00587 ID_t Bank_t::lookupBID (ID_t iid) const
00588 {
00589 ID_t bid = idmap_m.lookupBID (iid);
00590 if ( bid == NULL_ID || bid > last_bid_m )
00591 {
00592 stringstream ss;
00593 ss << "IID '" << iid << "' does not exist in bank";
00594 AMOS_THROW_ARGUMENT (ss.str());
00595 }
00596 return bid;
00597 }
00598
00599
00600
00601 void Bank_t::init()
00602 {
00603 fix_size_m = 0;
00604 is_open_m = false;
00605 last_bid_m = NULL_ID;
00606 max_bid_m = NULL_ID;
00607 nbids_m = NULL_ID;
00608 npartitions_m = 0;
00609 partition_size_m = 0;
00610 opened_m .clear();
00611 partitions_m.clear();
00612 store_dir_m .erase();
00613 store_pfx_m .erase();
00614 idmap_m .clear();
00615 idmap_m .setType (banktype_m);
00616 }
00617
00618
00619
00620 void Bank_t::lockIFO()
00621 {
00622 if ( (mode_m & B_SPY) ) return;
00623
00624
00625 string ifo_path (store_pfx_m + IFO_STORE_SUFFIX);
00626 string lck_path (store_pfx_m + LCK_STORE_SUFFIX);
00627 for ( int i = 0; i < LOCK_TIME; sleep(1), i ++ )
00628 if ( ! link (ifo_path.c_str(), lck_path.c_str()) )
00629 return;
00630
00631 AMOS_THROW_IO
00632 ((string)"Failed to obtain bank IFO file lock, " + strerror (errno));
00633 }
00634
00635
00636
00637 void Bank_t::open (const string & dir, BankMode_t mode)
00638 {
00639 if ( is_open_m ) close();
00640
00641 try {
00642
00643 is_open_m = true;
00644 setMode (mode);
00645 store_dir_m = dir;
00646 store_pfx_m = dir + '/' + Decode (banktype_m);
00647
00648
00649 touchFile (store_pfx_m + IFO_STORE_SUFFIX, FILE_MODE, false);
00650 touchFile (store_pfx_m + MAP_STORE_SUFFIX, FILE_MODE, false);
00651
00652
00653 syncIFO (I_OPEN);
00654
00655
00656 string map_path (store_pfx_m + MAP_STORE_SUFFIX);
00657
00658 idmap_m.read(map_path);
00659
00660
00661 if ( idmap_m.getType() != banktype_m ||
00662 idmap_m.getSize() > nbids_m ||
00663 nbids_m > last_bid_m ||
00664 last_bid_m > max_bid_m ||
00665 partitions_m.size() != npartitions_m )
00666 AMOS_THROW_IO ("Unknown file read error in open, bank corrupted");
00667 }
00668 catch (Exception_t) {
00669 init();
00670 throw;
00671 }
00672 }
00673
00674
00675
00676 Bank_t::BankPartition_t * Bank_t::openPartition (ID_t id)
00677 {
00678 BankPartition_t * partition = partitions_m [id];
00679
00680
00681 if ( partition->fix.is_open() )
00682 return partition;
00683
00684 try {
00685
00686 ios::openmode mode = ios::binary | ios::ate | ios::in;
00687 if ( (mode_m & B_WRITE) )
00688 mode |= ios::out;
00689
00690 partition->fix.open (partition->fix_name.c_str(), mode);
00691 if ( ! partition->fix.is_open() )
00692 AMOS_THROW_IO ("Could not open bank partition, " + partition->fix_name);
00693 partition->var.open (partition->var_name.c_str(), mode);
00694 if ( ! partition->var.is_open() )
00695 AMOS_THROW_IO ("Could not open bank partition, " + partition->var_name);
00696 }
00697 catch (Exception_t) {
00698 partition->fix.close();
00699 partition->var.close();
00700 throw;
00701 }
00702
00703
00704 while ( (Size_t)opened_m.size() >= max_partitions_m )
00705 {
00706 opened_m.front()->fix.close();
00707 opened_m.front()->var.close();
00708 opened_m.pop_front();
00709 }
00710 opened_m.push_back (partition);
00711
00712 return partition;
00713 }
00714
00715
00716
00717 void Bank_t::removeBID (ID_t bid)
00718 {
00719 if ( ! is_open_m || ! (mode_m & B_READ && mode_m & B_WRITE) )
00720 AMOS_THROW_IO ("Cannot remove, bank not open for reading and writing");
00721
00722
00723 BankPartition_t * partition = localizeBID (bid);
00724
00725 BankFlags_t flags;
00726 bankstreamoff off = bid * fix_size_m + sizeof (bankstreamoff);
00727 partition->fix.seekg (off);
00728 readLE (partition->fix, &flags);
00729 flags.is_removed = true;
00730 partition->fix.seekp (off);
00731 writeLE (partition->fix, &flags);
00732
00733 if ( partition->fix.fail() || partition->var.fail() )
00734 AMOS_THROW_IO ("Unknown file error in remove, bank corrupted");
00735
00736 -- nbids_m;
00737 }
00738
00739
00740
00741 void Bank_t::replace (ID_t iid, IBankable_t & obj)
00742 {
00743 ID_t bid = lookupBID (iid);
00744 string peid (idmap_m.lookupEID (iid));
00745 idmap_m.remove (iid);
00746
00747 try {
00748 idmap_m.insert (obj.iid_m, obj.eid_m, bid);
00749 }
00750 catch (Exception_t) {
00751 idmap_m.insert (iid, peid, bid);
00752 throw;
00753 }
00754
00755 try {
00756 replaceBID (bid, obj);
00757 }
00758 catch (Exception_t) {
00759 idmap_m.remove (obj.iid_m);
00760 idmap_m.insert (iid, peid, bid);
00761 throw;
00762 }
00763 }
00764
00765
00766
00767 void Bank_t::replace (const string & eid, IBankable_t & obj)
00768 {
00769 ID_t bid = lookupBID (eid);
00770 ID_t piid = idmap_m.lookupIID (eid);
00771 idmap_m.remove (eid);
00772
00773 try {
00774 idmap_m.insert (obj.iid_m, obj.eid_m, bid);
00775 }
00776 catch (Exception_t) {
00777 idmap_m.insert (piid, eid, bid);
00778 throw;
00779 }
00780
00781 try {
00782 replaceBID (bid, obj);
00783 }
00784 catch (Exception_t) {
00785 idmap_m.remove (obj.eid_m);
00786 idmap_m.insert (piid, eid, bid);
00787 throw;
00788 }
00789 }
00790
00791
00792
00793 void Bank_t::replaceBID (ID_t bid, IBankable_t & obj)
00794 {
00795 if ( ! is_open_m || ! (mode_m & B_READ && mode_m & B_WRITE) )
00796 AMOS_THROW_IO ("Cannot replace, bank not open for reading and writing");
00797 if ( banktype_m != obj.getNCode() )
00798 AMOS_THROW_ARGUMENT ("Cannot replace, incompatible object type");
00799
00800
00801 obj.flags_m.is_removed = false;
00802 obj.flags_m.is_modified = true;
00803
00804
00805 BankPartition_t * partition = localizeBID (bid);
00806
00807 bankstreamoff off = bid * fix_size_m;
00808 partition->fix.seekp (off);
00809 partition->var.seekp (0, ios::end);
00810 bankstreamoff vpos = partition->var.tellp();
00811 writeLE (partition->fix, &vpos);
00812 writeLE (partition->fix, &(obj.flags_m));
00813 obj.writeRecord (partition->fix, partition->var);
00814 Size_t vsize = (std::streamoff)partition->var.tellp() - vpos;
00815 writeLE (partition->fix, &vsize);
00816
00817 if ( partition->fix.fail() || partition->var.fail() )
00818 AMOS_THROW_IO ("Unknown file error in replace, bank corrupted");
00819 }
00820
00821
00822
00823 void Bank_t::syncIFO (IFOMode_t mode)
00824 {
00825
00826 if ( (mode_m & B_SPY) && mode != I_OPEN )
00827 return;
00828
00829
00830 ostringstream ss;
00831 char * user = getenv ("USER");
00832 ss << ((mode_m & B_WRITE) ? WRITE_LOCK_CHAR : READ_LOCK_CHAR)
00833 << ' ' << getuid() << ' ' << (user == NULL ? "null" : user);
00834 string lock (ss.str());
00835
00836
00837 string line;
00838 NCode_t banktype;
00839 ID_t nbids, last_bid;
00840 Size_t fix_size, npartitions, partition_size;
00841 vector<string> locks;
00842 vector<string>::iterator vi;
00843
00844
00845 lockIFO();
00846
00847 try {
00848
00849
00850 if ( mode == I_OPEN || mode == I_CLOSE )
00851 {
00852 string ifo_path (store_pfx_m + IFO_STORE_SUFFIX);
00853 ifstream ifo_stream (ifo_path.c_str());
00854 if ( ! ifo_stream.is_open() )
00855 AMOS_THROW_IO ("Could not open bank partition, " + ifo_path);
00856
00857 getline (ifo_stream, line, '=');
00858 ifo_stream >> line;
00859 if ( line != BANK_VERSION )
00860 AMOS_THROW_IO
00861 ("Could not read bank, expected version: "
00862 + BANK_VERSION + ", saw version: " + line);
00863 getline (ifo_stream, line, '=');
00864 ifo_stream >> banktype;
00865 if ( banktype != banktype_m )
00866 AMOS_THROW_IO
00867 ("Could not read bank, incompatible type " + Decode (banktype));
00868
00869 getline (ifo_stream, line, '=');
00870 ifo_stream >> nbids;
00871 getline (ifo_stream, line, '=');
00872 ifo_stream >> last_bid;
00873 getline (ifo_stream, line, '=');
00874 ifo_stream >> fix_size;
00875 getline (ifo_stream, line, '=');
00876 ifo_stream >> npartitions;
00877 getline (ifo_stream, line, '=');
00878 ifo_stream >> partition_size;
00879 getline (ifo_stream, line, '=');
00880 getline (ifo_stream, line);
00881
00882 if ( ifo_stream.fail() )
00883 AMOS_THROW_IO ("Unknown file read error in sync, bank corrupted");
00884
00885
00886 bool noskip = (mode == I_OPEN);
00887 getline (ifo_stream, line);
00888 while ( ifo_stream.good() )
00889 {
00890 if ( noskip || line != lock )
00891 locks.push_back (line);
00892 else
00893 noskip = true;
00894 getline (ifo_stream, line);
00895 }
00896 ifo_stream.close();
00897
00898
00899 if ( mode == I_OPEN )
00900 {
00901 nbids_m = nbids;
00902 last_bid_m = last_bid;
00903 fix_size_m = fix_size;
00904 partition_size_m = partition_size;
00905
00906
00907 try {
00908 while ( npartitions > npartitions_m )
00909 addPartition (false);
00910 }
00911 catch (Exception_t) {
00912 for ( Size_t i = 0; i != npartitions_m; ++ i )
00913 delete (partitions_m [i]);
00914 npartitions_m = 0;
00915 throw;
00916 }
00917 }
00918 }
00919
00920
00921
00922 char ltype = NULL_CHAR;
00923 vector<string>::iterator vj;
00924 for ( vi = locks.begin(); vi != locks.end(); ++ vi )
00925 {
00926 vj = vi;
00927 ltype = vi->empty() ? NULL_CHAR : (*vi)[0];
00928 switch ( ltype )
00929 {
00930 case READ_LOCK_CHAR:
00931 break;
00932 case WRITE_LOCK_CHAR:
00933 if ( locks.size() == 1 )
00934 break;
00935
00936 default:
00937 AMOS_THROW_IO ("Invalid bank partition lock, bank corrupted");
00938 }
00939 }
00940
00941
00942
00943 if ( mode_m & B_SPY )
00944 {
00945 if ( ltype == WRITE_LOCK_CHAR )
00946 cerr << "WARNING: Disregarding '" << Decode (banktype_m)
00947 << "' bank lock, locked by '" + *vj + "'" << endl;
00948 return;
00949 }
00950
00951
00952
00953 if ( (mode_m & B_READ) && ltype == WRITE_LOCK_CHAR )
00954 AMOS_THROW_IO
00955 ("Could not open bank for reading, locked by '" + *vj + "'");
00956 if ( (mode_m & B_WRITE) && ltype == WRITE_LOCK_CHAR )
00957 AMOS_THROW_IO
00958 ("Could not open bank for writing, locked by '" + *vj + "'");
00959 if ( (mode_m & B_WRITE) && ltype == READ_LOCK_CHAR )
00960 AMOS_THROW_IO
00961 ("Could not open bank for writing, locked by '" + *vj + "'");
00962
00963
00964
00965 if ( mode != I_CLOSE )
00966 locks.push_back (lock);
00967
00968
00969
00970 if ( (mode_m & B_WRITE) )
00971 {
00972 nbids = nbids_m;
00973 last_bid = last_bid_m;
00974 fix_size = fix_size_m;
00975 npartitions = npartitions_m;
00976 partition_size = partition_size_m;
00977 }
00978
00979
00980
00981 string ifo_path (store_pfx_m + IFO_STORE_SUFFIX);
00982 ofstream ifo_stream (ifo_path.c_str());
00983 if ( ! ifo_stream.is_open() )
00984 AMOS_THROW_IO ("Could not open bank partition, " + ifo_path);
00985
00986 ifo_stream
00987 << "____" << Decode (banktype_m) << " BANK INFORMATION____" << endl
00988 << "bank version = " << BANK_VERSION << endl
00989 << "bank type = " << banktype_m << endl
00990 << "objects = " << nbids << endl
00991 << "indices = " << last_bid << endl
00992 << "bytes/index = " << fix_size << endl
00993 << "partitions = " << npartitions << endl
00994 << "indices/partition = " << partition_size << endl
00995 << "locks = " << endl;
00996
00997
00998 for ( vi = locks.begin(); vi != locks.end(); ++ vi )
00999 ifo_stream << *vi << endl;
01000
01001 if ( ifo_stream.fail() )
01002 AMOS_THROW_IO ("Unknown file write error in sync, bank corrupted");
01003 ifo_stream.close();
01004
01005 }
01006 catch (Exception_t) {
01007 unlockIFO();
01008 throw;
01009 }
01010
01011
01012 unlockIFO();
01013 }
01014
01015
01016
01017 void Bank_t::touchFile (const string & path, int mode, bool create)
01018 {
01019 int fd, flags;
01020
01021 if ( (mode_m & B_SPY) )
01022 flags = O_RDONLY;
01023 else
01024 flags = O_RDWR;
01025
01026 if ( create )
01027 flags |= O_CREAT | O_TRUNC;
01028
01029 fd = ::open (path.c_str(), flags, mode);
01030 if ( fd == -1 )
01031 {
01032 if ( create )
01033 AMOS_THROW_IO
01034 ("Could not create bank file, " + path + ", " + strerror (errno));
01035 else
01036 AMOS_THROW_IO
01037 ("Could not open bank file, " + path + ", " + strerror (errno));
01038 }
01039
01040 fd = ::close (fd);
01041 if ( fd == -1 )
01042 AMOS_THROW_IO
01043 ("Could not close bank file, " + path + ", " + strerror (errno));
01044 }
01045
01046
01047
01048 void Bank_t::unlockIFO()
01049 {
01050 if ( (mode_m & B_SPY) ) return;
01051
01052
01053 string lck_path (store_pfx_m + LCK_STORE_SUFFIX);
01054 if ( unlink (lck_path.c_str()) )
01055 AMOS_THROW_IO
01056 ((string)"Failed to release bank IFO file lock, " + strerror (errno));
01057 }
01058
01059
01060
01061 bool AMOS::BankExists (NCode_t ncode, const string & dir)
01062 {
01063
01064 string ifo_path (dir + '/' + Decode (ncode) + Bank_t::IFO_STORE_SUFFIX);
01065 return ( ! access (dir.c_str(), R_OK | X_OK)
01066 &&
01067 ! access (ifo_path.c_str(), R_OK) );
01068 }
01069
01070
01071
01072 void AMOS::PrintBankVersion (const char * s)
01073 {
01074 cerr << s << " compiled for bank version " << Bank_t::BANK_VERSION << endl;
01075 }
01076
01077
01078
01079
01080
01081 Bank_t::BankPartition_t::BankPartition_t (Size_t buffer_size)
01082 {
01083 fix_buff = (char *) SafeMalloc (buffer_size);
01084 var_buff = (char *) SafeMalloc (buffer_size);
01085
01086 fix.rdbuf()->_PUBSETBUF_ (fix_buff, buffer_size);
01087 var.rdbuf()->_PUBSETBUF_ (var_buff, buffer_size);
01088 }
01089
01090
01091
01092 Bank_t::BankPartition_t::~BankPartition_t()
01093 {
01094 fix.close();
01095 var.close();
01096
01097 free (fix_buff);
01098 free (var_buff);
01099 }