// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include #include #include "db/blob/blob_index.h" #include "db/blob/blob_log_format.h" #include "db/db_test_util.h" #include "port/stack_trace.h" #include "test_util/sync_point.h" #include "utilities/fault_injection_env.h" namespace ROCKSDB_NAMESPACE { class DBBlobBasicTest : public DBTestBase { protected: DBBlobBasicTest() : DBTestBase("db_blob_basic_test", /* env_do_fsync */ false) {} }; TEST_F(DBBlobBasicTest, GetBlob) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key[] = "key"; constexpr char blob_value[] = "blob_value"; ASSERT_OK(Put(key, blob_value)); ASSERT_OK(Flush()); ASSERT_EQ(Get(key), blob_value); // Try again with no I/O allowed. The table and the necessary blocks should // already be in their respective caches; however, the blob itself can only be // read from the blob file, so the read should return Incomplete. ReadOptions read_options; read_options.read_tier = kBlockCacheTier; PinnableSlice result; ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result) .IsIncomplete()); } TEST_F(DBBlobBasicTest, MultiGetBlobs) { constexpr size_t min_blob_size = 6; Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = min_blob_size; Reopen(options); // Put then retrieve three key-values. The first value is below the size limit // and is thus stored inline; the other two are stored separately as blobs. constexpr size_t num_keys = 3; constexpr char first_key[] = "first_key"; constexpr char first_value[] = "short"; static_assert(sizeof(first_value) - 1 < min_blob_size, "first_value too long to be inlined"); ASSERT_OK(Put(first_key, first_value)); constexpr char second_key[] = "second_key"; constexpr char second_value[] = "long_value"; static_assert(sizeof(second_value) - 1 >= min_blob_size, "second_value too short to be stored as blob"); ASSERT_OK(Put(second_key, second_value)); constexpr char third_key[] = "third_key"; constexpr char third_value[] = "other_long_value"; static_assert(sizeof(third_value) - 1 >= min_blob_size, "third_value too short to be stored as blob"); ASSERT_OK(Put(third_key, third_value)); ASSERT_OK(Flush()); ReadOptions read_options; std::array keys{{first_key, second_key, third_key}}; { std::array values; std::array statuses; db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0], &values[0], &statuses[0]); ASSERT_OK(statuses[0]); ASSERT_EQ(values[0], first_value); ASSERT_OK(statuses[1]); ASSERT_EQ(values[1], second_value); ASSERT_OK(statuses[2]); ASSERT_EQ(values[2], third_value); } // Try again with no I/O allowed. The table and the necessary blocks should // already be in their respective caches. The first (inlined) value should be // successfully read; however, the two blob values could only be read from the // blob file, so for those the read should return Incomplete. read_options.read_tier = kBlockCacheTier; { std::array values; std::array statuses; db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0], &values[0], &statuses[0]); ASSERT_OK(statuses[0]); ASSERT_EQ(values[0], first_value); ASSERT_TRUE(statuses[1].IsIncomplete()); ASSERT_TRUE(statuses[2].IsIncomplete()); } } TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr size_t kNumBlobFiles = 3; constexpr size_t kNumBlobsPerFile = 3; constexpr size_t kNumKeys = kNumBlobsPerFile * kNumBlobFiles; std::vector key_strs; std::vector value_strs; for (size_t i = 0; i < kNumBlobFiles; ++i) { for (size_t j = 0; j < kNumBlobsPerFile; ++j) { std::string key = "key" + std::to_string(i) + "_" + std::to_string(j); std::string value = "value_as_blob" + std::to_string(i) + "_" + std::to_string(j); ASSERT_OK(Put(key, value)); key_strs.push_back(key); value_strs.push_back(value); } ASSERT_OK(Flush()); } assert(key_strs.size() == kNumKeys); std::array keys; for (size_t i = 0; i < keys.size(); ++i) { keys[i] = key_strs[i]; } std::array values; std::array statuses; db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), kNumKeys, &keys[0], &values[0], &statuses[0]); for (size_t i = 0; i < kNumKeys; ++i) { ASSERT_OK(statuses[i]); ASSERT_EQ(value_strs[i], values[i]); } } TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key[] = "key"; // Fake a corrupt blob index. const std::string blob_index("foobar"); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(Flush()); PinnableSlice result; ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result) .IsCorruption()); } TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; options.create_if_missing = true; DestroyAndReopen(options); constexpr size_t kNumOfKeys = 3; std::array key_strs; std::array value_strs; std::array keys; for (size_t i = 0; i < kNumOfKeys; ++i) { key_strs[i] = "foo" + std::to_string(i); value_strs[i] = "blob_value" + std::to_string(i); ASSERT_OK(Put(key_strs[i], value_strs[i])); keys[i] = key_strs[i]; } constexpr char key[] = "key"; { // Fake a corrupt blob index. const std::string blob_index("foobar"); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); keys[kNumOfKeys] = Slice(static_cast(key), sizeof(key) - 1); } ASSERT_OK(Flush()); std::array values; std::array statuses; db_->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(), kNumOfKeys + 1, keys.data(), values.data(), statuses.data(), /*sorted_input=*/false); for (size_t i = 0; i < kNumOfKeys + 1; ++i) { if (i != kNumOfKeys) { ASSERT_OK(statuses[i]); ASSERT_EQ("blob_value" + std::to_string(i), values[i]); } else { ASSERT_TRUE(statuses[i].IsCorruption()); } } } TEST_F(DBBlobBasicTest, MultiGetBlob_ExceedSoftLimit) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr size_t kNumOfKeys = 3; std::array key_bufs; std::array value_bufs; std::array keys; for (size_t i = 0; i < kNumOfKeys; ++i) { key_bufs[i] = "foo" + std::to_string(i); value_bufs[i] = "blob_value" + std::to_string(i); ASSERT_OK(Put(key_bufs[i], value_bufs[i])); keys[i] = key_bufs[i]; } ASSERT_OK(Flush()); std::array values; std::array statuses; ReadOptions read_opts; read_opts.value_size_soft_limit = 1; db_->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), kNumOfKeys, keys.data(), values.data(), statuses.data(), /*sorted_input=*/true); for (const auto& s : statuses) { ASSERT_TRUE(s.IsAborted()); } } TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) { constexpr uint64_t min_blob_size = 10; Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = min_blob_size; Reopen(options); constexpr char key[] = "key"; constexpr char blob[] = "short"; static_assert(sizeof(short) - 1 < min_blob_size, "Blob too long to be inlined"); // Fake an inlined TTL blob index. std::string blob_index; constexpr uint64_t expiration = 1234567890; BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(Flush()); PinnableSlice result; ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result) .IsCorruption()); } TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key[] = "key"; // Fake a blob index referencing a non-existent blob file. std::string blob_index; constexpr uint64_t blob_file_number = 1000; constexpr uint64_t offset = 1234; constexpr uint64_t size = 5678; BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size, kNoCompression); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(Flush()); PinnableSlice result; ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result) .IsCorruption()); } #ifndef ROCKSDB_LITE TEST_F(DBBlobBasicTest, GenerateIOTracing) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; std::string trace_file = dbname_ + "/io_trace_file"; Reopen(options); { // Create IO trace file std::unique_ptr trace_writer; ASSERT_OK( NewFileTraceWriter(env_, EnvOptions(), trace_file, &trace_writer)); ASSERT_OK(db_->StartIOTrace(TraceOptions(), std::move(trace_writer))); constexpr char key[] = "key"; constexpr char blob_value[] = "blob_value"; ASSERT_OK(Put(key, blob_value)); ASSERT_OK(Flush()); ASSERT_EQ(Get(key), blob_value); ASSERT_OK(db_->EndIOTrace()); ASSERT_OK(env_->FileExists(trace_file)); } { // Parse trace file to check file operations related to blob files are // recorded. std::unique_ptr trace_reader; ASSERT_OK( NewFileTraceReader(env_, EnvOptions(), trace_file, &trace_reader)); IOTraceReader reader(std::move(trace_reader)); IOTraceHeader header; ASSERT_OK(reader.ReadHeader(&header)); ASSERT_EQ(kMajorVersion, static_cast(header.rocksdb_major_version)); ASSERT_EQ(kMinorVersion, static_cast(header.rocksdb_minor_version)); // Read records. int blob_files_op_count = 0; Status status; while (true) { IOTraceRecord record; status = reader.ReadIOOp(&record); if (!status.ok()) { break; } if (record.file_name.find("blob") != std::string::npos) { blob_files_op_count++; } } // Assuming blob files will have Append, Close and then Read operations. ASSERT_GT(blob_files_op_count, 2); } } #endif // !ROCKSDB_LITE TEST_F(DBBlobBasicTest, BestEffortsRecovery_MissingNewestBlobFile) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; options.create_if_missing = true; Reopen(options); ASSERT_OK(dbfull()->DisableFileDeletions()); constexpr int kNumTableFiles = 2; for (int i = 0; i < kNumTableFiles; ++i) { for (char ch = 'a'; ch != 'c'; ++ch) { std::string key(1, ch); ASSERT_OK(Put(key, "value" + std::to_string(i))); } ASSERT_OK(Flush()); } Close(); std::vector files; ASSERT_OK(env_->GetChildren(dbname_, &files)); std::string blob_file_path; uint64_t max_blob_file_num = kInvalidBlobFileNumber; for (const auto& fname : files) { uint64_t file_num = 0; FileType type; if (ParseFileName(fname, &file_num, /*info_log_name_prefix=*/"", &type) && type == kBlobFile) { if (file_num > max_blob_file_num) { max_blob_file_num = file_num; blob_file_path = dbname_ + "/" + fname; } } } ASSERT_OK(env_->DeleteFile(blob_file_path)); options.best_efforts_recovery = true; Reopen(options); std::string value; ASSERT_OK(db_->Get(ReadOptions(), "a", &value)); ASSERT_EQ("value" + std::to_string(kNumTableFiles - 2), value); } TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) { Options options = GetDefaultOptions(); options.merge_operator = MergeOperators::CreateStringAppendOperator(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); ASSERT_OK(Put("Key1", "v1")); ASSERT_OK(Flush()); ASSERT_OK(Merge("Key1", "v2")); ASSERT_OK(Flush()); ASSERT_OK(Merge("Key1", "v3")); ASSERT_OK(Flush()); std::string value; ASSERT_OK(db_->Get(ReadOptions(), "Key1", &value)); ASSERT_EQ(Get("Key1"), "v1,v2,v3"); } TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) { constexpr size_t num_keys = 3; Options options = GetDefaultOptions(); options.merge_operator = MergeOperators::CreateStringAppendOperator(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); ASSERT_OK(Put("Key0", "v0_0")); ASSERT_OK(Put("Key1", "v1_0")); ASSERT_OK(Put("Key2", "v2_0")); ASSERT_OK(Flush()); ASSERT_OK(Merge("Key0", "v0_1")); ASSERT_OK(Merge("Key1", "v1_1")); ASSERT_OK(Flush()); ASSERT_OK(Merge("Key0", "v0_2")); ASSERT_OK(Flush()); std::array keys{{"Key0", "Key1", "Key2"}}; std::array values; std::array statuses; db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0], &values[0], &statuses[0]); ASSERT_OK(statuses[0]); ASSERT_EQ(values[0], "v0_0,v0_1,v0_2"); ASSERT_OK(statuses[1]); ASSERT_EQ(values[1], "v1_0,v1_1"); ASSERT_OK(statuses[2]); ASSERT_EQ(values[2], "v2_0"); } #ifndef ROCKSDB_LITE TEST_F(DBBlobBasicTest, Properties) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key1[] = "key1"; constexpr size_t key1_size = sizeof(key1) - 1; constexpr char key2[] = "key2"; constexpr size_t key2_size = sizeof(key2) - 1; constexpr char key3[] = "key3"; constexpr size_t key3_size = sizeof(key3) - 1; constexpr char blob[] = "0000000000"; constexpr size_t blob_size = sizeof(blob) - 1; ASSERT_OK(Put(key1, blob)); ASSERT_OK(Put(key2, blob)); ASSERT_OK(Flush()); constexpr size_t first_blob_file_expected_size = BlobLogHeader::kSize + BlobLogRecord::CalculateAdjustmentForRecordHeader(key1_size) + blob_size + BlobLogRecord::CalculateAdjustmentForRecordHeader(key2_size) + blob_size + BlobLogFooter::kSize; ASSERT_OK(Put(key3, blob)); ASSERT_OK(Flush()); constexpr size_t second_blob_file_expected_size = BlobLogHeader::kSize + BlobLogRecord::CalculateAdjustmentForRecordHeader(key3_size) + blob_size + BlobLogFooter::kSize; constexpr size_t total_expected_size = first_blob_file_expected_size + second_blob_file_expected_size; // Number of blob files uint64_t num_blob_files = 0; ASSERT_TRUE( db_->GetIntProperty(DB::Properties::kNumBlobFiles, &num_blob_files)); ASSERT_EQ(num_blob_files, 2); // Total size of live blob files uint64_t live_blob_file_size = 0; ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kLiveBlobFileSize, &live_blob_file_size)); ASSERT_EQ(live_blob_file_size, total_expected_size); // Total size of all blob files across all versions // Note: this should be the same as above since we only have one // version at this point. uint64_t total_blob_file_size = 0; ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kTotalBlobFileSize, &total_blob_file_size)); ASSERT_EQ(total_blob_file_size, total_expected_size); // Delete key2 to create some garbage ASSERT_OK(Delete(key2)); ASSERT_OK(Flush()); constexpr Slice* begin = nullptr; constexpr Slice* end = nullptr; ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); constexpr size_t expected_garbage_size = BlobLogRecord::CalculateAdjustmentForRecordHeader(key2_size) + blob_size; // Blob file stats std::string blob_stats; ASSERT_TRUE(db_->GetProperty(DB::Properties::kBlobStats, &blob_stats)); std::ostringstream oss; oss << "Number of blob files: 2\nTotal size of blob files: " << total_expected_size << "\nTotal size of garbage in blob files: " << expected_garbage_size << '\n'; ASSERT_EQ(blob_stats, oss.str()); } TEST_F(DBBlobBasicTest, PropertiesMultiVersion) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key1[] = "key1"; constexpr char key2[] = "key2"; constexpr char key3[] = "key3"; constexpr size_t key_size = sizeof(key1) - 1; static_assert(sizeof(key2) - 1 == key_size, "unexpected size: key2"); static_assert(sizeof(key3) - 1 == key_size, "unexpected size: key3"); constexpr char blob[] = "0000000000"; constexpr size_t blob_size = sizeof(blob) - 1; ASSERT_OK(Put(key1, blob)); ASSERT_OK(Flush()); ASSERT_OK(Put(key2, blob)); ASSERT_OK(Flush()); // Create an iterator to keep the current version alive std::unique_ptr iter(db_->NewIterator(ReadOptions())); ASSERT_OK(iter->status()); // Note: the Delete and subsequent compaction results in the first blob file // not making it to the final version. (It is still part of the previous // version kept alive by the iterator though.) On the other hand, the Put // results in a third blob file. ASSERT_OK(Delete(key1)); ASSERT_OK(Put(key3, blob)); ASSERT_OK(Flush()); constexpr Slice* begin = nullptr; constexpr Slice* end = nullptr; ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); // Total size of all blob files across all versions: between the two versions, // we should have three blob files of the same size with one blob each. // The version kept alive by the iterator contains the first and the second // blob file, while the final version contains the second and the third blob // file. (The second blob file is thus shared by the two versions but should // be counted only once.) uint64_t total_blob_file_size = 0; ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kTotalBlobFileSize, &total_blob_file_size)); ASSERT_EQ(total_blob_file_size, 3 * (BlobLogHeader::kSize + BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) + blob_size + BlobLogFooter::kSize)); } #endif // !ROCKSDB_LITE class DBBlobBasicIOErrorTest : public DBBlobBasicTest, public testing::WithParamInterface { protected: DBBlobBasicIOErrorTest() : sync_point_(GetParam()) { fault_injection_env_.reset(new FaultInjectionTestEnv(env_)); } ~DBBlobBasicIOErrorTest() { Close(); } std::unique_ptr fault_injection_env_; std::string sync_point_; }; class DBBlobBasicIOErrorMultiGetTest : public DBBlobBasicIOErrorTest { public: DBBlobBasicIOErrorMultiGetTest() : DBBlobBasicIOErrorTest() {} }; INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorTest, ::testing::ValuesIn(std::vector{ "BlobFileReader::OpenFile:NewRandomAccessFile", "BlobFileReader::GetBlob:ReadFromFile"})); INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorMultiGetTest, ::testing::ValuesIn(std::vector{ "BlobFileReader::OpenFile:NewRandomAccessFile", "BlobFileReader::MultiGetBlob:ReadFromFile"})); TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) { Options options; options.env = fault_injection_env_.get(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key[] = "key"; constexpr char blob_value[] = "blob_value"; ASSERT_OK(Put(key, blob_value)); ASSERT_OK(Flush()); SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { fault_injection_env_->SetFilesystemActive(false, Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->EnableProcessing(); PinnableSlice result; ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result) .IsIOError()); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } TEST_P(DBBlobBasicIOErrorMultiGetTest, MultiGetBlobs_IOError) { Options options = GetDefaultOptions(); options.env = fault_injection_env_.get(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr size_t num_keys = 2; constexpr char first_key[] = "first_key"; constexpr char first_value[] = "first_value"; ASSERT_OK(Put(first_key, first_value)); constexpr char second_key[] = "second_key"; constexpr char second_value[] = "second_value"; ASSERT_OK(Put(second_key, second_value)); ASSERT_OK(Flush()); std::array keys{{first_key, second_key}}; std::array values; std::array statuses; SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { fault_injection_env_->SetFilesystemActive(false, Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->EnableProcessing(); db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0], &values[0], &statuses[0]); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); ASSERT_TRUE(statuses[0].IsIOError()); ASSERT_TRUE(statuses[1].IsIOError()); } TEST_P(DBBlobBasicIOErrorMultiGetTest, MultipleBlobFiles) { Options options = GetDefaultOptions(); options.env = fault_injection_env_.get(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr size_t num_keys = 2; constexpr char key1[] = "key1"; constexpr char value1[] = "blob1"; ASSERT_OK(Put(key1, value1)); ASSERT_OK(Flush()); constexpr char key2[] = "key2"; constexpr char value2[] = "blob2"; ASSERT_OK(Put(key2, value2)); ASSERT_OK(Flush()); std::array keys{{key1, key2}}; std::array values; std::array statuses; bool first_blob_file = true; SyncPoint::GetInstance()->SetCallBack( sync_point_, [&first_blob_file, this](void* /* arg */) { if (first_blob_file) { first_blob_file = false; return; } fault_injection_env_->SetFilesystemActive(false, Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->EnableProcessing(); db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, keys.data(), values.data(), statuses.data()); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); ASSERT_OK(statuses[0]); ASSERT_EQ(value1, values[0]); ASSERT_TRUE(statuses[1].IsIOError()); } namespace { class ReadBlobCompactionFilter : public CompactionFilter { public: ReadBlobCompactionFilter() = default; const char* Name() const override { return "rocksdb.compaction.filter.read.blob"; } CompactionFilter::Decision FilterV2( int /*level*/, const Slice& /*key*/, ValueType value_type, const Slice& existing_value, std::string* new_value, std::string* /*skip_until*/) const override { if (value_type != CompactionFilter::ValueType::kValue) { return CompactionFilter::Decision::kKeep; } assert(new_value); new_value->assign(existing_value.data(), existing_value.size()); return CompactionFilter::Decision::kChangeValue; } }; } // anonymous namespace TEST_P(DBBlobBasicIOErrorTest, CompactionFilterReadBlob_IOError) { Options options = GetDefaultOptions(); options.env = fault_injection_env_.get(); options.enable_blob_files = true; options.min_blob_size = 0; options.create_if_missing = true; std::unique_ptr compaction_filter_guard( new ReadBlobCompactionFilter); options.compaction_filter = compaction_filter_guard.get(); DestroyAndReopen(options); constexpr char key[] = "foo"; constexpr char blob_value[] = "foo_blob_value"; ASSERT_OK(Put(key, blob_value)); ASSERT_OK(Flush()); SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { fault_injection_env_->SetFilesystemActive(false, Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->EnableProcessing(); ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, /*end=*/nullptr) .IsIOError()); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS extern "C" { void RegisterCustomObjects(int argc, char** argv); } #else void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {} #endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); RegisterCustomObjects(argc, argv); return RUN_ALL_TESTS(); }