Ability to configure bufferedio-reads, filesystem-readaheads and mmap-read-write per database.

Summary:
This patch allows an application to specify whether to use bufferedio,
reads-via-mmaps and writes-via-mmaps per database. Earlier, there
was a global static variable that was used to configure this functionality.

The default setting remains the same (and is backward compatible):
 1. use bufferedio
 2. do not use mmaps for reads
 3. use mmap for writes
 4. use readaheads for reads needed for compaction

I also added a parameter to db_bench to be able to explicitly specify
whether to do readaheads for compactions or not.

Test Plan: make check

Reviewers: sheki, heyongqiang, MarkCallaghan

Reviewed By: sheki

CC: leveldb

Differential Revision: https://reviews.facebook.net/D9429
This commit is contained in:
Dhruba Borthakur 2013-03-14 17:00:04 -07:00
parent 2adddeefcb
commit ad96563b79
33 changed files with 429 additions and 148 deletions

View File

@ -17,6 +17,7 @@ namespace leveldb {
Status BuildTable(const std::string& dbname,
Env* env,
const Options& options,
const StorageOptions& soptions,
TableCache* table_cache,
Iterator* iter,
FileMetaData* meta,
@ -38,7 +39,7 @@ Status BuildTable(const std::string& dbname,
std::string fname = TableFileName(dbname, meta->number);
if (iter->Valid()) {
unique_ptr<WritableFile> file;
s = env->NewWritableFile(fname, &file);
s = env->NewWritableFile(fname, &file, soptions);
if (!s.ok()) {
return s;
}
@ -118,6 +119,7 @@ Status BuildTable(const std::string& dbname,
if (s.ok()) {
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(ReadOptions(),
soptions,
meta->number,
meta->file_size);
s = it->status();

View File

@ -8,6 +8,7 @@
#include "leveldb/comparator.h"
#include "leveldb/status.h"
#include "leveldb/types.h"
#include "util/storage_options.h"
namespace leveldb {
@ -27,6 +28,7 @@ class VersionEdit;
extern Status BuildTable(const std::string& dbname,
Env* env,
const Options& options,
const StorageOptions& soptions,
TableCache* table_cache,
Iterator* iter,
FileMetaData* meta,

View File

@ -269,10 +269,20 @@ static int FLAGS_source_compaction_factor = 1;
// Set the TTL for the WAL Files.
static uint64_t FLAGS_WAL_ttl_seconds = 0;
extern bool useOsBuffer;
extern bool useFsReadAhead;
extern bool useMmapRead;
extern bool useMmapWrite;
// Allow buffered io using OS buffers
static bool FLAGS_use_os_buffer;
// Allow filesystem to do read-aheads
static bool FLAGS_use_fsreadahead;
// Allow reads to occur via mmap-ing files
static bool FLAGS_use_mmap_reads;
// Allow writes to occur via mmap-ing files
static bool FLAGS_use_mmap_writes;
// Allow readaheads to occur for compactions
static bool FLAGS_use_readahead_compactions;
namespace leveldb {
@ -1099,6 +1109,13 @@ unique_ptr<char []> GenerateKeyFromInt(int v)
FLAGS_max_grandparent_overlap_factor;
options.disable_auto_compactions = FLAGS_disable_auto_compactions;
options.source_compaction_factor = FLAGS_source_compaction_factor;
// fill storage options
options.allow_os_buffer = FLAGS_use_os_buffer;
options.allow_readahead = FLAGS_use_fsreadahead;
options.allow_mmap_reads = FLAGS_use_mmap_reads;
options.allow_mmap_writes = FLAGS_use_mmap_writes;
options.allow_readahead_compactions = FLAGS_use_readahead_compactions;
Status s;
if(FLAGS_read_only) {
s = DB::OpenForReadOnly(options, FLAGS_db, &db_);
@ -1629,9 +1646,10 @@ unique_ptr<char []> GenerateKeyFromInt(int v)
void HeapProfile() {
char fname[100];
StorageOptions soptions;
snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
unique_ptr<WritableFile> file;
Status s = FLAGS_env->NewWritableFile(fname, &file);
Status s = FLAGS_env->NewWritableFile(fname, &file, soptions);
if (!s.ok()) {
fprintf(stderr, "%s\n", s.ToString().c_str());
return;
@ -1654,6 +1672,13 @@ int main(int argc, char** argv) {
leveldb::Options().max_background_compactions;
// Compression test code above refers to FLAGS_block_size
FLAGS_block_size = leveldb::Options().block_size;
FLAGS_use_os_buffer = leveldb::StorageOptions().UseOsBuffer();
FLAGS_use_fsreadahead = leveldb::StorageOptions().UseReadahead();
FLAGS_use_mmap_reads = leveldb::StorageOptions().UseMmapReads();
FLAGS_use_mmap_writes = leveldb::StorageOptions().UseMmapWrites();
FLAGS_use_readahead_compactions =
leveldb::StorageOptions().UseReadaheadCompactions();
std::string default_db_path;
for (int i = 1; i < argc; i++) {
@ -1730,16 +1755,19 @@ int main(int argc, char** argv) {
FLAGS_verify_checksum = n;
} else if (sscanf(argv[i], "--bufferedio=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
useOsBuffer = n;
FLAGS_use_os_buffer = n;
} else if (sscanf(argv[i], "--mmap_read=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
useMmapRead = n;
FLAGS_use_mmap_reads = n;
} else if (sscanf(argv[i], "--mmap_write=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
useMmapWrite = n;
FLAGS_use_mmap_writes = n;
} else if (sscanf(argv[i], "--readahead=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
useFsReadAhead = n;
FLAGS_use_fsreadahead = n;
} else if (sscanf(argv[i], "--readahead_compactions=%d%c", &n, &junk) == 1&&
(n == 0 || n == 1)) {
FLAGS_use_readahead_compactions = n;
} else if (sscanf(argv[i], "--statistics=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
if (n == 1) {

View File

@ -162,7 +162,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
started_at_(options.env->NowMicros()),
flush_on_destroy_(false),
delayed_writes_(0),
last_flushed_sequence_(0) {
last_flushed_sequence_(0),
storage_options_(options) {
mem_->Ref();
@ -175,10 +176,11 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options_.max_open_files - 10;
table_cache_.reset(new TableCache(dbname_, &options_, table_cache_size));
table_cache_.reset(new TableCache(dbname_, &options_,
storage_options_, table_cache_size));
versions_.reset(new VersionSet(dbname_, &options_, table_cache_.get(),
&internal_comparator_));
versions_.reset(new VersionSet(dbname_, &options_, storage_options_,
table_cache_.get(), &internal_comparator_));
dumpLeveldbBuildVersion(options_.info_log.get());
options_.Dump(options_.info_log.get());
@ -262,7 +264,7 @@ Status DBImpl::NewDB() {
const std::string manifest = DescriptorFileName(dbname_, 1);
unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(manifest, &file);
Status s = env_->NewWritableFile(manifest, &file, storage_options_);
if (!s.ok()) {
return s;
}
@ -590,7 +592,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// Open the log file
std::string fname = LogFileName(dbname_, log_number);
unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(fname, &file);
Status status = env_->NewSequentialFile(fname, &file, storage_options_);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
@ -683,7 +685,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta,
s = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot,
earliest_seqno_in_memtable);
mutex_.Lock();
@ -734,7 +737,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta,
s = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot,
earliest_seqno_in_memtable);
mutex_.Lock();
@ -905,6 +909,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
iter->reset(
new TransactionLogIteratorImpl(dbname_,
&options_,
storage_options_,
seq,
probableWALFiles,
&last_flushed_sequence_));
@ -1010,7 +1015,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
};
unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(fname, &file);
Status status = env_->NewSequentialFile(fname, &file, storage_options_);
if (!status.ok()) {
return status;
@ -1395,7 +1400,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
// Make the output file
std::string fname = TableFileName(dbname_, file_number);
Status s = env_->NewWritableFile(fname, &compact->outfile);
Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_);
// Over-estimate slightly so we don't end up just barely crossing
// the threshold.
@ -1447,6 +1452,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
Iterator* iter = table_cache_->NewIterator(ReadOptions(),
storage_options_,
output_number,
current_bytes);
s = iter->status();
@ -1842,7 +1848,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
}
// Collect iterators for files in L0 - Ln
versions_->current()->AddIterators(options, &list);
versions_->current()->AddIterators(options, storage_options_, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
@ -2172,7 +2178,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
unique_ptr<WritableFile> lfile;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile,
storage_options_);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
@ -2369,6 +2376,7 @@ DB::~DB() { }
Status DB::Open(const Options& options, const std::string& dbname,
DB** dbptr) {
*dbptr = nullptr;
StorageOptions soptions;
if (options.block_cache != nullptr && options.no_block_cache) {
return Status::InvalidArgument(
@ -2387,7 +2395,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
uint64_t new_log_number = impl->versions_->NewFileNumber();
unique_ptr<WritableFile> lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
&lfile, soptions);
if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * options.write_buffer_size);
edit.SetLogNumber(new_log_number);

View File

@ -329,6 +329,9 @@ class DBImpl : public DB {
// Used by transaction log iterator.
SequenceNumber last_flushed_sequence_;
// The options to access storage files
const StorageOptions storage_options_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);

View File

@ -19,6 +19,7 @@
#include "util/mutexlock.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/storage_options.h"
namespace leveldb {
@ -100,7 +101,8 @@ class SpecialEnv : public EnvWrapper {
manifest_write_error_.Release_Store(nullptr);
}
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r) {
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
const EnvOptions& soptions) {
class SSTableFile : public WritableFile {
private:
SpecialEnv* env_;
@ -157,7 +159,7 @@ class SpecialEnv : public EnvWrapper {
return Status::IOError("simulated write error");
}
Status s = target()->NewWritableFile(f, r);
Status s = target()->NewWritableFile(f, r, soptions);
if (s.ok()) {
if (strstr(f.c_str(), ".sst") != nullptr) {
r->reset(new SSTableFile(this, std::move(*r)));
@ -169,7 +171,8 @@ class SpecialEnv : public EnvWrapper {
}
Status NewRandomAccessFile(const std::string& f,
unique_ptr<RandomAccessFile>* r) {
unique_ptr<RandomAccessFile>* r,
const EnvOptions& soptions) {
class CountingFile : public RandomAccessFile {
private:
unique_ptr<RandomAccessFile> target_;
@ -186,7 +189,7 @@ class SpecialEnv : public EnvWrapper {
}
};
Status s = target()->NewRandomAccessFile(f, r);
Status s = target()->NewRandomAccessFile(f, r, soptions);
if (s.ok() && count_random_reads_) {
r->reset(new CountingFile(std::move(*r), &random_read_counter_));
}
@ -564,7 +567,8 @@ TEST(DBTest, LevelLimitReopen) {
TEST(DBTest, Preallocation) {
const std::string src = dbname_ + "/alloc_test";
unique_ptr<WritableFile> srcfile;
ASSERT_OK(env_->NewWritableFile(src, &srcfile));
const StorageOptions soptions;
ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions));
srcfile->SetPreallocationBlockSize(1024 * 1024);
// No writes should mean no preallocation
@ -2307,6 +2311,7 @@ TEST(DBTest, BloomFilter) {
TEST(DBTest, SnapshotFiles) {
Options options = CurrentOptions();
const StorageOptions soptions;
options.write_buffer_size = 100000000; // Large write buffer
Reopen(&options);
@ -2360,9 +2365,9 @@ TEST(DBTest, SnapshotFiles) {
}
}
unique_ptr<SequentialFile> srcfile;
ASSERT_OK(env_->NewSequentialFile(src, &srcfile));
ASSERT_OK(env_->NewSequentialFile(src, &srcfile, soptions));
unique_ptr<WritableFile> destfile;
ASSERT_OK(env_->NewWritableFile(dest, &destfile));
ASSERT_OK(env_->NewWritableFile(dest, &destfile, soptions));
char buffer[4096];
Slice slice;
@ -3122,7 +3127,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKeyComparator cmp(BytewiseComparator());
Options options;
VersionSet vset(dbname, &options, nullptr, &cmp);
StorageOptions sopt;
VersionSet vset(dbname, &options, sopt, nullptr, &cmp);
ASSERT_OK(vset.Recover());
VersionEdit vbase(vset.NumberLevels());
uint64_t fnum = 1;

View File

@ -52,7 +52,7 @@ class Repairer {
options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)),
next_file_number_(1) {
// TableCache can be small since we expect each table to be opened once.
table_cache_ = new TableCache(dbname_, &options_, 10);
table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10);
edit_ = new VersionEdit(options.num_levels);
}
@ -105,6 +105,7 @@ class Repairer {
std::vector<uint64_t> logs_;
std::vector<TableInfo> tables_;
uint64_t next_file_number_;
const StorageOptions storage_options_;
Status FindFiles() {
std::vector<std::string> filenames;
@ -169,7 +170,7 @@ class Repairer {
// Open the log file
std::string logname = LogFileName(dbname_, log);
unique_ptr<SequentialFile> lfile;
Status status = env_->NewSequentialFile(logname, &lfile);
Status status = env_->NewSequentialFile(logname, &lfile, storage_options_);
if (!status.ok()) {
return status;
}
@ -216,7 +217,8 @@ class Repairer {
FileMetaData meta;
meta.number = next_file_number_++;
Iterator* iter = mem->NewIterator();
status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta,
status = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_, iter, &meta,
icmp_.user_comparator(), 0, 0);
delete iter;
mem->Unref();
@ -258,7 +260,7 @@ class Repairer {
Status status = env_->GetFileSize(fname, &t->meta.file_size);
if (status.ok()) {
Iterator* iter = table_cache_->NewIterator(
ReadOptions(), t->meta.number, t->meta.file_size);
ReadOptions(), storage_options_, t->meta.number, t->meta.file_size);
bool empty = true;
ParsedInternalKey parsed;
t->max_sequence = 0;
@ -296,7 +298,7 @@ class Repairer {
Status WriteDescriptor() {
std::string tmp = TempFileName(dbname_, 1);
unique_ptr<WritableFile> file;
Status status = env_->NewWritableFile(tmp, &file);
Status status = env_->NewWritableFile(tmp, &file, storage_options_);
if (!status.ok()) {
return status;
}

View File

@ -6,7 +6,6 @@
#include "db/filename.h"
#include "leveldb/env.h"
#include "leveldb/table.h"
#include "leveldb/statistics.h"
#include "util/coding.h"
@ -33,10 +32,12 @@ static void UnrefEntry(void* arg1, void* arg2) {
TableCache::TableCache(const std::string& dbname,
const Options* options,
const StorageOptions& storage_options,
int entries)
: env_(options->env),
dbname_(dbname),
options_(options),
storage_options_(storage_options),
cache_(NewLRUCache(entries, options->table_cache_numshardbits)) {
dbstatistics = options->statistics;
}
@ -44,7 +45,8 @@ TableCache::TableCache(const std::string& dbname,
TableCache::~TableCache() {
}
Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
Status TableCache::FindTable(const EnvOptions& toptions,
uint64_t file_number, uint64_t file_size,
Cache::Handle** handle, bool* tableIO) {
Status s;
char buf[sizeof(file_number)];
@ -58,10 +60,10 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
std::string fname = TableFileName(dbname_, file_number);
unique_ptr<RandomAccessFile> file;
unique_ptr<Table> table;
s = env_->NewRandomAccessFile(fname, &file);
s = env_->NewRandomAccessFile(fname, &file, toptions);
RecordTick(options_->statistics, NO_FILE_OPENS);
if (s.ok()) {
s = Table::Open(*options_, std::move(file), file_size, &table);
s = Table::Open(*options_, toptions, std::move(file), file_size, &table);
}
if (!s.ok()) {
@ -80,6 +82,7 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
}
Iterator* TableCache::NewIterator(const ReadOptions& options,
const EnvOptions& toptions,
uint64_t file_number,
uint64_t file_size,
Table** tableptr) {
@ -88,7 +91,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
}
Cache::Handle* handle = nullptr;
Status s = FindTable(file_number, file_size, &handle);
Status s = FindTable(toptions, file_number, file_size, &handle);
if (!s.ok()) {
return NewErrorIterator(s);
}
@ -111,7 +114,8 @@ Status TableCache::Get(const ReadOptions& options,
void (*saver)(void*, const Slice&, const Slice&, bool),
bool* tableIO) {
Cache::Handle* handle = nullptr;
Status s = FindTable(file_number, file_size, &handle, tableIO);
Status s = FindTable(storage_options_, file_number, file_size,
&handle, tableIO);
if (s.ok()) {
Table* t =
reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table.get();

View File

@ -10,9 +10,11 @@
#include <string>
#include <stdint.h>
#include "db/dbformat.h"
#include "leveldb/env.h"
#include "leveldb/cache.h"
#include "leveldb/table.h"
#include "port/port.h"
#include "util/storage_options.h"
namespace leveldb {
@ -20,7 +22,8 @@ class Env;
class TableCache {
public:
TableCache(const std::string& dbname, const Options* options, int entries);
TableCache(const std::string& dbname, const Options* options,
const StorageOptions& storage_options, int entries);
~TableCache();
// Return an iterator for the specified file number (the corresponding
@ -31,6 +34,7 @@ class TableCache {
// the cache and should not be deleted, and is valid for as long as the
// returned iterator is live.
Iterator* NewIterator(const ReadOptions& options,
const EnvOptions& toptions,
uint64_t file_number,
uint64_t file_size,
Table** tableptr = nullptr);
@ -52,9 +56,11 @@ class TableCache {
Env* const env_;
const std::string dbname_;
const Options* options_;
const StorageOptions& storage_options_;
std::shared_ptr<Cache> cache_;
Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**,
Status FindTable(const EnvOptions& toptions,
uint64_t file_number, uint64_t file_size, Cache::Handle**,
bool* tableIO = nullptr);
};

View File

@ -6,11 +6,13 @@ namespace leveldb {
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dbname,
const Options* options,
const StorageOptions& soptions,
SequenceNumber& seq,
std::vector<LogFile>* files,
SequenceNumber const * const lastFlushedSequence) :
dbname_(dbname),
options_(options),
soptions_(soptions),
sequenceNumber_(seq),
files_(files),
started_(false),
@ -36,15 +38,15 @@ Status TransactionLogIteratorImpl::OpenLogFile(
Env* env = options_->env;
if (logFile.type == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber);
return env->NewSequentialFile(fname, file);
return env->NewSequentialFile(fname, file, soptions_);
} else {
std::string fname = LogFileName(dbname_, logFile.logNumber);
Status status = env->NewSequentialFile(fname, file);
Status status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) {
// If cannot open file in DB directory.
// Try the archive dir, as it could have moved in the meanwhile.
fname = ArchivedLogFileName(dbname_, logFile.logNumber);
status = env->NewSequentialFile(fname, file);
status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) {
return Status::IOError(" Requested file not present in the dir");
}

View File

@ -10,6 +10,7 @@
#include "leveldb/transaction_log_iterator.h"
#include "db/log_file.h"
#include "db/log_reader.h"
#include "util/storage_options.h"
namespace leveldb {
@ -27,6 +28,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
public:
TransactionLogIteratorImpl(const std::string& dbname,
const Options* options,
const StorageOptions& soptions,
SequenceNumber& seqNum,
std::vector<LogFile>* files,
SequenceNumber const * const lastFlushedSequence);
@ -47,6 +49,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
private:
const std::string& dbname_;
const Options* options_;
const StorageOptions& soptions_;
const uint64_t sequenceNumber_;
const std::vector<LogFile>* files_;
bool started_;

View File

@ -178,6 +178,7 @@ class Version::LevelFileNumIterator : public Iterator {
static Iterator* GetFileIterator(void* arg,
const ReadOptions& options,
const EnvOptions& soptions,
const Slice& file_value) {
TableCache* cache = reinterpret_cast<TableCache*>(arg);
if (file_value.size() != 16) {
@ -185,25 +186,28 @@ static Iterator* GetFileIterator(void* arg,
Status::Corruption("FileReader invoked with unexpected value"));
} else {
return cache->NewIterator(options,
soptions,
DecodeFixed64(file_value.data()),
DecodeFixed64(file_value.data() + 8));
}
}
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
const StorageOptions& soptions,
int level) const {
return NewTwoLevelIterator(
new LevelFileNumIterator(vset_->icmp_, &files_[level]),
&GetFileIterator, vset_->table_cache_, options);
&GetFileIterator, vset_->table_cache_, options, soptions);
}
void Version::AddIterators(const ReadOptions& options,
const StorageOptions& soptions,
std::vector<Iterator*>* iters) {
// Merge all level zero files together since they may overlap
for (size_t i = 0; i < files_[0].size(); i++) {
iters->push_back(
vset_->table_cache_->NewIterator(
options, files_[0][i]->number, files_[0][i]->file_size));
options, soptions, files_[0][i]->number, files_[0][i]->file_size));
}
// For levels > 0, we can use a concatenating iterator that sequentially
@ -211,7 +215,7 @@ void Version::AddIterators(const ReadOptions& options,
// lazily.
for (int level = 1; level < vset_->NumberLevels(); level++) {
if (!files_[level].empty()) {
iters->push_back(NewConcatenatingIterator(options, level));
iters->push_back(NewConcatenatingIterator(options, soptions, level));
}
}
}
@ -887,6 +891,7 @@ class VersionSet::Builder {
VersionSet::VersionSet(const std::string& dbname,
const Options* options,
const StorageOptions& storage_options,
TableCache* table_cache,
const InternalKeyComparator* cmp)
: env_(options->env),
@ -904,7 +909,9 @@ VersionSet::VersionSet(const std::string& dbname,
current_(nullptr),
compactions_in_progress_(options_->num_levels),
current_version_number_(0),
last_observed_manifest_size_(0) {
last_observed_manifest_size_(0),
storage_options_(storage_options),
storage_options_compactions_(storage_options_) {
compact_pointer_ = new std::string[options_->num_levels];
Init(options_->num_levels);
AppendVersion(new Version(this, current_version_number_++));
@ -1002,7 +1009,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
unique_ptr<WritableFile> descriptor_file;
s = env_->NewWritableFile(new_manifest_file, &descriptor_file);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file,
storage_options_);
if (s.ok()) {
descriptor_log_.reset(new log::Writer(std::move(descriptor_file)));
s = WriteSnapshot(descriptor_log_.get());
@ -1147,7 +1155,7 @@ Status VersionSet::Recover() {
std::string dscname = dbname_ + "/" + current;
unique_ptr<SequentialFile> file;
s = env_->NewSequentialFile(dscname, &file);
s = env_->NewSequentialFile(dscname, &file, storage_options_);
if (!s.ok()) {
return s;
}
@ -1269,7 +1277,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
// Open the specified manifest file.
unique_ptr<SequentialFile> file;
Status s = options.env->NewSequentialFile(dscname, &file);
Status s = options.env->NewSequentialFile(dscname, &file, storage_options_);
if (!s.ok()) {
return s;
}
@ -1579,7 +1587,7 @@ bool VersionSet::ManifestContains(const std::string& record) const {
std::string fname = DescriptorFileName(dbname_, manifest_file_number_);
Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
unique_ptr<SequentialFile> file;
Status s = env_->NewSequentialFile(fname, &file);
Status s = env_->NewSequentialFile(fname, &file, storage_options_);
if (!s.ok()) {
Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str());
Log(options_->info_log,
@ -1623,7 +1631,8 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
// approximate offset of "ikey" within the table.
Table* tableptr;
Iterator* iter = table_cache_->NewIterator(
ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
ReadOptions(), storage_options_, files[i]->number,
files[i]->file_size, &tableptr);
if (tableptr != nullptr) {
result += tableptr->ApproximateOffsetOf(ikey.Encode());
}
@ -1735,13 +1744,14 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
const std::vector<FileMetaData*>& files = c->inputs_[which];
for (size_t i = 0; i < files.size(); i++) {
list[num++] = table_cache_->NewIterator(
options, files[i]->number, files[i]->file_size);
options, storage_options_compactions_,
files[i]->number, files[i]->file_size);
}
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
&GetFileIterator, table_cache_, options);
&GetFileIterator, table_cache_, options, storage_options_);
}
}
}

View File

@ -63,7 +63,8 @@ class Version {
// Append to *iters a sequence of iterators that will
// yield the contents of this Version when merged together.
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);
void AddIterators(const ReadOptions&, const StorageOptions& soptions,
std::vector<Iterator*>* iters);
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
@ -136,7 +137,9 @@ class Version {
friend class VersionSet;
class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
Iterator* NewConcatenatingIterator(const ReadOptions&,
const StorageOptions& soptions,
int level) const;
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
@ -204,6 +207,7 @@ class VersionSet {
public:
VersionSet(const std::string& dbname,
const Options* options,
const StorageOptions& storage_options,
TableCache* table_cache,
const InternalKeyComparator*);
~VersionSet();
@ -454,6 +458,13 @@ class VersionSet {
// Save us the cost of checking file size twice in LogAndApply
uint64_t last_observed_manifest_size_;
// storage options for all reads and writes except compactions
const StorageOptions& storage_options_;
// storage options used for compactions. This is a copy of
// storage_options_ but with readaheads set to readahead_compactions_.
const StorageOptions storage_options_compactions_;
// No copying allowed
VersionSet(const VersionSet&);
void operator=(const VersionSet&);

View File

@ -235,15 +235,18 @@ class HdfsEnv : public Env {
}
virtual Status NewSequentialFile(const std::string& fname,
unique_ptr<SequentialFile>* result);
unique_ptr<SequentialFile>* result,
const EnvOptions& options);
virtual Status NewRandomAccessFile(const std::string& fname,
unique_ptr<RandomAccessFile>* result) {
unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) {
return notsup;
}
virtual Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result) {
unique_ptr<WritableFile>* result,
const EnvOptions& options) {
return notsup;
}

View File

@ -233,7 +233,8 @@ class InMemoryEnv : public EnvWrapper {
// Partial implementation of the Env interface.
virtual Status NewSequentialFile(const std::string& fname,
unique_ptr<SequentialFile>* result) {
unique_ptr<SequentialFile>* result,
const EnvOptions& soptions) {
MutexLock lock(&mutex_);
if (file_map_.find(fname) == file_map_.end()) {
*result = NULL;
@ -245,7 +246,8 @@ class InMemoryEnv : public EnvWrapper {
}
virtual Status NewRandomAccessFile(const std::string& fname,
unique_ptr<RandomAccessFile>* result) {
unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) {
MutexLock lock(&mutex_);
if (file_map_.find(fname) == file_map_.end()) {
*result = NULL;
@ -257,7 +259,8 @@ class InMemoryEnv : public EnvWrapper {
}
virtual Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result) {
unique_ptr<WritableFile>* result,
const EnvOptions& soptions) {
MutexLock lock(&mutex_);
if (file_map_.find(fname) != file_map_.end()) {
DeleteFileInternal(fname);

View File

@ -8,6 +8,7 @@
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "util/testharness.h"
#include "util/storage_options.h"
#include <memory>
#include <string>
#include <vector>
@ -17,6 +18,7 @@ namespace leveldb {
class MemEnvTest {
public:
Env* env_;
const StorageOptions soptions_;
MemEnvTest()
: env_(NewMemEnv(Env::Default())) {
@ -40,7 +42,7 @@ TEST(MemEnvTest, Basics) {
ASSERT_EQ(0U, children.size());
// Create a file.
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file));
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
writable_file.reset();
// Check that the file exists.
@ -52,7 +54,7 @@ TEST(MemEnvTest, Basics) {
ASSERT_EQ("f", children[0]);
// Write to the file.
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file));
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
ASSERT_OK(writable_file->Append("abc"));
writable_file.reset();
@ -71,9 +73,11 @@ TEST(MemEnvTest, Basics) {
// Check that opening non-existent file fails.
unique_ptr<SequentialFile> seq_file;
unique_ptr<RandomAccessFile> rand_file;
ASSERT_TRUE(!env_->NewSequentialFile("/dir/non_existent", &seq_file).ok());
ASSERT_TRUE(!env_->NewSequentialFile("/dir/non_existent", &seq_file,
soptions_).ok());
ASSERT_TRUE(!seq_file);
ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file).ok());
ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file,
soptions_).ok());
ASSERT_TRUE(!rand_file);
// Check that deleting works.
@ -94,13 +98,13 @@ TEST(MemEnvTest, ReadWrite) {
ASSERT_OK(env_->CreateDir("/dir"));
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file));
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
ASSERT_OK(writable_file->Append("hello "));
ASSERT_OK(writable_file->Append("world"));
writable_file.reset();
// Read sequentially.
ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file));
ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_));
ASSERT_OK(seq_file->Read(5, &result, scratch)); // Read "hello".
ASSERT_EQ(0, result.compare("hello"));
ASSERT_OK(seq_file->Skip(1));
@ -113,7 +117,7 @@ TEST(MemEnvTest, ReadWrite) {
ASSERT_EQ(0U, result.size());
// Random reads.
ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file));
ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file, soptions_));
ASSERT_OK(rand_file->Read(6, 5, &result, scratch)); // Read "world".
ASSERT_EQ(0, result.compare("world"));
ASSERT_OK(rand_file->Read(0, 5, &result, scratch)); // Read "hello".
@ -139,7 +143,7 @@ TEST(MemEnvTest, Misc) {
ASSERT_TRUE(!test_dir.empty());
unique_ptr<WritableFile> writable_file;
ASSERT_OK(env_->NewWritableFile("/a/b", &writable_file));
ASSERT_OK(env_->NewWritableFile("/a/b", &writable_file, soptions_));
// These are no-ops, but we test they return success.
ASSERT_OK(writable_file->Sync());
@ -158,14 +162,14 @@ TEST(MemEnvTest, LargeWrite) {
}
unique_ptr<WritableFile> writable_file;
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file));
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
ASSERT_OK(writable_file->Append("foo"));
ASSERT_OK(writable_file->Append(write_data));
writable_file.reset();
unique_ptr<SequentialFile> seq_file;
Slice result;
ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file));
ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_));
ASSERT_OK(seq_file->Read(3, &result, scratch)); // Read "foo".
ASSERT_EQ(0, result.compare("foo"));

View File

@ -23,6 +23,7 @@
namespace leveldb {
class FileLock;
class EnvOptions;
class Logger;
class RandomAccessFile;
class SequentialFile;
@ -51,7 +52,9 @@ class Env {
//
// The returned file will only be accessed by one thread at a time.
virtual Status NewSequentialFile(const std::string& fname,
unique_ptr<SequentialFile>* result) = 0;
unique_ptr<SequentialFile>* result,
const EnvOptions& options)
= 0;
// Create a brand new random access read-only file with the
// specified name. On success, stores a pointer to the new file in
@ -61,7 +64,9 @@ class Env {
//
// The returned file may be concurrently accessed by multiple threads.
virtual Status NewRandomAccessFile(const std::string& fname,
unique_ptr<RandomAccessFile>* result) = 0;
unique_ptr<RandomAccessFile>* result,
const EnvOptions& options)
= 0;
// Create an object that writes to a new file with the specified
// name. Deletes any existing file with the same name and creates a
@ -71,7 +76,8 @@ class Env {
//
// The returned file will only be accessed by one thread at a time.
virtual Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result) = 0;
unique_ptr<WritableFile>* result,
const EnvOptions& options) = 0;
// Returns true iff the named file exists.
virtual bool FileExists(const std::string& fname) = 0;
@ -228,7 +234,7 @@ class RandomAccessFile {
// the file is opened (and will stay the same while the file is open).
// Furthermore, it tries to make this ID at most "max_size" bytes. If such an
// ID can be created this function returns the length of the ID and places it
// in "id"; otherwise, this function returns 0, in which case "id" may more
// in "id"; otherwise, this function returns 0, in which case "id"
// may not have been modified.
//
// This function guarantees, for IDs from a given environment, two unique ids
@ -363,6 +369,24 @@ class FileLock {
void operator=(const FileLock&);
};
// Options while opening a file to read/write
class EnvOptions {
public:
virtual ~EnvOptions() {}
// If true, then allow caching of data in environment buffers
virtual bool UseOsBuffer() const = 0;
// If true, then allow the environment to readahead data
virtual bool UseReadahead() const = 0;
// If true, then use mmap to read data
virtual bool UseMmapReads() const = 0;
// If true, then use mmap to write data
virtual bool UseMmapWrites() const = 0;
};
// Log the specified data to *info_log if info_log is non-nullptr.
extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...)
# if defined(__GNUC__) || defined(__clang__)
@ -398,15 +422,18 @@ class EnvWrapper : public Env {
// The following text is boilerplate that forwards all methods to target()
Status NewSequentialFile(const std::string& f,
unique_ptr<SequentialFile>* r) {
return target_->NewSequentialFile(f, r);
unique_ptr<SequentialFile>* r,
const EnvOptions& options) {
return target_->NewSequentialFile(f, r, options);
}
Status NewRandomAccessFile(const std::string& f,
unique_ptr<RandomAccessFile>* r) {
return target_->NewRandomAccessFile(f, r);
unique_ptr<RandomAccessFile>* r,
const EnvOptions& options) {
return target_->NewRandomAccessFile(f, r, options);
}
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r) {
return target_->NewWritableFile(f, r);
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
const EnvOptions& options) {
return target_->NewWritableFile(f, r, options);
}
bool FileExists(const std::string& f) { return target_->FileExists(f); }
Status GetChildren(const std::string& dir, std::vector<std::string>* r) {

View File

@ -396,6 +396,25 @@ struct Options {
// Purge duplicate/deleted keys when a memtable is flushed to storage.
// Default: true
bool purge_redundant_kvs_while_flush;
// Data being read from file storage may be buffered in the OS
// Default: true
bool allow_os_buffer;
// Reading a single block from a file can cause the OS/FS to start
// readaheads of other blocks from the file. Default: true
bool allow_readahead;
// The reads triggered by compaction allows data to be readahead
// by the OS/FS. This overrides the setting of 'allow_readahead'
// for compaction-reads. Default: true
bool allow_readahead_compactions;
// Allow the OS to mmap file for reading. Default: false
bool allow_mmap_reads;
// Allow the OS to mmap file for writing. Default: true
bool allow_mmap_writes;
};
// Options that control read operations

View File

@ -8,6 +8,7 @@
#include <memory>
#include <stdint.h>
#include "leveldb/iterator.h"
#include "leveldb/env.h"
namespace leveldb {
@ -39,6 +40,7 @@ class Table {
//
// *file must remain live while this Table is in use.
static Status Open(const Options& options,
const EnvOptions& soptions,
unique_ptr<RandomAccessFile>&& file,
uint64_t file_size,
unique_ptr<Table>* table);
@ -67,7 +69,8 @@ class Table {
Rep* rep_;
explicit Table(Rep* rep) { rep_ = rep; }
static Iterator* BlockReader(void*, const ReadOptions&, const Slice&);
static Iterator* BlockReader(void*, const ReadOptions&,
const EnvOptions& soptions, const Slice&);
static Iterator* BlockReader(void*, const ReadOptions&, const Slice&,
bool* didIO);

View File

@ -29,8 +29,12 @@ struct Table::Rep {
delete [] filter_data;
delete index_block;
}
Rep(const EnvOptions& storage_options) :
soptions(storage_options) {
}
Options options;
const EnvOptions& soptions;
Status status;
unique_ptr<RandomAccessFile> file;
char cache_key_prefix[kMaxCacheKeyPrefixSize];
@ -62,6 +66,7 @@ void Table::SetupCacheKeyPrefix(Rep* rep) {
}
Status Table::Open(const Options& options,
const EnvOptions& soptions,
unique_ptr<RandomAccessFile>&& file,
uint64_t size,
unique_ptr<Table>* table) {
@ -100,7 +105,7 @@ Status Table::Open(const Options& options,
if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Rep* rep = new Table::Rep;
Rep* rep = new Table::Rep(soptions);
rep->options = options;
rep->file = std::move(file);
rep->metaindex_handle = footer.metaindex_handle();
@ -260,6 +265,7 @@ Iterator* Table::BlockReader(void* arg,
Iterator* Table::BlockReader(void* arg,
const ReadOptions& options,
const EnvOptions& soptions,
const Slice& index_value) {
return BlockReader(arg, options, index_value, nullptr);
}
@ -267,7 +273,7 @@ Iterator* Table::BlockReader(void* arg,
Iterator* Table::NewIterator(const ReadOptions& options) const {
return NewTwoLevelIterator(
rep_->index_block->NewIterator(rep_->options.comparator),
&Table::BlockReader, const_cast<Table*>(this), options);
&Table::BlockReader, const_cast<Table*>(this), options, rep_->soptions);
}
Status Table::InternalGet(const ReadOptions& options, const Slice& k,

View File

@ -257,7 +257,7 @@ class TableConstructor: public Constructor {
// Open the table
uniq_id_ = cur_uniq_id_++;
source_.reset(new StringSource(sink_->contents(), uniq_id_));
return Table::Open(options, std::move(source_),
return Table::Open(options, soptions, std::move(source_),
sink_->contents().size(), &table_);
}
@ -271,7 +271,7 @@ class TableConstructor: public Constructor {
virtual Status Reopen(const Options& options) {
source_.reset(new StringSource(sink_->contents(), uniq_id_));
return Table::Open(options, std::move(source_),
return Table::Open(options, soptions, std::move(source_),
sink_->contents().size(), &table_);
}
@ -295,6 +295,7 @@ class TableConstructor: public Constructor {
TableConstructor();
static uint64_t cur_uniq_id_;
const StorageOptions soptions;
};
uint64_t TableConstructor::cur_uniq_id_ = 1;

View File

@ -13,7 +13,8 @@ namespace leveldb {
namespace {
typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, const Slice&);
typedef Iterator* (*BlockFunction)(void*, const ReadOptions&,
const EnvOptions& soptions, const Slice&);
class TwoLevelIterator: public Iterator {
public:
@ -21,7 +22,8 @@ class TwoLevelIterator: public Iterator {
Iterator* index_iter,
BlockFunction block_function,
void* arg,
const ReadOptions& options);
const ReadOptions& options,
const EnvOptions& soptions);
virtual ~TwoLevelIterator();
@ -65,6 +67,7 @@ class TwoLevelIterator: public Iterator {
BlockFunction block_function_;
void* arg_;
const ReadOptions options_;
const EnvOptions& soptions_;
Status status_;
IteratorWrapper index_iter_;
IteratorWrapper data_iter_; // May be nullptr
@ -77,10 +80,12 @@ TwoLevelIterator::TwoLevelIterator(
Iterator* index_iter,
BlockFunction block_function,
void* arg,
const ReadOptions& options)
const ReadOptions& options,
const EnvOptions& soptions)
: block_function_(block_function),
arg_(arg),
options_(options),
soptions_(soptions),
index_iter_(index_iter),
data_iter_(nullptr) {
}
@ -163,7 +168,7 @@ void TwoLevelIterator::InitDataBlock() {
// data_iter_ is already constructed with this iterator, so
// no need to change anything
} else {
Iterator* iter = (*block_function_)(arg_, options_, handle);
Iterator* iter = (*block_function_)(arg_, options_, soptions_, handle);
data_block_handle_.assign(handle.data(), handle.size());
SetDataIterator(iter);
}
@ -176,8 +181,10 @@ Iterator* NewTwoLevelIterator(
Iterator* index_iter,
BlockFunction block_function,
void* arg,
const ReadOptions& options) {
return new TwoLevelIterator(index_iter, block_function, arg, options);
const ReadOptions& options,
const EnvOptions& soptions) {
return new TwoLevelIterator(index_iter, block_function, arg,
options, soptions);
}
} // namespace leveldb

View File

@ -6,6 +6,7 @@
#define STORAGE_LEVELDB_TABLE_TWO_LEVEL_ITERATOR_H_
#include "leveldb/iterator.h"
#include "leveldb/env.h"
namespace leveldb {
@ -25,9 +26,11 @@ extern Iterator* NewTwoLevelIterator(
Iterator* (*block_function)(
void* arg,
const ReadOptions& options,
const EnvOptions& soptions,
const Slice& index_value),
void* arg,
const ReadOptions& options);
const ReadOptions& options,
const EnvOptions& soptions);
} // namespace leveldb

View File

@ -17,6 +17,7 @@
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/storage_options.h"
static int verbose = 0;
static int output_hex = 0;
@ -61,12 +62,13 @@ int main(int argc, char** argv) {
}
Options options;
StorageOptions sopt;
std::string file(manifestfile);
std::string dbname("dummy");
TableCache* tc = new TableCache(dbname, &options, 10);
TableCache* tc = new TableCache(dbname, &options, sopt, 10);
const InternalKeyComparator* cmp = new InternalKeyComparator(options.comparator);
VersionSet* versions = new VersionSet(dbname, &options,
VersionSet* versions = new VersionSet(dbname, &options, sopt,
tc, cmp);
Status s = versions->DumpManifest(options, file, verbose, output_hex);
if (!s.ok()) {

View File

@ -34,6 +34,7 @@ private:
uint64_t read_num_;
bool verify_checksum_;
bool output_hex_;
StorageOptions soptions_;
};
SstFileReader::SstFileReader(std::string file_path,
@ -48,13 +49,14 @@ Status SstFileReader::ReadSequential(bool print_kv, uint64_t read_num)
unique_ptr<Table> table;
Options table_options;
unique_ptr<RandomAccessFile> file;
Status s = table_options.env->NewRandomAccessFile(file_name_, &file);
Status s = table_options.env->NewRandomAccessFile(file_name_, &file,
soptions_);
if(!s.ok()) {
return s;
}
uint64_t file_size;
table_options.env->GetFileSize(file_name_, &file_size);
s = Table::Open(table_options, std::move(file), file_size, &table);
s = Table::Open(table_options, soptions_, std::move(file), file_size, &table);
if(!s.ok()) {
return s;
}

View File

@ -3,6 +3,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/env.h"
#include "util/storage_options.h"
namespace leveldb {
@ -46,7 +47,8 @@ static Status DoWriteStringToFile(Env* env, const Slice& data,
const std::string& fname,
bool should_sync) {
unique_ptr<WritableFile> file;
Status s = env->NewWritableFile(fname, &file);
StorageOptions soptions;
Status s = env->NewWritableFile(fname, &file, soptions);
if (!s.ok()) {
return s;
}
@ -71,9 +73,10 @@ Status WriteStringToFileSync(Env* env, const Slice& data,
}
Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
StorageOptions soptions;
data->clear();
unique_ptr<SequentialFile> file;
Status s = env->NewSequentialFile(fname, &file);
Status s = env->NewSequentialFile(fname, &file, soptions);
if (!s.ok()) {
return s;
}

View File

@ -516,7 +516,8 @@ Status HdfsEnv::NewLogger(const std::string& fname,
#include "hdfs/env_hdfs.h"
namespace leveldb {
Status HdfsEnv::NewSequentialFile(const std::string& fname,
unique_ptr<SequentialFile>* result) {
unique_ptr<SequentialFile>* result,
const EnvOptions& options) {
return Status::NotSupported("Not compiled with hdfs support");
}
}

View File

@ -64,10 +64,17 @@ class PosixSequentialFile: public SequentialFile {
private:
std::string filename_;
FILE* file_;
int fd_;
bool use_os_buffer = true;
public:
PosixSequentialFile(const std::string& fname, FILE* f)
: filename_(fname), file_(f) { }
PosixSequentialFile(const std::string& fname, FILE* f,
const EnvOptions& options)
: filename_(fname), file_(f) {
fd_ = fileno(f);
assert(!options.UseMmapReads());
use_os_buffer = options.UseOsBuffer();
}
virtual ~PosixSequentialFile() { fclose(file_); }
virtual Status Read(size_t n, Slice* result, char* scratch) {
@ -82,6 +89,11 @@ class PosixSequentialFile: public SequentialFile {
s = IOError(filename_, errno);
}
}
if (!use_os_buffer) {
// we need to fadvise away the entire range of pages because
// we do not want readahead pages to be cached.
posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
}
return s;
}
@ -98,14 +110,17 @@ class PosixRandomAccessFile: public RandomAccessFile {
private:
std::string filename_;
int fd_;
bool use_os_buffer = true;
public:
PosixRandomAccessFile(const std::string& fname, int fd)
PosixRandomAccessFile(const std::string& fname, int fd,
const EnvOptions& options)
: filename_(fname), fd_(fd) {
if (!useFsReadAhead) {
// disable read-aheads
assert(!options.UseMmapReads());
if (!options.UseReadahead()) { // disable read-aheads
posix_fadvise(fd, 0, 0, POSIX_FADV_RANDOM);
}
use_os_buffer = options.UseOsBuffer();
}
virtual ~PosixRandomAccessFile() { close(fd_); }
@ -118,7 +133,7 @@ class PosixRandomAccessFile: public RandomAccessFile {
// An error: return a non-ok status
s = IOError(filename_, errno);
}
if (!useOsBuffer) {
if (!use_os_buffer) {
// we need to fadvise away the entire range of pages because
// we do not want readahead pages to be cached.
posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
@ -165,8 +180,13 @@ class PosixMmapReadableFile: public RandomAccessFile {
public:
// base[0,length-1] contains the mmapped contents of the file.
PosixMmapReadableFile(const std::string& fname, void* base, size_t length)
: filename_(fname), mmapped_region_(base), length_(length) { }
PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
const EnvOptions& options)
: filename_(fname), mmapped_region_(base), length_(length) {
assert(options.UseMmapReads());
assert(options.UseOsBuffer());
assert(options.UseReadahead());
}
virtual ~PosixMmapReadableFile() { munmap(mmapped_region_, length_); }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
@ -259,7 +279,8 @@ class PosixMmapFile : public WritableFile {
}
public:
PosixMmapFile(const std::string& fname, int fd, size_t page_size)
PosixMmapFile(const std::string& fname, int fd, size_t page_size,
const EnvOptions& options)
: filename_(fname),
fd_(fd),
page_size_(page_size),
@ -271,6 +292,7 @@ class PosixMmapFile : public WritableFile {
file_offset_(0),
pending_sync_(false) {
assert((page_size & (page_size - 1)) == 0);
assert(options.UseMmapWrites());
}
@ -409,7 +431,8 @@ class PosixWritableFile : public WritableFile {
bool pending_fsync_;
public:
PosixWritableFile(const std::string& fname, int fd, size_t capacity) :
PosixWritableFile(const std::string& fname, int fd, size_t capacity,
const EnvOptions& options) :
filename_(fname),
fd_(fd),
cursize_(0),
@ -418,6 +441,7 @@ class PosixWritableFile : public WritableFile {
filesize_(0),
pending_sync_(false),
pending_fsync_(false) {
assert(!options.UseMmapWrites());
}
~PosixWritableFile() {
@ -585,26 +609,28 @@ class PosixEnv : public Env {
}
virtual Status NewSequentialFile(const std::string& fname,
unique_ptr<SequentialFile>* result) {
unique_ptr<SequentialFile>* result,
const EnvOptions& options) {
result->reset();
FILE* f = fopen(fname.c_str(), "r");
if (f == nullptr) {
*result = nullptr;
return IOError(fname, errno);
} else {
result->reset(new PosixSequentialFile(fname, f));
result->reset(new PosixSequentialFile(fname, f, options));
return Status::OK();
}
}
virtual Status NewRandomAccessFile(const std::string& fname,
unique_ptr<RandomAccessFile>* result) {
unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) {
result->reset();
Status s;
int fd = open(fname.c_str(), O_RDONLY);
if (fd < 0) {
s = IOError(fname, errno);
} else if (useMmapRead && sizeof(void*) >= 8) {
} else if (options.UseMmapReads() && sizeof(void*) >= 8) {
// Use of mmap for random reads has been removed because it
// kills performance when storage is fast.
// Use mmap when virtual address-space is plentiful.
@ -613,39 +639,41 @@ class PosixEnv : public Env {
if (s.ok()) {
void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
if (base != MAP_FAILED) {
result->reset(new PosixMmapReadableFile(fname, base, size));
result->reset(new PosixMmapReadableFile(fname, base, size, options));
} else {
s = IOError(fname, errno);
}
}
close(fd);
} else {
result->reset(new PosixRandomAccessFile(fname, fd));
result->reset(new PosixRandomAccessFile(fname, fd, options));
}
return s;
}
virtual Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result) {
unique_ptr<WritableFile>* result,
const EnvOptions& options) {
result->reset();
Status s;
const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
if (fd < 0) {
s = IOError(fname, errno);
} else {
if (options.UseMmapWrites()) {
if (!checkedDiskForMmap_) {
// this will be executed once in the program's lifetime.
if (useMmapWrite) {
// do not use mmapWrite on non ext-3/xfs/tmpfs systems.
useMmapWrite = SupportsFastAllocate(fname);
if (!SupportsFastAllocate(fname)) {
forceMmapOff = true;
}
checkedDiskForMmap_ = true;
}
if (useMmapWrite) {
result->reset(new PosixMmapFile(fname, fd, page_size_));
}
if (options.UseMmapWrites() && !forceMmapOff) {
result->reset(new PosixMmapFile(fname, fd, page_size_, options));
} else {
result->reset(new PosixWritableFile(fname, fd, 65536));
result->reset(new PosixWritableFile(fname, fd, 65536, options));
}
}
return s;
@ -880,6 +908,7 @@ class PosixEnv : public Env {
private:
bool checkedDiskForMmap_ = false;
bool forceMmapOff = false; // do we override Env options?
void PthreadCall(const char* label, int result) {
if (result != 0) {

View File

@ -8,6 +8,7 @@
#include "port/port.h"
#include "util/coding.h"
#include "util/testharness.h"
#include "util/storage_options.h"
namespace leveldb {
@ -119,21 +120,22 @@ char temp_id[MAX_ID_SIZE];
TEST(EnvPosixTest, RandomAccessUniqueID) {
// Create file.
const StorageOptions soptions;
std::string fname = test::TmpDir() + "/" + "testfile";
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile));
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
unique_ptr<RandomAccessFile> file;
// Get Unique ID
ASSERT_OK(env_->NewRandomAccessFile(fname, &file));
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
std::string unique_id1(temp_id, id_size);
ASSERT_TRUE(IsUniqueIDValid(unique_id1));
// Get Unique ID again
ASSERT_OK(env_->NewRandomAccessFile(fname, &file));
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
std::string unique_id2(temp_id, id_size);
@ -141,7 +143,7 @@ TEST(EnvPosixTest, RandomAccessUniqueID) {
// Get Unique ID again after waiting some time.
env_->SleepForMicroseconds(1000000);
ASSERT_OK(env_->NewRandomAccessFile(fname, &file));
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
std::string unique_id3(temp_id, id_size);
@ -172,6 +174,7 @@ bool HasPrefix(const std::unordered_set<std::string>& ss) {
TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
// Check whether a bunch of concurrently existing files have unique IDs.
const StorageOptions soptions;
// Create the files
std::vector<std::string> fnames;
@ -180,7 +183,7 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
// Create file.
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile));
ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions));
}
// Collect and check whether the IDs are unique.
@ -188,7 +191,7 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
for (const std::string fname: fnames) {
unique_ptr<RandomAccessFile> file;
std::string unique_id;
ASSERT_OK(env_->NewRandomAccessFile(fname, &file));
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
unique_id = std::string(temp_id, id_size);
@ -207,6 +210,7 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
}
TEST(EnvPosixTest, RandomAccessUniqueIDDeletes) {
const StorageOptions soptions;
std::string fname = test::TmpDir() + "/" + "testfile";
// Check that after file is deleted we don't get same ID again in a new file.
@ -215,14 +219,14 @@ TEST(EnvPosixTest, RandomAccessUniqueIDDeletes) {
// Create file.
{
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile));
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
}
// Get Unique ID
std::string unique_id;
{
unique_ptr<RandomAccessFile> file;
ASSERT_OK(env_->NewRandomAccessFile(fname, &file));
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
ASSERT_TRUE(id_size > 0);
unique_id = std::string(temp_id, id_size);

View File

@ -577,9 +577,10 @@ leveldb::Options ReduceDBLevelsCommand::PrepareOptionsForOpenDB() {
Status ReduceDBLevelsCommand::GetOldNumOfLevels(leveldb::Options& opt,
int* levels) {
TableCache tc(db_path_, &opt, 10);
StorageOptions soptions;
TableCache tc(db_path_, &opt, soptions, 10);
const InternalKeyComparator cmp(opt.comparator);
VersionSet versions(db_path_, &opt, &tc, &cmp);
VersionSet versions(db_path_, &opt, soptions, &tc, &cmp);
// We rely the VersionSet::Recover to tell us the internal data structures
// in the db. And the Recover() should never do any change
// (like LogAndApply) to the manifest file.
@ -633,9 +634,10 @@ void ReduceDBLevelsCommand::DoCommand() {
db_->CompactRange(nullptr, nullptr);
CloseDB();
TableCache tc(db_path_, &opt, 10);
StorageOptions soptions;
TableCache tc(db_path_, &opt, soptions, 10);
const InternalKeyComparator cmp(opt.comparator);
VersionSet versions(db_path_, &opt, &tc, &cmp);
VersionSet versions(db_path_, &opt, soptions, &tc, &cmp);
// We rely the VersionSet::Recover to tell us the internal data structures
// in the db. And the Recover() should never do any change (like LogAndApply)
// to the manifest file.
@ -724,7 +726,8 @@ void WALDumperCommand::DoCommand() {
unique_ptr<SequentialFile> file;
Env* env_ = Env::Default();
Status status = env_->NewSequentialFile(wal_file_, &file);
StorageOptions soptions;
Status status = env_->NewSequentialFile(wal_file_, &file, soptions);
if (!status.ok()) {
exec_state_ = LDBCommandExecuteResult::FAILED("Failed to open WAL file " +
status.ToString());

View File

@ -60,8 +60,12 @@ Options::Options()
disable_auto_compactions(false),
WAL_ttl_seconds(0),
manifest_preallocation_size(4 * 1024 * 1024),
purge_redundant_kvs_while_flush(true) {
purge_redundant_kvs_while_flush(true),
allow_os_buffer(true),
allow_readahead(true),
allow_readahead_compactions(true),
allow_mmap_reads(false),
allow_mmap_writes(true) {
}
void
@ -103,6 +107,12 @@ Options::Dump(Logger* log) const
Log(log," Options.keep_log_file_num: %ld", keep_log_file_num);
Log(log," Options.db_stats_log_interval: %d",
db_stats_log_interval);
Log(log," Options.allow_os_buffer: %d", allow_os_buffer);
Log(log," Options.allow_readahead: %d", allow_readahead);
Log(log," Options.allow_mmap_reads: %d", allow_mmap_reads);
Log(log," Options.allow_mmap_writes: %d", allow_mmap_writes);
Log(log," Options.allow_readahead_compactions: %d",
allow_readahead_compactions);
Log(log," Options.purge_redundant_kvs_while_flush: %d",
purge_redundant_kvs_while_flush);
Log(log," Options.compression_opts.window_bits: %d",

63
util/storage_options.h Normal file
View File

@ -0,0 +1,63 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#ifndef STORAGE_LEVELDB_UTIL_STORAGE_OPTIONS_H_
#define STORAGE_LEVELDB_UTIL_STORAGE_OPTIONS_H_
#include <string>
#include <stdint.h>
#include "leveldb/env.h"
#include "leveldb/options.h"
namespace leveldb {
// Environment Options that are used to read files from storage
class StorageOptions : public EnvOptions {
public:
/* implicit */ StorageOptions(const Options& opt) :
data_in_os_(opt.allow_os_buffer),
fs_readahead_(opt.allow_readahead),
readahead_compactions_(opt.allow_readahead_compactions),
use_mmap_reads_(opt.allow_mmap_reads),
use_mmap_writes_(opt.allow_mmap_writes) {
}
// copy constructor with readaheads set to readahead_compactions_
StorageOptions(const StorageOptions& opt) {
data_in_os_ = opt.UseOsBuffer();
fs_readahead_ = opt.UseReadaheadCompactions();
readahead_compactions_ = opt.UseReadaheadCompactions();
use_mmap_reads_ = opt.UseMmapReads();
use_mmap_writes_ = opt.UseMmapWrites();
}
// constructor with default options
StorageOptions() {
Options opt;
data_in_os_ = opt.allow_os_buffer;
fs_readahead_ = opt.allow_readahead;
readahead_compactions_ = fs_readahead_;
use_mmap_reads_ = opt.allow_mmap_reads;
use_mmap_writes_ = opt.allow_mmap_writes;
}
virtual ~StorageOptions() {}
bool UseOsBuffer() const { return data_in_os_; };
bool UseReadahead() const { return fs_readahead_; };
bool UseMmapReads() const { return use_mmap_reads_; }
bool UseMmapWrites() const { return use_mmap_writes_; }
bool UseReadaheadCompactions() const { return readahead_compactions_;}
private:
bool data_in_os_;
bool fs_readahead_;
bool readahead_compactions_;
bool use_mmap_reads_;
bool use_mmap_writes_;
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_STORAGE_OPTIONS_H_

View File

@ -37,13 +37,14 @@ class ErrorEnv : public EnvWrapper {
num_writable_file_errors_(0) { }
virtual Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result) {
unique_ptr<WritableFile>* result,
const EnvOptions& soptions) {
result->reset();
if (writable_file_error_) {
++num_writable_file_errors_;
return Status::IOError(fname, "fake error");
}
return target()->NewWritableFile(fname, result);
return target()->NewWritableFile(fname, result, soptions);
}
};