00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
#include "BankStream_AMOS.hh"
00011
using namespace AMOS;
00012
using namespace std;
00013
00014
00015
00016
00017
00018 const Size_t BankStream_t::DEFAULT_BUFFER_SIZE = 1024;
00019 const Size_t BankStream_t::MAX_OPEN_PARTITIONS = 2;
00020
00021
00022
00023 BankStream_t &
BankStream_t::ignore (bankstreamoff n)
00024 {
00025
if ( ! is_open_m || ! (mode_m &
B_READ) )
00026
AMOS_THROW_IO (
"Cannot ignore: bank not open for reading");
00027
00028
ID_t lid;
00029 bankstreamoff off;
00030
BankFlags_t bf;
00031 BankPartition_t * partition;
00032
Size_t skip = fix_size_m -
sizeof (bankstreamoff) -
sizeof (
BankFlags_t);
00033
00034
while ( n > 0 &&
inrange( ) )
00035 {
00036 lid =
curr_bid_m;
00037 partition =
localizeBID (lid);
00038 off = lid * fix_size_m;
00039
if ( (std::streamoff)partition -> fix .
tellg( ) != off )
00040 partition -> fix .
seekg (off, ios::beg);
00041
00042 partition -> fix .
ignore (
sizeof (bankstreamoff));
00043
readLE (partition -> fix, &bf);
00044 partition -> fix .
ignore (skip);
00045
00046
if ( ! bf . is_removed )
00047 -- n;
00048 ++ curr_bid_m;
00049 }
00050
00051
eof_m = !
inrange( );
00052
return *
this;
00053 }
00054
00055
00056
00057 void BankStream_t::open (
const std::string & dir,
BankMode_t mode)
00058 {
00059
Bank_t::open (dir, mode);
00060
init( );
00061
00062
const IDMap_t::HashTriple_t * tp = NULL;
00063
triples_m . resize (last_bid_m + 1, tp);
00064
for (
IDMap_t::const_iterator idmi =
getIDMap( ) . begin( );
00065 idmi !=
getIDMap( ) . end( ); ++ idmi )
00066
triples_m [idmi -> bid] = idmi;
00067 }
00068
00069
00070
00071 BankStream_t &
BankStream_t::operator>> (
IBankable_t & obj)
00072 {
00073
if ( ! is_open_m || ! (mode_m &
B_READ) )
00074
AMOS_THROW_IO (
"Cannot stream fetch: bank not open for reading");
00075
if ( banktype_m != obj.
getNCode( ) )
00076
AMOS_THROW_ARGUMENT (
"Cannot stream fetch: incompatible object type");
00077
if (
eof( ) )
00078
AMOS_THROW_ARGUMENT (
"Cannot stream fetch: beyond end of stream");
00079
00080
ID_t lid;
00081
BankFlags_t flags;
00082 bankstreamoff off;
00083 bankstreamoff vpos;
00084 BankPartition_t * partition;
00085
Size_t skip = fix_size_m -
sizeof (bankstreamoff) -
sizeof (
BankFlags_t);
00086
00087
00088 flags . is_removed =
true;
00089
while ( flags . is_removed )
00090 {
00091
if ( !
inrange( ) )
00092 {
00093
eof_m =
true;
00094
return *
this;
00095 }
00096
00097 lid =
curr_bid_m;
00098 partition =
localizeBID (lid);
00099 off = lid * fix_size_m;
00100
if ( (std::streamoff)partition -> fix .
tellg( ) != off )
00101 partition -> fix .
seekg (off, ios::beg);
00102
00103
readLE (partition -> fix, &vpos);
00104
readLE (partition -> fix, &flags);
00105
if ( flags . is_removed )
00106 partition -> fix .
ignore (skip);
00107
00108 ++ curr_bid_m;
00109 }
00110
00111
if ( (std::streamoff)partition -> var .
tellg( ) != vpos )
00112 partition -> var .
seekg (vpos);
00113
00114
const IDMap_t::HashTriple_t * trip =
triples_m [
curr_bid_m - 1];
00115
if ( trip == NULL )
00116 {
00117 obj . iid_m =
NULL_ID;
00118 obj . eid_m . erase( );
00119 }
00120
else
00121 {
00122 obj . iid_m = trip -> iid;
00123 obj . eid_m = trip -> eid;
00124 }
00125
00126 obj . flags_m = flags;
00127 obj . readRecord (partition -> fix, partition -> var);
00128
00129
if ( ! partition -> fix . good( ) || ! partition -> var . good( ) )
00130
AMOS_THROW_IO (
"Unknown file read error in stream fetch, bank corrupted");
00131
00132
return *
this;
00133 }
00134
00135
00136
00137 BankStream_t &
BankStream_t::operator<< (
IBankable_t & obj)
00138 {
00139
if ( ! is_open_m || ! (mode_m &
B_WRITE) )
00140
AMOS_THROW_IO (
"Cannot stream append: bank not open for writing");
00141
if ( banktype_m != obj.
getNCode( ) )
00142
AMOS_THROW_ARGUMENT (
"Cannot stream append: incompatible object type");
00143
00144
00145
triples_m . push_back
00146 (idmap_m . insert (obj . iid_m, obj . eid_m, last_bid_m + 1));
00147
00148
try {
00149
00150
if ( last_bid_m == max_bid_m )
00151
addPartition (
true);
00152
00153 BankPartition_t * partition =
getLastPartition( );
00154
00155
00156 obj . flags_m . is_removed =
false;
00157 obj . flags_m . is_modified =
false;
00158
00159
00160
00161
00162
00163 bankstreamoff fpos = (std::streamoff)partition -> fix .
tellp( );
00164 bankstreamoff vpos = (std::streamoff)partition -> var .
tellp( );
00165
writeLE (partition -> fix, &vpos);
00166
writeLE (partition -> fix, &(obj . flags_m));
00167 obj . writeRecord (partition -> fix, partition -> var);
00168
Size_t vsize = (std::streamoff)partition -> var .
tellp( ) - vpos;
00169
writeLE (partition -> fix, &vsize);
00170
00171
00172
Size_t fsize = (std::streamoff)partition -> fix .
tellp( ) - fpos;
00173
if ( fix_size_m == 0 )
00174 fix_size_m = fsize;
00175
00176
if ( fix_size_m != fsize ||
00177 ! partition -> fix . good( ) ||
00178 ! partition -> var . good( ) )
00179
AMOS_THROW_IO
00180 (
"Unknown file write error in stream append, bank corrupted");
00181
00182 ++ nbids_m;
00183 ++ last_bid_m;
00184 }
00185
catch (
Exception_t) {
00186
triples_m . pop_back( );
00187 idmap_m .
remove (obj . iid_m);
00188 idmap_m .
remove (obj . eid_m);
00189
throw;
00190 }
00191
00192
return *
this;
00193 }