change use_direct_writes to use_direct_io_for_flush_and_compaction
Summary: Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction Now if Options::use_direct_io_for_flush_and_compaction = true, we will enable direct io for both reads and writes for flush and compaction job. Whereas Options::use_direct_reads controls user reads like iterator and Get(). Closes https://github.com/facebook/rocksdb/pull/2117 Differential Revision: D4860912 Pulled By: lightmark fbshipit-source-id: d93575a8a5e780cf7e40797287edc425ee648c19
This commit is contained in:
parent
b6f6b73a9c
commit
8d7edd5908
@ -8,6 +8,7 @@
|
||||
* DB::Get in place of std::string accepts PinnableSlice, which avoids the extra memcpy of value to std::string in most of cases.
|
||||
* PinnableSlice releases the pinned resources that contain the value when it is destructed or when ::Reset() is called on it.
|
||||
* The old API that accepts std::string, although discouraged, is still supported.
|
||||
* Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction. Read Direct IO wiki for details.
|
||||
|
||||
### New Features
|
||||
* Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user.
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include "util/file_reader_writer.h"
|
||||
#include "util/filename.h"
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/sync_point.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -103,6 +104,10 @@ Status BuildTable(
|
||||
unique_ptr<WritableFileWriter> file_writer;
|
||||
{
|
||||
unique_ptr<WritableFile> file;
|
||||
#ifndef NDEBUG
|
||||
bool use_direct_writes = env_options.use_direct_writes;
|
||||
TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
|
||||
#endif // !NDEBUG
|
||||
s = NewWritableFile(env, fname, &file, env_options);
|
||||
if (!s.ok()) {
|
||||
EventHelpers::LogAndNotifyTableFileCreationFinished(
|
||||
@ -180,6 +185,11 @@ Status BuildTable(
|
||||
|
||||
if (s.ok() && !empty) {
|
||||
// Verify that the table is usable
|
||||
// We set for_compaction to false and don't OptimizeForCompactionTableRead
|
||||
// here because this is a special case after we finish the table building
|
||||
// No matter whether use_direct_io_for_flush_and_compaction is true,
|
||||
// we will regrad this verification as user reads since the goal is
|
||||
// to cache it here for further user reads
|
||||
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
|
||||
ReadOptions(), env_options, internal_comparator, meta->fd,
|
||||
nullptr /* range_del_agg */, nullptr,
|
||||
|
6
db/c.cc
6
db/c.cc
@ -2121,9 +2121,9 @@ void rocksdb_options_set_use_direct_reads(rocksdb_options_t* opt,
|
||||
opt->rep.use_direct_reads = v;
|
||||
}
|
||||
|
||||
void rocksdb_options_set_use_direct_writes(rocksdb_options_t* opt,
|
||||
unsigned char v) {
|
||||
opt->rep.use_direct_writes = v;
|
||||
void rocksdb_options_set_use_direct_io_for_flush_and_compaction(
|
||||
rocksdb_options_t* opt, unsigned char v) {
|
||||
opt->rep.use_direct_io_for_flush_and_compaction = v;
|
||||
}
|
||||
|
||||
void rocksdb_options_set_allow_mmap_reads(
|
||||
|
@ -1072,6 +1072,11 @@ Status CompactionJob::FinishCompactionOutputFile(
|
||||
TableProperties tp;
|
||||
if (s.ok() && current_entries > 0) {
|
||||
// Verify that the table is usable
|
||||
// We set for_compaction to false and don't OptimizeForCompactionTableRead
|
||||
// here because this is a special case after we finish the table building
|
||||
// No matter whether use_direct_io_for_flush_and_compaction is true,
|
||||
// we will regrad this verification as user reads since the goal is
|
||||
// to cache it here for further user reads
|
||||
InternalIterator* iter = cfd->table_cache()->NewIterator(
|
||||
ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
|
||||
nullptr /* range_del_agg */, nullptr,
|
||||
@ -1198,7 +1203,11 @@ Status CompactionJob::OpenCompactionOutputFile(
|
||||
#endif // !ROCKSDB_LITE
|
||||
// Make the output file
|
||||
unique_ptr<WritableFile> writable_file;
|
||||
Status s = NewWritableFile(env_, fname, &writable_file, env_options_);
|
||||
EnvOptions opt_env_opts =
|
||||
env_->OptimizeForCompactionTableWrite(env_options_, db_options_);
|
||||
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
|
||||
&opt_env_opts.use_direct_writes);
|
||||
Status s = NewWritableFile(env_, fname, &writable_file, opt_env_opts);
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_ERROR(
|
||||
db_options_.info_log,
|
||||
|
@ -40,6 +40,12 @@ class DBCompactionTestWithParam
|
||||
bool exclusive_manual_compaction_;
|
||||
};
|
||||
|
||||
class DBCompactionDirectIOTest : public DBCompactionTest,
|
||||
public ::testing::WithParamInterface<bool> {
|
||||
public:
|
||||
DBCompactionDirectIOTest() : DBCompactionTest() {}
|
||||
};
|
||||
|
||||
namespace {
|
||||
|
||||
class FlushedFileCollector : public EventListener {
|
||||
@ -2552,6 +2558,39 @@ INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
|
||||
std::make_tuple(4, true),
|
||||
std::make_tuple(4, false)));
|
||||
|
||||
TEST_P(DBCompactionDirectIOTest, DirectIO) {
|
||||
Options options = CurrentOptions();
|
||||
Destroy(options);
|
||||
options.create_if_missing = true;
|
||||
options.disable_auto_compactions = true;
|
||||
options.use_direct_io_for_flush_and_compaction = GetParam();
|
||||
options.env = new MockEnv(Env::Default());
|
||||
Reopen(options);
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"TableCache::NewIterator:for_compaction", [&](void* arg) {
|
||||
bool* use_direct_reads = static_cast<bool*>(arg);
|
||||
ASSERT_EQ(*use_direct_reads,
|
||||
options.use_direct_io_for_flush_and_compaction);
|
||||
});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
|
||||
bool* use_direct_writes = static_cast<bool*>(arg);
|
||||
ASSERT_EQ(*use_direct_writes,
|
||||
options.use_direct_io_for_flush_and_compaction);
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
MakeTables(3, "p", "q", 1);
|
||||
ASSERT_EQ("1,1,1", FilesPerLevel(1));
|
||||
Compact(1, "p1", "p9");
|
||||
ASSERT_EQ("0,0,1", FilesPerLevel(1));
|
||||
Destroy(options);
|
||||
delete options.env;
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
|
||||
testing::Bool());
|
||||
|
||||
class CompactionPriTest : public DBTestBase,
|
||||
public testing::WithParamInterface<uint32_t> {
|
||||
public:
|
||||
|
@ -19,6 +19,12 @@ class DBFlushTest : public DBTestBase {
|
||||
DBFlushTest() : DBTestBase("/db_flush_test") {}
|
||||
};
|
||||
|
||||
class DBFlushDirectIOTest : public DBFlushTest,
|
||||
public ::testing::WithParamInterface<bool> {
|
||||
public:
|
||||
DBFlushDirectIOTest() : DBFlushTest() {}
|
||||
};
|
||||
|
||||
// We had issue when two background threads trying to flush at the same time,
|
||||
// only one of them get committed. The test verifies the issue is fixed.
|
||||
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
|
||||
@ -83,6 +89,33 @@ TEST_F(DBFlushTest, SyncFail) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_P(DBFlushDirectIOTest, DirectIO) {
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
options.disable_auto_compactions = true;
|
||||
options.max_background_flushes = 2;
|
||||
options.use_direct_io_for_flush_and_compaction = GetParam();
|
||||
options.env = new MockEnv(Env::Default());
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:create_file", [&](void* arg) {
|
||||
bool* use_direct_writes = static_cast<bool*>(arg);
|
||||
ASSERT_EQ(*use_direct_writes,
|
||||
options.use_direct_io_for_flush_and_compaction);
|
||||
});
|
||||
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
Reopen(options);
|
||||
ASSERT_OK(Put("foo", "v"));
|
||||
FlushOptions flush_options;
|
||||
flush_options.wait = true;
|
||||
ASSERT_OK(dbfull()->Flush(flush_options));
|
||||
Destroy(options);
|
||||
delete options.env;
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
|
||||
testing::Bool());
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -101,7 +101,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
|
||||
result.compaction_readahead_size = 1024 * 1024 * 2;
|
||||
}
|
||||
|
||||
if (result.compaction_readahead_size > 0) {
|
||||
if (result.compaction_readahead_size > 0 ||
|
||||
result.use_direct_io_for_flush_and_compaction) {
|
||||
result.new_table_reader_for_compaction_inputs = true;
|
||||
}
|
||||
|
||||
@ -165,10 +166,12 @@ static Status ValidateOptions(
|
||||
"then direct I/O reads (use_direct_reads) must be disabled. ");
|
||||
}
|
||||
|
||||
if (db_options.allow_mmap_writes && db_options.use_direct_writes) {
|
||||
if (db_options.allow_mmap_writes &&
|
||||
db_options.use_direct_io_for_flush_and_compaction) {
|
||||
return Status::NotSupported(
|
||||
"If memory mapped writes (allow_mmap_writes) are enabled "
|
||||
"then direct I/O writes (use_direct_writes) must be disabled. ");
|
||||
"then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
|
||||
"be disabled. ");
|
||||
}
|
||||
|
||||
if (db_options.keep_log_file_num == 0) {
|
||||
@ -823,9 +826,11 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||
std::vector<SequenceNumber> snapshot_seqs =
|
||||
snapshots_.GetAll(&earliest_write_conflict_snapshot);
|
||||
|
||||
EnvOptions optimized_env_options =
|
||||
env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
|
||||
s = BuildTable(
|
||||
dbname_, env_, *cfd->ioptions(), mutable_cf_options, env_options_,
|
||||
cfd->table_cache(), iter.get(),
|
||||
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
|
||||
optimized_env_options, cfd->table_cache(), iter.get(),
|
||||
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
|
||||
&meta, cfd->internal_comparator(),
|
||||
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
||||
|
@ -1995,7 +1995,8 @@ TEST_F(DBTest2, DirectIO) {
|
||||
return;
|
||||
}
|
||||
Options options = CurrentOptions();
|
||||
options.use_direct_reads = options.use_direct_writes = true;
|
||||
options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
|
||||
true;
|
||||
options.allow_mmap_reads = options.allow_mmap_writes = false;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
|
@ -294,9 +294,11 @@ Status FlushJob::WriteLevel0Table() {
|
||||
|
||||
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
|
||||
&output_compression_);
|
||||
EnvOptions optimized_env_options =
|
||||
db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_);
|
||||
s = BuildTable(
|
||||
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
|
||||
env_options_, cfd_->table_cache(), iter.get(),
|
||||
optimized_env_options, cfd_->table_cache(), iter.get(),
|
||||
std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
|
||||
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
|
||||
cfd_->GetName(), existing_snapshots_,
|
||||
|
@ -331,7 +331,7 @@ int main(int argc, char** argv) {
|
||||
options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone;
|
||||
options.level0_slowdown_writes_trigger = 99999;
|
||||
options.level0_stop_writes_trigger = 99999;
|
||||
options.use_direct_writes = true;
|
||||
options.use_direct_io_for_flush_and_compaction = true;
|
||||
options.write_buffer_size = FLAGS_memtable_size;
|
||||
rocksdb::BlockBasedTableOptions table_options;
|
||||
table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size);
|
||||
|
@ -377,9 +377,11 @@ class Repairer {
|
||||
ro.total_order_seek = true;
|
||||
Arena arena;
|
||||
ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
|
||||
EnvOptions optimized_env_options =
|
||||
env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
|
||||
status = BuildTable(
|
||||
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
|
||||
env_options_, table_cache_, iter.get(),
|
||||
optimized_env_options, table_cache_, iter.get(),
|
||||
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
|
||||
&meta, cfd->internal_comparator(),
|
||||
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
||||
|
@ -184,6 +184,11 @@ InternalIterator* TableCache::NewIterator(
|
||||
}
|
||||
size_t readahead = 0;
|
||||
if (for_compaction) {
|
||||
#ifndef NDEBUG
|
||||
bool use_direct_reads_for_compaction = env_options.use_direct_reads;
|
||||
TEST_SYNC_POINT_CALLBACK("TableCache::NewIterator:for_compaction",
|
||||
&use_direct_reads_for_compaction);
|
||||
#endif // !NDEBUG
|
||||
if (ioptions_.new_table_reader_for_compaction_inputs) {
|
||||
readahead = ioptions_.compaction_readahead_size;
|
||||
create_new_table_reader = true;
|
||||
|
@ -2218,7 +2218,8 @@ VersionSet::VersionSet(const std::string& dbname,
|
||||
current_version_number_(0),
|
||||
manifest_file_size_(0),
|
||||
env_options_(storage_options),
|
||||
env_options_compactions_(env_options_) {}
|
||||
env_options_compactions_(
|
||||
env_->OptimizeForCompactionTableRead(env_options_, *db_options_)) {}
|
||||
|
||||
void CloseTables(void* ptr, size_t) {
|
||||
TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
|
||||
@ -3477,7 +3478,7 @@ InternalIterator* VersionSet::MakeInputIterator(
|
||||
// Create concatenating iterator for the files from this level
|
||||
list[num++] = NewTwoLevelIterator(
|
||||
new LevelFileIteratorState(
|
||||
cfd->table_cache(), read_options, env_options_,
|
||||
cfd->table_cache(), read_options, env_options_compactions_,
|
||||
cfd->internal_comparator(),
|
||||
nullptr /* no per level latency histogram */,
|
||||
true /* for_compaction */, false /* prefix enabled */,
|
||||
|
19
env/env.cc
vendored
19
env/env.cc
vendored
@ -10,9 +10,9 @@
|
||||
#include "rocksdb/env.h"
|
||||
|
||||
#include <thread>
|
||||
#include "options/db_options.h"
|
||||
#include "port/port.h"
|
||||
#include "port/sys_time.h"
|
||||
|
||||
#include "rocksdb/options.h"
|
||||
#include "util/arena.h"
|
||||
#include "util/autovector.h"
|
||||
@ -316,7 +316,6 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
|
||||
env_options->use_mmap_reads = options.allow_mmap_reads;
|
||||
env_options->use_mmap_writes = options.allow_mmap_writes;
|
||||
env_options->use_direct_reads = options.use_direct_reads;
|
||||
env_options->use_direct_writes = options.use_direct_writes;
|
||||
env_options->set_fd_cloexec = options.is_fd_close_on_exec;
|
||||
env_options->bytes_per_sync = options.bytes_per_sync;
|
||||
env_options->compaction_readahead_size = options.compaction_readahead_size;
|
||||
@ -341,6 +340,22 @@ EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
|
||||
return env_options;
|
||||
}
|
||||
|
||||
EnvOptions Env::OptimizeForCompactionTableWrite(
|
||||
const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
|
||||
EnvOptions optimized_env_options(env_options);
|
||||
optimized_env_options.use_direct_writes =
|
||||
db_options.use_direct_io_for_flush_and_compaction;
|
||||
return optimized_env_options;
|
||||
}
|
||||
|
||||
EnvOptions Env::OptimizeForCompactionTableRead(
|
||||
const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
|
||||
EnvOptions optimized_env_options(env_options);
|
||||
optimized_env_options.use_direct_reads =
|
||||
db_options.use_direct_io_for_flush_and_compaction;
|
||||
return optimized_env_options;
|
||||
}
|
||||
|
||||
EnvOptions::EnvOptions(const DBOptions& options) {
|
||||
AssignEnvOptions(this, options);
|
||||
}
|
||||
|
@ -804,8 +804,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_mmap_writes(
|
||||
rocksdb_options_t*, unsigned char);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_reads(
|
||||
rocksdb_options_t*, unsigned char);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_writes(
|
||||
rocksdb_options_t*, unsigned char);
|
||||
extern ROCKSDB_LIBRARY_API void
|
||||
rocksdb_options_set_use_direct_io_for_flush_and_compaction(rocksdb_options_t*,
|
||||
unsigned char);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_is_fd_close_on_exec(
|
||||
rocksdb_options_t*, unsigned char);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_skip_log_error_on_recovery(
|
||||
|
@ -44,6 +44,7 @@ class WritableFile;
|
||||
class RandomRWFile;
|
||||
class Directory;
|
||||
struct DBOptions;
|
||||
struct ImmutableDBOptions;
|
||||
class RateLimiter;
|
||||
class ThreadStatusUpdater;
|
||||
struct ThreadStatus;
|
||||
@ -375,6 +376,20 @@ class Env {
|
||||
virtual EnvOptions OptimizeForManifestWrite(
|
||||
const EnvOptions& env_options) const;
|
||||
|
||||
// OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy
|
||||
// of the EnvOptions in the parameters, but is optimized for writing table
|
||||
// files. Default implementation returns the copy of the same object.
|
||||
virtual EnvOptions OptimizeForCompactionTableWrite(
|
||||
const EnvOptions& env_options,
|
||||
const ImmutableDBOptions& db_options) const;
|
||||
|
||||
// OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy
|
||||
// of the EnvOptions in the parameters, but is optimized for reading table
|
||||
// files. Default implementation returns the copy of the same object.
|
||||
virtual EnvOptions OptimizeForCompactionTableRead(
|
||||
const EnvOptions& env_options,
|
||||
const ImmutableDBOptions& db_options) const;
|
||||
|
||||
// Returns the status of all threads that belong to the current Env.
|
||||
virtual Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
|
||||
return Status::NotSupported("Not supported.");
|
||||
|
@ -581,15 +581,16 @@ struct DBOptions {
|
||||
// bufferized. The hardware buffer of the devices may however still
|
||||
// be used. Memory mapped files are not impacted by these parameters.
|
||||
|
||||
// Use O_DIRECT for reading file
|
||||
// Use O_DIRECT for user reads
|
||||
// Default: false
|
||||
// Not supported in ROCKSDB_LITE mode!
|
||||
bool use_direct_reads = false;
|
||||
|
||||
// Use O_DIRECT for writing file
|
||||
// Use O_DIRECT for both reads and writes in background flush and compactions
|
||||
// When true, we also force new_table_reader_for_compaction_inputs to true.
|
||||
// Default: false
|
||||
// Not supported in ROCKSDB_LITE mode!
|
||||
bool use_direct_writes = false;
|
||||
bool use_direct_io_for_flush_and_compaction = false;
|
||||
|
||||
// If false, fallocate() calls are bypassed
|
||||
bool allow_fallocate = true;
|
||||
|
@ -1052,24 +1052,26 @@ void Java_org_rocksdb_Options_setUseDirectReads(JNIEnv* env, jobject jobj,
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_Options
|
||||
* Method: useDirectWrites
|
||||
* Method: useDirectIoForFlushAndCompaction
|
||||
* Signature: (J)Z
|
||||
*/
|
||||
jboolean Java_org_rocksdb_Options_useDirectWrites(JNIEnv* env, jobject jobj,
|
||||
jlong jhandle) {
|
||||
return reinterpret_cast<rocksdb::Options*>(jhandle)->use_direct_writes;
|
||||
jboolean Java_org_rocksdb_Options_useDirectIoForFlushAndCompaction(
|
||||
JNIEnv* env, jobject jobj, jlong jhandle) {
|
||||
return reinterpret_cast<rocksdb::Options*>(jhandle)
|
||||
->use_direct_io_for_flush_and_compaction;
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_Options
|
||||
* Method: setUseDirectReads
|
||||
* Method: setUseDirectIoForFlushAndCompaction
|
||||
* Signature: (JZ)V
|
||||
*/
|
||||
void Java_org_rocksdb_Options_setUseDirectWrites(JNIEnv* env, jobject jobj,
|
||||
jlong jhandle,
|
||||
jboolean use_direct_writes) {
|
||||
reinterpret_cast<rocksdb::Options*>(jhandle)->use_direct_writes =
|
||||
static_cast<bool>(use_direct_writes);
|
||||
void Java_org_rocksdb_Options_setUseDirectIoForFlushAndCompaction(
|
||||
JNIEnv* env, jobject jobj, jlong jhandle,
|
||||
jboolean use_direct_io_for_flush_and_compaction) {
|
||||
reinterpret_cast<rocksdb::Options*>(jhandle)
|
||||
->use_direct_io_for_flush_and_compaction =
|
||||
static_cast<bool>(use_direct_io_for_flush_and_compaction);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -4920,12 +4922,13 @@ void Java_org_rocksdb_DBOptions_setUseDirectReads(JNIEnv* env, jobject jobj,
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_DBOptions
|
||||
* Method: useDirectWrites
|
||||
* Method: useDirectIoForFlushAndCompaction
|
||||
* Signature: (J)Z
|
||||
*/
|
||||
jboolean Java_org_rocksdb_DBOptions_useDirectWrites(JNIEnv* env, jobject jobj,
|
||||
jlong jhandle) {
|
||||
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->use_direct_writes;
|
||||
jboolean Java_org_rocksdb_DBOptions_useDirectIoForFlushAndCompaction(
|
||||
JNIEnv* env, jobject jobj, jlong jhandle) {
|
||||
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)
|
||||
->use_direct_io_for_flush_and_compaction;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -4933,11 +4936,12 @@ jboolean Java_org_rocksdb_DBOptions_useDirectWrites(JNIEnv* env, jobject jobj,
|
||||
* Method: setUseDirectReads
|
||||
* Signature: (JZ)V
|
||||
*/
|
||||
void Java_org_rocksdb_DBOptions_setUseDirectWrites(JNIEnv* env, jobject jobj,
|
||||
jlong jhandle,
|
||||
jboolean use_direct_writes) {
|
||||
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->use_direct_writes =
|
||||
static_cast<bool>(use_direct_writes);
|
||||
void Java_org_rocksdb_DBOptions_setUseDirectIoForFlushAndCompaction(
|
||||
JNIEnv* env, jobject jobj, jlong jhandle,
|
||||
jboolean use_direct_io_for_flush_and_compaction) {
|
||||
reinterpret_cast<rocksdb::DBOptions*>(jhandle)
|
||||
->use_direct_io_for_flush_and_compaction =
|
||||
static_cast<bool>(use_direct_io_for_flush_and_compaction);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -531,17 +531,18 @@ public class DBOptions
|
||||
}
|
||||
|
||||
@Override
|
||||
public DBOptions setUseDirectWrites(
|
||||
final boolean useDirectWrites) {
|
||||
public DBOptions setUseDirectIoForFlushAndCompaction(
|
||||
final boolean useDirectIoForFlushAndCompaction) {
|
||||
assert(isOwningHandle());
|
||||
setUseDirectWrites(nativeHandle_, useDirectWrites);
|
||||
setUseDirectIoForFlushAndCompaction(nativeHandle_,
|
||||
useDirectIoForFlushAndCompaction);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useDirectWrites() {
|
||||
public boolean useDirectIoForFlushAndCompaction() {
|
||||
assert(isOwningHandle());
|
||||
return useDirectWrites(nativeHandle_);
|
||||
return useDirectIoForFlushAndCompaction(nativeHandle_);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1025,8 +1026,9 @@ public class DBOptions
|
||||
private native long manifestPreallocationSize(long handle);
|
||||
private native void setUseDirectReads(long handle, boolean useDirectReads);
|
||||
private native boolean useDirectReads(long handle);
|
||||
private native void setUseDirectWrites(long handle, boolean useDirectWrites);
|
||||
private native boolean useDirectWrites(long handle);
|
||||
private native void setUseDirectIoForFlushAndCompaction(
|
||||
long handle, boolean useDirectIoForFlushAndCompaction);
|
||||
private native boolean useDirectIoForFlushAndCompaction(long handle);
|
||||
private native void setAllowFAllocate(final long handle,
|
||||
final boolean allowFAllocate);
|
||||
private native boolean allowFAllocate(final long handle);
|
||||
|
@ -804,21 +804,24 @@ public interface DBOptionsInterface<T extends DBOptionsInterface> {
|
||||
boolean useDirectReads();
|
||||
|
||||
/**
|
||||
* Enable the OS to use direct I/O for writing sst tables.
|
||||
* Enable the OS to use direct reads and writes in flush and
|
||||
* compaction
|
||||
* Default: false
|
||||
*
|
||||
* @param useDirectWrites if true, then direct write is enabled
|
||||
* @param useDirectIoForFlushAndCompaction if true, then direct
|
||||
* I/O will be enabled for background flush and compactions
|
||||
* @return the instance of the current object.
|
||||
*/
|
||||
T setUseDirectWrites(boolean useDirectWrites);
|
||||
T setUseDirectIoForFlushAndCompaction(boolean useDirectIoForFlushAndCompaction);
|
||||
|
||||
/**
|
||||
* Enable the OS to use direct I/O for writing sst tables.
|
||||
* Default: false
|
||||
* Enable the OS to use direct reads and writes in flush and
|
||||
* compaction
|
||||
*
|
||||
* @return if true, then direct writes are enabled
|
||||
* @return if true, then direct I/O is enabled for flush and
|
||||
* compaction
|
||||
*/
|
||||
boolean useDirectWrites();
|
||||
boolean useDirectIoForFlushAndCompaction();
|
||||
|
||||
/**
|
||||
* Whether fallocate calls are allowed
|
||||
|
@ -593,16 +593,17 @@ public class Options extends RocksObject
|
||||
}
|
||||
|
||||
@Override
|
||||
public Options setUseDirectWrites(final boolean useDirectWrites) {
|
||||
public Options setUseDirectIoForFlushAndCompaction(
|
||||
final boolean useDirectIoForFlushAndCompaction) {
|
||||
assert(isOwningHandle());
|
||||
setUseDirectWrites(nativeHandle_, useDirectWrites);
|
||||
setUseDirectIoForFlushAndCompaction(nativeHandle_, useDirectIoForFlushAndCompaction);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useDirectWrites() {
|
||||
public boolean useDirectIoForFlushAndCompaction() {
|
||||
assert(isOwningHandle());
|
||||
return useDirectWrites(nativeHandle_);
|
||||
return useDirectIoForFlushAndCompaction(nativeHandle_);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1621,8 +1622,9 @@ public class Options extends RocksObject
|
||||
private native long manifestPreallocationSize(long handle);
|
||||
private native void setUseDirectReads(long handle, boolean useDirectReads);
|
||||
private native boolean useDirectReads(long handle);
|
||||
private native void setUseDirectWrites(long handle, boolean useDirectWrites);
|
||||
private native boolean useDirectWrites(long handle);
|
||||
private native void setUseDirectIoForFlushAndCompaction(
|
||||
long handle, boolean useDirectIoForFlushAndCompaction);
|
||||
private native boolean useDirectIoForFlushAndCompaction(long handle);
|
||||
private native void setAllowFAllocate(final long handle,
|
||||
final boolean allowFAllocate);
|
||||
private native boolean allowFAllocate(final long handle);
|
||||
|
@ -331,11 +331,11 @@ public class DBOptionsTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void useDirectWrites() {
|
||||
public void useDirectIoForFlushAndCompaction() {
|
||||
try(final DBOptions opt = new DBOptions()) {
|
||||
final boolean boolValue = rand.nextBoolean();
|
||||
opt.setUseDirectWrites(boolValue);
|
||||
assertThat(opt.useDirectWrites()).isEqualTo(boolValue);
|
||||
opt.setUseDirectIoForFlushAndCompaction(boolValue);
|
||||
assertThat(opt.useDirectIoForFlushAndCompaction()).isEqualTo(boolValue);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -553,11 +553,11 @@ public class OptionsTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void useDirectWrites() {
|
||||
public void useDirectIoForFlushAndCompaction() {
|
||||
try(final Options opt = new Options()) {
|
||||
final boolean boolValue = rand.nextBoolean();
|
||||
opt.setUseDirectWrites(boolValue);
|
||||
assertThat(opt.useDirectWrites()).isEqualTo(boolValue);
|
||||
opt.setUseDirectIoForFlushAndCompaction(boolValue);
|
||||
assertThat(opt.useDirectIoForFlushAndCompaction()).isEqualTo(boolValue);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
|
||||
allow_mmap_reads(options.allow_mmap_reads),
|
||||
allow_mmap_writes(options.allow_mmap_writes),
|
||||
use_direct_reads(options.use_direct_reads),
|
||||
use_direct_writes(options.use_direct_writes),
|
||||
use_direct_io_for_flush_and_compaction(
|
||||
options.use_direct_io_for_flush_and_compaction),
|
||||
allow_fallocate(options.allow_fallocate),
|
||||
is_fd_close_on_exec(options.is_fd_close_on_exec),
|
||||
advise_random_on_open(options.advise_random_on_open),
|
||||
@ -127,8 +128,10 @@ void ImmutableDBOptions::Dump(Logger* log) const {
|
||||
allow_mmap_writes);
|
||||
ROCKS_LOG_HEADER(log, " Options.use_direct_reads: %d",
|
||||
use_direct_reads);
|
||||
ROCKS_LOG_HEADER(log, " Options.use_direct_writes: %d",
|
||||
use_direct_writes);
|
||||
ROCKS_LOG_HEADER(log,
|
||||
" "
|
||||
"Options.use_direct_io_for_flush_and_compaction: %d",
|
||||
use_direct_io_for_flush_and_compaction);
|
||||
ROCKS_LOG_HEADER(log, " Options.create_missing_column_families: %d",
|
||||
create_missing_column_families);
|
||||
ROCKS_LOG_HEADER(log, " Options.db_log_dir: %s",
|
||||
|
@ -48,7 +48,7 @@ struct ImmutableDBOptions {
|
||||
bool allow_mmap_reads;
|
||||
bool allow_mmap_writes;
|
||||
bool use_direct_reads;
|
||||
bool use_direct_writes;
|
||||
bool use_direct_io_for_flush_and_compaction;
|
||||
bool allow_fallocate;
|
||||
bool is_fd_close_on_exec;
|
||||
bool advise_random_on_open;
|
||||
|
@ -153,7 +153,8 @@ DBOptions::DBOptions(const Options& options)
|
||||
allow_mmap_reads(options.allow_mmap_reads),
|
||||
allow_mmap_writes(options.allow_mmap_writes),
|
||||
use_direct_reads(options.use_direct_reads),
|
||||
use_direct_writes(options.use_direct_writes),
|
||||
use_direct_io_for_flush_and_compaction(
|
||||
options.use_direct_io_for_flush_and_compaction),
|
||||
allow_fallocate(options.allow_fallocate),
|
||||
is_fd_close_on_exec(options.is_fd_close_on_exec),
|
||||
skip_log_error_on_recovery(options.skip_log_error_on_recovery),
|
||||
|
@ -70,7 +70,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
|
||||
options.allow_mmap_reads = immutable_db_options.allow_mmap_reads;
|
||||
options.allow_mmap_writes = immutable_db_options.allow_mmap_writes;
|
||||
options.use_direct_reads = immutable_db_options.use_direct_reads;
|
||||
options.use_direct_writes = immutable_db_options.use_direct_writes;
|
||||
options.use_direct_io_for_flush_and_compaction =
|
||||
immutable_db_options.use_direct_io_for_flush_and_compaction;
|
||||
options.allow_fallocate = immutable_db_options.allow_fallocate;
|
||||
options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec;
|
||||
options.stats_dump_period_sec = mutable_db_options.stats_dump_period_sec;
|
||||
|
@ -163,8 +163,10 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
|
||||
{offsetof(struct DBOptions, use_direct_reads), OptionType::kBoolean,
|
||||
OptionVerificationType::kNormal, false, 0}},
|
||||
{"use_direct_writes",
|
||||
{offsetof(struct DBOptions, use_direct_writes), OptionType::kBoolean,
|
||||
OptionVerificationType::kNormal, false, 0}},
|
||||
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, 0}},
|
||||
{"use_direct_io_for_flush_and_compaction",
|
||||
{offsetof(struct DBOptions, use_direct_io_for_flush_and_compaction),
|
||||
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
|
||||
{"allow_2pc",
|
||||
{offsetof(struct DBOptions, allow_2pc), OptionType::kBoolean,
|
||||
OptionVerificationType::kNormal, false, 0}},
|
||||
|
@ -274,7 +274,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
|
||||
"allow_fallocate=true;"
|
||||
"allow_mmap_reads=false;"
|
||||
"use_direct_reads=false;"
|
||||
"use_direct_writes=false;"
|
||||
"use_direct_io_for_flush_and_compaction=false;"
|
||||
"max_log_file_size=4607;"
|
||||
"random_access_max_buffer_size=1048576;"
|
||||
"advise_random_on_open=true;"
|
||||
|
@ -120,7 +120,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
|
||||
{"allow_mmap_reads", "true"},
|
||||
{"allow_mmap_writes", "false"},
|
||||
{"use_direct_reads", "false"},
|
||||
{"use_direct_writes", "false"},
|
||||
{"use_direct_io_for_flush_and_compaction", "false"},
|
||||
{"is_fd_close_on_exec", "true"},
|
||||
{"skip_log_error_on_recovery", "false"},
|
||||
{"stats_dump_period_sec", "46"},
|
||||
@ -234,7 +234,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
|
||||
ASSERT_EQ(new_db_opt.allow_mmap_reads, true);
|
||||
ASSERT_EQ(new_db_opt.allow_mmap_writes, false);
|
||||
ASSERT_EQ(new_db_opt.use_direct_reads, false);
|
||||
ASSERT_EQ(new_db_opt.use_direct_writes, false);
|
||||
ASSERT_EQ(new_db_opt.use_direct_io_for_flush_and_compaction, false);
|
||||
ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true);
|
||||
ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false);
|
||||
ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U);
|
||||
|
@ -800,17 +800,18 @@ DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
|
||||
" in MB.");
|
||||
DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");
|
||||
|
||||
DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
|
||||
DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
|
||||
"Allow reads to occur via mmap-ing files");
|
||||
|
||||
DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes,
|
||||
DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
|
||||
"Allow writes to occur via mmap-ing files");
|
||||
|
||||
DEFINE_bool(use_direct_reads, rocksdb::EnvOptions().use_direct_reads,
|
||||
DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
|
||||
"Use O_DIRECT for reading data");
|
||||
|
||||
DEFINE_bool(use_direct_writes, rocksdb::EnvOptions().use_direct_writes,
|
||||
"Use O_DIRECT for writing data");
|
||||
DEFINE_bool(use_direct_io_for_flush_and_compaction,
|
||||
rocksdb::Options().use_direct_io_for_flush_and_compaction,
|
||||
"Use O_DIRECT for background flush and compaction I/O");
|
||||
|
||||
DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
|
||||
"Advise random access on table file open");
|
||||
@ -2813,7 +2814,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
options.allow_mmap_reads = FLAGS_mmap_read;
|
||||
options.allow_mmap_writes = FLAGS_mmap_write;
|
||||
options.use_direct_reads = FLAGS_use_direct_reads;
|
||||
options.use_direct_writes = FLAGS_use_direct_writes;
|
||||
options.use_direct_io_for_flush_and_compaction =
|
||||
FLAGS_use_direct_io_for_flush_and_compaction;
|
||||
#ifndef ROCKSDB_LITE
|
||||
options.compaction_options_fifo = CompactionOptionsFIFO(
|
||||
FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024);
|
||||
|
@ -211,7 +211,7 @@ const std::string options_file_content = R"OPTIONS_FILE(
|
||||
allow_mmap_reads=false
|
||||
allow_mmap_writes=false
|
||||
use_direct_reads=false
|
||||
use_direct_writes=false
|
||||
use_direct_io_for_flush_and_compaction=false
|
||||
stats_dump_period_sec=600
|
||||
allow_fallocate=true
|
||||
max_log_file_size=83886080
|
||||
|
@ -269,16 +269,17 @@ DEFINE_string(db, "", "Use the db with the following name.");
|
||||
DEFINE_bool(verify_checksum, false,
|
||||
"Verify checksum for every block read from storage");
|
||||
|
||||
DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
|
||||
DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
|
||||
"Allow reads to occur via mmap-ing files");
|
||||
|
||||
DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes,
|
||||
DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
|
||||
"Allow writes to occur via mmap-ing files");
|
||||
|
||||
DEFINE_bool(use_direct_reads, rocksdb::EnvOptions().use_direct_reads,
|
||||
DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
|
||||
"Use O_DIRECT for reading data");
|
||||
|
||||
DEFINE_bool(use_direct_writes, rocksdb::EnvOptions().use_direct_writes,
|
||||
DEFINE_bool(use_direct_io_for_flush_and_compaction,
|
||||
rocksdb::Options().use_direct_io_for_flush_and_compaction,
|
||||
"Use O_DIRECT for writing data");
|
||||
|
||||
// Database statistics
|
||||
@ -2156,7 +2157,8 @@ class StressTest {
|
||||
options_.allow_mmap_reads = FLAGS_mmap_read;
|
||||
options_.allow_mmap_writes = FLAGS_mmap_write;
|
||||
options_.use_direct_reads = FLAGS_use_direct_reads;
|
||||
options_.use_direct_writes = FLAGS_use_direct_writes;
|
||||
options_.use_direct_io_for_flush_and_compaction =
|
||||
FLAGS_use_direct_io_for_flush_and_compaction;
|
||||
options_.target_file_size_base = FLAGS_target_file_size_base;
|
||||
options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
|
||||
options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
|
||||
|
@ -242,7 +242,7 @@ void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) {
|
||||
db_opt->allow_mmap_reads = rnd->Uniform(2);
|
||||
db_opt->allow_mmap_writes = rnd->Uniform(2);
|
||||
db_opt->use_direct_reads = rnd->Uniform(2);
|
||||
db_opt->use_direct_writes = rnd->Uniform(2);
|
||||
db_opt->use_direct_io_for_flush_and_compaction = rnd->Uniform(2);
|
||||
db_opt->create_if_missing = rnd->Uniform(2);
|
||||
db_opt->create_missing_column_families = rnd->Uniform(2);
|
||||
db_opt->enable_thread_tracking = rnd->Uniform(2);
|
||||
|
Loading…
Reference in New Issue
Block a user