WAL log retention policy based on archive size.

Summary:
Archive cleaning will still happen every WAL_ttl seconds
but archived logs will be deleted only if archive size
is greater then a WAL_size_limit value.
Empty archived logs will be deleted evety WAL_ttl.

Test Plan:
1. Unit tests pass.
2. Benchmark.

Reviewers: emayanke, dhruba, haobo, sdong, kailiu, igor

Reviewed By: emayanke

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13869
This commit is contained in:
shamdor 2013-11-06 18:46:28 -08:00
parent 292c2b3357
commit c2be2cba04
11 changed files with 228 additions and 59 deletions

View File

@ -554,6 +554,11 @@ void leveldb_options_set_WAL_ttl_seconds(leveldb_options_t* opt, uint64_t ttl) {
opt->rep.WAL_ttl_seconds = ttl; opt->rep.WAL_ttl_seconds = ttl;
} }
void leveldb_options_set_WAL_size_limit_MB(
leveldb_options_t* opt, uint64_t limit) {
opt->rep.WAL_size_limit_MB = limit;
}
leveldb_comparator_t* leveldb_comparator_create( leveldb_comparator_t* leveldb_comparator_create(
void* state, void* state,
void (*destructor)(void*), void (*destructor)(void*),

View File

@ -397,7 +397,9 @@ DEFINE_int32(source_compaction_factor, 1, "Cap the size of data in level-K for"
" a compaction run that compacts Level-K with Level-(K+1) (for" " a compaction run that compacts Level-K with Level-(K+1) (for"
" K >= 1)"); " K >= 1)");
DEFINE_uint64(wal_ttl, 0, "Set the TTL for the WAL Files in seconds."); DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
" in MB.");
DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer, DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer,
"Allow buffered io using OS buffers"); "Allow buffered io using OS buffers");
@ -1352,7 +1354,8 @@ class Benchmark {
options.level0_slowdown_writes_trigger = options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger; FLAGS_level0_slowdown_writes_trigger;
options.compression = FLAGS_compression_type_e; options.compression = FLAGS_compression_type_e;
options.WAL_ttl_seconds = FLAGS_wal_ttl; options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
if (FLAGS_min_level_to_compress >= 0) { if (FLAGS_min_level_to_compress >= 0) {
assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
options.compression_per_level.resize(FLAGS_num_levels); options.compression_per_level.resize(FLAGS_num_levels);

View File

@ -264,6 +264,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
delete_obsolete_files_last_run_(0), delete_obsolete_files_last_run_(0),
purge_wal_files_last_run_(0), purge_wal_files_last_run_(0),
last_stats_dump_time_microsec_(0), last_stats_dump_time_microsec_(0),
default_interval_to_delete_obsolete_WAL_(600),
stall_level0_slowdown_(0), stall_level0_slowdown_(0),
stall_memtable_compaction_(0), stall_memtable_compaction_(0),
stall_level0_num_files_(0), stall_level0_num_files_(0),
@ -407,7 +408,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
} }
const Status DBImpl::CreateArchivalDirectory() { const Status DBImpl::CreateArchivalDirectory() {
if (options_.WAL_ttl_seconds > 0) { if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) {
std::string archivalPath = ArchivalDirectory(options_.wal_dir); std::string archivalPath = ArchivalDirectory(options_.wal_dir);
return env_->CreateDirIfMissing(archivalPath); return env_->CreateDirIfMissing(archivalPath);
} }
@ -494,7 +495,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
Status DBImpl::DeleteLogFile(uint64_t number) { Status DBImpl::DeleteLogFile(uint64_t number) {
Status s; Status s;
auto filename = LogFileName(options_.wal_dir, number); auto filename = LogFileName(options_.wal_dir, number);
if (options_.WAL_ttl_seconds > 0) { if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) {
s = env_->RenameFile(filename, s = env_->RenameFile(filename,
ArchivedLogFileName(options_.wal_dir, number)); ArchivedLogFileName(options_.wal_dir, number));
@ -613,34 +614,128 @@ void DBImpl::DeleteObsoleteFiles() {
EvictObsoleteFiles(deletion_state); EvictObsoleteFiles(deletion_state);
} }
// 1. Go through all archived files and
// a. if ttl is enabled, delete outdated files
// b. if archive size limit is enabled, delete empty files,
// compute file number and size.
// 2. If size limit is enabled:
// a. compute how many files should be deleted
// b. get sorted non-empty archived logs
// c. delete what should be deleted
void DBImpl::PurgeObsoleteWALFiles() { void DBImpl::PurgeObsoleteWALFiles() {
int64_t current_time; bool const ttl_enabled = options_.WAL_ttl_seconds > 0;
Status s = env_->GetCurrentTime(&current_time); bool const size_limit_enabled = options_.WAL_size_limit_MB > 0;
uint64_t now_seconds = static_cast<uint64_t>(current_time); if (!ttl_enabled && !size_limit_enabled) {
assert(s.ok());
if (options_.WAL_ttl_seconds != ULONG_MAX && options_.WAL_ttl_seconds > 0) {
if (purge_wal_files_last_run_ + options_.WAL_ttl_seconds > now_seconds) {
return; return;
} }
std::vector<std::string> wal_files;
std::string archival_dir = ArchivalDirectory(options_.wal_dir); int64_t current_time;
env_->GetChildren(archival_dir, &wal_files); Status s = env_->GetCurrentTime(&current_time);
for (const auto& f : wal_files) { if (!s.ok()) {
uint64_t file_m_time; Log(options_.info_log, "Can't get current time: %s", s.ToString().c_str());
const std::string file_path = archival_dir + "/" + f; assert(false);
const Status s = env_->GetFileModificationTime(file_path, &file_m_time); return;
if (s.ok() && (now_seconds - file_m_time > options_.WAL_ttl_seconds)) {
Status status = env_->DeleteFile(file_path);
if (!status.ok()) {
Log(options_.info_log,
"Failed Deleting a WAL file Error : i%s",
status.ToString().c_str());
}
} // Ignore errors.
} }
uint64_t const now_seconds = static_cast<uint64_t>(current_time);
uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) ?
options_.WAL_ttl_seconds / 2 : default_interval_to_delete_obsolete_WAL_;
if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
return;
} }
purge_wal_files_last_run_ = now_seconds; purge_wal_files_last_run_ = now_seconds;
std::string archival_dir = ArchivalDirectory(options_.wal_dir);
std::vector<std::string> files;
s = env_->GetChildren(archival_dir, &files);
if (!s.ok()) {
Log(options_.info_log, "Can't get archive files: %s", s.ToString().c_str());
assert(false);
return;
}
size_t log_files_num = 0;
uint64_t log_file_size = 0;
for (auto& f : files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile) {
std::string const file_path = archival_dir + "/" + f;
if (ttl_enabled) {
uint64_t file_m_time;
Status const s = env_->GetFileModificationTime(file_path,
&file_m_time);
if (!s.ok()) {
Log(options_.info_log, "Can't get file mod time: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
}
if (now_seconds - file_m_time > options_.WAL_ttl_seconds) {
Status const s = env_->DeleteFile(file_path);
if (!s.ok()) {
Log(options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
}
continue;
}
}
if (size_limit_enabled) {
uint64_t file_size;
Status const s = env_->GetFileSize(file_path, &file_size);
if (!s.ok()) {
Log(options_.info_log, "Can't get file size: %s: %s",
file_path.c_str(), s.ToString().c_str());
return;
} else {
if (file_size > 0) {
log_file_size = std::max(log_file_size, file_size);
++log_files_num;
} else {
Status s = env_->DeleteFile(file_path);
if (!s.ok()) {
Log(options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
}
}
}
}
}
}
if (0 == log_files_num || !size_limit_enabled) {
return;
}
size_t const files_keep_num = options_.WAL_size_limit_MB *
1024 * 1024 / log_file_size;
if (log_files_num <= files_keep_num) {
return;
}
size_t files_del_num = log_files_num - files_keep_num;
VectorLogPtr archived_logs;
AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
if (files_del_num > archived_logs.size()) {
Log(options_.info_log, "Trying to delete more archived log files than "
"exist. Deleting all");
files_del_num = archived_logs.size();
}
for (size_t i = 0; i < files_del_num; ++i) {
std::string const file_path = archived_logs[i]->PathName();
Status const s = DeleteFile(file_path);
if (!s.ok()) {
Log(options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
}
}
} }
// If externalTable is set, then apply recovered transactions // If externalTable is set, then apply recovered transactions

View File

@ -116,6 +116,11 @@ class DBImpl : public DB {
// get total level0 file size. Only for testing. // get total level0 file size. Only for testing.
uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);} uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);}
void TEST_SetDefaultTimeToCheck(uint64_t default_interval_to_delete_obsolete_WAL)
{
default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL;
}
protected: protected:
Env* const env_; Env* const env_;
const std::string dbname_; const std::string dbname_;
@ -323,6 +328,10 @@ class DBImpl : public DB {
// last time stats were dumped to LOG // last time stats were dumped to LOG
std::atomic<uint64_t> last_stats_dump_time_microsec_; std::atomic<uint64_t> last_stats_dump_time_microsec_;
// obsolete files will be deleted every this seconds if ttl deletion is
// enabled and archive size_limit is disabled.
uint64_t default_interval_to_delete_obsolete_WAL_;
// These count the number of microseconds for which MakeRoomForWrite stalls. // These count the number of microseconds for which MakeRoomForWrite stalls.
uint64_t stall_level0_slowdown_; uint64_t stall_level0_slowdown_;
uint64_t stall_memtable_compaction_; uint64_t stall_memtable_compaction_;

View File

@ -3811,28 +3811,28 @@ std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
return std::move(log_files); return std::move(log_files);
} }
TEST(DBTest, WALArchival) { TEST(DBTest, WALArchivalTtl) {
do { do {
std::string value(1024, '1');
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;
options.WAL_ttl_seconds = 1000; options.WAL_ttl_seconds = 1000;
DestroyAndReopen(&options); DestroyAndReopen(&options);
// TEST : Create DB with a ttl and no size limit.
// TEST : Create DB with a ttl.
// Put some keys. Count the log files present in the DB just after insert. // Put some keys. Count the log files present in the DB just after insert.
// Re-open db. Causes deletion/archival to take place. // Re-open db. Causes deletion/archival to take place.
// Assert that the files moved under "/archive". // Assert that the files moved under "/archive".
// Reopen db with small ttl.
// Assert that archive was removed.
std::string archiveDir = ArchivalDirectory(dbname_); std::string archiveDir = ArchivalDirectory(dbname_);
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
for (int j = 0; j < 10; ++j) { for (int j = 0; j < 10; ++j) {
ASSERT_OK(Put(Key(10*i+j), value)); ASSERT_OK(Put(Key(10 * i + j), DummyString(1024)));
} }
std::vector<uint64_t> logFiles = ListLogFiles(env_, dbname_); std::vector<uint64_t> log_files = ListLogFiles(env_, dbname_);
options.create_if_missing = false; options.create_if_missing = false;
Reopen(&options); Reopen(&options);
@ -3840,37 +3840,78 @@ TEST(DBTest, WALArchival) {
std::vector<uint64_t> logs = ListLogFiles(env_, archiveDir); std::vector<uint64_t> logs = ListLogFiles(env_, archiveDir);
std::set<uint64_t> archivedFiles(logs.begin(), logs.end()); std::set<uint64_t> archivedFiles(logs.begin(), logs.end());
for (auto& log : logFiles) { for (auto& log : log_files) {
ASSERT_TRUE(archivedFiles.find(log) != archivedFiles.end()); ASSERT_TRUE(archivedFiles.find(log) != archivedFiles.end());
} }
} }
std::vector<uint64_t> logFiles = ListLogFiles(env_, archiveDir); std::vector<uint64_t> log_files = ListLogFiles(env_, archiveDir);
ASSERT_TRUE(logFiles.size() > 0); ASSERT_TRUE(log_files.size() > 0);
options.WAL_ttl_seconds = 1; options.WAL_ttl_seconds = 1;
env_->SleepForMicroseconds(2 * 1000 * 1000); env_->SleepForMicroseconds(2 * 1000 * 1000);
Reopen(&options); Reopen(&options);
logFiles = ListLogFiles(env_, archiveDir); log_files = ListLogFiles(env_, archiveDir);
ASSERT_TRUE(logFiles.size() == 0); ASSERT_TRUE(log_files.empty());
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
TEST(DBTest, WALClear) { uint64_t GetLogDirSize(std::string dir_path, SpecialEnv* env) {
uint64_t dir_size = 0;
std::vector<std::string> files;
env->GetChildren(dir_path, &files);
for (auto& f : files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile) {
std::string const file_path = dir_path + "/" + f;
uint64_t file_size;
env->GetFileSize(file_path, &file_size);
dir_size += file_size;
}
}
return dir_size;
}
TEST(DBTest, WALArchivalSizeLimit) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;
options.WAL_ttl_seconds = 1; options.WAL_ttl_seconds = 0;
options.WAL_size_limit_MB = 1000;
for (int j = 0; j < 10; ++j) // TEST : Create DB with huge size limit and no ttl.
for (int i = 0; i < 10; ++i) // Put some keys. Count the archived log files present in the DB
ASSERT_OK(Put(Key(10*i+j), DummyString(1024))); // just after insert. Assert that there are many enough.
// Change size limit. Re-open db.
// Assert that archive is not greater than WAL_size_limit_MB.
// Set ttl and time_to_check_ to small values. Re-open db.
// Assert that there are no archived logs left.
DestroyAndReopen(&options);
for (int i = 0; i < 128 * 128; ++i) {
ASSERT_OK(Put(Key(i), DummyString(1024)));
}
Reopen(&options); Reopen(&options);
std::string archive_dir = ArchivalDirectory(dbname_); std::string archive_dir = ArchivalDirectory(dbname_);
std::vector<std::uint64_t> log_files = ListLogFiles(env_, archive_dir); std::vector<std::uint64_t> log_files = ListLogFiles(env_, archive_dir);
ASSERT_TRUE(!log_files.empty()); ASSERT_TRUE(log_files.size() > 2);
env_->SleepForMicroseconds(2 * 1000 * 1000);
options.WAL_size_limit_MB = 8;
Reopen(&options);
dbfull()->TEST_PurgeObsoleteteWAL(); dbfull()->TEST_PurgeObsoleteteWAL();
uint64_t archive_size = GetLogDirSize(archive_dir, env_);
ASSERT_TRUE(archive_size <= options.WAL_size_limit_MB * 1024 * 1024);
options.WAL_ttl_seconds = 1;
dbfull()->TEST_SetDefaultTimeToCheck(1);
env_->SleepForMicroseconds(2 * 1000 * 1000);
Reopen(&options);
dbfull()->TEST_PurgeObsoleteteWAL();
log_files = ListLogFiles(env_, archive_dir); log_files = ListLogFiles(env_, archive_dir);
ASSERT_TRUE(log_files.empty()); ASSERT_TRUE(log_files.empty());
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());

View File

@ -38,6 +38,7 @@ class DeleteFileTest {
options_.target_file_size_base = 1024*1024*1000; options_.target_file_size_base = 1024*1024*1000;
options_.max_bytes_for_level_base = 1024*1024*1000; options_.max_bytes_for_level_base = 1024*1024*1000;
options_.WAL_ttl_seconds = 300; // Used to test log files options_.WAL_ttl_seconds = 300; // Used to test log files
options_.WAL_size_limit_MB = 1024; // Used to test log files
dbname_ = test::TmpDir() + "/deletefile_test"; dbname_ = test::TmpDir() + "/deletefile_test";
DestroyDB(dbname_, options_); DestroyDB(dbname_, options_);
numlevels_ = 7; numlevels_ = 7;

View File

@ -264,9 +264,10 @@ class DB {
// seq_number. If the sequence number is non existent, it returns an iterator // seq_number. If the sequence number is non existent, it returns an iterator
// at the first available seq_no after the requested seq_no // at the first available seq_no after the requested seq_no
// Returns Status::Ok if iterator is valid // Returns Status::Ok if iterator is valid
// Must set WAL_ttl_seconds to a large value to use this api, else the WAL // Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to
// files will get cleared aggressively and the iterator might keep getting // use this api, else the WAL files will get
// invalid before an update is read. // cleared aggressively and the iterator might keep getting invalid before
// an update is read.
virtual Status GetUpdatesSince(SequenceNumber seq_number, virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter) = 0; unique_ptr<TransactionLogIterator>* iter) = 0;

View File

@ -489,15 +489,20 @@ struct Options {
// be issued on this database. // be issued on this database.
bool disable_auto_compactions; bool disable_auto_compactions;
// The number of seconds a WAL(write ahead log) should be kept after it has // The following two fields affect how archived logs will be deleted.
// been marked as Not Live. If the value is set. The WAL files are moved to // 1. If both set to 0, logs will be deleted asap and will not get into
// the archive directory and deleted after the given TTL. // the archive.
// If set to 0, WAL files are deleted as soon as they are not required by // 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0,
// the database. // WAL files will be checked every 10 min and if total size is greater
// If set to std::numeric_limits<uint64_t>::max() the WAL files will never be // then WAL_size_limit_MB, they will be deleted starting with the
// deleted. // earliest until size_limit is met. All empty files will be deleted.
// Default : 0 // 3. If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then
// WAL files will be checked every WAL_ttl_secondsi / 2 and those which
// are older than WAL_ttl_seconds will be deleted.
// 4. If both are not 0, WAL files will be checked every 10 min and both
// checks will be performed with ttl being first.
uint64_t WAL_ttl_seconds; uint64_t WAL_ttl_seconds;
uint64_t WAL_size_limit_MB;
// Number of bytes to preallocate (via fallocate) the manifest // Number of bytes to preallocate (via fallocate) the manifest
// files. Default is 4mb, which is reasonable to reduce random IO // files. Default is 4mb, which is reasonable to reduce random IO

View File

@ -16,7 +16,9 @@ typedef std::vector<std::unique_ptr<LogFile>> VectorLogPtr;
enum WalFileType { enum WalFileType {
/* Indicates that WAL file is in archive directory. WAL files are moved from /* Indicates that WAL file is in archive directory. WAL files are moved from
* the main db directory to archive directory once they are not live and stay * the main db directory to archive directory once they are not live and stay
* there for a duration of WAL_ttl_seconds which can be set in Options * there until cleaned up. Files are cleaned depending on archive size
* (Options::WAL_size_limit_MB) and time since last cleaning
* (Options::WAL_ttl_seconds).
*/ */
kArchivedLogFile = 0, kArchivedLogFile = 0,

View File

@ -80,11 +80,14 @@ static void ReplicationThreadBody(void* arg) {
DEFINE_uint64(num_inserts, 1000, "the num of inserts the first thread should" DEFINE_uint64(num_inserts, 1000, "the num of inserts the first thread should"
" perform."); " perform.");
DEFINE_uint64(wal_ttl, 1000, "the wal ttl for the run(in seconds)"); DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
DEFINE_uint64(wal_size_limit_MB, 10, "the wal size limit for the run"
"(in MB)");
int main(int argc, const char** argv) { int main(int argc, const char** argv) {
google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
" --num_inserts=<num_inserts> --wal_ttl=<WAL_ttl_seconds>"); " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
" --wal_size_limit_MB=<WAL_size_limit_MB>");
google::ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true); google::ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
Env* env = Env::Default(); Env* env = Env::Default();
@ -93,7 +96,8 @@ int main(int argc, const char** argv) {
default_db_path += "db_repl_stress"; default_db_path += "db_repl_stress";
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.WAL_ttl_seconds = FLAGS_wal_ttl; options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
DB* db; DB* db;
DestroyDB(default_db_path, options); DestroyDB(default_db_path, options);

View File

@ -77,6 +77,7 @@ Options::Options()
arena_block_size(0), arena_block_size(0),
disable_auto_compactions(false), disable_auto_compactions(false),
WAL_ttl_seconds(0), WAL_ttl_seconds(0),
WAL_size_limit_MB(0),
manifest_preallocation_size(4 * 1024 * 1024), manifest_preallocation_size(4 * 1024 * 1024),
purge_redundant_kvs_while_flush(true), purge_redundant_kvs_while_flush(true),
allow_os_buffer(true), allow_os_buffer(true),
@ -237,6 +238,8 @@ Options::Dump(Logger* log) const
disable_auto_compactions); disable_auto_compactions);
Log(log," Options.WAL_ttl_seconds: %ld", Log(log," Options.WAL_ttl_seconds: %ld",
WAL_ttl_seconds); WAL_ttl_seconds);
Log(log," Options.WAL_size_limit_MB: %ld",
WAL_size_limit_MB);
Log(log," Options.manifest_preallocation_size: %ld", Log(log," Options.manifest_preallocation_size: %ld",
manifest_preallocation_size); manifest_preallocation_size);
Log(log," Options.purge_redundant_kvs_while_flush: %d", Log(log," Options.purge_redundant_kvs_while_flush: %d",