rocksdb/db/blob/db_blob_basic_test.cc
Yanqin Jin b0e20194ea Handle blob files when options.best_efforts_recovery is true (#8180)
Summary:
If `options.best_efforts_recovery == true`, RocksDB currently tolerates missing table files and recovers to the latest version without missing table files (not considering WAL). It is necessary to handle blob files as well to make the feature more complete.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8180

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D27840556

Pulled By: riversand963

fbshipit-source-id: 041685d0dc2e7779ac4f0374c07a8a327704aa5e
2021-04-19 11:56:14 -07:00

461 lines
14 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <array>
#include "db/blob/blob_index.h"
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "utilities/fault_injection_env.h"
namespace ROCKSDB_NAMESPACE {
// Fixture shared by the basic blob tests in this file. It hands a fixed
// database path to DBTestBase and passes `false` for env_do_fsync.
class DBBlobBasicTest : public DBTestBase {
 protected:
  DBBlobBasicTest()
      : DBTestBase("/db_blob_basic_test", /*env_do_fsync=*/false) {}
};
// Writes a single value with blob files enabled (min_blob_size == 0), flushes
// it, and checks that it can be read back. A second, cache-only read of the
// same key is then expected to report Incomplete.
TEST_F(DBBlobBasicTest, GetBlob) {
  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;

  Reopen(opts);

  constexpr char key[] = "key";
  constexpr char blob_value[] = "blob_value";

  ASSERT_OK(Put(key, blob_value));
  ASSERT_OK(Flush());
  ASSERT_EQ(Get(key), blob_value);

  // Repeat the lookup with I/O disallowed. The table and the blocks it needs
  // should already be cached, but the blob itself can only come from the blob
  // file, so the read is expected to come back as Incomplete.
  ReadOptions cache_only;
  cache_only.read_tier = kBlockCacheTier;

  PinnableSlice fetched;
  const Status s =
      db_->Get(cache_only, db_->DefaultColumnFamily(), key, &fetched);
  ASSERT_TRUE(s.IsIncomplete());
}
// Exercises MultiGet over a mix of one inlined value and two blob values:
// first with regular reads (all three must succeed), then with cache-only
// reads (only the inlined value must succeed).
TEST_F(DBBlobBasicTest, MultiGetBlobs) {
  constexpr size_t min_blob_size = 6;

  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = min_blob_size;

  Reopen(opts);

  // Three key-values are written. The first value is shorter than the size
  // limit and therefore stays inline; the remaining two are long enough to be
  // stored separately as blobs.
  constexpr size_t num_keys = 3;

  constexpr char first_key[] = "first_key";
  constexpr char first_value[] = "short";
  static_assert(sizeof(first_value) - 1 < min_blob_size,
                "first_value too long to be inlined");

  constexpr char second_key[] = "second_key";
  constexpr char second_value[] = "long_value";
  static_assert(sizeof(second_value) - 1 >= min_blob_size,
                "second_value too short to be stored as blob");

  constexpr char third_key[] = "third_key";
  constexpr char third_value[] = "other_long_value";
  static_assert(sizeof(third_value) - 1 >= min_blob_size,
                "third_value too short to be stored as blob");

  ASSERT_OK(Put(first_key, first_value));
  ASSERT_OK(Put(second_key, second_value));
  ASSERT_OK(Put(third_key, third_value));
  ASSERT_OK(Flush());

  ReadOptions read_opts;
  std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};

  // Pass 1: default read tier, every lookup must succeed.
  {
    std::array<PinnableSlice, num_keys> results;
    std::array<Status, num_keys> outcomes;

    db_->MultiGet(read_opts, db_->DefaultColumnFamily(), num_keys, keys.data(),
                  results.data(), outcomes.data());

    ASSERT_OK(outcomes[0]);
    ASSERT_EQ(results[0], first_value);

    ASSERT_OK(outcomes[1]);
    ASSERT_EQ(results[1], second_value);

    ASSERT_OK(outcomes[2]);
    ASSERT_EQ(results[2], third_value);
  }

  // Pass 2: no I/O allowed. The table and the blocks it needs should already
  // be cached, so the inlined value is still readable; the two blob values
  // could only be served from the blob file, so those lookups are expected to
  // report Incomplete.
  read_opts.read_tier = kBlockCacheTier;

  {
    std::array<PinnableSlice, num_keys> results;
    std::array<Status, num_keys> outcomes;

    db_->MultiGet(read_opts, db_->DefaultColumnFamily(), num_keys, keys.data(),
                  results.data(), outcomes.data());

    ASSERT_OK(outcomes[0]);
    ASSERT_EQ(results[0], first_value);

    for (size_t i = 1; i < num_keys; ++i) {
      ASSERT_TRUE(outcomes[i].IsIncomplete());
    }
  }
}
// Writes a blob index entry whose payload is not a valid encoding ("foobar")
// and checks that reading the key reports Corruption.
TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;

  Reopen(opts);

  constexpr char key[] = "key";

  // Inject a bogus blob index directly through a write batch.
  const std::string bogus_index("foobar");

  WriteBatch wb;
  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, 0, key, bogus_index));
  ASSERT_OK(db_->Write(WriteOptions(), &wb));
  ASSERT_OK(Flush());

  PinnableSlice fetched;
  const Status s =
      db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &fetched);
  ASSERT_TRUE(s.IsCorruption());
}
// Writes an inlined-TTL blob index for a key and checks that reading the key
// reports Corruption.
TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) {
  constexpr uint64_t min_blob_size = 10;

  Options options = GetDefaultOptions();
  options.enable_blob_files = true;
  options.min_blob_size = min_blob_size;

  Reopen(options);

  constexpr char key[] = "key";
  constexpr char blob[] = "short";
  // Measure the `blob` array (minus its terminating NUL). The previous
  // `sizeof(short)` measured the built-in type `short`, so the guard never
  // actually looked at the blob's length.
  static_assert(sizeof(blob) - 1 < min_blob_size,
                "Blob too long to be inlined");

  // Fake an inlined TTL blob index.
  std::string blob_index;

  constexpr uint64_t expiration = 1234567890;

  BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);

  WriteBatch batch;
  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  ASSERT_OK(Flush());

  PinnableSlice result;
  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
                  .IsCorruption());
}
// Writes a blob index that points at blob file number 1000, which this test
// never creates, and checks that reading the key reports Corruption.
TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) {
  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;

  Reopen(opts);

  constexpr char key[] = "key";

  // Build a blob index that references a non-existent blob file.
  constexpr uint64_t blob_file_number = 1000;
  constexpr uint64_t offset = 1234;
  constexpr uint64_t size = 5678;

  std::string encoded_index;
  BlobIndex::EncodeBlob(&encoded_index, blob_file_number, offset, size,
                        kNoCompression);

  WriteBatch wb;
  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, 0, key, encoded_index));
  ASSERT_OK(db_->Write(WriteOptions(), &wb));
  ASSERT_OK(Flush());

  PinnableSlice fetched;
  const Status s =
      db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &fetched);
  ASSERT_TRUE(s.IsCorruption());
}
#ifndef ROCKSDB_LITE
// Records an I/O trace while writing, flushing and reading one blob value,
// then parses the trace and checks that more than two of the recorded
// operations refer to a file whose name contains "blob".
TEST_F(DBBlobBasicTest, GenerateIOTracing) {
  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;

  const std::string trace_file = dbname_ + "/io_trace_file";

  Reopen(opts);

  // Phase 1: produce the I/O trace file.
  {
    std::unique_ptr<TraceWriter> writer;
    ASSERT_OK(NewFileTraceWriter(env_, EnvOptions(), trace_file, &writer));
    ASSERT_OK(db_->StartIOTrace(TraceOptions(), std::move(writer)));

    constexpr char key[] = "key";
    constexpr char blob_value[] = "blob_value";

    ASSERT_OK(Put(key, blob_value));
    ASSERT_OK(Flush());
    ASSERT_EQ(Get(key), blob_value);

    ASSERT_OK(db_->EndIOTrace());
    ASSERT_OK(env_->FileExists(trace_file));
  }

  // Phase 2: parse the trace and make sure file operations touching blob
  // files were recorded.
  {
    std::unique_ptr<TraceReader> raw_reader;
    ASSERT_OK(NewFileTraceReader(env_, EnvOptions(), trace_file, &raw_reader));
    IOTraceReader io_reader(std::move(raw_reader));

    IOTraceHeader header;
    ASSERT_OK(io_reader.ReadHeader(&header));
    ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
    ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));

    // Walk the records until the reader stops returning OK, counting those
    // whose file name mentions "blob".
    int blob_files_op_count = 0;
    for (;;) {
      IOTraceRecord record;
      const Status read_status = io_reader.ReadIOOp(&record);
      if (!read_status.ok()) {
        break;
      }
      if (record.file_name.find("blob") != std::string::npos) {
        ++blob_files_op_count;
      }
    }

    // Assuming blob files will have Append, Close and then Read operations.
    ASSERT_GT(blob_files_op_count, 2);
  }
}
#endif // !ROCKSDB_LITE
// Flushes two generations of keys "a" and "b", deletes the blob file with the
// highest file number while the DB is closed, and reopens with
// best_efforts_recovery. The value read back for "a" is expected to be the
// one written by the first generation ("value0").
TEST_F(DBBlobBasicTest, BestEffortsRecovery_MissingNewestBlobFile) {
  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;
  opts.create_if_missing = true;

  Reopen(opts);

  ASSERT_OK(dbfull()->DisableFileDeletions());

  constexpr int kNumTableFiles = 2;
  for (int generation = 0; generation < kNumTableFiles; ++generation) {
    const std::string value = "value" + std::to_string(generation);
    for (char ch = 'a'; ch != 'c'; ++ch) {
      ASSERT_OK(Put(std::string(1, ch), value));
    }
    ASSERT_OK(Flush());
  }

  Close();

  // Locate the blob file with the largest file number in the DB directory.
  std::vector<std::string> children;
  ASSERT_OK(env_->GetChildren(dbname_, &children));

  std::string newest_blob_path;
  uint64_t newest_blob_num = kInvalidBlobFileNumber;
  for (const auto& child : children) {
    uint64_t number = 0;
    FileType type;
    if (!ParseFileName(child, &number, /*info_log_name_prefix=*/"", &type)) {
      continue;
    }
    if (type != kBlobFile) {
      continue;
    }
    if (number > newest_blob_num) {
      newest_blob_num = number;
      newest_blob_path = dbname_ + "/" + child;
    }
  }

  ASSERT_OK(env_->DeleteFile(newest_blob_path));

  opts.best_efforts_recovery = true;
  Reopen(opts);

  std::string recovered;
  ASSERT_OK(db_->Get(ReadOptions(), "a", &recovered));
  ASSERT_EQ("value" + std::to_string(kNumTableFiles - 2), recovered);
}
// Value-parameterized fixture for the I/O error tests. The string parameter is
// stored in sync_point_, and the fixture wraps the base class's env_ in a
// FaultInjectionTestEnv. The destructor closes the DB.
class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
                               public testing::WithParamInterface<std::string> {
 protected:
  DBBlobBasicIOErrorTest()
      : fault_injection_env_(new FaultInjectionTestEnv(env_)),
        sync_point_(GetParam()) {}

  ~DBBlobBasicIOErrorTest() override { Close(); }

  // Environment handed to Options::env by the tests using this fixture.
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
  // Name of the sync point the tests attach their callback to.
  std::string sync_point_;
};
// Runs every DBBlobBasicIOErrorTest once per sync point name listed below.
INSTANTIATE_TEST_CASE_P(
    DBBlobBasicTest, DBBlobBasicIOErrorTest,
    ::testing::Values(
        std::string("BlobFileReader::OpenFile:NewRandomAccessFile"),
        std::string("BlobFileReader::GetBlob:ReadFromFile")));
