rocksdb/db/flush_job.cc
sdong d5a51d4de3 Need to make sure log file synced before flushing memtable of one column family
Summary: Multiput atomiciy is broken across multiple column families if we don't sync WAL before flushing one column family. The WAL file may contain a write batch containing writes to a key to the CF to be flushed and a key to other CF. If we don't sync WAL before flushing, if machine crashes after flushing, the write batch will only be partial recovered. Data to other CFs are lost.

Test Plan: Add a new unit test which will fail without the diff.

Reviewers: yhchiang, IslamAbdelRahman, igor, yiwu

Reviewed By: yiwu

Subscribers: yiwu, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60915
2016-07-21 16:29:06 -07:00

331 lines
12 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/flush_job.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <algorithm>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/likely.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/event_logger.h"
#include "util/file_util.h"
#include "util/iostats_context_imp.h"
#include "util/log_buffer.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"
namespace rocksdb {
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
JobContext* job_context, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats)
: dbname_(dbname),
cfd_(cfd),
db_options_(db_options),
mutable_cf_options_(mutable_cf_options),
env_options_(env_options),
versions_(versions),
db_mutex_(db_mutex),
shutting_down_(shutting_down),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
job_context_(job_context),
log_buffer_(log_buffer),
db_directory_(db_directory),
output_file_directory_(output_file_directory),
output_compression_(output_compression),
stats_(stats),
event_logger_(event_logger),
measure_io_stats_(measure_io_stats),
pick_memtable_called(false) {
// Update the thread status to indicate flush.
ReportStartedFlush();
TEST_SYNC_POINT("FlushJob::FlushJob()");
}
FlushJob::~FlushJob() {
ThreadStatusUtil::ResetThreadStatus();
}
void FlushJob::ReportStartedFlush() {
ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
cfd_->options()->enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_JOB_ID,
job_context_->job_id);
IOSTATS_RESET(bytes_written);
}
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
uint64_t input_size = 0;
for (auto* mem : mems) {
input_size += mem->ApproximateMemoryUsage();
}
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_MEMTABLES,
input_size);
}
void FlushJob::RecordFlushIOStats() {
RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written);
}
void FlushJob::PickMemTable() {
db_mutex_->AssertHeld();
assert(!pick_memtable_called);
pick_memtable_called = true;
// Save the contents of the earliest memtable as a new Table
cfd_->imm()->PickMemtablesToFlush(&mems_);
if (mems_.empty()) {
return;
}
ReportFlushInputSize(mems_);
// entries mems are (implicitly) sorted in ascending order by their created
// time. We will use the first memtable's `edit` to keep the meta info for
// this flush.
MemTable* m = mems_[0];
edit_ = m->GetEdits();
edit_->SetPrevLogNumber(0);
// SetLogNumber(log_num) indicates logs with number smaller than log_num
// will no longer be picked up for recovery.
edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
edit_->SetColumnFamily(cfd_->GetID());
// path 0 for level 0 file.
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
base_ = cfd_->current();
base_->Ref(); // it is likely that we do not need this reference
}
Status FlushJob::Run(FileMetaData* file_meta) {
db_mutex_->AssertHeld();
assert(pick_memtable_called);
AutoThreadOperationStageUpdater stage_run(
ThreadStatus::STAGE_FLUSH_RUN);
if (mems_.empty()) {
LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
cfd_->GetName().c_str());
return Status::OK();
}
// I/O measurement variables
PerfLevel prev_perf_level = PerfLevel::kEnableTime;
uint64_t prev_write_nanos = 0;
uint64_t prev_fsync_nanos = 0;
uint64_t prev_range_sync_nanos = 0;
uint64_t prev_prepare_write_nanos = 0;
if (measure_io_stats_) {
prev_perf_level = GetPerfLevel();
SetPerfLevel(PerfLevel::kEnableTime);
prev_write_nanos = IOSTATS(write_nanos);
prev_fsync_nanos = IOSTATS(fsync_nanos);
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
}
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table();
if (s.ok() &&
(shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
s = Status::ShutdownInProgress(
"Database shutdown or Column family drop during flush");
}
if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
} else {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
s = cfd_->imm()->InstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_);
}
if (s.ok() && file_meta != nullptr) {
*file_meta = meta_;
}
RecordFlushIOStats();
auto stream = event_logger_->LogToBuffer(log_buffer_);
stream << "job" << job_context_->job_id << "event"
<< "flush_finished";
stream << "lsm_state";
stream.StartArray();
auto vstorage = cfd_->current()->storage_info();
for (int level = 0; level < vstorage->num_levels(); ++level) {
stream << vstorage->NumLevelFiles(level);
}
stream.EndArray();
stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
if (measure_io_stats_) {
if (prev_perf_level != PerfLevel::kEnableTime) {
SetPerfLevel(prev_perf_level);
}
stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
stream << "file_range_sync_nanos"
<< (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
stream << "file_prepare_write_nanos"
<< (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
}
return s;
}
Status FlushJob::WriteLevel0Table() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0);
db_mutex_->AssertHeld();
const uint64_t start_micros = db_options_.env->NowMicros();
Status s;
{
db_mutex_->Unlock();
if (log_buffer_) {
log_buffer_->FlushBufferToLog();
}
std::vector<InternalIterator*> memtables;
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
uint64_t total_num_entries = 0, total_num_deletes = 0;
size_t total_memory_usage = 0;
for (MemTable* m : mems_) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ro, &arena));
total_num_entries += m->num_entries();
total_num_deletes += m->num_deletes();
total_memory_usage += m->ApproximateMemoryUsage();
}
event_logger_->Log() << "job" << job_context_->job_id << "event"
<< "flush_started"
<< "num_memtables" << mems_.size() << "num_entries"
<< total_num_entries << "num_deletes"
<< total_num_deletes << "memory_usage"
<< total_memory_usage;
{
ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
static_cast<int>(memtables.size()), &arena));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber());
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
env_options_, cfd_->table_cache(), iter.get(), &meta_,
cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(),
cfd_->GetID(), cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, output_compression_,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */);
LogFlush(db_options_.info_log);
}
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
" bytes %s"
"%s",
cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber(),
meta_.fd.GetFileSize(), s.ToString().c_str(),
meta_.marked_for_compaction ? " (needs compaction)" : "");
if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {
output_file_directory_->Fsync();
}
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
db_mutex_->Lock();
}
base_->Unref();
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
if (s.ok() && meta_.fd.GetFileSize() > 0) {
// if we have more than 1 background thread, then we cannot
// insert files directly into higher levels because some other
// threads could be concurrently producing compacted files for
// that key range.
// Add file to L0
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.smallest_seqno, meta_.largest_seqno,
meta_.marked_for_compaction);
}
// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(1);
stats.micros = db_options_.env->NowMicros() - start_micros;
stats.bytes_written = meta_.fd.GetFileSize();
cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta_.fd.GetFileSize());
RecordFlushIOStats();
return s;
}
} // namespace rocksdb