Support saving history in memtable_list
Summary: For transactions, we are using the memtables to validate that there are no write conflicts. But after flushing, we don't have any memtables, and transactions could fail to commit. So we want to somehow keep around some extra history to use for conflict checking. In addition, we want to provide a way to increase the size of this history if too many transactions fail to commit. After chatting with people, it seems like everyone prefers just using Memtables to store this history (instead of a separate history structure). It seems like the best place for this is abstracted inside the memtable_list. I decided to create a separate list in MemtableListVersion as using the same list complicated the flush/InstallMemtableFlushResults logic too much. This diff adds a new parameter to control how much memtable history to keep around after flushing. However, it sounds like people aren't too fond of adding new parameters. So I am making the default size of flushed+not-flushed memtables be set to max_write_buffer_number. This should not change the maximum amount of memory used, but make it more likely we're using closer to the limit. (We are now postponing deleting flushed memtables until the max_write_buffer_number limit is reached). So while we might use more memory on average, we are still obeying the limit set (and you could argue it's better to go ahead and use up memory now instead of waiting for a write stall to happen to test this limit). However, if people are opposed to this default behavior, we can easily set it to 0 and require this parameter be set in order to use transactions. Test Plan: Added an xfunc test to play around with setting different values of this parameter in all tests. Added testing in memtablelist_test and planning on adding more testing here. Reviewers: sdong, rven, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D37443
This commit is contained in:
parent
ec4ff4e99c
commit
c815351038
@ -2,7 +2,8 @@
|
||||
|
||||
## Public API changes
|
||||
* DB::GetDbIdentity() is now a const function. If this function is overridden in your application, be sure to also make GetDbIdentity() const to avoid compile error.
|
||||
* Move listeners from ColumnFamilyOptions to DBOptions.
|
||||
* Move listeners from ColumnFamilyOptions to DBOptions.
|
||||
* Add max_write_buffer_number_to_maintain option
|
||||
|
||||
## 3.11.0 (5/19/2015)
|
||||
|
||||
|
5
db/c.cc
5
db/c.cc
@ -1594,6 +1594,11 @@ void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t* opt
|
||||
opt->rep.min_write_buffer_number_to_merge = n;
|
||||
}
|
||||
|
||||
// C API setter for max_write_buffer_number_to_maintain.
// Stores `n` unchanged into the options object wrapped by `opt`; no range
// checking is performed here.
void rocksdb_options_set_max_write_buffer_number_to_maintain(
    rocksdb_options_t* opt, int n) {
  auto& wrapped_options = opt->rep;
  wrapped_options.max_write_buffer_number_to_maintain = n;
}
|
||||
|
||||
void rocksdb_options_set_max_background_compactions(rocksdb_options_t* opt, int n) {
|
||||
opt->rep.max_background_compactions = n;
|
||||
}
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include "util/autovector.h"
|
||||
#include "util/hash_skiplist_rep.h"
|
||||
#include "util/options_helper.h"
|
||||
#include "util/xfunc.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -152,6 +153,18 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
|
||||
if (result.max_write_buffer_number < 2) {
|
||||
result.max_write_buffer_number = 2;
|
||||
}
|
||||
if (result.max_write_buffer_number_to_maintain < 0) {
|
||||
result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
|
||||
}
|
||||
XFUNC_TEST("memtablelist_history", "transaction_xftest_SanitizeOptions",
|
||||
xf_transaction_set_memtable_history1,
|
||||
xf_transaction_set_memtable_history,
|
||||
&result.max_write_buffer_number_to_maintain);
|
||||
XFUNC_TEST("memtablelist_history_clear", "transaction_xftest_SanitizeOptions",
|
||||
xf_transaction_clear_memtable_history1,
|
||||
xf_transaction_clear_memtable_history,
|
||||
&result.max_write_buffer_number_to_maintain);
|
||||
|
||||
if (!result.prefix_extractor) {
|
||||
assert(result.memtable_factory);
|
||||
Slice name = result.memtable_factory->Name();
|
||||
@ -309,7 +322,8 @@ ColumnFamilyData::ColumnFamilyData(
|
||||
mutable_cf_options_(options_, ioptions_),
|
||||
write_buffer_(write_buffer),
|
||||
mem_(nullptr),
|
||||
imm_(options_.min_write_buffer_number_to_merge),
|
||||
imm_(options_.min_write_buffer_number_to_merge,
|
||||
options_.max_write_buffer_number_to_maintain),
|
||||
super_version_(nullptr),
|
||||
super_version_number_(0),
|
||||
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
|
||||
@ -445,13 +459,13 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
|
||||
auto write_controller = column_family_set_->write_controller_;
|
||||
|
||||
if (imm()->size() >= mutable_cf_options.max_write_buffer_number) {
|
||||
if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
|
||||
write_controller_token_ = write_controller->GetStopToken();
|
||||
internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
|
||||
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
|
||||
"[%s] Stopping writes because we have %d immutable memtables "
|
||||
"(waiting for flush), max_write_buffer_number is set to %d",
|
||||
name_.c_str(), imm()->size(),
|
||||
name_.c_str(), imm()->NumNotFlushed(),
|
||||
mutable_cf_options.max_write_buffer_number);
|
||||
} else if (vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_stop_writes_trigger) {
|
||||
|
@ -663,15 +663,19 @@ TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) {
|
||||
default_cf.write_buffer_size = 100000;
|
||||
default_cf.max_write_buffer_number = 10;
|
||||
default_cf.min_write_buffer_number_to_merge = 1;
|
||||
default_cf.max_write_buffer_number_to_maintain = 0;
|
||||
one.write_buffer_size = 200000;
|
||||
one.max_write_buffer_number = 10;
|
||||
one.min_write_buffer_number_to_merge = 2;
|
||||
one.max_write_buffer_number_to_maintain = 1;
|
||||
two.write_buffer_size = 1000000;
|
||||
two.max_write_buffer_number = 10;
|
||||
two.min_write_buffer_number_to_merge = 3;
|
||||
two.max_write_buffer_number_to_maintain = 2;
|
||||
three.write_buffer_size = 90000;
|
||||
three.max_write_buffer_number = 10;
|
||||
three.min_write_buffer_number_to_merge = 4;
|
||||
three.max_write_buffer_number_to_maintain = -1;
|
||||
|
||||
Reopen({default_cf, one, two, three});
|
||||
|
||||
|
@ -251,6 +251,20 @@ DEFINE_int32(min_write_buffer_number_to_merge,
|
||||
" writing less data to storage if there are duplicate records "
|
||||
" in each of these individual write buffers.");
|
||||
|
||||
DEFINE_int32(max_write_buffer_number_to_maintain,
|
||||
rocksdb::Options().max_write_buffer_number_to_maintain,
|
||||
"The total maximum number of write buffers to maintain in memory "
|
||||
"including copies of buffers that have already been flushed. "
|
||||
"Unlike max_write_buffer_number, this parameter does not affect "
|
||||
"flushing. This controls the minimum amount of write history "
|
||||
"that will be available in memory for conflict checking when "
|
||||
"Transactions are used. If this value is too low, some "
|
||||
"transactions may fail at commit time due to not being able to "
|
||||
"determine whether there were any write conflicts. Setting this "
|
||||
"value to 0 will cause write buffers to be freed immediately "
|
||||
"after they are flushed. If this value is set to -1, "
|
||||
"'max_write_buffer_number' will be used.");
|
||||
|
||||
DEFINE_int32(max_background_compactions,
|
||||
rocksdb::Options().max_background_compactions,
|
||||
"The maximum number of concurrent background compactions"
|
||||
@ -2033,6 +2047,8 @@ class Benchmark {
|
||||
options.max_write_buffer_number = FLAGS_max_write_buffer_number;
|
||||
options.min_write_buffer_number_to_merge =
|
||||
FLAGS_min_write_buffer_number_to_merge;
|
||||
options.max_write_buffer_number_to_maintain =
|
||||
FLAGS_max_write_buffer_number_to_maintain;
|
||||
options.max_background_compactions = FLAGS_max_background_compactions;
|
||||
options.max_background_flushes = FLAGS_max_background_flushes;
|
||||
options.compaction_style = FLAGS_compaction_style_e;
|
||||
|
@ -93,12 +93,16 @@ void DumpRocksDBBuildVersion(Logger * log);
|
||||
|
||||
// Per-write scratch state. Objects queued in the two "to_free" lists are
// deleted when the context is destroyed.
struct DBImpl::WriteContext {
  // SuperVersions to delete when this context is destroyed.
  autovector<SuperVersion*> superversions_to_free_;
  // MemTables to delete when this context is destroyed.
  autovector<MemTable*> memtables_to_free_;
  bool schedule_bg_work_ = false;

  ~WriteContext() {
    // Same order as the members: superversions first, then memtables.
    for (SuperVersion* stale_sv : superversions_to_free_) {
      delete stale_sv;
    }
    for (MemTable* stale_mem : memtables_to_free_) {
      delete stale_mem;
    }
  }
};
|
||||
|
||||
@ -1201,7 +1205,7 @@ Status DBImpl::FlushMemTableToOutputFile(
|
||||
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
|
||||
bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) {
|
||||
mutex_.AssertHeld();
|
||||
assert(cfd->imm()->size() != 0);
|
||||
assert(cfd->imm()->NumNotFlushed() != 0);
|
||||
assert(cfd->imm()->IsFlushPending());
|
||||
|
||||
FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
|
||||
@ -1850,7 +1854,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
WriteContext context;
|
||||
InstrumentedMutexLock guard_lock(&mutex_);
|
||||
|
||||
if (cfd->imm()->size() == 0 && cfd->mem()->IsEmpty()) {
|
||||
if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty()) {
|
||||
// Nothing to flush
|
||||
return Status::OK();
|
||||
}
|
||||
@ -1882,7 +1886,7 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
|
||||
Status s;
|
||||
// Wait until the compaction completes
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
while (cfd->imm()->size() > 0 && bg_error_.ok()) {
|
||||
while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
if (!bg_error_.ok()) {
|
||||
@ -3527,13 +3531,13 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
|
||||
// doesn't need that particular log to stay alive, so we just
|
||||
// advance the log number. no need to persist this in the manifest
|
||||
if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
|
||||
loop_cfd->imm()->size() == 0) {
|
||||
loop_cfd->imm()->NumNotFlushed() == 0) {
|
||||
loop_cfd->SetLogNumber(logfile_number_);
|
||||
}
|
||||
}
|
||||
}
|
||||
cfd->mem()->SetNextLogNumber(logfile_number_);
|
||||
cfd->imm()->Add(cfd->mem());
|
||||
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
|
||||
new_mem->Ref();
|
||||
cfd->SetMemtable(new_mem);
|
||||
context->superversions_to_free_.push_back(
|
||||
|
@ -3141,6 +3141,7 @@ TEST_F(DBTest, FlushMultipleMemtable) {
|
||||
writeOpt.disableWAL = true;
|
||||
options.max_write_buffer_number = 4;
|
||||
options.min_write_buffer_number_to_merge = 3;
|
||||
options.max_write_buffer_number_to_maintain = -1;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
|
||||
ASSERT_OK(Flush(1));
|
||||
@ -3159,6 +3160,7 @@ TEST_F(DBTest, NumImmutableMemTable) {
|
||||
writeOpt.disableWAL = true;
|
||||
options.max_write_buffer_number = 4;
|
||||
options.min_write_buffer_number_to_merge = 3;
|
||||
options.max_write_buffer_number_to_maintain = 0;
|
||||
options.write_buffer_size = 1000000;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
|
||||
@ -3314,6 +3316,7 @@ TEST_F(DBTest, FlushEmptyColumnFamily) {
|
||||
writeOpt.disableWAL = true;
|
||||
options.max_write_buffer_number = 2;
|
||||
options.min_write_buffer_number_to_merge = 1;
|
||||
options.max_write_buffer_number_to_maintain = 1;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
|
||||
// Compaction can still go through even if no thread can flush the
|
||||
@ -3360,6 +3363,7 @@ TEST_F(DBTest, GetProperty) {
|
||||
options.max_background_flushes = 1;
|
||||
options.max_write_buffer_number = 10;
|
||||
options.min_write_buffer_number_to_merge = 1;
|
||||
options.max_write_buffer_number_to_maintain = 0;
|
||||
options.write_buffer_size = 1000000;
|
||||
Reopen(options);
|
||||
|
||||
@ -3578,6 +3582,7 @@ TEST_F(DBTest, FlushSchedule) {
|
||||
options.level0_stop_writes_trigger = 1 << 10;
|
||||
options.level0_slowdown_writes_trigger = 1 << 10;
|
||||
options.min_write_buffer_number_to_merge = 1;
|
||||
options.max_write_buffer_number_to_maintain = 1;
|
||||
options.max_write_buffer_number = 2;
|
||||
options.write_buffer_size = 100 * 1000;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
@ -3783,6 +3788,7 @@ Options DeletionTriggerOptions() {
|
||||
options.compression = kNoCompression;
|
||||
options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
|
||||
options.min_write_buffer_number_to_merge = 1;
|
||||
options.max_write_buffer_number_to_maintain = 0;
|
||||
options.num_levels = kCDTNumLevels;
|
||||
options.max_mem_compaction_level = 0;
|
||||
options.level0_file_num_compaction_trigger = 1;
|
||||
|
@ -107,7 +107,12 @@ TEST_F(FlushJobTest, NonEmpty) {
|
||||
InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
|
||||
inserted_keys.insert({internal_key.Encode().ToString(), value});
|
||||
}
|
||||
cfd->imm()->Add(new_mem);
|
||||
|
||||
autovector<MemTable*> to_delete;
|
||||
cfd->imm()->Add(new_mem, &to_delete);
|
||||
for (auto& m : to_delete) {
|
||||
delete m;
|
||||
}
|
||||
|
||||
EventLogger event_logger(db_options_.info_log.get());
|
||||
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
|
||||
|
@ -92,6 +92,8 @@ static const std::string cfstats = "cfstats";
|
||||
static const std::string dbstats = "dbstats";
|
||||
static const std::string levelstats = "levelstats";
|
||||
static const std::string num_immutable_mem_table = "num-immutable-mem-table";
|
||||
static const std::string num_immutable_mem_table_flushed =
|
||||
"num-immutable-mem-table-flushed";
|
||||
static const std::string mem_table_flush_pending = "mem-table-flush-pending";
|
||||
static const std::string compaction_pending = "compaction-pending";
|
||||
static const std::string background_errors = "background-errors";
|
||||
@ -185,6 +187,8 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
|
||||
*is_int_property = true;
|
||||
if (in == num_immutable_mem_table) {
|
||||
return kNumImmutableMemTable;
|
||||
} else if (in == num_immutable_mem_table_flushed) {
|
||||
return kNumImmutableMemTableFlushed;
|
||||
} else if (in == mem_table_flush_pending) {
|
||||
return kMemtableFlushPending;
|
||||
} else if (in == compaction_pending) {
|
||||
@ -307,7 +311,10 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
|
||||
|
||||
switch (property_type) {
|
||||
case kNumImmutableMemTable:
|
||||
*value = cfd_->imm()->size();
|
||||
*value = cfd_->imm()->NumNotFlushed();
|
||||
return true;
|
||||
case kNumImmutableMemTableFlushed:
|
||||
*value = cfd_->imm()->NumFlushed();
|
||||
return true;
|
||||
case kMemtableFlushPending:
|
||||
// Return number of mem tables that are ready to flush (made immutable)
|
||||
|
@ -32,9 +32,12 @@ enum DBPropertyType : uint32_t {
|
||||
kStats, // Return general statitistics of both DB and CF
|
||||
kSsTables, // Return a human readable string of current SST files
|
||||
kStartIntTypes, // ---- Dummy value to indicate the start of integer values
|
||||
kNumImmutableMemTable, // Return number of immutable mem tables
|
||||
kMemtableFlushPending, // Return 1 if mem table flushing is pending,
|
||||
// otherwise 0.
|
||||
kNumImmutableMemTable, // Return number of immutable mem tables that
|
||||
// have not been flushed.
|
||||
kNumImmutableMemTableFlushed, // Return number of immutable mem tables
|
||||
// in memory that have already been flushed
|
||||
kMemtableFlushPending, // Return 1 if mem table flushing is pending,
|
||||
// otherwise 0.
|
||||
kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0.
|
||||
kBackgroundErrors, // Return accumulated background errors encountered.
|
||||
kCurSizeActiveMemTable, // Return current size of the active memtable
|
||||
|
@ -27,16 +27,27 @@ class InternalKeyComparator;
|
||||
class Mutex;
|
||||
class VersionSet;
|
||||
|
||||
MemTableListVersion::MemTableListVersion(MemTableListVersion* old) {
|
||||
// Creates a new version that shares the memtables of `old`.
//
// Every memtable copied from `old` (both the not-yet-flushed list and the
// flushed history list) gets an extra reference, so the two versions can be
// released independently.
//
// `old` may be nullptr (it is the declared default argument). In that case
// the new version is empty and keeps no history
// (max_write_buffer_number_to_maintain_ == 0). The limit must be chosen in
// the initializer list because the member is const, so the null check has
// to happen there as well as in the body.
MemTableListVersion::MemTableListVersion(MemTableListVersion* old)
    : max_write_buffer_number_to_maintain_(
          old != nullptr ? old->max_write_buffer_number_to_maintain_ : 0) {
  if (old == nullptr) {
    return;
  }

  // Share the not-yet-flushed memtables.
  memlist_ = old->memlist_;
  size_ = old->size_;
  for (auto& m : memlist_) {
    m->Ref();
  }

  // Share the already-flushed history.
  memlist_history_ = old->memlist_history_;
  for (auto& m : memlist_history_) {
    m->Ref();
  }
}
|
||||
|
||||
// Creates an empty version (no memtables, no history) whose history limit is
// max_write_buffer_number_to_maintain. The body is intentionally empty; the
// only work is initializing the const limit member.
MemTableListVersion::MemTableListVersion(
|
||||
int max_write_buffer_number_to_maintain)
|
||||
: max_write_buffer_number_to_maintain_(
|
||||
max_write_buffer_number_to_maintain) {}
|
||||
|
||||
void MemTableListVersion::Ref() { ++refs_; }
|
||||
|
||||
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
|
||||
@ -52,16 +63,24 @@ void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
|
||||
to_delete->push_back(x);
|
||||
}
|
||||
}
|
||||
for (const auto& m : memlist_history_) {
|
||||
MemTable* x = m->Unref();
|
||||
if (x != nullptr) {
|
||||
to_delete->push_back(x);
|
||||
}
|
||||
}
|
||||
delete this;
|
||||
}
|
||||
}
|
||||
|
||||
int MemTableListVersion::size() const { return size_; }
|
||||
int MemTableList::NumNotFlushed() const {
|
||||
int size = static_cast<int>(current_->memlist_.size());
|
||||
assert(num_flush_not_started_ <= size);
|
||||
return size;
|
||||
}
|
||||
|
||||
// Returns the total number of memtables in the list
|
||||
int MemTableList::size() const {
|
||||
assert(num_flush_not_started_ <= current_->size_);
|
||||
return current_->size_;
|
||||
// Returns the number of memtables held in the current version's history
// list (memlist_history_), i.e. tables that were moved there by Remove().
int MemTableList::NumFlushed() const {
|
||||
return static_cast<int>(current_->memlist_history_.size());
|
||||
}
|
||||
|
||||
// Search all the memtables starting from the most recent one.
|
||||
@ -77,6 +96,17 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
|
||||
return false;
|
||||
}
|
||||
|
||||
bool MemTableListVersion::GetFromHistory(const LookupKey& key,
|
||||
std::string* value, Status* s,
|
||||
MergeContext* merge_context) {
|
||||
for (auto& memtable : memlist_history_) {
|
||||
if (memtable->Get(key, value, s, merge_context)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void MemTableListVersion::AddIterators(const ReadOptions& options,
|
||||
std::vector<Iterator*>* iterator_list,
|
||||
Arena* arena) {
|
||||
@ -110,17 +140,41 @@ uint64_t MemTableListVersion::GetTotalNumDeletes() const {
|
||||
}
|
||||
|
||||
// caller is responsible for referencing m
|
||||
void MemTableListVersion::Add(MemTable* m) {
|
||||
// Puts `m` at the front of the not-yet-flushed list, then trims the flushed
// history so the combined count stays within the configured limit. Memtables
// whose last reference is dropped by the trim are appended to *to_delete.
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  // A version may only be mutated while it has exactly one reference.
  assert(refs_ == 1);

  memlist_.push_front(m);
  size_ += 1;

  // The new table counts against the limit, so history may need to shrink.
  TrimHistory(to_delete);
}
|
||||
|
||||
// caller is responsible for unreferencing m
|
||||
void MemTableListVersion::Remove(MemTable* m) {
|
||||
// Removes m from list of memtables not flushed. Caller should NOT Unref m.
|
||||
void MemTableListVersion::Remove(MemTable* m,
|
||||
autovector<MemTable*>* to_delete) {
|
||||
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
|
||||
memlist_.remove(m);
|
||||
--size_;
|
||||
|
||||
if (max_write_buffer_number_to_maintain_ > 0) {
|
||||
memlist_history_.push_front(m);
|
||||
TrimHistory(to_delete);
|
||||
} else {
|
||||
if (m->Unref()) {
|
||||
to_delete->push_back(m);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we don't use up too much space in history
|
||||
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
|
||||
while (memlist_.size() + memlist_history_.size() >
|
||||
static_cast<size_t>(max_write_buffer_number_to_maintain_) &&
|
||||
!memlist_history_.empty()) {
|
||||
MemTable* x = memlist_history_.back();
|
||||
memlist_history_.pop_back();
|
||||
|
||||
if (x->Unref()) {
|
||||
to_delete->push_back(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if there is at least one memtable on which flush has
|
||||
@ -229,12 +283,8 @@ Status MemTableList::InstallMemtableFlushResults(
|
||||
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
|
||||
": memtable #%" PRIu64 " done",
|
||||
cfd->GetName().c_str(), m->file_number_, mem_id);
|
||||
current_->Remove(m);
|
||||
assert(m->file_number_ > 0);
|
||||
|
||||
if (m->Unref() != nullptr) {
|
||||
to_delete->push_back(m);
|
||||
}
|
||||
current_->Remove(m, to_delete);
|
||||
} else {
|
||||
//commit failed. setup state so that we can flush again.
|
||||
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
|
||||
@ -256,15 +306,15 @@ Status MemTableList::InstallMemtableFlushResults(
|
||||
}
|
||||
|
||||
// New memtables are inserted at the front of the list.
|
||||
void MemTableList::Add(MemTable* m) {
|
||||
assert(current_->size_ >= num_flush_not_started_);
|
||||
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
|
||||
assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
|
||||
InstallNewVersion();
|
||||
// this method is used to move mutable memtable into an immutable list.
|
||||
// since mutable memtable is already refcounted by the DBImpl,
|
||||
// and when moving to the immutable list we don't unref it,
|
||||
// we don't have to ref the memtable here. we just take over the
|
||||
// reference from the DBImpl.
|
||||
current_->Add(m);
|
||||
current_->Add(m, to_delete);
|
||||
m->MarkImmutable();
|
||||
num_flush_not_started_++;
|
||||
if (num_flush_not_started_ == 1) {
|
||||
|
@ -41,17 +41,23 @@ class MergeIteratorBuilder;
|
||||
class MemTableListVersion {
|
||||
public:
|
||||
explicit MemTableListVersion(MemTableListVersion* old = nullptr);
|
||||
explicit MemTableListVersion(int max_write_buffer_number_to_maintain);
|
||||
|
||||
void Ref();
|
||||
void Unref(autovector<MemTable*>* to_delete = nullptr);
|
||||
|
||||
int size() const;
|
||||
|
||||
// Search all the memtables starting from the most recent one.
|
||||
// Return the most recent value found, if any.
|
||||
bool Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context);
|
||||
|
||||
// Similar to Get(), but searches the Memtable history of memtables that
|
||||
// have already been flushed. Should only be used from in-memory only
|
||||
// queries (such as Transaction validation) as the history may contain
|
||||
// writes that are also present in the SST files.
|
||||
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext* merge_context);
|
||||
|
||||
void AddIterators(const ReadOptions& options,
|
||||
std::vector<Iterator*>* iterator_list, Arena* arena);
|
||||
|
||||
@ -63,14 +69,26 @@ class MemTableListVersion {
|
||||
uint64_t GetTotalNumDeletes() const;
|
||||
|
||||
private:
|
||||
// REQUIRE: m is mutable memtable
|
||||
void Add(MemTable* m);
|
||||
// REQUIRE: m is mutable memtable
|
||||
void Remove(MemTable* m);
|
||||
// REQUIRE: m is an immutable memtable
|
||||
void Add(MemTable* m, autovector<MemTable*>* to_delete);
|
||||
// REQUIRE: m is an immutable memtable
|
||||
void Remove(MemTable* m, autovector<MemTable*>* to_delete);
|
||||
|
||||
void TrimHistory(autovector<MemTable*>* to_delete);
|
||||
|
||||
friend class MemTableList;
|
||||
|
||||
// Immutable MemTables that have not yet been flushed.
|
||||
std::list<MemTable*> memlist_;
|
||||
int size_ = 0;
|
||||
|
||||
// MemTables that have already been flushed
|
||||
// (used during Transaction validation)
|
||||
std::list<MemTable*> memlist_history_;
|
||||
|
||||
// Maximum number of MemTables to keep in memory (including both flushed
|
||||
// and not-yet-flushed tables).
|
||||
const int max_write_buffer_number_to_maintain_;
|
||||
|
||||
int refs_ = 0;
|
||||
};
|
||||
|
||||
@ -88,10 +106,11 @@ class MemTableListVersion {
|
||||
class MemTableList {
|
||||
public:
|
||||
// A list of memtables.
|
||||
explicit MemTableList(int min_write_buffer_number_to_merge)
|
||||
explicit MemTableList(int min_write_buffer_number_to_merge,
|
||||
int max_write_buffer_number_to_maintain)
|
||||
: imm_flush_needed(false),
|
||||
min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
|
||||
current_(new MemTableListVersion()),
|
||||
current_(new MemTableListVersion(max_write_buffer_number_to_maintain)),
|
||||
num_flush_not_started_(0),
|
||||
commit_in_progress_(false),
|
||||
flush_requested_(false) {
|
||||
@ -108,8 +127,13 @@ class MemTableList {
|
||||
// determine whether there is anything more to start flushing.
|
||||
std::atomic<bool> imm_flush_needed;
|
||||
|
||||
// Returns the total number of memtables in the list
|
||||
int size() const;
|
||||
// Returns the total number of memtables in the list that haven't yet
|
||||
// been flushed and logged.
|
||||
int NumNotFlushed() const;
|
||||
|
||||
// Returns total number of memtables in the list that have been
|
||||
// completely flushed and logged.
|
||||
int NumFlushed() const;
|
||||
|
||||
// Returns true if there is at least one memtable on which flush has
|
||||
// not yet started.
|
||||
@ -133,7 +157,7 @@ class MemTableList {
|
||||
|
||||
// New memtables are inserted at the front of the list.
|
||||
// Takes ownership of the reference held on *m by the caller of Add().
|
||||
void Add(MemTable* m);
|
||||
void Add(MemTable* m, autovector<MemTable*>* to_delete);
|
||||
|
||||
// Returns an estimate of the number of bytes of data in use.
|
||||
size_t ApproximateMemoryUsage();
|
||||
@ -153,7 +177,7 @@ class MemTableList {
|
||||
// DB mutex held
|
||||
void InstallNewVersion();
|
||||
|
||||
int min_write_buffer_number_to_merge_;
|
||||
const int min_write_buffer_number_to_merge_;
|
||||
|
||||
MemTableListVersion* current_;
|
||||
|
||||
|
@ -3,6 +3,7 @@
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "db/memtable_list.h"
|
||||
@ -92,9 +93,9 @@ class MemTableListTest : public testing::Test {
|
||||
|
||||
TEST_F(MemTableListTest, Empty) {
|
||||
// Create an empty MemTableList and validate basic functions.
|
||||
MemTableList list(1);
|
||||
MemTableList list(1, 0);
|
||||
|
||||
ASSERT_EQ(0, list.size());
|
||||
ASSERT_EQ(0, list.NumNotFlushed());
|
||||
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
ASSERT_FALSE(list.IsFlushPending());
|
||||
|
||||
@ -110,12 +111,15 @@ TEST_F(MemTableListTest, Empty) {
|
||||
TEST_F(MemTableListTest, GetTest) {
|
||||
// Create MemTableList
|
||||
int min_write_buffer_number_to_merge = 2;
|
||||
MemTableList list(min_write_buffer_number_to_merge);
|
||||
int max_write_buffer_number_to_maintain = 0;
|
||||
MemTableList list(min_write_buffer_number_to_merge,
|
||||
max_write_buffer_number_to_maintain);
|
||||
|
||||
SequenceNumber seq = 1;
|
||||
std::string value;
|
||||
Status s;
|
||||
MergeContext merge_context;
|
||||
autovector<MemTable*> to_delete;
|
||||
|
||||
LookupKey lkey("key1", seq);
|
||||
bool found = list.current()->Get(lkey, &value, &s, &merge_context);
|
||||
@ -158,7 +162,7 @@ TEST_F(MemTableListTest, GetTest) {
|
||||
ASSERT_EQ(1, mem->num_deletes());
|
||||
|
||||
// Add memtable to list
|
||||
list.Add(mem);
|
||||
list.Add(mem, &to_delete);
|
||||
|
||||
SequenceNumber saved_seq = seq;
|
||||
|
||||
@ -172,7 +176,7 @@ TEST_F(MemTableListTest, GetTest) {
|
||||
mem2->Add(++seq, kTypeValue, "key2", "value2.3");
|
||||
|
||||
// Add second memtable to list
|
||||
list.Add(mem2);
|
||||
list.Add(mem2, &to_delete);
|
||||
|
||||
// Fetch keys via MemTableList
|
||||
merge_context.Clear();
|
||||
@ -196,15 +200,187 @@ TEST_F(MemTableListTest, GetTest) {
|
||||
found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context);
|
||||
ASSERT_FALSE(found);
|
||||
|
||||
ASSERT_EQ(2, list.size());
|
||||
ASSERT_EQ(2, list.NumNotFlushed());
|
||||
|
||||
autovector<MemTable*> to_delete;
|
||||
list.current()->Unref(&to_delete);
|
||||
for (MemTable* m : to_delete) {
|
||||
delete m;
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MemTableListTest, GetFromHistoryTest) {
|
||||
// Create MemTableList
|
||||
int min_write_buffer_number_to_merge = 2;
|
||||
int max_write_buffer_number_to_maintain = 2;
|
||||
MemTableList list(min_write_buffer_number_to_merge,
|
||||
max_write_buffer_number_to_maintain);
|
||||
|
||||
SequenceNumber seq = 1;
|
||||
std::string value;
|
||||
Status s;
|
||||
MergeContext merge_context;
|
||||
autovector<MemTable*> to_delete;
|
||||
|
||||
LookupKey lkey("key1", seq);
|
||||
bool found = list.current()->Get(lkey, &value, &s, &merge_context);
|
||||
ASSERT_FALSE(found);
|
||||
|
||||
// Create a MemTable
|
||||
InternalKeyComparator cmp(BytewiseComparator());
|
||||
auto factory = std::make_shared<SkipListFactory>();
|
||||
options.memtable_factory = factory;
|
||||
ImmutableCFOptions ioptions(options);
|
||||
|
||||
WriteBuffer wb(options.db_write_buffer_size);
|
||||
MemTable* mem =
|
||||
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb);
|
||||
mem->Ref();
|
||||
|
||||
// Write some keys to this memtable.
|
||||
mem->Add(++seq, kTypeDeletion, "key1", "");
|
||||
mem->Add(++seq, kTypeValue, "key2", "value2");
|
||||
mem->Add(++seq, kTypeValue, "key2", "value2.2");
|
||||
|
||||
// Fetch the newly written keys
|
||||
merge_context.Clear();
|
||||
found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context);
|
||||
// MemTable found out that this key is *not* found (at this sequence#)
|
||||
ASSERT_TRUE(found && s.IsNotFound());
|
||||
|
||||
merge_context.Clear();
|
||||
found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context);
|
||||
ASSERT_TRUE(s.ok() && found);
|
||||
ASSERT_EQ(value, "value2.2");
|
||||
|
||||
// Add memtable to list
|
||||
list.Add(mem, &to_delete);
|
||||
ASSERT_EQ(0, to_delete.size());
|
||||
|
||||
// Fetch keys via MemTableList
|
||||
merge_context.Clear();
|
||||
found =
|
||||
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context);
|
||||
ASSERT_TRUE(found && s.IsNotFound());
|
||||
|
||||
merge_context.Clear();
|
||||
found =
|
||||
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context);
|
||||
ASSERT_TRUE(s.ok() && found);
|
||||
ASSERT_EQ("value2.2", value);
|
||||
|
||||
// Flush this memtable from the list.
|
||||
// (It will then be a part of the memtable history).
|
||||
autovector<MemTable*> to_flush;
|
||||
list.PickMemtablesToFlush(&to_flush);
|
||||
ASSERT_EQ(1, to_flush.size());
|
||||
|
||||
s = Mock_InstallMemtableFlushResults(
|
||||
&list, MutableCFOptions(options, ioptions), to_flush, &to_delete);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ(0, list.NumNotFlushed());
|
||||
ASSERT_EQ(1, list.NumFlushed());
|
||||
ASSERT_EQ(0, to_delete.size());
|
||||
|
||||
// Verify keys are no longer in MemTableList
|
||||
merge_context.Clear();
|
||||
found =
|
||||
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context);
|
||||
ASSERT_FALSE(found);
|
||||
|
||||
merge_context.Clear();
|
||||
found =
|
||||
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context);
|
||||
ASSERT_FALSE(found);
|
||||
|
||||
// Verify keys are present in history
|
||||
merge_context.Clear();
|
||||
found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s,
|
||||
&merge_context);
|
||||
ASSERT_TRUE(found && s.IsNotFound());
|
||||
|
||||
merge_context.Clear();
|
||||
found = list.current()->GetFromHistory(LookupKey("key2", seq), &value, &s,
|
||||
&merge_context);
|
||||
ASSERT_TRUE(found);
|
||||
ASSERT_EQ("value2.2", value);
|
||||
|
||||
// Create another memtable and write some keys to it
|
||||
WriteBuffer wb2(options.db_write_buffer_size);
|
||||
MemTable* mem2 =
|
||||
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb2);
|
||||
mem2->Ref();
|
||||
|
||||
mem2->Add(++seq, kTypeDeletion, "key1", "");
|
||||
mem2->Add(++seq, kTypeValue, "key3", "value3");
|
||||
|
||||
// Add second memtable to list
|
||||
list.Add(mem2, &to_delete);
|
||||
ASSERT_EQ(0, to_delete.size());
|
||||
|
||||
to_flush.clear();
|
||||
list.PickMemtablesToFlush(&to_flush);
|
||||
ASSERT_EQ(1, to_flush.size());
|
||||
|
||||
// Flush second memtable
|
||||
s = Mock_InstallMemtableFlushResults(
|
||||
&list, MutableCFOptions(options, ioptions), to_flush, &to_delete);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ(0, list.NumNotFlushed());
|
||||
ASSERT_EQ(2, list.NumFlushed());
|
||||
ASSERT_EQ(0, to_delete.size());
|
||||
|
||||
// Add a third memtable to push the first memtable out of the history
|
||||
WriteBuffer wb3(options.db_write_buffer_size);
|
||||
MemTable* mem3 =
|
||||
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb3);
|
||||
mem3->Ref();
|
||||
list.Add(mem3, &to_delete);
|
||||
ASSERT_EQ(1, list.NumNotFlushed());
|
||||
ASSERT_EQ(1, list.NumFlushed());
|
||||
ASSERT_EQ(1, to_delete.size());
|
||||
|
||||
// Verify keys are no longer in MemTableList
|
||||
merge_context.Clear();
|
||||
found =
|
||||
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context);
|
||||
ASSERT_FALSE(found);
|
||||
|
||||
merge_context.Clear();
|
||||
found =
|
||||
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context);
|
||||
ASSERT_FALSE(found);
|
||||
|
||||
merge_context.Clear();
|
||||
found =
|
||||
list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context);
|
||||
ASSERT_FALSE(found);
|
||||
|
||||
// Verify that the second memtable's keys are in the history
|
||||
merge_context.Clear();
|
||||
found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s,
|
||||
&merge_context);
|
||||
ASSERT_TRUE(found && s.IsNotFound());
|
||||
|
||||
merge_context.Clear();
|
||||
found = list.current()->GetFromHistory(LookupKey("key3", seq), &value, &s,
|
||||
&merge_context);
|
||||
ASSERT_TRUE(found);
|
||||
ASSERT_EQ("value3", value);
|
||||
|
||||
// Verify that key2 from the first memtable is no longer in the history
|
||||
merge_context.Clear();
|
||||
found =
|
||||
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context);
|
||||
ASSERT_FALSE(found);
|
||||
|
||||
// Cleanup
|
||||
list.current()->Unref(&to_delete);
|
||||
ASSERT_EQ(3, to_delete.size());
|
||||
for (MemTable* m : to_delete) {
|
||||
delete m;
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
const int num_tables = 5;
|
||||
SequenceNumber seq = 1;
|
||||
@ -215,10 +391,13 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
ImmutableCFOptions ioptions(options);
|
||||
InternalKeyComparator cmp(BytewiseComparator());
|
||||
WriteBuffer wb(options.db_write_buffer_size);
|
||||
autovector<MemTable*> to_delete;
|
||||
|
||||
// Create MemTableList
|
||||
int min_write_buffer_number_to_merge = 3;
|
||||
MemTableList list(min_write_buffer_number_to_merge);
|
||||
int max_write_buffer_number_to_maintain = 7;
|
||||
MemTableList list(min_write_buffer_number_to_merge,
|
||||
max_write_buffer_number_to_maintain);
|
||||
|
||||
// Create some MemTables
|
||||
std::vector<MemTable*> tables;
|
||||
@ -264,9 +443,10 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
|
||||
// Add 2 tables
|
||||
list.Add(tables[0]);
|
||||
list.Add(tables[1]);
|
||||
ASSERT_EQ(2, list.size());
|
||||
list.Add(tables[0], &to_delete);
|
||||
list.Add(tables[1], &to_delete);
|
||||
ASSERT_EQ(2, list.NumNotFlushed());
|
||||
ASSERT_EQ(0, to_delete.size());
|
||||
|
||||
// Even though we have less than the minimum to flush, a flush is
|
||||
// pending since we had previously requested a flush and never called
|
||||
@ -277,7 +457,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
// Pick tables to flush
|
||||
list.PickMemtablesToFlush(&to_flush);
|
||||
ASSERT_EQ(2, to_flush.size());
|
||||
ASSERT_EQ(2, list.size());
|
||||
ASSERT_EQ(2, list.NumNotFlushed());
|
||||
ASSERT_FALSE(list.IsFlushPending());
|
||||
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
|
||||
@ -288,16 +468,17 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
to_flush.clear();
|
||||
|
||||
// Add another table
|
||||
list.Add(tables[2]);
|
||||
list.Add(tables[2], &to_delete);
|
||||
// We now have the minimum to flush regardless of whether FlushRequested()
|
||||
// was called.
|
||||
ASSERT_TRUE(list.IsFlushPending());
|
||||
ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
ASSERT_EQ(0, to_delete.size());
|
||||
|
||||
// Pick tables to flush
|
||||
list.PickMemtablesToFlush(&to_flush);
|
||||
ASSERT_EQ(3, to_flush.size());
|
||||
ASSERT_EQ(3, list.size());
|
||||
ASSERT_EQ(3, list.NumNotFlushed());
|
||||
ASSERT_FALSE(list.IsFlushPending());
|
||||
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
|
||||
@ -305,14 +486,15 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
autovector<MemTable*> to_flush2;
|
||||
list.PickMemtablesToFlush(&to_flush2);
|
||||
ASSERT_EQ(0, to_flush2.size());
|
||||
ASSERT_EQ(3, list.size());
|
||||
ASSERT_EQ(3, list.NumNotFlushed());
|
||||
ASSERT_FALSE(list.IsFlushPending());
|
||||
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
|
||||
// Add another table
|
||||
list.Add(tables[3]);
|
||||
list.Add(tables[3], &to_delete);
|
||||
ASSERT_FALSE(list.IsFlushPending());
|
||||
ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
ASSERT_EQ(0, to_delete.size());
|
||||
|
||||
// Request a flush again
|
||||
list.FlushRequested();
|
||||
@ -322,7 +504,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
// Pick tables to flush again
|
||||
list.PickMemtablesToFlush(&to_flush2);
|
||||
ASSERT_EQ(1, to_flush2.size());
|
||||
ASSERT_EQ(4, list.size());
|
||||
ASSERT_EQ(4, list.NumNotFlushed());
|
||||
ASSERT_FALSE(list.IsFlushPending());
|
||||
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
|
||||
@ -333,29 +515,28 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
to_flush.clear();
|
||||
|
||||
// Add another table
|
||||
list.Add(tables[4]);
|
||||
ASSERT_EQ(5, list.size());
|
||||
list.Add(tables[4], &to_delete);
|
||||
ASSERT_EQ(5, list.NumNotFlushed());
|
||||
// We now have the minimum to flush regardless of whether FlushRequested() was called.
|
||||
ASSERT_TRUE(list.IsFlushPending());
|
||||
ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
ASSERT_EQ(0, to_delete.size());
|
||||
|
||||
// Pick tables to flush
|
||||
list.PickMemtablesToFlush(&to_flush);
|
||||
// Should pick 4 of 5 since 1 table has been picked in to_flush2
|
||||
ASSERT_EQ(4, to_flush.size());
|
||||
ASSERT_EQ(5, list.size());
|
||||
ASSERT_EQ(5, list.NumNotFlushed());
|
||||
ASSERT_FALSE(list.IsFlushPending());
|
||||
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
|
||||
// Pick tables to flush again
|
||||
autovector<MemTable*> to_flush3;
|
||||
ASSERT_EQ(0, to_flush3.size()); // nothing not in progress of being flushed
|
||||
ASSERT_EQ(5, list.size());
|
||||
ASSERT_EQ(5, list.NumNotFlushed());
|
||||
ASSERT_FALSE(list.IsFlushPending());
|
||||
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
|
||||
|
||||
autovector<MemTable*> to_delete;
|
||||
|
||||
// Flush the 4 memtables that were picked in to_flush
|
||||
s = Mock_InstallMemtableFlushResults(
|
||||
&list, MutableCFOptions(options, ioptions), to_flush, &to_delete);
|
||||
@ -366,17 +547,10 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
// Current implementation will only commit memtables in the order they were
|
||||
// created. So InstallMemtableFlushResults will install the first 3 tables
|
||||
// in to_flush and stop when it encounters a table not yet flushed.
|
||||
ASSERT_EQ(3, to_delete.size());
|
||||
ASSERT_EQ(2, list.size());
|
||||
|
||||
for (const auto& m : to_delete) {
|
||||
// Refcount should be 0 after calling InstallMemtableFlushResults.
|
||||
// Verify this, by Ref'ing then UnRef'ing:
|
||||
m->Ref();
|
||||
ASSERT_EQ(m, m->Unref());
|
||||
delete m;
|
||||
}
|
||||
to_delete.clear();
|
||||
ASSERT_EQ(2, list.NumNotFlushed());
|
||||
int num_in_history = std::min(3, max_write_buffer_number_to_maintain);
|
||||
ASSERT_EQ(num_in_history, list.NumFlushed());
|
||||
ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
|
||||
|
||||
// Request a flush again. Should be nothing to flush
|
||||
list.FlushRequested();
|
||||
@ -388,10 +562,12 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
&list, MutableCFOptions(options, ioptions), to_flush2, &to_delete);
|
||||
ASSERT_OK(s);
|
||||
|
||||
// This will actually intall 2 tables. The 1 we told it to flush, and also
|
||||
// This will actually install 2 tables. The 1 we told it to flush, and also
|
||||
// tables[4] which has been waiting for tables[3] to commit.
|
||||
ASSERT_EQ(2, to_delete.size());
|
||||
ASSERT_EQ(0, list.size());
|
||||
ASSERT_EQ(0, list.NumNotFlushed());
|
||||
num_in_history = std::min(5, max_write_buffer_number_to_maintain);
|
||||
ASSERT_EQ(num_in_history, list.NumFlushed());
|
||||
ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
|
||||
|
||||
for (const auto& m : to_delete) {
|
||||
// Refcount should be 0 after calling InstallMemtableFlushResults.
|
||||
@ -403,7 +579,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
||||
to_delete.clear();
|
||||
|
||||
list.current()->Unref(&to_delete);
|
||||
ASSERT_EQ(0, to_delete.size());
|
||||
int to_delete_size = std::min(5, max_write_buffer_number_to_maintain);
|
||||
ASSERT_EQ(to_delete_size, to_delete.size());
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -538,6 +538,8 @@ extern void rocksdb_options_enable_statistics(rocksdb_options_t*);
|
||||
|
||||
extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int);
|
||||
extern void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int);
|
||||
extern void rocksdb_options_set_max_write_buffer_number_to_maintain(
|
||||
rocksdb_options_t*, int);
|
||||
extern void rocksdb_options_set_max_background_compactions(rocksdb_options_t*, int);
|
||||
extern void rocksdb_options_set_max_background_flushes(rocksdb_options_t*, int);
|
||||
extern void rocksdb_options_set_max_log_file_size(rocksdb_options_t*, size_t);
|
||||
|
@ -238,6 +238,21 @@ struct ColumnFamilyOptions {
|
||||
// individual write buffers. Default: 1
|
||||
int min_write_buffer_number_to_merge;
|
||||
|
||||
// The total maximum number of write buffers to maintain in memory including
|
||||
// copies of buffers that have already been flushed. Unlike
|
||||
// max_write_buffer_number, this parameter does not affect flushing.
|
||||
// This controls the minimum amount of write history that will be available
|
||||
// in memory for conflict checking when Transactions are used.
|
||||
// If this value is too low, some transactions may fail at commit time due
|
||||
// to not being able to determine whether there were any write conflicts.
|
||||
//
|
||||
// Setting this value to 0 will cause write buffers to be freed immediately
|
||||
// after they are flushed.
|
||||
// If this value is set to -1, 'max_write_buffer_number' will be used.
|
||||
//
|
||||
// Default: 0
|
||||
int max_write_buffer_number_to_maintain;
|
||||
|
||||
// Compress blocks using the specified compression algorithm. This
|
||||
// parameter can be changed dynamically.
|
||||
//
|
||||
|
@ -993,6 +993,30 @@ void Java_org_rocksdb_Options_setMinWriteBufferNumberToMerge(
|
||||
jhandle)->min_write_buffer_number_to_merge =
|
||||
static_cast<int>(jmin_write_buffer_number_to_merge);
|
||||
}
|
||||
/*
|
||||
* Class: org_rocksdb_Options
|
||||
* Method: maxWriteBufferNumberToMaintain
|
||||
* Signature: (J)I
|
||||
*/
|
||||
jint Java_org_rocksdb_Options_maxWriteBufferNumberToMaintain(JNIEnv* env,
|
||||
jobject jobj,
|
||||
jlong jhandle) {
|
||||
return reinterpret_cast<rocksdb::Options*>(jhandle)
|
||||
->max_write_buffer_number_to_maintain;
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_Options
|
||||
* Method: setMaxWriteBufferNumberToMaintain
|
||||
* Signature: (JI)V
|
||||
*/
|
||||
void Java_org_rocksdb_Options_setMaxWriteBufferNumberToMaintain(
|
||||
JNIEnv* env, jobject jobj, jlong jhandle,
|
||||
jint jmax_write_buffer_number_to_maintain) {
|
||||
reinterpret_cast<rocksdb::Options*>(jhandle)
|
||||
->max_write_buffer_number_to_maintain =
|
||||
static_cast<int>(jmax_write_buffer_number_to_maintain);
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_Options
|
||||
@ -2153,6 +2177,30 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMinWriteBufferNumberToMerge(
|
||||
static_cast<int>(jmin_write_buffer_number_to_merge);
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_ColumnFamilyOptions
|
||||
* Method: maxWriteBufferNumberToMaintain
|
||||
* Signature: (J)I
|
||||
*/
|
||||
jint Java_org_rocksdb_ColumnFamilyOptions_maxWriteBufferNumberToMaintain(
|
||||
JNIEnv* env, jobject jobj, jlong jhandle) {
|
||||
return reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jhandle)
|
||||
->max_write_buffer_number_to_maintain;
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_ColumnFamilyOptions
|
||||
* Method: setMaxWriteBufferNumberToMaintain
|
||||
* Signature: (JI)V
|
||||
*/
|
||||
void Java_org_rocksdb_ColumnFamilyOptions_setMaxWriteBufferNumberToMaintain(
|
||||
JNIEnv* env, jobject jobj, jlong jhandle,
|
||||
jint jmax_write_buffer_number_to_maintain) {
|
||||
reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jhandle)
|
||||
->max_write_buffer_number_to_maintain =
|
||||
static_cast<int>(jmax_write_buffer_number_to_maintain);
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_ColumnFamilyOptions
|
||||
* Method: setCompressionType
|
||||
|
@ -139,6 +139,20 @@ DEFINE_int32(min_write_buffer_number_to_merge,
|
||||
"writing less data to storage if there are duplicate records in"
|
||||
" each of these individual write buffers.");
|
||||
|
||||
// Command-line flag mirroring the max_write_buffer_number_to_maintain option.
// Its default is taken from a default-constructed rocksdb::Options, so the
// tool follows the library default unless the flag is given explicitly.
DEFINE_int32(max_write_buffer_number_to_maintain,
             rocksdb::Options().max_write_buffer_number_to_maintain,
             "The total maximum number of write buffers to maintain in memory "
             "including copies of buffers that have already been flushed. "
             "Unlike max_write_buffer_number, this parameter does not affect "
             "flushing. This controls the minimum amount of write history "
             "that will be available in memory for conflict checking when "
             "Transactions are used. If this value is too low, some "
             "transactions may fail at commit time due to not being able to "
             "determine whether there were any write conflicts. Setting this "
             "value to 0 will cause write buffers to be freed immediately "
             "after they are flushed. If this value is set to -1, "
             "'max_write_buffer_number' will be used.");
|
||||
|
||||
DEFINE_int32(open_files, rocksdb::Options().max_open_files,
|
||||
"Maximum number of files to keep open at the same time "
|
||||
"(use default if == 0)");
|
||||
@ -1767,6 +1781,8 @@ class StressTest {
|
||||
options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
|
||||
options_.min_write_buffer_number_to_merge =
|
||||
FLAGS_min_write_buffer_number_to_merge;
|
||||
options_.max_write_buffer_number_to_maintain =
|
||||
FLAGS_max_write_buffer_number_to_maintain;
|
||||
options_.max_background_compactions = FLAGS_max_background_compactions;
|
||||
options_.max_background_flushes = FLAGS_max_background_flushes;
|
||||
options_.compaction_style =
|
||||
|
@ -90,6 +90,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
|
||||
write_buffer_size(4 << 20),
|
||||
max_write_buffer_number(2),
|
||||
min_write_buffer_number_to_merge(1),
|
||||
max_write_buffer_number_to_maintain(0),
|
||||
compression(kSnappyCompression),
|
||||
prefix_extractor(nullptr),
|
||||
num_levels(7),
|
||||
@ -143,6 +144,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
|
||||
max_write_buffer_number(options.max_write_buffer_number),
|
||||
min_write_buffer_number_to_merge(
|
||||
options.min_write_buffer_number_to_merge),
|
||||
max_write_buffer_number_to_maintain(
|
||||
options.max_write_buffer_number_to_maintain),
|
||||
compression(options.compression),
|
||||
compression_per_level(options.compression_per_level),
|
||||
compression_opts(options.compression_opts),
|
||||
@ -398,6 +401,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
|
||||
min_write_buffer_number_to_merge);
|
||||
Warn(log, " Options.purge_redundant_kvs_while_flush: %d",
|
||||
purge_redundant_kvs_while_flush);
|
||||
Warn(log, " Options.max_write_buffer_number_to_maintain: %d",
|
||||
max_write_buffer_number_to_maintain);
|
||||
Warn(log, " Options.compression_opts.window_bits: %d",
|
||||
compression_opts.window_bits);
|
||||
Warn(log, " Options.compression_opts.level: %d",
|
||||
|
@ -391,6 +391,8 @@ bool ParseColumnFamilyOption(const std::string& name, const std::string& value,
|
||||
new_options->table_factory.reset(NewBlockBasedTableFactory(table_opt));
|
||||
} else if (name == "min_write_buffer_number_to_merge") {
|
||||
new_options->min_write_buffer_number_to_merge = ParseInt(value);
|
||||
} else if (name == "max_write_buffer_number_to_maintain") {
|
||||
new_options->max_write_buffer_number_to_maintain = ParseInt(value);
|
||||
} else if (name == "compression") {
|
||||
new_options->compression = ParseCompressionType(value);
|
||||
} else if (name == "compression_per_level") {
|
||||
|
@ -96,6 +96,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
|
||||
{"write_buffer_size", "1"},
|
||||
{"max_write_buffer_number", "2"},
|
||||
{"min_write_buffer_number_to_merge", "3"},
|
||||
{"max_write_buffer_number_to_maintain", "99"},
|
||||
{"compression", "kSnappyCompression"},
|
||||
{"compression_per_level",
|
||||
"kNoCompression:"
|
||||
@ -182,6 +183,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
|
||||
ASSERT_EQ(new_cf_opt.write_buffer_size, 1U);
|
||||
ASSERT_EQ(new_cf_opt.max_write_buffer_number, 2);
|
||||
ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3);
|
||||
ASSERT_EQ(new_cf_opt.max_write_buffer_number_to_maintain, 99);
|
||||
ASSERT_EQ(new_cf_opt.compression, kSnappyCompression);
|
||||
ASSERT_EQ(new_cf_opt.compression_per_level.size(), 6U);
|
||||
ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression);
|
||||
|
@ -64,6 +64,16 @@ void xf_manage_new(DBImpl* db, ReadOptions* read_options,
|
||||
|
||||
void xf_manage_create(ManagedIterator* iter) { iter->SetDropOld(false); }
|
||||
|
||||
// xfunc hook: overwrites the caller-supplied setting with 10.
//
// max_write_buffer_number_to_maintain: pointer to the value to overwrite; it
//   is dereferenced unconditionally, so it must not be null.
void xf_transaction_set_memtable_history(
    int32_t* max_write_buffer_number_to_maintain) {
  *max_write_buffer_number_to_maintain = 10;
}
|
||||
|
||||
// xfunc hook: overwrites the caller-supplied setting with 0.
//
// max_write_buffer_number_to_maintain: pointer to the value to overwrite; it
//   is dereferenced unconditionally, so it must not be null.
void xf_transaction_clear_memtable_history(
    int32_t* max_write_buffer_number_to_maintain) {
  *max_write_buffer_number_to_maintain = 0;
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // XFUNC
|
||||
|
@ -40,6 +40,10 @@ void xf_manage_new(DBImpl* db, ReadOptions* readoptions,
|
||||
bool is_snapshot_supported);
|
||||
void xf_manage_create(ManagedIterator* iter);
|
||||
void xf_manage_options(ReadOptions* read_options);
|
||||
void xf_transaction_set_memtable_history(
|
||||
int32_t* max_write_buffer_number_to_maintain);
|
||||
void xf_transaction_clear_memtable_history(
|
||||
int32_t* max_write_buffer_number_to_maintain);
|
||||
|
||||
// This class provides the facility to run custom code to test a specific
|
||||
// feature typically with all existing unit tests.
|
||||
|
Loading…
Reference in New Issue
Block a user