BlobDB: handle IO error on read (#4410)

Summary:
Fix IO error on read not being handle and crashing the DB. With the fix we properly return the error.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4410

Differential Revision: D9979246

Pulled By: yiwu-arbug

fbshipit-source-id: 111a85675067a29c03cb60e9a34103f4ff636694
This commit is contained in:
Yi Wu 2018-09-20 16:50:07 -07:00
parent e36e75d7c8
commit e90493a865
7 changed files with 73 additions and 24 deletions

View File

@ -197,6 +197,15 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
return s; return s;
} }
Status FaultInjectionTestEnv::NewRandomAccessFile(
const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) {
if (!IsFilesystemActive()) {
return GetError();
}
return target()->NewRandomAccessFile(fname, result, soptions);
}
Status FaultInjectionTestEnv::DeleteFile(const std::string& f) { Status FaultInjectionTestEnv::DeleteFile(const std::string& f) {
if (!IsFilesystemActive()) { if (!IsFilesystemActive()) {
return GetError(); return GetError();

View File

@ -111,6 +111,10 @@ class FaultInjectionTestEnv : public EnvWrapper {
unique_ptr<WritableFile>* result, unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override; const EnvOptions& soptions) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) override;
virtual Status DeleteFile(const std::string& f) override; virtual Status DeleteFile(const std::string& f) override;
virtual Status RenameFile(const std::string& s, virtual Status RenameFile(const std::string& s,

View File

@ -304,13 +304,17 @@ void BlobDBImpl::CloseRandomAccessLocked(
open_file_count_--; open_file_count_--;
} }
std::shared_ptr<RandomAccessFileReader> BlobDBImpl::GetOrOpenRandomAccessReader( Status BlobDBImpl::GetBlobFileReader(
const std::shared_ptr<BlobFile>& bfile, Env* env, const std::shared_ptr<BlobFile>& blob_file,
const EnvOptions& env_options) { std::shared_ptr<RandomAccessFileReader>* reader) {
assert(reader != nullptr);
bool fresh_open = false; bool fresh_open = false;
auto rar = bfile->GetOrOpenRandomAccessReader(env, env_options, &fresh_open); Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
if (fresh_open) open_file_count_++; if (s.ok() && fresh_open) {
return rar; assert(*reader != nullptr);
open_file_count_++;
}
return s;
} }
std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) { std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
@ -998,8 +1002,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
} }
// takes locks when called // takes locks when called
std::shared_ptr<RandomAccessFileReader> reader = std::shared_ptr<RandomAccessFileReader> reader;
GetOrOpenRandomAccessReader(bfile, env_, env_options_); s = GetBlobFileReader(bfile, &reader);
if (!s.ok()) {
return s;
}
assert(blob_index.offset() > key.size() + sizeof(uint32_t)); assert(blob_index.offset() > key.size() + sizeof(uint32_t));
uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t); uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);

View File

@ -296,11 +296,8 @@ class BlobDBImpl : public BlobDB {
// Open all blob files found in blob_dir. // Open all blob files found in blob_dir.
Status OpenAllBlobFiles(); Status OpenAllBlobFiles();
// hold write mutex on file and call Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
// creates a Random Access reader for GET call std::shared_ptr<RandomAccessFileReader>* reader);
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
const std::shared_ptr<BlobFile>& bfile, Env* env,
const EnvOptions& env_options);
// hold write mutex on file and call. // hold write mutex on file and call.
// Close the above Random Access reader // Close the above Random Access reader

View File

