Split WinEnv into separate classes. (#1128)

For ease of reuse and customization as a library
  without wrapping.
  WinEnvThreads is a class for replacement.
  WintEnvIO is a class for reuse and behavior override.
  Added private virtual functions for custom override
  of fallocate pread for io classes.
This commit is contained in:
Dmitri Smirnov 2016-05-19 16:40:54 -07:00 committed by Siying Dong
parent bb98ca3c80
commit 26adaad438
9 changed files with 2553 additions and 1901 deletions

View File

@ -153,7 +153,9 @@ set(SOURCES
memtable/skiplistrep.cc
memtable/vectorrep.cc
port/stack_trace.cc
port/win/io_win.cc
port/win/env_win.cc
port/win/env_default.cc
port/win/port_win.cc
port/win/win_logger.cc
port/win/xpress_win.cc

42
port/win/env_default.cc Normal file
View File

@ -0,0 +1,42 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <mutex>
#include <rocksdb/env.h>
#include "port/win/env_win.h"
namespace rocksdb {
namespace port {
// We choose to create this on the heap and using std::once for the following
// reasons
// 1) Currently available MS compiler does not implement atomic C++11
// initialization of
// function local statics
// 2) We choose not to destroy the env because joining the threads from the
// system loader
// which destroys the statics (same as from DLLMain) creates a system loader
// dead-lock.
// in this manner any remaining threads are terminated OK.
namespace {
std::once_flag winenv_once_flag;
Env* envptr;
};
}
Env* Env::Default() {
using namespace port;
std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); });
return envptr;
}
}

File diff suppressed because it is too large Load Diff

276
port/win/env_win.h Normal file
View File

@ -0,0 +1,276 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// An Env is an interface used by the rocksdb implementation to access
// operating system functionality like the filesystem etc. Callers
// may wish to provide a custom Env object when opening a database to
// get fine gain control; e.g., to rate limit file system operations.
//
// All Env implementations are safe for concurrent access from
// multiple threads without any external synchronization.
#pragma once
#include <rocksdb/env.h>
#include "util/threadpool.h"
#include <mutex>
#include <vector>
namespace rocksdb {
namespace port {
// Currently not designed for inheritance but rather a replacement
class WinEnvThreads {
public:
explicit WinEnvThreads(Env* hosted_env);
~WinEnvThreads();
WinEnvThreads(const WinEnvThreads&) = delete;
WinEnvThreads& operator=(const WinEnvThreads&) = delete;
void Schedule(void(*function)(void*), void* arg, Env::Priority pri,
void* tag,
void(*unschedFunction)(void* arg));
int UnSchedule(void* arg, Env::Priority pri);
void StartThread(void(*function)(void* arg), void* arg);
void WaitForJoin();
unsigned int GetThreadPoolQueueLen(Env::Priority pri) const;
static uint64_t gettid();
uint64_t GetThreadID() const;
void SleepForMicroseconds(int micros);
// Allow increasing the number of worker threads.
void SetBackgroundThreads(int num, Env::Priority pri);
void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri);
private:
Env* hosted_env_;
mutable std::mutex mu_;
std::vector<ThreadPool> thread_pools_;
std::vector<std::thread> threads_to_join_;
};
// Designed for inheritance so can be re-used
// but certain parts replaced
class WinEnvIO {
public:
explicit WinEnvIO(Env* hosted_env);
virtual ~WinEnvIO();
virtual Status DeleteFile(const std::string& fname);
virtual Status GetCurrentTime(int64_t* unix_time);
virtual Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options);
virtual Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options);
virtual Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options);
virtual Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result);
virtual Status FileExists(const std::string& fname);
virtual Status GetChildren(const std::string& dir,
std::vector<std::string>* result);
virtual Status CreateDir(const std::string& name);
virtual Status CreateDirIfMissing(const std::string& name);
virtual Status DeleteDir(const std::string& name);
virtual Status GetFileSize(const std::string& fname,
uint64_t* size);
static uint64_t FileTimeToUnixTime(const FILETIME& ftTime);
virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime);
virtual Status RenameFile(const std::string& src,
const std::string& target);
virtual Status LinkFile(const std::string& src,
const std::string& target);
virtual Status LockFile(const std::string& lockFname,
FileLock** lock);
virtual Status UnlockFile(FileLock* lock);
virtual Status GetTestDirectory(std::string* result);
virtual Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result);
virtual uint64_t NowMicros();
virtual uint64_t NowNanos();
virtual Status GetHostName(char* name, uint64_t len);
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path);
virtual std::string TimeToString(uint64_t secondsSince1970);
virtual EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const;
virtual EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const;
size_t GetPageSize() const { return page_size_; }
size_t GetAllocationGranularity() const { return allocation_granularity_; }
uint64_t GetPerfCounterFrequency() const { return perf_counter_frequency_; }
private:
// Returns true iff the named directory exists and is a directory.
virtual bool DirExists(const std::string& dname);
typedef VOID(WINAPI * FnGetSystemTimePreciseAsFileTime)(LPFILETIME);
Env* hosted_env_;
size_t page_size_;
size_t allocation_granularity_;
uint64_t perf_counter_frequency_;
FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_;
};
class WinEnv : public Env {
public:
WinEnv();
~WinEnv();
Status DeleteFile(const std::string& fname) override;
Status GetCurrentTime(int64_t* unix_time) override;
Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) override;
Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override;
Status FileExists(const std::string& fname) override;
Status GetChildren(const std::string& dir,
std::vector<std::string>* result) override;
Status CreateDir(const std::string& name) override;
Status CreateDirIfMissing(const std::string& name) override;
Status DeleteDir(const std::string& name) override;
Status GetFileSize(const std::string& fname,
uint64_t* size) override;
Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) override;
Status RenameFile(const std::string& src,
const std::string& target) override;
Status LinkFile(const std::string& src,
const std::string& target) override;
Status LockFile(const std::string& lockFname,
FileLock** lock) override;
Status UnlockFile(FileLock* lock) override;
Status GetTestDirectory(std::string* result) override;
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override;
uint64_t NowMicros() override;
uint64_t NowNanos() override;
Status GetHostName(char* name, uint64_t len) override;
Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override;
std::string TimeToString(uint64_t secondsSince1970) override;
Status GetThreadList(
std::vector<ThreadStatus>* thread_list) override;
void Schedule(void(*function)(void*), void* arg, Env::Priority pri,
void* tag,
void(*unschedFunction)(void* arg)) override;
int UnSchedule(void* arg, Env::Priority pri) override;
void StartThread(void(*function)(void* arg), void* arg) override;
void WaitForJoin();
unsigned int GetThreadPoolQueueLen(Env::Priority pri) const override;
uint64_t GetThreadID() const override;
void SleepForMicroseconds(int micros) override;
// Allow increasing the number of worker threads.
void SetBackgroundThreads(int num, Env::Priority pri) override;
void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) override;
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const override;
EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const override;
private:
WinEnvIO winenv_io_;
WinEnvThreads winenv_threads_;
};
}
}

