397ab11152
Summary: We've got some DBs where iterators return Status with message "Corruption: block checksum mismatch" all the time. That's not very informative. It would be much easier to investigate if the error message contained the file name - then we would know e.g. how old the corrupted file is, which would be very useful for finding the root cause. This PR adds file name, offset and other stuff to some block corruption-related status messages. It doesn't improve all the error messages, just a few that were easy to improve. I'm mostly interested in "block checksum mismatch" and "Bad table magic number" since they're the only corruption errors that I've ever seen in the wild. Closes https://github.com/facebook/rocksdb/pull/2507 Differential Revision: D5345702 Pulled By: al13n321 fbshipit-source-id: fc8023d43f1935ad927cef1b9c55481ab3cb1339
596 lines
16 KiB
C++
596 lines
16 KiB
C++
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
// This source code is also licensed under the GPLv2 license found in the
|
|
// COPYING file in the root directory of this source tree.
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include "utilities/persistent_cache/block_cache_tier_file.h"
|
|
|
|
#ifndef OS_WIN
|
|
#include <unistd.h>
|
|
#endif
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <vector>
|
|
|
|
#include "port/port.h"
|
|
#include "util/crc32c.h"
|
|
#include "util/logging.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
//
|
|
// File creation factories
|
|
//
|
|
Status NewWritableCacheFile(Env* const env, const std::string& filepath,
|
|
std::unique_ptr<WritableFile>* file,
|
|
const bool use_direct_writes = false) {
|
|
EnvOptions opt;
|
|
opt.use_direct_writes = use_direct_writes;
|
|
Status s = env->NewWritableFile(filepath, file, opt);
|
|
return s;
|
|
}
|
|
|
|
Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath,
|
|
std::unique_ptr<RandomAccessFile>* file,
|
|
const bool use_direct_reads = true) {
|
|
EnvOptions opt;
|
|
opt.use_direct_reads = use_direct_reads;
|
|
Status s = env->NewRandomAccessFile(filepath, file, opt);
|
|
return s;
|
|
}
|
|
|
|
//
|
|
// BlockCacheFile
|
|
//
|
|
Status BlockCacheFile::Delete(uint64_t* size) {
|
|
Status status = env_->GetFileSize(Path(), size);
|
|
if (!status.ok()) {
|
|
return status;
|
|
}
|
|
return env_->DeleteFile(Path());
|
|
}
|
|
|
|
//
|
|
// CacheRecord
|
|
//
|
|
// Cache record represents the record on disk
|
|
//
|
|
// +--------+---------+----------+------------+---------------+-------------+
|
|
// | magic | crc | key size | value size | key data | value data |
|
|
// +--------+---------+----------+------------+---------------+-------------+
|
|
// <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size -->
|
|
//
|
|
struct CacheRecordHeader {
|
|
CacheRecordHeader() {}
|
|
CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
|
|
const uint32_t val_size)
|
|
: magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}
|
|
|
|
uint32_t magic_;
|
|
uint32_t crc_;
|
|
uint32_t key_size_;
|
|
uint32_t val_size_;
|
|
};
|
|
|
|
struct CacheRecord {
|
|
CacheRecord() {}
|
|
CacheRecord(const Slice& key, const Slice& val)
|
|
: hdr_(MAGIC, static_cast<uint32_t>(key.size()),
|
|
static_cast<uint32_t>(val.size())),
|
|
key_(key),
|
|
val_(val) {
|
|
hdr_.crc_ = ComputeCRC();
|
|
}
|
|
|
|
uint32_t ComputeCRC() const;
|
|
bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
|
|
bool Deserialize(const Slice& buf);
|
|
|
|
static uint32_t CalcSize(const Slice& key, const Slice& val) {
|
|
return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
|
|
val.size());
|
|
}
|
|
|
|
static const uint32_t MAGIC = 0xfefa;
|
|
|
|
bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
|
|
const char* data, const size_t size);
|
|
|
|
CacheRecordHeader hdr_;
|
|
Slice key_;
|
|
Slice val_;
|
|
};
|
|
|
|
static_assert(sizeof(CacheRecordHeader) == 16, "DataHeader is not aligned");
|
|
|
|
uint32_t CacheRecord::ComputeCRC() const {
|
|
uint32_t crc = 0;
|
|
CacheRecordHeader tmp = hdr_;
|
|
tmp.crc_ = 0;
|
|
crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
|
|
crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
|
|
key_.size());
|
|
crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
|
|
val_.size());
|
|
return crc;
|
|
}
|
|
|
|
bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
|
|
size_t* woff) {
|
|
assert(bufs->size());
|
|
return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
|
|
sizeof(hdr_)) &&
|
|
Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
|
|
key_.size()) &&
|
|
Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
|
|
val_.size());
|
|
}
|
|
|
|
bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
|
|
const char* data, const size_t data_size) {
|
|
assert(*woff < bufs->size());
|
|
|
|
const char* p = data;
|
|
size_t size = data_size;
|
|
|
|
while (size && *woff < bufs->size()) {
|
|
CacheWriteBuffer* buf = (*bufs)[*woff];
|
|
const size_t free = buf->Free();
|
|
if (size <= free) {
|
|
buf->Append(p, size);
|
|
size = 0;
|
|
} else {
|
|
buf->Append(p, free);
|
|
p += free;
|
|
size -= free;
|
|
assert(!buf->Free());
|
|
assert(buf->Used() == buf->Capacity());
|
|
}
|
|
|
|
if (!buf->Free()) {
|
|
*woff += 1;
|
|
}
|
|
}
|
|
|
|
assert(!size);
|
|
|
|
return !size;
|
|
}
|
|
|
|
bool CacheRecord::Deserialize(const Slice& data) {
|
|
assert(data.size() >= sizeof(CacheRecordHeader));
|
|
if (data.size() < sizeof(CacheRecordHeader)) {
|
|
return false;
|
|
}
|
|
|
|
memcpy(&hdr_, data.data(), sizeof(hdr_));
|
|
|
|
assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
|
|
if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
|
|
return false;
|
|
}
|
|
|
|
key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
|
|
val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);
|
|
|
|
if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
|
|
fprintf(stderr, "** magic %d ** \n", hdr_.magic_);
|
|
fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_);
|
|
fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_);
|
|
fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
|
|
fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
|
|
for (size_t i = 0; i < hdr_.val_size_; ++i) {
|
|
fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
|
|
}
|
|
fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC());
|
|
}
|
|
|
|
assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
|
|
return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
|
|
}
|
|
|
|
//
|
|
// RandomAccessFile
|
|
//
|
|
|
|
bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
|
|
WriteLock _(&rwlock_);
|
|
return OpenImpl(enable_direct_reads);
|
|
}
|
|
|
|
bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
|
|
rwlock_.AssertHeld();
|
|
|
|
ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());
|
|
|
|
std::unique_ptr<RandomAccessFile> file;
|
|
Status status =
|
|
NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads);
|
|
if (!status.ok()) {
|
|
Error(log_, "Error opening random access file %s. %s", Path().c_str(),
|
|
status.ToString().c_str());
|
|
return false;
|
|
}
|
|
freader_.reset(new RandomAccessFileReader(std::move(file), Path(), env_));
|
|
|
|
return true;
|
|
}
|
|
|
|
bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
|
|
char* scratch) {
|
|
ReadLock _(&rwlock_);
|
|
|
|
assert(lba.cache_id_ == cache_id_);
|
|
|
|
if (!freader_) {
|
|
return false;
|
|
}
|
|
|
|
Slice result;
|
|
Status s = freader_->Read(lba.off_, lba.size_, &result, scratch);
|
|
if (!s.ok()) {
|
|
Error(log_, "Error reading from file %s. %s", Path().c_str(),
|
|
s.ToString().c_str());
|
|
return false;
|
|
}
|
|
|
|
assert(result.data() == scratch);
|
|
|
|
return ParseRec(lba, key, val, scratch);
|
|
}
|
|
|
|
bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
|
|
char* scratch) {
|
|
Slice data(scratch, lba.size_);
|
|
|
|
CacheRecord rec;
|
|
if (!rec.Deserialize(data)) {
|
|
assert(!"Error deserializing data");
|
|
Error(log_, "Error de-serializing record from file %s off %d",
|
|
Path().c_str(), lba.off_);
|
|
return false;
|
|
}
|
|
|
|
*key = Slice(rec.key_);
|
|
*val = Slice(rec.val_);
|
|
|
|
return true;
|
|
}
|
|
|
|
//
|
|
// WriteableCacheFile
|
|
//
|
|
|
|
WriteableCacheFile::~WriteableCacheFile() {
|
|
WriteLock _(&rwlock_);
|
|
if (!eof_) {
|
|
// This file never flushed. We give priority to shutdown since this is a
|
|
// cache
|
|
// TODO(krad): Figure a way to flush the pending data
|
|
if (file_) {
|
|
assert(refs_ == 1);
|
|
--refs_;
|
|
}
|
|
}
|
|
assert(!refs_);
|
|
ClearBuffers();
|
|
}
|
|
|
|
bool WriteableCacheFile::Create(const bool enable_direct_writes,
|
|
const bool enable_direct_reads) {
|
|
WriteLock _(&rwlock_);
|
|
|
|
enable_direct_reads_ = enable_direct_reads;
|
|
|
|
ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)",
|
|
Path().c_str(), max_size_);
|
|
|
|
Status s = env_->FileExists(Path());
|
|
if (s.ok()) {
|
|
ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(),
|
|
s.ToString().c_str());
|
|
}
|
|
|
|
s = NewWritableCacheFile(env_, Path(), &file_);
|
|
if (!s.ok()) {
|
|
ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(),
|
|
s.ToString().c_str());
|
|
return false;
|
|
}
|
|
|
|
assert(!refs_);
|
|
++refs_;
|
|
|
|
return true;
|
|
}
|
|
|
|
bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
|
|
WriteLock _(&rwlock_);
|
|
|
|
if (eof_) {
|
|
// We can't append since the file is full
|
|
return false;
|
|
}
|
|
|
|
// estimate the space required to store the (key, val)
|
|
uint32_t rec_size = CacheRecord::CalcSize(key, val);
|
|
|
|
if (!ExpandBuffer(rec_size)) {
|
|
// unable to expand the buffer
|
|
ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size);
|
|
return false;
|
|
}
|
|
|
|
lba->cache_id_ = cache_id_;
|
|
lba->off_ = disk_woff_;
|
|
lba->size_ = rec_size;
|
|
|
|
CacheRecord rec(key, val);
|
|
if (!rec.Serialize(&bufs_, &buf_woff_)) {
|
|
// unexpected error: unable to serialize the data
|
|
assert(!"Error serializing record");
|
|
return false;
|
|
}
|
|
|
|
disk_woff_ += rec_size;
|
|
eof_ = disk_woff_ >= max_size_;
|
|
|
|
// dispatch buffer for flush
|
|
DispatchBuffer();
|
|
|
|
return true;
|
|
}
|
|
|
|
bool WriteableCacheFile::ExpandBuffer(const size_t size) {
|
|
rwlock_.AssertHeld();
|
|
assert(!eof_);
|
|
|
|
// determine if there is enough space
|
|
size_t free = 0; // compute the free space left in buffer
|
|
for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
|
|
free += bufs_[i]->Free();
|
|
if (size <= free) {
|
|
// we have enough space in the buffer
|
|
return true;
|
|
}
|
|
}
|
|
|
|
// expand the buffer until there is enough space to write `size` bytes
|
|
assert(free < size);
|
|
while (free < size) {
|
|
CacheWriteBuffer* const buf = alloc_->Allocate();
|
|
if (!buf) {
|
|
ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers");
|
|
return false;
|
|
}
|
|
|
|
size_ += static_cast<uint32_t>(buf->Free());
|
|
free += buf->Free();
|
|
bufs_.push_back(buf);
|
|
}
|
|
|
|
assert(free >= size);
|
|
return true;
|
|
}
|
|
|
|
void WriteableCacheFile::DispatchBuffer() {
|
|
rwlock_.AssertHeld();
|
|
|
|
assert(bufs_.size());
|
|
assert(buf_doff_ <= buf_woff_);
|
|
assert(buf_woff_ <= bufs_.size());
|
|
|
|
if (pending_ios_) {
|
|
return;
|
|
}
|
|
|
|
if (!eof_ && buf_doff_ == buf_woff_) {
|
|
// dispatch buffer is pointing to write buffer and we haven't hit eof
|
|
return;
|
|
}
|
|
|
|
assert(eof_ || buf_doff_ < buf_woff_);
|
|
assert(buf_doff_ < bufs_.size());
|
|
assert(file_);
|
|
|
|
auto* buf = bufs_[buf_doff_];
|
|
const uint64_t file_off = buf_doff_ * alloc_->BufferSize();
|
|
|
|
assert(!buf->Free() ||
|
|
(eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
|
|
// we have reached end of file, and there is space in the last buffer
|
|
// pad it with zero for direct IO
|
|
buf->FillTrailingZeros();
|
|
|
|
assert(buf->Used() % kFileAlignmentSize == 0);
|
|
|
|
writer_->Write(file_.get(), buf, file_off,
|
|
std::bind(&WriteableCacheFile::BufferWriteDone, this));
|
|
pending_ios_++;
|
|
buf_doff_++;
|
|
}
|
|
|
|
void WriteableCacheFile::BufferWriteDone() {
|
|
WriteLock _(&rwlock_);
|
|
|
|
assert(bufs_.size());
|
|
|
|
pending_ios_--;
|
|
|
|
if (buf_doff_ < bufs_.size()) {
|
|
DispatchBuffer();
|
|
}
|
|
|
|
if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
|
|
// end-of-file reached, move to read mode
|
|
CloseAndOpenForReading();
|
|
}
|
|
}
|
|
|
|
void WriteableCacheFile::CloseAndOpenForReading() {
|
|
// Our env abstraction do not allow reading from a file opened for appending
|
|
// We need close the file and re-open it for reading
|
|
Close();
|
|
RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
|
|
}
|
|
|
|
bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
|
|
char* scratch) {
|
|
rwlock_.AssertHeld();
|
|
|
|
if (!ReadBuffer(lba, scratch)) {
|
|
Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
|
|
lba.off_);
|
|
return false;
|
|
}
|
|
|
|
return ParseRec(lba, key, block, scratch);
|
|
}
|
|
|
|
bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
|
|
rwlock_.AssertHeld();
|
|
|
|
assert(lba.off_ < disk_woff_);
|
|
|
|
// we read from the buffers like reading from a flat file. The list of buffers
|
|
// are treated as contiguous stream of data
|
|
|
|
char* tmp = data;
|
|
size_t pending_nbytes = lba.size_;
|
|
// start buffer
|
|
size_t start_idx = lba.off_ / alloc_->BufferSize();
|
|
// offset into the start buffer
|
|
size_t start_off = lba.off_ % alloc_->BufferSize();
|
|
|
|
assert(start_idx <= buf_woff_);
|
|
|
|
for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
|
|
assert(i <= buf_woff_);
|
|
auto* buf = bufs_[i];
|
|
assert(i == buf_woff_ || !buf->Free());
|
|
// bytes to write to the buffer
|
|
size_t nbytes = pending_nbytes > (buf->Used() - start_off)
|
|
? (buf->Used() - start_off)
|
|
: pending_nbytes;
|
|
memcpy(tmp, buf->Data() + start_off, nbytes);
|
|
|
|
// left over to be written
|
|
pending_nbytes -= nbytes;
|
|
start_off = 0;
|
|
tmp += nbytes;
|
|
}
|
|
|
|
assert(!pending_nbytes);
|
|
if (pending_nbytes) {
|
|
return false;
|
|
}
|
|
|
|
assert(tmp == data + lba.size_);
|
|
return true;
|
|
}
|
|
|
|
void WriteableCacheFile::Close() {
|
|
rwlock_.AssertHeld();
|
|
|
|
assert(size_ >= max_size_);
|
|
assert(disk_woff_ >= max_size_);
|
|
assert(buf_doff_ == bufs_.size());
|
|
assert(bufs_.size() - buf_woff_ <= 1);
|
|
assert(!pending_ios_);
|
|
|
|
Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
|
|
disk_woff_);
|
|
|
|
ClearBuffers();
|
|
file_.reset();
|
|
|
|
assert(refs_);
|
|
--refs_;
|
|
}
|
|
|
|
void WriteableCacheFile::ClearBuffers() {
|
|
for (size_t i = 0; i < bufs_.size(); ++i) {
|
|
alloc_->Deallocate(bufs_[i]);
|
|
}
|
|
|
|
bufs_.clear();
|
|
}
|
|
|
|
//
|
|
// ThreadedFileWriter implementation
|
|
//
|
|
ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
|
|
const size_t qdepth, const size_t io_size)
|
|
: Writer(cache), io_size_(io_size) {
|
|
for (size_t i = 0; i < qdepth; ++i) {
|
|
port::Thread th(&ThreadedWriter::ThreadMain, this);
|
|
threads_.push_back(std::move(th));
|
|
}
|
|
}
|
|
|
|
void ThreadedWriter::Stop() {
|
|
// notify all threads to exit
|
|
for (size_t i = 0; i < threads_.size(); ++i) {
|
|
q_.Push(IO(/*signal=*/true));
|
|
}
|
|
|
|
// wait for all threads to exit
|
|
for (auto& th : threads_) {
|
|
th.join();
|
|
assert(!th.joinable());
|
|
}
|
|
threads_.clear();
|
|
}
|
|
|
|
void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
|
|
const uint64_t file_off,
|
|
const std::function<void()> callback) {
|
|
q_.Push(IO(file, buf, file_off, callback));
|
|
}
|
|
|
|
void ThreadedWriter::ThreadMain() {
|
|
while (true) {
|
|
// Fetch the IO to process
|
|
IO io(q_.Pop());
|
|
if (io.signal_) {
|
|
// that's secret signal to exit
|
|
break;
|
|
}
|
|
|
|
// Reserve space for writing the buffer
|
|
while (!cache_->Reserve(io.buf_->Used())) {
|
|
// We can fail to reserve space if every file in the system
|
|
// is being currently accessed
|
|
/* sleep override */
|
|
Env::Default()->SleepForMicroseconds(1000000);
|
|
}
|
|
|
|
DispatchIO(io);
|
|
|
|
io.callback_();
|
|
}
|
|
}
|
|
|
|
void ThreadedWriter::DispatchIO(const IO& io) {
|
|
size_t written = 0;
|
|
while (written < io.buf_->Used()) {
|
|
Slice data(io.buf_->Data() + written, io_size_);
|
|
Status s = io.file_->Append(data);
|
|
assert(s.ok());
|
|
if (!s.ok()) {
|
|
// That is definite IO error to device. There is not much we can
|
|
// do but ignore the failure. This can lead to corruption of data on
|
|
// disk, but the cache will skip while reading
|
|
fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
|
|
}
|
|
written += io_size_;
|
|
}
|
|
}
|
|
|
|
} // namespace rocksdb
|
|
|
|
#endif
|