Add blob files to VersionStorageInfo/VersionBuilder (#6597)

Summary:
The patch adds a couple of classes to represent metadata about
blob files: `SharedBlobFileMetaData` contains the information elements
that are immutable (once the blob file is closed), e.g. blob file number,
total number and size of blob files, checksum method/value, while
`BlobFileMetaData` contains attributes that can vary across versions like
the amount of garbage in the file. There is a single `SharedBlobFileMetaData`
for each blob file, which is jointly owned by the `BlobFileMetaData` objects
that point to it; `BlobFileMetaData` objects, in turn, are owned by `Version`s
and can also be shared if the (immutable _and_ mutable) state of the blob file
is the same in two versions.

In addition, the patch adds the blob file metadata to `VersionStorageInfo`, and extends
`VersionBuilder` so that it can apply blob file related `VersionEdit`s (i.e. those
containing `BlobFileAddition`s and/or `BlobFileGarbage`), and save blob file metadata
to a new `VersionStorageInfo`. Consistency checks are also extended to ensure
that table files point to blob files that are part of the `Version`, and that all blob files
that are part of any given `Version` have at least some _non_-garbage data in them.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6597

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D20656803

Pulled By: ltamasi

fbshipit-source-id: f1f74d135045b3b42d0146f03ee576ef0a4bfd80
This commit is contained in:
Levi Tamasi 2020-03-26 18:48:55 -07:00 committed by Facebook GitHub Bot
parent 6301dbe7a7
commit 6f62322fe4
9 changed files with 941 additions and 8 deletions

View File

@ -512,6 +512,7 @@ set(SOURCES
db/arena_wrapped_db_iter.cc db/arena_wrapped_db_iter.cc
db/blob/blob_file_addition.cc db/blob/blob_file_addition.cc
db/blob/blob_file_garbage.cc db/blob/blob_file_garbage.cc
db/blob/blob_file_meta.cc
db/builder.cc db/builder.cc
db/c.cc db/c.cc
db/column_family.cc db/column_family.cc

View File

@ -118,6 +118,7 @@ cpp_library(
"db/arena_wrapped_db_iter.cc", "db/arena_wrapped_db_iter.cc",
"db/blob/blob_file_addition.cc", "db/blob/blob_file_addition.cc",
"db/blob/blob_file_garbage.cc", "db/blob/blob_file_garbage.cc",
"db/blob/blob_file_meta.cc",
"db/builder.cc", "db/builder.cc",
"db/c.cc", "db/c.cc",
"db/column_family.cc", "db/column_family.cc",

52
db/blob/blob_file_meta.cc Normal file
View File

@ -0,0 +1,52 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_file_meta.h"
#include <ostream>
#include <sstream>
namespace ROCKSDB_NAMESPACE {
SharedBlobFileMetaData::~SharedBlobFileMetaData() {
// TODO: add the blob file to the list of obsolete files here
}
std::string SharedBlobFileMetaData::DebugString() const {
std::ostringstream oss;
oss << (*this);
return oss.str();
}
std::ostream& operator<<(std::ostream& os,
const SharedBlobFileMetaData& shared_meta) {
os << "blob_file_number: " << shared_meta.GetBlobFileNumber()
<< " total_blob_count: " << shared_meta.GetTotalBlobCount()
<< " total_blob_bytes: " << shared_meta.GetTotalBlobBytes()
<< " checksum_method: " << shared_meta.GetChecksumMethod()
<< " checksum_value: " << shared_meta.GetChecksumValue();
return os;
}
std::string BlobFileMetaData::DebugString() const {
std::ostringstream oss;
oss << (*this);
return oss.str();
}
std::ostream& operator<<(std::ostream& os, const BlobFileMetaData& meta) {
const auto& shared_meta = meta.GetSharedMeta();
assert(shared_meta);
os << (*shared_meta) << " garbage_blob_count: " << meta.GetGarbageBlobCount()
<< " garbage_blob_bytes: " << meta.GetGarbageBlobBytes();
return os;
}
} // namespace ROCKSDB_NAMESPACE

124
db/blob/blob_file_meta.h Normal file
View File

@ -0,0 +1,124 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/rocksdb_namespace.h"
#include <cassert>
#include <iosfwd>
#include <memory>
#include <string>
namespace ROCKSDB_NAMESPACE {
// SharedBlobFileMetaData represents the immutable part of blob files' metadata,
// like the blob file number, total number and size of blobs, or checksum
// method and value. There is supposed to be one object of this class per blob
// file (shared across all versions that include the blob file in question);
// hence, the type is neither copyable nor movable. A blob file can be marked
// obsolete when the corresponding SharedBlobFileMetaData object is destroyed.
class SharedBlobFileMetaData {
public:
SharedBlobFileMetaData(uint64_t blob_file_number, uint64_t total_blob_count,
uint64_t total_blob_bytes, std::string checksum_method,
std::string checksum_value)
: blob_file_number_(blob_file_number),
total_blob_count_(total_blob_count),
total_blob_bytes_(total_blob_bytes),
checksum_method_(std::move(checksum_method)),
checksum_value_(std::move(checksum_value)) {
assert(checksum_method_.empty() == checksum_value_.empty());
}
~SharedBlobFileMetaData();
SharedBlobFileMetaData(const SharedBlobFileMetaData&) = delete;
SharedBlobFileMetaData& operator=(const SharedBlobFileMetaData&) = delete;
uint64_t GetBlobFileNumber() const { return blob_file_number_; }
uint64_t GetTotalBlobCount() const { return total_blob_count_; }
uint64_t GetTotalBlobBytes() const { return total_blob_bytes_; }
const std::string& GetChecksumMethod() const { return checksum_method_; }
const std::string& GetChecksumValue() const { return checksum_value_; }
std::string DebugString() const;
private:
uint64_t blob_file_number_;
uint64_t total_blob_count_;
uint64_t total_blob_bytes_;
std::string checksum_method_;
std::string checksum_value_;
};
std::ostream& operator<<(std::ostream& os,
const SharedBlobFileMetaData& shared_meta);
// BlobFileMetaData contains the part of the metadata for blob files that can
// vary across versions, like the amount of garbage in the blob file. In
// addition, BlobFileMetaData objects point to and share the ownership of the
// SharedBlobFileMetaData object for the corresponding blob file. Similarly to
// SharedBlobFileMetaData, BlobFileMetaData are not copyable or movable. They
// are meant to be jointly owned by the versions in which the blob file has the
// same (immutable *and* mutable) state.
class BlobFileMetaData {
public:
BlobFileMetaData(std::shared_ptr<SharedBlobFileMetaData> shared_meta,
uint64_t garbage_blob_count, uint64_t garbage_blob_bytes)
: shared_meta_(std::move(shared_meta)),
garbage_blob_count_(garbage_blob_count),
garbage_blob_bytes_(garbage_blob_bytes) {
assert(shared_meta_);
assert(garbage_blob_count_ <= shared_meta_->GetTotalBlobCount());
assert(garbage_blob_bytes_ <= shared_meta_->GetTotalBlobBytes());
}
~BlobFileMetaData() = default;
BlobFileMetaData(const BlobFileMetaData&) = delete;
BlobFileMetaData& operator=(const BlobFileMetaData&) = delete;
const std::shared_ptr<SharedBlobFileMetaData>& GetSharedMeta() const {
return shared_meta_;
}
uint64_t GetBlobFileNumber() const {
assert(shared_meta_);
return shared_meta_->GetBlobFileNumber();
}
uint64_t GetTotalBlobCount() const {
assert(shared_meta_);
return shared_meta_->GetTotalBlobCount();
}
uint64_t GetTotalBlobBytes() const {
assert(shared_meta_);
return shared_meta_->GetTotalBlobBytes();
}
const std::string& GetChecksumMethod() const {
assert(shared_meta_);
return shared_meta_->GetChecksumMethod();
}
const std::string& GetChecksumValue() const {
assert(shared_meta_);
return shared_meta_->GetChecksumValue();
}
uint64_t GetGarbageBlobCount() const { return garbage_blob_count_; }
uint64_t GetGarbageBlobBytes() const { return garbage_blob_bytes_; }
std::string DebugString() const;
private:
std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
uint64_t garbage_blob_count_;
uint64_t garbage_blob_bytes_;
};
std::ostream& operator<<(std::ostream& os, const BlobFileMetaData& meta);
} // namespace ROCKSDB_NAMESPACE

View File

@ -14,13 +14,16 @@
#include <cinttypes> #include <cinttypes>
#include <functional> #include <functional>
#include <map> #include <map>
#include <memory>
#include <set> #include <set>
#include <sstream>
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "db/blob/blob_file_meta.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/table_cache.h" #include "db/table_cache.h"
@ -100,6 +103,9 @@ class VersionBuilder::Rep {
FileComparator level_zero_cmp_; FileComparator level_zero_cmp_;
FileComparator level_nonzero_cmp_; FileComparator level_nonzero_cmp_;
// Metadata for all blob files affected by the series of version edits.
std::map<uint64_t, std::shared_ptr<BlobFileMetaData>> changed_blob_files_;
public: public:
Rep(const FileOptions& file_options, Logger* info_log, Rep(const FileOptions& file_options, Logger* info_log,
TableCache* table_cache, TableCache* table_cache,
@ -140,6 +146,57 @@ class VersionBuilder::Rep {
} }
} }
std::shared_ptr<BlobFileMetaData> GetBlobFileMetaData(
uint64_t blob_file_number) const {
auto changed_it = changed_blob_files_.find(blob_file_number);
if (changed_it != changed_blob_files_.end()) {
const auto& meta = changed_it->second;
assert(meta);
return meta;
}
assert(base_vstorage_);
const auto& base_blob_files = base_vstorage_->GetBlobFiles();
auto base_it = base_blob_files.find(blob_file_number);
if (base_it != base_blob_files.end()) {
const auto& meta = base_it->second;
assert(meta);
return meta;
}
return std::shared_ptr<BlobFileMetaData>();
}
Status CheckConsistencyOfOldestBlobFileReference(
const VersionStorageInfo* vstorage, uint64_t blob_file_number) const {
assert(vstorage);
// TODO: remove this check once we actually start recoding metadata for
// blob files in the MANIFEST.
if (vstorage->GetBlobFiles().empty()) {
return Status::OK();
}
if (blob_file_number == kInvalidBlobFileNumber) {
return Status::OK();
}
const auto meta = GetBlobFileMetaData(blob_file_number);
if (!meta) {
std::ostringstream oss;
oss << "Blob file #" << blob_file_number
<< " is not part of this version";
return Status::Corruption("VersionBuilder", oss.str());
}
return Status::OK();
}
Status CheckConsistency(VersionStorageInfo* vstorage) { Status CheckConsistency(VersionStorageInfo* vstorage) {
#ifdef NDEBUG #ifdef NDEBUG
if (!vstorage->force_consistency_checks()) { if (!vstorage->force_consistency_checks()) {
@ -148,10 +205,31 @@ class VersionBuilder::Rep {
return Status::OK(); return Status::OK();
} }
#endif #endif
// make sure the files are sorted correctly // Make sure the files are sorted correctly and that the oldest blob file
// reference for each table file points to a valid blob file in this
// version.
for (int level = 0; level < num_levels_; level++) { for (int level = 0; level < num_levels_; level++) {
auto& level_files = vstorage->LevelFiles(level); auto& level_files = vstorage->LevelFiles(level);
if (level_files.empty()) {
continue;
}
assert(level_files[0]);
Status s = CheckConsistencyOfOldestBlobFileReference(
vstorage, level_files[0]->oldest_blob_file_number);
if (!s.ok()) {
return s;
}
for (size_t i = 1; i < level_files.size(); i++) { for (size_t i = 1; i < level_files.size(); i++) {
assert(level_files[i]);
s = CheckConsistencyOfOldestBlobFileReference(
vstorage, level_files[i]->oldest_blob_file_number);
if (!s.ok()) {
return s;
}
auto f1 = level_files[i - 1]; auto f1 = level_files[i - 1];
auto f2 = level_files[i]; auto f2 = level_files[i];
#ifndef NDEBUG #ifndef NDEBUG
@ -217,6 +295,23 @@ class VersionBuilder::Rep {
} }
} }
} }
// Make sure that all blob files in the version have non-garbage data.
const auto& blob_files = vstorage->GetBlobFiles();
for (const auto& pair : blob_files) {
const auto& blob_file_meta = pair.second;
assert(blob_file_meta);
if (blob_file_meta->GetGarbageBlobCount() >=
blob_file_meta->GetTotalBlobCount()) {
std::ostringstream oss;
oss << "Blob file #" << blob_file_meta->GetBlobFileNumber()
<< " consists entirely of garbage";
return Status::Corruption("VersionBuilder", oss.str());
}
}
return Status::OK(); return Status::OK();
} }
@ -282,6 +377,56 @@ class VersionBuilder::Rep {
return true; return true;
} }
Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
auto meta = GetBlobFileMetaData(blob_file_number);
if (meta) {
std::ostringstream oss;
oss << "Blob file #" << blob_file_number << " already added";
return Status::Corruption("VersionBuilder", oss.str());
}
auto shared_meta = std::make_shared<SharedBlobFileMetaData>(
blob_file_number, blob_file_addition.GetTotalBlobCount(),
blob_file_addition.GetTotalBlobBytes(),
blob_file_addition.GetChecksumMethod(),
blob_file_addition.GetChecksumValue());
constexpr uint64_t garbage_blob_count = 0;
constexpr uint64_t garbage_blob_bytes = 0;
auto new_meta = std::make_shared<BlobFileMetaData>(
std::move(shared_meta), garbage_blob_count, garbage_blob_bytes);
changed_blob_files_.emplace(blob_file_number, std::move(new_meta));
return Status::OK();
}
Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
auto meta = GetBlobFileMetaData(blob_file_number);
if (!meta) {
std::ostringstream oss;
oss << "Blob file #" << blob_file_number << " not found";
return Status::Corruption("VersionBuilder", oss.str());
}
assert(meta->GetBlobFileNumber() == blob_file_number);
auto new_meta = std::make_shared<BlobFileMetaData>(
meta->GetSharedMeta(),
meta->GetGarbageBlobCount() + blob_file_garbage.GetGarbageBlobCount(),
meta->GetGarbageBlobBytes() + blob_file_garbage.GetGarbageBlobBytes());
changed_blob_files_[blob_file_number] = std::move(new_meta);
return Status::OK();
}
// Apply all of the edits in *edit to the current state. // Apply all of the edits in *edit to the current state.
Status Apply(VersionEdit* edit) { Status Apply(VersionEdit* edit) {
Status s = CheckConsistency(base_vstorage_); Status s = CheckConsistency(base_vstorage_);
@ -333,8 +478,103 @@ class VersionBuilder::Rep {
} }
} }
} }
// Add new blob files
for (const auto& blob_file_addition : edit->GetBlobFileAdditions()) {
s = ApplyBlobFileAddition(blob_file_addition);
if (!s.ok()) {
return s; return s;
} }
}
// Increase the amount of garbage for blob files affected by GC
for (const auto& blob_file_garbage : edit->GetBlobFileGarbages()) {
s = ApplyBlobFileGarbage(blob_file_garbage);
if (!s.ok()) {
return s;
}
}
return s;
}
void AddBlobFileIfNeeded(
VersionStorageInfo* vstorage,
const std::shared_ptr<BlobFileMetaData>& meta) const {
assert(vstorage);
assert(meta);
if (meta->GetGarbageBlobCount() < meta->GetTotalBlobCount()) {
vstorage->AddBlobFile(meta);
}
}
// Merge the blob file metadata from the base version with the changes (edits)
// applied, and save the result into *vstorage.
void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
assert(base_vstorage_);
assert(vstorage);
const auto& base_blob_files = base_vstorage_->GetBlobFiles();
auto base_it = base_blob_files.begin();
const auto base_it_end = base_blob_files.end();
auto changed_it = changed_blob_files_.begin();
const auto changed_it_end = changed_blob_files_.end();
while (base_it != base_it_end && changed_it != changed_it_end) {
const uint64_t base_blob_file_number = base_it->first;
const uint64_t changed_blob_file_number = changed_it->first;
const auto& base_meta = base_it->second;
const auto& changed_meta = changed_it->second;
assert(base_meta);
assert(changed_meta);
if (base_blob_file_number < changed_blob_file_number) {
assert(base_meta->GetGarbageBlobCount() <
base_meta->GetTotalBlobCount());
vstorage->AddBlobFile(base_meta);
++base_it;
} else if (changed_blob_file_number < base_blob_file_number) {
AddBlobFileIfNeeded(vstorage, changed_meta);
++changed_it;
} else {
assert(base_blob_file_number == changed_blob_file_number);
assert(base_meta->GetSharedMeta() == changed_meta->GetSharedMeta());
assert(base_meta->GetGarbageBlobCount() <=
changed_meta->GetGarbageBlobCount());
assert(base_meta->GetGarbageBlobBytes() <=
changed_meta->GetGarbageBlobBytes());
AddBlobFileIfNeeded(vstorage, changed_meta);
++base_it;
++changed_it;
}
}
while (base_it != base_it_end) {
const auto& base_meta = base_it->second;
assert(base_meta);
assert(base_meta->GetGarbageBlobCount() < base_meta->GetTotalBlobCount());
vstorage->AddBlobFile(base_meta);
++base_it;
}
while (changed_it != changed_it_end) {
const auto& changed_meta = changed_it->second;
assert(changed_meta);
AddBlobFileIfNeeded(vstorage, changed_meta);
++changed_it;
}
}
// Save the current state in *v. // Save the current state in *v.
Status SaveTo(VersionStorageInfo* vstorage) { Status SaveTo(VersionStorageInfo* vstorage) {
@ -390,6 +630,8 @@ class VersionBuilder::Rep {
} }
} }
SaveBlobFilesTo(vstorage);
s = CheckConsistency(vstorage); s = CheckConsistency(vstorage);
return s; return s;
} }

