00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
#include "Bank_AMOS.hh"
00011
#include "Message_AMOS.hh"
00012
#include <sstream>
00013
#include <sys/types.h>
00014
#include <sys/stat.h>
00015
#include <fcntl.h>
00016
#include <unistd.h>
00017
#include <cstdlib>
00018
#include <cstdio>
00019
#include <ctime>
00020
#include <cstring>
00021
#include <sstream>
00022
#include <iostream>
00023
using namespace AMOS;
00024
using namespace std;
00025
// Name of the streambuf member used to install a custom buffer:
// "setbuf" when compiled by GCC with a major version below 3,
// "pubsetbuf" in every other case.
#if defined(__GNUC__) && __GNUC__ < 3
# define _PUBSETBUF_ setbuf
#else
# define _PUBSETBUF_ pubsetbuf
#endif

// Number of link() attempts lockIFO() makes, one second apart
#define LOCK_TIME 5
// Permission bits handed to mkdir() for bank directories
#define DIR_MODE 00755
// Permission bits handed to touchFile() for bank files
#define FILE_MODE 00644
00037
00038
00039
00040
00041
00042 const Size_t Bank_t::DEFAULT_BUFFER_SIZE = 1024;
00043 const Size_t Bank_t::DEFAULT_PARTITION_SIZE = 1000000;
00044 const Size_t Bank_t::MAX_OPEN_PARTITIONS = 20;
00045
00046 const string Bank_t::BANK_VERSION =
"2.8";
00047
00048 const string Bank_t::FIX_STORE_SUFFIX =
".fix";
00049 const string Bank_t::IFO_STORE_SUFFIX =
".ifo";
00050 const string Bank_t::LCK_STORE_SUFFIX =
".lck";
00051 const string Bank_t::VAR_STORE_SUFFIX =
".var";
00052 const string Bank_t::MAP_STORE_SUFFIX =
".map";
00053 const string Bank_t::TMP_STORE_SUFFIX =
".tmp";
00054
00055 const char Bank_t::WRITE_LOCK_CHAR =
'w';
00056 const char Bank_t::READ_LOCK_CHAR =
'r';
00057
00058
00059
00060 void Bank_t::addPartition (
bool create)
00061 {
00062
00063
BankPartition_t * partition =
new BankPartition_t (
buffer_size_m);
00064
partitions_m . push_back (partition);
00065
00066
try {
00067 ostringstream ss;
00068
00069 ss <<
store_pfx_m <<
'.' <<
npartitions_m <<
FIX_STORE_SUFFIX;
00070 partition -> fix_name = ss . str( );
00071 ss . str (
NULL_STRING);
00072
00073 ss <<
store_pfx_m <<
'.' <<
npartitions_m <<
VAR_STORE_SUFFIX;
00074 partition -> var_name = ss . str( );
00075 ss . str (
NULL_STRING);
00076
00077
00078
touchFile (partition -> fix_name,
FILE_MODE, create);
00079
touchFile (partition -> var_name,
FILE_MODE, create);
00080 }
00081
catch (
Exception_t) {
00082
partitions_m . pop_back( );
00083
delete partition;
00084
throw;
00085 }
00086
00087
00088
if (
partition_size_m == 0 )
00089
partition_size_m =
DEFAULT_PARTITION_SIZE;
00090
00091
00092
max_bid_m = ++
npartitions_m *
partition_size_m;
00093 }
00094
00095
00096
00097 void Bank_t::append (
IBankable_t & obj)
00098 {
00099
00100
idmap_m . insert (obj . iid_m, obj . eid_m,
last_bid_m + 1);
00101
00102
try {
00103
appendBID (obj);
00104 }
00105
catch (
Exception_t) {
00106
idmap_m .
remove (obj . iid_m);
00107
idmap_m .
remove (obj . eid_m);
00108
throw;
00109 }
00110 }
00111
00112
00113
00114 void Bank_t::appendBID (
IBankable_t & obj)
00115 {
00116
if ( !
is_open_m || ! (
mode_m &
B_WRITE) )
00117
AMOS_THROW_IO (
"Cannot append, bank not open for writing");
00118
if (
banktype_m != obj.
getNCode( ) )
00119
AMOS_THROW_ARGUMENT (
"Cannot append, incompatible object type");
00120
00121
00122
if (
last_bid_m ==
max_bid_m )
00123
addPartition (
true);
00124
00125
BankPartition_t * partition =
getLastPartition( );
00126
00127
00128 obj . flags_m . is_removed =
false;
00129 obj . flags_m . is_modified =
false;
00130
00131
00132
00133
00134 partition -> fix . seekp (0, ios::end);
00135 partition -> var . seekp (0, ios::end);
00136
bankstreamoff fpos = partition -> fix . tellp( );
00137
bankstreamoff vpos = partition -> var . tellp( );
00138
writeLE (partition -> fix, &vpos);
00139
writeLE (partition -> fix, &(obj . flags_m));
00140 obj . writeRecord (partition -> fix, partition -> var);
00141
Size_t vsize = (std::streamoff)partition -> var . tellp( ) - vpos;
00142
writeLE (partition -> fix, &vsize);
00143
00144
00145
Size_t fsize = (std::streamoff)partition -> fix . tellp( ) - fpos;
00146
if (
fix_size_m == 0 )
00147
fix_size_m = fsize;
00148
00149
if (
fix_size_m != fsize ||
00150 ! partition -> fix . good( ) ||
00151 ! partition -> var . good( ) )
00152
AMOS_THROW_IO (
"Unknown file write error in append, bank corrupted");
00153
00154 ++
nbids_m;
00155 ++
last_bid_m;
00156 }
00157
00158
00159
00160 void Bank_t::assignEID (
ID_t iid,
const string & eid)
00161 {
00162
ID_t bid =
lookupBID (iid);
00163 string peid (
idmap_m .
lookupEID (iid));
00164
idmap_m .
remove (iid);
00165
00166
try {
00167
idmap_m . insert (iid, eid, bid);
00168 }
00169
catch (
Exception_t) {
00170
idmap_m . insert (iid, peid, bid);
00171
throw;
00172 }
00173 }
00174
00175
00176
00177 void Bank_t::assignIID (
const string & eid,
ID_t iid)
00178 {
00179
ID_t bid =
lookupBID (eid);
00180
ID_t piid =
idmap_m .
lookupIID (eid);
00181
idmap_m .
remove (eid);
00182
00183
try {
00184
idmap_m . insert (iid, eid, bid);
00185 }
00186
catch (
Exception_t) {
00187
idmap_m . insert (piid, eid, bid);
00188
throw;
00189 }
00190 }
00191
00192
00193
00194 void Bank_t::clean ( )
00195 {
00196
if ( !
is_open_m || ! (
mode_m &
B_READ &&
mode_m &
B_WRITE) )
00197
AMOS_THROW_IO (
"Cannot clean, bank not open for reading and writing");
00198
00199
00200
Bank_t tmpbnk (
banktype_m);
00201
00202
try {
00203
00204 string tname =
store_pfx_m +
TMP_STORE_SUFFIX;
00205 mkdir (tname . c_str( ),
DIR_MODE);
00206 tmpbnk .
create (tname);
00207 tmpbnk .
concat (*
this);
00208
00209
00210
clear( );
00211
fix_size_m = tmpbnk .
fix_size_m;
00212
partition_size_m = tmpbnk .
partition_size_m;
00213
last_bid_m = tmpbnk .
last_bid_m;
00214
nbids_m = tmpbnk .
nbids_m;
00215
idmap_m = tmpbnk .
idmap_m;
00216
00217
00218
for (
Size_t i = 0; i != tmpbnk .
npartitions_m; ++ i )
00219 {
00220
while ( i >=
npartitions_m )
00221
addPartition (
true);
00222
00223 unlink (
partitions_m [i] -> fix_name . c_str( ));
00224 unlink (
partitions_m [i] -> var_name . c_str( ));
00225
if ( link (tmpbnk .
partitions_m [i] -> fix_name . c_str( ),
00226
partitions_m [i] -> fix_name . c_str( )) ||
00227 link (tmpbnk .
partitions_m [i] -> var_name . c_str( ),
00228
partitions_m [i] -> var_name . c_str( )) )
00229
AMOS_THROW_IO (
"Unknown file link error in clean, bank corrupted");
00230 }
00231 }
00232
catch (
Exception_t) {
00233
if ( tmpbnk .
isOpen( ) )
00234 tmpbnk .
destroy( );
00235
throw;
00236 }
00237
00238
00239 tmpbnk .
destroy( );
00240 }
00241
00242
00243
00244 void Bank_t::clear ( )
00245 {
00246
if ( !
is_open_m )
return;
00247
if ( ! (
mode_m &
B_WRITE) )
00248
AMOS_THROW_IO (
"Cannot clear, bank not open for writing");
00249
00250
00251
for (
Size_t i = 0; i !=
npartitions_m; ++ i )
00252 {
00253
partitions_m [i] -> fix .
close( );
00254
partitions_m [i] -> var .
close( );
00255 unlink (
partitions_m [i] -> fix_name . c_str( ));
00256 unlink (
partitions_m [i] -> var_name . c_str( ));
00257
delete (
partitions_m [i]);
00258 }
00259
00260
last_bid_m =
NULL_ID;
00261
max_bid_m =
NULL_ID;
00262
nbids_m =
NULL_ID;
00263
npartitions_m = 0;
00264
partition_size_m = 0;
00265
opened_m .
clear( );
00266
partitions_m .
clear( );
00267
idmap_m .
clear( );
00268
idmap_m . setType (
banktype_m);
00269 }
00270
00271
00272
00273 void Bank_t::close ( )
00274 {
00275
if ( !
is_open_m )
return;
00276
00277
if ( (
mode_m &
B_WRITE) )
00278 {
00279
00280 string map_path (
store_pfx_m +
MAP_STORE_SUFFIX);
00281 ofstream map_stream (map_path . c_str( ));
00282
if ( ! map_stream . is_open( ) )
00283
AMOS_THROW_IO (
"Could not open bank partition, " + map_path);
00284
00285
idmap_m . write (map_stream);
00286
00287
if ( ! map_stream . good( ) )
00288
AMOS_THROW_IO (
"Unknown file write error in close, bank corrupted");
00289 map_stream .
close( );
00290 }
00291
00292
00293
for (
Size_t i = 0; i <
npartitions_m; i ++ )
00294
delete (
partitions_m [i]);
00295
00296
00297
syncIFO (
I_CLOSE);
00298
00299
00300
init( );
00301 }
00302
00303
00304
00305 void Bank_t::concat (
Bank_t & s)
00306 {
00307
if ( !
is_open_m || ! (
mode_m &
B_WRITE) )
00308
AMOS_THROW_IO (
"Cannot concat, bank not open for writing");
00309
if ( ! s .
is_open_m || ! (s .
mode_m &
B_READ) )
00310
AMOS_THROW_IO (
"Cannot concat, source bank not open for reading");
00311
if (
banktype_m != s .
banktype_m )
00312
AMOS_THROW_ARGUMENT (
"Cannot concat, incompatible bank type");
00313
00314
Size_t size;
00315
Size_t tail = s .
fix_size_m -
sizeof (
bankstreamoff) -
sizeof (
BankFlags_t);
00316
BankFlags_t flags;
00317
00318
Size_t buffer_size = s .
fix_size_m;
00319
char * buffer = (
char *)
SafeMalloc (buffer_size);
00320
00321
bankstreamoff vpos;
00322
BankPartition_t * sp;
00323
BankPartition_t * tp =
getLastPartition( );
00324
00325
00326
const IDMap_t::HashTriple_t * stp = NULL;
00327 vector<const IDMap_t::HashTriple_t *> striples (s .
last_bid_m + 1, stp);
00328
for (
IDMap_t::const_iterator idmi = s .
getIDMap( ) . begin( );
00329 idmi != s .
getIDMap( ) . end( ); ++ idmi )
00330 striples [idmi -> bid] = idmi;
00331
00332
00333 tp -> fix . seekp (0, ios::end);
00334 tp -> var . seekp (0, ios::end);
00335
00336
00337
ID_t sbid = 0;
00338
for (
Size_t i = 0; i != s .
npartitions_m; ++ i )
00339 {
00340
00341 sp = s .
getPartition (i);
00342 sp -> fix . seekg (0, ios::beg);
00343
00344
while (
true )
00345 {
00346
00347
readLE (sp -> fix, &vpos);
00348
readLE (sp -> fix, &flags);
00349
if ( sp -> fix . eof( ) )
00350
break;
00351 ++ sbid;
00352
00353
00354
if ( flags . is_removed )
00355 {
00356 sp -> fix . ignore (tail);
00357
continue;
00358 }
00359
00360 sp -> var . seekg (vpos);
00361
00362
00363
if ( (stp = striples [sbid]) != NULL )
00364
idmap_m . insert (stp -> iid, stp -> eid, last_bid_m + 1);
00365
00366
00367
if ( last_bid_m ==
max_bid_m )
00368 {
00369
try {
00370
addPartition (
true);
00371 tp =
getLastPartition( );
00372 }
00373
catch (
Exception_t) {
00374
if ( stp != NULL )
00375 {
00376
idmap_m .
remove (stp -> iid);
00377
idmap_m .
remove (stp -> eid);
00378 }
00379
throw;
00380 }
00381 }
00382
00383
00384 vpos = (std::streamoff)tp -> var . tellp( );
00385
writeLE (tp -> fix, &vpos);
00386
writeLE (tp -> fix, &flags);
00387
00388
00389 sp -> fix . read (buffer, tail -
sizeof (
Size_t));
00390
readLE (sp -> fix, &size);
00391 tp -> fix . write (buffer, tail -
sizeof (
Size_t));
00392
writeLE (tp -> fix, &size);
00393
00394
00395
while ( size > buffer_size )
00396 {
00397 buffer_size <<= 1;
00398 buffer = (
char *)
SafeRealloc (buffer, buffer_size);
00399 }
00400
00401
00402 sp -> var . read (buffer, size);
00403 tp -> var . write (buffer, size);
00404
00405
00406
if ( ! sp -> fix . good( ) || ! sp -> var . good( ) )
00407
AMOS_THROW_IO(
"Unknown file read error in concat, bank corrupted");
00408
if ( ! tp -> fix . good( ) || ! tp -> var . good( ) )
00409
AMOS_THROW_IO(
"Unknown file write error in concat, bank corrupted");
00410
00411 ++
nbids_m;
00412 ++ last_bid_m;
00413 }
00414 }
00415
00416
00417
if (
fix_size_m == 0 )
00418
fix_size_m = s .
fix_size_m;
00419
00420 free (buffer);
00421 }
00422
00423
00424
00425 void Bank_t::create (
const string & dir,
BankMode_t mode)
00426 {
00427
if ( ! (mode &
B_WRITE) )
00428
AMOS_THROW_IO (
"Cannot create, bank not opened for writing");
00429
setMode (mode);
00430
00431
if (
is_open_m )
close( );
00432
00433
00434
if (
exists (dir) )
00435 {
00436
open (dir, mode);
00437
destroy( );
00438 }
00439
00440
00441
00442
try {
00443
00444
is_open_m =
true;
00445
store_dir_m = dir;
00446
store_pfx_m = dir +
'/' +
Decode (
banktype_m);
00447 mkdir (
store_dir_m . c_str( ),
DIR_MODE);
00448
00449
00450
touchFile (
store_pfx_m +
IFO_STORE_SUFFIX,
FILE_MODE,
true);
00451
touchFile (
store_pfx_m +
MAP_STORE_SUFFIX,
FILE_MODE,
true);
00452
00453
00454
syncIFO (
I_CREATE);
00455
00456
00457
addPartition (
true);
00458 }
00459
catch (
Exception_t) {
00460
destroy( );
00461
throw;
00462 }
00463 }
00464
00465
00466
00467 void Bank_t::destroy ( )
00468 {
00469
if ( !
is_open_m || ! (
mode_m &
B_WRITE) )
00470
AMOS_THROW_IO (
"Cannot destroy, bank not open for writing");
00471
00472
00473
clear( );
00474
00475
00476 unlink ((
store_pfx_m +
MAP_STORE_SUFFIX) . c_str( ));
00477 unlink ((
store_pfx_m +
IFO_STORE_SUFFIX) . c_str( ));
00478 unlink ((
store_pfx_m +
LCK_STORE_SUFFIX) . c_str( ));
00479
00480
00481 rmdir (
store_dir_m . c_str( ));
00482
00483
init( );
00484 }
00485
00486
00487
00488 bool Bank_t::exists (
const string & dir)
const
00489
{
00490
00491 string ifo_path (dir +
'/' +
Decode (
banktype_m) +
IFO_STORE_SUFFIX);
00492
return ( ! access (dir . c_str( ), R_OK | X_OK)
00493 &&
00494 ! access (ifo_path . c_str( ), R_OK) );
00495 }
00496
00497
00498
00499 void Bank_t::fetchBID (
ID_t bid,
IBankable_t & obj)
00500 {
00501
if ( !
is_open_m || ! (
mode_m &
B_READ) )
00502
AMOS_THROW_IO (
"Cannot fetch, bank not open for reading");
00503
if (
banktype_m != obj.
getNCode( ))
00504
AMOS_THROW_ARGUMENT (
"Cannot fetch, incompatible object type");
00505
00506
00507
BankPartition_t * partition =
localizeBID (bid);
00508
00509
bankstreamoff vpos;
00510
bankstreamoff off = bid *
fix_size_m;
00511 partition -> fix . seekg (off, ios::beg);
00512
readLE (partition -> fix, &vpos);
00513
readLE (partition -> fix, &(obj . flags_m));
00514 partition -> var . seekg (vpos);
00515 obj . readRecord (partition -> fix, partition -> var);
00516
00517
if ( ! partition -> fix . good( ) || ! partition -> var . good( ) )
00518
AMOS_THROW_IO (
"Unknown file read error in fetch, bank corrupted");
00519 }
00520
00521
00522
00523 ID_t Bank_t::getMaxIID ( )
const
00524
{
00525
ID_t max =
NULL_ID;
00526
IDMap_t::const_iterator i;
00527
00528
for ( i =
idmap_m . begin( ); i !=
idmap_m . end( ); ++ i )
00529
if ( i -> iid > max ) max = i -> iid;
00530
00531
return max;
00532 }
00533
00534
00535
00536 ID_t Bank_t::lookupBID (
const string & eid)
const
00537
{
00538
ID_t bid =
idmap_m .
lookupBID (eid);
00539
if ( bid ==
NULL_ID || bid >
last_bid_m )
00540 {
00541 stringstream ss;
00542 ss <<
"EID '" << eid <<
"' does not exist in bank";
00543
AMOS_THROW_ARGUMENT (ss . str( ));
00544 }
00545
return bid;
00546 }
00547
00548
00549
00550 ID_t Bank_t::lookupBID (
ID_t iid)
const
00551
{
00552
ID_t bid =
idmap_m .
lookupBID (iid);
00553
if ( bid ==
NULL_ID || bid >
last_bid_m )
00554 {
00555 stringstream ss;
00556 ss <<
"IID '" << iid <<
"' does not exist in bank";
00557
AMOS_THROW_ARGUMENT (ss . str( ));
00558 }
00559
return bid;
00560 }
00561
00562
00563
00564 void Bank_t::init ( )
00565 {
00566
fix_size_m = 0;
00567
is_open_m =
false;
00568
last_bid_m =
NULL_ID;
00569
max_bid_m =
NULL_ID;
00570
nbids_m =
NULL_ID;
00571
npartitions_m = 0;
00572
partition_size_m = 0;
00573
opened_m .
clear( );
00574
partitions_m .
clear( );
00575
store_dir_m . erase( );
00576
store_pfx_m . erase( );
00577
idmap_m .
clear( );
00578
idmap_m . setType (
banktype_m);
00579 }
00580
00581
00582
00583 void Bank_t::lockIFO ( )
00584 {
00585
if ( (
mode_m &
B_SPY) )
return;
00586
00587
00588 string ifo_path (
store_pfx_m +
IFO_STORE_SUFFIX);
00589 string lck_path (
store_pfx_m +
LCK_STORE_SUFFIX);
00590
for (
int i = 0; i <
LOCK_TIME; sleep(1), i ++ )
00591
if ( ! link (ifo_path.c_str( ), lck_path.c_str( )) )
00592
return;
00593
00594
AMOS_THROW_IO
00595 ((string)
"Failed to obtain bank IFO file lock, " + strerror (errno));
00596 }
00597
00598
00599
00600 void Bank_t::open (
const string & dir,
BankMode_t mode)
00601 {
00602
if (
is_open_m )
close( );
00603
00604
try {
00605
00606
is_open_m =
true;
00607
setMode (mode);
00608
store_dir_m = dir;
00609
store_pfx_m = dir +
'/' +
Decode (
banktype_m);
00610
00611
00612
touchFile (
store_pfx_m +
IFO_STORE_SUFFIX,
FILE_MODE,
false);
00613
touchFile (
store_pfx_m +
MAP_STORE_SUFFIX,
FILE_MODE,
false);
00614
00615
00616
syncIFO (
I_OPEN);
00617
00618
00619 string map_path (
store_pfx_m + MAP_STORE_SUFFIX);
00620 ifstream map_stream (map_path . c_str( ));
00621
if ( ! map_stream . is_open( ) )
00622
AMOS_THROW_IO (
"Could not open bank partition, " + map_path);
00623
00624
idmap_m . read (map_stream);
00625
00626
if ( ! map_stream . good( ) )
00627
AMOS_THROW_IO (
"Unknown file read error in open, bank corrupted");
00628 map_stream .
close( );
00629
00630
00631
if (
idmap_m .
getType( ) !=
banktype_m ||
00632
idmap_m .
getSize( ) >
nbids_m ||
00633
nbids_m >
last_bid_m ||
00634
last_bid_m >
max_bid_m ||
00635
partitions_m . size( ) !=
npartitions_m )
00636
AMOS_THROW_IO (
"Unknown file read error in open, bank corrupted");
00637 }
00638
catch (
Exception_t) {
00639
init( );
00640
throw;
00641 }
00642 }
00643
00644
00645
00646 Bank_t::BankPartition_t *
Bank_t::openPartition (
ID_t id)
00647 {
00648
BankPartition_t * partition =
partitions_m [
id];
00649
00650
00651
if ( partition -> fix . is_open( ) )
00652
return partition;
00653
00654
try {
00655
00656 ios::openmode mode = ios::binary | ios::ate | ios::in;
00657
if ( (
mode_m &
B_WRITE) )
00658 mode |= ios::out;
00659
00660 partition -> fix .
open (partition -> fix_name . c_str( ), mode);
00661
if ( ! partition -> fix . is_open( ) )
00662
AMOS_THROW_IO (
"Could not open bank partition, " + partition -> fix_name);
00663 partition -> var .
open (partition -> var_name . c_str( ), mode);
00664
if ( ! partition -> var . is_open( ) )
00665
AMOS_THROW_IO (
"Could not open bank partition, " + partition -> var_name);
00666 }
00667
catch (
Exception_t) {
00668 partition -> fix .
close( );
00669 partition -> var .
close( );
00670
throw;
00671 }
00672
00673
00674
while ( (
Size_t)
opened_m . size( ) >=
max_partitions_m )
00675 {
00676
opened_m . front( ) -> fix .
close( );
00677
opened_m . front( ) -> var .
close( );
00678
opened_m . pop_front( );
00679 }
00680
opened_m . push_back (partition);
00681
00682
return partition;
00683 }
00684
00685
00686
00687 void Bank_t::removeBID (
ID_t bid)
00688 {
00689
if ( !
is_open_m || ! (
mode_m &
B_READ &&
mode_m &
B_WRITE) )
00690
AMOS_THROW_IO (
"Cannot remove, bank not open for reading and writing");
00691
00692
00693
BankPartition_t * partition =
localizeBID (bid);
00694
00695
BankFlags_t flags;
00696
bankstreamoff off = bid *
fix_size_m +
sizeof (
bankstreamoff);
00697 partition -> fix . seekg (off, ios::beg);
00698
readLE (partition -> fix, &flags);
00699 flags . is_removed =
true;
00700 partition -> fix . seekp (off, ios::beg);
00701
writeLE (partition -> fix, &flags);
00702
00703
if ( ! partition -> fix . good( ) || ! partition -> var . good( ) )
00704
AMOS_THROW_IO (
"Unknown file error in remove, bank corrupted");
00705
00706 --
nbids_m;
00707 }
00708
00709
00710
00711 void Bank_t::replace (
ID_t iid,
IBankable_t & obj)
00712 {
00713
ID_t bid =
lookupBID (iid);
00714 string peid (
idmap_m .
lookupEID (iid));
00715
idmap_m .
remove (iid);
00716
00717
try {
00718
idmap_m . insert (obj . iid_m, obj . eid_m, bid);
00719 }
00720
catch (
Exception_t) {
00721
idmap_m . insert (iid, peid, bid);
00722
throw;
00723 }
00724
00725
try {
00726
replaceBID (bid, obj);
00727 }
00728
catch (
Exception_t) {
00729
idmap_m .
remove (obj . iid_m);
00730
idmap_m . insert (iid, peid, bid);
00731
throw;
00732 }
00733 }
00734
00735
00736
00737 void Bank_t::replace (
const string & eid,
IBankable_t & obj)
00738 {
00739
ID_t bid =
lookupBID (eid);
00740
ID_t piid =
idmap_m .
lookupIID (eid);
00741
idmap_m .
remove (eid);
00742
00743
try {
00744
idmap_m . insert (obj . iid_m, obj . eid_m, bid);
00745 }
00746
catch (
Exception_t) {
00747
idmap_m . insert (piid, eid, bid);
00748
throw;
00749 }
00750
00751
try {
00752
replaceBID (bid, obj);
00753 }
00754
catch (
Exception_t) {
00755
idmap_m .
remove (obj . eid_m);
00756
idmap_m . insert (piid, eid, bid);
00757
throw;
00758 }
00759 }
00760
00761
00762
00763 void Bank_t::replaceBID (
ID_t bid,
IBankable_t & obj)
00764 {
00765
if ( !
is_open_m || ! (
mode_m &
B_READ &&
mode_m &
B_WRITE) )
00766
AMOS_THROW_IO (
"Cannot replace, bank not open for reading and writing");
00767
if (
banktype_m != obj.
getNCode( ) )
00768
AMOS_THROW_ARGUMENT (
"Cannot replace, incompatible object type");
00769
00770
00771 obj . flags_m . is_removed =
false;
00772 obj . flags_m . is_modified =
true;
00773
00774
00775
BankPartition_t * partition =
localizeBID (bid);
00776
00777
bankstreamoff off = bid *
fix_size_m;
00778 partition -> fix . seekp (off, ios::beg);
00779 partition -> var . seekp (0, ios::end);
00780
bankstreamoff vpos = partition -> var . tellp( );
00781
writeLE (partition -> fix, &vpos);
00782
writeLE (partition -> fix, &(obj . flags_m));
00783 obj . writeRecord (partition -> fix, partition -> var);
00784
Size_t vsize = (std::streamoff)partition -> var . tellp( ) - vpos;
00785
writeLE (partition -> fix, &vsize);
00786
00787
if ( ! partition -> fix . good( ) || ! partition -> var . good( ) )
00788
AMOS_THROW_IO (
"Unknown file error in replace, bank corrupted");
00789 }
00790
00791
00792
00793 void Bank_t::syncIFO (IFOMode_t mode)
00794 {
00795
00796
if ( (
mode_m &
B_SPY) && mode !=
I_OPEN )
00797
return;
00798
00799
00800 ostringstream ss;
00801
char * user = getenv (
"USER");
00802 ss << ((
mode_m &
B_WRITE) ?
WRITE_LOCK_CHAR :
READ_LOCK_CHAR)
00803 <<
' ' << getuid( ) <<
' ' << (user == NULL ?
"null" : user);
00804 string lock (ss . str( ));
00805
00806
00807 string line;
00808
NCode_t banktype;
00809
ID_t nbids, last_bid;
00810
Size_t fix_size, npartitions, partition_size;
00811 vector<string> locks;
00812 vector<string>::iterator vi;
00813
00814
00815
lockIFO( );
00816
00817
try {
00818
00819
00820
if ( mode ==
I_OPEN || mode ==
I_CLOSE )
00821 {
00822 string ifo_path (
store_pfx_m +
IFO_STORE_SUFFIX);
00823 ifstream ifo_stream (ifo_path . c_str( ));
00824
if ( ! ifo_stream . is_open( ) )
00825
AMOS_THROW_IO (
"Could not open bank partition, " + ifo_path);
00826
00827 getline (ifo_stream, line,
'=');
00828 ifo_stream >> line;
00829
if ( line !=
BANK_VERSION )
00830
AMOS_THROW_IO
00831 (
"Could not read bank, incompatible bank version " + line);
00832 getline (ifo_stream, line,
'=');
00833 ifo_stream >> banktype;
00834
if ( banktype !=
banktype_m )
00835
AMOS_THROW_IO
00836 (
"Could not read bank, incompatible type " +
Decode (banktype));
00837
00838 getline (ifo_stream, line,
'=');
00839 ifo_stream >> nbids;
00840 getline (ifo_stream, line,
'=');
00841 ifo_stream >> last_bid;
00842 getline (ifo_stream, line,
'=');
00843 ifo_stream >> fix_size;
00844 getline (ifo_stream, line,
'=');
00845 ifo_stream >> npartitions;
00846 getline (ifo_stream, line,
'=');
00847 ifo_stream >> partition_size;
00848 getline (ifo_stream, line,
'=');
00849 getline (ifo_stream, line);
00850
00851
if ( ! ifo_stream . good( ) )
00852
AMOS_THROW_IO (
"Unknown file read error in sync, bank corrupted");
00853
00854
00855
bool noskip = (mode ==
I_OPEN);
00856 getline (ifo_stream, line);
00857
while ( ifo_stream . good( ) )
00858 {
00859
if ( noskip || line != lock )
00860 locks . push_back (line);
00861
else
00862 noskip =
true;
00863 getline (ifo_stream, line);
00864 }
00865 ifo_stream .
close( );
00866
00867
00868
if ( mode ==
I_OPEN )
00869 {
00870
nbids_m = nbids;
00871
last_bid_m = last_bid;
00872
fix_size_m = fix_size;
00873
partition_size_m = partition_size;
00874
00875
00876
try {
00877
while ( npartitions >
npartitions_m )
00878
addPartition (
false);
00879 }
00880
catch (
Exception_t) {
00881
for (
Size_t i = 0; i !=
npartitions_m; ++ i )
00882
delete (
partitions_m [i]);
00883
npartitions_m = 0;
00884
throw;
00885 }
00886 }
00887 }
00888
00889
00890
00891
char ltype =
NULL_CHAR;
00892 vector<string>::iterator vj;
00893
for ( vi = locks . begin( ); vi != locks . end( ); ++ vi )
00894 {
00895 vj = vi;
00896 ltype = vi ->
empty( ) ?
NULL_CHAR : (*vi)[0];
00897
switch ( ltype )
00898 {
00899
case READ_LOCK_CHAR:
00900
break;
00901
case WRITE_LOCK_CHAR:
00902
if ( locks . size( ) == 1 )
00903
break;
00904
00905
default:
00906
AMOS_THROW_IO (
"Invalid bank partition lock, bank corrupted");
00907 }
00908 }
00909
00910
00911
00912
if (
mode_m &
B_SPY )
00913 {
00914
if ( ltype ==
WRITE_LOCK_CHAR )
00915 cerr <<
"WARNING: Disregarding '" <<
Decode (
banktype_m)
00916 <<
"' bank lock, locked by '" + *vj +
"'" << endl;
00917
return;
00918 }
00919
00920
00921
00922
if ( (
mode_m &
B_READ) && ltype ==
WRITE_LOCK_CHAR )
00923
AMOS_THROW_IO
00924 (
"Could not open bank for reading, locked by '" + *vj +
"'");
00925
if ( (
mode_m &
B_WRITE) && ltype ==
WRITE_LOCK_CHAR )
00926
AMOS_THROW_IO
00927 (
"Could not open bank for writing, locked by '" + *vj +
"'");
00928
if ( (
mode_m &
B_WRITE) && ltype ==
READ_LOCK_CHAR )
00929
AMOS_THROW_IO
00930 (
"Could not open bank for writing, locked by '" + *vj +
"'");
00931
00932
00933
00934
if ( mode !=
I_CLOSE )
00935 locks . push_back (lock);
00936
00937
00938
00939
if ( (
mode_m &
B_WRITE) )
00940 {
00941 nbids =
nbids_m;
00942 last_bid =
last_bid_m;
00943 fix_size =
fix_size_m;
00944 npartitions =
npartitions_m;
00945 partition_size =
partition_size_m;
00946 }
00947
00948
00949
00950 string ifo_path (
store_pfx_m +
IFO_STORE_SUFFIX);
00951 ofstream ifo_stream (ifo_path . c_str( ));
00952
if ( ! ifo_stream . is_open( ) )
00953
AMOS_THROW_IO (
"Could not open bank partition, " + ifo_path);
00954
00955 ifo_stream
00956 <<
"____" <<
Decode (
banktype_m) <<
" BANK INFORMATION____" << endl
00957 <<
"bank version = " <<
BANK_VERSION << endl
00958 <<
"bank type = " <<
banktype_m << endl
00959 <<
"objects = " << nbids << endl
00960 <<
"indices = " << last_bid << endl
00961 <<
"bytes/index = " << fix_size << endl
00962 <<
"partitions = " << npartitions << endl
00963 <<
"indices/partition = " << partition_size << endl
00964 <<
"locks = " << endl;
00965
00966
00967
for ( vi = locks . begin( ); vi != locks . end( ); ++ vi )
00968 ifo_stream << *vi << endl;
00969
00970
if ( ! ifo_stream . good( ) )
00971
AMOS_THROW_IO (
"Unknown file write error in sync, bank corrupted");
00972 ifo_stream .
close( );
00973
00974 }
00975
catch (
Exception_t) {
00976
unlockIFO( );
00977
throw;
00978 }
00979
00980
00981
unlockIFO( );
00982 }
00983
00984
00985
00986 void Bank_t::touchFile (
const string & path,
int mode,
bool create)
00987 {
00988
int fd, flags;
00989
00990
if ( (
mode_m &
B_SPY) )
00991 flags = O_RDONLY;
00992
else
00993 flags = O_RDWR;
00994
00995
if ( create )
00996 flags |= O_CREAT | O_TRUNC;
00997
00998 fd = ::open (path . c_str( ), flags, mode);
00999
if ( fd == -1 )
01000 {
01001
if ( create )
01002
AMOS_THROW_IO
01003 (
"Could not create bank file, " + path +
", " + strerror (errno));
01004
else
01005
AMOS_THROW_IO
01006 (
"Could not open bank file, " + path +
", " + strerror (errno));
01007 }
01008
01009 fd = ::close (fd);
01010
if ( fd == -1 )
01011
AMOS_THROW_IO
01012 (
"Could not close bank file, " + path +
", " + strerror (errno));
01013 }
01014
01015
01016
01017 void Bank_t::unlockIFO ( )
01018 {
01019
if ( (
mode_m &
B_SPY) )
return;
01020
01021
01022 string lck_path (
store_pfx_m +
LCK_STORE_SUFFIX);
01023
if ( unlink (lck_path . c_str( )) )
01024
AMOS_THROW_IO
01025 ((string)
"Failed to release bank IFO file lock, " + strerror (errno));
01026 }
01027
01028
01029
01030 bool AMOS::BankExists (
NCode_t ncode,
const string & dir)
01031 {
01032
01033 string ifo_path (dir +
'/' +
Decode (ncode) + Bank_t::IFO_STORE_SUFFIX);
01034
return ( ! access (dir . c_str( ), R_OK | X_OK)
01035 &&
01036 ! access (ifo_path . c_str( ), R_OK) );
01037 }
01038
01039
01040
01041 void AMOS::PrintBankVersion (
const char * s)
01042 {
01043 cerr << s <<
" compiled for bank version " << Bank_t::BANK_VERSION << endl;
01044 }
01045
01046
01047
01048
01049
01050 Bank_t::BankPartition_t::BankPartition_t (
Size_t buffer_size)
01051 {
01052 fix_buff = (
char *)
SafeMalloc (buffer_size);
01053 var_buff = (
char *)
SafeMalloc (buffer_size);
01054
01055
fix . rdbuf( ) ->
_PUBSETBUF_ (fix_buff, buffer_size);
01056
var . rdbuf( ) ->
_PUBSETBUF_ (var_buff, buffer_size);
01057 }
01058
01059
01060
01061 Bank_t::BankPartition_t::~BankPartition_t ( )
01062 {
01063
fix .
close( );
01064
var .
close( );
01065
01066 free (fix_buff);
01067 free (var_buff);
01068 }