Compare commits

15 Commits

| Author | SHA1 | Date |
|---|---|---|
|  | 822e08eeab |  |
|  | 9177a0673b |  |
|  | 8608d75d85 |  |
|  | 75c83c5b61 |  |
|  | 939ffdc206 |  |
|  | c56ad3c60a |  |
|  | f9c6a87d18 |  |
|  | 8bd665331a |  |
|  | 9da3585891 |  |
|  | d21b2a9699 |  |
|  | 43aee72181 |  |
|  | 8956288860 |  |
|  | f2228962c5 |  |
|  | eef93446a3 |  |
|  | 51ebe09f9a |  |
21  HISTORY.md
@@ -1,4 +1,22 @@
# Rocksdb Change Log
## 6.20.4 (05/05/2021)
### Bug Fixes
* In the IngestExternalFile() API, only try to sync the ingested file if the file is linked and the FileSystem/Env supports reopening a writable file.

## 6.20.3 (05/05/2021)
### Bug Fixes
* Fixed a bug where `GetLiveFiles()` output included a non-existent file called "OPTIONS-000000". Backups and checkpoints, which use `GetLiveFiles()`, failed on DBs impacted by this bug. Read-write DBs were impacted when the latest OPTIONS file failed to write and `fail_if_options_file_error == false`. Read-only DBs were impacted when no OPTIONS files existed.

## 6.20.2 (04/23/2021)
### Bug Fixes
* Fixed a bug in handling file rename errors in distributed/network file systems when the server succeeds but the client returns an error. The bug could cause the CURRENT file to point to a non-existing MANIFEST file, so the DB cannot be opened.
* Fixed a bug where ingested files were written with incorrect boundary key metadata. In rare cases this could have led to a level's files being wrongly ordered and queries for the boundary keys returning wrong results.
* Fixed a data race between insertion into memtables and the retrieval of the DB properties `rocksdb.cur-size-active-mem-table`, `rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables`.
* Fixed a false-positive alert when recovering from the WAL file: avoid reporting "SST file is ahead of WAL" on a newly created empty column family if the previous WAL file is corrupted.

### Behavior Changes
* Due to the fix of the false-positive "SST file is ahead of WAL" alert, all CFs with no SST file (empty CFs) now bypass the consistency check. We fixed a false positive but introduced a very rare false negative, triggered under the following conditions: a CF becomes empty after some delete operations in its last few writes (the deletions are flushed to an SST file and a compaction combining this file with all other SST files leaves the CF empty, or there is another reason to write a manifest entry for this CF after a flush that generates no SST file from an empty CF); the deletion entries are logged in a WAL that is corrupted, while the CF's log number points to the next WAL (due to the flush). In that case the DB can only recover to the point before these trailing deletions, leaving the DB in an inconsistent state.

## 6.20.0 (04/16/2021)
### Behavior Changes
* `ColumnFamilyOptions::sample_for_compression` now takes effect for creation of all block-based tables. Previously it only took effect for block-based tables created by flush.
@@ -13,6 +31,8 @@
* Fixed crash (divide by zero) when compression dictionary is applied to a file containing only range tombstones.
* Fixed a backward iteration bug with partitioned filter enabled: not including the prefix of the last key of the previous filter partition in the current filter partition can cause wrong iteration results.
* Fixed a bug that allowed `DBOptions::max_open_files` to be set with a non-negative integer with `ColumnFamilyOptions::compaction_style = kCompactionStyleFIFO`.
* Fixed a bug in handling file rename errors in distributed/network file systems when the server succeeds but the client returns an error. The bug could cause the CURRENT file to point to a non-existing MANIFEST file, so the DB cannot be opened.
* Fixed a data race between insertion into memtables and the retrieval of the DB properties `rocksdb.cur-size-active-mem-table`, `rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables`.

### Performance Improvements
* On the ARM platform, use `yield` instead of `wfe` to relax the CPU and gain better performance.
@@ -29,7 +49,6 @@
* Added an optional output parameter to BackupEngine::CreateNewBackup(WithMetadata) to return the BackupID of the new backup.
* Added BackupEngine::GetBackupInfo / GetLatestBackupInfo for querying individual backups.
* Made the Ribbon filter a long-term supported feature in terms of the SST schema (compatible with version >= 6.15.0), though the API for enabling it is expected to change.
* Added hybrid configuration of Ribbon filter and Bloom filter where some LSM levels use Ribbon for memory space efficiency and some use Bloom for speed. See NewExperimentalRibbonFilterPolicy. This also changes the default behavior of NewExperimentalRibbonFilterPolicy to use Bloom on level 0 and Ribbon on later levels.

## 6.19.0 (03/21/2021)
### Bug Fixes
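The 6.20.4 entry above concerns the ingestion sync path. As context only, here is a minimal, hypothetical usage sketch of the public API it refers to; the file path, keys, and helper name are illustrative, not from this PR. The fix only changes how the ingested file is synced internally, not this calling pattern.

```cpp
#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/sst_file_writer.h"

// Build one external SST file and ingest it into a running DB.
rocksdb::Status WriteAndIngest(rocksdb::DB* db, const std::string& sst_path) {
  rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), rocksdb::Options());
  rocksdb::Status s = writer.Open(sst_path);
  if (s.ok()) s = writer.Put("key", "value");
  if (s.ok()) s = writer.Finish();
  if (!s.ok()) return s;

  rocksdb::IngestExternalFileOptions ifo;
  ifo.move_files = true;  // link/move instead of copy, as in the fixed path
  return db->IngestExternalFile({sst_path}, ifo);
}
```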
@@ -98,7 +98,14 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,

  ret.emplace_back(CurrentFileName(""));
  ret.emplace_back(DescriptorFileName("", versions_->manifest_file_number()));
  ret.emplace_back(OptionsFileName("", versions_->options_file_number()));
  // The OPTIONS file number is zero in read-write mode when OPTIONS file
  // writing failed and the DB was configured with
  // `fail_if_options_file_error == false`. In read-only mode the OPTIONS file
  // number is zero when no OPTIONS file exists at all. In those cases we do not
  // record any OPTIONS file in the live file list.
  if (versions_->options_file_number() != 0) {
    ret.emplace_back(OptionsFileName("", versions_->options_file_number()));
  }

  // find length of manifest file while holding the mutex lock
  *manifest_file_size = versions_->manifest_file_size();
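A hedged sketch (not part of the PR) of how a caller might consume `GetLiveFiles()` after the change above: only files that actually exist are returned, so copying them for a checkpoint or backup no longer fails on a phantom "OPTIONS-000000". The helper name and the copy step are illustrative.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>
#include "rocksdb/db.h"

void CopyLiveFilesForBackup(rocksdb::DB* db) {
  std::vector<std::string> live_files;
  uint64_t manifest_size = 0;
  // flush_memtable=true so the returned SST set is self-contained.
  rocksdb::Status s =
      db->GetLiveFiles(live_files, &manifest_size, /*flush_memtable=*/true);
  assert(s.ok());
  for (const std::string& rel_path : live_files) {
    // rel_path is relative to the DB directory, e.g. "/CURRENT",
    // "/MANIFEST-000004", "/OPTIONS-000007", "/000010.sst".
    // Copy db_path + rel_path into the backup directory here (omitted).
    (void)rel_path;
  }
}
```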
@@ -2582,6 +2582,8 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {

  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:1");
  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:2");
  {
    InstrumentedMutexLock l(&mutex_);
    assert(bg_flush_scheduled_);

@@ -943,7 +943,7 @@ Status DBImpl::DeleteUnreferencedSstFiles() {
    return s;
  }

  if (largest_file_number > next_file_number) {
  if (largest_file_number >= next_file_number) {
    versions_->next_file_number_.store(largest_file_number + 1);
  }

@@ -285,6 +285,9 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
  const std::string manifest = DescriptorFileName(dbname_, 1);
  {
    if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
      fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
    }
    std::unique_ptr<FSWritableFile> file;
    FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
    s = NewWritableFile(fs_.get(), manifest, &file, file_options);
@@ -314,7 +317,7 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
        manifest.substr(manifest.find_last_of("/\\") + 1));
  }
  } else {
    fs_->DeleteFile(manifest, IOOptions(), nullptr);
    fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
  }
  return s;
}
@@ -1134,11 +1137,29 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
       immutable_db_options_.wal_recovery_mode ==
           WALRecoveryMode::kTolerateCorruptedTailRecords)) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->GetLogNumber() > corrupted_wal_number) {
      // One special case causes cfd->GetLogNumber() > corrupted_wal_number
      // while the CF is still consistent: if a new column family is created
      // during the flush and the WAL sync fails at the same time, the new CF
      // points to the new WAL but the old WAL is corrupted. Since the new CF
      // is empty, it is still consistent. We add the check of CF SST file
      // size to avoid the false-positive alert.

      // Note that the check of (cfd->GetLiveSstFilesSize() > 0) may lead to
      // ignoring a very rare inconsistency caused by data deletion: one CF is
      // empty due to KV deletions, but those operations are in the WAL. If the
      // WAL is corrupted, the status of this CF might not be consistent with
      // others. However, the consistency check will be bypassed because the
      // CF is empty.
      // TODO: a better and complete implementation is needed to ensure strict
      // consistency checks in WAL recovery, including handling the tailing
      // issues.
      if (cfd->GetLogNumber() > corrupted_wal_number &&
          cfd->GetLiveSstFilesSize() > 0) {
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                        "Column family inconsistency: SST file contains data"
                        " beyond the point of corruption.");
        return Status::Corruption("SST file is ahead of WALs");
        return Status::Corruption("SST file is ahead of WALs in CF " +
                                  cfd->GetName());
      }
    }
  }
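For context, a minimal sketch (not from the PR) of the user-facing option that selects this recovery path. Only the enum value and `DB::Open` are the real API; the wrapper function is illustrative.

```cpp
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

rocksdb::Status OpenWithPointInTimeRecovery(const std::string& path,
                                            rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Recover up to the last consistent point in the WAL; a corrupted WAL tail
  // is truncated instead of failing the open. The check in the hunk above
  // guards against silently losing data that an SST already depends on.
  options.wal_recovery_mode = rocksdb::WALRecoveryMode::kPointInTimeRecovery;
  return rocksdb::DB::Open(options, path, db);
}
```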
@@ -6701,20 +6701,19 @@ TEST_F(DBTest, MemoryUsageWithMaxWriteBufferSizeToMaintain) {
  Reopen(options);
  Random rnd(301);
  bool memory_limit_exceeded = false;
  uint64_t size_all_mem_table = 0;
  uint64_t cur_active_mem = 0;

  ColumnFamilyData* cfd =
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();

  for (int i = 0; i < 1000; i++) {
    std::string value = rnd.RandomString(1000);
    ASSERT_OK(Put("keykey_" + std::to_string(i), value));

    dbfull()->TEST_WaitForFlushMemTable();

    ASSERT_TRUE(db_->GetIntProperty(db_->DefaultColumnFamily(),
                                    DB::Properties::kSizeAllMemTables,
                                    &size_all_mem_table));
    ASSERT_TRUE(db_->GetIntProperty(db_->DefaultColumnFamily(),
                                    DB::Properties::kCurSizeActiveMemTable,
                                    &cur_active_mem));
    const uint64_t cur_active_mem = cfd->mem()->ApproximateMemoryUsage();
    const uint64_t size_all_mem_table =
        cur_active_mem + cfd->imm()->ApproximateMemoryUsage();

    // Errors out if memory usage keeps on increasing beyond the limit.
    // Once memory limit exceeds, memory_limit_exceeded is set and if
121  db/db_test2.cc
@@ -5439,6 +5439,98 @@ TEST_F(DBTest2, AutoPrefixMode1) {
    ASSERT_EQ("a1", iterator->key().ToString());
  }
}

class RenameCurrentTest : public DBTestBase,
                          public testing::WithParamInterface<std::string> {
 public:
  RenameCurrentTest()
      : DBTestBase("rename_current_test", /*env_do_fsync=*/true),
        sync_point_(GetParam()) {}

  ~RenameCurrentTest() override {}

  void SetUp() override {
    env_->no_file_overwrite_.store(true, std::memory_order_release);
  }

  void TearDown() override {
    env_->no_file_overwrite_.store(false, std::memory_order_release);
  }

  void SetupSyncPoints() {
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->SetCallBack(sync_point_, [&](void* arg) {
      Status* s = reinterpret_cast<Status*>(arg);
      assert(s);
      *s = Status::IOError("Injected IO error.");
    });
  }

  const std::string sync_point_;
};

INSTANTIATE_TEST_CASE_P(DistributedFS, RenameCurrentTest,
                        ::testing::Values("SetCurrentFile:BeforeRename",
                                          "SetCurrentFile:AfterRename"));

TEST_P(RenameCurrentTest, Open) {
  Destroy(last_options_);
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  SetupSyncPoints();
  SyncPoint::GetInstance()->EnableProcessing();
  Status s = TryReopen(options);
  ASSERT_NOK(s);

  SyncPoint::GetInstance()->DisableProcessing();
  Reopen(options);
}

TEST_P(RenameCurrentTest, Flush) {
  Destroy(last_options_);
  Options options = GetDefaultOptions();
  options.max_manifest_file_size = 1;
  options.create_if_missing = true;
  Reopen(options);
  ASSERT_OK(Put("key", "value"));
  SetupSyncPoints();
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_NOK(Flush());

  ASSERT_NOK(Put("foo", "value"));

  SyncPoint::GetInstance()->DisableProcessing();
  Reopen(options);
  ASSERT_EQ("value", Get("key"));
  ASSERT_EQ("NOT_FOUND", Get("foo"));
}

TEST_P(RenameCurrentTest, Compaction) {
  Destroy(last_options_);
  Options options = GetDefaultOptions();
  options.max_manifest_file_size = 1;
  options.create_if_missing = true;
  Reopen(options);
  ASSERT_OK(Put("a", "a_value"));
  ASSERT_OK(Put("c", "c_value"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("b", "b_value"));
  ASSERT_OK(Put("d", "d_value"));
  ASSERT_OK(Flush());

  SetupSyncPoints();
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                               /*end=*/nullptr));

  ASSERT_NOK(Put("foo", "value"));

  SyncPoint::GetInstance()->DisableProcessing();
  Reopen(options);
  ASSERT_EQ("NOT_FOUND", Get("foo"));
  ASSERT_EQ("d_value", Get("d"));
}
#endif  // ROCKSDB_LITE

// WAL recovery mode is WALRecoveryMode::kPointInTimeRecovery.
@@ -5466,6 +5558,35 @@ TEST_F(DBTest2, PointInTimeRecoveryWithIOErrorWhileReadingWal) {
  Status s = TryReopen(options);
  ASSERT_TRUE(s.IsIOError());
}

TEST_F(DBTest2, PointInTimeRecoveryWithSyncFailureInCFCreation) {
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BackgroundCallFlush:Start:1",
        "PointInTimeRecoveryWithSyncFailureInCFCreation:1"},
       {"PointInTimeRecoveryWithSyncFailureInCFCreation:2",
        "DBImpl::BackgroundCallFlush:Start:2"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  CreateColumnFamilies({"test1"}, Options());
  ASSERT_OK(Put("foo", "bar"));

  // Create a CF while a flush is going on; the log is synced but the
  // closed log file is not synced and is corrupted.
  port::Thread flush_thread([&]() { ASSERT_NOK(Flush()); });
  TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:1");
  CreateColumnFamilies({"test2"}, Options());
  env_->corrupt_in_sync_ = true;
  TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:2");
  flush_thread.join();
  env_->corrupt_in_sync_ = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // Reopening the DB should not corrupt anything
  Options options = CurrentOptions();
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  ReopenWithColumnFamilies({"default", "test1", "test2"}, options);
}

}  // namespace ROCKSDB_NAMESPACE

#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
@@ -44,6 +44,7 @@ SpecialEnv::SpecialEnv(Env* base, bool time_elapse_only_sleep)
  manifest_sync_error_.store(false, std::memory_order_release);
  manifest_write_error_.store(false, std::memory_order_release);
  log_write_error_.store(false, std::memory_order_release);
  no_file_overwrite_.store(false, std::memory_order_release);
  random_file_open_counter_.store(0, std::memory_order_relaxed);
  delete_count_.store(0, std::memory_order_relaxed);
  num_open_wal_file_.store(0);

@@ -393,6 +393,10 @@ class SpecialEnv : public EnvWrapper {
    Status Flush() override { return base_->Flush(); }
    Status Sync() override {
      ++env_->sync_counter_;
      if (env_->corrupt_in_sync_) {
        Append(std::string(33000, ' '));
        return Status::IOError("Ingested Sync Failure");
      }
      if (env_->skip_fsync_) {
        return Status::OK();
      } else {

@@ -440,6 +444,11 @@ class SpecialEnv : public EnvWrapper {
    std::unique_ptr<WritableFile> base_;
  };

  if (no_file_overwrite_.load(std::memory_order_acquire) &&
      target()->FileExists(f).ok()) {
    return Status::NotSupported("SpecialEnv::no_file_overwrite_ is true.");
  }

  if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
    uint32_t random_number;
    {

@@ -687,6 +696,9 @@ class SpecialEnv : public EnvWrapper {
  // Slow down every log write, in micro-seconds.
  std::atomic<int> log_write_slowdown_;

  // If true, returns Status::NotSupported for file overwrite.
  std::atomic<bool> no_file_overwrite_;

  // Number of WAL files that are still open for write.
  std::atomic<int> num_open_wal_file_;

@@ -709,6 +721,9 @@ class SpecialEnv : public EnvWrapper {
  // If true, all fsync to files and directories are skipped.
  bool skip_fsync_ = false;

  // If true, inject corruption into the file during sync.
  bool corrupt_in_sync_ = false;

  std::atomic<uint32_t> non_writeable_rate_;

  std::atomic<uint32_t> new_writable_count_;
@@ -1136,6 +1136,41 @@ TEST_F(ExternalSSTFileBasicTest, SyncFailure) {
  }
}

TEST_F(ExternalSSTFileBasicTest, ReopenNotSupported) {
  Options options;
  options.create_if_missing = true;
  options.env = env_;

  SyncPoint::GetInstance()->SetCallBack(
      "ExternalSstFileIngestionJob::Prepare:Reopen", [&](void* arg) {
        Status* s = static_cast<Status*>(arg);
        *s = Status::NotSupported();
      });
  SyncPoint::GetInstance()->EnableProcessing();

  DestroyAndReopen(options);

  Options sst_file_writer_options;
  sst_file_writer_options.env = env_;
  std::unique_ptr<SstFileWriter> sst_file_writer(
      new SstFileWriter(EnvOptions(), sst_file_writer_options));
  std::string file_name =
      sst_files_dir_ + "reopen_not_supported_test_" + ".sst";
  ASSERT_OK(sst_file_writer->Open(file_name));
  ASSERT_OK(sst_file_writer->Put("bar", "v2"));
  ASSERT_OK(sst_file_writer->Finish());

  IngestExternalFileOptions ingest_opt;
  ingest_opt.move_files = true;
  const Snapshot* snapshot = db_->GetSnapshot();
  ASSERT_OK(db_->IngestExternalFile({file_name}, ingest_opt));
  db_->ReleaseSnapshot(snapshot);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Destroy(options);
}

TEST_F(ExternalSSTFileBasicTest, VerifyChecksumReadahead) {
  Options options;
  options.create_if_missing = true;

@@ -1542,6 +1577,44 @@ TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) {
  ASSERT_EQ(2, NumTableFilesAtLevel(0));
}

TEST_F(ExternalSSTFileBasicTest, IngestFileAfterDBPut) {
  // Repro https://github.com/facebook/rocksdb/issues/6245.
  // Flush three files to L0. Ingest one more file to trigger L0->L1 compaction
  // via trivial move. The bug happened when L1 files were incorrectly sorted
  // resulting in an old value for "k" returned by `Get()`.
  Options options = CurrentOptions();

  ASSERT_OK(Put("k", "a"));
  Flush();
  ASSERT_OK(Put("k", "a"));
  Flush();
  ASSERT_OK(Put("k", "a"));
  Flush();
  SstFileWriter sst_file_writer(EnvOptions(), options);

  // Current file size should be 0 after sst_file_writer init and before
  // opening a file.
  ASSERT_EQ(sst_file_writer.FileSize(), 0);

  std::string file1 = sst_files_dir_ + "file1.sst";
  ASSERT_OK(sst_file_writer.Open(file1));
  ASSERT_OK(sst_file_writer.Put("k", "b"));

  ExternalSstFileInfo file1_info;
  Status s = sst_file_writer.Finish(&file1_info);
  ASSERT_OK(s) << s.ToString();

  // Current file size should be non-zero after a successful write.
  ASSERT_GT(sst_file_writer.FileSize(), 0);

  IngestExternalFileOptions ifo;
  s = db_->IngestExternalFile({file1}, ifo);
  ASSERT_OK(s);
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ(Get("k"), "b");
}

INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
                        testing::Values(std::make_tuple(true, true),
                                        std::make_tuple(true, false),
@@ -109,17 +109,26 @@ Status ExternalSstFileIngestionJob::Prepare(
        // directory before ingesting the file. For integrity of RocksDB we need
        // to sync the file.
        std::unique_ptr<FSWritableFile> file_to_sync;
        status = fs_->ReopenWritableFile(path_inside_db, env_options_,
                                         &file_to_sync, nullptr);
        if (status.ok()) {
          TEST_SYNC_POINT(
              "ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
          status = SyncIngestedFile(file_to_sync.get());
          TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile");
          if (!status.ok()) {
            ROCKS_LOG_WARN(db_options_.info_log,
                           "Failed to sync ingested file %s: %s",
                           path_inside_db.c_str(), status.ToString().c_str());
        Status s = fs_->ReopenWritableFile(path_inside_db, env_options_,
                                           &file_to_sync, nullptr);
        TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:Reopen",
                                 &s);
        // Some file systems (especially remote/distributed) don't support
        // reopening a file for writing and don't require reopening and
        // syncing the file. Ignore the NotSupported error in that case.
        if (!s.IsNotSupported()) {
          status = s;
          if (status.ok()) {
            TEST_SYNC_POINT(
                "ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
            status = SyncIngestedFile(file_to_sync.get());
            TEST_SYNC_POINT(
                "ExternalSstFileIngestionJob::AfterSyncIngestedFile");
            if (!status.ok()) {
              ROCKS_LOG_WARN(db_options_.info_log,
                             "Failed to sync ingested file %s: %s",
                             path_inside_db.c_str(), status.ToString().c_str());
            }
          }
        }
      } else if (status.IsNotSupported() &&
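A hedged sketch (not the PR's code) of the tolerant reopen-then-sync pattern the hunk above implements, extracted as a standalone helper. The function name and parameters are assumptions; the FileSystem/FSWritableFile calls are the real interfaces.

```cpp
#include <memory>
#include <string>
#include "rocksdb/file_system.h"

// Reopen a file for writing and fsync it, but treat "reopen not supported"
// (common on remote/distributed file systems) as success, since those file
// systems neither need nor allow this extra sync.
rocksdb::IOStatus SyncIfReopenSupported(rocksdb::FileSystem* fs,
                                        const std::string& path,
                                        const rocksdb::FileOptions& opts) {
  std::unique_ptr<rocksdb::FSWritableFile> file;
  rocksdb::IOStatus io_s =
      fs->ReopenWritableFile(path, opts, &file, /*dbg=*/nullptr);
  if (io_s.IsNotSupported()) {
    return rocksdb::IOStatus::OK();
  }
  if (!io_s.ok()) {
    return io_s;
  }
  return file->Fsync(rocksdb::IOOptions(), /*dbg=*/nullptr);
}
```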
@@ -367,9 +376,32 @@ Status ExternalSstFileIngestionJob::Run() {
        super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
        last_seqno, &f, &assigned_seqno);
  }

  // Modify the smallest/largest internal key to include the sequence number
  // that we just learned. Only overwrite sequence number zero. There could
  // be a nonzero sequence number already to indicate a range tombstone's
  // exclusive endpoint.
  ParsedInternalKey smallest_parsed, largest_parsed;
  if (status.ok()) {
    status = ParseInternalKey(*f.smallest_internal_key.rep(),
                              &smallest_parsed, false /* log_err_key */);
  }
  if (status.ok()) {
    status = ParseInternalKey(*f.largest_internal_key.rep(), &largest_parsed,
                              false /* log_err_key */);
  }
  if (!status.ok()) {
    return status;
  }
  if (smallest_parsed.sequence == 0) {
    UpdateInternalKey(f.smallest_internal_key.rep(), assigned_seqno,
                      smallest_parsed.type);
  }
  if (largest_parsed.sequence == 0) {
    UpdateInternalKey(f.largest_internal_key.rep(), assigned_seqno,
                      largest_parsed.type);
  }

  status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
  TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
                           &assigned_seqno);
@@ -751,21 +751,24 @@ bool InternalStats::HandleBackgroundErrors(uint64_t* value, DBImpl* /*db*/,
bool InternalStats::HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* /*db*/,
                                                Version* /*version*/) {
  // Current size of the active memtable
  *value = cfd_->mem()->ApproximateMemoryUsage();
  // Using ApproximateMemoryUsageFast to avoid the need for synchronization
  *value = cfd_->mem()->ApproximateMemoryUsageFast();
  return true;
}

bool InternalStats::HandleCurSizeAllMemTables(uint64_t* value, DBImpl* /*db*/,
                                              Version* /*version*/) {
  // Current size of the active memtable + immutable memtables
  *value = cfd_->mem()->ApproximateMemoryUsage() +
  // Using ApproximateMemoryUsageFast to avoid the need for synchronization
  *value = cfd_->mem()->ApproximateMemoryUsageFast() +
           cfd_->imm()->ApproximateUnflushedMemTablesMemoryUsage();
  return true;
}

bool InternalStats::HandleSizeAllMemTables(uint64_t* value, DBImpl* /*db*/,
                                           Version* /*version*/) {
  *value = cfd_->mem()->ApproximateMemoryUsage() +
  // Using ApproximateMemoryUsageFast to avoid the need for synchronization
  *value = cfd_->mem()->ApproximateMemoryUsageFast() +
           cfd_->imm()->ApproximateMemoryUsage();
  return true;
}
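A brief usage sketch (not from the PR) of the DB properties whose handlers the hunk above switches to `ApproximateMemoryUsageFast()`; these are the same properties named in the 6.20.2 data-race fix.

```cpp
#include <cstdint>
#include <cstdio>
#include "rocksdb/db.h"

void PrintMemtableSizes(rocksdb::DB* db) {
  uint64_t v = 0;
  if (db->GetIntProperty(rocksdb::DB::Properties::kCurSizeActiveMemTable, &v)) {
    std::printf("active memtable: %llu bytes\n", (unsigned long long)v);
  }
  if (db->GetIntProperty(rocksdb::DB::Properties::kCurSizeAllMemTables, &v)) {
    std::printf("active + unflushed immutable: %llu bytes\n",
                (unsigned long long)v);
  }
  if (db->GetIntProperty(rocksdb::DB::Properties::kSizeAllMemTables, &v)) {
    std::printf("active + all immutable: %llu bytes\n",
                (unsigned long long)v);
  }
}
```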
@@ -4083,6 +4083,7 @@ Status VersionSet::ProcessManifestWrites(
  uint64_t new_manifest_file_size = 0;
  Status s;
  IOStatus io_s;
  IOStatus manifest_io_status;
  {
    FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
    mu->Unlock();
@@ -4134,6 +4135,7 @@ Status VersionSet::ProcessManifestWrites(
        s = WriteCurrentStateToManifest(curr_state, wal_additions,
                                        descriptor_log_.get(), io_s);
      } else {
        manifest_io_status = io_s;
        s = io_s;
      }
    }
@@ -4171,11 +4173,13 @@ Status VersionSet::ProcessManifestWrites(
        io_s = descriptor_log_->AddRecord(record);
        if (!io_s.ok()) {
          s = io_s;
          manifest_io_status = io_s;
          break;
        }
      }
      if (s.ok()) {
        io_s = SyncManifest(db_options_, descriptor_log_->file());
        manifest_io_status = io_s;
        TEST_SYNC_POINT_CALLBACK(
            "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
      }
@@ -4188,6 +4192,9 @@ Status VersionSet::ProcessManifestWrites(

    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok()) {
      assert(manifest_io_status.ok());
    }
    if (s.ok() && new_descriptor_log) {
      io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
                            db_directory);
@@ -4303,11 +4310,41 @@ Status VersionSet::ProcessManifestWrites(
    for (auto v : versions) {
      delete v;
    }
    if (manifest_io_status.ok()) {
      manifest_file_number_ = pending_manifest_file_number_;
      manifest_file_size_ = new_manifest_file_size;
    }
    // If manifest append failed for whatever reason, the file could be
    // corrupted. So we need to force the next version update to start a
    // new manifest file.
    descriptor_log_.reset();
    if (new_descriptor_log) {
      // If manifest operations failed, then we know the CURRENT file still
      // points to the original MANIFEST. Therefore, we can safely delete the
      // new MANIFEST.
      // If manifest operations succeeded, and we are here, then it is possible
      // that renaming the tmp file to CURRENT failed.
      //
      // On a local POSIX-compliant FS, CURRENT must point to the original
      // MANIFEST. We can delete the new MANIFEST for simplicity, but we can
      // also keep it. Future recovery will ignore this MANIFEST. It's also ok
      // for the process not to crash and continue using the db. Any future
      // LogAndApply() call will switch to a new MANIFEST and update CURRENT,
      // still ignoring this one.
      //
      // On a non-local FS, it is possible that the rename operation succeeded
      // on the server (remote) side, but the client somehow returns a non-ok
      // status to RocksDB. Note that this does not violate atomicity. Should
      // we delete the new MANIFEST successfully, a subsequent recovery attempt
      // will likely see the CURRENT pointing to the new MANIFEST, thus fail.
      // We will not be able to open the DB again. Therefore, if manifest
      // operations succeed, we should keep the new MANIFEST. If the process
      // proceeds, any future LogAndApply() call will switch to a new MANIFEST
      // and update CURRENT. If the user tries to re-open the DB,
      // a) CURRENT points to the new MANIFEST, and the new MANIFEST is present.
      // b) CURRENT points to the original MANIFEST, and the original MANIFEST
      //    also exists.
      if (new_descriptor_log && !manifest_io_status.ok()) {
        ROCKS_LOG_INFO(db_options_->info_log,
                       "Deleting manifest %" PRIu64 " current manifest %" PRIu64
                       "\n",
@@ -144,7 +144,7 @@ DECLARE_bool(enable_write_thread_adaptive_yield);
DECLARE_int32(reopen);
DECLARE_double(bloom_bits);
DECLARE_bool(use_block_based_filter);
DECLARE_int32(ribbon_starting_level);
DECLARE_bool(use_ribbon_filter);
DECLARE_bool(partition_filters);
DECLARE_bool(optimize_filters_for_memory);
DECLARE_int32(index_type);
@@ -258,6 +258,7 @@ DECLARE_bool(best_efforts_recovery);
DECLARE_bool(skip_verifydb);
DECLARE_bool(enable_compaction_filter);
DECLARE_bool(paranoid_file_checks);
DECLARE_bool(fail_if_options_file_error);
DECLARE_uint64(batch_protection_bytes_per_key);

DECLARE_uint64(user_timestamp_size);

@@ -28,7 +28,9 @@ class DbStressEnvWrapper : public EnvWrapper {
        f.find(".restore") != std::string::npos) {
      return target()->DeleteFile(f);
    }
    return Status::OK();
    // Rename the file instead of deleting it to keep the history, while
    // keeping it invisible to RocksDB.
    return target()->RenameFile(f, f + "_renamed_");
  }

  // If true, manifest files will not be deleted in DeleteFile().

@@ -410,8 +410,8 @@ DEFINE_bool(use_block_based_filter, false,
            "use block based filter"
            "instead of full filter for block based table");

DEFINE_int32(ribbon_starting_level, false,
             "First level to use Ribbon filter instead of Bloom");
DEFINE_bool(use_ribbon_filter, false,
            "Use Ribbon filter instead of Bloom filter");

DEFINE_bool(partition_filters, false,
            "use partitioned filters "
@@ -792,6 +792,10 @@ DEFINE_bool(paranoid_file_checks, true,
            "After writing every SST file, reopen it and read all the keys "
            "and validate checksums");

DEFINE_bool(fail_if_options_file_error, false,
            "Fail operations that fail to detect or properly persist options "
            "file.");

DEFINE_uint64(batch_protection_bytes_per_key, 0,
              "If nonzero, enables integrity protection in `WriteBatch` at the "
              "specified number of bytes per key. Currently the only supported "
@@ -808,4 +812,8 @@ DEFINE_uint64(user_timestamp_size, 0,
              "Number of bytes for a user-defined timestamp. Currently, only "
              "8-byte is supported");

DEFINE_int32(open_metadata_write_fault_one_in, 0,
             "If non-zero, enables fault injection on file metadata writes "
             "during DB reopen.");

#endif  // GFLAGS
@@ -30,6 +30,7 @@ DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
DECLARE_int32(read_fault_one_in);
DECLARE_int32(write_fault_one_in);
DECLARE_int32(open_metadata_write_fault_one_in);

namespace ROCKSDB_NAMESPACE {
class StressTest;

@@ -26,12 +26,11 @@ StressTest::StressTest()
      compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
      filter_policy_(
          FLAGS_bloom_bits >= 0
              ? FLAGS_ribbon_starting_level < FLAGS_num_levels
                    ? NewExperimentalRibbonFilterPolicy(
                          FLAGS_bloom_bits, FLAGS_ribbon_starting_level)
                    : FLAGS_use_block_based_filter
                          ? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
                          : NewBloomFilterPolicy(FLAGS_bloom_bits, false)
              ? FLAGS_use_ribbon_filter
                    ? NewExperimentalRibbonFilterPolicy(FLAGS_bloom_bits)
                    : FLAGS_use_block_based_filter
                          ? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
                          : NewBloomFilterPolicy(FLAGS_bloom_bits, false)
              : nullptr),
      db_(nullptr),
#ifndef ROCKSDB_LITE
@@ -2105,9 +2104,14 @@ void StressTest::PrintEnv() const {
          static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
  fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
  fprintf(stdout, "Write fault one in : %d\n", FLAGS_write_fault_one_in);
  fprintf(stdout, "Open metadata write fault one in:\n");
  fprintf(stdout, "    %d\n",
          FLAGS_open_metadata_write_fault_one_in);
  fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection);
  fprintf(stdout, "Best efforts recovery : %d\n",
          static_cast<int>(FLAGS_best_efforts_recovery));
  fprintf(stdout, "Fail if OPTIONS file error: %d\n",
          static_cast<int>(FLAGS_fail_if_options_file_error));
  fprintf(stdout, "User timestamp size bytes : %d\n",
          static_cast<int>(FLAGS_user_timestamp_size));

@@ -2326,6 +2330,7 @@ void StressTest::Open() {

  options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
  options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
  options_.fail_if_options_file_error = FLAGS_fail_if_options_file_error;

  if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
       FLAGS_allow_setting_blob_options_dynamically) &&
@@ -2410,33 +2415,78 @@ void StressTest::Open() {
        new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
    options_.create_missing_column_families = true;
    if (!FLAGS_use_txn) {
#ifndef ROCKSDB_LITE
      // StackableDB-based BlobDB
      if (FLAGS_use_blob_db) {
        blob_db::BlobDBOptions blob_db_options;
        blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
        blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
        blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
        blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
        blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;

        blob_db::BlobDB* blob_db = nullptr;
        s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
                                  cf_descriptors, &column_families_, &blob_db);
        if (s.ok()) {
          db_ = blob_db;
        }
      } else
#endif  // !ROCKSDB_LITE
      {
        if (db_preload_finished_.load() && FLAGS_read_only) {
          s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors,
                                  &column_families_, &db_);
        } else {
          s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
                       &column_families_, &db_);
        }
#ifndef NDEBUG
      // Determine whether we need to ingest file metadata write failures
      // during DB reopen; if so, enable it. Only ingest metadata errors when
      // reopening, as an initial open failure does not need to be handled.
      // TODO: transaction DBs are not covered by this fault test yet.
      bool ingest_meta_error =
          FLAGS_open_metadata_write_fault_one_in &&
          fault_fs_guard
              ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
              .ok();
      if (ingest_meta_error) {
        fault_fs_guard->EnableMetadataWriteErrorInjection();
        fault_fs_guard->SetRandomMetadataWriteError(
            FLAGS_open_metadata_write_fault_one_in);
      }
      while (true) {
#endif  // NDEBUG
#ifndef ROCKSDB_LITE
        // StackableDB-based BlobDB
        if (FLAGS_use_blob_db) {
          blob_db::BlobDBOptions blob_db_options;
          blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
          blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
          blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
          blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
          blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;

          blob_db::BlobDB* blob_db = nullptr;
          s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
                                    cf_descriptors, &column_families_,
                                    &blob_db);
          if (s.ok()) {
            db_ = blob_db;
          }
        } else
#endif  // !ROCKSDB_LITE
        {
          if (db_preload_finished_.load() && FLAGS_read_only) {
            s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
                                    cf_descriptors, &column_families_, &db_);
          } else {
            s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
                         &column_families_, &db_);
          }
        }

#ifndef NDEBUG
        if (ingest_meta_error) {
          fault_fs_guard->DisableMetadataWriteErrorInjection();
          if (!s.ok()) {
            // After failing to open the DB due to an injected IO error, a
            // retry should successfully open the DB with correct data if no
            // further IO error shows up.
            ingest_meta_error = false;

            Random rand(static_cast<uint32_t>(FLAGS_seed));
            if (rand.OneIn(2)) {
              fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
                                                                 nullptr);
            }
            if (rand.OneIn(3)) {
              fault_fs_guard->DropUnsyncedFileData();
            } else if (rand.OneIn(2)) {
              fault_fs_guard->DropRandomUnsyncedFileData(&rand);
            }
            continue;
          }
        }
        break;
      }
#endif  // NDEBUG
    } else {
#ifndef ROCKSDB_LITE
      TransactionDBOptions txn_db_options;

@@ -98,7 +98,7 @@ int db_stress_tool(int argc, char** argv) {

#ifndef NDEBUG
  if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection ||
      FLAGS_write_fault_one_in) {
      FLAGS_write_fault_one_in || FLAGS_open_metadata_write_fault_one_in) {
    FaultInjectionTestFS* fs =
        new FaultInjectionTestFS(raw_env->GetFileSystem());
    fault_fs_guard.reset(fs);
@@ -383,10 +383,12 @@ IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
  contents.remove_prefix(dbname.size() + 1);
  std::string tmp = TempFileName(dbname, descriptor_number);
  IOStatus s = WriteStringToFile(fs, contents.ToString() + "\n", tmp, true);
  TEST_SYNC_POINT_CALLBACK("SetCurrentFile:BeforeRename", &s);
  if (s.ok()) {
    TEST_KILL_RANDOM("SetCurrentFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
    s = fs->RenameFile(tmp, CurrentFileName(dbname), IOOptions(), nullptr);
    TEST_KILL_RANDOM("SetCurrentFile:1", rocksdb_kill_odds * REDUCE_ODDS2);
    TEST_SYNC_POINT_CALLBACK("SetCurrentFile:AfterRename", &s);
  }
  if (s.ok()) {
    if (directory_to_fsync != nullptr) {
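Both the `SetCurrentFile()` hunk above (write a temp file, then rename it onto CURRENT) and the MANIFEST-handling comments earlier rely on one invariant: CURRENT must name a MANIFEST that actually exists. A hedged, standalone sketch (not RocksDB code, assumes a POSIX `stat`) of checking that invariant from outside the DB:

```cpp
#include <sys/stat.h>
#include <fstream>
#include <string>

// Returns true if <dbname>/CURRENT names a MANIFEST file that exists.
bool CurrentPointsToExistingManifest(const std::string& dbname) {
  std::ifstream current(dbname + "/CURRENT");
  std::string manifest_name;  // e.g. "MANIFEST-000004"
  if (!std::getline(current, manifest_name) || manifest_name.empty()) {
    return false;
  }
  struct stat st;
  return stat((dbname + "/" + manifest_name).c_str(), &st) == 0;
}
```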
@@ -217,7 +217,7 @@ extern const FilterPolicy* NewBloomFilterPolicy(
    double bits_per_key, bool use_block_based_builder = false);

// A new Bloom alternative that saves about 30% space compared to
// Bloom filters, with about 3-4x construction time and similar
// Bloom filters, with about 3-4x construction CPU time and similar
// query times. For example, if you pass in 10 for
// bloom_equivalent_bits_per_key, you'll get the same 0.95% FP rate
// as Bloom filter but only using about 7 bits per key. (This
@@ -225,24 +225,16 @@ extern const FilterPolicy* NewBloomFilterPolicy(
// and/or transitional, so is expected to be replaced with a new API.
// The constructed filters will be given long-term support.)
//
// The space savings of Ribbon filters makes sense for lower (higher
// numbered; larger; longer-lived) levels of LSM, whereas the speed of
// Bloom filters make sense for highest levels of LSM. Setting
// ribbon_starting_level allows for this design. For example,
// ribbon_starting_level=1 means that Bloom filters will be used in
// level 0, including flushes, and Ribbon filters elsewhere.
// ribbon_starting_level=0 means (almost) always use Ribbon.
//
// Ribbon filters are compatible with RocksDB >= 6.15.0. Earlier
// versions reading the data will behave as if no filter was used
// (degraded performance until compaction rebuilds filters).
//
// Note: even with ribbon_starting_level=0, this policy can generate
// Bloom filters in some cases. For very small filters (well under 1KB),
// Bloom fallback is by design, as the current Ribbon schema is not
// optimized to save vs. Bloom for such small filters. Other cases of
// Bloom fallback should be exceptional and log an appropriate warning.
// Note: this policy can generate Bloom filters in some cases.
// For very small filters (well under 1KB), Bloom fallback is by
// design, as the current Ribbon schema is not optimized to save vs.
// Bloom for such small filters. Other cases of Bloom fallback should
// be exceptional and log an appropriate warning.
extern const FilterPolicy* NewExperimentalRibbonFilterPolicy(
    double bloom_equivalent_bits_per_key, int ribbon_starting_level = 1);
    double bloom_equivalent_bits_per_key);

}  // namespace ROCKSDB_NAMESPACE
@@ -11,7 +11,7 @@

#define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 20
#define ROCKSDB_PATCH 0
#define ROCKSDB_PATCH 4

// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

@@ -1479,8 +1479,8 @@ public class RocksDBTest {
      assertThat(livefiles.manifestFileSize).isEqualTo(57);
      assertThat(livefiles.files.size()).isEqualTo(3);
      assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT");
      assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000003");
      assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000006");
      assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004");
      assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000007");
    }
  }
}
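The include/rocksdb/filter_policy.h hunk above reverts `NewExperimentalRibbonFilterPolicy()` to its single-argument form on this branch. A minimal, hedged usage sketch under that assumption; the bits-per-key value and function name are illustrative.

```cpp
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

rocksdb::Options RibbonFilterOptions() {
  rocksdb::BlockBasedTableOptions table_options;
  // 10 bloom-equivalent bits/key: ~0.95% FP rate at roughly 7 bits/key on disk.
  table_options.filter_policy.reset(
      rocksdb::NewExperimentalRibbonFilterPolicy(10));

  rocksdb::Options options;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  return options;
}
```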
@@ -940,15 +940,6 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
      &new_opt));
  ASSERT_TRUE(new_opt.filter_policy != nullptr);
  bfp = dynamic_cast<const BloomFilterPolicy*>(new_opt.filter_policy.get());
  // Not a BloomFilterPolicy
  EXPECT_FALSE(bfp);

  ASSERT_OK(GetBlockBasedTableOptionsFromString(
      config_options, table_opt, "filter_policy=experimental_ribbon:5.678:0;",
      &new_opt));
  ASSERT_TRUE(new_opt.filter_policy != nullptr);
  bfp = dynamic_cast<const BloomFilterPolicy*>(new_opt.filter_policy.get());
  // Pure Ribbon configuration is (oddly) BloomFilterPolicy
  EXPECT_EQ(bfp->GetMillibitsPerKey(), 5678);
  EXPECT_EQ(bfp->GetMode(), BloomFilterPolicy::kStandard128Ribbon);

@@ -23,7 +23,6 @@
#include "util/hash.h"
#include "util/ribbon_config.h"
#include "util/ribbon_impl.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
@@ -1055,7 +1054,7 @@ BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode)

BloomFilterPolicy::~BloomFilterPolicy() {}

const char* BuiltinFilterPolicy::Name() const {
const char* BloomFilterPolicy::Name() const {
  return "rocksdb.BuiltinBloomFilter";
}

@@ -1088,8 +1087,8 @@ void BloomFilterPolicy::CreateFilter(const Slice* keys, int n,
  }
}

bool BuiltinFilterPolicy::KeyMayMatch(const Slice& key,
                                      const Slice& bloom_filter) const {
bool BloomFilterPolicy::KeyMayMatch(const Slice& key,
                                    const Slice& bloom_filter) const {
  const size_t len = bloom_filter.size();
  if (len < 2 || len > 0xffffffffU) {
    return false;
@@ -1111,7 +1110,7 @@ bool BuiltinFilterPolicy::KeyMayMatch(const Slice& key,
                       array);
}

FilterBitsBuilder* BuiltinFilterPolicy::GetFilterBitsBuilder() const {
FilterBitsBuilder* BloomFilterPolicy::GetFilterBitsBuilder() const {
  // This code path should no longer be used, for the built-in
  // BloomFilterPolicy. Internal to RocksDB and outside
  // BloomFilterPolicy, only get a FilterBitsBuilder with
@@ -1185,7 +1184,7 @@ FilterBitsBuilder* BloomFilterPolicy::GetBuilderFromContext(

// Read metadata to determine what kind of FilterBitsReader is needed
// and return a new one.
FilterBitsReader* BuiltinFilterPolicy::GetFilterBitsReader(
FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
    const Slice& contents) const {
  uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
  if (len_with_meta <= kMetadataLen) {
@@ -1266,7 +1265,7 @@ FilterBitsReader* BuiltinFilterPolicy::GetFilterBitsReader(
                                          log2_cache_line_size);
}

FilterBitsReader* BuiltinFilterPolicy::GetRibbonBitsReader(
FilterBitsReader* BloomFilterPolicy::GetRibbonBitsReader(
    const Slice& contents) const {
  uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
  uint32_t len = len_with_meta - kMetadataLen;
@@ -1290,7 +1289,7 @@ FilterBitsReader* BuiltinFilterPolicy::GetRibbonBitsReader(
}

// For newer Bloom filter implementations
FilterBitsReader* BuiltinFilterPolicy::GetBloomBitsReader(
FilterBitsReader* BloomFilterPolicy::GetBloomBitsReader(
    const Slice& contents) const {
  uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
  uint32_t len = len_with_meta - kMetadataLen;
@@ -1363,50 +1362,10 @@ const FilterPolicy* NewBloomFilterPolicy(double bits_per_key,
  return new BloomFilterPolicy(bits_per_key, m);
}

// Chooses between two filter policies based on LSM level
class LevelThresholdFilterPolicy : public BuiltinFilterPolicy {
 public:
  LevelThresholdFilterPolicy(std::unique_ptr<const FilterPolicy>&& a,
                             std::unique_ptr<const FilterPolicy>&& b,
                             int starting_level_for_b)
      : policy_a_(std::move(a)),
        policy_b_(std::move(b)),
        starting_level_for_b_(starting_level_for_b) {
    assert(starting_level_for_b_ >= 0);
  }

  // Deprecated block-based filter only
  void CreateFilter(const Slice* keys, int n, std::string* dst) const override {
    policy_a_->CreateFilter(keys, n, dst);
  }

  FilterBitsBuilder* GetBuilderWithContext(
      const FilterBuildingContext& context) const override {
    if (context.level_at_creation >= starting_level_for_b_) {
      return policy_b_->GetBuilderWithContext(context);
    } else {
      return policy_a_->GetBuilderWithContext(context);
    }
  }

 private:
  const std::unique_ptr<const FilterPolicy> policy_a_;
  const std::unique_ptr<const FilterPolicy> policy_b_;
  int starting_level_for_b_;
};

extern const FilterPolicy* NewExperimentalRibbonFilterPolicy(
    double bloom_equivalent_bits_per_key, int ribbon_starting_level) {
  std::unique_ptr<const FilterPolicy> ribbon_only{new BloomFilterPolicy(
      bloom_equivalent_bits_per_key, BloomFilterPolicy::kStandard128Ribbon)};
  if (ribbon_starting_level > 0) {
    std::unique_ptr<const FilterPolicy> bloom_only{new BloomFilterPolicy(
        bloom_equivalent_bits_per_key, BloomFilterPolicy::kFastLocalBloom)};
    return new LevelThresholdFilterPolicy(
        std::move(bloom_only), std::move(ribbon_only), ribbon_starting_level);
  } else {
    return ribbon_only.release();
  }
    double bloom_equivalent_bits_per_key) {
  return new BloomFilterPolicy(bloom_equivalent_bits_per_key,
                               BloomFilterPolicy::kStandard128Ribbon);
}

FilterBuildingContext::FilterBuildingContext(
@@ -1437,18 +1396,10 @@ Status FilterPolicy::CreateFromString(
        NewBloomFilterPolicy(bits_per_key, use_block_based_builder));
    }
  } else if (value.compare(0, kExpRibbonName.size(), kExpRibbonName) == 0) {
    size_t pos = value.find(':', kExpRibbonName.size());
    int ribbon_starting_level;
    if (pos == std::string::npos) {
      pos = value.size();
      ribbon_starting_level = 1;
    } else {
      ribbon_starting_level = ParseInt(trim(value.substr(pos + 1)));
    }
    double bloom_equivalent_bits_per_key =
        ParseDouble(trim(value.substr(kExpRibbonName.size(), pos)));
    policy->reset(NewExperimentalRibbonFilterPolicy(
        bloom_equivalent_bits_per_key, ribbon_starting_level));
        ParseDouble(trim(value.substr(kExpRibbonName.size())));
    policy->reset(
        NewExperimentalRibbonFilterPolicy(bloom_equivalent_bits_per_key));
  } else {
    return Status::NotFound("Invalid filter policy name ", value);
#else
@@ -38,38 +38,10 @@ class BuiltinFilterBitsBuilder : public FilterBitsBuilder {
  virtual double EstimatedFpRate(size_t num_entries, size_t bytes) = 0;
};

// Abstract base class for RocksDB built-in filter policies.
// This class is considered internal API and subject to change.
class BuiltinFilterPolicy : public FilterPolicy {
 public:
  // Shared name because any built-in policy can read filters from
  // any other
  const char* Name() const override;

  // Deprecated block-based filter only
  bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const override;

  // Old API
  FilterBitsBuilder* GetFilterBitsBuilder() const override;

  // Read metadata to determine what kind of FilterBitsReader is needed
  // and return a new one. This must successfully process any filter data
  // generated by a built-in FilterBitsBuilder, regardless of the impl
  // chosen for this BloomFilterPolicy. Not compatible with CreateFilter.
  FilterBitsReader* GetFilterBitsReader(const Slice& contents) const override;

 private:
  // For newer Bloom filter implementation(s)
  FilterBitsReader* GetBloomBitsReader(const Slice& contents) const;

  // For Ribbon filter implementation(s)
  FilterBitsReader* GetRibbonBitsReader(const Slice& contents) const;
};

// RocksDB built-in filter policy for Bloom or Bloom-like filters.
// This class is considered internal API and subject to change.
// See NewBloomFilterPolicy.
class BloomFilterPolicy : public BuiltinFilterPolicy {
class BloomFilterPolicy : public FilterPolicy {
 public:
  // An internal marker for operating modes of BloomFilterPolicy, in terms
  // of selecting an implementation. This makes it easier for tests to track
@@ -116,9 +88,16 @@ class BloomFilterPolicy : public BuiltinFilterPolicy {

  ~BloomFilterPolicy() override;

  const char* Name() const override;

  // Deprecated block-based filter only
  void CreateFilter(const Slice* keys, int n, std::string* dst) const override;

  // Deprecated block-based filter only
  bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const override;

  FilterBitsBuilder* GetFilterBitsBuilder() const override;

  // To use this function, call GetBuilderFromContext().
  //
  // Neither the context nor any objects therein should be saved beyond
@@ -131,6 +110,12 @@ class BloomFilterPolicy : public BuiltinFilterPolicy {
  // (An internal convenience function to save boilerplate.)
  static FilterBitsBuilder* GetBuilderFromContext(const FilterBuildingContext&);

  // Read metadata to determine what kind of FilterBitsReader is needed
  // and return a new one. This must successfully process any filter data
  // generated by a built-in FilterBitsBuilder, regardless of the impl
  // chosen for this BloomFilterPolicy. Not compatible with CreateFilter.
  FilterBitsReader* GetFilterBitsReader(const Slice& contents) const override;

  // Essentially for testing only: configured millibits/key
  int GetMillibitsPerKey() const { return millibits_per_key_; }
  // Essentially for testing only: legacy whole bits/key
@@ -172,6 +157,12 @@ class BloomFilterPolicy : public BuiltinFilterPolicy {
  // Sum over all generated filters f:
  //   (predicted_fp_rate(f) - predicted_fp_rate(f|o_f_f_m=false)) * 2^32
  mutable std::atomic<int64_t> aggregate_rounding_balance_;

  // For newer Bloom filter implementation(s)
  FilterBitsReader* GetBloomBitsReader(const Slice& contents) const;

  // For Ribbon filter implementation(s)
  FilterBitsReader* GetRibbonBitsReader(const Slice& contents) const;
};

}  // namespace ROCKSDB_NAMESPACE
@@ -61,6 +61,7 @@ default_params = {
    "enable_pipelined_write": lambda: random.randint(0, 1),
    "enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
    "expected_values_path": lambda: setup_expected_values_file(),
    "fail_if_options_file_error": lambda: random.randint(0, 1),
    "flush_one_in": 1000000,
    "file_checksum_impl": lambda: random.choice(["none", "crc32c", "xxh64", "big"]),
    "get_live_files_one_in": 1000000,
@@ -102,7 +103,7 @@ default_params = {
    "mock_direct_io": False,
    "use_full_merge_v1": lambda: random.randint(0, 1),
    "use_merge": lambda: random.randint(0, 1),
    "ribbon_starting_level": lambda: random.randint(0, 10),
    "use_ribbon_filter": lambda: random.randint(0, 1),
    "verify_checksum": 1,
    "write_buffer_size": 4 * 1024 * 1024,
    "writepercent": 35,
@@ -137,6 +138,7 @@ default_params = {
    "max_key_len": 3,
    "key_len_percent_dist": "1,30,69",
    "read_fault_one_in": lambda: random.choice([0, 1000]),
    "open_metadata_write_fault_one_in": lambda: random.choice([0, 8]),
    "sync_fault_injection": False,
    "get_property_one_in": 1000000,
    "paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
@@ -1195,51 +1195,6 @@ INSTANTIATE_TEST_CASE_P(Full, FullBloomTest,
                                          BloomFilterPolicy::kFastLocalBloom,
                                          BloomFilterPolicy::kStandard128Ribbon));

static double GetEffectiveBitsPerKey(FilterBitsBuilder* builder) {
  union {
    uint64_t key_value;
    char key_bytes[8];
  };

  const unsigned kNumKeys = 1000;

  Slice key_slice{key_bytes, 8};
  for (key_value = 0; key_value < kNumKeys; ++key_value) {
    builder->AddKey(key_slice);
  }

  std::unique_ptr<const char[]> buf;
  auto filter = builder->Finish(&buf);
  return filter.size() * /*bits per byte*/ 8 / (1.0 * kNumKeys);
}

TEST(RibbonTest, RibbonTestLevelThreshold) {
  BlockBasedTableOptions opts;
  FilterBuildingContext ctx(opts);
  // A few settings
  for (int ribbon_starting_level : {0, 1, 10}) {
    std::unique_ptr<const FilterPolicy> policy{
        NewExperimentalRibbonFilterPolicy(8, ribbon_starting_level)};

    // Claim to be generating filter for this level
    ctx.level_at_creation = ribbon_starting_level;
    std::unique_ptr<FilterBitsBuilder> builder{
        policy->GetBuilderWithContext(ctx)};

    // Must be Ribbon (more space efficient than 8 bits per key)
    ASSERT_LT(GetEffectiveBitsPerKey(builder.get()), 7.5);

    if (ribbon_starting_level > 0) {
      // Claim to be generating filter for this level
      ctx.level_at_creation = ribbon_starting_level - 1;
      builder.reset(policy->GetBuilderWithContext(ctx));

      // Must be Bloom (~ 8 bits per key)
      ASSERT_GT(GetEffectiveBitsPerKey(builder.get()), 7.5);
    }
  }
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
@ -2716,19 +2716,19 @@ TEST_F(BackupableDBTest, GarbageCollectionBeforeBackup) {
OpenDBAndBackupEngine(true);

ASSERT_OK(backup_chroot_env_->CreateDirIfMissing(backupdir_ + "/shared"));
std::string file_five = backupdir_ + "/shared/000008.sst";
std::string file_five = backupdir_ + "/shared/000009.sst";
std::string file_five_contents = "I'm not really a sst file";
// this depends on the fact that 00008.sst is the first file created by the DB
// this depends on the fact that 00009.sst is the first file created by the DB
ASSERT_OK(file_manager_->WriteToFile(file_five, file_five_contents));

FillDB(db_.get(), 0, 100);
// backup overwrites file 000008.sst
// backup overwrites file 000009.sst
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));

std::string new_file_five_contents;
ASSERT_OK(ReadFileToString(backup_chroot_env_.get(), file_five,
&new_file_five_contents));
// file 000008.sst was overwritten
// file 000009.sst was overwritten
ASSERT_TRUE(new_file_five_contents != file_five_contents);

CloseDBAndBackupEngine();

@ -29,6 +29,7 @@
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"

namespace ROCKSDB_NAMESPACE {
class CheckpointTest : public testing::Test {
@ -793,6 +794,50 @@ TEST_F(CheckpointTest, CheckpointWithUnsyncedDataDropped) {
db_ = nullptr;
}

TEST_F(CheckpointTest, CheckpointOptionsFileFailedToPersist) {
// Regression test for a bug where checkpoint failed on a DB where persisting
// OPTIONS file failed and the DB was opened with
// `fail_if_options_file_error == false`.
Options options = CurrentOptions();
options.fail_if_options_file_error = false;
auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());

// Setup `FaultInjectionTestFS` and `SyncPoint` callbacks to fail one
// operation when inside the OPTIONS file persisting code.
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
fault_fs->SetRandomMetadataWriteError(1 /* one_in */);
SyncPoint::GetInstance()->SetCallBack(
"PersistRocksDBOptions:start", [fault_fs](void* /* arg */) {
fault_fs->EnableMetadataWriteErrorInjection();
});
SyncPoint::GetInstance()->SetCallBack(
"FaultInjectionTestFS::InjectMetadataWriteError:Injected",
[fault_fs](void* /* arg */) {
fault_fs->DisableMetadataWriteErrorInjection();
});
options.env = fault_fs_env.get();
SyncPoint::GetInstance()->EnableProcessing();

Reopen(options);
ASSERT_OK(Put("key1", "val1"));
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
delete checkpoint;

// Make sure it's usable.
options.env = env_;
DB* snapshot_db;
ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
ReadOptions read_opts;
std::string get_result;
ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result));
ASSERT_EQ("val1", get_result);
delete snapshot_db;
delete db_;
db_ = nullptr;
}

TEST_F(CheckpointTest, CheckpointReadOnlyDB) {
ASSERT_OK(Put("foo", "foo_value"));
ASSERT_OK(Flush());

@ -22,6 +22,7 @@
#include "env/composite_env_wrapper.h"
#include "port/lang.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/random.h"
@ -87,8 +88,21 @@ IOStatus TestFSDirectory::Fsync(const IOOptions& options, IODebugContext* dbg) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
{
IOStatus in_s = fs_->InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
fs_->SyncDir(dirname_);
return dir_->Fsync(options, dbg);
IOStatus s = dir_->Fsync(options, dbg);
{
IOStatus in_s = fs_->InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
return s;
}

TestFSWritableFile::TestFSWritableFile(const std::string& fname,
@ -159,6 +173,12 @@ IOStatus TestFSWritableFile::Close(const IOOptions& options,
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
{
IOStatus in_s = fs_->InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
writable_file_opened_ = false;
IOStatus io_s;
io_s = target_->Append(state_.buffer_, options, dbg);
@ -170,6 +190,10 @@ IOStatus TestFSWritableFile::Close(const IOOptions& options,
}
if (io_s.ok()) {
fs_->WritableFileClosed(state_);
IOStatus in_s = fs_->InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
return io_s;
}
@ -294,6 +318,12 @@ IOStatus FaultInjectionTestFS::NewWritableFile(
if (!IsFilesystemActive()) {
return GetError();
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
if (IsFilesystemDirectWritable()) {
return target()->NewWritableFile(fname, file_opts, result, dbg);
}
@ -305,11 +335,19 @@ IOStatus FaultInjectionTestFS::NewWritableFile(
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
{
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
}
return io_s;
}
@ -323,6 +361,12 @@ IOStatus FaultInjectionTestFS::ReopenWritableFile(
if (IsFilesystemDirectWritable()) {
return target()->ReopenWritableFile(fname, file_opts, result, dbg);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
IOStatus io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(
@ -330,11 +374,19 @@ IOStatus FaultInjectionTestFS::ReopenWritableFile(
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
{
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
}
return io_s;
}
@ -348,17 +400,31 @@ IOStatus FaultInjectionTestFS::NewRandomRWFile(
if (IsFilesystemDirectWritable()) {
return target()->NewRandomRWFile(fname, file_opts, result, dbg);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
IOStatus io_s = target()->NewRandomRWFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(new TestFSRandomRWFile(fname, std::move(*result), this));
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
{
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
}
return io_s;
}
@ -385,9 +451,21 @@ IOStatus FaultInjectionTestFS::DeleteFile(const std::string& f,
if (!IsFilesystemActive()) {
return GetError();
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
IOStatus io_s = FileSystemWrapper::DeleteFile(f, options, dbg);
if (io_s.ok()) {
UntrackFile(f);
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
}
return io_s;
}
@ -399,21 +477,33 @@ IOStatus FaultInjectionTestFS::RenameFile(const std::string& s,
if (!IsFilesystemActive()) {
return GetError();
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
IOStatus io_s = FileSystemWrapper::RenameFile(s, t, options, dbg);

if (io_s.ok()) {
MutexLock l(&mutex_);
if (db_file_state_.find(s) != db_file_state_.end()) {
db_file_state_[t] = db_file_state_[s];
db_file_state_.erase(s);
}
{
MutexLock l(&mutex_);
if (db_file_state_.find(s) != db_file_state_.end()) {
db_file_state_[t] = db_file_state_[s];
db_file_state_.erase(s);
}

auto sdn = TestFSGetDirAndName(s);
auto tdn = TestFSGetDirAndName(t);
if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) {
auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
assert(tlist.find(tdn.second) == tlist.end());
tlist.insert(tdn.second);
auto sdn = TestFSGetDirAndName(s);
auto tdn = TestFSGetDirAndName(t);
if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) {
auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
assert(tlist.find(tdn.second) == tlist.end());
tlist.insert(tdn.second);
}
}
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}

@ -618,6 +708,19 @@ IOStatus FaultInjectionTestFS::InjectWriteError(const std::string& file_name) {
return IOStatus::OK();
}

IOStatus FaultInjectionTestFS::InjectMetadataWriteError() {
{
MutexLock l(&mutex_);
if (!enable_metadata_write_error_injection_ ||
!metadata_write_error_one_in_ ||
!write_error_rand_.OneIn(metadata_write_error_one_in_)) {
return IOStatus::OK();
}
}
TEST_SYNC_POINT("FaultInjectionTestFS::InjectMetadataWriteError:Injected");
return IOStatus::IOError();
}

void FaultInjectionTestFS::PrintFaultBacktrace() {
#if defined(OS_LINUX)
ErrorContext* ctx =

@ -22,7 +22,7 @@
#include <string>

#include "file/filename.h"
#include "include/rocksdb/file_system.h"
#include "rocksdb/file_system.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/thread_local.h"
@ -174,7 +174,10 @@ class FaultInjectionTestFS : public FileSystemWrapper {
filesystem_writable_(false),
thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)),
enable_write_error_injection_(false),
enable_metadata_write_error_injection_(false),
write_error_rand_(0),
write_error_one_in_(0),
metadata_write_error_one_in_(0),
ingest_data_corruption_before_write_(false) {}
virtual ~FaultInjectionTestFS() { error_.PermitUncheckedError(); }

@ -361,10 +364,18 @@ class FaultInjectionTestFS : public FileSystemWrapper {
write_error_allowed_types_ = types;
}

void SetRandomMetadataWriteError(int one_in) {
MutexLock l(&mutex_);
metadata_write_error_one_in_ = one_in;
}

// Inject a write error with randomized parameters and the predefined
// error type. Only the allowed file types will inject the write error.
IOStatus InjectWriteError(const std::string& file_name);

// Inject an error into metadata operations.
IOStatus InjectMetadataWriteError();

// Inject an error. For a READ operation, a status of IOError(), a
// corruption in the contents of scratch, or truncation of slice
// are the types of error with equal probability. For OPEN,
@ -397,6 +408,11 @@ class FaultInjectionTestFS : public FileSystemWrapper {
enable_write_error_injection_ = true;
}

void EnableMetadataWriteErrorInjection() {
MutexLock l(&mutex_);
enable_metadata_write_error_injection_ = true;
}

void DisableWriteErrorInjection() {
MutexLock l(&mutex_);
enable_write_error_injection_ = false;
@ -410,6 +426,11 @@ class FaultInjectionTestFS : public FileSystemWrapper {
}
}

void DisableMetadataWriteErrorInjection() {
MutexLock l(&mutex_);
enable_metadata_write_error_injection_ = false;
}

// We capture a backtrace every time a fault is injected, for debugging
// purposes. This call prints the backtrace to stderr and frees the
// saved callstack
@ -456,8 +477,10 @@ class FaultInjectionTestFS : public FileSystemWrapper {

std::unique_ptr<ThreadLocalPtr> thread_local_error_;
bool enable_write_error_injection_;
bool enable_metadata_write_error_injection_;
Random write_error_rand_;
int write_error_one_in_;
int metadata_write_error_one_in_;
std::vector<FileType> write_error_allowed_types_;
bool ingest_data_corruption_before_write_;
ChecksumType checksum_handoff_func_tpye_;

@ -145,8 +145,10 @@ TEST_F(MemoryTest, MemTableAndTableReadersTotal) {
std::vector<uint64_t> usage_by_type;
std::vector<std::vector<ColumnFamilyHandle*>> vec_handles;
const int kNumDBs = 10;
// These key/value sizes ensure each KV has its own memtable. Note that the
// minimum write_buffer_size allowed is 64 KB.
const int kKeySize = 100;
const int kValueSize = 500;
const int kValueSize = 1 << 16;
Options opt;
opt.create_if_missing = true;
opt.create_missing_column_families = true;