Making persistent cache more resilient to filesystem failures
Summary: The persistent cache is designed to hop over errors and return key not found. So far, it has shown resilience to write errors, encoding errors, data corruption etc. It is not resilient against disappearing files/directories. This was exposed during testing when multiple instances of the persistent cache were started sharing the same directory, simulating an unpredictable filesystem environment. This patch - makes the write code path more resilient to errors while creating files - makes the read code path more resilient to handle the situation where files are not found - adds a test that does negative write/read testing by removing the directory while writes are in progress Closes https://github.com/facebook/rocksdb/pull/1472 Differential Revision: D4143413 Pulled By: kradhakrishnan fbshipit-source-id: fd25e9b
This commit is contained in:
parent
734e4acafb
commit
3068870cce
@ -11,6 +11,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/sync_point.h"
|
||||
#include "utilities/persistent_cache/block_cache_tier_file.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -54,8 +55,15 @@ Status BlockCacheTier::Open() {
|
||||
}
|
||||
}
|
||||
|
||||
// create a new file
|
||||
assert(!cache_file_);
|
||||
NewCacheFile();
|
||||
status = NewCacheFile();
|
||||
if (!status.ok()) {
|
||||
Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
|
||||
status.ToString().c_str());
|
||||
return status;
|
||||
}
|
||||
|
||||
assert(cache_file_);
|
||||
|
||||
if (opt_.pipeline_writes) {
|
||||
@ -231,7 +239,10 @@ Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
|
||||
}
|
||||
|
||||
assert(cache_file_->Eof());
|
||||
NewCacheFile();
|
||||
Status status = NewCacheFile();
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
// Insert into lookup index
|
||||
@ -280,7 +291,6 @@ Status BlockCacheTier::Lookup(const Slice& key, unique_ptr<char[]>* val,
|
||||
|
||||
status = file->Read(lba, &blk_key, &blk_val, scratch.get());
|
||||
--file->refs_;
|
||||
assert(status);
|
||||
if (!status) {
|
||||
stats_.cache_misses_++;
|
||||
stats_.cache_errors_++;
|
||||
@ -309,25 +319,36 @@ bool BlockCacheTier::Erase(const Slice& key) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void BlockCacheTier::NewCacheFile() {
|
||||
Status BlockCacheTier::NewCacheFile() {
|
||||
lock_.AssertHeld();
|
||||
|
||||
Info(opt_.log, "Creating cache file %d", writer_cache_id_);
|
||||
TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
|
||||
(void*)(GetCachePath().c_str()));
|
||||
|
||||
std::unique_ptr<WriteableCacheFile> f(
|
||||
new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
|
||||
GetCachePath(), writer_cache_id_,
|
||||
opt_.cache_file_size, opt_.log));
|
||||
|
||||
bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
|
||||
if (!status) {
|
||||
return Status::IOError("Error creating file");
|
||||
}
|
||||
|
||||
Info(opt_.log, "Created cache file %d", writer_cache_id_);
|
||||
|
||||
writer_cache_id_++;
|
||||
|
||||
cache_file_ = new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
|
||||
GetCachePath(), writer_cache_id_,
|
||||
opt_.cache_file_size, opt_.log);
|
||||
bool status;
|
||||
status =
|
||||
cache_file_->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
|
||||
assert(status);
|
||||
cache_file_ = f.release();
|
||||
|
||||
// insert to cache files tree
|
||||
status = metadata_.Insert(cache_file_);
|
||||
(void)status;
|
||||
assert(status);
|
||||
if (!status) {
|
||||
Error(opt_.log, "Error inserting to metadata");
|
||||
return Status::IOError("Error inserting to metadata");
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool BlockCacheTier::Reserve(const size_t size) {
|
||||
|
@ -103,7 +103,7 @@ class BlockCacheTier : public PersistentCacheTier {
|
||||
// insert implementation
|
||||
Status InsertImpl(const Slice& key, const Slice& data);
|
||||
// Create a new cache file
|
||||
void NewCacheFile();
|
||||
Status NewCacheFile();
|
||||
// Get cache directory path
|
||||
std::string GetCachePath() const { return opt_.path + "/cache"; }
|
||||
// Cleanup folder
|
||||
|
@ -216,7 +216,10 @@ bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
|
||||
ReadLock _(&rwlock_);
|
||||
|
||||
assert(lba.cache_id_ == cache_id_);
|
||||
assert(file_);
|
||||
|
||||
if (!file_) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Slice result;
|
||||
Status s = file_->Read(lba.off_, lba.size_, &result, scratch);
|
||||
@ -259,11 +262,12 @@ WriteableCacheFile::~WriteableCacheFile() {
|
||||
// This file never flushed. We give priority to shutdown since this is a
|
||||
// cache
|
||||
// TODO(krad): Figure a way to flush the pending data
|
||||
assert(file_);
|
||||
|
||||
assert(refs_ == 1);
|
||||
--refs_;
|
||||
if (file_) {
|
||||
assert(refs_ == 1);
|
||||
--refs_;
|
||||
}
|
||||
}
|
||||
assert(!refs_);
|
||||
ClearBuffers();
|
||||
}
|
||||
|
||||
|
@ -38,6 +38,32 @@ static void OnOpenForWrite(void* arg) {
|
||||
}
|
||||
#endif
|
||||
|
||||
static void RemoveDirectory(const std::string& folder) {
|
||||
std::vector<std::string> files;
|
||||
Status status = Env::Default()->GetChildren(folder, &files);
|
||||
if (!status.ok()) {
|
||||
// we assume the directory does not exist
|
||||
return;
|
||||
}
|
||||
|
||||
// cleanup files with the pattern :digit:.rc
|
||||
for (auto file : files) {
|
||||
if (file == "." || file == "..") {
|
||||
continue;
|
||||
}
|
||||
status = Env::Default()->DeleteFile(folder + "/" + file);
|
||||
assert(status.ok());
|
||||
}
|
||||
|
||||
status = Env::Default()->DeleteDir(folder);
|
||||
assert(status.ok());
|
||||
}
|
||||
|
||||
static void OnDeleteDir(void* arg) {
|
||||
char* dir = static_cast<char*>(arg);
|
||||
RemoveDirectory(std::string(dir));
|
||||
}
|
||||
|
||||
//
|
||||
// Simple logger that prints message on stdout
|
||||
//
|
||||
@ -116,6 +142,21 @@ PersistentCacheTierTest::PersistentCacheTierTest()
|
||||
#endif
|
||||
}
|
||||
|
||||
// Block cache tests
|
||||
TEST_F(PersistentCacheTierTest, BlockCacheInsertWithFileCreateError) {
|
||||
cache_ = NewBlockCache(Env::Default(), path_,
|
||||
/*size=*/std::numeric_limits<uint64_t>::max(),
|
||||
/*direct_writes=*/ false);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"BlockCacheTier::NewCacheFile:DeleteDir", OnDeleteDir);
|
||||
|
||||
RunNegativeInsertTest(/*nthreads=*/ 1,
|
||||
/*max_keys*/
|
||||
static_cast<size_t>(10 * 1024 * kStressFactor));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
#ifdef TRAVIS
|
||||
// Travis is unable to handle the normal version of the tests running out of
|
||||
// fds, out of space and timeouts. This is an easier version of the test
|
||||
|
@ -132,7 +132,12 @@ class PersistentCacheTierTest : public testing::Test {
|
||||
memset(data, '0' + (i % 10), sizeof(data));
|
||||
auto k = prefix + PaddedNumber(i, /*count=*/8);
|
||||
Slice key(k);
|
||||
while (!cache_->Insert(key, data, sizeof(data)).ok()) {
|
||||
while (true) {
|
||||
Status status = cache_->Insert(key, data, sizeof(data));
|
||||
if (status.ok()) {
|
||||
break;
|
||||
}
|
||||
ASSERT_TRUE(status.IsTryAgain());
|
||||
Env::Default()->SleepForMicroseconds(1 * 1000 * 1000);
|
||||
}
|
||||
}
|
||||
@ -180,6 +185,17 @@ class PersistentCacheTierTest : public testing::Test {
|
||||
cache_.reset();
|
||||
}
|
||||
|
||||
// template for negative insert test
|
||||
void RunNegativeInsertTest(const size_t nthreads, const size_t max_keys) {
|
||||
Insert(nthreads, max_keys);
|
||||
Verify(nthreads, /*eviction_enabled=*/true);
|
||||
ASSERT_LT(stats_verify_hits_, max_keys);
|
||||
ASSERT_GT(stats_verify_missed_, 0);
|
||||
|
||||
cache_->Close();
|
||||
cache_.reset();
|
||||
}
|
||||
|
||||
// template for insert with eviction test
|
||||
void RunInsertTestWithEviction(const size_t nthreads, const size_t max_keys) {
|
||||
Insert(nthreads, max_keys);
|
||||
|
Loading…
Reference in New Issue
Block a user