From c384c08a4f699ded6d4e7e66432e464abc88eff4 Mon Sep 17 00:00:00 2001
From: Ziyue Yang
Date: Tue, 12 May 2020 09:25:21 -0700
Subject: [PATCH] Add tests for compression failure in BlockBasedTableBuilder (#6709)

Summary:
Currently there is no check that BlockBasedTableBuilder exposes a compression
error status when compression fails during table building. This commit adds
fake faulting compressors and a unit test exercising such cases.

The new check found five bugs, which this commit also fixes:

1. BlockBasedTableBuilder::BGWorkWriteRawBlock did not handle compression
   failure.
2. verify_compression failed in BlockBasedTableBuilder when used with ZSTD.
3. Parallel compression wrongly passed the same reference of block contents
   to BlockBasedTableBuilder::CompressAndVerifyBlock.
4. BlockBasedTableBuilder::EnterUnbuffered wrongly set
   block_rep->first_key_in_next_block to nullptr while data blocks were still
   incoming.
5. BlockBasedTableBuilder::EnterUnbuffered did not maintain the variables for
   compression ratio estimation and first_block.
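The fault injection rests on RocksDB's test sync points. A minimal sketch of
the pattern (hook names as introduced by this patch; assumes a debug build in
which sync points are compiled in):

  auto* sp = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  // Flip the compressor's return value to fake a compression failure.
  sp->SetCallBack(
      "BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue",
      [](void* arg) { *static_cast<bool*>(arg) = false; });
  sp->EnableProcessing();
  // ... run a Put()/Flush() workload; the builder is expected to fall back
  // to kNoCompression and keep the data readable ...
  sp->DisableProcessing();
  sp->ClearAllCallBacks();
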
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6709

Reviewed By: ajkr

Differential Revision: D21236254

fbshipit-source-id: 101f6e62b2bac2b7be72be198adf93cd32a1ff46
---
 db/db_test2.cc                                | 140 ++++++++++++++++++
 .../block_based/block_based_table_builder.cc  |  94 +++++++++---
 table/format.cc                               |  11 +-
 util/compression.h                            |   4 -
 4 files changed, 227 insertions(+), 22 deletions(-)

diff --git a/db/db_test2.cc b/db/db_test2.cc
index 0296d7a7b..59e264ef4 100644
--- a/db/db_test2.cc
+++ b/db/db_test2.cc
@@ -1268,6 +1268,146 @@ class CompactionCompressionListener : public EventListener {
   const Options* db_options_;
 };
 
+TEST_F(DBTest2, CompressionFailures) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 2;
+  options.max_bytes_for_level_base = 1024;
+  options.max_bytes_for_level_multiplier = 2;
+  options.num_levels = 7;
+  options.max_background_compactions = 1;
+  options.target_file_size_base = 512;
+
+  BlockBasedTableOptions table_options;
+  table_options.block_size = 512;
+  table_options.verify_compression = true;
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+
+  enum CompressionFailureType {
+    kTestCompressionFail,
+    kTestDecompressionFail,
+    kTestDecompressionCorruption
+  } curr_compression_failure_type;
+  std::vector<CompressionFailureType> compression_failure_types = {
+      kTestCompressionFail, kTestDecompressionFail,
+      kTestDecompressionCorruption};
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue",
+      [&curr_compression_failure_type](void* arg) {
+        bool* ret = static_cast<bool*>(arg);
+        if (curr_compression_failure_type == kTestCompressionFail) {
+          *ret = false;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
+      [&curr_compression_failure_type](void* arg) {
+        Status* ret = static_cast<Status*>(arg);
+        ASSERT_OK(*ret);
+        if (curr_compression_failure_type == kTestDecompressionFail) {
+          *ret = Status::Corruption("kTestDecompressionFail");
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "UncompressBlockContentsForCompressionType:"
+      "TamperWithDecompressionOutput",
+      [&curr_compression_failure_type](void* arg) {
+        if (curr_compression_failure_type == kTestDecompressionCorruption) {
+          BlockContents* contents = static_cast<BlockContents*>(arg);
+          // Ensure uncompressed data != original data
+          std::unique_ptr<char[]> fake_data(
+              new char[contents->data.size() + 1]);
+          *contents =
+              BlockContents(std::move(fake_data), contents->data.size() + 1);
+        }
+      });
+
+  std::vector<CompressionType> compression_types = GetSupportedCompressions();
+  std::vector<uint32_t> compression_max_dict_bytes = {0, 10};
+  std::vector<uint32_t> compression_parallel_threads = {1, 4};
+
+  std::map<std::string, std::string> key_value_written;
+
+  const int kKeySize = 5;
+  const int kValUnitSize = 16;
+  const int kValSize = 256;
+  Random rnd(405);
+
+  Status s = Status::OK();
+
+  for (auto compression_failure_type : compression_failure_types) {
+    curr_compression_failure_type = compression_failure_type;
+    for (auto compression_type : compression_types) {
+      if (compression_type == kNoCompression) {
+        continue;
+      }
+      for (auto parallel_threads : compression_parallel_threads) {
+        for (auto max_dict_bytes : compression_max_dict_bytes) {
+          options.compression = compression_type;
+          options.compression_opts.parallel_threads = parallel_threads;
+          options.compression_opts.max_dict_bytes = max_dict_bytes;
+          options.bottommost_compression_opts.parallel_threads =
+              parallel_threads;
+          options.bottommost_compression_opts.max_dict_bytes = max_dict_bytes;
+
+          DestroyAndReopen(options);
+          // Write 10 random files
+          for (int i = 0; i < 10; i++) {
+            for (int j = 0; j < 5; j++) {
+              std::string key = RandomString(&rnd, kKeySize);
+              // Ensure good compression ratio
+              std::string valueUnit = RandomString(&rnd, kValUnitSize);
+              std::string value;
+              for (int k = 0; k < kValSize; k += kValUnitSize) {
+                value += valueUnit;
+              }
+              s = Put(key, value);
+              if (compression_failure_type == kTestCompressionFail) {
+                key_value_written[key] = value;
+                ASSERT_OK(s);
+              }
+            }
+            s = Flush();
+            if (compression_failure_type == kTestCompressionFail) {
+              ASSERT_OK(s);
+            }
+            s = dbfull()->TEST_WaitForCompact();
+            if (compression_failure_type == kTestCompressionFail) {
+              ASSERT_OK(s);
+            }
+            if (i == 4) {
+              // Make compression fail in the middle of table building
+              ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+            }
+          }
+          ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+          if (compression_failure_type == kTestCompressionFail) {
+            // Should be kNoCompression, check content consistency
+            std::unique_ptr<Iterator> db_iter(db_->NewIterator(ReadOptions()));
+            for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
+              std::string key = db_iter->key().ToString();
+              std::string value = db_iter->value().ToString();
+              ASSERT_NE(key_value_written.find(key), key_value_written.end());
+              ASSERT_EQ(key_value_written[key], value);
+              key_value_written.erase(key);
+            }
+            ASSERT_EQ(0, key_value_written.size());
+          } else if (compression_failure_type == kTestDecompressionFail) {
+            ASSERT_EQ(std::string(s.getState()),
+                      "Could not decompress: kTestDecompressionFail");
+          } else if (compression_failure_type == kTestDecompressionCorruption) {
+            ASSERT_EQ(std::string(s.getState()),
+                      "Decompressed block did not match raw block");
+          }
+        }
+      }
+    }
+  }
+}
+
 TEST_F(DBTest2, CompressionOptions) {
   if (!Zlib_Supported() || !Snappy_Supported()) {
     return;
diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index 9a7e379ad..f6f0b95a1 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -109,42 +109,57 @@ bool CompressBlockInternal(const Slice& raw,
                            const CompressionInfo& compression_info,
                            uint32_t format_version,
                            std::string* compressed_output) {
+  bool ret;
+
   // Will return compressed block contents if (1) the compression method is
   // supported on this platform and (2) the compression rate is "good enough".
   switch (compression_info.type()) {
     case kSnappyCompression:
-      return Snappy_Compress(compression_info, raw.data(), raw.size(),
-                             compressed_output);
+      ret = Snappy_Compress(compression_info, raw.data(), raw.size(),
+                            compressed_output);
+      break;
     case kZlibCompression:
-      return Zlib_Compress(
+      ret = Zlib_Compress(
           compression_info,
           GetCompressFormatForVersion(kZlibCompression, format_version),
           raw.data(), raw.size(), compressed_output);
+      break;
     case kBZip2Compression:
-      return BZip2_Compress(
+      ret = BZip2_Compress(
           compression_info,
           GetCompressFormatForVersion(kBZip2Compression, format_version),
           raw.data(), raw.size(), compressed_output);
+      break;
     case kLZ4Compression:
-      return LZ4_Compress(
+      ret = LZ4_Compress(
           compression_info,
           GetCompressFormatForVersion(kLZ4Compression, format_version),
           raw.data(), raw.size(), compressed_output);
+      break;
     case kLZ4HCCompression:
-      return LZ4HC_Compress(
+      ret = LZ4HC_Compress(
          compression_info,
           GetCompressFormatForVersion(kLZ4HCCompression, format_version),
           raw.data(), raw.size(), compressed_output);
+      break;
     case kXpressCompression:
-      return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
+      ret = XPRESS_Compress(raw.data(), raw.size(), compressed_output);
+      break;
     case kZSTD:
     case kZSTDNotFinalCompression:
-      return ZSTD_Compress(compression_info, raw.data(), raw.size(),
-                           compressed_output);
+      ret = ZSTD_Compress(compression_info, raw.data(), raw.size(),
+                          compressed_output);
+      break;
     default:
       // Do not recognize this compression type
-      return false;
+      ret = false;
   }
+
+  TEST_SYNC_POINT_CALLBACK(
+      "BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue",
+      static_cast<void*>(&ret));
+
+  return ret;
 }
 
 }  // namespace
@@ -512,8 +527,7 @@ struct BlockBasedTableBuilder::Rep {
                        _moptions.prefix_extractor != nullptr));
     if (table_options.verify_compression) {
       for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
-        verify_ctxs[i].reset(new UncompressionContext(
-            UncompressionContext::NoCache(), compression_type));
+        verify_ctxs[i].reset(new UncompressionContext(compression_type));
       }
     }
   }
@@ -568,6 +582,7 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
   // block_rep_pool during parallel compression.
   struct BlockRep {
     Slice contents;
+    Slice compressed_contents;
     std::unique_ptr<std::string> data;
     std::unique_ptr<std::string> compressed_data;
     CompressionType compression_type;
@@ -656,6 +671,7 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
         finished(false) {
     for (uint32_t i = 0; i < parallel_threads; i++) {
       block_rep_buf[i].contents = Slice();
+      block_rep_buf[i].compressed_contents = Slice();
       block_rep_buf[i].data.reset(new std::string());
       block_rep_buf[i].compressed_data.reset(new std::string());
       block_rep_buf[i].compression_type = CompressionType();
@@ -943,8 +959,8 @@ void BlockBasedTableBuilder::BGWorkCompression(
     CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
                            compression_ctx, verify_ctx,
                            block_rep->compressed_data.get(),
-                           &block_rep->contents, &(block_rep->compression_type),
-                           &block_rep->status);
+                           &block_rep->compressed_contents,
+                           &(block_rep->compression_type), &block_rep->status);
     block_rep->slot->Fill(block_rep);
   }
 }
@@ -1024,7 +1040,8 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
         }
       } else {
         // Decompression reported an error. abort.
-        *out_status = Status::Corruption("Could not decompress");
+        *out_status = Status::Corruption(std::string("Could not decompress: ") +
+                                         stat.getState());
         abort_compression = true;
       }
     }
@@ -1153,6 +1170,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
       uint64_t new_blocks_inflight = r->pc_rep->blocks_inflight.fetch_sub(
                                          1, std::memory_order_relaxed) -
                                      1;
+      assert(new_blocks_inflight < r->compression_opts.parallel_threads);
       r->pc_rep->estimated_file_size.store(
           r->get_offset() +
               static_cast<uint64_t>(
@@ -1183,6 +1201,17 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
     slot->Take(block_rep);
     if (!block_rep->status.ok()) {
       r->SetStatus(block_rep->status);
+      // Return block_rep to the pool so that blocked Flush() can finish
+      // if there is one, and Flush() will notice !ok() next time.
+      block_rep->status = Status::OK();
+      block_rep->compressed_data->clear();
+      r->pc_rep->block_rep_pool.push(block_rep);
+      // Unlock first block if necessary.
+      if (r->pc_rep->first_block) {
+        std::lock_guard<std::mutex> lock(r->pc_rep->first_block_mutex);
+        r->pc_rep->first_block = false;
+        r->pc_rep->first_block_cond.notify_one();
+      }
       break;
     }
 
@@ -1197,7 +1226,7 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
     }
 
     r->pc_rep->raw_bytes_curr_block = block_rep->data->size();
-    WriteRawBlock(block_rep->contents, block_rep->compression_type,
+    WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
                   &r->pending_handle, true /* is_data_block*/);
     if (!ok()) {
       break;
    }
@@ -1569,9 +1598,34 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
         block_rep->first_key_in_next_block->assign(
             r->data_block_and_keys_buffers[i + 1].second.front());
       } else {
-        block_rep->first_key_in_next_block.reset(nullptr);
+        if (r->first_key_in_next_block == nullptr) {
+          block_rep->first_key_in_next_block.reset(nullptr);
+        } else {
+          block_rep->first_key_in_next_block->assign(
+              r->first_key_in_next_block->data(),
+              r->first_key_in_next_block->size());
+        }
       }
 
+      uint64_t new_raw_bytes_inflight =
+          r->pc_rep->raw_bytes_inflight.fetch_add(block_rep->data->size(),
+                                                  std::memory_order_relaxed) +
+          block_rep->data->size();
+      uint64_t new_blocks_inflight =
+          r->pc_rep->blocks_inflight.fetch_add(1, std::memory_order_relaxed) +
+          1;
+      r->pc_rep->estimated_file_size.store(
+          r->get_offset() +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_raw_bytes_inflight) *
+                  r->pc_rep->curr_compression_ratio.load(
+                      std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+
+      // Read out first_block here to avoid data race with BGWorkWriteRawBlock
+      bool first_block = r->pc_rep->first_block;
+
       assert(block_rep->status.ok());
       if (!r->pc_rep->write_queue.push(block_rep->slot.get())) {
         return;
@@ -1579,6 +1633,12 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
       if (!r->pc_rep->compress_queue.push(block_rep)) {
         return;
       }
+
+      if (first_block) {
+        std::unique_lock<std::mutex> lock(r->pc_rep->first_block_mutex);
+        r->pc_rep->first_block_cond.wait(
+            lock, [r] { return !r->pc_rep->first_block; });
+      }
     } else {
       for (const auto& key : keys) {
         if (r->filter_builder != nullptr) {
diff --git a/table/format.cc b/table/format.cc
index 04a54e42d..de4e29664 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -341,6 +341,7 @@ Status UncompressBlockContentsForCompressionType(
     const UncompressionInfo& uncompression_info, const char* data, size_t n,
     BlockContents* contents, uint32_t format_version,
     const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) {
+  Status ret = Status::OK();
   CacheAllocationPtr ubuf;
   assert(uncompression_info.type() != kNoCompression &&
@@ -447,7 +448,15 @@ Status UncompressBlockContentsForCompressionType(
                                  contents->data.size());
   RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED);
 
-  return Status::OK();
+  TEST_SYNC_POINT_CALLBACK(
+      "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
+      static_cast<void*>(&ret));
+  TEST_SYNC_POINT_CALLBACK(
+      "UncompressBlockContentsForCompressionType:"
+      "TamperWithDecompressionOutput",
+      static_cast<void*>(contents));
+
+  return ret;
 }
 
 //
diff --git a/util/compression.h b/util/compression.h
index 8169841ba..6aac7eb1e 100644
--- a/util/compression.h
+++ b/util/compression.h
@@ -420,10 +420,6 @@ class UncompressionContext {
   ZSTDUncompressCachedData uncomp_cached_data_;
 
  public:
-  struct NoCache {};
-  // Do not use context cache, used by TableBuilder
-  UncompressionContext(NoCache, CompressionType /* type */) {}
-
   explicit UncompressionContext(CompressionType type) {
     if (type == kZSTD || type == kZSTDNotFinalCompression) {
       ctx_cache_ = CompressionContextCache::Instance();
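
A note for context on fix 2: the deleted NoCache constructor above has an
empty body, so a verification context built through it never ran the ZSTD
context-cache setup that the surviving constructor performs. The caller-side
change in BlockBasedTableBuilder::Rep (both lines taken from the hunk earlier
in this patch) is accordingly:

  // Before: the NoCache overload skipped ZSTD context-cache initialization,
  // which is the likely reason verify_compression failed with ZSTD.
  verify_ctxs[i].reset(new UncompressionContext(
      UncompressionContext::NoCache(), compression_type));

  // After: the regular constructor assigns CompressionContextCache::Instance()
  // for kZSTD / kZSTDNotFinalCompression, so verification has a usable context.
  verify_ctxs[i].reset(new UncompressionContext(compression_type));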