Fix bugs in WAL trash file handling (#5520)

Summary:
1. Cleanup WAL trash files on open
2. Don't apply deletion rate limit if WAL dir is different from db dir
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5520

Test Plan: Add new unit tests and make check

Differential Revision: D16096750

Pulled By: anand1976

fbshipit-source-id: 6f07858ad864b754b711db416f0389c45ede599b
This commit is contained in:
anand76 2019-07-06 21:04:22 -07:00
parent 3e1a6dd665
commit b6c329e406
11 changed files with 177 additions and 16 deletions

View File

@ -11,6 +11,7 @@
* Accessing a partition of a partitioned filter or index through a pinned reference is no longer considered a cache hit.
* The semantics of the per-block-type block read counts in the performance context now match those of the generic block_read_count.
* Add C bindings for secondary instance, i.e. DBImplSecondary.
* Rate limited deletion of WALs is only enabled if DBOptions::wal_dir is not set, or explicitly set to db_name passed to DB::Open and DBOptions::db_paths is empty, or same as db_paths[0].path
### New Features
* Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.
@ -33,6 +34,7 @@
* Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level.
* Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family.
* Fix a bug caused by secondary not skipping the beginning of new MANIFEST.
* On DB open, delete WAL trash files left behind in wal_dir
## 6.2.0 (4/30/2019)
### New Features

View File

@ -3148,6 +3148,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
Env* env = soptions.env;
std::vector<std::string> filenames;
bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
// Reset the logger because it holds a handle to the
// log file and prevents cleanup and directory removal
@ -3170,7 +3171,9 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (type == kMetaDatabase) {
del = DestroyDB(path_to_delete, options);
} else if (type == kTableFile || type == kLogFile) {
del = DeleteDBFile(&soptions, path_to_delete, dbname);
del =
DeleteDBFile(&soptions, path_to_delete, dbname,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
} else {
del = env->DeleteFile(path_to_delete);
}
@ -3204,7 +3207,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (ParseFileName(fname, &number, &type) &&
type == kTableFile) { // Lock file will be deleted at end
std::string table_path = path + "/" + fname;
Status del = DeleteDBFile(&soptions, table_path, dbname);
Status del = DeleteDBFile(&soptions, table_path, dbname,
/*force_bg=*/false, /*force_fg=*/false);
if (result.ok() && !del.ok()) {
result = del;
}
@ -3231,7 +3235,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
for (const auto& file : archiveFiles) {
if (ParseFileName(file, &number, &type) && type == kLogFile) {
Status del =
DeleteDBFile(&soptions, archivedir + "/" + file, archivedir);
DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
if (result.ok() && !del.ok()) {
result = del;
}
@ -3246,7 +3251,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (ParseFileName(file, &number, &type) && type == kLogFile) {
Status del =
DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
soptions.wal_dir);
soptions.wal_dir, /*force_bg=*/false,
/*force_fg=*/!wal_in_db_path);
if (result.ok() && !del.ok()) {
result = del;
}

View File

@ -1883,6 +1883,8 @@ class DBImpl : public DB {
// results sequentially. Flush results of memtables with lower IDs get
// installed to MANIFEST first.
InstrumentedCondVar atomic_flush_install_cv_;
bool wal_in_db_path_;
};
extern Options SanitizeOptions(const std::string& db, const Options& src);

View File

@ -258,7 +258,8 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
Status file_deletion_status;
if (type == kTableFile || type == kLogFile) {
file_deletion_status =
DeleteDBFile(&immutable_db_options_, fname, path_to_sync);
DeleteDBFile(&immutable_db_options_, fname, path_to_sync,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path_);
} else {
file_deletion_status = env_->DeleteFile(fname);
}

View File

@ -122,6 +122,25 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
}
#ifndef ROCKSDB_LITE
ImmutableDBOptions immutable_db_options(result);
if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
// Either the WAL dir and db_paths[0]/db_name are not the same, or we
// cannot tell for sure. In either case, assume they're different and
// explicitly cleanup the trash log files (bypass DeleteScheduler)
// Do this first so even if we end up calling
// DeleteScheduler::CleanupDirectory on the same dir later, it will be
// safe
std::vector<std::string> filenames;
result.env->GetChildren(result.wal_dir, &filenames);
for (std::string& filename : filenames) {
if (filename.find(".log.trash",
filename.length() - std::string(".log.trash").length()) !=
std::string::npos) {
std::string trash_file = result.wal_dir + "/" + filename;
result.env->DeleteFile(trash_file);
}
}
}
// When the DB is stopped, it's possible that there are some .trash files that
// were not deleted yet, when we open the DB we will find these .trash files
// and schedule them to be deleted (or delete immediately if SstFileManager
@ -1294,6 +1313,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
delete impl;
return s;
}
impl->wal_in_db_path_ =
IsWalDirSameAsDBPath(&impl->immutable_db_options_);
impl->mutex_.Lock();
// Handles create_if_missing, error_if_exists
s = impl->Recover(column_families);

View File

@ -470,6 +470,111 @@ TEST_F(DBSSTTest, RateLimitedWALDelete) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
class DBWALTestWithParam
: public DBSSTTest,
public testing::WithParamInterface<std::tuple<std::string, bool>> {
public:
DBWALTestWithParam() {
wal_dir_ = std::get<0>(GetParam());
wal_dir_same_as_dbname_ = std::get<1>(GetParam());
}
std::string wal_dir_;
bool wal_dir_same_as_dbname_;
};
TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
class MyEnv : public EnvWrapper {
public:
MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
Status DeleteFile(const std::string& fname) {
if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
return Status::OK();
}
return target()->DeleteFile(fname);
}
void set_fake_log_delete(bool fake) { fake_log_delete = fake; }
private:
bool fake_log_delete;
};
std::unique_ptr<MyEnv> env(new MyEnv(Env::Default()));
Destroy(last_options_);
env->set_fake_log_delete(true);
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.compression = kNoCompression;
options.env = env.get();
options.wal_dir = dbname_ + wal_dir_;
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
Status s;
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
ASSERT_OK(s);
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
ASSERT_OK(TryReopen(options));
// Create 4 files in L0
for (char v = 'a'; v <= 'd'; v++) {
ASSERT_OK(Put("Key2", DummyString(1024, v)));
ASSERT_OK(Put("Key3", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Put("Key1", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
ASSERT_EQ("4", FilesPerLevel(0));
Close();
options.sst_file_manager.reset();
std::vector<std::string> filenames;
int trash_log_count = 0;
if (!wal_dir_same_as_dbname_) {
// Forcibly create some trash log files
std::unique_ptr<WritableFile> result;
env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
EnvOptions());
result.reset();
}
env->GetChildren(options.wal_dir, &filenames);
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_GE(trash_log_count, 1);
env->set_fake_log_delete(false);
ASSERT_OK(TryReopen(options));
filenames.clear();
trash_log_count = 0;
env->GetChildren(options.wal_dir, &filenames);
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_EQ(trash_log_count, 0);
Close();
}
INSTANTIATE_TEST_CASE_P(DBWALTestWithParam, DBWALTestWithParam,
::testing::Values(std::make_tuple("", true),
std::make_tuple("_wal_dir", false)));
TEST_F(DBSSTTest, OpenDBWithExistingTrash) {
Options options = CurrentOptions();

View File

@ -187,7 +187,8 @@ void WalManager::PurgeObsoleteWALFiles() {
continue;
}
if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) {
s = DeleteDBFile(&db_options_, file_path, archival_dir, false);
s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
@ -213,7 +214,8 @@ void WalManager::PurgeObsoleteWALFiles() {
log_file_size = std::max(log_file_size, file_size);
++log_files_num;
} else {
s = DeleteDBFile(&db_options_, file_path, archival_dir, false);
s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Unable to delete file: %s: %s", file_path.c_str(),
@ -253,7 +255,8 @@ void WalManager::PurgeObsoleteWALFiles() {
for (size_t i = 0; i < files_del_num; ++i) {
std::string const file_path = archived_logs[i]->PathName();
s = DeleteDBFile(&db_options_, db_options_.wal_dir + "/" + file_path,
db_options_.wal_dir, false);
db_options_.wal_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());

View File

@ -18,6 +18,7 @@
#include <memory>
#include "db/version_set.h"
#include "file/file_util.h"
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/env.h"
@ -40,7 +41,8 @@ class WalManager {
env_options_(env_options),
env_(db_options.env),
purge_wal_files_last_run_(0),
seq_per_batch_(seq_per_batch) {}
seq_per_batch_(seq_per_batch),
wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)) {}
Status GetSortedWalFiles(VectorLogPtr& files);
@ -97,6 +99,8 @@ class WalManager {
bool seq_per_batch_;
bool wal_in_db_path_;
// obsolete files will be deleted every this seconds if ttl deletion is
// enabled and archive size_limit is disabled.
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;

View File

@ -88,12 +88,12 @@ Status CreateFile(Env* env, const std::string& destination,
}
Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname, const std::string& dir_to_sync,
const bool force_bg) {
const std::string& fname, const std::string& dir_to_sync,
const bool force_bg, const bool force_fg) {
#ifndef ROCKSDB_LITE
SstFileManagerImpl* sfm =
static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());
if (sfm) {
if (sfm && !force_fg) {
return sfm->ScheduleFileDeletion(fname, dir_to_sync, force_bg);
} else {
return db_options->env->DeleteFile(fname);
@ -101,10 +101,21 @@ Status DeleteDBFile(const ImmutableDBOptions* db_options,
#else
(void)dir_to_sync;
(void)force_bg;
(void)force_fg;
// SstFileManager is not supported in ROCKSDB_LITE
// Delete file immediately
return db_options->env->DeleteFile(fname);
#endif
}
bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options) {
bool same = false;
Status s = db_options->env->AreFilesSame(db_options->wal_dir,
db_options->db_paths[0].path, &same);
if (s.IsNotSupported()) {
same = db_options->wal_dir == db_options->db_paths[0].path;
}
return same;
}
} // namespace rocksdb

View File

@ -24,7 +24,9 @@ extern Status CreateFile(Env* env, const std::string& destination,
extern Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname,
const std::string& path_to_sync,
const bool force_bg = false);
const std::string& path_to_sync, const bool force_bg,
const bool force_fg);
extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options);
} // namespace rocksdb

View File

@ -1758,7 +1758,8 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
blob_files_.erase(bfile->BlobFileNumber());
Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
bfile->PathName(), blob_dir_, true);
bfile->PathName(), blob_dir_, true,
/*force_fg=*/false);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"File failed to be deleted as obsolete %s",
@ -1848,7 +1849,8 @@ Status DestroyBlobDB(const std::string& dbname, const Options& options,
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kBlobFile) {
Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true);
Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
/*force_fg=*/false);
if (status.ok() && !del.ok()) {
status = del;
}