Add a Close() method to DB to return status when closing a db

Summary:
Currently, the only way to close an open DB is to destroy the DB
object. There is no way for the caller to know the status. In one
instance, the destructor encountered an error due to failure to
close a log file on HDFS. In order to prevent silent failures, we add
DB::Close() that calls CloseImpl() which must be implemented by its
descendants.
The main failure point in the destructor is closing the log file. This
patch also adds a Close() entry point to Logger in order to get status.
When DBOptions::info_log is allocated and owned by the DBImpl, it is
explicitly closed by DBImpl::CloseImpl().
Closes https://github.com/facebook/rocksdb/pull/3348

Differential Revision: D6698158

Pulled By: anand1976

fbshipit-source-id: 9468e2892553eb09c4c41b8723f590c0dbd8ab7d
This commit is contained in:
Anand Ananthabhotla 2018-01-16 10:57:56 -08:00 committed by Facebook Github Bot
parent 68829ed89c
commit d0f1b49ab6
12 changed files with 174 additions and 14 deletions

View File

@ -847,6 +847,55 @@ TEST_F(DBBasicTest, MmapAndBufferOptions) {
} }
#endif #endif
class TestEnv : public EnvWrapper {
public:
explicit TestEnv(Env* base) : EnvWrapper(base) { };
class TestLogger : public Logger {
public:
using Logger::Logv;
virtual void Logv(const char *format, va_list ap) override { };
private:
virtual Status CloseImpl() override {
return Status::NotSupported();
}
};
virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) {
result->reset(new TestLogger());
return Status::OK();
}
};
TEST_F(DBBasicTest, DBClose) {
Options options = GetDefaultOptions();
std::string dbname = test::TmpDir(env_) + "/db_close_test";
ASSERT_OK(DestroyDB(dbname, options));
DB* db = nullptr;
options.create_if_missing = true;
options.env = new TestEnv(Env::Default());
Status s = DB::Open(options, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != nullptr);
s = db->Close();
ASSERT_EQ(s, Status::NotSupported());
delete db;
// Provide our own logger and ensure DB::Close() does not close it
options.info_log.reset(new TestEnv::TestLogger());
options.create_if_missing = false;
s = DB::Open(options, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != nullptr);
s = db->Close();
ASSERT_EQ(s, Status::OK());
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -141,6 +141,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
const bool seq_per_batch) const bool seq_per_batch)
: env_(options.env), : env_(options.env),
dbname_(dbname), dbname_(dbname),
own_info_log_(options.info_log == nullptr),
initial_db_options_(SanitizeOptions(dbname, options)), initial_db_options_(SanitizeOptions(dbname, options)),
immutable_db_options_(initial_db_options_), immutable_db_options_(initial_db_options_),
mutable_db_options_(initial_db_options_), mutable_db_options_(initial_db_options_),
@ -212,7 +213,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
// requires a custom gc for compaction, we use that to set use_custom_gc_ // requires a custom gc for compaction, we use that to set use_custom_gc_
// as well. // as well.
use_custom_gc_(seq_per_batch), use_custom_gc_(seq_per_batch),
preserve_deletes_(options.preserve_deletes) { preserve_deletes_(options.preserve_deletes),
closed_(false) {
env_->GetAbsolutePath(dbname, &db_absolute_path_); env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache. // Reserve ten files or so for other uses and give the rest to TableCache.
@ -275,7 +277,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
} }
} }
DBImpl::~DBImpl() { Status DBImpl::CloseImpl() {
// CancelAllBackgroundWork called with false means we just set the shutdown // CancelAllBackgroundWork called with false means we just set the shutdown
// marker. After this we do a variant of the waiting and unschedule work // marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true)) // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
@ -378,8 +380,16 @@ DBImpl::~DBImpl() {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete"); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
LogFlush(immutable_db_options_.info_log); LogFlush(immutable_db_options_.info_log);
Status s = Status::OK();
if (immutable_db_options_.info_log && own_info_log_) {
s = immutable_db_options_.info_log->Close();
}
return s;
} }
DBImpl::~DBImpl() { Close(); }
void DBImpl::MaybeIgnoreError(Status* s) const { void DBImpl::MaybeIgnoreError(Status* s) const {
if (s->ok() || immutable_db_options_.paranoid_checks) { if (s->ok() || immutable_db_options_.paranoid_checks) {
// No change needed // No change needed
@ -2320,7 +2330,15 @@ Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
return Status::OK(); return Status::OK();
} }
DB::~DB() { } DB::~DB() {}
Status DBImpl::Close() {
if (!closed_) {
closed_ = true;
return CloseImpl();
}
return Status::OK();
}
Status DB::ListColumnFamilies(const DBOptions& db_options, Status DB::ListColumnFamilies(const DBOptions& db_options,
const std::string& name, const std::string& name,

View File

@ -611,10 +611,14 @@ class DBImpl : public DB {
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr, std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch); const bool seq_per_batch);
virtual Status Close() override;
protected: protected:
Env* const env_; Env* const env_;
const std::string dbname_; const std::string dbname_;
unique_ptr<VersionSet> versions_; unique_ptr<VersionSet> versions_;
// Flag to check whether we allocated and own the info log file
bool own_info_log_;
const DBOptions initial_db_options_; const DBOptions initial_db_options_;
const ImmutableDBOptions immutable_db_options_; const ImmutableDBOptions immutable_db_options_;
MutableDBOptions mutable_db_options_; MutableDBOptions mutable_db_options_;
@ -912,6 +916,9 @@ class DBImpl : public DB {
uint64_t GetMaxTotalWalSize() const; uint64_t GetMaxTotalWalSize() const;
// Actual implementation of Close()
virtual Status CloseImpl();
// table_cache_ provides its own synchronization // table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_; std::shared_ptr<Cache> table_cache_;
@ -1359,6 +1366,9 @@ class DBImpl : public DB {
// is set to false. // is set to false.
std::atomic<SequenceNumber> preserve_deletes_seqnum_; std::atomic<SequenceNumber> preserve_deletes_seqnum_;
const bool preserve_deletes_; const bool preserve_deletes_;
// Flag to check whether Close() has been called on this DB
bool closed_;
}; };
extern Options SanitizeOptions(const std::string& db, extern Options SanitizeOptions(const std::string& db,

View File

@ -2191,6 +2191,8 @@ class ModelDB : public DB {
batch.Put(cf, k, v); batch.Put(cf, k, v);
return Write(o, &batch); return Write(o, &batch);
} }
using DB::Close;
virtual Status Close() { return Status::OK(); }
using DB::Delete; using DB::Delete;
virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf, virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf,
const Slice& key) override { const Slice& key) override {

11
env/env.cc vendored
View File

@ -73,9 +73,18 @@ RandomAccessFile::~RandomAccessFile() {
WritableFile::~WritableFile() { WritableFile::~WritableFile() {
} }
Logger::~Logger() { Logger::~Logger() { Close(); }
Status Logger::Close() {
if (!closed_) {
closed_ = true;
return CloseImpl();
}
return Status::OK();
} }
Status Logger::CloseImpl() { return Status::OK(); }
FileLock::~FileLock() { FileLock::~FileLock() {
} }

16
env/env_hdfs.cc vendored
View File

@ -277,6 +277,16 @@ class HdfsLogger : public Logger {
HdfsWritableFile* file_; HdfsWritableFile* file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread uint64_t (*gettid_)(); // Return the thread id for the current thread
virtual Status CloseImpl() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
file_->getName().c_str());
Status s = file_->Close();
if (mylog != nullptr && mylog == this) {
mylog = nullptr;
}
return s;
}
public: public:
HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
: file_(f), gettid_(gettid) { : file_(f), gettid_(gettid) {
@ -285,12 +295,6 @@ class HdfsLogger : public Logger {
} }
virtual ~HdfsLogger() { virtual ~HdfsLogger() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
file_->getName().c_str());
delete file_;
if (mylog != nullptr && mylog == this) {
mylog = nullptr;
}
} }
virtual void Logv(const char* format, va_list ap) { virtual void Logv(const char* format, va_list ap) {

14
env/posix_logger.h vendored
View File

@ -24,6 +24,7 @@
#endif #endif
#include <atomic> #include <atomic>
#include "env/io_posix.h"
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -32,6 +33,15 @@ namespace rocksdb {
class PosixLogger : public Logger { class PosixLogger : public Logger {
private: private:
virtual Status CloseImpl() override {
int ret;
ret = fclose(file_);
if (ret) {
return IOError("Unable to close log file", "", ret);
}
return Status::OK();
}
FILE* file_; FILE* file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread uint64_t (*gettid_)(); // Return the thread id for the current thread
std::atomic_size_t log_size_; std::atomic_size_t log_size_;
@ -51,9 +61,7 @@ class PosixLogger : public Logger {
last_flush_micros_(0), last_flush_micros_(0),
env_(env), env_(env),
flush_pending_(false) {} flush_pending_(false) {}
virtual ~PosixLogger() { virtual ~PosixLogger() { Close(); }
fclose(file_);
}
virtual void Flush() override { virtual void Flush() override {
TEST_SYNC_POINT("PosixLogger::Flush:Begin1"); TEST_SYNC_POINT("PosixLogger::Flush:Begin1");
TEST_SYNC_POINT("PosixLogger::Flush:Begin2"); TEST_SYNC_POINT("PosixLogger::Flush:Begin2");

View File

@ -163,6 +163,12 @@ class DB {
const std::vector<ColumnFamilyDescriptor>& column_families, const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr); std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
// Close the DB by releasing resources, closing files etc. This should be
// called before calling the desctructor so that the caller can get back a
// status in case there are any errors. Regardless of the return status, the
// DB must be freed
virtual Status Close() { return Status::OK(); }
// ListColumnFamilies will open the DB specified by argument name // ListColumnFamilies will open the DB specified by argument name
// and return the list of all column families in that DB // and return the list of all column families in that DB
// through column_families argument. The ordering of // through column_families argument. The ordering of

View File

@ -819,9 +819,12 @@ class Logger {
size_t kDoNotSupportGetLogFileSize = (std::numeric_limits<size_t>::max)(); size_t kDoNotSupportGetLogFileSize = (std::numeric_limits<size_t>::max)();
explicit Logger(const InfoLogLevel log_level = InfoLogLevel::INFO_LEVEL) explicit Logger(const InfoLogLevel log_level = InfoLogLevel::INFO_LEVEL)
: log_level_(log_level) {} : closed_(false), log_level_(log_level) {}
virtual ~Logger(); virtual ~Logger();
// Close the log file. Must be called before destructor
virtual Status Close();
// Write a header to the log file with the specified format // Write a header to the log file with the specified format
// It is recommended that you log all header information at the start of the // It is recommended that you log all header information at the start of the
// application. But it is not enforced. // application. But it is not enforced.
@ -852,6 +855,8 @@ class Logger {
// No copying allowed // No copying allowed
Logger(const Logger&); Logger(const Logger&);
void operator=(const Logger&); void operator=(const Logger&);
virtual Status CloseImpl();
bool closed_;
InfoLogLevel log_level_; InfoLogLevel log_level_;
}; };

View File

@ -25,6 +25,8 @@ class StackableDB : public DB {
delete db_; delete db_;
} }
virtual Status Close() override { return db_->Close(); }
virtual DB* GetBaseDB() { virtual DB* GetBaseDB() {
return db_; return db_;
} }

View File

@ -103,6 +103,14 @@ class AutoRollLogger : public Logger {
std::string ValistToString(const char* format, va_list args) const; std::string ValistToString(const char* format, va_list args) const;
// Write the logs marked as headers to the new log file // Write the logs marked as headers to the new log file
void WriteHeaderInfo(); void WriteHeaderInfo();
// Implementation of Close()
virtual Status CloseImpl() override {
if (logger_) {
return logger_->Close();
} else {
return Status::OK();
}
}
std::string log_fname_; // Current active info log's file name. std::string log_fname_; // Current active info log's file name.
std::string dbname_; std::string dbname_;

View File

@ -354,6 +354,45 @@ TEST_F(AutoRollLoggerTest, InfoLogLevel) {
inFile.close(); inFile.close();
} }
TEST_F(AutoRollLoggerTest, Close) {
InitTestDb();
size_t log_size = 8192;
size_t log_lines = 0;
AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0);
for (int log_level = InfoLogLevel::HEADER_LEVEL;
log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) {
logger.SetInfoLogLevel((InfoLogLevel)log_level);
for (int log_type = InfoLogLevel::DEBUG_LEVEL;
log_type <= InfoLogLevel::HEADER_LEVEL; log_type++) {
// log messages with log level smaller than log_level will not be
// logged.
LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str());
}
log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1;
}
for (int log_level = InfoLogLevel::HEADER_LEVEL;
log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) {
logger.SetInfoLogLevel((InfoLogLevel)log_level);
// again, messages with level smaller than log_level will not be logged.
ROCKS_LOG_HEADER(&logger, "%s", kSampleMessage.c_str());
ROCKS_LOG_DEBUG(&logger, "%s", kSampleMessage.c_str());
ROCKS_LOG_INFO(&logger, "%s", kSampleMessage.c_str());
ROCKS_LOG_WARN(&logger, "%s", kSampleMessage.c_str());
ROCKS_LOG_ERROR(&logger, "%s", kSampleMessage.c_str());
ROCKS_LOG_FATAL(&logger, "%s", kSampleMessage.c_str());
log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1;
}
ASSERT_EQ(logger.Close(), Status::OK());
std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str());
size_t lines = std::count(std::istreambuf_iterator<char>(inFile),
std::istreambuf_iterator<char>(), '\n');
ASSERT_EQ(log_lines, lines);
inFile.close();
}
// Test the logger Header function for roll over logs // Test the logger Header function for roll over logs
// We expect the new logs creates as roll over to carry the headers specified // We expect the new logs creates as roll over to carry the headers specified
static std::vector<std::string> GetOldFileNames(const std::string& path) { static std::vector<std::string> GetOldFileNames(const std::string& path) {