Merge branch 'master' into columnfamilies

Conflicts:
	db/version_set.cc
	db/version_set.h
This commit is contained in:
Igor Canadi 2014-01-27 11:11:51 -08:00
commit ae16606f98
11 changed files with 124 additions and 29 deletions

View File

@ -886,6 +886,11 @@ Status DBImpl::Recover(
return s;
}
s = env_->NewDirectory(dbname_, &db_directory_);
if (!s.ok()) {
return s;
}
s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) {
return s;
@ -1178,6 +1183,9 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
(unsigned long) meta.number,
(unsigned long) meta.file_size,
s.ToString().c_str());
if (!options_.disableDataSync) {
db_directory_->Fsync();
}
mutex_.Lock();
}
base->Unref();
@ -1272,8 +1280,8 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
// Replace immutable memtable with the generated Table
s = imm_.InstallMemtableFlushResults(
mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_, &deletion_state.memtables_to_free);
mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number,
pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get());
if (s.ok()) {
InstallSuperVersion(deletion_state);
@ -1401,7 +1409,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
Log(options_.info_log, "Apply version edit:\n%s",
edit.DebugString().data());
status = versions_->LogAndApply(&edit, &mutex_);
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
superversion_to_free = InstallSuperVersion(new_superversion);
new_superversion = nullptr;
@ -1974,7 +1982,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->edit(), &mutex_);
status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get());
InstallSuperVersion(deletion_state);
Version::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
@ -2222,7 +2230,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->output_level(), out.number, out.file_size,
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
return versions_->LogAndApply(compact->compaction->edit(), &mutex_,
db_directory_.get());
}
//
@ -2595,6 +2604,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
input.reset();
if (!options_.disableDataSync) {
db_directory_->Fsync();
}
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros);
@ -3886,7 +3898,7 @@ Status DBImpl::DeleteFile(std::string name) {
}
}
edit.DeleteFile(level, number);
status = versions_->LogAndApply(&edit, &mutex_);
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
if (status.ok()) {
InstallSuperVersion(deletion_state);
}
@ -4032,7 +4044,8 @@ Status DB::OpenWithColumnFamilies(
edit.SetLogNumber(new_log_number);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_,
impl->db_directory_.get());
}
if (s.ok()) {
// set column family handles
@ -4053,6 +4066,7 @@ Status DB::OpenWithColumnFamilies(
impl->DeleteObsoleteFiles();
impl->MaybeScheduleFlushOrCompaction();
impl->MaybeScheduleLogDBDeployStats();
s = impl->db_directory_->Fsync();
}
}

View File

@ -440,6 +440,8 @@ class DBImpl : public DB {
std::string host_name_;
std::unique_ptr<Directory> db_directory_;
// Queue of writers.
std::deque<Writer*> writers_;
WriteBatch tmp_batch_;

View File

@ -121,12 +121,10 @@ void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
const std::vector<MemTable*> &mems,
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete) {
const std::vector<MemTable*>& mems, VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs, std::vector<MemTable*>* to_delete,
Directory* db_directory) {
mu->AssertHeld();
// If the flush was not successful, then just reset state.
@ -178,7 +176,7 @@ Status MemTableList::InstallMemtableFlushResults(
(unsigned long)m->file_number_);
// this can release and reacquire the mutex.
s = vset->LogAndApply(&m->edit_, mu);
s = vset->LogAndApply(&m->edit_, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable

View File

@ -95,7 +95,8 @@ class MemTableList {
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete);
std::vector<MemTable*>* to_delete,
Directory* db_directory);
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().

View File

@ -1421,8 +1421,8 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
}
Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
VersionEdit* edit,
port::Mutex* mu,
VersionEdit* edit, port::Mutex* mu,
Directory* db_directory,
bool new_descriptor_log) {
mu->AssertHeld();
@ -1545,6 +1545,9 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// of it later
env_->DeleteFile(DescriptorFileName(dbname_, old_manifest_file_number));
}
if (!options_->disableDataSync && db_directory != nullptr) {
db_directory->Fsync();
}
}
// find offset in manifest file where this version is stored.
@ -1967,7 +1970,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
VersionEdit ve;
port::Mutex dummy_mutex;
MutexLock l(&dummy_mutex);
return versions.LogAndApply(&ve, &dummy_mutex, true);
return versions.LogAndApply(&ve, &dummy_mutex, nullptr, true);
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,

View File

