From d0beadd456739db892d35a570205c14ec6bbbc51 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 10 Oct 2013 00:03:08 -0700 Subject: [PATCH] Env class that can randomly read and write Summary: I have implemented basic simple use case that I need for External Value Store I'm working on. There is a potential for making this prettier by refactoring/combining WritableFile and RandomAccessFile, avoiding some copypasta. However, I decided to implement just the basic functionality, so I can continue working on the other diff. Test Plan: Added a unittest Reviewers: dhruba, haobo, kailiu Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D13365 --- include/rocksdb/env.h | 65 +++++++++++++++++++++++- util/env_posix.cc | 113 +++++++++++++++++++++++++++++++++++++++++- util/env_test.cc | 19 +++++++ 3 files changed, 195 insertions(+), 2 deletions(-) diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 612eba60c..3f9023c25 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -28,6 +28,7 @@ class RandomAccessFile; class SequentialFile; class Slice; class WritableFile; +class RandomRWFile; class Options; using std::unique_ptr; @@ -109,6 +110,15 @@ class Env { unique_ptr* result, const EnvOptions& options) = 0; + // Create an object that both reads and writes to a file on + // specified offsets (random access). If file already exists, + // does not overwrite it. On success, stores a pointer to the + // new file in *result and returns OK. On failure stores nullptr + // in *result and returns non-OK. + virtual Status NewRandomRWFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& options) = 0; + // Returns true iff the named file exists. virtual bool FileExists(const std::string& fname) = 0; @@ -329,7 +339,7 @@ class WritableFile { /* * Sync data and/or metadata as well. - * By default, sync only metadata. + * By default, sync only data. * Override this method for environments where we need to sync * metadata as well. */ @@ -418,6 +428,55 @@ class WritableFile { void operator=(const WritableFile&); }; +// A file abstraction for random reading and writing. +class RandomRWFile { + public: + RandomRWFile() {} + virtual ~RandomRWFile() {} + + // Write data from Slice data to file starting from offset + // Returns IOError on failure, but does not guarantee + // atomicity of a write. Returns OK status on success. + // + // Safe for concurrent use. + virtual Status Write(uint64_t offset, const Slice& data) = 0; + // Read up to "n" bytes from the file starting at "offset". + // "scratch[0..n-1]" may be written by this routine. Sets "*result" + // to the data that was read (including if fewer than "n" bytes were + // successfully read). May set "*result" to point at data in + // "scratch[0..n-1]", so "scratch[0..n-1]" must be live when + // "*result" is used. If an error was encountered, returns a non-OK + // status. + // + // Safe for concurrent use by multiple threads. + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const = 0; + virtual Status Close() = 0; // closes the file + virtual Status Sync() = 0; // sync data + + /* + * Sync data and/or metadata as well. + * By default, sync only data. + * Override this method for environments where we need to sync + * metadata as well. + */ + virtual Status Fsync() { + return Sync(); + } + + /* + * Pre-allocate space for a file. + */ + virtual Status Allocate(off_t offset, off_t len) { + return Status::OK(); + } + + private: + // No copying allowed + RandomRWFile(const RandomRWFile&); + void operator=(const RandomRWFile&); +}; + // An interface for writing log messages. class Logger { public: @@ -497,6 +556,10 @@ class EnvWrapper : public Env { const EnvOptions& options) { return target_->NewWritableFile(f, r, options); } + Status NewRandomRWFile(const std::string& f, unique_ptr* r, + const EnvOptions& options) { + return target_->NewRandomRWFile(f, r, options); + } bool FileExists(const std::string& f) { return target_->FileExists(f); } Status GetChildren(const std::string& dir, std::vector* r) { return target_->GetChildren(dir, r); diff --git a/util/env_posix.cc b/util/env_posix.cc index 068d75190..e8ae8549e 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -565,7 +565,7 @@ class PosixWritableFile : public WritableFile { } virtual Status Append(const Slice& data) { - char* src = (char *)data.data(); + const char* src = data.data(); size_t left = data.size(); Status s; pending_sync_ = true; @@ -709,6 +709,98 @@ class PosixWritableFile : public WritableFile { #endif }; +class PosixRandomRWFile : public RandomRWFile { + private: + const std::string filename_; + int fd_; + bool pending_sync_; + bool pending_fsync_; + + public: + PosixRandomRWFile(const std::string& fname, int fd, + const EnvOptions& options) : + filename_(fname), + fd_(fd), + pending_sync_(false), + pending_fsync_(false) { + assert(!options.use_mmap_writes && !options.use_mmap_reads); + } + + ~PosixRandomRWFile() { + if (fd_ >= 0) { + Close(); + } + } + + virtual Status Write(uint64_t offset, const Slice& data) { + const char* src = data.data(); + size_t left = data.size(); + Status s; + pending_sync_ = true; + pending_fsync_ = true; + + while (left != 0) { + ssize_t done = pwrite(fd_, src, left, offset); + if (done < 0) { + return IOError(filename_, errno); + } + + left -= done; + src += done; + offset += done; + } + + return Status::OK(); + } + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + Status s; + ssize_t r = pread(fd_, scratch, n, static_cast(offset)); + *result = Slice(scratch, (r < 0) ? 0 : r); + if (r < 0) { + s = IOError(filename_, errno); + } + return s; + } + + virtual Status Close() { + Status s = Status::OK(); + if (fd_ >= 0 && close(fd_) < 0) { + s = IOError(filename_, errno); + } + fd_ = -1; + return s; + } + + virtual Status Sync() { + if (pending_sync_ && fdatasync(fd_) < 0) { + return IOError(filename_, errno); + } + pending_sync_ = false; + return Status::OK(); + } + + virtual Status Fsync() { + if (pending_fsync_ && fsync(fd_) < 0) { + return IOError(filename_, errno); + } + pending_fsync_ = false; + pending_sync_ = false; + return Status::OK(); + } + +#ifdef OS_LINUX + virtual Status Allocate(off_t offset, off_t len) { + if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) { + return Status::OK(); + } else { + return IOError(filename_, errno); + } + } +#endif +}; + static int LockOrUnlock(const std::string& fname, int fd, bool lock) { mutex_lockedFiles.Lock(); if (lock) { @@ -855,6 +947,25 @@ class PosixEnv : public Env { return s; } + virtual Status NewRandomRWFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& options) { + result->reset(); + Status s; + const int fd = open(fname.c_str(), O_CREAT | O_RDWR, 0644); + if (fd < 0) { + s = IOError(fname, errno); + } else { + SetFD_CLOEXEC(fd, &options); + // no support for mmap yet + if (options.use_mmap_writes || options.use_mmap_reads) { + return Status::NotSupported("No support for mmap read/write yet"); + } + result->reset(new PosixRandomRWFile(fname, fd, options)); + } + return s; + } + virtual bool FileExists(const std::string& fname) { return access(fname.c_str(), F_OK) == 0; } diff --git a/util/env_test.cc b/util/env_test.cc index 357edb3a5..2163ad14b 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -357,6 +357,25 @@ TEST(EnvPosixTest, InvalidateCache) { ASSERT_OK(env_->DeleteFile(fname)); } +TEST(EnvPosixTest, PosixRandomRWFileTest) { + EnvOptions soptions; + soptions.use_mmap_writes = soptions.use_mmap_reads = false; + std::string fname = test::TmpDir() + "/" + "testfile"; + + unique_ptr file; + ASSERT_OK(env_->NewRandomRWFile(fname, &file, soptions)); + ASSERT_OK(file.get()->Allocate(0, 10*1024*1024)); + ASSERT_OK(file.get()->Write(100, Slice("Hello world"))); + ASSERT_OK(file.get()->Write(105, Slice("Hello world"))); + ASSERT_OK(file.get()->Sync()); + ASSERT_OK(file.get()->Fsync()); + char scratch[100]; + Slice result; + ASSERT_OK(file.get()->Read(100, 16, &result, scratch)); + ASSERT_EQ(result.compare("HelloHello world"), 0); + ASSERT_OK(file.get()->Close()); +} + } // namespace rocksdb int main(int argc, char** argv) {