Compare commits

...

13 Commits

Author SHA1 Message Date
anand76
e51db47feb Fix a race in LRUCacheShard::Promote (#8717)
Summary:
In ```LRUCacheShard::Promote```, a reference is released outside the LRU mutex. Fix the race condition.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8717

Reviewed By: zhichao-cao

Differential Revision: D30649206

Pulled By: anand1976

fbshipit-source-id: 09c0af05b2294a7fe2c02876a61b0bad6e3ada61
2021-08-31 18:06:53 -07:00
Lucian Grijincu
c3034fce32 rocksdb: don't call LZ4_loadDictHC with null dictionary
Summary: UBSAN revealed a pointer underflow when `LZ4HC_init_internal` is called with a null `start`.

Reviewed By: ajkr

Differential Revision: D30181874

fbshipit-source-id: ca9bbac1a85c58782871d7f153af733b000cc66c
2021-08-09 16:13:49 -07:00
Andrew Kryczka
2dea3dd258 update HISTORY.md release date 2021-08-09 11:34:36 -07:00
Levi Tamasi
37c4996282 Fix the sorting of KeyContexts for batched MultiGet (#8633)
Summary:
`CompareKeyContext::operator()` on the trunk has a bug: when comparing
column family IDs, `lhs` is used for both sides of the comparison. This
results in the `KeyContext`s getting sorted solely based on key, which
in turn means that keys with the same column family do not necessarily
form a single range in the sorted list. This violates an assumption of the
batched `MultiGet` logic, leading to the same column family
showing up multiple times in the list of `MultiGetColumnFamilyData`.
The end result is the code attempting to check out the thread-local
`SuperVersion` for the same CF multiple times, causing an
assertion violation in debug builds and memory corruption/crash in
release builds.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8633

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D30169182

Pulled By: ltamasi

fbshipit-source-id: a47710652df7e95b14b40fb710924c11a8478023
2021-08-09 10:59:11 -07:00
Andrew Kryczka
53da604580 update HISTORY.md and version.h for 6.23.3 2021-08-04 17:27:29 -07:00
Andrew Kryczka
60f5a22cff Do not attempt to rename non-existent info log (#8622)
Summary:
Previously we attempted to rename "LOG" to "LOG.old.*" without checking
its existence first. "LOG" had no reason to exist in a new DB.

Errors in renaming a non-existent "LOG" were swallowed via
`PermitUncheckedError()` so things worked. However the storage service's
error monitoring was detecting all these benign rename failures. So it
is better to fix it. Also with this PR we can now distinguish rename failure
for other reasons and return them.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8622

Test Plan: new unit test

Reviewed By: akankshamahajan15

Differential Revision: D30115189

Pulled By: ajkr

fbshipit-source-id: e2f337ffb2bd171be0203172abc8e16e7809b170
2021-08-04 17:26:19 -07:00
Levi Tamasi
2898067de8 Mention PR 8585 in HISTORY.md 2021-08-04 13:47:17 -07:00
Levi Tamasi
238a9c3f68 Bump version number to 6.23.2 2021-08-04 13:07:49 -07:00
Yanqin Jin
2511b42c7e Fix NotifyOnFlushCompleted() for atomic flush (#8585)
Summary:
PR https://github.com/facebook/rocksdb/issues/5908 added `flush_jobs_info_` to `FlushJob` to make sure
`OnFlushCompleted()` is called after committing flush results to
MANIFEST. However, `flush_jobs_info_` is not updated in atomic
flush, causing `NotifyOnFlushCompleted()` to skip `OnFlushCompleted()`.

This PR fixes this, in a similar way to https://github.com/facebook/rocksdb/issues/5908 that handles regular flush.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8585

Test Plan: make check

Reviewed By: jay-zhuang

Differential Revision: D29913720

Pulled By: riversand963

fbshipit-source-id: 4ff023c98372fa2c93188d4a5c8a4e9ffa0f4dda
2021-08-04 11:24:41 -07:00
Levi Tamasi
1920121cef Mention PR 8605 in HISTORY.md (#8619)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/8619

Reviewed By: riversand963

Differential Revision: D30081937

Pulled By: ltamasi

fbshipit-source-id: 57505957ae2c22d4b194aa28cb3fd261b3b39919
2021-08-04 11:20:31 -07:00
Levi Tamasi
7c393c3fc6 Fix a race in ColumnFamilyData::UnrefAndTryDelete (#8605)
Summary:
The `ColumnFamilyData::UnrefAndTryDelete` code currently on the trunk
unlocks the DB mutex before destroying the `ThreadLocalPtr` holding
the per-thread `SuperVersion` pointers when the only remaining reference
is the back reference from `super_version_`. The idea behind this was to
break the circular dependency between `ColumnFamilyData` and `SuperVersion`:
when the penultimate reference goes away, `ColumnFamilyData` can clean up
the `SuperVersion`, which can in turn clean up `ColumnFamilyData`. (Assuming there
is a `SuperVersion` and it is not referenced by anything else.) However,
unlocking the mutex throws a wrench in this plan by making it possible for another thread
to jump in and take another reference to the `ColumnFamilyData`, keeping the
object alive in a zombie `ThreadLocalPtr`-less state. This can cause issues like
https://github.com/facebook/rocksdb/issues/8440 ,
https://github.com/facebook/rocksdb/issues/8382 ,
and might also explain the `was_last_ref` assertion failures from the `ColumnFamilySet`
destructor we sometimes observe during close in our stress tests.

Digging through the archives, this unlocking goes way back to 2014 (or earlier). The original
rationale was that `SuperVersionUnrefHandle` used to lock the mutex so it can call
`SuperVersion::Cleanup`; however, this logic turned out to be deadlock-prone.
https://github.com/facebook/rocksdb/pull/3510 fixed the deadlock but left the
unlocking in place. https://github.com/facebook/rocksdb/pull/6147 then introduced
the circular dependency and associated cleanup logic described above (in order
to enable iterators to keep the `ColumnFamilyData` for dropped column families alive),
and moved the unlocking-relocking snippet to its present location in `UnrefAndTryDelete`.
Finally, https://github.com/facebook/rocksdb/pull/7749 fixed a memory leak but
apparently exacerbated the race by (otherwise correctly) switching to `UnrefAndTryDelete`
in `SuperVersion::Cleanup`.

The patch simply eliminates the unlocking and relocking, which has been unnecessary
ever since https://github.com/facebook/rocksdb/issues/3510 made `SuperVersionUnrefHandle` lock-free.
This closes the window during which another thread could increase the reference count,
and hopefully fixes the issues above.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8605

Test Plan: Ran `make check` and stress tests locally.

Reviewed By: pdillinger

Differential Revision: D30051035

Pulled By: ltamasi

fbshipit-source-id: 8fe559e4b4ad69fc142579f8bc393ef525918528
2021-08-04 11:00:18 -07:00
Jay Zhuang
7513e9011b Update HISTORY.md and version.h for 6.23.1 2021-07-22 13:46:33 -07:00
Jay Zhuang
bf117c54ef Fix an race condition during multiple DB opening (#8574)
Summary:
ObjectLibrary is shared between multiple DB instances, the
Register() could have race condition.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8574

Test Plan: pass the failed test

Reviewed By: ajkr

Differential Revision: D29855096

Pulled By: jay-zhuang

fbshipit-source-id: 541eed0bd495d2c963d858d81e7eabf1ba16153c
2021-07-22 13:43:40 -07:00
19 changed files with 239 additions and 104 deletions

View File

@ -1,4 +1,22 @@
# Rocksdb Change Log # Rocksdb Change Log
## 6.23.4 (2021-08-31)
### Bug Fixes
* Fix a race in item ref counting in LRUCache when promoting an item from the SecondaryCache.
## 6.23.3 (2021-08-09)
### Bug Fixes
* Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file.
* Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`.
## 6.23.2 (2021-08-04)
### Bug Fixes
* Fixed a race related to the destruction of `ColumnFamilyData` objects. The earlier logic unlocked the DB mutex before destroying the thread-local `SuperVersion` pointers, which could result in a process crash if another thread managed to get a reference to the `ColumnFamilyData` object.
* Fixed an issue where `OnFlushCompleted` was not called for atomic flush.
## 6.23.1 (2021-07-22)
### Bug Fixes
* Fix a race condition during multiple DB instances opening.
## 6.23.0 (2021-07-16) ## 6.23.0 (2021-07-16)
### Behavior Changes ### Behavior Changes
* Obsolete keys in the bottommost level that were preserved for a snapshot will now be cleaned upon snapshot release in all cases. This form of compaction (snapshot release triggered compaction) previously had an artificial limitation that multiple tombstones needed to be present. * Obsolete keys in the bottommost level that were preserved for a snapshot will now be cleaned upon snapshot release in all cases. This form of compaction (snapshot release triggered compaction) previously had an artificial limitation that multiple tombstones needed to be present.

9
cache/lru_cache.cc vendored
View File

@ -358,7 +358,10 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle,
if (handle == nullptr) { if (handle == nullptr) {
LRU_Insert(e); LRU_Insert(e);
} else { } else {
// If caller already holds a ref, no need to take one here
if (!e->HasRefs()) {
e->Ref(); e->Ref();
}
*handle = reinterpret_cast<Cache::Handle*>(e); *handle = reinterpret_cast<Cache::Handle*>(e);
} }
} }
@ -396,11 +399,7 @@ void LRUCacheShard::Promote(LRUHandle* e) {
if (e->value) { if (e->value) {
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(e); Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(e);
Status s = InsertItem(e, &handle, /*free_handle_on_fail=*/false); Status s = InsertItem(e, &handle, /*free_handle_on_fail=*/false);
if (s.ok()) { if (!s.ok()) {
// InsertItem would have taken a reference on the item, so decrement it
// here as we expect the caller to already hold a reference
e->Unref();
} else {
// Item is in memory, but not accounted against the cache capacity. // Item is in memory, but not accounted against the cache capacity.
// When the handle is released, the item should get deleted // When the handle is released, the item should get deleted
assert(!e->InCache()); assert(!e->InCache());

View File

@ -457,7 +457,7 @@ void SuperVersion::Cleanup() {
to_delete.push_back(m); to_delete.push_back(m);
} }
current->Unref(); current->Unref();
cfd->UnrefAndTryDelete(this); cfd->UnrefAndTryDelete();
} }
void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem, void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
@ -475,10 +475,10 @@ void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
namespace { namespace {
void SuperVersionUnrefHandle(void* ptr) { void SuperVersionUnrefHandle(void* ptr) {
// UnrefHandle is called when a thread exists or a ThreadLocalPtr gets // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
// destroyed. When former happens, the thread shouldn't see kSVInUse. // destroyed. When the former happens, the thread shouldn't see kSVInUse.
// When latter happens, we are in ~ColumnFamilyData(), no get should happen as // When the latter happens, only super_version_ holds a reference
// well. // to ColumnFamilyData, so no further queries are possible.
SuperVersion* sv = static_cast<SuperVersion*>(ptr); SuperVersion* sv = static_cast<SuperVersion*>(ptr);
bool was_last_ref __attribute__((__unused__)); bool was_last_ref __attribute__((__unused__));
was_last_ref = sv->Unref(); was_last_ref = sv->Unref();
@ -668,7 +668,7 @@ ColumnFamilyData::~ColumnFamilyData() {
} }
} }
bool ColumnFamilyData::UnrefAndTryDelete(SuperVersion* sv_under_cleanup) { bool ColumnFamilyData::UnrefAndTryDelete() {
int old_refs = refs_.fetch_sub(1); int old_refs = refs_.fetch_sub(1);
assert(old_refs > 0); assert(old_refs > 0);
@ -678,22 +678,17 @@ bool ColumnFamilyData::UnrefAndTryDelete(SuperVersion* sv_under_cleanup) {
return true; return true;
} }
// If called under SuperVersion::Cleanup, we should not re-enter Cleanup on if (old_refs == 2 && super_version_ != nullptr) {
// the same SuperVersion. (But while installing a new SuperVersion, this
// cfd could be referenced only by two SuperVersions.)
if (old_refs == 2 && super_version_ != nullptr &&
super_version_ != sv_under_cleanup) {
// Only the super_version_ holds me // Only the super_version_ holds me
SuperVersion* sv = super_version_; SuperVersion* sv = super_version_;
super_version_ = nullptr; super_version_ = nullptr;
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex. // Release SuperVersion references kept in ThreadLocalPtr.
sv->db_mutex->Unlock();
local_sv_.reset(); local_sv_.reset();
sv->db_mutex->Lock();
if (sv->Unref()) { if (sv->Unref()) {
// May delete this ColumnFamilyData after calling Cleanup() // Note: sv will delete this ColumnFamilyData during Cleanup()
assert(sv->cfd == this);
sv->Cleanup(); sv->Cleanup();
delete sv; delete sv;
return true; return true;
@ -1261,14 +1256,13 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
void ColumnFamilyData::InstallSuperVersion( void ColumnFamilyData::InstallSuperVersion(
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) { SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld(); db_mutex->AssertHeld();
return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_); return InstallSuperVersion(sv_context, mutable_cf_options_);
} }
void ColumnFamilyData::InstallSuperVersion( void ColumnFamilyData::InstallSuperVersion(
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options) {
SuperVersion* new_superversion = sv_context->new_superversion.release(); SuperVersion* new_superversion = sv_context->new_superversion.release();
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options; new_superversion->mutable_cf_options = mutable_cf_options;
new_superversion->Init(this, mem_, imm_.current(), current_); new_superversion->Init(this, mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_; SuperVersion* old_superversion = super_version_;

View File

@ -208,8 +208,6 @@ struct SuperVersion {
uint64_t version_number; uint64_t version_number;
WriteStallCondition write_stall_condition; WriteStallCondition write_stall_condition;
InstrumentedMutex* db_mutex;
// should be called outside the mutex // should be called outside the mutex
SuperVersion() = default; SuperVersion() = default;
~SuperVersion(); ~SuperVersion();
@ -281,8 +279,7 @@ class ColumnFamilyData {
// UnrefAndTryDelete() decreases the reference count and do free if needed, // UnrefAndTryDelete() decreases the reference count and do free if needed,
// return true if this is freed else false, UnrefAndTryDelete() can only // return true if this is freed else false, UnrefAndTryDelete() can only
// be called while holding a DB mutex, or during single-threaded recovery. // be called while holding a DB mutex, or during single-threaded recovery.
// sv_under_cleanup is only provided when called from SuperVersion::Cleanup. bool UnrefAndTryDelete();
bool UnrefAndTryDelete(SuperVersion* sv_under_cleanup = nullptr);
// SetDropped() can only be called under following conditions: // SetDropped() can only be called under following conditions:
// 1) Holding a DB mutex, // 1) Holding a DB mutex,
@ -454,7 +451,6 @@ class ColumnFamilyData {
// the clients to allocate SuperVersion outside of mutex. // the clients to allocate SuperVersion outside of mutex.
// IMPORTANT: Only call this from DBImpl::InstallSuperVersion() // IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
void InstallSuperVersion(SuperVersionContext* sv_context, void InstallSuperVersion(SuperVersionContext* sv_context,
InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options); const MutableCFOptions& mutable_cf_options);
void InstallSuperVersion(SuperVersionContext* sv_context, void InstallSuperVersion(SuperVersionContext* sv_context,
InstrumentedMutex* db_mutex); InstrumentedMutex* db_mutex);

View File

@ -1362,6 +1362,28 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
} }
} }
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFUnsorted) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"one", "two"}, options);
ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Put(2, "baz", "xyz"));
ASSERT_OK(Put(1, "abc", "def"));
// Note: keys for the same CF do not form a consecutive range
std::vector<int> cfs{1, 2, 1};
std::vector<std::string> keys{"foo", "baz", "abc"};
std::vector<std::string> values;
values =
MultiGet(cfs, keys, /* snapshot */ nullptr, /* batched */ GetParam());
ASSERT_EQ(values.size(), 3);
ASSERT_EQ(values[0], "bar");
ASSERT_EQ(values[1], "xyz");
ASSERT_EQ(values[2], "def");
}
INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam, INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
testing::Bool()); testing::Bool());

View File

@ -2276,20 +2276,18 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
multiget_cf_data; multiget_cf_data;
size_t cf_start = 0; size_t cf_start = 0;
ColumnFamilyHandle* cf = sorted_keys[0]->column_family; ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
KeyContext* key_ctx = sorted_keys[i]; KeyContext* key_ctx = sorted_keys[i];
if (key_ctx->column_family != cf) { if (key_ctx->column_family != cf) {
multiget_cf_data.emplace_back( multiget_cf_data.emplace_back(cf, cf_start, i - cf_start, nullptr);
MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
cf_start = i; cf_start = i;
cf = key_ctx->column_family; cf = key_ctx->column_family;
} }
} }
{
// multiget_cf_data.emplace_back(
// MultiGetColumnFamilyData(cf, cf_start, num_keys - cf_start, nullptr));
multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr); multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
}
std::function<MultiGetColumnFamilyData*( std::function<MultiGetColumnFamilyData*(
autovector<MultiGetColumnFamilyData, autovector<MultiGetColumnFamilyData,
MultiGetContext::MAX_BATCH_SIZE>::iterator&)> MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
@ -2349,7 +2347,7 @@ struct CompareKeyContext {
static_cast<ColumnFamilyHandleImpl*>(lhs->column_family); static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
uint32_t cfd_id1 = cfh->cfd()->GetID(); uint32_t cfd_id1 = cfh->cfd()->GetID();
const Comparator* comparator = cfh->cfd()->user_comparator(); const Comparator* comparator = cfh->cfd()->user_comparator();
cfh = static_cast<ColumnFamilyHandleImpl*>(lhs->column_family); cfh = static_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
uint32_t cfd_id2 = cfh->cfd()->GetID(); uint32_t cfd_id2 = cfh->cfd()->GetID();
if (cfd_id1 < cfd_id2) { if (cfd_id1 < cfd_id2) {
@ -2373,39 +2371,24 @@ struct CompareKeyContext {
void DBImpl::PrepareMultiGetKeys( void DBImpl::PrepareMultiGetKeys(
size_t num_keys, bool sorted_input, size_t num_keys, bool sorted_input,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) { autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
#ifndef NDEBUG
if (sorted_input) { if (sorted_input) {
for (size_t index = 0; index < sorted_keys->size(); ++index) { #ifndef NDEBUG
if (index > 0) { CompareKeyContext key_context_less;
KeyContext* lhs = (*sorted_keys)[index - 1];
KeyContext* rhs = (*sorted_keys)[index];
ColumnFamilyHandleImpl* cfh =
static_cast_with_check<ColumnFamilyHandleImpl>(lhs->column_family);
uint32_t cfd_id1 = cfh->cfd()->GetID();
const Comparator* comparator = cfh->cfd()->user_comparator();
cfh =
static_cast_with_check<ColumnFamilyHandleImpl>(lhs->column_family);
uint32_t cfd_id2 = cfh->cfd()->GetID();
assert(cfd_id1 <= cfd_id2); for (size_t index = 1; index < sorted_keys->size(); ++index) {
if (cfd_id1 < cfd_id2) { const KeyContext* const lhs = (*sorted_keys)[index - 1];
continue; const KeyContext* const rhs = (*sorted_keys)[index];
}
// Both keys are from the same column family // lhs should be <= rhs, or in other words, rhs should NOT be < lhs
int cmp = comparator->CompareWithoutTimestamp( assert(!key_context_less(rhs, lhs));
*(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
assert(cmp <= 0);
}
index++;
}
} }
#endif #endif
if (!sorted_input) {
CompareKeyContext sort_comparator; return;
std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
sort_comparator);
} }
std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
CompareKeyContext());
} }
void DBImpl::MultiGet(const ReadOptions& read_options, void DBImpl::MultiGet(const ReadOptions& read_options,

View File

@ -590,6 +590,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<const autovector<MemTable*>*> mems_list; autovector<const autovector<MemTable*>*> mems_list;
autovector<const MutableCFOptions*> mutable_cf_options_list; autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta; autovector<FileMetaData*> tmp_file_meta;
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
committed_flush_jobs_info;
for (int i = 0; i != num_cfs; ++i) { for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i]->GetMemTables(); const auto& mems = jobs[i]->GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) { if (!cfds[i]->IsDropped() && !mems.empty()) {
@ -597,13 +599,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
mems_list.emplace_back(&mems); mems_list.emplace_back(&mems);
mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]); mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
tmp_file_meta.emplace_back(&file_meta[i]); tmp_file_meta.emplace_back(&file_meta[i]);
#ifndef ROCKSDB_LITE
committed_flush_jobs_info.emplace_back(
jobs[i]->GetCommittedFlushJobsInfo());
#endif //! ROCKSDB_LITE
} }
} }
s = InstallMemtableAtomicFlushResults( s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list, nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta, versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer); committed_flush_jobs_info, &job_context->memtables_to_free,
directories_.GetDbDir(), log_buffer);
} }
if (s.ok()) { if (s.ok()) {
@ -3467,7 +3474,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
if (UNLIKELY(sv_context->new_superversion == nullptr)) { if (UNLIKELY(sv_context->new_superversion == nullptr)) {
sv_context->NewSuperVersion(); sv_context->NewSuperVersion();
} }
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options); cfd->InstallSuperVersion(sv_context, mutable_cf_options);
// There may be a small data race here. The snapshot tricking bottommost // There may be a small data race here. The snapshot tricking bottommost
// compaction may already be released here. But assuming there will always be // compaction may already be released here. But assuming there will always be

View File

@ -675,6 +675,14 @@ class SpecialEnv : public EnvWrapper {
} }
} }
Status RenameFile(const std::string& src, const std::string& dest) override {
rename_count_.fetch_add(1);
if (rename_error_.load(std::memory_order_acquire)) {
return Status::NotSupported("Simulated `RenameFile()` error.");
}
return target()->RenameFile(src, dest);
}
// Something to return when mocking current time // Something to return when mocking current time
const int64_t maybe_starting_time_; const int64_t maybe_starting_time_;
@ -702,6 +710,9 @@ class SpecialEnv : public EnvWrapper {
// Force write to log files to fail while this pointer is non-nullptr // Force write to log files to fail while this pointer is non-nullptr
std::atomic<bool> log_write_error_; std::atomic<bool> log_write_error_;
// Force `RenameFile()` to fail while this pointer is non-nullptr
std::atomic<bool> rename_error_{false};
// Slow down every log write, in micro-seconds. // Slow down every log write, in micro-seconds.
std::atomic<int> log_write_slowdown_; std::atomic<int> log_write_slowdown_;
@ -745,6 +756,8 @@ class SpecialEnv : public EnvWrapper {
std::atomic<int> delete_count_; std::atomic<int> delete_count_;
std::atomic<int> rename_count_{0};
std::atomic<bool> is_wal_sync_thread_safe_{true}; std::atomic<bool> is_wal_sync_thread_safe_{true};
std::atomic<size_t> compaction_readahead_size_{}; std::atomic<size_t> compaction_readahead_size_{};

View File

@ -418,12 +418,19 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
for (auto cfd : all_cfds) { for (auto cfd : all_cfds) {
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
} }
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
committed_flush_jobs_info;
#ifndef ROCKSDB_LITE
for (auto& job : flush_jobs) {
committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
}
#endif //! ROCKSDB_LITE
Status s = InstallMemtableAtomicFlushResults( Status s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list, nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs, versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs,
&job_context.memtables_to_free, nullptr /* db_directory */, committed_flush_jobs_info, &job_context.memtables_to_free,
nullptr /* log_buffer */); nullptr /* db_directory */, nullptr /* log_buffer */);
ASSERT_OK(s); ASSERT_OK(s);
mutex_.Unlock(); mutex_.Unlock();

View File

@ -356,13 +356,17 @@ TEST_F(EventListenerTest, MultiCF) {
#ifdef ROCKSDB_USING_THREAD_STATUS #ifdef ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true; options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
for (auto atomic_flush : {false, true}) {
options.atomic_flush = atomic_flush;
options.create_if_missing = true;
DestroyAndReopen(options);
TestFlushListener* listener = new TestFlushListener(options.env, this); TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
options.table_properties_collector_factories.push_back( options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>()); std::make_shared<TestPropertiesCollectorFactory>());
std::vector<std::string> cf_names = { std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
"pikachu", "ilya", "muromec", "dobrynia", "dobrynia", "nikitich", "alyosha",
"nikitich", "alyosha", "popovich"}; "popovich"};
CreateAndReopenWithCF(cf_names, options); CreateAndReopenWithCF(cf_names, options);
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p'))); ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
@ -383,6 +387,8 @@ TEST_F(EventListenerTest, MultiCF) {
ASSERT_EQ(listener->flushed_dbs_[i], db_); ASSERT_EQ(listener->flushed_dbs_[i], db_);
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]); ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
} }
Close();
}
} }
TEST_F(EventListenerTest, MultiDBMultiListeners) { TEST_F(EventListenerTest, MultiDBMultiListeners) {

View File

@ -736,6 +736,8 @@ Status InstallMemtableAtomicFlushResults(
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset, const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_metas, const autovector<FileMetaData*>& file_metas,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer) { LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
@ -765,6 +767,17 @@ Status InstallMemtableAtomicFlushResults(
(*mems_list[k])[i]->SetFlushCompleted(true); (*mems_list[k])[i]->SetFlushCompleted(true);
(*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber()); (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
} }
#ifndef ROCKSDB_LITE
if (committed_flush_jobs_info[k]) {
assert(!mems_list[k]->empty());
assert((*mems_list[k])[0]);
std::unique_ptr<FlushJobInfo> flush_job_info =
(*mems_list[k])[0]->ReleaseFlushJobInfo();
committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
}
#else //! ROCKSDB_LITE
(void)committed_flush_jobs_info;
#endif // ROCKSDB_LITE
} }
Status s; Status s;

View File

@ -140,6 +140,8 @@ class MemTableListVersion {
const autovector<const autovector<MemTable*>*>& mems_list, const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, LogsWithPrepTracker* prep_tracker, VersionSet* vset, LogsWithPrepTracker* prep_tracker,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta, InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer); LogBuffer* log_buffer);
@ -402,6 +404,8 @@ class MemTableList {
const autovector<const autovector<MemTable*>*>& mems_list, const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, LogsWithPrepTracker* prep_tracker, VersionSet* vset, LogsWithPrepTracker* prep_tracker,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta, InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer); LogBuffer* log_buffer);
@ -452,6 +456,8 @@ extern Status InstallMemtableAtomicFlushResults(
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset, const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta, const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory, autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer); LogBuffer* log_buffer);
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -182,12 +182,21 @@ class MemTableListTest : public testing::Test {
for (auto& meta : file_metas) { for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta); file_meta_ptrs.push_back(&meta);
} }
std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
committed_flush_jobs_info_storage(cf_ids.size());
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
committed_flush_jobs_info;
for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
committed_flush_jobs_info.push_back(
&committed_flush_jobs_info_storage[i]);
}
InstrumentedMutex mutex; InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex); InstrumentedMutexLock l(&mutex);
return InstallMemtableAtomicFlushResults( return InstallMemtableAtomicFlushResults(
&lists, cfds, mutable_cf_options_list, mems_list, &versions, &lists, cfds, mutable_cf_options_list, mems_list, &versions,
nullptr /* prep_tracker */, &mutex, file_meta_ptrs, to_delete, nullptr, nullptr /* prep_tracker */, &mutex, file_meta_ptrs,
&log_buffer); committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
} }
}; };

View File

@ -9,10 +9,12 @@
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <mutex>
#include <regex> #include <regex>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include "rocksdb/status.h" #include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -109,6 +111,8 @@ class ObjectLibrary {
// Adds the input entry to the list for the given type // Adds the input entry to the list for the given type
void AddEntry(const std::string& type, std::unique_ptr<Entry>& entry); void AddEntry(const std::string& type, std::unique_ptr<Entry>& entry);
// Protects the entry map
mutable std::mutex mu_;
// ** FactoryFunctions for this loader, organized by type // ** FactoryFunctions for this loader, organized by type
std::unordered_map<std::string, std::vector<std::unique_ptr<Entry>>> entries_; std::unordered_map<std::string, std::vector<std::unique_ptr<Entry>>> entries_;

View File

@ -11,7 +11,7 @@
#define ROCKSDB_MAJOR 6 #define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 23 #define ROCKSDB_MINOR 23
#define ROCKSDB_PATCH 0 #define ROCKSDB_PATCH 3
// Do not use these. We made the mistake of declaring macros starting with // Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these // double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -296,12 +296,19 @@ Status CreateLoggerFromOptions(const std::string& dbname,
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
// Open a log file in the same directory as the db // Open a log file in the same directory as the db
env->RenameFile( s = env->FileExists(fname);
if (s.ok()) {
s = env->RenameFile(
fname, OldInfoLogFileName(dbname, clock->NowMicros(), db_absolute_path, fname, OldInfoLogFileName(dbname, clock->NowMicros(), db_absolute_path,
options.db_log_dir)) options.db_log_dir));
.PermitUncheckedError(); } else if (s.IsNotFound()) {
// "LOG" is not required to exist since this could be a new DB.
s = Status::OK();
}
if (s.ok()) {
s = env->NewLogger(fname, logger); s = env->NewLogger(fname, logger);
if (logger->get() != nullptr) { }
if (s.ok() && logger->get() != nullptr) {
(*logger)->SetInfoLogLevel(options.info_log_level); (*logger)->SetInfoLogLevel(options.info_log_level);
} }
return s; return s;

View File

@ -19,6 +19,7 @@
#include <thread> #include <thread>
#include <vector> #include <vector>
#include "db/db_test_util.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -686,6 +687,50 @@ TEST_F(AutoRollLoggerTest, FileCreateFailure) {
ASSERT_NOK(CreateLoggerFromOptions("", options, &logger)); ASSERT_NOK(CreateLoggerFromOptions("", options, &logger));
ASSERT_TRUE(!logger); ASSERT_TRUE(!logger);
} }
TEST_F(AutoRollLoggerTest, RenameOnlyWhenExists) {
InitTestDb();
SpecialEnv env(Env::Default());
Options options;
options.env = &env;
// Originally no LOG exists. Should not see a rename.
{
std::shared_ptr<Logger> logger;
ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger));
ASSERT_EQ(0, env.rename_count_);
}
// Now a LOG exists. Create a new one should see a rename.
{
std::shared_ptr<Logger> logger;
ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger));
ASSERT_EQ(1, env.rename_count_);
}
}
TEST_F(AutoRollLoggerTest, RenameError) {
InitTestDb();
SpecialEnv env(Env::Default());
env.rename_error_ = true;
Options options;
options.env = &env;
// Originally no LOG exists. Should not be impacted by rename error.
{
std::shared_ptr<Logger> logger;
ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger));
ASSERT_TRUE(logger != nullptr);
}
// Now a LOG exists. Rename error should cause failure.
{
std::shared_ptr<Logger> logger;
ASSERT_NOK(CreateLoggerFromOptions(kTestDir, options, &logger));
ASSERT_TRUE(logger == nullptr);
}
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -1224,8 +1224,10 @@ inline bool LZ4HC_Compress(const CompressionInfo& info,
const char* compression_dict_data = const char* compression_dict_data =
compression_dict.size() > 0 ? compression_dict.data() : nullptr; compression_dict.size() > 0 ? compression_dict.data() : nullptr;
size_t compression_dict_size = compression_dict.size(); size_t compression_dict_size = compression_dict.size();
if (compression_dict_data != nullptr) {
LZ4_loadDictHC(stream, compression_dict_data, LZ4_loadDictHC(stream, compression_dict_data,
static_cast<int>(compression_dict_size)); static_cast<int>(compression_dict_size));
}
#if LZ4_VERSION_NUMBER >= 10700 // r129+ #if LZ4_VERSION_NUMBER >= 10700 // r129+
outlen = outlen =

View File

@ -15,6 +15,7 @@ namespace ROCKSDB_NAMESPACE {
// Otherwise, nullptr is returned // Otherwise, nullptr is returned
const ObjectLibrary::Entry *ObjectLibrary::FindEntry( const ObjectLibrary::Entry *ObjectLibrary::FindEntry(
const std::string &type, const std::string &name) const { const std::string &type, const std::string &name) const {
std::unique_lock<std::mutex> lock(mu_);
auto entries = entries_.find(type); auto entries = entries_.find(type);
if (entries != entries_.end()) { if (entries != entries_.end()) {
for (const auto &entry : entries->second) { for (const auto &entry : entries->second) {
@ -28,11 +29,13 @@ const ObjectLibrary::Entry *ObjectLibrary::FindEntry(
void ObjectLibrary::AddEntry(const std::string &type, void ObjectLibrary::AddEntry(const std::string &type,
std::unique_ptr<Entry> &entry) { std::unique_ptr<Entry> &entry) {
std::unique_lock<std::mutex> lock(mu_);
auto &entries = entries_[type]; auto &entries = entries_[type];
entries.emplace_back(std::move(entry)); entries.emplace_back(std::move(entry));
} }
size_t ObjectLibrary::GetFactoryCount(size_t *types) const { size_t ObjectLibrary::GetFactoryCount(size_t *types) const {
std::unique_lock<std::mutex> lock(mu_);
*types = entries_.size(); *types = entries_.size();
size_t factories = 0; size_t factories = 0;
for (const auto &e : entries_) { for (const auto &e : entries_) {
@ -42,6 +45,7 @@ size_t ObjectLibrary::GetFactoryCount(size_t *types) const {
} }
void ObjectLibrary::Dump(Logger *logger) const { void ObjectLibrary::Dump(Logger *logger) const {
std::unique_lock<std::mutex> lock(mu_);
for (const auto &iter : entries_) { for (const auto &iter : entries_) {
ROCKS_LOG_HEADER(logger, " Registered factories for type[%s] ", ROCKS_LOG_HEADER(logger, " Registered factories for type[%s] ",
iter.first.c_str()); iter.first.c_str());