View File

@ -3,6 +3,8 @@
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include <cstring>
#include <memory>
#include <string> #include <string>
#include "db/version_edit.h" #include "db/version_edit.h"
#include "db/version_set.h" #include "db/version_set.h"
@ -57,12 +59,13 @@ class VersionBuilderTest : public testing::Test {
SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100, SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
uint64_t num_entries = 0, uint64_t num_deletions = 0, uint64_t num_entries = 0, uint64_t num_deletions = 0,
bool sampled = false, SequenceNumber smallest_seqno = 0, bool sampled = false, SequenceNumber smallest_seqno = 0,
SequenceNumber largest_seqno = 0) { SequenceNumber largest_seqno = 0,
uint64_t oldest_blob_file_number = kInvalidBlobFileNumber) {
assert(level < vstorage_.num_levels()); assert(level < vstorage_.num_levels());
FileMetaData* f = new FileMetaData( FileMetaData* f = new FileMetaData(
file_number, path_id, file_size, GetInternalKey(smallest, smallest_seq), file_number, path_id, file_size, GetInternalKey(smallest, smallest_seq),
GetInternalKey(largest, largest_seq), smallest_seqno, largest_seqno, GetInternalKey(largest, largest_seq), smallest_seqno, largest_seqno,
/* marked_for_compact */ false, kInvalidBlobFileNumber, /* marked_for_compact */ false, oldest_blob_file_number,
kUnknownOldestAncesterTime, kUnknownFileCreationTime, kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName); kUnknownFileChecksum, kUnknownFileChecksumFuncName);
f->compensated_file_size = file_size; f->compensated_file_size = file_size;
@ -75,6 +78,34 @@ class VersionBuilderTest : public testing::Test {
} }
} }
void AddBlob(uint64_t blob_file_number, uint64_t total_blob_count,
uint64_t total_blob_bytes, std::string checksum_method,
std::string checksum_value, uint64_t garbage_blob_count,
uint64_t garbage_blob_bytes) {
auto shared_meta = std::make_shared<SharedBlobFileMetaData>(
blob_file_number, total_blob_count, total_blob_bytes,
std::move(checksum_method), std::move(checksum_value));
auto meta = std::make_shared<BlobFileMetaData>(
std::move(shared_meta), garbage_blob_count, garbage_blob_bytes);
vstorage_.AddBlobFile(std::move(meta));
}
static std::shared_ptr<BlobFileMetaData> GetBlobFileMetaData(
const VersionStorageInfo::BlobFiles& blob_files,
uint64_t blob_file_number) {
const auto it = blob_files.find(blob_file_number);
if (it == blob_files.end()) {
return std::shared_ptr<BlobFileMetaData>();
}
const auto& meta = it->second;
assert(meta);
return meta;
}
void UpdateVersionStorageInfo() { void UpdateVersionStorageInfo() {
vstorage_.UpdateFilesByCompactionPri(ioptions_.compaction_pri); vstorage_.UpdateFilesByCompactionPri(ioptions_.compaction_pri);
vstorage_.UpdateNumNonEmptyLevels(); vstorage_.UpdateNumNonEmptyLevels();
@ -319,6 +350,448 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
UnrefFilesInVersion(&new_vstorage); UnrefFilesInVersion(&new_vstorage);
} }
TEST_F(VersionBuilderTest, ApplyBlobFileAddition) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
VersionEdit edit;
constexpr uint64_t blob_file_number = 1234;
constexpr uint64_t total_blob_count = 5678;
constexpr uint64_t total_blob_bytes = 999999;
constexpr char checksum_method[] = "SHA1";
constexpr char checksum_value[] = "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";
edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
checksum_method, checksum_value);
ASSERT_OK(builder.Apply(&edit));
constexpr bool force_consistency_checks = false;
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, &vstorage_,
force_consistency_checks);
ASSERT_OK(builder.SaveTo(&new_vstorage));
const auto& new_blob_files = new_vstorage.GetBlobFiles();
ASSERT_EQ(new_blob_files.size(), 1);
const auto new_meta = GetBlobFileMetaData(new_blob_files, blob_file_number);
ASSERT_NE(new_meta, nullptr);
ASSERT_EQ(new_meta->GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(new_meta->GetTotalBlobCount(), total_blob_count);
ASSERT_EQ(new_meta->GetTotalBlobBytes(), total_blob_bytes);
ASSERT_EQ(new_meta->GetChecksumMethod(), checksum_method);
ASSERT_EQ(new_meta->GetChecksumValue(), checksum_value);
ASSERT_EQ(new_meta->GetGarbageBlobCount(), 0);
ASSERT_EQ(new_meta->GetGarbageBlobBytes(), 0);
}
TEST_F(VersionBuilderTest, ApplyBlobFileAdditionAlreadyInBase) {
// Attempt to add a blob file that is already present in the base version.
constexpr uint64_t blob_file_number = 1234;
constexpr uint64_t total_blob_count = 5678;
constexpr uint64_t total_blob_bytes = 999999;
constexpr char checksum_method[] = "SHA1";
constexpr char checksum_value[] = "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";
constexpr uint64_t garbage_blob_count = 123;
constexpr uint64_t garbage_blob_bytes = 456789;
AddBlob(blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
checksum_value, garbage_blob_count, garbage_blob_bytes);
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
VersionEdit edit;
edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
checksum_method, checksum_value);
const Status s = builder.Apply(&edit);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "Blob file #1234 already added"));
}
TEST_F(VersionBuilderTest, ApplyBlobFileAdditionAlreadyApplied) {
// Attempt to add the same blob file twice using version edits.
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
VersionEdit edit;
constexpr uint64_t blob_file_number = 1234;
constexpr uint64_t total_blob_count = 5678;
constexpr uint64_t total_blob_bytes = 999999;
constexpr char checksum_method[] = "SHA1";
constexpr char checksum_value[] = "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";
edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
checksum_method, checksum_value);
ASSERT_OK(builder.Apply(&edit));
const Status s = builder.Apply(&edit);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "Blob file #1234 already added"));
}
TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileInBase) {
// Increase the amount of garbage for a blob file present in the base version.
constexpr uint64_t blob_file_number = 1234;
constexpr uint64_t total_blob_count = 5678;
constexpr uint64_t total_blob_bytes = 999999;
constexpr char checksum_method[] = "SHA1";
constexpr char checksum_value[] = "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";
constexpr uint64_t garbage_blob_count = 123;
constexpr uint64_t garbage_blob_bytes = 456789;
AddBlob(blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
checksum_value, garbage_blob_count, garbage_blob_bytes);
const auto meta =
GetBlobFileMetaData(vstorage_.GetBlobFiles(), blob_file_number);
ASSERT_NE(meta, nullptr);
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
VersionEdit edit;
constexpr uint64_t new_garbage_blob_count = 456;
constexpr uint64_t new_garbage_blob_bytes = 111111;
edit.AddBlobFileGarbage(blob_file_number, new_garbage_blob_count,
new_garbage_blob_bytes);
ASSERT_OK(builder.Apply(&edit));
constexpr bool force_consistency_checks = false;
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, &vstorage_,
force_consistency_checks);
ASSERT_OK(builder.SaveTo(&new_vstorage));
const auto& new_blob_files = new_vstorage.GetBlobFiles();
ASSERT_EQ(new_blob_files.size(), 1);
const auto new_meta = GetBlobFileMetaData(new_blob_files, blob_file_number);
ASSERT_NE(new_meta, nullptr);
ASSERT_EQ(new_meta->GetSharedMeta(), meta->GetSharedMeta());
ASSERT_EQ(new_meta->GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(new_meta->GetTotalBlobCount(), total_blob_count);
ASSERT_EQ(new_meta->GetTotalBlobBytes(), total_blob_bytes);
ASSERT_EQ(new_meta->GetChecksumMethod(), checksum_method);
ASSERT_EQ(new_meta->GetChecksumValue(), checksum_value);
ASSERT_EQ(new_meta->GetGarbageBlobCount(),
garbage_blob_count + new_garbage_blob_count);
ASSERT_EQ(new_meta->GetGarbageBlobBytes(),
garbage_blob_bytes + new_garbage_blob_bytes);
}
TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileAdditionApplied) {
// Increase the amount of garbage for a blob file added using a version edit.
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
VersionEdit addition;
constexpr uint64_t blob_file_number = 1234;
constexpr uint64_t total_blob_count = 5678;
constexpr uint64_t total_blob_bytes = 999999;
constexpr char checksum_method[] = "SHA1";
constexpr char checksum_value[] = "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";
addition.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
checksum_method, checksum_value);
ASSERT_OK(builder.Apply(&addition));
constexpr uint64_t garbage_blob_count = 123;
constexpr uint64_t garbage_blob_bytes = 456789;
VersionEdit garbage;
garbage.AddBlobFileGarbage(blob_file_number, garbage_blob_count,
garbage_blob_bytes);
ASSERT_OK(builder.Apply(&garbage));
constexpr bool force_consistency_checks = false;
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, &vstorage_,
force_consistency_checks);
ASSERT_OK(builder.SaveTo(&new_vstorage));
const auto& new_blob_files = new_vstorage.GetBlobFiles();
ASSERT_EQ(new_blob_files.size(), 1);
const auto new_meta = GetBlobFileMetaData(new_blob_files, blob_file_number);
ASSERT_NE(new_meta, nullptr);
ASSERT_EQ(new_meta->GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(new_meta->GetTotalBlobCount(), total_blob_count);
ASSERT_EQ(new_meta->GetTotalBlobBytes(), total_blob_bytes);
ASSERT_EQ(new_meta->GetChecksumMethod(), checksum_method);
ASSERT_EQ(new_meta->GetChecksumValue(), checksum_value);
ASSERT_EQ(new_meta->GetGarbageBlobCount(), garbage_blob_count);
ASSERT_EQ(new_meta->GetGarbageBlobBytes(), garbage_blob_bytes);
}
TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileNotFound) {
// Attempt to increase the amount of garbage for a blob file that is
// neither in the base version, nor was it added using a version edit.
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
VersionEdit edit;
constexpr uint64_t blob_file_number = 1234;
constexpr uint64_t garbage_blob_count = 5678;
constexpr uint64_t garbage_blob_bytes = 999999;
edit.AddBlobFileGarbage(blob_file_number, garbage_blob_count,
garbage_blob_bytes);
const Status s = builder.Apply(&edit);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "Blob file #1234 not found"));
}
TEST_F(VersionBuilderTest, SaveBlobFilesTo) {
// Add three blob files to base version.
for (uint64_t i = 1; i <= 3; ++i) {
const uint64_t blob_file_number = i;
const uint64_t total_blob_count = i * 1000;
const uint64_t total_blob_bytes = i * 1000000;
const uint64_t garbage_blob_count = i * 100;
const uint64_t garbage_blob_bytes = i * 20000;
AddBlob(blob_file_number, total_blob_count, total_blob_bytes,
/* checksum_method */ std::string(),
/* checksum_value */ std::string(), garbage_blob_count,
garbage_blob_bytes);
}
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
VersionEdit edit;
// Add some garbage to the second and third blob files. The second blob file
// remains valid since it does not consist entirely of garbage yet. The third
// blob file is all garbage after the edit and will not be part of the new
// version.
edit.AddBlobFileGarbage(/* blob_file_number */ 2,
/* garbage_blob_count */ 200,
/* garbage_blob_bytes */ 100000);
edit.AddBlobFileGarbage(/* blob_file_number */ 3,
/* garbage_blob_count */ 2700,
/* garbage_blob_bytes */ 2940000);
// Add a fourth blob file.
edit.AddBlobFile(/* blob_file_number */ 4, /* total_blob_count */ 4000,
/* total_blob_bytes */ 4000000,
/* checksum_method */ std::string(),
/* checksum_value */ std::string());
ASSERT_OK(builder.Apply(&edit));
constexpr bool force_consistency_checks = false;
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, &vstorage_,
force_consistency_checks);
ASSERT_OK(builder.SaveTo(&new_vstorage));
const auto& new_blob_files = new_vstorage.GetBlobFiles();
ASSERT_EQ(new_blob_files.size(), 3);
const auto meta1 = GetBlobFileMetaData(new_blob_files, 1);
ASSERT_NE(meta1, nullptr);
ASSERT_EQ(meta1->GetBlobFileNumber(), 1);
ASSERT_EQ(meta1->GetTotalBlobCount(), 1000);
ASSERT_EQ(meta1->GetTotalBlobBytes(), 1000000);
ASSERT_EQ(meta1->GetGarbageBlobCount(), 100);
ASSERT_EQ(meta1->GetGarbageBlobBytes(), 20000);
const auto meta2 = GetBlobFileMetaData(new_blob_files, 2);
ASSERT_NE(meta2, nullptr);
ASSERT_EQ(meta2->GetBlobFileNumber(), 2);
ASSERT_EQ(meta2->GetTotalBlobCount(), 2000);
ASSERT_EQ(meta2->GetTotalBlobBytes(), 2000000);
ASSERT_EQ(meta2->GetGarbageBlobCount(), 400);
ASSERT_EQ(meta2->GetGarbageBlobBytes(), 140000);
const auto meta4 = GetBlobFileMetaData(new_blob_files, 4);
ASSERT_NE(meta4, nullptr);
ASSERT_EQ(meta4->GetBlobFileNumber(), 4);
ASSERT_EQ(meta4->GetTotalBlobCount(), 4000);
ASSERT_EQ(meta4->GetTotalBlobBytes(), 4000000);
ASSERT_EQ(meta4->GetGarbageBlobCount(), 0);
ASSERT_EQ(meta4->GetGarbageBlobBytes(), 0);
}
TEST_F(VersionBuilderTest, CheckConsistencyForBlobFiles) {
// Initialize base version. The first table file points to a valid blob file
// in this version; the second one does not refer to any blob files.
Add(/* level */ 1, /* file_number */ 1, /* smallest */ "150",
/* largest */ "200", /* file_size */ 100,
/* path_id */ 0, /* smallest_seq */ 100, /* largest_seq */ 100,
/* num_entries */ 0, /* num_deletions */ 0,
/* sampled */ false, /* smallest_seqno */ 100, /* largest_seqno */ 100,
/* oldest_blob_file_number */ 16);
Add(/* level */ 1, /* file_number */ 23, /* smallest */ "201",
/* largest */ "300", /* file_size */ 100,
/* path_id */ 0, /* smallest_seq */ 200, /* largest_seq */ 200,
/* num_entries */ 0, /* num_deletions */ 0,
/* sampled */ false, /* smallest_seqno */ 200, /* largest_seqno */ 200,
kInvalidBlobFileNumber);
AddBlob(/* blob_file_number */ 16, /* total_blob_count */ 1000,
/* total_blob_bytes */ 1000000,
/* checksum_method */ std::string(),
/* checksum_value */ std::string(),
/* garbage_blob_count */ 500, /* garbage_blob_bytes */ 300000);
UpdateVersionStorageInfo();
// Add a new table file that points to the existing blob file, and add a
// new table file--blob file pair.
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
VersionEdit edit;
edit.AddFile(/* level */ 1, /* file_number */ 606, /* path_id */ 0,
/* file_size */ 100, /* smallest */ GetInternalKey("701"),
/* largest */ GetInternalKey("750"), /* smallest_seqno */ 200,
/* largest_seqno */ 200, /* marked_for_compaction */ false,
/* oldest_blob_file_number */ 16, kUnknownOldestAncesterTime,
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
edit.AddFile(/* level */ 1, /* file_number */ 700, /* path_id */ 0,
/* file_size */ 100, /* smallest */ GetInternalKey("801"),
/* largest */ GetInternalKey("850"), /* smallest_seqno */ 200,
/* largest_seqno */ 200, /* marked_for_compaction */ false,
/* oldest_blob_file_number */ 1000, kUnknownOldestAncesterTime,
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
edit.AddBlobFile(/* blob_file_number */ 1000, /* total_blob_count */ 2000,
/* total_blob_bytes */ 200000,
/* checksum_method */ std::string(),
/* checksum_value */ std::string());
ASSERT_OK(builder.Apply(&edit));
// Save to a new version in order to trigger consistency checks.
constexpr bool force_consistency_checks = true;
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, &vstorage_,
force_consistency_checks);
ASSERT_OK(builder.SaveTo(&new_vstorage));
UnrefFilesInVersion(&new_vstorage);
}
TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesNotInVersion) {
// Initialize base version. The table file points to a blob file that is
// not in this version.
Add(/* level */ 1, /* file_number */ 1, /* smallest */ "150",
/* largest */ "200", /* file_size */ 100,
/* path_id */ 0, /* smallest_seq */ 100, /* largest_seq */ 100,
/* num_entries */ 0, /* num_deletions */ 0,
/* sampled */ false, /* smallest_seqno */ 100, /* largest_seqno */ 100,
/* oldest_blob_file_number */ 256);
AddBlob(/* blob_file_number */ 16, /* total_blob_count */ 1000,
/* total_blob_bytes */ 1000000,
/* checksum_method */ std::string(),
/* checksum_value */ std::string(),
/* garbage_blob_count */ 500, /* garbage_blob_bytes */ 300000);
UpdateVersionStorageInfo();
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
// Save to a new version in order to trigger consistency checks.
constexpr bool force_consistency_checks = true;
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, &vstorage_,
force_consistency_checks);
const Status s = builder.SaveTo(&new_vstorage);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(
std::strstr(s.getState(), "Blob file #256 is not part of this version"));
UnrefFilesInVersion(&new_vstorage);
}
TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbage) {
// Initialize base version. The table file points to a blob file that is
// all garbage.
Add(/* level */ 1, /* file_number */ 1, /* smallest */ "150",
/* largest */ "200", /* file_size */ 100,
/* path_id */ 0, /* smallest_seq */ 100, /* largest_seq */ 100,
/* num_entries */ 0, /* num_deletions */ 0,
/* sampled */ false, /* smallest_seqno */ 100, /* largest_seqno */ 100,
/* oldest_blob_file_number */ 16);
AddBlob(/* blob_file_number */ 16, /* total_blob_count */ 1000,
/* total_blob_bytes */ 1000000,
/* checksum_method */ std::string(),
/* checksum_value */ std::string(),
/* garbage_blob_count */ 1000, /* garbage_blob_bytes */ 1000000);
UpdateVersionStorageInfo();
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
// Save to a new version in order to trigger consistency checks.
constexpr bool force_consistency_checks = true;
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, &vstorage_,
force_consistency_checks);
const Status s = builder.SaveTo(&new_vstorage);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(
std::strstr(s.getState(), "Blob file #16 consists entirely of garbage"));
UnrefFilesInVersion(&new_vstorage);
}
TEST_F(VersionBuilderTest, EstimatedActiveKeys) { TEST_F(VersionBuilderTest, EstimatedActiveKeys) {
const uint32_t kTotalSamples = 20; const uint32_t kTotalSamples = 20;
const uint32_t kNumLevels = 5; const uint32_t kNumLevels = 5;

View File

@ -2599,6 +2599,19 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
level_files->push_back(f); level_files->push_back(f);
} }
void VersionStorageInfo::AddBlobFile(
std::shared_ptr<BlobFileMetaData> blob_file_meta) {
assert(blob_file_meta);
const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
auto it = blob_files_.lower_bound(blob_file_number);
assert(it == blob_files_.end() || it->first != blob_file_number);
blob_files_.insert(
it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta)));
}
// Version::PrepareApply() need to be called before calling the function, or // Version::PrepareApply() need to be called before calling the function, or
// following functions called: // following functions called:
// 1. UpdateNumNonEmptyLevels(); // 1. UpdateNumNonEmptyLevels();
@ -3441,6 +3454,21 @@ std::string Version::DebugString(bool hex, bool print_stats) const {
r.append("\n"); r.append("\n");
} }
} }
const auto& blob_files = storage_info_.GetBlobFiles();
if (!blob_files.empty()) {
r.append("--- blob files --- version# ");
AppendNumberTo(&r, version_number_);
r.append(" ---\n");
for (const auto& pair : blob_files) {
const auto& blob_file_meta = pair.second;
assert(blob_file_meta);
r.append(blob_file_meta->DebugString());
r.push_back('\n');
}
}
return r; return r;
} }

