diff --git a/db/db_impl.cc b/db/db_impl.cc index 04fb1fe5e..62341a4bf 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1235,11 +1235,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, DeletionState& deletion_state) { mutex_.AssertHeld(); assert(imm_.size() != 0); - - if (!imm_.IsFlushPending()) { - Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); - return Status::IOError("FlushMemTableToOutputFile already in progress"); - } + assert(imm_.IsFlushPending()); // Save the contents of the earliest memtable as a new Table uint64_t file_number; @@ -1247,7 +1243,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, imm_.PickMemtablesToFlush(&mems); if (mems.empty()) { Log(options_.info_log, "Nothing in memstore to flush"); - return Status::IOError("Nothing in memstore to flush"); + return Status::OK(); } // record the logfile_number_ before we release the mutex @@ -1272,14 +1268,19 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, Status s = WriteLevel0Table(mems, edit, &file_number); if (s.ok() && shutting_down_.Acquire_Load()) { - s = Status::IOError( + s = Status::ShutdownInProgress( "Database shutdown started during memtable compaction" ); } + if (!s.ok()) { + imm_.RollbackMemtableFlush(mems, file_number, &pending_outputs_); + return s; + } + // Replace immutable memtable with the generated Table s = imm_.InstallMemtableFlushResults( - mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, + mems, versions_.get(), &mutex_, options_.info_log.get(), file_number, pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get()); if (s.ok()) { @@ -1458,7 +1459,8 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS); if (seq > versions_->LastSequence()) { - return Status::IOError("Requested sequence not yet written in the db"); + return Status::NotFound( + "Requested sequence not yet written in the db"); } // Get all sorted Wal Files. // Do binary search and open files and find the seq number. @@ -1522,16 +1524,19 @@ Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number, if (type == kAliveLogFile) { std::string fname = LogFileName(options_.wal_dir, number); Status status = ReadFirstLine(fname, result); - if (!status.ok()) { - // check if the file got moved to archive. - std::string archived_file = - ArchivedLogFileName(options_.wal_dir, number); - Status s = ReadFirstLine(archived_file, result); - if (!s.ok()) { - return Status::IOError("Log File has been deleted: " + archived_file); - } + if (status.ok() || env_->FileExists(fname)) { + // return OK or any error that is not caused non-existing file + return status; } - return Status::OK(); + + // check if the file got moved to archive. + std::string archived_file = + ArchivedLogFileName(options_.wal_dir, number); + Status s = ReadFirstLine(archived_file, result); + if (s.ok() || env_->FileExists(archived_file)) { + return s; + } + return Status::NotFound("Log File has been deleted: " + archived_file); } else if (type == kArchivedLogFile) { std::string fname = ArchivedLogFileName(options_.wal_dir, number); Status status = ReadFirstLine(fname, result); @@ -1546,12 +1551,17 @@ Status DBImpl::ReadFirstLine(const std::string& fname, Env* env; Logger* info_log; const char* fname; - Status* status; // nullptr if options_.paranoid_checks==false + + Status* status; + bool ignore_error; // true if options_.paranoid_checks==false virtual void Corruption(size_t bytes, const Status& s) { Log(info_log, "%s%s: dropping %d bytes; %s", - (this->status == nullptr ? "(ignoring error) " : ""), + (this->ignore_error ? "(ignoring error) " : ""), fname, static_cast(bytes), s.ToString().c_str()); - if (this->status != nullptr && this->status->ok()) *this->status = s; + if (this->status->ok()) { + // only keep the first error + *this->status = s; + } } }; @@ -1567,23 +1577,30 @@ Status DBImpl::ReadFirstLine(const std::string& fname, reporter.env = env_; reporter.info_log = options_.info_log.get(); reporter.fname = fname.c_str(); - reporter.status = (options_.paranoid_checks ? &status : nullptr); + reporter.status = &status; + reporter.ignore_error = !options_.paranoid_checks; log::Reader reader(std::move(file), &reporter, true/*checksum*/, 0/*initial_offset*/); std::string scratch; Slice record; - if (reader.ReadRecord(&record, &scratch) && status.ok()) { + if (reader.ReadRecord(&record, &scratch) && + (status.ok() || !options_.paranoid_checks)) { if (record.size() < 12) { reporter.Corruption( record.size(), Status::Corruption("log record too small")); - return Status::IOError("Corruption noted"); // TODO read record's till the first no corrupt entry? + } else { + WriteBatchInternal::SetContents(batch, record); + return Status::OK(); } - WriteBatchInternal::SetContents(batch, record); - return Status::OK(); } - return Status::IOError("Error reading from file " + fname); + + // ReadRecord returns false on EOF, which is deemed as OK() by Reader + if (status.ok()) { + status = Status::Corruption("eof reached"); + } + return status; } struct CompareLogByPointer { @@ -2219,7 +2236,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { compact->compaction->level(), compact->compaction->num_input_files(1), compact->compaction->level() + 1); - return Status::IOError("Compaction input files inconsistent"); + return Status::Corruption("Compaction input files inconsistent"); } Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", @@ -2600,7 +2617,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } if (status.ok() && shutting_down_.Acquire_Load()) { - status = Status::IOError("Database shutdown started during compaction"); + status = Status::ShutdownInProgress( + "Database shutdown started during compaction"); } if (status.ok() && compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input.get()); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 4548bd298..d58fe5048 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -119,31 +119,33 @@ void MemTableList::PickMemtablesToFlush(autovector* ret) { flush_requested_ = false; // start-flush request is complete } +void MemTableList::RollbackMemtableFlush(const autovector& mems, + uint64_t file_number, std::set* pending_outputs) { + assert(!mems.empty()); + + // If the flush was not successful, then just reset state. + // Maybe a suceeding attempt to flush will be successful. + for (MemTable* m : mems) { + assert(m->flush_in_progress_); + assert(m->file_number_ == 0); + + m->flush_in_progress_ = false; + m->flush_completed_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + } + pending_outputs->erase(file_number); + imm_flush_needed.Release_Store(reinterpret_cast(1)); +} + // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( - const autovector& mems, VersionSet* vset, Status flushStatus, + const autovector& mems, VersionSet* vset, port::Mutex* mu, Logger* info_log, uint64_t file_number, std::set& pending_outputs, autovector* to_delete, Directory* db_directory) { mu->AssertHeld(); - // If the flush was not successful, then just reset state. - // Maybe a suceeding attempt to flush will be successful. - if (!flushStatus.ok()) { - for (MemTable* m : mems) { - assert(m->flush_in_progress_); - assert(m->file_number_ == 0); - - m->flush_in_progress_ = false; - m->flush_completed_ = false; - m->edit_.Clear(); - num_flush_not_started_++; - imm_flush_needed.Release_Store((void *)1); - pending_outputs.erase(file_number); - } - return flushStatus; - } - // flush was sucessful for (size_t i = 0; i < mems.size(); ++i) { // All the edits are associated with the first memtable of this batch. @@ -215,7 +217,6 @@ Status MemTableList::InstallMemtableFlushResults( pending_outputs.erase(m->file_number_); m->file_number_ = 0; imm_flush_needed.Release_Store((void *)1); - s = Status::IOError("Unable to commit flushed memtable"); } ++mem_id; } while (!current_->memlist_.empty() && (m = current_->memlist_.back()) && diff --git a/db/memtable_list.h b/db/memtable_list.h index 01afa5cbe..0bd54235a 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -77,8 +77,8 @@ class MemTableList { MemTableListVersion* current() { return current_; } - // so that backgrund threads can detect non-nullptr pointer to - // determine whether this is anything more to start flushing. + // so that background threads can detect non-nullptr pointer to + // determine whether there is anything more to start flushing. port::AtomicPointer imm_flush_needed; // Returns the total number of memtables in the list @@ -92,11 +92,16 @@ class MemTableList { // memtables are guaranteed to be in the ascending order of created time. void PickMemtablesToFlush(autovector* mems); + // Reset status of the given memtable list back to pending state so that + // they can get picked up again on the next round of flush. + void RollbackMemtableFlush(const autovector& mems, + uint64_t file_number, + std::set* pending_outputs); + // Commit a successful flush in the manifest file Status InstallMemtableFlushResults(const autovector& m, - VersionSet* vset, Status flushStatus, - port::Mutex* mu, Logger* info_log, - uint64_t file_number, + VersionSet* vset, port::Mutex* mu, + Logger* info_log, uint64_t file_number, std::set& pending_outputs, autovector* to_delete, Directory* db_directory); diff --git a/db/repair.cc b/db/repair.cc index 5a6cba44d..1d5468f25 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -119,7 +119,7 @@ class Repairer { return status; } if (filenames.empty()) { - return Status::IOError(dbname_, "repair found no files"); + return Status::Corruption(dbname_, "repair found no files"); } uint64_t number; diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 092d88caa..df3974967 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -49,9 +49,6 @@ Status TransactionLogIteratorImpl::OpenLogFile( // Try the archive dir, as it could have moved in the meanwhile. fname = ArchivedLogFileName(dir_, logFile->LogNumber()); status = env->NewSequentialFile(fname, file, soptions_); - if (!status.ok()) { - return Status::IOError("Requested file not present in the dir"); - } } return status; } @@ -190,7 +187,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) { currentStatus_ = Status::OK(); } else { - currentStatus_ = Status::IOError("NO MORE DATA LEFT"); + currentStatus_ = Status::Corruption("NO MORE DATA LEFT"); } return; } diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index e2304fdb6..dbd41fc9b 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -61,6 +61,10 @@ class Status { static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIncomplete, msg, msg2); } + static Status ShutdownInProgress(const Slice& msg, + const Slice& msg2 = Slice()) { + return Status(kShutdownInProgress, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -86,6 +90,9 @@ class Status { // Returns true iff the status indicates Incomplete bool IsIncomplete() const { return code() == kIncomplete; } + // Returns true iff the status indicates Incomplete + bool IsShutdownInProgress() const { return code() == kShutdownInProgress; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; @@ -99,7 +106,8 @@ class Status { kInvalidArgument = 4, kIOError = 5, kMergeInProgress = 6, - kIncomplete = 7 + kIncomplete = 7, + kShutdownInProgress = 8 }; // A nullptr state_ (which is always the case for OK) means the message diff --git a/util/blob_store.cc b/util/blob_store.cc index 9f0671281..76230679f 100644 --- a/util/blob_store.cc +++ b/util/blob_store.cc @@ -161,7 +161,7 @@ Status BlobStore::Put(const Slice& value, Blob* blob) { if (size_left > 0) { Delete(*blob); - return Status::IOError("Tried to write more data than fits in the blob"); + return Status::Corruption("Tried to write more data than fits in the blob"); } return Status::OK(); @@ -187,9 +187,13 @@ Status BlobStore::Get(const Blob& blob, chunk.size * block_size_, &result, &value->at(offset)); - if (!s.ok() || result.size() < chunk.size * block_size_) { + if (!s.ok()) { value->clear(); - return Status::IOError("Could not read in from file"); + return s; + } + if (result.size() < chunk.size * block_size_) { + value->clear(); + return Status::Corruption("Could not read in from file"); } offset += chunk.size * block_size_; } @@ -236,7 +240,7 @@ Status BlobStore::CreateNewBucket() { MutexLock l(&buckets_mutex_); if (buckets_size_ >= max_buckets_) { - return Status::IOError("Max size exceeded\n"); + return Status::NotSupported("Max size exceeded\n"); } int new_bucket_id = buckets_size_; diff --git a/util/status.cc b/util/status.cc index 69060a7cc..2a5f05a4b 100644 --- a/util/status.cc +++ b/util/status.cc @@ -60,7 +60,13 @@ std::string Status::ToString() const { type = "IO error: "; break; case kMergeInProgress: - type = "Merge In Progress: "; + type = "Merge in progress: "; + break; + case kIncomplete: + type = "Result incomplete: "; + break; + case kShutdownInProgress: + type = "Shutdown in progress: "; break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index da225e22b..89051f25a 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -857,7 +857,6 @@ void BackupEngineImpl::BackupMeta::Delete() { // // // ... -// TODO: maybe add checksum? Status BackupEngineImpl::BackupMeta::LoadFromFile( const std::string& backup_dir) { assert(Empty()); @@ -873,7 +872,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( s = backup_meta_file->Read(max_backup_meta_file_size_, &data, buf.get()); if (!s.ok() || data.size() == max_backup_meta_file_size_) { - return s.ok() ? Status::IOError("File size too big") : s; + return s.ok() ? Status::Corruption("File size too big") : s; } buf[data.size()] = 0; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 89326bfe7..ade2d954f 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -166,7 +166,7 @@ class TestEnv : public EnvWrapper { const EnvOptions& options) { written_files_.push_back(f); if (limit_written_files_ <= 0) { - return Status::IOError("Sorry, can't do this"); + return Status::NotSupported("Sorry, can't do this"); } limit_written_files_--; return EnvWrapper::NewWritableFile(f, r, options);