diff --git a/CMakeLists.txt b/CMakeLists.txt index f3a1a7677..fe97a2487 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -153,7 +153,9 @@ set(SOURCES memtable/skiplistrep.cc memtable/vectorrep.cc port/stack_trace.cc + port/win/io_win.cc port/win/env_win.cc + port/win/env_default.cc port/win/port_win.cc port/win/win_logger.cc port/win/xpress_win.cc diff --git a/port/win/env_default.cc b/port/win/env_default.cc new file mode 100644 index 000000000..09c25c02b --- /dev/null +++ b/port/win/env_default.cc @@ -0,0 +1,42 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include + +#include +#include "port/win/env_win.h" + +namespace rocksdb { +namespace port { + +// We choose to create this on the heap and using std::once for the following +// reasons +// 1) Currently available MS compiler does not implement atomic C++11 +// initialization of +// function local statics +// 2) We choose not to destroy the env because joining the threads from the +// system loader +// which destroys the statics (same as from DLLMain) creates a system loader +// dead-lock. +// in this manner any remaining threads are terminated OK. 
+namespace { + std::once_flag winenv_once_flag; + Env* envptr; +}; + +} + +Env* Env::Default() { + using namespace port; + std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); }); + return envptr; +} + +} + diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 1805b7979..9cf01ec53 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -8,14 +8,13 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include -#include #include #include #include -#include -#include -#include +#include // _getpid +#include // _access +#include // _rmdir, _mkdir, _getcwd #include #include @@ -25,15 +24,11 @@ #include "port/port.h" #include "port/dirent.h" #include "port/win/win_logger.h" +#include "port/win/io_win.h" +#include "port/win/env_win.h" -#include "util/random.h" -#include "util/coding.h" #include "util/iostats_context_imp.h" -#include "util/rate_limiter.h" -#include "util/sync_point.h" -#include "util/aligned_buffer.h" -#include "util/threadpool.h" #include "util/thread_status_updater.h" #include "util/thread_status_util.h" @@ -42,1868 +37,34 @@ namespace rocksdb { -std::string GetWindowsErrSz(DWORD err) { - LPSTR lpMsgBuf; - FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, err, - 0, // Default language - reinterpret_cast(&lpMsgBuf), 0, NULL); - - std::string Err = lpMsgBuf; - LocalFree(lpMsgBuf); - return Err; -} - - ThreadStatusUpdater* CreateThreadStatusUpdater() { return new ThreadStatusUpdater(); } namespace { -const size_t c_OneMB = (1 << 20); - -inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) { - return Status::IOError(context, GetWindowsErrSz(err)); -} - -inline Status IOErrorFromLastWindowsError(const std::string& context) { - return IOErrorFromWindowsError(context, GetLastError()); -} - -inline Status IOError(const std::string& context, int err_number) { - return Status::IOError(context, strerror(err_number)); -} - 
-// TODO(sdong): temp logging. Need to help debugging. Remove it when -// the feature is proved to be stable. -inline void PrintThreadInfo(size_t thread_id, size_t terminatingId) { - fprintf(stdout, "Bg thread %Iu terminates %Iu\n", thread_id, terminatingId); -} - -// returns the ID of the current process -inline int current_process_id() { return _getpid(); } - // RAII helpers for HANDLEs const auto CloseHandleFunc = [](HANDLE h) { ::CloseHandle(h); }; typedef std::unique_ptr UniqueCloseHandlePtr; -// We preserve the original name of this interface to denote the original idea -// behind it. -// All reads happen by a specified offset and pwrite interface does not change -// the position of the file pointer. Judging from the man page and errno it does -// execute -// lseek atomically to return the position of the file back where it was. -// WriteFile() does not -// have this capability. Therefore, for both pread and pwrite the pointer is -// advanced to the next position -// which is fine for writes because they are (should be) sequential. -// Because all the reads/writes happen by the specified offset, the caller in -// theory should not -// rely on the current file offset. 
-SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes, - uint64_t offset) { - assert(numBytes <= std::numeric_limits::max()); - OVERLAPPED overlapped = {0}; - ULARGE_INTEGER offsetUnion; - offsetUnion.QuadPart = offset; - - overlapped.Offset = offsetUnion.LowPart; - overlapped.OffsetHigh = offsetUnion.HighPart; - - SSIZE_T result = 0; - - unsigned long bytesWritten = 0; - - if (FALSE == WriteFile(hFile, src, static_cast(numBytes), &bytesWritten, - &overlapped)) { - result = -1; - } else { - result = bytesWritten; - } - - return result; -} - -// See comments for pwrite above -SSIZE_T pread(HANDLE hFile, char* src, size_t numBytes, uint64_t offset) { - assert(numBytes <= std::numeric_limits::max()); - OVERLAPPED overlapped = {0}; - ULARGE_INTEGER offsetUnion; - offsetUnion.QuadPart = offset; - - overlapped.Offset = offsetUnion.LowPart; - overlapped.OffsetHigh = offsetUnion.HighPart; - - SSIZE_T result = 0; - - unsigned long bytesRead = 0; - - if (FALSE == ReadFile(hFile, src, static_cast(numBytes), &bytesRead, - &overlapped)) { - return -1; - } else { - result = bytesRead; - } - - return result; -} - -// Note the below two do not set errno because they are used only here in this -// file -// on a Windows handle and, therefore, not necessary. Translating GetLastError() -// to errno -// is a sad business -inline int fsync(HANDLE hFile) { - if (!FlushFileBuffers(hFile)) { - return -1; - } - - return 0; -} - -// SetFileInformationByHandle() is capable of fast pre-allocates. -// However, this does not change the file end position unless the file is -// truncated and the pre-allocated space is not considered filled with zeros. 
-inline Status fallocate(const std::string& filename, HANDLE hFile, - uint64_t to_size) { - Status status; - - FILE_ALLOCATION_INFO alloc_info; - alloc_info.AllocationSize.QuadPart = to_size; - - if (!SetFileInformationByHandle(hFile, FileAllocationInfo, &alloc_info, - sizeof(FILE_ALLOCATION_INFO))) { - auto lastError = GetLastError(); - status = IOErrorFromWindowsError( - "Failed to pre-allocate space: " + filename, lastError); - } - - return status; -} - -inline Status ftruncate(const std::string& filename, HANDLE hFile, - uint64_t toSize) { - Status status; - - FILE_END_OF_FILE_INFO end_of_file; - end_of_file.EndOfFile.QuadPart = toSize; - - if (!SetFileInformationByHandle(hFile, FileEndOfFileInfo, &end_of_file, - sizeof(FILE_END_OF_FILE_INFO))) { - auto lastError = GetLastError(); - status = IOErrorFromWindowsError("Failed to Set end of file: " + filename, - lastError); - } - - return status; -} - -size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size) { - - if (max_size < kMaxVarint64Length * 3) { - return 0; - } - - BY_HANDLE_FILE_INFORMATION FileInfo; - - BOOL result = GetFileInformationByHandle(hFile, &FileInfo); - - TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result); - - if (!result) { - return 0; - } - - char* rid = id; - rid = EncodeVarint64(rid, uint64_t(FileInfo.dwVolumeSerialNumber)); - rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexHigh)); - rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexLow)); - - assert(rid >= id); - return static_cast(rid - id); -} - -// mmap() based random-access -class WinMmapReadableFile : public RandomAccessFile { - const std::string fileName_; - HANDLE hFile_; - HANDLE hMap_; - - const void* mapped_region_; - const size_t length_; - - public: - // mapped_region_[0,length-1] contains the mmapped contents of the file. 
- WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap, - const void* mapped_region, size_t length) - : fileName_(fileName), - hFile_(hFile), - hMap_(hMap), - mapped_region_(mapped_region), - length_(length) {} - - ~WinMmapReadableFile() { - BOOL ret = ::UnmapViewOfFile(mapped_region_); - assert(ret); - - ret = ::CloseHandle(hMap_); - assert(ret); - - ret = ::CloseHandle(hFile_); - assert(ret); - } - - virtual Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override { - Status s; - - if (offset > length_) { - *result = Slice(); - return IOError(fileName_, EINVAL); - } else if (offset + n > length_) { - n = length_ - offset; - } - *result = - Slice(reinterpret_cast(mapped_region_) + offset, n); - return s; - } - - virtual Status InvalidateCache(size_t offset, size_t length) override { - return Status::OK(); - } - - virtual size_t GetUniqueId(char* id, size_t max_size) const override { - return GetUniqueIdFromFile(hFile_, id, max_size); - } -}; - -// We preallocate up to an extra megabyte and use memcpy to append new -// data to the file. This is safe since we either properly close the -// file before reading from it, or for log files, the reading code -// knows enough to skip zero suffixes. -class WinMmapFile : public WritableFile { - private: - const std::string filename_; - HANDLE hFile_; - HANDLE hMap_; - - const size_t page_size_; // We flush the mapping view in page_size - // increments. 
We may decide if this is a memory - // page size or SSD page size - const size_t - allocation_granularity_; // View must start at such a granularity - - size_t reserved_size_; // Preallocated size - - size_t mapping_size_; // The max size of the mapping object - // we want to guess the final file size to minimize the remapping - size_t view_size_; // How much memory to map into a view at a time - - char* mapped_begin_; // Must begin at the file offset that is aligned with - // allocation_granularity_ - char* mapped_end_; - char* dst_; // Where to write next (in range [mapped_begin_,mapped_end_]) - char* last_sync_; // Where have we synced up to - - uint64_t file_offset_; // Offset of mapped_begin_ in file - - // Do we have unsynced writes? - bool pending_sync_; - - // Can only truncate or reserve to a sector size aligned if - // used on files that are opened with Unbuffered I/O - Status TruncateFile(uint64_t toSize) { - return ftruncate(filename_, hFile_, toSize); - } - - Status UnmapCurrentRegion() { - Status status; - - if (mapped_begin_ != nullptr) { - if (!::UnmapViewOfFile(mapped_begin_)) { - status = IOErrorFromWindowsError( - "Failed to unmap file view: " + filename_, GetLastError()); - } - - // Move on to the next portion of the file - file_offset_ += view_size_; - - // UnmapView automatically sends data to disk but not the metadata - // which is good and provides some equivalent of fdatasync() on Linux - // therefore, we donot need separate flag for metadata - mapped_begin_ = nullptr; - mapped_end_ = nullptr; - dst_ = nullptr; - - last_sync_ = nullptr; - pending_sync_ = false; - } - - return status; - } - - Status MapNewRegion() { - - Status status; - - assert(mapped_begin_ == nullptr); - - size_t minDiskSize = file_offset_ + view_size_; - - if (minDiskSize > reserved_size_) { - status = Allocate(file_offset_, view_size_); - if (!status.ok()) { - return status; - } - } - - // Need to remap - if (hMap_ == NULL || reserved_size_ > mapping_size_) { - - if 
(hMap_ != NULL) { - // Unmap the previous one - BOOL ret = ::CloseHandle(hMap_); - assert(ret); - hMap_ = NULL; - } - - ULARGE_INTEGER mappingSize; - mappingSize.QuadPart = reserved_size_; - - hMap_ = CreateFileMappingA( - hFile_, - NULL, // Security attributes - PAGE_READWRITE, // There is not a write only mode for mapping - mappingSize.HighPart, // Enable mapping the whole file but the actual - // amount mapped is determined by MapViewOfFile - mappingSize.LowPart, - NULL); // Mapping name - - if (NULL == hMap_) { - return IOErrorFromWindowsError( - "WindowsMmapFile failed to create file mapping for: " + filename_, - GetLastError()); - } - - mapping_size_ = reserved_size_; - } - - ULARGE_INTEGER offset; - offset.QuadPart = file_offset_; - - // View must begin at the granularity aligned offset - mapped_begin_ = reinterpret_cast( - MapViewOfFileEx(hMap_, FILE_MAP_WRITE, offset.HighPart, offset.LowPart, - view_size_, NULL)); - - if (!mapped_begin_) { - status = IOErrorFromWindowsError( - "WindowsMmapFile failed to map file view: " + filename_, - GetLastError()); - } else { - mapped_end_ = mapped_begin_ + view_size_; - dst_ = mapped_begin_; - last_sync_ = mapped_begin_; - pending_sync_ = false; - } - return status; - } - - public: - WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size, - size_t allocation_granularity, const EnvOptions& options) - : filename_(fname), - hFile_(hFile), - hMap_(NULL), - page_size_(page_size), - allocation_granularity_(allocation_granularity), - reserved_size_(0), - mapping_size_(0), - view_size_(0), - mapped_begin_(nullptr), - mapped_end_(nullptr), - dst_(nullptr), - last_sync_(nullptr), - file_offset_(0), - pending_sync_(false) { - // Allocation granularity must be obtained from GetSystemInfo() and must be - // a power of two. 
- assert(allocation_granularity > 0); - assert((allocation_granularity & (allocation_granularity - 1)) == 0); - - assert(page_size > 0); - assert((page_size & (page_size - 1)) == 0); - - // Only for memory mapped writes - assert(options.use_mmap_writes); - - // View size must be both the multiple of allocation_granularity AND the - // page size and the granularity is usually a multiple of a page size. - const size_t viewSize = 32 * 1024; // 32Kb similar to the Windows File Cache in buffered mode - view_size_ = Roundup(viewSize, allocation_granularity_); - } - - ~WinMmapFile() { - if (hFile_) { - this->Close(); - } - } - - virtual Status Append(const Slice& data) override { - const char* src = data.data(); - size_t left = data.size(); - - while (left > 0) { - assert(mapped_begin_ <= dst_); - size_t avail = mapped_end_ - dst_; - - if (avail == 0) { - Status s = UnmapCurrentRegion(); - if (s.ok()) { - s = MapNewRegion(); - } - - if (!s.ok()) { - return s; - } - } else { - size_t n = std::min(left, avail); - memcpy(dst_, src, n); - dst_ += n; - src += n; - left -= n; - pending_sync_ = true; - } - } - - // Now make sure that the last partial page is padded with zeros if needed - size_t bytesToPad = Roundup(size_t(dst_), page_size_) - size_t(dst_); - if (bytesToPad > 0) { - memset(dst_, 0, bytesToPad); - } - - return Status::OK(); - } - - // Means Close() will properly take care of truncate - // and it does not need any additional information - virtual Status Truncate(uint64_t size) override { - return Status::OK(); - } - - virtual Status Close() override { - Status s; - - assert(NULL != hFile_); - - // We truncate to the precise size so no - // uninitialized data at the end. SetEndOfFile - // which we use does not write zeros and it is good. 
- uint64_t targetSize = GetFileSize(); - - if (mapped_begin_ != nullptr) { - // Sync before unmapping to make sure everything - // is on disk and there is not a lazy writing - // so we are deterministic with the tests - Sync(); - s = UnmapCurrentRegion(); - } - - if (NULL != hMap_) { - BOOL ret = ::CloseHandle(hMap_); - if (!ret && s.ok()) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to Close mapping for file: " + filename_, lastError); - } - - hMap_ = NULL; - } - - if (hFile_ != NULL) { - - TruncateFile(targetSize); - - BOOL ret = ::CloseHandle(hFile_); - hFile_ = NULL; - - if (!ret && s.ok()) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to close file map handle: " + filename_, lastError); - } - } - - return s; - } - - virtual Status Flush() override { return Status::OK(); } - - // Flush only data - virtual Status Sync() override { - Status s; - - // Some writes occurred since last sync - if (dst_ > last_sync_) { - assert(mapped_begin_); - assert(dst_); - assert(dst_ > mapped_begin_); - assert(dst_ < mapped_end_); - - size_t page_begin = - TruncateToPageBoundary(page_size_, last_sync_ - mapped_begin_); - size_t page_end = - TruncateToPageBoundary(page_size_, dst_ - mapped_begin_ - 1); - - // Flush only the amount of that is a multiple of pages - if (!::FlushViewOfFile(mapped_begin_ + page_begin, - (page_end - page_begin) + page_size_)) { - s = IOErrorFromWindowsError("Failed to FlushViewOfFile: " + filename_, - GetLastError()); - } else { - last_sync_ = dst_; - } - } - - return s; - } - - /** - * Flush data as well as metadata to stable storage. - */ - virtual Status Fsync() override { - Status s = Sync(); - - // Flush metadata - if (s.ok() && pending_sync_) { - if (!::FlushFileBuffers(hFile_)) { - s = IOErrorFromWindowsError("Failed to FlushFileBuffers: " + filename_, - GetLastError()); - } - pending_sync_ = false; - } - - return s; - } - - /** - * Get the size of valid data in the file. 
This will not match the - * size that is returned from the filesystem because we use mmap - * to extend file by map_size every time. - */ - virtual uint64_t GetFileSize() override { - size_t used = dst_ - mapped_begin_; - return file_offset_ + used; - } - - virtual Status InvalidateCache(size_t offset, size_t length) override { - return Status::OK(); - } - - virtual Status Allocate(uint64_t offset, uint64_t len) override { - Status status; - TEST_KILL_RANDOM("WinMmapFile::Allocate", rocksdb_kill_odds); - - // Make sure that we reserve an aligned amount of space - // since the reservation block size is driven outside so we want - // to check if we are ok with reservation here - size_t spaceToReserve = Roundup(offset + len, view_size_); - // Nothing to do - if (spaceToReserve <= reserved_size_) { - return status; - } - - IOSTATS_TIMER_GUARD(allocate_nanos); - status = fallocate(filename_, hFile_, spaceToReserve); - if (status.ok()) { - reserved_size_ = spaceToReserve; - } - return status; - } - - virtual size_t GetUniqueId(char* id, size_t max_size) const override { - return GetUniqueIdFromFile(hFile_, id, max_size); - } -}; - -class WinSequentialFile : public SequentialFile { - private: - const std::string filename_; - HANDLE file_; - - // There is no equivalent of advising away buffered pages as in posix. - // To implement this flag we would need to do unbuffered reads which - // will need to be aligned (not sure there is a guarantee that the buffer - // passed in is aligned). - // Hence we currently ignore this flag. It is used only in a few cases - // which should not be perf critical. - // If perf evaluation finds this to be a problem, we can look into - // implementing this. 
- bool use_os_buffer_; - - public: - WinSequentialFile(const std::string& fname, HANDLE f, - const EnvOptions& options) - : filename_(fname), - file_(f), - use_os_buffer_(options.use_os_buffer) {} - - virtual ~WinSequentialFile() { - assert(file_ != INVALID_HANDLE_VALUE); - CloseHandle(file_); - } - - virtual Status Read(size_t n, Slice* result, char* scratch) override { - Status s; - size_t r = 0; - - // Windows ReadFile API accepts a DWORD. - // While it is possible to read in a loop if n is > UINT_MAX - // it is a highly unlikely case. - if (n > UINT_MAX) { - return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); - } - - DWORD bytesToRead = static_cast(n); //cast is safe due to the check above - DWORD bytesRead = 0; - BOOL ret = ReadFile(file_, scratch, bytesToRead, &bytesRead, NULL); - if (ret == TRUE) { - r = bytesRead; - } else { - return IOErrorFromWindowsError(filename_, GetLastError()); - } - - *result = Slice(scratch, r); - - return s; - } - - virtual Status Skip(uint64_t n) override { - // Can't handle more than signed max as SetFilePointerEx accepts a signed 64-bit - // integer. As such it is a highly unlikley case to have n so large. 
- if (n > _I64_MAX) { - return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); - } - - LARGE_INTEGER li; - li.QuadPart = static_cast(n); //cast is safe due to the check above - BOOL ret = SetFilePointerEx(file_, li, NULL, FILE_CURRENT); - if (ret == FALSE) { - return IOErrorFromWindowsError(filename_, GetLastError()); - } - return Status::OK(); - } - - virtual Status InvalidateCache(size_t offset, size_t length) override { - return Status::OK(); - } -}; - -// pread() based random-access -class WinRandomAccessFile : public RandomAccessFile { - const std::string filename_; - HANDLE hFile_; - const bool use_os_buffer_; - bool read_ahead_; - const size_t compaction_readahead_size_; - const size_t random_access_max_buffer_size_; - mutable std::mutex buffer_mut_; - mutable AlignedBuffer buffer_; - mutable uint64_t - buffered_start_; // file offset set that is currently buffered - - /* - * The function reads a requested amount of bytes into the specified aligned - * buffer Upon success the function sets the length of the buffer to the - * amount of bytes actually read even though it might be less than actually - * requested. It then copies the amount of bytes requested by the user (left) - * to the user supplied buffer (dest) and reduces left by the amount of bytes - * copied to the user buffer - * - * @user_offset [in] - offset on disk where the read was requested by the user - * @first_page_start [in] - actual page aligned disk offset that we want to - * read from - * @bytes_to_read [in] - total amount of bytes that will be read from disk - * which is generally greater or equal to the amount - * that the user has requested due to the - * either alignment requirements or read_ahead in - * effect. - * @left [in/out] total amount of bytes that needs to be copied to the user - * buffer. 
It is reduced by the amount of bytes that actually - * copied - * @buffer - buffer to use - * @dest - user supplied buffer - */ - SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start, - size_t bytes_to_read, size_t& left, - AlignedBuffer& buffer, char* dest) const { - assert(buffer.CurrentSize() == 0); - assert(buffer.Capacity() >= bytes_to_read); - - SSIZE_T read = - pread(hFile_, buffer.Destination(), bytes_to_read, first_page_start); - - if (read > 0) { - buffer.Size(read); - - // Let's figure out how much we read from the users standpoint - if ((first_page_start + buffer.CurrentSize()) > user_offset) { - assert(first_page_start <= user_offset); - size_t buffer_offset = user_offset - first_page_start; - read = buffer.Read(dest, buffer_offset, left); - } else { - read = 0; - } - left -= read; - } - return read; - } - - SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start, - size_t bytes_to_read, size_t& left, - char* dest) const { - AlignedBuffer bigBuffer; - bigBuffer.Alignment(buffer_.Alignment()); - bigBuffer.AllocateNewBuffer(bytes_to_read); - - return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left, - bigBuffer, dest); - } - - SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, - uint64_t first_page_start, - size_t bytes_to_read, size_t& left, - char* dest) const { - SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, - left, buffer_, dest); - - if (read > 0) { - buffered_start_ = first_page_start; - } - - return read; - } - - void CalculateReadParameters(uint64_t offset, size_t bytes_requested, - size_t& actual_bytes_toread, - uint64_t& first_page_start) const { - - const size_t alignment = buffer_.Alignment(); - - first_page_start = TruncateToPageBoundary(alignment, offset); - const uint64_t last_page_start = - TruncateToPageBoundary(alignment, offset + bytes_requested - 1); - actual_bytes_toread = (last_page_start - first_page_start) + alignment; - } - - public: - 
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, - const EnvOptions& options) - : filename_(fname), - hFile_(hFile), - use_os_buffer_(options.use_os_buffer), - read_ahead_(false), - compaction_readahead_size_(options.compaction_readahead_size), - random_access_max_buffer_size_(options.random_access_max_buffer_size), - buffer_(), - buffered_start_(0) { - assert(!options.use_mmap_reads); - - // Unbuffered access, use internal buffer for reads - if (!use_os_buffer_) { - // Do not allocate the buffer either until the first request or - // until there is a call to allocate a read-ahead buffer - buffer_.Alignment(alignment); - } - } - - virtual ~WinRandomAccessFile() { - if (hFile_ != NULL && hFile_ != INVALID_HANDLE_VALUE) { - ::CloseHandle(hFile_); - } - } - - virtual void EnableReadAhead() override { this->Hint(SEQUENTIAL); } - - virtual Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override { - - Status s; - SSIZE_T r = -1; - size_t left = n; - char* dest = scratch; - - if (n == 0) { - *result = Slice(scratch, 0); - return s; - } - - // When in unbuffered mode we need to do the following changes: - // - use our own aligned buffer - // - always read at the offset of that is a multiple of alignment - if (!use_os_buffer_) { - - uint64_t first_page_start = 0; - size_t actual_bytes_toread = 0; - size_t bytes_requested = left; - - if (!read_ahead_ && random_access_max_buffer_size_ == 0) { - CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, - first_page_start); - - assert(actual_bytes_toread > 0); - - r = ReadIntoOneShotBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); - } else { - - std::unique_lock lock(buffer_mut_); - - // Let's see if at least some of the requested data is already - // in the buffer - if (offset >= buffered_start_ && - offset < (buffered_start_ + buffer_.CurrentSize())) { - size_t buffer_offset = offset - buffered_start_; - r = buffer_.Read(dest, 
buffer_offset, left); - assert(r >= 0); - - left -= size_t(r); - offset += r; - dest += r; - } - - // Still some left or none was buffered - if (left > 0) { - // Figure out the start/end offset for reading and amount to read - bytes_requested = left; - - if (read_ahead_ && bytes_requested < compaction_readahead_size_) { - bytes_requested = compaction_readahead_size_; - } - - CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, - first_page_start); - - assert(actual_bytes_toread > 0); - - if (buffer_.Capacity() < actual_bytes_toread) { - // If we are in read-ahead mode or the requested size - // exceeds max buffer size then use one-shot - // big buffer otherwise reallocate main buffer - if (read_ahead_ || - (actual_bytes_toread > random_access_max_buffer_size_)) { - // Unlock the mutex since we are not using instance buffer - lock.unlock(); - r = ReadIntoOneShotBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); - } - else { - buffer_.AllocateNewBuffer(actual_bytes_toread); - r = ReadIntoInstanceBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); - } - } - else { - buffer_.Clear(); - r = ReadIntoInstanceBuffer(offset, first_page_start, - actual_bytes_toread, left, dest); - } - } - } - } else { - r = pread(hFile_, scratch, left, offset); - if (r > 0) { - left -= r; - } - } - - *result = Slice(scratch, (r < 0) ? 
0 : n - left); - - if (r < 0) { - s = IOErrorFromLastWindowsError(filename_); - } - return s; - } - - virtual bool ShouldForwardRawRequest() const override { - return true; - } - - virtual void Hint(AccessPattern pattern) override { - if (pattern == SEQUENTIAL && !use_os_buffer_ && - compaction_readahead_size_ > 0) { - std::lock_guard lg(buffer_mut_); - if (!read_ahead_) { - read_ahead_ = true; - // This would allocate read-ahead size + 2 alignments - // - one for memory alignment which added implicitly by AlignedBuffer - // - We add one more alignment because we will read one alignment more - // from disk - buffer_.AllocateNewBuffer(compaction_readahead_size_ + - buffer_.Alignment()); - } - } - } - - virtual Status InvalidateCache(size_t offset, size_t length) override { - return Status::OK(); - } - - virtual size_t GetUniqueId(char* id, size_t max_size) const override { - return GetUniqueIdFromFile(hFile_, id, max_size); - } -}; - -// This is a sequential write class. It has been mimicked (as others) after -// the original Posix class. We add support for unbuffered I/O on windows as -// well -// we utilize the original buffer as an alignment buffer to write directly to -// file with no buffering. -// No buffering requires that the provided buffer is aligned to the physical -// sector size (SSD page size) and -// that all SetFilePointer() operations to occur with such an alignment. -// We thus always write in sector/page size increments to the drive and leave -// the tail for the next write OR for Close() at which point we pad with zeros. -// No padding is required for -// buffered access. 
-class WinWritableFile : public WritableFile { - private: - const std::string filename_; - HANDLE hFile_; - const bool use_os_buffer_; // Used to indicate unbuffered access, the file - const uint64_t alignment_; - // must be opened as unbuffered if false - uint64_t filesize_; // How much data is actually written disk - uint64_t reservedsize_; // how far we have reserved space - - public: - WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, - size_t capacity, const EnvOptions& options) - : filename_(fname), - hFile_(hFile), - use_os_buffer_(options.use_os_buffer), - alignment_(alignment), - filesize_(0), - reservedsize_(0) { - assert(!options.use_mmap_writes); - } - - ~WinWritableFile() { - if (NULL != hFile_ && INVALID_HANDLE_VALUE != hFile_) { - WinWritableFile::Close(); - } - } - - // Indicates if the class makes use of unbuffered I/O - virtual bool UseOSBuffer() const override { - return use_os_buffer_; - } - - virtual size_t GetRequiredBufferAlignment() const override { - return alignment_; - } - - virtual Status Append(const Slice& data) override { - - // Used for buffered access ONLY - assert(use_os_buffer_); - assert(data.size() < std::numeric_limits::max()); - - Status s; - - DWORD bytesWritten = 0; - if (!WriteFile(hFile_, data.data(), - static_cast(data.size()), &bytesWritten, NULL)) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to WriteFile: " + filename_, - lastError); - } else { - assert(size_t(bytesWritten) == data.size()); - filesize_ += data.size(); - } - - return s; - } - - virtual Status PositionedAppend(const Slice& data, uint64_t offset) override { - Status s; - - SSIZE_T ret = pwrite(hFile_, data.data(), data.size(), offset); - - // Error break - if (ret < 0) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to pwrite for: " + filename_, lastError); - } else { - // With positional write it is not clear at all - // if this actually extends the filesize - 
assert(size_t(ret) == data.size()); - filesize_ += data.size(); - } - return s; - } - - // Need to implement this so the file is truncated correctly - // when buffered and unbuffered mode - virtual Status Truncate(uint64_t size) override { - Status s = ftruncate(filename_, hFile_, size); - if (s.ok()) { - filesize_ = size; - } - return s; - } - - virtual Status Close() override { - - Status s; - - assert(INVALID_HANDLE_VALUE != hFile_); - - if (fsync(hFile_) < 0) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError("fsync failed at Close() for: " + filename_, - lastError); - } - - if (FALSE == ::CloseHandle(hFile_)) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_, - lastError); - } - - hFile_ = INVALID_HANDLE_VALUE; - return s; - } - - // write out the cached data to the OS cache - // This is now taken care of the WritableFileWriter - virtual Status Flush() override { - return Status::OK(); - } - - virtual Status Sync() override { - Status s; - // Calls flush buffers - if (fsync(hFile_) < 0) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError("fsync failed at Sync() for: " + filename_, - lastError); - } - return s; - } - - virtual Status Fsync() override { return Sync(); } - - virtual uint64_t GetFileSize() override { - // Double accounting now here with WritableFileWriter - // and this size will be wrong when unbuffered access is used - // but tests implement their own writable files and do not use WritableFileWrapper - // so we need to squeeze a square peg through - // a round hole here. 
- return filesize_; - } - - virtual Status Allocate(uint64_t offset, uint64_t len) override { - Status status; - TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds); - - // Make sure that we reserve an aligned amount of space - // since the reservation block size is driven outside so we want - // to check if we are ok with reservation here - size_t spaceToReserve = Roundup(offset + len, alignment_); - // Nothing to do - if (spaceToReserve <= reservedsize_) { - return status; - } - - IOSTATS_TIMER_GUARD(allocate_nanos); - status = fallocate(filename_, hFile_, spaceToReserve); - if (status.ok()) { - reservedsize_ = spaceToReserve; - } - return status; - } - - virtual size_t GetUniqueId(char* id, size_t max_size) const override { - return GetUniqueIdFromFile(hFile_, id, max_size); - } -}; - -class WinDirectory : public Directory { - public: - WinDirectory() {} - - virtual Status Fsync() override { return Status::OK(); } -}; - -class WinFileLock : public FileLock { - public: - explicit WinFileLock(HANDLE hFile) : hFile_(hFile) { - assert(hFile != NULL); - assert(hFile != INVALID_HANDLE_VALUE); - } - - ~WinFileLock() { - BOOL ret = ::CloseHandle(hFile_); - assert(ret); - } - - private: - HANDLE hFile_; -}; - -namespace { - void WinthreadCall(const char* label, std::error_code result) { if (0 != result.value()) { fprintf(stderr, "pthread %s: %s\n", label, strerror(result.value())); abort(); } } + } -typedef VOID(WINAPI * FnGetSystemTimePreciseAsFileTime)(LPFILETIME); +namespace port { -class WinEnv : public Env { - public: - WinEnv(); - - virtual ~WinEnv() { - for (auto& th : threads_to_join_) { - th.join(); - } - - threads_to_join_.clear(); - - for (auto& thpool : thread_pools_) { - thpool.JoinAllThreads(); - } - // All threads must be joined before the deletion of - // thread_status_updater_. 
- delete thread_status_updater_; - } - - virtual Status DeleteFile(const std::string& fname) override { - Status result; - - if (_unlink(fname.c_str())) { - result = IOError("Failed to delete: " + fname, errno); - } - - return result; - } - - Status GetCurrentTime(int64_t* unix_time) override { - time_t time = std::time(nullptr); - if (time == (time_t)(-1)) { - return Status::NotSupported("Failed to get time"); - } - - *unix_time = time; - return Status::OK(); - } - - virtual Status NewSequentialFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options) override { - Status s; - - result->reset(); - - // Corruption test needs to rename and delete files of these kind - // while they are still open with another handle. For that reason we - // allow share_write and delete(allows rename). - HANDLE hFile = INVALID_HANDLE_VALUE; - { - IOSTATS_TIMER_GUARD(open_nanos); - hFile = CreateFileA( - fname.c_str(), GENERIC_READ, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, - OPEN_EXISTING, // Original fopen mode is "rb" - FILE_ATTRIBUTE_NORMAL, NULL); - } - - if (INVALID_HANDLE_VALUE == hFile) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError("Failed to open NewSequentialFile" + fname, - lastError); - } else { - result->reset(new WinSequentialFile(fname, hFile, options)); - } - return s; - } - - virtual Status NewRandomAccessFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options) override { - result->reset(); - Status s; - - // Open the file for read-only random access - // Random access is to disable read-ahead as the system reads too much data - DWORD fileFlags = FILE_ATTRIBUTE_READONLY; - - if (!options.use_os_buffer && !options.use_mmap_reads) { - fileFlags |= FILE_FLAG_NO_BUFFERING; - } else { - fileFlags |= FILE_FLAG_RANDOM_ACCESS; - } - - /// Shared access is necessary for corruption test to pass - // almost all tests would work with a possible exception of fault_injection - 
HANDLE hFile = 0; - { - IOSTATS_TIMER_GUARD(open_nanos); - hFile = - CreateFileA(fname.c_str(), GENERIC_READ, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - NULL, OPEN_EXISTING, fileFlags, NULL); - } - - if (INVALID_HANDLE_VALUE == hFile) { - auto lastError = GetLastError(); - return IOErrorFromWindowsError( - "NewRandomAccessFile failed to Create/Open: " + fname, lastError); - } - - UniqueCloseHandlePtr fileGuard(hFile, CloseHandleFunc); - - // CAUTION! This will map the entire file into the process address space - if (options.use_mmap_reads && sizeof(void*) >= 8) { - // Use mmap when virtual address-space is plentiful. - uint64_t fileSize; - - s = GetFileSize(fname, &fileSize); - - if (s.ok()) { - // Will not map empty files - if (fileSize == 0) { - return IOError( - "NewRandomAccessFile failed to map empty file: " + fname, EINVAL); - } - - HANDLE hMap = CreateFileMappingA(hFile, NULL, PAGE_READONLY, - 0, // Whole file at its present length - 0, - NULL); // Mapping name - - if (!hMap) { - auto lastError = GetLastError(); - return IOErrorFromWindowsError( - "Failed to create file mapping for NewRandomAccessFile: " + fname, - lastError); - } - - UniqueCloseHandlePtr mapGuard(hMap, CloseHandleFunc); - - const void* mapped_region = - MapViewOfFileEx(hMap, FILE_MAP_READ, - 0, // High DWORD of access start - 0, // Low DWORD - fileSize, - NULL); // Let the OS choose the mapping - - if (!mapped_region) { - auto lastError = GetLastError(); - return IOErrorFromWindowsError( - "Failed to MapViewOfFile for NewRandomAccessFile: " + fname, - lastError); - } - - result->reset(new WinMmapReadableFile(fname, hFile, hMap, mapped_region, - fileSize)); - - mapGuard.release(); - fileGuard.release(); - } - } else { - result->reset(new WinRandomAccessFile(fname, hFile, page_size_, options)); - fileGuard.release(); - } - return s; - } - - virtual Status NewWritableFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options) override { - const 
size_t c_BufferCapacity = 64 * 1024; - - EnvOptions local_options(options); - - result->reset(); - Status s; - - DWORD fileFlags = FILE_ATTRIBUTE_NORMAL; - - if (!local_options.use_os_buffer && !local_options.use_mmap_writes) { - fileFlags = FILE_FLAG_NO_BUFFERING; - } - - // Desired access. We are want to write only here but if we want to memory - // map - // the file then there is no write only mode so we have to create it - // Read/Write - // However, MapViewOfFile specifies only Write only - DWORD desired_access = GENERIC_WRITE; - DWORD shared_mode = FILE_SHARE_READ; - - if (local_options.use_mmap_writes) { - desired_access |= GENERIC_READ; - } else { - // Adding this solely for tests to pass (fault_injection_test, - // wal_manager_test). - shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE); - } - - HANDLE hFile = 0; - { - IOSTATS_TIMER_GUARD(open_nanos); - hFile = CreateFileA( - fname.c_str(), - desired_access, // Access desired - shared_mode, - NULL, // Security attributes - CREATE_ALWAYS, // Posix env says O_CREAT | O_RDWR | O_TRUNC - fileFlags, // Flags - NULL); // Template File - } - - if (INVALID_HANDLE_VALUE == hFile) { - auto lastError = GetLastError(); - return IOErrorFromWindowsError( - "Failed to create a NewWriteableFile: " + fname, lastError); - } - - if (options.use_mmap_writes) { - // We usually do not use mmmapping on SSD and thus we pass memory - // page_size - result->reset(new WinMmapFile(fname, hFile, page_size_, - allocation_granularity_, local_options)); - } else { - // Here we want the buffer allocation to be aligned by the SSD page size - // and to be a multiple of it - result->reset(new WinWritableFile(fname, hFile, page_size_, - c_BufferCapacity, local_options)); - } - return s; - } - - virtual Status NewDirectory(const std::string& name, - std::unique_ptr* result) override { - Status s; - // Must be nullptr on failure - result->reset(); - // Must fail if directory does not exist - if (!DirExists(name)) { - s = IOError("Directory 
does not exist: " + name, EEXIST); - } else { - IOSTATS_TIMER_GUARD(open_nanos); - result->reset(new WinDirectory); - } - return s; - } - - virtual Status FileExists(const std::string& fname) override { - // F_OK == 0 - const int F_OK_ = 0; - return _access(fname.c_str(), F_OK_) == 0 ? Status::OK() - : Status::NotFound(); - } - - virtual Status GetChildren(const std::string& dir, - std::vector* result) override { - std::vector output; - - Status status; - - auto CloseDir = [](DIR* p) { closedir(p); }; - std::unique_ptr dirp(opendir(dir.c_str()), - CloseDir); - - if (!dirp) { - status = IOError(dir, errno); - } else { - if (result->capacity() > 0) { - output.reserve(result->capacity()); - } - - struct dirent* ent = readdir(dirp.get()); - while (ent) { - output.push_back(ent->d_name); - ent = readdir(dirp.get()); - } - } - - output.swap(*result); - - return status; - } - - virtual Status CreateDir(const std::string& name) override { - Status result; - - if (_mkdir(name.c_str()) != 0) { - auto code = errno; - result = IOError("Failed to create dir: " + name, code); - } - - return result; - } - - virtual Status CreateDirIfMissing(const std::string& name) override { - Status result; - - if (DirExists(name)) { - return result; - } - - if (_mkdir(name.c_str()) != 0) { - if (errno == EEXIST) { - result = - Status::IOError("`" + name + "' exists but is not a directory"); - } else { - auto code = errno; - result = IOError("Failed to create dir: " + name, code); - } - } - - return result; - } - - virtual Status DeleteDir(const std::string& name) override { - Status result; - if (_rmdir(name.c_str()) != 0) { - auto code = errno; - result = IOError("Failed to remove dir: " + name, code); - } - return result; - } - - virtual Status GetFileSize(const std::string& fname, - uint64_t* size) override { - Status s; - - WIN32_FILE_ATTRIBUTE_DATA attrs; - if (GetFileAttributesExA(fname.c_str(), GetFileExInfoStandard, &attrs)) { - ULARGE_INTEGER file_size; - file_size.HighPart = 
attrs.nFileSizeHigh; - file_size.LowPart = attrs.nFileSizeLow; - *size = file_size.QuadPart; - } else { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError("Can not get size for: " + fname, lastError); - } - return s; - } - - static inline uint64_t FileTimeToUnixTime(const FILETIME& ftTime) { - const uint64_t c_FileTimePerSecond = 10000000U; - // UNIX epoch starts on 1970-01-01T00:00:00Z - // Windows FILETIME starts on 1601-01-01T00:00:00Z - // Therefore, we need to subtract the below number of seconds from - // the seconds that we obtain from FILETIME with an obvious loss of - // precision - const uint64_t c_SecondBeforeUnixEpoch = 11644473600U; - - ULARGE_INTEGER li; - li.HighPart = ftTime.dwHighDateTime; - li.LowPart = ftTime.dwLowDateTime; - - uint64_t result = - (li.QuadPart / c_FileTimePerSecond) - c_SecondBeforeUnixEpoch; - return result; - } - - virtual Status GetFileModificationTime(const std::string& fname, - uint64_t* file_mtime) override { - Status s; - - WIN32_FILE_ATTRIBUTE_DATA attrs; - if (GetFileAttributesExA(fname.c_str(), GetFileExInfoStandard, &attrs)) { - *file_mtime = FileTimeToUnixTime(attrs.ftLastWriteTime); - } else { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Can not get file modification time for: " + fname, lastError); - *file_mtime = 0; - } - - return s; - } - - virtual Status RenameFile(const std::string& src, - const std::string& target) override { - Status result; - - // rename() is not capable of replacing the existing file as on Linux - // so use OS API directly - if (!MoveFileExA(src.c_str(), target.c_str(), MOVEFILE_REPLACE_EXISTING)) { - DWORD lastError = GetLastError(); - - std::string text("Failed to rename: "); - text.append(src).append(" to: ").append(target); - - result = IOErrorFromWindowsError(text, lastError); - } - - return result; - } - - virtual Status LinkFile(const std::string& src, - const std::string& target) override { - Status result; - - if 
(!CreateHardLinkA(target.c_str(), src.c_str(), NULL)) { - DWORD lastError = GetLastError(); - - std::string text("Failed to link: "); - text.append(src).append(" to: ").append(target); - - result = IOErrorFromWindowsError(text, lastError); - } - - return result; - } - - virtual Status LockFile(const std::string& lockFname, - FileLock** lock) override { - assert(lock != nullptr); - - *lock = NULL; - Status result; - - // No-sharing, this is a LOCK file - const DWORD ExclusiveAccessON = 0; - - // Obtain exclusive access to the LOCK file - // Previously, instead of NORMAL attr we set DELETE on close and that worked - // well except with fault_injection test that insists on deleting it. - HANDLE hFile = 0; - { - IOSTATS_TIMER_GUARD(open_nanos); - hFile = CreateFileA(lockFname.c_str(), (GENERIC_READ | GENERIC_WRITE), - ExclusiveAccessON, NULL, CREATE_ALWAYS, - FILE_ATTRIBUTE_NORMAL, NULL); - } - - if (INVALID_HANDLE_VALUE == hFile) { - auto lastError = GetLastError(); - result = IOErrorFromWindowsError( - "Failed to create lock file: " + lockFname, lastError); - } else { - *lock = new WinFileLock(hFile); - } - - return result; - } - - virtual Status UnlockFile(FileLock* lock) override { - Status result; - - assert(lock != nullptr); - - delete lock; - - return result; - } - - virtual void Schedule(void (*function)(void*), void* arg, Priority pri = LOW, - void* tag = nullptr, - void (*unschedFunction)(void* arg) = 0) override; - - virtual int UnSchedule(void* arg, Priority pri) override; - - virtual void StartThread(void (*function)(void* arg), void* arg) override; - - virtual void WaitForJoin() override; - - virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; - - virtual Status GetTestDirectory(std::string* result) override { - std::string output; - - const char* env = getenv("TEST_TMPDIR"); - if (env && env[0] != '\0') { - output = env; - CreateDir(output); - } else { - env = getenv("TMP"); - - if (env && env[0] != '\0') { - output = env; - } 
else { - output = "c:\\tmp"; - } - - CreateDir(output); - } - - output.append("\\testrocksdb-"); - output.append(std::to_string(_getpid())); - - CreateDir(output); - - output.swap(*result); - - return Status::OK(); - } - - virtual Status GetThreadList( - std::vector* thread_list) override { - assert(thread_status_updater_); - return thread_status_updater_->GetThreadList(thread_list); - } - - static uint64_t gettid() { - uint64_t thread_id = GetCurrentThreadId(); - return thread_id; - } - - virtual uint64_t GetThreadID() const override { return gettid(); } - - virtual Status NewLogger(const std::string& fname, - std::shared_ptr* result) override { - Status s; - - result->reset(); - - HANDLE hFile = 0; - { - IOSTATS_TIMER_GUARD(open_nanos); - hFile = CreateFileA( - fname.c_str(), GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_DELETE, // In RocksDb log files are - // renamed and deleted before - // they are closed. This enables - // doing so. - NULL, - CREATE_ALWAYS, // Original fopen mode is "w" - FILE_ATTRIBUTE_NORMAL, NULL); - } - - if (INVALID_HANDLE_VALUE == hFile) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError("Failed to open LogFile" + fname, lastError); - } else { - { - // With log files we want to set the true creation time as of now - // because the system - // for some reason caches the attributes of the previous file that just - // been renamed from - // this name so auto_roll_logger_test fails - FILETIME ft; - GetSystemTimeAsFileTime(&ft); - // Set creation, last access and last write time to the same value - SetFileTime(hFile, &ft, &ft, &ft); - } - result->reset(new WinLogger(&WinEnv::gettid, this, hFile)); - } - return s; - } - - virtual uint64_t NowMicros() override { - if (GetSystemTimePreciseAsFileTime_ != NULL) { - // all std::chrono clocks on windows proved to return - // values that may repeat that is not good enough for some uses. 
- const int64_t c_UnixEpochStartTicks = 116444736000000000i64; - const int64_t c_FtToMicroSec = 10; - - // This interface needs to return system time and not - // just any microseconds because it is often used as an argument - // to TimedWait() on condition variable - FILETIME ftSystemTime; - GetSystemTimePreciseAsFileTime_(&ftSystemTime); - - LARGE_INTEGER li; - li.LowPart = ftSystemTime.dwLowDateTime; - li.HighPart = ftSystemTime.dwHighDateTime; - // Subtract unix epoch start - li.QuadPart -= c_UnixEpochStartTicks; - // Convert to microsecs - li.QuadPart /= c_FtToMicroSec; - return li.QuadPart; - } - using namespace std::chrono; - return duration_cast(system_clock::now().time_since_epoch()).count(); - } - - virtual uint64_t NowNanos() override { - // all std::chrono clocks on windows have the same resolution that is only - // good enough for microseconds but not nanoseconds - // On Windows 8 and Windows 2012 Server - // GetSystemTimePreciseAsFileTime(¤t_time) can be used - LARGE_INTEGER li; - QueryPerformanceCounter(&li); - // Convert to nanoseconds first to avoid loss of precision - // and divide by frequency - li.QuadPart *= std::nano::den; - li.QuadPart /= perf_counter_frequency_; - return li.QuadPart; - } - - virtual void SleepForMicroseconds(int micros) override { - std::this_thread::sleep_for(std::chrono::microseconds(micros)); - } - - virtual Status GetHostName(char* name, uint64_t len) override { - Status s; - DWORD nSize = static_cast( - std::min(len, std::numeric_limits::max())); - - if (!::GetComputerNameA(name, &nSize)) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError("GetHostName", lastError); - } else { - name[nSize] = 0; - } - - return s; - } - - virtual Status GetCurrTime(int64_t* unix_time) { - Status s; - - time_t ret = time(nullptr); - if (ret == (time_t)-1) { - *unix_time = 0; - s = IOError("GetCurrTime", errno); - } else { - *unix_time = (int64_t)ret; - } - - return s; - } - - virtual Status GetAbsolutePath(const 
std::string& db_path, - std::string* output_path) override { - // Check if we already have an absolute path - // that starts with non dot and has a semicolon in it - if ((!db_path.empty() && (db_path[0] == '/' || db_path[0] == '\\')) || - (db_path.size() > 2 && db_path[0] != '.' && - ((db_path[1] == ':' && db_path[2] == '\\') || - (db_path[1] == ':' && db_path[2] == '/')))) { - *output_path = db_path; - return Status::OK(); - } - - std::string result; - result.resize(_MAX_PATH); - - char* ret = _getcwd(&result[0], _MAX_PATH); - if (ret == nullptr) { - return Status::IOError("Failed to get current working directory", - strerror(errno)); - } - - result.resize(strlen(result.data())); - - result.swap(*output_path); - return Status::OK(); - } - - // Allow increasing the number of worker threads. - virtual void SetBackgroundThreads(int num, Priority pri) override { - assert(pri >= Priority::LOW && pri <= Priority::HIGH); - thread_pools_[pri].SetBackgroundThreads(num); - } - - virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { - assert(pri >= Priority::LOW && pri <= Priority::HIGH); - thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); - } - - virtual std::string TimeToString(uint64_t secondsSince1970) override { - std::string result; - - const time_t seconds = secondsSince1970; - const int maxsize = 64; - - struct tm t; - errno_t ret = localtime_s(&t, &seconds); - - if (ret) { - result = std::to_string(seconds); - } else { - result.resize(maxsize); - char* p = &result[0]; - - int len = snprintf(p, maxsize, "%04d/%02d/%02d-%02d:%02d:%02d ", - t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, - t.tm_min, t.tm_sec); - assert(len > 0); - - result.resize(len); - } - - return result; - } - - EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, - const DBOptions& db_options) const override { - EnvOptions optimized = env_options; - optimized.use_mmap_writes = false; - optimized.bytes_per_sync = db_options.wal_bytes_per_sync; - 
optimized.use_os_buffer = - true; // This is because we flush only whole pages on unbuffered io and - // the last records are not guaranteed to be flushed. - // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it - // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit - // test and make this false - optimized.fallocate_with_keep_size = true; - return optimized; - } - - EnvOptions OptimizeForManifestWrite( - const EnvOptions& env_options) const override { - EnvOptions optimized = env_options; - optimized.use_mmap_writes = false; - optimized.use_os_buffer = true; - optimized.fallocate_with_keep_size = true; - return optimized; - } - - private: - // Returns true iff the named directory exists and is a directory. - virtual bool DirExists(const std::string& dname) { - WIN32_FILE_ATTRIBUTE_DATA attrs; - if (GetFileAttributesExA(dname.c_str(), GetFileExInfoStandard, &attrs)) { - return 0 != (attrs.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY); - } - return false; - } - - bool SupportsFastAllocate(const std::string& /* path */) { return false; } - - bool checkedDiskForMmap_; - bool forceMmapOff; // do we override Env options? 
- size_t page_size_; - size_t allocation_granularity_; - uint64_t perf_counter_frequency_; - std::vector thread_pools_; - mutable std::mutex mu_; - std::vector threads_to_join_; - FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_; -}; - -WinEnv::WinEnv() - : checkedDiskForMmap_(false), - forceMmapOff(false), +WinEnvIO::WinEnvIO(Env* hosted_env) + : hosted_env_(hosted_env), page_size_(4 * 1012), allocation_granularity_(page_size_), perf_counter_frequency_(0), - thread_pools_(Priority::TOTAL), GetSystemTimePreciseAsFileTime_(NULL) { - HMODULE module = GetModuleHandle("kernel32.dll"); - if (module != NULL) { - GetSystemTimePreciseAsFileTime_ = (FnGetSystemTimePreciseAsFileTime)GetProcAddress( - module, "GetSystemTimePreciseAsFileTime"); - } - SYSTEM_INFO sinfo; GetSystemInfo(&sinfo); @@ -1917,52 +78,707 @@ WinEnv::WinEnv() perf_counter_frequency_ = qpf.QuadPart; } - for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { - thread_pools_[pool_id].SetThreadPriority( - static_cast(pool_id)); - // This allows later initializing the thread-local-env of each thread. 
- thread_pools_[pool_id].SetHostEnv(this); + HMODULE module = GetModuleHandle("kernel32.dll"); + if (module != NULL) { + GetSystemTimePreciseAsFileTime_ = (FnGetSystemTimePreciseAsFileTime)GetProcAddress( + module, "GetSystemTimePreciseAsFileTime"); } - - // Protected member of the base class - thread_status_updater_ = CreateThreadStatusUpdater(); } -void WinEnv::Schedule(void (*function)(void*), void* arg, Priority pri, - void* tag, void (*unschedFunction)(void* arg)) { - assert(pri >= Priority::LOW && pri <= Priority::HIGH); +WinEnvIO::~WinEnvIO() { +} + +Status WinEnvIO::DeleteFile(const std::string& fname) { + Status result; + + if (_unlink(fname.c_str())) { + result = IOError("Failed to delete: " + fname, errno); + } + + return result; +} + +Status WinEnvIO::GetCurrentTime(int64_t* unix_time) { + time_t time = std::time(nullptr); + if (time == (time_t)(-1)) { + return Status::NotSupported("Failed to get time"); + } + + *unix_time = time; + return Status::OK(); +} + +Status WinEnvIO::NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) { + Status s; + + result->reset(); + + // Corruption test needs to rename and delete files of these kind + // while they are still open with another handle. For that reason we + // allow share_write and delete(allows rename). 
+ HANDLE hFile = INVALID_HANDLE_VALUE; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA( + fname.c_str(), GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, + OPEN_EXISTING, // Original fopen mode is "rb" + FILE_ATTRIBUTE_NORMAL, NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("Failed to open NewSequentialFile" + fname, + lastError); + } else { + result->reset(new WinSequentialFile(fname, hFile, options)); + } + return s; +} + +Status WinEnvIO::NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) { + result->reset(); + Status s; + + // Open the file for read-only random access + // Random access is to disable read-ahead as the system reads too much data + DWORD fileFlags = FILE_ATTRIBUTE_READONLY; + + if (!options.use_os_buffer && !options.use_mmap_reads) { + fileFlags |= FILE_FLAG_NO_BUFFERING; + } else { + fileFlags |= FILE_FLAG_RANDOM_ACCESS; + } + + /// Shared access is necessary for corruption test to pass + // almost all tests would work with a possible exception of fault_injection + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = + CreateFileA(fname.c_str(), GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, OPEN_EXISTING, fileFlags, NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "NewRandomAccessFile failed to Create/Open: " + fname, lastError); + } + + UniqueCloseHandlePtr fileGuard(hFile, CloseHandleFunc); + + // CAUTION! This will map the entire file into the process address space + if (options.use_mmap_reads && sizeof(void*) >= 8) { + // Use mmap when virtual address-space is plentiful. 
+ uint64_t fileSize; + + s = GetFileSize(fname, &fileSize); + + if (s.ok()) { + // Will not map empty files + if (fileSize == 0) { + return IOError( + "NewRandomAccessFile failed to map empty file: " + fname, EINVAL); + } + + HANDLE hMap = CreateFileMappingA(hFile, NULL, PAGE_READONLY, + 0, // Whole file at its present length + 0, + NULL); // Mapping name + + if (!hMap) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to create file mapping for NewRandomAccessFile: " + fname, + lastError); + } + + UniqueCloseHandlePtr mapGuard(hMap, CloseHandleFunc); + + const void* mapped_region = + MapViewOfFileEx(hMap, FILE_MAP_READ, + 0, // High DWORD of access start + 0, // Low DWORD + fileSize, + NULL); // Let the OS choose the mapping + + if (!mapped_region) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to MapViewOfFile for NewRandomAccessFile: " + fname, + lastError); + } + + result->reset(new WinMmapReadableFile(fname, hFile, hMap, mapped_region, + fileSize)); + + mapGuard.release(); + fileGuard.release(); + } + } else { + result->reset(new WinRandomAccessFile(fname, hFile, page_size_, options)); + fileGuard.release(); + } + return s; +} + +Status WinEnvIO::NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) { + const size_t c_BufferCapacity = 64 * 1024; + + EnvOptions local_options(options); + + result->reset(); + Status s; + + DWORD fileFlags = FILE_ATTRIBUTE_NORMAL; + + if (!local_options.use_os_buffer && !local_options.use_mmap_writes) { + fileFlags = FILE_FLAG_NO_BUFFERING; + } + + // Desired access. 
We are want to write only here but if we want to memory + // map + // the file then there is no write only mode so we have to create it + // Read/Write + // However, MapViewOfFile specifies only Write only + DWORD desired_access = GENERIC_WRITE; + DWORD shared_mode = FILE_SHARE_READ; + + if (local_options.use_mmap_writes) { + desired_access |= GENERIC_READ; + } else { + // Adding this solely for tests to pass (fault_injection_test, + // wal_manager_test). + shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE); + } + + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA( + fname.c_str(), + desired_access, // Access desired + shared_mode, + NULL, // Security attributes + CREATE_ALWAYS, // Posix env says O_CREAT | O_RDWR | O_TRUNC + fileFlags, // Flags + NULL); // Template File + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to create a NewWriteableFile: " + fname, lastError); + } + + if (options.use_mmap_writes) { + // We usually do not use mmmapping on SSD and thus we pass memory + // page_size + result->reset(new WinMmapFile(fname, hFile, page_size_, + allocation_granularity_, local_options)); + } else { + // Here we want the buffer allocation to be aligned by the SSD page size + // and to be a multiple of it + result->reset(new WinWritableFile(fname, hFile, page_size_, + c_BufferCapacity, local_options)); + } + return s; +} + +Status WinEnvIO::NewDirectory(const std::string& name, + std::unique_ptr* result) { + Status s; + // Must be nullptr on failure + result->reset(); + // Must fail if directory does not exist + if (!DirExists(name)) { + s = IOError("Directory does not exist: " + name, EEXIST); + } else { + IOSTATS_TIMER_GUARD(open_nanos); + result->reset(new WinDirectory); + } + return s; +} + +Status WinEnvIO::FileExists(const std::string& fname) { + // F_OK == 0 + const int F_OK_ = 0; + return _access(fname.c_str(), F_OK_) == 0 ? 
Status::OK() + : Status::NotFound(); +} + +Status WinEnvIO::GetChildren(const std::string& dir, + std::vector* result) { + std::vector output; + + Status status; + + auto CloseDir = [](DIR* p) { closedir(p); }; + std::unique_ptr dirp(opendir(dir.c_str()), + CloseDir); + + if (!dirp) { + status = IOError(dir, errno); + } else { + if (result->capacity() > 0) { + output.reserve(result->capacity()); + } + + struct dirent* ent = readdir(dirp.get()); + while (ent) { + output.push_back(ent->d_name); + ent = readdir(dirp.get()); + } + } + + output.swap(*result); + + return status; +} + +Status WinEnvIO::CreateDir(const std::string& name) { + Status result; + + if (_mkdir(name.c_str()) != 0) { + auto code = errno; + result = IOError("Failed to create dir: " + name, code); + } + + return result; +} + +Status WinEnvIO::CreateDirIfMissing(const std::string& name) { + Status result; + + if (DirExists(name)) { + return result; + } + + if (_mkdir(name.c_str()) != 0) { + if (errno == EEXIST) { + result = + Status::IOError("`" + name + "' exists but is not a directory"); + } else { + auto code = errno; + result = IOError("Failed to create dir: " + name, code); + } + } + + return result; +} + +Status WinEnvIO::DeleteDir(const std::string& name) { + Status result; + if (_rmdir(name.c_str()) != 0) { + auto code = errno; + result = IOError("Failed to remove dir: " + name, code); + } + return result; +} + +Status WinEnvIO::GetFileSize(const std::string& fname, + uint64_t* size) { + Status s; + + WIN32_FILE_ATTRIBUTE_DATA attrs; + if (GetFileAttributesExA(fname.c_str(), GetFileExInfoStandard, &attrs)) { + ULARGE_INTEGER file_size; + file_size.HighPart = attrs.nFileSizeHigh; + file_size.LowPart = attrs.nFileSizeLow; + *size = file_size.QuadPart; + } else { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("Can not get size for: " + fname, lastError); + } + return s; +} + +uint64_t WinEnvIO::FileTimeToUnixTime(const FILETIME& ftTime) { + const uint64_t c_FileTimePerSecond = 
10000000U; + // UNIX epoch starts on 1970-01-01T00:00:00Z + // Windows FILETIME starts on 1601-01-01T00:00:00Z + // Therefore, we need to subtract the below number of seconds from + // the seconds that we obtain from FILETIME with an obvious loss of + // precision + const uint64_t c_SecondBeforeUnixEpoch = 11644473600U; + + ULARGE_INTEGER li; + li.HighPart = ftTime.dwHighDateTime; + li.LowPart = ftTime.dwLowDateTime; + + uint64_t result = + (li.QuadPart / c_FileTimePerSecond) - c_SecondBeforeUnixEpoch; + return result; +} + +Status WinEnvIO::GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) { + Status s; + + WIN32_FILE_ATTRIBUTE_DATA attrs; + if (GetFileAttributesExA(fname.c_str(), GetFileExInfoStandard, &attrs)) { + *file_mtime = FileTimeToUnixTime(attrs.ftLastWriteTime); + } else { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Can not get file modification time for: " + fname, lastError); + *file_mtime = 0; + } + + return s; +} + +Status WinEnvIO::RenameFile(const std::string& src, + const std::string& target) { + Status result; + + // rename() is not capable of replacing the existing file as on Linux + // so use OS API directly + if (!MoveFileExA(src.c_str(), target.c_str(), MOVEFILE_REPLACE_EXISTING)) { + DWORD lastError = GetLastError(); + + std::string text("Failed to rename: "); + text.append(src).append(" to: ").append(target); + + result = IOErrorFromWindowsError(text, lastError); + } + + return result; +} + +Status WinEnvIO::LinkFile(const std::string& src, + const std::string& target) { + Status result; + + if (!CreateHardLinkA(target.c_str(), src.c_str(), NULL)) { + DWORD lastError = GetLastError(); + + std::string text("Failed to link: "); + text.append(src).append(" to: ").append(target); + + result = IOErrorFromWindowsError(text, lastError); + } + + return result; +} + +Status WinEnvIO::LockFile(const std::string& lockFname, + FileLock** lock) { + assert(lock != nullptr); + + *lock = NULL; + Status 
result; + + // No-sharing, this is a LOCK file + const DWORD ExclusiveAccessON = 0; + + // Obtain exclusive access to the LOCK file + // Previously, instead of NORMAL attr we set DELETE on close and that worked + // well except with fault_injection test that insists on deleting it. + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA(lockFname.c_str(), (GENERIC_READ | GENERIC_WRITE), + ExclusiveAccessON, NULL, CREATE_ALWAYS, + FILE_ATTRIBUTE_NORMAL, NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + result = IOErrorFromWindowsError( + "Failed to create lock file: " + lockFname, lastError); + } else { + *lock = new WinFileLock(hFile); + } + + return result; +} + +Status WinEnvIO::UnlockFile(FileLock* lock) { + Status result; + + assert(lock != nullptr); + + delete lock; + + return result; +} + +Status WinEnvIO::GetTestDirectory(std::string* result) { + std::string output; + + const char* env = getenv("TEST_TMPDIR"); + if (env && env[0] != '\0') { + output = env; + CreateDir(output); + } else { + env = getenv("TMP"); + + if (env && env[0] != '\0') { + output = env; + } else { + output = "c:\\tmp"; + } + + CreateDir(output); + } + + output.append("\\testrocksdb-"); + output.append(std::to_string(_getpid())); + + CreateDir(output); + + output.swap(*result); + + return Status::OK(); +} + +Status WinEnvIO::NewLogger(const std::string& fname, + std::shared_ptr* result) { + Status s; + + result->reset(); + + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA( + fname.c_str(), GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_DELETE, // In RocksDb log files are + // renamed and deleted before + // they are closed. This enables + // doing so. 
+ NULL, + CREATE_ALWAYS, // Original fopen mode is "w" + FILE_ATTRIBUTE_NORMAL, NULL); + } + + if (INVALID_HANDLE_VALUE == hFile) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("Failed to open LogFile" + fname, lastError); + } else { + { + // With log files we want to set the true creation time as of now + // because the system + // for some reason caches the attributes of the previous file that just + // been renamed from + // this name so auto_roll_logger_test fails + FILETIME ft; + GetSystemTimeAsFileTime(&ft); + // Set creation, last access and last write time to the same value + SetFileTime(hFile, &ft, &ft, &ft); + } + result->reset(new WinLogger(&WinEnvThreads::gettid, hosted_env_, hFile)); + } + return s; +} + +uint64_t WinEnvIO::NowMicros() { + + if (GetSystemTimePreciseAsFileTime_ != NULL) { + // all std::chrono clocks on windows proved to return + // values that may repeat that is not good enough for some uses. + const int64_t c_UnixEpochStartTicks = 116444736000000000i64; + const int64_t c_FtToMicroSec = 10; + + // This interface needs to return system time and not + // just any microseconds because it is often used as an argument + // to TimedWait() on condition variable + FILETIME ftSystemTime; + GetSystemTimePreciseAsFileTime_(&ftSystemTime); + + LARGE_INTEGER li; + li.LowPart = ftSystemTime.dwLowDateTime; + li.HighPart = ftSystemTime.dwHighDateTime; + // Subtract unix epoch start + li.QuadPart -= c_UnixEpochStartTicks; + // Convert to microsecs + li.QuadPart /= c_FtToMicroSec; + return li.QuadPart; + } + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()).count(); +} + +uint64_t WinEnvIO::NowNanos() { + // all std::chrono clocks on windows have the same resolution that is only + // good enough for microseconds but not nanoseconds + // On Windows 8 and Windows 2012 Server + // GetSystemTimePreciseAsFileTime(&current_time) can be used + LARGE_INTEGER li; + QueryPerformanceCounter(&li); + // Convert 
to nanoseconds first to avoid loss of precision + // and divide by frequency + li.QuadPart *= std::nano::den; + li.QuadPart /= perf_counter_frequency_; + return li.QuadPart; +} + +Status WinEnvIO::GetHostName(char* name, uint64_t len) { + Status s; + DWORD nSize = static_cast( + std::min(len, std::numeric_limits::max())); + + if (!::GetComputerNameA(name, &nSize)) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("GetHostName", lastError); + } else { + name[nSize] = 0; + } + + return s; +} + +Status WinEnvIO::GetAbsolutePath(const std::string& db_path, + std::string* output_path) { + // Check if we already have an absolute path + // that starts with non dot and has a colon in it + if ((!db_path.empty() && (db_path[0] == '/' || db_path[0] == '\\')) || + (db_path.size() > 2 && db_path[0] != '.' && + ((db_path[1] == ':' && db_path[2] == '\\') || + (db_path[1] == ':' && db_path[2] == '/')))) { + *output_path = db_path; + return Status::OK(); + } + + std::string result; + result.resize(_MAX_PATH); + + char* ret = _getcwd(&result[0], _MAX_PATH); + if (ret == nullptr) { + return Status::IOError("Failed to get current working directory", + strerror(errno)); + } + + result.resize(strlen(result.data())); + + result.swap(*output_path); + return Status::OK(); +} + +std::string WinEnvIO::TimeToString(uint64_t secondsSince1970) { + std::string result; + + const time_t seconds = secondsSince1970; + const int maxsize = 64; + + struct tm t; + errno_t ret = localtime_s(&t, &seconds); + + if (ret) { + result = std::to_string(seconds); + } else { + result.resize(maxsize); + char* p = &result[0]; + + int len = snprintf(p, maxsize, "%04d/%02d/%02d-%02d:%02d:%02d ", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, + t.tm_min, t.tm_sec); + assert(len > 0); + + result.resize(len); + } + + return result; +} + +EnvOptions WinEnvIO::OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const { + EnvOptions optimized = env_options; + 
optimized.use_mmap_writes = false; + optimized.bytes_per_sync = db_options.wal_bytes_per_sync; + optimized.use_os_buffer = + true; // This is because we flush only whole pages on unbuffered io and + // the last records are not guaranteed to be flushed. + // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it + // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit + // test and make this false + optimized.fallocate_with_keep_size = true; + return optimized; +} + +EnvOptions WinEnvIO::OptimizeForManifestWrite( + const EnvOptions& env_options) const { + EnvOptions optimized = env_options; + optimized.use_mmap_writes = false; + optimized.use_os_buffer = true; + optimized.fallocate_with_keep_size = true; + return optimized; +} + +// Returns true iff the named directory exists and is a directory. +bool WinEnvIO::DirExists(const std::string& dname) { + WIN32_FILE_ATTRIBUTE_DATA attrs; + if (GetFileAttributesExA(dname.c_str(), GetFileExInfoStandard, &attrs)) { + return 0 != (attrs.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY); + } + return false; +} + +//////////////////////////////////////////////////////////////////////// +// WinEnvThreads + +WinEnvThreads::WinEnvThreads(Env* hosted_env) : hosted_env_(hosted_env), thread_pools_(Env::Priority::TOTAL) { + + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].SetThreadPriority( + static_cast(pool_id)); + // This allows later initializing the thread-local-env of each thread. 
+ thread_pools_[pool_id].SetHostEnv(hosted_env); + } +} + +WinEnvThreads::~WinEnvThreads() { + + WaitForJoin(); + + for (auto& thpool : thread_pools_) { + thpool.JoinAllThreads(); + } +} + +void WinEnvThreads::Schedule(void(*function)(void*), void* arg, Env::Priority pri, + void* tag, void(*unschedFunction)(void* arg)) { + assert(pri >= Env::Priority::LOW && pri <= Env::Priority::HIGH); thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); } -int WinEnv::UnSchedule(void* arg, Priority pri) { +int WinEnvThreads::UnSchedule(void* arg, Env::Priority pri) { return thread_pools_[pri].UnSchedule(arg); } -unsigned int WinEnv::GetThreadPoolQueueLen(Priority pri) const { - assert(pri >= Priority::LOW && pri <= Priority::HIGH); - return thread_pools_[pri].GetQueueLen(); -} - namespace { -struct StartThreadState { - void (*user_function)(void*); - void* arg; -}; -} -static void* StartThreadWrapper(void* arg) { - std::unique_ptr state( + struct StartThreadState { + void(*user_function)(void*); + void* arg; + }; + + void* StartThreadWrapper(void* arg) { + std::unique_ptr state( reinterpret_cast(arg)); - state->user_function(state->arg); - return nullptr; + state->user_function(state->arg); + return nullptr; + } + } -void WinEnv::StartThread(void (*function)(void* arg), void* arg) { - StartThreadState* state = new StartThreadState; +void WinEnvThreads::StartThread(void(*function)(void* arg), void* arg) { + std::unique_ptr state(new StartThreadState); state->user_function = function; state->arg = arg; try { - std::thread th(&StartThreadWrapper, state); + + std::thread th(&StartThreadWrapper, state.get()); + state.release(); std::lock_guard lg(mu_); threads_to_join_.push_back(std::move(th)); @@ -1972,15 +788,221 @@ void WinEnv::StartThread(void (*function)(void* arg), void* arg) { } } -void WinEnv::WaitForJoin() { +void WinEnvThreads::WaitForJoin() { for (auto& th : threads_to_join_) { th.join(); } - threads_to_join_.clear(); } -} // namespace +unsigned int 
WinEnvThreads::GetThreadPoolQueueLen(Env::Priority pri) const { + assert(pri >= Env::Priority::LOW && pri <= Env::Priority::HIGH); + return thread_pools_[pri].GetQueueLen(); +} + +uint64_t WinEnvThreads::gettid() { + uint64_t thread_id = GetCurrentThreadId(); + return thread_id; +} + +uint64_t WinEnvThreads::GetThreadID() const { return gettid(); } + +void WinEnvThreads::SleepForMicroseconds(int micros) { + std::this_thread::sleep_for(std::chrono::microseconds(micros)); +} + +void WinEnvThreads::SetBackgroundThreads(int num, Env::Priority pri) { + assert(pri >= Env::Priority::LOW && pri <= Env::Priority::HIGH); + thread_pools_[pri].SetBackgroundThreads(num); +} + +void WinEnvThreads::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) { + assert(pri >= Env::Priority::LOW && pri <= Env::Priority::HIGH); + thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); +} + +///////////////////////////////////////////////////////////////////////// +// WinEnv + +WinEnv::WinEnv() : winenv_io_(this), winenv_threads_(this) { + // Protected member of the base class + thread_status_updater_ = CreateThreadStatusUpdater(); +} + + +WinEnv::~WinEnv() { + // All threads must be joined before the deletion of + // thread_status_updater_. 
+ delete thread_status_updater_; +} + +Status WinEnv::GetThreadList( + std::vector* thread_list) { + assert(thread_status_updater_); + return thread_status_updater_->GetThreadList(thread_list); +} + +Status WinEnv::DeleteFile(const std::string& fname) { + return winenv_io_.DeleteFile(fname); +} + +Status WinEnv::GetCurrentTime(int64_t* unix_time) { + return winenv_io_.GetCurrentTime(unix_time); +} + +Status WinEnv::NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) { + return winenv_io_.NewSequentialFile(fname, result, options); +} + +Status WinEnv::NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) { + return winenv_io_.NewRandomAccessFile(fname, result, options); +} + +Status WinEnv::NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) { + return winenv_io_.NewWritableFile(fname, result, options); +} + +Status WinEnv::NewDirectory(const std::string& name, + std::unique_ptr* result) { + return winenv_io_.NewDirectory(name, result); +} + +Status WinEnv::FileExists(const std::string& fname) { + return winenv_io_.FileExists(fname); +} + +Status WinEnv::GetChildren(const std::string& dir, + std::vector* result) { + return winenv_io_.GetChildren(dir, result); +} + +Status WinEnv::CreateDir(const std::string& name) { + return winenv_io_.CreateDir(name); +} + +Status WinEnv::CreateDirIfMissing(const std::string& name) { + return winenv_io_.CreateDirIfMissing(name); +} + +Status WinEnv::DeleteDir(const std::string& name) { + return winenv_io_.DeleteDir(name); +} + +Status WinEnv::GetFileSize(const std::string& fname, + uint64_t* size) { + return winenv_io_.GetFileSize(fname, size); +} + +Status WinEnv::GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) { + return winenv_io_.GetFileModificationTime(fname, file_mtime); +} + +Status WinEnv::RenameFile(const std::string& src, + const std::string& target) { + 
return winenv_io_.RenameFile(src, target); +} + +Status WinEnv::LinkFile(const std::string& src, + const std::string& target) { + return winenv_io_.LinkFile(src, target); +} + +Status WinEnv::LockFile(const std::string& lockFname, + FileLock** lock) { + return winenv_io_.LockFile(lockFname, lock); +} + +Status WinEnv::UnlockFile(FileLock* lock) { + return winenv_io_.UnlockFile(lock); +} + +Status WinEnv::GetTestDirectory(std::string* result) { + return winenv_io_.GetTestDirectory(result); +} + +Status WinEnv::NewLogger(const std::string& fname, + std::shared_ptr* result) { + return winenv_io_.NewLogger(fname, result); +} + +uint64_t WinEnv::NowMicros() { + return winenv_io_.NowMicros(); +} + +uint64_t WinEnv::NowNanos() { + return winenv_io_.NowNanos(); +} + +Status WinEnv::GetHostName(char* name, uint64_t len) { + return winenv_io_.GetHostName(name, len); +} + +Status WinEnv::GetAbsolutePath(const std::string& db_path, + std::string* output_path) { + return winenv_io_.GetAbsolutePath(db_path, output_path); +} + +std::string WinEnv::TimeToString(uint64_t secondsSince1970) { + return winenv_io_.TimeToString(secondsSince1970); +} + +void WinEnv::Schedule(void(*function)(void*), void* arg, Env::Priority pri, + void* tag, + void(*unschedFunction)(void* arg)) { + return winenv_threads_.Schedule(function, arg, pri, tag, unschedFunction); +} + +int WinEnv::UnSchedule(void* arg, Env::Priority pri) { + return winenv_threads_.UnSchedule(arg, pri); +} + +void WinEnv::StartThread(void(*function)(void* arg), void* arg) { + return winenv_threads_.StartThread(function, arg); +} + +void WinEnv::WaitForJoin() { + return winenv_threads_.WaitForJoin(); +} + +unsigned int WinEnv::GetThreadPoolQueueLen(Env::Priority pri) const { + return winenv_threads_.GetThreadPoolQueueLen(pri); +} + +uint64_t WinEnv::GetThreadID() const { + return winenv_threads_.GetThreadID(); +} + +void WinEnv::SleepForMicroseconds(int micros) { + return winenv_threads_.SleepForMicroseconds(micros); +} + +// Allow 
increasing the number of worker threads. +void WinEnv::SetBackgroundThreads(int num, Env::Priority pri) { + return winenv_threads_.SetBackgroundThreads(num, pri); +} + +void WinEnv::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) { + return winenv_threads_.IncBackgroundThreadsIfNeeded(num, pri); +} + +EnvOptions WinEnv::OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const { + return winenv_io_.OptimizeForLogWrite(env_options, db_options); +} + +EnvOptions WinEnv::OptimizeForManifestWrite( + const EnvOptions& env_options) const { + return winenv_io_.OptimizeForManifestWrite(env_options); +} + +} // namespace port std::string Env::GenerateUniqueId() { std::string result; @@ -2000,24 +1022,4 @@ std::string Env::GenerateUniqueId() { return result; } -// We choose to create this on the heap and using std::once for the following -// reasons -// 1) Currently available MS compiler does not implement atomic C++11 -// initialization of -// function local statics -// 2) We choose not to destroy the env because joining the threads from the -// system loader -// which destroys the statics (same as from DLLMain) creates a system loader -// dead-lock. -// in this manner any remaining threads are terminated OK. -namespace { -std::once_flag winenv_once_flag; -Env* envptr; -}; - -Env* Env::Default() { - std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); }); - return envptr; -} - } // namespace rocksdb diff --git a/port/win/env_win.h b/port/win/env_win.h new file mode 100644 index 000000000..9b1e012c9 --- /dev/null +++ b/port/win/env_win.h @@ -0,0 +1,276 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// An Env is an interface used by the rocksdb implementation to access +// operating system functionality like the filesystem etc. Callers +// may wish to provide a custom Env object when opening a database to +// get fine-grained control; e.g., to rate limit file system operations. +// +// All Env implementations are safe for concurrent access from +// multiple threads without any external synchronization. + +#pragma once + +#include +#include "util/threadpool.h" + +#include +#include + +namespace rocksdb { +namespace port { + +// Currently not designed for inheritance but rather a replacement +class WinEnvThreads { +public: + + explicit WinEnvThreads(Env* hosted_env); + + ~WinEnvThreads(); + + WinEnvThreads(const WinEnvThreads&) = delete; + WinEnvThreads& operator=(const WinEnvThreads&) = delete; + + void Schedule(void(*function)(void*), void* arg, Env::Priority pri, + void* tag, + void(*unschedFunction)(void* arg)); + + int UnSchedule(void* arg, Env::Priority pri); + + void StartThread(void(*function)(void* arg), void* arg); + + void WaitForJoin(); + + unsigned int GetThreadPoolQueueLen(Env::Priority pri) const; + + static uint64_t gettid(); + + uint64_t GetThreadID() const; + + void SleepForMicroseconds(int micros); + + // Allow increasing the number of worker threads. 
+ void SetBackgroundThreads(int num, Env::Priority pri); + + void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri); + +private: + + Env* hosted_env_; + mutable std::mutex mu_; + std::vector thread_pools_; + std::vector threads_to_join_; + +}; + +// Designed for inheritance so can be re-used +// but certain parts replaced +class WinEnvIO { +public: + explicit WinEnvIO(Env* hosted_env); + + virtual ~WinEnvIO(); + + virtual Status DeleteFile(const std::string& fname); + + virtual Status GetCurrentTime(int64_t* unix_time); + + virtual Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options); + + virtual Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options); + + virtual Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options); + + virtual Status NewDirectory(const std::string& name, + std::unique_ptr* result); + + virtual Status FileExists(const std::string& fname); + + virtual Status GetChildren(const std::string& dir, + std::vector* result); + + virtual Status CreateDir(const std::string& name); + + virtual Status CreateDirIfMissing(const std::string& name); + + virtual Status DeleteDir(const std::string& name); + + virtual Status GetFileSize(const std::string& fname, + uint64_t* size); + + static uint64_t FileTimeToUnixTime(const FILETIME& ftTime); + + virtual Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime); + + virtual Status RenameFile(const std::string& src, + const std::string& target); + + virtual Status LinkFile(const std::string& src, + const std::string& target); + + virtual Status LockFile(const std::string& lockFname, + FileLock** lock); + + virtual Status UnlockFile(FileLock* lock); + + virtual Status GetTestDirectory(std::string* result); + + virtual Status NewLogger(const std::string& fname, + std::shared_ptr* result); + + virtual uint64_t NowMicros(); + + virtual 
uint64_t NowNanos(); + + virtual Status GetHostName(char* name, uint64_t len); + + virtual Status GetAbsolutePath(const std::string& db_path, + std::string* output_path); + + virtual std::string TimeToString(uint64_t secondsSince1970); + + virtual EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const; + + virtual EnvOptions OptimizeForManifestWrite( + const EnvOptions& env_options) const; + + size_t GetPageSize() const { return page_size_; } + + size_t GetAllocationGranularity() const { return allocation_granularity_; } + + uint64_t GetPerfCounterFrequency() const { return perf_counter_frequency_; } + +private: + // Returns true iff the named directory exists and is a directory. + virtual bool DirExists(const std::string& dname); + + typedef VOID(WINAPI * FnGetSystemTimePreciseAsFileTime)(LPFILETIME); + + Env* hosted_env_; + size_t page_size_; + size_t allocation_granularity_; + uint64_t perf_counter_frequency_; + FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_; +}; + +class WinEnv : public Env { +public: + WinEnv(); + + ~WinEnv(); + + Status DeleteFile(const std::string& fname) override; + + Status GetCurrentTime(int64_t* unix_time) override; + + Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + + Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + + Status NewDirectory(const std::string& name, + std::unique_ptr* result) override; + + Status FileExists(const std::string& fname) override; + + Status GetChildren(const std::string& dir, + std::vector* result) override; + + Status CreateDir(const std::string& name) override; + + Status CreateDirIfMissing(const std::string& name) override; + + Status DeleteDir(const std::string& name) override; + + Status 
GetFileSize(const std::string& fname, + uint64_t* size) override; + + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) override; + + Status RenameFile(const std::string& src, + const std::string& target) override; + + Status LinkFile(const std::string& src, + const std::string& target) override; + + Status LockFile(const std::string& lockFname, + FileLock** lock) override; + + Status UnlockFile(FileLock* lock) override; + + Status GetTestDirectory(std::string* result) override; + + Status NewLogger(const std::string& fname, + std::shared_ptr* result) override; + + uint64_t NowMicros() override; + + uint64_t NowNanos() override; + + Status GetHostName(char* name, uint64_t len) override; + + Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) override; + + std::string TimeToString(uint64_t secondsSince1970) override; + + Status GetThreadList( + std::vector* thread_list) override; + + void Schedule(void(*function)(void*), void* arg, Env::Priority pri, + void* tag, + void(*unschedFunction)(void* arg)) override; + + int UnSchedule(void* arg, Env::Priority pri) override; + + void StartThread(void(*function)(void* arg), void* arg) override; + + void WaitForJoin(); + + unsigned int GetThreadPoolQueueLen(Env::Priority pri) const override; + + uint64_t GetThreadID() const override; + + void SleepForMicroseconds(int micros) override; + + // Allow increasing the number of worker threads. 
+ void SetBackgroundThreads(int num, Env::Priority pri) override; + + void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) override; + + EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const override; + + EnvOptions OptimizeForManifestWrite( + const EnvOptions& env_options) const override; + +private: + + WinEnvIO winenv_io_; + WinEnvThreads winenv_threads_; + +}; + +} +} \ No newline at end of file diff --git a/port/win/io_win.cc b/port/win/io_win.cc new file mode 100644 index 000000000..c9ef1f29e --- /dev/null +++ b/port/win/io_win.cc @@ -0,0 +1,963 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "port/win/io_win.h" + +#include "util/sync_point.h" +#include "util/coding.h" +#include "util/iostats_context_imp.h" +#include "util/sync_point.h" +#include "util/aligned_buffer.h" + + +namespace rocksdb { +namespace port { + +std::string GetWindowsErrSz(DWORD err) { + LPSTR lpMsgBuf; + FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, err, + 0, // Default language + reinterpret_cast(&lpMsgBuf), 0, NULL); + + std::string Err = lpMsgBuf; + LocalFree(lpMsgBuf); + return Err; +} + +// We preserve the original name of this interface to denote the original idea +// behind it. +// All reads happen by a specified offset and pwrite interface does not change +// the position of the file pointer. 
Judging from the man page and errno it does +// execute +// lseek atomically to return the position of the file back where it was. +// WriteFile() does not +// have this capability. Therefore, for both pread and pwrite the pointer is +// advanced to the next position +// which is fine for writes because they are (should be) sequential. +// Because all the reads/writes happen by the specified offset, the caller in +// theory should not +// rely on the current file offset. +SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes, + uint64_t offset) { + assert(numBytes <= std::numeric_limits::max()); + OVERLAPPED overlapped = { 0 }; + ULARGE_INTEGER offsetUnion; + offsetUnion.QuadPart = offset; + + overlapped.Offset = offsetUnion.LowPart; + overlapped.OffsetHigh = offsetUnion.HighPart; + + SSIZE_T result = 0; + + unsigned long bytesWritten = 0; + + if (FALSE == WriteFile(hFile, src, static_cast(numBytes), &bytesWritten, + &overlapped)) { + result = -1; + } else { + result = bytesWritten; + } + + return result; +} + +// See comments for pwrite above +SSIZE_T pread(HANDLE hFile, char* src, size_t numBytes, uint64_t offset) { + assert(numBytes <= std::numeric_limits::max()); + OVERLAPPED overlapped = { 0 }; + ULARGE_INTEGER offsetUnion; + offsetUnion.QuadPart = offset; + + overlapped.Offset = offsetUnion.LowPart; + overlapped.OffsetHigh = offsetUnion.HighPart; + + SSIZE_T result = 0; + + unsigned long bytesRead = 0; + + if (FALSE == ReadFile(hFile, src, static_cast(numBytes), &bytesRead, + &overlapped)) { + return -1; + } else { + result = bytesRead; + } + + return result; +} + +// SetFileInformationByHandle() is capable of fast pre-allocates. +// However, this does not change the file end position unless the file is +// truncated and the pre-allocated space is not considered filled with zeros. 
+Status fallocate(const std::string& filename, HANDLE hFile, + uint64_t to_size) { + Status status; + + FILE_ALLOCATION_INFO alloc_info; + alloc_info.AllocationSize.QuadPart = to_size; + + if (!SetFileInformationByHandle(hFile, FileAllocationInfo, &alloc_info, + sizeof(FILE_ALLOCATION_INFO))) { + auto lastError = GetLastError(); + status = IOErrorFromWindowsError( + "Failed to pre-allocate space: " + filename, lastError); + } + + return status; +} + +Status ftruncate(const std::string& filename, HANDLE hFile, + uint64_t toSize) { + Status status; + + FILE_END_OF_FILE_INFO end_of_file; + end_of_file.EndOfFile.QuadPart = toSize; + + if (!SetFileInformationByHandle(hFile, FileEndOfFileInfo, &end_of_file, + sizeof(FILE_END_OF_FILE_INFO))) { + auto lastError = GetLastError(); + status = IOErrorFromWindowsError("Failed to Set end of file: " + filename, + lastError); + } + + return status; +} + +size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size) { + + if (max_size < kMaxVarint64Length * 3) { + return 0; + } + + BY_HANDLE_FILE_INFORMATION FileInfo; + + BOOL result = GetFileInformationByHandle(hFile, &FileInfo); + + TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result); + + if (!result) { + return 0; + } + + char* rid = id; + rid = EncodeVarint64(rid, uint64_t(FileInfo.dwVolumeSerialNumber)); + rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexHigh)); + rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexLow)); + + assert(rid >= id); + return static_cast(rid - id); +} + +WinMmapReadableFile::WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap, + const void* mapped_region, size_t length) + : fileName_(fileName), + hFile_(hFile), + hMap_(hMap), + mapped_region_(mapped_region), + length_(length) {} + +WinMmapReadableFile::~WinMmapReadableFile() { + BOOL ret = ::UnmapViewOfFile(mapped_region_); + assert(ret); + + ret = ::CloseHandle(hMap_); + assert(ret); + + ret = ::CloseHandle(hFile_); + assert(ret); +} + 
+Status WinMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + Status s; + + if (offset > length_) { + *result = Slice(); + return IOError(fileName_, EINVAL); + } else if (offset + n > length_) { + n = length_ - offset; + } + *result = + Slice(reinterpret_cast(mapped_region_)+offset, n); + return s; +} + +Status WinMmapReadableFile::InvalidateCache(size_t offset, size_t length) { + return Status::OK(); +} + +size_t WinMmapReadableFile::GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(hFile_, id, max_size); +} + +// Can only truncate or reserve to a sector size aligned if +// used on files that are opened with Unbuffered I/O +Status WinMmapFile::TruncateFile(uint64_t toSize) { + return ftruncate(filename_, hFile_, toSize); +} + +Status WinMmapFile::UnmapCurrentRegion() { + Status status; + + if (mapped_begin_ != nullptr) { + if (!::UnmapViewOfFile(mapped_begin_)) { + status = IOErrorFromWindowsError( + "Failed to unmap file view: " + filename_, GetLastError()); + } + + // Move on to the next portion of the file + file_offset_ += view_size_; + + // UnmapView automatically sends data to disk but not the metadata + // which is good and provides some equivalent of fdatasync() on Linux + // therefore, we do not need a separate flag for metadata + mapped_begin_ = nullptr; + mapped_end_ = nullptr; + dst_ = nullptr; + + last_sync_ = nullptr; + pending_sync_ = false; + } + + return status; +} + +Status WinMmapFile::MapNewRegion() { + + Status status; + + assert(mapped_begin_ == nullptr); + + size_t minDiskSize = file_offset_ + view_size_; + + if (minDiskSize > reserved_size_) { + status = Allocate(file_offset_, view_size_); + if (!status.ok()) { + return status; + } + } + + // Need to remap + if (hMap_ == NULL || reserved_size_ > mapping_size_) { + + if (hMap_ != NULL) { + // Unmap the previous one + BOOL ret = ::CloseHandle(hMap_); + assert(ret); + hMap_ = NULL; + } + + ULARGE_INTEGER mappingSize; + 
mappingSize.QuadPart = reserved_size_; + + hMap_ = CreateFileMappingA( + hFile_, + NULL, // Security attributes + PAGE_READWRITE, // There is not a write only mode for mapping + mappingSize.HighPart, // Enable mapping the whole file but the actual + // amount mapped is determined by MapViewOfFile + mappingSize.LowPart, + NULL); // Mapping name + + if (NULL == hMap_) { + return IOErrorFromWindowsError( + "WindowsMmapFile failed to create file mapping for: " + filename_, + GetLastError()); + } + + mapping_size_ = reserved_size_; + } + + ULARGE_INTEGER offset; + offset.QuadPart = file_offset_; + + // View must begin at the granularity aligned offset + mapped_begin_ = reinterpret_cast( + MapViewOfFileEx(hMap_, FILE_MAP_WRITE, offset.HighPart, offset.LowPart, + view_size_, NULL)); + + if (!mapped_begin_) { + status = IOErrorFromWindowsError( + "WindowsMmapFile failed to map file view: " + filename_, + GetLastError()); + } else { + mapped_end_ = mapped_begin_ + view_size_; + dst_ = mapped_begin_; + last_sync_ = mapped_begin_; + pending_sync_ = false; + } + return status; +} + +Status WinMmapFile::PreallocateInternal(uint64_t spaceToReserve) { + return fallocate(filename_, hFile_, spaceToReserve); +} + +WinMmapFile::WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size, + size_t allocation_granularity, const EnvOptions& options) + : filename_(fname), + hFile_(hFile), + hMap_(NULL), + page_size_(page_size), + allocation_granularity_(allocation_granularity), + reserved_size_(0), + mapping_size_(0), + view_size_(0), + mapped_begin_(nullptr), + mapped_end_(nullptr), + dst_(nullptr), + last_sync_(nullptr), + file_offset_(0), + pending_sync_(false) { + // Allocation granularity must be obtained from GetSystemInfo() and must be + // a power of two. 
+ assert(allocation_granularity > 0); + assert((allocation_granularity & (allocation_granularity - 1)) == 0); + + assert(page_size > 0); + assert((page_size & (page_size - 1)) == 0); + + // Only for memory mapped writes + assert(options.use_mmap_writes); + + // View size must be both the multiple of allocation_granularity AND the + // page size and the granularity is usually a multiple of a page size. + const size_t viewSize = 32 * 1024; // 32Kb similar to the Windows File Cache in buffered mode + view_size_ = Roundup(viewSize, allocation_granularity_); +} + +WinMmapFile::~WinMmapFile() { + if (hFile_) { + this->Close(); + } +} + +Status WinMmapFile::Append(const Slice& data) { + const char* src = data.data(); + size_t left = data.size(); + + while (left > 0) { + assert(mapped_begin_ <= dst_); + size_t avail = mapped_end_ - dst_; + + if (avail == 0) { + Status s = UnmapCurrentRegion(); + if (s.ok()) { + s = MapNewRegion(); + } + + if (!s.ok()) { + return s; + } + } else { + size_t n = std::min(left, avail); + memcpy(dst_, src, n); + dst_ += n; + src += n; + left -= n; + pending_sync_ = true; + } + } + + // Now make sure that the last partial page is padded with zeros if needed + size_t bytesToPad = Roundup(size_t(dst_), page_size_) - size_t(dst_); + if (bytesToPad > 0) { + memset(dst_, 0, bytesToPad); + } + + return Status::OK(); +} + +// Means Close() will properly take care of truncate +// and it does not need any additional information +Status WinMmapFile::Truncate(uint64_t size) { + return Status::OK(); +} + +Status WinMmapFile::Close() { + Status s; + + assert(NULL != hFile_); + + // We truncate to the precise size so no + // uninitialized data at the end. SetEndOfFile + // which we use does not write zeros and it is good. 
+ uint64_t targetSize = GetFileSize(); + + if (mapped_begin_ != nullptr) { + // Sync before unmapping to make sure everything + // is on disk and there is not a lazy writing + // so we are deterministic with the tests + Sync(); + s = UnmapCurrentRegion(); + } + + if (NULL != hMap_) { + BOOL ret = ::CloseHandle(hMap_); + if (!ret && s.ok()) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to Close mapping for file: " + filename_, lastError); + } + + hMap_ = NULL; + } + + if (hFile_ != NULL) { + + TruncateFile(targetSize); + + BOOL ret = ::CloseHandle(hFile_); + hFile_ = NULL; + + if (!ret && s.ok()) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to close file map handle: " + filename_, lastError); + } + } + + return s; +} + +Status WinMmapFile::Flush() { return Status::OK(); } + +// Flush only data +Status WinMmapFile::Sync() { + Status s; + + // Some writes occurred since last sync + if (dst_ > last_sync_) { + assert(mapped_begin_); + assert(dst_); + assert(dst_ > mapped_begin_); + assert(dst_ < mapped_end_); + + size_t page_begin = + TruncateToPageBoundary(page_size_, last_sync_ - mapped_begin_); + size_t page_end = + TruncateToPageBoundary(page_size_, dst_ - mapped_begin_ - 1); + + // Flush only the amount of that is a multiple of pages + if (!::FlushViewOfFile(mapped_begin_ + page_begin, + (page_end - page_begin) + page_size_)) { + s = IOErrorFromWindowsError("Failed to FlushViewOfFile: " + filename_, + GetLastError()); + } else { + last_sync_ = dst_; + } + } + + return s; +} + +/** +* Flush data as well as metadata to stable storage. +*/ +Status WinMmapFile::Fsync() { + Status s = Sync(); + + // Flush metadata + if (s.ok() && pending_sync_) { + if (!::FlushFileBuffers(hFile_)) { + s = IOErrorFromWindowsError("Failed to FlushFileBuffers: " + filename_, + GetLastError()); + } + pending_sync_ = false; + } + + return s; +} + +/** +* Get the size of valid data in the file. 
This will not match the +* size that is returned from the filesystem because we use mmap +* to extend file by map_size every time. +*/ +uint64_t WinMmapFile::GetFileSize() { + size_t used = dst_ - mapped_begin_; + return file_offset_ + used; +} + +Status WinMmapFile::InvalidateCache(size_t offset, size_t length) { + return Status::OK(); +} + +Status WinMmapFile::Allocate(uint64_t offset, uint64_t len) { + Status status; + TEST_KILL_RANDOM("WinMmapFile::Allocate", rocksdb_kill_odds); + + // Make sure that we reserve an aligned amount of space + // since the reservation block size is driven outside so we want + // to check if we are ok with reservation here + size_t spaceToReserve = Roundup(offset + len, view_size_); + // Nothing to do + if (spaceToReserve <= reserved_size_) { + return status; + } + + IOSTATS_TIMER_GUARD(allocate_nanos); + status = PreallocateInternal(spaceToReserve); + if (status.ok()) { + reserved_size_ = spaceToReserve; + } + return status; +} + +size_t WinMmapFile::GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(hFile_, id, max_size); +} + +WinSequentialFile::WinSequentialFile(const std::string& fname, HANDLE f, + const EnvOptions& options) + : filename_(fname), + file_(f), + use_os_buffer_(options.use_os_buffer) +{} + +WinSequentialFile::~WinSequentialFile() { + assert(file_ != INVALID_HANDLE_VALUE); + CloseHandle(file_); +} + +Status WinSequentialFile::Read(size_t n, Slice* result, char* scratch) { + Status s; + size_t r = 0; + + // Windows ReadFile API accepts a DWORD. + // While it is possible to read in a loop if n is > UINT_MAX + // it is a highly unlikely case. 
+ if (n > UINT_MAX) { + return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); + } + + DWORD bytesToRead = static_cast(n); //cast is safe due to the check above + DWORD bytesRead = 0; + BOOL ret = ReadFile(file_, scratch, bytesToRead, &bytesRead, NULL); + if (ret == TRUE) { + r = bytesRead; + } else { + return IOErrorFromWindowsError(filename_, GetLastError()); + } + + *result = Slice(scratch, r); + + return s; +} + +Status WinSequentialFile::Skip(uint64_t n) { + // Can't handle more than signed max as SetFilePointerEx accepts a signed 64-bit + // integer. As such it is a highly unlikley case to have n so large. + if (n > _I64_MAX) { + return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); + } + + LARGE_INTEGER li; + li.QuadPart = static_cast(n); //cast is safe due to the check above + BOOL ret = SetFilePointerEx(file_, li, NULL, FILE_CURRENT); + if (ret == FALSE) { + return IOErrorFromWindowsError(filename_, GetLastError()); + } + return Status::OK(); +} + +Status WinSequentialFile::InvalidateCache(size_t offset, size_t length) { + return Status::OK(); +} + +SSIZE_T WinRandomAccessFile::ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start, + size_t bytes_to_read, size_t& left, + AlignedBuffer& buffer, char* dest) const { + assert(buffer.CurrentSize() == 0); + assert(buffer.Capacity() >= bytes_to_read); + + SSIZE_T read = + PositionedReadInternal(buffer.Destination(), bytes_to_read, first_page_start); + + if (read > 0) { + buffer.Size(read); + + // Let's figure out how much we read from the users standpoint + if ((first_page_start + buffer.CurrentSize()) > user_offset) { + assert(first_page_start <= user_offset); + size_t buffer_offset = user_offset - first_page_start; + read = buffer.Read(dest, buffer_offset, left); + } else { + read = 0; + } + left -= read; + } + return read; +} + +SSIZE_T WinRandomAccessFile::ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start, + size_t bytes_to_read, size_t& left, + char* 
dest) const { + AlignedBuffer bigBuffer; + bigBuffer.Alignment(buffer_.Alignment()); + bigBuffer.AllocateNewBuffer(bytes_to_read); + + return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left, + bigBuffer, dest); +} + +SSIZE_T WinRandomAccessFile::ReadIntoInstanceBuffer(uint64_t user_offset, + uint64_t first_page_start, + size_t bytes_to_read, size_t& left, + char* dest) const { + SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, + left, buffer_, dest); + + if (read > 0) { + buffered_start_ = first_page_start; + } + + return read; +} + +void WinRandomAccessFile::CalculateReadParameters(uint64_t offset, size_t bytes_requested, + size_t& actual_bytes_toread, + uint64_t& first_page_start) const { + + const size_t alignment = buffer_.Alignment(); + + first_page_start = TruncateToPageBoundary(alignment, offset); + const uint64_t last_page_start = + TruncateToPageBoundary(alignment, offset + bytes_requested - 1); + actual_bytes_toread = (last_page_start - first_page_start) + alignment; +} + +SSIZE_T WinRandomAccessFile::PositionedReadInternal(char* src, size_t numBytes, + uint64_t offset) const { + return pread(hFile_, src, numBytes, offset); +} + +WinRandomAccessFile::WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, + const EnvOptions& options) + : filename_(fname), + hFile_(hFile), + use_os_buffer_(options.use_os_buffer), + read_ahead_(false), + compaction_readahead_size_(options.compaction_readahead_size), + random_access_max_buffer_size_(options.random_access_max_buffer_size), + buffer_(), + buffered_start_(0) { + assert(!options.use_mmap_reads); + + // Unbuffered access, use internal buffer for reads + if (!use_os_buffer_) { + // Do not allocate the buffer either until the first request or + // until there is a call to allocate a read-ahead buffer + buffer_.Alignment(alignment); + } +} + +WinRandomAccessFile::~WinRandomAccessFile() { + if (hFile_ != NULL && hFile_ != INVALID_HANDLE_VALUE) { + 
::CloseHandle(hFile_); + } +} + +void WinRandomAccessFile::EnableReadAhead() { this->Hint(SEQUENTIAL); } + +Status WinRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + + Status s; + SSIZE_T r = -1; + size_t left = n; + char* dest = scratch; + + if (n == 0) { + *result = Slice(scratch, 0); + return s; + } + + // When in unbuffered mode we need to do the following changes: + // - use our own aligned buffer + // - always read at the offset of that is a multiple of alignment + if (!use_os_buffer_) { + + uint64_t first_page_start = 0; + size_t actual_bytes_toread = 0; + size_t bytes_requested = left; + + if (!read_ahead_ && random_access_max_buffer_size_ == 0) { + CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, + first_page_start); + + assert(actual_bytes_toread > 0); + + r = ReadIntoOneShotBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } else { + + std::unique_lock lock(buffer_mut_); + + // Let's see if at least some of the requested data is already + // in the buffer + if (offset >= buffered_start_ && + offset < (buffered_start_ + buffer_.CurrentSize())) { + size_t buffer_offset = offset - buffered_start_; + r = buffer_.Read(dest, buffer_offset, left); + assert(r >= 0); + + left -= size_t(r); + offset += r; + dest += r; + } + + // Still some left or none was buffered + if (left > 0) { + // Figure out the start/end offset for reading and amount to read + bytes_requested = left; + + if (read_ahead_ && bytes_requested < compaction_readahead_size_) { + bytes_requested = compaction_readahead_size_; + } + + CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, + first_page_start); + + assert(actual_bytes_toread > 0); + + if (buffer_.Capacity() < actual_bytes_toread) { + // If we are in read-ahead mode or the requested size + // exceeds max buffer size then use one-shot + // big buffer otherwise reallocate main buffer + if (read_ahead_ || + (actual_bytes_toread > 
random_access_max_buffer_size_)) { + // Unlock the mutex since we are not using instance buffer + lock.unlock(); + r = ReadIntoOneShotBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } else { + buffer_.AllocateNewBuffer(actual_bytes_toread); + r = ReadIntoInstanceBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } + } else { + buffer_.Clear(); + r = ReadIntoInstanceBuffer(offset, first_page_start, + actual_bytes_toread, left, dest); + } + } + } + } else { + r = PositionedReadInternal(scratch, left, offset); + if (r > 0) { + left -= r; + } + } + + *result = Slice(scratch, (r < 0) ? 0 : n - left); + + if (r < 0) { + s = IOErrorFromLastWindowsError(filename_); + } + return s; +} + +bool WinRandomAccessFile::ShouldForwardRawRequest() const { + return true; +} + +void WinRandomAccessFile::Hint(AccessPattern pattern) { + if (pattern == SEQUENTIAL && !use_os_buffer_ && + compaction_readahead_size_ > 0) { + std::lock_guard lg(buffer_mut_); + if (!read_ahead_) { + read_ahead_ = true; + // This would allocate read-ahead size + 2 alignments + // - one for memory alignment which added implicitly by AlignedBuffer + // - We add one more alignment because we will read one alignment more + // from disk + buffer_.AllocateNewBuffer(compaction_readahead_size_ + + buffer_.Alignment()); + } + } +} + +Status WinRandomAccessFile::InvalidateCache(size_t offset, size_t length) { + return Status::OK(); +} + +size_t WinRandomAccessFile::GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(hFile_, id, max_size); +} + +Status WinWritableFile::PreallocateInternal(uint64_t spaceToReserve) { + return fallocate(filename_, hFile_, spaceToReserve); +} + +WinWritableFile::WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, + size_t capacity, const EnvOptions& options) + : filename_(fname), + hFile_(hFile), + use_os_buffer_(options.use_os_buffer), + alignment_(alignment), + filesize_(0), + reservedsize_(0) { + 
assert(!options.use_mmap_writes); +} + +WinWritableFile::~WinWritableFile() { + if (NULL != hFile_ && INVALID_HANDLE_VALUE != hFile_) { + WinWritableFile::Close(); + } +} + + // Indicates if the class makes use of unbuffered I/O +bool WinWritableFile::UseOSBuffer() const { + return use_os_buffer_; +} + +size_t WinWritableFile::GetRequiredBufferAlignment() const { + return alignment_; +} + +Status WinWritableFile::Append(const Slice& data) { + + // Used for buffered access ONLY + assert(use_os_buffer_); + assert(data.size() < std::numeric_limits::max()); + + Status s; + + DWORD bytesWritten = 0; + if (!WriteFile(hFile_, data.data(), + static_cast(data.size()), &bytesWritten, NULL)) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to WriteFile: " + filename_, + lastError); + } else { + assert(size_t(bytesWritten) == data.size()); + filesize_ += data.size(); + } + + return s; +} + +Status WinWritableFile::PositionedAppend(const Slice& data, uint64_t offset) { + Status s; + + SSIZE_T ret = pwrite(hFile_, data.data(), data.size(), offset); + + // Error break + if (ret < 0) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to pwrite for: " + filename_, lastError); + } else { + // With positional write it is not clear at all + // if this actually extends the filesize + assert(size_t(ret) == data.size()); + filesize_ += data.size(); + } + return s; +} + + // Need to implement this so the file is truncated correctly + // when buffered and unbuffered mode +Status WinWritableFile::Truncate(uint64_t size) { + Status s = ftruncate(filename_, hFile_, size); + if (s.ok()) { + filesize_ = size; + } + return s; +} + +Status WinWritableFile::Close() { + + Status s; + + assert(INVALID_HANDLE_VALUE != hFile_); + + if (fsync(hFile_) < 0) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("fsync failed at Close() for: " + filename_, + lastError); + } + + if (FALSE == ::CloseHandle(hFile_)) { + auto lastError = 
GetLastError(); + s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_, + lastError); + } + + hFile_ = INVALID_HANDLE_VALUE; + return s; +} + + // write out the cached data to the OS cache + // This is now taken care of the WritableFileWriter +Status WinWritableFile::Flush() { + return Status::OK(); +} + +Status WinWritableFile::Sync() { + Status s; + // Calls flush buffers + if (fsync(hFile_) < 0) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("fsync failed at Sync() for: " + filename_, + lastError); + } + return s; +} + +Status WinWritableFile::Fsync() { return Sync(); } + +uint64_t WinWritableFile::GetFileSize() { + // Double accounting now here with WritableFileWriter + // and this size will be wrong when unbuffered access is used + // but tests implement their own writable files and do not use WritableFileWrapper + // so we need to squeeze a square peg through + // a round hole here. + return filesize_; +} + +Status WinWritableFile::Allocate(uint64_t offset, uint64_t len) { + Status status; + TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds); + + // Make sure that we reserve an aligned amount of space + // since the reservation block size is driven outside so we want + // to check if we are ok with reservation here + size_t spaceToReserve = Roundup(offset + len, alignment_); + // Nothing to do + if (spaceToReserve <= reservedsize_) { + return status; + } + + IOSTATS_TIMER_GUARD(allocate_nanos); + status = PreallocateInternal(spaceToReserve); + if (status.ok()) { + reservedsize_ = spaceToReserve; + } + return status; +} + +size_t WinWritableFile::GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(hFile_, id, max_size); +} + +Status WinDirectory::Fsync() { return Status::OK(); } + +WinFileLock::~WinFileLock() { + BOOL ret = ::CloseHandle(hFile_); + assert(ret); +} + + +} +} + diff --git a/port/win/io_win.h b/port/win/io_win.h new file mode 100644 index 000000000..8c3a4ba7a --- /dev/null 
+++ b/port/win/io_win.h @@ -0,0 +1,359 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include +#include + +#include "util/aligned_buffer.h" + +#include +#include + +#include + +#include + +namespace rocksdb { +namespace port { + +std::string GetWindowsErrSz(DWORD err); + +inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) { + return Status::IOError(context, GetWindowsErrSz(err)); +} + +inline Status IOErrorFromLastWindowsError(const std::string& context) { + return IOErrorFromWindowsError(context, GetLastError()); +} + +inline Status IOError(const std::string& context, int err_number) { + return Status::IOError(context, strerror(err_number)); +} + +// Note the below two do not set errno because they are used only here in this +// file +// on a Windows handle and, therefore, not necessary. 
Translating GetLastError() +// to errno +// is a sad business +inline int fsync(HANDLE hFile) { + if (!FlushFileBuffers(hFile)) { + return -1; + } + + return 0; +} + +SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes, + uint64_t offset); + +SSIZE_T pread(HANDLE hFile, char* src, size_t numBytes, uint64_t offset); + +Status fallocate(const std::string& filename, HANDLE hFile, + uint64_t to_size); + +Status ftruncate(const std::string& filename, HANDLE hFile, + uint64_t toSize); + + +size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size); + +// mmap() based random-access +class WinMmapReadableFile : public RandomAccessFile { + const std::string fileName_; + HANDLE hFile_; + HANDLE hMap_; + + const void* mapped_region_; + const size_t length_; + +public: + // mapped_region_[0,length-1] contains the mmapped contents of the file. + WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap, + const void* mapped_region, size_t length); + + ~WinMmapReadableFile(); + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + + virtual Status InvalidateCache(size_t offset, size_t length) override; + + virtual size_t GetUniqueId(char* id, size_t max_size) const override; +}; + +// We preallocate and use memcpy to append new +// data to the file. This is safe since we either properly close the +// file before reading from it, or for log files, the reading code +// knows enough to skip zero suffixes. +class WinMmapFile : public WritableFile { +private: + const std::string filename_; + HANDLE hFile_; + HANDLE hMap_; + + const size_t page_size_; // We flush the mapping view in page_size + // increments. 
We may decide if this is a memory + // page size or SSD page size + const size_t + allocation_granularity_; // View must start at such a granularity + + size_t reserved_size_; // Preallocated size + + size_t mapping_size_; // The max size of the mapping object + // we want to guess the final file size to minimize the remapping + size_t view_size_; // How much memory to map into a view at a time + + char* mapped_begin_; // Must begin at the file offset that is aligned with + // allocation_granularity_ + char* mapped_end_; + char* dst_; // Where to write next (in range [mapped_begin_,mapped_end_]) + char* last_sync_; // Where have we synced up to + + uint64_t file_offset_; // Offset of mapped_begin_ in file + + // Do we have unsynced writes? + bool pending_sync_; + + // Can only truncate or reserve to a sector size aligned if + // used on files that are opened with Unbuffered I/O + Status TruncateFile(uint64_t toSize); + + Status UnmapCurrentRegion(); + + Status MapNewRegion(); + + virtual Status PreallocateInternal(uint64_t spaceToReserve); + +public: + + WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size, + size_t allocation_granularity, const EnvOptions& options); + + ~WinMmapFile(); + + virtual Status Append(const Slice& data) override; + + // Means Close() will properly take care of truncate + // and it does not need any additional information + virtual Status Truncate(uint64_t size) override; + + virtual Status Close() override; + + virtual Status Flush() override; + + // Flush only data + virtual Status Sync() override; + + /** + * Flush data as well as metadata to stable storage. + */ + virtual Status Fsync() override; + + /** + * Get the size of valid data in the file. This will not match the + * size that is returned from the filesystem because we use mmap + * to extend file by map_size every time. 
+ */ + virtual uint64_t GetFileSize() override; + + virtual Status InvalidateCache(size_t offset, size_t length) override; + + virtual Status Allocate(uint64_t offset, uint64_t len) override; + + virtual size_t GetUniqueId(char* id, size_t max_size) const override; +}; + +class WinSequentialFile : public SequentialFile { +private: + const std::string filename_; + HANDLE file_; + + // There is no equivalent of advising away buffered pages as in posix. + // To implement this flag we would need to do unbuffered reads which + // will need to be aligned (not sure there is a guarantee that the buffer + // passed in is aligned). + // Hence we currently ignore this flag. It is used only in a few cases + // which should not be perf critical. + // If perf evaluation finds this to be a problem, we can look into + // implementing this. + bool use_os_buffer_; + +public: + WinSequentialFile(const std::string& fname, HANDLE f, + const EnvOptions& options); + + ~WinSequentialFile(); + + virtual Status Read(size_t n, Slice* result, char* scratch) override; + + virtual Status Skip(uint64_t n) override; + + virtual Status InvalidateCache(size_t offset, size_t length) override; +}; + +// pread() based random-access +class WinRandomAccessFile : public RandomAccessFile { + const std::string filename_; + HANDLE hFile_; + const bool use_os_buffer_; + bool read_ahead_; + const size_t compaction_readahead_size_; + const size_t random_access_max_buffer_size_; + mutable std::mutex buffer_mut_; + mutable AlignedBuffer buffer_; + mutable uint64_t + buffered_start_; // file offset set that is currently buffered + + /* + * The function reads a requested amount of bytes into the specified aligned + * buffer Upon success the function sets the length of the buffer to the + * amount of bytes actually read even though it might be less than actually + * requested. 
It then copies the amount of bytes requested by the user (left) + * to the user supplied buffer (dest) and reduces left by the amount of bytes + * copied to the user buffer + * + * @user_offset [in] - offset on disk where the read was requested by the user + * @first_page_start [in] - actual page aligned disk offset that we want to + * read from + * @bytes_to_read [in] - total amount of bytes that will be read from disk + * which is generally greater or equal to the amount + * that the user has requested due to the + * either alignment requirements or read_ahead in + * effect. + * @left [in/out] total amount of bytes that needs to be copied to the user + * buffer. It is reduced by the amount of bytes that actually + * copied + * @buffer - buffer to use + * @dest - user supplied buffer + */ + SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start, + size_t bytes_to_read, size_t& left, + AlignedBuffer& buffer, char* dest) const; + + SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start, + size_t bytes_to_read, size_t& left, + char* dest) const; + + SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, + uint64_t first_page_start, + size_t bytes_to_read, size_t& left, + char* dest) const; + + void CalculateReadParameters(uint64_t offset, size_t bytes_requested, + size_t& actual_bytes_toread, + uint64_t& first_page_start) const; + + // Override for behavior change + virtual SSIZE_T PositionedReadInternal(char* src, size_t numBytes, + uint64_t offset) const; + +public: + WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, + const EnvOptions& options); + + ~WinRandomAccessFile(); + + virtual void EnableReadAhead() override; + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + + virtual bool ShouldForwardRawRequest() const override; + + virtual void Hint(AccessPattern pattern) override; + + virtual Status InvalidateCache(size_t offset, size_t length) override; 
+ + virtual size_t GetUniqueId(char* id, size_t max_size) const override; +}; + + +// This is a sequential write class. It has been mimicked (as others) after +// the original Posix class. We add support for unbuffered I/O on windows as +// well +// we utilize the original buffer as an alignment buffer to write directly to +// file with no buffering. +// No buffering requires that the provided buffer is aligned to the physical +// sector size (SSD page size) and +// that all SetFilePointer() operations to occur with such an alignment. +// We thus always write in sector/page size increments to the drive and leave +// the tail for the next write OR for Close() at which point we pad with zeros. +// No padding is required for +// buffered access. +class WinWritableFile : public WritableFile { +private: + const std::string filename_; + HANDLE hFile_; + const bool use_os_buffer_; // Used to indicate unbuffered access, the file + const uint64_t alignment_; + // must be opened as unbuffered if false + uint64_t filesize_; // How much data is actually written disk + uint64_t reservedsize_; // how far we have reserved space + + virtual Status PreallocateInternal(uint64_t spaceToReserve); + +public: + WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, + size_t capacity, const EnvOptions& options); + + ~WinWritableFile(); + + // Indicates if the class makes use of unbuffered I/O + virtual bool UseOSBuffer() const override; + + virtual size_t GetRequiredBufferAlignment() const override; + + virtual Status Append(const Slice& data) override; + + virtual Status PositionedAppend(const Slice& data, uint64_t offset) override; + + // Need to implement this so the file is truncated correctly + // when buffered and unbuffered mode + virtual Status Truncate(uint64_t size) override; + + virtual Status Close() override; + + // write out the cached data to the OS cache + // This is now taken care of the WritableFileWriter + virtual Status Flush() override; + + virtual 
Status Sync() override; + + virtual Status Fsync() override; + + virtual uint64_t GetFileSize() override; + + virtual Status Allocate(uint64_t offset, uint64_t len) override; + + virtual size_t GetUniqueId(char* id, size_t max_size) const override; +}; + +class WinDirectory : public Directory { +public: + WinDirectory() {} + + virtual Status Fsync() override; +}; + +class WinFileLock : public FileLock { +public: + explicit WinFileLock(HANDLE hFile) : hFile_(hFile) { + assert(hFile != NULL); + assert(hFile != INVALID_HANDLE_VALUE); + } + + ~WinFileLock(); + +private: + HANDLE hFile_; +}; + +} +} diff --git a/port/win/port_win.h b/port/win/port_win.h index 116ebc4d9..a677bd662 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -69,7 +69,6 @@ typedef SSIZE_T ssize_t; namespace rocksdb { #define PREFETCH(addr, rw, locality) -std::string GetWindowsErrSz(DWORD err); namespace port { diff --git a/port/win/win_logger.cc b/port/win/win_logger.cc index 764d75325..3c4ae1f88 100644 --- a/port/win/win_logger.cc +++ b/port/win/win_logger.cc @@ -11,6 +11,7 @@ // where enough posix functionality is available. 
#include "port/win/win_logger.h" +#include "port/win/io_win.h" #include #include @@ -25,6 +26,8 @@ namespace rocksdb { +namespace port { + WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file, const InfoLogLevel log_level) : Logger(log_level), @@ -152,4 +155,6 @@ void WinLogger::Logv(const char* format, va_list ap) { size_t WinLogger::GetLogFileSize() const { return log_size_; } +} + } // namespace rocksdb diff --git a/port/win/win_logger.h b/port/win/win_logger.h index a85f24c83..84971363d 100644 --- a/port/win/win_logger.h +++ b/port/win/win_logger.h @@ -23,6 +23,8 @@ namespace rocksdb { class Env; +namespace port { + class WinLogger : public rocksdb::Logger { public: WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file, @@ -55,4 +57,6 @@ class WinLogger : public rocksdb::Logger { const static uint64_t flush_every_seconds_ = 5; }; +} + } // namespace rocksdb