963
port/win/io_win.cc Normal file
View File

@ -0,0 +1,963 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "port/win/io_win.h"
#include "util/sync_point.h"
#include "util/coding.h"
#include "util/iostats_context_imp.h"
#include "util/sync_point.h"
#include "util/aligned_buffer.h"
namespace rocksdb {
namespace port {
std::string GetWindowsErrSz(DWORD err) {
LPSTR lpMsgBuf;
FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, err,
0, // Default language
reinterpret_cast<LPSTR>(&lpMsgBuf), 0, NULL);
std::string Err = lpMsgBuf;
LocalFree(lpMsgBuf);
return Err;
}
// We preserve the original name of this interface to denote the original idea
// behind it.
// All reads happen by a specified offset and pwrite interface does not change
// the position of the file pointer. Judging from the man page and errno it does
// execute
// lseek atomically to return the position of the file back where it was.
// WriteFile() does not
// have this capability. Therefore, for both pread and pwrite the pointer is
// advanced to the next position
// which is fine for writes because they are (should be) sequential.
// Because all the reads/writes happen by the specified offset, the caller in
// theory should not
// rely on the current file offset.
SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes,
uint64_t offset) {
assert(numBytes <= std::numeric_limits<DWORD>::max());
OVERLAPPED overlapped = { 0 };
ULARGE_INTEGER offsetUnion;
offsetUnion.QuadPart = offset;
overlapped.Offset = offsetUnion.LowPart;
overlapped.OffsetHigh = offsetUnion.HighPart;
SSIZE_T result = 0;
unsigned long bytesWritten = 0;
if (FALSE == WriteFile(hFile, src, static_cast<DWORD>(numBytes), &bytesWritten,
&overlapped)) {
result = -1;
} else {
result = bytesWritten;
}
return result;
}
// See comments for pwrite above
SSIZE_T pread(HANDLE hFile, char* src, size_t numBytes, uint64_t offset) {
assert(numBytes <= std::numeric_limits<DWORD>::max());
OVERLAPPED overlapped = { 0 };
ULARGE_INTEGER offsetUnion;
offsetUnion.QuadPart = offset;
overlapped.Offset = offsetUnion.LowPart;
overlapped.OffsetHigh = offsetUnion.HighPart;
SSIZE_T result = 0;
unsigned long bytesRead = 0;
if (FALSE == ReadFile(hFile, src, static_cast<DWORD>(numBytes), &bytesRead,
&overlapped)) {
return -1;
} else {
result = bytesRead;
}
return result;
}
// SetFileInformationByHandle() is capable of fast pre-allocates.
// However, this does not change the file end position unless the file is
// truncated and the pre-allocated space is not considered filled with zeros.
Status fallocate(const std::string& filename, HANDLE hFile,
uint64_t to_size) {
Status status;
FILE_ALLOCATION_INFO alloc_info;
alloc_info.AllocationSize.QuadPart = to_size;
if (!SetFileInformationByHandle(hFile, FileAllocationInfo, &alloc_info,
sizeof(FILE_ALLOCATION_INFO))) {
auto lastError = GetLastError();
status = IOErrorFromWindowsError(
"Failed to pre-allocate space: " + filename, lastError);
}
return status;
}
Status ftruncate(const std::string& filename, HANDLE hFile,
uint64_t toSize) {
Status status;
FILE_END_OF_FILE_INFO end_of_file;
end_of_file.EndOfFile.QuadPart = toSize;
if (!SetFileInformationByHandle(hFile, FileEndOfFileInfo, &end_of_file,
sizeof(FILE_END_OF_FILE_INFO))) {
auto lastError = GetLastError();
status = IOErrorFromWindowsError("Failed to Set end of file: " + filename,
lastError);
}
return status;
}
size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size) {
if (max_size < kMaxVarint64Length * 3) {
return 0;
}
BY_HANDLE_FILE_INFORMATION FileInfo;
BOOL result = GetFileInformationByHandle(hFile, &FileInfo);
TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
if (!result) {
return 0;
}
char* rid = id;
rid = EncodeVarint64(rid, uint64_t(FileInfo.dwVolumeSerialNumber));
rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexHigh));
rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexLow));
assert(rid >= id);
return static_cast<size_t>(rid - id);
}
WinMmapReadableFile::WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap,
const void* mapped_region, size_t length)
: fileName_(fileName),
hFile_(hFile),
hMap_(hMap),
mapped_region_(mapped_region),
length_(length) {}
WinMmapReadableFile::~WinMmapReadableFile() {
BOOL ret = ::UnmapViewOfFile(mapped_region_);
assert(ret);
ret = ::CloseHandle(hMap_);
assert(ret);
ret = ::CloseHandle(hFile_);
assert(ret);
}
Status WinMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
if (offset > length_) {
*result = Slice();
return IOError(fileName_, EINVAL);
} else if (offset + n > length_) {
n = length_ - offset;
}
*result =
Slice(reinterpret_cast<const char*>(mapped_region_)+offset, n);
return s;
}
Status WinMmapReadableFile::InvalidateCache(size_t offset, size_t length) {
return Status::OK();
}
size_t WinMmapReadableFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(hFile_, id, max_size);
}
// Can only truncate or reserve to a sector size aligned if
// used on files that are opened with Unbuffered I/O
Status WinMmapFile::TruncateFile(uint64_t toSize) {
return ftruncate(filename_, hFile_, toSize);
}
Status WinMmapFile::UnmapCurrentRegion() {
Status status;
if (mapped_begin_ != nullptr) {
if (!::UnmapViewOfFile(mapped_begin_)) {
status = IOErrorFromWindowsError(
"Failed to unmap file view: " + filename_, GetLastError());
}
// Move on to the next portion of the file
file_offset_ += view_size_;
// UnmapView automatically sends data to disk but not the metadata
// which is good and provides some equivalent of fdatasync() on Linux
// therefore, we donot need separate flag for metadata
mapped_begin_ = nullptr;
mapped_end_ = nullptr;
dst_ = nullptr;
last_sync_ = nullptr;
pending_sync_ = false;
}
return status;
}
Status WinMmapFile::MapNewRegion() {
Status status;
assert(mapped_begin_ == nullptr);
size_t minDiskSize = file_offset_ + view_size_;
if (minDiskSize > reserved_size_) {
status = Allocate(file_offset_, view_size_);
if (!status.ok()) {
return status;
}
}
// Need to remap
if (hMap_ == NULL || reserved_size_ > mapping_size_) {
if (hMap_ != NULL) {
// Unmap the previous one
BOOL ret = ::CloseHandle(hMap_);
assert(ret);
hMap_ = NULL;
}
ULARGE_INTEGER mappingSize;
mappingSize.QuadPart = reserved_size_;
hMap_ = CreateFileMappingA(
hFile_,
NULL, // Security attributes
PAGE_READWRITE, // There is not a write only mode for mapping
mappingSize.HighPart, // Enable mapping the whole file but the actual
// amount mapped is determined by MapViewOfFile
mappingSize.LowPart,
NULL); // Mapping name
if (NULL == hMap_) {
return IOErrorFromWindowsError(
"WindowsMmapFile failed to create file mapping for: " + filename_,
GetLastError());
}
mapping_size_ = reserved_size_;
}
ULARGE_INTEGER offset;
offset.QuadPart = file_offset_;
// View must begin at the granularity aligned offset
mapped_begin_ = reinterpret_cast<char*>(
MapViewOfFileEx(hMap_, FILE_MAP_WRITE, offset.HighPart, offset.LowPart,
view_size_, NULL));
if (!mapped_begin_) {
status = IOErrorFromWindowsError(
"WindowsMmapFile failed to map file view: " + filename_,
GetLastError());
} else {
mapped_end_ = mapped_begin_ + view_size_;
dst_ = mapped_begin_;
last_sync_ = mapped_begin_;
pending_sync_ = false;
}
return status;
}
Status WinMmapFile::PreallocateInternal(uint64_t spaceToReserve) {
return fallocate(filename_, hFile_, spaceToReserve);
}
WinMmapFile::WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size,
size_t allocation_granularity, const EnvOptions& options)
: filename_(fname),
hFile_(hFile),
hMap_(NULL),
page_size_(page_size),
allocation_granularity_(allocation_granularity),
reserved_size_(0),
mapping_size_(0),
view_size_(0),
mapped_begin_(nullptr),
mapped_end_(nullptr),
dst_(nullptr),
last_sync_(nullptr),
file_offset_(0),
pending_sync_(false) {
// Allocation granularity must be obtained from GetSystemInfo() and must be
// a power of two.
assert(allocation_granularity > 0);
assert((allocation_granularity & (allocation_granularity - 1)) == 0);
assert(page_size > 0);
assert((page_size & (page_size - 1)) == 0);
// Only for memory mapped writes
assert(options.use_mmap_writes);
// View size must be both the multiple of allocation_granularity AND the
// page size and the granularity is usually a multiple of a page size.
const size_t viewSize = 32 * 1024; // 32Kb similar to the Windows File Cache in buffered mode
view_size_ = Roundup(viewSize, allocation_granularity_);
}
WinMmapFile::~WinMmapFile() {
if (hFile_) {
this->Close();
}
}
Status WinMmapFile::Append(const Slice& data) {
const char* src = data.data();
size_t left = data.size();
while (left > 0) {
assert(mapped_begin_ <= dst_);
size_t avail = mapped_end_ - dst_;
if (avail == 0) {
Status s = UnmapCurrentRegion();
if (s.ok()) {
s = MapNewRegion();
}
if (!s.ok()) {
return s;
}
} else {
size_t n = std::min(left, avail);
memcpy(dst_, src, n);
dst_ += n;
src += n;
left -= n;
pending_sync_ = true;
}
}
// Now make sure that the last partial page is padded with zeros if needed
size_t bytesToPad = Roundup(size_t(dst_), page_size_) - size_t(dst_);
if (bytesToPad > 0) {
memset(dst_, 0, bytesToPad);
}
return Status::OK();
}
// Means Close() will properly take care of truncate
// and it does not need any additional information
Status WinMmapFile::Truncate(uint64_t size) {
return Status::OK();
}
Status WinMmapFile::Close() {
Status s;
assert(NULL != hFile_);
// We truncate to the precise size so no
// uninitialized data at the end. SetEndOfFile
// which we use does not write zeros and it is good.
uint64_t targetSize = GetFileSize();
if (mapped_begin_ != nullptr) {
// Sync before unmapping to make sure everything
// is on disk and there is not a lazy writing
// so we are deterministic with the tests
Sync();
s = UnmapCurrentRegion();
}
if (NULL != hMap_) {
BOOL ret = ::CloseHandle(hMap_);
if (!ret && s.ok()) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to Close mapping for file: " + filename_, lastError);
}
hMap_ = NULL;
}
if (hFile_ != NULL) {
TruncateFile(targetSize);
BOOL ret = ::CloseHandle(hFile_);
hFile_ = NULL;
if (!ret && s.ok()) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to close file map handle: " + filename_, lastError);
}
}
return s;
}
Status WinMmapFile::Flush() { return Status::OK(); }
// Flush only data
Status WinMmapFile::Sync() {
Status s;
// Some writes occurred since last sync
if (dst_ > last_sync_) {
assert(mapped_begin_);
assert(dst_);
assert(dst_ > mapped_begin_);
assert(dst_ < mapped_end_);
size_t page_begin =
TruncateToPageBoundary(page_size_, last_sync_ - mapped_begin_);
size_t page_end =
TruncateToPageBoundary(page_size_, dst_ - mapped_begin_ - 1);
// Flush only the amount of that is a multiple of pages
if (!::FlushViewOfFile(mapped_begin_ + page_begin,
(page_end - page_begin) + page_size_)) {
s = IOErrorFromWindowsError("Failed to FlushViewOfFile: " + filename_,
GetLastError());
} else {
last_sync_ = dst_;
}
}
return s;
}
/**
* Flush data as well as metadata to stable storage.
*/
Status WinMmapFile::Fsync() {
Status s = Sync();
// Flush metadata
if (s.ok() && pending_sync_) {
if (!::FlushFileBuffers(hFile_)) {
s = IOErrorFromWindowsError("Failed to FlushFileBuffers: " + filename_,
GetLastError());
}
pending_sync_ = false;
}
return s;
}
/**
* Get the size of valid data in the file. This will not match the
* size that is returned from the filesystem because we use mmap
* to extend file by map_size every time.
*/
uint64_t WinMmapFile::GetFileSize() {
size_t used = dst_ - mapped_begin_;
return file_offset_ + used;
}
Status WinMmapFile::InvalidateCache(size_t offset, size_t length) {
return Status::OK();
}
Status WinMmapFile::Allocate(uint64_t offset, uint64_t len) {
Status status;
TEST_KILL_RANDOM("WinMmapFile::Allocate", rocksdb_kill_odds);
// Make sure that we reserve an aligned amount of space
// since the reservation block size is driven outside so we want
// to check if we are ok with reservation here
size_t spaceToReserve = Roundup(offset + len, view_size_);
// Nothing to do
if (spaceToReserve <= reserved_size_) {
return status;
}
IOSTATS_TIMER_GUARD(allocate_nanos);
status = PreallocateInternal(spaceToReserve);
if (status.ok()) {
reserved_size_ = spaceToReserve;
}
return status;
}
size_t WinMmapFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(hFile_, id, max_size);
}
WinSequentialFile::WinSequentialFile(const std::string& fname, HANDLE f,
const EnvOptions& options)
: filename_(fname),
file_(f),
use_os_buffer_(options.use_os_buffer)
{}
WinSequentialFile::~WinSequentialFile() {
assert(file_ != INVALID_HANDLE_VALUE);
CloseHandle(file_);
}
Status WinSequentialFile::Read(size_t n, Slice* result, char* scratch) {
Status s;
size_t r = 0;
// Windows ReadFile API accepts a DWORD.
// While it is possible to read in a loop if n is > UINT_MAX
// it is a highly unlikely case.
if (n > UINT_MAX) {
return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER);
}
DWORD bytesToRead = static_cast<DWORD>(n); //cast is safe due to the check above
DWORD bytesRead = 0;
BOOL ret = ReadFile(file_, scratch, bytesToRead, &bytesRead, NULL);
if (ret == TRUE) {
r = bytesRead;
} else {
return IOErrorFromWindowsError(filename_, GetLastError());
}
*result = Slice(scratch, r);
return s;
}
Status WinSequentialFile::Skip(uint64_t n) {
// Can't handle more than signed max as SetFilePointerEx accepts a signed 64-bit
// integer. As such it is a highly unlikley case to have n so large.
if (n > _I64_MAX) {
return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER);
}
LARGE_INTEGER li;
li.QuadPart = static_cast<int64_t>(n); //cast is safe due to the check above
BOOL ret = SetFilePointerEx(file_, li, NULL, FILE_CURRENT);
if (ret == FALSE) {
return IOErrorFromWindowsError(filename_, GetLastError());
}
return Status::OK();
}
Status WinSequentialFile::InvalidateCache(size_t offset, size_t length) {
return Status::OK();
}
SSIZE_T WinRandomAccessFile::ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left,
AlignedBuffer& buffer, char* dest) const {
assert(buffer.CurrentSize() == 0);
assert(buffer.Capacity() >= bytes_to_read);
SSIZE_T read =
PositionedReadInternal(buffer.Destination(), bytes_to_read, first_page_start);
if (read > 0) {
buffer.Size(read);
// Let's figure out how much we read from the users standpoint
if ((first_page_start + buffer.CurrentSize()) > user_offset) {
assert(first_page_start <= user_offset);
size_t buffer_offset = user_offset - first_page_start;
read = buffer.Read(dest, buffer_offset, left);
} else {
read = 0;
}
left -= read;
}
return read;
}
SSIZE_T WinRandomAccessFile::ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left,
char* dest) const {
AlignedBuffer bigBuffer;
bigBuffer.Alignment(buffer_.Alignment());
bigBuffer.AllocateNewBuffer(bytes_to_read);
return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left,
bigBuffer, dest);
}
SSIZE_T WinRandomAccessFile::ReadIntoInstanceBuffer(uint64_t user_offset,
uint64_t first_page_start,
size_t bytes_to_read, size_t& left,
char* dest) const {
SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read,
left, buffer_, dest);
if (read > 0) {
buffered_start_ = first_page_start;
}
return read;
}
void WinRandomAccessFile::CalculateReadParameters(uint64_t offset, size_t bytes_requested,
size_t& actual_bytes_toread,
uint64_t& first_page_start) const {
const size_t alignment = buffer_.Alignment();
first_page_start = TruncateToPageBoundary(alignment, offset);
const uint64_t last_page_start =
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
actual_bytes_toread = (last_page_start - first_page_start) + alignment;
}
SSIZE_T WinRandomAccessFile::PositionedReadInternal(char* src, size_t numBytes,
uint64_t offset) const {
return pread(hFile_, src, numBytes, offset);
}
WinRandomAccessFile::WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
const EnvOptions& options)
: filename_(fname),
hFile_(hFile),
use_os_buffer_(options.use_os_buffer),
read_ahead_(false),
compaction_readahead_size_(options.compaction_readahead_size),
random_access_max_buffer_size_(options.random_access_max_buffer_size),
buffer_(),
buffered_start_(0) {
assert(!options.use_mmap_reads);
// Unbuffered access, use internal buffer for reads
if (!use_os_buffer_) {
// Do not allocate the buffer either until the first request or
// until there is a call to allocate a read-ahead buffer
buffer_.Alignment(alignment);
}
}
WinRandomAccessFile::~WinRandomAccessFile() {
if (hFile_ != NULL && hFile_ != INVALID_HANDLE_VALUE) {
::CloseHandle(hFile_);
}
}
void WinRandomAccessFile::EnableReadAhead() { this->Hint(SEQUENTIAL); }
Status WinRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
SSIZE_T r = -1;
size_t left = n;
char* dest = scratch;
if (n == 0) {
*result = Slice(scratch, 0);
return s;
}
// When in unbuffered mode we need to do the following changes:
// - use our own aligned buffer
// - always read at the offset of that is a multiple of alignment
if (!use_os_buffer_) {
uint64_t first_page_start = 0;
size_t actual_bytes_toread = 0;
size_t bytes_requested = left;
if (!read_ahead_ && random_access_max_buffer_size_ == 0) {
CalculateReadParameters(offset, bytes_requested, actual_bytes_toread,
first_page_start);
assert(actual_bytes_toread > 0);
r = ReadIntoOneShotBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
} else {
std::unique_lock<std::mutex> lock(buffer_mut_);
// Let's see if at least some of the requested data is already
// in the buffer
if (offset >= buffered_start_ &&
offset < (buffered_start_ + buffer_.CurrentSize())) {
size_t buffer_offset = offset - buffered_start_;
r = buffer_.Read(dest, buffer_offset, left);
assert(r >= 0);
left -= size_t(r);
offset += r;
dest += r;
}
// Still some left or none was buffered
if (left > 0) {
// Figure out the start/end offset for reading and amount to read
bytes_requested = left;
if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
bytes_requested = compaction_readahead_size_;
}
CalculateReadParameters(offset, bytes_requested, actual_bytes_toread,
first_page_start);
assert(actual_bytes_toread > 0);
if (buffer_.Capacity() < actual_bytes_toread) {
// If we are in read-ahead mode or the requested size
// exceeds max buffer size then use one-shot
// big buffer otherwise reallocate main buffer
if (read_ahead_ ||
(actual_bytes_toread > random_access_max_buffer_size_)) {
// Unlock the mutex since we are not using instance buffer
lock.unlock();
r = ReadIntoOneShotBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
} else {
buffer_.AllocateNewBuffer(actual_bytes_toread);
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
} else {
buffer_.Clear();
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
}
}
} else {
r = PositionedReadInternal(scratch, left, offset);
if (r > 0) {
left -= r;
}
}
*result = Slice(scratch, (r < 0) ? 0 : n - left);
if (r < 0) {
s = IOErrorFromLastWindowsError(filename_);
}
return s;
}
bool WinRandomAccessFile::ShouldForwardRawRequest() const {
return true;
}
void WinRandomAccessFile::Hint(AccessPattern pattern) {
if (pattern == SEQUENTIAL && !use_os_buffer_ &&
compaction_readahead_size_ > 0) {
std::lock_guard<std::mutex> lg(buffer_mut_);
if (!read_ahead_) {
read_ahead_ = true;
// This would allocate read-ahead size + 2 alignments
// - one for memory alignment which added implicitly by AlignedBuffer
// - We add one more alignment because we will read one alignment more
// from disk
buffer_.AllocateNewBuffer(compaction_readahead_size_ +
buffer_.Alignment());
}
}
}
Status WinRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
return Status::OK();
}
size_t WinRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(hFile_, id, max_size);
}
Status WinWritableFile::PreallocateInternal(uint64_t spaceToReserve) {
return fallocate(filename_, hFile_, spaceToReserve);
}
WinWritableFile::WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment,
size_t capacity, const EnvOptions& options)
: filename_(fname),
hFile_(hFile),
use_os_buffer_(options.use_os_buffer),
alignment_(alignment),
filesize_(0),
reservedsize_(0) {
assert(!options.use_mmap_writes);
}
WinWritableFile::~WinWritableFile() {
if (NULL != hFile_ && INVALID_HANDLE_VALUE != hFile_) {
WinWritableFile::Close();
}
}
// Indicates if the class makes use of unbuffered I/O
bool WinWritableFile::UseOSBuffer() const {
return use_os_buffer_;
}
size_t WinWritableFile::GetRequiredBufferAlignment() const {
return alignment_;
}
Status WinWritableFile::Append(const Slice& data) {
// Used for buffered access ONLY
assert(use_os_buffer_);
assert(data.size() < std::numeric_limits<DWORD>::max());
Status s;
DWORD bytesWritten = 0;
if (!WriteFile(hFile_, data.data(),
static_cast<DWORD>(data.size()), &bytesWritten, NULL)) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to WriteFile: " + filename_,
lastError);
} else {
assert(size_t(bytesWritten) == data.size());
filesize_ += data.size();
}
return s;
}
Status WinWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
Status s;
SSIZE_T ret = pwrite(hFile_, data.data(), data.size(), offset);
// Error break
if (ret < 0) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to pwrite for: " + filename_, lastError);
} else {
// With positional write it is not clear at all
// if this actually extends the filesize
assert(size_t(ret) == data.size());
filesize_ += data.size();
}
return s;
}
// Need to implement this so the file is truncated correctly
// when buffered and unbuffered mode
Status WinWritableFile::Truncate(uint64_t size) {
Status s = ftruncate(filename_, hFile_, size);
if (s.ok()) {
filesize_ = size;
}
return s;
}
Status WinWritableFile::Close() {
Status s;
assert(INVALID_HANDLE_VALUE != hFile_);
if (fsync(hFile_) < 0) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("fsync failed at Close() for: " + filename_,
lastError);
}
if (FALSE == ::CloseHandle(hFile_)) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_,
lastError);
}
hFile_ = INVALID_HANDLE_VALUE;
return s;
}
// write out the cached data to the OS cache
// This is now taken care of the WritableFileWriter
Status WinWritableFile::Flush() {
return Status::OK();
}
Status WinWritableFile::Sync() {
Status s;
// Calls flush buffers
if (fsync(hFile_) < 0) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("fsync failed at Sync() for: " + filename_,
lastError);
}
return s;
}
Status WinWritableFile::Fsync() { return Sync(); }
uint64_t WinWritableFile::GetFileSize() {
// Double accounting now here with WritableFileWriter
// and this size will be wrong when unbuffered access is used
// but tests implement their own writable files and do not use WritableFileWrapper
// so we need to squeeze a square peg through
// a round hole here.
return filesize_;
}
Status WinWritableFile::Allocate(uint64_t offset, uint64_t len) {
Status status;
TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds);
// Make sure that we reserve an aligned amount of space
// since the reservation block size is driven outside so we want
// to check if we are ok with reservation here
size_t spaceToReserve = Roundup(offset + len, alignment_);
// Nothing to do
if (spaceToReserve <= reservedsize_) {
return status;
}
IOSTATS_TIMER_GUARD(allocate_nanos);
status = PreallocateInternal(spaceToReserve);
if (status.ok()) {
reservedsize_ = spaceToReserve;
}
return status;
}
size_t WinWritableFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(hFile_, id, max_size);
}
Status WinDirectory::Fsync() { return Status::OK(); }
WinFileLock::~WinFileLock() {
BOOL ret = ::CloseHandle(hFile_);
assert(ret);
}
}
}

