Patch summary (commentary only; this text sits before the first "diff --git"
header and is not part of any hunk):

  db/db_io_failure_test.cc
    Adds six tests, compiled only when SyncPoint is usable
    (!(defined NDEBUG) || !defined(OS_WIN)):
      FlushSstRangeSyncError, CompactSstRangeSyncError,
      FlushSstCloseError,     CompactionSstCloseError,
      FlushSstSyncError,      CompactionSstSyncError.
    Each test installs a SyncPoint callback that overwrites the Status of the
    first RangeSync/Close/Sync call with an IOError, then checks that the next
    Put is rejected, that previously written values are still readable, and
    that they are still readable after reopening the column families.

  db/db_test_util.h
    The writable-file wrapper shown in the SpecialEnv hunks gains a RangeSync
    override, and its Close/Sync now capture the base file's Status and pass
    it to a TEST_SYNC_POINT_CALLBACK ("SpecialEnv::SStableFile::RangeSync",
    "...::Close", "...::Sync") so tests can replace it before it is returned.

  table/block_based_table_builder.cc
    Asserts r->status.ok() immediately before each r->file->Append() whose
    result is assigned to r->status, so an earlier error is not silently
    overwritten.

Review fixes folded into this revision of the patch:
  * Restored template arguments lost in extraction: std::atomic<int>,
    static_cast<Status*>(arg), static_cast<int>(bytes_per_sync / 2).
  * ASSERT_GE(1, range_sync_called.load()) checked "1 >= count"; it now checks
    that the injection point fired at least once.
  * CompactionSstSyncError injected "close dummy error" for a Sync failure;
    the message now says "sync dummy error".
  * CompactSstRangeSyncError's comment said "flush failed" although the
    failure is triggered by compaction.
  All fixes are in-line, so the hunk line counts below are unchanged.

diff --git a/db/db_io_failure_test.cc b/db/db_io_failure_test.cc
index 7d9e773aa..f0b250f16 100644
--- a/db/db_io_failure_test.cc
+++ b/db/db_io_failure_test.cc
@@ -252,7 +252,314 @@ TEST_F(DBIOFailureTest, PutFailsParanoid) {
   // the next put should NOT fail
   ASSERT_TRUE(s.ok());
 }
