Blob DB: use compression in file header instead of global options
Summary: To fix the issue of failing to decompress existing value after reopen DB with a different compression settings. Closes https://github.com/facebook/rocksdb/pull/3142 Differential Revision: D6267260 Pulled By: yiwu-arbug fbshipit-source-id: c7cf7f3e33b0cd25520abf4771cdf9180cc02a5f
This commit is contained in:
parent
5dc70a15ca
commit
13b2a9b6ff
@ -330,6 +330,7 @@ Status BlobDBImpl::OpenAllFiles() {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
bfptr->SetHasTTL(bfptr->header_.has_ttl);
|
bfptr->SetHasTTL(bfptr->header_.has_ttl);
|
||||||
|
bfptr->SetCompression(bfptr->header_.compression);
|
||||||
bfptr->header_valid_ = true;
|
bfptr->header_valid_ = true;
|
||||||
|
|
||||||
std::shared_ptr<RandomAccessFileReader> ra_reader =
|
std::shared_ptr<RandomAccessFileReader> ra_reader =
|
||||||
@ -567,6 +568,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
|
|||||||
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
|
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
|
||||||
bfile->header_valid_ = true;
|
bfile->header_valid_ = true;
|
||||||
bfile->SetHasTTL(false);
|
bfile->SetHasTTL(false);
|
||||||
|
bfile->SetCompression(bdb_options_.compression);
|
||||||
|
|
||||||
Status s = writer->WriteHeader(bfile->header_);
|
Status s = writer->WriteHeader(bfile->header_);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
@ -627,6 +629,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
|
|||||||
;
|
;
|
||||||
bfile->header_valid_ = true;
|
bfile->header_valid_ = true;
|
||||||
bfile->SetHasTTL(true);
|
bfile->SetHasTTL(true);
|
||||||
|
bfile->SetCompression(bdb_options_.compression);
|
||||||
bfile->file_size_ = BlobLogHeader::kSize;
|
bfile->file_size_ = BlobLogHeader::kSize;
|
||||||
|
|
||||||
// set the first value of the range, since that is
|
// set the first value of the range, since that is
|
||||||
@ -882,6 +885,7 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
|
|||||||
return Status::NotFound("Blob file not found");
|
return Status::NotFound("Blob file not found");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(bfile->compression() == bdb_options_.compression);
|
||||||
std::string compression_output;
|
std::string compression_output;
|
||||||
Slice value_compressed = GetCompressedSlice(value, &compression_output);
|
Slice value_compressed = GetCompressedSlice(value, &compression_output);
|
||||||
|
|
||||||
@ -1196,12 +1200,12 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
|||||||
|
|
||||||
// TODO(yiwu): Should use compression flag in the blob file instead of
|
// TODO(yiwu): Should use compression flag in the blob file instead of
|
||||||
// current compression option.
|
// current compression option.
|
||||||
if (bdb_options_.compression != kNoCompression) {
|
if (bfile->compression() != kNoCompression) {
|
||||||
BlockContents contents;
|
BlockContents contents;
|
||||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
||||||
s = UncompressBlockContentsForCompressionType(
|
s = UncompressBlockContentsForCompressionType(
|
||||||
blob_value.data(), blob_value.size(), &contents,
|
blob_value.data(), blob_value.size(), &contents,
|
||||||
kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression,
|
kBlockBasedTableVersionFormat, Slice(), bfile->compression(),
|
||||||
*(cfh->cfd()->ioptions()));
|
*(cfh->cfd()->ioptions()));
|
||||||
*(value->GetSelf()) = contents.data.ToString();
|
*(value->GetSelf()) = contents.data.ToString();
|
||||||
}
|
}
|
||||||
|
@ -58,6 +58,14 @@ class BlobDBTest : public testing::Test {
|
|||||||
ASSERT_OK(TryOpen(bdb_options, options));
|
ASSERT_OK(TryOpen(bdb_options, options));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
|
||||||
|
Options options = Options()) {
|
||||||
|
assert(blob_db_ != nullptr);
|
||||||
|
delete blob_db_;
|
||||||
|
blob_db_ = nullptr;
|
||||||
|
Open(bdb_options, options);
|
||||||
|
}
|
||||||
|
|
||||||
void Destroy() {
|
void Destroy() {
|
||||||
if (blob_db_) {
|
if (blob_db_) {
|
||||||
Options options = blob_db_->GetOptions();
|
Options options = blob_db_->GetOptions();
|
||||||
@ -573,6 +581,24 @@ TEST_F(BlobDBTest, Compression) {
|
|||||||
}
|
}
|
||||||
VerifyDB(data);
|
VerifyDB(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(BlobDBTest, DecompressAfterReopen) {
|
||||||
|
Random rnd(301);
|
||||||
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
|
bdb_options.disable_background_tasks = true;
|
||||||
|
bdb_options.compression = CompressionType::kSnappyCompression;
|
||||||
|
Open(bdb_options);
|
||||||
|
std::map<std::string, std::string> data;
|
||||||
|
for (size_t i = 0; i < 100; i++) {
|
||||||
|
PutRandom("put-key" + ToString(i), &rnd, &data);
|
||||||
|
}
|
||||||
|
VerifyDB(data);
|
||||||
|
bdb_options.compression = CompressionType::kNoCompression;
|
||||||
|
Reopen(bdb_options);
|
||||||
|
VerifyDB(data);
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
TEST_F(BlobDBTest, MultipleWriters) {
|
TEST_F(BlobDBTest, MultipleWriters) {
|
||||||
|
@ -30,6 +30,7 @@ BlobFile::BlobFile()
|
|||||||
: parent_(nullptr),
|
: parent_(nullptr),
|
||||||
file_number_(0),
|
file_number_(0),
|
||||||
has_ttl_(false),
|
has_ttl_(false),
|
||||||
|
compression_(kNoCompression),
|
||||||
blob_count_(0),
|
blob_count_(0),
|
||||||
gc_epoch_(-1),
|
gc_epoch_(-1),
|
||||||
file_size_(0),
|
file_size_(0),
|
||||||
@ -49,6 +50,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
|
|||||||
path_to_dir_(bdir),
|
path_to_dir_(bdir),
|
||||||
file_number_(fn),
|
file_number_(fn),
|
||||||
has_ttl_(false),
|
has_ttl_(false),
|
||||||
|
compression_(kNoCompression),
|
||||||
blob_count_(0),
|
blob_count_(0),
|
||||||
gc_epoch_(-1),
|
gc_epoch_(-1),
|
||||||
file_size_(0),
|
file_size_(0),
|
||||||
|
@ -41,6 +41,9 @@ class BlobFile {
|
|||||||
// have TTL.
|
// have TTL.
|
||||||
bool has_ttl_;
|
bool has_ttl_;
|
||||||
|
|
||||||
|
// Compression type of blobs in the file
|
||||||
|
CompressionType compression_;
|
||||||
|
|
||||||
// number of blobs in the file
|
// number of blobs in the file
|
||||||
std::atomic<uint64_t> blob_count_;
|
std::atomic<uint64_t> blob_count_;
|
||||||
|
|
||||||
@ -173,6 +176,12 @@ class BlobFile {
|
|||||||
|
|
||||||
void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
|
void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
|
||||||
|
|
||||||
|
CompressionType compression() const { return compression_; }
|
||||||
|
|
||||||
|
void SetCompression(CompressionType compression) {
|
||||||
|
compression_ = compression;
|
||||||
|
}
|
||||||
|
|
||||||
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
|
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user