Add tests for compression failure in BlockBasedTableBuilder (#6709)

Summary:
Currently there is no check that BlockBasedTableBuilder will expose a
compression error status if compression fails during table building. This
commit adds fake faulting compressors and a unit test to cover such cases.

This check found 5 bugs, which this commit also fixes:

1. Not handling compression failure well in
   BlockBasedTableBuilder::BGWorkWriteRawBlock.
2. verify_compression failing in BlockBasedTableBuilder when used with ZSTD.
3. Wrongly passing the same reference of block contents as both the input and
   the compressed output to BlockBasedTableBuilder::CompressAndVerifyBlock in
   parallel compression.
4. Wrongly setting block_rep->first_key_in_next_block to nullptr in
   BlockBasedTableBuilder::EnterUnbuffered when there are still incoming data
   blocks.
5. Not maintaining variables for compression ratio estimation and first_block
   in BlockBasedTableBuilder::EnterUnbuffered.
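
For reference, the fault injection in the new test works by registering
callbacks on the sync points this commit adds to the compression and
decompression paths. A minimal sketch of the pattern, usable inside a RocksDB
test (debug builds only, since sync points are compiled out under NDEBUG; the
full test is in the first diff below):

    // Force CompressBlockInternal to report failure through the sync point
    // added by this commit; RocksDB passes a pointer to the bool result.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue",
        [](void* arg) {
          *static_cast<bool*>(arg) = false;  // pretend the compressor failed
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();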
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6709

Reviewed By: ajkr

Differential Revision: D21236254

fbshipit-source-id: 101f6e62b2bac2b7be72be198adf93cd32a1ff46
Ziyue Yang 2020-05-12 09:25:21 -07:00 committed by Facebook GitHub Bot
parent 3f218074ee
commit c384c08a4f
4 changed files with 227 additions and 22 deletions


@@ -1268,6 +1268,146 @@ class CompactionCompressionListener : public EventListener {
   const Options* db_options_;
 };
 
+TEST_F(DBTest2, CompressionFailures) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 2;
+  options.max_bytes_for_level_base = 1024;
+  options.max_bytes_for_level_multiplier = 2;
+  options.num_levels = 7;
+  options.max_background_compactions = 1;
+  options.target_file_size_base = 512;
+  BlockBasedTableOptions table_options;
+  table_options.block_size = 512;
+  table_options.verify_compression = true;
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+  enum CompressionFailureType {
+    kTestCompressionFail,
+    kTestDecompressionFail,
+    kTestDecompressionCorruption
+  } curr_compression_failure_type;
+  std::vector<CompressionFailureType> compression_failure_types = {
+      kTestCompressionFail, kTestDecompressionFail,
+      kTestDecompressionCorruption};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue",
+      [&curr_compression_failure_type](void* arg) {
+        bool* ret = static_cast<bool*>(arg);
+        if (curr_compression_failure_type == kTestCompressionFail) {
+          *ret = false;
+        }
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
+      [&curr_compression_failure_type](void* arg) {
+        Status* ret = static_cast<Status*>(arg);
+        ASSERT_OK(*ret);
+        if (curr_compression_failure_type == kTestDecompressionFail) {
+          *ret = Status::Corruption("kTestDecompressionFail");
+        }
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "UncompressBlockContentsForCompressionType:"
+      "TamperWithDecompressionOutput",
+      [&curr_compression_failure_type](void* arg) {
+        if (curr_compression_failure_type == kTestDecompressionCorruption) {
+          BlockContents* contents = static_cast<BlockContents*>(arg);
+          // Ensure uncompressed data != original data
+          std::unique_ptr<char[]> fake_data(
+              new char[contents->data.size() + 1]);
+          *contents =
+              BlockContents(std::move(fake_data), contents->data.size() + 1);
+        }
+      });
+  std::vector<CompressionType> compression_types = GetSupportedCompressions();
+  std::vector<uint32_t> compression_max_dict_bytes = {0, 10};
+  std::vector<uint32_t> compression_parallel_threads = {1, 4};
+  std::map<std::string, std::string> key_value_written;
+  const int kKeySize = 5;
+  const int kValUnitSize = 16;
+  const int kValSize = 256;
+  Random rnd(405);
+  Status s = Status::OK();
+  for (auto compression_failure_type : compression_failure_types) {
+    curr_compression_failure_type = compression_failure_type;
+    for (auto compression_type : compression_types) {
+      if (compression_type == kNoCompression) {
+        continue;
+      }
+      for (auto parallel_threads : compression_parallel_threads) {
+        for (auto max_dict_bytes : compression_max_dict_bytes) {
+          options.compression = compression_type;
+          options.compression_opts.parallel_threads = parallel_threads;
+          options.compression_opts.max_dict_bytes = max_dict_bytes;
+          options.bottommost_compression_opts.parallel_threads =
+              parallel_threads;
+          options.bottommost_compression_opts.max_dict_bytes = max_dict_bytes;
+          DestroyAndReopen(options);
+          // Write 10 random files
+          for (int i = 0; i < 10; i++) {
+            for (int j = 0; j < 5; j++) {
+              std::string key = RandomString(&rnd, kKeySize);
+              // Ensure good compression ratio
+              std::string valueUnit = RandomString(&rnd, kValUnitSize);
+              std::string value;
+              for (int k = 0; k < kValSize; k += kValUnitSize) {
+                value += valueUnit;
+              }
+              s = Put(key, value);
+              if (compression_failure_type == kTestCompressionFail) {
+                key_value_written[key] = value;
+                ASSERT_OK(s);
+              }
+            }
+            s = Flush();
+            if (compression_failure_type == kTestCompressionFail) {
+              ASSERT_OK(s);
+            }
+            s = dbfull()->TEST_WaitForCompact();
+            if (compression_failure_type == kTestCompressionFail) {
+              ASSERT_OK(s);
+            }
+            if (i == 4) {
+              // Make compression fail in the middle of table building
+              ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+            }
+          }
+          ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+          if (compression_failure_type == kTestCompressionFail) {
+            // Should be kNoCompression, check content consistency
+            std::unique_ptr<Iterator> db_iter(db_->NewIterator(ReadOptions()));
+            for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
+              std::string key = db_iter->key().ToString();
+              std::string value = db_iter->value().ToString();
+              ASSERT_NE(key_value_written.find(key), key_value_written.end());
+              ASSERT_EQ(key_value_written[key], value);
+              key_value_written.erase(key);
+            }
+            ASSERT_EQ(0, key_value_written.size());
+          } else if (compression_failure_type == kTestDecompressionFail) {
+            ASSERT_EQ(std::string(s.getState()),
+                      "Could not decompress: kTestDecompressionFail");
+          } else if (compression_failure_type ==
+                     kTestDecompressionCorruption) {
+            ASSERT_EQ(std::string(s.getState()),
+                      "Decompressed block did not match raw block");
+          }
+        }
+      }
+    }
+  }
+}
+
 TEST_F(DBTest2, CompressionOptions) {
   if (!Zlib_Supported() || !Snappy_Supported()) {
     return;


@@ -109,42 +109,57 @@ bool CompressBlockInternal(const Slice& raw,
                            const CompressionInfo& compression_info,
                            uint32_t format_version,
                            std::string* compressed_output) {
+  bool ret;
   // Will return compressed block contents if (1) the compression method is
   // supported in this platform and (2) the compression rate is "good enough".
   switch (compression_info.type()) {
     case kSnappyCompression:
-      return Snappy_Compress(compression_info, raw.data(), raw.size(),
-                             compressed_output);
+      ret = Snappy_Compress(compression_info, raw.data(), raw.size(),
+                            compressed_output);
+      break;
     case kZlibCompression:
-      return Zlib_Compress(
+      ret = Zlib_Compress(
           compression_info,
           GetCompressFormatForVersion(kZlibCompression, format_version),
           raw.data(), raw.size(), compressed_output);
+      break;
     case kBZip2Compression:
-      return BZip2_Compress(
+      ret = BZip2_Compress(
           compression_info,
           GetCompressFormatForVersion(kBZip2Compression, format_version),
           raw.data(), raw.size(), compressed_output);
+      break;
     case kLZ4Compression:
-      return LZ4_Compress(
+      ret = LZ4_Compress(
           compression_info,
           GetCompressFormatForVersion(kLZ4Compression, format_version),
           raw.data(), raw.size(), compressed_output);
+      break;
     case kLZ4HCCompression:
-      return LZ4HC_Compress(
+      ret = LZ4HC_Compress(
          compression_info,
          GetCompressFormatForVersion(kLZ4HCCompression, format_version),
          raw.data(), raw.size(), compressed_output);
+      break;
     case kXpressCompression:
-      return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
+      ret = XPRESS_Compress(raw.data(), raw.size(), compressed_output);
+      break;
     case kZSTD:
     case kZSTDNotFinalCompression:
-      return ZSTD_Compress(compression_info, raw.data(), raw.size(),
-                           compressed_output);
+      ret = ZSTD_Compress(compression_info, raw.data(), raw.size(),
+                          compressed_output);
+      break;
     default:
       // Do not recognize this compression type
-      return false;
+      ret = false;
   }
+
+  TEST_SYNC_POINT_CALLBACK(
+      "BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue",
+      static_cast<void*>(&ret));
+
+  return ret;
 }
 
 }  // namespace
@@ -512,8 +527,7 @@ struct BlockBasedTableBuilder::Rep {
             _moptions.prefix_extractor != nullptr));
     if (table_options.verify_compression) {
       for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
-        verify_ctxs[i].reset(new UncompressionContext(
-            UncompressionContext::NoCache(), compression_type));
+        verify_ctxs[i].reset(new UncompressionContext(compression_type));
       }
     }
   }
@@ -568,6 +582,7 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
   // block_rep_pool during parallel compression.
   struct BlockRep {
     Slice contents;
+    Slice compressed_contents;
     std::unique_ptr<std::string> data;
    std::unique_ptr<std::string> compressed_data;
     CompressionType compression_type;
@@ -656,6 +671,7 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
         finished(false) {
     for (uint32_t i = 0; i < parallel_threads; i++) {
       block_rep_buf[i].contents = Slice();
+      block_rep_buf[i].compressed_contents = Slice();
       block_rep_buf[i].data.reset(new std::string());
       block_rep_buf[i].compressed_data.reset(new std::string());
       block_rep_buf[i].compression_type = CompressionType();
@@ -943,8 +959,8 @@ void BlockBasedTableBuilder::BGWorkCompression(
     CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
                            compression_ctx, verify_ctx,
                            block_rep->compressed_data.get(),
-                           &block_rep->contents, &(block_rep->compression_type),
-                           &block_rep->status);
+                           &block_rep->compressed_contents,
+                           &(block_rep->compression_type), &block_rep->status);
     block_rep->slot->Fill(block_rep);
   }
 }
@@ -1024,7 +1040,8 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
       }
     } else {
       // Decompression reported an error. abort.
-      *out_status = Status::Corruption("Could not decompress");
+      *out_status = Status::Corruption(std::string("Could not decompress: ") +
+                                       stat.getState());
       abort_compression = true;
     }
   }
@@ -1153,6 +1170,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
       uint64_t new_blocks_inflight = r->pc_rep->blocks_inflight.fetch_sub(
                                          1, std::memory_order_relaxed) -
                                      1;
+      assert(new_blocks_inflight < r->compression_opts.parallel_threads);
       r->pc_rep->estimated_file_size.store(
           r->get_offset() +
               static_cast<uint64_t>(
@@ -1183,6 +1201,17 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
     slot->Take(block_rep);
     if (!block_rep->status.ok()) {
       r->SetStatus(block_rep->status);
+      // Return block_rep to the pool so that blocked Flush() can finish
+      // if there is one, and Flush() will notice !ok() next time.
+      block_rep->status = Status::OK();
+      block_rep->compressed_data->clear();
+      r->pc_rep->block_rep_pool.push(block_rep);
+      // Unlock first block if necessary.
+      if (r->pc_rep->first_block) {
+        std::lock_guard<std::mutex> lock(r->pc_rep->first_block_mutex);
+        r->pc_rep->first_block = false;
+        r->pc_rep->first_block_cond.notify_one();
+      }
       break;
     }
@@ -1197,7 +1226,7 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
     }
     r->pc_rep->raw_bytes_curr_block = block_rep->data->size();
-    WriteRawBlock(block_rep->contents, block_rep->compression_type,
+    WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
                   &r->pending_handle, true /* is_data_block*/);
     if (!ok()) {
       break;
@@ -1569,9 +1598,34 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
         block_rep->first_key_in_next_block->assign(
             r->data_block_and_keys_buffers[i + 1].second.front());
       } else {
-        block_rep->first_key_in_next_block.reset(nullptr);
+        if (r->first_key_in_next_block == nullptr) {
+          block_rep->first_key_in_next_block.reset(nullptr);
+        } else {
+          block_rep->first_key_in_next_block->assign(
+              r->first_key_in_next_block->data(),
+              r->first_key_in_next_block->size());
+        }
       }
+      uint64_t new_raw_bytes_inflight =
+          r->pc_rep->raw_bytes_inflight.fetch_add(block_rep->data->size(),
+                                                  std::memory_order_relaxed) +
+          block_rep->data->size();
+      uint64_t new_blocks_inflight =
+          r->pc_rep->blocks_inflight.fetch_add(1, std::memory_order_relaxed) +
+          1;
+      r->pc_rep->estimated_file_size.store(
+          r->get_offset() +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_raw_bytes_inflight) *
+                  r->pc_rep->curr_compression_ratio.load(
+                      std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+      // Read out first_block here to avoid data race with BGWorkWriteRawBlock
+      bool first_block = r->pc_rep->first_block;
       assert(block_rep->status.ok());
       if (!r->pc_rep->write_queue.push(block_rep->slot.get())) {
         return;
@@ -1579,6 +1633,12 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
       if (!r->pc_rep->compress_queue.push(block_rep)) {
         return;
       }
+      if (first_block) {
+        std::unique_lock<std::mutex> lock(r->pc_rep->first_block_mutex);
+        r->pc_rep->first_block_cond.wait(
+            lock, [r] { return !r->pc_rep->first_block; });
+      }
     } else {
       for (const auto& key : keys) {
        if (r->filter_builder != nullptr) {
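
A side note on bug 5: the bookkeeping added to EnterUnbuffered above mirrors
what WriteRawBlock already maintains for the parallel path. A sketch of the
estimate being kept, written as a standalone function for clarity; the
free-function packaging is illustrative, and the 5-byte trailer size
(one compression-type byte plus a 4-byte checksum) is the usual
BlockBasedTable block trailer:

    #include <cstdint>

    // Estimated SST size while blocks are still in flight, per the diff:
    // current offset, plus in-flight raw bytes scaled by the observed
    // compression ratio, plus one trailer per in-flight block.
    uint64_t EstimatedFileSize(uint64_t offset, uint64_t raw_bytes_inflight,
                               double curr_compression_ratio,
                               uint64_t blocks_inflight) {
      const uint64_t kBlockTrailerSize = 5;  // 1 type byte + 4 checksum bytes
      return offset +
             static_cast<uint64_t>(static_cast<double>(raw_bytes_inflight) *
                                   curr_compression_ratio) +
             blocks_inflight * kBlockTrailerSize;
    }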


@@ -341,6 +341,7 @@ Status UncompressBlockContentsForCompressionType(
     const UncompressionInfo& uncompression_info, const char* data, size_t n,
     BlockContents* contents, uint32_t format_version,
     const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) {
+  Status ret = Status::OK();
   CacheAllocationPtr ubuf;
 
   assert(uncompression_info.type() != kNoCompression &&
@@ -447,7 +448,15 @@ Status UncompressBlockContentsForCompressionType(
                        contents->data.size());
   RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED);
-  return Status::OK();
+
+  TEST_SYNC_POINT_CALLBACK(
+      "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
+      static_cast<void*>(&ret));
+  TEST_SYNC_POINT_CALLBACK(
+      "UncompressBlockContentsForCompressionType:"
+      "TamperWithDecompressionOutput",
+      static_cast<void*>(contents));
+  return ret;
 }
 
 //


@@ -420,10 +420,6 @@ class UncompressionContext {
   ZSTDUncompressCachedData uncomp_cached_data_;
 
  public:
-  struct NoCache {};
-  // Do not use context cache, used by TableBuilder
-  UncompressionContext(NoCache, CompressionType /* type */) {}
-
   explicit UncompressionContext(CompressionType type) {
     if (type == kZSTD || type == kZSTDNotFinalCompression) {
       ctx_cache_ = CompressionContextCache::Instance();
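
A note on bug 2: the removed NoCache constructor never set ctx_cache_, so
verification contexts created with it had no cached ZSTD uncompression data
to work with, which is presumably why verify_compression failed with ZSTD.
After this change, BlockBasedTableBuilder::Rep builds its verification
contexts with the ordinary caching constructor, one per parallel compression
thread, as shown in the second diff above:

    // Sketch of the fixed setup in BlockBasedTableBuilder::Rep's constructor,
    // per the diff; the caching constructor picks up
    // CompressionContextCache::Instance() for kZSTD.
    if (table_options.verify_compression) {
      for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
        verify_ctxs[i].reset(new UncompressionContext(compression_type));
      }
    }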