From a08c8c851a77520de5ae86c99b03920cc5347cc9 Mon Sep 17 00:00:00 2001 From: krad Date: Tue, 15 Dec 2015 18:20:10 -0800 Subject: [PATCH] Added PersistentCache abstraction Summary: Added a new abstraction to cache page to RocksDB designed for the read cache use. RocksDB current block cache is more of an object cache. For the persistent read cache project, what we need is a page cache equivalent. This changes adds a cache abstraction to RocksDB to cache pages called PersistentCache. PersistentCache can cache uncompressed pages or raw pages (content as in filesystem). The user can choose to operate PersistentCache either in COMPRESSED or UNCOMPRESSED mode. Blame Rev: Test Plan: Run unit tests Reviewers: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D55707 --- CMakeLists.txt | 1 + db/db_test2.cc | 126 +++++++++++++++++++++++++++++ include/rocksdb/options.h | 1 + include/rocksdb/persistent_cache.h | 49 +++++++++++ include/rocksdb/statistics.h | 5 ++ include/rocksdb/table.h | 7 +- src.mk | 3 +- table/block_based_table_reader.cc | 118 +++++++++++++++++---------- table/block_based_table_reader.h | 14 ++-- table/format.cc | 105 ++++++++++++++++++------ table/format.h | 15 ++-- table/meta_blocks.cc | 11 +-- table/persistent_cache_helper.cc | 112 +++++++++++++++++++++++++ table/persistent_cache_helper.h | 63 +++++++++++++++ tools/db_bench_tool.cc | 2 +- util/io_posix.cc | 1 + util/options_settable_test.cc | 2 + 17 files changed, 548 insertions(+), 87 deletions(-) create mode 100644 include/rocksdb/persistent_cache.h create mode 100644 table/persistent_cache_helper.cc create mode 100644 table/persistent_cache_helper.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 7face2c3d..b17b8fb9f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -183,6 +183,7 @@ set(SOURCES table/plain_table_index.cc table/plain_table_key_coding.cc table/plain_table_reader.cc + persistent_cache_helper.cc table/table_properties.cc table/two_level_iterator.cc tools/sst_dump_tool.cc diff --git a/db/db_test2.cc b/db/db_test2.cc index 8657c3036..4c80bb446 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -7,8 +7,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include + #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "rocksdb/persistent_cache.h" #include "rocksdb/wal_filter.h" namespace rocksdb { @@ -1024,7 +1026,131 @@ TEST_P(PinL0IndexAndFilterBlocksTest, INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest, PinL0IndexAndFilterBlocksTest, ::testing::Bool()); +#ifndef ROCKSDB_LITE +static void UniqueIdCallback(void* arg) { + int* result = reinterpret_cast(arg); + if (*result == -1) { + *result = 0; + } + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback); +} + +class MockPersistentCache : public PersistentCache { + public: + explicit MockPersistentCache(const bool is_compressed, const size_t max_size) + : is_compressed_(is_compressed), max_size_(max_size) { + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback); + } + + virtual ~MockPersistentCache() {} + + Status Insert(const Slice& page_key, const char* data, + const size_t size) override { + MutexLock _(&lock_); + + if (size_ > max_size_) { + size_ -= data_.begin()->second.size(); + data_.erase(data_.begin()); + } + + data_.insert(std::make_pair(page_key.ToString(), std::string(data, size))); + size_ += size; + return Status::OK(); + } + + Status Lookup(const Slice& page_key, std::unique_ptr* data, + size_t* size) override { + MutexLock _(&lock_); + auto it = data_.find(page_key.ToString()); + if (it == data_.end()) { + return Status::NotFound(); + } + + assert(page_key.ToString() == it->first); + data->reset(new char[it->second.size()]); + memcpy(data->get(), it->second.c_str(), it->second.size()); + *size = it->second.size(); + return Status::OK(); + } + + bool IsCompressed() override { return is_compressed_; } + + port::Mutex lock_; + std::map data_; + const bool is_compressed_ = true; + size_t size_ = 0; + const size_t max_size_ = 10 * 1024; // 10KiB +}; + +TEST_F(DBTest2, PersistentCache) { + int num_iter = 80; + + Options options; + options.write_buffer_size = 64 * 1024; // small write buffer + options.statistics = rocksdb::CreateDBStatistics(); + options = CurrentOptions(options); + + auto bsizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024}; + auto types = {/*compressed*/ 1, /*uncompressed*/ 0}; + for (auto bsize : bsizes) { + for (auto type : types) { + BlockBasedTableOptions table_options; + table_options.persistent_cache.reset( + new MockPersistentCache(type, 10 * 1024)); + table_options.no_block_cache = true; + table_options.block_cache = bsize ? NewLRUCache(bsize) : nullptr; + table_options.block_cache_compressed = nullptr; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + // default column family doesn't have block cache + Options no_block_cache_opts; + no_block_cache_opts.statistics = options.statistics; + no_block_cache_opts = CurrentOptions(no_block_cache_opts); + BlockBasedTableOptions table_options_no_bc; + table_options_no_bc.no_block_cache = true; + no_block_cache_opts.table_factory.reset( + NewBlockBasedTableFactory(table_options_no_bc)); + ReopenWithColumnFamilies( + {"default", "pikachu"}, + std::vector({no_block_cache_opts, options})); + + Random rnd(301); + + // Write 8MB (80 values, each 100K) + ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); + std::vector values; + std::string str; + for (int i = 0; i < num_iter; i++) { + if (i % 4 == 0) { // high compression ratio + str = RandomString(&rnd, 1000); + } + values.push_back(str); + ASSERT_OK(Put(1, Key(i), values[i])); + } + + // flush all data from memtable so that reads are from block cache + ASSERT_OK(Flush(1)); + + for (int i = 0; i < num_iter; i++) { + ASSERT_EQ(Get(1, Key(i)), values[i]); + } + + auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT); + auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS); + + ASSERT_GT(hit, 0); + ASSERT_GT(miss, 0); + } + } +} +#endif } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 538671824..c7afd1a84 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1613,6 +1613,7 @@ struct CompactRangeOptions { BottommostLevelCompaction bottommost_level_compaction = BottommostLevelCompaction::kIfHaveCompactionFilter; }; + } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_ diff --git a/include/rocksdb/persistent_cache.h b/include/rocksdb/persistent_cache.h new file mode 100644 index 000000000..ef49da5ab --- /dev/null +++ b/include/rocksdb/persistent_cache.h @@ -0,0 +1,49 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include +#include + +#include "rocksdb/slice.h" +#include "rocksdb/statistics.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +// PersistentCache +// +// Persistent cache interface for caching IO pages on a persistent medium. The +// cache interface is specifically designed for persistent read cache. +class PersistentCache { + public: + virtual ~PersistentCache() {} + + // Insert to page cache + // + // page_key Identifier to identify a page uniquely across restarts + // data Page data + // size Size of the page + virtual Status Insert(const Slice& key, const char* data, + const size_t size) = 0; + + // Lookup page cache by page identifier + // + // page_key Page identifier + // buf Buffer where the data should be copied + // size Size of the page + virtual Status Lookup(const Slice& key, std::unique_ptr* data, + size_t* size) = 0; + + // Is cache storing uncompressed data ? + // + // True if the cache is configured to store uncompressed data else false + virtual bool IsCompressed() = 0; +}; + +} // namespace rocksdb diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index c832516da..2f14c444b 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -54,6 +54,11 @@ enum Tickers : uint32_t { // # of times bloom filter has avoided file reads. BLOOM_FILTER_USEFUL, + // # persistent cache hit + PERSISTENT_CACHE_HIT, + // # persistent cache miss + PERSISTENT_CACHE_MISS, + // # of memtable hits. MEMTABLE_HIT, // # of memtable misses. diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index b066b67d9..ed9060bd5 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -21,15 +21,16 @@ #include #include "rocksdb/env.h" +#include "rocksdb/immutable_options.h" #include "rocksdb/iterator.h" #include "rocksdb/options.h" -#include "rocksdb/immutable_options.h" #include "rocksdb/status.h" namespace rocksdb { // -- Block-based Table class FlushBlockPolicyFactory; +class PersistentCache; class RandomAccessFile; struct TableReaderOptions; struct TableBuilderOptions; @@ -103,6 +104,10 @@ struct BlockBasedTableOptions { // If NULL, rocksdb will automatically create and use an 8MB internal cache. std::shared_ptr block_cache = nullptr; + // If non-NULL use the specified cache for pages read from device + // IF NULL, no page cache is used + std::shared_ptr persistent_cache = nullptr; + // If non-NULL use the specified cache for compressed blocks. // If NULL, rocksdb will not use a compressed block cache. std::shared_ptr block_cache_compressed = nullptr; diff --git a/src.mk b/src.mk index db6551241..d18a0f67a 100644 --- a/src.mk +++ b/src.mk @@ -82,6 +82,7 @@ LIB_SOURCES = \ table/plain_table_index.cc \ table/plain_table_key_coding.cc \ table/plain_table_reader.cc \ + table/persistent_cache_helper.cc \ table/table_properties.cc \ table/two_level_iterator.cc \ tools/dump/db_dump_tool.cc \ @@ -103,7 +104,7 @@ LIB_SOURCES = \ util/io_posix.cc \ util/threadpool.cc \ util/transaction_test_util.cc \ - util/sst_file_manager_impl.cc \ + util/sst_file_manager_impl.cc \ util/file_util.cc \ util/file_reader_writer.cc \ util/filter_policy.cc \ diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 82c9903fa..fa5c83966 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -25,17 +25,18 @@ #include "rocksdb/table_properties.h" #include "table/block.h" -#include "table/filter_block.h" #include "table/block_based_filter_block.h" #include "table/block_based_table_factory.h" -#include "table/full_filter_block.h" #include "table/block_hash_index.h" #include "table/block_prefix_index.h" +#include "table/filter_block.h" #include "table/format.h" +#include "table/full_filter_block.h" +#include "table/get_context.h" #include "table/internal_iterator.h" #include "table/meta_blocks.h" +#include "table/persistent_cache_helper.h" #include "table/two_level_iterator.h" -#include "table/get_context.h" #include "util/coding.h" #include "util/file_reader_writer.h" @@ -53,13 +54,6 @@ using std::unique_ptr; typedef BlockBasedTable::IndexReader IndexReader; namespace { -// The longest the prefix of the cache key used to identify blocks can be. -// We are using the fact that we know for Posix files the unique ID is three -// varints. -// For some reason, compiling for iOS complains that this variable is unused -const size_t kMaxCacheKeyPrefixSize __attribute__((unused)) = - kMaxVarint64Length * 3 + 1; - // Read the block identified by "handle" from "file". // The only relevant option is options.verify_checksums for now. // On failure return non-OK. @@ -69,11 +63,13 @@ const size_t kMaxCacheKeyPrefixSize __attribute__((unused)) = Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer, const ReadOptions& options, const BlockHandle& handle, std::unique_ptr* result, Env* env, - bool do_uncompress = true, - const Slice& compression_dict = Slice()) { + bool do_uncompress, const Slice& compression_dict, + const PersistentCacheOptions& cache_options, + Logger* info_log) { BlockContents contents; Status s = ReadBlockContents(file, footer, options, handle, &contents, env, - do_uncompress, compression_dict); + do_uncompress, compression_dict, cache_options, + info_log); if (s.ok()) { result->reset(new Block(std::move(contents))); } @@ -106,18 +102,12 @@ Slice GetCacheKeyFromOffset(const char* cache_key_prefix, char* cache_key) { assert(cache_key != nullptr); assert(cache_key_prefix_size != 0); - assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + assert(cache_key_prefix_size <= BlockBasedTable::kMaxCacheKeyPrefixSize); memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); char* end = EncodeVarint64(cache_key + cache_key_prefix_size, offset); return Slice(cache_key, static_cast(end - cache_key)); } -Slice GetCacheKey(const char* cache_key_prefix, size_t cache_key_prefix_size, - const BlockHandle& handle, char* cache_key) { - return GetCacheKeyFromOffset(cache_key_prefix, cache_key_prefix_size, - handle.offset(), cache_key); -} - Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, Tickers block_cache_miss_ticker, Tickers block_cache_hit_ticker, @@ -183,11 +173,13 @@ class BinarySearchIndexReader : public IndexReader { // unmodified. static Status Create(RandomAccessFileReader* file, const Footer& footer, const BlockHandle& index_handle, Env* env, - const Comparator* comparator, - IndexReader** index_reader) { + const Comparator* comparator, IndexReader** index_reader, + const PersistentCacheOptions& cache_options) { std::unique_ptr index_block; auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle, - &index_block, env); + &index_block, env, true /* decompress */, + Slice() /*compression dict*/, cache_options, + /*info_log*/ nullptr); if (s.ok()) { *index_reader = @@ -231,10 +223,13 @@ class HashIndexReader : public IndexReader { const BlockHandle& index_handle, InternalIterator* meta_index_iter, IndexReader** index_reader, - bool hash_index_allow_collision) { + bool hash_index_allow_collision, + const PersistentCacheOptions& cache_options) { std::unique_ptr index_block; auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle, - &index_block, env); + &index_block, env, true /* decompress */, + Slice() /*compression dict*/, cache_options, + /*info_log*/ nullptr); if (!s.ok()) { return s; @@ -269,14 +264,15 @@ class HashIndexReader : public IndexReader { // Read contents for the blocks BlockContents prefixes_contents; s = ReadBlockContents(file, footer, ReadOptions(), prefixes_handle, - &prefixes_contents, env, true /* do decompression */); + &prefixes_contents, env, true /* decompress */, + Slice() /*compression dict*/, cache_options); if (!s.ok()) { return s; } BlockContents prefixes_meta_contents; s = ReadBlockContents(file, footer, ReadOptions(), prefixes_meta_handle, - &prefixes_meta_contents, env, - true /* do decompression */); + &prefixes_meta_contents, env, true /* decompress */, + Slice() /*compression dict*/, cache_options); if (!s.ok()) { // TODO: log error return Status::OK(); @@ -388,10 +384,13 @@ struct BlockBasedTable::Rep { unique_ptr file; char cache_key_prefix[kMaxCacheKeyPrefixSize]; size_t cache_key_prefix_size = 0; + char persistent_cache_key_prefix[kMaxCacheKeyPrefixSize]; + size_t persistent_cache_key_prefix_size = 0; char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize]; size_t compressed_cache_key_prefix_size = 0; uint64_t dummy_index_reader_offset = 0; // ID that is unique for the block cache. + PersistentCacheOptions persistent_cache_options; // Footer contains the fixed table information Footer footer; @@ -451,6 +450,11 @@ void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) { rep->dummy_index_reader_offset = file_size + rep->table_options.block_cache->NewId(); } + if (rep->table_options.persistent_cache != nullptr) { + GenerateCachePrefix(/*cache=*/nullptr, rep->file->file(), + &rep->persistent_cache_key_prefix[0], + &rep->persistent_cache_key_prefix_size); + } if (rep->table_options.block_cache_compressed != nullptr) { GenerateCachePrefix(rep->table_options.block_cache_compressed.get(), rep->file->file(), &rep->compressed_cache_key_prefix[0], @@ -466,7 +470,7 @@ void BlockBasedTable::GenerateCachePrefix(Cache* cc, // If the prefix wasn't generated or was too long, // create one from the cache. - if (*size == 0) { + if (cc && *size == 0) { char* end = EncodeVarint64(buffer, cc->NewId()); *size = static_cast(end - buffer); } @@ -507,6 +511,18 @@ bool IsFeatureSupported(const TableProperties& table_properties, } } // namespace +Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix, + size_t cache_key_prefix_size, + const BlockHandle& handle, char* cache_key) { + assert(cache_key != nullptr); + assert(cache_key_prefix_size != 0); + assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); + char* end = + EncodeVarint64(cache_key + cache_key_prefix_size, handle.offset()); + return Slice(cache_key, static_cast(end - cache_key)); +} + Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, const EnvOptions& env_options, const BlockBasedTableOptions& table_options, @@ -541,6 +557,13 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, SetupCacheKeyPrefix(rep, file_size); unique_ptr new_table(new BlockBasedTable(rep)); + // page cache options + rep->persistent_cache_options = + PersistentCacheOptions(rep->table_options.persistent_cache, + std::string(rep->persistent_cache_key_prefix, + rep->persistent_cache_key_prefix_size), + rep->ioptions.statistics); + // Read meta index std::unique_ptr meta; std::unique_ptr meta_iter; @@ -736,12 +759,10 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep, // TODO: we never really verify check sum for meta index block std::unique_ptr meta; Status s = ReadBlockFromFile( - rep->file.get(), - rep->footer, - ReadOptions(), - rep->footer.metaindex_handle(), - &meta, - rep->ioptions.env); + rep->file.get(), rep->footer, ReadOptions(), + rep->footer.metaindex_handle(), &meta, rep->ioptions.env, + true /* decompress */, Slice() /*compression dict*/, + rep->persistent_cache_options, rep->ioptions.info_log); if (!s.ok()) { Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log, @@ -908,7 +929,9 @@ FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep, size_t* filter_size) { BlockContents block; if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(), rep->filter_handle, &block, rep->ioptions.env, - false).ok()) { + false /* decompress */, Slice() /*compression dict*/, + rep->persistent_cache_options) + .ok()) { // Error reading the block return nullptr; } @@ -1148,7 +1171,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions.env, block_cache_compressed == nullptr, - compression_dict); + compression_dict, rep->persistent_cache_options, + rep->ioptions.info_log); } if (s.ok()) { @@ -1173,8 +1197,9 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } std::unique_ptr block_value; s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, - &block_value, rep->ioptions.env, - true /* do_uncompress */, compression_dict); + &block_value, rep->ioptions.env, true /* compress */, + compression_dict, rep->persistent_cache_options, + rep->ioptions.info_log); if (s.ok()) { block.value = block_value.release(); } @@ -1546,7 +1571,8 @@ Status BlockBasedTable::CreateIndexReader( switch (index_type_on_file) { case BlockBasedTableOptions::kBinarySearch: { return BinarySearchIndexReader::Create( - file, footer, footer.index_handle(), env, comparator, index_reader); + file, footer, footer.index_handle(), env, comparator, index_reader, + rep_->persistent_cache_options); } case BlockBasedTableOptions::kHashSearch: { std::unique_ptr meta_guard; @@ -1561,7 +1587,8 @@ Status BlockBasedTable::CreateIndexReader( "Unable to read the metaindex block." " Fall back to binary search index."); return BinarySearchIndexReader::Create( - file, footer, footer.index_handle(), env, comparator, index_reader); + file, footer, footer.index_handle(), env, comparator, + index_reader, rep_->persistent_cache_options); } meta_index_iter = meta_iter_guard.get(); } @@ -1573,7 +1600,7 @@ Status BlockBasedTable::CreateIndexReader( return HashIndexReader::Create( rep_->internal_prefix_transform.get(), footer, file, env, comparator, footer.index_handle(), meta_index_iter, index_reader, - rep_->hash_index_allow_collision); + rep_->hash_index_allow_collision, rep_->persistent_cache_options); } default: { std::string error_message = @@ -1691,8 +1718,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { BlockHandle handle; if (FindMetaBlock(meta_iter.get(), filter_block_key, &handle).ok()) { BlockContents block; - if (ReadBlockContents(rep_->file.get(), rep_->footer, ReadOptions(), - handle, &block, rep_->ioptions.env, false).ok()) { + if (ReadBlockContents( + rep_->file.get(), rep_->footer, ReadOptions(), handle, &block, + rep_->ioptions.env, false /*decompress*/, + Slice() /*compression dict*/, rep_->persistent_cache_options) + .ok()) { rep_->filter.reset(new BlockBasedFilterBlockReader( rep_->ioptions.prefix_extractor, table_options, table_options.whole_key_filtering, std::move(block))); diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 45b303e0e..37d760e01 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -15,11 +15,12 @@ #include #include "rocksdb/options.h" +#include "rocksdb/persistent_cache.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table.h" -#include "table/table_reader.h" #include "table/table_properties_internal.h" +#include "table/table_reader.h" #include "util/coding.h" #include "util/file_reader_writer.h" @@ -54,6 +55,9 @@ class BlockBasedTable : public TableReader { public: static const std::string kFilterBlockPrefix; static const std::string kFullFilterBlockPrefix; + // The longest prefix of the cache key used to identify blocks. + // For Posix files the unique ID is three varints. + static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length * 3 + 1; // Attempt to open the table that is stored in bytes [0..file_size) // of "file", and read the metadata entries necessary to allow @@ -128,6 +132,10 @@ class BlockBasedTable : public TableReader { // Implementation of IndexReader will be exposed to internal cc file only. class IndexReader; + static Slice GetCacheKey(const char* cache_key_prefix, + size_t cache_key_prefix_size, + const BlockHandle& handle, char* cache_key); + private: template struct CachableEntry; @@ -229,10 +237,6 @@ class BlockBasedTable : public TableReader { static void GenerateCachePrefix(Cache* cc, WritableFile* file, char* buffer, size_t* size); - // The longest prefix of the cache key used to identify blocks. - // For Posix files the unique ID is three varints. - static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1; - // Helper functions for DumpTable() Status DumpIndexBlock(WritableFile* out_file); Status DumpDataBlocks(WritableFile* out_file); diff --git a/table/format.cc b/table/format.cc index 628e08af1..7a62f66bb 100644 --- a/table/format.cc +++ b/table/format.cc @@ -14,6 +14,8 @@ #include "rocksdb/env.h" #include "table/block.h" +#include "table/block_based_table_reader.h" +#include "table/persistent_cache_helper.h" #include "util/coding.h" #include "util/compression.h" #include "util/crc32c.h" @@ -294,10 +296,12 @@ Status ReadBlock(RandomAccessFileReader* file, const Footer& footer, } // namespace Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, - const ReadOptions& options, const BlockHandle& handle, - BlockContents* contents, Env* env, - bool decompression_requested, - const Slice& compression_dict) { + const ReadOptions& read_options, + const BlockHandle& handle, BlockContents* contents, + Env* env, bool decompression_requested, + const Slice& compression_dict, + const PersistentCacheOptions& cache_options, + Logger* info_log) { Status status; Slice slice; size_t n = static_cast(handle.size()); @@ -306,17 +310,63 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, char* used_buf = nullptr; rocksdb::CompressionType compression_type; - if (decompression_requested && - n + kBlockTrailerSize < DefaultStackBufferSize) { - // If we've got a small enough hunk of data, read it in to the - // trivially allocated stack buffer instead of needing a full malloc() - used_buf = &stack_buf[0]; - } else { - heap_buf = std::unique_ptr(new char[n + kBlockTrailerSize]); - used_buf = heap_buf.get(); + if (cache_options.persistent_cache && + !cache_options.persistent_cache->IsCompressed()) { + status = PersistentCacheHelper::LookupUncompressedPage(cache_options, + handle, contents); + if (status.ok()) { + // uncompressed page is found for the block handle + return status; + } else { + // uncompressed page is not found + if (info_log && !status.IsNotFound()) { + assert(!status.ok()); + Log(InfoLogLevel::INFO_LEVEL, info_log, + "Error reading from persistent cache. %s", + status.ToString().c_str()); + } + } } - status = ReadBlock(file, footer, options, handle, &slice, used_buf); + if (cache_options.persistent_cache && + cache_options.persistent_cache->IsCompressed()) { + // lookup uncompressed cache mode p-cache + status = PersistentCacheHelper::LookupRawPage( + cache_options, handle, &heap_buf, n + kBlockTrailerSize); + } else { + status = Status::NotFound(); + } + + if (status.ok()) { + // cache hit + used_buf = heap_buf.get(); + slice = Slice(heap_buf.get(), n); + } else { + if (info_log && !status.IsNotFound()) { + assert(!status.ok()); + Log(InfoLogLevel::INFO_LEVEL, info_log, + "Error reading from persistent cache. %s", status.ToString().c_str()); + } + // cache miss read from device + if (decompression_requested && + n + kBlockTrailerSize < DefaultStackBufferSize) { + // If we've got a small enough hunk of data, read it in to the + // trivially allocated stack buffer instead of needing a full malloc() + used_buf = &stack_buf[0]; + } else { + heap_buf = std::unique_ptr(new char[n + kBlockTrailerSize]); + used_buf = heap_buf.get(); + } + + status = ReadBlock(file, footer, read_options, handle, &slice, used_buf); + if (status.ok() && read_options.fill_cache && + cache_options.persistent_cache && + cache_options.persistent_cache->IsCompressed()) { + // insert to raw cache + PersistentCacheHelper::InsertRawPage(cache_options, handle, used_buf, + n + kBlockTrailerSize); + } + } if (!status.ok()) { return status; @@ -327,21 +377,29 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, compression_type = static_cast(slice.data()[n]); if (decompression_requested && compression_type != kNoCompression) { - return UncompressBlockContents(slice.data(), n, contents, footer.version(), - compression_dict); - } - - if (slice.data() != used_buf) { + // compressed page, uncompress, update cache + status = UncompressBlockContents(slice.data(), n, contents, + footer.version(), compression_dict); + } else if (slice.data() != used_buf) { + // the slice content is not the buffer provided *contents = BlockContents(Slice(slice.data(), n), false, compression_type); - return status; + } else { + // page is uncompressed, the buffer either stack or heap provided + if (used_buf == &stack_buf[0]) { + heap_buf = std::unique_ptr(new char[n]); + memcpy(heap_buf.get(), stack_buf, n); + } + *contents = BlockContents(std::move(heap_buf), n, true, compression_type); } - if (used_buf == &stack_buf[0]) { - heap_buf = std::unique_ptr(new char[n]); - memcpy(heap_buf.get(), stack_buf, n); + if (status.ok() && read_options.fill_cache && + cache_options.persistent_cache && + !cache_options.persistent_cache->IsCompressed()) { + // insert to uncompressed cache + PersistentCacheHelper::InsertUncompressedPage(cache_options, handle, + *contents); } - *contents = BlockContents(std::move(heap_buf), n, true, compression_type); return status; } @@ -447,6 +505,7 @@ Status UncompressBlockContents(const char* data, size_t n, default: return Status::Corruption("bad block type"); } + return Status::OK(); } diff --git a/table/format.h b/table/format.h index d9846d529..a488f8dd8 100644 --- a/table/format.h +++ b/table/format.h @@ -16,6 +16,7 @@ #include "rocksdb/table.h" #include "port/port.h" // noexcept +#include "table/persistent_cache_helper.h" namespace rocksdb { @@ -208,13 +209,13 @@ struct BlockContents { // Read the block identified by "handle" from "file". On failure // return non-OK. On success fill *result and return OK. -extern Status ReadBlockContents(RandomAccessFileReader* file, - const Footer& footer, - const ReadOptions& options, - const BlockHandle& handle, - BlockContents* contents, Env* env, - bool do_uncompress, - const Slice& compression_dict = Slice()); +extern Status ReadBlockContents( + RandomAccessFileReader* file, const Footer& footer, + const ReadOptions& options, const BlockHandle& handle, + BlockContents* contents, Env* env, bool do_uncompress = true, + const Slice& compression_dict = Slice(), + const PersistentCacheOptions& cache_options = PersistentCacheOptions(), + Logger* info_log = nullptr); // The 'data' points to the raw block contents read in from file. // This method allocates a new heap buffer and the raw block diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 7028d106c..2cbe9eca7 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -13,6 +13,7 @@ #include "table/block.h" #include "table/format.h" #include "table/internal_iterator.h" +#include "table/persistent_cache_helper.h" #include "table/table_properties_internal.h" #include "util/coding.h" @@ -164,7 +165,7 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, read_options.verify_checksums = false; Status s; s = ReadBlockContents(file, footer, read_options, handle, &block_contents, - env, false); + env, false /* decompress */); if (!s.ok()) { return s; @@ -264,7 +265,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, ReadOptions read_options; read_options.verify_checksums = false; s = ReadBlockContents(file, footer, read_options, metaindex_handle, - &metaindex_contents, env, false); + &metaindex_contents, env, false /* decompress */); if (!s.ok()) { return s; } @@ -318,7 +319,7 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, ReadOptions read_options; read_options.verify_checksums = false; s = ReadBlockContents(file, footer, read_options, metaindex_handle, - &metaindex_contents, env, false); + &metaindex_contents, env, false /* do decompression */); if (!s.ok()) { return s; } @@ -347,7 +348,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, ReadOptions read_options; read_options.verify_checksums = false; status = ReadBlockContents(file, footer, read_options, metaindex_handle, - &metaindex_contents, env, false); + &metaindex_contents, env, false /* decompress */); if (!status.ok()) { return status; } @@ -367,7 +368,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, // Reading metablock return ReadBlockContents(file, footer, read_options, block_handle, contents, - env, false); + env, false /* decompress */); } } // namespace rocksdb diff --git a/table/persistent_cache_helper.cc b/table/persistent_cache_helper.cc new file mode 100644 index 000000000..d68c80735 --- /dev/null +++ b/table/persistent_cache_helper.cc @@ -0,0 +1,112 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#include "table/persistent_cache_helper.h" +#include "table/format.h" + +namespace rocksdb { + +void PersistentCacheHelper::InsertRawPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + const char* data, const size_t size) { + assert(cache_options.persistent_cache); + assert(cache_options.persistent_cache->IsCompressed()); + + // construct the page key + char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(), + cache_options.key_prefix.size(), + handle, cache_key); + // insert content to cache + cache_options.persistent_cache->Insert(key, data, size); +} + +void PersistentCacheHelper::InsertUncompressedPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + const BlockContents& contents) { + assert(cache_options.persistent_cache); + assert(!cache_options.persistent_cache->IsCompressed()); + if (!contents.cachable || contents.compression_type != kNoCompression) { + // We shouldn't cache this. Either + // (1) content is not cacheable + // (2) content is compressed + return; + } + + // construct the page key + char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(), + cache_options.key_prefix.size(), + handle, cache_key); + // insert block contents to page cache + cache_options.persistent_cache->Insert(key, contents.data.data(), + contents.data.size()); +} + +Status PersistentCacheHelper::LookupRawPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + std::unique_ptr* raw_data, const size_t raw_data_size) { + assert(cache_options.persistent_cache); + assert(cache_options.persistent_cache->IsCompressed()); + + // construct the page key + char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(), + cache_options.key_prefix.size(), + handle, cache_key); + // Lookup page + size_t size; + Status s = cache_options.persistent_cache->Lookup(key, raw_data, &size); + if (!s.ok()) { + // cache miss + RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS); + return s; + } + + // cache hit + assert(raw_data_size == handle.size() + kBlockTrailerSize); + assert(size == raw_data_size); + RecordTick(cache_options.statistics, PERSISTENT_CACHE_HIT); + return Status::OK(); +} + +Status PersistentCacheHelper::LookupUncompressedPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + BlockContents* contents) { + assert(cache_options.persistent_cache); + assert(!cache_options.persistent_cache->IsCompressed()); + if (!contents) { + // We shouldn't lookup in the cache. Either + // (1) Nowhere to store + return Status::NotFound(); + } + + // construct the page key + char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(), + cache_options.key_prefix.size(), + handle, cache_key); + // Lookup page + std::unique_ptr data; + size_t size; + Status s = cache_options.persistent_cache->Lookup(key, &data, &size); + if (!s.ok()) { + // cache miss + RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS); + return s; + } + + // please note we are potentially comparing compressed data size with + // uncompressed data size + assert(handle.size() <= size); + + // update stats + RecordTick(cache_options.statistics, PERSISTENT_CACHE_HIT); + // construct result and return + *contents = + BlockContents(std::move(data), size, false /*cacheable*/, kNoCompression); + return Status::OK(); +} + +} // namespace rocksdb diff --git a/table/persistent_cache_helper.h b/table/persistent_cache_helper.h new file mode 100644 index 000000000..45a1f87d2 --- /dev/null +++ b/table/persistent_cache_helper.h @@ -0,0 +1,63 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#include + +#include "table/block_based_table_reader.h" +#include "util/statistics.h" + +namespace rocksdb { + +struct BlockContents; + +// PersistentCacheOptions +// +// This describe the caching behavior for page cache +// This is used to pass the context for caching and the cache handle +struct PersistentCacheOptions { + PersistentCacheOptions() {} + explicit PersistentCacheOptions( + const std::shared_ptr& _persistent_cache, + const std::string _key_prefix, Statistics* const _statistics) + : persistent_cache(_persistent_cache), + key_prefix(_key_prefix), + statistics(_statistics) {} + + virtual ~PersistentCacheOptions() {} + + std::shared_ptr persistent_cache; + std::string key_prefix; + Statistics* statistics = nullptr; +}; + +// PersistentCacheHelper +// +// Encapsulates some of the helper logic for read and writing from the cache +class PersistentCacheHelper { + public: + // insert block into raw page cache + static void InsertRawPage(const PersistentCacheOptions& cache_options, + const BlockHandle& handle, const char* data, + const size_t size); + + // insert block into uncompressed cache + static void InsertUncompressedPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + const BlockContents& contents); + + // lookup block from raw page cacge + static Status LookupRawPage(const PersistentCacheOptions& cache_options, + const BlockHandle& handle, + std::unique_ptr* raw_data, + const size_t raw_data_size); + + // lookup block from uncompressed cache + static Status LookupUncompressedPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + BlockContents* contents); +}; + +} // namespace rocksdb diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 242d5f4c6..df288fdee 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -2099,7 +2099,7 @@ class Benchmark { } } if (FLAGS_statistics) { - fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); + fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); } } diff --git a/util/io_posix.cc b/util/io_posix.cc index 05a7f2788..18238f900 100644 --- a/util/io_posix.cc +++ b/util/io_posix.cc @@ -119,6 +119,7 @@ static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) { long version = 0; result = ioctl(fd, FS_IOC_GETVERSION, &version); + TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result); if (result == -1) { return 0; } diff --git a/util/options_settable_test.cc b/util/options_settable_test.cc index c05e3b803..f83ed9886 100644 --- a/util/options_settable_test.cc +++ b/util/options_settable_test.cc @@ -102,6 +102,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) { sizeof(std::shared_ptr)}, {offsetof(struct BlockBasedTableOptions, block_cache), sizeof(std::shared_ptr)}, + {offsetof(struct BlockBasedTableOptions, persistent_cache), + sizeof(std::shared_ptr)}, {offsetof(struct BlockBasedTableOptions, block_cache_compressed), sizeof(std::shared_ptr)}, {offsetof(struct BlockBasedTableOptions, filter_policy),