Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	db/db_impl.h
	db/transaction_log_impl.cc
	db/transaction_log_impl.h
	include/rocksdb/options.h
	util/env.cc
	util/options.cc
This commit is contained in:
Igor Canadi 2014-03-03 17:54:04 -08:00
commit 9d0577a6be
45 changed files with 1413 additions and 769 deletions

View File

@ -14,6 +14,11 @@
* Added is_manual_compaction to CompactionFilter::Context
* Added "virtual void WaitForJoin() = 0" in class Env
### New Features
* If we find one truncated record at the end of the MANIFEST or WAL files,
we will ignore it. We assume that writers of these records were interrupted
and that we can safely ignore it.
## 2.7.0 (01/28/2014)
### Public API changes

View File

@ -12,6 +12,10 @@ OPT += -O2 -fno-omit-frame-pointer -momit-leaf-frame-pointer
else
OPT += -fno-omit-frame-pointer -momit-leaf-frame-pointer
endif
ifeq ($(MAKECMDGOALS),shared_lib)
PLATFORM_SHARED_LDFLAGS=-fPIC
endif
#-----------------------------------------------
# detect what platform we're building on
@ -136,7 +140,7 @@ $(SHARED2): $(SHARED3)
ln -fs $(SHARED3) $(SHARED2)
endif
$(SHARED3): $(LIBOBJECTS)
$(SHARED3):
$(CXX) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) $(LDFLAGS) $(SOURCES) -o $@
endif # PLATFORM_SHARED_EXT

View File

@ -151,6 +151,18 @@ void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
refs.store(1, std::memory_order_relaxed);
}
namespace {
void SuperVersionUnrefHandle(void* ptr) {
SuperVersion* sv = static_cast<SuperVersion*>(ptr);
if (sv->Unref()) {
sv->db_mutex->Lock();
sv->Cleanup();
sv->db_mutex->Unlock();
delete sv;
}
}
} // anonymous namespace
ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
const std::string& name,
Version* dummy_versions, Cache* table_cache,
@ -173,6 +185,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
imm_(options.min_write_buffer_number_to_merge),
super_version_(nullptr),
super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
next_(nullptr),
prev_(nullptr),
log_number_(0),
@ -209,6 +222,20 @@ ColumnFamilyData::~ColumnFamilyData() {
prev->next_ = next;
next->prev_ = prev;
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
// It also needs to be done after FlushMemTable, which can trigger local_sv_
// access.
auto sv = static_cast<SuperVersion*>(local_sv_->Get());
if (sv != nullptr) {
auto mutex = sv->db_mutex;
mutex->Unlock();
delete local_sv_;
mutex->Lock();
} else {
delete local_sv_;
}
if (super_version_ != nullptr) {
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
@ -276,11 +303,13 @@ Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
}
SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion) {
SuperVersion* new_superversion, port::Mutex* db_mutex) {
new_superversion->Init(mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
++super_version_number_;
super_version_->version_number = super_version_number_;
super_version_->db_mutex = db_mutex;
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex
@ -288,6 +317,19 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
return nullptr;
}
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
autovector<void*> sv_ptrs;
local_sv_->Scrape(&sv_ptrs);
for (auto ptr : sv_ptrs) {
assert(ptr);
auto sv = static_cast<SuperVersion*>(ptr);
if (sv->Unref()) {
sv->Cleanup();
delete sv;
}
}
}
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options,
const EnvOptions& storage_options,

View File

