Blob DB: use compression in file header instead of global options

Summary:
To fix the issue of failing to decompress existing value after reopen DB with a different compression settings.
Closes https://github.com/facebook/rocksdb/pull/3142

Differential Revision: D6267260

Pulled By: yiwu-arbug

fbshipit-source-id: c7cf7f3e33b0cd25520abf4771cdf9180cc02a5f
This commit is contained in:
Yi Wu 2017-11-07 17:40:44 -08:00
parent c3efe60855
commit c468fd127b
4 changed files with 43 additions and 2 deletions

View File

@ -330,6 +330,7 @@ Status BlobDBImpl::OpenAllFiles() {
continue; continue;
} }
bfptr->SetHasTTL(bfptr->header_.has_ttl); bfptr->SetHasTTL(bfptr->header_.has_ttl);
bfptr->SetCompression(bfptr->header_.compression);
bfptr->header_valid_ = true; bfptr->header_valid_ = true;
std::shared_ptr<RandomAccessFileReader> ra_reader = std::shared_ptr<RandomAccessFileReader> ra_reader =
@ -567,6 +568,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID(); reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
bfile->header_valid_ = true; bfile->header_valid_ = true;
bfile->SetHasTTL(false); bfile->SetHasTTL(false);
bfile->SetCompression(bdb_options_.compression);
Status s = writer->WriteHeader(bfile->header_); Status s = writer->WriteHeader(bfile->header_);
if (!s.ok()) { if (!s.ok()) {
@ -627,6 +629,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
; ;
bfile->header_valid_ = true; bfile->header_valid_ = true;
bfile->SetHasTTL(true); bfile->SetHasTTL(true);
bfile->SetCompression(bdb_options_.compression);
bfile->file_size_ = BlobLogHeader::kSize; bfile->file_size_ = BlobLogHeader::kSize;
// set the first value of the range, since that is // set the first value of the range, since that is
@ -882,6 +885,7 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
return Status::NotFound("Blob file not found"); return Status::NotFound("Blob file not found");
} }
assert(bfile->compression() == bdb_options_.compression);
std::string compression_output; std::string compression_output;
Slice value_compressed = GetCompressedSlice(value, &compression_output); Slice value_compressed = GetCompressedSlice(value, &compression_output);
@ -1196,12 +1200,12 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
// TODO(yiwu): Should use compression flag in the blob file instead of // TODO(yiwu): Should use compression flag in the blob file instead of
// current compression option. // current compression option.
if (bdb_options_.compression != kNoCompression) { if (bfile->compression() != kNoCompression) {
BlockContents contents; BlockContents contents;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily()); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
s = UncompressBlockContentsForCompressionType( s = UncompressBlockContentsForCompressionType(
blob_value.data(), blob_value.size(), &contents, blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression, kBlockBasedTableVersionFormat, Slice(), bfile->compression(),
*(cfh->cfd()->ioptions())); *(cfh->cfd()->ioptions()));
*(value->GetSelf()) = contents.data.ToString(); *(value->GetSelf()) = contents.data.ToString();
} }

View File

@ -58,6 +58,14 @@ class BlobDBTest : public testing::Test {
ASSERT_OK(TryOpen(bdb_options, options)); ASSERT_OK(TryOpen(bdb_options, options));
} }
void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
Options options = Options()) {
assert(blob_db_ != nullptr);
delete blob_db_;
blob_db_ = nullptr;
Open(bdb_options, options);
}
void Destroy() { void Destroy() {
if (blob_db_) { if (blob_db_) {
Options options = blob_db_->GetOptions(); Options options = blob_db_->GetOptions();
@ -573,6 +581,24 @@ TEST_F(BlobDBTest, Compression) {
} }
VerifyDB(data); VerifyDB(data);
} }
TEST_F(BlobDBTest, DecompressAfterReopen) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
bdb_options.compression = CompressionType::kSnappyCompression;
Open(bdb_options);
std::map<std::string, std::string> data;
for (size_t i = 0; i < 100; i++) {
PutRandom("put-key" + ToString(i), &rnd, &data);
}
VerifyDB(data);
bdb_options.compression = CompressionType::kNoCompression;
Reopen(bdb_options);
VerifyDB(data);
}
#endif #endif
TEST_F(BlobDBTest, MultipleWriters) { TEST_F(BlobDBTest, MultipleWriters) {

View File

@ -30,6 +30,7 @@ BlobFile::BlobFile()
: parent_(nullptr), : parent_(nullptr),
file_number_(0), file_number_(0),
has_ttl_(false), has_ttl_(false),
compression_(kNoCompression),
blob_count_(0), blob_count_(0),
gc_epoch_(-1), gc_epoch_(-1),
file_size_(0), file_size_(0),
@ -49,6 +50,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
path_to_dir_(bdir), path_to_dir_(bdir),
file_number_(fn), file_number_(fn),
has_ttl_(false), has_ttl_(false),
compression_(kNoCompression),
blob_count_(0), blob_count_(0),
gc_epoch_(-1), gc_epoch_(-1),
file_size_(0), file_size_(0),

View File

@ -41,6 +41,9 @@ class BlobFile {
// have TTL. // have TTL.
bool has_ttl_; bool has_ttl_;
// Compression type of blobs in the file
CompressionType compression_;
// number of blobs in the file // number of blobs in the file
std::atomic<uint64_t> blob_count_; std::atomic<uint64_t> blob_count_;
@ -173,6 +176,12 @@ class BlobFile {
void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; } void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
CompressionType compression() const { return compression_; }
void SetCompression(CompressionType compression) {
compression_ = compression;
}
std::shared_ptr<Writer> GetWriter() const { return log_writer_; } std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
private: private: