BlobDB: Can return expiration together with Get() (#4227)
Summary: Add API to allow fetching expiration of a key with `Get()`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4227 Differential Revision: D9169897 Pulled By: yiwu-arbug fbshipit-source-id: 2a6f216c493dc75731ddcef1daa689b517fab31b
This commit is contained in:
parent
4bd3bc5c4f
commit
abbd5a4b93
@ -144,6 +144,15 @@ class BlobDB : public StackableDB {
|
|||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
PinnableSlice* value) override = 0;
|
PinnableSlice* value) override = 0;
|
||||||
|
|
||||||
|
// Get value and expiration.
|
||||||
|
virtual Status Get(const ReadOptions& options,
|
||||||
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
|
PinnableSlice* value, uint64_t* expiration) = 0;
|
||||||
|
virtual Status Get(const ReadOptions& options, const Slice& key,
|
||||||
|
PinnableSlice* value, uint64_t* expiration) {
|
||||||
|
return Get(options, DefaultColumnFamily(), key, value, expiration);
|
||||||
|
}
|
||||||
|
|
||||||
using rocksdb::StackableDB::MultiGet;
|
using rocksdb::StackableDB::MultiGet;
|
||||||
virtual std::vector<Status> MultiGet(
|
virtual std::vector<Status> MultiGet(
|
||||||
const ReadOptions& options,
|
const ReadOptions& options,
|
||||||
@ -181,7 +190,6 @@ class BlobDB : public StackableDB {
|
|||||||
|
|
||||||
virtual Status Write(const WriteOptions& opts,
|
virtual Status Write(const WriteOptions& opts,
|
||||||
WriteBatch* updates) override = 0;
|
WriteBatch* updates) override = 0;
|
||||||
|
|
||||||
using rocksdb::StackableDB::NewIterator;
|
using rocksdb::StackableDB::NewIterator;
|
||||||
virtual Iterator* NewIterator(const ReadOptions& options) override = 0;
|
virtual Iterator* NewIterator(const ReadOptions& options) override = 0;
|
||||||
virtual Iterator* NewIterator(const ReadOptions& options,
|
virtual Iterator* NewIterator(const ReadOptions& options,
|
||||||
|
@ -968,7 +968,7 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
||||||
PinnableSlice* value) {
|
PinnableSlice* value, uint64_t* expiration) {
|
||||||
assert(value != nullptr);
|
assert(value != nullptr);
|
||||||
BlobIndex blob_index;
|
BlobIndex blob_index;
|
||||||
Status s = blob_index.DecodeFrom(index_entry);
|
Status s = blob_index.DecodeFrom(index_entry);
|
||||||
@ -978,6 +978,13 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
|||||||
if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
|
if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
|
||||||
return Status::NotFound("Key expired");
|
return Status::NotFound("Key expired");
|
||||||
}
|
}
|
||||||
|
if (expiration != nullptr) {
|
||||||
|
if (blob_index.HasTTL()) {
|
||||||
|
*expiration = blob_index.expiration();
|
||||||
|
} else {
|
||||||
|
*expiration = kNoExpiration;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (blob_index.IsInlined()) {
|
if (blob_index.IsInlined()) {
|
||||||
// TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
|
// TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
|
||||||
// memory buffer to avoid extra copy.
|
// memory buffer to avoid extra copy.
|
||||||
@ -1113,14 +1120,20 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
|||||||
Status BlobDBImpl::Get(const ReadOptions& read_options,
|
Status BlobDBImpl::Get(const ReadOptions& read_options,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
PinnableSlice* value) {
|
PinnableSlice* value) {
|
||||||
|
return Get(read_options, column_family, key, value, nullptr /*expiration*/);
|
||||||
|
}
|
||||||
|
|
||||||
|
Status BlobDBImpl::Get(const ReadOptions& read_options,
|
||||||
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
|
PinnableSlice* value, uint64_t* expiration) {
|
||||||
StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
|
StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
|
||||||
RecordTick(statistics_, BLOB_DB_NUM_GET);
|
RecordTick(statistics_, BLOB_DB_NUM_GET);
|
||||||
return GetImpl(read_options, column_family, key, value);
|
return GetImpl(read_options, column_family, key, value, expiration);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
|
Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
PinnableSlice* value) {
|
PinnableSlice* value, uint64_t* expiration) {
|
||||||
if (column_family != DefaultColumnFamily()) {
|
if (column_family != DefaultColumnFamily()) {
|
||||||
return Status::NotSupported(
|
return Status::NotSupported(
|
||||||
"Blob DB doesn't support non-default column family.");
|
"Blob DB doesn't support non-default column family.");
|
||||||
@ -1138,10 +1151,13 @@ Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
|
|||||||
&is_blob_index);
|
&is_blob_index);
|
||||||
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
|
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
|
||||||
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
|
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
|
||||||
|
if (expiration != nullptr) {
|
||||||
|
*expiration = kNoExpiration;
|
||||||
|
}
|
||||||
if (s.ok() && is_blob_index) {
|
if (s.ok() && is_blob_index) {
|
||||||
std::string index_entry = value->ToString();
|
std::string index_entry = value->ToString();
|
||||||
value->Reset();
|
value->Reset();
|
||||||
s = GetBlobValue(key, index_entry, value);
|
s = GetBlobValue(key, index_entry, value, expiration);
|
||||||
}
|
}
|
||||||
if (snapshot_created) {
|
if (snapshot_created) {
|
||||||
db_->ReleaseSnapshot(ro.snapshot);
|
db_->ReleaseSnapshot(ro.snapshot);
|
||||||
|
@ -130,6 +130,10 @@ class BlobDBImpl : public BlobDB {
|
|||||||
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
||||||
const Slice& key, PinnableSlice* value) override;
|
const Slice& key, PinnableSlice* value) override;
|
||||||
|
|
||||||
|
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
||||||
|
const Slice& key, PinnableSlice* value,
|
||||||
|
uint64_t* expiration) override;
|
||||||
|
|
||||||
using BlobDB::NewIterator;
|
using BlobDB::NewIterator;
|
||||||
virtual Iterator* NewIterator(const ReadOptions& read_options) override;
|
virtual Iterator* NewIterator(const ReadOptions& read_options) override;
|
||||||
|
|
||||||
@ -215,10 +219,10 @@ class BlobDBImpl : public BlobDB {
|
|||||||
|
|
||||||
Status GetImpl(const ReadOptions& read_options,
|
Status GetImpl(const ReadOptions& read_options,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
PinnableSlice* value);
|
PinnableSlice* value, uint64_t* expiration = nullptr);
|
||||||
|
|
||||||
Status GetBlobValue(const Slice& key, const Slice& index_entry,
|
Status GetBlobValue(const Slice& key, const Slice& index_entry,
|
||||||
PinnableSlice* value);
|
PinnableSlice* value, uint64_t* expiration = nullptr);
|
||||||
|
|
||||||
Slice GetCompressedSlice(const Slice& raw,
|
Slice GetCompressedSlice(const Slice& raw,
|
||||||
std::string* compression_output) const;
|
std::string* compression_output) const;
|
||||||
|
@ -334,6 +334,25 @@ TEST_F(BlobDBTest, StackableDBGet) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(BlobDBTest, GetExpiration) {
|
||||||
|
Options options;
|
||||||
|
options.env = mock_env_.get();
|
||||||
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.disable_background_tasks = true;
|
||||||
|
mock_env_->set_current_time(100);
|
||||||
|
Open(bdb_options, options);
|
||||||
|
Put("key1", "value1");
|
||||||
|
PutWithTTL("key2", "value2", 200);
|
||||||
|
PinnableSlice value;
|
||||||
|
uint64_t expiration;
|
||||||
|
ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
|
||||||
|
ASSERT_EQ("value1", value.ToString());
|
||||||
|
ASSERT_EQ(kNoExpiration, expiration);
|
||||||
|
ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
|
||||||
|
ASSERT_EQ("value2", value.ToString());
|
||||||
|
ASSERT_EQ(300 /* = 100 + 200 */, expiration);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(BlobDBTest, WriteBatch) {
|
TEST_F(BlobDBTest, WriteBatch) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
Loading…
Reference in New Issue
Block a user