rocksdb/db/blob/db_blob_basic_test.cc
Levi Tamasi 073ac54739 Log blob file space amp and expose it via the rocksdb.blob-stats DB property (#9538)
Summary:
Extend the periodic statistics in the info log with the total amount of garbage
in blob files and the space amplification pertaining to blob files, where the
latter is defined as `total_blob_file_size / (total_blob_file_size - total_blob_garbage_size)`.
Also expose the space amp via the `rocksdb.blob-stats` DB property.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9538

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D34126855

Pulled By: ltamasi

fbshipit-source-id: 3153e7a0fe0eca440322db273f4deaabaccc51b2
2022-02-10 12:42:11 -08:00

1035 lines
33 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <array>
#include <sstream>
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "utilities/fault_injection_env.h"
namespace ROCKSDB_NAMESPACE {
class DBBlobBasicTest : public DBTestBase {
protected:
DBBlobBasicTest()
: DBTestBase("db_blob_basic_test", /* env_do_fsync */ false) {}
};
TEST_F(DBBlobBasicTest, GetBlob) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr char key[] = "key";
constexpr char blob_value[] = "blob_value";
ASSERT_OK(Put(key, blob_value));
ASSERT_OK(Flush());
ASSERT_EQ(Get(key), blob_value);
// Try again with no I/O allowed. The table and the necessary blocks should
// already be in their respective caches; however, the blob itself can only be
// read from the blob file, so the read should return Incomplete.
ReadOptions read_options;
read_options.read_tier = kBlockCacheTier;
PinnableSlice result;
ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result)
.IsIncomplete());
}
TEST_F(DBBlobBasicTest, MultiGetBlobs) {
constexpr size_t min_blob_size = 6;
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
Reopen(options);
// Put then retrieve three key-values. The first value is below the size limit
// and is thus stored inline; the other two are stored separately as blobs.
constexpr size_t num_keys = 3;
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "short";
static_assert(sizeof(first_value) - 1 < min_blob_size,
"first_value too long to be inlined");
ASSERT_OK(Put(first_key, first_value));
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "long_value";
static_assert(sizeof(second_value) - 1 >= min_blob_size,
"second_value too short to be stored as blob");
ASSERT_OK(Put(second_key, second_value));
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "other_long_value";
static_assert(sizeof(third_value) - 1 >= min_blob_size,
"third_value too short to be stored as blob");
ASSERT_OK(Put(third_key, third_value));
ASSERT_OK(Flush());
ReadOptions read_options;
std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_value);
ASSERT_OK(statuses[1]);
ASSERT_EQ(values[1], second_value);
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], third_value);
}
// Try again with no I/O allowed. The table and the necessary blocks should
// already be in their respective caches. The first (inlined) value should be
// successfully read; however, the two blob values could only be read from the
// blob file, so for those the read should return Incomplete.
read_options.read_tier = kBlockCacheTier;
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_value);
ASSERT_TRUE(statuses[1].IsIncomplete());
ASSERT_TRUE(statuses[2].IsIncomplete());
}
}
#ifndef ROCKSDB_LITE
TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) {
Options options = GetDefaultOptions();
// First, create an external SST file ["b"].
const std::string file_path = dbname_ + "/test.sst";
{
SstFileWriter sst_file_writer(EnvOptions(), GetDefaultOptions());
Status s = sst_file_writer.Open(file_path);
ASSERT_OK(s);
ASSERT_OK(sst_file_writer.Put("b", "b_value"));
ASSERT_OK(sst_file_writer.Finish());
}
options.enable_blob_files = true;
options.min_blob_size = 1000;
options.use_direct_reads = true;
options.allow_ingest_behind = true;
// Open DB with fixed-prefix sst-partitioner so that compaction will cut
// new table file when encountering a new key whose 1-byte prefix changes.
constexpr size_t key_len = 1;
options.sst_partitioner_factory =
NewSstPartitionerFixedPrefixFactory(key_len);
Status s = TryReopen(options);
if (s.IsInvalidArgument()) {
ROCKSDB_GTEST_SKIP("This test requires direct IO support");
return;
}
ASSERT_OK(s);
constexpr size_t num_keys = 3;
constexpr size_t blob_size = 3000;
constexpr char first_key[] = "a";
const std::string first_blob(blob_size, 'a');
ASSERT_OK(Put(first_key, first_blob));
constexpr char second_key[] = "b";
const std::string second_blob(2 * blob_size, 'b');
ASSERT_OK(Put(second_key, second_blob));
constexpr char third_key[] = "d";
const std::string third_blob(blob_size, 'd');
ASSERT_OK(Put(third_key, third_blob));
// first_blob, second_blob and third_blob in the same blob file.
// SST Blob file
// L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'|
// | | | ^ ^ ^
// | | | | | |
// | | +---------|-------|--------+
// | +-----------------|-------+
// +-------------------------+
ASSERT_OK(Flush());
constexpr char fourth_key[] = "c";
const std::string fourth_blob(blob_size, 'c');
ASSERT_OK(Put(fourth_key, fourth_blob));
// fourth_blob in another blob file.
// SST Blob file SST Blob file
// L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'| ["c"] |'cccc'|
// | | | ^ ^ ^ | ^
// | | | | | | | |
// | | +---------|-------|--------+ +-------+
// | +-----------------|-------+
// +-------------------------+
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
// Due to the above sst partitioner, we get 4 L1 files. The blob files are
// unchanged.
// |'aaaa', 'bbbb', 'dddd'| |'cccc'|
// ^ ^ ^ ^
// | | | |
// L0 | | | |
// L1 ["a"] ["b"] ["c"] | | ["d"] |
// | | | | | |
// | | +---------|-------|---------------+
// | +-----------------|-------+
// +-------------------------+
ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
{
// Ingest the external SST file into bottommost level.
std::vector<std::string> ext_files{file_path};
IngestExternalFileOptions opts;
opts.ingest_behind = true;
ASSERT_OK(
db_->IngestExternalFile(db_->DefaultColumnFamily(), ext_files, opts));
}
// Now the database becomes as follows.
// |'aaaa', 'bbbb', 'dddd'| |'cccc'|
// ^ ^ ^ ^
// | | | |
// L0 | | | |
// L1 ["a"] ["b"] ["c"] | | ["d"] |
// | | | | | |
// | | +---------|-------|---------------+
// | +-----------------|-------+
// +-------------------------+
//
// L6 ["b"]
{
// Compact ["b"] to bottommost level.
Slice begin = Slice(second_key);
Slice end = Slice(second_key);
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, &begin, &end));
}
// |'aaaa', 'bbbb', 'dddd'| |'cccc'|
// ^ ^ ^ ^
// | | | |
// L0 | | | |
// L1 ["a"] ["c"] | | ["d"] |
// | | | | |
// | +---------|-------|---------------+
// | +-----------------|-------+
// +-------|-----------------+
// |
// L6 ["b"]
ASSERT_EQ(3, NumTableFilesAtLevel(/*level=*/1));
ASSERT_EQ(1, NumTableFilesAtLevel(/*level=*/6));
bool called = false;
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"RandomAccessFileReader::MultiRead:AlignedReqs", [&](void* arg) {
auto* aligned_reqs = static_cast<std::vector<FSReadRequest>*>(arg);
assert(aligned_reqs);
ASSERT_EQ(1, aligned_reqs->size());
called = true;
});
SyncPoint::GetInstance()->EnableProcessing();
std::array<Slice, num_keys> keys{{first_key, third_key, second_key}};
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
// The MultiGet(), when constructing the KeyContexts, will process the keys
// in such order: a, d, b. The reason is that ["a"] and ["d"] are in L1,
// while ["b"] resides in L6.
// Consequently, the original FSReadRequest list prepared by
// Version::MultiGetblob() will be for "a", "d" and "b". It is unsorted as
// follows:
//
// ["a", offset=30, len=3033],
// ["d", offset=9096, len=3033],
// ["b", offset=3063, len=6033]
//
// If we do not sort them before calling MultiRead() in DirectIO, then the
// underlying IO merging logic will yield two requests.
//
// [offset=0, len=4096] (for "a")
// [offset=0, len=12288] (result of merging the request for "d" and "b")
//
// We need to sort them in Version::MultiGetBlob() so that the underlying
// IO merging logic in DirectIO mode works as expected. The correct
// behavior will be one aligned request:
//
// [offset=0, len=12288]
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_TRUE(called);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_blob);
ASSERT_OK(statuses[1]);
ASSERT_EQ(values[1], third_blob);
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], second_blob);
}
}
#endif // !ROCKSDB_LITE
TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr size_t kNumBlobFiles = 3;
constexpr size_t kNumBlobsPerFile = 3;
constexpr size_t kNumKeys = kNumBlobsPerFile * kNumBlobFiles;
std::vector<std::string> key_strs;
std::vector<std::string> value_strs;
for (size_t i = 0; i < kNumBlobFiles; ++i) {
for (size_t j = 0; j < kNumBlobsPerFile; ++j) {
std::string key = "key" + std::to_string(i) + "_" + std::to_string(j);
std::string value =
"value_as_blob" + std::to_string(i) + "_" + std::to_string(j);
ASSERT_OK(Put(key, value));
key_strs.push_back(key);
value_strs.push_back(value);
}
ASSERT_OK(Flush());
}
assert(key_strs.size() == kNumKeys);
std::array<Slice, kNumKeys> keys;
for (size_t i = 0; i < keys.size(); ++i) {
keys[i] = key_strs[i];
}
std::array<PinnableSlice, kNumKeys> values;
std::array<Status, kNumKeys> statuses;
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), kNumKeys, &keys[0],
&values[0], &statuses[0]);
for (size_t i = 0; i < kNumKeys; ++i) {
ASSERT_OK(statuses[i]);
ASSERT_EQ(value_strs[i], values[i]);
}
}
TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr char key[] = "key";
// Fake a corrupt blob index.
const std::string blob_index("foobar");
WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
PinnableSlice result;
ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
.IsCorruption());
}
TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
DestroyAndReopen(options);
constexpr size_t kNumOfKeys = 3;
std::array<std::string, kNumOfKeys> key_strs;
std::array<std::string, kNumOfKeys> value_strs;
std::array<Slice, kNumOfKeys + 1> keys;
for (size_t i = 0; i < kNumOfKeys; ++i) {
key_strs[i] = "foo" + std::to_string(i);
value_strs[i] = "blob_value" + std::to_string(i);
ASSERT_OK(Put(key_strs[i], value_strs[i]));
keys[i] = key_strs[i];
}
constexpr char key[] = "key";
{
// Fake a corrupt blob index.
const std::string blob_index("foobar");
WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
keys[kNumOfKeys] = Slice(static_cast<const char*>(key), sizeof(key) - 1);
}
ASSERT_OK(Flush());
std::array<PinnableSlice, kNumOfKeys + 1> values;
std::array<Status, kNumOfKeys + 1> statuses;
db_->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(), kNumOfKeys + 1,
keys.data(), values.data(), statuses.data(),
/*sorted_input=*/false);
for (size_t i = 0; i < kNumOfKeys + 1; ++i) {
if (i != kNumOfKeys) {
ASSERT_OK(statuses[i]);
ASSERT_EQ("blob_value" + std::to_string(i), values[i]);
} else {
ASSERT_TRUE(statuses[i].IsCorruption());
}
}
}
TEST_F(DBBlobBasicTest, MultiGetBlob_ExceedSoftLimit) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr size_t kNumOfKeys = 3;
std::array<std::string, kNumOfKeys> key_bufs;
std::array<std::string, kNumOfKeys> value_bufs;
std::array<Slice, kNumOfKeys> keys;
for (size_t i = 0; i < kNumOfKeys; ++i) {
key_bufs[i] = "foo" + std::to_string(i);
value_bufs[i] = "blob_value" + std::to_string(i);
ASSERT_OK(Put(key_bufs[i], value_bufs[i]));
keys[i] = key_bufs[i];
}
ASSERT_OK(Flush());
std::array<PinnableSlice, kNumOfKeys> values;
std::array<Status, kNumOfKeys> statuses;
ReadOptions read_opts;
read_opts.value_size_soft_limit = 1;
db_->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), kNumOfKeys,
keys.data(), values.data(), statuses.data(),
/*sorted_input=*/true);
for (const auto& s : statuses) {
ASSERT_TRUE(s.IsAborted());
}
}
TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) {
constexpr uint64_t min_blob_size = 10;
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
Reopen(options);
constexpr char key[] = "key";
constexpr char blob[] = "short";
static_assert(sizeof(short) - 1 < min_blob_size,
"Blob too long to be inlined");
// Fake an inlined TTL blob index.
std::string blob_index;
constexpr uint64_t expiration = 1234567890;
BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
PinnableSlice result;
ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
.IsCorruption());
}
TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr char key[] = "key";
// Fake a blob index referencing a non-existent blob file.
std::string blob_index;
constexpr uint64_t blob_file_number = 1000;
constexpr uint64_t offset = 1234;
constexpr uint64_t size = 5678;
BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
kNoCompression);
WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
PinnableSlice result;
ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
.IsCorruption());
}
#ifndef ROCKSDB_LITE
TEST_F(DBBlobBasicTest, GenerateIOTracing) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
std::string trace_file = dbname_ + "/io_trace_file";
Reopen(options);
{
// Create IO trace file
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(
NewFileTraceWriter(env_, EnvOptions(), trace_file, &trace_writer));
ASSERT_OK(db_->StartIOTrace(TraceOptions(), std::move(trace_writer)));
constexpr char key[] = "key";
constexpr char blob_value[] = "blob_value";
ASSERT_OK(Put(key, blob_value));
ASSERT_OK(Flush());
ASSERT_EQ(Get(key), blob_value);
ASSERT_OK(db_->EndIOTrace());
ASSERT_OK(env_->FileExists(trace_file));
}
{
// Parse trace file to check file operations related to blob files are
// recorded.
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(
NewFileTraceReader(env_, EnvOptions(), trace_file, &trace_reader));
IOTraceReader reader(std::move(trace_reader));
IOTraceHeader header;
ASSERT_OK(reader.ReadHeader(&header));
ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
// Read records.
int blob_files_op_count = 0;
Status status;
while (true) {
IOTraceRecord record;
status = reader.ReadIOOp(&record);
if (!status.ok()) {
break;
}
if (record.file_name.find("blob") != std::string::npos) {
blob_files_op_count++;
}
}
// Assuming blob files will have Append, Close and then Read operations.
ASSERT_GT(blob_files_op_count, 2);
}
}
#endif // !ROCKSDB_LITE
TEST_F(DBBlobBasicTest, BestEffortsRecovery_MissingNewestBlobFile) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
Reopen(options);
ASSERT_OK(dbfull()->DisableFileDeletions());
constexpr int kNumTableFiles = 2;
for (int i = 0; i < kNumTableFiles; ++i) {
for (char ch = 'a'; ch != 'c'; ++ch) {
std::string key(1, ch);
ASSERT_OK(Put(key, "value" + std::to_string(i)));
}
ASSERT_OK(Flush());
}
Close();
std::vector<std::string> files;
ASSERT_OK(env_->GetChildren(dbname_, &files));
std::string blob_file_path;
uint64_t max_blob_file_num = kInvalidBlobFileNumber;
for (const auto& fname : files) {
uint64_t file_num = 0;
FileType type;
if (ParseFileName(fname, &file_num, /*info_log_name_prefix=*/"", &type) &&
type == kBlobFile) {
if (file_num > max_blob_file_num) {
max_blob_file_num = file_num;
blob_file_path = dbname_ + "/" + fname;
}
}
}
ASSERT_OK(env_->DeleteFile(blob_file_path));
options.best_efforts_recovery = true;
Reopen(options);
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), "a", &value));
ASSERT_EQ("value" + std::to_string(kNumTableFiles - 2), value);
}
TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) {
Options options = GetDefaultOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
ASSERT_OK(Put("Key1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("Key1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("Key1", "v3"));
ASSERT_OK(Flush());
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), "Key1", &value));
ASSERT_EQ(Get("Key1"), "v1,v2,v3");
}
TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) {
constexpr size_t num_keys = 3;
Options options = GetDefaultOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
ASSERT_OK(Put("Key0", "v0_0"));
ASSERT_OK(Put("Key1", "v1_0"));
ASSERT_OK(Put("Key2", "v2_0"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("Key0", "v0_1"));
ASSERT_OK(Merge("Key1", "v1_1"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("Key0", "v0_2"));
ASSERT_OK(Flush());
std::array<Slice, num_keys> keys{{"Key0", "Key1", "Key2"}};
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], "v0_0,v0_1,v0_2");
ASSERT_OK(statuses[1]);
ASSERT_EQ(values[1], "v1_0,v1_1");
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], "v2_0");
}
#ifndef ROCKSDB_LITE
TEST_F(DBBlobBasicTest, Properties) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr char key1[] = "key1";
constexpr size_t key1_size = sizeof(key1) - 1;
constexpr char key2[] = "key2";
constexpr size_t key2_size = sizeof(key2) - 1;
constexpr char key3[] = "key3";
constexpr size_t key3_size = sizeof(key3) - 1;
constexpr char blob[] = "00000000000000";
constexpr size_t blob_size = sizeof(blob) - 1;
constexpr char longer_blob[] = "00000000000000000000";
constexpr size_t longer_blob_size = sizeof(longer_blob) - 1;
ASSERT_OK(Put(key1, blob));
ASSERT_OK(Put(key2, longer_blob));
ASSERT_OK(Flush());
constexpr size_t first_blob_file_expected_size =
BlobLogHeader::kSize +
BlobLogRecord::CalculateAdjustmentForRecordHeader(key1_size) + blob_size +
BlobLogRecord::CalculateAdjustmentForRecordHeader(key2_size) +
longer_blob_size + BlobLogFooter::kSize;
ASSERT_OK(Put(key3, blob));
ASSERT_OK(Flush());
constexpr size_t second_blob_file_expected_size =
BlobLogHeader::kSize +
BlobLogRecord::CalculateAdjustmentForRecordHeader(key3_size) + blob_size +
BlobLogFooter::kSize;
constexpr size_t total_expected_size =
first_blob_file_expected_size + second_blob_file_expected_size;
// Number of blob files
uint64_t num_blob_files = 0;
ASSERT_TRUE(
db_->GetIntProperty(DB::Properties::kNumBlobFiles, &num_blob_files));
ASSERT_EQ(num_blob_files, 2);
// Total size of live blob files
uint64_t live_blob_file_size = 0;
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kLiveBlobFileSize,
&live_blob_file_size));
ASSERT_EQ(live_blob_file_size, total_expected_size);
// Total size of all blob files across all versions
// Note: this should be the same as above since we only have one
// version at this point.
uint64_t total_blob_file_size = 0;
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kTotalBlobFileSize,
&total_blob_file_size));
ASSERT_EQ(total_blob_file_size, total_expected_size);
// Delete key2 to create some garbage
ASSERT_OK(Delete(key2));
ASSERT_OK(Flush());
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
constexpr size_t expected_garbage_size =
BlobLogRecord::CalculateAdjustmentForRecordHeader(key2_size) +
longer_blob_size;
constexpr double expected_space_amp =
static_cast<double>(total_expected_size) /
(total_expected_size - expected_garbage_size);
// Blob file stats
std::string blob_stats;
ASSERT_TRUE(db_->GetProperty(DB::Properties::kBlobStats, &blob_stats));
std::ostringstream oss;
oss << "Number of blob files: 2\nTotal size of blob files: "
<< total_expected_size
<< "\nTotal size of garbage in blob files: " << expected_garbage_size
<< "\nBlob file space amplification: " << expected_space_amp << '\n';
ASSERT_EQ(blob_stats, oss.str());
}
TEST_F(DBBlobBasicTest, PropertiesMultiVersion) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr char key1[] = "key1";
constexpr char key2[] = "key2";
constexpr char key3[] = "key3";
constexpr size_t key_size = sizeof(key1) - 1;
static_assert(sizeof(key2) - 1 == key_size, "unexpected size: key2");
static_assert(sizeof(key3) - 1 == key_size, "unexpected size: key3");
constexpr char blob[] = "0000000000";
constexpr size_t blob_size = sizeof(blob) - 1;
ASSERT_OK(Put(key1, blob));
ASSERT_OK(Flush());
ASSERT_OK(Put(key2, blob));
ASSERT_OK(Flush());
// Create an iterator to keep the current version alive
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
ASSERT_OK(iter->status());
// Note: the Delete and subsequent compaction results in the first blob file
// not making it to the final version. (It is still part of the previous
// version kept alive by the iterator though.) On the other hand, the Put
// results in a third blob file.
ASSERT_OK(Delete(key1));
ASSERT_OK(Put(key3, blob));
ASSERT_OK(Flush());
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
// Total size of all blob files across all versions: between the two versions,
// we should have three blob files of the same size with one blob each.
// The version kept alive by the iterator contains the first and the second
// blob file, while the final version contains the second and the third blob
// file. (The second blob file is thus shared by the two versions but should
// be counted only once.)
uint64_t total_blob_file_size = 0;
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kTotalBlobFileSize,
&total_blob_file_size));
ASSERT_EQ(total_blob_file_size,
3 * (BlobLogHeader::kSize +
BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
blob_size + BlobLogFooter::kSize));
}
#endif // !ROCKSDB_LITE
class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
public testing::WithParamInterface<std::string> {
protected:
DBBlobBasicIOErrorTest() : sync_point_(GetParam()) {
fault_injection_env_.reset(new FaultInjectionTestEnv(env_));
}
~DBBlobBasicIOErrorTest() { Close(); }
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
std::string sync_point_;
};
class DBBlobBasicIOErrorMultiGetTest : public DBBlobBasicIOErrorTest {
public:
DBBlobBasicIOErrorMultiGetTest() : DBBlobBasicIOErrorTest() {}
};
INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorTest,
::testing::ValuesIn(std::vector<std::string>{
"BlobFileReader::OpenFile:NewRandomAccessFile",
"BlobFileReader::GetBlob:ReadFromFile"}));
INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorMultiGetTest,
::testing::ValuesIn(std::vector<std::string>{
"BlobFileReader::OpenFile:NewRandomAccessFile",
"BlobFileReader::MultiGetBlob:ReadFromFile"}));
TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
Options options;
options.env = fault_injection_env_.get();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr char key[] = "key";
constexpr char blob_value[] = "blob_value";
ASSERT_OK(Put(key, blob_value));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_->SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->EnableProcessing();
PinnableSlice result;
ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
.IsIOError());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(DBBlobBasicIOErrorMultiGetTest, MultiGetBlobs_IOError) {
Options options = GetDefaultOptions();
options.env = fault_injection_env_.get();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr size_t num_keys = 2;
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
ASSERT_OK(Put(first_key, first_value));
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
std::array<Slice, num_keys> keys{{first_key, second_key}};
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_->SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->EnableProcessing();
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_TRUE(statuses[0].IsIOError());
ASSERT_TRUE(statuses[1].IsIOError());
}
TEST_P(DBBlobBasicIOErrorMultiGetTest, MultipleBlobFiles) {
Options options = GetDefaultOptions();
options.env = fault_injection_env_.get();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr size_t num_keys = 2;
constexpr char key1[] = "key1";
constexpr char value1[] = "blob1";
ASSERT_OK(Put(key1, value1));
ASSERT_OK(Flush());
constexpr char key2[] = "key2";
constexpr char value2[] = "blob2";
ASSERT_OK(Put(key2, value2));
ASSERT_OK(Flush());
std::array<Slice, num_keys> keys{{key1, key2}};
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
bool first_blob_file = true;
SyncPoint::GetInstance()->SetCallBack(
sync_point_, [&first_blob_file, this](void* /* arg */) {
if (first_blob_file) {
first_blob_file = false;
return;
}
fault_injection_env_->SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->EnableProcessing();
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
keys.data(), values.data(), statuses.data());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_OK(statuses[0]);
ASSERT_EQ(value1, values[0]);
ASSERT_TRUE(statuses[1].IsIOError());
}
namespace {
class ReadBlobCompactionFilter : public CompactionFilter {
public:
ReadBlobCompactionFilter() = default;
const char* Name() const override {
return "rocksdb.compaction.filter.read.blob";
}
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& /*key*/, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const override {
if (value_type != CompactionFilter::ValueType::kValue) {
return CompactionFilter::Decision::kKeep;
}
assert(new_value);
new_value->assign(existing_value.data(), existing_value.size());
return CompactionFilter::Decision::kChangeValue;
}
};
} // anonymous namespace
TEST_P(DBBlobBasicIOErrorTest, CompactionFilterReadBlob_IOError) {
Options options = GetDefaultOptions();
options.env = fault_injection_env_.get();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ReadBlobCompactionFilter);
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
constexpr char key[] = "foo";
constexpr char blob_value[] = "foo_blob_value";
ASSERT_OK(Put(key, blob_value));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_->SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsIOError());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
RegisterCustomObjects(argc, argv);
return RUN_ALL_TESTS();
}