diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d0ad1cb0..61e314fa5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -483,6 +483,7 @@ set(SOURCES utilities/blob_db/blob_log_reader.cc utilities/blob_db/blob_log_writer.cc utilities/blob_db/blob_log_format.cc + utilities/blob_db/ttl_extractor.cc utilities/checkpoint/checkpoint_impl.cc utilities/col_buf_decoder.cc utilities/col_buf_encoder.cc diff --git a/TARGETS b/TARGETS index 1bafb01ca..de64bf5f7 100644 --- a/TARGETS +++ b/TARGETS @@ -212,6 +212,7 @@ cpp_library( "utilities/blob_db/blob_log_reader.cc", "utilities/blob_db/blob_log_writer.cc", "utilities/blob_db/blob_log_format.cc", + "utilities/blob_db/ttl_extractor.cc", "utilities/checkpoint/checkpoint_impl.cc", "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", "utilities/convenience/info_log_finder.cc", diff --git a/src.mk b/src.mk index 8250947f5..cb3383ff0 100644 --- a/src.mk +++ b/src.mk @@ -159,6 +159,7 @@ LIB_SOURCES = \ utilities/blob_db/blob_log_reader.cc \ utilities/blob_db/blob_log_writer.cc \ utilities/blob_db/blob_log_format.cc \ + utilities/blob_db/ttl_extractor.cc \ utilities/checkpoint/checkpoint_impl.cc \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ utilities/convenience/info_log_finder.cc \ diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc index e2defe97c..ea60ad59b 100644 --- a/utilities/blob_db/blob_db.cc +++ b/utilities/blob_db/blob_db.cc @@ -17,7 +17,6 @@ #include "table/block.h" #include "table/block_based_table_builder.h" #include "table/block_builder.h" -#include "util/crc32c.h" #include "util/file_reader_writer.h" #include "util/filename.h" #include "utilities/blob_db/blob_db_impl.h" @@ -163,7 +162,6 @@ BlobDBOptions::BlobDBOptions() bytes_per_sync(0), blob_file_size(256 * 1024 * 1024), num_concurrent_simple_blobs(4), - default_ttl_extractor(false), compression(kNoCompression) {} } // namespace blob_db diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index f45a42f60..dfb21383d 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -13,6 +13,7 @@ #include "rocksdb/db.h" #include "rocksdb/status.h" #include "rocksdb/utilities/stackable_db.h" +#include "utilities/blob_db/ttl_extractor.h" namespace rocksdb { @@ -64,15 +65,10 @@ struct BlobDBOptions { // how many files to use for simple blobs at one time uint32_t num_concurrent_simple_blobs; - // this function is to be provided by client if they intend to - // use Put API to provide TTL. - // the first argument is the value in the Put API - // in case you want to do some modifications to the value, - // return a new Slice in the second. - // otherwise just copy the input value into output. - // the ttl should be extracted and returned in last pointer. - // otherwise assign it to -1 - std::function extract_ttl_fn; + // Instead of setting TTL explicitly by calling PutWithTTL or PutUntil, + // applications can set a TTLExtractor which can extract TTL from key-value + // pairs. + std::shared_ptr ttl_extractor; // eviction callback. // this function will be called for every blob that is getting @@ -80,9 +76,6 @@ struct BlobDBOptions { std::function gc_evict_cb_fn; - // default ttl extactor - bool default_ttl_extractor; - // what compression to use for Blob's CompressionType compression; @@ -95,10 +88,6 @@ struct BlobDBOptions { }; class BlobDB : public StackableDB { - public: - // the suffix to a blob value to represent "ttl:TTLVAL" - static const uint64_t kTTLSuffixLength = 8; - public: using rocksdb::StackableDB::Put; @@ -120,6 +109,8 @@ class BlobDB : public StackableDB { return PutWithTTL(options, DefaultColumnFamily(), key, value, ttl); } + // Put with expiration. Key with expiration time equal to -1 + // means the key don't expire. virtual Status PutUntil(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value, int32_t expiration) = 0; diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 1dd72b6bc..95deda5b0 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -6,9 +6,7 @@ #include "utilities/blob_db/blob_db_impl.h" #include -#include #include -#include #include #include #include @@ -58,17 +56,6 @@ namespace rocksdb { namespace blob_db { -struct GCStats { - uint64_t blob_count = 0; - uint64_t num_deletes = 0; - uint64_t deleted_size = 0; - uint64_t num_relocs = 0; - uint64_t succ_deletes_lsm = 0; - uint64_t overrided_while_delete = 0; - uint64_t succ_relocs = 0; - std::shared_ptr newfile = nullptr; -}; - // BlobHandle is a pointer to the blob that is stored in the LSM class BlobHandle { public: @@ -192,7 +179,8 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, const DBOptions& db_options) : BlobDB(nullptr), db_impl_(nullptr), - myenv_(db_options.env), + env_(db_options.env), + ttl_extractor_(blob_db_options.ttl_extractor.get()), wo_set_(false), bdb_options_(blob_db_options), db_options_(db_options), @@ -218,10 +206,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, blob_dir_ = (bdb_options_.path_relative) ? dbname + "/" + bdb_options_.blob_dir : bdb_options_.blob_dir; - - if (bdb_options_.default_ttl_extractor) { - bdb_options_.extract_ttl_fn = &BlobDBImpl::ExtractTTLFromBlob; - } } Status BlobDBImpl::LinkToBaseDB(DB* db) { @@ -238,17 +222,17 @@ Status BlobDBImpl::LinkToBaseDB(DB* db) { db_impl_ = dynamic_cast(db); } - myenv_ = db_->GetEnv(); + env_ = db_->GetEnv(); opt_db_.reset(new OptimisticTransactionDBImpl(db, false)); - Status s = myenv_->CreateDirIfMissing(blob_dir_); + Status s = env_->CreateDirIfMissing(blob_dir_); if (!s.ok()) { ROCKS_LOG_WARN(db_options_.info_log, "Failed to create blob directory: %s status: '%s'", blob_dir_.c_str(), s.ToString().c_str()); } - s = myenv_->NewDirectory(blob_dir_, &dir_ent_); + s = env_->NewDirectory(blob_dir_, &dir_ent_); if (!s.ok()) { ROCKS_LOG_WARN(db_options_.info_log, "Failed to open blob directory: %s status: '%s'", @@ -293,10 +277,6 @@ BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options) blob_dir_ = (bdb_options_.path_relative) ? db_->GetName() + "/" + bdb_options_.blob_dir : bdb_options_.blob_dir; - - if (bdb_options_.default_ttl_extractor) { - bdb_options_.extract_ttl_fn = &BlobDBImpl::ExtractTTLFromBlob; - } } BlobDBImpl::~BlobDBImpl() { @@ -311,7 +291,7 @@ Status BlobDBImpl::OpenPhase1() { return Status::NotSupported("No blob directory in options"); std::unique_ptr dir_ent; - Status s = myenv_->NewDirectory(blob_dir_, &dir_ent); + Status s = env_->NewDirectory(blob_dir_, &dir_ent); if (!s.ok()) { ROCKS_LOG_WARN(db_options_.info_log, "Failed to open blob directory: %s status: '%s'", @@ -366,7 +346,7 @@ void BlobDBImpl::OnFlushBeginHandler(DB* db, const FlushJobInfo& info) { Status BlobDBImpl::GetAllLogFiles( std::set>* file_nums) { std::vector all_files; - Status status = myenv_->GetChildren(blob_dir_, &all_files); + Status status = env_->GetChildren(blob_dir_, &all_files); if (!status.ok()) { return status; } @@ -413,7 +393,7 @@ Status BlobDBImpl::OpenAllFiles() { for (auto f_iter : file_nums) { std::string bfpath = BlobFileName(blob_dir_, f_iter.first); uint64_t size_bytes; - Status s1 = myenv_->GetFileSize(bfpath, &size_bytes); + Status s1 = env_->GetFileSize(bfpath, &size_bytes); if (!s1.ok()) { ROCKS_LOG_WARN( db_options_.info_log, @@ -436,7 +416,7 @@ Status BlobDBImpl::OpenAllFiles() { // read header std::shared_ptr reader; - reader = bfptr->OpenSequentialReader(myenv_, db_options_, env_options_); + reader = bfptr->OpenSequentialReader(env_, db_options_, env_options_); s1 = reader->ReadHeader(&bfptr->header_); if (!s1.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, @@ -448,7 +428,7 @@ Status BlobDBImpl::OpenAllFiles() { bfptr->header_valid_ = true; std::shared_ptr ra_reader = - GetOrOpenRandomAccessReader(bfptr, myenv_, env_options_); + GetOrOpenRandomAccessReader(bfptr, env_, env_options_); BlobLogFooter bf; s1 = bfptr->ReadFooter(&bf); @@ -586,13 +566,13 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { EnvOptions env_options = env_options_; env_options.writable_file_max_buffer_size = 0; - Status s = myenv_->ReopenWritableFile(fpath, &wfile, env_options); + Status s = env_->ReopenWritableFile(fpath, &wfile, env_options); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to open blob file for write: %s status: '%s'" " exists: '%s'", fpath.c_str(), s.ToString().c_str(), - myenv_->FileExists(fpath).ToString().c_str()); + env_->FileExists(fpath).ToString().c_str()); return s; } @@ -788,39 +768,13 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint32_t expiration) { return bfile; } -bool BlobDBImpl::ExtractTTLFromBlob(const Slice& value, Slice* newval, - int32_t* ttl_val) { - *newval = value; - *ttl_val = -1; - if (value.size() <= BlobDB::kTTLSuffixLength) return false; - - int32_t ttl_tmp = - DecodeFixed32(value.data() + value.size() - sizeof(int32_t)); - std::string ttl_exp(value.data() + value.size() - BlobDB::kTTLSuffixLength, - 4); - if (ttl_exp != "ttl:") return false; - - newval->remove_suffix(BlobDB::kTTLSuffixLength); - *ttl_val = ttl_tmp; - return true; -} - -//////////////////////////////////////////////////////////////////////////////// -// A specific pattern is looked up at the end of the value part. -// ttl:TTLVAL . if this pattern is found, PutWithTTL is called, otherwise -// regular Put is called. -//////////////////////////////////////////////////////////////////////////////// Status BlobDBImpl::Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Slice newval; - int32_t ttl_val; - if (bdb_options_.extract_ttl_fn) { - bdb_options_.extract_ttl_fn(value, &newval, &ttl_val); - return PutWithTTL(options, column_family, key, newval, ttl_val); - } - - return PutWithTTL(options, column_family, key, value, -1); + std::string new_value; + Slice value_slice; + int32_t expiration = ExtractExpiration(key, value, &value_slice, &new_value); + return PutUntil(options, column_family, key, value_slice, expiration); } Status BlobDBImpl::Delete(const WriteOptions& options, @@ -852,6 +806,7 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { Status batch_rewrite_status_; std::shared_ptr last_file_; bool has_put_; + std::string new_value_; public: explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq) @@ -866,23 +821,13 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { bool has_put() { return has_put_; } virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value_unc) override { - Slice newval; - int32_t ttl_val = -1; - if (impl_->bdb_options_.extract_ttl_fn) { - impl_->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val); - } else { - newval = value_unc; - } + const Slice& value_slice) override { + Slice value_unc; + int32_t expiration = + impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_); - int32_t expiration = -1; - if (ttl_val != -1) { - std::time_t cur_t = std::chrono::system_clock::to_time_t( - std::chrono::system_clock::now()); - expiration = ttl_val + static_cast(cur_t); - } std::shared_ptr bfile = - (ttl_val != -1) + (expiration != -1) ? impl_->SelectBlobFileTTL(expiration) : ((last_file_) ? last_file_ : impl_->SelectBlobFile()); if (last_file_ && last_file_ != bfile) { @@ -1004,12 +949,8 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value, int32_t ttl) { - return PutUntil( - options, column_family, key, value, - (ttl != -1) - ? ttl + static_cast(std::chrono::system_clock::to_time_t( - std::chrono::system_clock::now())) - : -1); + return PutUntil(options, column_family, key, value, + static_cast(EpochNow()) + ttl); } Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, @@ -1024,6 +965,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, return *compression_output; } +// TODO(yiwu): We should use uint64_t for expiration. Status BlobDBImpl::PutUntil(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value_unc, int32_t expiration) { @@ -1097,6 +1039,24 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, return s; } +// TODO(yiwu): We should return uint64_t after updating the rest of the code +// to use uint64_t for expiration. +int32_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value, + Slice* value_slice, + std::string* new_value) { + uint64_t expiration = kNoExpiration; + bool value_changed = false; + if (ttl_extractor_ != nullptr) { + bool has_ttl = ttl_extractor_->ExtractExpiration( + key, value, EpochNow(), &expiration, new_value, &value_changed); + if (!has_ttl) { + expiration = kNoExpiration; + } + } + *value_slice = value_changed ? Slice(*new_value) : value; + return (expiration == kNoExpiration) ? -1 : static_cast(expiration); +} + Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, const std::string& headerbuf, const Slice& key, const Slice& value, std::string* index_entry) { @@ -1240,7 +1200,7 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, // takes locks when called std::shared_ptr reader = - GetOrOpenRandomAccessReader(bfile, myenv_, env_options_); + GetOrOpenRandomAccessReader(bfile, env_, env_options_); if (value != nullptr) { std::string* valueptr = value; @@ -1377,14 +1337,13 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { assert(!bfile->Immutable()); } - std::time_t epoch_now = - std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + uint64_t epoch_now = EpochNow(); for (auto bfile_pair : blob_files_) { auto bfile = bfile_pair.second; ROCKS_LOG_INFO( db_options_.info_log, - "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %d", + "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, bfile->PathName().c_str(), bfile->GetFileSize(), bfile->BlobCount(), bfile->deleted_count_, bfile->deleted_size_, (bfile->ttl_range_.second - epoch_now)); @@ -1603,8 +1562,7 @@ std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { std::vector> process_files; { - std::time_t epoch_now = - std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + uint64_t epoch_now = EpochNow(); ReadLock rl(&mutex_); for (auto bfile : open_blob_files_) { @@ -1713,11 +1671,10 @@ std::pair BlobDBImpl::WaStats(bool aborted) { //////////////////////////////////////////////////////////////////////////////// Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, GCStats* gcstats) { - std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); - std::time_t tt = std::chrono::system_clock::to_time_t(now); + uint64_t tt = EpochNow(); std::shared_ptr reader = - bfptr->OpenSequentialReader(myenv_, db_options_, env_options_); + bfptr->OpenSequentialReader(env_, db_options_, env_options_); if (!reader) { ROCKS_LOG_ERROR(db_options_.info_log, "File sequential reader could not be opened", @@ -1987,7 +1944,7 @@ std::pair BlobDBImpl::DeleteObsFiles(bool aborted) { } } - Status s = myenv_->DeleteFile(bfile->PathName()); + Status s = env_->DeleteFile(bfile->PathName()); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, "File failed to be deleted as obsolete %s", @@ -2019,7 +1976,7 @@ std::pair BlobDBImpl::DeleteObsFiles(bool aborted) { bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr bfile) { std::shared_ptr reader = - bfile->OpenSequentialReader(myenv_, db_options_, env_options_); + bfile->OpenSequentialReader(env_, db_options_, env_options_); if (!reader) { ROCKS_LOG_ERROR( db_options_.info_log, @@ -2264,6 +2221,23 @@ Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key, auto cfh = reinterpret_cast(DefaultColumnFamily()); return CommonGet(cfh->cfd(), key, index_entry, nullptr, sequence); } + +std::vector> BlobDBImpl::TEST_GetBlobFiles() const { + std::vector> blob_files; + for (auto& p : blob_files_) { + blob_files.emplace_back(p.second); + } + return blob_files; +} + +void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr& bfile) { + CloseSeqWrite(bfile, false /*abort*/); +} + +Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, + GCStats* gc_stats) { + return GCFileAndUpdateLSM(bfile, gc_stats); +} #endif // !NDEBUG } // namespace blob_db diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index a5c5822bb..8da5bbf65 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -45,7 +46,6 @@ namespace blob_db { class BlobFile; class BlobDBImpl; -struct GCStats; class BlobDBFlushBeginListener : public EventListener { public: @@ -134,6 +134,17 @@ struct blobf_compare_ttl { const std::shared_ptr& rhs) const; }; +struct GCStats { + uint64_t blob_count = 0; + uint64_t num_deletes = 0; + uint64_t deleted_size = 0; + uint64_t num_relocs = 0; + uint64_t succ_deletes_lsm = 0; + uint64_t overrided_while_delete = 0; + uint64_t succ_relocs = 0; + std::shared_ptr newfile = nullptr; +}; + /** * The implementation class for BlobDB. This manages the value * part in TTL aware sequentially written files. These files are @@ -147,6 +158,9 @@ class BlobDBImpl : public BlobDB { friend class BlobDBIterator; public: + static constexpr uint64_t kNoExpiration = + std::numeric_limits::max(); + using rocksdb::StackableDB::Put; Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) override; @@ -200,12 +214,16 @@ class BlobDBImpl : public BlobDB { #ifndef NDEBUG Status TEST_GetSequenceNumber(const Slice& key, SequenceNumber* sequence); + + std::vector> TEST_GetBlobFiles() const; + + void TEST_CloseBlobFile(std::shared_ptr& bfile); + + Status TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, + GCStats* gc_stats); #endif // !NDEBUG private: - static bool ExtractTTLFromBlob(const Slice& value, Slice* newval, - int32_t* ttl_val); - Status OpenPhase1(); Status CommonGet(const ColumnFamilyData* cfd, const Slice& key, @@ -237,6 +255,9 @@ class BlobDBImpl : public BlobDB { // appends a task into timer queue to close the file void CloseIf(const std::shared_ptr& bfile); + int32_t ExtractExpiration(const Slice& key, const Slice& value, + Slice* value_slice, std::string* new_value); + Status AppendBlob(const std::shared_ptr& bfile, const std::string& headerbuf, const Slice& key, const Slice& value, std::string* index_entry); @@ -346,11 +367,12 @@ class BlobDBImpl : public BlobDB { std::vector>* to_process, uint64_t epoch, uint64_t last_id, size_t files_to_collect); - private: + uint64_t EpochNow() { return env_->NowMicros() / 1000000; } + // the base DB DBImpl* db_impl_; - - Env* myenv_; + Env* env_; + TTLExtractor* ttl_extractor_; // Optimistic Transaction DB used during Garbage collection // for atomicity diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 13ad7a2fa..6a43f6b77 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -25,7 +25,22 @@ class BlobDBTest : public testing::Test { public: const int kMaxBlobSize = 1 << 14; - BlobDBTest() : dbname_(test::TmpDir() + "/blob_db_test"), blob_db_(nullptr) { + class MockEnv : public EnvWrapper { + public: + MockEnv() : EnvWrapper(Env::Default()) {} + + void set_now_micros(uint64_t now_micros) { now_micros_ = now_micros; } + + uint64_t NowMicros() override { return now_micros_; } + + private: + uint64_t now_micros_ = 0; + }; + + BlobDBTest() + : dbname_(test::TmpDir() + "/blob_db_test"), + mock_env_(new MockEnv()), + blob_db_(nullptr) { Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions()); assert(s.ok()); } @@ -59,9 +74,25 @@ class BlobDBTest : public testing::Test { } } + void PutRandomUntil(const std::string &key, int32_t expiration, Random *rnd, + std::map *data = nullptr) { + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = test::RandomHumanReadableString(rnd, len); + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value), + expiration)); + if (data != nullptr) { + (*data)[key] = value; + } + } + void PutRandom(const std::string &key, Random *rnd, std::map *data = nullptr) { - PutRandomWithTTL(key, -1, rnd, data); + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = test::RandomHumanReadableString(rnd, len); + ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value))); + if (data != nullptr) { + (*data)[key] = value; + } } void PutRandomToWriteBatch( @@ -115,6 +146,8 @@ class BlobDBTest : public testing::Test { } const std::string dbname_; + std::unique_ptr mock_env_; + std::shared_ptr ttl_extractor_; BlobDB *blob_db_; }; // class BlobDBTest @@ -130,6 +163,245 @@ TEST_F(BlobDBTest, Put) { VerifyDB(data); } +TEST_F(BlobDBTest, PutWithTTL) { + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptionsImpl bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + std::map data; + mock_env_->set_now_micros(50 * 1000000); + for (size_t i = 0; i < 100; i++) { + int32_t ttl = rnd.Next() % 100; + PutRandomWithTTL("key" + ToString(i), ttl, &rnd, + (ttl < 50 ? nullptr : &data)); + } + mock_env_->set_now_micros(100 * 1000000); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + bdb_impl->TEST_CloseBlobFile(blob_files[0]); + GCStats gc_stats; + ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); + ASSERT_EQ(data.size(), gc_stats.num_relocs); + VerifyDB(data); +} + +TEST_F(BlobDBTest, PutUntil) { + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptionsImpl bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + std::map data; + mock_env_->set_now_micros(50 * 1000000); + for (size_t i = 0; i < 100; i++) { + int32_t expiration = rnd.Next() % 100 + 50; + PutRandomUntil("key" + ToString(i), expiration, &rnd, + (expiration < 100 ? nullptr : &data)); + } + mock_env_->set_now_micros(100 * 1000000); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + bdb_impl->TEST_CloseBlobFile(blob_files[0]); + GCStats gc_stats; + ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); + ASSERT_EQ(data.size(), gc_stats.num_relocs); + VerifyDB(data); +} + +TEST_F(BlobDBTest, TTLExtrator_NoTTL) { + // The default ttl extractor return no ttl for every key. + ttl_extractor_.reset(new TTLExtractor()); + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptionsImpl bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.num_concurrent_simple_blobs = 1; + bdb_options.ttl_extractor = ttl_extractor_; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + std::map data; + mock_env_->set_now_micros(0); + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + ToString(i), &rnd, &data); + } + // very far in the future.. + mock_env_->set_now_micros(std::numeric_limits::max() - 10); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_FALSE(blob_files[0]->HasTTL()); + bdb_impl->TEST_CloseBlobFile(blob_files[0]); + GCStats gc_stats; + ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(0, gc_stats.num_deletes); + ASSERT_EQ(100, gc_stats.num_relocs); + VerifyDB(data); +} + +TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) { + Random rnd(301); + class TestTTLExtractor : public TTLExtractor { + public: + explicit TestTTLExtractor(Random *r) : rnd(r) {} + + virtual bool ExtractTTL(const Slice &key, const Slice &value, uint64_t *ttl, + std::string * /*new_value*/, + bool * /*value_changed*/) override { + *ttl = rnd->Next() % 100; + if (*ttl >= 50) { + data[key.ToString()] = value.ToString(); + } + return true; + } + + Random *rnd; + std::map data; + }; + ttl_extractor_.reset(new TestTTLExtractor(&rnd)); + Options options; + options.env = mock_env_.get(); + BlobDBOptionsImpl bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.ttl_extractor = ttl_extractor_; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + mock_env_->set_now_micros(50 * 1000000); + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + ToString(i), &rnd); + } + mock_env_->set_now_micros(100 * 1000000); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + bdb_impl->TEST_CloseBlobFile(blob_files[0]); + GCStats gc_stats; + ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + auto &data = static_cast(ttl_extractor_.get())->data; + ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); + ASSERT_EQ(data.size(), gc_stats.num_relocs); + VerifyDB(data); +} + +TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) { + Random rnd(301); + class TestTTLExtractor : public TTLExtractor { + public: + explicit TestTTLExtractor(Random *r) : rnd(r) {} + + virtual bool ExtractExpiration(const Slice &key, const Slice &value, + uint64_t /*now*/, uint64_t *expiration, + std::string * /*new_value*/, + bool * /*value_changed*/) override { + *expiration = rnd->Next() % 100 + 50; + if (*expiration >= 100) { + data[key.ToString()] = value.ToString(); + } + return true; + } + + Random *rnd; + std::map data; + }; + ttl_extractor_.reset(new TestTTLExtractor(&rnd)); + Options options; + options.env = mock_env_.get(); + BlobDBOptionsImpl bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.ttl_extractor = ttl_extractor_; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + mock_env_->set_now_micros(50 * 1000000); + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + ToString(i), &rnd); + } + mock_env_->set_now_micros(100 * 1000000); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + bdb_impl->TEST_CloseBlobFile(blob_files[0]); + GCStats gc_stats; + ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + auto &data = static_cast(ttl_extractor_.get())->data; + ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); + ASSERT_EQ(data.size(), gc_stats.num_relocs); + VerifyDB(data); +} + +TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { + class TestTTLExtractor : public TTLExtractor { + public: + const Slice kTTLSuffix = Slice("ttl:"); + + bool ExtractTTL(const Slice & /*key*/, const Slice &value, uint64_t *ttl, + std::string *new_value, bool *value_changed) override { + if (value.size() < 12) { + return false; + } + const char *p = value.data() + value.size() - 12; + if (kTTLSuffix != Slice(p, 4)) { + return false; + } + *ttl = DecodeFixed64(p + 4); + *new_value = Slice(value.data(), value.size() - 12).ToString(); + *value_changed = true; + return true; + } + }; + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptionsImpl bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.ttl_extractor = std::make_shared(); + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + std::map data; + mock_env_->set_now_micros(50 * 1000000); + for (size_t i = 0; i < 100; i++) { + int len = rnd.Next() % kMaxBlobSize + 1; + std::string key = "key" + ToString(i); + std::string value = test::RandomHumanReadableString(&rnd, len); + uint64_t ttl = rnd.Next() % 100; + std::string value_ttl = value + "ttl:"; + PutFixed64(&value_ttl, ttl); + ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value_ttl))); + if (ttl >= 50) { + data[key] = value; + } + } + mock_env_->set_now_micros(100 * 1000000); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + bdb_impl->TEST_CloseBlobFile(blob_files[0]); + GCStats gc_stats; + ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); + ASSERT_EQ(data.size(), gc_stats.num_relocs); + VerifyDB(data); +} + TEST_F(BlobDBTest, StackableDBGet) { Random rnd(301); BlobDBOptionsImpl bdb_options; diff --git a/utilities/blob_db/blob_log_format.h b/utilities/blob_db/blob_log_format.h index b56cf205c..f4e62fe2d 100644 --- a/utilities/blob_db/blob_log_format.h +++ b/utilities/blob_db/blob_log_format.h @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -229,6 +230,10 @@ class BlobLogRecord { uint64_t GetBlobSize() const { return blob_size_; } + bool HasTTL() const { + return ttl_val_ != std::numeric_limits::max(); + } + uint32_t GetTTL() const { return ttl_val_; } uint64_t GetTimeVal() const { return time_val_; } diff --git a/utilities/blob_db/ttl_extractor.cc b/utilities/blob_db/ttl_extractor.cc new file mode 100644 index 000000000..735b2f30f --- /dev/null +++ b/utilities/blob_db/ttl_extractor.cc @@ -0,0 +1,31 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#include "ttl_extractor.h" + +#include "util/coding.h" + +namespace rocksdb { +namespace blob_db { + +bool TTLExtractor::ExtractTTL(const Slice& /*key*/, const Slice& /*value*/, + uint64_t* /*ttl*/, std::string* /*new_value*/, + bool* /*value_changed*/) { + return false; +} + +bool TTLExtractor::ExtractExpiration(const Slice& key, const Slice& value, + uint64_t now, uint64_t* expiration, + std::string* new_value, + bool* value_changed) { + uint64_t ttl; + bool has_ttl = ExtractTTL(key, value, &ttl, new_value, value_changed); + if (has_ttl) { + *expiration = now + ttl; + } + return has_ttl; +} + +} // namespace blob_db +} // namespace rocksdb diff --git a/utilities/blob_db/ttl_extractor.h b/utilities/blob_db/ttl_extractor.h new file mode 100644 index 000000000..51df94451 --- /dev/null +++ b/utilities/blob_db/ttl_extractor.h @@ -0,0 +1,43 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include +#include + +#include "rocksdb/slice.h" + +namespace rocksdb { +namespace blob_db { + +// TTLExtractor allow applications to extract TTL from key-value pairs. +// This useful for applications using Put or WriteBatch to write keys and +// don't intend to migrate to PutWithTTL or PutUntil. +// +// Applications can implement either ExtractTTL or ExtractExpiration. If both +// are implemented, ExtractExpiration will take precedence. +class TTLExtractor { + public: + // Extract TTL from key-value pair. + // Return true if the key has TTL, false otherwise. If key has TTL, + // TTL is pass back through ttl. The method can optionally modify the value, + // pass the result back through new_value, and also set value_changed to true. + virtual bool ExtractTTL(const Slice& key, const Slice& value, uint64_t* ttl, + std::string* new_value, bool* value_changed); + + // Extract expiration time from key-value pair. + // Return true if the key has expiration time, false otherwise. If key has + // expiration time, it is pass back through expiration. The method can + // optionally modify the value, pass the result back through new_value, + // and also set value_changed to true. + virtual bool ExtractExpiration(const Slice& key, const Slice& value, + uint64_t now, uint64_t* expiration, + std::string* new_value, bool* value_changed); + + virtual ~TTLExtractor() = default; +}; + +} // namespace blob_db +} // namespace rocksdb