diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 94c6762a2..44eedffb6 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -600,6 +600,8 @@ Status CompactionJob::Run() {
   compaction_stats_.micros = env_->NowMicros() - start_micros;
   MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
 
+  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
+
   // Check if any thread encountered an error during execution
   Status status;
   for (const auto& state : compact_->sub_compact_states) {
diff --git a/db/db_test2.cc b/db/db_test2.cc
index 519bf3371..7800144ba 100644
--- a/db/db_test2.cc
+++ b/db/db_test2.cc
@@ -2649,6 +2649,110 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
 #endif
 }
 
+TEST_F(DBTest2, IteratorPinnedMemory) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = rocksdb::CreateDBStatistics();
+  BlockBasedTableOptions bbto;
+  bbto.no_block_cache = false;
+  bbto.cache_index_and_filter_blocks = false;
+  bbto.block_cache = NewLRUCache(100000);
+  bbto.block_size = 400;  // small block size
+  options.table_factory.reset(new BlockBasedTableFactory(bbto));
+  Reopen(options);
+
+  Random rnd(301);
+  std::string v = RandomString(&rnd, 400);
+
+  // Since v is the size of a block, each key should take a block
+  // of 400+ bytes.
+  Put("1", v);
+  Put("3", v);
+  Put("5", v);
+  Put("7", v);
+  ASSERT_OK(Flush());
+
+  ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
+
+  // Verify that iterators don't pin more than one data block in block cache
+  // at a time.
+  {
+    unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
+    iter->SeekToFirst();
+
+    for (int i = 0; i < 4; i++) {
+      ASSERT_TRUE(iter->Valid());
+      // Block cache should contain exactly one block.
+      ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
+      ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
+      iter->Next();
+    }
+    ASSERT_FALSE(iter->Valid());
+
+    iter->Seek("4");
+    ASSERT_TRUE(iter->Valid());
+
+    ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
+    ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
+
+    iter->Seek("3");
+    ASSERT_TRUE(iter->Valid());
+
+    ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
+    ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
+  }
+  ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
+
+  // Test compaction case
+  Put("2", v);
+  Put("5", v);
+  Put("6", v);
+  Put("8", v);
+  ASSERT_OK(Flush());
+
+  // Clear existing data in block cache
+  bbto.block_cache->SetCapacity(0);
+  bbto.block_cache->SetCapacity(100000);
+
+  // Verify that compaction input iterators don't hold more than one data
+  // block at a time.
+  std::atomic<bool> finished(false);
+  std::atomic<int> block_newed(0);
+  std::atomic<int> block_destroyed(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "Block::Block:0", [&](void* /*arg*/) {
+        if (finished) {
+          return;
+        }
+        // Two iterators. At most 2 outstanding blocks.
+        EXPECT_GE(block_newed.load(), block_destroyed.load());
+        EXPECT_LE(block_newed.load(), block_destroyed.load() + 1);
+        block_newed.fetch_add(1);
+      });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "Block::~Block", [&](void* /*arg*/) {
+        if (finished) {
+          return;
+        }
+        // Two iterators. At most 2 outstanding blocks.
+        EXPECT_GE(block_newed.load(), block_destroyed.load() + 1);
+        EXPECT_LE(block_newed.load(), block_destroyed.load() + 2);
+        block_destroyed.fetch_add(1);
+      });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run:BeforeVerify",
+      [&](void* /*arg*/) { finished = true; });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  // Two input files. Each of them has 4 data blocks.
+  ASSERT_EQ(8, block_newed.load());
+  ASSERT_EQ(8, block_destroyed.load());
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_F(DBTest2, TestBBTTailPrefetch) {
   std::atomic<bool> called(false);
   size_t expected_lower_bound = 512 * 1024;
diff --git a/table/block.cc b/table/block.cc
index 020a09baf..27db99e78 100644
--- a/table/block.cc
+++ b/table/block.cc
@@ -611,6 +611,8 @@ uint32_t Block::NumRestarts() const {
   return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
 }
 
+Block::~Block() { TEST_SYNC_POINT("Block::~Block"); }
+
 Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
              size_t read_amp_bytes_per_bit, Statistics* statistics)
     : contents_(std::move(contents)),
@@ -619,6 +621,7 @@ Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
       restart_offset_(0),
      num_restarts_(0),
       global_seqno_(_global_seqno) {
+  TEST_SYNC_POINT("Block::Block:0");
   if (size_ < sizeof(uint32_t)) {
     size_ = 0;  // Error marker
   } else {
diff --git a/table/block.h b/table/block.h
index a29ea1689..8576c9f2c 100644
--- a/table/block.h
+++ b/table/block.h
@@ -147,7 +147,7 @@ class Block {
         size_t read_amp_bytes_per_bit = 0,
         Statistics* statistics = nullptr);
 
-  ~Block() = default;
+  ~Block();
 
   size_t size() const { return size_; }
   const char* data() const { return data_; }
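
Note on the mechanism (not part of the patch): the test works by having production code publish named sync points ("Block::Block:0", "Block::~Block", "CompactionJob::Run:BeforeVerify") via TEST_SYNC_POINT, and having the test register callbacks that run whenever those points execute. Below is a minimal sketch of that pattern; InstrumentedWork, RunObservedWork, and the point name "Example::InstrumentedWork:0" are hypothetical names for illustration, and the sync_point.h include path may differ across RocksDB versions.

    #include <atomic>
    #include <cassert>

    #include "util/sync_point.h"  // TEST_SYNC_POINT, rocksdb::SyncPoint
                                  // (header path is an assumption; it has
                                  // moved between RocksDB versions)

    // Hypothetical instrumented function: in debug builds the macro hands
    // control to any registered callback; with NDEBUG it compiles away.
    void InstrumentedWork() {
      TEST_SYNC_POINT("Example::InstrumentedWork:0");
    }

    void RunObservedWork() {
      std::atomic<int> hits(0);

      // A test registers a callback keyed by the same point name...
      rocksdb::SyncPoint::GetInstance()->SetCallBack(
          "Example::InstrumentedWork:0",
          [&](void* /*arg*/) { hits.fetch_add(1); });
      rocksdb::SyncPoint::GetInstance()->EnableProcessing();

      // ...and every execution of the marked point now invokes it. This is
      // how IteratorPinnedMemory counts Block constructions/destructions.
      InstrumentedWork();
      assert(hits.load() == 1);

      rocksdb::SyncPoint::GetInstance()->DisableProcessing();
      rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
    }

This also explains the new "CompactionJob::Run:BeforeVerify" point: its callback flips the `finished` flag, so blocks created or destroyed after the compaction's input iterators are done no longer trip the at-most-two-outstanding-blocks invariant checked in the constructor/destructor callbacks.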