Added sequence number hint for GetSortedWalFiles

Added an optional SequenceNumber hint to WAL manager's GetSortedWalFiles
API. This allows GetSortedWalFiles to prune irrelevant files early. It
will use the sequence number cache for this. This requires a scan over
the sequence number cache, which will take time proportional to the
number of entries in the cache. However, it may still be a lot better
than peeking into the WAL files and performing lots of IO.
The change helps when there are many (archived) WAL files and the first
call to GetUpdatesSince() with a sequence number close to the tip of the
WAL. Previously, this would open _all_ available WAL files and read
their first entries to find out the lowest sequence number per file.
With the change, we now keep track of the lowest sequence number as we
open files, so that we can ignore all WAL files with a log number that
is lower than the log number we will need.
Note that the sequence number given to GetSortedWalFiles is only a hint,
which is used as a performance optimization.
The API can still return more files than needed, so that callers may
still want to post-filter its result (GetUpdatesSince does so). But in
the ideal case the hint allows performing less file IOs or at least
building up smaller in-memory results.
This commit is contained in:
jsteemann 2022-04-22 13:01:19 +02:00
parent fff28a7725
commit 81229b8f39
3 changed files with 171 additions and 11 deletions

View File

@ -36,6 +36,11 @@ namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE
void WalManager::TEST_ClearFirstRecordCache() {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.clear();
}
Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
auto s = env_->DeleteFile(wal_dir_ + "/" + fname);
if (s.ok()) {
@ -45,14 +50,14 @@ Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
return s;
}
Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
Status WalManager::GetSortedWalFiles(VectorLogPtr& files, SequenceNumber seq) {
// First get sorted files in db dir, then get sorted files from archived
// dir, to avoid a race condition where a log file is moved to archived
// dir in between.
Status s;
// list wal files in main db dir.
VectorLogPtr logs;
s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile);
s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile, seq);
if (!s.ok()) {
return s;
}
@ -68,7 +73,7 @@ Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
std::string archivedir = ArchivalDirectory(wal_dir_);
Status exists = env_->FileExists(archivedir);
if (exists.ok()) {
s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile, seq);
if (!s.ok()) {
return s;
}
@ -114,8 +119,8 @@ Status WalManager::GetUpdatesSince(
// Get all sorted Wal Files.
// Do binary search and open files and find the seq number.
std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
Status s = GetSortedWalFiles(*wal_files);
auto wal_files = std::make_unique<VectorLogPtr>();
Status s = GetSortedWalFiles(*wal_files, seq);
if (!s.ok()) {
return s;
}
@ -124,9 +129,9 @@ Status WalManager::GetUpdatesSince(
if (!s.ok()) {
return s;
}
iter->reset(new TransactionLogIteratorImpl(
*iter = std::make_unique<TransactionLogIteratorImpl>(
wal_dir_, &db_options_, read_options, file_options_, seq,
std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
std::move(wal_files), version_set, seq_per_batch_, io_tracer_);
return (*iter)->status();
}
@ -289,7 +294,35 @@ void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
Status WalManager::GetSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files,
WalFileType log_type) {
WalFileType log_type,
SequenceNumber seq_hint) {
uint64_t min_logfile_number_to_keep = 0;
if (seq_hint > 0) {
// target lower bound sequence number was specified.
// try to find the minimum logfile number that contains the
// target range, by checking the logfile to sequence number
// cache.
// this is only a performance optimization to avoid peeking
// into every logfile. if the cache is not yet populated, we
// may look into more logfiles than necessary, but the result
// should still be correct.
SequenceNumber min_sequence_found = 0;
MutexLock l(&read_first_record_cache_mutex_);
for (auto const& entry : read_first_record_cache_) {
// the cache maps logfile numbers to sequence numbers
uint64_t file_number = entry.first;
SequenceNumber file_seq = entry.second;
if (file_seq <= seq_hint) {
if (min_sequence_found == 0 || file_seq > min_sequence_found) {
min_logfile_number_to_keep = file_number;
min_sequence_found = file_seq;
}
}
}
assert(min_sequence_found <= seq_hint);
}
std::vector<std::string> all_files;
const Status status = env_->GetChildren(path, &all_files);
if (!status.ok()) {
@ -300,6 +333,12 @@ Status WalManager::GetSortedWalsOfType(const std::string& path,
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kWalFile) {
if (number < min_logfile_number_to_keep) {
// quickly filter out logfiles if they contain data older
// than what we are interested in
continue;
}
SequenceNumber sequence;
Status s = ReadFirstRecord(log_type, number, &sequence);
if (!s.ok()) {
@ -310,6 +349,17 @@ Status WalManager::GetSortedWalsOfType(const std::string& path,
continue;
}
if (sequence <= seq_hint) {
// found a logfile with data that is potentially older than what
// we are interested in.
if (min_logfile_number_to_keep == 0 ||
min_logfile_number_to_keep > number) {
// adjust our lower bound, so that we can ignore all files older
// than this one.
min_logfile_number_to_keep = number;
}
}
// Reproduce the race condition where a log file is moved
// to archived dir, between these two sync points, used in
// (DBTest,TransactionLogIteratorRace)
@ -377,7 +427,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
SequenceNumber* sequence) {
*sequence = 0;
if (type != kAliveLogFile && type != kArchivedLogFile) {
ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s",
ROCKS_LOG_ERROR(db_options_.info_log, "[WalManager] Unknown file type %s",
ToString(type).c_str());
return Status::NotSupported(
"File Type Not Known " + ToString(type));

View File

@ -49,7 +49,13 @@ class WalManager {
wal_in_db_path_(db_options_.IsWalDirSameAsDBPath()),
io_tracer_(io_tracer) {}
Status GetSortedWalFiles(VectorLogPtr& files);
// Get WAL files sorted by file number. The lower bound sequence number
// seq_hint can be given as an optional hint if only certain WAL files are
// needed. The hint is a performance optimization only. The function can
// still return files outside the specified range, so its result needs to
// be post-filtered to restrict the files to a certain range of sequence
// numbers.
Status GetSortedWalFiles(VectorLogPtr& files, SequenceNumber seq_hint = 0);
// Allow user to tail transaction log to find all recent changes to the
// database that are newer than `seq_number`.
@ -62,6 +68,8 @@ class WalManager {
void ArchiveWALFile(const std::string& fname, uint64_t number);
void TEST_ClearFirstRecordCache();
Status DeleteFile(const std::string& fname, uint64_t number);
Status GetLiveWalFile(uint64_t number, std::unique_ptr<LogFile>* log_file);
@ -77,8 +85,14 @@ class WalManager {
}
private:
// Get WAL files of specified type, sorted by file number.
// The lower bound sequence number seq_hint can be given as an optional hint
// if only certain WAL files are needed. The hint is a performance
// optimization only. The function can still return files outside the
// specified range, so its result needs to be post-filtered to restrict the
// files to a certain range of sequence numbers.
Status GetSortedWalsOfType(const std::string& path, VectorLogPtr& log_files,
WalFileType type);
WalFileType type, SequenceNumber seq_hint = 0);
// Requires: all_logs should be sorted with earliest log file first
// Retains all log files in all_logs which contain updates with seq no.
// Greater Than or Equal to the requested SequenceNumber.

View File

@ -280,6 +280,102 @@ TEST_F(WalManagerTest, WALArchivalTtl) {
ASSERT_TRUE(log_files.empty());
}
TEST_F(WalManagerTest, GetWalFilesWithSequenceNumberHint) {
Init();
RollTheLog(false);
Put("key0", std::string(1024, 'a'));
SequenceNumber seq0 = versions_->LastSequence();
Put("key1", std::string(1024, 'a'));
SequenceNumber seq1 = versions_->LastSequence();
RollTheLog(false);
Put("key2", std::string(1024, 'a'));
SequenceNumber seq2 = versions_->LastSequence();
// test number of logfiles
auto get_num_files = [&](SequenceNumber seq, bool clear_cache) -> size_t {
if (clear_cache) {
wal_manager_->TEST_ClearFirstRecordCache();
}
VectorLogPtr v;
wal_manager_->GetSortedWalFiles(v, seq);
return v.size();
};
// the initial get_num_files call populates the cache for all following
// operations
ASSERT_EQ(2, get_num_files(seq0, false));
ASSERT_EQ(2, get_num_files(seq1, false));
ASSERT_EQ(1, get_num_files(seq2, false));
ASSERT_EQ(1, get_num_files(seq2 + 1, false));
ASSERT_EQ(1, get_num_files(seq2 + 1000, false));
ASSERT_EQ(2, get_num_files(seq0, true));
ASSERT_EQ(2, get_num_files(seq1, true));
ASSERT_GE(get_num_files(seq2, true), 1);
// test number of records
auto get_num_records = [&](SequenceNumber seq, bool clear_cache) -> size_t {
if (clear_cache) {
wal_manager_->TEST_ClearFirstRecordCache();
}
auto iter = OpenTransactionLogIter(seq);
return CountRecords(iter.get());
};
ASSERT_EQ(3, get_num_records(seq0, true));
ASSERT_EQ(3, get_num_records(seq0, false));
ASSERT_EQ(2, get_num_records(seq1, true));
ASSERT_EQ(2, get_num_records(seq1, false));
ASSERT_EQ(1, get_num_records(seq2, true));
ASSERT_EQ(1, get_num_records(seq2, false));
ASSERT_EQ(0, get_num_records(seq2 + 1, true));
ASSERT_EQ(0, get_num_records(seq2 + 1, false));
ASSERT_EQ(0, get_num_records(seq2 + 1000, true));
ASSERT_EQ(0, get_num_records(seq2 + 1000, false));
}
TEST_F(WalManagerTest, GetWalFilesWithSequenceNumberHintManyFiles) {
Init();
RollTheLog(false);
// write some initial key so our start sequence number is not 0.
Put("foo", "bar");
SequenceNumber seq0 = versions_->LastSequence();
ASSERT_NE(0, seq0);
constexpr size_t n = 1000;
for (size_t i = 0; i < n; ++i) {
std::string key = "key" + std::to_string(i);
Put(key, key);
if (i > 0 && i % 2 == 0) {
RollTheLog(false);
}
}
SequenceNumber seq1 = versions_->LastSequence();
ASSERT_EQ(n, seq1 - seq0);
// test number of logfiles
auto get_num_files = [&](SequenceNumber seq) -> size_t {
VectorLogPtr v;
wal_manager_->GetSortedWalFiles(v, seq);
return v.size();
};
// the initial get_num_files call populates the cache for all following
// operations
ASSERT_EQ(n / 2, get_num_files(0));
ASSERT_EQ(n / 2, get_num_files(seq0));
ASSERT_EQ(n / 4 + 1, get_num_files(seq0 + n / 2));
ASSERT_EQ(1, get_num_files(seq1));
ASSERT_EQ(1, get_num_files(seq1 + 1));
}
TEST_F(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) {
Init();
RollTheLog(false);