// Writes and flushes one blob value, then arranges for the fault injection
// env to be deactivated with an IOError when the parameterized sync point is
// hit. The subsequent Get is expected to report IOError.
TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
  // NOTE(review): unlike the sibling tests, this one starts from
  // default-constructed Options rather than GetDefaultOptions() -- confirm
  // that this is intentional.
  Options opts;
  opts.env = fault_injection_env_.get();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;

  Reopen(opts);

  constexpr char key[] = "key";
  constexpr char blob_value[] = "blob_value";

  ASSERT_OK(Put(key, blob_value));
  ASSERT_OK(Flush());

  // When the sync point fires, switch the filesystem off so that later
  // operations fail with an IOError carrying the sync point's name.
  const auto deactivate_fs = [this](void* /* arg */) {
    fault_injection_env_->SetFilesystemActive(false,
                                              Status::IOError(sync_point_));
  };
  SyncPoint::GetInstance()->SetCallBack(sync_point_, deactivate_fs);
  SyncPoint::GetInstance()->EnableProcessing();

  PinnableSlice fetched;
  const Status s =
      db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &fetched);
  ASSERT_TRUE(s.IsIOError());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Writes and flushes two blob values, then arranges for the fault injection
// env to be deactivated with an IOError when the parameterized sync point is
// hit. Both MultiGet lookups are expected to report IOError.
TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) {
  Options opts = GetDefaultOptions();
  opts.env = fault_injection_env_.get();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;

  Reopen(opts);

  constexpr size_t num_keys = 2;

  constexpr char first_key[] = "first_key";
  constexpr char first_value[] = "first_value";
  ASSERT_OK(Put(first_key, first_value));

  constexpr char second_key[] = "second_key";
  constexpr char second_value[] = "second_value";
  ASSERT_OK(Put(second_key, second_value));

  ASSERT_OK(Flush());

  std::array<Slice, num_keys> keys{{first_key, second_key}};
  std::array<PinnableSlice, num_keys> results;
  std::array<Status, num_keys> outcomes;

  // When the sync point fires, switch the filesystem off so that later
  // operations fail with an IOError carrying the sync point's name.
  const auto deactivate_fs = [this](void* /* arg */) {
    fault_injection_env_->SetFilesystemActive(false,
                                              Status::IOError(sync_point_));
  };
  SyncPoint::GetInstance()->SetCallBack(sync_point_, deactivate_fs);
  SyncPoint::GetInstance()->EnableProcessing();

  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
                keys.data(), results.data(), outcomes.data());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  for (size_t i = 0; i < num_keys; ++i) {
    ASSERT_TRUE(outcomes[i].IsIOError());
  }
}
namespace {

// Compaction filter that, for entries of type kValue, copies the existing
// value into *new_value and asks for the value to be changed; every other
// entry type is kept untouched.
// NOTE(review): presumably used to force compaction to materialize blob
// values -- confirm against the compaction code.
class ReadBlobCompactionFilter : public CompactionFilter {
 public:
  ReadBlobCompactionFilter() = default;