@ -16,6 +16,7 @@
#include "port/port.h" #include "port/port.h"
#include "rocksdb/utilities/debug.h" #include "rocksdb/utilities/debug.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "util/fault_injection_test_env.h"
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -40,6 +41,7 @@ class BlobDBTest : public testing::Test {
BlobDBTest() BlobDBTest()
: dbname_(test::PerThreadDBPath("blob_db_test")), : dbname_(test::PerThreadDBPath("blob_db_test")),
mock_env_(new MockTimeEnv(Env::Default())), mock_env_(new MockTimeEnv(Env::Default())),
fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
blob_db_(nullptr) { blob_db_(nullptr) {
Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions()); Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
assert(s.ok()); assert(s.ok());
@ -236,6 +238,7 @@ class BlobDBTest : public testing::Test {
const std::string dbname_; const std::string dbname_;
std::unique_ptr<MockTimeEnv> mock_env_; std::unique_ptr<MockTimeEnv> mock_env_;
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
BlobDB *blob_db_; BlobDB *blob_db_;
}; // class BlobDBTest }; // class BlobDBTest
@ -354,6 +357,23 @@ TEST_F(BlobDBTest, GetExpiration) {
ASSERT_EQ(300 /* = 100 + 200 */, expiration); ASSERT_EQ(300 /* = 100 + 200 */, expiration);
} }
TEST_F(BlobDBTest, GetIOError) {
Options options;
options.env = fault_injection_env_.get();
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0; // Make sure value write to blob file
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
PinnableSlice value;
ASSERT_OK(Put("foo", "bar"));
fault_injection_env_->SetFilesystemActive(false, Status::IOError());
Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
ASSERT_TRUE(s.IsIOError());
// Reactivate file system to allow test to close DB.
fault_injection_env_->SetFilesystemActive(true);
}
TEST_F(BlobDBTest, WriteBatch) { TEST_F(BlobDBTest, WriteBatch) {
Random rnd(301); Random rnd(301);
BlobDBOptions bdb_options; BlobDBOptions bdb_options;
@ -461,7 +481,6 @@ TEST_F(BlobDBTest, DecompressAfterReopen) {
Reopen(bdb_options); Reopen(bdb_options);
VerifyDB(data); VerifyDB(data);
} }
#endif #endif
TEST_F(BlobDBTest, MultipleWriters) { TEST_F(BlobDBTest, MultipleWriters) {

View File

@ -191,36 +191,48 @@ void BlobFile::CloseRandomAccessLocked() {
last_access_ = -1; last_access_ = -1;
} }
std::shared_ptr<RandomAccessFileReader> BlobFile::GetOrOpenRandomAccessReader( Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
Env* env, const EnvOptions& env_options, bool* fresh_open) { std::shared_ptr<RandomAccessFileReader>* reader,
bool* fresh_open) {
assert(reader != nullptr);
assert(fresh_open != nullptr);
*fresh_open = false; *fresh_open = false;
int64_t current_time = 0; int64_t current_time = 0;
env->GetCurrentTime(&current_time); env->GetCurrentTime(&current_time);
last_access_.store(current_time); last_access_.store(current_time);
Status s;
{ {
ReadLock lockbfile_r(&mutex_); ReadLock lockbfile_r(&mutex_);
if (ra_file_reader_) return ra_file_reader_; if (ra_file_reader_) {
*reader = ra_file_reader_;
return s;
}
} }
WriteLock lockbfile_w(&mutex_); WriteLock lockbfile_w(&mutex_);
if (ra_file_reader_) return ra_file_reader_; // Double check.
if (ra_file_reader_) {
*reader = ra_file_reader_;
return s;
}
std::unique_ptr<RandomAccessFile> rfile; std::unique_ptr<RandomAccessFile> rfile;
Status s = env->NewRandomAccessFile(PathName(), &rfile, env_options); s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_, ROCKS_LOG_ERROR(info_log_,
"Failed to open blob file for random-read: %s status: '%s'" "Failed to open blob file for random-read: %s status: '%s'"
" exists: '%s'", " exists: '%s'",
PathName().c_str(), s.ToString().c_str(), PathName().c_str(), s.ToString().c_str(),
env->FileExists(PathName()).ToString().c_str()); env->FileExists(PathName()).ToString().c_str());
return nullptr; return s;
} }
ra_file_reader_ = std::make_shared<RandomAccessFileReader>(std::move(rfile), ra_file_reader_ = std::make_shared<RandomAccessFileReader>(std::move(rfile),
PathName()); PathName());
*reader = ra_file_reader_;
*fresh_open = true; *fresh_open = true;
return ra_file_reader_; return s;
} }
Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {

View File

@ -181,6 +181,10 @@ class BlobFile {
// footer_valid_ to false and return Status::OK. // footer_valid_ to false and return Status::OK.
Status ReadMetadata(Env* env, const EnvOptions& env_options); Status ReadMetadata(Env* env, const EnvOptions& env_options);
Status GetReader(Env* env, const EnvOptions& env_options,
std::shared_ptr<RandomAccessFileReader>* reader,
bool* fresh_open);
private: private:
std::shared_ptr<Reader> OpenRandomAccessReader( std::shared_ptr<Reader> OpenRandomAccessReader(
Env* env, const DBOptions& db_options, Env* env, const DBOptions& db_options,
@ -190,9 +194,6 @@ class BlobFile {
Status WriteFooterAndCloseLocked(); Status WriteFooterAndCloseLocked();
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
Env* env, const EnvOptions& env_options, bool* fresh_open);
void CloseRandomAccessLocked(); void CloseRandomAccessLocked();
// this is used, when you are reading only the footer of a // this is used, when you are reading only the footer of a