+#if !(defined NDEBUG) || !defined(OS_WIN)
+TEST_F(DBIOFailureTest, FlushSstRangeSyncError) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.error_if_exists = false;
+  options.paranoid_checks = true;
+  options.write_buffer_size = 256 * 1024 * 1024;
+  options.writable_file_max_buffer_size = 128 * 1024;
+  options.bytes_per_sync = 128 * 1024;
+  options.level0_file_num_compaction_trigger = 4;
+  options.memtable_factory.reset(new SpecialSkipListFactory(10));
+  BlockBasedTableOptions table_options;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(10));
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Status s;
+
+  std::atomic<int> range_sync_called(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "SpecialEnv::SStableFile::RangeSync", [&](void* arg) {
+        if (range_sync_called.fetch_add(1) == 0) {
+          Status* st = static_cast<Status*>(arg);
+          *st = Status::IOError("range sync dummy error");
+        }
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  Random rnd(301);
+  std::string rnd_str = RandomString(&rnd, options.bytes_per_sync / 2);
+  std::string rnd_str_512kb = RandomString(&rnd, 512 * 1024);
+
+  ASSERT_OK(Put(1, "foo", "bar"));
+  // First 1MB doesn't get range synced
+  ASSERT_OK(Put(1, "foo0_0", rnd_str_512kb));
+  ASSERT_OK(Put(1, "foo0_1", rnd_str_512kb));
+  ASSERT_OK(Put(1, "foo1_1", rnd_str));
+  ASSERT_OK(Put(1, "foo1_2", rnd_str));
+  ASSERT_OK(Put(1, "foo1_3", rnd_str));
+  ASSERT_OK(Put(1, "foo2", "bar"));
+  ASSERT_OK(Put(1, "foo3_1", rnd_str));
+  ASSERT_OK(Put(1, "foo3_2", rnd_str));
+  ASSERT_OK(Put(1, "foo3_3", rnd_str));
+  ASSERT_OK(Put(1, "foo4", "bar"));
+  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+
+  // Following writes should fail as flush failed.
+  ASSERT_NOK(Put(1, "foo2", "bar3"));
+  ASSERT_EQ("bar", Get(1, "foo"));
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  ASSERT_GE(range_sync_called.load(), 1);
+
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+  ASSERT_EQ("bar", Get(1, "foo"));
+}
+
+TEST_F(DBIOFailureTest, CompactSstRangeSyncError) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.error_if_exists = false;
+  options.paranoid_checks = true;
+  options.write_buffer_size = 256 * 1024 * 1024;
+  options.writable_file_max_buffer_size = 128 * 1024;
+  options.bytes_per_sync = 128 * 1024;
+  options.level0_file_num_compaction_trigger = 2;
+  options.target_file_size_base = 256 * 1024 * 1024;
+  options.disable_auto_compactions = true;
+  BlockBasedTableOptions table_options;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(10));
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Status s;
+
+  Random rnd(301);
+  std::string rnd_str =
+      RandomString(&rnd, static_cast<int>(options.bytes_per_sync / 2));
+  std::string rnd_str_512kb = RandomString(&rnd, 512 * 1024);
+
+  ASSERT_OK(Put(1, "foo", "bar"));
+  // First 1MB doesn't get range synced
+  ASSERT_OK(Put(1, "foo0_0", rnd_str_512kb));
+  ASSERT_OK(Put(1, "foo0_1", rnd_str_512kb));
+  ASSERT_OK(Put(1, "foo1_1", rnd_str));
+  ASSERT_OK(Put(1, "foo1_2", rnd_str));
+  ASSERT_OK(Put(1, "foo1_3", rnd_str));
+  Flush(1);
+  ASSERT_OK(Put(1, "foo", "bar"));
+  ASSERT_OK(Put(1, "foo3_1", rnd_str));
+  ASSERT_OK(Put(1, "foo3_2", rnd_str));
+  ASSERT_OK(Put(1, "foo3_3", rnd_str));
+  ASSERT_OK(Put(1, "foo4", "bar"));
+  Flush(1);
+  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+
+  std::atomic<int> range_sync_called(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "SpecialEnv::SStableFile::RangeSync", [&](void* arg) {
+        if (range_sync_called.fetch_add(1) == 0) {
+          Status* st = static_cast<Status*>(arg);
+          *st = Status::IOError("range sync dummy error");
+        }
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(dbfull()->SetOptions(handles_[1],
+                                 {
+                                     {"disable_auto_compactions", "false"},
+                                 }));
+  dbfull()->TEST_WaitForCompact();
+
+  // Following writes should fail as compaction failed.
+  ASSERT_NOK(Put(1, "foo2", "bar3"));
+  ASSERT_EQ("bar", Get(1, "foo"));
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  ASSERT_GE(range_sync_called.load(), 1);
+
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+  ASSERT_EQ("bar", Get(1, "foo"));
+}
+
+TEST_F(DBIOFailureTest, FlushSstCloseError) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.error_if_exists = false;
+  options.paranoid_checks = true;
+  options.level0_file_num_compaction_trigger = 4;
+  options.memtable_factory.reset(new SpecialSkipListFactory(2));
+
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Status s;
+  std::atomic<int> close_called(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "SpecialEnv::SStableFile::Close", [&](void* arg) {
+        if (close_called.fetch_add(1) == 0) {
+          Status* st = static_cast<Status*>(arg);
+          *st = Status::IOError("close dummy error");
+        }
+      });
+
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put(1, "foo", "bar"));
+  ASSERT_OK(Put(1, "foo1", "bar1"));
+  ASSERT_OK(Put(1, "foo", "bar2"));
+  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+
+  // Following writes should fail as flush failed.
+  ASSERT_NOK(Put(1, "foo2", "bar3"));
+  ASSERT_EQ("bar2", Get(1, "foo"));
+  ASSERT_EQ("bar1", Get(1, "foo1"));
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+  ASSERT_EQ("bar2", Get(1, "foo"));
+  ASSERT_EQ("bar1", Get(1, "foo1"));
+}
+
+TEST_F(DBIOFailureTest, CompactionSstCloseError) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.error_if_exists = false;
+  options.paranoid_checks = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.disable_auto_compactions = true;
+
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Status s;
+
+  ASSERT_OK(Put(1, "foo", "bar"));
+  ASSERT_OK(Put(1, "foo2", "bar"));
+  Flush(1);
+  ASSERT_OK(Put(1, "foo", "bar2"));
+  ASSERT_OK(Put(1, "foo2", "bar"));
+  Flush(1);
+  ASSERT_OK(Put(1, "foo", "bar3"));
+  ASSERT_OK(Put(1, "foo2", "bar"));
+  Flush(1);
+  dbfull()->TEST_WaitForCompact();
+
+  std::atomic<int> close_called(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "SpecialEnv::SStableFile::Close", [&](void* arg) {
+        if (close_called.fetch_add(1) == 0) {
+          Status* st = static_cast<Status*>(arg);
+          *st = Status::IOError("close dummy error");
+        }
+      });
+
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(dbfull()->SetOptions(handles_[1],
+                                 {
+                                     {"disable_auto_compactions", "false"},
+                                 }));
+  dbfull()->TEST_WaitForCompact();
+
+  // Following writes should fail as compaction failed.
+  ASSERT_NOK(Put(1, "foo2", "bar3"));
+  ASSERT_EQ("bar3", Get(1, "foo"));
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+  ASSERT_EQ("bar3", Get(1, "foo"));
+}
+
+TEST_F(DBIOFailureTest, FlushSstSyncError) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.error_if_exists = false;
+  options.paranoid_checks = true;
+  options.use_fsync = false;
+  options.level0_file_num_compaction_trigger = 4;
+  options.memtable_factory.reset(new SpecialSkipListFactory(2));
+
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Status s;
+  std::atomic<int> sync_called(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "SpecialEnv::SStableFile::Sync", [&](void* arg) {
+        if (sync_called.fetch_add(1) == 0) {
+          Status* st = static_cast<Status*>(arg);
+          *st = Status::IOError("sync dummy error");
+        }
+      });
+
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put(1, "foo", "bar"));
+  ASSERT_OK(Put(1, "foo1", "bar1"));
+  ASSERT_OK(Put(1, "foo", "bar2"));
+  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+
+  // Following writes should fail as flush failed.
+  ASSERT_NOK(Put(1, "foo2", "bar3"));
+  ASSERT_EQ("bar2", Get(1, "foo"));
+  ASSERT_EQ("bar1", Get(1, "foo1"));
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+  ASSERT_EQ("bar2", Get(1, "foo"));
+  ASSERT_EQ("bar1", Get(1, "foo1"));
+}
+
+TEST_F(DBIOFailureTest, CompactionSstSyncError) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.error_if_exists = false;
+  options.paranoid_checks = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.disable_auto_compactions = true;
+  options.use_fsync = false;
+
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Status s;
+
+  ASSERT_OK(Put(1, "foo", "bar"));
+  ASSERT_OK(Put(1, "foo2", "bar"));
+  Flush(1);
+  ASSERT_OK(Put(1, "foo", "bar2"));
+  ASSERT_OK(Put(1, "foo2", "bar"));
+  Flush(1);
+  ASSERT_OK(Put(1, "foo", "bar3"));
+  ASSERT_OK(Put(1, "foo2", "bar"));
+  Flush(1);
+  dbfull()->TEST_WaitForCompact();
+
+  std::atomic<int> sync_called(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "SpecialEnv::SStableFile::Sync", [&](void* arg) {
+        if (sync_called.fetch_add(1) == 0) {
+          Status* st = static_cast<Status*>(arg);
+          *st = Status::IOError("sync dummy error");
+        }
+      });
+
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(dbfull()->SetOptions(handles_[1],
+                                 {
+                                     {"disable_auto_compactions", "false"},
+                                 }));
+  dbfull()->TEST_WaitForCompact();
+
+  // Following writes should fail as compaction failed.
+  ASSERT_NOK(Put(1, "foo2", "bar3"));
+  ASSERT_EQ("bar3", Get(1, "foo"));
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+  ASSERT_EQ("bar3", Get(1, "foo"));
+}
+#endif  // !(defined NDEBUG) || !defined(OS_WIN)
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/db_test_util.h b/db/db_test_util.h
index 755b6929f..d8f9ba8d7 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -247,6 +247,13 @@ class SpecialEnv : public EnvWrapper {
         }
       }
       Status Truncate(uint64_t size) override { return base_->Truncate(size); }
