This commit is contained in:
Vlad Balan 2014-10-17 15:44:52 -07:00
commit dbcfe27d6c
23 changed files with 743 additions and 205 deletions

View File

@ -326,13 +326,14 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
auto write_controller = column_family_set_->write_controller_;
if (imm()->size() == options_.max_write_buffer_number) {
if (imm()->size() >= mutable_cf_options.max_write_buffer_number) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
Log(ioptions_.info_log,
"[%s] Stopping writes because we have %d immutable memtables "
"(waiting for flush)",
name_.c_str(), imm()->size());
"(waiting for flush), max_write_buffer_number is set to %d",
name_.c_str(), imm()->size(),
mutable_cf_options.max_write_buffer_number);
} else if (current_->NumLevelFiles(0) >=
mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken();
@ -353,8 +354,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"[%s] Stalling writes because we have %d level-0 files (%" PRIu64
"us)",
name_.c_str(), current_->NumLevelFiles(0), slowdown);
} else if (options_.hard_rate_limit > 1.0 &&
score > options_.hard_rate_limit) {
} else if (mutable_cf_options.hard_rate_limit > 1.0 &&
score > mutable_cf_options.hard_rate_limit) {
uint64_t kHardLimitSlowdown = 1000;
write_controller_token_ =
write_controller->GetDelayToken(kHardLimitSlowdown);
@ -364,10 +365,11 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"[%s] Stalling writes because we hit hard limit on level %d. "
"(%" PRIu64 "us)",
name_.c_str(), max_level, kHardLimitSlowdown);
} else if (options_.soft_rate_limit > 0.0 &&
score > options_.soft_rate_limit) {
uint64_t slowdown = SlowdownAmount(score, options_.soft_rate_limit,
options_.hard_rate_limit);
} else if (mutable_cf_options.soft_rate_limit > 0.0 &&
score > mutable_cf_options.soft_rate_limit) {
uint64_t slowdown = SlowdownAmount(score,
mutable_cf_options.soft_rate_limit,
mutable_cf_options.hard_rate_limit);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
Log(ioptions_.info_log,

View File

@ -1236,9 +1236,12 @@ Status DBImpl::Recover(
SetTickerCount(stats_, SEQUENCE_NUMBER, versions_->LastSequence());
}
// Initial value
max_total_in_memory_state_ = 0;
for (auto cfd : *versions_->GetColumnFamilySet()) {
max_total_in_memory_state_ += cfd->options()->write_buffer_size *
cfd->options()->max_write_buffer_number;
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
mutable_cf_options->max_write_buffer_number;
}
return s;
@ -1724,9 +1727,37 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
bool DBImpl::SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& options_map) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
MutexLock l(&mutex_);
return cfh->cfd()->SetOptions(options_map);
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
if (options_map.empty()) {
Log(db_options_.info_log, "SetOptions() on column family [%s], empty input",
cfd->GetName().c_str());
return false;
}
MutableCFOptions new_options;
bool succeed = false;
{
MutexLock l(&mutex_);
if (cfd->SetOptions(options_map)) {
new_options = *cfd->GetLatestMutableCFOptions();
succeed = true;
}
}
Log(db_options_.info_log, "SetOptions() on column family [%s], inputs:",
cfd->GetName().c_str());
for (const auto& o : options_map) {
Log(db_options_.info_log, "%s: %s\n", o.first.c_str(), o.second.c_str());
}
if (succeed) {
Log(db_options_.info_log, "[%s] SetOptions succeeded",
cfd->GetName().c_str());
new_options.Dump(db_options_.info_log.get());
} else {
Log(db_options_.info_log, "[%s] SetOptions failed",
cfd->GetName().c_str());
}
return succeed;
}
// return the same level if it cannot be moved
@ -1803,8 +1834,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
status = versions_->LogAndApply(cfd,
mutable_cf_options, &edit, &mutex_, db_directory_.get());
superversion_to_free = cfd->InstallSuperVersion(
new_superversion, &mutex_, mutable_cf_options);
superversion_to_free = InstallSuperVersion(
cfd, new_superversion, mutable_cf_options);
new_superversion = nullptr;
Log(db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(),
@ -1840,10 +1871,10 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
return cfh->cfd()->options()->level0_stop_writes_trigger;
}
Status DBImpl::Flush(const FlushOptions& options,
Status DBImpl::Flush(const FlushOptions& flush_options,
ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return FlushMemTable(cfh->cfd(), options);
return FlushMemTable(cfh->cfd(), flush_options);
}
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
@ -1933,7 +1964,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
}
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& options) {
const FlushOptions& flush_options) {
Status s;
{
WriteContext context;
@ -1957,7 +1988,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
write_thread_.ExitWriteThread(&w, &w, s);
}
if (s.ok() && options.wait) {
if (s.ok() && flush_options.wait) {
// Wait until the compaction completes
s = WaitForFlushMemTable(cfd);
}
@ -2320,12 +2351,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
} else {
// no need to refcount in iteration since it's always under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->options()->disable_auto_compactions) {
// Pick up latest mutable CF Options and use it throughout the
// compaction job
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
if (!mutable_cf_options->disable_auto_compactions) {
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
// compaction is not necessary. Need to make sure mutex is held
// until we make a copy in the following code
c.reset(cfd->PickCompaction(
*cfd->GetLatestMutableCFOptions(), log_buffer));
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
if (c != nullptr) {
// update statistics
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
@ -3441,7 +3474,7 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
}
} // namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
Iterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SuperVersion* super_version,
Arena* arena) {
@ -3451,11 +3484,11 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena);
// Collect iterator for mutable mem
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(options, arena));
super_version->mem->NewIterator(read_options, arena));
// Collect all needed child iterators for immutable memtables
super_version->imm->AddIterators(options, &merge_iter_builder);
super_version->imm->AddIterators(read_options, &merge_iter_builder);
// Collect iterators for files in L0 - Ln
super_version->current->AddIterators(options, env_options_,
super_version->current->AddIterators(read_options, env_options_,
&merge_iter_builder);
internal_iter = merge_iter_builder.Finish();
IterState* cleanup = new IterState(this, &mutex_, super_version);
@ -3468,10 +3501,10 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
return default_cf_handle_;
}
Status DBImpl::Get(const ReadOptions& options,
Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
return GetImpl(options, column_family, key, value);
return GetImpl(read_options, column_family, key, value);
}
// DeletionState gets created and destructed outside of the lock -- we
@ -3488,17 +3521,39 @@ void DBImpl::InstallSuperVersion(
ColumnFamilyData* cfd, DeletionState& deletion_state,
const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld();
// if new_superversion == nullptr, it means somebody already used it
SuperVersion* new_superversion =
(deletion_state.new_superversion != nullptr) ?
deletion_state.new_superversion : new SuperVersion();
SuperVersion* old_superversion =
cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options);
InstallSuperVersion(cfd, deletion_state.new_superversion,
mutable_cf_options);
deletion_state.new_superversion = nullptr;
deletion_state.superversions_to_free.push_back(old_superversion);
}
Status DBImpl::GetImpl(const ReadOptions& options,
SuperVersion* DBImpl::InstallSuperVersion(
ColumnFamilyData* cfd, SuperVersion* new_sv,
const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld();
auto* old = cfd->InstallSuperVersion(
new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);
// We want to schedule potential flush or compactions since new options may
// have been picked up in this new version. New options may cause flush
// compaction trigger condition to change.
MaybeScheduleFlushOrCompaction();
// Update max_total_in_memory_state_
auto old_memtable_size = 0;
if (old) {
old_memtable_size = old->mutable_cf_options.write_buffer_size *
old->mutable_cf_options.max_write_buffer_number;
}
max_total_in_memory_state_ =
max_total_in_memory_state_ - old_memtable_size +
mutable_cf_options.write_buffer_size *
mutable_cf_options.max_write_buffer_number;
return old;
}
Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) {
StopWatch sw(env_, stats_, DB_GET);
@ -3508,8 +3563,9 @@ Status DBImpl::GetImpl(const ReadOptions& options,
auto cfd = cfh->cfd();
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
if (read_options.snapshot != nullptr) {
snapshot = reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_;
} else {
snapshot = versions_->LastSequence();
}
@ -3535,7 +3591,8 @@ Status DBImpl::GetImpl(const ReadOptions& options,
RecordTick(stats_, MEMTABLE_HIT);
} else {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(options, lkey, value, &s, &merge_context, value_found);
sv->current->Get(read_options, lkey, value, &s, &merge_context,
value_found);
RecordTick(stats_, MEMTABLE_MISS);
}
@ -3551,7 +3608,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
}
std::vector<Status> DBImpl::MultiGet(
const ReadOptions& options,
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
@ -3577,8 +3634,9 @@ std::vector<Status> DBImpl::MultiGet(
}
mutex_.Lock();
if (options.snapshot != nullptr) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
if (read_options.snapshot != nullptr) {
snapshot = reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_;
} else {
snapshot = versions_->LastSequence();
}
@ -3621,7 +3679,8 @@ std::vector<Status> DBImpl::MultiGet(
// Done
} else {
PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(options, lkey, value, &s, &merge_context);
super_version->current->Get(read_options, lkey, value, &s,
&merge_context);
}
if (s.ok()) {
@ -3659,7 +3718,7 @@ std::vector<Status> DBImpl::MultiGet(
return stat_list;
}
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {
*handle = nullptr;
@ -3674,26 +3733,23 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
edit.SetColumnFamily(new_id);
edit.SetLogNumber(logfile_number_);
edit.SetComparatorName(options.comparator->Name());
edit.SetComparatorName(cf_options.comparator->Name());
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
Options opt(db_options_, options);
Options opt(db_options_, cf_options);
Status s = versions_->LogAndApply(nullptr,
MutableCFOptions(opt, ImmutableCFOptions(opt)),
&edit, &mutex_, db_directory_.get(), false, &options);
&edit, &mutex_, db_directory_.get(), false, &cf_options);
if (s.ok()) {
single_column_family_mode_ = false;
auto cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_,
*cfd->GetLatestMutableCFOptions());
delete InstallSuperVersion(cfd, nullptr, *cfd->GetLatestMutableCFOptions());
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
Log(db_options_.info_log, "Created column family [%s] (ID %u)",
column_family_name.c_str(), (unsigned)cfd->GetID());
max_total_in_memory_state_ += cfd->options()->write_buffer_size *
cfd->options()->max_write_buffer_number;
} else {
Log(db_options_.info_log, "Creating column family [%s] FAILED -- %s",
column_family_name.c_str(), s.ToString().c_str());
@ -3712,7 +3768,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
edit.DropColumnFamily();
edit.SetColumnFamily(cfd->GetID());
Status s;
{
MutexLock l(&mutex_);
@ -3732,8 +3787,9 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
if (s.ok()) {
assert(cfd->IsDropped());
max_total_in_memory_state_ -= cfd->options()->write_buffer_size *
cfd->options()->max_write_buffer_number;
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
mutable_cf_options->max_write_buffer_number;
Log(db_options_.info_log, "Dropped column family with id %u\n",
cfd->GetID());
} else {
@ -3745,14 +3801,14 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
return s;
}
bool DBImpl::KeyMayExist(const ReadOptions& options,
bool DBImpl::KeyMayExist(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) {
if (value_found != nullptr) {
// falsify later if key-may-exist but can't fetch value
*value_found = true;
}
ReadOptions roptions = options;
ReadOptions roptions = read_options;
roptions.read_tier = kBlockCacheTier; // read from block cache only
auto s = GetImpl(roptions, column_family, key, value, value_found);
@ -3941,23 +3997,23 @@ Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
}
}
Status DBImpl::Delete(const WriteOptions& options,
Status DBImpl::Delete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family, const Slice& key) {
return DB::Delete(options, column_family, key);
return DB::Delete(write_options, column_family, key);
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
}
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.disableWAL = options.disableWAL;
w.sync = write_options.sync;
w.disableWAL = write_options.disableWAL;
w.in_batch_group = false;
w.done = false;
w.timeout_hint_us = options.timeout_hint_us;
w.timeout_hint_us = write_options.timeout_hint_us;
uint64_t expiration_time = 0;
bool has_timeout = false;
@ -3968,7 +4024,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
has_timeout = true;
}
if (!options.disableWAL) {
if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL);
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
}
@ -4036,7 +4092,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (UNLIKELY(status.ok()) &&
(write_controller_.IsStopped() || write_controller_.GetDelay() > 0)) {
DelayWrite(expiration_time);
status = DelayWrite(expiration_time);
}
if (UNLIKELY(status.ok() && has_timeout &&
@ -4074,13 +4130,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// Record statistics
RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count);
RecordTick(stats_, BYTES_WRITTEN, WriteBatchInternal::ByteSize(updates));
if (options.disableWAL) {
if (write_options.disableWAL) {
flush_on_destroy_ = true;
}
PERF_TIMER_STOP(write_pre_and_post_process_time);
uint64_t log_size = 0;
if (!options.disableWAL) {
if (!write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
Slice log_entry = WriteBatchInternal::Contents(updates);
status = log_->AddRecord(log_entry);
@ -4089,7 +4145,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
log_empty_ = false;
log_size = log_entry.size();
RecordTick(stats_, WAL_FILE_BYTES, log_size);
if (status.ok() && options.sync) {
if (status.ok() && write_options.sync) {
RecordTick(stats_, WAL_FILE_SYNCED);
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
if (db_options_.use_fsync) {
@ -4104,7 +4160,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
status = WriteBatchInternal::InsertInto(
updates, column_family_memtables_.get(),
options.ignore_missing_column_families, 0, this, false);
write_options.ignore_missing_column_families, 0, this, false);
// A non-OK status here indicates iteration failure (either in-memory
// writebatch corruption (very bad), or the client specified invalid
// column family). This will later on trigger bg_error_.
@ -4123,7 +4179,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// internal stats
default_cf_internal_stats_->AddDBStats(
InternalStats::BYTES_WRITTEN, batch_size);
if (!options.disableWAL) {
if (!write_options.disableWAL) {
default_cf_internal_stats_->AddDBStats(
InternalStats::WAL_FILE_SYNCED, 1);
default_cf_internal_stats_->AddDBStats(
@ -4151,7 +4207,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
void DBImpl::DelayWrite(uint64_t expiration_time) {
Status DBImpl::DelayWrite(uint64_t expiration_time) {
StopWatch sw(env_, stats_, WRITE_STALL);
bool has_timeout = (expiration_time > 0);
auto delay = write_controller_.GetDelay();
@ -4161,16 +4217,18 @@ void DBImpl::DelayWrite(uint64_t expiration_time) {
mutex_.Lock();
}
while (write_controller_.IsStopped()) {
while (bg_error_.ok() && write_controller_.IsStopped()) {
if (has_timeout) {
bg_cv_.TimedWait(expiration_time);
if (env_->NowMicros() > expiration_time) {
break;
return Status::TimedOut();
}
} else {
bg_cv_.Wait();
}
}
return bg_error_;
}
Status DBImpl::ScheduleFlushes(WriteContext* context) {
@ -4219,8 +4277,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
if (s.ok()) {
// Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize(1.1 *
cfd->options()->write_buffer_size);
lfile->SetPreallocationBlockSize(
1.1 * mutable_cf_options.write_buffer_size);
new_log = new log::Writer(std::move(lfile));
}
}
@ -4232,6 +4290,9 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
new_superversion = new SuperVersion();
}
}
Log(db_options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64 "\n",
cfd->GetName().c_str(), new_log_number);
mutex_.Lock();
if (!s.ok()) {
// how do we fail if we're not creating new log?
@ -4264,11 +4325,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
cfd->imm()->Add(cfd->mem());
new_mem->Ref();
cfd->SetMemtable(new_mem);
Log(db_options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64 "\n",
cfd->GetName().c_str(), logfile_number_);
context->superversions_to_free_.push_back(
cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options));
InstallSuperVersion(cfd, new_superversion, mutable_cf_options));
return s;
}
@ -4614,7 +4672,7 @@ Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
}
// Default implementation -- returns not supported status
Status DB::CreateColumnFamily(const ColumnFamilyOptions& options,
Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {
return Status::NotSupported("");
@ -4737,8 +4795,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_,
*cfd->GetLatestMutableCFOptions());
delete impl->InstallSuperVersion(
cfd, nullptr, *cfd->GetLatestMutableCFOptions());
}
impl->alive_log_files_.push_back(
DBImpl::LogFileNumberSize(impl->logfile_number_));

View File

@ -367,7 +367,7 @@ class DBImpl : public DB {
const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer);
void DelayWrite(uint64_t expiration_time);
Status DelayWrite(uint64_t expiration_time);
Status ScheduleFlushes(WriteContext* context);
@ -630,6 +630,10 @@ class DBImpl : public DB {
DeletionState& deletion_state,
const MutableCFOptions& mutable_cf_options);
SuperVersion* InstallSuperVersion(
ColumnFamilyData* cfd, SuperVersion* new_sv,
const MutableCFOptions& mutable_cf_options);
// Find Super version and reference it. Based on options, it might return
// the thread local cached one.
inline SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd);

View File

@ -8137,7 +8137,7 @@ TEST(DBTest, SimpleWriteTimeoutTest) {
options.max_background_flushes = 0;
options.max_write_buffer_number = 2;
options.max_total_wal_size = std::numeric_limits<uint64_t>::max();
WriteOptions write_opt = WriteOptions();
WriteOptions write_opt;
write_opt.timeout_hint_us = 0;
DestroyAndReopen(&options);
// fill the two write buffers
@ -8173,7 +8173,7 @@ static void RandomTimeoutWriter(void* arg) {
DB* db = state->db;
Random rnd(1000 + thread_id);
WriteOptions write_opt = WriteOptions();
WriteOptions write_opt;
write_opt.timeout_hint_us = 500;
int timeout_count = 0;
int num_keys = kNumKeys * 5;
@ -8558,14 +8558,13 @@ TEST(DBTest, DynamicMemtableOptions) {
auto gen_l0_kb = [this](int size) {
Random rnd(301);
std::vector<std::string> values;
for (int i = 0; i < size; i++) {
values.push_back(RandomString(&rnd, 1024));
ASSERT_OK(Put(Key(i), values[i]));
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
}
dbfull()->TEST_WaitForFlushMemTable();
};
// Test write_buffer_size
gen_l0_kb(64);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_TRUE(SizeAtLevel(0) < k64KB + k5KB);
@ -8587,103 +8586,299 @@ TEST(DBTest, DynamicMemtableOptions) {
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
ASSERT_TRUE(SizeAtLevel(0) < k128KB + k64KB + 2 * k5KB);
ASSERT_TRUE(SizeAtLevel(0) > k128KB + k64KB - 2 * k5KB);
// Test max_write_buffer_number
// Block compaction thread, which will also block the flushes because
// max_background_flushes == 0, so flushes are getting executed by the
// compaction thread
env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low1;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1,
Env::Priority::LOW);
// Start from scratch and disable compaction/flush. Flush can only happen
// during compaction but trigger is pretty high
options.max_background_flushes = 0;
options.disable_auto_compactions = true;
DestroyAndReopen(&options);
// Put until timeout, bounded by 256 puts. We should see timeout at ~128KB
int count = 0;
Random rnd(301);
WriteOptions wo;
wo.timeout_hint_us = 1000;
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) {
count++;
}
ASSERT_TRUE(count > (128 * 0.9) && count < (128 * 1.1));
sleeping_task_low1.WakeUp();
sleeping_task_low1.WaitUntilDone();
// Increase
ASSERT_TRUE(dbfull()->SetOptions({
{"max_write_buffer_number", "8"},
}));
// Clean up memtable and L0
dbfull()->CompactRange(nullptr, nullptr);
SleepingBackgroundTask sleeping_task_low2;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2,
Env::Priority::LOW);
count = 0;
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) {
count++;
}
ASSERT_TRUE(count > (512 * 0.9) && count < (512 * 1.1));
sleeping_task_low2.WakeUp();
sleeping_task_low2.WaitUntilDone();
// Decrease
ASSERT_TRUE(dbfull()->SetOptions({
{"max_write_buffer_number", "4"},
}));
// Clean up memtable and L0
dbfull()->CompactRange(nullptr, nullptr);
SleepingBackgroundTask sleeping_task_low3;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3,
Env::Priority::LOW);
count = 0;
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) {
count++;
}
ASSERT_TRUE(count > (256 * 0.9) && count < (256 * 1.1));
sleeping_task_low3.WakeUp();
sleeping_task_low3.WaitUntilDone();
}
TEST(DBTest, DynamicCompactionOptions) {
// minimum write buffer size is enforced at 64KB
const uint64_t k32KB = 1 << 15;
const uint64_t k64KB = 1 << 16;
const uint64_t k128KB = 1 << 17;
const uint64_t k256KB = 1 << 18;
const uint64_t k5KB = 5 * 1024;
const uint64_t k4KB = 1 << 12;
Options options;
options.env = env_;
options.create_if_missing = true;
options.compression = kNoCompression;
options.max_background_compactions = 4;
options.hard_rate_limit = 1.1;
options.write_buffer_size = k128KB;
options.write_buffer_size = k64KB;
options.max_write_buffer_number = 2;
// Compaction related options
options.level0_file_num_compaction_trigger = 3;
options.level0_slowdown_writes_trigger = 10;
options.level0_stop_writes_trigger = 20;
options.level0_slowdown_writes_trigger = 4;
options.level0_stop_writes_trigger = 8;
options.max_grandparent_overlap_factor = 10;
options.expanded_compaction_factor = 25;
options.source_compaction_factor = 1;
options.target_file_size_base = k128KB;
options.target_file_size_base = k64KB;
options.target_file_size_multiplier = 1;
options.max_bytes_for_level_base = k256KB;
options.max_bytes_for_level_base = k128KB;
options.max_bytes_for_level_multiplier = 4;
// Block flush thread and disable compaction thread
env_->SetBackgroundThreads(1, Env::LOW);
env_->SetBackgroundThreads(1, Env::HIGH);
DestroyAndReopen(&options);
auto gen_l0_kb = [this](int start, int size, int stride) {
Random rnd(301);
std::vector<std::string> values;
for (int i = 0; i < size; i++) {
values.push_back(RandomString(&rnd, 1024));
ASSERT_OK(Put(Key(start + stride * i), values[i]));
ASSERT_OK(Put(Key(start + stride * i), RandomString(&rnd, 1024)));
}
dbfull()->TEST_WaitForFlushMemTable();
};
// Write 3 files that have the same key range, trigger compaction and
// result in one L1 file
gen_l0_kb(0, 128, 1);
gen_l0_kb(0, 64, 1);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
gen_l0_kb(0, 128, 1);
gen_l0_kb(0, 64, 1);
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
gen_l0_kb(0, 128, 1);
gen_l0_kb(0, 64, 1);
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,1", FilesPerLevel());
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(1U, metadata.size());
ASSERT_LE(metadata[0].size, k128KB + k5KB); // < 128KB + 5KB
ASSERT_GE(metadata[0].size, k128KB - k5KB); // > 128B - 5KB
ASSERT_LE(metadata[0].size, k64KB + k4KB);
ASSERT_GE(metadata[0].size, k64KB - k4KB);
// Make compaction trigger and file size smaller
// Test compaction trigger and target_file_size_base
ASSERT_TRUE(dbfull()->SetOptions({
{"level0_file_num_compaction_trigger", "2"},
{"target_file_size_base", "65536"}
{"target_file_size_base", std::to_string(k32KB) }
}));
gen_l0_kb(0, 128, 1);
gen_l0_kb(0, 64, 1);
ASSERT_EQ("1,1", FilesPerLevel());
gen_l0_kb(0, 128, 1);
gen_l0_kb(0, 64, 1);
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,2", FilesPerLevel());
metadata.clear();
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(2U, metadata.size());
ASSERT_LE(metadata[0].size, k64KB + k5KB); // < 64KB + 5KB
ASSERT_GE(metadata[0].size, k64KB - k5KB); // > 64KB - 5KB
ASSERT_LE(metadata[0].size, k32KB + k4KB);
ASSERT_GE(metadata[0].size, k32KB - k4KB);
// Change base level size to 1MB
ASSERT_TRUE(dbfull()->SetOptions({ {"max_bytes_for_level_base", "1048576"} }));
// writing 56 x 128KB => 7MB
// (L1 + L2) = (1 + 4) * 1MB = 5MB
for (int i = 0; i < 56; ++i) {
gen_l0_kb(i, 128, 56);
}
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(SizeAtLevel(1) < 1048576 * 1.1);
ASSERT_TRUE(SizeAtLevel(2) < 4 * 1048576 * 1.1);
// Change multiplier to 2 with smaller base
// Test max_bytes_for_level_base
ASSERT_TRUE(dbfull()->SetOptions({
{"max_bytes_for_level_multiplier", "2"},
{"max_bytes_for_level_base", "262144"}
{"max_bytes_for_level_base", std::to_string(k256KB) }
}));
// writing 16 x 128KB
// (L1 + L2 + L3) = (1 + 2 + 4) * 256KB
for (int i = 0; i < 16; ++i) {
gen_l0_kb(i, 128, 50);
// writing 24 x 64KB => 6 * 256KB
// (L1 + L2) = (1 + 4) * 256KB
for (int i = 0; i < 24; ++i) {
gen_l0_kb(i, 64, 32);
}
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1);
ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1);
ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1);
ASSERT_TRUE(SizeAtLevel(1) > k256KB * 0.8 &&
SizeAtLevel(1) < k256KB * 1.2);
ASSERT_TRUE(SizeAtLevel(2) > 4 * k256KB * 0.8 &&
SizeAtLevel(2) < 4 * k256KB * 1.2);
// Test max_bytes_for_level_multiplier and
// max_bytes_for_level_base (reduce)
ASSERT_TRUE(dbfull()->SetOptions({
{"max_bytes_for_level_multiplier", "2"},
{"max_bytes_for_level_base", std::to_string(k128KB) }
}));
// writing 20 x 64KB = 10 x 128KB
// (L1 + L2 + L3) = (1 + 2 + 4) * 128KB
for (int i = 0; i < 20; ++i) {
gen_l0_kb(i, 64, 32);
}
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(SizeAtLevel(1) > k128KB * 0.8 &&
SizeAtLevel(1) < k128KB * 1.2);
ASSERT_TRUE(SizeAtLevel(2) > 2 * k128KB * 0.8 &&
SizeAtLevel(2) < 2 * k128KB * 1.2);
ASSERT_TRUE(SizeAtLevel(3) > 4 * k128KB * 0.8 &&
SizeAtLevel(3) < 4 * k128KB * 1.2);
// Clean up memtable and L0
dbfull()->CompactRange(nullptr, nullptr);
// Block compaction
SleepingBackgroundTask sleeping_task_low1;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1,
Env::Priority::LOW);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
int count = 0;
Random rnd(301);
WriteOptions wo;
wo.timeout_hint_us = 10000;
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) {
dbfull()->TEST_FlushMemTable(true);
count++;
}
// Stop trigger = 8
ASSERT_EQ(count, 8);
// Unblock
sleeping_task_low1.WakeUp();
sleeping_task_low1.WaitUntilDone();
// Test: stop trigger (reduce)
ASSERT_TRUE(dbfull()->SetOptions({
{"level0_stop_writes_trigger", "6"}
}));
dbfull()->CompactRange(nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
// Block compaction
SleepingBackgroundTask sleeping_task_low2;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2,
Env::Priority::LOW);
count = 0;
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) {
dbfull()->TEST_FlushMemTable(true);
count++;
}
ASSERT_EQ(count, 6);
// Unblock
sleeping_task_low2.WakeUp();
sleeping_task_low2.WaitUntilDone();
// Test disable_auto_compactions
ASSERT_TRUE(dbfull()->SetOptions({
{"disable_auto_compactions", "true"}
}));
dbfull()->CompactRange(nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
// Wait for compaction so that put won't timeout
dbfull()->TEST_FlushMemTable(true);
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(NumTableFilesAtLevel(0), 4);
ASSERT_TRUE(dbfull()->SetOptions({
{"disable_auto_compactions", "false"}
}));
dbfull()->CompactRange(nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
// Wait for compaction so that put won't timeout
dbfull()->TEST_FlushMemTable(true);
}
dbfull()->TEST_WaitForCompact();
ASSERT_LT(NumTableFilesAtLevel(0), 4);
// Test for hard_rate_limit, change max_bytes_for_level_base to make level
// size big
ASSERT_TRUE(dbfull()->SetOptions({
{"max_bytes_for_level_base", std::to_string(k256KB) }
}));
// writing 40 x 64KB = 10 x 256KB
// (L1 + L2 + L3) = (1 + 2 + 4) * 256KB
for (int i = 0; i < 40; ++i) {
gen_l0_kb(i, 64, 32);
}
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(SizeAtLevel(1) > k256KB * 0.8 &&
SizeAtLevel(1) < k256KB * 1.2);
ASSERT_TRUE(SizeAtLevel(2) > 2 * k256KB * 0.8 &&
SizeAtLevel(2) < 2 * k256KB * 1.2);
ASSERT_TRUE(SizeAtLevel(3) > 4 * k256KB * 0.8 &&
SizeAtLevel(3) < 4 * k256KB * 1.2);
// Reduce max_bytes_for_level_base and disable compaction at the same time
// This should cause score to increase
ASSERT_TRUE(dbfull()->SetOptions({
{"disable_auto_compactions", "true"},
{"max_bytes_for_level_base", "65536"},
}));
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024)));
dbfull()->TEST_FlushMemTable(true);
// Check score is above 2
ASSERT_TRUE(SizeAtLevel(1) / k64KB > 2 ||
SizeAtLevel(2) / k64KB > 4 ||
SizeAtLevel(3) / k64KB > 8);
// Enfoce hard rate limit, L0 score is not regulated by this limit
ASSERT_TRUE(dbfull()->SetOptions({
{"hard_rate_limit", "2"}
}));
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024)));
dbfull()->TEST_FlushMemTable(true);
// Hard rate limit slow down for 1000 us, so default 10ms should be ok
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok());
wo.timeout_hint_us = 500;
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).IsTimedOut());
// Bump up limit
ASSERT_TRUE(dbfull()->SetOptions({
{"hard_rate_limit", "100"}
}));
dbfull()->TEST_FlushMemTable(true);
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok());
}
} // namespace rocksdb