@ -288,15 +288,14 @@ class VersionSet {
// current version. Will release *mu while actually writing to the file.
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(ColumnFamilyData* column_family_data,
VersionEdit* edit,
port::Mutex* mu,
Status LogAndApply(ColumnFamilyData* column_family_data, VersionEdit* edit,
port::Mutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false);
Status LogAndApply(VersionEdit* edit,
port::Mutex* mu,
Status LogAndApply(VersionEdit* edit, port::Mutex* mu,
Directory* db_directory = nullptr,
bool new_descriptor_log = false) {
return LogAndApply(column_family_set_->GetDefault(), edit, mu,
return LogAndApply(column_family_set_->GetDefault(), edit, mu, db_directory,
new_descriptor_log);
}

View File

@ -70,6 +70,9 @@ class HdfsEnv : public Env {
unique_ptr<RandomRWFile>* result,
const EnvOptions& options);
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result);
virtual bool FileExists(const std::string& fname);
virtual Status GetChildren(const std::string& path,
@ -246,6 +249,11 @@ class HdfsEnv : public Env {
return notsup;
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return notsup;
}
virtual bool FileExists(const std::string& fname){return false;}
virtual Status GetChildren(const std::string& path,

View File

@ -221,6 +221,11 @@ class WritableFileImpl : public WritableFile {
FileState* file_;
};
class InMemoryDirectory : public Directory {
public:
virtual Status Fsync() { return Status::OK(); }
};
class InMemoryEnv : public EnvWrapper {
public:
explicit InMemoryEnv(Env* base_env) : EnvWrapper(base_env) { }
@ -274,6 +279,12 @@ class InMemoryEnv : public EnvWrapper {
return Status::OK();
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
result->reset(new InMemoryDirectory());
return Status::OK();
}
virtual bool FileExists(const std::string& fname) {
MutexLock lock(&mutex_);
return file_map_.find(fname) != file_map_.end();

View File

@ -33,6 +33,7 @@ class SequentialFile;
class Slice;
class WritableFile;
class RandomRWFile;
class Directory;
struct Options;
using std::unique_ptr;
@ -122,6 +123,16 @@ class Env {
unique_ptr<RandomRWFile>* result,
const EnvOptions& options) = 0;
// Create an object that represents a directory. Will fail if directory
// doesn't exist. If the directory exists, it will open the directory
// and create a new Directory object.
//
// On success, stores a pointer to the new Directory in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK.
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) = 0;
// Returns true iff the named file exists.
virtual bool FileExists(const std::string& fname) = 0;
@ -488,6 +499,15 @@ class RandomRWFile {
void operator=(const RandomRWFile&);
};
// Directory object represents collection of files and implements
// filesystem operations that can be executed on directories.
class Directory {
public:
virtual ~Directory() {}
// Fsync directory
virtual Status Fsync() = 0;
};
// An interface for writing log messages.
class Logger {
public:
@ -578,6 +598,10 @@ class EnvWrapper : public Env {
const EnvOptions& options) {
return target_->NewRandomRWFile(f, r, options);
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return target_->NewDirectory(name, result);
}
bool FileExists(const std::string& f) { return target_->FileExists(f); }
Status GetChildren(const std::string& dir, std::vector<std::string>* r) {
return target_->GetChildren(dir, r);

View File

@ -366,6 +366,11 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname,
return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv");
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return Status::NotSupported("NewDirectory not yet supported on HdfsEnv");
}
bool HdfsEnv::FileExists(const std::string& fname) {
int value = hdfsExists(fileSys_, fname.c_str());
if (value == 0) {

View File

@ -867,6 +867,24 @@ class PosixRandomRWFile : public RandomRWFile {
#endif
};
class PosixDirectory : public Directory {
public:
explicit PosixDirectory(int fd) : fd_(fd) {}
~PosixDirectory() {
close(fd_);
}
virtual Status Fsync() {
if (fsync(fd_) == -1) {
return IOError("directory", errno);
}
return Status::OK();
}
private:
int fd_;
};
static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
mutex_lockedFiles.Lock();
if (lock) {
@ -1038,6 +1056,18 @@ class PosixEnv : public Env {
return s;
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
result->reset();
const int fd = open(name.c_str(), 0);
if (fd < 0) {
return IOError(name, errno);
} else {
result->reset(new PosixDirectory(fd));
}
return Status::OK();
}
virtual bool FileExists(const std::string& fname) {
return access(fname.c_str(), F_OK) == 0;
}