+      Status RangeSync(uint64_t offset, uint64_t nbytes) override {
+        Status s = base_->RangeSync(offset, nbytes);
+#if !(defined NDEBUG) || !defined(OS_WIN)
+        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::RangeSync", &s);
+#endif  // !(defined NDEBUG) || !defined(OS_WIN)
+        return s;
+      }
       Status Close() override {
 // SyncPoint is not supported in Released Windows Mode.
 #if !(defined NDEBUG) || !defined(OS_WIN)
@@ -256,7 +263,11 @@ class SpecialEnv : public EnvWrapper {
         TEST_SYNC_POINT_CALLBACK("DBTestWritableFile.GetPreallocationStatus",
                                  &preallocation_size);
 #endif  // !(defined NDEBUG) || !defined(OS_WIN)
-        return base_->Close();
+        Status s = base_->Close();
+#if !(defined NDEBUG) || !defined(OS_WIN)
+        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Close", &s);
+#endif  // !(defined NDEBUG) || !defined(OS_WIN)
+        return s;
       }
       Status Flush() override { return base_->Flush(); }
       Status Sync() override {
@@ -264,7 +275,11 @@ class SpecialEnv : public EnvWrapper {
         while (env_->delay_sstable_sync_.load(std::memory_order_acquire)) {
           env_->SleepForMicroseconds(100000);
         }
-        return base_->Sync();
+        Status s = base_->Sync();
+#if !(defined NDEBUG) || !defined(OS_WIN)
+        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Sync", &s);
+#endif  // !(defined NDEBUG) || !defined(OS_WIN)
+        return s;
       }
       void SetIOPriority(Env::IOPriority pri) override {
         base_->SetIOPriority(pri);
diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc
index 910a70fb2..286ae37ee 100644
--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -535,6 +535,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
   StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
   handle->set_offset(r->offset);
   handle->set_size(block_contents.size());
+  assert(r->status.ok());
   r->status = r->file->Append(block_contents);
   if (r->status.ok()) {
     char trailer[kBlockTrailerSize];
@@ -561,6 +562,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
       }
     }
 
+    assert(r->status.ok());
     r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
     if (r->status.ok()) {
       r->status = InsertBlockInCache(block_contents, type, handle);
@@ -804,6 +806,7 @@ Status BlockBasedTableBuilder::Finish() {
     footer.set_checksum(r->table_options.checksum);
     std::string footer_encoding;
     footer.EncodeTo(&footer_encoding);
+    assert(r->status.ok());
     r->status = r->file->Append(footer_encoding);
     if (r->status.ok()) {
       r->offset += footer_encoding.size();