Fix a bug in file ingestion (#5760)
Summary: Before this PR, when the number of column families involved in a file ingestion exceeds 2, a bug in the looping logic prevents correct file number being assigned to each ingestion job. Also skip deleting non-existing hard links during cleanup-after-failure. Test plan (devserver) ``` $COMPILE_WITH_ASAN=1 make all $./external_sst_file_test --gtest_filter=ExternalSSTFileTest/ExternalSSTFileTest.IngestFilesIntoMultipleColumnFamilies_*/* $makke check ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/5760 Differential Revision: D17142982 Pulled By: riversand963 fbshipit-source-id: 06c1847a4e7a402647bcf28d124e70f2a0f9daf6
This commit is contained in:
parent
0849844411
commit
1c6eb2b1e4
@ -3669,9 +3669,9 @@ Status DBImpl::IngestExternalFiles(
|
|||||||
exec_results.emplace_back(false, Status::OK());
|
exec_results.emplace_back(false, Status::OK());
|
||||||
}
|
}
|
||||||
// TODO(yanqin) maybe make jobs run in parallel
|
// TODO(yanqin) maybe make jobs run in parallel
|
||||||
|
uint64_t start_file_number = next_file_number;
|
||||||
for (size_t i = 1; i != num_cfs; ++i) {
|
for (size_t i = 1; i != num_cfs; ++i) {
|
||||||
uint64_t start_file_number =
|
start_file_number += args[i - 1].external_files.size();
|
||||||
next_file_number + args[i - 1].external_files.size();
|
|
||||||
auto* cfd =
|
auto* cfd =
|
||||||
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
|
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
|
||||||
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
||||||
|
@ -160,7 +160,7 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||||||
// We failed, remove all files that we copied into the db
|
// We failed, remove all files that we copied into the db
|
||||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||||
if (f.internal_file_path.empty()) {
|
if (f.internal_file_path.empty()) {
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
Status s = env_->DeleteFile(f.internal_file_path);
|
Status s = env_->DeleteFile(f.internal_file_path);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
@ -291,6 +291,9 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
|
|||||||
// We failed to add the files to the database
|
// We failed to add the files to the database
|
||||||
// remove all the files we copied
|
// remove all the files we copied
|
||||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||||
|
if (f.internal_file_path.empty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
Status s = env_->DeleteFile(f.internal_file_path);
|
Status s = env_->DeleteFile(f.internal_file_path);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ROCKS_LOG_WARN(db_options_.info_log,
|
ROCKS_LOG_WARN(db_options_.info_log,
|
||||||
|
@ -2369,10 +2369,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
|||||||
new FaultInjectionTestEnv(env_));
|
new FaultInjectionTestEnv(env_));
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
options.env = fault_injection_env.get();
|
options.env = fault_injection_env.get();
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2386,6 +2387,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
|
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2393,8 +2397,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
|||||||
-1, true, true_data);
|
-1, true, true_data);
|
||||||
ASSERT_OK(s);
|
ASSERT_OK(s);
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
ASSERT_EQ(2, handles_.size());
|
options);
|
||||||
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
@ -2426,10 +2431,11 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
|
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
options.env = fault_injection_env.get();
|
options.env = fault_injection_env.get();
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
const std::vector<std::map<std::string, std::string>> data_before_ingestion =
|
const std::vector<std::map<std::string, std::string>> data_before_ingestion =
|
||||||
{{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
|
{{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
|
||||||
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}};
|
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}},
|
||||||
|
{{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}};
|
||||||
for (size_t i = 0; i != handles_.size(); ++i) {
|
for (size_t i = 0; i != handles_.size(); ++i) {
|
||||||
int cf = static_cast<int>(i);
|
int cf = static_cast<int>(i);
|
||||||
const auto& orig_data = data_before_ingestion[i];
|
const auto& orig_data = data_before_ingestion[i];
|
||||||
@ -2442,6 +2448,7 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2455,6 +2462,8 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2508,10 +2517,11 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
dbfull()->ReleaseSnapshot(read_opts.snapshot);
|
dbfull()->ReleaseSnapshot(read_opts.snapshot);
|
||||||
|
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
|
options);
|
||||||
// Should see consistent state after ingestion for all column families even
|
// Should see consistent state after ingestion for all column families even
|
||||||
// without snapshot.
|
// without snapshot.
|
||||||
ASSERT_EQ(2, handles_.size());
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
@ -2541,10 +2551,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
|
|||||||
"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
|
"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
|
||||||
});
|
});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2558,6 +2569,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
|
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2577,8 +2591,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
|
|||||||
|
|
||||||
fault_injection_env->SetFilesystemActive(true);
|
fault_injection_env->SetFilesystemActive(true);
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
ASSERT_EQ(2, handles_.size());
|
options);
|
||||||
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
@ -2607,10 +2622,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
|
|||||||
"DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
|
"DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
|
||||||
});
|
});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2624,6 +2640,8 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2643,8 +2661,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
|
|||||||
|
|
||||||
fault_injection_env->SetFilesystemActive(true);
|
fault_injection_env->SetFilesystemActive(true);
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
ASSERT_EQ(2, handles_.size());
|
options);
|
||||||
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
@ -2664,7 +2683,7 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
options.env = fault_injection_env.get();
|
options.env = fault_injection_env.get();
|
||||||
|
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
|
|
||||||
SyncPoint::GetInstance()->ClearTrace();
|
SyncPoint::GetInstance()->ClearTrace();
|
||||||
SyncPoint::GetInstance()->DisableProcessing();
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
@ -2682,6 +2701,7 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2695,6 +2715,8 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2715,8 +2737,9 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
fault_injection_env->DropUnsyncedFileData();
|
fault_injection_env->DropUnsyncedFileData();
|
||||||
fault_injection_env->SetFilesystemActive(true);
|
fault_injection_env->SetFilesystemActive(true);
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
ASSERT_EQ(2, handles_.size());
|
options);
|
||||||
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user