Compare commits
13 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
e51db47feb | ||
|
c3034fce32 | ||
|
2dea3dd258 | ||
|
37c4996282 | ||
|
53da604580 | ||
|
60f5a22cff | ||
|
2898067de8 | ||
|
238a9c3f68 | ||
|
2511b42c7e | ||
|
1920121cef | ||
|
7c393c3fc6 | ||
|
7513e9011b | ||
|
bf117c54ef |
18
HISTORY.md
18
HISTORY.md
@ -1,4 +1,22 @@
|
||||
# Rocksdb Change Log
|
||||
## 6.23.4 (2021-08-31)
|
||||
### Bug Fixes
|
||||
* Fix a race in item ref counting in LRUCache when promoting an item from the SecondaryCache.
|
||||
|
||||
## 6.23.3 (2021-08-09)
|
||||
### Bug Fixes
|
||||
* Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file.
|
||||
* Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`.
|
||||
|
||||
## 6.23.2 (2021-08-04)
|
||||
### Bug Fixes
|
||||
* Fixed a race related to the destruction of `ColumnFamilyData` objects. The earlier logic unlocked the DB mutex before destroying the thread-local `SuperVersion` pointers, which could result in a process crash if another thread managed to get a reference to the `ColumnFamilyData` object.
|
||||
* Fixed an issue where `OnFlushCompleted` was not called for atomic flush.
|
||||
|
||||
## 6.23.1 (2021-07-22)
|
||||
### Bug Fixes
|
||||
* Fix a race condition during multiple DB instances opening.
|
||||
|
||||
## 6.23.0 (2021-07-16)
|
||||
### Behavior Changes
|
||||
* Obsolete keys in the bottommost level that were preserved for a snapshot will now be cleaned upon snapshot release in all cases. This form of compaction (snapshot release triggered compaction) previously had an artificial limitation that multiple tombstones needed to be present.
|
||||
|
11
cache/lru_cache.cc
vendored
11
cache/lru_cache.cc
vendored
@ -358,7 +358,10 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle,
|
||||
if (handle == nullptr) {
|
||||
LRU_Insert(e);
|
||||
} else {
|
||||
e->Ref();
|
||||
// If caller already holds a ref, no need to take one here
|
||||
if (!e->HasRefs()) {
|
||||
e->Ref();
|
||||
}
|
||||
*handle = reinterpret_cast<Cache::Handle*>(e);
|
||||
}
|
||||
}
|
||||
@ -396,11 +399,7 @@ void LRUCacheShard::Promote(LRUHandle* e) {
|
||||
if (e->value) {
|
||||
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(e);
|
||||
Status s = InsertItem(e, &handle, /*free_handle_on_fail=*/false);
|
||||
if (s.ok()) {
|
||||
// InsertItem would have taken a reference on the item, so decrement it
|
||||
// here as we expect the caller to already hold a reference
|
||||
e->Unref();
|
||||
} else {
|
||||
if (!s.ok()) {
|
||||
// Item is in memory, but not accounted against the cache capacity.
|
||||
// When the handle is released, the item should get deleted
|
||||
assert(!e->InCache());
|
||||
|
@ -457,7 +457,7 @@ void SuperVersion::Cleanup() {
|
||||
to_delete.push_back(m);
|
||||
}
|
||||
current->Unref();
|
||||
cfd->UnrefAndTryDelete(this);
|
||||
cfd->UnrefAndTryDelete();
|
||||
}
|
||||
|
||||
void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
|
||||
@ -475,10 +475,10 @@ void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
|
||||
|
||||
namespace {
|
||||
void SuperVersionUnrefHandle(void* ptr) {
|
||||
// UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
|
||||
// destroyed. When former happens, the thread shouldn't see kSVInUse.
|
||||
// When latter happens, we are in ~ColumnFamilyData(), no get should happen as
|
||||
// well.
|
||||
// UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
|
||||
// destroyed. When the former happens, the thread shouldn't see kSVInUse.
|
||||
// When the latter happens, only super_version_ holds a reference
|
||||
// to ColumnFamilyData, so no further queries are possible.
|
||||
SuperVersion* sv = static_cast<SuperVersion*>(ptr);
|
||||
bool was_last_ref __attribute__((__unused__));
|
||||
was_last_ref = sv->Unref();
|
||||
@ -668,7 +668,7 @@ ColumnFamilyData::~ColumnFamilyData() {
|
||||
}
|
||||
}
|
||||
|
||||
bool ColumnFamilyData::UnrefAndTryDelete(SuperVersion* sv_under_cleanup) {
|
||||
bool ColumnFamilyData::UnrefAndTryDelete() {
|
||||
int old_refs = refs_.fetch_sub(1);
|
||||
assert(old_refs > 0);
|
||||
|
||||
@ -678,22 +678,17 @@ bool ColumnFamilyData::UnrefAndTryDelete(SuperVersion* sv_under_cleanup) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// If called under SuperVersion::Cleanup, we should not re-enter Cleanup on
|
||||
// the same SuperVersion. (But while installing a new SuperVersion, this
|
||||
// cfd could be referenced only by two SuperVersions.)
|
||||
if (old_refs == 2 && super_version_ != nullptr &&
|
||||
super_version_ != sv_under_cleanup) {
|
||||
if (old_refs == 2 && super_version_ != nullptr) {
|
||||
// Only the super_version_ holds me
|
||||
SuperVersion* sv = super_version_;
|
||||
super_version_ = nullptr;
|
||||
// Release SuperVersion reference kept in ThreadLocalPtr.
|
||||
// This must be done outside of mutex_ since unref handler can lock mutex.
|
||||
sv->db_mutex->Unlock();
|
||||
|
||||
// Release SuperVersion references kept in ThreadLocalPtr.
|
||||
local_sv_.reset();
|
||||
sv->db_mutex->Lock();
|
||||
|
||||
if (sv->Unref()) {
|
||||
// May delete this ColumnFamilyData after calling Cleanup()
|
||||
// Note: sv will delete this ColumnFamilyData during Cleanup()
|
||||
assert(sv->cfd == this);
|
||||
sv->Cleanup();
|
||||
delete sv;
|
||||
return true;
|
||||
@ -1261,14 +1256,13 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
|
||||
void ColumnFamilyData::InstallSuperVersion(
|
||||
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
|
||||
db_mutex->AssertHeld();
|
||||
return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
|
||||
return InstallSuperVersion(sv_context, mutable_cf_options_);
|
||||
}
|
||||
|
||||
void ColumnFamilyData::InstallSuperVersion(
|
||||
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
|
||||
SuperVersionContext* sv_context,
|
||||
const MutableCFOptions& mutable_cf_options) {
|
||||
SuperVersion* new_superversion = sv_context->new_superversion.release();
|
||||
new_superversion->db_mutex = db_mutex;
|
||||
new_superversion->mutable_cf_options = mutable_cf_options;
|
||||
new_superversion->Init(this, mem_, imm_.current(), current_);
|
||||
SuperVersion* old_superversion = super_version_;
|
||||
|
@ -208,8 +208,6 @@ struct SuperVersion {
|
||||
uint64_t version_number;
|
||||
WriteStallCondition write_stall_condition;
|
||||
|
||||
InstrumentedMutex* db_mutex;
|
||||
|
||||
// should be called outside the mutex
|
||||
SuperVersion() = default;
|
||||
~SuperVersion();
|
||||
@ -281,8 +279,7 @@ class ColumnFamilyData {
|
||||
// UnrefAndTryDelete() decreases the reference count and do free if needed,
|
||||
// return true if this is freed else false, UnrefAndTryDelete() can only
|
||||
// be called while holding a DB mutex, or during single-threaded recovery.
|
||||
// sv_under_cleanup is only provided when called from SuperVersion::Cleanup.
|
||||
bool UnrefAndTryDelete(SuperVersion* sv_under_cleanup = nullptr);
|
||||
bool UnrefAndTryDelete();
|
||||
|
||||
// SetDropped() can only be called under following conditions:
|
||||
// 1) Holding a DB mutex,
|
||||
@ -454,7 +451,6 @@ class ColumnFamilyData {
|
||||
// the clients to allocate SuperVersion outside of mutex.
|
||||
// IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
|
||||
void InstallSuperVersion(SuperVersionContext* sv_context,
|
||||
InstrumentedMutex* db_mutex,
|
||||
const MutableCFOptions& mutable_cf_options);
|
||||
void InstallSuperVersion(SuperVersionContext* sv_context,
|
||||
InstrumentedMutex* db_mutex);
|
||||
|
@ -1362,6 +1362,28 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFUnsorted) {
|
||||
Options options = CurrentOptions();
|
||||
CreateAndReopenWithCF({"one", "two"}, options);
|
||||
|
||||
ASSERT_OK(Put(1, "foo", "bar"));
|
||||
ASSERT_OK(Put(2, "baz", "xyz"));
|
||||
ASSERT_OK(Put(1, "abc", "def"));
|
||||
|
||||
// Note: keys for the same CF do not form a consecutive range
|
||||
std::vector<int> cfs{1, 2, 1};
|
||||
std::vector<std::string> keys{"foo", "baz", "abc"};
|
||||
std::vector<std::string> values;
|
||||
|
||||
values =
|
||||
MultiGet(cfs, keys, /* snapshot */ nullptr, /* batched */ GetParam());
|
||||
|
||||
ASSERT_EQ(values.size(), 3);
|
||||
ASSERT_EQ(values[0], "bar");
|
||||
ASSERT_EQ(values[1], "xyz");
|
||||
ASSERT_EQ(values[2], "def");
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
|
||||
testing::Bool());
|
||||
|
||||
|
@ -2276,20 +2276,18 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
|
||||
multiget_cf_data;
|
||||
size_t cf_start = 0;
|
||||
ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
|
||||
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
KeyContext* key_ctx = sorted_keys[i];
|
||||
if (key_ctx->column_family != cf) {
|
||||
multiget_cf_data.emplace_back(
|
||||
MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
|
||||
multiget_cf_data.emplace_back(cf, cf_start, i - cf_start, nullptr);
|
||||
cf_start = i;
|
||||
cf = key_ctx->column_family;
|
||||
}
|
||||
}
|
||||
{
|
||||
// multiget_cf_data.emplace_back(
|
||||
// MultiGetColumnFamilyData(cf, cf_start, num_keys - cf_start, nullptr));
|
||||
multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
|
||||
}
|
||||
|
||||
multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
|
||||
|
||||
std::function<MultiGetColumnFamilyData*(
|
||||
autovector<MultiGetColumnFamilyData,
|
||||
MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
|
||||
@ -2349,7 +2347,7 @@ struct CompareKeyContext {
|
||||
static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
|
||||
uint32_t cfd_id1 = cfh->cfd()->GetID();
|
||||
const Comparator* comparator = cfh->cfd()->user_comparator();
|
||||
cfh = static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
|
||||
cfh = static_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
|
||||
uint32_t cfd_id2 = cfh->cfd()->GetID();
|
||||
|
||||
if (cfd_id1 < cfd_id2) {
|
||||
@ -2373,39 +2371,24 @@ struct CompareKeyContext {
|
||||
void DBImpl::PrepareMultiGetKeys(
|
||||
size_t num_keys, bool sorted_input,
|
||||
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
|
||||
#ifndef NDEBUG
|
||||
if (sorted_input) {
|
||||
for (size_t index = 0; index < sorted_keys->size(); ++index) {
|
||||
if (index > 0) {
|
||||
KeyContext* lhs = (*sorted_keys)[index - 1];
|
||||
KeyContext* rhs = (*sorted_keys)[index];
|
||||
ColumnFamilyHandleImpl* cfh =
|
||||
static_cast_with_check<ColumnFamilyHandleImpl>(lhs->column_family);
|
||||
uint32_t cfd_id1 = cfh->cfd()->GetID();
|
||||
const Comparator* comparator = cfh->cfd()->user_comparator();
|
||||
cfh =
|
||||
static_cast_with_check<ColumnFamilyHandleImpl>(lhs->column_family);
|
||||
uint32_t cfd_id2 = cfh->cfd()->GetID();
|
||||
#ifndef NDEBUG
|
||||
CompareKeyContext key_context_less;
|
||||
|
||||
assert(cfd_id1 <= cfd_id2);
|
||||
if (cfd_id1 < cfd_id2) {
|
||||
continue;
|
||||
}
|
||||
for (size_t index = 1; index < sorted_keys->size(); ++index) {
|
||||
const KeyContext* const lhs = (*sorted_keys)[index - 1];
|
||||
const KeyContext* const rhs = (*sorted_keys)[index];
|
||||
|
||||
// Both keys are from the same column family
|
||||
int cmp = comparator->CompareWithoutTimestamp(
|
||||
*(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
|
||||
assert(cmp <= 0);
|
||||
}
|
||||
index++;
|
||||
// lhs should be <= rhs, or in other words, rhs should NOT be < lhs
|
||||
assert(!key_context_less(rhs, lhs));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (!sorted_input) {
|
||||
CompareKeyContext sort_comparator;
|
||||
std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
|
||||
sort_comparator);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
|
||||
CompareKeyContext());
|
||||
}
|
||||
|
||||
void DBImpl::MultiGet(const ReadOptions& read_options,
|
||||
|
@ -590,6 +590,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
autovector<const autovector<MemTable*>*> mems_list;
|
||||
autovector<const MutableCFOptions*> mutable_cf_options_list;
|
||||
autovector<FileMetaData*> tmp_file_meta;
|
||||
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
|
||||
committed_flush_jobs_info;
|
||||
for (int i = 0; i != num_cfs; ++i) {
|
||||
const auto& mems = jobs[i]->GetMemTables();
|
||||
if (!cfds[i]->IsDropped() && !mems.empty()) {
|
||||
@ -597,13 +599,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
mems_list.emplace_back(&mems);
|
||||
mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
|
||||
tmp_file_meta.emplace_back(&file_meta[i]);
|
||||
#ifndef ROCKSDB_LITE
|
||||
committed_flush_jobs_info.emplace_back(
|
||||
jobs[i]->GetCommittedFlushJobsInfo());
|
||||
#endif //! ROCKSDB_LITE
|
||||
}
|
||||
}
|
||||
|
||||
s = InstallMemtableAtomicFlushResults(
|
||||
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
|
||||
versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
|
||||
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
|
||||
committed_flush_jobs_info, &job_context->memtables_to_free,
|
||||
directories_.GetDbDir(), log_buffer);
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
@ -3467,7 +3474,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
|
||||
if (UNLIKELY(sv_context->new_superversion == nullptr)) {
|
||||
sv_context->NewSuperVersion();
|
||||
}
|
||||
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
|
||||
cfd->InstallSuperVersion(sv_context, mutable_cf_options);
|
||||
|
||||
// There may be a small data race here. The snapshot tricking bottommost
|
||||
// compaction may already be released here. But assuming there will always be
|
||||
|
@ -675,6 +675,14 @@ class SpecialEnv : public EnvWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
Status RenameFile(const std::string& src, const std::string& dest) override {
|
||||
rename_count_.fetch_add(1);
|
||||
if (rename_error_.load(std::memory_order_acquire)) {
|
||||
return Status::NotSupported("Simulated `RenameFile()` error.");
|
||||
}
|
||||
return target()->RenameFile(src, dest);
|
||||
}
|
||||
|
||||
// Something to return when mocking current time
|
||||
const int64_t maybe_starting_time_;
|
||||
|
||||
@ -702,6 +710,9 @@ class SpecialEnv : public EnvWrapper {
|
||||
// Force write to log files to fail while this pointer is non-nullptr
|
||||
std::atomic<bool> log_write_error_;
|
||||
|
||||
// Force `RenameFile()` to fail while this pointer is non-nullptr
|
||||
std::atomic<bool> rename_error_{false};
|
||||
|
||||
// Slow down every log write, in micro-seconds.
|
||||
std::atomic<int> log_write_slowdown_;
|
||||
|
||||
@ -745,6 +756,8 @@ class SpecialEnv : public EnvWrapper {
|
||||
|
||||
std::atomic<int> delete_count_;
|
||||
|
||||
std::atomic<int> rename_count_{0};
|
||||
|
||||
std::atomic<bool> is_wal_sync_thread_safe_{true};
|
||||
|
||||
std::atomic<size_t> compaction_readahead_size_{};
|
||||
|
@ -418,12 +418,19 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
|
||||
for (auto cfd : all_cfds) {
|
||||
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
|
||||
}
|
||||
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
|
||||
committed_flush_jobs_info;
|
||||
#ifndef ROCKSDB_LITE
|
||||
for (auto& job : flush_jobs) {
|
||||
committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
|
||||
}
|
||||
#endif //! ROCKSDB_LITE
|
||||
|
||||
Status s = InstallMemtableAtomicFlushResults(
|
||||
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
|
||||
versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs,
|
||||
&job_context.memtables_to_free, nullptr /* db_directory */,
|
||||
nullptr /* log_buffer */);
|
||||
committed_flush_jobs_info, &job_context.memtables_to_free,
|
||||
nullptr /* db_directory */, nullptr /* log_buffer */);
|
||||
ASSERT_OK(s);
|
||||
|
||||
mutex_.Unlock();
|
||||
|
@ -356,32 +356,38 @@ TEST_F(EventListenerTest, MultiCF) {
|
||||
#ifdef ROCKSDB_USING_THREAD_STATUS
|
||||
options.enable_thread_tracking = true;
|
||||
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||
TestFlushListener* listener = new TestFlushListener(options.env, this);
|
||||
options.listeners.emplace_back(listener);
|
||||
options.table_properties_collector_factories.push_back(
|
||||
std::make_shared<TestPropertiesCollectorFactory>());
|
||||
std::vector<std::string> cf_names = {
|
||||
"pikachu", "ilya", "muromec", "dobrynia",
|
||||
"nikitich", "alyosha", "popovich"};
|
||||
CreateAndReopenWithCF(cf_names, options);
|
||||
for (auto atomic_flush : {false, true}) {
|
||||
options.atomic_flush = atomic_flush;
|
||||
options.create_if_missing = true;
|
||||
DestroyAndReopen(options);
|
||||
TestFlushListener* listener = new TestFlushListener(options.env, this);
|
||||
options.listeners.emplace_back(listener);
|
||||
options.table_properties_collector_factories.push_back(
|
||||
std::make_shared<TestPropertiesCollectorFactory>());
|
||||
std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
|
||||
"dobrynia", "nikitich", "alyosha",
|
||||
"popovich"};
|
||||
CreateAndReopenWithCF(cf_names, options);
|
||||
|
||||
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
|
||||
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
|
||||
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
|
||||
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
|
||||
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
|
||||
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
|
||||
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
|
||||
for (int i = 1; i < 8; ++i) {
|
||||
ASSERT_OK(Flush(i));
|
||||
ASSERT_EQ(listener->flushed_dbs_.size(), i);
|
||||
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
|
||||
}
|
||||
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
|
||||
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
|
||||
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
|
||||
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
|
||||
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
|
||||
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
|
||||
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
|
||||
for (int i = 1; i < 8; ++i) {
|
||||
ASSERT_OK(Flush(i));
|
||||
ASSERT_EQ(listener->flushed_dbs_.size(), i);
|
||||
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
|
||||
}
|
||||
|
||||
// make sure callback functions are called in the right order
|
||||
for (size_t i = 0; i < cf_names.size(); i++) {
|
||||
ASSERT_EQ(listener->flushed_dbs_[i], db_);
|
||||
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
|
||||
// make sure callback functions are called in the right order
|
||||
for (size_t i = 0; i < cf_names.size(); i++) {
|
||||
ASSERT_EQ(listener->flushed_dbs_[i], db_);
|
||||
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
|
||||
}
|
||||
Close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -736,6 +736,8 @@ Status InstallMemtableAtomicFlushResults(
|
||||
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
|
||||
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
|
||||
const autovector<FileMetaData*>& file_metas,
|
||||
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
|
||||
committed_flush_jobs_info,
|
||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||
LogBuffer* log_buffer) {
|
||||
AutoThreadOperationStageUpdater stage_updater(
|
||||
@ -765,6 +767,17 @@ Status InstallMemtableAtomicFlushResults(
|
||||
(*mems_list[k])[i]->SetFlushCompleted(true);
|
||||
(*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
|
||||
}
|
||||
#ifndef ROCKSDB_LITE
|
||||
if (committed_flush_jobs_info[k]) {
|
||||
assert(!mems_list[k]->empty());
|
||||
assert((*mems_list[k])[0]);
|
||||
std::unique_ptr<FlushJobInfo> flush_job_info =
|
||||
(*mems_list[k])[0]->ReleaseFlushJobInfo();
|
||||
committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
|
||||
}
|
||||
#else //! ROCKSDB_LITE
|
||||
(void)committed_flush_jobs_info;
|
||||
#endif // ROCKSDB_LITE
|
||||
}
|
||||
|
||||
Status s;
|
||||
|
@ -140,6 +140,8 @@ class MemTableListVersion {
|
||||
const autovector<const autovector<MemTable*>*>& mems_list,
|
||||
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
|
||||
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
|
||||
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
|
||||
committed_flush_jobs_info,
|
||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||
LogBuffer* log_buffer);
|
||||
|
||||
@ -402,6 +404,8 @@ class MemTableList {
|
||||
const autovector<const autovector<MemTable*>*>& mems_list,
|
||||
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
|
||||
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
|
||||
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
|
||||
committed_flush_jobs_info,
|
||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||
LogBuffer* log_buffer);
|
||||
|
||||
@ -452,6 +456,8 @@ extern Status InstallMemtableAtomicFlushResults(
|
||||
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
|
||||
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
|
||||
const autovector<FileMetaData*>& file_meta,
|
||||
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
|
||||
committed_flush_jobs_info,
|
||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||
LogBuffer* log_buffer);
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -182,12 +182,21 @@ class MemTableListTest : public testing::Test {
|
||||
for (auto& meta : file_metas) {
|
||||
file_meta_ptrs.push_back(&meta);
|
||||
}
|
||||
std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
|
||||
committed_flush_jobs_info_storage(cf_ids.size());
|
||||
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
|
||||
committed_flush_jobs_info;
|
||||
for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
|
||||
committed_flush_jobs_info.push_back(
|
||||
&committed_flush_jobs_info_storage[i]);
|
||||
}
|
||||
|
||||
InstrumentedMutex mutex;
|
||||
InstrumentedMutexLock l(&mutex);
|
||||
return InstallMemtableAtomicFlushResults(
|
||||
&lists, cfds, mutable_cf_options_list, mems_list, &versions,
|
||||
nullptr /* prep_tracker */, &mutex, file_meta_ptrs, to_delete, nullptr,
|
||||
&log_buffer);
|
||||
nullptr /* prep_tracker */, &mutex, file_meta_ptrs,
|
||||
committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -9,10 +9,12 @@
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <regex>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "rocksdb/status.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
@ -109,6 +111,8 @@ class ObjectLibrary {
|
||||
// Adds the input entry to the list for the given type
|
||||
void AddEntry(const std::string& type, std::unique_ptr<Entry>& entry);
|
||||
|
||||
// Protects the entry map
|
||||
mutable std::mutex mu_;
|
||||
// ** FactoryFunctions for this loader, organized by type
|
||||
std::unordered_map<std::string, std::vector<std::unique_ptr<Entry>>> entries_;
|
||||
|
||||
|
@ -11,7 +11,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 6
|
||||
#define ROCKSDB_MINOR 23
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 3
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -296,12 +296,19 @@ Status CreateLoggerFromOptions(const std::string& dbname,
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
// Open a log file in the same directory as the db
|
||||
env->RenameFile(
|
||||
fname, OldInfoLogFileName(dbname, clock->NowMicros(), db_absolute_path,
|
||||
options.db_log_dir))
|
||||
.PermitUncheckedError();
|
||||
s = env->NewLogger(fname, logger);
|
||||
if (logger->get() != nullptr) {
|
||||
s = env->FileExists(fname);
|
||||
if (s.ok()) {
|
||||
s = env->RenameFile(
|
||||
fname, OldInfoLogFileName(dbname, clock->NowMicros(), db_absolute_path,
|
||||
options.db_log_dir));
|
||||
} else if (s.IsNotFound()) {
|
||||
// "LOG" is not required to exist since this could be a new DB.
|
||||
s = Status::OK();
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = env->NewLogger(fname, logger);
|
||||
}
|
||||
if (s.ok() && logger->get() != nullptr) {
|
||||
(*logger)->SetInfoLogLevel(options.info_log_level);
|
||||
}
|
||||
return s;
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "logging/logging.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/db.h"
|
||||
@ -686,6 +687,50 @@ TEST_F(AutoRollLoggerTest, FileCreateFailure) {
|
||||
ASSERT_NOK(CreateLoggerFromOptions("", options, &logger));
|
||||
ASSERT_TRUE(!logger);
|
||||
}
|
||||
|
||||
TEST_F(AutoRollLoggerTest, RenameOnlyWhenExists) {
|
||||
InitTestDb();
|
||||
SpecialEnv env(Env::Default());
|
||||
Options options;
|
||||
options.env = &env;
|
||||
|
||||
// Originally no LOG exists. Should not see a rename.
|
||||
{
|
||||
std::shared_ptr<Logger> logger;
|
||||
ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger));
|
||||
ASSERT_EQ(0, env.rename_count_);
|
||||
}
|
||||
|
||||
// Now a LOG exists. Create a new one should see a rename.
|
||||
{
|
||||
std::shared_ptr<Logger> logger;
|
||||
ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger));
|
||||
ASSERT_EQ(1, env.rename_count_);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(AutoRollLoggerTest, RenameError) {
|
||||
InitTestDb();
|
||||
SpecialEnv env(Env::Default());
|
||||
env.rename_error_ = true;
|
||||
Options options;
|
||||
options.env = &env;
|
||||
|
||||
// Originally no LOG exists. Should not be impacted by rename error.
|
||||
{
|
||||
std::shared_ptr<Logger> logger;
|
||||
ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger));
|
||||
ASSERT_TRUE(logger != nullptr);
|
||||
}
|
||||
|
||||
// Now a LOG exists. Rename error should cause failure.
|
||||
{
|
||||
std::shared_ptr<Logger> logger;
|
||||
ASSERT_NOK(CreateLoggerFromOptions(kTestDir, options, &logger));
|
||||
ASSERT_TRUE(logger == nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -1224,8 +1224,10 @@ inline bool LZ4HC_Compress(const CompressionInfo& info,
|
||||
const char* compression_dict_data =
|
||||
compression_dict.size() > 0 ? compression_dict.data() : nullptr;
|
||||
size_t compression_dict_size = compression_dict.size();
|
||||
LZ4_loadDictHC(stream, compression_dict_data,
|
||||
static_cast<int>(compression_dict_size));
|
||||
if (compression_dict_data != nullptr) {
|
||||
LZ4_loadDictHC(stream, compression_dict_data,
|
||||
static_cast<int>(compression_dict_size));
|
||||
}
|
||||
|
||||
#if LZ4_VERSION_NUMBER >= 10700 // r129+
|
||||
outlen =
|
||||
|
@ -15,6 +15,7 @@ namespace ROCKSDB_NAMESPACE {
|
||||
// Otherwise, nullptr is returned
|
||||
const ObjectLibrary::Entry *ObjectLibrary::FindEntry(
|
||||
const std::string &type, const std::string &name) const {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
auto entries = entries_.find(type);
|
||||
if (entries != entries_.end()) {
|
||||
for (const auto &entry : entries->second) {
|
||||
@ -28,11 +29,13 @@ const ObjectLibrary::Entry *ObjectLibrary::FindEntry(
|
||||
|
||||
void ObjectLibrary::AddEntry(const std::string &type,
|
||||
std::unique_ptr<Entry> &entry) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
auto &entries = entries_[type];
|
||||
entries.emplace_back(std::move(entry));
|
||||
}
|
||||
|
||||
size_t ObjectLibrary::GetFactoryCount(size_t *types) const {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
*types = entries_.size();
|
||||
size_t factories = 0;
|
||||
for (const auto &e : entries_) {
|
||||
@ -42,6 +45,7 @@ size_t ObjectLibrary::GetFactoryCount(size_t *types) const {
|
||||
}
|
||||
|
||||
void ObjectLibrary::Dump(Logger *logger) const {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
for (const auto &iter : entries_) {
|
||||
ROCKS_LOG_HEADER(logger, " Registered factories for type[%s] ",
|
||||
iter.first.c_str());
|
||||
|
Loading…
Reference in New Issue
Block a user