Fix unaligned reads in read cache
Summary: - Fix unaligned reads in read cache by using RandomAccessFileReader - Allow read cache flags in db_bench Closes https://github.com/facebook/rocksdb/pull/1916 Differential Revision: D4610885 Pulled By: IslamAbdelRahman fbshipit-source-id: 2aa1dc8
This commit is contained in:
parent
8ad0fcdf99
commit
be3e5568be
@ -44,6 +44,7 @@
|
||||
#include "rocksdb/memtablerep.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/perf_context.h"
|
||||
#include "rocksdb/persistent_cache.h"
|
||||
#include "rocksdb/rate_limiter.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/slice_transform.h"
|
||||
@ -67,6 +68,7 @@
|
||||
#include "util/xxhash.h"
|
||||
#include "utilities/blob_db/blob_db.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
#include "utilities/persistent_cache/block_cache_tier.h"
|
||||
|
||||
#ifdef OS_WIN
|
||||
#include <io.h> // open/close
|
||||
@ -445,6 +447,20 @@ DEFINE_bool(show_table_properties, false,
|
||||
|
||||
DEFINE_string(db, "", "Use the db with the following name.");
|
||||
|
||||
// Read cache flags
|
||||
|
||||
DEFINE_string(read_cache_path, "",
|
||||
"If not empty string, a read cache will be used in this path");
|
||||
|
||||
DEFINE_int64(read_cache_size, 4LL * 1024 * 1024 * 1024,
|
||||
"Maximum size of the read cache");
|
||||
|
||||
DEFINE_bool(read_cache_direct_write, true,
|
||||
"Whether to use Direct IO for writing to the read cache");
|
||||
|
||||
DEFINE_bool(read_cache_direct_read, true,
|
||||
"Whether to use Direct IO for reading from read cache");
|
||||
|
||||
static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
|
||||
if (value >= 20) {
|
||||
fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n",
|
||||
@ -2901,6 +2917,45 @@ class Benchmark {
|
||||
FLAGS_skip_table_builder_flush;
|
||||
block_based_options.format_version = 2;
|
||||
block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
|
||||
if (FLAGS_read_cache_path != "") {
|
||||
#ifndef ROCKSDB_LITE
|
||||
Status rc_status;
|
||||
|
||||
// Read cache need to be provided with a the Logger, we will put all
|
||||
// reac cache logs in the read cache path in a file named rc_LOG
|
||||
rc_status = FLAGS_env->CreateDirIfMissing(FLAGS_read_cache_path);
|
||||
std::shared_ptr<Logger> read_cache_logger;
|
||||
if (rc_status.ok()) {
|
||||
rc_status = FLAGS_env->NewLogger(FLAGS_read_cache_path + "/rc_LOG",
|
||||
&read_cache_logger);
|
||||
}
|
||||
|
||||
if (rc_status.ok()) {
|
||||
PersistentCacheConfig rc_cfg(FLAGS_env, FLAGS_read_cache_path,
|
||||
FLAGS_read_cache_size,
|
||||
read_cache_logger);
|
||||
|
||||
rc_cfg.enable_direct_reads = FLAGS_read_cache_direct_read;
|
||||
rc_cfg.enable_direct_writes = FLAGS_read_cache_direct_write;
|
||||
rc_cfg.writer_qdepth = 4;
|
||||
rc_cfg.writer_dispatch_size = 4 * 1024;
|
||||
|
||||
auto pcache = std::make_shared<BlockCacheTier>(rc_cfg);
|
||||
block_based_options.persistent_cache = pcache;
|
||||
rc_status = pcache->Open();
|
||||
}
|
||||
|
||||
if (!rc_status.ok()) {
|
||||
fprintf(stderr, "Error initializing read cache, %s\n",
|
||||
rc_status.ToString().c_str());
|
||||
exit(1);
|
||||
}
|
||||
#else
|
||||
fprintf(stderr, "Read cache is not supported in LITE\n");
|
||||
exit(1);
|
||||
|
||||
#endif
|
||||
}
|
||||
options.table_factory.reset(
|
||||
NewBlockBasedTableFactory(block_based_options));
|
||||
}
|
||||
|
@ -4,6 +4,8 @@
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
#pragma once
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#ifndef OS_WIN
|
||||
#include <unistd.h>
|
||||
#endif // ! OS_WIN
|
||||
@ -149,3 +151,5 @@ class BlockCacheTier : public PersistentCacheTier {
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif
|
||||
|
@ -203,12 +203,15 @@ bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
|
||||
|
||||
Debug(log_, "Opening cache file %s", Path().c_str());
|
||||
|
||||
Status status = NewRandomAccessCacheFile(env_, Path(), &file_);
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
Status status =
|
||||
NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads);
|
||||
if (!status.ok()) {
|
||||
Error(log_, "Error opening random access file %s. %s", Path().c_str(),
|
||||
status.ToString().c_str());
|
||||
return false;
|
||||
}
|
||||
freader_.reset(new RandomAccessFileReader(std::move(file), env_));
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -219,12 +222,12 @@ bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
|
||||
|
||||
assert(lba.cache_id_ == cache_id_);
|
||||
|
||||
if (!file_) {
|
||||
if (!freader_) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Slice result;
|
||||
Status s = file_->Read(lba.off_, lba.size_, &result, scratch);
|
||||
Status s = freader_->Read(lba.off_, lba.size_, &result, scratch);
|
||||
if (!s.ok()) {
|
||||
Error(log_, "Error reading from file %s. %s", Path().c_str(),
|
||||
s.ToString().c_str());
|
||||
|
@ -21,6 +21,7 @@
|
||||
|
||||
#include "port/port.h"
|
||||
#include "util/crc32c.h"
|
||||
#include "util/file_reader_writer.h"
|
||||
#include "util/mutexlock.h"
|
||||
|
||||
// The io code path of persistent cache uses pipelined architecture
|
||||
@ -157,7 +158,7 @@ class RandomAccessCacheFile : public BlockCacheFile {
|
||||
bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
|
||||
|
||||
private:
|
||||
std::unique_ptr<RandomAccessFile> file_;
|
||||
std::unique_ptr<RandomAccessFileReader> freader_;
|
||||
|
||||
protected:
|
||||
bool OpenImpl(const bool enable_direct_reads);
|
||||
|
Loading…
Reference in New Issue
Block a user