rocksdb/utilities/blob_db/blob_db_test.cc
Levi Tamasi e8cb32ed67 Introduce BlobFileCache and add support for blob files to Get() (#7540)
Summary:
The patch adds blob file support to the `Get` API by extending `Version` so that
whenever a blob reference is read from a file, the blob is retrieved from the corresponding
blob file and passed back to the caller. (This is assuming the blob reference is valid
and the blob file is actually part of the given `Version`.) It also introduces a cache
of `BlobFileReader`s called `BlobFileCache` that enables sharing `BlobFileReader`s
between callers. `BlobFileCache` uses the same backing cache as `TableCache`, so
`max_open_files` (if specified) limits the total number of open (table + blob) files.

TODO: proactively open/cache blob files and pin the cache handles of the readers in the
metadata objects similarly to what `VersionBuilder::LoadTableHandlers` does for
table files.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7540

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D24260219

Pulled By: ltamasi

fbshipit-source-id: a8a2a4f11d3d04d6082201b52184bc4d7b0857ba
2020-10-15 13:04:47 -07:00

2387 lines
82 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db.h"
#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <iomanip>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
#include "db/blob/blob_index.h"
#include "db/db_test_util.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "file/sst_file_manager_impl.h"
#include "port/port.h"
#include "rocksdb/utilities/debug.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/cast_util.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/blob_db/blob_db_impl.h"
#include "utilities/fault_injection_env.h"
namespace ROCKSDB_NAMESPACE {
namespace blob_db {
// Test fixture shared by all BlobDB tests in this file.
//
// The fixture owns the BlobDB under test (`blob_db_`), plus a mock-time Env
// and a fault-injecting Env that a test can opt into through Options::env.
// It provides helpers to open/close the DB, write (random) data while
// recording the expected contents in a std::map, and verify the DB against
// such a map.
class BlobDBTest : public testing::Test {
 public:
  // Inclusive upper bound on the length of values written by the PutRandom*
  // helpers.
  const int kMaxBlobSize = 1 << 14;

  // Expected metadata of a single base-DB entry; compared against the actual
  // entries (and their decoded blob indexes) by VerifyBaseDBBlobIndex().
  struct BlobIndexVersion {
    BlobIndexVersion() = default;
    BlobIndexVersion(std::string _user_key, uint64_t _file_number,
                     uint64_t _expiration, SequenceNumber _sequence,
                     ValueType _type)
        : user_key(std::move(_user_key)),
          file_number(_file_number),
          expiration(_expiration),
          sequence(_sequence),
          type(_type) {}

    std::string user_key;
    uint64_t file_number = kInvalidBlobFileNumber;
    uint64_t expiration = kNoExpiration;
    SequenceNumber sequence = 0;
    ValueType type = kTypeValue;
  };

  // Sets up the envs and wipes any DB left behind under dbname_.
  BlobDBTest()
      : dbname_(test::PerThreadDBPath("blob_db_test")),
        mock_env_(new MockTimeEnv(Env::Default())),
        fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
        blob_db_(nullptr) {
    Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
    assert(s.ok());
  }

  // Clears sync point callbacks installed by the test and destroys the DB.
  ~BlobDBTest() override {
    SyncPoint::GetInstance()->ClearAllCallBacks();
    Destroy();
  }

  // Opens the BlobDB at dbname_ (creating it if missing) and returns the
  // resulting status instead of asserting on it.
  Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
                 Options options = Options()) {
    options.create_if_missing = true;
    if (options.env == mock_env_.get()) {
      // Need to disable stats dumping and persisting which also use
      // RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
      // With mocked time, this can hang on some platforms (MacOS)
      // because (a) on some platforms, pthread_cond_timedwait does not appear
      // to release the lock for other threads to operate if the deadline time
      // is already passed, and (b) TimedWait calls are currently a bad
      // abstraction because the deadline parameter is usually computed from
      // Env time, but is interpreted in real clock time.
      options.stats_dump_period_sec = 0;
      options.stats_persist_period_sec = 0;
    }
    return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
  }

  // Same as TryOpen(), but asserts that the open succeeds.
  void Open(BlobDBOptions bdb_options = BlobDBOptions(),
            Options options = Options()) {
    ASSERT_OK(TryOpen(bdb_options, options));
  }

  // Closes the currently open DB and opens it again with the given options.
  void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
              Options options = Options()) {
    assert(blob_db_ != nullptr);
    delete blob_db_;
    blob_db_ = nullptr;
    Open(bdb_options, options);
  }

  // Closes the currently open DB; the files on disk are left in place.
  void Close() {
    assert(blob_db_ != nullptr);
    delete blob_db_;
    blob_db_ = nullptr;
  }

  // Closes the DB (if one is open) and destroys it using the options it was
  // opened with. No-op when no DB is open.
  void Destroy() {
    if (blob_db_) {
      Options options = blob_db_->GetOptions();
      BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
      delete blob_db_;
      blob_db_ = nullptr;
      ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
    }
  }

  // Returns the DB under test as its implementation type. static_cast is
  // used, as in the rest of this file, since this is a plain downcast.
  BlobDBImpl *blob_db_impl() { return static_cast<BlobDBImpl *>(blob_db_); }

  // Writes key/value and, if `data` is given, records the pair in it
  // (regardless of the returned status).
  Status Put(const Slice &key, const Slice &value,
             std::map<std::string, std::string> *data = nullptr) {
    Status s = blob_db_->Put(WriteOptions(), key, value);
    if (data != nullptr) {
      (*data)[key.ToString()] = value.ToString();
    }
    return s;
  }

  // Deletes `key`, asserting success, and removes it from `data` if given.
  void Delete(const std::string &key,
              std::map<std::string, std::string> *data = nullptr) {
    ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
    if (data != nullptr) {
      data->erase(key);
    }
  }

  // Writes key/value with the given TTL and, if `data` is given, records the
  // pair in it (regardless of the returned status).
  Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
                    std::map<std::string, std::string> *data = nullptr) {
    Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
    if (data != nullptr) {
      (*data)[key.ToString()] = value.ToString();
    }
    return s;
  }

  // Writes key/value with the given absolute expiration.
  Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
    return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
  }

  // Writes `key` with the given TTL and a value from
  // rnd->HumanReadableString() whose length is in [1, kMaxBlobSize];
  // asserts success and records the pair in `data` if given.
  void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
                        std::map<std::string, std::string> *data = nullptr) {
    int len = rnd->Next() % kMaxBlobSize + 1;
    std::string value = rnd->HumanReadableString(len);
    ASSERT_OK(
        blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
    if (data != nullptr) {
      (*data)[key] = value;
    }
  }

  // Like PutRandomWithTTL(), but with an absolute expiration.
  void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
                      std::map<std::string, std::string> *data = nullptr) {
    int len = rnd->Next() % kMaxBlobSize + 1;
    std::string value = rnd->HumanReadableString(len);
    ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
                                 expiration));
    if (data != nullptr) {
      (*data)[key] = value;
    }
  }

  // Writes `key` with a random value (no TTL) to the DB under test.
  void PutRandom(const std::string &key, Random *rnd,
                 std::map<std::string, std::string> *data = nullptr) {
    PutRandom(blob_db_, key, rnd, data);
  }

  // Writes `key` with a random value (no TTL) to `db`, asserting success, and
  // records the pair in `data` if given.
  void PutRandom(DB *db, const std::string &key, Random *rnd,
                 std::map<std::string, std::string> *data = nullptr) {
    int len = rnd->Next() % kMaxBlobSize + 1;
    std::string value = rnd->HumanReadableString(len);
    ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
    if (data != nullptr) {
      (*data)[key] = value;
    }
  }

  // Adds `key` with a random value to `batch` (the batch is not written
  // here) and records the pair in `data` if given.
  void PutRandomToWriteBatch(
      const std::string &key, Random *rnd, WriteBatch *batch,
      std::map<std::string, std::string> *data = nullptr) {
    int len = rnd->Next() % kMaxBlobSize + 1;
    std::string value = rnd->HumanReadableString(len);
    ASSERT_OK(batch->Put(key, value));
    if (data != nullptr) {
      (*data)[key] = value;
    }
  }

  // Verify blob db contain expected data and nothing more.
  void VerifyDB(const std::map<std::string, std::string> &data) {
    VerifyDB(blob_db_, data);
  }

  // Verifies that `db` contains exactly `data`: every key is readable through
  // both Get() overloads, and a full iterator scan yields the same entries in
  // the same order and nothing else.
  void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
    // Verify normal Get
    auto *cfh = db->DefaultColumnFamily();
    for (auto &p : data) {
      PinnableSlice value_slice;
      ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
      ASSERT_EQ(p.second, value_slice.ToString());
      std::string value;
      ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
      ASSERT_EQ(p.second, value);
    }

    // Verify iterators. The iterator is held by a unique_ptr so that it is
    // released even when a failing ASSERT_* returns from this function early.
    std::unique_ptr<Iterator> iter(db->NewIterator(ReadOptions()));
    iter->SeekToFirst();
    for (auto &p : data) {
      ASSERT_TRUE(iter->Valid());
      ASSERT_EQ(p.first, iter->key().ToString());
      ASSERT_EQ(p.second, iter->value().ToString());
      iter->Next();
    }
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  }

  // Verifies the raw key versions stored in the base DB against
  // `expected_versions`. Entries of type kTypeValue are compared directly;
  // all other entries must be blob indexes, which are resolved to their blob
  // value before the comparison.
  void VerifyBaseDB(
      const std::map<std::string, KeyVersion> &expected_versions) {
    auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
    DB *db = blob_db_->GetRootDB();
    const size_t kMaxKeys = 10000;
    std::vector<KeyVersion> versions;
    // Check the status, as VerifyBaseDBBlobIndex() does, so that a failed
    // scan is not mistaken for a size mismatch.
    ASSERT_OK(GetAllKeyVersions(db, "", "", kMaxKeys, &versions));
    ASSERT_EQ(expected_versions.size(), versions.size());
    size_t i = 0;
    for (auto &key_version : expected_versions) {
      const KeyVersion &expected_version = key_version.second;
      ASSERT_EQ(expected_version.user_key, versions[i].user_key);
      ASSERT_EQ(expected_version.sequence, versions[i].sequence);
      ASSERT_EQ(expected_version.type, versions[i].type);
      if (versions[i].type == kTypeValue) {
        ASSERT_EQ(expected_version.value, versions[i].value);
      } else {
        ASSERT_EQ(kTypeBlobIndex, versions[i].type);
        PinnableSlice value;
        ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
                                              versions[i].value, &value));
        ASSERT_EQ(expected_version.value, value.ToString());
      }
      i++;
    }
  }

  // Verifies the raw key versions stored in the base DB against
  // `expected_versions`, comparing user key, sequence number and type, and,
  // for blob indexes, the blob file number and the expiration.
  void VerifyBaseDBBlobIndex(
      const std::map<std::string, BlobIndexVersion> &expected_versions) {
    const size_t kMaxKeys = 10000;
    std::vector<KeyVersion> versions;
    ASSERT_OK(
        GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
    ASSERT_EQ(versions.size(), expected_versions.size());

    size_t i = 0;
    for (const auto &expected_pair : expected_versions) {
      const BlobIndexVersion &expected_version = expected_pair.second;

      ASSERT_EQ(versions[i].user_key, expected_version.user_key);
      ASSERT_EQ(versions[i].sequence, expected_version.sequence);
      ASSERT_EQ(versions[i].type, expected_version.type);
      if (versions[i].type != kTypeBlobIndex) {
        // Non-blob entries must not carry a file number or an expiration in
        // the expectation.
        ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
        ASSERT_EQ(kNoExpiration, expected_version.expiration);

        ++i;
        continue;
      }

      BlobIndex blob_index;
      ASSERT_OK(blob_index.DecodeFrom(versions[i].value));

      // Inlined blob indexes are compared against kInvalidBlobFileNumber.
      const uint64_t file_number = !blob_index.IsInlined()
                                       ? blob_index.file_number()
                                       : kInvalidBlobFileNumber;
      ASSERT_EQ(file_number, expected_version.file_number);

      const uint64_t expiration =
          blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
      ASSERT_EQ(expiration, expected_version.expiration);

      ++i;
    }
  }

  // Writes 100000 random values with random TTLs (below 86400) over the 500
  // keys "key0".."key499", then deletes "key0".."key9".
  void InsertBlobs() {
    Random rnd(301);
    for (size_t i = 0; i < 100000; i++) {
      uint64_t ttl = rnd.Next() % 86400;
      PutRandomWithTTL("key" + ToString(i % 500), ttl, &rnd, nullptr);
    }

    for (size_t i = 0; i < 10; i++) {
      Delete("key" + ToString(i % 500));
    }
  }

  const std::string dbname_;
  std::unique_ptr<MockTimeEnv> mock_env_;
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
  BlobDB *blob_db_;
};  // class BlobDBTest
// Writes 100 random values with Put() and checks that the DB contains exactly
// those entries.
TEST_F(BlobDBTest, Put) {
  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.disable_background_tasks = true;
  Open(blob_options);

  Random rng(301);
  std::map<std::string, std::string> expected;
  for (size_t n = 0; n < 100; ++n) {
    PutRandom("key" + ToString(n), &rng, &expected);
  }

  VerifyDB(expected);
}
// Writes 100 keys with random TTLs at mock time 50, advances the clock to 100
// and expects only the keys whose TTL exceeds 50 to remain visible.
TEST_F(BlobDBTest, PutWithTTL) {
  Options db_options;
  db_options.env = mock_env_.get();

  BlobDBOptions blob_options;
  blob_options.ttl_range_secs = 1000;
  blob_options.min_blob_size = 0;
  blob_options.blob_file_size = 256 * 1000 * 1000;
  blob_options.disable_background_tasks = true;
  Open(blob_options, db_options);

  Random rng(301);
  std::map<std::string, std::string> expected;
  mock_env_->set_current_time(50);
  for (size_t n = 0; n < 100; ++n) {
    const uint64_t ttl = rng.Next() % 100;
    // Only keys that outlive the later clock value of 100 are recorded.
    std::map<std::string, std::string> *tracked = nullptr;
    if (ttl > 50) {
      tracked = &expected;
    }
    PutRandomWithTTL("key" + ToString(n), ttl, &rng, tracked);
  }
  mock_env_->set_current_time(100);

  // All writes must have landed in one TTL blob file; close it before
  // verifying.
  auto *impl = static_cast<BlobDBImpl *>(blob_db_);
  auto files = impl->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_TRUE(files[0]->HasTTL());
  ASSERT_OK(impl->TEST_CloseBlobFile(files[0]));

  VerifyDB(expected);
}
// Writes 100 keys with random absolute expirations in [50, 150) at mock time
// 50, advances the clock to 100 and expects only the keys expiring after 100
// to remain visible.
TEST_F(BlobDBTest, PutUntil) {
  Options db_options;
  db_options.env = mock_env_.get();

  BlobDBOptions blob_options;
  blob_options.ttl_range_secs = 1000;
  blob_options.min_blob_size = 0;
  blob_options.blob_file_size = 256 * 1000 * 1000;
  blob_options.disable_background_tasks = true;
  Open(blob_options, db_options);

  Random rng(301);
  std::map<std::string, std::string> expected;
  mock_env_->set_current_time(50);
  for (size_t n = 0; n < 100; ++n) {
    const uint64_t expiration = rng.Next() % 100 + 50;
    // Only keys that outlive the later clock value of 100 are recorded.
    std::map<std::string, std::string> *tracked = nullptr;
    if (expiration > 100) {
      tracked = &expected;
    }
    PutRandomUntil("key" + ToString(n), expiration, &rng, tracked);
  }
  mock_env_->set_current_time(100);

  // All writes must have landed in one TTL blob file; close it before
  // verifying.
  auto *impl = static_cast<BlobDBImpl *>(blob_db_);
  auto files = impl->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_TRUE(files[0]->HasTTL());
  ASSERT_OK(impl->TEST_CloseBlobFile(files[0]));

  VerifyDB(expected);
}
// Reads values back through the StackableDB interface and checks that the
// PinnableSlice and std::string overloads of Get() agree with each other and
// with what was written.
TEST_F(BlobDBTest, StackableDBGet) {
  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.disable_background_tasks = true;
  Open(blob_options);

  Random rng(301);
  std::map<std::string, std::string> written;
  for (size_t n = 0; n < 100; ++n) {
    PutRandom("key" + ToString(n), &rng, &written);
  }

  for (size_t n = 0; n < 100; ++n) {
    StackableDB *stackable = blob_db_;
    ColumnFamilyHandle *cf = stackable->DefaultColumnFamily();
    const std::string lookup_key = "key" + ToString(n);

    PinnableSlice pinned;
    ASSERT_OK(stackable->Get(ReadOptions(), cf, lookup_key, &pinned));

    std::string copied;
    ASSERT_OK(stackable->Get(ReadOptions(), cf, lookup_key, &copied));

    ASSERT_EQ(copied, pinned.ToString());
    ASSERT_EQ(copied, written[lookup_key]);
  }
}
// Checks the expiration reported by Get(): kNoExpiration for a key written
// with Put(), and write time + TTL for a key written with PutWithTTL().
TEST_F(BlobDBTest, GetExpiration) {
  Options options;
  options.env = mock_env_.get();
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  mock_env_->set_current_time(100);
  Open(bdb_options, options);
  // Check the write statuses so that a failed write is reported here rather
  // than as a confusing read failure below.
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(PutWithTTL("key2", "value2", 200));
  PinnableSlice value;
  uint64_t expiration;
  ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
  ASSERT_EQ("value1", value.ToString());
  ASSERT_EQ(kNoExpiration, expiration);
  ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
  ASSERT_EQ("value2", value.ToString());
  ASSERT_EQ(300 /* = 100 + 200 */, expiration);
}
// Checks that Get() surfaces an IOError when the file system is deactivated
// after the value has been written.
TEST_F(BlobDBTest, GetIOError) {
  Options db_options;
  db_options.env = fault_injection_env_.get();

  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;  // Make sure value write to blob file
  blob_options.disable_background_tasks = true;
  Open(blob_options, db_options);

  ColumnFamilyHandle *cf = blob_db_->DefaultColumnFamily();
  ASSERT_OK(Put("foo", "bar"));

  // Deactivate the file system, then read.
  fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  PinnableSlice result;
  const Status read_status = blob_db_->Get(ReadOptions(), cf, "foo", &result);
  ASSERT_TRUE(read_status.IsIOError());

  // Reactivate file system to allow test to close DB.
  fault_injection_env_->SetFilesystemActive(true);
}
// Checks that Put() returns an IOError while the file system is deactivated
// and succeeds again once it has been reactivated.
TEST_F(BlobDBTest, PutIOError) {
  Options db_options;
  db_options.env = fault_injection_env_.get();

  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;  // Make sure value write to blob file
  blob_options.disable_background_tasks = true;
  Open(blob_options, db_options);

  fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  const Status failed_write = Put("foo", "v1");
  ASSERT_TRUE(failed_write.IsIOError());

  fault_injection_env_->SetFilesystemActive(true, Status::IOError());
  ASSERT_OK(Put("bar", "v1"));
}
// Writes 100 batches of 10 random key/value pairs each through Write() and
// checks that the DB contains exactly those entries.
TEST_F(BlobDBTest, WriteBatch) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 100; i++) {
    WriteBatch batch;
    for (size_t j = 0; j < 10; j++) {
      PutRandomToWriteBatch("key" + ToString(j * 100 + i), &rnd, &batch, &data);
    }
    // A failed batch write must fail the test at this point instead of
    // surfacing later as a verification mismatch.
    ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  }
  VerifyDB(data);
}
// Writes 100 keys, deletes every fifth one and checks that the DB contains
// exactly the remaining entries.
TEST_F(BlobDBTest, Delete) {
  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.disable_background_tasks = true;
  Open(blob_options);

  Random rng(301);
  std::map<std::string, std::string> expected;
  for (size_t n = 0; n < 100; ++n) {
    PutRandom("key" + ToString(n), &rng, &expected);
  }

  // Delete() also erases the key from the expectation map.
  for (size_t n = 0; n < 100; n += 5) {
    Delete("key" + ToString(n), &expected);
  }

  VerifyDB(expected);
}
// Writes 100 keys, deletes all of them with a single WriteBatch and checks
// that the DB is empty afterwards.
TEST_F(BlobDBTest, DeleteBatch) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  for (size_t i = 0; i < 100; i++) {
    PutRandom("key" + ToString(i), &rnd);
  }
  WriteBatch batch;
  for (size_t i = 0; i < 100; i++) {
    // WriteBatch::Delete() returns a Status; check it so that a malformed
    // batch is detected when it is built.
    ASSERT_OK(batch.Delete("key" + ToString(i)));
  }
  ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  // DB should be empty.
  VerifyDB({});
}
// Writes 10000 keys, overwrites each of them with a new random value and
// checks that only the second round of values is visible.
TEST_F(BlobDBTest, Override) {
  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.disable_background_tasks = true;
  Open(blob_options);

  Random rng(301);
  std::map<std::string, std::string> expected;

  // First round: the values are about to be replaced, so do not record them.
  for (int n = 0; n < 10000; ++n) {
    PutRandom("key" + ToString(n), &rng, nullptr);
  }

  // override all the keys
  for (int n = 0; n < 10000; ++n) {
    PutRandom("key" + ToString(n), &rng, &expected);
  }

  VerifyDB(expected);
}
#ifdef SNAPPY
// Writes data through both Put() and Write() with Snappy compression enabled
// and checks that everything can be read back.
TEST_F(BlobDBTest, Compression) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  bdb_options.compression = CompressionType::kSnappyCompression;
  Open(bdb_options);
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 100; i++) {
    PutRandom("put-key" + ToString(i), &rnd, &data);
  }
  for (int i = 0; i < 100; i++) {
    WriteBatch batch;
    for (size_t j = 0; j < 10; j++) {
      PutRandomToWriteBatch("write-batch-key" + ToString(j * 100 + i), &rnd,
                            &batch, &data);
    }
    // A failed batch write must fail the test at this point instead of
    // surfacing later as a verification mismatch.
    ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  }
  VerifyDB(data);
}
// Writes data with Snappy compression, reopens the DB with compression
// disabled and checks that the previously written data is still readable.
TEST_F(BlobDBTest, DecompressAfterReopen) {
  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.disable_background_tasks = true;
  blob_options.compression = CompressionType::kSnappyCompression;
  Open(blob_options);

  Random rng(301);
  std::map<std::string, std::string> expected;
  for (size_t n = 0; n < 100; ++n) {
    PutRandom("put-key" + ToString(n), &rng, &expected);
  }
  VerifyDB(expected);

  blob_options.compression = CompressionType::kNoCompression;
  Reopen(blob_options);
  VerifyDB(expected);
}
// Toggles Snappy compression off and on again across reopens. After each
// switch and a full compaction, every remaining blob file is expected to use
// the compression type that is currently configured.
TEST_F(BlobDBTest, EnableDisableCompressionGC) {
  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.enable_garbage_collection = true;
  blob_options.garbage_collection_cutoff = 1.0;
  blob_options.disable_background_tasks = true;
  blob_options.compression = kSnappyCompression;
  Open(blob_options);

  Random rng(301);
  std::map<std::string, std::string> expected;
  size_t next_key = 0;

  // Phase 1: Snappy.
  while (next_key < 100) {
    PutRandom("put-key" + ToString(next_key), &rng, &expected);
    next_key++;
  }
  VerifyDB(expected);
  auto files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_EQ(kSnappyCompression, files[0]->GetCompressionType());

  // disable compression
  blob_options.compression = kNoCompression;
  Reopen(blob_options);

  // Add more data with new compression type
  while (next_key < 200) {
    PutRandom("put-key" + ToString(next_key), &rng, &expected);
    next_key++;
  }
  VerifyDB(expected);
  files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, files.size());
  ASSERT_EQ(kNoCompression, files[1]->GetCompressionType());

  // Trigger compaction
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  VerifyDB(expected);
  files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &file : files) {
    ASSERT_EQ(kNoCompression, file->GetCompressionType());
  }

  // enabling the compression again
  blob_options.compression = kSnappyCompression;
  Reopen(blob_options);

  // Add more data with new compression type
  while (next_key < 300) {
    PutRandom("put-key" + ToString(next_key), &rng, &expected);
    next_key++;
  }
  VerifyDB(expected);

  // Trigger compaction
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  VerifyDB(expected);
  files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &file : files) {
    ASSERT_EQ(kSnappyCompression, file->GetCompressionType());
  }
}
#ifdef LZ4
// Switches between compression types across reopens and runs GC through a
// full compaction. Requires both Snappy and LZ4 support. After each
// compaction, every remaining blob file is expected to use the compression
// type that is currently configured.
TEST_F(BlobDBTest, ChangeCompressionGC) {
  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.enable_garbage_collection = true;
  blob_options.garbage_collection_cutoff = 1.0;
  blob_options.disable_background_tasks = true;
  blob_options.compression = kLZ4Compression;
  Open(blob_options);

  Random rng(301);
  std::map<std::string, std::string> expected;
  size_t next_key = 0;

  // Phase 1: LZ4.
  while (next_key < 100) {
    PutRandom("put-key" + ToString(next_key), &rng, &expected);
    next_key++;
  }
  VerifyDB(expected);
  auto files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_EQ(kLZ4Compression, files[0]->GetCompressionType());

  // Change compression type
  blob_options.compression = kSnappyCompression;
  Reopen(blob_options);

  // Add more data with Snappy compression type
  while (next_key < 200) {
    PutRandom("put-key" + ToString(next_key), &rng, &expected);
    next_key++;
  }
  VerifyDB(expected);

  // Verify blob file compression type
  files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, files.size());
  ASSERT_EQ(kSnappyCompression, files[1]->GetCompressionType());

  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyDB(expected);

  blob_db_impl()->TEST_DeleteObsoleteFiles();
  files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &file : files) {
    ASSERT_EQ(kSnappyCompression, file->GetCompressionType());
  }

  // Disable compression
  blob_options.compression = kNoCompression;
  Reopen(blob_options);
  while (next_key < 300) {
    PutRandom("put-key" + ToString(next_key), &rng, &expected);
    next_key++;
  }
  VerifyDB(expected);

  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyDB(expected);

  blob_db_impl()->TEST_DeleteObsoleteFiles();
  files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &file : files) {
    ASSERT_EQ(kNoCompression, file->GetCompressionType());
  }

  // switching different compression types to generate mixed compression types
  blob_options.compression = kSnappyCompression;
  Reopen(blob_options);
  while (next_key < 400) {
    PutRandom("put-key" + ToString(next_key), &rng, &expected);
    next_key++;
  }
  VerifyDB(expected);

  blob_options.compression = kLZ4Compression;
  Reopen(blob_options);
  while (next_key < 500) {
    PutRandom("put-key" + ToString(next_key), &rng, &expected);
    next_key++;
  }
  VerifyDB(expected);

  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyDB(expected);

  blob_db_impl()->TEST_DeleteObsoleteFiles();
  files = blob_db_impl()->TEST_GetBlobFiles();
  for (const auto &file : files) {
    ASSERT_EQ(kLZ4Compression, file->GetCompressionType());
  }
}
#endif // LZ4
#endif // SNAPPY
// Runs 10 concurrent writer threads, each writing 100 keys of its own: the
// first five threads use Put(), the other five use single-entry WriteBatches.
// Afterwards the DB must contain exactly the union of what was written.
TEST_F(BlobDBTest, MultipleWriters) {
  Open(BlobDBOptions());

  std::vector<port::Thread> workers;
  // One expectation map per thread, so the writers share no mutable state.
  std::vector<std::map<std::string, std::string>> data_set(10);
  for (uint32_t i = 0; i < 10; i++) {
    workers.push_back(port::Thread(
        [&](uint32_t id) {
          Random rnd(301 + id);
          for (int j = 0; j < 100; j++) {
            std::string key = "key" + ToString(id) + "_" + ToString(j);
            if (id < 5) {
              PutRandom(key, &rnd, &data_set[id]);
            } else {
              WriteBatch batch;
              PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
              // Check the write status so that a failed batch write fails
              // the test here instead of showing up as a verification
              // mismatch later.
              ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
            }
          }
        },
        i));
  }

  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 10; i++) {
    workers[i].join();
    data.insert(data_set[i].begin(), data_set[i].end());
  }
  VerifyDB(data);
}
// Checks that blob file deletions go through the SstFileManager's delete
// scheduler, both when an obsolete blob file is garbage collected and when
// the whole DB is destroyed.
TEST_F(BlobDBTest, SstFileManager) {
  std::shared_ptr<SstFileManager> sst_file_manager(
      NewSstFileManager(mock_env_.get()));
  sst_file_manager->SetDeleteRateBytesPerSecond(1);
  SstFileManagerImpl *sfm =
      static_cast<SstFileManagerImpl *>(sst_file_manager.get());

  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 1.0;
  Options db_options;

  // Count the ".blob" files handed to the delete scheduler.
  int files_scheduled_to_delete = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
        assert(arg);
        const std::string *const file_path =
            static_cast<const std::string *>(arg);
        if (file_path->find(".blob") != std::string::npos) {
          ++files_scheduled_to_delete;
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  db_options.sst_file_manager = sst_file_manager;

  Open(bdb_options, db_options);

  // Create one obsolete file and clean it.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  std::shared_ptr<BlobFile> bfile = blob_files[0];
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_impl()->TEST_DeleteObsoleteFiles();

  // Even if SSTFileManager is not set, DB is creating a dummy one.
  ASSERT_EQ(1, files_scheduled_to_delete);
  Destroy();
  // Make sure that DestroyBlobDB() also goes through delete scheduler.
  ASSERT_EQ(2, files_scheduled_to_delete);
  SyncPoint::GetInstance()->DisableProcessing();
  sfm->WaitForEmptyTrash();
}
// Checks that reopening the DB rescans the blob directory for leftover
// "*.blob.trash" files and hands them to the SstFileManager's delete
// scheduler.
TEST_F(BlobDBTest, SstFileManagerRestart) {
  // Count the ".blob" files handed to the delete scheduler.
  int files_scheduled_to_delete = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
        assert(arg);
        const std::string *const file_path =
            static_cast<const std::string *>(arg);
        if (file_path->find(".blob") != std::string::npos) {
          ++files_scheduled_to_delete;
        }
      });

  std::shared_ptr<SstFileManager> sst_file_manager(
      NewSstFileManager(mock_env_.get()));
  sst_file_manager->SetDeleteRateBytesPerSecond(1);
  SstFileManagerImpl *sfm =
      static_cast<SstFileManagerImpl *>(sst_file_manager.get());

  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  Options db_options;

  SyncPoint::GetInstance()->EnableProcessing();
  db_options.sst_file_manager = sst_file_manager;

  Open(bdb_options, db_options);
  std::string blob_dir = blob_db_impl()->TEST_blob_dir();
  ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  Close();

  // Create 3 dummy trash files under the blob_dir. Check that each file is
  // actually created; otherwise the counts asserted below would be
  // meaningless.
  LegacyFileSystemWrapper fs(db_options.env);
  ASSERT_OK(CreateFile(&fs, blob_dir + "/000666.blob.trash", "", false));
  ASSERT_OK(CreateFile(&fs, blob_dir + "/000888.blob.trash", "", true));
  ASSERT_OK(CreateFile(&fs, blob_dir + "/something_not_match.trash", "", false));

  // Make sure that reopening the DB rescan the existing trash files
  Open(bdb_options, db_options);
  ASSERT_EQ(files_scheduled_to_delete, 2);

  sfm->WaitForEmptyTrash();

  // There should be exact one file under the blob dir now.
  std::vector<std::string> all_files;
  ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
  int nfiles = 0;
  for (const auto &f : all_files) {
    assert(!f.empty());
    // Skip entries whose name starts with '.'.
    if (f[0] == '.') {
      continue;
    }
    nfiles++;
  }
  ASSERT_EQ(nfiles, 1);

  SyncPoint::GetInstance()->DisableProcessing();
}
// Checks that an obsolete blob file is only deleted once no snapshot can
// still see its data. The scenario is repeated four times, taking the
// snapshot at a different point of the write sequence each time.
TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.enable_garbage_collection = true;
  blob_options.garbage_collection_cutoff = 1.0;
  blob_options.disable_background_tasks = true;

  // snapshot_at = when to take snapshot
  for (int snapshot_at = 0; snapshot_at < 4; ++snapshot_at) {
    Destroy();
    Open(blob_options);

    const Snapshot *snap = nullptr;

    // First file
    ASSERT_OK(Put("key1", "value"));
    if (snapshot_at == 0) {
      snap = blob_db_->GetSnapshot();
    }

    auto files = blob_db_impl()->TEST_GetBlobFiles();
    ASSERT_EQ(1, files.size());
    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(files[0]));

    // Second file
    ASSERT_OK(Put("key2", "value"));
    if (snapshot_at == 1) {
      snap = blob_db_->GetSnapshot();
    }

    files = blob_db_impl()->TEST_GetBlobFiles();
    ASSERT_EQ(2, files.size());
    auto second_file = files[1];
    ASSERT_FALSE(second_file->Immutable());
    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(second_file));

    // Third file
    ASSERT_OK(Put("key3", "value"));
    if (snapshot_at == 2) {
      snap = blob_db_->GetSnapshot();
    }

    ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
    ASSERT_TRUE(second_file->Obsolete());
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
              second_file->GetObsoleteSequence());

    Delete("key2");
    if (snapshot_at == 3) {
      snap = blob_db_->GetSnapshot();
    }

    ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
    blob_db_impl()->TEST_DeleteObsoleteFiles();

    if (snapshot_at < 2) {
      // The snapshot will see data in bfile, so the file shouldn't be deleted
      ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
      blob_db_->ReleaseSnapshot(snap);
      blob_db_impl()->TEST_DeleteObsoleteFiles();
      ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
    } else {
      // The snapshot shouldn't see data in bfile
      ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
      blob_db_->ReleaseSnapshot(snap);
    }
  }
}
// Checks that BlobDB operations targeting a non-default column family are
// rejected with NotSupported, and that a rejected batch leaves the default
// column family untouched.
TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
  Options options;
  options.env = mock_env_.get();
  mock_env_->set_current_time(0);
  Open(BlobDBOptions(), options);
  ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
  ColumnFamilyHandle *handle = nullptr;
  std::string value;
  std::vector<std::string> values;
  // The call simply pass through to base db. It should succeed.
  ASSERT_OK(
      blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
  ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
  ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
                  .IsNotSupported());
  ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
                  .IsNotSupported());
  WriteBatch batch;
  // Building the batch itself must succeed; only writing it is expected to
  // be rejected.
  ASSERT_OK(batch.Put("k1", "v1"));
  ASSERT_OK(batch.Put(handle, "k2", "v2"));
  ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
  // "k1" from the rejected batch must not be readable.
  ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
  ASSERT_TRUE(
      blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
  auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
                                     {"k1", "k2"}, &values);
  ASSERT_EQ(2, statuses.size());
  ASSERT_TRUE(statuses[0].IsNotSupported());
  ASSERT_TRUE(statuses[1].IsNotSupported());
  ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
  delete handle;
}
// Checks that GetLiveFilesMetaData() and GetLiveFiles() report the blob files
// (one without TTL, one with) using paths relative to the DB directory.
TEST_F(BlobDBTest, GetLiveFilesMetaData) {
  BlobDBOptions blob_options;
  blob_options.blob_dir = "blob_dir";
  blob_options.path_relative = true;
  blob_options.ttl_range_secs = 10;
  blob_options.min_blob_size = 0;
  blob_options.disable_background_tasks = true;

  Options db_options;
  db_options.env = mock_env_.get();

  Open(blob_options, db_options);

  Random rng(301);
  std::map<std::string, std::string> expected;
  for (size_t n = 0; n < 100; ++n) {
    PutRandom("key" + ToString(n), &rng, &expected);
  }

  // One more key, this time with an expiration.
  constexpr uint64_t expiration = 1000ULL;
  PutRandomUntil("key100", expiration, &rng, &expected);

  std::vector<LiveFileMetaData> live_meta;
  blob_db_->GetLiveFilesMetaData(&live_meta);

  ASSERT_EQ(2U, live_meta.size());
  // Path should be relative to db_name, but begin with slash.
  const std::string first_blob("/blob_dir/000001.blob");
  ASSERT_EQ(first_blob, live_meta[0].name);
  ASSERT_EQ(1, live_meta[0].file_number);
  ASSERT_EQ(0, live_meta[0].oldest_ancester_time);
  ASSERT_EQ(kDefaultColumnFamilyName, live_meta[0].column_family_name);

  const std::string second_blob("/blob_dir/000002.blob");
  ASSERT_EQ(second_blob, live_meta[1].name);
  ASSERT_EQ(2, live_meta[1].file_number);
  ASSERT_EQ(expiration, live_meta[1].oldest_ancester_time);
  ASSERT_EQ(kDefaultColumnFamilyName, live_meta[1].column_family_name);

  // The two blob files are expected at the end of the live file list.
  std::vector<std::string> live_files;
  uint64_t manifest_size;
  ASSERT_OK(blob_db_->GetLiveFiles(live_files, &manifest_size, false));
  ASSERT_EQ(5U, live_files.size());
  ASSERT_EQ(first_blob, live_files[3]);
  ASSERT_EQ(second_blob, live_files[4]);

  VerifyDB(expected);
}
// Checks migration from a plain RocksDB to BlobDB: data written by the plain
// DB must be readable through BlobDB, while a plain DB reading keys that were
// later written by BlobDB must report Corruption.
TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
  constexpr size_t kNumKey = 20;
  constexpr size_t kNumIteration = 10;

  Random rng(301);
  std::map<std::string, std::string> expected;
  // written_as_blob[k] is set once "key<k>" has been written through BlobDB.
  std::vector<bool> written_as_blob(kNumKey, false);

  // Write to plain rocksdb.
  Options plain_options;
  plain_options.create_if_missing = true;
  DB *plain_db = nullptr;
  ASSERT_OK(DB::Open(plain_options, dbname_, &plain_db));
  for (size_t round = 0; round < kNumIteration; ++round) {
    const auto slot = rng.Next() % kNumKey;
    const std::string name = "key" + ToString(slot);
    PutRandom(plain_db, name, &rng, &expected);
  }
  VerifyDB(plain_db, expected);
  delete plain_db;
  plain_db = nullptr;

  // Open as blob db. Verify it can read existing data.
  Open();
  VerifyDB(blob_db_, expected);
  for (size_t round = 0; round < kNumIteration; ++round) {
    const auto slot = rng.Next() % kNumKey;
    const std::string name = "key" + ToString(slot);
    written_as_blob[slot] = true;
    PutRandom(blob_db_, name, &rng, &expected);
  }
  VerifyDB(blob_db_, expected);
  delete blob_db_;
  blob_db_ = nullptr;

  // Verify plain db return error for keys written by blob db.
  ASSERT_OK(DB::Open(plain_options, dbname_, &plain_db));
  std::string read_back;
  for (size_t k = 0; k < kNumKey; ++k) {
    const std::string name = "key" + ToString(k);
    const Status read_status = plain_db->Get(ReadOptions(), name, &read_back);
    if (expected.find(name) == expected.end()) {
      // Never written by either DB.
      ASSERT_TRUE(read_status.IsNotFound());
    } else if (written_as_blob[k]) {
      ASSERT_TRUE(read_status.IsCorruption());
    } else {
      ASSERT_OK(read_status);
      ASSERT_EQ(expected[name], read_back);
    }
  }
  delete plain_db;
}
// Test to verify that a NoSpace IOError Status is returned on reaching
// max_db_size limit.
TEST_F(BlobDBTest, OutOfSpace) {
  // Use mock env to stop wall clock.
  Options options;
  options.env = mock_env_.get();
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 200;
  bdb_options.is_fifo = false;
  bdb_options.disable_background_tasks = true;
  // `options` must be handed to Open(); otherwise the mock env configured
  // above is never installed and the DB runs on the real clock.
  Open(bdb_options, options);
  // Each stored blob has an overhead of about 42 bytes currently.
  // So a small key + a 100 byte blob should take up ~150 bytes in the db.
  std::string value(100, 'v');
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
  // Putting another blob should fail as adding it would exceed the
  // max_db_size limit.
  Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
  ASSERT_TRUE(s.IsIOError());
  ASSERT_TRUE(s.IsNoSpace());
}
// With is_fifo enabled, every write that would push the DB past max_db_size
// marks the oldest blob file obsolete instead of failing. Data in an obsolete
// file stays readable until the file is physically deleted.
TEST_F(BlobDBTest, FIFOEviction) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 200;
  bdb_options.blob_file_size = 100;
  bdb_options.is_fifo = true;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);

  // Count evictions through the sync point fired on each evicted file.
  std::atomic<int> num_evictions{0};
  SyncPoint::GetInstance()->SetCallBack(
      "BlobDBImpl::EvictOldestBlobFile:Evicted",
      [&num_evictions](void * /*arg*/) { ++num_evictions; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Each stored blob has an overhead of 32 bytes currently.
  // So a 100 byte blob should take up 132 bytes.
  const std::string blob_value(100, 'v');
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", blob_value, 10));
  VerifyDB({{"key1", blob_value}});
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());

  // A second 100 byte blob would bring the total to 264 bytes (2*132), which
  // exceeds max_db_size and therefore triggers FIFO eviction.
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", blob_value, 60));
  ASSERT_EQ(1, num_evictions.load());
  // key1 remains readable until its (obsolete) file is deleted.
  VerifyDB({{"key1", blob_value}, {"key2", blob_value}});

  // Third blob, this time without TTL.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", blob_value));
  ASSERT_EQ(2, num_evictions.load());
  // key1 and key2 remain readable until their files are deleted.
  VerifyDB({{"key1", blob_value}, {"key2", blob_value}, {"key3", blob_value}});

  // Fourth blob (and fourth blob file), without TTL.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", blob_value));
  ASSERT_EQ(3, num_evictions.load());
  VerifyDB({{"key1", blob_value},
            {"key2", blob_value},
            {"key3", blob_value},
            {"key4", blob_value}});

  // The three oldest files are obsolete; only the newest one is live.
  const auto all_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(4, all_files.size());
  for (size_t i = 0; i < 3; ++i) {
    ASSERT_TRUE(all_files[i]->Obsolete());
  }
  ASSERT_FALSE(all_files[3]->Obsolete());

  auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  ASSERT_EQ(3, obsolete_files.size());
  for (size_t i = 0; i < 3; ++i) {
    ASSERT_EQ(all_files[i], obsolete_files[i]);
  }

  // Once the obsolete files are deleted only key4 is left.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  ASSERT_TRUE(obsolete_files.empty());
  VerifyDB({{"key4", blob_value}});
}
// When a single value is larger than max_db_size and there is no blob file
// that could be evicted, the write must fail with NoSpace and no eviction
// must be recorded.
TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 1000;
  bdb_options.blob_file_size = 5000;
  bdb_options.is_fifo = true;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);

  // Count evictions through the sync point fired on each evicted file.
  std::atomic<int> evict_count{0};
  SyncPoint::GetInstance()->SetCallBack(
      "BlobDBImpl::EvictOldestBlobFile:Evicted",
      [&](void *) { evict_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  // 2000 bytes > max_db_size (1000): cannot fit even into an empty DB.
  std::string value(2000, 'v');
  ASSERT_TRUE(Put("foo", value).IsNoSpace());
  ASSERT_EQ(0, evict_count);
}
// FIFO eviction has to account for the SST file size as well. Once evicting
// every available blob file still would not free enough room, the write is
// rejected with NoSpace and no further file is evicted.
TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
  BlobDBOptions bdb_options;
  bdb_options.is_fifo = true;
  bdb_options.min_blob_size = 100;
  bdb_options.disable_background_tasks = true;

  Options options;
  // Use mock env to stop wall clock.
  options.env = mock_env_.get();
  options.disable_auto_compactions = true;
  auto stats = CreateDBStatistics();
  options.statistics = stats;
  Open(bdb_options, options);
  ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());

  // Fill the LSM tree with inlined (below min_blob_size) values so that the
  // SST size is non-zero and must be taken into account by FIFO eviction.
  const std::string inlined_value(50, 'v');
  std::map<std::string, std::string> expected;
  for (int i = 0; i < 1000; ++i) {
    ASSERT_OK(Put("key" + ToString(i), inlined_value, &expected));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));

  uint64_t sst_bytes = 0;
  ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
                                       &sst_bytes));
  ASSERT_TRUE(sst_bytes > 0);
  ASSERT_EQ(sst_bytes, blob_db_impl()->TEST_live_sst_size());

  // Leave 2000 bytes of headroom on top of the SST files for blob data.
  bdb_options.max_db_size = sst_bytes + 2000;
  Reopen(bdb_options, options);
  ASSERT_EQ(sst_bytes, blob_db_impl()->TEST_live_sst_size());

  const std::string blob_1k(1000, 'v');
  ASSERT_OK(PutWithTTL("large_key1", blob_1k, 60, &expected));
  ASSERT_EQ(0, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB(expected);

  // large_key2 evicts large_key1
  ASSERT_OK(PutWithTTL("large_key2", blob_1k, 60, &expected));
  ASSERT_EQ(1, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  expected.erase("large_key1");
  VerifyDB(expected);

  // large_key3 would not fit even after evicting large_key2, so the write
  // returns a NoSpace error instead and nothing more is evicted.
  const std::string blob_2k(2000, 'v');
  ASSERT_TRUE(PutWithTTL("large_key3", blob_2k, 60).IsNoSpace());
  ASSERT_EQ(1, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  // Verify large_key2 still exists.
  VerifyDB(expected);
}
// A flush or compaction changes the total SST file size and must therefore
// be able to trigger FIFO eviction on its own.
TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 1000;
  bdb_options.is_fifo = true;
  bdb_options.min_blob_size = 100;
  bdb_options.disable_background_tasks = true;

  Options options;
  // Use mock env to stop wall clock.
  options.env = mock_env_.get();
  auto stats = CreateDBStatistics();
  options.statistics = stats;
  options.compression = kNoCompression;
  Open(bdb_options, options);

  // A single 800 byte blob fits below max_db_size.
  const std::string blob_value(800, 'v');
  ASSERT_OK(PutWithTTL("large_key", blob_value, 60));
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB({{"large_key", blob_value}});

  // Insert some small keys and flush to bring DB out of space.
  std::map<std::string, std::string> expected;
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put("key" + ToString(i), "v", &expected));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));

  // Verify large_key is deleted by FIFO eviction.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(1, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB(expected);
}
// Writes a random mix of small/large values with and without TTL and checks
// how they are recorded in the base DB: only a small value without TTL is
// expected as a plain value (kTypeValue); everything else is expected as a
// blob index (kTypeBlobIndex). Finally checks that exactly one TTL and one
// non-TTL blob file were created.
TEST_F(BlobDBTest, InlineSmallValues) {
  constexpr uint64_t kMaxExpiration = 1000;
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.ttl_range_secs = kMaxExpiration;
  bdb_options.min_blob_size = 100;
  bdb_options.blob_file_size = 256 * 1000 * 1000;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  mock_env_->set_current_time(0);
  Open(bdb_options, options);
  std::map<std::string, std::string> data;
  std::map<std::string, KeyVersion> versions;
  for (size_t i = 0; i < 1000; i++) {
    bool is_small_value = rnd.Next() % 2;
    bool has_ttl = rnd.Next() % 2;
    uint64_t expiration = rnd.Next() % kMaxExpiration;
    // 50 is below min_blob_size (100), 200 is above it.
    int len = is_small_value ? 50 : 200;
    std::string key = "key" + ToString(i);
    std::string value = rnd.HumanReadableString(len);
    data[key] = value;
    // Each write is expected to consume exactly one sequence number.
    SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
    if (!has_ttl) {
      ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
    } else {
      ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
    }
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
    versions[key] =
        KeyVersion(key, value, sequence,
                   (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
  }
  VerifyDB(data);
  VerifyBaseDB(versions);
  // Use the fixture accessor like the other tests in this file.
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, blob_files.size());
  std::shared_ptr<BlobFile> non_ttl_file;
  std::shared_ptr<BlobFile> ttl_file;
  // The order of the two files is not assumed; classify them by HasTTL().
  if (blob_files[0]->HasTTL()) {
    ttl_file = blob_files[0];
    non_ttl_file = blob_files[1];
  } else {
    non_ttl_file = blob_files[0];
    ttl_file = blob_files[1];
  }
  ASSERT_FALSE(non_ttl_file->HasTTL());
  ASSERT_TRUE(ttl_file->HasTTL());
}
// Runs a user-defined compaction filter (installed directly in case 0 and via
// a factory in case 1) that doubles, halves, drops or keeps values depending
// on their size, so that values move between the inlined and the
// stored-in-blob representation during compaction.
TEST_F(BlobDBTest, UserCompactionFilter) {
  class CustomerFilter : public CompactionFilter {
   public:
    bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
                std::string *new_value, bool *value_changed) const override {
      *value_changed = false;
      // changing value size to test value transitions between inlined data
      // and stored-in-blob data
      if (value.size() % 4 == 1) {
        *new_value = value.ToString();
        // double size by duplicating value
        *new_value += *new_value;
        *value_changed = true;
        return false;
      } else if (value.size() % 3 == 1) {
        *new_value = value.ToString();
        // truncate value size by half
        *new_value = new_value->substr(0, new_value->size() / 2);
        *value_changed = true;
        return false;
      } else if (value.size() % 2 == 1) {
        // drop the record
        return true;
      }
      return false;
    }
    bool IgnoreSnapshots() const override { return true; }
    const char *Name() const override { return "CustomerFilter"; }
  };
  class CustomerFilterFactory : public CompactionFilterFactory {
    const char *Name() const override { return "CustomerFilterFactory"; }
    std::unique_ptr<CompactionFilter> CreateCompactionFilter(
        const CompactionFilter::Context & /*context*/) override {
      return std::unique_ptr<CompactionFilter>(new CustomerFilter());
    }
  };
  constexpr size_t kNumPuts = 1 << 10;
  // Generate both inlined and blob value
  constexpr uint64_t kMinValueSize = 1 << 6;
  constexpr uint64_t kMaxValueSize = 1 << 8;
  constexpr uint64_t kMinBlobSize = 1 << 7;
  static_assert(kMinValueSize < kMinBlobSize, "");
  static_assert(kMaxValueSize > kMinBlobSize, "");
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = kMinBlobSize;
  bdb_options.blob_file_size = kMaxValueSize * 10;
  bdb_options.disable_background_tasks = true;
  if (Snappy_Supported()) {
    bdb_options.compression = CompressionType::kSnappyCompression;
  }
  // case_num == 0: Test user defined compaction filter
  // case_num == 1: Test user defined compaction filter factory
  for (int case_num = 0; case_num < 2; case_num++) {
    // Owns the filter installed in case 0. The DB only keeps a raw pointer,
    // so the filter must stay alive until Destroy() has closed the DB; it is
    // released at the end of the iteration (or on an early ASSERT return).
    std::unique_ptr<CompactionFilter> filter_owner;
    Options options;
    if (case_num == 0) {
      filter_owner.reset(new CustomerFilter());
      options.compaction_filter = filter_owner.get();
    } else {
      options.compaction_filter_factory.reset(new CustomerFilterFactory());
    }
    options.disable_auto_compactions = true;
    options.env = mock_env_.get();
    options.statistics = CreateDBStatistics();
    Open(bdb_options, options);
    std::map<std::string, std::string> data;
    std::map<std::string, std::string> data_after_compact;
    Random rnd(301);
    uint64_t value_size = kMinValueSize;
    int drop_record = 0;
    for (size_t i = 0; i < kNumPuts; ++i) {
      std::ostringstream oss;
      oss << "key" << std::setw(4) << std::setfill('0') << i;
      const std::string key(oss.str());
      const std::string value =
          rnd.HumanReadableString(static_cast<int>(value_size));
      const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
      ASSERT_OK(Put(key, value));
      ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
      data[key] = value;
      // Mirror the rules of CustomerFilter::Filter to predict the outcome.
      if (value.length() % 4 == 1) {
        data_after_compact[key] = value + value;
      } else if (value.length() % 3 == 1) {
        data_after_compact[key] = value.substr(0, value.size() / 2);
      } else if (value.length() % 2 == 1) {
        ++drop_record;
      } else {
        data_after_compact[key] = value;
      }
      // Cycle the value size through [kMinValueSize, kMaxValueSize].
      if (++value_size > kMaxValueSize) {
        value_size = kMinValueSize;
      }
    }
    // Verify full data set
    VerifyDB(data);
    // Applying compaction filter for records
    ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
    // Verify data after compaction: values are doubled, halved, dropped or
    // kept as predicted above.
    VerifyDB(data_after_compact);
    ASSERT_EQ(drop_record,
              options.statistics->getTickerCount(COMPACTION_KEY_DROP_USER));
    Destroy();
  }
}
// Test user compaction filter when there is IO error on blob data.
// For each listed sync point, the file system is deactivated when the sync
// point is hit during compaction; the compaction must fail with an IOError
// and the original data must remain readable afterwards.
TEST_F(BlobDBTest, UserCompactionFilter_BlobIOError) {
  class CustomerFilter : public CompactionFilter {
   public:
    // Rewrites every value, so the compaction has to write new blobs.
    bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
                std::string *new_value, bool *value_changed) const override {
      *new_value = value.ToString() + "_new";
      *value_changed = true;
      return false;
    }
    bool IgnoreSnapshots() const override { return true; }
    const char *Name() const override { return "CustomerFilter"; }
  };
  constexpr size_t kNumPuts = 100;
  constexpr int kValueSize = 100;
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.blob_file_size = kValueSize * 10;
  bdb_options.disable_background_tasks = true;
  bdb_options.compression = CompressionType::kNoCompression;
  // Sync points at which the IO failure is injected, one per test case.
  std::vector<std::string> io_failure_cases = {
      "BlobDBImpl::CreateBlobFileAndWriter",
      "BlobIndexCompactionFilterBase::WriteBlobToNewFile",
      "BlobDBImpl::CloseBlobFile"};
  for (size_t case_num = 0; case_num < io_failure_cases.size(); case_num++) {
    // Owns the filter. The DB only keeps a raw pointer, so the filter must
    // stay alive until Destroy() has closed the DB; it is released at the
    // end of the iteration (or on an early ASSERT return).
    std::unique_ptr<CompactionFilter> filter_owner(new CustomerFilter());
    Options options;
    options.compaction_filter = filter_owner.get();
    options.disable_auto_compactions = true;
    options.env = fault_injection_env_.get();
    options.statistics = CreateDBStatistics();
    Open(bdb_options, options);
    std::map<std::string, std::string> data;
    Random rnd(301);
    for (size_t i = 0; i < kNumPuts; ++i) {
      std::ostringstream oss;
      oss << "key" << std::setw(4) << std::setfill('0') << i;
      const std::string key(oss.str());
      const std::string value = rnd.HumanReadableString(kValueSize);
      const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
      ASSERT_OK(Put(key, value));
      ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
      data[key] = value;
    }
    // Verify full data set
    VerifyDB(data);
    // Deactivate the file system as soon as the selected sync point is hit.
    SyncPoint::GetInstance()->SetCallBack(
        io_failure_cases[case_num], [&](void * /*arg*/) {
          fault_injection_env_->SetFilesystemActive(false, Status::IOError());
        });
    SyncPoint::GetInstance()->EnableProcessing();
    auto s = blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
    ASSERT_TRUE(s.IsIOError());
    // Reactivate file system to allow test to verify and close DB.
    fault_injection_env_->SetFilesystemActive(true);
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    // Verify full data set after compaction failure
    VerifyDB(data);
    Destroy();
  }
}
// Test compaction filter should remove any expired blob index.
// Keys are written with random expirations; after advancing the mock clock
// to kCompactTime and compacting, only entries without TTL or with an
// expiration later than kCompactTime are expected to remain in the DB.
TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
  constexpr size_t kNumKeys = 100;
  constexpr size_t kNumPuts = 1000;
  constexpr uint64_t kMaxExpiration = 1000;
  constexpr uint64_t kCompactTime = 500;
  constexpr uint64_t kMinBlobSize = 100;
  Random rnd(301);
  mock_env_->set_current_time(0);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = kMinBlobSize;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  Open(bdb_options, options);
  std::map<std::string, std::string> data;
  std::map<std::string, std::string> data_after_compact;
  for (size_t i = 0; i < kNumPuts; i++) {
    bool is_small_value = rnd.Next() % 2;
    bool has_ttl = rnd.Next() % 2;
    uint64_t expiration = rnd.Next() % kMaxExpiration;
    int len = is_small_value ? 10 : 200;
    std::string key = "key" + ToString(rnd.Next() % kNumKeys);
    std::string value = rnd.HumanReadableString(len);
    if (!has_ttl) {
      if (is_small_value) {
        std::string blob_entry;
        BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
        // Fake blob index with TTL. See what it will do.
        // It is written below as an ordinary value and is expected to
        // survive the compaction regardless of the encoded expiration.
        ASSERT_GT(kMinBlobSize, blob_entry.size());
        value = blob_entry;
      }
      ASSERT_OK(Put(key, value));
      data_after_compact[key] = value;
    } else {
      ASSERT_OK(PutUntil(key, value, expiration));
      if (expiration <= kCompactTime) {
        data_after_compact.erase(key);
      } else {
        data_after_compact[key] = value;
      }
    }
    data[key] = value;
  }
  VerifyDB(data);
  mock_env_->set_current_time(kCompactTime);
  // Take a snapshot before compaction. Make sure expired blob indexes are
  // filtered regardless of snapshot.
  const Snapshot *snapshot = blob_db_->GetSnapshot();
  // Issue manual compaction to trigger compaction filter.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_->ReleaseSnapshot(snapshot);
  // Verify expired blob index are filtered.
  std::vector<KeyVersion> versions;
  const size_t kMaxKeys = 10000;
  // Do not ignore the status: a failed scan would otherwise show up only as
  // a confusing size mismatch below.
  ASSERT_OK(GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions));
  ASSERT_EQ(data_after_compact.size(), versions.size());
  for (auto &version : versions) {
    ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
  }
  VerifyDB(data_after_compact);
}
// The compaction filter is expected to drop every blob index whose blob file
// no longer exists, and to leave blob indexes with an available file alone.
TEST_F(BlobDBTest, FilterFileNotAvailable) {
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.disable_auto_compactions = true;
  Open(bdb_options, options);

  // Put "foo" into blob file #1 and close that file.
  ASSERT_OK(Put("foo", "v1"));
  auto files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_EQ(1, files[0]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(files[0]));

  // Put "bar" into blob file #2 and close that file as well.
  ASSERT_OK(Put("bar", "v2"));
  files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, files.size());
  ASSERT_EQ(2, files[1]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(files[1]));

  constexpr size_t kMaxKeys = 10000;
  DB *root_db = blob_db_->GetRootDB();
  std::vector<KeyVersion> key_versions;

  // Before any compaction both keys are present in the base DB.
  ASSERT_OK(GetAllKeyVersions(root_db, "", "", kMaxKeys, &key_versions));
  ASSERT_EQ(2, key_versions.size());
  ASSERT_EQ("bar", key_versions[0].user_key);
  ASSERT_EQ("foo", key_versions[1].user_key);
  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});

  // Compacting while both blob files exist must not remove anything.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(root_db, "", "", kMaxKeys, &key_versions));
  ASSERT_EQ(2, key_versions.size());
  ASSERT_EQ("bar", key_versions[0].user_key);
  ASSERT_EQ("foo", key_versions[1].user_key);
  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});

  // Remove the first blob file and compact: "foo" should be removed from the
  // base DB.
  blob_db_impl()->TEST_ObsoleteBlobFile(files[0]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(root_db, "", "", kMaxKeys, &key_versions));
  ASSERT_EQ(1, key_versions.size());
  ASSERT_EQ("bar", key_versions[0].user_key);
  VerifyDB({{"bar", "v2"}});

  // Remove the second blob file and compact: "bar" should be removed from
  // the base DB too.
  blob_db_impl()->TEST_ObsoleteBlobFile(files[1]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(root_db, "", "", kMaxKeys, &key_versions));
  ASSERT_EQ(0, key_versions.size());
  VerifyDB({});
}
// The compaction filter should also drop inlined TTL keys that would have
// been removed by the last FIFO eviction had they been stored out-of-line.
TEST_F(BlobDBTest, FilterForFIFOEviction) {
  Random rnd(215);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 100;
  bdb_options.ttl_range_secs = 60;
  bdb_options.max_db_size = 0;
  bdb_options.disable_background_tasks = true;

  Options options;
  // Use mock env to stop wall clock.
  mock_env_->set_current_time(0);
  options.env = mock_env_.get();
  auto stats = CreateDBStatistics();
  options.statistics = stats;
  options.disable_auto_compactions = true;
  Open(bdb_options, options);

  std::map<std::string, std::string> expected_now;
  std::map<std::string, std::string> expected_after_compaction;

  // Insert some small values that will be inlined. Only those with a TTL of
  // at least 60 are expected to survive the final compaction.
  for (int i = 0; i < 1000; ++i) {
    const std::string key = "key" + ToString(i);
    const std::string value = rnd.HumanReadableString(50);
    const uint64_t ttl = rnd.Next() % 120 + 1;
    ASSERT_OK(PutWithTTL(key, value, ttl, &expected_now));
    if (ttl >= 60) {
      expected_after_compaction[key] = value;
    }
  }
  const uint64_t num_keys_to_evict =
      expected_now.size() - expected_after_compaction.size();
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  const uint64_t sst_bytes = blob_db_impl()->TEST_live_sst_size();
  ASSERT_GT(sst_bytes, 0);
  VerifyDB(expected_now);

  // Reopen in FIFO mode with 30000 bytes of headroom on top of the SSTs.
  bdb_options.max_db_size = sst_bytes + 30000;
  bdb_options.is_fifo = true;
  Reopen(bdb_options, options);
  VerifyDB(expected_now);

  // Put two large values, each on a different blob file.
  const std::string big_value(10000, 'v');
  ASSERT_OK(PutWithTTL("large_key1", big_value, 90));
  ASSERT_OK(PutWithTTL("large_key2", big_value, 150));
  ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  expected_now["large_key1"] = big_value;
  expected_now["large_key2"] = big_value;
  VerifyDB(expected_now);

  // Put a third large value which will bring the DB out of space.
  // FIFO eviction will evict the file of large_key1.
  ASSERT_OK(PutWithTTL("large_key3", big_value, 150));
  ASSERT_EQ(1, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  expected_now.erase("large_key1");
  expected_now["large_key3"] = big_value;
  VerifyDB(expected_now);

  // Put some more small values. The compaction filter shouldn't evict them
  // since they are inserted after the FIFO eviction.
  ASSERT_OK(PutWithTTL("foo", "v", 30, &expected_after_compaction));
  ASSERT_OK(PutWithTTL("bar", "v", 30, &expected_after_compaction));
  // FIFO eviction doesn't trigger again since there is enough room for the
  // flush.
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  ASSERT_EQ(1, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));

  // Manually compact and check that the compaction filter evicts the keys
  // with expiration < 60.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // All keys with expiration < 60, plus large_key1, are filtered by the
  // compaction filter.
  ASSERT_EQ(num_keys_to_evict + 1,
            stats->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
  ASSERT_EQ(1, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  expected_after_compaction["large_key2"] = big_value;
  expected_after_compaction["large_key3"] = big_value;
  VerifyDB(expected_after_compaction);
}
// Exercises blob garbage collection during a manual compaction. With
// garbage_collection_cutoff = 0.25, the blobs stored in the oldest quarter of
// the immutable non-TTL blob files are expected to be relocated to new blob
// files, while TTL blobs, inlined TTL values and plain values are left alone.
// The test checks user-visible data, base DB entries, blob index contents,
// GC statistics and the resulting set of blob files.
TEST_F(BlobDBTest, GarbageCollection) {
constexpr size_t kNumPuts = 1 << 10;
constexpr uint64_t kExpiration = 1000;
constexpr uint64_t kCompactTime = 500;
constexpr uint64_t kKeySize = 7; // "key" + 4 digits
constexpr uint64_t kSmallValueSize = 1 << 6;
constexpr uint64_t kLargeValueSize = 1 << 8;
constexpr uint64_t kMinBlobSize = 1 << 7;
static_assert(kSmallValueSize < kMinBlobSize, "");
static_assert(kLargeValueSize > kMinBlobSize, "");
constexpr size_t kBlobsPerFile = 8;
constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
// File size limit computed as one header plus kBlobsPerFile records, each
// consisting of a record header, a key and a large value.
constexpr uint64_t kBlobFileSize =
BlobLogHeader::kSize +
(BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
BlobDBOptions bdb_options;
bdb_options.min_blob_size = kMinBlobSize;
bdb_options.blob_file_size = kBlobFileSize;
bdb_options.enable_garbage_collection = true;
bdb_options.garbage_collection_cutoff = 0.25;
bdb_options.disable_background_tasks = true;
Options options;
options.env = mock_env_.get();
options.statistics = CreateDBStatistics();
Open(bdb_options, options);
// Expected state, tracked at three levels: user-visible values, base DB
// entries (value type + sequence), and decoded blob index fields.
std::map<std::string, std::string> data;
std::map<std::string, KeyVersion> blob_value_versions;
std::map<std::string, BlobIndexVersion> blob_index_versions;
Random rnd(301);
// Add a bunch of large non-TTL values. These will be written to non-TTL
// blob files and will be subject to GC.
for (size_t i = 0; i < kNumPuts; ++i) {
std::ostringstream oss;
oss << "key" << std::setw(4) << std::setfill('0') << i;
const std::string key(oss.str());
const std::string value = rnd.HumanReadableString(kLargeValueSize);
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(Put(key, value));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
// i >> 3 equals i / kBlobsPerFile (kBlobsPerFile is 8), so every 8
// consecutive keys are expected to share one blob file; numbering is 1-based.
blob_index_versions[key] =
BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
sequence, kTypeBlobIndex);
}
// Add some small and/or TTL values that will be ignored during GC.
// First, add a large TTL value will be written to its own TTL blob file.
{
const std::string key("key2000");
const std::string value = rnd.HumanReadableString(kLargeValueSize);
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(PutUntil(key, value, kExpiration));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
// The TTL blob file is expected to get the next file number after the
// kNumBlobFiles non-TTL files.
blob_index_versions[key] =
BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
sequence, kTypeBlobIndex);
}
// Now add a small TTL value (which will be inlined).
{
const std::string key("key3000");
const std::string value = rnd.HumanReadableString(kSmallValueSize);
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(PutUntil(key, value, kExpiration));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
blob_index_versions[key] = BlobIndexVersion(
key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
}
// Finally, add a small non-TTL value (which will be stored as a regular
// value).
{
const std::string key("key4000");
const std::string value = rnd.HumanReadableString(kSmallValueSize);
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(Put(key, value));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
blob_index_versions[key] = BlobIndexVersion(
key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
}
VerifyDB(data);
VerifyBaseDB(blob_value_versions);
VerifyBaseDBBlobIndex(blob_index_versions);
// At this point, we should have 128 immutable non-TTL files with file numbers
// 1..128.
{
auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
for (size_t i = 0; i < kNumBlobFiles; ++i) {
ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
ASSERT_EQ(live_imm_files[i]->GetFileSize(),
kBlobFileSize + BlobLogFooter::kSize);
}
}
// Advance the mock clock to kCompactTime (still before kExpiration), then
// trigger GC through a manual compaction.
mock_env_->set_current_time(kCompactTime);
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// We expect the data to remain the same and the blobs from the oldest N files
// to be moved to new files. Sequence numbers get zeroed out during the
// compaction.
VerifyDB(data);
for (auto &pair : blob_value_versions) {
KeyVersion &version = pair.second;
version.sequence = 0;
}
VerifyBaseDB(blob_value_versions);
// Number of oldest non-TTL files expected to be garbage collected
// (0.25 * 128 = 32).
const uint64_t cutoff = static_cast<uint64_t>(
bdb_options.garbage_collection_cutoff * kNumBlobFiles);
for (auto &pair : blob_index_versions) {
BlobIndexVersion &version = pair.second;
version.sequence = 0;
// Inlined and plain values have no blob file and are unaffected.
if (version.file_number == kInvalidBlobFileNumber) {
continue;
}
// Files newer than the cutoff (including the TTL file) are not relocated.
if (version.file_number > cutoff) {
continue;
}
// Blobs of old file N are expected in new file N + kNumBlobFiles + 1; the
// extra 1 skips the number already taken by the TTL blob file.
version.file_number += kNumBlobFiles + 1;
}
VerifyBaseDBBlobIndex(blob_index_versions);
const Statistics *const statistics = options.statistics.get();
assert(statistics);
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
cutoff * kBlobsPerFile);
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
cutoff * kBlobsPerFile * kLargeValueSize);
// At this point, we should have 128 immutable non-TTL files with file numbers
// 33..128 and 130..161. (129 was taken by the TTL blob file.)
{
auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
for (size_t i = 0; i < kNumBlobFiles; ++i) {
uint64_t expected_file_number = i + cutoff + 1;
// Skip over the file number used by the TTL blob file.
if (expected_file_number > kNumBlobFiles) {
++expected_file_number;
}
ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
ASSERT_EQ(live_imm_files[i]->GetFileSize(),
kBlobFileSize + BlobLogFooter::kSize);
}
}
}
// Checks GC error handling: an unparsable blob index in the base DB makes the
// compaction fail with Corruption, and the GC statistics record one failure
// together with the work done before the failure.
TEST_F(BlobDBTest, GarbageCollectionFailure) {
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 1.0;
  bdb_options.disable_background_tasks = true;
  Options db_options;
  db_options.statistics = CreateDBStatistics();
  Open(bdb_options, db_options);
  // Write a couple of valid blobs. The writes must succeed for the ticker
  // expectations below to be meaningful, so check their status.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("dead", "beef"));
  // Write a fake blob reference into the base DB that cannot be parsed.
  WriteBatch batch;
  ASSERT_OK(WriteBatchInternal::PutBlobIndex(
      &batch, blob_db_->DefaultColumnFamily()->GetID(), "key",
      "not a valid blob index"));
  ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(blob_files.size(), 1);
  auto blob_file = blob_files[0];
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
  ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
                  .IsCorruption());
  const Statistics *const statistics = db_options.statistics.get();
  assert(statistics);
  // The two valid blobs ("bar" + "beef" = 7 bytes) are relocated into one new
  // file before the invalid blob index is hit; no file counts as collected.
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
}
// A blob file whose TTL range has passed should be marked obsolete by the
// expired-file eviction and removed by the subsequent obsolete-file deletion.
TEST_F(BlobDBTest, EvictExpiredFile) {
  BlobDBOptions bdb_options;
  bdb_options.ttl_range_secs = 100;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  Open(bdb_options, options);

  // Write one key with TTL 100 at mock time 50.
  mock_env_->set_current_time(50);
  std::map<std::string, std::string> expected;
  ASSERT_OK(PutWithTTL("foo", "bar", 100, &expected));
  const auto files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  const auto ttl_file = files[0];
  ASSERT_FALSE(ttl_file->Immutable());
  ASSERT_FALSE(ttl_file->Obsolete());
  VerifyDB(expected);

  // Move the clock to 250; the key should have expired by now.
  mock_env_->set_current_time(250);
  blob_db_impl()->TEST_EvictExpiredFiles();
  // The file is still tracked but is now immutable and obsolete.
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
  ASSERT_TRUE(ttl_file->Immutable());
  ASSERT_TRUE(ttl_file->Obsolete());

  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());

  // Make sure we don't return a garbage value after the blob file has been
  // evicted while the blob index still exists in the LSM tree.
  std::string val;
  ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound());
  ASSERT_EQ("", val);
}
// DisableFileDeletions() calls nest: after two calls, obsolete blob files are
// only deleted again after either one EnableFileDeletions(true) or two
// EnableFileDeletions(false) calls.
TEST_F(BlobDBTest, DisableFileDeletions) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  std::map<std::string, std::string> expected;
  for (const bool force : {true, false}) {
    // Create a single blob file, close it and mark it obsolete.
    ASSERT_OK(Put("foo", "v", &expected));
    const auto files = blob_db_impl()->TEST_GetBlobFiles();
    ASSERT_EQ(1, files.size());
    const auto obsolete_file = files[0];
    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(obsolete_file));
    blob_db_impl()->TEST_ObsoleteBlobFile(obsolete_file);
    ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
    ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());

    // Call DisableFileDeletions twice.
    ASSERT_OK(blob_db_->DisableFileDeletions());
    ASSERT_OK(blob_db_->DisableFileDeletions());

    // File deletions should be disabled: the obsolete file must survive.
    blob_db_impl()->TEST_DeleteObsoleteFiles();
    ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
    ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
    VerifyDB(expected);

    // Enable file deletions once. With force=true that is sufficient;
    // otherwise a second call is needed to undo the second disable.
    ASSERT_OK(blob_db_->EnableFileDeletions(force));
    blob_db_impl()->TEST_DeleteObsoleteFiles();
    if (!force) {
      ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
      ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
      VerifyDB(expected);
      // Call EnableFileDeletions a second time.
      ASSERT_OK(blob_db_->EnableFileDeletions(false));
      blob_db_impl()->TEST_DeleteObsoleteFiles();
    }

    // Regardless of value of `force`, file should be deleted by now.
    ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
    ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
    VerifyDB({});
  }
}
// Exercises the bookkeeping that links blob files to the SST files referencing
// them. Five dummy blob files (numbers 1..5) are registered, the mapping is
// initialized from a synthetic list of live SST files, and then a sequence of
// simulated flushes and compactions is processed. After every step the test
// checks each blob file's linked SST set and obsolete flag, as well as the
// contents of the live immutable non-TTL list and the obsolete list.
TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
  BlobDBOptions bdb_options;
  bdb_options.enable_garbage_collection = true;
  bdb_options.disable_background_tasks = true;

  Open(bdb_options);

  // Register some dummy blob files.
  blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
  blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
  blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
  blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
  blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);

  // Initialize the blob <-> SST file mapping. First, add some SST files with
  // blob file references, then some without.
  std::vector<LiveFileMetaData> live_files;

  for (uint64_t i = 1; i <= 10; ++i) {
    LiveFileMetaData live_file;
    live_file.file_number = i;
    live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;

    live_files.emplace_back(live_file);
  }

  for (uint64_t i = 11; i <= 20; ++i) {
    LiveFileMetaData live_file;
    live_file.file_number = i;

    live_files.emplace_back(live_file);
  }

  blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);

  // The handles obtained here are reused for all later checks, including
  // after a file has been marked obsolete.
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(blob_files.size(), 5);

  // Verifies the complete expected state after one step of the scenario.
  //   step                      - label attached to any failure message
  //   expected_sst_files        - expected linked SST set per blob file;
  //                               index i describes blob file number i + 1
  //   expected_obsolete_numbers - blob file numbers expected on the obsolete
  //                               list, in list order
  // Every blob file not listed as obsolete is expected to appear, in
  // ascending number order, on the live immutable non-TTL list.
  // Must be invoked through ASSERT_NO_FATAL_FAILURE so that a failed
  // assertion in here also stops the test itself.
  const auto verify_state =
      [&](const char *step,
          const std::vector<std::unordered_set<uint64_t>> &expected_sst_files,
          const std::vector<uint64_t> &expected_obsolete_numbers) {
        SCOPED_TRACE(step);

        ASSERT_EQ(blob_files.size(), expected_sst_files.size());

        std::vector<uint64_t> expected_live_numbers;

        for (size_t i = 0; i < blob_files.size(); ++i) {
          const uint64_t blob_file_number = static_cast<uint64_t>(i) + 1;
          const bool expected_obsolete =
              std::find(expected_obsolete_numbers.begin(),
                        expected_obsolete_numbers.end(),
                        blob_file_number) != expected_obsolete_numbers.end();

          const auto &blob_file = blob_files[i];
          ASSERT_EQ(blob_file->BlobFileNumber(), blob_file_number);
          ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
          ASSERT_EQ(blob_file->Obsolete(), expected_obsolete);

          if (!expected_obsolete) {
            expected_live_numbers.push_back(blob_file_number);
          }
        }

        auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
        ASSERT_EQ(live_imm_files.size(), expected_live_numbers.size());
        for (size_t i = 0; i < live_imm_files.size(); ++i) {
          ASSERT_EQ(live_imm_files[i]->BlobFileNumber(),
                    expected_live_numbers[i]);
        }

        auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
        ASSERT_EQ(obsolete_files.size(), expected_obsolete_numbers.size());
        for (size_t i = 0; i < obsolete_files.size(); ++i) {
          ASSERT_EQ(obsolete_files[i]->BlobFileNumber(),
                    expected_obsolete_numbers[i]);
        }
      };

  // Check that the blob <-> SST mappings have been correctly initialized.
  ASSERT_NO_FATAL_FAILURE(verify_state(
      "after initialization",
      {{1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}}, {}));

  // Simulate a flush where the SST does not reference any blob files.
  {
    FlushJobInfo info{};
    info.file_number = 21;
    info.smallest_seqno = 1;
    info.largest_seqno = 100;

    blob_db_impl()->TEST_ProcessFlushJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(verify_state(
        "after flush without blob reference",
        {{1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}}, {}));
  }

  // Simulate a flush where the SST references a blob file.
  {
    FlushJobInfo info{};
    info.file_number = 22;
    info.oldest_blob_file_number = 5;
    info.smallest_seqno = 101;
    info.largest_seqno = 200;

    blob_db_impl()->TEST_ProcessFlushJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(verify_state(
        "after flush referencing blob file 5",
        {{1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}}, {}));
  }

  // Simulate a compaction. Some inputs and outputs have blob file references,
  // some don't. There is also a trivial move (which means the SST appears on
  // both the input and the output list). Blob file 1 loses all its linked SSTs,
  // and since it got marked immutable at sequence number 200 which has already
  // been flushed, it can be marked obsolete.
  {
    CompactionJobInfo info{};
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
    info.input_file_infos.emplace_back(
        CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
    info.output_file_infos.emplace_back(
        CompactionFileInfo{2, 24, kInvalidBlobFileNumber});

    blob_db_impl()->TEST_ProcessCompactionJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(verify_state(
        "after first compaction",
        {{}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}}, {1}));
  }

  // Simulate a failed compaction. No mappings should be updated.
  {
    CompactionJobInfo info{};
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
    info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
    info.status = Status::Corruption();

    blob_db_impl()->TEST_ProcessCompactionJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(verify_state(
        "after failed compaction",
        {{}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}}, {1}));
  }

  // Simulate another compaction. Blob file 2 loses all its linked SSTs
  // but since it got marked immutable at sequence number 300 which hasn't
  // been flushed yet, it cannot be marked obsolete at this point.
  {
    CompactionJobInfo info{};
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
    info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});

    blob_db_impl()->TEST_ProcessCompactionJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(verify_state(
        "after second compaction",
        {{}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}}, {1}));
  }

  // Simulate a flush with largest sequence number 300. This will make it
  // possible to mark blob file 2 obsolete.
  {
    FlushJobInfo info{};
    info.file_number = 26;
    info.smallest_seqno = 201;
    info.largest_seqno = 300;

    blob_db_impl()->TEST_ProcessFlushJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(verify_state(
        "after flush up to sequence number 300",
        {{}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}}, {1, 2}));
  }
}
// Runs a background EvictExpiredFiles() pass concurrently with Close() so
// that a missing "wait for background work" step during shutdown would show
// up as a data race. Sync point dependencies interleave the test thread with
// the background task: each "A -> B" pair below makes B wait until A has been
// reached.
TEST_F(BlobDBTest, ShutdownWait) {
  BlobDBOptions bdb_options;
  bdb_options.ttl_range_secs = 100;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = false;
  Options options;
  options.env = mock_env_.get();

  SyncPoint::GetInstance()->LoadDependency({
      {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
      {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
      {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
      {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
  });
  // Force all tasks to be scheduled immediately.
  SyncPoint::GetInstance()->SetCallBack(
      "TimeQueue::Add:item.end", [&](void *arg) {
        std::chrono::steady_clock::time_point *tp =
            static_cast<std::chrono::steady_clock::time_point *>(arg);
        *tp =
            std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
      });

  SyncPoint::GetInstance()->SetCallBack(
      "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
        // Sleep 3 ms to increase the chance of data race.
        // We've synced up the code so that EvictExpiredFiles()
        // is called concurrently with ~BlobDBImpl().
        // ~BlobDBImpl() is supposed to wait for all background
        // task to shutdown before doing anything else. In order
        // to use the same test to reproduce a bug of the waiting
        // logic, we wait a little bit here, so that TSAN can
        // catch the data race.
        // We should improve the test if we find a better way.
        Env::Default()->SleepForMicroseconds(3000);
      });

  SyncPoint::GetInstance()->EnableProcessing();

  Open(bdb_options, options);
  mock_env_->set_current_time(50);
  std::map<std::string, std::string> data;
  ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));

  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  auto blob_file = blob_files[0];
  ASSERT_FALSE(blob_file->Immutable());
  ASSERT_FALSE(blob_file->Obsolete());
  VerifyDB(data);

  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
  mock_env_->set_current_time(250);
  // The key should have expired by now.
  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");

  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
  Close();

  SyncPoint::GetInstance()->DisableProcessing();
  // The callbacks were registered on the shared SyncPoint instance; remove
  // them so that a later test which enables sync point processing does not
  // run them as well.
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Checks that closing a blob file bumps the BLOB_DB_BLOB_FILE_SYNCED ticker
// exactly once when only a single small value has been written.
TEST_F(BlobDBTest, SyncBlobFileBeforeClose) {
  Options options;
  options.statistics = CreateDBStatistics();

  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.bytes_per_sync = 1 << 20;
  blob_options.disable_background_tasks = true;

  Open(blob_options, options);

  // Fail right here if the write does not succeed instead of letting a
  // later assertion report a confusing secondary error.
  ASSERT_OK(Put("foo", "bar"));

  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(blob_files.size(), 1);

  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
  ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_BLOB_FILE_SYNCED), 1);
}
// Checks that closing a blob file returns an IOError when the file system is
// deactivated at the "BlobLogWriter::Sync" sync point.
TEST_F(BlobDBTest, SyncBlobFileBeforeCloseIOError) {
  Options options;
  options.env = fault_injection_env_.get();

  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.bytes_per_sync = 1 << 20;
  blob_options.disable_background_tasks = true;

  Open(blob_options, options);

  // The write happens before any fault is injected and must succeed;
  // otherwise the IOError checked below could not be attributed to the sync.
  ASSERT_OK(Put("foo", "bar"));

  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(blob_files.size(), 1);

  // Deactivate the file system once the sync point is hit.
  SyncPoint::GetInstance()->SetCallBack(
      "BlobLogWriter::Sync", [this](void * /* arg */) {
        fault_injection_env_->SetFilesystemActive(false, Status::IOError());
      });
  SyncPoint::GetInstance()->EnableProcessing();

  const Status s = blob_db_impl()->TEST_CloseBlobFile(blob_files[0]);

  // Restore the file system and the sync point state before asserting, so
  // that cleanup happens even if the assertion fails.
  fault_injection_env_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_TRUE(s.IsIOError());
}
} // namespace blob_db
} // namespace ROCKSDB_NAMESPACE
// Test driver for the BlobDB tests: passes the command-line arguments to
// Google Test and returns the result of running all registered tests.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}
#else
#include <stdio.h>
// Stub entry point used when ROCKSDB_LITE is defined: prints a notice to
// stderr that the tests were skipped and exits with status 0.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
#endif // !ROCKSDB_LITE