Unit Tests for sync, range sync and file close failures
Summary: Closes https://github.com/facebook/rocksdb/pull/2454 Differential Revision: D5255320 Pulled By: siying fbshipit-source-id: 0080830fa8eb5da6de25e17ba68aee91018c7913
This commit is contained in:
parent
4cee11f4e3
commit
5c97a7c066
@ -252,7 +252,314 @@ TEST_F(DBIOFailureTest, PutFailsParanoid) {
|
||||
// the next put should NOT fail
|
||||
ASSERT_TRUE(s.ok());
|
||||
}
|
||||
#if !(defined NDEBUG) || !defined(OS_WIN)
|
||||
TEST_F(DBIOFailureTest, FlushSstRangeSyncError) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.error_if_exists = false;
|
||||
options.paranoid_checks = true;
|
||||
options.write_buffer_size = 256 * 1024 * 1024;
|
||||
options.writable_file_max_buffer_size = 128 * 1024;
|
||||
options.bytes_per_sync = 128 * 1024;
|
||||
options.level0_file_num_compaction_trigger = 4;
|
||||
options.memtable_factory.reset(new SpecialSkipListFactory(10));
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.filter_policy.reset(NewBloomFilterPolicy(10));
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
|
||||
DestroyAndReopen(options);
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
Status s;
|
||||
|
||||
std::atomic<int> range_sync_called(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SpecialEnv::SStableFile::RangeSync", [&](void* arg) {
|
||||
if (range_sync_called.fetch_add(1) == 0) {
|
||||
Status* st = static_cast<Status*>(arg);
|
||||
*st = Status::IOError("range sync dummy error");
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Random rnd(301);
|
||||
std::string rnd_str = RandomString(&rnd, options.bytes_per_sync / 2);
|
||||
std::string rnd_str_512kb = RandomString(&rnd, 512 * 1024);
|
||||
|
||||
ASSERT_OK(Put(1, "foo", "bar"));
|
||||
// First 1MB doesn't get range synced
|
||||
ASSERT_OK(Put(1, "foo0_0", rnd_str_512kb));
|
||||
ASSERT_OK(Put(1, "foo0_1", rnd_str_512kb));
|
||||
ASSERT_OK(Put(1, "foo1_1", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo1_2", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo1_3", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo2", "bar"));
|
||||
ASSERT_OK(Put(1, "foo3_1", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo3_2", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo3_3", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo4", "bar"));
|
||||
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
|
||||
|
||||
// Following writes should fail as flush failed.
|
||||
ASSERT_NOK(Put(1, "foo2", "bar3"));
|
||||
ASSERT_EQ("bar", Get(1, "foo"));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
ASSERT_GE(1, range_sync_called.load());
|
||||
|
||||
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||
ASSERT_EQ("bar", Get(1, "foo"));
|
||||
}
|
||||
|
||||
TEST_F(DBIOFailureTest, CompactSstRangeSyncError) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.error_if_exists = false;
|
||||
options.paranoid_checks = true;
|
||||
options.write_buffer_size = 256 * 1024 * 1024;
|
||||
options.writable_file_max_buffer_size = 128 * 1024;
|
||||
options.bytes_per_sync = 128 * 1024;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.target_file_size_base = 256 * 1024 * 1024;
|
||||
options.disable_auto_compactions = true;
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.filter_policy.reset(NewBloomFilterPolicy(10));
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
DestroyAndReopen(options);
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
Status s;
|
||||
|
||||
Random rnd(301);
|
||||
std::string rnd_str =
|
||||
RandomString(&rnd, static_cast<int>(options.bytes_per_sync / 2));
|
||||
std::string rnd_str_512kb = RandomString(&rnd, 512 * 1024);
|
||||
|
||||
ASSERT_OK(Put(1, "foo", "bar"));
|
||||
// First 1MB doesn't get range synced
|
||||
ASSERT_OK(Put(1, "foo0_0", rnd_str_512kb));
|
||||
ASSERT_OK(Put(1, "foo0_1", rnd_str_512kb));
|
||||
ASSERT_OK(Put(1, "foo1_1", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo1_2", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo1_3", rnd_str));
|
||||
Flush(1);
|
||||
ASSERT_OK(Put(1, "foo", "bar"));
|
||||
ASSERT_OK(Put(1, "foo3_1", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo3_2", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo3_3", rnd_str));
|
||||
ASSERT_OK(Put(1, "foo4", "bar"));
|
||||
Flush(1);
|
||||
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
|
||||
|
||||
std::atomic<int> range_sync_called(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SpecialEnv::SStableFile::RangeSync", [&](void* arg) {
|
||||
if (range_sync_called.fetch_add(1) == 0) {
|
||||
Status* st = static_cast<Status*>(arg);
|
||||
*st = Status::IOError("range sync dummy error");
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(dbfull()->SetOptions(handles_[1],
|
||||
{
|
||||
{"disable_auto_compactions", "false"},
|
||||
}));
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
// Following writes should fail as flush failed.
|
||||
ASSERT_NOK(Put(1, "foo2", "bar3"));
|
||||
ASSERT_EQ("bar", Get(1, "foo"));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
ASSERT_GE(1, range_sync_called.load());
|
||||
|
||||
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||
ASSERT_EQ("bar", Get(1, "foo"));
|
||||
}
|
||||
|
||||
TEST_F(DBIOFailureTest, FlushSstCloseError) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.error_if_exists = false;
|
||||
options.paranoid_checks = true;
|
||||
options.level0_file_num_compaction_trigger = 4;
|
||||
options.memtable_factory.reset(new SpecialSkipListFactory(2));
|
||||
|
||||
DestroyAndReopen(options);
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
Status s;
|
||||
std::atomic<int> close_called(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SpecialEnv::SStableFile::Close", [&](void* arg) {
|
||||
if (close_called.fetch_add(1) == 0) {
|
||||
Status* st = static_cast<Status*>(arg);
|
||||
*st = Status::IOError("close dummy error");
|
||||
}
|
||||
});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(Put(1, "foo", "bar"));
|
||||
ASSERT_OK(Put(1, "foo1", "bar1"));
|
||||
ASSERT_OK(Put(1, "foo", "bar2"));
|
||||
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
|
||||
|
||||
// Following writes should fail as flush failed.
|
||||
ASSERT_NOK(Put(1, "foo2", "bar3"));
|
||||
ASSERT_EQ("bar2", Get(1, "foo"));
|
||||
ASSERT_EQ("bar1", Get(1, "foo1"));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||
ASSERT_EQ("bar2", Get(1, "foo"));
|
||||
ASSERT_EQ("bar1", Get(1, "foo1"));
|
||||
}
|
||||
|
||||
TEST_F(DBIOFailureTest, CompactionSstCloseError) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.error_if_exists = false;
|
||||
options.paranoid_checks = true;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.disable_auto_compactions = true;
|
||||
|
||||
DestroyAndReopen(options);
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
Status s;
|
||||
|
||||
ASSERT_OK(Put(1, "foo", "bar"));
|
||||
ASSERT_OK(Put(1, "foo2", "bar"));
|
||||
Flush(1);
|
||||
ASSERT_OK(Put(1, "foo", "bar2"));
|
||||
ASSERT_OK(Put(1, "foo2", "bar"));
|
||||
Flush(1);
|
||||
ASSERT_OK(Put(1, "foo", "bar3"));
|
||||
ASSERT_OK(Put(1, "foo2", "bar"));
|
||||
Flush(1);
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
std::atomic<int> close_called(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SpecialEnv::SStableFile::Close", [&](void* arg) {
|
||||
if (close_called.fetch_add(1) == 0) {
|
||||
Status* st = static_cast<Status*>(arg);
|
||||
*st = Status::IOError("close dummy error");
|
||||
}
|
||||
});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
ASSERT_OK(dbfull()->SetOptions(handles_[1],
|
||||
{
|
||||
{"disable_auto_compactions", "false"},
|
||||
}));
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
// Following writes should fail as compaction failed.
|
||||
ASSERT_NOK(Put(1, "foo2", "bar3"));
|
||||
ASSERT_EQ("bar3", Get(1, "foo"));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||
ASSERT_EQ("bar3", Get(1, "foo"));
|
||||
}
|
||||
|
||||
TEST_F(DBIOFailureTest, FlushSstSyncError) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.error_if_exists = false;
|
||||
options.paranoid_checks = true;
|
||||
options.use_fsync = false;
|
||||
options.level0_file_num_compaction_trigger = 4;
|
||||
options.memtable_factory.reset(new SpecialSkipListFactory(2));
|
||||
|
||||
DestroyAndReopen(options);
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
Status s;
|
||||
std::atomic<int> sync_called(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SpecialEnv::SStableFile::Sync", [&](void* arg) {
|
||||
if (sync_called.fetch_add(1) == 0) {
|
||||
Status* st = static_cast<Status*>(arg);
|
||||
*st = Status::IOError("sync dummy error");
|
||||
}
|
||||
});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(Put(1, "foo", "bar"));
|
||||
ASSERT_OK(Put(1, "foo1", "bar1"));
|
||||
ASSERT_OK(Put(1, "foo", "bar2"));
|
||||
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
|
||||
|
||||
// Following writes should fail as flush failed.
|
||||
ASSERT_NOK(Put(1, "foo2", "bar3"));
|
||||
ASSERT_EQ("bar2", Get(1, "foo"));
|
||||
ASSERT_EQ("bar1", Get(1, "foo1"));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||
ASSERT_EQ("bar2", Get(1, "foo"));
|
||||
ASSERT_EQ("bar1", Get(1, "foo1"));
|
||||
}
|
||||
|
||||
TEST_F(DBIOFailureTest, CompactionSstSyncError) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.error_if_exists = false;
|
||||
options.paranoid_checks = true;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.disable_auto_compactions = true;
|
||||
options.use_fsync = false;
|
||||
|
||||
DestroyAndReopen(options);
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
Status s;
|
||||
|
||||
ASSERT_OK(Put(1, "foo", "bar"));
|
||||
ASSERT_OK(Put(1, "foo2", "bar"));
|
||||
Flush(1);
|
||||
ASSERT_OK(Put(1, "foo", "bar2"));
|
||||
ASSERT_OK(Put(1, "foo2", "bar"));
|
||||
Flush(1);
|
||||
ASSERT_OK(Put(1, "foo", "bar3"));
|
||||
ASSERT_OK(Put(1, "foo2", "bar"));
|
||||
Flush(1);
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
std::atomic<int> sync_called(0);
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SpecialEnv::SStableFile::Sync", [&](void* arg) {
|
||||
if (sync_called.fetch_add(1) == 0) {
|
||||
Status* st = static_cast<Status*>(arg);
|
||||
*st = Status::IOError("close dummy error");
|
||||
}
|
||||
});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
ASSERT_OK(dbfull()->SetOptions(handles_[1],
|
||||
{
|
||||
{"disable_auto_compactions", "false"},
|
||||
}));
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
// Following writes should fail as compaction failed.
|
||||
ASSERT_NOK(Put(1, "foo2", "bar3"));
|
||||
ASSERT_EQ("bar3", Get(1, "foo"));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
||||
ASSERT_EQ("bar3", Get(1, "foo"));
|
||||
}
|
||||
#endif // !(defined NDEBUG) || !defined(OS_WIN)
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -247,6 +247,13 @@ class SpecialEnv : public EnvWrapper {
|
||||
}
|
||||
}
|
||||
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
|
||||
Status RangeSync(uint64_t offset, uint64_t nbytes) override {
|
||||
Status s = base_->RangeSync(offset, nbytes);
|
||||
#if !(defined NDEBUG) || !defined(OS_WIN)
|
||||
TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::RangeSync", &s);
|
||||
#endif // !(defined NDEBUG) || !defined(OS_WIN)
|
||||
return s;
|
||||
}
|
||||
Status Close() override {
|
||||
// SyncPoint is not supported in Released Windows Mode.
|
||||
#if !(defined NDEBUG) || !defined(OS_WIN)
|
||||
@ -256,7 +263,11 @@ class SpecialEnv : public EnvWrapper {
|
||||
TEST_SYNC_POINT_CALLBACK("DBTestWritableFile.GetPreallocationStatus",
|
||||
&preallocation_size);
|
||||
#endif // !(defined NDEBUG) || !defined(OS_WIN)
|
||||
return base_->Close();
|
||||
Status s = base_->Close();
|
||||
#if !(defined NDEBUG) || !defined(OS_WIN)
|
||||
TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Close", &s);
|
||||
#endif // !(defined NDEBUG) || !defined(OS_WIN)
|
||||
return s;
|
||||
}
|
||||
Status Flush() override { return base_->Flush(); }
|
||||
Status Sync() override {
|
||||
@ -264,7 +275,11 @@ class SpecialEnv : public EnvWrapper {
|
||||
while (env_->delay_sstable_sync_.load(std::memory_order_acquire)) {
|
||||
env_->SleepForMicroseconds(100000);
|
||||
}
|
||||
return base_->Sync();
|
||||
Status s = base_->Sync();
|
||||
#if !(defined NDEBUG) || !defined(OS_WIN)
|
||||
TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Sync", &s);
|
||||
#endif // !(defined NDEBUG) || !defined(OS_WIN)
|
||||
return s;
|
||||
}
|
||||
void SetIOPriority(Env::IOPriority pri) override {
|
||||
base_->SetIOPriority(pri);
|
||||
|
@ -535,6 +535,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
|
||||
StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
|
||||
handle->set_offset(r->offset);
|
||||
handle->set_size(block_contents.size());
|
||||
assert(r->status.ok());
|
||||
r->status = r->file->Append(block_contents);
|
||||
if (r->status.ok()) {
|
||||
char trailer[kBlockTrailerSize];
|
||||
@ -561,6 +562,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
|
||||
}
|
||||
}
|
||||
|
||||
assert(r->status.ok());
|
||||
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
|
||||
if (r->status.ok()) {
|
||||
r->status = InsertBlockInCache(block_contents, type, handle);
|
||||
@ -804,6 +806,7 @@ Status BlockBasedTableBuilder::Finish() {
|
||||
footer.set_checksum(r->table_options.checksum);
|
||||
std::string footer_encoding;
|
||||
footer.EncodeTo(&footer_encoding);
|
||||
assert(r->status.ok());
|
||||
r->status = r->file->Append(footer_encoding);
|
||||
if (r->status.ok()) {
|
||||
r->offset += footer_encoding.size();
|
||||
|
Loading…
Reference in New Issue
Block a user