Merge pull request #803 from SherlockNoMad/SkipFlush
Add Option to Skip Flushing in TableBuilder
This commit is contained in:
commit
f31442fb5c
5
db/c.cc
5
db/c.cc
@ -1288,6 +1288,11 @@ void rocksdb_block_based_options_set_cache_index_and_filter_blocks(
|
|||||||
options->rep.cache_index_and_filter_blocks = v;
|
options->rep.cache_index_and_filter_blocks = v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void rocksdb_block_based_options_set_skip_table_builder_flush(
|
||||||
|
rocksdb_block_based_table_options_t* options, unsigned char v) {
|
||||||
|
options->rep.skip_table_builder_flush = v;
|
||||||
|
}
|
||||||
|
|
||||||
void rocksdb_options_set_block_based_table_factory(
|
void rocksdb_options_set_block_based_table_factory(
|
||||||
rocksdb_options_t *opt,
|
rocksdb_options_t *opt,
|
||||||
rocksdb_block_based_table_options_t* table_options) {
|
rocksdb_block_based_table_options_t* table_options) {
|
||||||
|
@ -376,6 +376,12 @@ DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
|
|||||||
DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
|
DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
|
||||||
"Maximum windows randomaccess buffer size");
|
"Maximum windows randomaccess buffer size");
|
||||||
|
|
||||||
|
DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024,
|
||||||
|
"Maximum write buffer for Writeable File");
|
||||||
|
|
||||||
|
DEFINE_int32(skip_table_builder_flush, false, "Skip flushing block in "
|
||||||
|
"table builder ");
|
||||||
|
|
||||||
DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
|
DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
|
||||||
" use default settings.");
|
" use default settings.");
|
||||||
DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. "
|
DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. "
|
||||||
@ -2299,6 +2305,7 @@ class Benchmark {
|
|||||||
FLAGS_new_table_reader_for_compaction_inputs;
|
FLAGS_new_table_reader_for_compaction_inputs;
|
||||||
options.compaction_readahead_size = FLAGS_compaction_readahead_size;
|
options.compaction_readahead_size = FLAGS_compaction_readahead_size;
|
||||||
options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
|
options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
|
||||||
|
options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
|
||||||
options.statistics = dbstats;
|
options.statistics = dbstats;
|
||||||
if (FLAGS_enable_io_prio) {
|
if (FLAGS_enable_io_prio) {
|
||||||
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
|
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
|
||||||
@ -2441,6 +2448,8 @@ class Benchmark {
|
|||||||
block_based_options.block_size = FLAGS_block_size;
|
block_based_options.block_size = FLAGS_block_size;
|
||||||
block_based_options.block_restart_interval = FLAGS_block_restart_interval;
|
block_based_options.block_restart_interval = FLAGS_block_restart_interval;
|
||||||
block_based_options.filter_policy = filter_policy_;
|
block_based_options.filter_policy = filter_policy_;
|
||||||
|
block_based_options.skip_table_builder_flush =
|
||||||
|
FLAGS_skip_table_builder_flush;
|
||||||
block_based_options.format_version = 2;
|
block_based_options.format_version = 2;
|
||||||
options.table_factory.reset(
|
options.table_factory.reset(
|
||||||
NewBlockBasedTableFactory(block_based_options));
|
NewBlockBasedTableFactory(block_based_options));
|
||||||
|
@ -450,6 +450,9 @@ rocksdb_block_based_options_set_hash_index_allow_collision(
|
|||||||
extern ROCKSDB_LIBRARY_API void
|
extern ROCKSDB_LIBRARY_API void
|
||||||
rocksdb_block_based_options_set_cache_index_and_filter_blocks(
|
rocksdb_block_based_options_set_cache_index_and_filter_blocks(
|
||||||
rocksdb_block_based_table_options_t*, unsigned char);
|
rocksdb_block_based_table_options_t*, unsigned char);
|
||||||
|
extern ROCKSDB_LIBRARY_API void
|
||||||
|
rocksdb_block_based_options_set_skip_table_builder_flush(
|
||||||
|
rocksdb_block_based_table_options_t* options, unsigned char);
|
||||||
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory(
|
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory(
|
||||||
rocksdb_options_t* opt, rocksdb_block_based_table_options_t* table_options);
|
rocksdb_options_t* opt, rocksdb_block_based_table_options_t* table_options);
|
||||||
|
|
||||||
|
@ -94,6 +94,9 @@ struct EnvOptions {
|
|||||||
// See DBOPtions doc
|
// See DBOPtions doc
|
||||||
size_t random_access_max_buffer_size;
|
size_t random_access_max_buffer_size;
|
||||||
|
|
||||||
|
// See DBOptions doc
|
||||||
|
size_t writable_file_max_buffer_size = 1024 * 1024;
|
||||||
|
|
||||||
// If not nullptr, write rate limiting is enabled for flush and compaction
|
// If not nullptr, write rate limiting is enabled for flush and compaction
|
||||||
RateLimiter* rate_limiter = nullptr;
|
RateLimiter* rate_limiter = nullptr;
|
||||||
};
|
};
|
||||||
|
@ -1089,6 +1089,14 @@ struct DBOptions {
|
|||||||
// Default: 1 Mb
|
// Default: 1 Mb
|
||||||
size_t random_access_max_buffer_size;
|
size_t random_access_max_buffer_size;
|
||||||
|
|
||||||
|
// This is the maximum buffer size that is used by WritableFileWriter.
|
||||||
|
// On Windows, we need to maintain an aligned buffer for writes.
|
||||||
|
// We allow the buffer to grow until it's size hits the limit.
|
||||||
|
//
|
||||||
|
// Default: 1024 * 1024 (1 MB)
|
||||||
|
size_t writable_file_max_buffer_size;
|
||||||
|
|
||||||
|
|
||||||
// Use adaptive mutex, which spins in the user space before resorting
|
// Use adaptive mutex, which spins in the user space before resorting
|
||||||
// to kernel. This could reduce context switch when the mutex is not
|
// to kernel. This could reduce context switch when the mutex is not
|
||||||
// heavily contended. However, if the mutex is hot, we could end up
|
// heavily contended. However, if the mutex is hot, we could end up
|
||||||
|
@ -128,6 +128,20 @@ struct BlockBasedTableOptions {
|
|||||||
// This must generally be true for gets to be efficient.
|
// This must generally be true for gets to be efficient.
|
||||||
bool whole_key_filtering = true;
|
bool whole_key_filtering = true;
|
||||||
|
|
||||||
|
// If true, block will not be explictly flushed to disk during building
|
||||||
|
// a SstTable. Instead, buffer in WritableFileWriter will take
|
||||||
|
// care of the flushing when it is full.
|
||||||
|
//
|
||||||
|
// On Windows, this option helps a lot when unbuffered I/O
|
||||||
|
// (allow_os_buffer = false) is used, since it avoids small
|
||||||
|
// unbuffered disk write.
|
||||||
|
//
|
||||||
|
// User may also adjust writable_file_max_buffer_size to optimize disk I/O
|
||||||
|
// size.
|
||||||
|
//
|
||||||
|
// Default: false
|
||||||
|
bool skip_table_builder_flush = false;
|
||||||
|
|
||||||
// We currently have three versions:
|
// We currently have three versions:
|
||||||
// 0 -- This version is currently written out by all RocksDB's versions by
|
// 0 -- This version is currently written out by all RocksDB's versions by
|
||||||
// default. Can be read by really old RocksDB's. Doesn't support changing
|
// default. Can be read by really old RocksDB's. Doesn't support changing
|
||||||
|
@ -592,7 +592,7 @@ void BlockBasedTableBuilder::Flush() {
|
|||||||
if (!ok()) return;
|
if (!ok()) return;
|
||||||
if (r->data_block.empty()) return;
|
if (r->data_block.empty()) return;
|
||||||
WriteBlock(&r->data_block, &r->pending_handle);
|
WriteBlock(&r->data_block, &r->pending_handle);
|
||||||
if (ok()) {
|
if (ok() && !r->table_options.skip_table_builder_flush) {
|
||||||
r->status = r->file->Flush();
|
r->status = r->file->Flush();
|
||||||
}
|
}
|
||||||
if (r->filter_block != nullptr) {
|
if (r->filter_block != nullptr) {
|
||||||
|
@ -152,6 +152,10 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const {
|
|||||||
ret.append(buffer);
|
ret.append(buffer);
|
||||||
snprintf(buffer, kBufferSize, " whole_key_filtering: %d\n",
|
snprintf(buffer, kBufferSize, " whole_key_filtering: %d\n",
|
||||||
table_options_.whole_key_filtering);
|
table_options_.whole_key_filtering);
|
||||||
|
ret.append(buffer);
|
||||||
|
snprintf(buffer, kBufferSize, " skip_table_builder_flush: %d\n",
|
||||||
|
table_options_.skip_table_builder_flush);
|
||||||
|
ret.append(buffer);
|
||||||
snprintf(buffer, kBufferSize, " format_version: %d\n",
|
snprintf(buffer, kBufferSize, " format_version: %d\n",
|
||||||
table_options_.format_version);
|
table_options_.format_version);
|
||||||
ret.append(buffer);
|
ret.append(buffer);
|
||||||
|
@ -296,6 +296,8 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
|
|||||||
env_options->random_access_max_buffer_size =
|
env_options->random_access_max_buffer_size =
|
||||||
options.random_access_max_buffer_size;
|
options.random_access_max_buffer_size;
|
||||||
env_options->rate_limiter = options.rate_limiter.get();
|
env_options->rate_limiter = options.rate_limiter.get();
|
||||||
|
env_options->writable_file_max_buffer_size =
|
||||||
|
options.writable_file_max_buffer_size;
|
||||||
env_options->allow_fallocate = options.allow_fallocate;
|
env_options->allow_fallocate = options.allow_fallocate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,10 +21,6 @@
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
namespace {
|
|
||||||
const size_t c_OneMb = (1 << 20);
|
|
||||||
}
|
|
||||||
|
|
||||||
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
|
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
|
||||||
Status s = file_->Read(n, result, scratch);
|
Status s = file_->Read(n, result, scratch);
|
||||||
IOSTATS_ADD(bytes_read, result->size());
|
IOSTATS_ADD(bytes_read, result->size());
|
||||||
@ -76,9 +72,9 @@ Status WritableFileWriter::Append(const Slice& data) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (buf_.Capacity() < c_OneMb) {
|
if (buf_.Capacity() < max_buffer_size_) {
|
||||||
size_t desiredCapacity = buf_.Capacity() * 2;
|
size_t desiredCapacity = buf_.Capacity() * 2;
|
||||||
desiredCapacity = std::min(desiredCapacity, c_OneMb);
|
desiredCapacity = std::min(desiredCapacity, max_buffer_size_);
|
||||||
buf_.AllocateNewBuffer(desiredCapacity);
|
buf_.AllocateNewBuffer(desiredCapacity);
|
||||||
}
|
}
|
||||||
assert(buf_.CurrentSize() == 0);
|
assert(buf_.CurrentSize() == 0);
|
||||||
@ -102,9 +98,9 @@ Status WritableFileWriter::Append(const Slice& data) {
|
|||||||
// We double the buffer here because
|
// We double the buffer here because
|
||||||
// Flush calls do not keep up with the incoming bytes
|
// Flush calls do not keep up with the incoming bytes
|
||||||
// This is the only place when buffer is changed with unbuffered I/O
|
// This is the only place when buffer is changed with unbuffered I/O
|
||||||
if (buf_.Capacity() < c_OneMb) {
|
if (buf_.Capacity() < max_buffer_size_) {
|
||||||
size_t desiredCapacity = buf_.Capacity() * 2;
|
size_t desiredCapacity = buf_.Capacity() * 2;
|
||||||
desiredCapacity = std::min(desiredCapacity, c_OneMb);
|
desiredCapacity = std::min(desiredCapacity, max_buffer_size_);
|
||||||
buf_.AllocateNewBuffer(desiredCapacity);
|
buf_.AllocateNewBuffer(desiredCapacity);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -156,7 +152,6 @@ Status WritableFileWriter::Close() {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// write out the cached data to the OS cache
|
// write out the cached data to the OS cache
|
||||||
Status WritableFileWriter::Flush() {
|
Status WritableFileWriter::Flush() {
|
||||||
Status s;
|
Status s;
|
||||||
|
@ -93,6 +93,7 @@ class WritableFileWriter {
|
|||||||
private:
|
private:
|
||||||
std::unique_ptr<WritableFile> writable_file_;
|
std::unique_ptr<WritableFile> writable_file_;
|
||||||
AlignedBuffer buf_;
|
AlignedBuffer buf_;
|
||||||
|
size_t max_buffer_size_;
|
||||||
// Actually written data size can be used for truncate
|
// Actually written data size can be used for truncate
|
||||||
// not counting padding data
|
// not counting padding data
|
||||||
uint64_t filesize_;
|
uint64_t filesize_;
|
||||||
@ -113,6 +114,7 @@ class WritableFileWriter {
|
|||||||
const EnvOptions& options)
|
const EnvOptions& options)
|
||||||
: writable_file_(std::move(file)),
|
: writable_file_(std::move(file)),
|
||||||
buf_(),
|
buf_(),
|
||||||
|
max_buffer_size_(options.writable_file_max_buffer_size),
|
||||||
filesize_(0),
|
filesize_(0),
|
||||||
next_write_offset_(0),
|
next_write_offset_(0),
|
||||||
pending_sync_(false),
|
pending_sync_(false),
|
||||||
|
@ -251,6 +251,7 @@ DBOptions::DBOptions()
|
|||||||
new_table_reader_for_compaction_inputs(false),
|
new_table_reader_for_compaction_inputs(false),
|
||||||
compaction_readahead_size(0),
|
compaction_readahead_size(0),
|
||||||
random_access_max_buffer_size(1024 * 1024),
|
random_access_max_buffer_size(1024 * 1024),
|
||||||
|
writable_file_max_buffer_size(1024 * 1024),
|
||||||
use_adaptive_mutex(false),
|
use_adaptive_mutex(false),
|
||||||
bytes_per_sync(0),
|
bytes_per_sync(0),
|
||||||
wal_bytes_per_sync(0),
|
wal_bytes_per_sync(0),
|
||||||
@ -313,6 +314,7 @@ DBOptions::DBOptions(const Options& options)
|
|||||||
options.new_table_reader_for_compaction_inputs),
|
options.new_table_reader_for_compaction_inputs),
|
||||||
compaction_readahead_size(options.compaction_readahead_size),
|
compaction_readahead_size(options.compaction_readahead_size),
|
||||||
random_access_max_buffer_size(options.random_access_max_buffer_size),
|
random_access_max_buffer_size(options.random_access_max_buffer_size),
|
||||||
|
writable_file_max_buffer_size(options.writable_file_max_buffer_size),
|
||||||
use_adaptive_mutex(options.use_adaptive_mutex),
|
use_adaptive_mutex(options.use_adaptive_mutex),
|
||||||
bytes_per_sync(options.bytes_per_sync),
|
bytes_per_sync(options.bytes_per_sync),
|
||||||
wal_bytes_per_sync(options.wal_bytes_per_sync),
|
wal_bytes_per_sync(options.wal_bytes_per_sync),
|
||||||
@ -412,6 +414,10 @@ void DBOptions::Dump(Logger* log) const {
|
|||||||
" Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt
|
" Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt
|
||||||
"d",
|
"d",
|
||||||
random_access_max_buffer_size);
|
random_access_max_buffer_size);
|
||||||
|
Header(log,
|
||||||
|
" Options.writable_file_max_buffer_size: %" ROCKSDB_PRIszt
|
||||||
|
"d",
|
||||||
|
writable_file_max_buffer_size);
|
||||||
Header(log, " Options.use_adaptive_mutex: %d",
|
Header(log, " Options.use_adaptive_mutex: %d",
|
||||||
use_adaptive_mutex);
|
use_adaptive_mutex);
|
||||||
Header(log, " Options.rate_limiter: %p",
|
Header(log, " Options.rate_limiter: %p",
|
||||||
|
@ -184,6 +184,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
|
|||||||
{"random_access_max_buffer_size",
|
{"random_access_max_buffer_size",
|
||||||
{offsetof(struct DBOptions, random_access_max_buffer_size),
|
{offsetof(struct DBOptions, random_access_max_buffer_size),
|
||||||
OptionType::kSizeT, OptionVerificationType::kNormal}},
|
OptionType::kSizeT, OptionVerificationType::kNormal}},
|
||||||
|
{"writable_file_max_buffer_size",
|
||||||
|
{offsetof(struct DBOptions, writable_file_max_buffer_size),
|
||||||
|
OptionType::kSizeT, OptionVerificationType::kNormal}},
|
||||||
{"use_adaptive_mutex",
|
{"use_adaptive_mutex",
|
||||||
{offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
|
{offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
|
||||||
OptionVerificationType::kNormal}},
|
OptionVerificationType::kNormal}},
|
||||||
@ -460,6 +463,9 @@ static std::unordered_map<std::string,
|
|||||||
{"whole_key_filtering",
|
{"whole_key_filtering",
|
||||||
{offsetof(struct BlockBasedTableOptions, whole_key_filtering),
|
{offsetof(struct BlockBasedTableOptions, whole_key_filtering),
|
||||||
OptionType::kBoolean, OptionVerificationType::kNormal}},
|
OptionType::kBoolean, OptionVerificationType::kNormal}},
|
||||||
|
{"skip_table_builder_flush",
|
||||||
|
{offsetof(struct BlockBasedTableOptions, skip_table_builder_flush),
|
||||||
|
OptionType::kBoolean, OptionVerificationType::kNormal}},
|
||||||
{"format_version",
|
{"format_version",
|
||||||
{offsetof(struct BlockBasedTableOptions, format_version),
|
{offsetof(struct BlockBasedTableOptions, format_version),
|
||||||
OptionType::kUInt32T, OptionVerificationType::kNormal}}};
|
OptionType::kUInt32T, OptionVerificationType::kNormal}}};
|
||||||
|
@ -341,6 +341,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
|
|||||||
{"new_table_reader_for_compaction_inputs", "true"},
|
{"new_table_reader_for_compaction_inputs", "true"},
|
||||||
{"compaction_readahead_size", "100"},
|
{"compaction_readahead_size", "100"},
|
||||||
{"random_access_max_buffer_size", "3145728"},
|
{"random_access_max_buffer_size", "3145728"},
|
||||||
|
{"writable_file_max_buffer_size", "314159"},
|
||||||
{"bytes_per_sync", "47"},
|
{"bytes_per_sync", "47"},
|
||||||
{"wal_bytes_per_sync", "48"},
|
{"wal_bytes_per_sync", "48"},
|
||||||
};
|
};
|
||||||
@ -452,6 +453,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
|
|||||||
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
|
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
|
||||||
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
|
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
|
||||||
ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);
|
ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);
|
||||||
|
ASSERT_EQ(new_db_opt.writable_file_max_buffer_size, 314159);
|
||||||
ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47));
|
ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47));
|
||||||
ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48));
|
ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48));
|
||||||
}
|
}
|
||||||
@ -621,7 +623,8 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
|
|||||||
"checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;"
|
"checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;"
|
||||||
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
|
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
|
||||||
"block_size_deviation=8;block_restart_interval=4;"
|
"block_size_deviation=8;block_restart_interval=4;"
|
||||||
"filter_policy=bloomfilter:4:true;whole_key_filtering=1",
|
"filter_policy=bloomfilter:4:true;whole_key_filtering=1;"
|
||||||
|
"skip_table_builder_flush=1",
|
||||||
&new_opt));
|
&new_opt));
|
||||||
ASSERT_TRUE(new_opt.cache_index_and_filter_blocks);
|
ASSERT_TRUE(new_opt.cache_index_and_filter_blocks);
|
||||||
ASSERT_EQ(new_opt.index_type, BlockBasedTableOptions::kHashSearch);
|
ASSERT_EQ(new_opt.index_type, BlockBasedTableOptions::kHashSearch);
|
||||||
@ -636,6 +639,7 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
|
|||||||
ASSERT_EQ(new_opt.block_size_deviation, 8);
|
ASSERT_EQ(new_opt.block_size_deviation, 8);
|
||||||
ASSERT_EQ(new_opt.block_restart_interval, 4);
|
ASSERT_EQ(new_opt.block_restart_interval, 4);
|
||||||
ASSERT_TRUE(new_opt.filter_policy != nullptr);
|
ASSERT_TRUE(new_opt.filter_policy != nullptr);
|
||||||
|
ASSERT_TRUE(new_opt.skip_table_builder_flush);
|
||||||
|
|
||||||
// unknown option
|
// unknown option
|
||||||
ASSERT_NOK(GetBlockBasedTableOptionsFromString(table_opt,
|
ASSERT_NOK(GetBlockBasedTableOptionsFromString(table_opt,
|
||||||
|
Loading…
Reference in New Issue
Block a user