Fixed CompactFiles() spuriously failing or corrupting DB
Summary: Since we started using `DB::CompactFiles()`, we have been seeing two kinds of crashes: (1) `CompactFiles()` fails with an error like "/data/logdevice/4440/shard12/012302.sst: No such file or directory", and presumably puts the DB into read-only mode; (2) the DB fails to open with "Corruption: Can't access /267000.sst: IO error: /data/logdevice/4440/shard1/267000.sst: No such file or directory". As far as I can tell, both are explained by a background thread deleting a compaction output file as "obsolete" while it is still being written, before it has been committed to the manifest. If the file nevertheless ends up committed to the manifest, we get (2); if the compaction notices the disappearance and fails, we get (1). The internal tasks t10068021 and t10134177 contain details of the investigation that led to this fix.

Test Plan: `make -j check`; the new test fails to reopen the DB without the fix.

Reviewers: yhchiang
Reviewed By: yhchiang
Subscribers: dhruba, sdong
Differential Revision: https://reviews.facebook.net/D54561
This commit is contained in:
parent
4a0ee7570c
commit
af42561165
@ -7,6 +7,7 @@
|
||||
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "rocksdb/db.h"
|
||||
@ -142,9 +143,6 @@ TEST_F(CompactFilesTest, ObsoleteFiles) {
|
||||
}
|
||||
|
||||
auto l0_files = collector->GetFlushedFiles();
|
||||
CompactionOptions compact_opt;
|
||||
compact_opt.compression = kNoCompression;
|
||||
compact_opt.output_file_size_limit = kWriteBufferSize * 5;
|
||||
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
|
||||
|
||||
// verify all compaction input files are deleted
|
||||
@ -154,6 +152,62 @@ TEST_F(CompactFilesTest, ObsoleteFiles) {
|
||||
delete db;
|
||||
}
|
||||
|
||||
// Regression test: a CompactFiles() output file must be protected from the
// obsolete-file scan until it is committed to the manifest. The test stalls a
// CompactFiles() job between writing its output and installing it (via sync
// points "CompactFilesImpl:2"/"CompactFilesImpl:3"), triggers a flush in the
// meantime (which runs an obsolete-file scan), and then verifies both that the
// compaction succeeds and that the DB can be reopened afterwards.
TEST_F(CompactFilesTest, CapturingPendingFiles) {
  Options options;
  options.create_if_missing = true;
  // Disable RocksDB background compaction so the only compaction running is
  // the explicit CompactFiles() below.
  options.compaction_style = kCompactionStyleNone;
  // Always do full scans for obsolete files (needed to reproduce the issue).
  options.delete_obsolete_files_period_micros = 0;

  // Listener that records the names of flushed files; ownership is taken by
  // options.listeners (vector of shared_ptr).
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  ASSERT_OK(DB::Open(options, db_name_, &db));
  ASSERT_TRUE(db != nullptr);

  // Create 5 L0 files, one per Put+Flush. Check every Status: a silently
  // failed write would invalidate the scenario the test sets up.
  for (int i = 0; i < 5; ++i) {
    ASSERT_OK(db->Put(WriteOptions(), "key" + ToString(i), "value"));
    ASSERT_OK(db->Flush(FlushOptions()));
  }

  auto l0_files = collector->GetFlushedFiles();
  EXPECT_EQ(5, l0_files.size());

  // Order the race deterministically: the compaction reaches "2" (output
  // written, not yet installed) before our flush runs, and may only proceed to
  // "3" (install) after our flush has completed its obsolete-file scan.
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"},
      {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"},
  });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Start compacting files. EXPECT_OK (not ASSERT_OK) inside the lambda:
  // gtest fatal assertions must not be used off the main test thread.
  std::thread compaction_thread(
      [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });

  // In the meantime flush another file, forcing an obsolete-file scan while
  // the compaction output is pending.
  TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
  ASSERT_OK(db->Put(WriteOptions(), "key5", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));
  TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");

  compaction_thread.join();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();

  delete db;

  // Make sure we can reopen the DB: without the fix, the manifest references
  // a deleted output file and Open() reports Corruption.
  ASSERT_OK(DB::Open(options, db_name_, &db));
  ASSERT_TRUE(db != nullptr);
  delete db;
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -1814,6 +1814,9 @@ Status DBImpl::CompactFilesImpl(
|
||||
std::vector<SequenceNumber> snapshot_seqs =
|
||||
snapshots_.GetAll(&earliest_write_conflict_snapshot);
|
||||
|
||||
auto pending_outputs_inserted_elem =
|
||||
CaptureCurrentFileNumberInPendingOutputs();
|
||||
|
||||
assert(is_snapshot_supported_ || snapshots_.empty());
|
||||
CompactionJob compaction_job(
|
||||
job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
|
||||
@ -1841,6 +1844,8 @@ Status DBImpl::CompactFilesImpl(
|
||||
TEST_SYNC_POINT("CompactFilesImpl:0");
|
||||
TEST_SYNC_POINT("CompactFilesImpl:1");
|
||||
compaction_job.Run();
|
||||
TEST_SYNC_POINT("CompactFilesImpl:2");
|
||||
TEST_SYNC_POINT("CompactFilesImpl:3");
|
||||
mutex_.Lock();
|
||||
|
||||
Status status = compaction_job.Install(*c->mutable_cf_options(), &mutex_);
|
||||
@ -1851,6 +1856,8 @@ Status DBImpl::CompactFilesImpl(
|
||||
c->ReleaseCompactionFiles(s);
|
||||
c.reset();
|
||||
|
||||
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
|
||||
|
||||
if (status.ok()) {
|
||||
// Done
|
||||
} else if (status.IsShutdownInProgress()) {
|
||||
|
Loading…
Reference in New Issue
Block a user