View File

@ -133,6 +133,8 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
return kBackgroundErrors;
} else if (in == "cur-size-active-mem-table") {
return kCurSizeActiveMemTable;
} else if (in == "cur-size-all-mem-tables") {
return kCurSizeAllMemTables;
} else if (in == "num-entries-active-mem-table") {
return kNumEntriesInMutableMemtable;
} else if (in == "num-entries-imm-mem-tables") {
@ -250,12 +252,17 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
// Current size of the active memtable
*value = cfd_->mem()->ApproximateMemoryUsage();
return true;
case kCurSizeAllMemTables:
// Current size of the active memtable + immutable memtables
*value = cfd_->mem()->ApproximateMemoryUsage() +
cfd_->imm()->ApproximateMemoryUsage();
return true;
case kNumEntriesInMutableMemtable:
// Current size of the active memtable
// Current number of entires in the active memtable
*value = cfd_->mem()->GetNumEntries();
return true;
case kNumEntriesInImmutableMemtable:
// Current size of the active memtable
// Current number of entries in the immutable memtables
*value = cfd_->imm()->current()->GetTotalNumEntries();
return true;
case kEstimatedNumKeys:

View File

@ -36,6 +36,8 @@ enum DBPropertyType : uint32_t {
kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0.
kBackgroundErrors, // Return accumulated background errors encountered.
kCurSizeActiveMemTable, // Return current size of the active memtable
kCurSizeAllMemTables, // Return current size of all (active + immutable)
// memtables
kNumEntriesInMutableMemtable, // Return number of entries in the mutable
// memtable.
kNumEntriesInImmutableMemtable, // Return sum of number of entries in all

View File

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <algorithm>
#include <set>
#include <utility>
#include <vector>
@ -74,7 +75,7 @@ struct FileMetaData {
// Stats for compensating deletion entries during compaction
// File size compensated by deletion entry.
// This is updated in Version::UpdateTemporaryStats() first time when the
// This is updated in Version::UpdateAccumulatedStats() first time when the
// file is created or loaded. After it is updated, it is immutable.
uint64_t compensated_file_size;
uint64_t num_entries; // the number of entries.

View File

@ -597,7 +597,19 @@ uint64_t Version::GetEstimatedActiveKeys() {
// (1) there is merge keys
// (2) keys are directly overwritten
// (3) deletion on non-existing keys
return num_non_deletions_ - num_deletions_;
// (4) low number of samples
if (num_samples_ == 0) {
return 0;
}
if (num_samples_ < files_->size()) {
// casting to avoid overflowing
return static_cast<uint64_t>(static_cast<double>(
accumulated_num_non_deletions_ - accumulated_num_deletions_) *
files_->size() / num_samples_);
} else {
return accumulated_num_non_deletions_ - accumulated_num_deletions_;
}
}
void Version::AddIterators(const ReadOptions& read_options,
@ -658,17 +670,21 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
compaction_score_(num_levels_),
compaction_level_(num_levels_),
version_number_(version_number),
total_file_size_(0),
total_raw_key_size_(0),
total_raw_value_size_(0),
num_non_deletions_(0),
num_deletions_(0) {
accumulated_file_size_(0),
accumulated_raw_key_size_(0),
accumulated_raw_value_size_(0),
accumulated_num_non_deletions_(0),
accumulated_num_deletions_(0),
num_samples_(0) {
if (cfd != nullptr && cfd->current() != nullptr) {
total_file_size_ = cfd->current()->total_file_size_;
total_raw_key_size_ = cfd->current()->total_raw_key_size_;
total_raw_value_size_ = cfd->current()->total_raw_value_size_;
num_non_deletions_ = cfd->current()->num_non_deletions_;
num_deletions_ = cfd->current()->num_deletions_;
accumulated_file_size_ = cfd->current()->accumulated_file_size_;
accumulated_raw_key_size_ = cfd->current()->accumulated_raw_key_size_;
accumulated_raw_value_size_ =
cfd->current()->accumulated_raw_value_size_;
accumulated_num_non_deletions_ =
cfd->current()->accumulated_num_non_deletions_;
accumulated_num_deletions_ = cfd->current()->accumulated_num_deletions_;
num_samples_ = cfd->current()->num_samples_;
}
}
@ -748,7 +764,7 @@ void Version::GenerateFileLevels() {
void Version::PrepareApply(const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted) {
UpdateTemporaryStats();
UpdateAccumulatedStats();
ComputeCompactionScore(mutable_cf_options, size_being_compacted);
UpdateFilesBySize();
UpdateNumNonEmptyLevels();
@ -757,7 +773,8 @@ void Version::PrepareApply(const MutableCFOptions& mutable_cf_options,
}
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
if (file_meta->init_stats_from_file) {
if (file_meta->init_stats_from_file ||
file_meta->compensated_file_size > 0) {
return false;
}
std::shared_ptr<const TableProperties> tp;
@ -778,26 +795,55 @@ bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
return true;
}
void Version::UpdateTemporaryStats() {
void Version::UpdateAccumulatedStats(FileMetaData* file_meta) {
assert(file_meta->init_stats_from_file);
accumulated_file_size_ += file_meta->fd.GetFileSize();
accumulated_raw_key_size_ += file_meta->raw_key_size;
accumulated_raw_value_size_ += file_meta->raw_value_size;
accumulated_num_non_deletions_ +=
file_meta->num_entries - file_meta->num_deletions;
accumulated_num_deletions_ += file_meta->num_deletions;
num_samples_++;
}
void Version::UpdateAccumulatedStats() {
static const int kDeletionWeightOnCompaction = 2;
// incrementally update the average value size by
// including newly added files into the global stats
// maximum number of table properties loaded from files.
const int kMaxInitCount = 20;
int init_count = 0;
int total_count = 0;
for (int level = 0; level < num_levels_; level++) {
// here only the first kMaxInitCount files which haven't been
// initialized from file will be updated with num_deletions.
// The motivation here is to cap the maximum I/O per Version creation.
// The reason for choosing files from lower-level instead of higher-level
// is that such design is able to propagate the initialization from
// lower-level to higher-level: When the num_deletions of lower-level
// files are updated, it will make the lower-level files have accurate
// compensated_file_size, making lower-level to higher-level compaction
// will be triggered, which creates higher-level files whose num_deletions
// will be updated here.
for (int level = 0;
level < num_levels_ && init_count < kMaxInitCount; ++level) {
for (auto* file_meta : files_[level]) {
if (MaybeInitializeFileMetaData(file_meta)) {
// each FileMeta will be initialized only once.
total_file_size_ += file_meta->fd.GetFileSize();
total_raw_key_size_ += file_meta->raw_key_size;
total_raw_value_size_ += file_meta->raw_value_size;
num_non_deletions_ +=
file_meta->num_entries - file_meta->num_deletions;
num_deletions_ += file_meta->num_deletions;
init_count++;
UpdateAccumulatedStats(file_meta);
if (++init_count >= kMaxInitCount) {
break;
}
}
}
}
// In case all sampled-files contain only deletion entries, then we
// load the table-property of a file in higher-level to initialize
// that value.
for (int level = num_levels_ - 1;
accumulated_raw_value_size_ == 0 && level >= 0; --level) {
for (int i = static_cast<int>(files_[level].size()) - 1;
accumulated_raw_value_size_ == 0 && i >= 0; --i) {
if (MaybeInitializeFileMetaData(files_[level][i])) {
UpdateAccumulatedStats(files_[level][i]);
}
total_count++;
}
}

View File

@ -212,13 +212,15 @@ class Version {
uint64_t GetVersionNumber() const { return version_number_; }
uint64_t GetAverageValueSize() const {
if (num_non_deletions_ == 0) {
if (accumulated_num_non_deletions_ == 0) {
return 0;
}
assert(total_raw_key_size_ + total_raw_value_size_ > 0);
assert(total_file_size_ > 0);
return total_raw_value_size_ / num_non_deletions_ * total_file_size_ /
(total_raw_key_size_ + total_raw_value_size_);
assert(accumulated_raw_key_size_ + accumulated_raw_value_size_ > 0);
assert(accumulated_file_size_ > 0);
return accumulated_raw_value_size_ /
accumulated_num_non_deletions_ *
accumulated_file_size_ /
(accumulated_raw_key_size_ + accumulated_raw_value_size_);
}
// REQUIRES: lock is held
@ -268,14 +270,17 @@ class Version {
// Update num_non_empty_levels_.
void UpdateNumNonEmptyLevels();
// The helper function of UpdateTemporaryStats, which may fill the missing
// The helper function of UpdateAccumulatedStats, which may fill the missing
// fields of file_mata from its associated TableProperties.
// Returns true if it does initialize FileMetaData.
bool MaybeInitializeFileMetaData(FileMetaData* file_meta);
// Update the temporary stats associated with the current version.
// This temporary stats will be used in compaction.
void UpdateTemporaryStats();
// Update the accumulated stats from a file-meta.
void UpdateAccumulatedStats(FileMetaData* file_meta);
// Update the accumulated stats associated with the current version.
// This accumulated stats will be used in compaction.
void UpdateAccumulatedStats();
// Sort all files for this version based on their file size and
// record results in files_by_size_. The largest files are listed first.
@ -337,16 +342,19 @@ class Version {
Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0);
// total file size
uint64_t total_file_size_;
// the total size of all raw keys.
uint64_t total_raw_key_size_;
// the total size of all raw values.
uint64_t total_raw_value_size_;
// the following are the sampled temporary stats.
// the current accumulated size of sampled files.
uint64_t accumulated_file_size_;
// the current accumulated size of all raw keys based on the sampled files.
uint64_t accumulated_raw_key_size_;
// the current accumulated size of all raw keys based on the sampled files.
uint64_t accumulated_raw_value_size_;
// total number of non-deletion entries
uint64_t num_non_deletions_;
uint64_t accumulated_num_non_deletions_;
// total number of deletion entries
uint64_t num_deletions_;
uint64_t accumulated_num_deletions_;
// the number of samples
uint64_t num_samples_;
~Version();

View File

@ -42,7 +42,7 @@ test: java
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ColumnFamilyTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.FilterTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.KeyMayExistTest
#java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOnlyTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.MergeTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest

View File

@ -112,7 +112,7 @@ public class Options extends RocksObject {
*/
public boolean createMissingColumnFamilies() {
assert(isInitialized());
return createIfMissing(nativeHandle_);
return createMissingColumnFamilies(nativeHandle_);
}
/**
@ -1154,6 +1154,7 @@ public class Options extends RocksObject {
*/
public Options setMemTableConfig(MemTableConfig config)
throws RocksDBException {
memTableConfig_ = config;
setMemTableFactory(nativeHandle_, config.newMemTableFactoryHandle());
return this;
}
@ -1168,6 +1169,7 @@ public class Options extends RocksObject {
* @throws RocksDBException
*/
public Options setRateLimiterConfig(RateLimiterConfig config) {
rateLimiterConfig_ = config;
setRateLimiter(nativeHandle_, config.newRateLimiterHandle());
return this;
}
@ -1191,6 +1193,7 @@ public class Options extends RocksObject {
* @return the reference of the current Options.
*/
public Options setTableFormatConfig(TableFormatConfig config) {
tableFormatConfig_ = config;
setTableFactory(nativeHandle_, config.newTableFactoryHandle());
return this;
}
@ -2316,4 +2319,7 @@ public class Options extends RocksObject {
long cacheSize_;
int numShardBits_;
RocksEnv env_;
MemTableConfig memTableConfig_;
TableFormatConfig tableFormatConfig_;
RateLimiterConfig rateLimiterConfig_;
}

View File

@ -13,19 +13,30 @@ public class FilterTest {
}
public static void main(String[] args) {
Options options = new Options();
// test table config without filter
// test table config
BlockBasedTableConfig blockConfig = new BlockBasedTableConfig();
options.setTableFormatConfig(blockConfig);
options.setTableFormatConfig(new BlockBasedTableConfig().
setFilter(new BloomFilter()));
options.dispose();
System.gc();
System.runFinalization();
// new Bloom filter
options = new Options();
blockConfig = new BlockBasedTableConfig();
blockConfig.setFilter(new BloomFilter());
options.setTableFormatConfig(blockConfig);
blockConfig.setFilter(new BloomFilter(10));
BloomFilter bloomFilter = new BloomFilter(10);
blockConfig.setFilter(bloomFilter);
options.setTableFormatConfig(blockConfig);
System.gc();
System.runFinalization();
blockConfig.setFilter(new BloomFilter(10, false));
options.setTableFormatConfig(blockConfig);
options.dispose();
options = null;
blockConfig = null;
System.gc();
System.runFinalization();
System.out.println("Filter test passed");
}
}

View File

@ -24,9 +24,12 @@
void Java_org_rocksdb_BloomFilter_createNewBloomFilter(
JNIEnv* env, jobject jobj, jint bits_per_key,
jboolean use_block_base_builder) {
const rocksdb::FilterPolicy* fp = rocksdb::NewBloomFilterPolicy(bits_per_key,
use_block_base_builder);
rocksdb::FilterJni::setHandle(env, jobj, fp);
rocksdb::FilterPolicy* fp = const_cast<rocksdb::FilterPolicy *>(
rocksdb::NewBloomFilterPolicy(bits_per_key, use_block_base_builder));
std::shared_ptr<rocksdb::FilterPolicy> *pFilterPolicy =
new std::shared_ptr<rocksdb::FilterPolicy>;
*pFilterPolicy = std::shared_ptr<rocksdb::FilterPolicy>(fp);
rocksdb::FilterJni::setHandle(env, jobj, pFilterPolicy);
}
/*
@ -35,6 +38,9 @@ void Java_org_rocksdb_BloomFilter_createNewBloomFilter(
* Signature: (J)V
*/
void Java_org_rocksdb_Filter_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
delete reinterpret_cast<rocksdb::FilterPolicy*>(handle);
JNIEnv* env, jobject jobj, jlong jhandle) {
std::shared_ptr<rocksdb::FilterPolicy> *handle =
reinterpret_cast<std::shared_ptr<rocksdb::FilterPolicy> *>(jhandle);
handle->reset();
}

View File

@ -313,14 +313,16 @@ class FilterJni {
}
// Get the pointer to rocksdb::FilterPolicy.
static rocksdb::FilterPolicy* getHandle(JNIEnv* env, jobject jobj) {
return reinterpret_cast<rocksdb::FilterPolicy*>(
static std::shared_ptr<rocksdb::FilterPolicy>* getHandle(
JNIEnv* env, jobject jobj) {
return reinterpret_cast
<std::shared_ptr<rocksdb::FilterPolicy> *>(
env->GetLongField(jobj, getHandleFieldID(env)));
}
// Pass the rocksdb::FilterPolicy pointer to the java side.
static void setHandle(
JNIEnv* env, jobject jobj, const rocksdb::FilterPolicy* op) {
JNIEnv* env, jobject jobj, std::shared_ptr<rocksdb::FilterPolicy>* op) {
env->SetLongField(
jobj, getHandleFieldID(env),
reinterpret_cast<jlong>(op));

View File

@ -56,8 +56,10 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
options.block_restart_interval = block_restart_interval;
options.whole_key_filtering = whole_key_filtering;
if (jfilterPolicy > 0) {
options.filter_policy.reset(
reinterpret_cast<rocksdb::FilterPolicy*>(jfilterPolicy));
std::shared_ptr<rocksdb::FilterPolicy> *pFilterPolicy =
reinterpret_cast<std::shared_ptr<rocksdb::FilterPolicy> *>(
jfilterPolicy);
options.filter_policy = *pFilterPolicy;
}
options.cache_index_and_filter_blocks = cache_index_and_filter_blocks;
options.hash_index_allow_collision = hash_index_allow_collision;

View File

@ -40,6 +40,7 @@ const string LDBCommand::ARG_FROM = "from";
const string LDBCommand::ARG_TO = "to";
const string LDBCommand::ARG_MAX_KEYS = "max_keys";
const string LDBCommand::ARG_BLOOM_BITS = "bloom_bits";
const string LDBCommand::ARG_FIX_PREFIX_LEN = "fix_prefix_len";
const string LDBCommand::ARG_COMPRESSION_TYPE = "compression_type";
const string LDBCommand::ARG_BLOCK_SIZE = "block_size";
const string LDBCommand::ARG_AUTO_COMPACTION = "auto_compaction";
@ -221,9 +222,11 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
map<string, string>::const_iterator itr;
BlockBasedTableOptions table_options;
bool use_table_options = false;
int bits;
if (ParseIntOption(option_map_, ARG_BLOOM_BITS, bits, exec_state_)) {
if (bits > 0) {
use_table_options = true;
table_options.filter_policy.reset(NewBloomFilterPolicy(bits));
} else {
exec_state_ = LDBCommandExecuteResult::FAILED(ARG_BLOOM_BITS +
@ -234,14 +237,18 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
int block_size;
if (ParseIntOption(option_map_, ARG_BLOCK_SIZE, block_size, exec_state_)) {
if (block_size > 0) {
use_table_options = true;
table_options.block_size = block_size;
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
} else {
exec_state_ = LDBCommandExecuteResult::FAILED(ARG_BLOCK_SIZE +
" must be > 0.");
}
}
if (use_table_options) {
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
}
itr = option_map_.find(ARG_AUTO_COMPACTION);
if (itr != option_map_.end()) {
opt.disable_auto_compactions = ! StringToBool(itr->second);
@ -294,6 +301,18 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
opt.db_paths.emplace_back(db_path_, std::numeric_limits<uint64_t>::max());
}
int fix_prefix_len;
if (ParseIntOption(option_map_, ARG_FIX_PREFIX_LEN, fix_prefix_len,
exec_state_)) {
if (fix_prefix_len > 0) {
opt.prefix_extractor.reset(
NewFixedPrefixTransform(static_cast<size_t>(fix_prefix_len)));
} else {
exec_state_ =
LDBCommandExecuteResult::FAILED(ARG_FIX_PREFIX_LEN + " must be > 0.");
}
}
return opt;
}

View File

@ -46,6 +46,7 @@ public:
static const string ARG_TO;
static const string ARG_MAX_KEYS;
static const string ARG_BLOOM_BITS;
static const string ARG_FIX_PREFIX_LEN;
static const string ARG_COMPRESSION_TYPE;
static const string ARG_BLOCK_SIZE;
static const string ARG_AUTO_COMPACTION;
@ -284,9 +285,10 @@ protected:
* passed in.
*/
vector<string> BuildCmdLineOptions(vector<string> options) {
vector<string> ret = {ARG_DB, ARG_BLOOM_BITS, ARG_BLOCK_SIZE,
ARG_AUTO_COMPACTION, ARG_COMPRESSION_TYPE,
ARG_WRITE_BUFFER_SIZE, ARG_FILE_SIZE};
vector<string> ret = {ARG_DB, ARG_BLOOM_BITS,
ARG_BLOCK_SIZE, ARG_AUTO_COMPACTION,
ARG_COMPRESSION_TYPE, ARG_WRITE_BUFFER_SIZE,
ARG_FILE_SIZE, ARG_FIX_PREFIX_LEN};
ret.insert(ret.end(), options.begin(), options.end());
return ret;
}

View File

@ -47,6 +47,7 @@ public:
" with 'put','get','scan','dump','query','batchput'"
" : DB supports ttl and value is internally timestamp-suffixed\n");
ret.append(" --" + LDBCommand::ARG_BLOOM_BITS + "=<int,e.g.:14>\n");
ret.append(" --" + LDBCommand::ARG_FIX_PREFIX_LEN + "=<int,e.g.:14>\n");
ret.append(" --" + LDBCommand::ARG_COMPRESSION_TYPE +
"=<no|snappy|zlib|bzip2>\n");
ret.append(" --" + LDBCommand::ARG_BLOCK_SIZE +

View File

@ -3,8 +3,15 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <limits>
#include <cassert>
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h"
#include "util/mutable_cf_options.h"
@ -69,4 +76,56 @@ uint64_t MutableCFOptions::ExpandedCompactionByteSizeLimit(int level) const {
return MaxFileSizeForLevel(level) * expanded_compaction_factor;
}
void MutableCFOptions::Dump(Logger* log) const {
// Memtable related options
Log(log, " write_buffer_size: %zu", write_buffer_size);
Log(log, " max_write_buffer_number: %d",
max_write_buffer_number);
Log(log, " arena_block_size: %zu", arena_block_size);
Log(log, " memtable_prefix_bloom_bits: %" PRIu32,
memtable_prefix_bloom_bits);
Log(log, " memtable_prefix_bloom_probes: %" PRIu32,
memtable_prefix_bloom_probes);
Log(log, " memtable_prefix_bloom_huge_page_tlb_size: %zu",
memtable_prefix_bloom_huge_page_tlb_size);
Log(log, " max_successive_merges: %zu",
max_successive_merges);
Log(log, " filter_deletes: %d",
filter_deletes);
Log(log, " disable_auto_compactions: %d",
disable_auto_compactions);
Log(log, " soft_rate_limit: %lf",
soft_rate_limit);
Log(log, " hard_rate_limit: %lf",
hard_rate_limit);
Log(log, " level0_file_num_compaction_trigger: %d",
level0_file_num_compaction_trigger);
Log(log, " level0_slowdown_writes_trigger: %d",
level0_slowdown_writes_trigger);
Log(log, " level0_stop_writes_trigger: %d",
level0_stop_writes_trigger);
Log(log, " max_grandparent_overlap_factor: %d",
max_grandparent_overlap_factor);
Log(log, " expanded_compaction_factor: %d",
expanded_compaction_factor);
Log(log, " source_compaction_factor: %d",
source_compaction_factor);
Log(log, " target_file_size_base: %d",
target_file_size_base);
Log(log, " target_file_size_multiplier: %d",
target_file_size_multiplier);
Log(log, " max_bytes_for_level_base: %" PRIu64,
max_bytes_for_level_base);
Log(log, " max_bytes_for_level_multiplier: %d",
max_bytes_for_level_multiplier);
std::string result;
char buf[10];
for (const auto m : max_bytes_for_level_multiplier_additional) {
snprintf(buf, sizeof(buf), "%d, ", m);
result += buf;
}
result.resize(result.size() - 2);
Log(log, "max_bytes_for_level_multiplier_additional: %s", result.c_str());
}
} // namespace rocksdb

View File

@ -14,6 +14,7 @@ namespace rocksdb {
struct MutableCFOptions {
MutableCFOptions(const Options& options, const ImmutableCFOptions& ioptions)
: write_buffer_size(options.write_buffer_size),
max_write_buffer_number(options.max_write_buffer_number),
arena_block_size(options.arena_block_size),
memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits),
memtable_prefix_bloom_probes(options.memtable_prefix_bloom_probes),
@ -21,6 +22,9 @@ struct MutableCFOptions {
options.memtable_prefix_bloom_huge_page_tlb_size),
max_successive_merges(options.max_successive_merges),
filter_deletes(options.filter_deletes),
disable_auto_compactions(options.disable_auto_compactions),
soft_rate_limit(options.soft_rate_limit),
hard_rate_limit(options.hard_rate_limit),
level0_file_num_compaction_trigger(
options.level0_file_num_compaction_trigger),
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
@ -39,12 +43,16 @@ struct MutableCFOptions {
}
MutableCFOptions()
: write_buffer_size(0),
max_write_buffer_number(0),
arena_block_size(0),
memtable_prefix_bloom_bits(0),
memtable_prefix_bloom_probes(0),
memtable_prefix_bloom_huge_page_tlb_size(0),
max_successive_merges(0),
filter_deletes(false),
disable_auto_compactions(false),
soft_rate_limit(0),
hard_rate_limit(0),
level0_file_num_compaction_trigger(0),
level0_slowdown_writes_trigger(0),
level0_stop_writes_trigger(0),
@ -70,8 +78,11 @@ struct MutableCFOptions {
uint64_t MaxGrandParentOverlapBytes(int level) const;
uint64_t ExpandedCompactionByteSizeLimit(int level) const;
void Dump(Logger* log) const;
// Memtable related options
size_t write_buffer_size;
int max_write_buffer_number;
size_t arena_block_size;
uint32_t memtable_prefix_bloom_bits;
uint32_t memtable_prefix_bloom_probes;
@ -80,6 +91,9 @@ struct MutableCFOptions {
bool filter_deletes;
// Compaction related options
bool disable_auto_compactions;
double soft_rate_limit;
double hard_rate_limit;
int level0_file_num_compaction_trigger;
int level0_slowdown_writes_trigger;
int level0_stop_writes_trigger;

View File

@ -92,6 +92,8 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value,
new_options->max_successive_merges = ParseInt64(value);
} else if (name == "filter_deletes") {
new_options->filter_deletes = ParseBoolean(name, value);
} else if (name == "max_write_buffer_number") {
new_options->max_write_buffer_number = ParseInt(value);
} else {
return false;
}
@ -101,7 +103,13 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value,
template<typename OptionsType>
bool ParseCompactionOptions(const std::string& name, const std::string& value,
OptionsType* new_options) {
if (name == "level0_file_num_compaction_trigger") {
if (name == "disable_auto_compactions") {
new_options->disable_auto_compactions = ParseBoolean(name, value);
} else if (name == "soft_rate_limit") {
new_options->soft_rate_limit = ParseDouble(value);
} else if (name == "hard_rate_limit") {
new_options->hard_rate_limit = ParseDouble(value);
} else if (name == "level0_file_num_compaction_trigger") {
new_options->level0_file_num_compaction_trigger = ParseInt(value);
} else if (name == "level0_slowdown_writes_trigger") {
new_options->level0_slowdown_writes_trigger = ParseInt(value);
@ -220,8 +228,6 @@ bool GetColumnFamilyOptionsFromMap(
try {
if (ParseMemtableOptions(o.first, o.second, new_options)) {
} else if (ParseCompactionOptions(o.first, o.second, new_options)) {
} else if (o.first == "max_write_buffer_number") {
new_options->max_write_buffer_number = ParseInt(o.second);
} else if (o.first == "min_write_buffer_number_to_merge") {
new_options->min_write_buffer_number_to_merge = ParseInt(o.second);
} else if (o.first == "compression") {
@ -266,12 +272,6 @@ bool GetColumnFamilyOptionsFromMap(
new_options->num_levels = ParseInt(o.second);
} else if (o.first == "max_mem_compaction_level") {
new_options->max_mem_compaction_level = ParseInt(o.second);
} else if (o.first == "soft_rate_limit") {
new_options->soft_rate_limit = ParseDouble(o.second);
} else if (o.first == "hard_rate_limit") {
new_options->hard_rate_limit = ParseDouble(o.second);
} else if (o.first == "disable_auto_compactions") {
new_options->disable_auto_compactions = ParseBoolean(o.first, o.second);
} else if (o.first == "purge_redundant_kvs_while_flush") {
new_options->purge_redundant_kvs_while_flush =
ParseBoolean(o.first, o.second);

View File

@ -304,6 +304,10 @@ struct WriteBatchIndexEntry {
WriteBatchIndexEntry(const Slice* sk, uint32_t c)
: offset(0), column_family(c), search_key(sk) {}
// If this flag appears in the offset, it indicates a key that is smaller
// than any other entry for the same column family
static const size_t kFlagMin = std::numeric_limits<size_t>::max();
size_t offset; // offset of an entry in write batch's string buffer.
uint32_t column_family; // column family of the entry
const Slice* search_key; // if not null, instead of reading keys from
@ -354,14 +358,16 @@ class WBWIIteratorImpl : public WBWIIterator {
virtual void SeekToFirst() {
valid_ = true;
WriteBatchIndexEntry search_entry(nullptr, column_family_id_);
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_);
skip_list_iter_.Seek(&search_entry);
ReadEntry();
}
virtual void SeekToLast() {
valid_ = true;
WriteBatchIndexEntry search_entry(nullptr, column_family_id_ + 1);
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_ + 1);
skip_list_iter_.Seek(&search_entry);
if (!skip_list_iter_.Valid()) {
skip_list_iter_.SeekToLast();
@ -636,6 +642,12 @@ int WriteBatchEntryComparator::operator()(
return -1;
}
if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
return -1;
} else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
return 1;
}
Status s;
Slice key1, key2;
if (entry1->search_key == nullptr) {

View File

@ -522,7 +522,18 @@ TEST(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
Random rnd(rand_seed);
ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator());
ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator());
ColumnFamilyHandleImplDummy cf3(8, BytewiseComparator());
WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
if (rand_seed % 2 == 0) {
batch.Put(&cf2, "zoo", "bar");
}
if (rand_seed % 4 == 1) {
batch.Put(&cf3, "zoo", "bar");
}
KVMap map;
KVMap merged_map;
for (auto key : source_strings) {
@ -619,6 +630,7 @@ TEST(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) {
ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator());
ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator());
WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
{
@ -659,7 +671,21 @@ TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) {
AssertIter(iter.get(), "a", "aa");
}
// Test the case that there is one element in the write batch
batch.Put(&cf2, "zoo", "bar");
batch.Put(&cf1, "a", "aa");
{
KVMap empty_map;
std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
iter->SeekToFirst();
AssertIter(iter.get(), "a", "aa");
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
batch.Delete(&cf1, "b");
batch.Put(&cf1, "c", "cc");
batch.Put(&cf1, "d", "dd");
@ -725,6 +751,7 @@ TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) {
iter->Next();
AssertIter(iter.get(), "f", "ff");
}
{
KVMap empty_map;
std::unique_ptr<Iterator> iter(
@ -763,6 +790,60 @@ TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) {
AssertIter(iter.get(), "c", "cc");
}
}
TEST(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) {
ColumnFamilyHandleImplDummy cf1(6, ReverseBytewiseComparator());
ColumnFamilyHandleImplDummy cf2(2, ReverseBytewiseComparator());
WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
// Test the case that there is one element in the write batch
batch.Put(&cf2, "zoo", "bar");
batch.Put(&cf1, "a", "aa");
{
KVMap empty_map;
std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
iter->SeekToFirst();
AssertIter(iter.get(), "a", "aa");
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
batch.Put(&cf1, "c", "cc");
{
KVMap map;
std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(&cf1, new KVIter(&map)));
iter->SeekToFirst();
AssertIter(iter.get(), "c", "cc");
iter->Next();
AssertIter(iter.get(), "a", "aa");
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->SeekToLast();
AssertIter(iter.get(), "a", "aa");
iter->Prev();
AssertIter(iter.get(), "c", "cc");
iter->Prev();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("b");
AssertIter(iter.get(), "a", "aa");
iter->Prev();
AssertIter(iter.get(), "c", "cc");
iter->Seek("a");
AssertIter(iter.get(), "a", "aa");
}
}
} // namespace
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }