BlobDB: handle IO error on write (#4580)

Summary:
A fix similar to #4410 but on the write path. On IO error on `SelectBlobFile()` we didn't return error code properly, but simply a nullptr of `BlobFile`. The `AppendBlob()` method didn't have null check for the pointer and caused crash. The fix make sure we properly return error code in this case.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4580

Differential Revision: D10513849

Pulled By: yiwu-arbug

fbshipit-source-id: 80bca920d1d7a3541149de981015ad83e0aa14b5
This commit is contained in:
Yi Wu 2018-10-23 15:01:36 -07:00 committed by Facebook Github Bot
parent 742302a1a3
commit c7a45ca91f
5 changed files with 160 additions and 95 deletions

View File

@ -197,6 +197,27 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
return s;
}
Status FaultInjectionTestEnv::ReopenWritableFile(
const std::string& fname, unique_ptr<WritableFile>* result,
const EnvOptions& soptions) {
if (!IsFilesystemActive()) {
return GetError();
}
Status s = target()->ReopenWritableFile(fname, result, soptions);
if (s.ok()) {
result->reset(new TestWritableFile(fname, std::move(*result), this));
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = GetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
}
return s;
}
Status FaultInjectionTestEnv::NewRandomAccessFile(
const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) {

View File

@ -110,6 +110,10 @@ class FaultInjectionTestEnv : public EnvWrapper {
unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override;
Status ReopenWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) override;

View File

@ -404,82 +404,91 @@ std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
return (b1 || b2) ? nullptr : (*finditr);
}
std::shared_ptr<Writer> BlobDBImpl::CheckOrCreateWriterLocked(
const std::shared_ptr<BlobFile>& bfile) {
std::shared_ptr<Writer> writer = bfile->GetWriter();
if (writer) return writer;
Status s = CreateWriterLocked(bfile);
if (!s.ok()) return nullptr;
writer = bfile->GetWriter();
return writer;
Status BlobDBImpl::CheckOrCreateWriterLocked(
const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<Writer>* writer) {
assert(writer != nullptr);
*writer = blob_file->GetWriter();
if (*writer != nullptr) {
return Status::OK();
}
Status s = CreateWriterLocked(blob_file);
if (s.ok()) {
*writer = blob_file->GetWriter();
}
return s;
}
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
assert(blob_file != nullptr);
{
ReadLock rl(&mutex_);
if (open_non_ttl_file_ != nullptr) {
return open_non_ttl_file_;
*blob_file = open_non_ttl_file_;
return Status::OK();
}
}
// CHECK again
WriteLock wl(&mutex_);
if (open_non_ttl_file_ != nullptr) {
return open_non_ttl_file_;
*blob_file = open_non_ttl_file_;
return Status::OK();
}
std::shared_ptr<BlobFile> bfile = NewBlobFile("SelectBlobFile");
assert(bfile);
*blob_file = NewBlobFile("SelectBlobFile");
assert(*blob_file != nullptr);
// file not visible, hence no lock
std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
if (!writer) {
std::shared_ptr<Writer> writer;
Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to get writer from blob file: %s",
bfile->PathName().c_str());
return nullptr;
"Failed to get writer from blob file: %s, error: %s",
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
bfile->file_size_ = BlobLogHeader::kSize;
bfile->header_.compression = bdb_options_.compression;
bfile->header_.has_ttl = false;
bfile->header_.column_family_id =
(*blob_file)->file_size_ = BlobLogHeader::kSize;
(*blob_file)->header_.compression = bdb_options_.compression;
(*blob_file)->header_.has_ttl = false;
(*blob_file)->header_.column_family_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
bfile->header_valid_ = true;
bfile->SetColumnFamilyId(bfile->header_.column_family_id);
bfile->SetHasTTL(false);
bfile->SetCompression(bdb_options_.compression);
(*blob_file)->header_valid_ = true;
(*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
(*blob_file)->SetHasTTL(false);
(*blob_file)->SetCompression(bdb_options_.compression);
Status s = writer->WriteHeader(bfile->header_);
s = writer->WriteHeader((*blob_file)->header_);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to write header to new blob file: %s"
" status: '%s'",
bfile->PathName().c_str(), s.ToString().c_str());
return nullptr;
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
open_non_ttl_file_ = bfile;
blob_files_.insert(
std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
open_non_ttl_file_ = *blob_file;
total_blob_size_ += BlobLogHeader::kSize;
return bfile;
return s;
}
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
std::shared_ptr<BlobFile>* blob_file) {
assert(blob_file != nullptr);
assert(expiration != kNoExpiration);
uint64_t epoch_read = 0;
std::shared_ptr<BlobFile> bfile;
{
ReadLock rl(&mutex_);
bfile = FindBlobFileLocked(expiration);
*blob_file = FindBlobFileLocked(expiration);
epoch_read = epoch_of_.load();
}
if (bfile) {
assert(!bfile->Immutable());
return bfile;
if (*blob_file != nullptr) {
assert(!(*blob_file)->Immutable());
return Status::OK();
}
uint64_t exp_low =
@ -487,61 +496,66 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
ExpirationRange expiration_range = std::make_pair(exp_low, exp_high);
bfile = NewBlobFile("SelectBlobFileTTL");
assert(bfile);
*blob_file = NewBlobFile("SelectBlobFileTTL");
assert(*blob_file != nullptr);
ROCKS_LOG_INFO(db_options_.info_log, "New blob file TTL range: %s %d %d",
bfile->PathName().c_str(), exp_low, exp_high);
(*blob_file)->PathName().c_str(), exp_low, exp_high);
LogFlush(db_options_.info_log);
// we don't need to take lock as no other thread is seeing bfile yet
std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
if (!writer) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to get writer from blob file with TTL: %s",
bfile->PathName().c_str());
return nullptr;
std::shared_ptr<Writer> writer;
Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
if (!s.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Failed to get writer from blob file with TTL: %s, error: %s",
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
bfile->header_.expiration_range = expiration_range;
bfile->header_.compression = bdb_options_.compression;
bfile->header_.has_ttl = true;
bfile->header_.column_family_id =
(*blob_file)->header_.expiration_range = expiration_range;
(*blob_file)->header_.compression = bdb_options_.compression;
(*blob_file)->header_.has_ttl = true;
(*blob_file)->header_.column_family_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
;
bfile->header_valid_ = true;
bfile->SetColumnFamilyId(bfile->header_.column_family_id);
bfile->SetHasTTL(true);
bfile->SetCompression(bdb_options_.compression);
bfile->file_size_ = BlobLogHeader::kSize;
(*blob_file)->header_valid_ = true;
(*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
(*blob_file)->SetHasTTL(true);
(*blob_file)->SetCompression(bdb_options_.compression);
(*blob_file)->file_size_ = BlobLogHeader::kSize;
// set the first value of the range, since that is
// concrete at this time. also necessary to add to open_ttl_files_
bfile->expiration_range_ = expiration_range;
(*blob_file)->expiration_range_ = expiration_range;
WriteLock wl(&mutex_);
// in case the epoch has shifted in the interim, then check
// check condition again - should be rare.
if (epoch_of_.load() != epoch_read) {
auto bfile2 = FindBlobFileLocked(expiration);
if (bfile2) return bfile2;
std::shared_ptr<BlobFile> blob_file2 = FindBlobFileLocked(expiration);
if (blob_file2 != nullptr) {
*blob_file = std::move(blob_file2);
return Status::OK();
}
}
Status s = writer->WriteHeader(bfile->header_);
s = writer->WriteHeader((*blob_file)->header_);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to write header to new blob file: %s"
" status: '%s'",
bfile->PathName().c_str(), s.ToString().c_str());
return nullptr;
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
open_ttl_files_.insert(bfile);
blob_files_.insert(
std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
open_ttl_files_.insert(*blob_file);
total_blob_size_ += BlobLogHeader::kSize;
epoch_of_++;
return bfile;
return s;
}
class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
@ -695,36 +709,41 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
return s;
}
std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
? SelectBlobFileTTL(expiration)
: SelectBlobFile();
assert(bfile != nullptr);
assert(bfile->compression() == bdb_options_.compression);
s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration,
&index_entry);
if (expiration == kNoExpiration) {
RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
std::shared_ptr<BlobFile> blob_file;
if (expiration != kNoExpiration) {
s = SelectBlobFileTTL(expiration, &blob_file);
} else {
RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
s = SelectBlobFile(&blob_file);
}
if (s.ok()) {
assert(blob_file != nullptr);
assert(blob_file->compression() == bdb_options_.compression);
s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
&index_entry);
}
if (s.ok()) {
if (expiration != kNoExpiration) {
bfile->ExtendExpirationRange(expiration);
blob_file->ExtendExpirationRange(expiration);
}
s = CloseBlobFileIfNeeded(bfile);
if (s.ok()) {
s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
index_entry);
s = CloseBlobFileIfNeeded(blob_file);
}
if (s.ok()) {
s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
index_entry);
}
if (s.ok()) {
if (expiration == kNoExpiration) {
RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
} else {
RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
}
} else {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to append blob to FILE: %s: KEY: %s VALSZ: %d"
" status: '%s' blob_file: '%s'",
bfile->PathName().c_str(), key.ToString().c_str(),
blob_file->PathName().c_str(), key.ToString().c_str(),
value.size(), s.ToString().c_str(),
bfile->DumpState().c_str());
blob_file->DumpState().c_str());
}
}
@ -867,9 +886,10 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
uint64_t key_offset = 0;
{
WriteLock lockbfile_w(&bfile->mutex_);
std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
if (!writer) {
return Status::IOError("Failed to create blob writer");
std::shared_ptr<Writer> writer;
s = CheckOrCreateWriterLocked(bfile, &writer);
if (!s.ok()) {
return s;
}
// write the blob to the blob log.
@ -1574,7 +1594,13 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
reason += bfptr->PathName();
newfile = NewBlobFile(reason);
new_writer = CheckOrCreateWriterLocked(newfile);
s = CheckOrCreateWriterLocked(newfile, &new_writer);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to open file %s for writer, error: %s",
newfile->PathName().c_str(), s.ToString().c_str());
break;
}
// Can't use header beyond this point
newfile->header_ = std::move(header);
newfile->header_valid_ = true;

View File

@ -255,10 +255,11 @@ class BlobDBImpl : public BlobDB {
// find an existing blob log file based on the expiration unix epoch
// if such a file does not exist, return nullptr
std::shared_ptr<BlobFile> SelectBlobFileTTL(uint64_t expiration);
Status SelectBlobFileTTL(uint64_t expiration,
std::shared_ptr<BlobFile>* blob_file);
// find an existing blob log file to append the value to
std::shared_ptr<BlobFile> SelectBlobFile();
Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);
std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
@ -309,8 +310,8 @@ class BlobDBImpl : public BlobDB {
// returns a Writer object for the file. If writer is not
// already present, creates one. Needs Write Mutex to be held
std::shared_ptr<Writer> CheckOrCreateWriterLocked(
const std::shared_ptr<BlobFile>& bfile);
Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<Writer>* writer);
// Iterate through keys and values on Blob and write into
// separate file the remaining blobs and delete/update pointers

View File

@ -374,6 +374,19 @@ TEST_F(BlobDBTest, GetIOError) {
fault_injection_env_->SetFilesystemActive(true);
}
TEST_F(BlobDBTest, PutIOError) {
Options options;
options.env = fault_injection_env_.get();
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0; // Make sure value write to blob file
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
fault_injection_env_->SetFilesystemActive(false, Status::IOError());
ASSERT_TRUE(Put("foo", "v1").IsIOError());
fault_injection_env_->SetFilesystemActive(true, Status::IOError());
ASSERT_OK(Put("bar", "v1"));
}
TEST_F(BlobDBTest, WriteBatch) {
Random rnd(301);
BlobDBOptions bdb_options;