  const char* Name() const override {
    return "rocksdb.compaction.filter.read.blob";
  }

  CompactionFilter::Decision FilterV2(
      int /*level*/, const Slice& /*key*/, ValueType value_type,
      const Slice& existing_value, std::string* new_value,
      std::string* /*skip_until*/) const override {
    if (value_type == CompactionFilter::ValueType::kValue) {
      assert(new_value);
      // Hand back a byte-for-byte copy of the current value.
      new_value->assign(existing_value.data(), existing_value.size());
      return CompactionFilter::Decision::kChangeValue;
    }
    return CompactionFilter::Decision::kKeep;
  }
};

}  // anonymous namespace
// Installs ReadBlobCompactionFilter, writes and flushes one blob value, then
// arranges for the fault injection env to be deactivated with an IOError when
// the parameterized sync point is hit. A full-range CompactRange is expected
// to report IOError.
TEST_P(DBBlobBasicIOErrorTest, CompactionFilterReadBlob_IOError) {
  Options opts = GetDefaultOptions();
  opts.env = fault_injection_env_.get();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;
  opts.create_if_missing = true;

  // Options only stores a raw pointer; this guard owns the filter.
  std::unique_ptr<CompactionFilter> compaction_filter_guard(
      new ReadBlobCompactionFilter);
  opts.compaction_filter = compaction_filter_guard.get();

  DestroyAndReopen(opts);

  constexpr char key[] = "foo";
  constexpr char blob_value[] = "foo_blob_value";

  ASSERT_OK(Put(key, blob_value));
  ASSERT_OK(Flush());

  // When the sync point fires, switch the filesystem off so that later
  // operations fail with an IOError carrying the sync point's name.
  const auto deactivate_fs = [this](void* /* arg */) {
    fault_injection_env_->SetFilesystemActive(false,
                                              Status::IOError(sync_point_));
  };
  SyncPoint::GetInstance()->SetCallBack(sync_point_, deactivate_fs);
  SyncPoint::GetInstance()->EnableProcessing();

  const Status s = db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                                     /*end=*/nullptr);
  ASSERT_TRUE(s.IsIOError());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace ROCKSDB_NAMESPACE
// Test binary entry point: installs the stack trace handler, lets GoogleTest
// consume its command-line flags, and returns the result of running all tests.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}