@ -19,6 +19,7 @@
#include "db/memtable_list.h"
#include "db/write_batch_internal.h"
#include "db/table_cache.h"
#include "util/thread_local.h"
namespace rocksdb {
@ -72,6 +73,9 @@ struct SuperVersion {
// all memtables that we need to free through this vector. We then
// delete all those memtables outside of mutex, during destruction
autovector<MemTable*> to_delete;
// Version number of the current SuperVersion
uint64_t version_number;
port::Mutex* db_mutex;
// should be called outside the mutex
SuperVersion() = default;
@ -159,6 +163,12 @@ class ColumnFamilyData {
}
SuperVersion* GetSuperVersion() const { return super_version_; }
SuperVersion* GetAndResetThreadLocalSuperVersion() const {
return static_cast<SuperVersion*>(local_sv_->Swap(nullptr));
}
void SetThreadLocalSuperVersion(SuperVersion* super_version) {
local_sv_->Reset(static_cast<void*>(super_version));
}
uint64_t GetSuperVersionNumber() const {
return super_version_number_.load();
}
@ -166,7 +176,10 @@ class ColumnFamilyData {
// if its reference count is zero and needs deletion or nullptr if not
// As argument takes a pointer to allocated SuperVersion to enable
// the clients to allocate SuperVersion outside of mutex.
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion);
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
port::Mutex* db_mutex);
void ResetThreadLocalSuperVersions();
// A Flag indicating whether write needs to slowdown because of there are
// too many number of level0 files.
@ -212,6 +225,10 @@ class ColumnFamilyData {
// changes.
std::atomic<uint64_t> super_version_number_;
// Thread's local copy of SuperVersion pointer
// This needs to be destructed before mutex_
ThreadLocalPtr* local_sv_;
// pointers for a circular linked list. we use it to support iterations
// that can be concurrent with writes
ColumnFamilyData* next_;

View File

@ -43,6 +43,7 @@
#include "db/write_batch_internal.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "port/likely.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
@ -238,8 +239,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
delayed_writes_(0),
storage_options_(options),
bg_work_gate_closed_(false),
refitting_level_(false) {
refitting_level_(false),
opened_successfully_(false) {
env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache.
@ -298,6 +299,26 @@ DBImpl::~DBImpl() {
bg_logstats_scheduled_) {
bg_cv_.Wait();
}
if (options_.allow_thread_local) {
// Clean up obsolete files due to SuperVersion release.
// (1) Need to delete to obsolete files before closing because RepairDB()
// scans all existing files in the file system and builds manifest file.
// Keeping obsolete files confuses the repair process.
// (2) Need to check if we Open()/Recover() the DB successfully before
// deleting because if VersionSet recover fails (may be due to corrupted
// manifest file), it is not able to identify live files correctly. As a
// result, all "live" files can get deleted by accident. However, corrupted
// manifest is recoverable by RepairDB().
if (opened_successfully_) {
DeletionState deletion_state;
FindObsoleteFiles(deletion_state, true);
// manifest number starting from 2
deletion_state.manifest_file_number = 1;
PurgeObsoleteFiles(deletion_state);
}
}
mutex_.Unlock();
if (default_cf_handle_ != nullptr) {
// we need to delete handle outside of lock because it does its own locking
@ -358,7 +379,8 @@ Status DBImpl::NewDB() {
const std::string manifest = DescriptorFileName(dbname_, 1);
unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(manifest, &file, storage_options_);
Status s = env_->NewWritableFile(manifest, &file,
storage_options_.AdaptForLogWrite());
if (!s.ok()) {
return s;
}
@ -1229,6 +1251,10 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
if (s.ok()) {
InstallSuperVersion(cfd, deletion_state);
// Reset SuperVersions cached in thread local storage
if (options_.allow_thread_local) {
cfd->ResetThreadLocalSuperVersions();
}
if (madeProgress) {
*madeProgress = 1;
}
@ -1361,7 +1387,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.DebugString().data());
status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
superversion_to_free = cfd->InstallSuperVersion(new_superversion);
superversion_to_free = cfd->InstallSuperVersion(new_superversion, &mutex_);
new_superversion = nullptr;
Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
@ -1406,8 +1432,9 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
return versions_->LastSequence();
}
Status DBImpl::GetUpdatesSince(SequenceNumber seq,
unique_ptr<TransactionLogIterator>* iter) {
Status DBImpl::GetUpdatesSince(
SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options) {
RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);
if (seq > versions_->LastSequence()) {
@ -1427,13 +1454,9 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
if (!s.ok()) {
return s;
}
iter->reset(
new TransactionLogIteratorImpl(options_.wal_dir,
&options_,
storage_options_,
seq,
std::move(wal_files),
this));
iter->reset(new TransactionLogIteratorImpl(options_.wal_dir, &options_,
read_options, storage_options_,
seq, std::move(wal_files), this));
return (*iter)->status();
}
@ -2004,6 +2027,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
InstallSuperVersion(c->column_family_data(), deletion_state);
if (options_.allow_thread_local) {
c->column_family_data()->ResetThreadLocalSuperVersions();
}
Version::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
@ -2815,7 +2841,7 @@ Status DBImpl::Get(const ReadOptions& options,
// DeletionState gets created and destructed outside of the lock -- we
// use this convinently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete one SuperVersion() outside of the lock -- superversion_to_free
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersion() gets called twice with the same,
// deletion_state, we can't reuse the SuperVersion() that got malloced because
@ -2829,14 +2855,10 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
SuperVersion* new_superversion =
(deletion_state.new_superversion != nullptr) ?
deletion_state.new_superversion : new SuperVersion();
SuperVersion* old_superversion = cfd->InstallSuperVersion(new_superversion);
SuperVersion* old_superversion =
cfd->InstallSuperVersion(new_superversion, &mutex_);
deletion_state.new_superversion = nullptr;
if (deletion_state.superversion_to_free != nullptr) {
// somebody already put it there
delete old_superversion;
} else {
deletion_state.superversion_to_free = old_superversion;
}
deletion_state.superversions_to_free.push_back(old_superversion);
}
Status DBImpl::GetImpl(const ReadOptions& options,
@ -2849,10 +2871,6 @@ Status DBImpl::GetImpl(const ReadOptions& options,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
mutex_.Lock();
SuperVersion* get_version = cfd->GetSuperVersion()->Ref();
mutex_.Unlock();
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
@ -2860,6 +2878,41 @@ Status DBImpl::GetImpl(const ReadOptions& options,
snapshot = versions_->LastSequence();
}
// Acquire SuperVersion
SuperVersion* sv = nullptr;
if (LIKELY(options_.allow_thread_local)) {
// The SuperVersion is cached in thread local storage to avoid acquiring
// mutex when SuperVersion does not change since the last use. When a new
// SuperVersion is installed, the compaction or flush thread cleans up
// cached SuperVersion in all existing thread local storage. To avoid
// acquiring mutex for this operation, we use atomic Swap() on the thread
// local pointer to guarantee exclusive access. If the thread local pointer
// is being used while a new SuperVersion is installed, the cached
// SuperVersion can become stale. It will eventually get refreshed either
// on the next GetImpl() call or next SuperVersion installation.
sv = cfd->GetAndResetThreadLocalSuperVersion();
if (!sv || sv->version_number != cfd->GetSuperVersionNumber()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_UPDATES);
SuperVersion* sv_to_delete = nullptr;
if (sv && sv->Unref()) {
mutex_.Lock();
sv->Cleanup();
sv_to_delete = sv;
} else {
mutex_.Lock();
}
sv = cfd->GetSuperVersion()->Ref();
mutex_.Unlock();
delete sv_to_delete;
}
} else {
mutex_.Lock();
sv = cfd->GetSuperVersion()->Ref();
mutex_.Unlock();
}
bool have_stat_update = false;
Version::GetStats stats;
@ -2872,11 +2925,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);
if (get_version->mem->Get(lkey, value, &s, merge_context,
*cfd->full_options())) {
if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->full_options())) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else if (get_version->imm->Get(lkey, value, &s, merge_context,
} else if (sv->imm->Get(lkey, value, &s, merge_context,
*cfd->full_options())) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
@ -2885,7 +2937,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
StopWatchNano from_files_timer(env_, false);
StartPerfTimer(&from_files_timer);
get_version->current->Get(options, lkey, value, &s, &merge_context, &stats,
sv->current->Get(options, lkey, value, &s, &merge_context, &stats,
*cfd->full_options(), value_found);
have_stat_update = true;
BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer);
@ -2895,31 +2947,32 @@ Status DBImpl::GetImpl(const ReadOptions& options,
StopWatchNano post_process_timer(env_, false);
StartPerfTimer(&post_process_timer);
bool delete_get_version = false;
if (!cfd->options()->disable_seek_compaction && have_stat_update) {
mutex_.Lock();
if (get_version->current->UpdateStats(stats)) {
if (sv->current->UpdateStats(stats)) {
MaybeScheduleFlushOrCompaction();
}
if (get_version->Unref()) {
get_version->Cleanup();
delete_get_version = true;
}
mutex_.Unlock();
}
// Release SuperVersion
if (LIKELY(options_.allow_thread_local)) {
// Put the SuperVersion back
cfd->SetThreadLocalSuperVersion(sv);
} else {
if (get_version->Unref()) {
bool delete_sv = false;
if (sv->Unref()) {
mutex_.Lock();
get_version->Cleanup();
sv->Cleanup();
mutex_.Unlock();
delete_get_version = true;
delete_sv = true;
}
if (delete_sv) {
delete sv;
}
if (delete_get_version) {
delete get_version;
}
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
RecordTick(options_.statistics.get(), BYTES_READ, value->size());
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer);
@ -3074,6 +3127,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
auto cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_);
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
Log(options_.info_log, "Created column family \"%s\" (ID %u)",
column_family_name.c_str(), (unsigned)cfd->GetID());
@ -3575,11 +3629,9 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
SuperVersion* new_superversion = nullptr;
mutex_.Unlock();
{
EnvOptions soptions(storage_options_);
soptions.use_mmap_writes = false;
DelayLoggingAndReset();
s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
&lfile, soptions);
&lfile, storage_options_.AdaptForLogWrite());
if (s.ok()) {
// Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution.
@ -3621,7 +3673,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
cfd->GetID(), (unsigned long)logfile_number_);
force = false; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction();
delete cfd->InstallSuperVersion(new_superversion);
delete cfd->InstallSuperVersion(new_superversion, &mutex_);
}
}
return s;
@ -3888,7 +3940,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
*dbptr = nullptr;
handles->clear();
EnvOptions soptions(db_options);
size_t max_write_buffer_size = 0;
for (auto cf : column_families) {
@ -3918,12 +3969,10 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
unique_ptr<WritableFile> lfile;
soptions.use_mmap_writes = false;
EnvOptions soptions(db_options);
s = impl->options_.env->NewWritableFile(
LogFileName(impl->options_.wal_dir, new_log_number),
&lfile,
soptions
);
LogFileName(impl->options_.wal_dir, new_log_number), &lfile,
soptions.AdaptForLogWrite());
if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * max_write_buffer_size);
VersionEdit edit;
@ -3953,7 +4002,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete cfd->InstallSuperVersion(new SuperVersion());
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
impl->alive_log_files_.push_back(impl->logfile_number_);
}
impl->DeleteObsoleteFiles();
@ -3985,6 +4034,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
impl->mutex_.Unlock();
if (s.ok()) {
impl->opened_successfully_ = true;
*dbptr = impl;
} else {
for (auto h : *handles) {

View File

@ -28,6 +28,7 @@
#include "rocksdb/transaction_log.h"
#include "util/autovector.h"
#include "util/stats_logger.h"
#include "util/thread_local.h"
#include "db/internal_stats.h"
namespace rocksdb {
@ -121,8 +122,10 @@ class DBImpl : public DB {
bool flush_memtable = true);
virtual Status GetSortedWalFiles(VectorLogPtr& files);
virtual SequenceNumber GetLatestSequenceNumber() const;
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter);
virtual Status GetUpdatesSince(
SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions&
read_options = TransactionLogIterator::ReadOptions());
virtual Status DeleteFile(std::string name);
virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);
@ -204,7 +207,7 @@ class DBImpl : public DB {
// a list of memtables to be free
autovector<MemTable*> memtables_to_free;
SuperVersion* superversion_to_free; // if nullptr nothing to free
autovector<SuperVersion*> superversions_to_free;
SuperVersion* new_superversion; // if nullptr no new superversion
@ -216,7 +219,6 @@ class DBImpl : public DB {
manifest_file_number = 0;
log_number = 0;
prev_log_number = 0;
superversion_to_free = nullptr;
new_superversion = create_superversion ? new SuperVersion() : nullptr;
}
@ -225,8 +227,10 @@ class DBImpl : public DB {
for (auto m : memtables_to_free) {
delete m;
}
// free superversion. if nullptr, this will be noop
delete superversion_to_free;
// free superversions
for (auto s : superversions_to_free) {
delete s;
}
// if new_superversion was not used, it will be non-nullptr and needs
// to be freed here
delete new_superversion;
@ -476,6 +480,9 @@ class DBImpl : public DB {
// Guard against multiple concurrent refitting
bool refitting_level_;
// Indicate DB was opened successfully
bool opened_successfully_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);

View File

@ -103,7 +103,7 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
error_if_log_file_exist);
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete cfd->InstallSuperVersion(new SuperVersion());
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
}
}
impl->mutex_.Unlock();

View File

@ -5288,8 +5288,10 @@ class ModelDB: public DB {
virtual SequenceNumber GetLatestSequenceNumber() const {
return 0;
}
virtual Status GetUpdatesSince(rocksdb::SequenceNumber,
unique_ptr<rocksdb::TransactionLogIterator>*) {
virtual Status GetUpdatesSince(
rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*,
const TransactionLogIterator::ReadOptions&
read_options = TransactionLogIterator::ReadOptions()) {
return Status::NotSupported("Not supported in Model DB");
}

View File

@ -140,7 +140,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
case kEof:
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "partial record without end(3)");
// This can be caused by the writer dying immediately after
// writing a physical record but before completing the next; don't
// treat it as a corruption, just ignore the entire logical record.
scratch->clear();
}
return false;
@ -264,13 +266,12 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
eof_offset_ = buffer_.size();
}
continue;
} else if (buffer_.size() == 0) {
// End of file
return kEof;
} else {
size_t drop_size = buffer_.size();
// Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Instead of considering this an error,
// just report EOF.
buffer_.clear();
ReportCorruption(drop_size, "truncated record at end of file");
return kEof;
}
}
@ -284,14 +285,22 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
if (kHeaderSize + length > buffer_.size()) {
size_t drop_size = buffer_.size();
buffer_.clear();
if (!eof_) {
ReportCorruption(drop_size, "bad record length");
return kBadRecord;
}
// If the end of the file has been reached without reading |length| bytes
// of payload, assume the writer died in the middle of writing the record.
// Don't report a corruption.
return kEof;
}
if (type == kZeroType && length == 0) {
// Skip zero length record without reporting any drops since
// such records are produced by the mmap based writing code in
// env_posix.cc that preallocates file regions.
// NOTE: this should never happen in DB written by new RocksDB versions,
// since we turn off mmap writes to manifest and log files
buffer_.clear();
return kBadRecord;
}

