Implement taoAssocRangeGet().

Summary:

Test Plan:

Reviewers:

CC:

Task ID: #

Blame Rev:
This commit is contained in:
Dhruba Borthakur 2012-08-03 00:27:48 -07:00
parent 0c98fdcf27
commit 88c515b6ba
5 changed files with 174 additions and 28 deletions

View File

@ -12,7 +12,7 @@ compiled thrift libraries.
If you want to compile leveldb with thrift-server support, please set the following If you want to compile leveldb with thrift-server support, please set the following
enviroment variables appropriately: enviroment variables appropriately:
USE_THRIFT=1 USE_THRIFT=1
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./thrift/libs LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./thrift/libs:./snappy/libs
make clean leveldb_server leveldb_server_test make clean leveldb_server leveldb_server_test
You can run the leveldb server unit tests by You can run the leveldb server unit tests by

View File

@ -56,7 +56,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
int64_t taoAssocDelete(const Text& tableName, int64_t assocType, int64_t id1, int64_t taoAssocDelete(const Text& tableName, int64_t assocType, int64_t id1,
int64_t id2, AssocVisibility visibility, bool update_count, int64_t id2, AssocVisibility visibility, bool update_count,
const Text& wormhole_comment) { const Text& wormhole_comment) {
printf("taoAssocDelete\n"); printf("taoAssocDelete not implemented yet\n");
return 0; return 0;
} }
@ -64,7 +64,14 @@ class AssocServiceHandler : virtual public AssocServiceIf {
const Text& tableName, int64_t assocType, int64_t id1, const Text& tableName, int64_t assocType, int64_t id1,
int64_t start_time, int64_t end_time, int64_t offset, int64_t start_time, int64_t end_time, int64_t offset,
int64_t limit) { int64_t limit) {
printf("taoAssocRangeGet\n"); leveldb::DB* db = openhandles_->get(tableName, NULL);
if (db == NULL) {
throw generate_exception(tableName, Code::kNotFound,
"taoAssocRangeGet: Unable to open database " ,
assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1);
}
assocRangeGetBytimeInternal(_return, tableName, db, assocType, id1,
start_time, end_time, offset, limit);
} }
void taoAssocGet(std::vector<TaoAssocGetResult> & _return, void taoAssocGet(std::vector<TaoAssocGetResult> & _return,
@ -73,10 +80,10 @@ class AssocServiceHandler : virtual public AssocServiceIf {
leveldb::DB* db = openhandles_->get(tableName, NULL); leveldb::DB* db = openhandles_->get(tableName, NULL);
if (db == NULL) { if (db == NULL) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Unable to database " , "taoAssocGet:Unable to open database " ,
assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1);
} }
taoAssocGetInternal(_return, tableName, db, assocType, id1, id2s); assocGetInternal(_return, tableName, db, assocType, id1, id2s);
} }
int64_t taoAssocCount(const Text& tableName, int64_t assocType, int64_t id1) { int64_t taoAssocCount(const Text& tableName, int64_t assocType, int64_t id1) {
@ -84,17 +91,22 @@ class AssocServiceHandler : virtual public AssocServiceIf {
if (db == NULL) { if (db == NULL) {
return Code::kNotFound; return Code::kNotFound;
} }
return taoAssocCountInternal(tableName, db, assocType, id1); return assocCountInternal(tableName, db, assocType, id1);
} }
private: private:
OpenHandles* openhandles_; OpenHandles* openhandles_;
// the maximum values returned in a rangeget/multiget call.
const static unsigned int MAX_RANGE_SIZE = 10000;
// //
// inserts an assoc // Inserts an assoc
// Returns true if the iinsertion is successful, otherwise return false. // If update_count, returns the updated count of the assoc.
// If update_count, return zero.
// On failure, return negative number.
// //
bool assocPutInternal(const Text& tableName, leveldb::DB* db, int64_t assocPutInternal(const Text& tableName, leveldb::DB* db,
int64_t assocType, int64_t id1, int64_t assocType, int64_t id1,
int64_t id2, int64_t id1Type, int64_t id2Type, int64_t id2, int64_t id1Type, int64_t id2Type,
int64_t ts, AssocVisibility vis, int64_t ts, AssocVisibility vis,
@ -102,6 +114,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
const Text& wormhole_comment) { const Text& wormhole_comment) {
leveldb::WriteOptions woptions; leveldb::WriteOptions woptions;
woptions.sync = true; woptions.sync = true;
ts = convertTime(ts); // change time to numberofmillis till MAXLONG
// create the payload for this assoc // create the payload for this assoc
int payloadsize = sizeof(id1Type) + sizeof(id2Type) + sizeof(dataVersion) + int payloadsize = sizeof(id1Type) + sizeof(id2Type) + sizeof(dataVersion) +
@ -174,10 +187,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// if ts != oldts, then delete 'p'$old_ts$id2 // if ts != oldts, then delete 'p'$old_ts$id2
if (!newassoc) { if (!newassoc) {
char* val = (char *)value.c_str(); extractTsVisString(&oldts, &oldvis, (char *)value.c_str());
extract_int64(&oldts, val);
extract_int8(&oldvis, val + sizeof(int64_t));
if (ts != oldts) { if (ts != oldts) {
if (!db->Delete(woptions, pkey).ok()) { if (!db->Delete(woptions, pkey).ok()) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
@ -231,7 +241,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
return 0; return 0;
} }
int64_t taoAssocCountInternal(const Text& tableName, leveldb::DB* db, int64_t assocCountInternal(const Text& tableName, leveldb::DB* db,
int64_t assocType, int64_t id1) { int64_t assocType, int64_t id1) {
// create key to query // create key to query
int maxkeysize = sizeof(id1) + sizeof(assocType) + 1; int maxkeysize = sizeof(id1) + sizeof(assocType) + 1;
@ -268,14 +278,80 @@ class AssocServiceHandler : virtual public AssocServiceIf {
return count; return count;
} }
void taoAssocGetInternal(std::vector<TaoAssocGetResult> & _return, void assocRangeGetBytimeInternal(std::vector<TaoAssocGetResult> & _return,
const Text& tableName, leveldb::DB* db,
int64_t assocType, int64_t id1,
int64_t start_time, int64_t end_time, int64_t offset,
int64_t limit) {
if (start_time < end_time) {
throw generate_exception(tableName, Code::kNotFound,
"assocRangeGetBytimeInternal:Bad starttime and endtime\n",
assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1);
}
int64_t ts, id2;
const leveldb::ReadOptions options;
std::string wormhole;
// convert times to time-till-LONGMAX
int64_t startTime = convertTime(start_time);
int64_t endTime = convertTime(end_time);
// create max key to query
int maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(ts) +
sizeof(id2);
std::string dummy;
dummy.reserve(maxkeysize);
dummy.resize(maxkeysize);
// create rowkey
char* keybuf = &dummy[0];
int rowkeysize = makeRowKey(keybuf, id1, assocType);
// Position scan at 'p'$ts$id2 where ts = startTime and id2 = 0
id2 = 0;
leveldb::Iterator* iter = db->NewIterator(options);
int keysize = appendRowKeyForPayload(rowkeysize, keybuf, startTime, id2);
leveldb::Slice pkey(keybuf, keysize);
for (iter->Seek(pkey); iter->Valid() && limit > 0 ; iter->Next()) {
// skip over records that the caller is not interested in
if (offset > 0) {
offset--;
continue;
}
printf("XXX server key found %s\n", iter->key().ToString().c_str());
ASSERT_GE(iter->key().size_, (unsigned int)rowkeysize);
// extract the timestamp and id1 from the key
extractRowKeyP(&ts, &id2, rowkeysize, (char*)(iter->key().data_));
ASSERT_GE(ts, startTime);
if (ts > endTime) {
break;
}
// allocate a new slot in the result set.
_return.resize(_return.size() + 1);
TaoAssocGetResult* result = &_return.back();
// Fill up new element in result set.
result->id2 = id2;
result->time = convertTime(ts);
extractPayload((char*)iter->value().data_, &result->id1Type,
&result->id2Type,
&result->dataVersion, result->data, wormhole);
limit--;
}
}
void assocGetInternal(std::vector<TaoAssocGetResult> & _return,
const Text& tableName, const Text& tableName,
leveldb::DB* db, leveldb::DB* db,
int64_t assocType, int64_t id1, int64_t assocType, int64_t id1,
const std::vector<int64_t> & id2s) { const std::vector<int64_t> & id2s) {
int64_t ts, id2; int64_t ts, id2;
if (id2s.size() > 10000) { if (id2s.size() > MAX_RANGE_SIZE) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Ids2 cannot be gteater than 10K.", "Ids2 cannot be gteater than 10K.",
assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1);
@ -283,7 +359,6 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// allocate the entire result buffer. // allocate the entire result buffer.
_return.reserve(id2s.size()); _return.reserve(id2s.size());
// create max key to query // create max key to query
int maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(ts) + int maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(ts) +
sizeof(id2); sizeof(id2);
@ -344,7 +419,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// Fill up new element in result set. // Fill up new element in result set.
result->id2 = id2; result->id2 = id2;
result->time = ts; result->time = convertTime(ts);
extractPayload((char*)value.c_str(), &result->id1Type, extractPayload((char*)value.c_str(), &result->id1Type,
&result->id2Type, &result->id2Type,
&result->dataVersion, result->data, wormhole); &result->dataVersion, result->data, wormhole);
@ -374,6 +449,17 @@ class AssocServiceHandler : virtual public AssocServiceIf {
dest = copy_int64_switch_endian(dest, id2); dest = copy_int64_switch_endian(dest, id2);
return rowkeysize + sizeof(ts) + sizeof(id2) + 1; return rowkeysize + sizeof(ts) + sizeof(id2) + 1;
} }
// extract the timestamp and id2 from the key p$ts$id2
inline void extractRowKeyP(int64_t* ts, int64_t* id,
int rowkeysize, char* src) {
src += rowkeysize; // skip over the rowkey
ASSERT_EQ(*src, 'p');
src++;
extract_int64(ts, src); src += sizeof(*ts);
extract_int64(id, src); src += sizeof(*id);
}
// fill the row key +'m' + id2 and returns the size of the key // fill the row key +'m' + id2 and returns the size of the key
inline int appendRowKeyForMeta(int rowkeysize, char* dest, inline int appendRowKeyForMeta(int rowkeysize, char* dest,
int64_t id2) { int64_t id2) {
@ -479,6 +565,14 @@ class AssocServiceHandler : virtual public AssocServiceIf {
*dest = *(int8_t *)src; *dest = *(int8_t *)src;
} }
// convert a timestamp from an ever-increasing number to
// a decreasing number. All stored timestamps in this database
// are MAXLONG - timestamp. Thus, a backward-scan in time
// is converted to a forward scan in the database.
inline int64_t convertTime(int64_t ts) {
return LONG_MAX - ts;
}
// generate an exception message // generate an exception message
LeveldbException generate_exception(const Text& tableName, LeveldbException generate_exception(const Text& tableName,

View File

@ -309,7 +309,9 @@ service AssocService {
* TAO Assoc RangeGet operation. * TAO Assoc RangeGet operation.
* Obtain assocs in bewteen start_time and end_time in reverse time order. * Obtain assocs in bewteen start_time and end_time in reverse time order.
* The range check is inclusive: start_time >= time && time >= end_time. * The range check is inclusive: start_time >= time && time >= end_time.
* And yes, start_time >= end_time. * And yes, start_time >= end_time because this range scan is a backward
* scan in time, starting with most recent time and scanning backwards
* for the most recent n assocs.
*/ */
list<TaoAssocGetResult> taoAssocRangeGet( list<TaoAssocGetResult> taoAssocRangeGet(
/** name of table */ /** name of table */

View File

@ -280,7 +280,7 @@ class DBHandler : virtual public DBIf {
return; return;
} }
// If the iterator has reached the endm close it rightaway. // If the iterator has reached the end close it rightaway.
// There is no need for the application to make another thrift // There is no need for the application to make another thrift
// call to cleanup the iterator. // call to cleanup the iterator.
if (!it->Valid()) { if (!it->Valid()) {
@ -302,19 +302,19 @@ class DBHandler : virtual public DBIf {
leveldb::Slice key = it->key(); leveldb::Slice key = it->key();
leveldb::Slice value = it->value(); leveldb::Slice value = it->value();
// pack results back to client
_return.keyvalue.key.data.assign(key.data_, key.size_);
_return.keyvalue.key.size = key.size_;
_return.keyvalue.value.data.assign(value.data_, value.size_);
_return.keyvalue.value.size = value.size_;
_return.status = Code::kOk; // success
// move to next or previous value // move to next or previous value
if (doNext) { if (doNext) {
it->Next(); it->Next();
} else { } else {
it->Prev(); it->Prev();
} }
// pack results back to client
_return.keyvalue.key.data = key.data_;
_return.keyvalue.key.size = key.size_;
_return.keyvalue.value.data = value.data_;
_return.keyvalue.value.size = value.size_;
_return.status = Code::kOk; // success
} }
// read the next value from the iterator // read the next value from the iterator

View File

@ -207,6 +207,34 @@ static void testClient() {
printf("Iterator scan-selective backward passes.\n"); printf("Iterator scan-selective backward passes.\n");
} }
// clean up all data that we inserted as part of this test
void clearDatabase() {
WriteOptions writeOptions;
ReadOptions readOptions;
readOptions.snapshot.snapshotid = 0;
printf("Clearing entire database.\n");
ResultIterator ri;
Slice dummy;
dbclient->NewIterator(ri, dbhandle, readOptions,
IteratorType::seekToFirst, dummy);
ASSERT_TRUE(ri.status == Code::kOk);
while (true) {
ResultPair pair;
dbclient->GetNext(pair, dbhandle, ri.iterator);
if (pair.status == Code::kOk) {
Slice key = pair.keyvalue.key;
Code code = dbclient->Delete(dbhandle, key, writeOptions);
ASSERT_EQ(code, Code::kOk);
} else {
break;
}
}
// no need to invoke DeleteIterator because we scanned
// till the end using the iterator and ist is auto-deleted
// by the server.
}
// //
// Run assoc tests // Run assoc tests
// //
@ -231,7 +259,7 @@ static void testAssocs() {
ts, vis, update_count, ts, vis, update_count,
dataVersion, data, wormhole_comments); dataVersion, data, wormhole_comments);
ASSERT_GE(count, 0); ASSERT_GE(count, 0);
printf("Put AssocPut suceeded.\n"); printf("AssocPut first record suceeded.\n");
// verify assoc counts. // verify assoc counts.
int64_t cnt = aclient->taoAssocCount(dbname, assocType, id1); int64_t cnt = aclient->taoAssocCount(dbname, assocType, id1);
@ -251,6 +279,25 @@ static void testAssocs() {
ASSERT_EQ(ts, readback[0].time); ASSERT_EQ(ts, readback[0].time);
ASSERT_EQ(dataVersion, readback[0].dataVersion); ASSERT_EQ(dataVersion, readback[0].dataVersion);
ASSERT_EQ(readback[0].data.compare(data), 0); ASSERT_EQ(readback[0].data.compare(data), 0);
// add one more assoc
const Text data1 = "data1......";
count = aclient->taoAssocPut(dbname, assocType,
id1, id2+2, id1Type+1, id2Type+1,
ts+1, vis, update_count,
dataVersion+1, data1, wormhole_comments);
ASSERT_EQ(count, 2);
printf("AssocPut second record suceeded.\n");
// do a range get for id1+type and verify that there
// are two assocs.
readback.clear();
int64_t offset = 0;
int64_t limit = 1000;
aclient->taoAssocRangeGet(readback, dbname, assocType,
id1, LONG_MAX, 0,
offset, limit);
ASSERT_EQ((unsigned int)2, readback.size());
} }
// //
@ -282,7 +329,10 @@ int main(int argc, char **argv) {
// run all tests // run all tests
testClient(); testClient();
clearDatabase();
testAssocs(); testAssocs();
clearDatabase();
// done all tests // done all tests
close(); close();