43890774ed
Summary: There is a race condition if WAL tracking in the MANIFEST is enabled in a database that disables 2PC. The race condition is between two background flush threads trying to install flush results to the MANIFEST. Consider an example database with two column families: "default" (cfd0) and "cf1" (cfd1). Initially, both column families have one mutable (active) memtable whose data backed by 6.log. 1. Trigger a manual flush for "cf1", creating a 7.log 2. Insert another key to "default", and trigger flush for "default", creating 8.log 3. BgFlushThread1 finishes writing 9.sst 4. BgFlushThread2 finishes writing 10.sst ``` Time BgFlushThread1 BgFlushThread2 | mutex_.Lock() | precompute min_wal_to_keep as 6 | mutex_.Unlock() | mutex_.Lock() | precompute min_wal_to_keep as 6 | join MANIFEST write queue and mutex_.Unlock() | write to MANIFEST | mutex_.Lock() | cfd1->log_number = 7 | Signal bg_flush_2 and mutex_.Unlock() | wake up and mutex_.Lock() | cfd0->log_number = 8 | FindObsoleteFiles() with job_context->log_number == 7 | mutex_.Unlock() | PurgeObsoleteFiles() deletes 6.log V ``` As shown in the above, BgFlushThread2 thinks that the min wal to keep is 6.log because "cf1" has unflushed data in 6.log (cf1.log_number=6). Similarly, BgThread1 thinks that min wal to keep is also 6.log because "default" has unflushed data (default.log_number=6). No WAL deletion will be written to MANIFEST because 6 is equal to `versions_->wals_.min_wal_number_to_keep`, due to https://github.com/facebook/rocksdb/blob/7.1.fb/db/memtable_list.cc#L513:L514. The bg flush thread that finishes last will perform file purging. `job_context.log_number` will be evaluated as 7, i.e. the min wal that contains unflushed data, causing 6.log to be deleted. However, MANIFEST thinks 6.log should still exist. If you close the db at this point, you won't be able to re-open it if `track_and_verify_wal_in_manifest` is true. We must handle the case of multiple bg flush threads, and it is difficult for one bg flush thread to know the correct min wal number until the other bg flush threads have finished committing to the manifest and updated the `cfd::log_number`. To fix this issue, we rename an existing variable `min_log_number_to_keep_2pc` to `min_log_number_to_keep`, and use it to track WAL file deletion in non-2pc mode as well. This variable is updated only 1) during recovery with mutex held, or 2) in the MANIFEST write thread. `min_log_number_to_keep` means RocksDB will delete WALs below it, although there may be WALs above it which are also obsolete. Formally, we will have [min_wal_to_keep, max_obsolete_wal]. During recovery, we make sure that only WALs above max_obsolete_wal are checked and added back to `alive_log_files_`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9715 Test Plan: ``` make check ``` Also ran stress test below (with asan) to make sure it completes successfully. ``` TEST_TMPDIR=/dev/shm/rocksdb OPT=-g ASAN_OPTIONS=disable_coredump=0 \ CRASH_TEST_EXT_ARGS=--compression_type=zstd SKIP_FORMAT_BUCK_CHECKS=1 \ make J=52 -j52 blackbox_asan_crash_test ``` Reviewed By: ltamasi Differential Revision: D34984412 Pulled By: riversand963 fbshipit-source-id: c7b21a8d84751bb55ea79c9f387103d21b231005
359 lines
12 KiB
C++
359 lines
12 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/event_helpers.h"
|
|
|
|
#include "rocksdb/convenience.h"
|
|
#include "rocksdb/listener.h"
|
|
#include "rocksdb/utilities/customizable_util.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
#ifndef ROCKSDB_LITE
|
|
Status EventListener::CreateFromString(const ConfigOptions& config_options,
|
|
const std::string& id,
|
|
std::shared_ptr<EventListener>* result) {
|
|
return LoadSharedObject<EventListener>(config_options, id, nullptr, result);
|
|
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
namespace {
|
|
template <class T>
|
|
inline T SafeDivide(T a, T b) {
|
|
return b == 0 ? 0 : a / b;
|
|
}
|
|
} // namespace
|
|
|
|
void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
|
|
*jwriter << "time_micros"
|
|
<< std::chrono::duration_cast<std::chrono::microseconds>(
|
|
std::chrono::system_clock::now().time_since_epoch())
|
|
.count();
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
void EventHelpers::NotifyTableFileCreationStarted(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id, TableFileCreationReason reason) {
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
TableFileCreationBriefInfo info;
|
|
info.db_name = db_name;
|
|
info.cf_name = cf_name;
|
|
info.file_path = file_path;
|
|
info.job_id = job_id;
|
|
info.reason = reason;
|
|
for (auto& listener : listeners) {
|
|
listener->OnTableFileCreationStarted(info);
|
|
}
|
|
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
void EventHelpers::NotifyOnBackgroundError(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
|
|
bool* auto_recovery) {
|
|
#ifndef ROCKSDB_LITE
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
db_mutex->AssertHeld();
|
|
// release lock while notifying events
|
|
db_mutex->Unlock();
|
|
for (auto& listener : listeners) {
|
|
listener->OnBackgroundError(reason, bg_error);
|
|
bg_error->PermitUncheckedError();
|
|
if (*auto_recovery) {
|
|
listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
|
|
}
|
|
}
|
|
db_mutex->Lock();
|
|
#else
|
|
(void)listeners;
|
|
(void)reason;
|
|
(void)bg_error;
|
|
(void)db_mutex;
|
|
(void)auto_recovery;
|
|
#endif // ROCKSDB_LITE
|
|
}
|
|
|
|
void EventHelpers::LogAndNotifyTableFileCreationFinished(
|
|
EventLogger* event_logger,
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id, const FileDescriptor& fd,
|
|
uint64_t oldest_blob_file_number, const TableProperties& table_properties,
|
|
TableFileCreationReason reason, const Status& s,
|
|
const std::string& file_checksum,
|
|
const std::string& file_checksum_func_name) {
|
|
if (s.ok() && event_logger) {
|
|
JSONWriter jwriter;
|
|
AppendCurrentTime(&jwriter);
|
|
jwriter << "cf_name" << cf_name << "job" << job_id << "event"
|
|
<< "table_file_creation"
|
|
<< "file_number" << fd.GetNumber() << "file_size"
|
|
<< fd.GetFileSize() << "file_checksum"
|
|
<< Slice(file_checksum).ToString(true) << "file_checksum_func_name"
|
|
<< file_checksum_func_name;
|
|
|
|
// table_properties
|
|
{
|
|
jwriter << "table_properties";
|
|
jwriter.StartObject();
|
|
|
|
// basic properties:
|
|
jwriter << "data_size" << table_properties.data_size << "index_size"
|
|
<< table_properties.index_size << "index_partitions"
|
|
<< table_properties.index_partitions << "top_level_index_size"
|
|
<< table_properties.top_level_index_size
|
|
<< "index_key_is_user_key"
|
|
<< table_properties.index_key_is_user_key
|
|
<< "index_value_is_delta_encoded"
|
|
<< table_properties.index_value_is_delta_encoded << "filter_size"
|
|
<< table_properties.filter_size << "raw_key_size"
|
|
<< table_properties.raw_key_size << "raw_average_key_size"
|
|
<< SafeDivide(table_properties.raw_key_size,
|
|
table_properties.num_entries)
|
|
<< "raw_value_size" << table_properties.raw_value_size
|
|
<< "raw_average_value_size"
|
|
<< SafeDivide(table_properties.raw_value_size,
|
|
table_properties.num_entries)
|
|
<< "num_data_blocks" << table_properties.num_data_blocks
|
|
<< "num_entries" << table_properties.num_entries
|
|
<< "num_filter_entries" << table_properties.num_filter_entries
|
|
<< "num_deletions" << table_properties.num_deletions
|
|
<< "num_merge_operands" << table_properties.num_merge_operands
|
|
<< "num_range_deletions" << table_properties.num_range_deletions
|
|
<< "format_version" << table_properties.format_version
|
|
<< "fixed_key_len" << table_properties.fixed_key_len
|
|
<< "filter_policy" << table_properties.filter_policy_name
|
|
<< "column_family_name" << table_properties.column_family_name
|
|
<< "column_family_id" << table_properties.column_family_id
|
|
<< "comparator" << table_properties.comparator_name
|
|
<< "merge_operator" << table_properties.merge_operator_name
|
|
<< "prefix_extractor_name"
|
|
<< table_properties.prefix_extractor_name << "property_collectors"
|
|
<< table_properties.property_collectors_names << "compression"
|
|
<< table_properties.compression_name << "compression_options"
|
|
<< table_properties.compression_options << "creation_time"
|
|
<< table_properties.creation_time << "oldest_key_time"
|
|
<< table_properties.oldest_key_time << "file_creation_time"
|
|
<< table_properties.file_creation_time
|
|
<< "slow_compression_estimated_data_size"
|
|
<< table_properties.slow_compression_estimated_data_size
|
|
<< "fast_compression_estimated_data_size"
|
|
<< table_properties.fast_compression_estimated_data_size
|
|
<< "db_id" << table_properties.db_id << "db_session_id"
|
|
<< table_properties.db_session_id << "orig_file_number"
|
|
<< table_properties.orig_file_number;
|
|
|
|
// user collected properties
|
|
for (const auto& prop : table_properties.readable_properties) {
|
|
jwriter << prop.first << prop.second;
|
|
}
|
|
jwriter.EndObject();
|
|
}
|
|
|
|
if (oldest_blob_file_number != kInvalidBlobFileNumber) {
|
|
jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
|
|
}
|
|
|
|
jwriter.EndObject();
|
|
|
|
event_logger->Log(jwriter);
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
TableFileCreationInfo info;
|
|
info.db_name = db_name;
|
|
info.cf_name = cf_name;
|
|
info.file_path = file_path;
|
|
info.file_size = fd.file_size;
|
|
info.job_id = job_id;
|
|
info.table_properties = table_properties;
|
|
info.reason = reason;
|
|
info.status = s;
|
|
info.file_checksum = file_checksum;
|
|
info.file_checksum_func_name = file_checksum_func_name;
|
|
for (auto& listener : listeners) {
|
|
listener->OnTableFileCreated(info);
|
|
}
|
|
info.status.PermitUncheckedError();
|
|
#else
|
|
(void)listeners;
|
|
(void)db_name;
|
|
(void)cf_name;
|
|
(void)file_path;
|
|
(void)reason;
|
|
#endif // !ROCKSDB_LITE
|
|
}
|
|
|
|
void EventHelpers::LogAndNotifyTableFileDeletion(
|
|
EventLogger* event_logger, int job_id, uint64_t file_number,
|
|
const std::string& file_path, const Status& status,
|
|
const std::string& dbname,
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners) {
|
|
JSONWriter jwriter;
|
|
AppendCurrentTime(&jwriter);
|
|
|
|
jwriter << "job" << job_id << "event"
|
|
<< "table_file_deletion"
|
|
<< "file_number" << file_number;
|
|
if (!status.ok()) {
|
|
jwriter << "status" << status.ToString();
|
|
}
|
|
|
|
jwriter.EndObject();
|
|
|
|
event_logger->Log(jwriter);
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
TableFileDeletionInfo info;
|
|
info.db_name = dbname;
|
|
info.job_id = job_id;
|
|
info.file_path = file_path;
|
|
info.status = status;
|
|
for (auto& listener : listeners) {
|
|
listener->OnTableFileDeleted(info);
|
|
}
|
|
info.status.PermitUncheckedError();
|
|
#else
|
|
(void)file_path;
|
|
(void)dbname;
|
|
(void)listeners;
|
|
#endif // !ROCKSDB_LITE
|
|
}
|
|
|
|
void EventHelpers::NotifyOnErrorRecoveryEnd(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const Status& old_bg_error, const Status& new_bg_error,
|
|
InstrumentedMutex* db_mutex) {
|
|
#ifndef ROCKSDB_LITE
|
|
if (!listeners.empty()) {
|
|
db_mutex->AssertHeld();
|
|
// release lock while notifying events
|
|
db_mutex->Unlock();
|
|
for (auto& listener : listeners) {
|
|
BackgroundErrorRecoveryInfo info;
|
|
info.old_bg_error = old_bg_error;
|
|
info.new_bg_error = new_bg_error;
|
|
listener->OnErrorRecoveryCompleted(old_bg_error);
|
|
listener->OnErrorRecoveryEnd(info);
|
|
info.old_bg_error.PermitUncheckedError();
|
|
info.new_bg_error.PermitUncheckedError();
|
|
}
|
|
db_mutex->Lock();
|
|
}
|
|
#else
|
|
(void)listeners;
|
|
(void)old_bg_error;
|
|
(void)new_bg_error;
|
|
(void)db_mutex;
|
|
#endif // ROCKSDB_LITE
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
void EventHelpers::NotifyBlobFileCreationStarted(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id,
|
|
BlobFileCreationReason creation_reason) {
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
|
|
creation_reason);
|
|
for (const auto& listener : listeners) {
|
|
listener->OnBlobFileCreationStarted(info);
|
|
}
|
|
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
void EventHelpers::LogAndNotifyBlobFileCreationFinished(
|
|
EventLogger* event_logger,
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id, uint64_t file_number,
|
|
BlobFileCreationReason creation_reason, const Status& s,
|
|
const std::string& file_checksum,
|
|
const std::string& file_checksum_func_name, uint64_t total_blob_count,
|
|
uint64_t total_blob_bytes) {
|
|
if (s.ok() && event_logger) {
|
|
JSONWriter jwriter;
|
|
AppendCurrentTime(&jwriter);
|
|
jwriter << "cf_name" << cf_name << "job" << job_id << "event"
|
|
<< "blob_file_creation"
|
|
<< "file_number" << file_number << "total_blob_count"
|
|
<< total_blob_count << "total_blob_bytes" << total_blob_bytes
|
|
<< "file_checksum" << file_checksum << "file_checksum_func_name"
|
|
<< file_checksum_func_name << "status" << s.ToString();
|
|
|
|
jwriter.EndObject();
|
|
event_logger->Log(jwriter);
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
|
|
creation_reason, total_blob_count, total_blob_bytes,
|
|
s, file_checksum, file_checksum_func_name);
|
|
for (const auto& listener : listeners) {
|
|
listener->OnBlobFileCreated(info);
|
|
}
|
|
info.status.PermitUncheckedError();
|
|
#else
|
|
(void)listeners;
|
|
(void)db_name;
|
|
(void)file_path;
|
|
(void)creation_reason;
|
|
#endif
|
|
}
|
|
|
|
void EventHelpers::LogAndNotifyBlobFileDeletion(
|
|
EventLogger* event_logger,
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
|
|
uint64_t file_number, const std::string& file_path, const Status& status,
|
|
const std::string& dbname) {
|
|
if (event_logger) {
|
|
JSONWriter jwriter;
|
|
AppendCurrentTime(&jwriter);
|
|
|
|
jwriter << "job" << job_id << "event"
|
|
<< "blob_file_deletion"
|
|
<< "file_number" << file_number;
|
|
if (!status.ok()) {
|
|
jwriter << "status" << status.ToString();
|
|
}
|
|
|
|
jwriter.EndObject();
|
|
event_logger->Log(jwriter);
|
|
}
|
|
#ifndef ROCKSDB_LITE
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
BlobFileDeletionInfo info(dbname, file_path, job_id, status);
|
|
for (const auto& listener : listeners) {
|
|
listener->OnBlobFileDeleted(info);
|
|
}
|
|
info.status.PermitUncheckedError();
|
|
#else
|
|
(void)listeners;
|
|
(void)dbname;
|
|
(void)file_path;
|
|
#endif // !ROCKSDB_LITE
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|