Merge branch 'master' into columnfamilies
Conflicts: db/db_impl.cc db/db_impl.h db/db_impl_readonly.cc
This commit is contained in:
commit
2a9271b403
@ -551,7 +551,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
|
||||
return nullptr;
|
||||
}
|
||||
Version::FileSummaryStorage tmp;
|
||||
Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
|
||||
Log(options_->info_log, "Universal: candidate files(%zu): %s\n",
|
||||
version->files_[level].size(),
|
||||
version->LevelFileSummary(&tmp, 0));
|
||||
|
||||
|
@ -99,6 +99,7 @@ DEFINE_string(benchmarks,
|
||||
"Must be used with merge_operator\n"
|
||||
"\treadrandommergerandom -- perform N random read-or-merge "
|
||||
"operations. Must be used with merge_operator\n"
|
||||
"\tnewiterator -- repeated iterator creation\n"
|
||||
"\tseekrandom -- N random seeks\n"
|
||||
"\tcrc32c -- repeated crc32c of 4K of data\n"
|
||||
"\tacquireload -- load N*1000 times\n"
|
||||
@ -1089,6 +1090,8 @@ class Benchmark {
|
||||
method = &Benchmark::ReadRandom;
|
||||
} else if (name == Slice("readmissing")) {
|
||||
method = &Benchmark::ReadMissing;
|
||||
} else if (name == Slice("newiterator")) {
|
||||
method = &Benchmark::IteratorCreation;
|
||||
} else if (name == Slice("seekrandom")) {
|
||||
method = &Benchmark::SeekRandom;
|
||||
} else if (name == Slice("readhot")) {
|
||||
@ -1526,14 +1529,14 @@ class Benchmark {
|
||||
++count;
|
||||
char* tab = std::find(linep, linep + bufferLen, columnSeparator);
|
||||
if (tab == linep + bufferLen) {
|
||||
fprintf(stderr, "[Error] No Key delimiter TAB at line %ld\n", count);
|
||||
fprintf(stderr, "[Error] No Key delimiter TAB at line %zu\n", count);
|
||||
continue;
|
||||
}
|
||||
Slice key(linep, tab - linep);
|
||||
tab++;
|
||||
char* endLine = std::find(tab, linep + bufferLen, lineSeparator);
|
||||
if (endLine == linep + bufferLen) {
|
||||
fprintf(stderr, "[Error] No ENTER at end of line # %ld\n", count);
|
||||
fprintf(stderr, "[Error] No ENTER at end of line # %zu\n", count);
|
||||
continue;
|
||||
}
|
||||
Slice value(tab, endLine - tab);
|
||||
@ -1877,6 +1880,16 @@ class Benchmark {
|
||||
thread->stats.AddMessage(msg);
|
||||
}
|
||||
|
||||
void IteratorCreation(ThreadState* thread) {
|
||||
Duration duration(FLAGS_duration, reads_);
|
||||
ReadOptions options(FLAGS_verify_checksum, true);
|
||||
while (!duration.Done(1)) {
|
||||
Iterator* iter = db_->NewIterator(options);
|
||||
delete iter;
|
||||
thread->stats.FinishedSingleOp(db_);
|
||||
}
|
||||
}
|
||||
|
||||
void SeekRandom(ThreadState* thread) {
|
||||
Duration duration(FLAGS_duration, reads_);
|
||||
ReadOptions options(FLAGS_verify_checksum, true);
|
||||
|
163
db/db_impl.cc
163
db/db_impl.cc
@ -2695,34 +2695,29 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
||||
|
||||
namespace {
|
||||
struct IterState {
|
||||
IterState(DBImpl* db, port::Mutex* mu, SuperVersion* super_version)
|
||||
: db(db), mu(mu), super_version(super_version) {}
|
||||
|
||||
DBImpl* db;
|
||||
port::Mutex* mu;
|
||||
Version* version = nullptr;
|
||||
MemTable* mem = nullptr;
|
||||
MemTableListVersion* imm = nullptr;
|
||||
DBImpl *db;
|
||||
SuperVersion* super_version;
|
||||
};
|
||||
|
||||
static void CleanupIteratorState(void* arg1, void* arg2) {
|
||||
IterState* state = reinterpret_cast<IterState*>(arg1);
|
||||
DBImpl::DeletionState deletion_state(state->db->GetOptions().
|
||||
max_write_buffer_number);
|
||||
state->mu->Lock();
|
||||
if (state->mem) { // not set for immutable iterator
|
||||
MemTable* m = state->mem->Unref();
|
||||
if (m != nullptr) {
|
||||
deletion_state.memtables_to_free.push_back(m);
|
||||
}
|
||||
|
||||
bool need_cleanup = state->super_version->Unref();
|
||||
if (need_cleanup) {
|
||||
state->mu->Lock();
|
||||
state->super_version->Cleanup();
|
||||
state->db->FindObsoleteFiles(deletion_state, false, true);
|
||||
state->mu->Unlock();
|
||||
|
||||
delete state->super_version;
|
||||
state->db->PurgeObsoleteFiles(deletion_state);
|
||||
}
|
||||
if (state->version) { // not set for memtable-only iterator
|
||||
state->version->Unref();
|
||||
}
|
||||
if (state->imm) { // not set for memtable-only iterator
|
||||
state->imm->Unref(&deletion_state.memtables_to_free);
|
||||
}
|
||||
// fast path FindObsoleteFiles
|
||||
state->db->FindObsoleteFiles(deletion_state, false, true);
|
||||
state->mu->Unlock();
|
||||
state->db->PurgeObsoleteFiles(deletion_state);
|
||||
|
||||
delete state;
|
||||
}
|
||||
@ -2730,36 +2725,24 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
|
||||
|
||||
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
|
||||
SequenceNumber* latest_snapshot) {
|
||||
IterState* cleanup = new IterState;
|
||||
MemTable* mutable_mem;
|
||||
MemTableListVersion* immutable_mems;
|
||||
Version* version;
|
||||
|
||||
// Collect together all needed child iterators for mem
|
||||
mutex_.Lock();
|
||||
*latest_snapshot = versions_->LastSequence();
|
||||
mutable_mem = default_cfd_->mem();
|
||||
mutable_mem->Ref();
|
||||
immutable_mems = default_cfd_->imm()->current();
|
||||
immutable_mems->Ref();
|
||||
version = default_cfd_->current();
|
||||
version->Ref();
|
||||
SuperVersion* super_version = default_cfd_->GetSuperVersion()->Ref();
|
||||
mutex_.Unlock();
|
||||
|
||||
std::vector<Iterator*> iterator_list;
|
||||
iterator_list.push_back(mutable_mem->NewIterator(options));
|
||||
cleanup->mem = mutable_mem;
|
||||
cleanup->imm = immutable_mems;
|
||||
// Collect iterator for mutable mem
|
||||
iterator_list.push_back(super_version->mem->NewIterator(options));
|
||||
// Collect all needed child iterators for immutable memtables
|
||||
immutable_mems->AddIterators(options, &iterator_list);
|
||||
super_version->imm->AddIterators(options, &iterator_list);
|
||||
// Collect iterators for files in L0 - Ln
|
||||
version->AddIterators(options, storage_options_, &iterator_list);
|
||||
super_version->current->AddIterators(options, storage_options_,
|
||||
&iterator_list);
|
||||
Iterator* internal_iter =
|
||||
NewMergingIterator(&default_cfd_->internal_comparator(),
|
||||
&iterator_list[0], iterator_list.size());
|
||||
cleanup->version = version;
|
||||
cleanup->mu = &mutex_;
|
||||
cleanup->db = this;
|
||||
|
||||
IterState* cleanup = new IterState(this, &mutex_, super_version);
|
||||
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
|
||||
|
||||
return internal_iter;
|
||||
@ -2774,53 +2757,36 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
|
||||
const ReadOptions& options,
|
||||
uint64_t* superversion_number) {
|
||||
|
||||
MemTable* mutable_mem;
|
||||
MemTableListVersion* immutable_mems;
|
||||
Version* version;
|
||||
|
||||
// get all child iterators and bump their refcounts under lock
|
||||
mutex_.Lock();
|
||||
mutable_mem = default_cfd_->mem();
|
||||
mutable_mem->Ref();
|
||||
immutable_mems = default_cfd_->imm()->current();
|
||||
immutable_mems->Ref();
|
||||
version = default_cfd_->current();
|
||||
version->Ref();
|
||||
SuperVersion* super_version = default_cfd_->GetSuperVersion()->Ref();
|
||||
if (superversion_number != nullptr) {
|
||||
*superversion_number = CurrentVersionNumber();
|
||||
}
|
||||
mutex_.Unlock();
|
||||
|
||||
Iterator* mutable_iter = mutable_mem->NewIterator(options);
|
||||
IterState* mutable_cleanup = new IterState();
|
||||
mutable_cleanup->mem = mutable_mem;
|
||||
mutable_cleanup->db = this;
|
||||
mutable_cleanup->mu = &mutex_;
|
||||
mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr);
|
||||
|
||||
Iterator* mutable_iter = super_version->mem->NewIterator(options);
|
||||
// create a DBIter that only uses memtable content; see NewIterator()
|
||||
mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
|
||||
mutable_iter, kMaxSequenceNumber);
|
||||
|
||||
Iterator* immutable_iter;
|
||||
IterState* immutable_cleanup = new IterState();
|
||||
std::vector<Iterator*> list;
|
||||
immutable_mems->AddIterators(options, &list);
|
||||
immutable_cleanup->imm = immutable_mems;
|
||||
version->AddIterators(options, storage_options_, &list);
|
||||
immutable_cleanup->version = version;
|
||||
immutable_cleanup->db = this;
|
||||
immutable_cleanup->mu = &mutex_;
|
||||
|
||||
immutable_iter = NewMergingIterator(&default_cfd_->internal_comparator(),
|
||||
&list[0], list.size());
|
||||
immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup,
|
||||
nullptr);
|
||||
super_version->imm->AddIterators(options, &list);
|
||||
super_version->current->AddIterators(options, storage_options_, &list);
|
||||
Iterator* immutable_iter = NewMergingIterator(
|
||||
&default_cfd_->internal_comparator(), &list[0], list.size());
|
||||
|
||||
// create a DBIter that only uses memtable content; see NewIterator()
|
||||
immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
|
||||
immutable_iter, kMaxSequenceNumber);
|
||||
|
||||
// register cleanups
|
||||
mutable_iter->RegisterCleanup(CleanupIteratorState,
|
||||
new IterState(this, &mutex_, super_version), nullptr);
|
||||
|
||||
// bump the ref one more time since it will be Unref'ed twice
|
||||
immutable_iter->RegisterCleanup(CleanupIteratorState,
|
||||
new IterState(this, &mutex_, super_version->Ref()), nullptr);
|
||||
|
||||
return std::make_pair(mutable_iter, immutable_iter);
|
||||
}
|
||||
|
||||
@ -2943,7 +2909,6 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
|
||||
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false);
|
||||
SequenceNumber snapshot;
|
||||
std::vector<MemTable*> to_delete;
|
||||
|
||||
mutex_.Lock();
|
||||
if (options.snapshot != nullptr) {
|
||||
@ -2952,17 +2917,9 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
snapshot = versions_->LastSequence();
|
||||
}
|
||||
|
||||
// TODO only works for default column family
|
||||
MemTable* mem = default_cfd_->mem();
|
||||
MemTableListVersion* imm = default_cfd_->imm()->current();
|
||||
Version* current = default_cfd_->current();
|
||||
mem->Ref();
|
||||
imm->Ref();
|
||||
current->Ref();
|
||||
|
||||
// Unlock while reading from files and memtables
|
||||
|
||||
SuperVersion* get_version = default_cfd_->GetSuperVersion()->Ref();
|
||||
mutex_.Unlock();
|
||||
|
||||
bool have_stat_update = false;
|
||||
Version::GetStats stats;
|
||||
|
||||
@ -2987,12 +2944,14 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
std::string* value = &(*values)[i];
|
||||
|
||||
LookupKey lkey(keys[i], snapshot);
|
||||
if (mem->Get(lkey, value, &s, merge_context, options_)) {
|
||||
if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
|
||||
// Done
|
||||
} else if (imm->Get(lkey, value, &s, merge_context, options_)) {
|
||||
} else if (get_version->imm->Get(lkey, value, &s, merge_context,
|
||||
options_)) {
|
||||
// Done
|
||||
} else {
|
||||
current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
|
||||
get_version->current->Get(options, lkey, value, &s, &merge_context,
|
||||
&stats, options_);
|
||||
have_stat_update = true;
|
||||
}
|
||||
|
||||
@ -3001,20 +2960,28 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
}
|
||||
}
|
||||
|
||||
// Post processing (decrement reference counts and record statistics)
|
||||
mutex_.Lock();
|
||||
if (!options_.disable_seek_compaction &&
|
||||
have_stat_update && current->UpdateStats(stats)) {
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
bool delete_get_version = false;
|
||||
if (!options_.disable_seek_compaction && have_stat_update) {
|
||||
mutex_.Lock();
|
||||
if (get_version->current->UpdateStats(stats)) {
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
}
|
||||
if (get_version->Unref()) {
|
||||
get_version->Cleanup();
|
||||
delete_get_version = true;
|
||||
}
|
||||
mutex_.Unlock();
|
||||
} else {
|
||||
if (get_version->Unref()) {
|
||||
mutex_.Lock();
|
||||
get_version->Cleanup();
|
||||
mutex_.Unlock();
|
||||
delete_get_version = true;
|
||||
}
|
||||
}
|
||||
if (delete_get_version) {
|
||||
delete get_version;
|
||||
}
|
||||
MemTable* m = mem->Unref();
|
||||
imm->Unref(&to_delete);
|
||||
current->Unref();
|
||||
mutex_.Unlock();
|
||||
|
||||
// free up all obsolete memtables outside the mutex
|
||||
delete m;
|
||||
for (MemTable* v: to_delete) delete v;
|
||||
|
||||
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS);
|
||||
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys);
|
||||
|
@ -56,15 +56,15 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
|
||||
const ColumnFamilyHandle& column_family,
|
||||
const Slice& key, std::string* value) {
|
||||
Status s;
|
||||
MemTable* mem = GetDefaultColumnFamily()->mem();
|
||||
Version* current = GetDefaultColumnFamily()->current();
|
||||
SequenceNumber snapshot = versions_->LastSequence();
|
||||
SuperVersion* super_version = GetDefaultColumnFamily()->GetSuperVersion();
|
||||
MergeContext merge_context;
|
||||
LookupKey lkey(key, snapshot);
|
||||
if (mem->Get(lkey, value, &s, merge_context, options_)) {
|
||||
if (super_version->mem->Get(lkey, value, &s, merge_context, options_)) {
|
||||
} else {
|
||||
Version::GetStats stats;
|
||||
current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
|
||||
super_version->current->Get(options, lkey, value, &s, &merge_context,
|
||||
&stats, options_);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@ -92,6 +92,11 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
|
||||
ColumnFamilyDescriptor(default_column_family_name, cf_options));
|
||||
Status s = impl->Recover(column_families, true /* read only */,
|
||||
error_if_log_file_exist);
|
||||
if (s.ok()) {
|
||||
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
|
||||
delete cfd->InstallSuperVersion(new SuperVersion());
|
||||
}
|
||||
}
|
||||
impl->mutex_.Unlock();
|
||||
if (s.ok()) {
|
||||
*dbptr = impl;
|
||||
|
@ -421,6 +421,10 @@ class DBTest {
|
||||
return DB::Open(*options, dbname_, db);
|
||||
}
|
||||
|
||||
Status ReadOnlyReopen(Options* options) {
|
||||
return DB::OpenForReadOnly(*options, dbname_, &db_);
|
||||
}
|
||||
|
||||
Status TryReopen(Options* options = nullptr) {
|
||||
delete db_;
|
||||
db_ = nullptr;
|
||||
@ -730,6 +734,26 @@ TEST(DBTest, ReadWrite) {
|
||||
} while (ChangeOptions());
|
||||
}
|
||||
|
||||
TEST(DBTest, ReadOnlyDB) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
ASSERT_OK(Put("bar", "v2"));
|
||||
ASSERT_OK(Put("foo", "v3"));
|
||||
Close();
|
||||
|
||||
Options options;
|
||||
ASSERT_OK(ReadOnlyReopen(&options));
|
||||
ASSERT_EQ("v3", Get("foo"));
|
||||
ASSERT_EQ("v2", Get("bar"));
|
||||
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||
int count = 0;
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
ASSERT_OK(iter->status());
|
||||
++count;
|
||||
}
|
||||
ASSERT_EQ(count, 2);
|
||||
delete iter;
|
||||
}
|
||||
|
||||
// Make sure that when options.block_cache is set, after a new table is
|
||||
// created its index/filter blocks are added to block cache.
|
||||
TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
|
||||
|
@ -1423,9 +1423,6 @@ class PosixEnv : public Env {
|
||||
nullptr,
|
||||
&ThreadPool::BGThreadWrapper,
|
||||
this));
|
||||
fprintf(stdout,
|
||||
"Created bg thread 0x%lx\n",
|
||||
(unsigned long)t);
|
||||
|
||||
// Set the thread name to aid debugging
|
||||
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
|
||||
|
@ -263,11 +263,11 @@ Options::Dump(Logger* log) const
|
||||
Log(log," Options.num_levels: %d", num_levels);
|
||||
Log(log," Options.disableDataSync: %d", disableDataSync);
|
||||
Log(log," Options.use_fsync: %d", use_fsync);
|
||||
Log(log," Options.max_log_file_size: %ld", max_log_file_size);
|
||||
Log(log," Options.max_log_file_size: %zu", max_log_file_size);
|
||||
Log(log,"Options.max_manifest_file_size: %lu",
|
||||
(unsigned long)max_manifest_file_size);
|
||||
Log(log," Options.log_file_time_to_roll: %ld", log_file_time_to_roll);
|
||||
Log(log," Options.keep_log_file_num: %ld", keep_log_file_num);
|
||||
Log(log," Options.log_file_time_to_roll: %zu", log_file_time_to_roll);
|
||||
Log(log," Options.keep_log_file_num: %zu", keep_log_file_num);
|
||||
Log(log," Options.db_stats_log_interval: %d",
|
||||
db_stats_log_interval);
|
||||
Log(log," Options.allow_os_buffer: %d", allow_os_buffer);
|
||||
@ -323,7 +323,7 @@ Options::Dump(Logger* log) const
|
||||
table_cache_numshardbits);
|
||||
Log(log," Options.table_cache_remove_scan_count_limit: %d",
|
||||
table_cache_remove_scan_count_limit);
|
||||
Log(log," Options.arena_block_size: %ld",
|
||||
Log(log," Options.arena_block_size: %zu",
|
||||
arena_block_size);
|
||||
Log(log," Options.delete_obsolete_files_period_micros: %lu",
|
||||
(unsigned long)delete_obsolete_files_period_micros);
|
||||
@ -343,7 +343,7 @@ Options::Dump(Logger* log) const
|
||||
(unsigned long)WAL_ttl_seconds);
|
||||
Log(log," Options.WAL_size_limit_MB: %lu",
|
||||
(unsigned long)WAL_size_limit_MB);
|
||||
Log(log," Options.manifest_preallocation_size: %ld",
|
||||
Log(log," Options.manifest_preallocation_size: %zu",
|
||||
manifest_preallocation_size);
|
||||
Log(log," Options.purge_redundant_kvs_while_flush: %d",
|
||||
purge_redundant_kvs_while_flush);
|
||||
|
Loading…
x
Reference in New Issue
Block a user