From ad96563b79f477568aebf96a40d726c795c54e5c Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Thu, 14 Mar 2013 17:00:04 -0700 Subject: [PATCH] Ability to configure bufferedio-reads, filesystem-readaheads and mmap-read-write per database. Summary: This patch allows an application to specify whether to use bufferedio, reads-via-mmaps and writes-via-mmaps per database. Earlier, there was a global static variable that was used to configure this functionality. The default setting remains the same (and is backward compatible): 1. use bufferedio 2. do not use mmaps for reads 3. use mmap for writes 4. use readaheads for reads needed for compaction I also added a parameter to db_bench to be able to explicitly specify whether to do readaheads for compactions or not. Test Plan: make check Reviewers: sheki, heyongqiang, MarkCallaghan Reviewed By: sheki CC: leveldb Differential Revision: https://reviews.facebook.net/D9429 --- db/builder.cc | 4 +- db/builder.h | 2 + db/db_bench.cc | 46 ++++++++++++---- db/db_impl.cc | 34 +++++++----- db/db_impl.h | 3 ++ db/db_test.cc | 22 +++++--- db/repair.cc | 12 +++-- db/table_cache.cc | 16 +++--- db/table_cache.h | 10 +++- db/transaction_log_iterator_impl.cc | 8 +-- db/transaction_log_iterator_impl.h | 3 ++ db/version_set.cc | 32 ++++++++---- db/version_set.h | 15 +++++- hdfs/env_hdfs.h | 9 ++-- helpers/memenv/memenv.cc | 9 ++-- helpers/memenv/memenv_test.cc | 24 +++++---- include/leveldb/env.h | 47 +++++++++++++---- include/leveldb/options.h | 19 +++++++ include/leveldb/table.h | 5 +- table/table.cc | 10 +++- table/table_test.cc | 5 +- table/two_level_iterator.cc | 19 ++++--- table/two_level_iterator.h | 5 +- tools/manifest_dump.cc | 6 ++- tools/sst_dump.cc | 6 ++- util/env.cc | 7 ++- util/env_hdfs.cc | 3 +- util/env_posix.cc | 81 ++++++++++++++++++++--------- util/env_test.cc | 20 ++++--- util/ldb_cmd.cc | 13 +++-- util/options.cc | 14 ++++- util/storage_options.h | 63 ++++++++++++++++++++++ util/testutil.h | 5 +- 33 files changed, 429 insertions(+), 148 deletions(-) create mode 100644 util/storage_options.h diff --git a/db/builder.cc b/db/builder.cc index 1de062e5b..450fcf051 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -17,6 +17,7 @@ namespace leveldb { Status BuildTable(const std::string& dbname, Env* env, const Options& options, + const StorageOptions& soptions, TableCache* table_cache, Iterator* iter, FileMetaData* meta, @@ -38,7 +39,7 @@ Status BuildTable(const std::string& dbname, std::string fname = TableFileName(dbname, meta->number); if (iter->Valid()) { unique_ptr file; - s = env->NewWritableFile(fname, &file); + s = env->NewWritableFile(fname, &file, soptions); if (!s.ok()) { return s; } @@ -118,6 +119,7 @@ Status BuildTable(const std::string& dbname, if (s.ok()) { // Verify that the table is usable Iterator* it = table_cache->NewIterator(ReadOptions(), + soptions, meta->number, meta->file_size); s = it->status(); diff --git a/db/builder.h b/db/builder.h index e4abc7eaa..3c076eafa 100644 --- a/db/builder.h +++ b/db/builder.h @@ -8,6 +8,7 @@ #include "leveldb/comparator.h" #include "leveldb/status.h" #include "leveldb/types.h" +#include "util/storage_options.h" namespace leveldb { @@ -27,6 +28,7 @@ class VersionEdit; extern Status BuildTable(const std::string& dbname, Env* env, const Options& options, + const StorageOptions& soptions, TableCache* table_cache, Iterator* iter, FileMetaData* meta, diff --git a/db/db_bench.cc b/db/db_bench.cc index 275dfdec7..db0757b05 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -269,10 +269,20 @@ static int FLAGS_source_compaction_factor = 1; // Set the TTL for the WAL Files. static uint64_t FLAGS_WAL_ttl_seconds = 0; -extern bool useOsBuffer; -extern bool useFsReadAhead; -extern bool useMmapRead; -extern bool useMmapWrite; +// Allow buffered io using OS buffers +static bool FLAGS_use_os_buffer; + +// Allow filesystem to do read-aheads +static bool FLAGS_use_fsreadahead; + +// Allow reads to occur via mmap-ing files +static bool FLAGS_use_mmap_reads; + +// Allow writes to occur via mmap-ing files +static bool FLAGS_use_mmap_writes; + +// Allow readaheads to occur for compactions +static bool FLAGS_use_readahead_compactions; namespace leveldb { @@ -1099,6 +1109,13 @@ unique_ptr GenerateKeyFromInt(int v) FLAGS_max_grandparent_overlap_factor; options.disable_auto_compactions = FLAGS_disable_auto_compactions; options.source_compaction_factor = FLAGS_source_compaction_factor; + + // fill storage options + options.allow_os_buffer = FLAGS_use_os_buffer; + options.allow_readahead = FLAGS_use_fsreadahead; + options.allow_mmap_reads = FLAGS_use_mmap_reads; + options.allow_mmap_writes = FLAGS_use_mmap_writes; + options.allow_readahead_compactions = FLAGS_use_readahead_compactions; Status s; if(FLAGS_read_only) { s = DB::OpenForReadOnly(options, FLAGS_db, &db_); @@ -1629,9 +1646,10 @@ unique_ptr GenerateKeyFromInt(int v) void HeapProfile() { char fname[100]; + StorageOptions soptions; snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_); unique_ptr file; - Status s = FLAGS_env->NewWritableFile(fname, &file); + Status s = FLAGS_env->NewWritableFile(fname, &file, soptions); if (!s.ok()) { fprintf(stderr, "%s\n", s.ToString().c_str()); return; @@ -1654,6 +1672,13 @@ int main(int argc, char** argv) { leveldb::Options().max_background_compactions; // Compression test code above refers to FLAGS_block_size FLAGS_block_size = leveldb::Options().block_size; + FLAGS_use_os_buffer = leveldb::StorageOptions().UseOsBuffer(); + FLAGS_use_fsreadahead = leveldb::StorageOptions().UseReadahead(); + FLAGS_use_mmap_reads = leveldb::StorageOptions().UseMmapReads(); + FLAGS_use_mmap_writes = leveldb::StorageOptions().UseMmapWrites(); + FLAGS_use_readahead_compactions = + leveldb::StorageOptions().UseReadaheadCompactions(); + std::string default_db_path; for (int i = 1; i < argc; i++) { @@ -1730,16 +1755,19 @@ int main(int argc, char** argv) { FLAGS_verify_checksum = n; } else if (sscanf(argv[i], "--bufferedio=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { - useOsBuffer = n; + FLAGS_use_os_buffer = n; } else if (sscanf(argv[i], "--mmap_read=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { - useMmapRead = n; + FLAGS_use_mmap_reads = n; } else if (sscanf(argv[i], "--mmap_write=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { - useMmapWrite = n; + FLAGS_use_mmap_writes = n; } else if (sscanf(argv[i], "--readahead=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { - useFsReadAhead = n; + FLAGS_use_fsreadahead = n; + } else if (sscanf(argv[i], "--readahead_compactions=%d%c", &n, &junk) == 1&& + (n == 0 || n == 1)) { + FLAGS_use_readahead_compactions = n; } else if (sscanf(argv[i], "--statistics=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { if (n == 1) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 243fc70fe..8e49af590 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -162,7 +162,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) started_at_(options.env->NowMicros()), flush_on_destroy_(false), delayed_writes_(0), - last_flushed_sequence_(0) { + last_flushed_sequence_(0), + storage_options_(options) { mem_->Ref(); @@ -175,10 +176,11 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) // Reserve ten files or so for other uses and give the rest to TableCache. const int table_cache_size = options_.max_open_files - 10; - table_cache_.reset(new TableCache(dbname_, &options_, table_cache_size)); + table_cache_.reset(new TableCache(dbname_, &options_, + storage_options_, table_cache_size)); - versions_.reset(new VersionSet(dbname_, &options_, table_cache_.get(), - &internal_comparator_)); + versions_.reset(new VersionSet(dbname_, &options_, storage_options_, + table_cache_.get(), &internal_comparator_)); dumpLeveldbBuildVersion(options_.info_log.get()); options_.Dump(options_.info_log.get()); @@ -262,7 +264,7 @@ Status DBImpl::NewDB() { const std::string manifest = DescriptorFileName(dbname_, 1); unique_ptr file; - Status s = env_->NewWritableFile(manifest, &file); + Status s = env_->NewWritableFile(manifest, &file, storage_options_); if (!s.ok()) { return s; } @@ -590,7 +592,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, // Open the log file std::string fname = LogFileName(dbname_, log_number); unique_ptr file; - Status status = env_->NewSequentialFile(fname, &file); + Status status = env_->NewSequentialFile(fname, &file, storage_options_); if (!status.ok()) { MaybeIgnoreError(&status); return status; @@ -683,7 +685,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { Status s; { mutex_.Unlock(); - s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta, + s = BuildTable(dbname_, env_, options_, storage_options_, + table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, earliest_seqno_in_memtable); mutex_.Lock(); @@ -734,7 +737,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Status s; { mutex_.Unlock(); - s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta, + s = BuildTable(dbname_, env_, options_, storage_options_, + table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, earliest_seqno_in_memtable); mutex_.Lock(); @@ -905,6 +909,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, iter->reset( new TransactionLogIteratorImpl(dbname_, &options_, + storage_options_, seq, probableWALFiles, &last_flushed_sequence_)); @@ -1010,7 +1015,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname, }; unique_ptr file; - Status status = env_->NewSequentialFile(fname, &file); + Status status = env_->NewSequentialFile(fname, &file, storage_options_); if (!status.ok()) { return status; @@ -1395,7 +1400,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { // Make the output file std::string fname = TableFileName(dbname_, file_number); - Status s = env_->NewWritableFile(fname, &compact->outfile); + Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_); // Over-estimate slightly so we don't end up just barely crossing // the threshold. @@ -1447,6 +1452,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, if (s.ok() && current_entries > 0) { // Verify that the table is usable Iterator* iter = table_cache_->NewIterator(ReadOptions(), + storage_options_, output_number, current_bytes); s = iter->status(); @@ -1842,7 +1848,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, } // Collect iterators for files in L0 - Ln - versions_->current()->AddIterators(options, &list); + versions_->current()->AddIterators(options, storage_options_, &list); Iterator* internal_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size()); versions_->current()->Ref(); @@ -2172,7 +2178,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { assert(versions_->PrevLogNumber() == 0); uint64_t new_log_number = versions_->NewFileNumber(); unique_ptr lfile; - s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); + s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile, + storage_options_); if (!s.ok()) { // Avoid chewing through file number space in a tight loop. versions_->ReuseFileNumber(new_log_number); @@ -2369,6 +2376,7 @@ DB::~DB() { } Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { *dbptr = nullptr; + StorageOptions soptions; if (options.block_cache != nullptr && options.no_block_cache) { return Status::InvalidArgument( @@ -2387,7 +2395,7 @@ Status DB::Open(const Options& options, const std::string& dbname, uint64_t new_log_number = impl->versions_->NewFileNumber(); unique_ptr lfile; s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), - &lfile); + &lfile, soptions); if (s.ok()) { lfile->SetPreallocationBlockSize(1.1 * options.write_buffer_size); edit.SetLogNumber(new_log_number); diff --git a/db/db_impl.h b/db/db_impl.h index 92ad211d5..34e27c966 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -329,6 +329,9 @@ class DBImpl : public DB { // Used by transaction log iterator. SequenceNumber last_flushed_sequence_; + // The options to access storage files + const StorageOptions storage_options_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_test.cc b/db/db_test.cc index a9c83a7a4..e092384af 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -19,6 +19,7 @@ #include "util/mutexlock.h" #include "util/testharness.h" #include "util/testutil.h" +#include "util/storage_options.h" namespace leveldb { @@ -100,7 +101,8 @@ class SpecialEnv : public EnvWrapper { manifest_write_error_.Release_Store(nullptr); } - Status NewWritableFile(const std::string& f, unique_ptr* r) { + Status NewWritableFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) { class SSTableFile : public WritableFile { private: SpecialEnv* env_; @@ -157,7 +159,7 @@ class SpecialEnv : public EnvWrapper { return Status::IOError("simulated write error"); } - Status s = target()->NewWritableFile(f, r); + Status s = target()->NewWritableFile(f, r, soptions); if (s.ok()) { if (strstr(f.c_str(), ".sst") != nullptr) { r->reset(new SSTableFile(this, std::move(*r))); @@ -169,7 +171,8 @@ class SpecialEnv : public EnvWrapper { } Status NewRandomAccessFile(const std::string& f, - unique_ptr* r) { + unique_ptr* r, + const EnvOptions& soptions) { class CountingFile : public RandomAccessFile { private: unique_ptr target_; @@ -186,7 +189,7 @@ class SpecialEnv : public EnvWrapper { } }; - Status s = target()->NewRandomAccessFile(f, r); + Status s = target()->NewRandomAccessFile(f, r, soptions); if (s.ok() && count_random_reads_) { r->reset(new CountingFile(std::move(*r), &random_read_counter_)); } @@ -564,7 +567,8 @@ TEST(DBTest, LevelLimitReopen) { TEST(DBTest, Preallocation) { const std::string src = dbname_ + "/alloc_test"; unique_ptr srcfile; - ASSERT_OK(env_->NewWritableFile(src, &srcfile)); + const StorageOptions soptions; + ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions)); srcfile->SetPreallocationBlockSize(1024 * 1024); // No writes should mean no preallocation @@ -2307,6 +2311,7 @@ TEST(DBTest, BloomFilter) { TEST(DBTest, SnapshotFiles) { Options options = CurrentOptions(); + const StorageOptions soptions; options.write_buffer_size = 100000000; // Large write buffer Reopen(&options); @@ -2360,9 +2365,9 @@ TEST(DBTest, SnapshotFiles) { } } unique_ptr srcfile; - ASSERT_OK(env_->NewSequentialFile(src, &srcfile)); + ASSERT_OK(env_->NewSequentialFile(src, &srcfile, soptions)); unique_ptr destfile; - ASSERT_OK(env_->NewWritableFile(dest, &destfile)); + ASSERT_OK(env_->NewWritableFile(dest, &destfile, soptions)); char buffer[4096]; Slice slice; @@ -3122,7 +3127,8 @@ void BM_LogAndApply(int iters, int num_base_files) { InternalKeyComparator cmp(BytewiseComparator()); Options options; - VersionSet vset(dbname, &options, nullptr, &cmp); + StorageOptions sopt; + VersionSet vset(dbname, &options, sopt, nullptr, &cmp); ASSERT_OK(vset.Recover()); VersionEdit vbase(vset.NumberLevels()); uint64_t fnum = 1; diff --git a/db/repair.cc b/db/repair.cc index b991f833e..b505b1a99 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -52,7 +52,7 @@ class Repairer { options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)), next_file_number_(1) { // TableCache can be small since we expect each table to be opened once. - table_cache_ = new TableCache(dbname_, &options_, 10); + table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10); edit_ = new VersionEdit(options.num_levels); } @@ -105,6 +105,7 @@ class Repairer { std::vector logs_; std::vector tables_; uint64_t next_file_number_; + const StorageOptions storage_options_; Status FindFiles() { std::vector filenames; @@ -169,7 +170,7 @@ class Repairer { // Open the log file std::string logname = LogFileName(dbname_, log); unique_ptr lfile; - Status status = env_->NewSequentialFile(logname, &lfile); + Status status = env_->NewSequentialFile(logname, &lfile, storage_options_); if (!status.ok()) { return status; } @@ -216,7 +217,8 @@ class Repairer { FileMetaData meta; meta.number = next_file_number_++; Iterator* iter = mem->NewIterator(); - status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, + status = BuildTable(dbname_, env_, options_, storage_options_, + table_cache_, iter, &meta, icmp_.user_comparator(), 0, 0); delete iter; mem->Unref(); @@ -258,7 +260,7 @@ class Repairer { Status status = env_->GetFileSize(fname, &t->meta.file_size); if (status.ok()) { Iterator* iter = table_cache_->NewIterator( - ReadOptions(), t->meta.number, t->meta.file_size); + ReadOptions(), storage_options_, t->meta.number, t->meta.file_size); bool empty = true; ParsedInternalKey parsed; t->max_sequence = 0; @@ -296,7 +298,7 @@ class Repairer { Status WriteDescriptor() { std::string tmp = TempFileName(dbname_, 1); unique_ptr file; - Status status = env_->NewWritableFile(tmp, &file); + Status status = env_->NewWritableFile(tmp, &file, storage_options_); if (!status.ok()) { return status; } diff --git a/db/table_cache.cc b/db/table_cache.cc index f83578e5b..1269a3871 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -6,7 +6,6 @@ #include "db/filename.h" -#include "leveldb/env.h" #include "leveldb/table.h" #include "leveldb/statistics.h" #include "util/coding.h" @@ -33,10 +32,12 @@ static void UnrefEntry(void* arg1, void* arg2) { TableCache::TableCache(const std::string& dbname, const Options* options, + const StorageOptions& storage_options, int entries) : env_(options->env), dbname_(dbname), options_(options), + storage_options_(storage_options), cache_(NewLRUCache(entries, options->table_cache_numshardbits)) { dbstatistics = options->statistics; } @@ -44,7 +45,8 @@ TableCache::TableCache(const std::string& dbname, TableCache::~TableCache() { } -Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, +Status TableCache::FindTable(const EnvOptions& toptions, + uint64_t file_number, uint64_t file_size, Cache::Handle** handle, bool* tableIO) { Status s; char buf[sizeof(file_number)]; @@ -58,10 +60,10 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, std::string fname = TableFileName(dbname_, file_number); unique_ptr file; unique_ptr table; - s = env_->NewRandomAccessFile(fname, &file); + s = env_->NewRandomAccessFile(fname, &file, toptions); RecordTick(options_->statistics, NO_FILE_OPENS); if (s.ok()) { - s = Table::Open(*options_, std::move(file), file_size, &table); + s = Table::Open(*options_, toptions, std::move(file), file_size, &table); } if (!s.ok()) { @@ -80,6 +82,7 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, } Iterator* TableCache::NewIterator(const ReadOptions& options, + const EnvOptions& toptions, uint64_t file_number, uint64_t file_size, Table** tableptr) { @@ -88,7 +91,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, } Cache::Handle* handle = nullptr; - Status s = FindTable(file_number, file_size, &handle); + Status s = FindTable(toptions, file_number, file_size, &handle); if (!s.ok()) { return NewErrorIterator(s); } @@ -111,7 +114,8 @@ Status TableCache::Get(const ReadOptions& options, void (*saver)(void*, const Slice&, const Slice&, bool), bool* tableIO) { Cache::Handle* handle = nullptr; - Status s = FindTable(file_number, file_size, &handle, tableIO); + Status s = FindTable(storage_options_, file_number, file_size, + &handle, tableIO); if (s.ok()) { Table* t = reinterpret_cast(cache_->Value(handle))->table.get(); diff --git a/db/table_cache.h b/db/table_cache.h index 674493092..6c6ed1e43 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -10,9 +10,11 @@ #include #include #include "db/dbformat.h" +#include "leveldb/env.h" #include "leveldb/cache.h" #include "leveldb/table.h" #include "port/port.h" +#include "util/storage_options.h" namespace leveldb { @@ -20,7 +22,8 @@ class Env; class TableCache { public: - TableCache(const std::string& dbname, const Options* options, int entries); + TableCache(const std::string& dbname, const Options* options, + const StorageOptions& storage_options, int entries); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -31,6 +34,7 @@ class TableCache { // the cache and should not be deleted, and is valid for as long as the // returned iterator is live. Iterator* NewIterator(const ReadOptions& options, + const EnvOptions& toptions, uint64_t file_number, uint64_t file_size, Table** tableptr = nullptr); @@ -52,9 +56,11 @@ class TableCache { Env* const env_; const std::string dbname_; const Options* options_; + const StorageOptions& storage_options_; std::shared_ptr cache_; - Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**, + Status FindTable(const EnvOptions& toptions, + uint64_t file_number, uint64_t file_size, Cache::Handle**, bool* tableIO = nullptr); }; diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_iterator_impl.cc index 70ec8c9db..59f3c2461 100644 --- a/db/transaction_log_iterator_impl.cc +++ b/db/transaction_log_iterator_impl.cc @@ -6,11 +6,13 @@ namespace leveldb { TransactionLogIteratorImpl::TransactionLogIteratorImpl( const std::string& dbname, const Options* options, + const StorageOptions& soptions, SequenceNumber& seq, std::vector* files, SequenceNumber const * const lastFlushedSequence) : dbname_(dbname), options_(options), + soptions_(soptions), sequenceNumber_(seq), files_(files), started_(false), @@ -36,15 +38,15 @@ Status TransactionLogIteratorImpl::OpenLogFile( Env* env = options_->env; if (logFile.type == kArchivedLogFile) { std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber); - return env->NewSequentialFile(fname, file); + return env->NewSequentialFile(fname, file, soptions_); } else { std::string fname = LogFileName(dbname_, logFile.logNumber); - Status status = env->NewSequentialFile(fname, file); + Status status = env->NewSequentialFile(fname, file, soptions_); if (!status.ok()) { // If cannot open file in DB directory. // Try the archive dir, as it could have moved in the meanwhile. fname = ArchivedLogFileName(dbname_, logFile.logNumber); - status = env->NewSequentialFile(fname, file); + status = env->NewSequentialFile(fname, file, soptions_); if (!status.ok()) { return Status::IOError(" Requested file not present in the dir"); } diff --git a/db/transaction_log_iterator_impl.h b/db/transaction_log_iterator_impl.h index c1e541e5d..8891abf5c 100644 --- a/db/transaction_log_iterator_impl.h +++ b/db/transaction_log_iterator_impl.h @@ -10,6 +10,7 @@ #include "leveldb/transaction_log_iterator.h" #include "db/log_file.h" #include "db/log_reader.h" +#include "util/storage_options.h" namespace leveldb { @@ -27,6 +28,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { public: TransactionLogIteratorImpl(const std::string& dbname, const Options* options, + const StorageOptions& soptions, SequenceNumber& seqNum, std::vector* files, SequenceNumber const * const lastFlushedSequence); @@ -47,6 +49,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { private: const std::string& dbname_; const Options* options_; + const StorageOptions& soptions_; const uint64_t sequenceNumber_; const std::vector* files_; bool started_; diff --git a/db/version_set.cc b/db/version_set.cc index ba2d68a30..329fcffb2 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -178,6 +178,7 @@ class Version::LevelFileNumIterator : public Iterator { static Iterator* GetFileIterator(void* arg, const ReadOptions& options, + const EnvOptions& soptions, const Slice& file_value) { TableCache* cache = reinterpret_cast(arg); if (file_value.size() != 16) { @@ -185,25 +186,28 @@ static Iterator* GetFileIterator(void* arg, Status::Corruption("FileReader invoked with unexpected value")); } else { return cache->NewIterator(options, + soptions, DecodeFixed64(file_value.data()), DecodeFixed64(file_value.data() + 8)); } } Iterator* Version::NewConcatenatingIterator(const ReadOptions& options, + const StorageOptions& soptions, int level) const { return NewTwoLevelIterator( new LevelFileNumIterator(vset_->icmp_, &files_[level]), - &GetFileIterator, vset_->table_cache_, options); + &GetFileIterator, vset_->table_cache_, options, soptions); } void Version::AddIterators(const ReadOptions& options, + const StorageOptions& soptions, std::vector* iters) { // Merge all level zero files together since they may overlap for (size_t i = 0; i < files_[0].size(); i++) { iters->push_back( vset_->table_cache_->NewIterator( - options, files_[0][i]->number, files_[0][i]->file_size)); + options, soptions, files_[0][i]->number, files_[0][i]->file_size)); } // For levels > 0, we can use a concatenating iterator that sequentially @@ -211,7 +215,7 @@ void Version::AddIterators(const ReadOptions& options, // lazily. for (int level = 1; level < vset_->NumberLevels(); level++) { if (!files_[level].empty()) { - iters->push_back(NewConcatenatingIterator(options, level)); + iters->push_back(NewConcatenatingIterator(options, soptions, level)); } } } @@ -887,6 +891,7 @@ class VersionSet::Builder { VersionSet::VersionSet(const std::string& dbname, const Options* options, + const StorageOptions& storage_options, TableCache* table_cache, const InternalKeyComparator* cmp) : env_(options->env), @@ -904,7 +909,9 @@ VersionSet::VersionSet(const std::string& dbname, current_(nullptr), compactions_in_progress_(options_->num_levels), current_version_number_(0), - last_observed_manifest_size_(0) { + last_observed_manifest_size_(0), + storage_options_(storage_options), + storage_options_compactions_(storage_options_) { compact_pointer_ = new std::string[options_->num_levels]; Init(options_->num_levels); AppendVersion(new Version(this, current_version_number_++)); @@ -1002,7 +1009,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); edit->SetNextFile(next_file_number_); unique_ptr descriptor_file; - s = env_->NewWritableFile(new_manifest_file, &descriptor_file); + s = env_->NewWritableFile(new_manifest_file, &descriptor_file, + storage_options_); if (s.ok()) { descriptor_log_.reset(new log::Writer(std::move(descriptor_file))); s = WriteSnapshot(descriptor_log_.get()); @@ -1147,7 +1155,7 @@ Status VersionSet::Recover() { std::string dscname = dbname_ + "/" + current; unique_ptr file; - s = env_->NewSequentialFile(dscname, &file); + s = env_->NewSequentialFile(dscname, &file, storage_options_); if (!s.ok()) { return s; } @@ -1269,7 +1277,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, // Open the specified manifest file. unique_ptr file; - Status s = options.env->NewSequentialFile(dscname, &file); + Status s = options.env->NewSequentialFile(dscname, &file, storage_options_); if (!s.ok()) { return s; } @@ -1579,7 +1587,7 @@ bool VersionSet::ManifestContains(const std::string& record) const { std::string fname = DescriptorFileName(dbname_, manifest_file_number_); Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str()); unique_ptr file; - Status s = env_->NewSequentialFile(fname, &file); + Status s = env_->NewSequentialFile(fname, &file, storage_options_); if (!s.ok()) { Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str()); Log(options_->info_log, @@ -1623,7 +1631,8 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { // approximate offset of "ikey" within the table. Table* tableptr; Iterator* iter = table_cache_->NewIterator( - ReadOptions(), files[i]->number, files[i]->file_size, &tableptr); + ReadOptions(), storage_options_, files[i]->number, + files[i]->file_size, &tableptr); if (tableptr != nullptr) { result += tableptr->ApproximateOffsetOf(ikey.Encode()); } @@ -1735,13 +1744,14 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { const std::vector& files = c->inputs_[which]; for (size_t i = 0; i < files.size(); i++) { list[num++] = table_cache_->NewIterator( - options, files[i]->number, files[i]->file_size); + options, storage_options_compactions_, + files[i]->number, files[i]->file_size); } } else { // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]), - &GetFileIterator, table_cache_, options); + &GetFileIterator, table_cache_, options, storage_options_); } } } diff --git a/db/version_set.h b/db/version_set.h index 228256d76..7a7c814dc 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -63,7 +63,8 @@ class Version { // Append to *iters a sequence of iterators that will // yield the contents of this Version when merged together. // REQUIRES: This version has been saved (see VersionSet::SaveTo) - void AddIterators(const ReadOptions&, std::vector* iters); + void AddIterators(const ReadOptions&, const StorageOptions& soptions, + std::vector* iters); // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. Fills *stats. @@ -136,7 +137,9 @@ class Version { friend class VersionSet; class LevelFileNumIterator; - Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const; + Iterator* NewConcatenatingIterator(const ReadOptions&, + const StorageOptions& soptions, + int level) const; VersionSet* vset_; // VersionSet to which this Version belongs Version* next_; // Next version in linked list @@ -204,6 +207,7 @@ class VersionSet { public: VersionSet(const std::string& dbname, const Options* options, + const StorageOptions& storage_options, TableCache* table_cache, const InternalKeyComparator*); ~VersionSet(); @@ -454,6 +458,13 @@ class VersionSet { // Save us the cost of checking file size twice in LogAndApply uint64_t last_observed_manifest_size_; + // storage options for all reads and writes except compactions + const StorageOptions& storage_options_; + + // storage options used for compactions. This is a copy of + // storage_options_ but with readaheads set to readahead_compactions_. + const StorageOptions storage_options_compactions_; + // No copying allowed VersionSet(const VersionSet&); void operator=(const VersionSet&); diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 964a1cab9..791360918 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -235,15 +235,18 @@ class HdfsEnv : public Env { } virtual Status NewSequentialFile(const std::string& fname, - unique_ptr* result); + unique_ptr* result, + const EnvOptions& options); virtual Status NewRandomAccessFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& options) { return notsup; } virtual Status NewWritableFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& options) { return notsup; } diff --git a/helpers/memenv/memenv.cc b/helpers/memenv/memenv.cc index 3ca0be28a..c3e706c59 100644 --- a/helpers/memenv/memenv.cc +++ b/helpers/memenv/memenv.cc @@ -233,7 +233,8 @@ class InMemoryEnv : public EnvWrapper { // Partial implementation of the Env interface. virtual Status NewSequentialFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& soptions) { MutexLock lock(&mutex_); if (file_map_.find(fname) == file_map_.end()) { *result = NULL; @@ -245,7 +246,8 @@ class InMemoryEnv : public EnvWrapper { } virtual Status NewRandomAccessFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& soptions) { MutexLock lock(&mutex_); if (file_map_.find(fname) == file_map_.end()) { *result = NULL; @@ -257,7 +259,8 @@ class InMemoryEnv : public EnvWrapper { } virtual Status NewWritableFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& soptions) { MutexLock lock(&mutex_); if (file_map_.find(fname) != file_map_.end()) { DeleteFileInternal(fname); diff --git a/helpers/memenv/memenv_test.cc b/helpers/memenv/memenv_test.cc index 4a4821660..b58e22dbd 100644 --- a/helpers/memenv/memenv_test.cc +++ b/helpers/memenv/memenv_test.cc @@ -8,6 +8,7 @@ #include "leveldb/db.h" #include "leveldb/env.h" #include "util/testharness.h" +#include "util/storage_options.h" #include #include #include @@ -17,6 +18,7 @@ namespace leveldb { class MemEnvTest { public: Env* env_; + const StorageOptions soptions_; MemEnvTest() : env_(NewMemEnv(Env::Default())) { @@ -40,7 +42,7 @@ TEST(MemEnvTest, Basics) { ASSERT_EQ(0U, children.size()); // Create a file. - ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file)); + ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_)); writable_file.reset(); // Check that the file exists. @@ -52,7 +54,7 @@ TEST(MemEnvTest, Basics) { ASSERT_EQ("f", children[0]); // Write to the file. - ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file)); + ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_)); ASSERT_OK(writable_file->Append("abc")); writable_file.reset(); @@ -71,9 +73,11 @@ TEST(MemEnvTest, Basics) { // Check that opening non-existent file fails. unique_ptr seq_file; unique_ptr rand_file; - ASSERT_TRUE(!env_->NewSequentialFile("/dir/non_existent", &seq_file).ok()); + ASSERT_TRUE(!env_->NewSequentialFile("/dir/non_existent", &seq_file, + soptions_).ok()); ASSERT_TRUE(!seq_file); - ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file).ok()); + ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file, + soptions_).ok()); ASSERT_TRUE(!rand_file); // Check that deleting works. @@ -94,13 +98,13 @@ TEST(MemEnvTest, ReadWrite) { ASSERT_OK(env_->CreateDir("/dir")); - ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file)); + ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_)); ASSERT_OK(writable_file->Append("hello ")); ASSERT_OK(writable_file->Append("world")); writable_file.reset(); // Read sequentially. - ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file)); + ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_)); ASSERT_OK(seq_file->Read(5, &result, scratch)); // Read "hello". ASSERT_EQ(0, result.compare("hello")); ASSERT_OK(seq_file->Skip(1)); @@ -113,7 +117,7 @@ TEST(MemEnvTest, ReadWrite) { ASSERT_EQ(0U, result.size()); // Random reads. - ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file)); + ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file, soptions_)); ASSERT_OK(rand_file->Read(6, 5, &result, scratch)); // Read "world". ASSERT_EQ(0, result.compare("world")); ASSERT_OK(rand_file->Read(0, 5, &result, scratch)); // Read "hello". @@ -139,7 +143,7 @@ TEST(MemEnvTest, Misc) { ASSERT_TRUE(!test_dir.empty()); unique_ptr writable_file; - ASSERT_OK(env_->NewWritableFile("/a/b", &writable_file)); + ASSERT_OK(env_->NewWritableFile("/a/b", &writable_file, soptions_)); // These are no-ops, but we test they return success. ASSERT_OK(writable_file->Sync()); @@ -158,14 +162,14 @@ TEST(MemEnvTest, LargeWrite) { } unique_ptr writable_file; - ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file)); + ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_)); ASSERT_OK(writable_file->Append("foo")); ASSERT_OK(writable_file->Append(write_data)); writable_file.reset(); unique_ptr seq_file; Slice result; - ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file)); + ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_)); ASSERT_OK(seq_file->Read(3, &result, scratch)); // Read "foo". ASSERT_EQ(0, result.compare("foo")); diff --git a/include/leveldb/env.h b/include/leveldb/env.h index a8094bbe9..8a27bbede 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -23,6 +23,7 @@ namespace leveldb { class FileLock; +class EnvOptions; class Logger; class RandomAccessFile; class SequentialFile; @@ -51,7 +52,9 @@ class Env { // // The returned file will only be accessed by one thread at a time. virtual Status NewSequentialFile(const std::string& fname, - unique_ptr* result) = 0; + unique_ptr* result, + const EnvOptions& options) + = 0; // Create a brand new random access read-only file with the // specified name. On success, stores a pointer to the new file in @@ -61,7 +64,9 @@ class Env { // // The returned file may be concurrently accessed by multiple threads. virtual Status NewRandomAccessFile(const std::string& fname, - unique_ptr* result) = 0; + unique_ptr* result, + const EnvOptions& options) + = 0; // Create an object that writes to a new file with the specified // name. Deletes any existing file with the same name and creates a @@ -71,7 +76,8 @@ class Env { // // The returned file will only be accessed by one thread at a time. virtual Status NewWritableFile(const std::string& fname, - unique_ptr* result) = 0; + unique_ptr* result, + const EnvOptions& options) = 0; // Returns true iff the named file exists. virtual bool FileExists(const std::string& fname) = 0; @@ -228,7 +234,7 @@ class RandomAccessFile { // the file is opened (and will stay the same while the file is open). // Furthermore, it tries to make this ID at most "max_size" bytes. If such an // ID can be created this function returns the length of the ID and places it - // in "id"; otherwise, this function returns 0, in which case "id" may more + // in "id"; otherwise, this function returns 0, in which case "id" // may not have been modified. // // This function guarantees, for IDs from a given environment, two unique ids @@ -363,6 +369,24 @@ class FileLock { void operator=(const FileLock&); }; +// Options while opening a file to read/write +class EnvOptions { + public: + virtual ~EnvOptions() {} + + // If true, then allow caching of data in environment buffers + virtual bool UseOsBuffer() const = 0; + + // If true, then allow the environment to readahead data + virtual bool UseReadahead() const = 0; + + // If true, then use mmap to read data + virtual bool UseMmapReads() const = 0; + + // If true, then use mmap to write data + virtual bool UseMmapWrites() const = 0; +}; + // Log the specified data to *info_log if info_log is non-nullptr. extern void Log(const shared_ptr& info_log, const char* format, ...) # if defined(__GNUC__) || defined(__clang__) @@ -398,15 +422,18 @@ class EnvWrapper : public Env { // The following text is boilerplate that forwards all methods to target() Status NewSequentialFile(const std::string& f, - unique_ptr* r) { - return target_->NewSequentialFile(f, r); + unique_ptr* r, + const EnvOptions& options) { + return target_->NewSequentialFile(f, r, options); } Status NewRandomAccessFile(const std::string& f, - unique_ptr* r) { - return target_->NewRandomAccessFile(f, r); + unique_ptr* r, + const EnvOptions& options) { + return target_->NewRandomAccessFile(f, r, options); } - Status NewWritableFile(const std::string& f, unique_ptr* r) { - return target_->NewWritableFile(f, r); + Status NewWritableFile(const std::string& f, unique_ptr* r, + const EnvOptions& options) { + return target_->NewWritableFile(f, r, options); } bool FileExists(const std::string& f) { return target_->FileExists(f); } Status GetChildren(const std::string& dir, std::vector* r) { diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 532818ba4..350ed8f44 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -396,6 +396,25 @@ struct Options { // Purge duplicate/deleted keys when a memtable is flushed to storage. // Default: true bool purge_redundant_kvs_while_flush; + + // Data being read from file storage may be buffered in the OS + // Default: true + bool allow_os_buffer; + + // Reading a single block from a file can cause the OS/FS to start + // readaheads of other blocks from the file. Default: true + bool allow_readahead; + + // The reads triggered by compaction allows data to be readahead + // by the OS/FS. This overrides the setting of 'allow_readahead' + // for compaction-reads. Default: true + bool allow_readahead_compactions; + + // Allow the OS to mmap file for reading. Default: false + bool allow_mmap_reads; + + // Allow the OS to mmap file for writing. Default: true + bool allow_mmap_writes; }; // Options that control read operations diff --git a/include/leveldb/table.h b/include/leveldb/table.h index f3f3cd929..816748ac4 100644 --- a/include/leveldb/table.h +++ b/include/leveldb/table.h @@ -8,6 +8,7 @@ #include #include #include "leveldb/iterator.h" +#include "leveldb/env.h" namespace leveldb { @@ -39,6 +40,7 @@ class Table { // // *file must remain live while this Table is in use. static Status Open(const Options& options, + const EnvOptions& soptions, unique_ptr&& file, uint64_t file_size, unique_ptr
* table); @@ -67,7 +69,8 @@ class Table { Rep* rep_; explicit Table(Rep* rep) { rep_ = rep; } - static Iterator* BlockReader(void*, const ReadOptions&, const Slice&); + static Iterator* BlockReader(void*, const ReadOptions&, + const EnvOptions& soptions, const Slice&); static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, bool* didIO); diff --git a/table/table.cc b/table/table.cc index 8e9b02537..8444aa72c 100644 --- a/table/table.cc +++ b/table/table.cc @@ -29,8 +29,12 @@ struct Table::Rep { delete [] filter_data; delete index_block; } + Rep(const EnvOptions& storage_options) : + soptions(storage_options) { + } Options options; + const EnvOptions& soptions; Status status; unique_ptr file; char cache_key_prefix[kMaxCacheKeyPrefixSize]; @@ -62,6 +66,7 @@ void Table::SetupCacheKeyPrefix(Rep* rep) { } Status Table::Open(const Options& options, + const EnvOptions& soptions, unique_ptr&& file, uint64_t size, unique_ptr
* table) { @@ -100,7 +105,7 @@ Status Table::Open(const Options& options, if (s.ok()) { // We've successfully read the footer and the index block: we're // ready to serve requests. - Rep* rep = new Table::Rep; + Rep* rep = new Table::Rep(soptions); rep->options = options; rep->file = std::move(file); rep->metaindex_handle = footer.metaindex_handle(); @@ -260,6 +265,7 @@ Iterator* Table::BlockReader(void* arg, Iterator* Table::BlockReader(void* arg, const ReadOptions& options, + const EnvOptions& soptions, const Slice& index_value) { return BlockReader(arg, options, index_value, nullptr); } @@ -267,7 +273,7 @@ Iterator* Table::BlockReader(void* arg, Iterator* Table::NewIterator(const ReadOptions& options) const { return NewTwoLevelIterator( rep_->index_block->NewIterator(rep_->options.comparator), - &Table::BlockReader, const_cast(this), options); + &Table::BlockReader, const_cast(this), options, rep_->soptions); } Status Table::InternalGet(const ReadOptions& options, const Slice& k, diff --git a/table/table_test.cc b/table/table_test.cc index 4daf78377..2afdd8401 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -257,7 +257,7 @@ class TableConstructor: public Constructor { // Open the table uniq_id_ = cur_uniq_id_++; source_.reset(new StringSource(sink_->contents(), uniq_id_)); - return Table::Open(options, std::move(source_), + return Table::Open(options, soptions, std::move(source_), sink_->contents().size(), &table_); } @@ -271,7 +271,7 @@ class TableConstructor: public Constructor { virtual Status Reopen(const Options& options) { source_.reset(new StringSource(sink_->contents(), uniq_id_)); - return Table::Open(options, std::move(source_), + return Table::Open(options, soptions, std::move(source_), sink_->contents().size(), &table_); } @@ -295,6 +295,7 @@ class TableConstructor: public Constructor { TableConstructor(); static uint64_t cur_uniq_id_; + const StorageOptions soptions; }; uint64_t TableConstructor::cur_uniq_id_ = 1; diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index e64ff7ca7..049ce59be 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -13,7 +13,8 @@ namespace leveldb { namespace { -typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, const Slice&); +typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, + const EnvOptions& soptions, const Slice&); class TwoLevelIterator: public Iterator { public: @@ -21,7 +22,8 @@ class TwoLevelIterator: public Iterator { Iterator* index_iter, BlockFunction block_function, void* arg, - const ReadOptions& options); + const ReadOptions& options, + const EnvOptions& soptions); virtual ~TwoLevelIterator(); @@ -65,6 +67,7 @@ class TwoLevelIterator: public Iterator { BlockFunction block_function_; void* arg_; const ReadOptions options_; + const EnvOptions& soptions_; Status status_; IteratorWrapper index_iter_; IteratorWrapper data_iter_; // May be nullptr @@ -77,10 +80,12 @@ TwoLevelIterator::TwoLevelIterator( Iterator* index_iter, BlockFunction block_function, void* arg, - const ReadOptions& options) + const ReadOptions& options, + const EnvOptions& soptions) : block_function_(block_function), arg_(arg), options_(options), + soptions_(soptions), index_iter_(index_iter), data_iter_(nullptr) { } @@ -163,7 +168,7 @@ void TwoLevelIterator::InitDataBlock() { // data_iter_ is already constructed with this iterator, so // no need to change anything } else { - Iterator* iter = (*block_function_)(arg_, options_, handle); + Iterator* iter = (*block_function_)(arg_, options_, soptions_, handle); data_block_handle_.assign(handle.data(), handle.size()); SetDataIterator(iter); } @@ -176,8 +181,10 @@ Iterator* NewTwoLevelIterator( Iterator* index_iter, BlockFunction block_function, void* arg, - const ReadOptions& options) { - return new TwoLevelIterator(index_iter, block_function, arg, options); + const ReadOptions& options, + const EnvOptions& soptions) { + return new TwoLevelIterator(index_iter, block_function, arg, + options, soptions); } } // namespace leveldb diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index 629ca3452..e1b376638 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -6,6 +6,7 @@ #define STORAGE_LEVELDB_TABLE_TWO_LEVEL_ITERATOR_H_ #include "leveldb/iterator.h" +#include "leveldb/env.h" namespace leveldb { @@ -25,9 +26,11 @@ extern Iterator* NewTwoLevelIterator( Iterator* (*block_function)( void* arg, const ReadOptions& options, + const EnvOptions& soptions, const Slice& index_value), void* arg, - const ReadOptions& options); + const ReadOptions& options, + const EnvOptions& soptions); } // namespace leveldb diff --git a/tools/manifest_dump.cc b/tools/manifest_dump.cc index 4cb28c7cb..5f0cf1b98 100644 --- a/tools/manifest_dump.cc +++ b/tools/manifest_dump.cc @@ -17,6 +17,7 @@ #include "table/two_level_iterator.h" #include "util/coding.h" #include "util/logging.h" +#include "util/storage_options.h" static int verbose = 0; static int output_hex = 0; @@ -61,12 +62,13 @@ int main(int argc, char** argv) { } Options options; + StorageOptions sopt; std::string file(manifestfile); std::string dbname("dummy"); - TableCache* tc = new TableCache(dbname, &options, 10); + TableCache* tc = new TableCache(dbname, &options, sopt, 10); const InternalKeyComparator* cmp = new InternalKeyComparator(options.comparator); - VersionSet* versions = new VersionSet(dbname, &options, + VersionSet* versions = new VersionSet(dbname, &options, sopt, tc, cmp); Status s = versions->DumpManifest(options, file, verbose, output_hex); if (!s.ok()) { diff --git a/tools/sst_dump.cc b/tools/sst_dump.cc index 7aeb3f5a0..7297571b2 100644 --- a/tools/sst_dump.cc +++ b/tools/sst_dump.cc @@ -34,6 +34,7 @@ private: uint64_t read_num_; bool verify_checksum_; bool output_hex_; + StorageOptions soptions_; }; SstFileReader::SstFileReader(std::string file_path, @@ -48,13 +49,14 @@ Status SstFileReader::ReadSequential(bool print_kv, uint64_t read_num) unique_ptr
table; Options table_options; unique_ptr file; - Status s = table_options.env->NewRandomAccessFile(file_name_, &file); + Status s = table_options.env->NewRandomAccessFile(file_name_, &file, + soptions_); if(!s.ok()) { return s; } uint64_t file_size; table_options.env->GetFileSize(file_name_, &file_size); - s = Table::Open(table_options, std::move(file), file_size, &table); + s = Table::Open(table_options, soptions_, std::move(file), file_size, &table); if(!s.ok()) { return s; } diff --git a/util/env.cc b/util/env.cc index 0aaa03ae7..830a144bd 100644 --- a/util/env.cc +++ b/util/env.cc @@ -3,6 +3,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "leveldb/env.h" +#include "util/storage_options.h" namespace leveldb { @@ -46,7 +47,8 @@ static Status DoWriteStringToFile(Env* env, const Slice& data, const std::string& fname, bool should_sync) { unique_ptr file; - Status s = env->NewWritableFile(fname, &file); + StorageOptions soptions; + Status s = env->NewWritableFile(fname, &file, soptions); if (!s.ok()) { return s; } @@ -71,9 +73,10 @@ Status WriteStringToFileSync(Env* env, const Slice& data, } Status ReadFileToString(Env* env, const std::string& fname, std::string* data) { + StorageOptions soptions; data->clear(); unique_ptr file; - Status s = env->NewSequentialFile(fname, &file); + Status s = env->NewSequentialFile(fname, &file, soptions); if (!s.ok()) { return s; } diff --git a/util/env_hdfs.cc b/util/env_hdfs.cc index 4ebefb523..b702e6a22 100644 --- a/util/env_hdfs.cc +++ b/util/env_hdfs.cc @@ -516,7 +516,8 @@ Status HdfsEnv::NewLogger(const std::string& fname, #include "hdfs/env_hdfs.h" namespace leveldb { Status HdfsEnv::NewSequentialFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& options) { return Status::NotSupported("Not compiled with hdfs support"); } } diff --git a/util/env_posix.cc b/util/env_posix.cc index 103cadcf5..3e9f1b529 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -64,10 +64,17 @@ class PosixSequentialFile: public SequentialFile { private: std::string filename_; FILE* file_; + int fd_; + bool use_os_buffer = true; public: - PosixSequentialFile(const std::string& fname, FILE* f) - : filename_(fname), file_(f) { } + PosixSequentialFile(const std::string& fname, FILE* f, + const EnvOptions& options) + : filename_(fname), file_(f) { + fd_ = fileno(f); + assert(!options.UseMmapReads()); + use_os_buffer = options.UseOsBuffer(); + } virtual ~PosixSequentialFile() { fclose(file_); } virtual Status Read(size_t n, Slice* result, char* scratch) { @@ -82,6 +89,11 @@ class PosixSequentialFile: public SequentialFile { s = IOError(filename_, errno); } } + if (!use_os_buffer) { + // we need to fadvise away the entire range of pages because + // we do not want readahead pages to be cached. + posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages + } return s; } @@ -98,14 +110,17 @@ class PosixRandomAccessFile: public RandomAccessFile { private: std::string filename_; int fd_; + bool use_os_buffer = true; public: - PosixRandomAccessFile(const std::string& fname, int fd) + PosixRandomAccessFile(const std::string& fname, int fd, + const EnvOptions& options) : filename_(fname), fd_(fd) { - if (!useFsReadAhead) { - // disable read-aheads + assert(!options.UseMmapReads()); + if (!options.UseReadahead()) { // disable read-aheads posix_fadvise(fd, 0, 0, POSIX_FADV_RANDOM); } + use_os_buffer = options.UseOsBuffer(); } virtual ~PosixRandomAccessFile() { close(fd_); } @@ -118,7 +133,7 @@ class PosixRandomAccessFile: public RandomAccessFile { // An error: return a non-ok status s = IOError(filename_, errno); } - if (!useOsBuffer) { + if (!use_os_buffer) { // we need to fadvise away the entire range of pages because // we do not want readahead pages to be cached. posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages @@ -165,8 +180,13 @@ class PosixMmapReadableFile: public RandomAccessFile { public: // base[0,length-1] contains the mmapped contents of the file. - PosixMmapReadableFile(const std::string& fname, void* base, size_t length) - : filename_(fname), mmapped_region_(base), length_(length) { } + PosixMmapReadableFile(const std::string& fname, void* base, size_t length, + const EnvOptions& options) + : filename_(fname), mmapped_region_(base), length_(length) { + assert(options.UseMmapReads()); + assert(options.UseOsBuffer()); + assert(options.UseReadahead()); + } virtual ~PosixMmapReadableFile() { munmap(mmapped_region_, length_); } virtual Status Read(uint64_t offset, size_t n, Slice* result, @@ -259,7 +279,8 @@ class PosixMmapFile : public WritableFile { } public: - PosixMmapFile(const std::string& fname, int fd, size_t page_size) + PosixMmapFile(const std::string& fname, int fd, size_t page_size, + const EnvOptions& options) : filename_(fname), fd_(fd), page_size_(page_size), @@ -271,6 +292,7 @@ class PosixMmapFile : public WritableFile { file_offset_(0), pending_sync_(false) { assert((page_size & (page_size - 1)) == 0); + assert(options.UseMmapWrites()); } @@ -409,7 +431,8 @@ class PosixWritableFile : public WritableFile { bool pending_fsync_; public: - PosixWritableFile(const std::string& fname, int fd, size_t capacity) : + PosixWritableFile(const std::string& fname, int fd, size_t capacity, + const EnvOptions& options) : filename_(fname), fd_(fd), cursize_(0), @@ -418,6 +441,7 @@ class PosixWritableFile : public WritableFile { filesize_(0), pending_sync_(false), pending_fsync_(false) { + assert(!options.UseMmapWrites()); } ~PosixWritableFile() { @@ -585,26 +609,28 @@ class PosixEnv : public Env { } virtual Status NewSequentialFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& options) { result->reset(); FILE* f = fopen(fname.c_str(), "r"); if (f == nullptr) { *result = nullptr; return IOError(fname, errno); } else { - result->reset(new PosixSequentialFile(fname, f)); + result->reset(new PosixSequentialFile(fname, f, options)); return Status::OK(); } } virtual Status NewRandomAccessFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& options) { result->reset(); Status s; int fd = open(fname.c_str(), O_RDONLY); if (fd < 0) { s = IOError(fname, errno); - } else if (useMmapRead && sizeof(void*) >= 8) { + } else if (options.UseMmapReads() && sizeof(void*) >= 8) { // Use of mmap for random reads has been removed because it // kills performance when storage is fast. // Use mmap when virtual address-space is plentiful. @@ -613,39 +639,41 @@ class PosixEnv : public Env { if (s.ok()) { void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0); if (base != MAP_FAILED) { - result->reset(new PosixMmapReadableFile(fname, base, size)); + result->reset(new PosixMmapReadableFile(fname, base, size, options)); } else { s = IOError(fname, errno); } } close(fd); } else { - result->reset(new PosixRandomAccessFile(fname, fd)); + result->reset(new PosixRandomAccessFile(fname, fd, options)); } return s; } virtual Status NewWritableFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& options) { result->reset(); Status s; const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); if (fd < 0) { s = IOError(fname, errno); } else { - if (!checkedDiskForMmap_) { - // this will be executed once in the program's lifetime. - if (useMmapWrite) { + if (options.UseMmapWrites()) { + if (!checkedDiskForMmap_) { + // this will be executed once in the program's lifetime. // do not use mmapWrite on non ext-3/xfs/tmpfs systems. - useMmapWrite = SupportsFastAllocate(fname); + if (!SupportsFastAllocate(fname)) { + forceMmapOff = true; + } + checkedDiskForMmap_ = true; } - checkedDiskForMmap_ = true; } - - if (useMmapWrite) { - result->reset(new PosixMmapFile(fname, fd, page_size_)); + if (options.UseMmapWrites() && !forceMmapOff) { + result->reset(new PosixMmapFile(fname, fd, page_size_, options)); } else { - result->reset(new PosixWritableFile(fname, fd, 65536)); + result->reset(new PosixWritableFile(fname, fd, 65536, options)); } } return s; @@ -880,6 +908,7 @@ class PosixEnv : public Env { private: bool checkedDiskForMmap_ = false; + bool forceMmapOff = false; // do we override Env options? void PthreadCall(const char* label, int result) { if (result != 0) { diff --git a/util/env_test.cc b/util/env_test.cc index 67dc409c4..b0dff593f 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -8,6 +8,7 @@ #include "port/port.h" #include "util/coding.h" #include "util/testharness.h" +#include "util/storage_options.h" namespace leveldb { @@ -119,21 +120,22 @@ char temp_id[MAX_ID_SIZE]; TEST(EnvPosixTest, RandomAccessUniqueID) { // Create file. + const StorageOptions soptions; std::string fname = test::TmpDir() + "/" + "testfile"; unique_ptr wfile; - ASSERT_OK(env_->NewWritableFile(fname, &wfile)); + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); unique_ptr file; // Get Unique ID - ASSERT_OK(env_->NewRandomAccessFile(fname, &file)); + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); ASSERT_TRUE(id_size > 0); std::string unique_id1(temp_id, id_size); ASSERT_TRUE(IsUniqueIDValid(unique_id1)); // Get Unique ID again - ASSERT_OK(env_->NewRandomAccessFile(fname, &file)); + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); ASSERT_TRUE(id_size > 0); std::string unique_id2(temp_id, id_size); @@ -141,7 +143,7 @@ TEST(EnvPosixTest, RandomAccessUniqueID) { // Get Unique ID again after waiting some time. env_->SleepForMicroseconds(1000000); - ASSERT_OK(env_->NewRandomAccessFile(fname, &file)); + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); ASSERT_TRUE(id_size > 0); std::string unique_id3(temp_id, id_size); @@ -172,6 +174,7 @@ bool HasPrefix(const std::unordered_set& ss) { TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) { // Check whether a bunch of concurrently existing files have unique IDs. + const StorageOptions soptions; // Create the files std::vector fnames; @@ -180,7 +183,7 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) { // Create file. unique_ptr wfile; - ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile)); + ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions)); } // Collect and check whether the IDs are unique. @@ -188,7 +191,7 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) { for (const std::string fname: fnames) { unique_ptr file; std::string unique_id; - ASSERT_OK(env_->NewRandomAccessFile(fname, &file)); + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); ASSERT_TRUE(id_size > 0); unique_id = std::string(temp_id, id_size); @@ -207,6 +210,7 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) { } TEST(EnvPosixTest, RandomAccessUniqueIDDeletes) { + const StorageOptions soptions; std::string fname = test::TmpDir() + "/" + "testfile"; // Check that after file is deleted we don't get same ID again in a new file. @@ -215,14 +219,14 @@ TEST(EnvPosixTest, RandomAccessUniqueIDDeletes) { // Create file. { unique_ptr wfile; - ASSERT_OK(env_->NewWritableFile(fname, &wfile)); + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); } // Get Unique ID std::string unique_id; { unique_ptr file; - ASSERT_OK(env_->NewRandomAccessFile(fname, &file)); + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); ASSERT_TRUE(id_size > 0); unique_id = std::string(temp_id, id_size); diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index df843cfed..fec05242f 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -577,9 +577,10 @@ leveldb::Options ReduceDBLevelsCommand::PrepareOptionsForOpenDB() { Status ReduceDBLevelsCommand::GetOldNumOfLevels(leveldb::Options& opt, int* levels) { - TableCache tc(db_path_, &opt, 10); + StorageOptions soptions; + TableCache tc(db_path_, &opt, soptions, 10); const InternalKeyComparator cmp(opt.comparator); - VersionSet versions(db_path_, &opt, &tc, &cmp); + VersionSet versions(db_path_, &opt, soptions, &tc, &cmp); // We rely the VersionSet::Recover to tell us the internal data structures // in the db. And the Recover() should never do any change // (like LogAndApply) to the manifest file. @@ -633,9 +634,10 @@ void ReduceDBLevelsCommand::DoCommand() { db_->CompactRange(nullptr, nullptr); CloseDB(); - TableCache tc(db_path_, &opt, 10); + StorageOptions soptions; + TableCache tc(db_path_, &opt, soptions, 10); const InternalKeyComparator cmp(opt.comparator); - VersionSet versions(db_path_, &opt, &tc, &cmp); + VersionSet versions(db_path_, &opt, soptions, &tc, &cmp); // We rely the VersionSet::Recover to tell us the internal data structures // in the db. And the Recover() should never do any change (like LogAndApply) // to the manifest file. @@ -724,7 +726,8 @@ void WALDumperCommand::DoCommand() { unique_ptr file; Env* env_ = Env::Default(); - Status status = env_->NewSequentialFile(wal_file_, &file); + StorageOptions soptions; + Status status = env_->NewSequentialFile(wal_file_, &file, soptions); if (!status.ok()) { exec_state_ = LDBCommandExecuteResult::FAILED("Failed to open WAL file " + status.ToString()); diff --git a/util/options.cc b/util/options.cc index 957ea093c..4888c5714 100644 --- a/util/options.cc +++ b/util/options.cc @@ -60,8 +60,12 @@ Options::Options() disable_auto_compactions(false), WAL_ttl_seconds(0), manifest_preallocation_size(4 * 1024 * 1024), - purge_redundant_kvs_while_flush(true) { - + purge_redundant_kvs_while_flush(true), + allow_os_buffer(true), + allow_readahead(true), + allow_readahead_compactions(true), + allow_mmap_reads(false), + allow_mmap_writes(true) { } void @@ -103,6 +107,12 @@ Options::Dump(Logger* log) const Log(log," Options.keep_log_file_num: %ld", keep_log_file_num); Log(log," Options.db_stats_log_interval: %d", db_stats_log_interval); + Log(log," Options.allow_os_buffer: %d", allow_os_buffer); + Log(log," Options.allow_readahead: %d", allow_readahead); + Log(log," Options.allow_mmap_reads: %d", allow_mmap_reads); + Log(log," Options.allow_mmap_writes: %d", allow_mmap_writes); + Log(log," Options.allow_readahead_compactions: %d", + allow_readahead_compactions); Log(log," Options.purge_redundant_kvs_while_flush: %d", purge_redundant_kvs_while_flush); Log(log," Options.compression_opts.window_bits: %d", diff --git a/util/storage_options.h b/util/storage_options.h new file mode 100644 index 000000000..9f3ec8b03 --- /dev/null +++ b/util/storage_options.h @@ -0,0 +1,63 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +#ifndef STORAGE_LEVELDB_UTIL_STORAGE_OPTIONS_H_ +#define STORAGE_LEVELDB_UTIL_STORAGE_OPTIONS_H_ + +#include +#include +#include "leveldb/env.h" +#include "leveldb/options.h" + +namespace leveldb { + +// Environment Options that are used to read files from storage +class StorageOptions : public EnvOptions { + public: + /* implicit */ StorageOptions(const Options& opt) : + data_in_os_(opt.allow_os_buffer), + fs_readahead_(opt.allow_readahead), + readahead_compactions_(opt.allow_readahead_compactions), + use_mmap_reads_(opt.allow_mmap_reads), + use_mmap_writes_(opt.allow_mmap_writes) { + } + + // copy constructor with readaheads set to readahead_compactions_ + StorageOptions(const StorageOptions& opt) { + data_in_os_ = opt.UseOsBuffer(); + fs_readahead_ = opt.UseReadaheadCompactions(); + readahead_compactions_ = opt.UseReadaheadCompactions(); + use_mmap_reads_ = opt.UseMmapReads(); + use_mmap_writes_ = opt.UseMmapWrites(); + } + + // constructor with default options + StorageOptions() { + Options opt; + data_in_os_ = opt.allow_os_buffer; + fs_readahead_ = opt.allow_readahead; + readahead_compactions_ = fs_readahead_; + use_mmap_reads_ = opt.allow_mmap_reads; + use_mmap_writes_ = opt.allow_mmap_writes; + } + + virtual ~StorageOptions() {} + + bool UseOsBuffer() const { return data_in_os_; }; + bool UseReadahead() const { return fs_readahead_; }; + bool UseMmapReads() const { return use_mmap_reads_; } + bool UseMmapWrites() const { return use_mmap_writes_; } + bool UseReadaheadCompactions() const { return readahead_compactions_;} + + private: + bool data_in_os_; + bool fs_readahead_; + bool readahead_compactions_; + bool use_mmap_reads_; + bool use_mmap_writes_; +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_UTIL_STORAGE_OPTIONS_H_ diff --git a/util/testutil.h b/util/testutil.h index 8b200db97..dd399af8c 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -37,13 +37,14 @@ class ErrorEnv : public EnvWrapper { num_writable_file_errors_(0) { } virtual Status NewWritableFile(const std::string& fname, - unique_ptr* result) { + unique_ptr* result, + const EnvOptions& soptions) { result->reset(); if (writable_file_error_) { ++num_writable_file_errors_; return Status::IOError(fname, "fake error"); } - return target()->NewWritableFile(fname, result); + return target()->NewWritableFile(fname, result, soptions); } };