View File

@ -11,8 +11,9 @@
// newest version is called "current". Older versions may be kept // newest version is called "current". Older versions may be kept
// around to provide a consistent view to live iterators. // around to provide a consistent view to live iterators.
// //
// Each Version keeps track of a set of Table files per level. The // Each Version keeps track of a set of table files per level, as well as a
// entire set of versions is maintained in a VersionSet. // set of blob files. The entire set of versions is maintained in a
// VersionSet.
// //
// Version,VersionSet are thread-compatible, but require external // Version,VersionSet are thread-compatible, but require external
// synchronization on all accesses. // synchronization on all accesses.
@ -28,6 +29,7 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "db/blob/blob_file_meta.h"
#include "db/column_family.h" #include "db/column_family.h"
#include "db/compaction/compaction.h" #include "db/compaction/compaction.h"
#include "db/compaction/compaction_picker.h" #include "db/compaction/compaction_picker.h"
@ -102,7 +104,7 @@ extern void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
// Information of the storage associated with each Version, including number of // Information of the storage associated with each Version, including number of
// levels of LSM tree, files information at each level, files marked for // levels of LSM tree, files information at each level, files marked for
// compaction, etc. // compaction, blob files, etc.
class VersionStorageInfo { class VersionStorageInfo {
public: public:
VersionStorageInfo(const InternalKeyComparator* internal_comparator, VersionStorageInfo(const InternalKeyComparator* internal_comparator,
@ -119,6 +121,8 @@ class VersionStorageInfo {
void AddFile(int level, FileMetaData* f, Logger* info_log = nullptr); void AddFile(int level, FileMetaData* f, Logger* info_log = nullptr);
void AddBlobFile(std::shared_ptr<BlobFileMetaData> blob_file_meta);
void SetFinalized(); void SetFinalized();
// Update num_non_empty_levels_. // Update num_non_empty_levels_.
@ -279,6 +283,10 @@ class VersionStorageInfo {
return files_[level]; return files_[level];
} }
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
using BlobFiles = std::map<uint64_t, std::shared_ptr<BlobFileMetaData>>;
const BlobFiles& GetBlobFiles() const { return blob_files_; }
const ROCKSDB_NAMESPACE::LevelFilesBrief& LevelFilesBrief(int level) const { const ROCKSDB_NAMESPACE::LevelFilesBrief& LevelFilesBrief(int level) const {
assert(level < static_cast<int>(level_files_brief_.size())); assert(level < static_cast<int>(level_files_brief_.size()));
return level_files_brief_[level]; return level_files_brief_[level];
@ -453,6 +461,9 @@ class VersionStorageInfo {
// in increasing order of keys // in increasing order of keys
std::vector<FileMetaData*>* files_; std::vector<FileMetaData*>* files_;
// Map of blob files in version by number.
BlobFiles blob_files_;
// Level that L0 data should be compacted to. All levels < base_level_ should // Level that L0 data should be compacted to. All levels < base_level_ should
// be empty. -1 if it is not level-compaction so it's not applicable. // be empty. -1 if it is not level-compaction so it's not applicable.
int base_level_; int base_level_;
@ -553,8 +564,8 @@ class VersionStorageInfo {
}; };
using MultiGetRange = MultiGetContext::Range; using MultiGetRange = MultiGetContext::Range;
// A column family's version consists of the SST files owned by the column // A column family's version consists of the table and blob files owned by
// family at a certain point in time. // the column family at a certain point in time.
class Version { class Version {
public: public:
// Append to *iters a sequence of iterators that will // Append to *iters a sequence of iterators that will

1
src.mk
View File

@ -6,6 +6,7 @@ LIB_SOURCES = \
db/arena_wrapped_db_iter.cc \ db/arena_wrapped_db_iter.cc \
db/blob/blob_file_addition.cc \ db/blob/blob_file_addition.cc \
db/blob/blob_file_garbage.cc \ db/blob/blob_file_garbage.cc \
db/blob/blob_file_meta.cc \
db/builder.cc \ db/builder.cc \
db/c.cc \ db/c.cc \
db/column_family.cc \ db/column_family.cc \