359
port/win/io_win.h Normal file
View File

@ -0,0 +1,359 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <rocksdb/Status.h>
#include <rocksdb/env.h>
#include "util/aligned_buffer.h"
#include <string>
#include <stdint.h>
#include <Windows.h>
#include <mutex>
namespace rocksdb {
namespace port {
std::string GetWindowsErrSz(DWORD err);
inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) {
return Status::IOError(context, GetWindowsErrSz(err));
}
inline Status IOErrorFromLastWindowsError(const std::string& context) {
return IOErrorFromWindowsError(context, GetLastError());
}
inline Status IOError(const std::string& context, int err_number) {
return Status::IOError(context, strerror(err_number));
}
// Note the below two do not set errno because they are used only here in this
// file
// on a Windows handle and, therefore, not necessary. Translating GetLastError()
// to errno
// is a sad business
inline int fsync(HANDLE hFile) {
if (!FlushFileBuffers(hFile)) {
return -1;
}
return 0;
}
SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes,
uint64_t offset);
SSIZE_T pread(HANDLE hFile, char* src, size_t numBytes, uint64_t offset);
Status fallocate(const std::string& filename, HANDLE hFile,
uint64_t to_size);
Status ftruncate(const std::string& filename, HANDLE hFile,
uint64_t toSize);
size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size);
// mmap() based random-access
class WinMmapReadableFile : public RandomAccessFile {
const std::string fileName_;
HANDLE hFile_;
HANDLE hMap_;
const void* mapped_region_;
const size_t length_;
public:
// mapped_region_[0,length-1] contains the mmapped contents of the file.
WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap,
const void* mapped_region, size_t length);
~WinMmapReadableFile();
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
virtual Status InvalidateCache(size_t offset, size_t length) override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
};
// We preallocate and use memcpy to append new
// data to the file. This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class WinMmapFile : public WritableFile {
private:
const std::string filename_;
HANDLE hFile_;
HANDLE hMap_;
const size_t page_size_; // We flush the mapping view in page_size
// increments. We may decide if this is a memory
// page size or SSD page size
const size_t
allocation_granularity_; // View must start at such a granularity
size_t reserved_size_; // Preallocated size
size_t mapping_size_; // The max size of the mapping object
// we want to guess the final file size to minimize the remapping
size_t view_size_; // How much memory to map into a view at a time
char* mapped_begin_; // Must begin at the file offset that is aligned with
// allocation_granularity_
char* mapped_end_;
char* dst_; // Where to write next (in range [mapped_begin_,mapped_end_])
char* last_sync_; // Where have we synced up to
uint64_t file_offset_; // Offset of mapped_begin_ in file
// Do we have unsynced writes?
bool pending_sync_;
// Can only truncate or reserve to a sector size aligned if
// used on files that are opened with Unbuffered I/O
Status TruncateFile(uint64_t toSize);
Status UnmapCurrentRegion();
Status MapNewRegion();
virtual Status PreallocateInternal(uint64_t spaceToReserve);
public:
WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size,
size_t allocation_granularity, const EnvOptions& options);
~WinMmapFile();
virtual Status Append(const Slice& data) override;
// Means Close() will properly take care of truncate
// and it does not need any additional information
virtual Status Truncate(uint64_t size) override;
virtual Status Close() override;
virtual Status Flush() override;
// Flush only data
virtual Status Sync() override;
/**
* Flush data as well as metadata to stable storage.
*/
virtual Status Fsync() override;
/**
* Get the size of valid data in the file. This will not match the
* size that is returned from the filesystem because we use mmap
* to extend file by map_size every time.
*/
virtual uint64_t GetFileSize() override;
virtual Status InvalidateCache(size_t offset, size_t length) override;
virtual Status Allocate(uint64_t offset, uint64_t len) override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
};
class WinSequentialFile : public SequentialFile {
private:
const std::string filename_;
HANDLE file_;
// There is no equivalent of advising away buffered pages as in posix.
// To implement this flag we would need to do unbuffered reads which
// will need to be aligned (not sure there is a guarantee that the buffer
// passed in is aligned).
// Hence we currently ignore this flag. It is used only in a few cases
// which should not be perf critical.
// If perf evaluation finds this to be a problem, we can look into
// implementing this.
bool use_os_buffer_;
public:
WinSequentialFile(const std::string& fname, HANDLE f,
const EnvOptions& options);
~WinSequentialFile();
virtual Status Read(size_t n, Slice* result, char* scratch) override;
virtual Status Skip(uint64_t n) override;
virtual Status InvalidateCache(size_t offset, size_t length) override;
};
// pread() based random-access
class WinRandomAccessFile : public RandomAccessFile {
const std::string filename_;
HANDLE hFile_;
const bool use_os_buffer_;
bool read_ahead_;
const size_t compaction_readahead_size_;
const size_t random_access_max_buffer_size_;
mutable std::mutex buffer_mut_;
mutable AlignedBuffer buffer_;
mutable uint64_t
buffered_start_; // file offset set that is currently buffered
/*
* The function reads a requested amount of bytes into the specified aligned
* buffer Upon success the function sets the length of the buffer to the
* amount of bytes actually read even though it might be less than actually
* requested. It then copies the amount of bytes requested by the user (left)
* to the user supplied buffer (dest) and reduces left by the amount of bytes
* copied to the user buffer
*
* @user_offset [in] - offset on disk where the read was requested by the user
* @first_page_start [in] - actual page aligned disk offset that we want to
* read from
* @bytes_to_read [in] - total amount of bytes that will be read from disk
* which is generally greater or equal to the amount
* that the user has requested due to the
* either alignment requirements or read_ahead in
* effect.
* @left [in/out] total amount of bytes that needs to be copied to the user
* buffer. It is reduced by the amount of bytes that actually
* copied
* @buffer - buffer to use
* @dest - user supplied buffer
*/
SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left,
AlignedBuffer& buffer, char* dest) const;
SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left,
char* dest) const;
SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset,
uint64_t first_page_start,
size_t bytes_to_read, size_t& left,
char* dest) const;
void CalculateReadParameters(uint64_t offset, size_t bytes_requested,
size_t& actual_bytes_toread,
uint64_t& first_page_start) const;
// Override for behavior change
virtual SSIZE_T PositionedReadInternal(char* src, size_t numBytes,
uint64_t offset) const;
public:
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
const EnvOptions& options);
~WinRandomAccessFile();
virtual void EnableReadAhead() override;
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
virtual bool ShouldForwardRawRequest() const override;
virtual void Hint(AccessPattern pattern) override;
virtual Status InvalidateCache(size_t offset, size_t length) override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
};
// This is a sequential write class. It has been mimicked (as others) after
// the original Posix class. We add support for unbuffered I/O on windows as
// well
// we utilize the original buffer as an alignment buffer to write directly to
// file with no buffering.
// No buffering requires that the provided buffer is aligned to the physical
// sector size (SSD page size) and
// that all SetFilePointer() operations to occur with such an alignment.
// We thus always write in sector/page size increments to the drive and leave
// the tail for the next write OR for Close() at which point we pad with zeros.
// No padding is required for
// buffered access.
class WinWritableFile : public WritableFile {
private:
const std::string filename_;
HANDLE hFile_;
const bool use_os_buffer_; // Used to indicate unbuffered access, the file
const uint64_t alignment_;
// must be opened as unbuffered if false
uint64_t filesize_; // How much data is actually written disk
uint64_t reservedsize_; // how far we have reserved space
virtual Status PreallocateInternal(uint64_t spaceToReserve);
public:
WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment,
size_t capacity, const EnvOptions& options);
~WinWritableFile();
// Indicates if the class makes use of unbuffered I/O
virtual bool UseOSBuffer() const override;
virtual size_t GetRequiredBufferAlignment() const override;
virtual Status Append(const Slice& data) override;
virtual Status PositionedAppend(const Slice& data, uint64_t offset) override;
// Need to implement this so the file is truncated correctly
// when buffered and unbuffered mode
virtual Status Truncate(uint64_t size) override;
virtual Status Close() override;
// write out the cached data to the OS cache
// This is now taken care of the WritableFileWriter
virtual Status Flush() override;
virtual Status Sync() override;
virtual Status Fsync() override;
virtual uint64_t GetFileSize() override;
virtual Status Allocate(uint64_t offset, uint64_t len) override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
};
class WinDirectory : public Directory {
public:
WinDirectory() {}
virtual Status Fsync() override;
};
class WinFileLock : public FileLock {
public:
explicit WinFileLock(HANDLE hFile) : hFile_(hFile) {
assert(hFile != NULL);
assert(hFile != INVALID_HANDLE_VALUE);
}
~WinFileLock();
private:
HANDLE hFile_;
};
}
}

View File

@ -69,7 +69,6 @@ typedef SSIZE_T ssize_t;
namespace rocksdb {
#define PREFETCH(addr, rw, locality)
std::string GetWindowsErrSz(DWORD err);
namespace port {

View File

@ -11,6 +11,7 @@
// where enough posix functionality is available.
#include "port/win/win_logger.h"
#include "port/win/io_win.h"
#include <algorithm>
#include <stdio.h>
@ -25,6 +26,8 @@
namespace rocksdb {
namespace port {
WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file,
const InfoLogLevel log_level)
: Logger(log_level),
@ -152,4 +155,6 @@ void WinLogger::Logv(const char* format, va_list ap) {
size_t WinLogger::GetLogFileSize() const { return log_size_; }
}
} // namespace rocksdb

View File

@ -23,6 +23,8 @@ namespace rocksdb {
class Env;
namespace port {
class WinLogger : public rocksdb::Logger {
public:
WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file,
@ -55,4 +57,6 @@ class WinLogger : public rocksdb::Logger {
const static uint64_t flush_every_seconds_ = 5;
};
}
} // namespace rocksdb