CompactionJob

Summary:
Long awaited CompactionJob class! Move most compaction-related things from DBImpl to CompactionJob, making CompactionJob easier to test and understand.

Currently this is just replicating exactly the same functionality with as little as change as possible. As future work, we should:
1. Add CompactionJob tests (I think I'll do that tomorrow)
2. Reduce CompactionJob's state that it inherits from DBImpl
3. Figure out how to do yielding to flush better. Currently I implemented a callback as we agreed yesterday, but I don't think it's a good long term solution.

This reduces db_impl.cc from 5000+ LOC to 3400!

Test Plan: make check, will add CompactionJob-specific tests, probably also move some tests from db_test to compaction_job_test

Reviewers: rven, yhchiang, sdong, ljin

Reviewed By: ljin

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D27957
This commit is contained in:
Igor Canadi 2014-10-31 16:31:25 -07:00
parent 8ddddd62d0
commit 74eb4fbe93
4 changed files with 1276 additions and 1087 deletions

1124
db/compaction_job.cc Normal file

File diff suppressed because it is too large Load Diff

131
db/compaction_job.h Normal file
View File

@ -0,0 +1,131 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <deque>
#include <limits>
#include <set>
#include <utility>
#include <vector>
#include <string>
#include <functional>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "db/column_family.h"
#include "db/version_edit.h"
#include "db/memtable_list.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/transaction_log.h"
#include "util/autovector.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "util/scoped_arena_iterator.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
#include "db/write_thread.h"
#include "db/job_context.h"
namespace rocksdb {
class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class Arena;
class CompactionJob {
public:
// TODO(icanadi) make effort to reduce number of parameters here
// IMPORTANT: mutable_cf_options needs to be alive while CompactionJob is
// alive
CompactionJob(Compaction* compaction, const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
FileNumToPathIdMap* pending_outputs, LogBuffer* log_buffer,
Directory* db_directory, Statistics* stats,
SnapshotList* snapshot_list, bool is_snapshot_supported,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback);
~CompactionJob() { assert(compact_ == nullptr); }
// no copy/move
CompactionJob(CompactionJob&& job) = delete;
CompactionJob(const CompactionJob& job) = delete;
CompactionJob& operator=(const CompactionJob& job) = delete;
// REQUIRED: mutex held
void Prepare();
// REQUIRED mutex not held
Status Run();
// REQUIRED: mutex held
// status is the return of Run()
Status Install(Status status);
private:
void AllocateCompactionOutputFileNumbers();
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
bool is_compaction_v2,
int* num_output_records);
// Call compaction_filter_v2->Filter() on kv-pairs in compact
void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2);
Status FinishCompactionOutputFile(Iterator* input);
Status InstallCompactionResults();
SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
void RecordCompactionIOStats();
void ReleaseCompactionUnusedFileNumbers();
Status OpenCompactionOutputFile();
void CleanupCompaction(Status status);
// CompactionJob state
struct CompactionState;
CompactionState* compact_;
bool bottommost_level_;
SequenceNumber earliest_snapshot_;
SequenceNumber visible_at_tip_;
SequenceNumber latest_snapshot_;
InternalStats::CompactionStats compaction_stats_;
// DBImpl state
const DBOptions& db_options_;
const MutableCFOptions& mutable_cf_options_;
const EnvOptions& env_options_;
Env* env_;
VersionSet* versions_;
port::Mutex* db_mutex_;
std::atomic<bool>* shutting_down_;
FileNumToPathIdMap* pending_outputs_;
LogBuffer* log_buffer_;
Directory* db_directory_;
Statistics* stats_;
SnapshotList* snapshots_;
bool is_snapshot_supported_;
std::shared_ptr<Cache> table_cache_;
// yield callback
std::function<uint64_t()> yield_callback_;
};
} // namespace rocksdb

File diff suppressed because it is too large Load Diff

View File

@ -308,41 +308,14 @@ class DBImpl : public DB {
LogBuffer* log_buffer);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer);
void CleanupCompaction(CompactionState* compact, Status status);
Status DoCompactionWork(CompactionState* compact,
const MutableCFOptions& mutable_cf_options,
JobContext* job_context, LogBuffer* log_buffer);
// This function is called as part of compaction. It enables Flush process to
// preempt compaction, since it's higher prioirty
// Returns: micros spent executing
uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
JobContext* job_context,
LogBuffer* log_buffer);
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(
const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported,
SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot,
SequenceNumber latest_snapshot, JobContext* job_context,
bool bottommost_level, int64_t* imm_micros, Iterator* input,
CompactionState* compact, bool is_compaction_v2, int* num_output_records,
LogBuffer* log_buffer);
// Call compaction_filter_v2->Filter() on kv-pairs in compact
void CallCompactionFilterV2(CompactionState* compact,
CompactionFilterV2* compaction_filter_v2);
Status OpenCompactionOutputFile(CompactionState* compact,
const MutableCFOptions& mutable_cf_options);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact,
const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer);
void AllocateCompactionOutputFileNumbers(CompactionState* compact);
void ReleaseCompactionUnusedFileNumbers(CompactionState* compact);
void PrintStatistics();
// dump rocksdb.stats to LOG