rocksdb/db/db_impl.h
Mark Callaghan 70c42bf05f Adds DB::GetNextCompaction and then uses that for rate limiting db_bench
Summary:
Adds a method that returns the score for the next level that most
needs compaction. That method is then used by db_bench to rate limit threads.
Threads are put to sleep at the end of each stats interval until the score
is less than the limit. The limit is set via the --rate_limit=$double option.
The specified value must be > 1.0. Also adds the option --stats_per_interval
to enable additional metrics reported every stats interval.

Task ID: #

Blame Rev:

Test Plan:
run db_bench

Revert Plan:

Database Impact:

Memcache Impact:

Other Notes:

EImportant:

- begin *PUBLIC* platform impact section -
Bugzilla: #
- end platform impact -

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D6243
2012-10-29 10:17:43 -07:00

292 lines
9.5 KiB
C++

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
#include <deque>
#include <set>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "util/stats_logger.h"
#ifdef USE_SCRIBE
#include "scribe/scribe_logger.h"
#endif
namespace leveldb {
class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class DBImpl : public DB {
public:
DBImpl(const Options& options, const std::string& dbname);
virtual ~DBImpl();
// Implementations of the DB interface
virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value);
virtual Status Delete(const WriteOptions&, const Slice& key);
virtual Status Write(const WriteOptions& options, WriteBatch* updates);
virtual Status Get(const ReadOptions& options,
const Slice& key,
std::string* value);
virtual Iterator* NewIterator(const ReadOptions&);
virtual const Snapshot* GetSnapshot();
virtual void ReleaseSnapshot(const Snapshot* snapshot);
virtual bool GetProperty(const Slice& property, std::string* value);
virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end);
virtual int NumberLevels();
virtual int MaxMemCompactionLevel();
virtual int Level0StopWriteTrigger();
virtual Status Flush(const FlushOptions& options);
virtual Status DisableFileDeletions();
virtual Status EnableFileDeletions();
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]
void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
// Force current memtable contents to be compacted.
Status TEST_CompactMemTable();
// Wait for memtable compaction
Status TEST_WaitForCompactMemTable();
// Wait for any compaction
Status TEST_WaitForCompact();
// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed.
Iterator* TEST_NewInternalIterator();
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes();
private:
friend class DB;
struct CompactionState;
struct Writer;
struct DeletionState;
Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot);
Status NewDB();
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit);
void MaybeIgnoreError(Status* s) const;
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable();
Status RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence);
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base);
Status MakeRoomForWrite(bool force /* compact even if there is room? */);
WriteBatch* BuildBatchGroup(Writer** last_writer);
// Force current memtable contents to be flushed.
Status FlushMemTable(const FlushOptions& options);
// Wait for memtable compaction
Status WaitForCompactMemTable();
void MaybeScheduleLogDBDeployStats();
static void BGLogDBDeployStats(void* db);
void LogDBDeployStats();
void MaybeScheduleCompaction();
static void BGWork(void* db);
void BackgroundCall();
Status BackgroundCompaction(DeletionState& deletion_state);
void CleanupCompaction(CompactionState* compact);
Status DoCompactionWork(CompactionState* compact);
Status OpenCompactionOutputFile(CompactionState* compact);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact);
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'allfiles'.
void FindObsoleteFiles(DeletionState& deletion_state);
// Diffs the files listed in filenames and those that do not
// belong to live files are posibly removed. If the removed file
// is a sst file, then it returns the file number in files_to_evict.
void PurgeObsoleteFiles(DeletionState& deletion_state);
// Removes the file listed in files_to_evict from the table_cache
void EvictObsoleteFiles(DeletionState& deletion_state);
// Constant after construction
Env* const env_;
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
const Options options_; // options_.comparator == &internal_comparator_
bool owns_info_log_;
bool owns_cache_;
const std::string dbname_;
// table_cache_ provides its own synchronization
TableCache* table_cache_;
// Lock over the persistent DB state. Non-NULL iff successfully acquired.
FileLock* db_lock_;
// State below is protected by mutex_
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
port::CondVar bg_cv_; // Signalled when background work finishes
MemTable* mem_;
MemTable* imm_; // Memtable being compacted
port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_
WritableFile* logfile_;
uint64_t logfile_number_;
log::Writer* log_;
std::string host_name_;
// Queue of writers.
std::deque<Writer*> writers_;
WriteBatch* tmp_batch_;
SnapshotList snapshots_;
// Set of table files to protect from deletion because they are
// part of ongoing compactions.
std::set<uint64_t> pending_outputs_;
// Has a background compaction been scheduled or is running?
bool bg_compaction_scheduled_;
// Has a background stats log thread scheduled?
bool bg_logstats_scheduled_;
// Information for a manual compaction
struct ManualCompaction {
int level;
bool done;
const InternalKey* begin; // NULL means beginning of key range
const InternalKey* end; // NULL means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
};
ManualCompaction* manual_compaction_;
VersionSet* versions_;
// Have we encountered a background error in paranoid mode?
Status bg_error_;
StatsLogger* logger_;
int64_t volatile last_log_ts;
// shall we disable deletion of obsolete files
bool disable_delete_obsolete_files_;
// last time when DeleteObsoleteFiles was invoked
uint64_t delete_obsolete_files_last_run_;
// These count the number of microseconds for which MakeRoomForWrite stalls.
uint64_t stall_level0_slowdown_;
uint64_t stall_memtable_compaction_;
uint64_t stall_level0_num_files_;
uint64_t stall_leveln_slowdown_;
// Time at which this instance was started.
const uint64_t started_at_;
// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
struct CompactionStats {
int64_t micros;
// Bytes read from level N during compaction between levels N and N+1
int64_t bytes_readn;
// Bytes read from level N+1 during compaction between levels N and N+1
int64_t bytes_readnp1;
// Total bytes written during compaction between levels N and N+1
int64_t bytes_written;
// Files read from level N during compaction between levels N and N+1
int files_in_leveln;
// Files read from level N+1 during compaction between levels N and N+1
int files_in_levelnp1;
// Files written during compaction between levels N and N+1
int files_out_levelnp1;
// Number of compactions done
int count;
CompactionStats() : micros(0), bytes_readn(0), bytes_readnp1(0),
bytes_written(0), files_in_leveln(0),
files_in_levelnp1(0), files_out_levelnp1(0),
count(0) { }
void Add(const CompactionStats& c) {
this->micros += c.micros;
this->bytes_readn += c.bytes_readn;
this->bytes_readnp1 += c.bytes_readnp1;
this->bytes_written += c.bytes_written;
this->files_in_leveln += c.files_in_leveln;
this->files_in_levelnp1 += c.files_in_levelnp1;
this->files_out_levelnp1 += c.files_out_levelnp1;
this->count += 1;
}
};
CompactionStats* stats_;
static const int KEEP_LOG_FILE_NUM = 1000;
std::string db_absolute_path_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
}
};
// Sanitize db options. The caller should delete result.info_log if
// it is not equal to src.info_log.
extern Options SanitizeOptions(const std::string& db,
const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const Options& src);
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_DB_IMPL_H_