From 972f96b3fbae1a4675043bdf4279c9072ad69645 Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Thu, 22 Dec 2016 12:51:29 -0800 Subject: [PATCH] direct io write support Summary: rocksdb direct io support ``` [gzh@dev11575.prn2 ~/rocksdb] ./db_bench -benchmarks=fillseq --num=1000000 Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags RocksDB: version 5.0 Date: Wed Nov 23 13:17:43 2016 CPU: 40 * Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz CPUCache: 25600 KB Keys: 16 bytes each Values: 100 bytes each (50 bytes after compression) Entries: 1000000 Prefix: 0 bytes Keys per prefix: 0 RawSize: 110.6 MB (estimated) FileSize: 62.9 MB (estimated) Write rate: 0 bytes/second Compression: Snappy Memtablerep: skip_list Perf Level: 1 WARNING: Assertions are enabled; benchmarks unnecessarily slow ------------------------------------------------ Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags DB path: [/tmp/rocksdbtest-112628/dbbench] fillseq : 4.393 micros/op 227639 ops/sec; 25.2 MB/s [gzh@dev11575.prn2 ~/roc Closes https://github.com/facebook/rocksdb/pull/1564 Differential Revision: D4241093 Pulled By: lightmark fbshipit-source-id: 98c29e3 --- db/c.cc | 13 +- db/db_impl.cc | 10 +- db/db_test.cc | 14 +- db/db_test_util.cc | 16 ++ db/db_test_util.h | 2 + db/forward_iterator_bench.cc | 2 +- examples/rocksdb_option_file_example.ini | 9 +- include/rocksdb/c.h | 6 +- include/rocksdb/env.h | 38 +-- include/rocksdb/options.h | 32 +-- include/rocksdb/table.h | 6 +- include/rocksdb/utilities/env_librados.h | 110 ++++---- include/rocksdb/utilities/env_mirror.h | 7 +- .../org/rocksdb/benchmark/DbBenchmark.java | 12 +- java/rocksjni/env_options.cc | 79 ++---- java/rocksjni/options.cc | 108 +++++--- java/src/main/java/org/rocksdb/DBOptions.java | 31 ++- .../java/org/rocksdb/DBOptionsInterface.java | 33 ++- java/src/main/java/org/rocksdb/Options.java | 29 +- 
.../test/java/org/rocksdb/DBOptionsTest.java | 15 +- .../test/java/org/rocksdb/EnvOptionsTest.java | 9 - .../test/java/org/rocksdb/OptionsTest.java | 17 +- port/win/env_win.cc | 26 +- port/win/env_win.h | 8 +- port/win/io_win.cc | 96 +++---- port/win/io_win.h | 255 ++++++++---------- tools/db_bench_tool.cc | 10 +- tools/db_bench_tool_test.cc | 5 +- util/aligned_buffer.h | 2 +- util/db_options.cc | 16 +- util/db_options.h | 2 +- util/env.cc | 2 +- util/env_posix.cc | 190 +++++++------ util/file_reader_writer.cc | 34 +-- util/file_reader_writer.h | 6 +- util/file_reader_writer_test.cc | 10 +- util/io_posix.cc | 48 ++-- util/io_posix.h | 28 +- util/log_write_bench.cc | 3 +- util/memenv.cc | 4 +- util/mock_env.cc | 4 +- util/options.cc | 14 +- util/options_helper.cc | 2 +- util/options_helper.h | 6 +- util/options_settable_test.cc | 2 +- util/options_test.cc | 6 +- util/testutil.cc | 3 +- utilities/backupable/backupable_db.cc | 5 +- utilities/env_mirror.cc | 6 +- 49 files changed, 720 insertions(+), 671 deletions(-) diff --git a/db/c.cc b/db/c.cc index 86c510930..5fdc2a11f 100644 --- a/db/c.cc +++ b/db/c.cc @@ -1719,9 +1719,14 @@ void rocksdb_options_set_manifest_preallocation_size( void rocksdb_options_set_purge_redundant_kvs_while_flush(rocksdb_options_t* opt, unsigned char v) {} -void rocksdb_options_set_allow_os_buffer(rocksdb_options_t* opt, - unsigned char v) { - opt->rep.allow_os_buffer = v; +void rocksdb_options_set_use_direct_reads(rocksdb_options_t* opt, + unsigned char v) { + opt->rep.use_direct_reads = v; +} + +void rocksdb_options_set_use_direct_writes(rocksdb_options_t* opt, + unsigned char v) { + opt->rep.use_direct_writes = v; } void rocksdb_options_set_allow_mmap_reads( @@ -2000,7 +2005,7 @@ rocksdb_ratelimiter_t* rocksdb_ratelimiter_create( void rocksdb_ratelimiter_destroy(rocksdb_ratelimiter_t *limiter) { if (limiter->rep) { - delete limiter->rep; + delete limiter->rep; } delete limiter; } diff --git a/db/db_impl.cc b/db/db_impl.cc index 
e18599a02..d1810bc88 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -255,11 +255,17 @@ static Status ValidateOptions( "More than four DB paths are not supported yet. "); } - if (db_options.allow_mmap_reads && !db_options.allow_os_buffer) { + if (db_options.allow_mmap_reads && db_options.use_direct_reads) { // Protect against assert in PosixMMapReadableFile constructor return Status::NotSupported( "If memory mapped reads (allow_mmap_reads) are enabled " - "then os caching (allow_os_buffer) must also be enabled. "); + "then direct I/O reads (use_direct_reads) must be disabled. "); + } + + if (db_options.allow_mmap_writes && db_options.use_direct_writes) { + return Status::NotSupported( + "If memory mapped writes (allow_mmap_writes) are enabled " + "then direct I/O writes (use_direct_writes) must be disabled. "); } return Status::OK(); diff --git a/db/db_test.cc b/db/db_test.cc index 7c6907182..f01b508ae 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3396,19 +3396,21 @@ TEST_F(DBTest, TableOptionsSanitizeTest) { TEST_F(DBTest, MmapAndBufferOptions) { Options options = CurrentOptions(); - options.allow_os_buffer = false; + options.use_direct_reads = true; options.allow_mmap_reads = true; ASSERT_NOK(TryReopen(options)); // All other combinations are acceptable - options.allow_os_buffer = true; + options.use_direct_reads = false; ASSERT_OK(TryReopen(options)); - options.allow_os_buffer = false; - options.allow_mmap_reads = false; - ASSERT_OK(TryReopen(options)); + if (IsDirectIOSupported()) { + options.use_direct_reads = true; + options.allow_mmap_reads = false; + ASSERT_OK(TryReopen(options)); + } - options.allow_os_buffer = true; + options.use_direct_reads = false; ASSERT_OK(TryReopen(options)); } #endif diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 9a703aae8..23a495e33 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -478,6 +478,22 @@ Status DBTestBase::TryReopen(const Options& options) { return DB::Open(options, dbname_, &db_); } +bool 
DBTestBase::IsDirectIOSupported() { + EnvOptions env_options; + env_options.use_mmap_writes = false; + env_options.use_direct_writes = true; + std::string tmp = TempFileName(dbname_, 999); + Status s; + { + unique_ptr file; + s = env_->NewWritableFile(tmp, &file, env_options); + } + if (s.ok()) { + s = env_->DeleteFile(tmp); + } + return s.ok(); +} + Status DBTestBase::Flush(int cf) { if (cf == 0) { return db_->Flush(FlushOptions()); diff --git a/db/db_test_util.h b/db/db_test_util.h index bf58d27ba..ad2b23d4c 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -692,6 +692,8 @@ class DBTestBase : public testing::Test { Status TryReopen(const Options& options); + bool IsDirectIOSupported(); + Status Flush(int cf = 0); Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()); diff --git a/db/forward_iterator_bench.cc b/db/forward_iterator_bench.cc index 0f44a9e44..cd37ddf3c 100644 --- a/db/forward_iterator_bench.cc +++ b/db/forward_iterator_bench.cc @@ -330,7 +330,7 @@ int main(int argc, char** argv) { options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone; options.level0_slowdown_writes_trigger = 99999; options.level0_stop_writes_trigger = 99999; - options.allow_os_buffer = false; + options.use_direct_writes = true; options.write_buffer_size = FLAGS_memtable_size; rocksdb::BlockBasedTableOptions table_options; table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size); diff --git a/examples/rocksdb_option_file_example.ini b/examples/rocksdb_option_file_example.ini index 51098ac0a..61ef92b6d 100644 --- a/examples/rocksdb_option_file_example.ini +++ b/examples/rocksdb_option_file_example.ini @@ -73,12 +73,13 @@ error_if_exists=false recycle_log_file_num=0 skip_log_error_on_recovery=false - allow_mmap_reads=false - allow_os_buffer=true db_log_dir= new_table_reader_for_compaction_inputs=true + allow_mmap_reads=false allow_mmap_writes=false - + use_direct_reads=false + use_direct_writes=false + [CFOptions 
"default"] compaction_style=kCompactionStyleLevel @@ -127,7 +128,7 @@ write_buffer_size=134217728 disable_auto_compactions=false inplace_update_support=false - + [TableOptions/BlockBasedTable "default"] format_version=2 whole_key_filtering=true diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 29fe4d885..5452fcb42 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -657,12 +657,14 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_manifest_preallocation_size( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_purge_redundant_kvs_while_flush(rocksdb_options_t*, unsigned char); -extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_os_buffer( - rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_mmap_reads( rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_mmap_writes( rocksdb_options_t*, unsigned char); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_reads( + rocksdb_options_t*, unsigned char); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_writes( + rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_is_fd_close_on_exec( rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_skip_log_error_on_recovery( diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index e24c15d83..3e2458628 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -61,9 +61,6 @@ struct EnvOptions { // construct from Options explicit EnvOptions(const DBOptions& options); - // If true, then allow caching of data in environment buffers - bool use_os_buffer = true; - // If true, then use mmap to read data bool use_mmap_reads = false; @@ -373,8 +370,8 @@ class Env { // OptimizeForManifestWrite will create a new EnvOptions object that is a copy // of the EnvOptions in the parameters, but is optimized for writing manifest // files. 
Default implementation returns the copy of the same object. - virtual EnvOptions OptimizeForManifestWrite(const EnvOptions& env_options) - const; + virtual EnvOptions OptimizeForManifestWrite( + const EnvOptions& env_options) const; // Returns the status of all threads that belong to the current Env. virtual Status GetThreadList(std::vector* thread_list) { @@ -512,17 +509,15 @@ class WritableFile { } virtual ~WritableFile(); - // Indicates if the class makes use of unbuffered I/O - // If false you must pass aligned buffer to Write() - virtual bool UseOSBuffer() const { - return true; - } + // Indicates if the class makes use of direct IO + // If true you must pass aligned buffer to Write() + virtual bool UseDirectIO() const { return false; } const size_t c_DefaultPageSize = 4 * 1024; // Use the returned alignment value to allocate - // aligned buffer for Write() when UseOSBuffer() - // returns false + // aligned buffer for Write() when UseDirectIO() + // returns true virtual size_t GetRequiredBufferAlignment() const { return c_DefaultPageSize; } @@ -538,7 +533,7 @@ class WritableFile { // the sector. The implementation thus needs to also rewrite the last // partial sector. // Note: PositionAppend does not guarantee moving the file offset after the - // write. A WriteabelFile object must support either Append or + // write. A WritableFile object must support either Append or // PositionedAppend, so the users cannot mix the two. // // PositionedAppend() can only happen on the page/sector boundaries. For that @@ -583,10 +578,6 @@ class WritableFile { return false; } - // Indicates the upper layers if the current WritableFile implementation - // uses direct IO. - virtual bool UseDirectIO() const { return false; } - /* * Change the priority in rate limiter if rate limiting is enabled. * If rate limiting is not enabled, this call has no effect. 
@@ -695,17 +686,14 @@ class RandomRWFile { RandomRWFile() {} virtual ~RandomRWFile() {} - // Indicates if the class makes use of unbuffered I/O + // Indicates if the class makes use of direct I/O // If false you must pass aligned buffer to Write() - virtual bool UseOSBuffer() const { - return true; - } + virtual bool UseDirectIO() const { return false; } const size_t c_DefaultPageSize = 4 * 1024; - // Use the returned alignment value to allocate - // aligned buffer for Write() when UseOSBuffer() - // returns false + // Use the returned alignment value to allocate aligned + // buffer for Write() when UseDirectIO() returns true virtual size_t GetRequiredBufferAlignment() const { return c_DefaultPageSize; } @@ -722,7 +710,7 @@ class RandomRWFile { virtual void EnableReadAhead() {} // Write bytes in `data` at offset `offset`, Returns Status::OK() on success. - // Pass aligned buffer when UseOSBuffer() returns false. + // Pass aligned buffer when UseDirectIO() returns true. virtual Status Write(uint64_t offset, const Slice& data) = 0; // Read up to `n` bytes starting from offset `offset` and store them in diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 7e6657793..c0ab3edda 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1100,26 +1100,6 @@ struct DBOptions { // large amounts of data (such as xfs's allocsize option). size_t manifest_preallocation_size; - // Hint the OS that it should not buffer disk I/O. Enabling this - // parameter may improve performance but increases pressure on the - // system cache. - // - // The exact behavior of this parameter is platform dependent. - // - // On POSIX systems, after RocksDB reads data from disk it will - // mark the pages as "unneeded". The operating system may - or may not - // - evict these pages from memory, reducing pressure on the system - // cache. If the disk block is requested again this can result in - // additional disk I/O. 
- // - // On WINDOWS system, files will be opened in "unbuffered I/O" mode - // which means that data read from the disk will not be cached or - // bufferized. The hardware buffer of the devices may however still - // be used. Memory mapped files are not impacted by this parameter. - // - // Default: true - bool allow_os_buffer; - // Allow the OS to mmap file for reading sst tables. Default: false bool allow_mmap_reads; @@ -1128,10 +1108,22 @@ struct DBOptions { // Default: false bool allow_mmap_writes; + // Enable direct I/O mode for read/write + // they may or may not improve performance depending on the use case + // + // Files will be opened in "direct I/O" mode + // which means that data r/w from the disk will not be cached or + // bufferized. The hardware buffer of the devices may however still + // be used. Memory mapped files are not impacted by these parameters. + // Use O_DIRECT for reading file // Default: false bool use_direct_reads; + // Use O_DIRECT for writing file + // Default: false + bool use_direct_writes; + // If false, fallocate() calls are bypassed bool allow_fallocate; diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index 9eb5968cf..cc1a65968 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -157,9 +157,9 @@ struct BlockBasedTableOptions { // a SstTable. Instead, buffer in WritableFileWriter will take // care of the flushing when it is full. // - // On Windows, this option helps a lot when unbuffered I/O - // (allow_os_buffer = false) is used, since it avoids small - // unbuffered disk write. + // This option helps a lot when direct I/O writes + // (use_direct_writes = true) is used, since it avoids small + // direct disk write. // // User may also adjust writable_file_max_buffer_size to optimize disk I/O // size. 
diff --git a/include/rocksdb/utilities/env_librados.h b/include/rocksdb/utilities/env_librados.h index 5c10ea7cc..bd3cdac67 100644 --- a/include/rocksdb/utilities/env_librados.h +++ b/include/rocksdb/utilities/env_librados.h @@ -1,5 +1,7 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. #ifndef ROCKSDB_UTILITIES_ENV_LIBRADOS_H #define ROCKSDB_UTILITIES_ENV_LIBRADOS_H @@ -15,17 +17,16 @@ namespace rocksdb { class LibradosWritableFile; class EnvLibrados : public EnvWrapper { -public: + public: // Create a brand new sequentially-readable file with the specified name. // On success, stores a pointer to the new file in *result and returns OK. // On failure stores nullptr in *result and returns non-OK. If the file does // not exist, returns a non-OK status. // // The returned file will only be accessed by one thread at a time. - Status NewSequentialFile( - const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options); + Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; // Create a brand new random access read-only file with the // specified name. On success, stores a pointer to the new file in @@ -34,10 +35,9 @@ public: // status. // // The returned file may be concurrently accessed by multiple threads. - Status NewRandomAccessFile( - const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options); + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; // Create an object that writes to a new file with the specified // name. 
Deletes any existing file with the same name and creates a @@ -46,17 +46,15 @@ public: // returns non-OK. // // The returned file will only be accessed by one thread at a time. - Status NewWritableFile( - const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options); + Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; // Reuse an existing file by renaming it and opening it as writable. - Status ReuseWritableFile( - const std::string& fname, - const std::string& old_fname, - std::unique_ptr* result, - const EnvOptions& options); + Status ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) override; // Create an object that represents a directory. Will fail if directory // doesn't exist. If the directory exists, it will open the directory @@ -65,47 +63,44 @@ public: // On success, stores a pointer to the new Directory in // *result and returns OK. On failure stores nullptr in *result and // returns non-OK. - Status NewDirectory( - const std::string& name, - std::unique_ptr* result); + Status NewDirectory(const std::string& name, + std::unique_ptr* result) override; // Returns OK if the named file exists. // NotFound if the named file does not exist, // the calling process does not have permission to determine // whether this file exists, or if the path is invalid. // IOError if an IO Error was encountered - Status FileExists(const std::string& fname); + Status FileExists(const std::string& fname) override; // Store in *result the names of the children of the specified directory. // The names are relative to "dir". // Original contents of *results are dropped. - Status GetChildren(const std::string& dir, - std::vector* result); + Status GetChildren(const std::string& dir, std::vector* result); // Delete the named file.
- Status DeleteFile(const std::string& fname); + Status DeleteFile(const std::string& fname) override; // Create the specified directory. Returns error if directory exists. - Status CreateDir(const std::string& dirname); + Status CreateDir(const std::string& dirname) override; // Creates directory if missing. Return Ok if it exists, or successful in // Creating. - Status CreateDirIfMissing(const std::string& dirname); + Status CreateDirIfMissing(const std::string& dirname) override; // Delete the specified directory. - Status DeleteDir(const std::string& dirname); + Status DeleteDir(const std::string& dirname) override; // Store the size of fname in *file_size. - Status GetFileSize(const std::string& fname, uint64_t* file_size); + Status GetFileSize(const std::string& fname, uint64_t* file_size) override; // Store the last modification time of fname in *file_mtime. Status GetFileModificationTime(const std::string& fname, - uint64_t* file_mtime); + uint64_t* file_mtime) override; // Rename file src to target. - Status RenameFile(const std::string& src, - const std::string& target); + Status RenameFile(const std::string& src, const std::string& target) override; // Hard Link file src to target. - Status LinkFile(const std::string& src, const std::string& target); + Status LinkFile(const std::string& src, const std::string& target) override; // Lock the specified file. Used to prevent concurrent access to // the same db by multiple processes. On failure, stores nullptr in @@ -129,8 +124,7 @@ public: Status UnlockFile(FileLock* lock); // Get full directory name for this db. 
- Status GetAbsolutePath(const std::string& db_path, - std::string* output_path); + Status GetAbsolutePath(const std::string& db_path, std::string* output_path); // Generate unique id std::string GenerateUniqueId(); @@ -142,31 +136,29 @@ public: const std::string& config_path, const std::string& db_pool); - explicit EnvLibrados(const std::string& client_name, // first 3 parameters are for RADOS client init - const std::string& cluster_name, - const uint64_t flags, - const std::string& db_name, - const std::string& config_path, - const std::string& db_pool, - const std::string& wal_dir, - const std::string& wal_pool, - const uint64_t write_buffer_size); - ~EnvLibrados() { - _rados.shutdown(); - } -private: + explicit EnvLibrados( + const std::string& client_name, // first 3 parameters are + // for RADOS client init + const std::string& cluster_name, const uint64_t flags, + const std::string& db_name, const std::string& config_path, + const std::string& db_pool, const std::string& wal_dir, + const std::string& wal_pool, const uint64_t write_buffer_size); + ~EnvLibrados() { _rados.shutdown(); } + + private: std::string _client_name; std::string _cluster_name; uint64_t _flags; - std::string _db_name; // get from user, readable string; Also used as db_id for db metadata + std::string _db_name; // get from user, readable string; Also used as db_id + // for db metadata std::string _config_path; - librados::Rados _rados; // RADOS client + librados::Rados _rados; // RADOS client std::string _db_pool_name; - librados::IoCtx _db_pool_ioctx; // IoCtx for connecting db_pool - std::string _wal_dir; // WAL dir path + librados::IoCtx _db_pool_ioctx; // IoCtx for connecting db_pool + std::string _wal_dir; // WAL dir path std::string _wal_pool_name; - librados::IoCtx _wal_pool_ioctx; // IoCtx for connecting wal_pool - uint64_t _write_buffer_size; // WritableFile buffer max size + librados::IoCtx _wal_pool_ioctx; // IoCtx for connecting wal_pool + uint64_t _write_buffer_size; // 
WritableFile buffer max size /* private function to communicate with rados */ std::string _CreateFid(); @@ -175,10 +167,8 @@ private: Status _RenameFid(const std::string& old_fname, const std::string& new_fname); Status _AddFid(const std::string& fname, const std::string& fid); Status _DelFid(const std::string& fname); - Status _GetSubFnames( - const std::string& dirname, - std::vector * result - ); + Status _GetSubFnames(const std::string& dirname, + std::vector* result); librados::IoCtx* _GetIoctx(const std::string& prefix); friend class LibradosWritableFile; }; diff --git a/include/rocksdb/utilities/env_mirror.h b/include/rocksdb/utilities/env_mirror.h index 091c92a0e..e0ead08a8 100644 --- a/include/rocksdb/utilities/env_mirror.h +++ b/include/rocksdb/utilities/env_mirror.h @@ -15,10 +15,9 @@ // semantics and behavior are correct (in that they match that of an // existing, stable Env, like the default POSIX one). -#ifndef ROCKSDB_LITE +#pragma once -#ifndef STORAGE_ROCKSDB_INCLUDE_UTILITIES_ENVMIRROR_H_ -#define STORAGE_ROCKSDB_INCLUDE_UTLIITIES_ENVMIRROR_H_ +#ifndef ROCKSDB_LITE #include #include @@ -174,6 +173,4 @@ class EnvMirror : public EnvWrapper { } // namespace rocksdb -#endif // STORAGE_ROCKSDB_INCLUDE_UTILITIES_ENVMIRROR_H_ - #endif // ROCKSDB_LITE diff --git a/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java b/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java index 9c3f97a33..926fb7432 100644 --- a/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java +++ b/java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java @@ -1417,12 +1417,18 @@ public class DbBenchmark { } }, /* TODO(yhchiang): enable the following - bufferedio(rocksdb::EnvOptions().use_os_buffer, - "Allow buffered io using OS buffers.") { + direct_reads(rocksdb::EnvOptions().use_direct_reads, + "Allow direct I/O reads.") { @Override public Object parseValue(String value) { return parseBoolean(value); } - }, + }, + 
direct_writes(rocksdb::EnvOptions().use_direct_writes, + "Allow direct I/O writes.") { + @Override public Object parseValue(String value) { + return parseBoolean(value); + } + }, */ mmap_read(false, "Allow reads to occur via mmap-ing files.") { diff --git a/java/rocksjni/env_options.cc b/java/rocksjni/env_options.cc index 0618ad33d..cc2dc6f1d 100644 --- a/java/rocksjni/env_options.cc +++ b/java/rocksjni/env_options.cc @@ -49,23 +49,43 @@ void Java_org_rocksdb_EnvOptions_disposeInternal(JNIEnv *env, jobject jobj, /* * Class: org_rocksdb_EnvOptions - * Method: setUseOsBuffer + * Method: setUseDirectReads * Signature: (JZ)V */ -void Java_org_rocksdb_EnvOptions_setUseOsBuffer(JNIEnv *env, jobject jobj, - jlong jhandle, - jboolean use_os_buffer) { - ENV_OPTIONS_SET_BOOL(jhandle, use_os_buffer); +void Java_org_rocksdb_EnvOptions_setUseDirectReads(JNIEnv *env, jobject jobj, + jlong jhandle, + jboolean use_direct_reads) { + ENV_OPTIONS_SET_BOOL(jhandle, use_direct_reads); } /* * Class: org_rocksdb_EnvOptions - * Method: useOsBuffer + * Method: useDirectReads * Signature: (J)Z */ -jboolean Java_org_rocksdb_EnvOptions_useOsBuffer(JNIEnv *env, jobject jobj, - jlong jhandle) { - return ENV_OPTIONS_GET(jhandle, use_os_buffer); +jboolean Java_org_rocksdb_EnvOptions_useDirectReads(JNIEnv *env, jobject jobj, + jlong jhandle) { + return ENV_OPTIONS_GET(jhandle, use_direct_reads); +} + +/* + * Class: org_rocksdb_EnvOptions + * Method: setUseDirectWrites + * Signature: (JZ)V + */ +void Java_org_rocksdb_EnvOptions_setUseDirectWrites( + JNIEnv *env, jobject jobj, jlong jhandle, jboolean use_direct_writes) { + ENV_OPTIONS_SET_BOOL(jhandle, use_direct_writes); +} + +/* + * Class: org_rocksdb_EnvOptions + * Method: useDirectWrites + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_EnvOptions_useDirectWrites(JNIEnv *env, jobject jobj, + jlong jhandle) { + return ENV_OPTIONS_GET(jhandle, use_direct_writes); } /* @@ -110,47 +130,6 @@ jboolean Java_org_rocksdb_EnvOptions_useMmapWrites(JNIEnv
*env, jobject jobj, return ENV_OPTIONS_GET(jhandle, use_mmap_writes); } -/* - * Class: org_rocksdb_EnvOptions - * Method: setUseDirectReads - * Signature: (JZ)V - */ -void Java_org_rocksdb_EnvOptions_setUseDirectReads(JNIEnv *env, jobject jobj, - jlong jhandle, - jboolean use_direct_reads) { - ENV_OPTIONS_SET_BOOL(jhandle, use_direct_reads); -} - -/* - * Class: org_rocksdb_EnvOptions - * Method: useDirectReads - * Signature: (J)Z - */ -jboolean Java_org_rocksdb_EnvOptions_useDirectReads(JNIEnv *env, jobject jobj, - jlong jhandle) { - return ENV_OPTIONS_GET(jhandle, use_direct_reads); -} - -/* - * Class: org_rocksdb_EnvOptions - * Method: setUseDirectWrites - * Signature: (JZ)V - */ -void Java_org_rocksdb_EnvOptions_setUseDirectWrites( - JNIEnv *env, jobject jobj, jlong jhandle, jboolean use_direct_writes) { - ENV_OPTIONS_SET_BOOL(jhandle, use_direct_writes); -} - -/* - * Class: org_rocksdb_EnvOptions - * Method: useDirectWrites - * Signature: (J)Z - */ -jboolean Java_org_rocksdb_EnvOptions_useDirectWrites(JNIEnv *env, jobject jobj, - jlong jhandle) { - return ENV_OPTIONS_GET(jhandle, use_direct_writes); -} - /* * Class: org_rocksdb_EnvOptions * Method: setAllowFallocate diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 2722dfc26..9ba02cfef 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -870,27 +870,6 @@ void Java_org_rocksdb_Options_setManifestPreallocationSize( } } -/* - * Class: org_rocksdb_Options - * Method: allowOsBuffer - * Signature: (J)Z - */ -jboolean Java_org_rocksdb_Options_allowOsBuffer( - JNIEnv* env, jobject jobj, jlong jhandle) { - return reinterpret_cast(jhandle)->allow_os_buffer; -} - -/* - * Class: org_rocksdb_Options - * Method: setAllowOsBuffer - * Signature: (JZ)V - */ -void Java_org_rocksdb_Options_setAllowOsBuffer( - JNIEnv* env, jobject jobj, jlong jhandle, jboolean allow_os_buffer) { - reinterpret_cast(jhandle)->allow_os_buffer = - static_cast(allow_os_buffer); -} - /* * Method: setTableFactory 
* Signature: (JJ)V @@ -943,6 +922,50 @@ void Java_org_rocksdb_Options_setAllowMmapWrites( static_cast(allow_mmap_writes); } +/* + * Class: org_rocksdb_Options + * Method: useDirectReads + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_Options_useDirectReads(JNIEnv* env, jobject jobj, + jlong jhandle) { + return reinterpret_cast(jhandle)->use_direct_reads; +} + +/* + * Class: org_rocksdb_Options + * Method: setUseDirectReads + * Signature: (JZ)V + */ +void Java_org_rocksdb_Options_setUseDirectReads(JNIEnv* env, jobject jobj, + jlong jhandle, + jboolean use_direct_reads) { + reinterpret_cast(jhandle)->use_direct_reads = + static_cast(use_direct_reads); +} + +/* + * Class: org_rocksdb_Options + * Method: useDirectWrites + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_Options_useDirectWrites(JNIEnv* env, jobject jobj, + jlong jhandle) { + return reinterpret_cast(jhandle)->use_direct_writes; +} + +/* + * Class: org_rocksdb_Options + * Method: setUseDirectWrites + * Signature: (JZ)V + */ +void Java_org_rocksdb_Options_setUseDirectWrites(JNIEnv* env, jobject jobj, + jlong jhandle, + jboolean use_direct_writes) { + reinterpret_cast(jhandle)->use_direct_writes = + static_cast(use_direct_writes); +} + /* * Class: org_rocksdb_Options * Method: isFdCloseOnExec @@ -4144,23 +4167,46 @@ jlong Java_org_rocksdb_DBOptions_manifestPreallocationSize( /* * Class: org_rocksdb_DBOptions - * Method: setAllowOsBuffer - * Signature: (JZ)V + * Method: useDirectReads + * Signature: (J)Z */ -void Java_org_rocksdb_DBOptions_setAllowOsBuffer( - JNIEnv* env, jobject jobj, jlong jhandle, jboolean allow_os_buffer) { - reinterpret_cast(jhandle)->allow_os_buffer = - static_cast(allow_os_buffer); +jboolean Java_org_rocksdb_DBOptions_useDirectReads(JNIEnv* env, jobject jobj, + jlong jhandle) { + return reinterpret_cast(jhandle)->use_direct_reads; } /* * Class: org_rocksdb_DBOptions - * Method: allowOsBuffer + * Method: setUseDirectReads + * Signature: (JZ)V + */ +void
Java_org_rocksdb_DBOptions_setUseDirectReads(JNIEnv* env, jobject jobj, + jlong jhandle, + jboolean use_direct_reads) { + reinterpret_cast(jhandle)->use_direct_reads = + static_cast(use_direct_reads); +} + +/* + * Class: org_rocksdb_DBOptions + * Method: useDirectWrites * Signature: (J)Z */ -jboolean Java_org_rocksdb_DBOptions_allowOsBuffer( - JNIEnv* env, jobject jobj, jlong jhandle) { - return reinterpret_cast(jhandle)->allow_os_buffer; +jboolean Java_org_rocksdb_DBOptions_useDirectWrites(JNIEnv* env, jobject jobj, + jlong jhandle) { + return reinterpret_cast(jhandle)->use_direct_writes; +} + +/* + * Class: org_rocksdb_DBOptions + * Method: setUseDirectWrites + * Signature: (JZ)V + */ +void Java_org_rocksdb_DBOptions_setUseDirectWrites(JNIEnv* env, jobject jobj, + jlong jhandle, + jboolean use_direct_writes) { + reinterpret_cast(jhandle)->use_direct_writes = + static_cast(use_direct_writes); } /* diff --git a/java/src/main/java/org/rocksdb/DBOptions.java b/java/src/main/java/org/rocksdb/DBOptions.java index f8ea1c1b5..411edd7e4 100644 --- a/java/src/main/java/org/rocksdb/DBOptions.java +++ b/java/src/main/java/org/rocksdb/DBOptions.java @@ -457,17 +457,31 @@ public class DBOptions extends RocksObject implements DBOptionsInterface { } @Override - public DBOptions setAllowOsBuffer( - final boolean allowOsBuffer) { + public DBOptions setUseDirectReads( + final boolean useDirectReads) { assert(isOwningHandle()); - setAllowOsBuffer(nativeHandle_, allowOsBuffer); + setUseDirectReads(nativeHandle_, useDirectReads); return this; } @Override - public boolean allowOsBuffer() { + public boolean useDirectReads() { assert(isOwningHandle()); - return allowOsBuffer(nativeHandle_); + return useDirectReads(nativeHandle_); + } + + @Override + public DBOptions setUseDirectWrites( + final boolean useDirectWrites) { + assert(isOwningHandle()); + setUseDirectWrites(nativeHandle_, useDirectWrites); + return this; + } + + @Override + public boolean useDirectWrites() { +
assert(isOwningHandle()); + return useDirectWrites(nativeHandle_); } @Override @@ -710,9 +724,10 @@ public long delayedWriteRate(){ private native void setManifestPreallocationSize( long handle, long size) throws IllegalArgumentException; private native long manifestPreallocationSize(long handle); - private native void setAllowOsBuffer( - long handle, boolean allowOsBuffer); - private native boolean allowOsBuffer(long handle); + private native void setUseDirectReads(long handle, boolean useDirectReads); + private native boolean useDirectReads(long handle); + private native void setUseDirectWrites(long handle, boolean useDirectWrites); + private native boolean useDirectWrites(long handle); private native void setAllowMmapReads( long handle, boolean allowMmapReads); private native boolean allowMmapReads(long handle); diff --git a/java/src/main/java/org/rocksdb/DBOptionsInterface.java b/java/src/main/java/org/rocksdb/DBOptionsInterface.java index b93761d64..95f5db4b1 100644 --- a/java/src/main/java/org/rocksdb/DBOptionsInterface.java +++ b/java/src/main/java/org/rocksdb/DBOptionsInterface.java @@ -673,21 +673,38 @@ public interface DBOptionsInterface { long manifestPreallocationSize(); /** - * Data being read from file storage may be buffered in the OS - * Default: true + * Enable the OS to use direct I/O for reading sst tables. + * Default: false * - * @param allowOsBuffer if true, then OS buffering is allowed. + * @param useDirectReads if true, then direct read is enabled * @return the instance of the current Object. */ - Object setAllowOsBuffer(boolean allowOsBuffer); + Object setUseDirectReads(boolean useDirectReads); /** - * Data being read from file storage may be buffered in the OS - * Default: true + * Enable the OS to use direct I/O for reading sst tables. + * Default: false * - * @return if true, then OS buffering is allowed. 
+ * @return if true, then direct reads are enabled */ - boolean allowOsBuffer(); + boolean useDirectReads(); + + /** + * Enable the OS to use direct I/O for writing sst tables. + * Default: false + * + * @param useDirectWrites if true, then direct write is enabled + * @return the instance of the current Object. + */ + Object setUseDirectWrites(boolean useDirectWrites); + + /** + * Enable the OS to use direct I/O for writing sst tables. + * Default: false + * + * @return if true, then direct writes are enabled + */ + boolean useDirectWrites(); /** * Allow the OS to mmap file for reading sst tables. diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index 827938c72..22b5d244a 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -530,18 +530,32 @@ public class Options extends RocksObject } @Override - public boolean allowOsBuffer() { + public Options setUseDirectReads(final boolean useDirectReads) { assert(isOwningHandle()); - return allowOsBuffer(nativeHandle_); + setUseDirectReads(nativeHandle_, useDirectReads); + return this; } @Override - public Options setAllowOsBuffer(final boolean allowOsBuffer) { + public boolean useDirectReads() { assert(isOwningHandle()); - setAllowOsBuffer(nativeHandle_, allowOsBuffer); + return useDirectReads(nativeHandle_); + } + + @Override + public Options setUseDirectWrites(final boolean useDirectWrites) { + assert(isOwningHandle()); + setUseDirectWrites(nativeHandle_, useDirectWrites); return this; } + @Override + public boolean useDirectWrites() { + assert(isOwningHandle()); + return useDirectWrites(nativeHandle_); + } + + @Override public boolean allowMmapReads() { assert(isOwningHandle()); @@ -1289,9 +1303,10 @@ public class Options extends RocksObject private native void setManifestPreallocationSize( long handle, long size) throws IllegalArgumentException; private native long manifestPreallocationSize(long handle); - 
private native void setAllowOsBuffer( - long handle, boolean allowOsBuffer); - private native boolean allowOsBuffer(long handle); + private native void setUseDirectReads(long handle, boolean useDirectReads); + private native boolean useDirectReads(long handle); + private native void setUseDirectWrites(long handle, boolean useDirectWrites); + private native boolean useDirectWrites(long handle); private native void setAllowMmapReads( long handle, boolean allowMmapReads); private native boolean allowMmapReads(long handle); diff --git a/java/src/test/java/org/rocksdb/DBOptionsTest.java b/java/src/test/java/org/rocksdb/DBOptionsTest.java index edee5184d..c1d908ea6 100644 --- a/java/src/test/java/org/rocksdb/DBOptionsTest.java +++ b/java/src/test/java/org/rocksdb/DBOptionsTest.java @@ -281,11 +281,20 @@ public class DBOptionsTest { } @Test - public void allowOsBuffer() { + public void useDirectReads() { try(final DBOptions opt = new DBOptions()) { final boolean boolValue = rand.nextBoolean(); - opt.setAllowOsBuffer(boolValue); - assertThat(opt.allowOsBuffer()).isEqualTo(boolValue); + opt.setUseDirectReads(boolValue); + assertThat(opt.useDirectReads()).isEqualTo(boolValue); + } + } + + @Test + public void useDirectWrites() { + try(final DBOptions opt = new DBOptions()) { + final boolean boolValue = rand.nextBoolean(); + opt.setUseDirectWrites(boolValue); + assertThat(opt.useDirectWrites()).isEqualTo(boolValue); } } diff --git a/java/src/test/java/org/rocksdb/EnvOptionsTest.java b/java/src/test/java/org/rocksdb/EnvOptionsTest.java index acc73998d..35dbc902e 100644 --- a/java/src/test/java/org/rocksdb/EnvOptionsTest.java +++ b/java/src/test/java/org/rocksdb/EnvOptionsTest.java @@ -18,15 +18,6 @@ public class EnvOptionsTest { public static final Random rand = PlatformRandomHelper.getPlatformSpecificRandomFactory(); - @Test - public void useOsBuffer() { - try (final EnvOptions envOptions = new EnvOptions()) { - final boolean boolValue = rand.nextBoolean(); - 
envOptions.setUseOsBuffer(boolValue); - assertThat(envOptions.useOsBuffer()).isEqualTo(boolValue); - } - } - @Test public void useMmapReads() { try (final EnvOptions envOptions = new EnvOptions()) { diff --git a/java/src/test/java/org/rocksdb/OptionsTest.java b/java/src/test/java/org/rocksdb/OptionsTest.java index 00f59fe0e..be04598b1 100644 --- a/java/src/test/java/org/rocksdb/OptionsTest.java +++ b/java/src/test/java/org/rocksdb/OptionsTest.java @@ -572,11 +572,20 @@ public class OptionsTest { } @Test - public void allowOsBuffer() { - try (final Options opt = new Options()) { + public void useDirectReads() { + try(final Options opt = new Options()) { final boolean boolValue = rand.nextBoolean(); - opt.setAllowOsBuffer(boolValue); - assertThat(opt.allowOsBuffer()).isEqualTo(boolValue); + opt.setUseDirectReads(boolValue); + assertThat(opt.useDirectReads()).isEqualTo(boolValue); + } + } + + @Test + public void useDirectWrites() { + try(final Options opt = new Options()) { + final boolean boolValue = rand.nextBoolean(); + opt.setUseDirectWrites(boolValue); + assertThat(opt.useDirectWrites()).isEqualTo(boolValue); } } diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 134fabf3b..fc0037702 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -7,9 +7,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include "port/win/env_win.h" #include -#include #include +#include #include #include // _getpid @@ -25,7 +26,6 @@ #include "port/dirent.h" #include "port/win/win_logger.h" #include "port/win/io_win.h" -#include "port/win/env_win.h" #include "util/iostats_context_imp.h" @@ -148,7 +148,7 @@ Status WinEnvIO::NewRandomAccessFile(const std::string& fname, // Random access is to disable read-ahead as the system reads too much data DWORD fileFlags = FILE_ATTRIBUTE_READONLY; - if (!options.use_os_buffer && !options.use_mmap_reads) { + if (options.use_direct_reads && !options.use_mmap_reads) { fileFlags |= FILE_FLAG_NO_BUFFERING; } else { fileFlags |= FILE_FLAG_RANDOM_ACCESS; @@ -229,8 +229,8 @@ Status WinEnvIO::NewRandomAccessFile(const std::string& fname, } Status WinEnvIO::NewWritableFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options) { + std::unique_ptr* result, + const EnvOptions& options) { const size_t c_BufferCapacity = 64 * 1024; EnvOptions local_options(options); @@ -240,7 +240,7 @@ Status WinEnvIO::NewWritableFile(const std::string& fname, DWORD fileFlags = FILE_ATTRIBUTE_NORMAL; - if (!local_options.use_os_buffer && !local_options.use_mmap_writes) { + if (local_options.use_direct_writes && !local_options.use_mmap_writes) { fileFlags = FILE_FLAG_NO_BUFFERING; } @@ -305,7 +305,7 @@ Status WinEnvIO::NewRandomRWFile(const std::string & fname, DWORD creation_disposition = OPEN_ALWAYS; // Create if necessary or open existing DWORD file_flags = FILE_FLAG_RANDOM_ACCESS; - if (!options.use_os_buffer) { + if (options.use_direct_reads && options.use_direct_writes) { file_flags |= FILE_FLAG_NO_BUFFERING; } @@ -744,11 +744,11 @@ std::string WinEnvIO::TimeToString(uint64_t secondsSince1970) { EnvOptions WinEnvIO::OptimizeForLogWrite(const EnvOptions& env_options, const DBOptions& db_options) const { EnvOptions optimized = env_options; - optimized.use_mmap_writes = false; optimized.bytes_per_sync = db_options.wal_bytes_per_sync; - 
optimized.use_os_buffer = - true; // This is because we flush only whole pages on unbuffered io and + optimized.use_mmap_writes = false; + // This is because we flush only whole pages on unbuffered io and // the last records are not guaranteed to be flushed. + optimized.use_direct_writes = false; // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit // test and make this false @@ -760,7 +760,7 @@ EnvOptions WinEnvIO::OptimizeForManifestWrite( const EnvOptions& env_options) const { EnvOptions optimized = env_options; optimized.use_mmap_writes = false; - optimized.use_os_buffer = true; + optimized.use_direct_writes = false; optimized.fallocate_with_keep_size = true; return optimized; } @@ -914,8 +914,8 @@ Status WinEnv::NewRandomAccessFile(const std::string& fname, } Status WinEnv::NewWritableFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options) { + std::unique_ptr* result, + const EnvOptions& options) { return winenv_io_.NewWritableFile(fname, result, options); } diff --git a/port/win/env_win.h b/port/win/env_win.h index fe890d48a..5a6224865 100644 --- a/port/win/env_win.h +++ b/port/win/env_win.h @@ -89,8 +89,8 @@ public: const EnvOptions& options); virtual Status NewWritableFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options); + std::unique_ptr* result, + const EnvOptions& options); // The returned file will only be accessed by one thread at a time. virtual Status NewRandomRWFile(const std::string& fname, @@ -190,8 +190,8 @@ public: const EnvOptions& options) override; Status NewWritableFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options) override; + std::unique_ptr* result, + const EnvOptions& options) override; // The returned file will only be accessed by one thread at a time. 
Status NewRandomRWFile(const std::string& fname, diff --git a/port/win/io_win.cc b/port/win/io_win.cc index 43d3ed614..fddbafc77 100644 --- a/port/win/io_win.cc +++ b/port/win/io_win.cc @@ -12,7 +12,6 @@ #include "util/sync_point.h" #include "util/coding.h" #include "util/iostats_context_imp.h" -#include "util/sync_point.h" #include "util/aligned_buffer.h" @@ -158,12 +157,14 @@ size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size) { //////////////////////////////////////////////////////////////////////////////////////////////////// // WinMmapReadableFile -WinMmapReadableFile::WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap, - const void* mapped_region, size_t length) - : WinFileData(fileName, hFile, false), - hMap_(hMap), - mapped_region_(mapped_region), - length_(length) {} +WinMmapReadableFile::WinMmapReadableFile(const std::string& fileName, + HANDLE hFile, HANDLE hMap, + const void* mapped_region, + size_t length) + : WinFileData(fileName, hFile, false /* use_direct_io */), + hMap_(hMap), + mapped_region_(mapped_region), + length_(length) {} WinMmapReadableFile::~WinMmapReadableFile() { BOOL ret = ::UnmapViewOfFile(mapped_region_); @@ -521,9 +522,8 @@ size_t WinMmapFile::GetUniqueId(char* id, size_t max_size) const { // WinSequentialFile WinSequentialFile::WinSequentialFile(const std::string& fname, HANDLE f, - const EnvOptions& options) - : WinFileData(fname, f, options.use_os_buffer) -{} + const EnvOptions& options) + : WinFileData(fname, f, options.use_direct_reads) {} WinSequentialFile::~WinSequentialFile() { assert(hFile_ != INVALID_HANDLE_VALUE); @@ -661,8 +661,8 @@ WinRandomAccessImpl::WinRandomAccessImpl(WinFileData* file_base, assert(!options.use_mmap_reads); - // Unbuffered access, use internal buffer for reads - if (!file_base_->UseOSBuffer()) { + // Direct access, use internal buffer for reads + if (file_base_->UseDirectIO()) { // Do not allocate the buffer either until the first request or // until there is 
a call to allocate a read-ahead buffer buffer_.Alignment(alignment); @@ -683,11 +683,10 @@ Status WinRandomAccessImpl::ReadImpl(uint64_t offset, size_t n, Slice* result, return s; } - // When in unbuffered mode we need to do the following changes: + // When in direct I/O mode we need to do the following changes: // - use our own aligned buffer // - always read at the offset of that is a multiple of alignment - if (!file_base_->UseOSBuffer()) { - + if (file_base_->UseDirectIO()) { uint64_t first_page_start = 0; size_t actual_bytes_toread = 0; size_t bytes_requested = left; @@ -778,10 +777,8 @@ Status WinRandomAccessImpl::ReadImpl(uint64_t offset, size_t n, Slice* result, inline void WinRandomAccessImpl::HintImpl(RandomAccessFile::AccessPattern pattern) { - - if (pattern == RandomAccessFile::SEQUENTIAL && - !file_base_->UseOSBuffer() && - compaction_readahead_size_ > 0) { + if (pattern == RandomAccessFile::SEQUENTIAL && file_base_->UseDirectIO() && + compaction_readahead_size_ > 0) { std::lock_guard lg(buffer_mut_); if (!read_ahead_) { read_ahead_ = true; @@ -798,11 +795,11 @@ void WinRandomAccessImpl::HintImpl(RandomAccessFile::AccessPattern pattern) { /////////////////////////////////////////////////////////////////////////////////////////////////// /// WinRandomAccessFile -WinRandomAccessFile::WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, - const EnvOptions& options) : - WinFileData(fname, hFile, options.use_os_buffer), - WinRandomAccessImpl(this, alignment, options) { -} +WinRandomAccessFile::WinRandomAccessFile(const std::string& fname, HANDLE hFile, + size_t alignment, + const EnvOptions& options) + : WinFileData(fname, hFile, options.use_direct_reads), + WinRandomAccessImpl(this, alignment, options) {} WinRandomAccessFile::~WinRandomAccessFile() { } @@ -851,7 +848,7 @@ WinWritableImpl::WinWritableImpl(WinFileData* file_data, size_t alignment) Status WinWritableImpl::AppendImpl(const Slice& data) { // Used for buffered access 
ONLY - assert(file_data_->UseOSBuffer()); + assert(!file_data_->UseDirectIO()); assert(data.size() < std::numeric_limits::max()); Status s; @@ -885,7 +882,7 @@ Status WinWritableImpl::PositionedAppendImpl(const Slice& data, uint64_t offset) } else { assert(size_t(ret) == data.size()); - // For sequential write this would be simple + // For sequential write this would be simple // size extension by data.size() uint64_t write_end = offset + data.size(); if (write_end >= filesize_) { @@ -934,9 +931,8 @@ Status WinWritableImpl::SyncImpl() { // Calls flush buffers if (fsync(file_data_->GetFileHandle()) < 0) { auto lastError = GetLastError(); - s = IOErrorFromWindowsError("fsync failed at Sync() for: " + - file_data_->GetName(), - lastError); + s = IOErrorFromWindowsError( + "fsync failed at Sync() for: " + file_data_->GetName(), lastError); } return s; } @@ -967,21 +963,19 @@ Status WinWritableImpl::AllocateImpl(uint64_t offset, uint64_t len) { //////////////////////////////////////////////////////////////////////////////// /// WinWritableFile -WinWritableFile::WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, - size_t /* capacity */, const EnvOptions& options) - : WinFileData(fname, hFile, options.use_os_buffer), - WinWritableImpl(this, alignment) { - +WinWritableFile::WinWritableFile(const std::string& fname, HANDLE hFile, + size_t alignment, size_t /* capacity */, + const EnvOptions& options) + : WinFileData(fname, hFile, options.use_direct_writes), + WinWritableImpl(this, alignment) { assert(!options.use_mmap_writes); } WinWritableFile::~WinWritableFile() { } - // Indicates if the class makes use of unbuffered I/O -bool WinWritableFile::UseOSBuffer() const { - return WinFileData::UseOSBuffer(); -} +// Indicates if the class makes use of direct I/O +bool WinWritableFile::UseDirectIO() const { return WinFileData::UseDirectIO(); } size_t WinWritableFile::GetRequiredBufferAlignment() const { return GetAlignement(); @@ -1015,9 +1009,7 @@ Status 
WinWritableFile::Sync() { return SyncImpl(); } -Status WinWritableFile::Fsync() { - return SyncImpl(); -} +Status WinWritableFile::Fsync() { return SyncImpl(); } uint64_t WinWritableFile::GetFileSize() { return GetFileSizeImpl(); @@ -1034,17 +1026,14 @@ size_t WinWritableFile::GetUniqueId(char* id, size_t max_size) const { ///////////////////////////////////////////////////////////////////////// /// WinRandomRWFile -WinRandomRWFile::WinRandomRWFile(const std::string& fname, HANDLE hFile, size_t alignment, - const EnvOptions& options) : - WinFileData(fname, hFile, options.use_os_buffer), - WinRandomAccessImpl(this, alignment, options), - WinWritableImpl(this, alignment) { +WinRandomRWFile::WinRandomRWFile(const std::string& fname, HANDLE hFile, + size_t alignment, const EnvOptions& options) + : WinFileData(fname, hFile, + options.use_direct_reads && options.use_direct_writes), + WinRandomAccessImpl(this, alignment, options), + WinWritableImpl(this, alignment) {} -} - -bool WinRandomRWFile::UseOSBuffer() const { - return WinFileData::UseOSBuffer(); -} +bool WinRandomRWFile::UseDirectIO() const { return WinFileData::UseDirectIO(); } size_t WinRandomRWFile::GetRequiredBufferAlignment() const { return GetAlignement(); @@ -1062,8 +1051,8 @@ Status WinRandomRWFile::Write(uint64_t offset, const Slice & data) { return PositionedAppendImpl(data, offset); } -Status WinRandomRWFile::Read(uint64_t offset, size_t n, Slice * result, - char * scratch) const { +Status WinRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { return ReadImpl(offset, n, result, scratch); } @@ -1094,4 +1083,3 @@ WinFileLock::~WinFileLock() { } } - diff --git a/port/win/io_win.h b/port/win/io_win.h index 6907aeef1..581370200 100644 --- a/port/win/io_win.h +++ b/port/win/io_win.h @@ -8,17 +8,16 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#pragma once -#include -#include - -#include "util/aligned_buffer.h" - -#include #include +#include +#include + +#include "rocksdb/Status.h" +#include "rocksdb/env.h" +#include "util/aligned_buffer.h" #include -#include namespace rocksdb { namespace port { @@ -26,9 +25,9 @@ namespace port { std::string GetWindowsErrSz(DWORD err); inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) { - return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL)) ? - Status::NoSpace(context, GetWindowsErrSz(err)) : - Status::IOError(context, GetWindowsErrSz(err)); + return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL)) + ? Status::NoSpace(context, GetWindowsErrSz(err)) + : Status::IOError(context, GetWindowsErrSz(err)); } inline Status IOErrorFromLastWindowsError(const std::string& context) { @@ -36,9 +35,9 @@ inline Status IOErrorFromLastWindowsError(const std::string& context) { } inline Status IOError(const std::string& context, int err_number) { - return (err_number == ENOSPC) ? - Status::NoSpace(context, strerror(err_number)) : - Status::IOError(context, strerror(err_number)); + return (err_number == ENOSPC) + ? 
Status::NoSpace(context, strerror(err_number)) + : Status::IOError(context, strerror(err_number)); } // Note the below two do not set errno because they are used only here in this @@ -54,49 +53,34 @@ inline int fsync(HANDLE hFile) { return 0; } -SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes, - uint64_t offset); +SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes, uint64_t offset); SSIZE_T pread(HANDLE hFile, char* src, size_t numBytes, uint64_t offset); -Status fallocate(const std::string& filename, HANDLE hFile, - uint64_t to_size); - -Status ftruncate(const std::string& filename, HANDLE hFile, - uint64_t toSize); +Status fallocate(const std::string& filename, HANDLE hFile, uint64_t to_size); +Status ftruncate(const std::string& filename, HANDLE hFile, uint64_t toSize); size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size); class WinFileData { -protected: - + protected: const std::string filename_; HANDLE hFile_; - // There is no equivalent of advising away buffered pages as in posix. - // To implement this flag we would need to do unbuffered reads which + // If true, the I/O issued would be direct I/O which the buffer // will need to be aligned (not sure there is a guarantee that the buffer // passed in is aligned). - // Hence we currently ignore this flag. It is used only in a few cases - // which should not be perf critical. - // If perf evaluation finds this to be a problem, we can look into - // implementing this. 
- const bool use_os_buffer_; - -public: + const bool use_direct_io_; + public: // We want this class be usable both for inheritance (prive // or protected) and for containment so __ctor and __dtor public - WinFileData(const std::string& filename, HANDLE hFile, bool use_os_buffer) : - filename_(filename), hFile_(hFile), use_os_buffer_(use_os_buffer) - {} + WinFileData(const std::string& filename, HANDLE hFile, bool use_direct_io) + : filename_(filename), hFile_(hFile), use_direct_io_(use_direct_io) {} - virtual ~WinFileData() { - this->CloseFile(); - } + virtual ~WinFileData() { this->CloseFile(); } bool CloseFile() { - bool result = true; if (hFile_ != NULL && hFile_ != INVALID_HANDLE_VALUE) { @@ -111,13 +95,12 @@ public: HANDLE GetFileHandle() const { return hFile_; } - bool UseOSBuffer() const { return use_os_buffer_; } + bool UseDirectIO() const { return use_direct_io_; } WinFileData(const WinFileData&) = delete; WinFileData& operator=(const WinFileData&) = delete; }; - // mmap() based random-access class WinMmapReadableFile : private WinFileData, public RandomAccessFile { HANDLE hMap_; @@ -125,10 +108,10 @@ class WinMmapReadableFile : private WinFileData, public RandomAccessFile { const void* mapped_region_; const size_t length_; -public: + public: // mapped_region_[0,length-1] contains the mmapped contents of the file. WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap, - const void* mapped_region, size_t length); + const void* mapped_region, size_t length); ~WinMmapReadableFile(); @@ -136,7 +119,7 @@ public: WinMmapReadableFile& operator=(const WinMmapReadableFile&) = delete; virtual Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override; + char* scratch) const override; virtual Status InvalidateCache(size_t offset, size_t length) override; @@ -148,20 +131,20 @@ public: // file before reading from it, or for log files, the reading code // knows enough to skip zero suffixes. 
class WinMmapFile : private WinFileData, public WritableFile { -private: + private: HANDLE hMap_; const size_t page_size_; // We flush the mapping view in page_size // increments. We may decide if this is a memory // page size or SSD page size const size_t - allocation_granularity_; // View must start at such a granularity + allocation_granularity_; // View must start at such a granularity - size_t reserved_size_; // Preallocated size + size_t reserved_size_; // Preallocated size - size_t mapping_size_; // The max size of the mapping object + size_t mapping_size_; // The max size of the mapping object // we want to guess the final file size to minimize the remapping - size_t view_size_; // How much memory to map into a view at a time + size_t view_size_; // How much memory to map into a view at a time char* mapped_begin_; // Must begin at the file offset that is aligned with // allocation_granularity_ @@ -184,10 +167,9 @@ private: virtual Status PreallocateInternal(uint64_t spaceToReserve); -public: - + public: WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size, - size_t allocation_granularity, const EnvOptions& options); + size_t allocation_granularity, const EnvOptions& options); ~WinMmapFile(); @@ -227,9 +209,9 @@ public: }; class WinSequentialFile : private WinFileData, public SequentialFile { -public: + public: WinSequentialFile(const std::string& fname, HANDLE f, - const EnvOptions& options); + const EnvOptions& options); ~WinSequentialFile(); @@ -244,89 +226,87 @@ public: }; class WinRandomAccessImpl { -protected: - + protected: WinFileData* file_base_; - bool read_ahead_; + bool read_ahead_; const size_t compaction_readahead_size_; const size_t random_access_max_buffer_size_; - mutable std::mutex buffer_mut_; + mutable std::mutex buffer_mut_; mutable AlignedBuffer buffer_; mutable uint64_t - buffered_start_; // file offset set that is currently buffered + buffered_start_; // file offset set that is currently buffered // Override for 
behavior change when creating a custom env virtual SSIZE_T PositionedReadInternal(char* src, size_t numBytes, - uint64_t offset) const; + uint64_t offset) const; - /* - * The function reads a requested amount of bytes into the specified aligned - * buffer Upon success the function sets the length of the buffer to the - * amount of bytes actually read even though it might be less than actually - * requested. It then copies the amount of bytes requested by the user (left) - * to the user supplied buffer (dest) and reduces left by the amount of bytes - * copied to the user buffer - * - * @user_offset [in] - offset on disk where the read was requested by the user - * @first_page_start [in] - actual page aligned disk offset that we want to - * read from - * @bytes_to_read [in] - total amount of bytes that will be read from disk - * which is generally greater or equal to the amount - * that the user has requested due to the - * either alignment requirements or read_ahead in - * effect. - * @left [in/out] total amount of bytes that needs to be copied to the user - * buffer. It is reduced by the amount of bytes that actually - * copied - * @buffer - buffer to use - * @dest - user supplied buffer - */ + /* + * The function reads a requested amount of bytes into the specified aligned + * buffer Upon success the function sets the length of the buffer to the + * amount of bytes actually read even though it might be less than actually + * requested. 
It then copies the amount of bytes requested by the user (left) + * to the user supplied buffer (dest) and reduces left by the amount of bytes + * copied to the user buffer + * + * @user_offset [in] - offset on disk where the read was requested by the user + * @first_page_start [in] - actual page aligned disk offset that we want to + * read from + * @bytes_to_read [in] - total amount of bytes that will be read from disk + * which is generally greater or equal to the amount + * that the user has requested due to the + * either alignment requirements or read_ahead in + * effect. + * @left [in/out] total amount of bytes that needs to be copied to the user + * buffer. It is reduced by the amount of bytes that actually + * copied + * @buffer - buffer to use + * @dest - user supplied buffer + */ SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start, - size_t bytes_to_read, size_t& left, - AlignedBuffer& buffer, char* dest) const; + size_t bytes_to_read, size_t& left, + AlignedBuffer& buffer, char* dest) const; SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start, - size_t bytes_to_read, size_t& left, - char* dest) const; + size_t bytes_to_read, size_t& left, + char* dest) const; SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, - uint64_t first_page_start, - size_t bytes_to_read, size_t& left, - char* dest) const; + uint64_t first_page_start, + size_t bytes_to_read, size_t& left, + char* dest) const; WinRandomAccessImpl(WinFileData* file_base, size_t alignment, - const EnvOptions& options); + const EnvOptions& options); virtual ~WinRandomAccessImpl() {} -public: - + public: WinRandomAccessImpl(const WinRandomAccessImpl&) = delete; WinRandomAccessImpl& operator=(const WinRandomAccessImpl&) = delete; - Status ReadImpl(uint64_t offset, size_t n, Slice* result, - char* scratch) const; + char* scratch) const; void HintImpl(RandomAccessFile::AccessPattern pattern); }; // pread() based random-access -class WinRandomAccessFile : 
private WinFileData, - protected WinRandomAccessImpl, // Want to be able to override PositionedReadInternal - public RandomAccessFile { - -public: +class WinRandomAccessFile + : private WinFileData, + protected WinRandomAccessImpl, // Want to be able to override + // PositionedReadInternal + public RandomAccessFile { + public: WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, - const EnvOptions& options); + const EnvOptions& options); ~WinRandomAccessFile(); virtual void EnableReadAhead() override; virtual Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override; + char* scratch) const override; virtual bool ShouldForwardRawRequest() const override; @@ -337,7 +317,6 @@ public: virtual size_t GetUniqueId(char* id, size_t max_size) const override; }; - // This is a sequential write class. It has been mimicked (as others) after // the original Posix class. We add support for unbuffered I/O on windows as // well @@ -351,12 +330,11 @@ public: // No padding is required for // buffered access. 
class WinWritableImpl { -protected: - - WinFileData* file_data_; - const uint64_t alignment_; - uint64_t filesize_; // How much data is actually written disk - uint64_t reservedsize_; // how far we have reserved space + protected: + WinFileData* file_data_; + const uint64_t alignment_; + uint64_t filesize_; // How much data is actually written disk + uint64_t reservedsize_; // how far we have reserved space virtual Status PreallocateInternal(uint64_t spaceToReserve); @@ -368,7 +346,8 @@ protected: Status AppendImpl(const Slice& data); - // Requires that the data is aligned as specified by GetRequiredBufferAlignment() + // Requires that the data is aligned as specified by + // GetRequiredBufferAlignment() Status PositionedAppendImpl(const Slice& data, uint64_t offset); Status TruncateImpl(uint64_t size); @@ -380,7 +359,8 @@ protected: uint64_t GetFileSizeImpl() { // Double accounting now here with WritableFileWriter // and this size will be wrong when unbuffered access is used - // but tests implement their own writable files and do not use WritableFileWrapper + // but tests implement their own writable files and do not use + // WritableFileWrapper // so we need to squeeze a square peg through // a round hole here. 
return filesize_; @@ -388,32 +368,30 @@ protected: Status AllocateImpl(uint64_t offset, uint64_t len); -public: - + public: WinWritableImpl(const WinWritableImpl&) = delete; WinWritableImpl& operator=(const WinWritableImpl&) = delete; }; - class WinWritableFile : private WinFileData, - protected WinWritableImpl, - public WritableFile { - -public: + protected WinWritableImpl, + public WritableFile { + public: WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, - size_t capacity, const EnvOptions& options); + size_t capacity, const EnvOptions& options); ~WinWritableFile(); - // Indicates if the class makes use of unbuffered I/O + // Indicates if the class makes use of direct I/O // Use PositionedAppend - virtual bool UseOSBuffer() const override; + virtual bool UseDirectIO() const override; virtual size_t GetRequiredBufferAlignment() const override; virtual Status Append(const Slice& data) override; - // Requires that the data is aligned as specified by GetRequiredBufferAlignment() + // Requires that the data is aligned as specified by + // GetRequiredBufferAlignment() virtual Status PositionedAppend(const Slice& data, uint64_t offset) override; // Need to implement this so the file is truncated correctly @@ -437,30 +415,27 @@ public: virtual size_t GetUniqueId(char* id, size_t max_size) const override; }; - class WinRandomRWFile : private WinFileData, - protected WinRandomAccessImpl, - protected WinWritableImpl, - public RandomRWFile { - -public: - + protected WinRandomAccessImpl, + protected WinWritableImpl, + public RandomRWFile { + public: WinRandomRWFile(const std::string& fname, HANDLE hFile, size_t alignment, - const EnvOptions& options); + const EnvOptions& options); ~WinRandomRWFile() {} - // Indicates if the class makes use of unbuffered I/O + // Indicates if the class makes use of direct I/O // If false you must pass aligned buffer to Write() - virtual bool UseOSBuffer() const override; + virtual bool UseDirectIO() const override; - 
// Use the returned alignment value to allocate - // aligned buffer for Write() when UseOSBuffer() - // returns false + // Use the returned alignment value to allocate aligned + // buffer for Write() when UseDirectIO() returns true virtual size_t GetRequiredBufferAlignment() const override; // Used by the file_reader_writer to decide if the ReadAhead wrapper - // should simply forward the call and do not enact read_ahead buffering or locking. + // should simply forward the call and do not enact read_ahead buffering or + // locking. // The implementation below takes care of reading ahead virtual bool ShouldForwardRawRequest() const override; @@ -469,14 +444,14 @@ public: virtual void EnableReadAhead() override; // Write bytes in `data` at offset `offset`, Returns Status::OK() on success. - // Pass aligned buffer when UseOSBuffer() returns false. + // Pass aligned buffer when UseDirectIO() returns true. virtual Status Write(uint64_t offset, const Slice& data) override; // Read up to `n` bytes starting from offset `offset` and store them in // result, provided `scratch` size should be at least `n`. // Returns Status::OK() on success. 
virtual Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override; + char* scratch) const override; virtual Status Flush() override; @@ -487,16 +462,15 @@ public: virtual Status Close() override; }; - class WinDirectory : public Directory { -public: + public: WinDirectory() {} virtual Status Fsync() override; }; class WinFileLock : public FileLock { -public: + public: explicit WinFileLock(HANDLE hFile) : hFile_(hFile) { assert(hFile != NULL); assert(hFile != INVALID_HANDLE_VALUE); @@ -504,9 +478,8 @@ public: ~WinFileLock(); -private: + private: HANDLE hFile_; }; - } } diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index d0c8fb0af..38d9053d1 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -771,9 +771,6 @@ DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files" " in MB."); DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size"); -DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer, - "Allow buffered io using OS buffers"); - DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads, "Allow reads to occur via mmap-ing files"); @@ -783,6 +780,9 @@ DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes, DEFINE_bool(use_direct_reads, rocksdb::EnvOptions().use_direct_reads, "Use O_DIRECT for reading data"); +DEFINE_bool(use_direct_writes, rocksdb::EnvOptions().use_direct_writes, + "Use O_DIRECT for writing data"); + DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open, "Advise random access on table file open"); @@ -2750,6 +2750,7 @@ class Benchmark { options.allow_mmap_reads = FLAGS_mmap_read; options.allow_mmap_writes = FLAGS_mmap_write; options.use_direct_reads = FLAGS_use_direct_reads; + options.use_direct_writes = FLAGS_use_direct_writes; if (FLAGS_prefix_size != 0) { options.prefix_extractor.reset( NewFixedPrefixTransform(FLAGS_prefix_size)); @@ -2951,9 +2952,6 @@ class Benchmark { options.optimize_filters_for_hits = 
FLAGS_optimize_filters_for_hits; // fill storage options - options.allow_os_buffer = FLAGS_bufferedio; - options.allow_mmap_reads = FLAGS_mmap_read; - options.allow_mmap_writes = FLAGS_mmap_write; options.advise_random_on_open = FLAGS_advise_random_on_open; options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e; options.use_adaptive_mutex = FLAGS_use_adaptive_mutex; diff --git a/tools/db_bench_tool_test.cc b/tools/db_bench_tool_test.cc index 978334174..025556d4b 100644 --- a/tools/db_bench_tool_test.cc +++ b/tools/db_bench_tool_test.cc @@ -208,13 +208,14 @@ const std::string options_file_content = R"OPTIONS_FILE( max_background_flushes=1 create_if_missing=true error_if_exists=false - allow_os_buffer=true delayed_write_rate=1048576 manifest_preallocation_size=4194304 + allow_mmap_reads=false allow_mmap_writes=false + use_direct_reads=false + use_direct_writes=false stats_dump_period_sec=600 allow_fallocate=true - allow_mmap_reads=false max_log_file_size=83886080 random_access_max_buffer_size=1048576 advise_random_on_open=true diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h index 2f79f12f7..14e5d1234 100644 --- a/util/aligned_buffer.h +++ b/util/aligned_buffer.h @@ -24,7 +24,7 @@ inline size_t Roundup(size_t x, size_t y) { } // This class is to manage an aligned user -// allocated buffer for unbuffered I/O purposes +// allocated buffer for direct I/O purposes // though can be used for any purpose. 
class AlignedBuffer { size_t alignment_; diff --git a/util/db_options.cc b/util/db_options.cc index 123c87af7..8e141d728 100644 --- a/util/db_options.cc +++ b/util/db_options.cc @@ -50,10 +50,10 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) wal_ttl_seconds(options.WAL_ttl_seconds), wal_size_limit_mb(options.WAL_size_limit_MB), manifest_preallocation_size(options.manifest_preallocation_size), - allow_os_buffer(options.allow_os_buffer), allow_mmap_reads(options.allow_mmap_reads), allow_mmap_writes(options.allow_mmap_writes), use_direct_reads(options.use_direct_reads), + use_direct_writes(options.use_direct_writes), allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), stats_dump_period_sec(options.stats_dump_period_sec), @@ -119,16 +119,16 @@ void ImmutableDBOptions::Dump(Logger* log) const { Header(log, " Options.recycle_log_file_num: %" ROCKSDB_PRIszt, recycle_log_file_num); - Header(log, " Options.allow_os_buffer: %d", - allow_os_buffer); - Header(log, " Options.allow_mmap_reads: %d", - allow_mmap_reads); Header(log, " Options.allow_fallocate: %d", allow_fallocate); + Header(log, " Options.allow_mmap_reads: %d", + allow_mmap_reads); Header(log, " Options.allow_mmap_writes: %d", allow_mmap_writes); Header(log, " Options.use_direct_reads: %d", use_direct_reads); + Header(log, " Options.use_direct_writes: %d", + use_direct_writes); Header(log, " Options.create_missing_column_families: %d", create_missing_column_families); Header(log, " Options.db_log_dir: %s", @@ -148,12 +148,6 @@ void ImmutableDBOptions::Dump(Logger* log) const { Header(log, " Options.manifest_preallocation_size: %" ROCKSDB_PRIszt, manifest_preallocation_size); - Header(log, " Options.allow_os_buffer: %d", - allow_os_buffer); - Header(log, " Options.allow_mmap_reads: %d", - allow_mmap_reads); - Header(log, " Options.allow_mmap_writes: %d", - allow_mmap_writes); Header(log, " Options.is_fd_close_on_exec: %d", is_fd_close_on_exec); 
Header(log, " Options.stats_dump_period_sec: %u", diff --git a/util/db_options.h b/util/db_options.h index 728e0dd2d..e50d158a4 100644 --- a/util/db_options.h +++ b/util/db_options.h @@ -46,10 +46,10 @@ struct ImmutableDBOptions { uint64_t wal_ttl_seconds; uint64_t wal_size_limit_mb; size_t manifest_preallocation_size; - bool allow_os_buffer; bool allow_mmap_reads; bool allow_mmap_writes; bool use_direct_reads; + bool use_direct_writes; bool allow_fallocate; bool is_fd_close_on_exec; unsigned int stats_dump_period_sec; diff --git a/util/env.cc b/util/env.cc index 690377adb..7a57f5e86 100644 --- a/util/env.cc +++ b/util/env.cc @@ -313,10 +313,10 @@ EnvWrapper::~EnvWrapper() { namespace { // anonymous namespace void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) { - env_options->use_os_buffer = options.allow_os_buffer; env_options->use_mmap_reads = options.allow_mmap_reads; env_options->use_mmap_writes = options.allow_mmap_writes; env_options->use_direct_reads = options.use_direct_reads; + env_options->use_direct_writes = options.use_direct_writes; env_options->set_fd_cloexec = options.is_fd_close_on_exec; env_options->bytes_per_sync = options.bytes_per_sync; env_options->compaction_readahead_size = options.compaction_readahead_size; diff --git a/util/env_posix.cc b/util/env_posix.cc index bd7596a1d..3f7a13f1c 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -38,6 +38,7 @@ #endif #include #include +#include #include "port/port.h" #include "rocksdb/slice.h" #include "util/coding.h" @@ -250,64 +251,66 @@ class PosixEnv : public Env { result->reset(); Status s; int fd = -1; + int flags = O_CREAT | O_TRUNC; + // Direct IO mode with O_DIRECT flag or F_NOCACHE (MAC OSX) + if (options.use_direct_writes && !options.use_mmap_writes) { + // Note: we should avoid O_APPEND here due to the following bug: + // POSIX requires that opening a file with the O_APPEND flag should + // have no effect on the location at which pwrite() writes data.
+ // However, on Linux, if a file is opened with O_APPEND, pwrite() + // appends data to the end of the file, regardless of the value of + // offset. + // More info here: https://linux.die.net/man/2/pwrite + flags |= O_WRONLY; +#ifndef OS_MACOSX + flags |= O_DIRECT; +#endif + TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags); + } else if (options.use_mmap_writes) { + // non-direct I/O + flags |= O_RDWR; + } else { + flags |= O_WRONLY; + } + do { IOSTATS_TIMER_GUARD(open_nanos); - fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); + fd = open(fname.c_str(), flags, 0644); } while (fd < 0 && errno == EINTR); + if (fd < 0) { s = IOError(fname, errno); - } else { - SetFD_CLOEXEC(fd, &options); - if (options.use_mmap_writes) { - if (!checkedDiskForMmap_) { - // this will be executed once in the program's lifetime. - // do not use mmapWrite on non ext-3/xfs/tmpfs systems. - if (!SupportsFastAllocate(fname)) { - forceMmapOff = true; - } - checkedDiskForMmap_ = true; - } - } - if (options.use_mmap_writes && !forceMmapOff) { - result->reset(new PosixMmapFile(fname, fd, page_size_, options)); - } else if (options.use_direct_writes) { - close(fd); -#ifdef OS_MACOSX - int flags = O_WRONLY | O_APPEND | O_TRUNC | O_CREAT; -#else - // Note: we should avoid O_APPEND here due to ta the following bug: - // POSIX requires that opening a file with the O_APPEND flag should - // have no affect on the location at which pwrite() writes data. - // However, on Linux, if a file is opened with O_APPEND, pwrite() - // appends data to the end of the file, regardless of the value of - // offset. 
- // More info here: https://linux.die.net/man/2/pwrite - int flags = O_WRONLY | O_TRUNC | O_CREAT | O_DIRECT; -#endif - TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags); - fd = open(fname.c_str(), flags, 0644); - if (fd < 0) { - s = IOError(fname, errno); - } else { - std::unique_ptr file( - new PosixDirectIOWritableFile(fname, fd)); - *result = std::move(file); - s = Status::OK(); -#ifdef OS_MACOSX - if (fcntl(fd, F_NOCACHE, 1) == -1) { - close(fd); - s = IOError(fname, errno); - } -#endif - } - } else { - // disable mmap writes - EnvOptions no_mmap_writes_options = options; - no_mmap_writes_options.use_mmap_writes = false; + return s; + } + SetFD_CLOEXEC(fd, &options); - result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + if (options.use_mmap_writes) { + if (!checkedDiskForMmap_) { + // this will be executed once in the program's lifetime. + // do not use mmapWrite on non ext-3/xfs/tmpfs systems. + if (!SupportsFastAllocate(fname)) { + forceMmapOff_ = true; + } + checkedDiskForMmap_ = true; } } + if (options.use_mmap_writes && !forceMmapOff_) { + result->reset(new PosixMmapFile(fname, fd, page_size_, options)); + } else if (options.use_direct_writes && !options.use_mmap_writes) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + s = IOError(fname, errno); + return s; + } +#endif + result->reset(new PosixWritableFile(fname, fd, options)); + } else { + // disable mmap writes + EnvOptions no_mmap_writes_options = options; + no_mmap_writes_options.use_mmap_writes = false; + result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + } return s; } @@ -318,40 +321,68 @@ class PosixEnv : public Env { result->reset(); Status s; int fd = -1; + + int flags = 0; + // Direct IO mode with O_DIRECT flag or F_NOCACHE (MAC OSX) + if (options.use_direct_writes && !options.use_mmap_writes) { + flags |= O_WRONLY; +#ifndef OS_MACOSX + flags |= O_DIRECT; +#endif +
TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags); + } else if (options.use_mmap_writes) { + // mmap needs O_RDWR mode + flags |= O_RDWR; + } else { + flags |= O_WRONLY; + } + do { IOSTATS_TIMER_GUARD(open_nanos); - fd = open(old_fname.c_str(), O_RDWR, 0644); + fd = open(old_fname.c_str(), flags, 0644); } while (fd < 0 && errno == EINTR); if (fd < 0) { s = IOError(fname, errno); - } else { - SetFD_CLOEXEC(fd, &options); - // rename into place - if (rename(old_fname.c_str(), fname.c_str()) != 0) { - Status r = IOError(old_fname, errno); - close(fd); - return r; - } - if (options.use_mmap_writes) { - if (!checkedDiskForMmap_) { - // this will be executed once in the program's lifetime. - // do not use mmapWrite on non ext-3/xfs/tmpfs systems. - if (!SupportsFastAllocate(fname)) { - forceMmapOff = true; - } - checkedDiskForMmap_ = true; - } - } - if (options.use_mmap_writes && !forceMmapOff) { - result->reset(new PosixMmapFile(fname, fd, page_size_, options)); - } else { - // disable mmap writes - EnvOptions no_mmap_writes_options = options; - no_mmap_writes_options.use_mmap_writes = false; + return s; + } - result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + SetFD_CLOEXEC(fd, &options); + // rename into place + if (rename(old_fname.c_str(), fname.c_str()) != 0) { + s = IOError(old_fname, errno); + close(fd); + return s; + } + + if (options.use_mmap_writes) { + if (!checkedDiskForMmap_) { + // this will be executed once in the program's lifetime. + // do not use mmapWrite on non ext-3/xfs/tmpfs systems. 
+ if (!SupportsFastAllocate(fname)) { + forceMmapOff_ = true; + } + checkedDiskForMmap_ = true; } } + if (options.use_mmap_writes && !forceMmapOff_) { + result->reset(new PosixMmapFile(fname, fd, page_size_, options)); + } else if (options.use_direct_writes && !options.use_mmap_writes) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + s = IOError(fname, errno); + return s; + } +#endif + result->reset(new PosixWritableFile(fname, fd, options)); + } else { + // disable mmap writes + EnvOptions no_mmap_writes_options = options; + no_mmap_writes_options.use_mmap_writes = false; + result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + } + return s; + return s; } @@ -724,6 +755,7 @@ class PosixEnv : public Env { const DBOptions& db_options) const override { EnvOptions optimized = env_options; optimized.use_mmap_writes = false; + optimized.use_direct_writes = false; optimized.bytes_per_sync = db_options.wal_bytes_per_sync; // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit @@ -736,14 +768,14 @@ class PosixEnv : public Env { const EnvOptions& env_options) const override { EnvOptions optimized = env_options; optimized.use_mmap_writes = false; + optimized.use_direct_writes = false; optimized.fallocate_with_keep_size = true; return optimized; } private: bool checkedDiskForMmap_; - bool forceMmapOff; // do we override Env options? - + bool forceMmapOff_; // do we override Env options? // Returns true iff the named directory exists and is a directory. 
virtual bool DirExists(const std::string& dname) { @@ -784,7 +816,7 @@ class PosixEnv : public Env { PosixEnv::PosixEnv() : checkedDiskForMmap_(false), - forceMmapOff(false), + forceMmapOff_(false), page_size_(getpagesize()), thread_pools_(Priority::TOTAL) { ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 662648f85..34b3aab50 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -61,9 +61,8 @@ Status WritableFileWriter::Append(const Slice& data) { writable_file_->PrepareWrite(static_cast(GetFileSize()), left); } - // Flush only when I/O is buffered - if (use_os_buffer_ && - (buf_.Capacity() - buf_.CurrentSize()) < left) { + // Flush only when buffered I/O + if (!direct_io_ && (buf_.Capacity() - buf_.CurrentSize()) < left) { if (buf_.CurrentSize() > 0) { s = Flush(); if (!s.ok()) { @@ -79,10 +78,10 @@ Status WritableFileWriter::Append(const Slice& data) { assert(buf_.CurrentSize() == 0); } - // We never write directly to disk with unbuffered I/O on. + // We never write directly to disk with direct I/O on. 
// or we simply use it for its original purpose to accumulate many small // chunks - if (!use_os_buffer_ || (buf_.Capacity() >= left)) { + if (direct_io_ || (buf_.Capacity() >= left)) { while (left > 0) { size_t appended = buf_.Append(src, left); left -= appended; @@ -96,7 +95,7 @@ Status WritableFileWriter::Append(const Slice& data) { // We double the buffer here because // Flush calls do not keep up with the incoming bytes - // This is the only place when buffer is changed with unbuffered I/O + // This is the only place when buffer is changed with direct I/O if (buf_.Capacity() < max_buffer_size_) { size_t desiredCapacity = buf_.Capacity() * 2; desiredCapacity = std::min(desiredCapacity, max_buffer_size_); @@ -132,7 +131,7 @@ Status WritableFileWriter::Close() { s = Flush(); // flush cache to OS - // In unbuffered mode we write whole pages so + // In direct I/O mode we write whole pages so // we need to let the file know where data ends. Status interim = writable_file_->Truncate(filesize_); if (!interim.ok() && s.ok()) { @@ -151,17 +150,18 @@ Status WritableFileWriter::Close() { return s; } -// write out the cached data to the OS cache +// write out the cached data to the OS cache or storage if direct I/O +// enabled Status WritableFileWriter::Flush() { Status s; TEST_KILL_RANDOM("WritableFileWriter::Flush:0", rocksdb_kill_odds * REDUCE_ODDS2); if (buf_.CurrentSize() > 0) { - if (use_os_buffer_) { - s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + if (direct_io_) { + s = WriteDirect(); } else { - s = WriteUnbuffered(); + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); } if (!s.ok()) { return s; @@ -259,8 +259,8 @@ size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { if (align) { // Here we may actually require more than burst and block - // but we can not write less than one page at a time on unbuffered - // thus we may want not to use ratelimiter s + // but we can not write less than one page at a time on direct I/O + // 
thus we may want not to use ratelimiter size_t alignment = buf_.Alignment(); bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); } @@ -273,7 +273,7 @@ size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { // limiter if available Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { Status s; - assert(use_os_buffer_); + assert(!direct_io_); const char* src = data; size_t left = size; @@ -308,10 +308,10 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { // whole number of pages to be written again on the next flush because we can // only write on aligned // offsets. -Status WritableFileWriter::WriteUnbuffered() { +Status WritableFileWriter::WriteDirect() { Status s; - assert(!use_os_buffer_); + assert(direct_io_); const size_t alignment = buf_.Alignment(); assert((next_write_offset_ % alignment) == 0); @@ -339,7 +339,7 @@ Status WritableFileWriter::WriteUnbuffered() { { IOSTATS_TIMER_GUARD(write_nanos); TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); - // Unbuffered writes must be positional + // direct writes must be positional s = writable_file_->PositionedAppend(Slice(src, size), write_offset); if (!s.ok()) { buf_.Size(file_advance + leftover_tail); diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 5c3cfd6b3..6f01159db 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -103,7 +103,6 @@ class WritableFileWriter { uint64_t next_write_offset_; bool pending_sync_; const bool direct_io_; - const bool use_os_buffer_; uint64_t last_sync_size_; uint64_t bytes_per_sync_; RateLimiter* rate_limiter_; @@ -118,7 +117,6 @@ class WritableFileWriter { next_write_offset_(0), pending_sync_(false), direct_io_(writable_file_->UseDirectIO()), - use_os_buffer_(writable_file_->UseOSBuffer()), last_sync_size_(0), bytes_per_sync_(options.bytes_per_sync), rate_limiter_(options.rate_limiter) { @@ -156,8 +154,8 @@ class WritableFileWriter { private: // Used 
when os buffering is OFF and we are writing - // DMA such as in Windows unbuffered mode - Status WriteUnbuffered(); + // DMA such as in Direct I/O mode + Status WriteDirect(); // Normal write Status WriteBuffered(const char* data, size_t size); Status RangeSync(uint64_t offset, uint64_t nbytes); diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 367de8b9e..67e97e796 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -88,9 +88,9 @@ TEST_F(WritableFileWriterTest, RangeSync) { TEST_F(WritableFileWriterTest, AppendStatusReturn) { class FakeWF : public WritableFile { public: - explicit FakeWF() : use_os_buffer_(true), io_error_(false) {} + explicit FakeWF() : use_direct_io_(false), io_error_(false) {} - virtual bool UseOSBuffer() const override { return use_os_buffer_; } + virtual bool UseDirectIO() const override { return use_direct_io_; } Status Append(const Slice& data) override { if (io_error_) { return Status::IOError("Fake IO error"); @@ -106,15 +106,15 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) { Status Close() override { return Status::OK(); } Status Flush() override { return Status::OK(); } Status Sync() override { return Status::OK(); } - void SetUseOSBuffer(bool val) { use_os_buffer_ = val; } + void SetUseDirectIO(bool val) { use_direct_io_ = val; } void SetIOError(bool val) { io_error_ = val; } protected: - bool use_os_buffer_; + bool use_direct_io_; bool io_error_; }; unique_ptr wf(new FakeWF()); - wf->SetUseOSBuffer(false); + wf->SetUseDirectIO(true); unique_ptr writer( new WritableFileWriter(std::move(wf), EnvOptions())); diff --git a/util/io_posix.cc b/util/io_posix.cc index 111f899ee..60d96aa64 100644 --- a/util/io_posix.cc +++ b/util/io_posix.cc @@ -165,7 +165,7 @@ PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* f, : filename_(fname), file_(f), fd_(fileno(f)), - use_os_buffer_(options.use_os_buffer) {} + use_direct_io_(options.use_direct_reads) {} 
PosixSequentialFile::~PosixSequentialFile() { fclose(file_); } @@ -187,7 +187,7 @@ Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) { s = IOError(filename_, errno); } } - if (!use_os_buffer_) { + if (use_direct_io_) { // we need to fadvise away the entire range of pages because // we do not want readahead pages to be cached. Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages @@ -294,7 +294,7 @@ size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { */ PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd, const EnvOptions& options) - : filename_(fname), fd_(fd), use_os_buffer_(options.use_os_buffer) { + : filename_(fname), fd_(fd), use_direct_io_(options.use_direct_reads) { assert(!options.use_mmap_reads || sizeof(void*) < 8); } @@ -325,7 +325,8 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, // An error: return a non-ok status s = IOError(filename_, errno); } - if (!use_os_buffer_) { + + if (use_direct_io_) { // we need to fadvise away the entire range of pages because // we do not want readahead pages to be cached. 
Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages @@ -397,7 +398,7 @@ PosixMmapReadableFile::PosixMmapReadableFile(const int fd, : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) { fd_ = fd_ + 0; // suppress the warning for used variables assert(options.use_mmap_reads); - assert(options.use_os_buffer); + assert(!options.use_direct_reads); } PosixMmapReadableFile::~PosixMmapReadableFile() { @@ -533,6 +534,7 @@ PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size, #endif assert((page_size & (page_size - 1)) == 0); assert(options.use_mmap_writes); + assert(!options.use_direct_writes); } PosixMmapFile::~PosixMmapFile() { @@ -665,7 +667,10 @@ Status PosixMmapFile::Allocate(uint64_t offset, uint64_t len) { */ PosixWritableFile::PosixWritableFile(const std::string& fname, int fd, const EnvOptions& options) - : filename_(fname), fd_(fd), filesize_(0) { + : filename_(fname), + direct_io_(options.use_direct_writes), + fd_(fd), + filesize_(0) { #ifdef ROCKSDB_FALLOCATE_PRESENT allow_fallocate_ = options.allow_fallocate; fallocate_with_keep_size_ = options.fallocate_with_keep_size; @@ -680,6 +685,7 @@ PosixWritableFile::~PosixWritableFile() { } Status PosixWritableFile::Append(const Slice& data) { + assert(!direct_io_|| (IsSectorAligned(data.size()) && IsPageAligned(data.data()))); const char* src = data.data(); size_t left = data.size(); while (left != 0) { @@ -698,6 +704,8 @@ Status PosixWritableFile::Append(const Slice& data) { } Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) { + assert(direct_io_ && IsSectorAligned(offset) && + IsSectorAligned(data.size()) && IsPageAligned(data.data())); assert(offset <= std::numeric_limits::max()); const char* src = data.data(); size_t left = data.size(); @@ -713,7 +721,7 @@ Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) { offset += done; src += done; } - filesize_ = offset + data.size(); + filesize_ = offset; return 
Status::OK(); } @@ -778,6 +786,9 @@ bool PosixWritableFile::IsSyncThreadSafe() const { return true; } uint64_t PosixWritableFile::GetFileSize() { return filesize_; } Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) { + if (direct_io_) { + return Status::OK(); + } #ifndef OS_LINUX return Status::OK(); #else @@ -825,29 +836,6 @@ size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const { } #endif -/* - * PosixDirectIOWritableFile - */ -Status PosixDirectIOWritableFile::Append(const Slice& data) { - assert(IsSectorAligned(data.size()) && IsPageAligned(data.data())); - if (!IsSectorAligned(data.size()) || !IsPageAligned(data.data())) { - return Status::IOError("Unaligned buffer for direct IO"); - } - return PosixWritableFile::Append(data); -} - -Status PosixDirectIOWritableFile::PositionedAppend(const Slice& data, - uint64_t offset) { - assert(IsSectorAligned(offset)); - assert(IsSectorAligned(data.size())); - assert(IsPageAligned(data.data())); - if (!IsSectorAligned(offset) || !IsSectorAligned(data.size()) || - !IsPageAligned(data.data())) { - return Status::IOError("offset or size is not aligned"); - } - return PosixWritableFile::PositionedAppend(data, offset); -} - /* * PosixRandomRWFile */ diff --git a/util/io_posix.h b/util/io_posix.h index ac0149c9c..4f01e86b0 100644 --- a/util/io_posix.h +++ b/util/io_posix.h @@ -41,7 +41,7 @@ class PosixSequentialFile : public SequentialFile { std::string filename_; FILE* file_; int fd_; - bool use_os_buffer_; + bool use_direct_io_; public: PosixSequentialFile(const std::string& fname, FILE* f, @@ -74,7 +74,7 @@ class PosixRandomAccessFile : public RandomAccessFile { protected: std::string filename_; int fd_; - bool use_os_buffer_; + bool use_direct_io_; public: PosixRandomAccessFile(const std::string& fname, int fd, @@ -108,6 +108,7 @@ class PosixDirectIORandomAccessFile : public PosixRandomAccessFile { class PosixWritableFile : public WritableFile { protected: const std::string filename_; 
+ const bool direct_io_; int fd_; uint64_t filesize_; #ifdef ROCKSDB_FALLOCATE_PRESENT @@ -130,7 +131,13 @@ class PosixWritableFile : public WritableFile { virtual Status Sync() override; virtual Status Fsync() override; virtual bool IsSyncThreadSafe() const override; + virtual bool UseDirectIO() const override { return direct_io_; } virtual uint64_t GetFileSize() override; + virtual size_t GetRequiredBufferAlignment() const override { + // TODO(gzh): It should be the logical sector size/filesystem block size + // hardcoded as 4k for most cases + return 4 * 1024; + } virtual Status InvalidateCache(size_t offset, size_t length) override; #ifdef ROCKSDB_FALLOCATE_PRESENT virtual Status Allocate(uint64_t offset, uint64_t len) override; @@ -139,22 +146,7 @@ class PosixWritableFile : public WritableFile { #endif }; -class PosixDirectIOWritableFile : public PosixWritableFile { - public: - explicit PosixDirectIOWritableFile(const std::string& filename, int fd) - : PosixWritableFile(filename, fd, EnvOptions()) {} - virtual ~PosixDirectIOWritableFile() {} - - bool UseOSBuffer() const override { return false; } - size_t GetRequiredBufferAlignment() const override { return 4 * 1024; } - Status Append(const Slice& data) override; - Status PositionedAppend(const Slice& data, uint64_t offset) override; - bool UseDirectIO() const override { return true; } - Status InvalidateCache(size_t offset, size_t length) override { - return Status::OK(); - } -}; - +// mmap() based random-access class PosixMmapReadableFile : public RandomAccessFile { private: int fd_; diff --git a/util/log_write_bench.cc b/util/log_write_bench.cc index 1061ccf26..10566d2e7 100644 --- a/util/log_write_bench.cc +++ b/util/log_write_bench.cc @@ -34,8 +34,7 @@ namespace rocksdb { void RunBenchmark() { std::string file_name = test::TmpDir() + "/log_write_benchmark.log"; Env* env = Env::Default(); - EnvOptions env_options; - env_options.use_mmap_writes = false; + EnvOptions env_options = 
env->OptimizeForLogWrite(EnvOptions()); env_options.bytes_per_sync = FLAGS_bytes_per_sync; unique_ptr file; env->NewWritableFile(file_name, &file, env_options); diff --git a/util/memenv.cc b/util/memenv.cc index 825161fd3..03dfe6cf9 100644 --- a/util/memenv.cc +++ b/util/memenv.cc @@ -221,9 +221,7 @@ class RandomAccessFileImpl : public RandomAccessFile { class WritableFileImpl : public WritableFile { public: - WritableFileImpl(FileState* file) : file_(file) { - file_->Ref(); - } + explicit WritableFileImpl(FileState* file) : file_(file) { file_->Ref(); } ~WritableFileImpl() { file_->Unref(); diff --git a/util/mock_env.cc b/util/mock_env.cc index 9e200e8c0..09736bb97 100644 --- a/util/mock_env.cc +++ b/util/mock_env.cc @@ -440,8 +440,8 @@ Status MockEnv::NewRandomAccessFile(const std::string& fname, } Status MockEnv::NewWritableFile(const std::string& fname, - unique_ptr* result, - const EnvOptions& env_options) { + unique_ptr* result, + const EnvOptions& env_options) { auto fn = NormalizePath(fname); MutexLock lock(&mutex_); if (file_map_.find(fn) != file_map_.end()) { diff --git a/util/options.cc b/util/options.cc index b4a6d36dc..34df0ca89 100644 --- a/util/options.cc +++ b/util/options.cc @@ -198,10 +198,10 @@ DBOptions::DBOptions() WAL_ttl_seconds(0), WAL_size_limit_MB(0), manifest_preallocation_size(4 * 1024 * 1024), - allow_os_buffer(true), allow_mmap_reads(false), allow_mmap_writes(false), use_direct_reads(false), + use_direct_writes(false), allow_fallocate(true), is_fd_close_on_exec(true), skip_log_error_on_recovery(false), @@ -269,10 +269,10 @@ DBOptions::DBOptions(const Options& options) WAL_ttl_seconds(options.WAL_ttl_seconds), WAL_size_limit_MB(options.WAL_size_limit_MB), manifest_preallocation_size(options.manifest_preallocation_size), - allow_os_buffer(options.allow_os_buffer), allow_mmap_reads(options.allow_mmap_reads), allow_mmap_writes(options.allow_mmap_writes), use_direct_reads(options.use_direct_reads), + 
use_direct_writes(options.use_direct_writes), allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), skip_log_error_on_recovery(options.skip_log_error_on_recovery), @@ -336,11 +336,11 @@ void DBOptions::Dump(Logger* log) const { keep_log_file_num); Header(log, " Options.recycle_log_file_num: %" ROCKSDB_PRIszt, recycle_log_file_num); - Header(log, " Options.allow_os_buffer: %d", allow_os_buffer); - Header(log, " Options.allow_mmap_reads: %d", allow_mmap_reads); Header(log, " Options.allow_fallocate: %d", allow_fallocate); + Header(log, " Options.allow_mmap_reads: %d", allow_mmap_reads); Header(log, " Options.allow_mmap_writes: %d", allow_mmap_writes); Header(log, " Options.use_direct_reads: %d", use_direct_reads); + Header(log, " Options.use_direct_writes: %d", use_direct_writes); Header(log, " Options.create_missing_column_families: %d", create_missing_column_families); Header(log, " Options.db_log_dir: %s", @@ -366,12 +366,6 @@ void DBOptions::Dump(Logger* log) const { Header(log, " Options.manifest_preallocation_size: %" ROCKSDB_PRIszt, manifest_preallocation_size); - Header(log, " Options.allow_os_buffer: %d", - allow_os_buffer); - Header(log, " Options.allow_mmap_reads: %d", - allow_mmap_reads); - Header(log, " Options.allow_mmap_writes: %d", - allow_mmap_writes); Header(log, " Options.is_fd_close_on_exec: %d", is_fd_close_on_exec); Header(log, " Options.stats_dump_period_sec: %u", diff --git a/util/options_helper.cc b/util/options_helper.cc index b68e777b5..2834a68de 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -71,10 +71,10 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.WAL_size_limit_MB = immutable_db_options.wal_size_limit_mb; options.manifest_preallocation_size = immutable_db_options.manifest_preallocation_size; - options.allow_os_buffer = immutable_db_options.allow_os_buffer; options.allow_mmap_reads = immutable_db_options.allow_mmap_reads; 
options.allow_mmap_writes = immutable_db_options.allow_mmap_writes; options.use_direct_reads = immutable_db_options.use_direct_reads; + options.use_direct_writes = immutable_db_options.use_direct_writes; options.allow_fallocate = immutable_db_options.allow_fallocate; options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec; options.stats_dump_period_sec = immutable_db_options.stats_dump_period_sec; diff --git a/util/options_helper.h b/util/options_helper.h index 164b11f97..43678acf7 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -186,12 +186,14 @@ static std::unordered_map db_options_type_info = { {"use_direct_reads", {offsetof(struct DBOptions, use_direct_reads), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, + {"use_direct_writes", + {offsetof(struct DBOptions, use_direct_writes), OptionType::kBoolean, + OptionVerificationType::kNormal, false, 0}}, {"allow_2pc", {offsetof(struct DBOptions, allow_2pc), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, {"allow_os_buffer", - {offsetof(struct DBOptions, allow_os_buffer), OptionType::kBoolean, - OptionVerificationType::kNormal, false, 0}}, + {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, true, 0}}, {"create_if_missing", {offsetof(struct DBOptions, create_if_missing), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, diff --git a/util/options_settable_test.cc b/util/options_settable_test.cc index 59ed0e1e5..bf5eca6bc 100644 --- a/util/options_settable_test.cc +++ b/util/options_settable_test.cc @@ -267,7 +267,6 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "max_background_flushes=35;" "create_if_missing=false;" "error_if_exists=true;" - "allow_os_buffer=false;" "delayed_write_rate=4294976214;" "manifest_preallocation_size=1222;" "allow_mmap_writes=false;" @@ -275,6 +274,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "allow_fallocate=true;" "allow_mmap_reads=false;" 
"use_direct_reads=false;" + "use_direct_writes=false;" "max_log_file_size=4607;" "random_access_max_buffer_size=1048576;" "advise_random_on_open=true;" diff --git a/util/options_test.cc b/util/options_test.cc index 1bdbada20..214303abc 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -116,9 +116,10 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"WAL_ttl_seconds", "43"}, {"WAL_size_limit_MB", "44"}, {"manifest_preallocation_size", "45"}, - {"allow_os_buffer", "false"}, {"allow_mmap_reads", "true"}, {"allow_mmap_writes", "false"}, + {"use_direct_reads", "false"}, + {"use_direct_writes", "false"}, {"is_fd_close_on_exec", "true"}, {"skip_log_error_on_recovery", "false"}, {"stats_dump_period_sec", "46"}, @@ -231,9 +232,10 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.WAL_ttl_seconds, static_cast(43)); ASSERT_EQ(new_db_opt.WAL_size_limit_MB, static_cast(44)); ASSERT_EQ(new_db_opt.manifest_preallocation_size, 45U); - ASSERT_EQ(new_db_opt.allow_os_buffer, false); ASSERT_EQ(new_db_opt.allow_mmap_reads, true); ASSERT_EQ(new_db_opt.allow_mmap_writes, false); + ASSERT_EQ(new_db_opt.use_direct_reads, false); + ASSERT_EQ(new_db_opt.use_direct_writes, false); ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true); ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false); ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U); diff --git a/util/testutil.cc b/util/testutil.cc index a04db0349..8642df295 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -240,7 +240,8 @@ void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) { db_opt->advise_random_on_open = rnd->Uniform(2); db_opt->allow_mmap_reads = rnd->Uniform(2); db_opt->allow_mmap_writes = rnd->Uniform(2); - db_opt->allow_os_buffer = rnd->Uniform(2); + db_opt->use_direct_reads = rnd->Uniform(2); + db_opt->use_direct_writes = rnd->Uniform(2); db_opt->create_if_missing = rnd->Uniform(2); db_opt->create_missing_column_families = rnd->Uniform(2); db_opt->disableDataSync = rnd->Uniform(2); diff --git 
a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 139b44c3d..df1d8f5f4 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -1162,7 +1162,7 @@ Status BackupEngineImpl::CopyOrCreateFile( unique_ptr src_file; EnvOptions env_options; env_options.use_mmap_writes = false; - env_options.use_os_buffer = false; + // TODO:(gzh) maybe use direct writes here if possible if (size != nullptr) { *size = 0; } @@ -1365,7 +1365,7 @@ Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env, EnvOptions env_options; env_options.use_mmap_writes = false; - env_options.use_os_buffer = false; + env_options.use_direct_reads = false; std::unique_ptr src_file; Status s = src_env->NewSequentialFile(src, &src_file, env_options); @@ -1671,6 +1671,7 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) { unique_ptr backup_meta_file; EnvOptions env_options; env_options.use_mmap_writes = false; + env_options.use_direct_writes = false; s = env_->NewWritableFile(meta_filename_ + ".tmp", &backup_meta_file, env_options); if (!s.ok()) { diff --git a/utilities/env_mirror.cc b/utilities/env_mirror.cc index 70946a81e..b7a56e55f 100644 --- a/utilities/env_mirror.cc +++ b/utilities/env_mirror.cc @@ -18,7 +18,7 @@ class SequentialFileMirror : public SequentialFile { public: unique_ptr a_, b_; std::string fname; - SequentialFileMirror(std::string f) : fname(f) {} + explicit SequentialFileMirror(std::string f) : fname(f) {} Status Read(size_t n, Slice* result, char* scratch) { Slice aslice; @@ -62,7 +62,7 @@ class RandomAccessFileMirror : public RandomAccessFile { public: unique_ptr a_, b_; std::string fname; - RandomAccessFileMirror(std::string f) : fname(f) {} + explicit RandomAccessFileMirror(std::string f) : fname(f) {} Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const { Status as = a_->Read(offset, n, result, scratch); @@ -101,7 +101,7 @@ class WritableFileMirror : 
public WritableFile { public: unique_ptr a_, b_; std::string fname; - WritableFileMirror(std::string f) : fname(f) {} + explicit WritableFileMirror(std::string f) : fname(f) {} Status Append(const Slice& data) override { Status as = a_->Append(data);