Pending output file number should be released after bulkload failure (#4145)

Summary:
If bulkload fails for an input error, the pending output file number wasn't released. This bug can cause all future files with larger number than the current number won't be deleted, even they are compacted. This commit fixes the bug.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4145

Differential Revision: D8877900

Pulled By: siying

fbshipit-source-id: 080be92a23d43305ca1e13fe1c06eb4cd0b01466
This commit is contained in:
Siying Dong 2018-07-17 14:04:18 -07:00 committed by Facebook Github Bot
parent 5a59ce4149
commit 995fcf7573
2 changed files with 35 additions and 0 deletions

View File

@ -2937,6 +2937,7 @@ Status DBImpl::IngestExternalFile(
status = ingestion_job.Prepare(external_files, super_version); status = ingestion_job.Prepare(external_files, super_version);
CleanupSuperVersion(super_version); CleanupSuperVersion(super_version);
if (!status.ok()) { if (!status.ok()) {
ReleaseFileNumberFromPendingOutputs(pending_output_elem);
return status; return status;
} }

View File

@ -10,6 +10,7 @@
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/sst_file_writer.h" #include "rocksdb/sst_file_writer.h"
#include "util/filename.h"
#include "util/testutil.h" #include "util/testutil.h"
namespace rocksdb { namespace rocksdb {
@ -1292,6 +1293,39 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(ExternalSSTFileTest, IngestNonExistingFile) {
Options options = CurrentOptions();
DestroyAndReopen(options);
Status s = db_->IngestExternalFile({"non_existing_file"},
IngestExternalFileOptions());
ASSERT_NOK(s);
// Verify file deletion is not impacted (verify a bug fix)
ASSERT_OK(Put(Key(1), Key(1)));
ASSERT_OK(Put(Key(9), Key(9)));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(1), Key(1)));
ASSERT_OK(Put(Key(9), Key(9)));
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// After full compaction, there should be only 1 file.
std::vector<std::string> files;
env_->GetChildren(dbname_, &files);
int num_sst_files = 0;
for (auto& f : files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kTableFile) {
num_sst_files++;
}
}
ASSERT_EQ(1, num_sst_files);
}
TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.disable_auto_compactions = false; options.disable_auto_compactions = false;