Should flush and sync WAL when writing it in DB::Open() (#6417)

Summary:
A recent 2PC-related fix (https://github.com/facebook/rocksdb/pull/6313/) writes an empty batch to the WAL in DB::Open(), but does not flush or sync it. This causes the assertion failure "impl->TEST_WALBufferIsEmpty()" when manual_wal_flush = true. We should also fsync the entry so that it is not lost and recovery still succeeds after a second power reset.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6417

Test Plan: Add a manual_wal_flush=true case to TransactionTest.DoubleCrashInRecovery, and fix a bug in the test itself so that the assertion failure described above can be reproduced. The test passes with the fix.

Differential Revision: D19894537

fbshipit-source-id: f1e84e49e2269f583c6019743118292cd8b6598e
This commit is contained in:
sdong 2020-02-13 18:39:38 -08:00 committed by Facebook Github Bot
parent 46516778dd
commit ac8e89a443
2 changed files with 79 additions and 69 deletions

View File

@ -1494,6 +1494,13 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
uint64_t log_used, log_size; uint64_t log_used, log_size;
log::Writer* log_writer = impl->logs_.back().writer; log::Writer* log_writer = impl->logs_.back().writer;
s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size); s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
if (s.ok()) {
// Need to fsync, otherwise it might get lost after a power reset.
s = impl->FlushWAL(false);
if (s.ok()) {
s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
}
}
} }
} }
} }

View File

@ -6120,84 +6120,87 @@ TEST_P(TransactionTest, ReseekOptimization) {
// there. The new log files should be still read succesfully during recovery of // there. The new log files should be still read succesfully during recovery of
// the 2nd crash. // the 2nd crash.
TEST_P(TransactionTest, DoubleCrashInRecovery) { TEST_P(TransactionTest, DoubleCrashInRecovery) {
for (const bool write_after_recovery : {false, true}) { for (const bool manual_wal_flush : {false, true}) {
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; for (const bool write_after_recovery : {false, true}) {
ReOpen(); options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
std::string cf_name = "two"; options.manual_wal_flush = manual_wal_flush;
ColumnFamilyOptions cf_options; ReOpen();
ColumnFamilyHandle* cf_handle = nullptr; std::string cf_name = "two";
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); ColumnFamilyOptions cf_options;
ColumnFamilyHandle* cf_handle = nullptr;
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
// Add a prepare entry to prevent the older logs from being deleted. // Add a prepare entry to prevent the older logs from being deleted.
WriteOptions write_options; WriteOptions write_options;
TransactionOptions txn_options; TransactionOptions txn_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options); Transaction* txn = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn->SetName("xid")); ASSERT_OK(txn->SetName("xid"));
ASSERT_OK(txn->Put(Slice("foo-prepare"), Slice("bar-prepare"))); ASSERT_OK(txn->Put(Slice("foo-prepare"), Slice("bar-prepare")));
ASSERT_OK(txn->Prepare()); ASSERT_OK(txn->Prepare());
FlushOptions flush_ops; FlushOptions flush_ops;
db->Flush(flush_ops); db->Flush(flush_ops);
// Now we have a log that cannot be deleted // Now we have a log that cannot be deleted
ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1")); ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1"));
// Flush only the 2nd cf // Flush only the 2nd cf
db->Flush(flush_ops, cf_handle); db->Flush(flush_ops, cf_handle);
// The value is large enough to be touched by the corruption we ingest // The value is large enough to be touched by the corruption we ingest
// below. // below.
std::string large_value(400, ' '); std::string large_value(400, ' ');
// key/value not touched by corruption // key/value not touched by corruption
ASSERT_OK(db->Put(write_options, "foo2", "bar2")); ASSERT_OK(db->Put(write_options, "foo2", "bar2"));
// key/value touched by corruption // key/value touched by corruption
ASSERT_OK(db->Put(write_options, "foo3", large_value)); ASSERT_OK(db->Put(write_options, "foo3", large_value));
// key/value not touched by corruption // key/value not touched by corruption
ASSERT_OK(db->Put(write_options, "foo4", "bar4")); ASSERT_OK(db->Put(write_options, "foo4", "bar4"));
db->FlushWAL(true); db->FlushWAL(true);
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
uint64_t wal_file_id = db_impl->TEST_LogfileNumber(); uint64_t wal_file_id = db_impl->TEST_LogfileNumber();
std::string fname = LogFileName(dbname, wal_file_id); std::string fname = LogFileName(dbname, wal_file_id);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
delete txn; delete txn;
delete cf_handle; delete cf_handle;
delete db; delete db;
db = nullptr; db = nullptr;
// Corrupt the last log file in the middle, so that it is not corrupted // Corrupt the last log file in the middle, so that it is not corrupted
// in the tail. // in the tail.
std::string file_content; std::string file_content;
ASSERT_OK(ReadFileToString(env, fname, &file_content)); ASSERT_OK(ReadFileToString(env, fname, &file_content));
file_content[400] = 'h'; file_content[400] = 'h';
file_content[401] = 'a'; file_content[401] = 'a';
ASSERT_OK(env->DeleteFile(fname)); ASSERT_OK(env->DeleteFile(fname));
ASSERT_OK(WriteStringToFile(env, file_content, fname)); ASSERT_OK(WriteStringToFile(env, file_content, fname, true));
// Recover from corruption // Recover from corruption
std::vector<ColumnFamilyHandle*> handles; std::vector<ColumnFamilyHandle*> handles;
std::vector<ColumnFamilyDescriptor> column_families; std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName, column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName,
ColumnFamilyOptions())); ColumnFamilyOptions()));
column_families.push_back( column_families.push_back(
ColumnFamilyDescriptor("two", ColumnFamilyOptions())); ColumnFamilyDescriptor("two", ColumnFamilyOptions()));
ASSERT_OK(ReOpenNoDelete(column_families, &handles)); ASSERT_OK(ReOpenNoDelete(column_families, &handles));
if (write_after_recovery) { if (write_after_recovery) {
// Write data to the log right after the corrupted log // Write data to the log right after the corrupted log
ASSERT_OK(db->Put(write_options, "foo5", large_value)); ASSERT_OK(db->Put(write_options, "foo5", large_value));
} }
// Persist data written to WAL during recovery or by the last Put // Persist data written to WAL during recovery or by the last Put
db->FlushWAL(true); db->FlushWAL(true);
// 2nd crash to recover while having a valid log after the corrupted one. // 2nd crash to recover while having a valid log after the corrupted one.
ASSERT_OK(ReOpenNoDelete(column_families, &handles)); ASSERT_OK(ReOpenNoDelete(column_families, &handles));
assert(db != nullptr); assert(db != nullptr);
txn = db->GetTransactionByName("xid"); txn = db->GetTransactionByName("xid");
ASSERT_TRUE(txn != nullptr); ASSERT_TRUE(txn != nullptr);
ASSERT_OK(txn->Commit()); ASSERT_OK(txn->Commit());
delete txn; delete txn;
for (auto handle : handles) { for (auto handle : handles) {
delete handle; delete handle;
}
} }
} }
} }