diff --git a/db/db_test2.cc b/db/db_test2.cc
index c5369371d..baf00f587 100644
--- a/db/db_test2.cc
+++ b/db/db_test2.cc
@@ -1859,6 +1859,167 @@ TEST_F(DBTest2, MaxSuccessiveMergesInRecovery) {
   options.max_successive_merges = 3;
   Reopen(options);
 }
+
+size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
+  std::string buffer;
+
+  PutVarint32(&buffer, static_cast<uint32_t>(0));
+  PutVarint32(&buffer, static_cast<uint32_t>(key_size));
+  PutVarint32(&buffer, static_cast<uint32_t>(value_size));
+
+  return buffer.size() + key_size + value_size;
+}
+
+TEST_F(DBTest2, ReadAmpBitmap) {
+  Options options = CurrentOptions();
+  BlockBasedTableOptions bbto;
+  // Disable delta encoding to make it easier to calculate read amplification
+  bbto.use_delta_encoding = false;
+  // Huge block cache to make it easier to calculate read amplification
+  bbto.block_cache = NewLRUCache(1024 * 1024 * 1024);
+  bbto.read_amp_bytes_per_bit = 16;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  options.statistics = rocksdb::CreateDBStatistics();
+  DestroyAndReopen(options);
+
+  const size_t kNumEntries = 10000;
+
+  Random rnd(301);
+  for (size_t i = 0; i < kNumEntries; i++) {
+    ASSERT_OK(Put(Key(static_cast<int>(i)), RandomString(&rnd, 100)));
+  }
+  ASSERT_OK(Flush());
+
+  Close();
+  Reopen(options);
+
+  // Read keys/values randomly and verify that reported read amp error
+  // is less than 2%
+  uint64_t total_useful_bytes = 0;
+  std::set<int> read_keys;
+  std::string value;
+  for (size_t i = 0; i < kNumEntries * 5; i++) {
+    int key_idx = rnd.Next() % kNumEntries;
+    std::string k = Key(key_idx);
+    ASSERT_OK(db_->Get(ReadOptions(), k, &value));
+
+    if (read_keys.find(key_idx) == read_keys.end()) {
+      auto ik = InternalKey(k, 0, ValueType::kTypeValue);
+      total_useful_bytes += GetEncodedEntrySize(ik.size(), value.size());
+      read_keys.insert(key_idx);
+    }
+
+    double expected_read_amp =
+        static_cast<double>(total_useful_bytes) /
+        options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+    double read_amp =
+        static_cast<double>(options.statistics->getTickerCount(
+            READ_AMP_ESTIMATE_USEFUL_BYTES)) /
+        options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+    double error_pct = fabs(expected_read_amp - read_amp) * 100;
+    // Error between reported read amp and real read amp should be less than 2%
+    EXPECT_LE(error_pct, 2);
+  }
+
+  // Make sure we read everything in the DB (which is smaller than our cache)
+  Iterator* iter = db_->NewIterator(ReadOptions());
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    ASSERT_EQ(iter->value().ToString(), Get(iter->key().ToString()));
+  }
+  delete iter;
+
+  // Read amp is 100% since we read everything that we loaded into memory
+  ASSERT_EQ(options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES),
+            options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES));
+}
+
+TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
+  if (dbname_.find("dev/shm") != std::string::npos) {
+    // /dev/shm doesn't support getting a unique file id, which means that
+    // running this test on /dev/shm will fail because lru_cache will load
+    // the blocks again even though they are already in the cache
+    return;
+  }
+
+  std::shared_ptr<Cache> lru_cache = NewLRUCache(1024 * 1024 * 1024);
+  std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
+
+  Options options = CurrentOptions();
+  BlockBasedTableOptions bbto;
+  // Disable delta encoding to make it easier to calculate read amplification
+  bbto.use_delta_encoding = false;
+  // Huge block cache to make it easier to calculate read amplification
+  bbto.block_cache = lru_cache;
+  bbto.read_amp_bytes_per_bit = 16;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  options.statistics = stats;
+  DestroyAndReopen(options);
+
+  const int kNumEntries = 10000;
+
+  Random rnd(301);
+  for (int i = 0; i < kNumEntries; i++) {
+    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
+  }
+  ASSERT_OK(Flush());
+
+  Close();
+  Reopen(options);
+
+  uint64_t total_useful_bytes = 0;
+  std::set<int> read_keys;
+  std::string value;
+  // Iter1: Read half the DB, Read even keys
+  // Key(0), Key(2), Key(4), Key(6), Key(8), ...
+  for (int i = 0; i < kNumEntries; i += 2) {
+    std::string k = Key(i);
+    ASSERT_OK(db_->Get(ReadOptions(), k, &value));
+
+    if (read_keys.find(i) == read_keys.end()) {
+      auto ik = InternalKey(k, 0, ValueType::kTypeValue);
+      total_useful_bytes += GetEncodedEntrySize(ik.size(), value.size());
+      read_keys.insert(i);
+    }
+  }
+
+  size_t total_useful_bytes_iter1 =
+      options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
+  size_t total_loaded_bytes_iter1 =
+      options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+  Close();
+  std::shared_ptr<Statistics> new_statistics = rocksdb::CreateDBStatistics();
+  // Destroy old statistics obj that the blocks in lru_cache are pointing to
+  options.statistics.reset();
+  // Use the statistics object that we just created
+  options.statistics = new_statistics;
+  Reopen(options);
+
+  // Iter2: Read half the DB, Read odd keys
+  // Key(1), Key(3), Key(5), Key(7), Key(9), ...
+  for (int i = 1; i < kNumEntries; i += 2) {
+    std::string k = Key(i);
+    ASSERT_OK(db_->Get(ReadOptions(), k, &value));
+
+    if (read_keys.find(i) == read_keys.end()) {
+      auto ik = InternalKey(k, 0, ValueType::kTypeValue);
+      total_useful_bytes += GetEncodedEntrySize(ik.size(), value.size());
+      read_keys.insert(i);
+    }
+  }
+
+  size_t total_useful_bytes_iter2 =
+      options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
+  size_t total_loaded_bytes_iter2 =
+      options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+  // We reached read_amp of 100% because we read all the keys in the DB
+  ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
+            total_loaded_bytes_iter1 + total_loaded_bytes_iter2);
+}
+
 }  // namespace rocksdb

 int main(int argc, char** argv) {
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index bb7134efe..6e2454ab6 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -197,6 +197,14 @@ enum Tickers : uint32_t {
   ROW_CACHE_HIT,
   ROW_CACHE_MISS,

+  // Read amplification statistics.
+  // Read amplification can be calculated using this formula
+  // (READ_AMP_TOTAL_READ_BYTES / READ_AMP_ESTIMATE_USEFUL_BYTES)
+  //
+  // REQUIRES: BlockBasedTableOptions::read_amp_bytes_per_bit to be enabled
+  READ_AMP_ESTIMATE_USEFUL_BYTES,  // Estimate of total bytes actually used.
+  READ_AMP_TOTAL_READ_BYTES,       // Total size of loaded data blocks.
+
   TICKER_ENUM_MAX
 };

@@ -291,6 +299,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
     {FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"},
     {ROW_CACHE_HIT, "rocksdb.row.cache.hit"},
     {ROW_CACHE_MISS, "rocksdb.row.cache.miss"},
+    {READ_AMP_ESTIMATE_USEFUL_BYTES, "rocksdb.read.amp.estimate.useful.bytes"},
+    {READ_AMP_TOTAL_READ_BYTES, "rocksdb.read.amp.total.read.bytes"},
 };

 /**
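For orientation, the formula above can be computed directly from a Statistics object once a workload has run. A minimal sketch, assuming a DB opened with BlockBasedTableOptions::read_amp_bytes_per_bit set and a Statistics object attached to the options (the helper name is illustrative, not part of this patch):

    #include <cstdint>
    #include <memory>
    #include "rocksdb/statistics.h"

    // Read amplification = bytes of data blocks loaded / bytes actually useful.
    double EstimateReadAmp(const std::shared_ptr<rocksdb::Statistics>& stats) {
      const uint64_t useful =
          stats->getTickerCount(rocksdb::READ_AMP_ESTIMATE_USEFUL_BYTES);
      const uint64_t total =
          stats->getTickerCount(rocksdb::READ_AMP_TOTAL_READ_BYTES);
      // Guard against division by zero before any data block has been loaded.
      return useful == 0 ? 0.0 : static_cast<double>(total) / useful;
    }

A result close to 1.0 means queries touched most of what was loaded into memory; larger values mean blocks are being loaded mostly for bytes that are never read.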
diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h
index 3bde6bdc5..2342f724c 100644
--- a/include/rocksdb/table.h
+++ b/include/rocksdb/table.h
@@ -173,6 +173,29 @@ struct BlockBasedTableOptions {
   // algorithms.
   bool verify_compression = false;

+  // If non-zero, for every data block we load into memory we will create a
+  // bitmap of size ((block_size / `read_amp_bytes_per_bit`) / 8) bytes, which
+  // is used to figure out the percentage of the block that we actually read.
+  //
+  // When this feature is used, Tickers::READ_AMP_ESTIMATE_USEFUL_BYTES and
+  // Tickers::READ_AMP_TOTAL_READ_BYTES can be used to calculate the
+  // read amplification using this formula
+  // (READ_AMP_TOTAL_READ_BYTES / READ_AMP_ESTIMATE_USEFUL_BYTES)
+  //
+  // value => memory usage (percentage of loaded blocks memory)
+  // 1     => 12.50 %
+  // 2     => 06.25 %
+  // 4     => 03.12 %
+  // 8     => 01.56 %
+  // 16    => 00.78 %
+  //
+  // Note: This number must be a power of 2; if it is not, it will be rounded
+  // down to the nearest power of 2. For example, a value of 7 will be treated
+  // as 4 and a value of 19 will be treated as 16.
+  //
+  // Default: 0 (disabled)
+  uint32_t read_amp_bytes_per_bit = 0;
+
   // We currently have three versions:
   // 0 -- This version is currently written out by all RocksDB's versions by
   //      default. Can be read by really old RocksDB's. Doesn't support changing
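As a usage sketch (the function name and the value 32 are only examples, not part of this patch), wiring the option together with a Statistics object looks roughly like this:

    #include "rocksdb/options.h"
    #include "rocksdb/statistics.h"
    #include "rocksdb/table.h"

    rocksdb::Options MakeOptionsWithReadAmpTracking() {
      rocksdb::BlockBasedTableOptions bbto;
      // One bitmap bit per 32 bytes of data block, i.e. roughly 0.4% extra
      // memory for every block held in memory (see the table above).
      bbto.read_amp_bytes_per_bit = 32;

      rocksdb::Options options;
      options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));
      // The per-block bitmaps report through the READ_AMP_* tickers, so a
      // Statistics object must be attached for the feature to be useful.
      options.statistics = rocksdb::CreateDBStatistics();
      return options;
    }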
diff --git a/table/block.cc b/table/block.cc
index fa14a00f0..32eeeceaf 100644
--- a/table/block.cc
+++ b/table/block.cc
@@ -16,9 +16,11 @@
 #include
 #include

+#include "port/port.h"
+#include "port/stack_trace.h"
 #include "rocksdb/comparator.h"
-#include "table/format.h"
 #include "table/block_prefix_index.h"
+#include "table/format.h"
 #include "util/coding.h"
 #include "util/logging.h"
 #include "util/perf_context_imp.h"
@@ -344,7 +346,8 @@ uint32_t Block::NumRestarts() const {
   return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
 }

-Block::Block(BlockContents&& contents)
+Block::Block(BlockContents&& contents, size_t read_amp_bytes_per_bit,
+             Statistics* statistics)
     : contents_(std::move(contents)),
       data_(contents_.data.data()),
       size_(contents_.data.size()) {
@@ -359,10 +362,14 @@ Block::Block(BlockContents&& contents)
       size_ = 0;
     }
   }
+  if (read_amp_bytes_per_bit != 0 && statistics && size_ != 0) {
+    read_amp_bitmap_.reset(new BlockReadAmpBitmap(
+        restart_offset_, read_amp_bytes_per_bit, statistics));
+  }
 }

 InternalIterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter,
-                                     bool total_order_seek) {
+                                     bool total_order_seek, Statistics* stats) {
   if (size_ < 2*sizeof(uint32_t)) {
     if (iter != nullptr) {
       iter->SetStatus(Status::Corruption("bad block contents"));
@@ -385,10 +392,17 @@ InternalIterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter,

     if (iter != nullptr) {
       iter->Initialize(cmp, data_, restart_offset_, num_restarts,
-                       prefix_index_ptr);
+                       prefix_index_ptr, read_amp_bitmap_.get());
     } else {
       iter = new BlockIter(cmp, data_, restart_offset_, num_restarts,
-                           prefix_index_ptr);
+                           prefix_index_ptr, read_amp_bitmap_.get());
+    }
+
+    if (read_amp_bitmap_) {
+      if (read_amp_bitmap_->GetStatistics() != stats) {
+        // DB changed the Statistics pointer, we need to notify read_amp_bitmap_
+        read_amp_bitmap_->SetStatistics(stats);
+      }
     }
   }
diff --git a/table/block.h b/table/block.h
index 81ca2aa41..470e52322 100644
--- a/table/block.h
+++ b/table/block.h
@@ -20,6 +20,7 @@
 #include "db/pinned_iterators_manager.h"
 #include "rocksdb/iterator.h"
 #include "rocksdb/options.h"
+#include "rocksdb/statistics.h"
 #include "table/block_prefix_index.h"
 #include "table/internal_iterator.h"

@@ -32,10 +33,119 @@
 class Comparator;
 class BlockIter;
 class BlockPrefixIndex;

+// BlockReadAmpBitmap is a bitmap that maps the rocksdb::Block data bytes to
+// a bitmap with ratio bytes_per_bit. Whenever we access a range of bytes in
+// the Block we update the bitmap and increment READ_AMP_ESTIMATE_USEFUL_BYTES.
+class BlockReadAmpBitmap {
+ public:
+  explicit BlockReadAmpBitmap(size_t block_size, size_t bytes_per_bit,
+                              Statistics* statistics)
+      : bitmap_(nullptr), bytes_per_bit_pow_(0), statistics_(statistics) {
+    assert(block_size > 0 && bytes_per_bit > 0);
+
+    // convert bytes_per_bit to be a power of 2
+    while (bytes_per_bit >>= 1) {
+      bytes_per_bit_pow_++;
+    }
+
+    // num_bits_needed = ceil(block_size / bytes_per_bit)
+    size_t num_bits_needed = (block_size >> bytes_per_bit_pow_) +
+                             (block_size % (1 << bytes_per_bit_pow_) != 0);
+
+    // bitmap_size = ceil(num_bits_needed / kBitsPerEntry)
+    size_t bitmap_size = (num_bits_needed / kBitsPerEntry) +
+                         (num_bits_needed % kBitsPerEntry != 0);
+
+    // Create bitmap and set all the bits to 0
+    bitmap_ = new std::atomic<uint32_t>[bitmap_size];
+    memset(bitmap_, 0, bitmap_size * kBytesPersEntry);
+
+    RecordTick(GetStatistics(), READ_AMP_TOTAL_READ_BYTES,
+               num_bits_needed << bytes_per_bit_pow_);
+  }
+
+  ~BlockReadAmpBitmap() { delete[] bitmap_; }
+
+  void Mark(uint32_t start_offset, uint32_t end_offset) {
+    assert(end_offset >= start_offset);
+
+    // Every new bit we set will bump this counter
+    uint32_t new_useful_bytes = 0;
+    // Index of first bit in mask (start_offset / bytes_per_bit)
+    uint32_t start_bit = start_offset >> bytes_per_bit_pow_;
+    // Index of last bit in mask (end_offset / bytes_per_bit)
+    uint32_t end_bit = end_offset >> bytes_per_bit_pow_;
+    // Index of middle bit (unique to this range)
+    uint32_t mid_bit = start_bit + 1;
+
+    // It's guaranteed that ranges sent to Mark() won't overlap, which means
+    // that we don't need to set all the middle bits; we can set only one of
+    // the middle bits, and later check that single bit when we want to know
+    // whether the whole range is set or not.
+    if (mid_bit < end_bit) {
+      if (GetAndSet(mid_bit) == 0) {
+        new_useful_bytes += (end_bit - mid_bit) << bytes_per_bit_pow_;
+      } else {
+        // If the middle bit is set, it's guaranteed that start and end bits
+        // are also set
+        return;
+      }
+    } else {
+      // This range doesn't have a middle bit; the whole range falls in 1 or
+      // 2 bits
+    }
+
+    if (GetAndSet(start_bit) == 0) {
+      new_useful_bytes += (1 << bytes_per_bit_pow_);
+    }
+
+    if (GetAndSet(end_bit) == 0) {
+      new_useful_bytes += (1 << bytes_per_bit_pow_);
+    }
+
+    if (new_useful_bytes > 0) {
+      RecordTick(GetStatistics(), READ_AMP_ESTIMATE_USEFUL_BYTES,
+                 new_useful_bytes);
+    }
+  }
+
+  Statistics* GetStatistics() {
+    return statistics_.load(std::memory_order_relaxed);
+  }
+
+  void SetStatistics(Statistics* stats) { statistics_.store(stats); }
+
+  uint32_t GetBytesPerBit() { return 1 << bytes_per_bit_pow_; }
+
+ private:
+  // Get the current value of bit at `bit_idx` and set it to 1
+  inline bool GetAndSet(uint32_t bit_idx) {
+    const uint32_t byte_idx = bit_idx / kBitsPerEntry;
+    const uint32_t bit_mask = 1 << (bit_idx % kBitsPerEntry);
+
+    return bitmap_[byte_idx].fetch_or(bit_mask, std::memory_order_relaxed) &
+           bit_mask;
+  }
+
+  const uint32_t kBytesPersEntry = sizeof(uint32_t);   // 4 bytes
+  const uint32_t kBitsPerEntry = kBytesPersEntry * 8;  // 32 bits
+
+  // Bitmap used to record the bytes that we read; atomics protect against
+  // multiple threads updating the same entry
+  std::atomic<uint32_t>* bitmap_;
+  // (1 << bytes_per_bit_pow_) is bytes_per_bit. Use power of 2 to optimize
+  // multiplication and division
+  uint8_t bytes_per_bit_pow_;
+  // Pointer to the DB Statistics object. Since this bitmap may outlive the
+  // DB, this pointer may become invalid, but the DB will update it to a valid
+  // pointer by calling SetStatistics() before calling Mark()
+  std::atomic<Statistics*> statistics_;
+};
+
 class Block {
  public:
   // Initialize the block with the specified contents.
-  explicit Block(BlockContents&& contents);
+  explicit Block(BlockContents&& contents, size_t read_amp_bytes_per_bit = 0,
+                 Statistics* statistics = nullptr);

   ~Block() = default;

@@ -70,7 +180,8 @@ class Block {
   // and prefix_index_ are null, so this option does not matter.
   InternalIterator* NewIterator(const Comparator* comparator,
                                 BlockIter* iter = nullptr,
-                                bool total_order_seek = true);
+                                bool total_order_seek = true,
+                                Statistics* stats = nullptr);
   void SetBlockPrefixIndex(BlockPrefixIndex* prefix_index);

   // Report an approximation of how much memory has been used.
@@ -82,6 +193,7 @@ class Block {
   size_t size_;              // contents_.data.size()
   uint32_t restart_offset_;  // Offset in data_ of restart array
   std::unique_ptr<BlockPrefixIndex> prefix_index_;
+  std::unique_ptr<BlockReadAmpBitmap> read_amp_bitmap_;

   // No copying allowed
   Block(const Block&);
@@ -99,17 +211,22 @@ class BlockIter : public InternalIterator {
         restart_index_(0),
         status_(Status::OK()),
         prefix_index_(nullptr),
-        key_pinned_(false) {}
+        key_pinned_(false),
+        read_amp_bitmap_(nullptr),
+        last_bitmap_offset_(0) {}

   BlockIter(const Comparator* comparator, const char* data, uint32_t restarts,
-            uint32_t num_restarts, BlockPrefixIndex* prefix_index)
+            uint32_t num_restarts, BlockPrefixIndex* prefix_index,
+            BlockReadAmpBitmap* read_amp_bitmap)
       : BlockIter() {
-    Initialize(comparator, data, restarts, num_restarts, prefix_index);
+    Initialize(comparator, data, restarts, num_restarts, prefix_index,
+               read_amp_bitmap);
   }

   void Initialize(const Comparator* comparator, const char* data,
                   uint32_t restarts, uint32_t num_restarts,
-                  BlockPrefixIndex* prefix_index) {
+                  BlockPrefixIndex* prefix_index,
+                  BlockReadAmpBitmap* read_amp_bitmap) {
     assert(data_ == nullptr);  // Ensure it is called only once
     assert(num_restarts > 0);  // Ensure the param is valid

@@ -120,6 +237,8 @@ class BlockIter : public InternalIterator {
     current_ = restarts_;
     restart_index_ = num_restarts_;
     prefix_index_ = prefix_index;
+    read_amp_bitmap_ = read_amp_bitmap;
+    last_bitmap_offset_ = current_ + 1;
   }

   void SetStatus(Status s) {
@@ -134,6 +253,12 @@ class BlockIter : public InternalIterator {
   }
   virtual Slice value() const override {
     assert(Valid());
+    if (read_amp_bitmap_ && current_ < restarts_ &&
+        current_ != last_bitmap_offset_) {
+      read_amp_bitmap_->Mark(current_ /* current entry offset */,
+                             NextEntryOffset() - 1);
+      last_bitmap_offset_ = current_;
+    }
     return value_;
   }

@@ -164,6 +289,8 @@ class BlockIter : public InternalIterator {

   virtual bool IsValuePinned() const override { return true; }

+  size_t TEST_CurrentEntrySize() { return NextEntryOffset() - current_; }
+
  private:
   const Comparator* comparator_;
   const char* data_;       // underlying block contents
@@ -179,6 +306,11 @@ class BlockIter : public InternalIterator {
   BlockPrefixIndex* prefix_index_;
   bool key_pinned_;

+  // read-amp bitmap
+  BlockReadAmpBitmap* read_amp_bitmap_;
+  // last `current_` value we reported to the read-amp bitmap
+  mutable uint32_t last_bitmap_offset_;
+
   struct CachedPrevEntry {
     explicit CachedPrevEntry(uint32_t _offset, const char* _key_ptr,
                              size_t _key_offset, size_t _key_size, Slice _value)
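To make the sizing and offset-to-bit arithmetic above easier to follow, here is a small standalone sketch of the same math (simplified: no atomics and no Statistics; the struct is illustrative and not part of the patch):

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct ReadAmpBitmapSketch {
      ReadAmpBitmapSketch(size_t block_size, size_t bytes_per_bit) {
        assert(block_size > 0 && bytes_per_bit > 0);
        // Track bytes_per_bit as a power-of-two exponent, as the real class does.
        while (bytes_per_bit >>= 1) {
          ++bytes_per_bit_pow;
        }
        // ceil(block_size / bytes_per_bit) bits, packed into 32-bit words.
        const size_t num_bits =
            (block_size >> bytes_per_bit_pow) +
            ((block_size & ((size_t{1} << bytes_per_bit_pow) - 1)) != 0);
        words.assign((num_bits + 31) / 32, 0);
      }

      // Map a byte offset inside the block to its bit and set that bit.
      void MarkByte(uint32_t offset) {
        const uint32_t bit = offset >> bytes_per_bit_pow;
        words[bit / 32] |= (1u << (bit % 32));
      }

      uint8_t bytes_per_bit_pow = 0;
      std::vector<uint32_t> words;
    };

    // Example: a 4 KB block at 16 bytes/bit needs 256 bits, i.e. eight
    // uint32_t words (32 bytes), matching ((block_size / bytes_per_bit) / 8).

Mark() in the real class additionally uses the "middle bit" trick described in its comment, so a contiguous, non-overlapping range costs at most three atomic fetch_or operations.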
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index edb886cee..49dee3ddd 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -66,14 +66,16 @@ namespace {
 Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer,
                          const ReadOptions& options, const BlockHandle& handle,
                          std::unique_ptr<Block>* result,
-                         const ImmutableCFOptions &ioptions,
-                         bool do_uncompress, const Slice& compression_dict,
-                         const PersistentCacheOptions& cache_options) {
+                         const ImmutableCFOptions& ioptions, bool do_uncompress,
+                         const Slice& compression_dict,
+                         const PersistentCacheOptions& cache_options,
+                         size_t read_amp_bytes_per_bit) {
   BlockContents contents;
   Status s = ReadBlockContents(file, footer, options, handle, &contents,
                                ioptions, do_uncompress, compression_dict,
                                cache_options);
   if (s.ok()) {
-    result->reset(new Block(std::move(contents)));
+    result->reset(new Block(std::move(contents), read_amp_bytes_per_bit,
+                            ioptions.statistics));
   }

   return s;
@@ -188,7 +190,8 @@ class BinarySearchIndexReader : public IndexReader {
     std::unique_ptr<Block> index_block;
     auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
                                &index_block, ioptions, true /* decompress */,
-                               Slice() /*compression dict*/, cache_options);
+                               Slice() /*compression dict*/, cache_options,
+                               0 /* read_amp_bytes_per_bit */);

     if (s.ok()) {
       *index_reader = new BinarySearchIndexReader(
@@ -227,17 +230,20 @@ class BinarySearchIndexReader : public IndexReader {
 // key.
 class HashIndexReader : public IndexReader {
  public:
-  static Status Create(
-      const SliceTransform* hash_key_extractor, const Footer& footer,
-      RandomAccessFileReader* file, const ImmutableCFOptions &ioptions,
-      const Comparator* comparator, const BlockHandle& index_handle,
-      InternalIterator* meta_index_iter, IndexReader** index_reader,
-      bool hash_index_allow_collision,
-      const PersistentCacheOptions& cache_options) {
+  static Status Create(const SliceTransform* hash_key_extractor,
+                       const Footer& footer, RandomAccessFileReader* file,
+                       const ImmutableCFOptions& ioptions,
+                       const Comparator* comparator,
+                       const BlockHandle& index_handle,
+                       InternalIterator* meta_index_iter,
+                       IndexReader** index_reader,
+                       bool hash_index_allow_collision,
+                       const PersistentCacheOptions& cache_options) {
     std::unique_ptr<Block> index_block;
     auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
                                &index_block, ioptions, true /* decompress */,
-                               Slice() /*compression dict*/, cache_options);
+                               Slice() /*compression dict*/, cache_options,
+                               0 /* read_amp_bytes_per_bit */);

     if (!s.ok()) {
       return s;
@@ -791,7 +797,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
       rep->file.get(), rep->footer, ReadOptions(),
       rep->footer.metaindex_handle(), &meta, rep->ioptions,
       true /* decompress */, Slice() /*compression dict*/,
-      rep->persistent_cache_options);
+      rep->persistent_cache_options, 0 /* read_amp_bytes_per_bit */);

   if (!s.ok()) {
     Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log,
@@ -809,9 +815,9 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
 Status BlockBasedTable::GetDataBlockFromCache(
     const Slice& block_cache_key, const Slice& compressed_block_cache_key,
     Cache* block_cache, Cache* block_cache_compressed,
-    const ImmutableCFOptions &ioptions, const ReadOptions& read_options,
+    const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
     BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
-    const Slice& compression_dict) {
+    const Slice& compression_dict, size_t read_amp_bytes_per_bit) {
   Status s;
   Block* compressed_block = nullptr;
   Cache::Handle* block_cache_compressed_handle = nullptr;
@@ -861,7 +867,8 @@ Status BlockBasedTable::GetDataBlockFromCache(

   // Insert uncompressed block into block cache
   if (s.ok()) {
-    block->value = new Block(std::move(contents));  // uncompressed block
+    block->value = new Block(std::move(contents), read_amp_bytes_per_bit,
+                             statistics);  // uncompressed block
     assert(block->value->compression_type() == kNoCompression);
     if (block_cache != nullptr && block->value->cachable() &&
         read_options.fill_cache) {
@@ -886,9 +893,9 @@ Status BlockBasedTable::PutDataBlockToCache(
     const Slice& block_cache_key, const Slice& compressed_block_cache_key,
     Cache* block_cache, Cache* block_cache_compressed,
-    const ReadOptions& read_options, const ImmutableCFOptions &ioptions,
+    const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
     CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
-    const Slice& compression_dict) {
+    const Slice& compression_dict, size_t read_amp_bytes_per_bit) {
   assert(raw_block->compression_type() == kNoCompression ||
          block_cache_compressed != nullptr);

@@ -906,7 +913,8 @@ Status BlockBasedTable::PutDataBlockToCache(
   }

   if (raw_block->compression_type() != kNoCompression) {
-    block->value = new Block(std::move(contents));  // uncompressed block
+    block->value = new Block(std::move(contents), read_amp_bytes_per_bit,
+                             statistics);  // uncompressed block
   } else {
     block->value = raw_block;
     raw_block = nullptr;
@@ -1206,8 +1214,9 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
   }

   s = GetDataBlockFromCache(
-      key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro, &block,
-      rep->table_options.format_version, compression_dict);
+      key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
+      &block, rep->table_options.format_version, compression_dict,
+      rep->table_options.read_amp_bytes_per_bit);

   if (block.value == nullptr && !no_io && ro.fill_cache) {
     std::unique_ptr<Block> raw_block;
@@ -1216,14 +1225,15 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
       s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
                             &raw_block, rep->ioptions,
                             block_cache_compressed == nullptr,
-                            compression_dict, rep->persistent_cache_options);
+                            compression_dict, rep->persistent_cache_options,
+                            rep->table_options.read_amp_bytes_per_bit);
     }

     if (s.ok()) {
-      s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
-                              ro, rep->ioptions, &block, raw_block.release(),
-                              rep->table_options.format_version,
-                              compression_dict);
+      s = PutDataBlockToCache(
+          key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions,
+          &block, raw_block.release(), rep->table_options.format_version,
+          compression_dict, rep->table_options.read_amp_bytes_per_bit);
     }
   }
 }
@@ -1242,7 +1252,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
       std::unique_ptr<Block> block_value;
       s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
                             &block_value, rep->ioptions, true /* compress */,
-                            compression_dict, rep->persistent_cache_options);
+                            compression_dict, rep->persistent_cache_options,
+                            rep->table_options.read_amp_bytes_per_bit);
       if (s.ok()) {
         block.value = block_value.release();
       }
@@ -1251,7 +1262,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
   InternalIterator* iter;
   if (s.ok()) {
     assert(block.value != nullptr);
-    iter = block.value->NewIterator(&rep->internal_comparator, input_iter);
+    iter = block.value->NewIterator(&rep->internal_comparator, input_iter, true,
+                                    rep->ioptions.statistics);
     if (block.cache_handle != nullptr) {
       iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
                             block.cache_handle);
@@ -1607,12 +1619,12 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
                          handle, cache_key_storage);
   Slice ckey;

-  s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr,
-                            rep_->ioptions, options, &block,
-                            rep_->table_options.format_version,
-                            rep_->compression_dict_block
-                                ? rep_->compression_dict_block->data
-                                : Slice());
+  s = GetDataBlockFromCache(
+      cache_key, ckey, block_cache, nullptr, rep_->ioptions, options, &block,
+      rep_->table_options.format_version,
+      rep_->compression_dict_block ? rep_->compression_dict_block->data
+                                   : Slice(),
+      0 /* read_amp_bytes_per_bit */);
   assert(s.ok());
   bool in_cache = block.value != nullptr;
   if (in_cache) {
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index dbfa755a8..41c33c33c 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -190,9 +190,9 @@ class BlockBasedTable : public TableReader {
   static Status GetDataBlockFromCache(
       const Slice& block_cache_key, const Slice& compressed_block_cache_key,
       Cache* block_cache, Cache* block_cache_compressed,
-      const ImmutableCFOptions &ioptions, const ReadOptions& read_options,
+      const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
       BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
-      const Slice& compression_dict);
+      const Slice& compression_dict, size_t read_amp_bytes_per_bit);

   // Put a raw block (maybe compressed) to the corresponding block caches.
   // This method will perform decompression against raw_block if needed and then
@@ -207,9 +207,9 @@ class BlockBasedTable : public TableReader {
   static Status PutDataBlockToCache(
       const Slice& block_cache_key, const Slice& compressed_block_cache_key,
       Cache* block_cache, Cache* block_cache_compressed,
-      const ReadOptions& read_options, const ImmutableCFOptions &ioptions,
+      const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
       CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
-      const Slice& compression_dict);
+      const Slice& compression_dict, size_t read_amp_bytes_per_bit);

   // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
   // after a call to Seek(key), until handle_result returns false.
diff --git a/table/block_test.cc b/table/block_test.cc
index 424df87a3..cc074c3a3 100644
--- a/table/block_test.cc
+++ b/table/block_test.cc
@@ -4,7 +4,11 @@
 // of patent rights can be found in the PATENTS file in the same directory.
 //
 #include <stdio.h>
+#include <algorithm>
+#include <set>
 #include <string>
+#include <unordered_set>
+#include <utility>
 #include <vector>

 #include "db/dbformat.h"
@@ -216,6 +220,259 @@ TEST_F(BlockTest, IndexHashWithSharedPrefix) {
   CheckBlockContents(std::move(contents), kMaxKey, keys, values);
 }

+// A slow and accurate version of BlockReadAmpBitmap that simply stores
+// all the marked ranges in a set.
+class BlockReadAmpBitmapSlowAndAccurate {
+ public:
+  void Mark(size_t start_offset, size_t end_offset) {
+    assert(end_offset >= start_offset);
+
+    marked_ranges_.emplace(end_offset, start_offset);
+  }
+
+  // Return true if any byte in this range was Marked
+  bool IsAnyInRangeMarked(size_t start_offset, size_t end_offset) {
+    auto it = marked_ranges_.lower_bound(std::make_pair(start_offset, 0));
+    if (it == marked_ranges_.end()) {
+      return false;
+    }
+    return start_offset <= it->first && end_offset >= it->second;
+  }
+
+ private:
+  std::set<std::pair<size_t, size_t>> marked_ranges_ = {};
+};
+
+TEST_F(BlockTest, BlockReadAmpBitmap) {
+  std::vector<size_t> block_sizes = {
+      1,                 // 1 byte
+      32,                // 32 bytes
+      61,                // 61 bytes
+      64,                // 64 bytes
+      512,               // 0.5 KB
+      1024,              // 1 KB
+      1024 * 4,          // 4 KB
+      1024 * 10,         // 10 KB
+      1024 * 50,         // 50 KB
+      1024 * 1024,       // 1 MB
+      1024 * 1024 * 4,   // 4 MB
+      1024 * 1024 * 50,  // 50 MB
+      777,
+      124653,
+  };
+  const size_t kBytesPerBit = 64;
+
+  Random rnd(301);
+  for (size_t block_size : block_sizes) {
+    std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
+    BlockReadAmpBitmap read_amp_bitmap(block_size, kBytesPerBit, stats.get());
+    BlockReadAmpBitmapSlowAndAccurate read_amp_slow_and_accurate;
+
+    size_t needed_bits = (block_size / kBytesPerBit);
+    if (block_size % kBytesPerBit != 0) {
+      needed_bits++;
+    }
+    size_t bitmap_size = needed_bits / 32;
+    if (needed_bits % 32 != 0) {
+      bitmap_size++;
+    }
+    size_t bits_in_bitmap = bitmap_size * 32;
+
+    ASSERT_EQ(stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES),
+              needed_bits * kBytesPerBit);
+
+    // Generate some random entries
+    std::vector<size_t> random_entry_offsets;
+    for (int i = 0; i < 1000; i++) {
+      random_entry_offsets.push_back(rnd.Next() % block_size);
+    }
+    std::sort(random_entry_offsets.begin(), random_entry_offsets.end());
+    auto it =
+        std::unique(random_entry_offsets.begin(), random_entry_offsets.end());
+    random_entry_offsets.resize(
+        std::distance(random_entry_offsets.begin(), it));
+
+    std::vector<std::pair<size_t, size_t>> random_entries;
+    for (size_t i = 0; i < random_entry_offsets.size(); i++) {
+      size_t entry_start = random_entry_offsets[i];
+      size_t entry_end;
+      if (i + 1 < random_entry_offsets.size()) {
+        entry_end = random_entry_offsets[i + 1] - 1;
+      } else {
+        entry_end = block_size - 1;
+      }
+      random_entries.emplace_back(entry_start, entry_end);
+    }
+
+    for (size_t i = 0; i < random_entries.size(); i++) {
+      auto &current_entry = random_entries[rnd.Next() % random_entries.size()];
+
+      read_amp_bitmap.Mark(current_entry.first, current_entry.second);
+      read_amp_slow_and_accurate.Mark(current_entry.first,
+                                      current_entry.second);
+
+      size_t total_bits = 0;
+      for (size_t bit_idx = 0; bit_idx < bits_in_bitmap; bit_idx++) {
+        size_t start_rng = bit_idx * kBytesPerBit;
+        size_t end_rng = (start_rng + kBytesPerBit) - 1;
+
+        total_bits +=
+            read_amp_slow_and_accurate.IsAnyInRangeMarked(start_rng, end_rng);
+      }
+      size_t expected_estimate_useful = total_bits * kBytesPerBit;
+      size_t got_estimate_useful =
+          stats->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
+
+      ASSERT_EQ(expected_estimate_useful, got_estimate_useful);
+    }
+  }
+}
+
+TEST_F(BlockTest, BlockWithReadAmpBitmap) {
+  Random rnd(301);
+  Options options = Options();
+  std::unique_ptr<InternalKeyComparator> ic;
+  ic.reset(new test::PlainInternalKeyComparator(options.comparator));
+
+  std::vector<std::string> keys;
+  std::vector<std::string> values;
+  BlockBuilder builder(16);
+  int num_records = 10000;
+
+  GenerateRandomKVs(&keys, &values, 0, num_records, 1);
+  // add a bunch of records to a block
+  for (int i = 0; i < num_records; i++) {
+    builder.Add(keys[i], values[i]);
+  }
+
+  Slice rawblock = builder.Finish();
+  const size_t kBytesPerBit = 8;
+
+  // Read the block sequentially using Next()
+  {
+    std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
+
+    // create block reader
+    BlockContents contents;
+    contents.data = rawblock;
+    contents.cachable = true;
+    Block reader(std::move(contents), kBytesPerBit, stats.get());
+
+    // read contents of block sequentially
+    size_t read_bytes = 0;
+    BlockIter *iter = static_cast<BlockIter *>(
+        reader.NewIterator(options.comparator, nullptr, true, stats.get()));
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      iter->value();
+      read_bytes += iter->TEST_CurrentEntrySize();
+
+      double semi_acc_read_amp =
+          static_cast<double>(read_bytes) / rawblock.size();
+      double read_amp = static_cast<double>(stats->getTickerCount(
+                            READ_AMP_ESTIMATE_USEFUL_BYTES)) /
+                        stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+      // Error in read amplification will be less than 1% if we are reading
+      // sequentially
+      double error_pct = fabs(semi_acc_read_amp - read_amp) * 100;
+      EXPECT_LT(error_pct, 1);
+    }
+
+    delete iter;
+  }
+
+  // Read the block sequentially using Seek()
+  {
+    std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
+
+    // create block reader
+    BlockContents contents;
+    contents.data = rawblock;
+    contents.cachable = true;
+    Block reader(std::move(contents), kBytesPerBit, stats.get());
+
+    size_t read_bytes = 0;
+    BlockIter *iter = static_cast<BlockIter *>(
+        reader.NewIterator(options.comparator, nullptr, true, stats.get()));
+    for (int i = 0; i < num_records; i++) {
+      Slice k(keys[i]);
+
+      // search in block for this key
+      iter->Seek(k);
+      iter->value();
+      read_bytes += iter->TEST_CurrentEntrySize();
+
+      double semi_acc_read_amp =
+          static_cast<double>(read_bytes) / rawblock.size();
+      double read_amp = static_cast<double>(stats->getTickerCount(
+                            READ_AMP_ESTIMATE_USEFUL_BYTES)) /
+                        stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+      // Error in read amplification will be less than 1% if we are reading
+      // sequentially
+      double error_pct = fabs(semi_acc_read_amp - read_amp) * 100;
+      EXPECT_LT(error_pct, 1);
+    }
+    delete iter;
+  }
+
+  // Read the block randomly
+  {
+    std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
+
+    // create block reader
+    BlockContents contents;
+    contents.data = rawblock;
+    contents.cachable = true;
+    Block reader(std::move(contents), kBytesPerBit, stats.get());
+
+    size_t read_bytes = 0;
+    BlockIter *iter = static_cast<BlockIter *>(
+        reader.NewIterator(options.comparator, nullptr, true, stats.get()));
+    std::unordered_set<int> read_keys;
+    for (int i = 0; i < num_records; i++) {
+      int index = rnd.Uniform(num_records);
+      Slice k(keys[index]);
+
+      iter->Seek(k);
+      iter->value();
+      if (read_keys.find(index) == read_keys.end()) {
+        read_keys.insert(index);
+        read_bytes += iter->TEST_CurrentEntrySize();
+      }
+
+      double semi_acc_read_amp =
+          static_cast<double>(read_bytes) / rawblock.size();
+      double read_amp = static_cast<double>(stats->getTickerCount(
+                            READ_AMP_ESTIMATE_USEFUL_BYTES)) /
+                        stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+      double error_pct = fabs(semi_acc_read_amp - read_amp) * 100;
+      // Error in read amplification will be less than 2% if we are reading
+      // randomly
+      EXPECT_LT(error_pct, 2);
+    }
+    delete iter;
+  }
+}
+
+TEST_F(BlockTest, ReadAmpBitmapPow2) {
+  std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
+  ASSERT_EQ(BlockReadAmpBitmap(100, 1, stats.get()).GetBytesPerBit(), 1);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 2, stats.get()).GetBytesPerBit(), 2);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 4, stats.get()).GetBytesPerBit(), 4);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 8, stats.get()).GetBytesPerBit(), 8);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 16, stats.get()).GetBytesPerBit(), 16);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 32, stats.get()).GetBytesPerBit(), 32);
+
+  ASSERT_EQ(BlockReadAmpBitmap(100, 3, stats.get()).GetBytesPerBit(), 2);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 7, stats.get()).GetBytesPerBit(), 4);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 11, stats.get()).GetBytesPerBit(), 8);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 17, stats.get()).GetBytesPerBit(), 16);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 33, stats.get()).GetBytesPerBit(), 32);
+  ASSERT_EQ(BlockReadAmpBitmap(100, 35, stats.get()).GetBytesPerBit(), 32);
+}
+
 }  // namespace rocksdb

 int main(int argc, char **argv) {
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 198e2ddb6..1888b060a 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -384,6 +384,10 @@ DEFINE_int32(index_block_restart_interval,
              "Number of keys between restart points "
              "for delta encoding of keys in index block.");

+DEFINE_int32(read_amp_bytes_per_bit,
+             rocksdb::BlockBasedTableOptions().read_amp_bytes_per_bit,
+             "Number of bytes per bit to be used in block read-amp bitmap");
+
 DEFINE_int64(compressed_cache_size, -1,
              "Number of bytes to use as a cache of compressed data.");

@@ -2805,6 +2809,7 @@ class Benchmark {
       block_based_options.skip_table_builder_flush =
           FLAGS_skip_table_builder_flush;
       block_based_options.format_version = 2;
+      block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
       options.table_factory.reset(
           NewBlockBasedTableFactory(block_based_options));
     }
diff --git a/util/options_helper.h b/util/options_helper.h
index 221f6ee6d..ea437fa66 100644
--- a/util/options_helper.h
+++ b/util/options_helper.h
@@ -558,7 +558,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
       OptionType::kUInt32T, OptionVerificationType::kNormal}},
     {"verify_compression",
      {offsetof(struct BlockBasedTableOptions, verify_compression),
-      OptionType::kBoolean, OptionVerificationType::kNormal}}};
+      OptionType::kBoolean, OptionVerificationType::kNormal}},
+    {"read_amp_bytes_per_bit",
+     {offsetof(struct BlockBasedTableOptions, read_amp_bytes_per_bit),
+      OptionType::kSizeT, OptionVerificationType::kNormal}}};

 static std::unordered_map<std::string, OptionTypeInfo> plain_table_type_info = {
     {"user_key_len",
diff --git a/util/options_settable_test.cc b/util/options_settable_test.cc
index 320bd6657..0bd07f9f1 100644
--- a/util/options_settable_test.cc
+++ b/util/options_settable_test.cc
@@ -159,7 +159,7 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
             "filter_policy=bloomfilter:4:true;whole_key_filtering=1;"
             "skip_table_builder_flush=1;format_version=1;"
             "hash_index_allow_collision=false;"
-            "verify_compression=true;",
+            "verify_compression=true;read_amp_bytes_per_bit=0",
             new_bbto));

   ASSERT_EQ(unset_bytes_base,