Summary: 1. Add override keyword to overridden virtual functions in EventListener 2. Fix a memory corruption that can happen during DB shutdown when in read-only mode due to a background write error 3. Fix uninitialized buffers in error_handler_test.cc that cause valgrind to complain Pull Request resolved: https://github.com/facebook/rocksdb/pull/4375 Differential Revision: D9875779 Pulled By: anand1976 fbshipit-source-id: 022ede1edc01a9f7e21ecf4c61ef7d46545d0640
This commit is contained in:
parent
8c25204633
commit
30c21df97c
@ -216,6 +216,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
|
||||
// as well.
|
||||
use_custom_gc_(seq_per_batch),
|
||||
shutdown_initiated_(false),
|
||||
own_sfm_(options.sst_file_manager == nullptr),
|
||||
preserve_deletes_(options.preserve_deletes),
|
||||
closed_(false),
|
||||
error_handler_(this, immutable_db_options_, &mutex_) {
|
||||
@ -520,6 +521,17 @@ Status DBImpl::CloseHelper() {
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
|
||||
LogFlush(immutable_db_options_.info_log);
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
// If the sst_file_manager was allocated by us during DB::Open(), ccall
|
||||
// Close() on it before closing the info_log. Otherwise, background thread
|
||||
// in SstFileManagerImpl might try to log something
|
||||
if (immutable_db_options_.sst_file_manager && own_sfm_) {
|
||||
auto sfm = static_cast<SstFileManagerImpl*>(
|
||||
immutable_db_options_.sst_file_manager.get());
|
||||
sfm->Close();
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
if (immutable_db_options_.info_log && own_info_log_) {
|
||||
Status s = immutable_db_options_.info_log->Close();
|
||||
if (ret.ok()) {
|
||||
|
@ -1541,6 +1541,9 @@ class DBImpl : public DB {
|
||||
// is set a little later during the shutdown after scheduling memtable
|
||||
// flushes
|
||||
bool shutdown_initiated_;
|
||||
// Flag to indicate whether sst_file_manager object was allocated in
|
||||
// DB::Open() or passed to us
|
||||
bool own_sfm_;
|
||||
|
||||
// Clients must periodically call SetPreserveDeletesSequenceNumber()
|
||||
// to advance this seqnum. Default value is 0 which means ALL deletes are
|
||||
|
@ -48,7 +48,8 @@ class ErrorHandlerListener : public EventListener {
|
||||
file_count_(0),
|
||||
fault_env_(nullptr) {}
|
||||
|
||||
void OnTableFileCreationStarted(const TableFileCreationBriefInfo& /*ti*/) {
|
||||
void OnTableFileCreationStarted(
|
||||
const TableFileCreationBriefInfo& /*ti*/) override {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
file_creation_started_ = true;
|
||||
if (file_count_ > 0) {
|
||||
@ -61,13 +62,14 @@ class ErrorHandlerListener : public EventListener {
|
||||
}
|
||||
|
||||
void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
|
||||
Status /*bg_error*/, bool* auto_recovery) {
|
||||
Status /*bg_error*/,
|
||||
bool* auto_recovery) override {
|
||||
if (*auto_recovery && no_auto_recovery_) {
|
||||
*auto_recovery = false;
|
||||
}
|
||||
}
|
||||
|
||||
void OnErrorRecoveryCompleted(Status /*old_bg_error*/) {
|
||||
void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
recovery_complete_ = true;
|
||||
cv_.SignalAll();
|
||||
@ -237,7 +239,6 @@ TEST_F(DBErrorHandlingTest, CorruptionError) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
#ifndef TRAVIS
|
||||
TEST_F(DBErrorHandlingTest, AutoRecoverFlushError) {
|
||||
std::unique_ptr<FaultInjectionTestEnv> fault_env(
|
||||
new FaultInjectionTestEnv(Env::Default()));
|
||||
@ -307,17 +308,16 @@ TEST_F(DBErrorHandlingTest, WALWriteError) {
|
||||
options.env = fault_env.get();
|
||||
options.listeners.emplace_back(listener);
|
||||
Status s;
|
||||
Random rnd(301);
|
||||
|
||||
listener->EnableAutoRecovery();
|
||||
DestroyAndReopen(options);
|
||||
|
||||
{
|
||||
WriteBatch batch;
|
||||
char val[1024];
|
||||
|
||||
for (auto i = 0; i<100; ++i) {
|
||||
sprintf(val, "%d", i);
|
||||
batch.Put(Key(i), Slice(val, sizeof(val)));
|
||||
batch.Put(Key(i), RandomString(&rnd, 1024));
|
||||
}
|
||||
|
||||
WriteOptions wopts;
|
||||
@ -327,12 +327,10 @@ TEST_F(DBErrorHandlingTest, WALWriteError) {
|
||||
|
||||
{
|
||||
WriteBatch batch;
|
||||
char val[1024];
|
||||
int write_error = 0;
|
||||
|
||||
for (auto i = 100; i<199; ++i) {
|
||||
sprintf(val, "%d", i);
|
||||
batch.Put(Key(i), Slice(val, sizeof(val)));
|
||||
batch.Put(Key(i), RandomString(&rnd, 1024));
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack("WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
|
||||
@ -378,18 +376,17 @@ TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) {
|
||||
options.env = fault_env.get();
|
||||
options.listeners.emplace_back(listener);
|
||||
Status s;
|
||||
Random rnd(301);
|
||||
|
||||
listener->EnableAutoRecovery();
|
||||
CreateAndReopenWithCF({"one", "two", "three"}, options);
|
||||
|
||||
{
|
||||
WriteBatch batch;
|
||||
char val[1024];
|
||||
|
||||
for (auto i = 1; i < 4; ++i) {
|
||||
for (auto j = 0; j < 100; ++j) {
|
||||
sprintf(val, "%d", j);
|
||||
batch.Put(handles_[i], Key(j), Slice(val, sizeof(val)));
|
||||
batch.Put(handles_[i], Key(j), RandomString(&rnd, 1024));
|
||||
}
|
||||
}
|
||||
|
||||
@ -400,13 +397,11 @@ TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) {
|
||||
|
||||
{
|
||||
WriteBatch batch;
|
||||
char val[1024];
|
||||
int write_error = 0;
|
||||
|
||||
// Write to one CF
|
||||
for (auto i = 100; i < 199; ++i) {
|
||||
sprintf(val, "%d", i);
|
||||
batch.Put(handles_[2], Key(i), Slice(val, sizeof(val)));
|
||||
batch.Put(handles_[2], Key(i), RandomString(&rnd, 1024));
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
@ -462,6 +457,7 @@ TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {
|
||||
std::vector<DB*> db;
|
||||
std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
|
||||
int kNumDbInstances = 3;
|
||||
Random rnd(301);
|
||||
|
||||
for (auto i = 0; i < kNumDbInstances; ++i) {
|
||||
listener.emplace_back(new ErrorHandlerListener());
|
||||
@ -489,11 +485,9 @@ TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {
|
||||
|
||||
for (auto i = 0; i < kNumDbInstances; ++i) {
|
||||
WriteBatch batch;
|
||||
char val[1024];
|
||||
|
||||
for (auto j = 0; j <= 100; ++j) {
|
||||
sprintf(val, "%d", j);
|
||||
batch.Put(Key(j), Slice(val, sizeof(val)));
|
||||
batch.Put(Key(j), RandomString(&rnd, 1024));
|
||||
}
|
||||
|
||||
WriteOptions wopts;
|
||||
@ -505,12 +499,10 @@ TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {
|
||||
def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
|
||||
for (auto i = 0; i < kNumDbInstances; ++i) {
|
||||
WriteBatch batch;
|
||||
char val[1024];
|
||||
|
||||
// Write to one CF
|
||||
for (auto j = 100; j < 199; ++j) {
|
||||
sprintf(val, "%d", j);
|
||||
batch.Put(Key(j), Slice(val, sizeof(val)));
|
||||
batch.Put(Key(j), RandomString(&rnd, 1024));
|
||||
}
|
||||
|
||||
WriteOptions wopts;
|
||||
@ -561,6 +553,7 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
|
||||
std::vector<DB*> db;
|
||||
std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
|
||||
int kNumDbInstances = 3;
|
||||
Random rnd(301);
|
||||
|
||||
for (auto i = 0; i < kNumDbInstances; ++i) {
|
||||
listener.emplace_back(new ErrorHandlerListener());
|
||||
@ -600,11 +593,9 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
|
||||
|
||||
for (auto i = 0; i < kNumDbInstances; ++i) {
|
||||
WriteBatch batch;
|
||||
char val[1024];
|
||||
|
||||
for (auto j = 0; j <= 100; ++j) {
|
||||
sprintf(val, "%d", j);
|
||||
batch.Put(Key(j), Slice(val, sizeof(val)));
|
||||
batch.Put(Key(j), RandomString(&rnd, 1024));
|
||||
}
|
||||
|
||||
WriteOptions wopts;
|
||||
@ -616,12 +607,10 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
|
||||
def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
|
||||
for (auto i = 0; i < kNumDbInstances; ++i) {
|
||||
WriteBatch batch;
|
||||
char val[1024];
|
||||
|
||||
// Write to one CF
|
||||
for (auto j = 100; j < 199; ++j) {
|
||||
sprintf(val, "%d", j);
|
||||
batch.Put(Key(j), Slice(val, sizeof(val)));
|
||||
batch.Put(Key(j), RandomString(&rnd, 1024));
|
||||
}
|
||||
|
||||
WriteOptions wopts;
|
||||
@ -682,7 +671,6 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
|
||||
options.clear();
|
||||
delete def_env;
|
||||
}
|
||||
#endif
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
|
@ -39,8 +39,15 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<Logger> logger,
|
||||
}
|
||||
|
||||
SstFileManagerImpl::~SstFileManagerImpl() {
|
||||
Close();
|
||||
}
|
||||
|
||||
void SstFileManagerImpl::Close() {
|
||||
{
|
||||
MutexLock l(&mu_);
|
||||
if (closing_) {
|
||||
return;
|
||||
}
|
||||
closing_ = true;
|
||||
cv_.SignalAll();
|
||||
}
|
||||
|
@ -121,6 +121,10 @@ class SstFileManagerImpl : public SstFileManager {
|
||||
|
||||
DeleteScheduler* delete_scheduler() { return &delete_scheduler_; }
|
||||
|
||||
// Stop the error recovery background thread. This should be called only
|
||||
// once in the object's lifetime, and before the destructor
|
||||
void Close();
|
||||
|
||||
private:
|
||||
// REQUIRES: mutex locked
|
||||
void OnAddFileImpl(const std::string& file_path, uint64_t file_size,
|
||||
|
Loading…
Reference in New Issue
Block a user