disable direct reads for log and manifest and add direct io to tests

Summary:
Disable direct reads for log and manifest. Direct reads should not affect sequential_file
Also add kDirectIO for option_config_ in db_test_util
Closes https://github.com/facebook/rocksdb/pull/2337

Differential Revision: D5100261

Pulled By: lightmark

fbshipit-source-id: 0ebfd13b93fa1b8f9acae514ac44f8125a05868b
This commit is contained in:
Aaron Gao 2017-05-22 18:40:41 -07:00 committed by Facebook Github Bot
parent 15ba4d6c4b
commit 3e86c0f07c
15 changed files with 113 additions and 23 deletions

View File

@ -500,7 +500,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
unique_ptr<SequentialFileReader> file_reader; unique_ptr<SequentialFileReader> file_reader;
{ {
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
status = env_->NewSequentialFile(fname, &file, env_options_); status = env_->NewSequentialFile(fname, &file,
env_->OptimizeForLogRead(env_options_));
if (!status.ok()) { if (!status.ok()) {
MaybeIgnoreError(&status); MaybeIgnoreError(&status);
if (!status.ok()) { if (!status.ok()) {

View File

@ -1007,6 +1007,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) {
TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) { TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) {
int upper_bound_hits = 0; int upper_bound_hits = 0;
Options options = CurrentOptions();
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound", "BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound",
[&upper_bound_hits](void* arg) { [&upper_bound_hits](void* arg) {
@ -1014,7 +1015,6 @@ TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) {
upper_bound_hits += (*static_cast<bool*>(arg) ? 1 : 0); upper_bound_hits += (*static_cast<bool*>(arg) ? 1 : 0);
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.env = env_; options.env = env_;
options.create_if_missing = true; options.create_if_missing = true;
options.prefix_extractor = nullptr; options.prefix_extractor = nullptr;

View File

@ -265,6 +265,14 @@ Options DBTestBase::CurrentOptions(
Options options = defaultOptions; Options options = defaultOptions;
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
bool set_block_based_table_factory = true; bool set_block_based_table_factory = true;
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
!defined(OS_AIX)
rocksdb::SyncPoint::GetInstance()->ClearCallBack(
"NewRandomAccessFile:O_DIRECT");
rocksdb::SyncPoint::GetInstance()->ClearCallBack(
"NewWritableFile:O_DIRECT");
#endif
switch (option_config_) { switch (option_config_) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
case kHashSkipList: case kHashSkipList:
@ -429,6 +437,26 @@ Options DBTestBase::CurrentOptions(
options.enable_write_thread_adaptive_yield = true; options.enable_write_thread_adaptive_yield = true;
break; break;
} }
case kDirectIO: {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
options.compaction_readahead_size = 2 * 1024 * 1024;
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
!defined(OS_AIX)
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewWritableFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewRandomAccessFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
#endif
break;
}
default: default:
break; break;

View File

@ -620,12 +620,13 @@ class DBTestBase : public testing::Test {
kRowCache = 27, kRowCache = 27,
kRecycleLogFiles = 28, kRecycleLogFiles = 28,
kConcurrentSkipList = 29, kConcurrentSkipList = 29,
kEnd = 30, kDirectIO = 30,
kLevelSubcompactions = 31, kEnd = 31,
kUniversalSubcompactions = 32, kLevelSubcompactions = 32,
kBlockBasedTableWithIndexRestartInterval = 33, kUniversalSubcompactions = 33,
kBlockBasedTableWithPartitionedIndex = 34, kBlockBasedTableWithIndexRestartInterval = 34,
kPartitionedFilterWithNewTableReaderForCompactions = 35, kBlockBasedTableWithPartitionedIndex = 35,
kPartitionedFilterWithNewTableReaderForCompactions = 36,
}; };
int option_config_; int option_config_;

View File

@ -316,7 +316,8 @@ class Repairer {
// Open the log file // Open the log file
std::string logname = LogFileName(dbname_, log); std::string logname = LogFileName(dbname_, log);
unique_ptr<SequentialFile> lfile; unique_ptr<SequentialFile> lfile;
Status status = env_->NewSequentialFile(logname, &lfile, env_options_); Status status = env_->NewSequentialFile(
logname, &lfile, env_->OptimizeForLogRead(env_options_));
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }

View File

@ -47,17 +47,18 @@ Status TransactionLogIteratorImpl::OpenLogFile(
Env* env = options_->env; Env* env = options_->env;
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
Status s; Status s;
EnvOptions optimized_env_options = env->OptimizeForLogRead(soptions_);
if (logFile->Type() == kArchivedLogFile) { if (logFile->Type() == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber()); std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber());
s = env->NewSequentialFile(fname, &file, soptions_); s = env->NewSequentialFile(fname, &file, optimized_env_options);
} else { } else {
std::string fname = LogFileName(dir_, logFile->LogNumber()); std::string fname = LogFileName(dir_, logFile->LogNumber());
s = env->NewSequentialFile(fname, &file, soptions_); s = env->NewSequentialFile(fname, &file, optimized_env_options);
if (!s.ok()) { if (!s.ok()) {
// If cannot open file in DB directory. // If cannot open file in DB directory.
// Try the archive dir, as it could have moved in the meanwhile. // Try the archive dir, as it could have moved in the meanwhile.
fname = ArchivedLogFileName(dir_, logFile->LogNumber()); fname = ArchivedLogFileName(dir_, logFile->LogNumber());
s = env->NewSequentialFile(fname, &file, soptions_); s = env->NewSequentialFile(fname, &file, optimized_env_options);
} }
} }
if (s.ok()) { if (s.ok()) {

View File

@ -2637,7 +2637,7 @@ Status VersionSet::Recover(
{ {
unique_ptr<SequentialFile> manifest_file; unique_ptr<SequentialFile> manifest_file;
s = env_->NewSequentialFile(manifest_filename, &manifest_file, s = env_->NewSequentialFile(manifest_filename, &manifest_file,
env_options_); env_->OptimizeForManifestRead(env_options_));
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -3064,7 +3064,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Status s; Status s;
{ {
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
s = options.env->NewSequentialFile(dscname, &file, env_options_); s = options.env->NewSequentialFile(
dscname, &file, env_->OptimizeForManifestRead(env_options_));
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }

View File

@ -433,7 +433,8 @@ Status WalManager::ReadFirstLine(const std::string& fname,
}; };
std::unique_ptr<SequentialFile> file; std::unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(fname, &file, env_options_); Status status = env_->NewSequentialFile(
fname, &file, env_->OptimizeForLogRead(env_options_));
unique_ptr<SequentialFileReader> file_reader( unique_ptr<SequentialFileReader> file_reader(
new SequentialFileReader(std::move(file))); new SequentialFileReader(std::move(file)));

12
env/env.cc vendored
View File

@ -342,6 +342,18 @@ EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
return env_options; return env_options;
} }
EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.use_direct_reads = false;
return optimized_env_options;
}
EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.use_direct_reads = false;
return optimized_env_options;
}
EnvOptions Env::OptimizeForCompactionTableWrite( EnvOptions Env::OptimizeForCompactionTableWrite(
const EnvOptions& env_options, const ImmutableDBOptions& db_options) const { const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
EnvOptions optimized_env_options(env_options); EnvOptions optimized_env_options(env_options);

2
env/env_test.cc vendored
View File

@ -1233,7 +1233,7 @@ TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
ASSERT_EQ(size, 4096 * i); ASSERT_EQ(size, 4096 * i);
ASSERT_EQ(size, file_attrs_iter->size_bytes); ASSERT_EQ(size, file_attrs_iter->size_bytes);
} }
rocksdb::SyncPoint::GetInstance()->ClearTrace(); rocksdb::SyncPoint::GetInstance()->ClearTrace();
} }
// Test that all WritableFileWrapper forwards all calls to WritableFile. // Test that all WritableFileWrapper forwards all calls to WritableFile.

View File

@ -379,6 +379,16 @@ class Env {
// Generates a unique id that can be used to identify a db // Generates a unique id that can be used to identify a db
virtual std::string GenerateUniqueId(); virtual std::string GenerateUniqueId();
// OptimizeForLogWrite will create a new EnvOptions object that is a copy of
// the EnvOptions in the parameters, but is optimized for reading log files.
virtual EnvOptions OptimizeForLogRead(const EnvOptions& env_options) const;
// OptimizeForManifestRead will create a new EnvOptions object that is a copy
// of the EnvOptions in the parameters, but is optimized for reading manifest
// files.
virtual EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const;
// OptimizeForLogWrite will create a new EnvOptions object that is a copy of // OptimizeForLogWrite will create a new EnvOptions object that is a copy of
// the EnvOptions in the parameters, but is optimized for writing log files. // the EnvOptions in the parameters, but is optimized for writing log files.
// Default implementation returns the copy of the same object. // Default implementation returns the copy of the same object.
@ -390,16 +400,16 @@ class Env {
virtual EnvOptions OptimizeForManifestWrite( virtual EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const; const EnvOptions& env_options) const;
// OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy // OptimizeForCompactionTableWrite will create a new EnvOptions object that is
// of the EnvOptions in the parameters, but is optimized for writing table // a copy of the EnvOptions in the parameters, but is optimized for writing
// files. Default implementation returns the copy of the same object. // table files.
virtual EnvOptions OptimizeForCompactionTableWrite( virtual EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options, const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const; const ImmutableDBOptions& db_options) const;
// OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy // OptimizeForCompactionTableWrite will create a new EnvOptions object that
// of the EnvOptions in the parameters, but is optimized for reading table // is a copy of the EnvOptions in the parameters, but is optimized for reading
// files. Default implementation returns the copy of the same object. // table files.
virtual EnvOptions OptimizeForCompactionTableRead( virtual EnvOptions OptimizeForCompactionTableRead(
const EnvOptions& env_options, const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const; const ImmutableDBOptions& db_options) const;

View File

@ -31,6 +31,7 @@ int main() {
#else #else
#define __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS
#include <fcntl.h>
#include <inttypes.h> #include <inttypes.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -60,7 +61,12 @@ int main() {
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
#include "util/sync_point.h"
#endif // !(defined NDEBUG) || !defined(OS_WIN)
#include "util/testutil.h" #include "util/testutil.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
using GFLAGS::ParseCommandLineFlags; using GFLAGS::ParseCommandLineFlags;
@ -1165,6 +1171,8 @@ class StressTest {
ToString(FLAGS_max_bytes_for_level_multiplier), "1", "2", ToString(FLAGS_max_bytes_for_level_multiplier), "1", "2",
}}, }},
{"max_sequential_skip_in_iterations", {"4", "8", "12"}}, {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
{"use_direct_reads", {"false", "true"}},
{"use_direct_io_for_flush_and_compaction", {"false", "true"}},
}; };
options_table_ = std::move(options_tbl); options_table_ = std::move(options_tbl);
@ -2352,6 +2360,20 @@ int main(int argc, char** argv) {
SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
" [OPTIONS]..."); " [OPTIONS]...");
ParseCommandLineFlags(&argc, &argv, true); ParseCommandLineFlags(&argc, &argv, true);
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
!defined(OS_AIX)
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewWritableFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewRandomAccessFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
#endif
if (FLAGS_statistics) { if (FLAGS_statistics) {
dbstats = rocksdb::CreateDBStatistics(); dbstats = rocksdb::CreateDBStatistics();

View File

@ -91,6 +91,14 @@ void SyncPoint::SetCallBack(const std::string point,
callbacks_[point] = callback; callbacks_[point] = callback;
} }
void SyncPoint::ClearCallBack(const std::string point) {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.erase(point);
}
void SyncPoint::ClearAllCallBacks() { void SyncPoint::ClearAllCallBacks() {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) { while (num_callbacks_running_ > 0) {

View File

@ -85,6 +85,10 @@ class SyncPoint {
// Set up a call back function in sync point. // Set up a call back function in sync point.
void SetCallBack(const std::string point, void SetCallBack(const std::string point,
std::function<void(void*)> callback); std::function<void(void*)> callback);
// Clear callback function by point
void ClearCallBack(const std::string point);
// Clear all call back functions. // Clear all call back functions.
void ClearAllCallBacks(); void ClearAllCallBacks();

View File

@ -1154,7 +1154,7 @@ Status BackupEngineImpl::CopyOrCreateFile(
unique_ptr<SequentialFile> src_file; unique_ptr<SequentialFile> src_file;
EnvOptions env_options; EnvOptions env_options;
env_options.use_mmap_writes = false; env_options.use_mmap_writes = false;
// TODO:(gzh) maybe use direct writes here if possible // TODO:(gzh) maybe use direct reads/writes here if possible
if (size != nullptr) { if (size != nullptr) {
*size = 0; *size = 0;
} }