Propagate errors from UpdateBoundaries (#9851)
Summary: In `FileMetaData`, we keep track of the lowest-numbered blob file referenced by the SST file in question for the purposes of BlobDB's garbage collection in the `oldest_blob_file_number` field, which is updated in `UpdateBoundaries`. However, with the current code, `BlobIndex` decoding errors (or invalid blob file numbers) are swallowed in this method. The patch changes this by propagating these errors and failing the corresponding flush/compaction. (Note that since blob references are generated by the BlobDB code and also parsed by `CompactionIterator`, in reality this can only happen in the case of memory corruption.) This change necessitated updating some unit tests that involved fake/corrupt `BlobIndex` objects. Some of these just used a dummy string like `"blob_index"` as a placeholder; these were replaced with real `BlobIndex`es. Some were relying on the earlier behavior to simulate corruption; these were replaced with `SyncPoint`-based test code that corrupts a valid blob reference at read time. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9851 Test Plan: `make check` Reviewed By: riversand963 Differential Revision: D35683671 Pulled By: ltamasi fbshipit-source-id: f7387af9945c48e4d5c4cd864f1ba425c7ad51f6
This commit is contained in:
parent
be81609b43
commit
db536ee045
@ -366,19 +366,26 @@ TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
|
||||
Reopen(options);
|
||||
|
||||
constexpr char key[] = "key";
|
||||
constexpr char blob[] = "blob";
|
||||
|
||||
// Fake a corrupt blob index.
|
||||
const std::string blob_index("foobar");
|
||||
|
||||
WriteBatch batch;
|
||||
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
|
||||
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
||||
|
||||
ASSERT_OK(Put(key, blob));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"Version::Get::TamperWithBlobIndex", [](void* arg) {
|
||||
Slice* const blob_index = static_cast<Slice*>(arg);
|
||||
assert(blob_index);
|
||||
assert(!blob_index->empty());
|
||||
blob_index->remove_prefix(1);
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
PinnableSlice result;
|
||||
ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
|
||||
.IsCorruption());
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) {
|
||||
@ -401,17 +408,27 @@ TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) {
|
||||
}
|
||||
|
||||
constexpr char key[] = "key";
|
||||
{
|
||||
// Fake a corrupt blob index.
|
||||
const std::string blob_index("foobar");
|
||||
WriteBatch batch;
|
||||
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
|
||||
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
||||
keys[kNumOfKeys] = Slice(static_cast<const char*>(key), sizeof(key) - 1);
|
||||
}
|
||||
constexpr char blob[] = "blob";
|
||||
ASSERT_OK(Put(key, blob));
|
||||
keys[kNumOfKeys] = key;
|
||||
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"Version::MultiGet::TamperWithBlobIndex", [&key](void* arg) {
|
||||
KeyContext* const key_context = static_cast<KeyContext*>(arg);
|
||||
assert(key_context);
|
||||
assert(key_context->key);
|
||||
|
||||
if (*(key_context->key) == key) {
|
||||
Slice* const blob_index = key_context->value;
|
||||
assert(blob_index);
|
||||
assert(!blob_index->empty());
|
||||
blob_index->remove_prefix(1);
|
||||
}
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
std::array<PinnableSlice, kNumOfKeys + 1> values;
|
||||
std::array<Status, kNumOfKeys + 1> statuses;
|
||||
db_->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(), kNumOfKeys + 1,
|
||||
@ -425,6 +442,9 @@ TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) {
|
||||
ASSERT_TRUE(statuses[i].IsCorruption());
|
||||
}
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_F(DBBlobBasicTest, MultiGetBlob_ExceedSoftLimit) {
|
||||
|
@ -415,16 +415,30 @@ TEST_F(DBBlobCompactionTest, CorruptedBlobIndex) {
|
||||
new ValueMutationFilter(""));
|
||||
options.compaction_filter = compaction_filter_guard.get();
|
||||
DestroyAndReopen(options);
|
||||
// Mock a corrupted blob index
|
||||
|
||||
constexpr char key[] = "key";
|
||||
std::string blob_idx("blob_idx");
|
||||
WriteBatch write_batch;
|
||||
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&write_batch, 0, key, blob_idx));
|
||||
ASSERT_OK(db_->Write(WriteOptions(), &write_batch));
|
||||
constexpr char blob[] = "blob";
|
||||
|
||||
ASSERT_OK(Put(key, blob));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
|
||||
[](void* arg) {
|
||||
Slice* const blob_index = static_cast<Slice*>(arg);
|
||||
assert(blob_index);
|
||||
assert(!blob_index->empty());
|
||||
blob_index->remove_prefix(1);
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
|
||||
/*end=*/nullptr)
|
||||
.IsCorruption());
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
|
||||
Close();
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "db/arena_wrapped_db_iter.h"
|
||||
#include "db/blob/blob_index.h"
|
||||
#include "db/column_family.h"
|
||||
#include "db/db_iter.h"
|
||||
#include "db/db_test_util.h"
|
||||
@ -138,20 +139,39 @@ class DBBlobIndexTest : public DBTestBase {
|
||||
}
|
||||
};
|
||||
|
||||
// Should be able to write kTypeBlobIndex to memtables and SST files.
|
||||
// Note: the following test case pertains to the StackableDB-based BlobDB
|
||||
// implementation. We should be able to write kTypeBlobIndex to memtables and
|
||||
// SST files.
|
||||
TEST_F(DBBlobIndexTest, Write) {
|
||||
for (auto tier : kAllTiers) {
|
||||
DestroyAndReopen(GetTestOptions());
|
||||
for (int i = 1; i <= 5; i++) {
|
||||
std::string index = ToString(i);
|
||||
|
||||
std::vector<std::pair<std::string, std::string>> key_values;
|
||||
|
||||
constexpr size_t num_key_values = 5;
|
||||
|
||||
key_values.reserve(num_key_values);
|
||||
|
||||
for (size_t i = 1; i <= num_key_values; ++i) {
|
||||
std::string key = "key" + ToString(i);
|
||||
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 9876543210,
|
||||
"blob" + ToString(i));
|
||||
|
||||
key_values.emplace_back(std::move(key), std::move(blob_index));
|
||||
}
|
||||
|
||||
for (const auto& key_value : key_values) {
|
||||
WriteBatch batch;
|
||||
ASSERT_OK(PutBlobIndex(&batch, "key" + index, "blob" + index));
|
||||
ASSERT_OK(PutBlobIndex(&batch, key_value.first, key_value.second));
|
||||
ASSERT_OK(Write(&batch));
|
||||
}
|
||||
|
||||
MoveDataTo(tier);
|
||||
for (int i = 1; i <= 5; i++) {
|
||||
std::string index = ToString(i);
|
||||
ASSERT_EQ("blob" + index, GetBlobIndex("key" + index));
|
||||
|
||||
for (const auto& key_value : key_values) {
|
||||
ASSERT_EQ(GetBlobIndex(key_value.first), key_value.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -164,13 +184,19 @@ TEST_F(DBBlobIndexTest, Write) {
|
||||
// accidentally opening the base DB of a stacked BlobDB and actual corruption
|
||||
// when using the integrated BlobDB.
|
||||
TEST_F(DBBlobIndexTest, Get) {
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 9876543210, "blob");
|
||||
|
||||
for (auto tier : kAllTiers) {
|
||||
DestroyAndReopen(GetTestOptions());
|
||||
|
||||
WriteBatch batch;
|
||||
ASSERT_OK(batch.Put("key", "value"));
|
||||
ASSERT_OK(PutBlobIndex(&batch, "blob_key", "blob_index"));
|
||||
ASSERT_OK(PutBlobIndex(&batch, "blob_key", blob_index));
|
||||
ASSERT_OK(Write(&batch));
|
||||
|
||||
MoveDataTo(tier);
|
||||
|
||||
// Verify normal value
|
||||
bool is_blob_index = false;
|
||||
PinnableSlice value;
|
||||
@ -178,6 +204,7 @@ TEST_F(DBBlobIndexTest, Get) {
|
||||
ASSERT_EQ("value", GetImpl("key"));
|
||||
ASSERT_EQ("value", GetImpl("key", &is_blob_index));
|
||||
ASSERT_FALSE(is_blob_index);
|
||||
|
||||
// Verify blob index
|
||||
if (tier <= kImmutableMemtables) {
|
||||
ASSERT_TRUE(Get("blob_key", &value).IsNotSupported());
|
||||
@ -186,7 +213,7 @@ TEST_F(DBBlobIndexTest, Get) {
|
||||
ASSERT_TRUE(Get("blob_key", &value).IsCorruption());
|
||||
ASSERT_EQ("CORRUPTION", GetImpl("blob_key"));
|
||||
}
|
||||
ASSERT_EQ("blob_index", GetImpl("blob_key", &is_blob_index));
|
||||
ASSERT_EQ(blob_index, GetImpl("blob_key", &is_blob_index));
|
||||
ASSERT_TRUE(is_blob_index);
|
||||
}
|
||||
}
|
||||
@ -196,11 +223,14 @@ TEST_F(DBBlobIndexTest, Get) {
|
||||
// if blob index is updated with a normal value. See the test case above for
|
||||
// more details.
|
||||
TEST_F(DBBlobIndexTest, Updated) {
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 9876543210, "blob");
|
||||
|
||||
for (auto tier : kAllTiers) {
|
||||
DestroyAndReopen(GetTestOptions());
|
||||
WriteBatch batch;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
ASSERT_OK(PutBlobIndex(&batch, "key" + ToString(i), "blob_index"));
|
||||
ASSERT_OK(PutBlobIndex(&batch, "key" + ToString(i), blob_index));
|
||||
}
|
||||
ASSERT_OK(Write(&batch));
|
||||
// Avoid blob values from being purged.
|
||||
@ -218,7 +248,7 @@ TEST_F(DBBlobIndexTest, Updated) {
|
||||
ASSERT_OK(dbfull()->DeleteRange(WriteOptions(), cfh(), "key6", "key9"));
|
||||
MoveDataTo(tier);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
ASSERT_EQ("blob_index", GetBlobIndex("key" + ToString(i), snapshot));
|
||||
ASSERT_EQ(blob_index, GetBlobIndex("key" + ToString(i), snapshot));
|
||||
}
|
||||
ASSERT_EQ("new_value", Get("key1"));
|
||||
if (tier <= kImmutableMemtables) {
|
||||
@ -232,7 +262,7 @@ TEST_F(DBBlobIndexTest, Updated) {
|
||||
for (int i = 6; i < 9; i++) {
|
||||
ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i)));
|
||||
}
|
||||
ASSERT_EQ("blob_index", GetBlobIndex("key9"));
|
||||
ASSERT_EQ(blob_index, GetBlobIndex("key9"));
|
||||
dbfull()->ReleaseSnapshot(snapshot);
|
||||
}
|
||||
}
|
||||
|
@ -212,7 +212,11 @@ Status BuildTable(
|
||||
break;
|
||||
}
|
||||
builder->Add(key, value);
|
||||
meta->UpdateBoundaries(key, value, ikey.sequence, ikey.type);
|
||||
|
||||
s = meta->UpdateBoundaries(key, value, ikey.sequence, ikey.type);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// TODO(noetzli): Update stats after flush, too.
|
||||
if (io_priority == Env::IO_HIGH &&
|
||||
|
@ -234,6 +234,10 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
|
||||
return false;
|
||||
}
|
||||
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
|
||||
&value_);
|
||||
|
||||
// For integrated BlobDB impl, CompactionIterator reads blob value.
|
||||
// For Stacked BlobDB impl, the corresponding CompactionFilter's
|
||||
// FilterV2 method should read the blob value.
|
||||
@ -950,6 +954,10 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {
|
||||
|
||||
// GC for integrated BlobDB
|
||||
if (compaction_->enable_blob_garbage_collection()) {
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex",
|
||||
&value_);
|
||||
|
||||
BlobIndex blob_index;
|
||||
|
||||
{
|
||||
|
@ -1533,11 +1533,15 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
||||
break;
|
||||
}
|
||||
|
||||
const ParsedInternalKey& ikey = c_iter->ikey();
|
||||
status = sub_compact->current_output()->meta.UpdateBoundaries(
|
||||
key, value, ikey.sequence, ikey.type);
|
||||
if (!status.ok()) {
|
||||
break;
|
||||
}
|
||||
|
||||
sub_compact->current_output_file_size =
|
||||
sub_compact->builder->EstimatedFileSize();
|
||||
const ParsedInternalKey& ikey = c_iter->ikey();
|
||||
sub_compact->current_output()->meta.UpdateBoundaries(
|
||||
key, value, ikey.sequence, ikey.type);
|
||||
sub_compact->num_output_records++;
|
||||
|
||||
// Close output file if it is big enough. Two possibilities determine it's
|
||||
|
@ -6484,20 +6484,29 @@ TEST_F(DBCompactionTest, CompactionWithBlobGCError_CorruptIndex) {
|
||||
ASSERT_OK(Put(third_key, third_value));
|
||||
|
||||
constexpr char fourth_key[] = "fourth_key";
|
||||
constexpr char corrupt_blob_index[] = "foobar";
|
||||
|
||||
WriteBatch batch;
|
||||
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key,
|
||||
corrupt_blob_index));
|
||||
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
||||
constexpr char fourth_value[] = "fourth_value";
|
||||
ASSERT_OK(Put(fourth_key, fourth_value));
|
||||
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex",
|
||||
[](void* arg) {
|
||||
Slice* const blob_index = static_cast<Slice*>(arg);
|
||||
assert(blob_index);
|
||||
assert(!blob_index->empty());
|
||||
blob_index->remove_prefix(1);
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
constexpr Slice* begin = nullptr;
|
||||
constexpr Slice* end = nullptr;
|
||||
|
||||
ASSERT_TRUE(
|
||||
db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, CompactionWithBlobGCError_InlinedTTLIndex) {
|
||||
|
@ -3,6 +3,7 @@
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#include "db/blob/blob_index.h"
|
||||
#include "db/db_test_util.h"
|
||||
#include "rocksdb/rocksdb_namespace.h"
|
||||
|
||||
@ -54,7 +55,7 @@ class DbKvChecksumTest
|
||||
case WriteBatchOpType::kMerge:
|
||||
s = wb.Merge(cf_handle, "key", "val");
|
||||
break;
|
||||
case WriteBatchOpType::kBlobIndex:
|
||||
case WriteBatchOpType::kBlobIndex: {
|
||||
// TODO(ajkr): use public API once available.
|
||||
uint32_t cf_id;
|
||||
if (cf_handle == nullptr) {
|
||||
@ -62,8 +63,14 @@ class DbKvChecksumTest
|
||||
} else {
|
||||
cf_id = cf_handle->GetID();
|
||||
}
|
||||
s = WriteBatchInternal::PutBlobIndex(&wb, cf_id, "key", "val");
|
||||
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 9876543210,
|
||||
"val");
|
||||
|
||||
s = WriteBatchInternal::PutBlobIndex(&wb, cf_id, "key", blob_index);
|
||||
break;
|
||||
}
|
||||
case WriteBatchOpType::kNum:
|
||||
assert(false);
|
||||
}
|
||||
|
@ -565,10 +565,13 @@ class Repairer {
|
||||
|
||||
counter++;
|
||||
|
||||
t->meta.UpdateBoundaries(key, iter->value(), parsed.sequence,
|
||||
status = t->meta.UpdateBoundaries(key, iter->value(), parsed.sequence,
|
||||
parsed.type);
|
||||
if (!status.ok()) {
|
||||
break;
|
||||
}
|
||||
if (!iter->status().ok()) {
|
||||
}
|
||||
if (status.ok() && !iter->status().ok()) {
|
||||
status = iter->status();
|
||||
}
|
||||
delete iter;
|
||||
|
@ -28,35 +28,19 @@ uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
|
||||
return number | (path_id * (kFileNumberMask + 1));
|
||||
}
|
||||
|
||||
void FileMetaData::UpdateBoundaries(const Slice& key, const Slice& value,
|
||||
Status FileMetaData::UpdateBoundaries(const Slice& key, const Slice& value,
|
||||
SequenceNumber seqno,
|
||||
ValueType value_type) {
|
||||
if (smallest.size() == 0) {
|
||||
smallest.DecodeFrom(key);
|
||||
}
|
||||
largest.DecodeFrom(key);
|
||||
fd.smallest_seqno = std::min(fd.smallest_seqno, seqno);
|
||||
fd.largest_seqno = std::max(fd.largest_seqno, seqno);
|
||||
|
||||
if (value_type == kTypeBlobIndex) {
|
||||
BlobIndex blob_index;
|
||||
const Status s = blob_index.DecodeFrom(value);
|
||||
if (!s.ok()) {
|
||||
return;
|
||||
return s;
|
||||
}
|
||||
|
||||
if (blob_index.IsInlined()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (blob_index.HasTTL()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Paranoid check: this should not happen because BlobDB numbers the blob
|
||||
// files starting from 1.
|
||||
if (!blob_index.IsInlined() && !blob_index.HasTTL()) {
|
||||
if (blob_index.file_number() == kInvalidBlobFileNumber) {
|
||||
return;
|
||||
return Status::Corruption("Invalid blob file number");
|
||||
}
|
||||
|
||||
if (oldest_blob_file_number == kInvalidBlobFileNumber ||
|
||||
@ -66,6 +50,16 @@ void FileMetaData::UpdateBoundaries(const Slice& key, const Slice& value,
|
||||
}
|
||||
}
|
||||
|
||||
if (smallest.size() == 0) {
|
||||
smallest.DecodeFrom(key);
|
||||
}
|
||||
largest.DecodeFrom(key);
|
||||
fd.smallest_seqno = std::min(fd.smallest_seqno, seqno);
|
||||
fd.largest_seqno = std::max(fd.largest_seqno, seqno);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void VersionEdit::Clear() {
|
||||
max_level_ = 0;
|
||||
db_id_.clear();
|
||||
|
@ -245,7 +245,7 @@ struct FileMetaData {
|
||||
|
||||
// REQUIRED: Keys must be given to the function in sorted order (it expects
|
||||
// the last key to be the largest).
|
||||
void UpdateBoundaries(const Slice& key, const Slice& value,
|
||||
Status UpdateBoundaries(const Slice& key, const Slice& value,
|
||||
SequenceNumber seqno, ValueType value_type);
|
||||
|
||||
// Unlike UpdateBoundaries, ranges do not need to be presented in any
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include "db/version_edit.h"
|
||||
|
||||
#include "db/blob/blob_index.h"
|
||||
#include "rocksdb/advanced_options.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "test_util/testharness.h"
|
||||
@ -611,6 +612,128 @@ TEST_F(VersionEditTest, IgnorableTags) {
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST(FileMetaDataTest, UpdateBoundariesBlobIndex) {
|
||||
FileMetaData meta;
|
||||
|
||||
{
|
||||
constexpr uint64_t file_number = 10;
|
||||
constexpr uint32_t path_id = 0;
|
||||
constexpr uint64_t file_size = 0;
|
||||
|
||||
meta.fd = FileDescriptor(file_number, path_id, file_size);
|
||||
}
|
||||
|
||||
constexpr char key[] = "foo";
|
||||
|
||||
constexpr uint64_t expected_oldest_blob_file_number = 20;
|
||||
|
||||
// Plain old value (does not affect oldest_blob_file_number)
|
||||
{
|
||||
constexpr char value[] = "value";
|
||||
constexpr SequenceNumber seq = 200;
|
||||
|
||||
ASSERT_OK(meta.UpdateBoundaries(key, value, seq, kTypeValue));
|
||||
ASSERT_EQ(meta.oldest_blob_file_number, kInvalidBlobFileNumber);
|
||||
}
|
||||
|
||||
// Non-inlined, non-TTL blob index (sets oldest_blob_file_number)
|
||||
{
|
||||
constexpr uint64_t blob_file_number = 25;
|
||||
static_assert(blob_file_number > expected_oldest_blob_file_number,
|
||||
"unexpected");
|
||||
|
||||
constexpr uint64_t offset = 1000;
|
||||
constexpr uint64_t size = 100;
|
||||
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
|
||||
kNoCompression);
|
||||
|
||||
constexpr SequenceNumber seq = 201;
|
||||
|
||||
ASSERT_OK(meta.UpdateBoundaries(key, blob_index, seq, kTypeBlobIndex));
|
||||
ASSERT_EQ(meta.oldest_blob_file_number, blob_file_number);
|
||||
}
|
||||
|
||||
// Another one, with the oldest blob file number (updates
|
||||
// oldest_blob_file_number)
|
||||
{
|
||||
constexpr uint64_t offset = 2000;
|
||||
constexpr uint64_t size = 300;
|
||||
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeBlob(&blob_index, expected_oldest_blob_file_number, offset,
|
||||
size, kNoCompression);
|
||||
|
||||
constexpr SequenceNumber seq = 202;
|
||||
|
||||
ASSERT_OK(meta.UpdateBoundaries(key, blob_index, seq, kTypeBlobIndex));
|
||||
ASSERT_EQ(meta.oldest_blob_file_number, expected_oldest_blob_file_number);
|
||||
}
|
||||
|
||||
// Inlined TTL blob index (does not affect oldest_blob_file_number)
|
||||
{
|
||||
constexpr uint64_t expiration = 9876543210;
|
||||
constexpr char value[] = "value";
|
||||
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeInlinedTTL(&blob_index, expiration, value);
|
||||
|
||||
constexpr SequenceNumber seq = 203;
|
||||
|
||||
ASSERT_OK(meta.UpdateBoundaries(key, blob_index, seq, kTypeBlobIndex));
|
||||
ASSERT_EQ(meta.oldest_blob_file_number, expected_oldest_blob_file_number);
|
||||
}
|
||||
|
||||
// Non-inlined TTL blob index (does not affect oldest_blob_file_number, even
|
||||
// though file number is smaller)
|
||||
{
|
||||
constexpr uint64_t expiration = 9876543210;
|
||||
constexpr uint64_t blob_file_number = 15;
|
||||
static_assert(blob_file_number < expected_oldest_blob_file_number,
|
||||
"unexpected");
|
||||
|
||||
constexpr uint64_t offset = 2000;
|
||||
constexpr uint64_t size = 500;
|
||||
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeBlobTTL(&blob_index, expiration, blob_file_number, offset,
|
||||
size, kNoCompression);
|
||||
|
||||
constexpr SequenceNumber seq = 204;
|
||||
|
||||
ASSERT_OK(meta.UpdateBoundaries(key, blob_index, seq, kTypeBlobIndex));
|
||||
ASSERT_EQ(meta.oldest_blob_file_number, expected_oldest_blob_file_number);
|
||||
}
|
||||
|
||||
// Corrupt blob index
|
||||
{
|
||||
constexpr char corrupt_blob_index[] = "!corrupt!";
|
||||
constexpr SequenceNumber seq = 205;
|
||||
|
||||
ASSERT_TRUE(
|
||||
meta.UpdateBoundaries(key, corrupt_blob_index, seq, kTypeBlobIndex)
|
||||
.IsCorruption());
|
||||
ASSERT_EQ(meta.oldest_blob_file_number, expected_oldest_blob_file_number);
|
||||
}
|
||||
|
||||
// Invalid blob file number
|
||||
{
|
||||
constexpr uint64_t offset = 10000;
|
||||
constexpr uint64_t size = 1000;
|
||||
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeBlob(&blob_index, kInvalidBlobFileNumber, offset, size,
|
||||
kNoCompression);
|
||||
|
||||
constexpr SequenceNumber seq = 206;
|
||||
|
||||
ASSERT_TRUE(meta.UpdateBoundaries(key, blob_index, seq, kTypeBlobIndex)
|
||||
.IsCorruption());
|
||||
ASSERT_EQ(meta.oldest_blob_file_number, expected_oldest_blob_file_number);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -2073,6 +2073,9 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
|
||||
if (is_blob_index) {
|
||||
if (do_merge && value) {
|
||||
TEST_SYNC_POINT_CALLBACK("Version::Get::TamperWithBlobIndex",
|
||||
value);
|
||||
|
||||
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
|
||||
constexpr uint64_t* bytes_read = nullptr;
|
||||
|
||||
@ -2300,6 +2303,9 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
||||
|
||||
if (iter->is_blob_index) {
|
||||
if (iter->value) {
|
||||
TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
|
||||
&(*iter));
|
||||
|
||||
const Slice& blob_index_slice = *(iter->value);
|
||||
BlobIndex blob_index;
|
||||
Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
|
||||
|
@ -1908,11 +1908,15 @@ TEST_F(BlobDBTest, GarbageCollectionFailure) {
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("dead", "beef"));
|
||||
|
||||
// Write a fake blob reference into the base DB that cannot be parsed.
|
||||
// Write a fake blob reference into the base DB that points to a non-existing
|
||||
// blob file.
|
||||
std::string blob_index;
|
||||
BlobIndex::EncodeBlob(&blob_index, /* file_number */ 1000, /* offset */ 1234,
|
||||
/* size */ 5678, kNoCompression);
|
||||
|
||||
WriteBatch batch;
|
||||
ASSERT_OK(WriteBatchInternal::PutBlobIndex(
|
||||
&batch, blob_db_->DefaultColumnFamily()->GetID(), "key",
|
||||
"not a valid blob index"));
|
||||
&batch, blob_db_->DefaultColumnFamily()->GetID(), "key", blob_index));
|
||||
ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
|
||||
|
||||
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
|
||||
@ -1921,7 +1925,7 @@ TEST_F(BlobDBTest, GarbageCollectionFailure) {
|
||||
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
|
||||
|
||||
ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
|
||||
.IsCorruption());
|
||||
.IsIOError());
|
||||
|
||||
const Statistics *const statistics = db_options.statistics.get();
|
||||
assert(statistics);
|
||||
|
Loading…
Reference in New Issue
Block a user