View File

@ -446,20 +446,32 @@ TEST(LogTest, BadRecordType) {
ASSERT_EQ("OK", MatchError("unknown record type"));
}
TEST(LogTest, TruncatedTrailingRecord) {
TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read());
ASSERT_EQ((unsigned int)(kHeaderSize - 1), DroppedBytes());
ASSERT_EQ("OK", MatchError("truncated record at end of file"));
// Truncated last record is ignored, not treated as an error
ASSERT_EQ(0, DroppedBytes());
ASSERT_EQ("", ReportMessage());
}
TEST(LogTest, BadLength) {
const int kPayloadSize = kBlockSize - kHeaderSize;
Write(BigString("bar", kPayloadSize));
Write("foo");
// Least significant size byte is stored in header[4].
IncrementByte(4, 1);
ASSERT_EQ("foo", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("bad record length"));
}
TEST(LogTest, BadLengthAtEndIsIgnored) {
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
ASSERT_EQ((unsigned int)(kHeaderSize + 2), DroppedBytes());
ASSERT_EQ("OK", MatchError("bad record length"));
ASSERT_EQ(0, DroppedBytes());
ASSERT_EQ("", ReportMessage());
}
TEST(LogTest, ChecksumMismatch) {
@ -510,6 +522,24 @@ TEST(LogTest, UnexpectedFirstType) {
ASSERT_EQ("OK", MatchError("partial record without end"));
}
TEST(LogTest, MissingLastIsIgnored) {
Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
ASSERT_EQ("EOF", Read());
ASSERT_EQ("", ReportMessage());
ASSERT_EQ(0, DroppedBytes());
}
TEST(LogTest, PartialLastIsIgnored) {
Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
ASSERT_EQ("", ReportMessage());
ASSERT_EQ(0, DroppedBytes());
}
TEST(LogTest, ErrorJoinsRecords) {
// Consider two fragmented records:
// first(R1) last(R1) first(R2) last(R2)

View File

@ -251,7 +251,6 @@ class Repairer {
}
void ExtractMetaData() {
std::vector<TableInfo> kept;
for (size_t i = 0; i < table_numbers_.size(); i++) {
TableInfo t;
t.meta.number = table_numbers_[i];
@ -317,7 +316,8 @@ class Repairer {
Status WriteDescriptor() {
std::string tmp = TempFileName(dbname_, 1);
unique_ptr<WritableFile> file;
Status status = env_->NewWritableFile(tmp, &file, storage_options_);
Status status =
env_->NewWritableFile(tmp, &file, storage_options_.AdaptForLogWrite());
if (!status.ok()) {
return status;
}

View File

@ -10,10 +10,12 @@ namespace rocksdb {
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dir, const DBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl)
: dir_(dir),
options_(options),
read_options_(read_options),
soptions_(soptions),
startingSequenceNumber_(seq),
files_(std::move(files)),
@ -250,9 +252,8 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
return status;
}
assert(file);
currentLogReader_.reset(
new log::Reader(std::move(file), &reporter_, true, 0)
);
currentLogReader_.reset(new log::Reader(std::move(file), &reporter_,
read_options_.verify_checksums_, 0));
return Status::OK();
}
} // namespace rocksdb

View File

@ -66,11 +66,11 @@ class LogFileImpl : public LogFile {
class TransactionLogIteratorImpl : public TransactionLogIterator {
public:
TransactionLogIteratorImpl(const std::string& dir, const DBOptions* options,
const EnvOptions& soptions,
const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files,
DBImpl const* const dbimpl);
TransactionLogIteratorImpl(
const std::string& dir, const DBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl);
virtual bool Valid();
@ -83,6 +83,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
private:
const std::string& dir_;
const DBOptions* options_;
const TransactionLogIterator::ReadOptions read_options_;
const EnvOptions& soptions_;
SequenceNumber startingSequenceNumber_;
std::unique_ptr<VectorLogPtr> files_;

View File

@ -1585,9 +1585,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// only one thread can be here at the same time
if (!new_manifest_filename.empty()) {
unique_ptr<WritableFile> descriptor_file;
s = env_->NewWritableFile(new_manifest_filename,
&descriptor_file,
storage_options_);
s = env_->NewWritableFile(new_manifest_filename, &descriptor_file,
storage_options_.AdaptForLogWrite());
if (s.ok()) {
descriptor_log_.reset(new log::Writer(std::move(descriptor_file)));
s = WriteSnapshot(descriptor_log_.get());
@ -2615,7 +2614,6 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
AppendVersion(new_cfd, new Version(new_cfd, this, current_version_number_++));
new_cfd->CreateNewMemtable();
new_cfd->SetLogNumber(edit->log_number_);
delete new_cfd->InstallSuperVersion(new SuperVersion());
return new_cfd;
}

View File

@ -420,8 +420,10 @@ class DB {
// use this api, else the WAL files will get
// cleared aggressively and the iterator might keep getting invalid before
// an update is read.
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter) = 0;
virtual Status GetUpdatesSince(
SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions&
read_options = TransactionLogIterator::ReadOptions()) = 0;
// Delete the file name from the db directory and update the internal state to
// reflect that. Supports deletion of sst and log files only. 'name' must be

View File

@ -49,6 +49,8 @@ struct EnvOptions {
// construct from Options
explicit EnvOptions(const DBOptions& options);
EnvOptions AdaptForLogWrite() const;
// If true, then allow caching of data in environment buffers
bool use_os_buffer = true;
@ -511,25 +513,56 @@ class Directory {
virtual Status Fsync() = 0;
};
enum InfoLogLevel {
DEBUG = 0,
INFO,
WARN,
ERROR,
FATAL,
NUM_INFO_LOG_LEVELS,
};
// An interface for writing log messages.
class Logger {
public:
enum { DO_NOT_SUPPORT_GET_LOG_FILE_SIZE = -1 };
Logger() { }
explicit Logger(const InfoLogLevel log_level = InfoLogLevel::ERROR)
: log_level_(log_level) {}
virtual ~Logger();
// Write an entry to the log file with the specified format.
virtual void Logv(const char* format, va_list ap) = 0;
// Write an entry to the log file with the specified log level
// and format. Any log with level under the internal log level
// of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
// printed.
void Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
static const char* kInfoLogLevelNames[5] = {"DEBUG", "INFO", "WARN",
"ERROR", "FATAL"};
if (log_level < log_level_) {
return;
}
char new_format[500];
snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
kInfoLogLevelNames[log_level], format);
Logv(new_format, ap);
}
virtual size_t GetLogFileSize() const {
return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE;
}
// Flush to the OS buffers
virtual void Flush() {}
virtual InfoLogLevel GetInfoLogLevel() const { return log_level_; }
virtual void SetInfoLogLevel(const InfoLogLevel log_level) {
log_level_ = log_level;
}
private:
// No copying allowed
Logger(const Logger&);
void operator=(const Logger&);
InfoLogLevel log_level_;
};
@ -547,7 +580,18 @@ class FileLock {
extern void LogFlush(const shared_ptr<Logger>& info_log);
extern void Log(const InfoLogLevel log_level,
const shared_ptr<Logger>& info_log, const char* format, ...);
// a set of log functions with different log levels.
extern void Debug(const shared_ptr<Logger>& info_log, const char* format, ...);
extern void Info(const shared_ptr<Logger>& info_log, const char* format, ...);
extern void Warn(const shared_ptr<Logger>& info_log, const char* format, ...);
extern void Error(const shared_ptr<Logger>& info_log, const char* format, ...);
extern void Fatal(const shared_ptr<Logger>& info_log, const char* format, ...);
// Log the specified data to *info_log if info_log is non-nullptr.
// The default info log level is InfoLogLevel::ERROR.
extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...)
# if defined(__GNUC__) || defined(__clang__)
__attribute__((__format__ (__printf__, 2, 3)))
@ -556,12 +600,23 @@ extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...)
extern void LogFlush(Logger *info_log);
extern void Log(const InfoLogLevel log_level, Logger* info_log,
const char* format, ...);
// The default info log level is InfoLogLevel::ERROR.
extern void Log(Logger* info_log, const char* format, ...)
# if defined(__GNUC__) || defined(__clang__)
__attribute__((__format__ (__printf__, 2, 3)))
# endif
;
// a set of log functions with different log levels.
extern void Debug(Logger* info_log, const char* format, ...);
extern void Info(Logger* info_log, const char* format, ...);
extern void Warn(Logger* info_log, const char* format, ...);
extern void Error(Logger* info_log, const char* format, ...);
extern void Fatal(Logger* info_log, const char* format, ...);
// A utility routine: write "data" to the named file.
extern Status WriteStringToFile(Env* env, const Slice& data,
const std::string& fname);

View File

@ -11,6 +11,7 @@ namespace rocksdb {
class Slice;
class BlockBuilder;
struct Options;
// FlushBlockPolicy provides a configurable way to determine when to flush a
// block in the block based tables,
@ -36,29 +37,22 @@ class FlushBlockPolicyFactory {
// Callers must delete the result after any database that is using the
// result has been closed.
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBuilder& data_block_builder) const = 0;
const Options& options, const BlockBuilder& data_block_builder) const = 0;
virtual ~FlushBlockPolicyFactory() { }
};
class FlushBlockBySizePolicyFactory : public FlushBlockPolicyFactory {
public:
FlushBlockBySizePolicyFactory(const uint64_t block_size,
const uint64_t block_size_deviation) :
block_size_(block_size),
block_size_deviation_(block_size_deviation) {
}
FlushBlockBySizePolicyFactory() {}
virtual const char* Name() const override {
return "FlushBlockBySizePolicyFactory";
}
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const Options& options,
const BlockBuilder& data_block_builder) const override;
private:
const uint64_t block_size_;
const uint64_t block_size_deviation_;
};
} // rocksdb

View File

@ -717,6 +717,10 @@ struct DBOptions {
// Default: 0
uint64_t bytes_per_sync;
// Allow RocksDB to use thread local storage to optimize performance.
// Default: true
bool allow_thread_local;
// Create DBOptions with default values for all fields
DBOptions();
// Create DBOptions from Options

View File

@ -122,6 +122,7 @@ enum Tickers {
// Number of table's properties loaded directly from file, without creating
// table reader object.
NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
NUMBER_SUPERVERSION_UPDATES,
TICKER_ENUM_MAX
};
@ -176,7 +177,9 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"},
{COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},
{NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
"rocksdb.number.direct.load.table.properties"}, };
"rocksdb.number.direct.load.table.properties"},
{NUMBER_SUPERVERSION_UPDATES, "rocksdb.number.superversion_updates"},
};
/**
* Keep adding histogram's here.

View File

@ -54,6 +54,21 @@ struct BlockBasedTableOptions {
// If not specified, each "table reader" object will pre-load index/filter
// block during table initialization.
bool cache_index_and_filter_blocks = false;
// The index type that will be used for this table.
enum IndexType : char {
// A space efficient index block that is optimized for
// binary-search-based index.
kBinarySearch,
};
IndexType index_type = kBinarySearch;
};
// Table Properties that are specific to block-based table properties.
struct BlockBasedTablePropertyNames {
// value of this propertis is a fixed int32 number.
static const std::string kIndexType;
};
// Create default block based table factory.

View File

@ -85,6 +85,19 @@ class TransactionLogIterator {
// earliest transaction contained in the batch.
// ONLY use if Valid() is true and status() is OK.
virtual BatchResult GetBatch() = 0;
// The read options for TransactionLogIterator.
struct ReadOptions {
// If true, all data read from underlying storage will be
// verified against corresponding checksums.
// Default: true
bool verify_checksums_;
ReadOptions() : verify_checksums_(true) {}
explicit ReadOptions(bool verify_checksums)
: verify_checksums_(verify_checksums) {}
};
};
} // namespace rocksdb

View File

@ -188,10 +188,10 @@ class StackableDB : public DB {
return db_->GetPropertiesOfAllTables(column_family, props);
}
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter)
override {
return db_->GetUpdatesSince(seq_number, iter);
virtual Status GetUpdatesSince(
SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options) override {
return db_->GetUpdatesSince(seq_number, iter, read_options);
}
virtual ColumnFamilyHandle* DefaultColumnFamily() const override {

21
port/likely.h Normal file
View File

@ -0,0 +1,21 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef PORT_LIKELY_H_
#define PORT_LIKELY_H_
#if defined(__GNUC__) && __GNUC__ >= 4
#define LIKELY(x) (__builtin_expect((x), 1))
#define UNLIKELY(x) (__builtin_expect((x), 0))
#else
#define LIKELY(x) (x)
#define UNLIKELY(x) (x)
#endif
#endif // PORT_LIKELY_H_

View File

@ -26,8 +26,8 @@ class Block {
~Block();
size_t size() const { return size_; }
bool isCachable() const { return cachable_; }
CompressionType compressionType() const { return compression_type_; }
bool cachable() const { return cachable_; }
CompressionType compression_type() const { return compression_type_; }
Iterator* NewIterator(const Comparator* comparator);
const char* data() { return data_; }

View File

@ -11,23 +11,29 @@
#include <assert.h>
#include <inttypes.h>
#include <map>
#include <stdio.h>
#include "rocksdb/flush_block_policy.h"
#include <map>
#include <memory>
#include "db/dbformat.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "table/table_builder.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/options.h"
#include "db/dbformat.h"
#include "table/block_based_table_reader.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "table/block_builder.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/table_builder.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/stop_watch.h"
@ -36,11 +42,167 @@ namespace rocksdb {
namespace {
static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
typedef BlockBasedTableOptions::IndexType IndexType;
// The interface for building index.
// Instruction for adding a new concrete IndexBuilder:
// 1. Create a subclass instantiated from IndexBuilder.
// 2. Add a new entry associated with that subclass in TableOptions::IndexType.
// 3. Add a create function for the new subclass in CreateIndexBuilder.
// Note: we can devise more advanced design to simplify the process for adding
// new subclass, which will, on the other hand, increase the code complexity and
// catch unwanted attention from readers. Given that we won't add/change
// indexes frequently, it makes sense to just embrace a more straightforward
// design that just works.
class IndexBuilder {
public:
explicit IndexBuilder(const Comparator* comparator)
: comparator_(comparator) {}
virtual ~IndexBuilder() {}
// Add a new index entry to index block.
// To allow further optimization, we provide `last_key_in_current_block` and
// `first_key_in_next_block`, based on which the specific implementation can
// determine the best index key to be used for the index block.
// @last_key_in_current_block: this parameter maybe overridden with the value
// "substitute key".
// @first_key_in_next_block: it will be nullptr if the entry being added is
// the last one in the table
//
// REQUIRES: Finish() has not yet been called.
virtual void AddEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) = 0;
// Inform the index builder that all entries has been written. Block builder
// may therefore perform any operation required for block finalization.
//
// REQUIRES: Finish() has not yet been called.
virtual Slice Finish() = 0;
// Get the estimated size for index block.
virtual size_t EstimatedSize() const = 0;
protected:
const Comparator* comparator_;
};
// This index builder builds space-efficient index block for binary-search-based
// index.
//
// Optimizations:
// 1. Made block's `block_restart_interval` to be 1, which will avoid linear
// search when doing index lookup.
// 2. Shorten the key length for index block. Other than honestly using the
// last key in the data block as the index key, we instead find a shortest
// substitute key that serves the same function.
class BinarySearchIndexBuilder : public IndexBuilder {
public:
explicit BinarySearchIndexBuilder(const Comparator* comparator)
: IndexBuilder(comparator),
index_block_builder_(1 /* block_restart_interval == 1 */, comparator) {}
virtual void AddEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) override {
if (first_key_in_next_block != nullptr) {
comparator_->FindShortestSeparator(last_key_in_current_block,
*first_key_in_next_block);
} else {
comparator_->FindShortSuccessor(last_key_in_current_block);
}
std::string handle_encoding;
block_handle.EncodeTo(&handle_encoding);
index_block_builder_.Add(*last_key_in_current_block, handle_encoding);
}
virtual Slice Finish() override { return index_block_builder_.Finish(); }
virtual size_t EstimatedSize() const {
return index_block_builder_.CurrentSizeEstimate();
}
private:
BlockBuilder index_block_builder_;
};
// Create a index builder based on its type.
IndexBuilder* CreateIndexBuilder(IndexType type, const Comparator* comparator) {
switch (type) {
case BlockBasedTableOptions::kBinarySearch: {
return new BinarySearchIndexBuilder(comparator);
}
default: {
assert(!"Do not recognize the index type ");
return nullptr;
}
}
// impossible.
assert(false);
return nullptr;
}
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
// Check to see if compressed less than 12.5%
return compressed_size < raw_size - (raw_size / 8u);
}
Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options,
CompressionType* type, std::string* compressed_output) {
if (*type == kNoCompression) {
return raw;
}
// Will return compressed block contents if (1) the compression method is
// supported in this platform and (2) the compression rate is "good enough".
switch (*type) {
case kSnappyCompression:
if (port::Snappy_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kZlibCompression:
if (port::Zlib_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kBZip2Compression:
if (port::BZip2_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kLZ4Compression:
if (port::LZ4_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kLZ4HCCompression:
if (port::LZ4HC_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
default: {} // Do not recognize this compression type
}
// Compression method is not supported, or not good compression ratio, so just
// fall back to uncompressed form.
*type = kNoCompression;
return raw;
}
} // anonymous namespace
// kBlockBasedTableMagicNumber was picked by running
@ -51,6 +213,46 @@ static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
extern const uint64_t kBlockBasedTableMagicNumber
= 0xdb4775248b80fb57ull;
// A collector that collects properties of interest to block-based table.
// For now this class looks heavy-weight since we only write one additional
// property.
// But in the forseeable future, we will add more and more properties that are
// specific to block-based table.
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
: public TablePropertiesCollector {
public:
BlockBasedTablePropertiesCollector(
BlockBasedTableOptions::IndexType index_type)
: index_type_(index_type) {}
virtual Status Add(const Slice& key, const Slice& value) {
// Intentionally left blank. Have no interest in collecting stats for
// individual key/value pairs.
return Status::OK();
}
virtual Status Finish(UserCollectedProperties* properties) {
std::string val;
PutFixed32(&val, static_cast<uint32_t>(index_type_));
properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
return Status::OK();
}
// The name of the properties collector can be used for debugging purpose.
virtual const char* Name() const {
return "BlockBasedTablePropertiesCollector";
}
virtual UserCollectedProperties GetReadableProperties() const {
// Intentionally left blank.
return UserCollectedProperties();
}
private:
BlockBasedTableOptions::IndexType index_type_;
};
struct BlockBasedTableBuilder::Rep {
Options options;
const InternalKeyComparator& internal_comparator;
@ -58,7 +260,8 @@ struct BlockBasedTableBuilder::Rep {
uint64_t offset = 0;
Status status;
BlockBuilder data_block;
BlockBuilder index_block;
std::unique_ptr<IndexBuilder> index_builder;
std::string last_key;
CompressionType compression_type;
TableProperties props;
@ -75,28 +278,31 @@ struct BlockBasedTableBuilder::Rep {
Rep(const Options& opt, const InternalKeyComparator& icomparator,
WritableFile* f, FlushBlockPolicyFactory* flush_block_policy_factory,
CompressionType compression_type)
CompressionType compression_type, IndexType index_block_type)
: options(opt),
internal_comparator(icomparator),
file(f),
data_block(options, &internal_comparator),
// To avoid linear scan, we make the block_restart_interval to be `1`
// in index block builder
index_block(1 /* block_restart_interval */, &internal_comparator),
index_builder(
CreateIndexBuilder(index_block_type, &internal_comparator)),
compression_type(compression_type),
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt, &internal_comparator)),
flush_block_policy(
flush_block_policy_factory->NewFlushBlockPolicy(data_block)) {}
flush_block_policy(flush_block_policy_factory->NewFlushBlockPolicy(
options, data_block)) {
options.table_properties_collectors.push_back(
std::make_shared<BlockBasedTablePropertiesCollector>(index_block_type));
}
};
BlockBasedTableBuilder::BlockBasedTableBuilder(
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, FlushBlockPolicyFactory* flush_block_policy_factory,
const Options& options, const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator, WritableFile* file,
CompressionType compression_type)
: rep_(new Rep(options, internal_comparator, file,
flush_block_policy_factory, compression_type)) {
table_options.flush_block_policy_factory.get(),
compression_type, table_options.index_type)) {
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
}
@ -136,10 +342,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
// entries in the first block and < all entries in subsequent
// blocks.
if (ok()) {
r->internal_comparator.FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->index_builder->AddEntry(&r->last_key, &key, r->pending_handle);
}
}
@ -179,88 +382,25 @@ void BlockBasedTableBuilder::Flush() {
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
BlockHandle* handle) {
WriteBlock(block->Finish(), handle);
block->Reset();
}
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish();
Slice block_contents;
std::string* compressed = &r->compressed_output;
CompressionType type = r->compression_type;
switch (type) {
case kNoCompression:
block_contents = raw;
break;
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// Snappy not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
case kZlibCompression:
if (port::Zlib_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// Zlib not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
case kBZip2Compression:
if (port::BZip2_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// BZip not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
case kLZ4Compression:
if (port::LZ4_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// LZ4 not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
case kLZ4HCCompression:
if (port::LZ4HC_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// LZ4 not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
auto type = r->compression_type;
auto block_contents =
CompressBlock(raw_block_contents, r->options.compression_opts, &type,
&r->compressed_output);
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
block->Reset();
}
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
@ -364,11 +504,8 @@ Status BlockBasedTableBuilder::Finish() {
// block, we will finish writing all index entries here and flush them
// to storage after metaindex block is written.
if (ok() && !empty_data_block) {
r->internal_comparator.FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, handle_encoding);
r->index_builder->AddEntry(&r->last_key, nullptr /* no next data block */,
r->pending_handle);
}
// Write meta blocks and metaindex block with the following order.
@ -394,11 +531,12 @@ Status BlockBasedTableBuilder::Finish() {
r->props.filter_policy_name = r->options.filter_policy != nullptr ?
r->options.filter_policy->Name() : "";
r->props.index_size =
r->index_block.CurrentSizeEstimate() + kBlockTrailerSize;
r->index_builder->EstimatedSize() + kBlockTrailerSize;
// Add basic properties
property_block_builder.AddTableProperty(r->props);
// Add use collected properties
NotifyCollectTableCollectorsOnFinish(
r->options.table_properties_collectors,
r->options.info_log.get(),
@ -425,7 +563,7 @@ Status BlockBasedTableBuilder::Finish() {
// Write index block
if (ok()) {
WriteBlock(&r->index_block, &index_block_handle);
WriteBlock(r->index_builder->Finish(), &index_block_handle);
}
// Write footer

View File

@ -9,6 +9,7 @@
#pragma once
#include <stdint.h>
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
@ -19,6 +20,7 @@ namespace rocksdb {
class BlockBuilder;
class BlockHandle;
class WritableFile;
struct BlockBasedTableOptions;
class BlockBasedTableBuilder : public TableBuilder {
public:
@ -26,10 +28,9 @@ class BlockBasedTableBuilder : public TableBuilder {
// building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish().
BlockBasedTableBuilder(const Options& options,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator,
WritableFile* file,
FlushBlockPolicyFactory* flush_block_policy_factory,
CompressionType compression_type);
WritableFile* file, CompressionType compression_type);
// REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder();
@ -63,11 +64,17 @@ class BlockBasedTableBuilder : public TableBuilder {
private:
bool ok() const { return status().ok(); }
// Call block's Finish() method and then write the finalize block contents to
// file.
void WriteBlock(BlockBuilder* block, BlockHandle* handle);
// Directly write block content to the file.
void WriteBlock(const Slice& block_contents, BlockHandle* handle);
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);
Status InsertBlockInCache(const Slice& block_contents,
const CompressionType type, const BlockHandle* handle);
const CompressionType type,
const BlockHandle* handle);
struct Rep;
class BlockBasedTablePropertiesCollector;
Rep* rep_;
// Advanced operation: flush any buffered key/value pairs to file.
@ -82,4 +89,3 @@ class BlockBasedTableBuilder : public TableBuilder {
};
} // namespace rocksdb

View File

@ -11,13 +11,25 @@
#include "table/block_based_table_factory.h"
#include <memory>
#include <string>
#include <stdint.h>
#include "rocksdb/flush_block_policy.h"
#include "table/block_based_table_builder.h"
#include "table/block_based_table_reader.h"
#include "port/port.h"
namespace rocksdb {
BlockBasedTableFactory::BlockBasedTableFactory(
const BlockBasedTableOptions& table_options)
: table_options_(table_options) {
if (table_options_.flush_block_policy_factory == nullptr) {
table_options_.flush_block_policy_factory.reset(
new FlushBlockBySizePolicyFactory());
}
}
Status BlockBasedTableFactory::NewTableReader(
const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
@ -31,34 +43,8 @@ Status BlockBasedTableFactory::NewTableReader(
TableBuilder* BlockBasedTableFactory::NewTableBuilder(
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, CompressionType compression_type) const {
auto flush_block_policy_factory =
table_options_.flush_block_policy_factory.get();
// if flush block policy factory is not set, we'll create the default one
// from the options.
//
// NOTE: we cannot pre-cache the "default block policy factory" because
// `FlushBlockBySizePolicyFactory` takes `options.block_size` and
// `options.block_size_deviation` as parameters, which may be different
// every time.
if (flush_block_policy_factory == nullptr) {
flush_block_policy_factory =
new FlushBlockBySizePolicyFactory(options.block_size,
options.block_size_deviation);
}
auto table_builder =
new BlockBasedTableBuilder(options, internal_comparator, file,
flush_block_policy_factory, compression_type);
// Delete flush_block_policy_factory only when it's just created from the
// options.
// We can safely delete flush_block_policy_factory since it will only be used
// during the construction of `BlockBasedTableBuilder`.
if (flush_block_policy_factory !=
table_options_.flush_block_policy_factory.get()) {
delete flush_block_policy_factory;
}
auto table_builder = new BlockBasedTableBuilder(
options, table_options_, internal_comparator, file, compression_type);
return table_builder;
}
@ -68,4 +54,7 @@ TableFactory* NewBlockBasedTableFactory(
return new BlockBasedTableFactory(table_options);
}
const std::string BlockBasedTablePropertyNames::kIndexType =
"rocksdb.block.based.table.index.type";
} // namespace rocksdb

View File

@ -26,8 +26,7 @@ class BlockBasedTableBuilder;
class BlockBasedTableFactory : public TableFactory {
public:
explicit BlockBasedTableFactory(
const BlockBasedTableOptions& table_options = BlockBasedTableOptions())
: table_options_(table_options) {}
const BlockBasedTableOptions& table_options = BlockBasedTableOptions());
~BlockBasedTableFactory() {}

File diff suppressed because it is too large Load Diff

View File

@ -8,12 +8,14 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <memory>
#include <stdint.h>
#include "rocksdb/cache.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include <memory>
#include <utility>
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/table_reader.h"
#include "util/coding.h"
@ -21,14 +23,19 @@ namespace rocksdb {
class Block;
class BlockHandle;
class Cache;
class FilterBlockReader;
class Footer;
struct Options;
class InternalKeyComparator;
class Iterator;
class RandomAccessFile;
struct ReadOptions;
class TableCache;
class TableReader;
class FilterBlockReader;
class WritableFile;
struct BlockBasedTableOptions;
struct EnvOptions;
struct Options;
struct ReadOptions;
using std::unique_ptr;
@ -91,7 +98,9 @@ class BlockBasedTable : public TableReader {
~BlockBasedTable();
bool TEST_filter_block_preloaded() const;
bool TEST_index_block_preloaded() const;
bool TEST_index_reader_preloaded() const;
// Implementation of IndexReader will be exposed to internal cc file only.
class IndexReader;
private:
template <class TValue>
@ -101,40 +110,51 @@ class BlockBasedTable : public TableReader {
Rep* rep_;
bool compaction_optimized_;
static Iterator* BlockReader(void*, const ReadOptions&,
static Iterator* DataBlockReader(void*, const ReadOptions&,
const EnvOptions& soptions,
const InternalKeyComparator& icomparator,
const Slice&, bool for_compaction);
static Iterator* BlockReader(void*, const ReadOptions&, const Slice&,
static Iterator* DataBlockReader(void*, const ReadOptions&, const Slice&,
bool* didIO, bool for_compaction = false);
// if `no_io == true`, we will not try to read filter from sst file
// if it is not cached yet.
// For the following two functions:
// if `no_io == true`, we will not try to read filter/index from sst file
// were they not present in cache yet.
CachableEntry<FilterBlockReader> GetFilter(bool no_io = false) const;
Iterator* IndexBlockReader(const ReadOptions& options) const;
// Get the iterator from the index reader.
//
// Note: ErrorIterator with Status::Incomplete shall be returned if all the
// following conditions are met:
// 1. We enabled table_options.cache_index_and_filter_blocks.
// 2. index is not present in block cache.
// 3. We disallowed any io to be performed, that is, read_options ==
// kBlockCacheTier
Iterator* NewIndexIterator(const ReadOptions& read_options) const;
// Read the block, either from sst file or from cache. This method will try
// to read from cache only when block_cache is set or ReadOption doesn't
// explicitly prohibit storage IO.
// Read block cache from block caches (if set): block_cache and
// block_cache_compressed.
// On success, Status::OK with be returned and @block will be populated with
// pointer to the block as well as its block handle.
static Status GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics,
const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block);
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
// populate the block caches.
// On success, Status::OK will be returned; also @block will be populated with
// uncompressed block and its cache handle.
//
// If the block is read from cache, the statistics for cache miss/hit of the
// the given type of block will be updated. User can specify
// `block_cache_miss_ticker` and `block_cache_hit_ticker` for the statistics
// update.
//
// On success, the `result` parameter will be populated, which contains a
// pointer to the block and its cache handle, which will be nullptr if it's
// not read from the cache.
static Status GetBlock(const BlockBasedTable* table,
const BlockHandle& handle,
const ReadOptions& options,
bool for_compaction,
Tickers block_cache_miss_ticker,
Tickers block_cache_hit_ticker,
bool* didIO,
CachableEntry<Block>* result);
// REQUIRES: raw_block is heap-allocated. PutDataBlockToCache() will be
// responsible for releasing its memory if error occurs.
static Status PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, Statistics* statistics,
CachableEntry<Block>* block, Block* raw_block);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.
@ -144,6 +164,7 @@ class BlockBasedTable : public TableReader {
void ReadMeta(const Footer& footer);
void ReadFilter(const Slice& filter_handle_value);
Status CreateIndexReader(IndexReader** index_reader) const;
// Read the meta block from sst.
static Status ReadMetaBlock(
@ -159,10 +180,9 @@ class BlockBasedTable : public TableReader {
static void SetupCacheKeyPrefix(Rep* rep);
explicit BlockBasedTable(Rep* rep) :
compaction_optimized_(false) {
rep_ = rep;
}
explicit BlockBasedTable(Rep* rep)
: rep_(rep), compaction_optimized_(false) {}
// Generate a cache key prefix from the file
static void GenerateCachePrefix(Cache* cc,
RandomAccessFile* file, char* buffer, size_t* size);

View File

@ -3,6 +3,7 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/options.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/slice.h"
#include "table/block_builder.h"
@ -61,10 +62,9 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy {
};
FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
const BlockBuilder& data_block_builder) const {
return new FlushBlockBySizePolicy(block_size_,
block_size_deviation_,
data_block_builder);
const Options& options, const BlockBuilder& data_block_builder) const {
return new FlushBlockBySizePolicy(
options.block_size, options.block_size_deviation, data_block_builder);
}
} // namespace rocksdb

View File

@ -527,13 +527,14 @@ Status PlainTableReader::ReadKey(const char* start, ParsedInternalKey* key,
key_ptr =
GetVarint32Ptr(start, file_data_.data() + data_end_offset_, &tmp_size);
if (key_ptr == nullptr) {
return Status::Corruption("Unable to read the next key");
return Status::Corruption(
"Unexpected EOF when reading the next key's size");
}
user_key_size = (size_t)tmp_size;
*bytes_read = key_ptr - start;
}
if (key_ptr + user_key_size + 1 >= file_data_.data() + data_end_offset_) {
return Status::Corruption("Unable to read the next key");
return Status::Corruption("Unexpected EOF when reading the next key");
}
if (*(key_ptr + user_key_size) == PlainTableFactory::kValueTypeSeqId0) {
@ -544,10 +545,12 @@ Status PlainTableReader::ReadKey(const char* start, ParsedInternalKey* key,
*bytes_read += user_key_size + 1;
} else {
if (start + user_key_size + 8 >= file_data_.data() + data_end_offset_) {
return Status::Corruption("Unable to read the next key");
return Status::Corruption(
"Unexpected EOF when reading internal bytes of the next key");
}
if (!ParseInternalKey(Slice(key_ptr, user_key_size + 8), key)) {
return Status::Corruption(Slice());
return Status::Corruption(
Slice("Incorrect value type found when reading the next key"));
}
*bytes_read += user_key_size + 8;
}
@ -569,15 +572,19 @@ Status PlainTableReader::Next(uint32_t* offset, ParsedInternalKey* key,
const char* start = file_data_.data() + *offset;
size_t bytes_for_key;
Status s = ReadKey(start, key, &bytes_for_key);
if (!s.ok()) {
return s;
}
uint32_t value_size;
const char* value_ptr = GetVarint32Ptr(
start + bytes_for_key, file_data_.data() + data_end_offset_, &value_size);
if (value_ptr == nullptr) {
return Status::Corruption("Error reading value length.");
return Status::Corruption(
"Unexpected EOF when reading the next value's size.");
}
*offset = *offset + (value_ptr - start) + value_size;
if (*offset > data_end_offset_) {
return Status::Corruption("Reach end of file when reading value");
return Status::Corruption("Unexpected EOF when reading the next value. ");
}
*value = Slice(value_ptr, value_size);

View File

@ -9,6 +9,7 @@
#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
#include <map>
#include <string>
@ -16,8 +17,6 @@
#include <vector>
#include "db/dbformat.h"
#include "rocksdb/statistics.h"
#include "util/statistics.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
@ -25,11 +24,11 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "table/block.h"
#include "table/meta_blocks.h"
#include "table/block_based_table_reader.h"
#include "table/block_based_table_builder.h"
#include "table/block_based_table_factory.h"
#include "table/block_based_table_reader.h"
@ -39,6 +38,7 @@
#include "table/plain_table_factory.h"
#include "util/random.h"
#include "util/statistics.h"
#include "util/testharness.h"
#include "util/testutil.h"
@ -690,8 +690,7 @@ class Harness {
switch (args.type) {
case BLOCK_BASED_TABLE_TEST:
table_options.flush_block_policy_factory.reset(
new FlushBlockBySizePolicyFactory(options_.block_size,
options_.block_size_deviation));
new FlushBlockBySizePolicyFactory());
options_.table_factory.reset(new BlockBasedTableFactory(table_options));
constructor_ = new TableConstructor(options_.comparator);
break;
@ -1203,7 +1202,7 @@ TEST(BlockBasedTableTest, BlockCacheDisabledTest) {
// preloading filter/index blocks is enabled.
auto reader = dynamic_cast<BlockBasedTable*>(c.table_reader());
ASSERT_TRUE(reader->TEST_filter_block_preloaded());
ASSERT_TRUE(reader->TEST_index_block_preloaded());
ASSERT_TRUE(reader->TEST_index_reader_preloaded());
{
// nothing happens in the beginning
@ -1244,7 +1243,7 @@ TEST(BlockBasedTableTest, FilterBlockInBlockCache) {
// preloading filter/index blocks is prohibited.
auto reader = dynamic_cast<BlockBasedTable*>(c.table_reader());
ASSERT_TRUE(!reader->TEST_filter_block_preloaded());
ASSERT_TRUE(!reader->TEST_index_block_preloaded());
ASSERT_TRUE(!reader->TEST_index_reader_preloaded());
// -- PART 1: Open with regular block cache.
// Since block_cache is disabled, no cache activities will be involved.

View File

@ -70,7 +70,7 @@ def main(argv):
--threads=%s
--write_buffer_size=%s
--destroy_db_initially=0
--reopen=0
--reopen=20
--readpercent=45
--prefixpercent=5
--writepercent=35

View File

@ -84,7 +84,7 @@ def main(argv):
--threads=%s
--write_buffer_size=%s
--destroy_db_initially=0
--reopen=0
--reopen=20
--readpercent=45
--prefixpercent=5
--writepercent=35

View File

@ -17,9 +17,10 @@ namespace rocksdb {
class AutoRollLogger : public Logger {
public:
AutoRollLogger(Env* env, const std::string& dbname,
const std::string& db_log_dir,
size_t log_max_size,
size_t log_file_time_to_roll):
const std::string& db_log_dir, size_t log_max_size,
size_t log_file_time_to_roll,
const InfoLogLevel log_level = InfoLogLevel::ERROR)
: Logger(log_level),
dbname_(dbname),
db_log_dir_(db_log_dir),
env_(env),

View File

@ -5,12 +5,15 @@
//
#include <string>
#include <cmath>
#include <iostream>
#include <fstream>
#include <iterator>
#include <algorithm>
#include "util/testharness.h"
#include "util/auto_roll_logger.h"
#include "rocksdb/db.h"
#include <sys/stat.h>
#include <errno.h>
#include <iostream>
using namespace std;
@ -39,10 +42,8 @@ class AutoRollLoggerTest {
const string AutoRollLoggerTest::kSampleMessage(
"this is the message to be written to the log file!!");
const string AutoRollLoggerTest::kTestDir(
test::TmpDir() + "/db_log_test");
const string AutoRollLoggerTest::kLogFile(
test::TmpDir() + "/db_log_test/LOG");
const string AutoRollLoggerTest::kTestDir(test::TmpDir() + "/db_log_test");
const string AutoRollLoggerTest::kLogFile(test::TmpDir() + "/db_log_test/LOG");
Env* AutoRollLoggerTest::env = Env::Default();
// In this test we only want to Log some simple log message with
@ -53,6 +54,11 @@ void LogMessage(Logger* logger, const char* message) {
Log(logger, "%s", message);
}
void LogMessage(const InfoLogLevel log_level, Logger* logger,
const char* message) {
Log(log_level, logger, "%s", message);
}
void GetFileCreateTime(const std::string& fname, uint64_t* file_ctime) {
struct stat s;
if (stat(fname.c_str(), &s) != 0) {
@ -64,6 +70,7 @@ void GetFileCreateTime(const std::string& fname, uint64_t* file_ctime) {
void AutoRollLoggerTest::RollLogFileBySizeTest(AutoRollLogger* logger,
size_t log_max_size,
const string& log_message) {
logger->SetInfoLogLevel(InfoLogLevel::INFO);
// measure the size of each message, which is supposed
// to be equal or greater than log_message.size()
LogMessage(logger, log_message.c_str());
@ -131,7 +138,6 @@ TEST(AutoRollLoggerTest, RollLogFileBySize) {
RollLogFileBySizeTest(&logger, log_max_size,
kSampleMessage + ":RollLogFileBySize");
}
TEST(AutoRollLoggerTest, RollLogFileByTime) {
@ -235,6 +241,46 @@ TEST(AutoRollLoggerTest, CreateLoggerFromOptions) {
kSampleMessage + ":CreateLoggerFromOptions - both");
}
TEST(AutoRollLoggerTest, InfoLogLevel) {
InitTestDb();
size_t log_size = 8192;
size_t log_lines = 0;
// an extra-scope to force the AutoRollLogger to flush the log file when it
// becomes out of scope.
{
AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0);
for (int log_level = InfoLogLevel::FATAL; log_level >= InfoLogLevel::DEBUG;
log_level--) {
logger.SetInfoLogLevel((InfoLogLevel)log_level);
for (int log_type = InfoLogLevel::DEBUG; log_type <= InfoLogLevel::FATAL;
log_type++) {
// log messages with log level smaller than log_level will not be
// logged.
LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str());
}
log_lines += InfoLogLevel::FATAL - log_level + 1;
}
for (int log_level = InfoLogLevel::FATAL; log_level >= InfoLogLevel::DEBUG;
log_level--) {
logger.SetInfoLogLevel((InfoLogLevel)log_level);
// again, messages with level smaller than log_level will not be logged.
Debug(&logger, "%s", kSampleMessage.c_str());
Info(&logger, "%s", kSampleMessage.c_str());
Warn(&logger, "%s", kSampleMessage.c_str());
Error(&logger, "%s", kSampleMessage.c_str());
Fatal(&logger, "%s", kSampleMessage.c_str());
log_lines += InfoLogLevel::FATAL - log_level + 1;
}
}
std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str());
size_t lines = std::count(std::istreambuf_iterator<char>(inFile),
std::istreambuf_iterator<char>(), '\n');
ASSERT_EQ(log_lines, lines);
inFile.close();
}
int OldLogFileCount(const string& dir) {
std::vector<std::string> files;
Env::Default()->GetChildren(dir, &files);

View File

@ -45,12 +45,120 @@ void Log(Logger* info_log, const char* format, ...) {
}
}
void Log(const InfoLogLevel log_level, Logger* info_log, const char* format,
...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(log_level, format, ap);
va_end(ap);
}
}
void Debug(Logger* info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::DEBUG, format, ap);
va_end(ap);
}
}
void Info(Logger* info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::INFO, format, ap);
va_end(ap);
}
}
void Warn(Logger* info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::WARN, format, ap);
va_end(ap);
}
}
void Error(Logger* info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::ERROR, format, ap);
va_end(ap);
}
}
void Fatal(Logger* info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::FATAL, format, ap);
va_end(ap);
}
}
void LogFlush(const shared_ptr<Logger>& info_log) {
if (info_log) {
info_log->Flush();
}
}
void Log(const InfoLogLevel log_level, const shared_ptr<Logger>& info_log,
const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(log_level, format, ap);
va_end(ap);
}
}
void Debug(const shared_ptr<Logger>& info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::DEBUG, format, ap);
va_end(ap);
}
}
void Info(const shared_ptr<Logger>& info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::INFO, format, ap);
va_end(ap);
}
}
void Warn(const shared_ptr<Logger>& info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::WARN, format, ap);
va_end(ap);
}
}
void Error(const shared_ptr<Logger>& info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::ERROR, format, ap);
va_end(ap);
}
}
void Fatal(const shared_ptr<Logger>& info_log, const char* format, ...) {
if (info_log) {
va_list ap;
va_start(ap, format);
info_log->Logv(InfoLogLevel::FATAL, format, ap);
va_end(ap);
}
}
void Log(const shared_ptr<Logger>& info_log, const char* format, ...) {
if (info_log) {
va_list ap;
@ -129,6 +237,12 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
}
EnvOptions EnvOptions::AdaptForLogWrite() const {
EnvOptions adapted = *this;
adapted.use_mmap_writes = false;
return adapted;
}
EnvOptions::EnvOptions(const DBOptions& options) {
AssignEnvOptions(this, options);
}

View File

@ -236,8 +236,9 @@ class HdfsLogger : public Logger {
uint64_t (*gettid_)(); // Return the thread id for the current thread
public:
HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
: file_(f), gettid_(gettid) {
HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)(),
const InfoLogLevel log_level = InfoLogLevel::ERROR)
: Logger(log_level), file_(f), gettid_(gettid) {
Log(mylog, "[hdfs] HdfsLogger opened %s\n",
file_->getName().c_str());
}

View File

@ -176,7 +176,8 @@ DBOptions::DBOptions()
advise_random_on_open(true),
access_hint_on_compaction_start(NORMAL),
use_adaptive_mutex(false),
bytes_per_sync(0) { }
bytes_per_sync(0),
allow_thread_local(true) {}
DBOptions::DBOptions(const Options& options)
: create_if_missing(options.create_if_missing),
@ -214,7 +215,8 @@ DBOptions::DBOptions(const Options& options)
advise_random_on_open(options.advise_random_on_open),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
use_adaptive_mutex(options.use_adaptive_mutex),
bytes_per_sync(options.bytes_per_sync) {}
bytes_per_sync(options.bytes_per_sync),
allow_thread_local(options.allow_thread_local) {}
static const char* const access_hints[] = {
"NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"

View File

@ -38,9 +38,16 @@ class PosixLogger : public Logger {
Env* env_;
bool flush_pending_;
public:
PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env) :
file_(f), gettid_(gettid), log_size_(0), fd_(fileno(f)),
last_flush_micros_(0), env_(env), flush_pending_(false) { }
PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env,
const InfoLogLevel log_level = InfoLogLevel::ERROR)
: Logger(log_level),
file_(f),
gettid_(gettid),
log_size_(0),
fd_(fileno(f)),
last_flush_micros_(0),
env_(env),
flush_pending_(false) {}
virtual ~PosixLogger() {
fclose(file_);
}

View File

@ -7,11 +7,11 @@
#include "rocksdb/statistics.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "port/likely.h"
#include <vector>
#include <atomic>
#define UNLIKELY(val) (__builtin_expect((val), 0))
namespace rocksdb {

View File

@ -9,12 +9,8 @@
#include "util/thread_local.h"
#include "util/mutexlock.h"
#include "port/likely.h"
#if defined(__GNUC__) && __GNUC__ >= 4
#define UNLIKELY(x) (__builtin_expect((x), 0))
#else
#define UNLIKELY(x) (x)
#endif
namespace rocksdb {

View File

@ -16,6 +16,7 @@
#include "util/autovector.h"
#include "port/port_posix.h"
#include "util/thread_local.h"
namespace rocksdb {

View File

@ -58,52 +58,52 @@ TEST(ThreadLocalTest, UniqueIdTest) {
port::Mutex mu;
port::CondVar cv(&mu);
ASSERT_EQ(IDChecker::PeekId(), 0);
ASSERT_EQ(IDChecker::PeekId(), 0u);
// New ThreadLocal instance bumps id by 1
{
// Id used 0
Params p1(&mu, &cv, nullptr, 1);
ASSERT_EQ(IDChecker::PeekId(), 1);
Params p1(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 1u);
// Id used 1
Params p2(&mu, &cv, nullptr, 1);
ASSERT_EQ(IDChecker::PeekId(), 2);
Params p2(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 2u);
// Id used 2
Params p3(&mu, &cv, nullptr, 1);
ASSERT_EQ(IDChecker::PeekId(), 3);
Params p3(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 3u);
// Id used 3
Params p4(&mu, &cv, nullptr, 1);
ASSERT_EQ(IDChecker::PeekId(), 4);
Params p4(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 4u);
}
// id 3, 2, 1, 0 are in the free queue in order
ASSERT_EQ(IDChecker::PeekId(), 0);
ASSERT_EQ(IDChecker::PeekId(), 0u);
// pick up 0
Params p1(&mu, &cv, nullptr, 1);
ASSERT_EQ(IDChecker::PeekId(), 1);
Params p1(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 1u);
// pick up 1
Params* p2 = new Params(&mu, &cv, nullptr, 1);
ASSERT_EQ(IDChecker::PeekId(), 2);
Params* p2 = new Params(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 2u);
// pick up 2
Params p3(&mu, &cv, nullptr, 1);
ASSERT_EQ(IDChecker::PeekId(), 3);
Params p3(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 3u);
// return up 1
delete p2;
ASSERT_EQ(IDChecker::PeekId(), 1);
ASSERT_EQ(IDChecker::PeekId(), 1u);
// Now we have 3, 1 in queue
// pick up 1
Params p4(&mu, &cv, nullptr, 1);
ASSERT_EQ(IDChecker::PeekId(), 3);
Params p4(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 3u);
// pick up 3
Params p5(&mu, &cv, nullptr, 1);
Params p5(&mu, &cv, nullptr, 1u);
// next new id
ASSERT_EQ(IDChecker::PeekId(), 4);
ASSERT_EQ(IDChecker::PeekId(), 4u);
// After exit, id sequence in queue:
// 3, 1, 2, 0
}
TEST(ThreadLocalTest, SequentialReadWriteTest) {
// global id list carries over 3, 1, 2, 0
ASSERT_EQ(IDChecker::PeekId(), 0);
ASSERT_EQ(IDChecker::PeekId(), 0u);
port::Mutex mu;
port::CondVar cv(&mu);
@ -133,7 +133,7 @@ TEST(ThreadLocalTest, SequentialReadWriteTest) {
};
for (int iter = 0; iter < 1024; ++iter) {
ASSERT_EQ(IDChecker::PeekId(), 1);
ASSERT_EQ(IDChecker::PeekId(), 1u);
// Another new thread, read/write should not see value from previous thread
env_->StartThread(func, static_cast<void*>(&p));
mu.Lock();
@ -141,13 +141,13 @@ TEST(ThreadLocalTest, SequentialReadWriteTest) {
cv.Wait();
}
mu.Unlock();
ASSERT_EQ(IDChecker::PeekId(), 1);
ASSERT_EQ(IDChecker::PeekId(), 1u);
}
}
TEST(ThreadLocalTest, ConcurrentReadWriteTest) {
// global id list carries over 3, 1, 2, 0
ASSERT_EQ(IDChecker::PeekId(), 0);
ASSERT_EQ(IDChecker::PeekId(), 0u);
ThreadLocalPtr tls2;
port::Mutex mu1;
@ -226,11 +226,11 @@ TEST(ThreadLocalTest, ConcurrentReadWriteTest) {
}
mu2.Unlock();
ASSERT_EQ(IDChecker::PeekId(), 3);
ASSERT_EQ(IDChecker::PeekId(), 3u);
}
TEST(ThreadLocalTest, Unref) {
ASSERT_EQ(IDChecker::PeekId(), 0);
ASSERT_EQ(IDChecker::PeekId(), 0u);
auto unref = [](void* ptr) {
auto& p = *static_cast<Params*>(ptr);