Compare commits

...

15 Commits

Author SHA1 Message Date
anand76
822e08eeab Update HISTORY.md and bump version to 6.20.4 2021-05-19 12:22:55 -07:00
anand76
9177a0673b Sync ingested files only if reopen is supported by the FS (#8296)
Summary:
Some file systems (especially distributed FS) do not support reopening a file for writing. The ExternalSstFileIngestionJob calls ReopenWritableFile in order to sync the ingested file, which typically makes sense only on a local file system with a page cache (i.e Posix). So this change tries to sync the ingested file only if ReopenWritableFile doesn't return Status::NotSupported().

Tests:
Add a new unit test in external_sst_file_basic_test

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8296

Reviewed By: jay-zhuang

Differential Revision: D28420865

Pulled By: anand1976

fbshipit-source-id: 380e7f5ff95324997f7a59864a9ac96ebbd0100c
2021-05-19 12:12:05 -07:00
Andrew Kryczka
8608d75d85 Update HISTORY.md and bump version for 6.20.3 2021-05-05 13:35:30 -07:00
Andrew Kryczka
75c83c5b61 Fix GetLiveFiles() returning OPTIONS-000000 (#8268)
Summary:
See release note in HISTORY.md.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8268

Test Plan: unit test repro

Reviewed By: siying

Differential Revision: D28227901

Pulled By: ajkr

fbshipit-source-id: faf61d13b9e43a761e3d5dcf8203923126b51339
2021-05-05 13:34:08 -07:00
sdong
939ffdc206 db_stress to add --open_metadata_write_fault_one_in (#8235)
Summary:
DB Stress to add --open_metadata_write_fault_one_in which would randomly fail in some file metadata modification operations during DB Open, including file creation, close, renaming and directory sync. Some operations can fail before and after the operations take place.
If DB open fails, db_stress would retry without the failure ingestion, and DB is expected to open successfully.
This option is enabled in crash test in half of the time.
Some follow up changes would allow write failures in open time, and ingesting those failures in non-DB open cases.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8235

Test Plan: Run stress tests for a while and see failures got triggered. This can reproduce the bug fixed by https://github.com/facebook/rocksdb/pull/8192 and a similar one that fails when fsyncing parent directory.

Reviewed By: anand1976

Differential Revision: D28010944

fbshipit-source-id: 36a96da4dc3633e5f7680cef3ea0a900fcdb5558
2021-05-05 13:30:04 -07:00
Zhichao Cao
c56ad3c60a Update HISTORY.md and bump version for 6.20.2 2021-04-23 17:02:08 -07:00
Zhichao Cao
f9c6a87d18 make format 2021-04-23 16:58:38 -07:00
Zhichao Cao
8bd665331a Fix the false positive alert of CF consistency check in WAL recovery (#8207)
Summary:
In current RocksDB, in recover the information form WAL, we do the consistency check for each column family when one WAL file is corrupted and PointInTimeRecovery is set. However, it will report a false positive alert on "SST file is ahead of WALs" when one of the CF current log number is greater than the corrupted WAL number (CF contains the data beyond the corrupted WAl) due to a new column family creation during flush. In this case, a new WAL is created (it is empty) during a flush. Also, due to some reason (e.g., storage issue or crash happens before SyncCloseLog is called), the old WAL is corrupted. The new CF has no data, therefore, it does not have the consistency issue.

Fix: when checking cfd->GetLogNumber() > corrupted_wal_number also check cfd->GetLiveSstFilesSize() > 0. So the CFs with no SST file data will skip the check here.

Note potential ignored inconsistency caused due to fix: empty CF can also be caused by write+delete. In this case, after flush, there is no SST files being generated. However, this CF still have the log in the WAL. When the WAL is corrupted, the DB might be inconsistent.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8207

Test Plan: added unit test, make crash_test

Reviewed By: riversand963

Differential Revision: D27898839

Pulled By: zhichao-cao

fbshipit-source-id: 931fc2d8b92dd00b4169bf84b94e712fd688a83e
2021-04-23 16:42:20 -07:00
Andrew Kryczka
9da3585891 Fix seqno in ingested file boundary key metadata (#8209)
Summary:
Fixes https://github.com/facebook/rocksdb/issues/6245.

Adapted from https://github.com/facebook/rocksdb/issues/8201 and https://github.com/facebook/rocksdb/issues/8205.

Previously we were writing the ingested file's smallest/largest internal keys
with sequence number zero, or `kMaxSequenceNumber` in case of range
tombstone. The former (sequence number zero) is incorrect and can lead
to files being incorrectly ordered. The fix in this PR is to overwrite
boundary keys that have sequence number zero with the ingested file's assigned
sequence number.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8209

Test Plan: repro unit test

Reviewed By: riversand963

Differential Revision: D27885678

Pulled By: ajkr

fbshipit-source-id: 4a9f2c6efdfff81c3a9923e915ea88b250ee7b6a
2021-04-23 16:41:51 -07:00
Peter Dillinger
d21b2a9699 Revert Ribbon starting level support from #8198 (#8212)
Summary:
This partially reverts commit 10196d7edc.

The problem with this change is because of important filter use cases:
FIFO compaction and SST writer. FIFO "compaction" always uses level 0 so
would only use Ribbon filters if specifically including level 0 for the
Ribbon filter policy. SST writer sets level_at_creation=-1 to indicate
unknown level, and this would be treated the same as level 0 unless
fixed.

We are keeping the part about committing to permanent schema, which is
only changes to API comments and HISTORY.md.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8212

Test Plan: CI

Reviewed By: jay-zhuang

Differential Revision: D27896468

Pulled By: pdillinger

fbshipit-source-id: 50a775f7cba5d64fb729d9b982e355864020596e
2021-04-20 20:59:01 -07:00
Andrew Gallagher
43aee72181 Cleanup include (#8208)
Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8208

Make include of "file_system.h" use the same include path as everywhere
else.

Reviewed By: riversand963, akankshamahajan15

Differential Revision: D27881606

fbshipit-source-id: fc1e076229fde21041a813c655ce017b5070c8b3
2021-04-20 15:52:26 -07:00
Levi Tamasi
8956288860 Mention PR 8206 in HISTORY.md (#8210)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/8210

Reviewed By: akankshamahajan15

Differential Revision: D27887612

Pulled By: ltamasi

fbshipit-source-id: 0db8d0b6047334dc47fe30a98804449043454386
2021-04-20 12:11:28 -07:00
Levi Tamasi
f2228962c5 Fix a data race related to DB properties (#8206)
Summary:
Historically, the DB properties `rocksdb.cur-size-active-mem-table`,
`rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables` called
the method `MemTable::ApproximateMemoryUsage` for mutable memtables,
which is not safe without synchronization. This resulted in data races with
memtable inserts. The patch changes the code handling these properties
to use `MemTable::ApproximateMemoryUsageFast` instead, which returns a
cached value backed by an atomic variable. Two test cases had to be updated
for this change. `MemoryTest.MemTableAndTableReadersTotal` was fixed by
increasing the value size used so each value ends up in its own memtable,
which was the original intention (note: the test has been broken in the sense
that the test code didn't consider that memtable sizes below 64 KB get
increased to 64 KB by `SanitizeOptions`, and has been passing only by
accident). `DBTest.MemoryUsageWithMaxWriteBufferSizeToMaintain` relies on
completely up-to-date values and thus was changed to use `ApproximateMemoryUsage`
directly instead of going through the DB properties. Note: this should be safe in this case
since there's only a single thread involved.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8206

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D27866811

Pulled By: ltamasi

fbshipit-source-id: 7bd754d0565e0a65f1f7f0e78ffc093beef79394
2021-04-20 10:17:49 -07:00
Yanqin Jin
eef93446a3 Update HISTORY and bump version 2021-04-19 20:30:06 -07:00
Yanqin Jin
51ebe09f9a Handle rename() failure in non-local FS (#8192)
Summary:
In a distributed environment, a file `rename()` operation can succeed on server (remote)
side, but the client can somehow return non-ok status to RocksDB. Possible reasons include
network partition, connection issue, etc. This happens in `rocksdb::SetCurrentFile()`, which
can be called in `LogAndApply() -> ProcessManifestWrites()` if RocksDB tries to switch to a
new MANIFEST. We currently always delete the new MANIFEST if an error occurs.

This is problematic in distributed world. If the server-side successfully updates the CURRENT
file via renaming, then a subsequent `DB::Open()` will try to look for the new MANIFEST and fail.

As a fix, we can track the execution result of IO operations on the new MANIFEST.
- If IO operations on the new MANIFEST fail, then we know the CURRENT must point to the original
  MANIFEST. Therefore, it is safe to remove the new MANIFEST.
- If IO operations on the new MANIFEST all succeed, but somehow we end up in the clean up
  code block, then we do not know whether CURRENT points to the new or old MANIFEST. (For local
  POSIX-compliant FS, it should still point to old MANIFEST, but it does not matter if we keep the
  new MANIFEST.) Therefore, we keep the new MANIFEST.
    - Any future `LogAndApply()` will switch to a new MANIFEST and update CURRENT.
    - If process reopens the db immediately after the failure, then the CURRENT file can point
      to either the new MANIFEST or the old one, both of which exist. Therefore, recovery can
      succeed and ignore the other.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8192

Test Plan: make check

Reviewed By: zhichao-cao

Differential Revision: D27804648

Pulled By: riversand963

fbshipit-source-id: 9c16f2a5ce41bc6aadf085e48449b19ede8423e4
2021-04-19 20:29:21 -07:00
33 changed files with 712 additions and 263 deletions

View File

@ -1,4 +1,22 @@
# Rocksdb Change Log
## 6.20.3 (05/05/2021)
### Bug Fixes
* In the IngestExternalFile() API, only try to sync the ingested file if the file is linked and the FileSystem/Env supports reopening a writable file.
## 6.20.3 (05/05/2021)
### Bug Fixes
* Fixed a bug where `GetLiveFiles()` output included a non-existent file called "OPTIONS-000000". Backups and checkpoints, which use `GetLiveFiles()`, failed on DBs impacted by this bug. Read-write DBs were impacted when the latest OPTIONS file failed to write and `fail_if_options_file_error == false`. Read-only DBs were impacted when no OPTIONS files existed.
## 6.20.2 (04/23/2021)
### Bug Fixes
* Fixed a bug in handling file rename error in distributed/network file systems when the server succeeds but client returns error. The bug can cause CURRENT file to point to non-existing MANIFEST file, thus DB cannot be opened.
* Fixed a bug where ingested files were written with incorrect boundary key metadata. In rare cases this could have led to a level's files being wrongly ordered and queries for the boundary keys returning wrong results.
* Fixed a data race between insertion into memtables and the retrieval of the DB properties `rocksdb.cur-size-active-mem-table`, `rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables`.
* Fixed the false-positive alert when recovering from the WAL file. Avoid reporting "SST file is ahead of WAL" on a newly created empty column family, if the previous WAL file is corrupted.
### Behavior Changes
* Due to the fix of false-postive alert of "SST file is ahead of WAL", all the CFs with no SST file (CF empty) will bypass the consistency check. We fixed a false-positive, but introduced a very rare true-negative which will be triggered in the following conditions: A CF with some delete operations in the last a few queries which will result in an empty CF (those are flushed to SST file and a compaction triggered which combines this file and all other SST files and generates an empty CF, or there is another reason to write a manifest entry for this CF after a flush that generates no SST file from an empty CF). The deletion entries are logged in a WAL and this WAL was corrupted, while the CF's log number points to the next WAL (due to the flush). Therefore, the DB can only recover to the point without these trailing deletions and cause the inconsistent DB status.
## 6.20.0 (04/16/2021)
### Behavior Changes
* `ColumnFamilyOptions::sample_for_compression` now takes effect for creation of all block-based tables. Previously it only took effect for block-based tables created by flush.
@ -13,6 +31,8 @@
* Fixed crash (divide by zero) when compression dictionary is applied to a file containing only range tombstones.
* Fixed a backward iteration bug with partitioned filter enabled: not including the prefix of the last key of the previous filter partition in current filter partition can cause wrong iteration result.
* Fixed a bug that allowed `DBOptions::max_open_files` to be set with a non-negative integer with `ColumnFamilyOptions::compaction_style = kCompactionStyleFIFO`.
* Fixed a bug in handling file rename error in distributed/network file systems when the server succeeds but client returns error. The bug can cause CURRENT file to point to non-existing MANIFEST file, thus DB cannot be opened.
* Fixed a data race between insertion into memtables and the retrieval of the DB properties `rocksdb.cur-size-active-mem-table`, `rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables`.
### Performance Improvements
* On ARM platform, use `yield` instead of `wfe` to relax cpu to gain better performance.
@ -29,7 +49,6 @@
* Added an optional output parameter to BackupEngine::CreateNewBackup(WithMetadata) to return the BackupID of the new backup.
* Added BackupEngine::GetBackupInfo / GetLatestBackupInfo for querying individual backups.
* Made the Ribbon filter a long-term supported feature in terms of the SST schema(compatible with version >= 6.15.0) though the API for enabling it is expected to change.
* Added hybrid configuration of Ribbon filter and Bloom filter where some LSM levels use Ribbon for memory space efficiency and some use Bloom for speed. See NewExperimentalRibbonFilterPolicy. This also changes the default behavior of NewExperimentalRibbonFilterPolicy to use Bloom on level 0 and Ribbon on later levels.
## 6.19.0 (03/21/2021)
### Bug Fixes

View File

@ -98,7 +98,14 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
ret.emplace_back(CurrentFileName(""));
ret.emplace_back(DescriptorFileName("", versions_->manifest_file_number()));
ret.emplace_back(OptionsFileName("", versions_->options_file_number()));
// The OPTIONS file number is zero in read-write mode when OPTIONS file
// writing failed and the DB was configured with
// `fail_if_options_file_error == false`. In read-only mode the OPTIONS file
// number is zero when no OPTIONS file exist at all. In those cases we do not
// record any OPTIONS file in the live file list.
if (versions_->options_file_number() != 0) {
ret.emplace_back(OptionsFileName("", versions_->options_file_number()));
}
// find length of manifest file while holding the mutex lock
*manifest_file_size = versions_->manifest_file_size();

View File

@ -2582,6 +2582,8 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
immutable_db_options_.info_log.get());
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:1");
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:2");
{
InstrumentedMutexLock l(&mutex_);
assert(bg_flush_scheduled_);

View File

@ -943,7 +943,7 @@ Status DBImpl::DeleteUnreferencedSstFiles() {
return s;
}
if (largest_file_number > next_file_number) {
if (largest_file_number >= next_file_number) {
versions_->next_file_number_.store(largest_file_number + 1);
}

View File

@ -285,6 +285,9 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
const std::string manifest = DescriptorFileName(dbname_, 1);
{
if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
}
std::unique_ptr<FSWritableFile> file;
FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
s = NewWritableFile(fs_.get(), manifest, &file, file_options);
@ -314,7 +317,7 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
manifest.substr(manifest.find_last_of("/\\") + 1));
}
} else {
fs_->DeleteFile(manifest, IOOptions(), nullptr);
fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
}
return s;
}
@ -1134,11 +1137,29 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords)) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->GetLogNumber() > corrupted_wal_number) {
// One special case cause cfd->GetLogNumber() > corrupted_wal_number but
// the CF is still consistent: If a new column family is created during
// the flush and the WAL sync fails at the same time, the new CF points to
// the new WAL but the old WAL is curropted. Since the new CF is empty, it
// is still consistent. We add the check of CF sst file size to avoid the
// false positive alert.
// Note that, the check of (cfd->GetLiveSstFilesSize() > 0) may leads to
// the ignorance of a very rare inconsistency case caused in data
// canclation. One CF is empty due to KV deletion. But those operations
// are in the WAL. If the WAL is corrupted, the status of this CF might
// not be consistent with others. However, the consistency check will be
// bypassed due to empty CF.
// TODO: a better and complete implementation is needed to ensure strict
// consistency check in WAL recovery including hanlding the tailing
// issues.
if (cfd->GetLogNumber() > corrupted_wal_number &&
cfd->GetLiveSstFilesSize() > 0) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Column family inconsistency: SST file contains data"
" beyond the point of corruption.");
return Status::Corruption("SST file is ahead of WALs");
return Status::Corruption("SST file is ahead of WALs in CF " +
cfd->GetName());
}
}
}

View File

@ -6701,20 +6701,19 @@ TEST_F(DBTest, MemoryUsageWithMaxWriteBufferSizeToMaintain) {
Reopen(options);
Random rnd(301);
bool memory_limit_exceeded = false;
uint64_t size_all_mem_table = 0;
uint64_t cur_active_mem = 0;
ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
for (int i = 0; i < 1000; i++) {
std::string value = rnd.RandomString(1000);
ASSERT_OK(Put("keykey_" + std::to_string(i), value));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_TRUE(db_->GetIntProperty(db_->DefaultColumnFamily(),
DB::Properties::kSizeAllMemTables,
&size_all_mem_table));
ASSERT_TRUE(db_->GetIntProperty(db_->DefaultColumnFamily(),
DB::Properties::kCurSizeActiveMemTable,
&cur_active_mem));
const uint64_t cur_active_mem = cfd->mem()->ApproximateMemoryUsage();
const uint64_t size_all_mem_table =
cur_active_mem + cfd->imm()->ApproximateMemoryUsage();
// Errors out if memory usage keeps on increasing beyond the limit.
// Once memory limit exceeds, memory_limit_exceeded is set and if

View File

@ -5439,6 +5439,98 @@ TEST_F(DBTest2, AutoPrefixMode1) {
ASSERT_EQ("a1", iterator->key().ToString());
}
}
class RenameCurrentTest : public DBTestBase,
public testing::WithParamInterface<std::string> {
public:
RenameCurrentTest()
: DBTestBase("rename_current_test", /*env_do_fsync=*/true),
sync_point_(GetParam()) {}
~RenameCurrentTest() override {}
void SetUp() override {
env_->no_file_overwrite_.store(true, std::memory_order_release);
}
void TearDown() override {
env_->no_file_overwrite_.store(false, std::memory_order_release);
}
void SetupSyncPoints() {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->SetCallBack(sync_point_, [&](void* arg) {
Status* s = reinterpret_cast<Status*>(arg);
assert(s);
*s = Status::IOError("Injected IO error.");
});
}
const std::string sync_point_;
};
INSTANTIATE_TEST_CASE_P(DistributedFS, RenameCurrentTest,
::testing::Values("SetCurrentFile:BeforeRename",
"SetCurrentFile:AfterRename"));
TEST_P(RenameCurrentTest, Open) {
Destroy(last_options_);
Options options = GetDefaultOptions();
options.create_if_missing = true;
SetupSyncPoints();
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
ASSERT_NOK(s);
SyncPoint::GetInstance()->DisableProcessing();
Reopen(options);
}
TEST_P(RenameCurrentTest, Flush) {
Destroy(last_options_);
Options options = GetDefaultOptions();
options.max_manifest_file_size = 1;
options.create_if_missing = true;
Reopen(options);
ASSERT_OK(Put("key", "value"));
SetupSyncPoints();
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_NOK(Flush());
ASSERT_NOK(Put("foo", "value"));
SyncPoint::GetInstance()->DisableProcessing();
Reopen(options);
ASSERT_EQ("value", Get("key"));
ASSERT_EQ("NOT_FOUND", Get("foo"));
}
TEST_P(RenameCurrentTest, Compaction) {
Destroy(last_options_);
Options options = GetDefaultOptions();
options.max_manifest_file_size = 1;
options.create_if_missing = true;
Reopen(options);
ASSERT_OK(Put("a", "a_value"));
ASSERT_OK(Put("c", "c_value"));
ASSERT_OK(Flush());
ASSERT_OK(Put("b", "b_value"));
ASSERT_OK(Put("d", "d_value"));
ASSERT_OK(Flush());
SetupSyncPoints();
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
ASSERT_NOK(Put("foo", "value"));
SyncPoint::GetInstance()->DisableProcessing();
Reopen(options);
ASSERT_EQ("NOT_FOUND", Get("foo"));
ASSERT_EQ("d_value", Get("d"));
}
#endif // ROCKSDB_LITE
// WAL recovery mode is WALRecoveryMode::kPointInTimeRecovery.
@ -5466,6 +5558,35 @@ TEST_F(DBTest2, PointInTimeRecoveryWithIOErrorWhileReadingWal) {
Status s = TryReopen(options);
ASSERT_TRUE(s.IsIOError());
}
TEST_F(DBTest2, PointInTimeRecoveryWithSyncFailureInCFCreation) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BackgroundCallFlush:Start:1",
"PointInTimeRecoveryWithSyncFailureInCFCreation:1"},
{"PointInTimeRecoveryWithSyncFailureInCFCreation:2",
"DBImpl::BackgroundCallFlush:Start:2"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
CreateColumnFamilies({"test1"}, Options());
ASSERT_OK(Put("foo", "bar"));
// Creating a CF when a flush is going on, log is synced but the
// closed log file is not synced and corrupted.
port::Thread flush_thread([&]() { ASSERT_NOK(Flush()); });
TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:1");
CreateColumnFamilies({"test2"}, Options());
env_->corrupt_in_sync_ = true;
TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:2");
flush_thread.join();
env_->corrupt_in_sync_ = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// Reopening the DB should not corrupt anything
Options options = CurrentOptions();
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
ReopenWithColumnFamilies({"default", "test1", "test2"}, options);
}
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

View File

@ -44,6 +44,7 @@ SpecialEnv::SpecialEnv(Env* base, bool time_elapse_only_sleep)
manifest_sync_error_.store(false, std::memory_order_release);
manifest_write_error_.store(false, std::memory_order_release);
log_write_error_.store(false, std::memory_order_release);
no_file_overwrite_.store(false, std::memory_order_release);
random_file_open_counter_.store(0, std::memory_order_relaxed);
delete_count_.store(0, std::memory_order_relaxed);
num_open_wal_file_.store(0);

View File

@ -393,6 +393,10 @@ class SpecialEnv : public EnvWrapper {
Status Flush() override { return base_->Flush(); }
Status Sync() override {
++env_->sync_counter_;
if (env_->corrupt_in_sync_) {
Append(std::string(33000, ' '));
return Status::IOError("Ingested Sync Failure");
}
if (env_->skip_fsync_) {
return Status::OK();
} else {
@ -440,6 +444,11 @@ class SpecialEnv : public EnvWrapper {
std::unique_ptr<WritableFile> base_;
};
if (no_file_overwrite_.load(std::memory_order_acquire) &&
target()->FileExists(f).ok()) {
return Status::NotSupported("SpecialEnv::no_file_overwrite_ is true.");
}
if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
uint32_t random_number;
{
@ -687,6 +696,9 @@ class SpecialEnv : public EnvWrapper {
// Slow down every log write, in micro-seconds.
std::atomic<int> log_write_slowdown_;
// If true, returns Status::NotSupported for file overwrite.
std::atomic<bool> no_file_overwrite_;
// Number of WAL files that are still open for write.
std::atomic<int> num_open_wal_file_;
@ -709,6 +721,9 @@ class SpecialEnv : public EnvWrapper {
// If true, all fsync to files and directories are skipped.
bool skip_fsync_ = false;
// If true, ingest the corruption to file during sync.
bool corrupt_in_sync_ = false;
std::atomic<uint32_t> non_writeable_rate_;
std::atomic<uint32_t> new_writable_count_;

View File

@ -1136,6 +1136,41 @@ TEST_F(ExternalSSTFileBasicTest, SyncFailure) {
}
}
TEST_F(ExternalSSTFileBasicTest, ReopenNotSupported) {
Options options;
options.create_if_missing = true;
options.env = env_;
SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Prepare:Reopen", [&](void* arg) {
Status* s = static_cast<Status*>(arg);
*s = Status::NotSupported();
});
SyncPoint::GetInstance()->EnableProcessing();
DestroyAndReopen(options);
Options sst_file_writer_options;
sst_file_writer_options.env = env_;
std::unique_ptr<SstFileWriter> sst_file_writer(
new SstFileWriter(EnvOptions(), sst_file_writer_options));
std::string file_name =
sst_files_dir_ + "reopen_not_supported_test_" + ".sst";
ASSERT_OK(sst_file_writer->Open(file_name));
ASSERT_OK(sst_file_writer->Put("bar", "v2"));
ASSERT_OK(sst_file_writer->Finish());
IngestExternalFileOptions ingest_opt;
ingest_opt.move_files = true;
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(db_->IngestExternalFile({file_name}, ingest_opt));
db_->ReleaseSnapshot(snapshot);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Destroy(options);
}
TEST_F(ExternalSSTFileBasicTest, VerifyChecksumReadahead) {
Options options;
options.create_if_missing = true;
@ -1542,6 +1577,44 @@ TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) {
ASSERT_EQ(2, NumTableFilesAtLevel(0));
}
TEST_F(ExternalSSTFileBasicTest, IngestFileAfterDBPut) {
// Repro https://github.com/facebook/rocksdb/issues/6245.
// Flush three files to L0. Ingest one more file to trigger L0->L1 compaction
// via trivial move. The bug happened when L1 files were incorrectly sorted
// resulting in an old value for "k" returned by `Get()`.
Options options = CurrentOptions();
ASSERT_OK(Put("k", "a"));
Flush();
ASSERT_OK(Put("k", "a"));
Flush();
ASSERT_OK(Put("k", "a"));
Flush();
SstFileWriter sst_file_writer(EnvOptions(), options);
// Current file size should be 0 after sst_file_writer init and before open a
// file.
ASSERT_EQ(sst_file_writer.FileSize(), 0);
std::string file1 = sst_files_dir_ + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
ASSERT_OK(sst_file_writer.Put("k", "b"));
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_OK(s) << s.ToString();
// Current file size should be non-zero after success write.
ASSERT_GT(sst_file_writer.FileSize(), 0);
IngestExternalFileOptions ifo;
s = db_->IngestExternalFile({file1}, ifo);
ASSERT_OK(s);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(Get("k"), "b");
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false),

View File

@ -109,17 +109,26 @@ Status ExternalSstFileIngestionJob::Prepare(
// directory before ingest the file. For integrity of RocksDB we need
// to sync the file.
std::unique_ptr<FSWritableFile> file_to_sync;
status = fs_->ReopenWritableFile(path_inside_db, env_options_,
&file_to_sync, nullptr);
if (status.ok()) {
TEST_SYNC_POINT(
"ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
status = SyncIngestedFile(file_to_sync.get());
TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile");
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to sync ingested file %s: %s",
path_inside_db.c_str(), status.ToString().c_str());
Status s = fs_->ReopenWritableFile(path_inside_db, env_options_,
&file_to_sync, nullptr);
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:Reopen",
&s);
// Some file systems (especially remote/distributed) don't support
// reopening a file for writing and don't require reopening and
// syncing the file. Ignore the NotSupported error in that case.
if (!s.IsNotSupported()) {
status = s;
if (status.ok()) {
TEST_SYNC_POINT(
"ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
status = SyncIngestedFile(file_to_sync.get());
TEST_SYNC_POINT(
"ExternalSstFileIngestionJob::AfterSyncIngestedFile");
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to sync ingested file %s: %s",
path_inside_db.c_str(), status.ToString().c_str());
}
}
}
} else if (status.IsNotSupported() &&
@ -367,9 +376,32 @@ Status ExternalSstFileIngestionJob::Run() {
super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
last_seqno, &f, &assigned_seqno);
}
// Modify the smallest/largest internal key to include the sequence number
// that we just learned. Only overwrite sequence number zero. There could
// be a nonzero sequence number already to indicate a range tombstone's
// exclusive endpoint.
ParsedInternalKey smallest_parsed, largest_parsed;
if (status.ok()) {
status = ParseInternalKey(*f.smallest_internal_key.rep(),
&smallest_parsed, false /* log_err_key */);
}
if (status.ok()) {
status = ParseInternalKey(*f.largest_internal_key.rep(), &largest_parsed,
false /* log_err_key */);
}
if (!status.ok()) {
return status;
}
if (smallest_parsed.sequence == 0) {
UpdateInternalKey(f.smallest_internal_key.rep(), assigned_seqno,
smallest_parsed.type);
}
if (largest_parsed.sequence == 0) {
UpdateInternalKey(f.largest_internal_key.rep(), assigned_seqno,
largest_parsed.type);
}
status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
&assigned_seqno);

View File

@ -751,21 +751,24 @@ bool InternalStats::HandleBackgroundErrors(uint64_t* value, DBImpl* /*db*/,
bool InternalStats::HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
// Current size of the active memtable
*value = cfd_->mem()->ApproximateMemoryUsage();
// Using ApproximateMemoryUsageFast to avoid the need for synchronization
*value = cfd_->mem()->ApproximateMemoryUsageFast();
return true;
}
bool InternalStats::HandleCurSizeAllMemTables(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
// Current size of the active memtable + immutable memtables
*value = cfd_->mem()->ApproximateMemoryUsage() +
// Using ApproximateMemoryUsageFast to avoid the need for synchronization
*value = cfd_->mem()->ApproximateMemoryUsageFast() +
cfd_->imm()->ApproximateUnflushedMemTablesMemoryUsage();
return true;
}
bool InternalStats::HandleSizeAllMemTables(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
*value = cfd_->mem()->ApproximateMemoryUsage() +
// Using ApproximateMemoryUsageFast to avoid the need for synchronization
*value = cfd_->mem()->ApproximateMemoryUsageFast() +
cfd_->imm()->ApproximateMemoryUsage();
return true;
}

View File

@ -4083,6 +4083,7 @@ Status VersionSet::ProcessManifestWrites(
uint64_t new_manifest_file_size = 0;
Status s;
IOStatus io_s;
IOStatus manifest_io_status;
{
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
mu->Unlock();
@ -4134,6 +4135,7 @@ Status VersionSet::ProcessManifestWrites(
s = WriteCurrentStateToManifest(curr_state, wal_additions,
descriptor_log_.get(), io_s);
} else {
manifest_io_status = io_s;
s = io_s;
}
}
@ -4171,11 +4173,13 @@ Status VersionSet::ProcessManifestWrites(
io_s = descriptor_log_->AddRecord(record);
if (!io_s.ok()) {
s = io_s;
manifest_io_status = io_s;
break;
}
}
if (s.ok()) {
io_s = SyncManifest(db_options_, descriptor_log_->file());
manifest_io_status = io_s;
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
}
@ -4188,6 +4192,9 @@ Status VersionSet::ProcessManifestWrites(
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok()) {
assert(manifest_io_status.ok());
}
if (s.ok() && new_descriptor_log) {
io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
db_directory);
@ -4303,11 +4310,41 @@ Status VersionSet::ProcessManifestWrites(
for (auto v : versions) {
delete v;
}
if (manifest_io_status.ok()) {
manifest_file_number_ = pending_manifest_file_number_;
manifest_file_size_ = new_manifest_file_size;
}
// If manifest append failed for whatever reason, the file could be
// corrupted. So we need to force the next version update to start a
// new manifest file.
descriptor_log_.reset();
if (new_descriptor_log) {
// If manifest operations failed, then we know the CURRENT file still
// points to the original MANIFEST. Therefore, we can safely delete the
// new MANIFEST.
// If manifest operations succeeded, and we are here, then it is possible
// that renaming tmp file to CURRENT failed.
//
// On local POSIX-compliant FS, the CURRENT must point to the original
// MANIFEST. We can delete the new MANIFEST for simplicity, but we can also
// keep it. Future recovery will ignore this MANIFEST. It's also ok for the
// process not to crash and continue using the db. Any future LogAndApply()
// call will switch to a new MANIFEST and update CURRENT, still ignoring
// this one.
//
// On non-local FS, it is
// possible that the rename operation succeeded on the server (remote)
// side, but the client somehow returns a non-ok status to RocksDB. Note
// that this does not violate atomicity. Should we delete the new MANIFEST
// successfully, a subsequent recovery attempt will likely see the CURRENT
// pointing to the new MANIFEST, thus fail. We will not be able to open the
// DB again. Therefore, if manifest operations succeed, we should keep the
// the new MANIFEST. If the process proceeds, any future LogAndApply() call
// will switch to a new MANIFEST and update CURRENT. If user tries to
// re-open the DB,
// a) CURRENT points to the new MANIFEST, and the new MANIFEST is present.
// b) CURRENT points to the original MANIFEST, and the original MANIFEST
// also exists.
if (new_descriptor_log && !manifest_io_status.ok()) {
ROCKS_LOG_INFO(db_options_->info_log,
"Deleting manifest %" PRIu64 " current manifest %" PRIu64
"\n",

View File

@ -144,7 +144,7 @@ DECLARE_bool(enable_write_thread_adaptive_yield);
DECLARE_int32(reopen);
DECLARE_double(bloom_bits);
DECLARE_bool(use_block_based_filter);
DECLARE_int32(ribbon_starting_level);
DECLARE_bool(use_ribbon_filter);
DECLARE_bool(partition_filters);
DECLARE_bool(optimize_filters_for_memory);
DECLARE_int32(index_type);
@ -258,6 +258,7 @@ DECLARE_bool(best_efforts_recovery);
DECLARE_bool(skip_verifydb);
DECLARE_bool(enable_compaction_filter);
DECLARE_bool(paranoid_file_checks);
DECLARE_bool(fail_if_options_file_error);
DECLARE_uint64(batch_protection_bytes_per_key);
DECLARE_uint64(user_timestamp_size);

View File

@ -28,7 +28,9 @@ class DbStressEnvWrapper : public EnvWrapper {
f.find(".restore") != std::string::npos) {
return target()->DeleteFile(f);
}
return Status::OK();
// Rename the file instead of deletion to keep the history, and
// at the same time it is not visible to RocksDB.
return target()->RenameFile(f, f + "_renamed_");
}
// If true, all manifest files will not be delted in DeleteFile().

View File

@ -410,8 +410,8 @@ DEFINE_bool(use_block_based_filter, false,
"use block based filter"
"instead of full filter for block based table");
DEFINE_int32(ribbon_starting_level, false,
"First level to use Ribbon filter instead of Bloom");
DEFINE_bool(use_ribbon_filter, false,
"Use Ribbon filter instead of Bloom filter");
DEFINE_bool(partition_filters, false,
"use partitioned filters "
@ -792,6 +792,10 @@ DEFINE_bool(paranoid_file_checks, true,
"After writing every SST file, reopen it and read all the keys "
"and validate checksums");
DEFINE_bool(fail_if_options_file_error, false,
"Fail operations that fail to detect or properly persist options "
"file.");
DEFINE_uint64(batch_protection_bytes_per_key, 0,
"If nonzero, enables integrity protection in `WriteBatch` at the "
"specified number of bytes per key. Currently the only supported "
@ -808,4 +812,8 @@ DEFINE_uint64(user_timestamp_size, 0,
"Number of bytes for a user-defined timestamp. Currently, only "
"8-byte is supported");
DEFINE_int32(open_metadata_write_fault_one_in, 0,
"On non-zero, enables fault injection on file metadata write "
"during DB reopen.");
#endif // GFLAGS

View File

@ -30,6 +30,7 @@ DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
DECLARE_int32(read_fault_one_in);
DECLARE_int32(write_fault_one_in);
DECLARE_int32(open_metadata_write_fault_one_in);
namespace ROCKSDB_NAMESPACE {
class StressTest;

View File

@ -26,12 +26,11 @@ StressTest::StressTest()
compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
filter_policy_(
FLAGS_bloom_bits >= 0
? FLAGS_ribbon_starting_level < FLAGS_num_levels
? NewExperimentalRibbonFilterPolicy(
FLAGS_bloom_bits, FLAGS_ribbon_starting_level)
: FLAGS_use_block_based_filter
? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
: NewBloomFilterPolicy(FLAGS_bloom_bits, false)
? FLAGS_use_ribbon_filter
? NewExperimentalRibbonFilterPolicy(FLAGS_bloom_bits)
: FLAGS_use_block_based_filter
? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
: NewBloomFilterPolicy(FLAGS_bloom_bits, false)
: nullptr),
db_(nullptr),
#ifndef ROCKSDB_LITE
@ -2105,9 +2104,14 @@ void StressTest::PrintEnv() const {
static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
fprintf(stdout, "Write fault one in : %d\n", FLAGS_write_fault_one_in);
fprintf(stdout, "Open metadata write fault one in:\n");
fprintf(stdout, " %d\n",
FLAGS_open_metadata_write_fault_one_in);
fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection);
fprintf(stdout, "Best efforts recovery : %d\n",
static_cast<int>(FLAGS_best_efforts_recovery));
fprintf(stdout, "Fail if OPTIONS file error: %d\n",
static_cast<int>(FLAGS_fail_if_options_file_error));
fprintf(stdout, "User timestamp size bytes : %d\n",
static_cast<int>(FLAGS_user_timestamp_size));
@ -2326,6 +2330,7 @@ void StressTest::Open() {
options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
options_.fail_if_options_file_error = FLAGS_fail_if_options_file_error;
if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
FLAGS_allow_setting_blob_options_dynamically) &&
@ -2410,33 +2415,78 @@ void StressTest::Open() {
new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
options_.create_missing_column_families = true;
if (!FLAGS_use_txn) {
#ifndef ROCKSDB_LITE
// StackableDB-based BlobDB
if (FLAGS_use_blob_db) {
blob_db::BlobDBOptions blob_db_options;
blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
blob_db::BlobDB* blob_db = nullptr;
s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
cf_descriptors, &column_families_, &blob_db);
if (s.ok()) {
db_ = blob_db;
}
} else
#endif // !ROCKSDB_LITE
{
if (db_preload_finished_.load() && FLAGS_read_only) {
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
} else {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
}
#ifndef NDEBUG
// Determine whether we need to ingest file metadata write failures
// during DB reopen. If it does, enable it.
// Only ingest metadata error if it is reopening, as initial open
// failure doesn't need to be handled.
// TODO cover transaction DB is not covered in this fault test too.
bool ingest_meta_error =
FLAGS_open_metadata_write_fault_one_in &&
fault_fs_guard
->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
.ok();
if (ingest_meta_error) {
fault_fs_guard->EnableMetadataWriteErrorInjection();
fault_fs_guard->SetRandomMetadataWriteError(
FLAGS_open_metadata_write_fault_one_in);
}
while (true) {
#endif // NDEBUG
#ifndef ROCKSDB_LITE
// StackableDB-based BlobDB
if (FLAGS_use_blob_db) {
blob_db::BlobDBOptions blob_db_options;
blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
blob_db::BlobDB* blob_db = nullptr;
s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
cf_descriptors, &column_families_,
&blob_db);
if (s.ok()) {
db_ = blob_db;
}
} else
#endif // !ROCKSDB_LITE
{
if (db_preload_finished_.load() && FLAGS_read_only) {
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
cf_descriptors, &column_families_, &db_);
} else {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
}
}
#ifndef NDEBUG
if (ingest_meta_error) {
fault_fs_guard->DisableMetadataWriteErrorInjection();
if (!s.ok()) {
// After failure to opening a DB due to IO error, retry should
// successfully open the DB with correct data if no IO error shows
// up.
ingest_meta_error = false;
Random rand(static_cast<uint32_t>(FLAGS_seed));
if (rand.OneIn(2)) {
fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
nullptr);
}
if (rand.OneIn(3)) {
fault_fs_guard->DropUnsyncedFileData();
} else if (rand.OneIn(2)) {
fault_fs_guard->DropRandomUnsyncedFileData(&rand);
}
continue;
}
}
break;
}
#endif // NDEBUG
} else {
#ifndef ROCKSDB_LITE
TransactionDBOptions txn_db_options;

View File

@ -98,7 +98,7 @@ int db_stress_tool(int argc, char** argv) {
#ifndef NDEBUG
if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection ||
FLAGS_write_fault_one_in) {
FLAGS_write_fault_one_in || FLAGS_open_metadata_write_fault_one_in) {
FaultInjectionTestFS* fs =
new FaultInjectionTestFS(raw_env->GetFileSystem());
fault_fs_guard.reset(fs);

View File

@ -383,10 +383,12 @@ IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
contents.remove_prefix(dbname.size() + 1);
std::string tmp = TempFileName(dbname, descriptor_number);
IOStatus s = WriteStringToFile(fs, contents.ToString() + "\n", tmp, true);
TEST_SYNC_POINT_CALLBACK("SetCurrentFile:BeforeRename", &s);
if (s.ok()) {
TEST_KILL_RANDOM("SetCurrentFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
s = fs->RenameFile(tmp, CurrentFileName(dbname), IOOptions(), nullptr);
TEST_KILL_RANDOM("SetCurrentFile:1", rocksdb_kill_odds * REDUCE_ODDS2);
TEST_SYNC_POINT_CALLBACK("SetCurrentFile:AfterRename", &s);
}
if (s.ok()) {
if (directory_to_fsync != nullptr) {

View File

@ -217,7 +217,7 @@ extern const FilterPolicy* NewBloomFilterPolicy(
double bits_per_key, bool use_block_based_builder = false);
// An new Bloom alternative that saves about 30% space compared to
// Bloom filters, with about 3-4x construction time and similar
// Bloom filters, with about 3-4x construction CPU time and similar
// query times. For example, if you pass in 10 for
// bloom_equivalent_bits_per_key, you'll get the same 0.95% FP rate
// as Bloom filter but only using about 7 bits per key. (This
@ -225,24 +225,16 @@ extern const FilterPolicy* NewBloomFilterPolicy(
// and/or transitional, so is expected to be replaced with a new API.
// The constructed filters will be given long-term support.)
//
// The space savings of Ribbon filters makes sense for lower (higher
// numbered; larger; longer-lived) levels of LSM, whereas the speed of
// Bloom filters make sense for highest levels of LSM. Setting
// ribbon_starting_level allows for this design. For example,
// ribbon_starting_level=1 means that Bloom filters will be used in
// level 0, including flushes, and Ribbon filters elsewhere.
// ribbon_starting_level=0 means (almost) always use Ribbon.
//
// Ribbon filters are compatible with RocksDB >= 6.15.0. Earlier
// versions reading the data will behave as if no filter was used
// (degraded performance until compaction rebuilds filters).
//
// Note: even with ribbon_starting_level=0, this policy can generate
// Bloom filters in some cases. For very small filters (well under 1KB),
// Bloom fallback is by design, as the current Ribbon schema is not
// optimized to save vs. Bloom for such small filters. Other cases of
// Bloom fallback should be exceptional and log an appropriate warning.
// Note: this policy can generate Bloom filters in some cases.
// For very small filters (well under 1KB), Bloom fallback is by
// design, as the current Ribbon schema is not optimized to save vs.
// Bloom for such small filters. Other cases of Bloom fallback should
// be exceptional and log an appropriate warning.
extern const FilterPolicy* NewExperimentalRibbonFilterPolicy(
double bloom_equivalent_bits_per_key, int ribbon_starting_level = 1);
double bloom_equivalent_bits_per_key);
} // namespace ROCKSDB_NAMESPACE

View File

@ -11,7 +11,7 @@
#define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 20
#define ROCKSDB_PATCH 0
#define ROCKSDB_PATCH 4
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -1479,8 +1479,8 @@ public class RocksDBTest {
assertThat(livefiles.manifestFileSize).isEqualTo(57);
assertThat(livefiles.files.size()).isEqualTo(3);
assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT");
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000003");
assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000006");
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004");
assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000007");
}
}
}

View File

@ -940,15 +940,6 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
&new_opt));
ASSERT_TRUE(new_opt.filter_policy != nullptr);
bfp = dynamic_cast<const BloomFilterPolicy*>(new_opt.filter_policy.get());
// Not a BloomFilterPolicy
EXPECT_FALSE(bfp);
ASSERT_OK(GetBlockBasedTableOptionsFromString(
config_options, table_opt, "filter_policy=experimental_ribbon:5.678:0;",
&new_opt));
ASSERT_TRUE(new_opt.filter_policy != nullptr);
bfp = dynamic_cast<const BloomFilterPolicy*>(new_opt.filter_policy.get());
// Pure Ribbon configuration is (oddly) BloomFilterPolicy
EXPECT_EQ(bfp->GetMillibitsPerKey(), 5678);
EXPECT_EQ(bfp->GetMode(), BloomFilterPolicy::kStandard128Ribbon);

View File

@ -23,7 +23,6 @@
#include "util/hash.h"
#include "util/ribbon_config.h"
#include "util/ribbon_impl.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
@ -1055,7 +1054,7 @@ BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode)
BloomFilterPolicy::~BloomFilterPolicy() {}
const char* BuiltinFilterPolicy::Name() const {
const char* BloomFilterPolicy::Name() const {
return "rocksdb.BuiltinBloomFilter";
}
@ -1088,8 +1087,8 @@ void BloomFilterPolicy::CreateFilter(const Slice* keys, int n,
}
}
bool BuiltinFilterPolicy::KeyMayMatch(const Slice& key,
const Slice& bloom_filter) const {
bool BloomFilterPolicy::KeyMayMatch(const Slice& key,
const Slice& bloom_filter) const {
const size_t len = bloom_filter.size();
if (len < 2 || len > 0xffffffffU) {
return false;
@ -1111,7 +1110,7 @@ bool BuiltinFilterPolicy::KeyMayMatch(const Slice& key,
array);
}
FilterBitsBuilder* BuiltinFilterPolicy::GetFilterBitsBuilder() const {
FilterBitsBuilder* BloomFilterPolicy::GetFilterBitsBuilder() const {
// This code path should no longer be used, for the built-in
// BloomFilterPolicy. Internal to RocksDB and outside
// BloomFilterPolicy, only get a FilterBitsBuilder with
@ -1185,7 +1184,7 @@ FilterBitsBuilder* BloomFilterPolicy::GetBuilderFromContext(
// Read metadata to determine what kind of FilterBitsReader is needed
// and return a new one.
FilterBitsReader* BuiltinFilterPolicy::GetFilterBitsReader(
FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
const Slice& contents) const {
uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
if (len_with_meta <= kMetadataLen) {
@ -1266,7 +1265,7 @@ FilterBitsReader* BuiltinFilterPolicy::GetFilterBitsReader(
log2_cache_line_size);
}
FilterBitsReader* BuiltinFilterPolicy::GetRibbonBitsReader(
FilterBitsReader* BloomFilterPolicy::GetRibbonBitsReader(
const Slice& contents) const {
uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
uint32_t len = len_with_meta - kMetadataLen;
@ -1290,7 +1289,7 @@ FilterBitsReader* BuiltinFilterPolicy::GetRibbonBitsReader(
}
// For newer Bloom filter implementations
FilterBitsReader* BuiltinFilterPolicy::GetBloomBitsReader(
FilterBitsReader* BloomFilterPolicy::GetBloomBitsReader(
const Slice& contents) const {
uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
uint32_t len = len_with_meta - kMetadataLen;
@ -1363,50 +1362,10 @@ const FilterPolicy* NewBloomFilterPolicy(double bits_per_key,
return new BloomFilterPolicy(bits_per_key, m);
}
// Chooses between two filter policies based on LSM level
class LevelThresholdFilterPolicy : public BuiltinFilterPolicy {
public:
LevelThresholdFilterPolicy(std::unique_ptr<const FilterPolicy>&& a,
std::unique_ptr<const FilterPolicy>&& b,
int starting_level_for_b)
: policy_a_(std::move(a)),
policy_b_(std::move(b)),
starting_level_for_b_(starting_level_for_b) {
assert(starting_level_for_b_ >= 0);
}
// Deprecated block-based filter only
void CreateFilter(const Slice* keys, int n, std::string* dst) const override {
policy_a_->CreateFilter(keys, n, dst);
}
FilterBitsBuilder* GetBuilderWithContext(
const FilterBuildingContext& context) const override {
if (context.level_at_creation >= starting_level_for_b_) {
return policy_b_->GetBuilderWithContext(context);
} else {
return policy_a_->GetBuilderWithContext(context);
}
}
private:
const std::unique_ptr<const FilterPolicy> policy_a_;
const std::unique_ptr<const FilterPolicy> policy_b_;
int starting_level_for_b_;
};
extern const FilterPolicy* NewExperimentalRibbonFilterPolicy(
double bloom_equivalent_bits_per_key, int ribbon_starting_level) {
std::unique_ptr<const FilterPolicy> ribbon_only{new BloomFilterPolicy(
bloom_equivalent_bits_per_key, BloomFilterPolicy::kStandard128Ribbon)};
if (ribbon_starting_level > 0) {
std::unique_ptr<const FilterPolicy> bloom_only{new BloomFilterPolicy(
bloom_equivalent_bits_per_key, BloomFilterPolicy::kFastLocalBloom)};
return new LevelThresholdFilterPolicy(
std::move(bloom_only), std::move(ribbon_only), ribbon_starting_level);
} else {
return ribbon_only.release();
}
double bloom_equivalent_bits_per_key) {
return new BloomFilterPolicy(bloom_equivalent_bits_per_key,
BloomFilterPolicy::kStandard128Ribbon);
}
FilterBuildingContext::FilterBuildingContext(
@ -1437,18 +1396,10 @@ Status FilterPolicy::CreateFromString(
NewBloomFilterPolicy(bits_per_key, use_block_based_builder));
}
} else if (value.compare(0, kExpRibbonName.size(), kExpRibbonName) == 0) {
size_t pos = value.find(':', kExpRibbonName.size());
int ribbon_starting_level;
if (pos == std::string::npos) {
pos = value.size();
ribbon_starting_level = 1;
} else {
ribbon_starting_level = ParseInt(trim(value.substr(pos + 1)));
}
double bloom_equivalent_bits_per_key =
ParseDouble(trim(value.substr(kExpRibbonName.size(), pos)));
policy->reset(NewExperimentalRibbonFilterPolicy(
bloom_equivalent_bits_per_key, ribbon_starting_level));
ParseDouble(trim(value.substr(kExpRibbonName.size())));
policy->reset(
NewExperimentalRibbonFilterPolicy(bloom_equivalent_bits_per_key));
} else {
return Status::NotFound("Invalid filter policy name ", value);
#else

View File

@ -38,38 +38,10 @@ class BuiltinFilterBitsBuilder : public FilterBitsBuilder {
virtual double EstimatedFpRate(size_t num_entries, size_t bytes) = 0;
};
// Abstract base class for RocksDB built-in filter policies.
// This class is considered internal API and subject to change.
class BuiltinFilterPolicy : public FilterPolicy {
public:
// Shared name because any built-in policy can read filters from
// any other
const char* Name() const override;
// Deprecated block-based filter only
bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const override;
// Old API
FilterBitsBuilder* GetFilterBitsBuilder() const override;
// Read metadata to determine what kind of FilterBitsReader is needed
// and return a new one. This must successfully process any filter data
// generated by a built-in FilterBitsBuilder, regardless of the impl
// chosen for this BloomFilterPolicy. Not compatible with CreateFilter.
FilterBitsReader* GetFilterBitsReader(const Slice& contents) const override;
private:
// For newer Bloom filter implementation(s)
FilterBitsReader* GetBloomBitsReader(const Slice& contents) const;
// For Ribbon filter implementation(s)
FilterBitsReader* GetRibbonBitsReader(const Slice& contents) const;
};
// RocksDB built-in filter policy for Bloom or Bloom-like filters.
// This class is considered internal API and subject to change.
// See NewBloomFilterPolicy.
class BloomFilterPolicy : public BuiltinFilterPolicy {
class BloomFilterPolicy : public FilterPolicy {
public:
// An internal marker for operating modes of BloomFilterPolicy, in terms
// of selecting an implementation. This makes it easier for tests to track
@ -116,9 +88,16 @@ class BloomFilterPolicy : public BuiltinFilterPolicy {
~BloomFilterPolicy() override;
const char* Name() const override;
// Deprecated block-based filter only
void CreateFilter(const Slice* keys, int n, std::string* dst) const override;
// Deprecated block-based filter only
bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const override;
FilterBitsBuilder* GetFilterBitsBuilder() const override;
// To use this function, call GetBuilderFromContext().
//
// Neither the context nor any objects therein should be saved beyond
@ -131,6 +110,12 @@ class BloomFilterPolicy : public BuiltinFilterPolicy {
// (An internal convenience function to save boilerplate.)
static FilterBitsBuilder* GetBuilderFromContext(const FilterBuildingContext&);
// Read metadata to determine what kind of FilterBitsReader is needed
// and return a new one. This must successfully process any filter data
// generated by a built-in FilterBitsBuilder, regardless of the impl
// chosen for this BloomFilterPolicy. Not compatible with CreateFilter.
FilterBitsReader* GetFilterBitsReader(const Slice& contents) const override;
// Essentially for testing only: configured millibits/key
int GetMillibitsPerKey() const { return millibits_per_key_; }
// Essentially for testing only: legacy whole bits/key
@ -172,6 +157,12 @@ class BloomFilterPolicy : public BuiltinFilterPolicy {
// Sum over all generated filters f:
// (predicted_fp_rate(f) - predicted_fp_rate(f|o_f_f_m=false)) * 2^32
mutable std::atomic<int64_t> aggregate_rounding_balance_;
// For newer Bloom filter implementation(s)
FilterBitsReader* GetBloomBitsReader(const Slice& contents) const;
// For Ribbon filter implementation(s)
FilterBitsReader* GetRibbonBitsReader(const Slice& contents) const;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -61,6 +61,7 @@ default_params = {
"enable_pipelined_write": lambda: random.randint(0, 1),
"enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
"expected_values_path": lambda: setup_expected_values_file(),
"fail_if_options_file_error": lambda: random.randint(0, 1),
"flush_one_in": 1000000,
"file_checksum_impl": lambda: random.choice(["none", "crc32c", "xxh64", "big"]),
"get_live_files_one_in": 1000000,
@ -102,7 +103,7 @@ default_params = {
"mock_direct_io": False,
"use_full_merge_v1": lambda: random.randint(0, 1),
"use_merge": lambda: random.randint(0, 1),
"ribbon_starting_level": lambda: random.randint(0, 10),
"use_ribbon_filter": lambda: random.randint(0, 1),
"verify_checksum": 1,
"write_buffer_size": 4 * 1024 * 1024,
"writepercent": 35,
@ -137,6 +138,7 @@ default_params = {
"max_key_len": 3,
"key_len_percent_dist": "1,30,69",
"read_fault_one_in": lambda: random.choice([0, 1000]),
"open_metadata_write_fault_one_in": lambda: random.choice([0, 8]),
"sync_fault_injection": False,
"get_property_one_in": 1000000,
"paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),

View File

@ -1195,51 +1195,6 @@ INSTANTIATE_TEST_CASE_P(Full, FullBloomTest,
BloomFilterPolicy::kFastLocalBloom,
BloomFilterPolicy::kStandard128Ribbon));
static double GetEffectiveBitsPerKey(FilterBitsBuilder* builder) {
union {
uint64_t key_value;
char key_bytes[8];
};
const unsigned kNumKeys = 1000;
Slice key_slice{key_bytes, 8};
for (key_value = 0; key_value < kNumKeys; ++key_value) {
builder->AddKey(key_slice);
}
std::unique_ptr<const char[]> buf;
auto filter = builder->Finish(&buf);
return filter.size() * /*bits per byte*/ 8 / (1.0 * kNumKeys);
}
TEST(RibbonTest, RibbonTestLevelThreshold) {
BlockBasedTableOptions opts;
FilterBuildingContext ctx(opts);
// A few settings
for (int ribbon_starting_level : {0, 1, 10}) {
std::unique_ptr<const FilterPolicy> policy{
NewExperimentalRibbonFilterPolicy(8, ribbon_starting_level)};
// Claim to be generating filter for this level
ctx.level_at_creation = ribbon_starting_level;
std::unique_ptr<FilterBitsBuilder> builder{
policy->GetBuilderWithContext(ctx)};
// Must be Ribbon (more space efficient than 8 bits per key)
ASSERT_LT(GetEffectiveBitsPerKey(builder.get()), 7.5);
if (ribbon_starting_level > 0) {
// Claim to be generating filter for this level
ctx.level_at_creation = ribbon_starting_level - 1;
builder.reset(policy->GetBuilderWithContext(ctx));
// Must be Bloom (~ 8 bits per key)
ASSERT_GT(GetEffectiveBitsPerKey(builder.get()), 7.5);
}
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -2716,19 +2716,19 @@ TEST_F(BackupableDBTest, GarbageCollectionBeforeBackup) {
OpenDBAndBackupEngine(true);
ASSERT_OK(backup_chroot_env_->CreateDirIfMissing(backupdir_ + "/shared"));
std::string file_five = backupdir_ + "/shared/000008.sst";
std::string file_five = backupdir_ + "/shared/000009.sst";
std::string file_five_contents = "I'm not really a sst file";
// this depends on the fact that 00008.sst is the first file created by the DB
// this depends on the fact that 00009.sst is the first file created by the DB
ASSERT_OK(file_manager_->WriteToFile(file_five, file_five_contents));
FillDB(db_.get(), 0, 100);
// backup overwrites file 000008.sst
// backup overwrites file 000009.sst
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
std::string new_file_five_contents;
ASSERT_OK(ReadFileToString(backup_chroot_env_.get(), file_five,
&new_file_five_contents));
// file 000008.sst was overwritten
// file 000009.sst was overwritten
ASSERT_TRUE(new_file_five_contents != file_five_contents);
CloseDBAndBackupEngine();

View File

@ -29,6 +29,7 @@
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
class CheckpointTest : public testing::Test {
@ -793,6 +794,50 @@ TEST_F(CheckpointTest, CheckpointWithUnsyncedDataDropped) {
db_ = nullptr;
}
TEST_F(CheckpointTest, CheckpointOptionsFileFailedToPersist) {
// Regression test for a bug where checkpoint failed on a DB where persisting
// OPTIONS file failed and the DB was opened with
// `fail_if_options_file_error == false`.
Options options = CurrentOptions();
options.fail_if_options_file_error = false;
auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
// Setup `FaultInjectionTestFS` and `SyncPoint` callbacks to fail one
// operation when inside the OPTIONS file persisting code.
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
fault_fs->SetRandomMetadataWriteError(1 /* one_in */);
SyncPoint::GetInstance()->SetCallBack(
"PersistRocksDBOptions:start", [fault_fs](void* /* arg */) {
fault_fs->EnableMetadataWriteErrorInjection();
});
SyncPoint::GetInstance()->SetCallBack(
"FaultInjectionTestFS::InjectMetadataWriteError:Injected",
[fault_fs](void* /* arg */) {
fault_fs->DisableMetadataWriteErrorInjection();
});
options.env = fault_fs_env.get();
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_OK(Put("key1", "val1"));
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
delete checkpoint;
// Make sure it's usable.
options.env = env_;
DB* snapshot_db;
ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
ReadOptions read_opts;
std::string get_result;
ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result));
ASSERT_EQ("val1", get_result);
delete snapshot_db;
delete db_;
db_ = nullptr;
}
TEST_F(CheckpointTest, CheckpointReadOnlyDB) {
ASSERT_OK(Put("foo", "foo_value"));
ASSERT_OK(Flush());

View File

@ -22,6 +22,7 @@
#include "env/composite_env_wrapper.h"
#include "port/lang.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/random.h"
@ -87,8 +88,21 @@ IOStatus TestFSDirectory::Fsync(const IOOptions& options, IODebugContext* dbg) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
{
IOStatus in_s = fs_->InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
fs_->SyncDir(dirname_);
return dir_->Fsync(options, dbg);
IOStatus s = dir_->Fsync(options, dbg);
{
IOStatus in_s = fs_->InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
return s;
}
TestFSWritableFile::TestFSWritableFile(const std::string& fname,
@ -159,6 +173,12 @@ IOStatus TestFSWritableFile::Close(const IOOptions& options,
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
{
IOStatus in_s = fs_->InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
writable_file_opened_ = false;
IOStatus io_s;
io_s = target_->Append(state_.buffer_, options, dbg);
@ -170,6 +190,10 @@ IOStatus TestFSWritableFile::Close(const IOOptions& options,
}
if (io_s.ok()) {
fs_->WritableFileClosed(state_);
IOStatus in_s = fs_->InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
return io_s;
}
@ -294,6 +318,12 @@ IOStatus FaultInjectionTestFS::NewWritableFile(
if (!IsFilesystemActive()) {
return GetError();
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
if (IsFilesystemDirectWritable()) {
return target()->NewWritableFile(fname, file_opts, result, dbg);
}
@ -305,11 +335,19 @@ IOStatus FaultInjectionTestFS::NewWritableFile(
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
{
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
}
return io_s;
}
@ -323,6 +361,12 @@ IOStatus FaultInjectionTestFS::ReopenWritableFile(
if (IsFilesystemDirectWritable()) {
return target()->ReopenWritableFile(fname, file_opts, result, dbg);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
IOStatus io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(
@ -330,11 +374,19 @@ IOStatus FaultInjectionTestFS::ReopenWritableFile(
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
{
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
}
return io_s;
}
@ -348,17 +400,31 @@ IOStatus FaultInjectionTestFS::NewRandomRWFile(
if (IsFilesystemDirectWritable()) {
return target()->NewRandomRWFile(fname, file_opts, result, dbg);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
IOStatus io_s = target()->NewRandomRWFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(new TestFSRandomRWFile(fname, std::move(*result), this));
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
{
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
}
return io_s;
}
@ -385,9 +451,21 @@ IOStatus FaultInjectionTestFS::DeleteFile(const std::string& f,
if (!IsFilesystemActive()) {
return GetError();
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
IOStatus io_s = FileSystemWrapper::DeleteFile(f, options, dbg);
if (io_s.ok()) {
UntrackFile(f);
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
}
return io_s;
}
@ -399,21 +477,33 @@ IOStatus FaultInjectionTestFS::RenameFile(const std::string& s,
if (!IsFilesystemActive()) {
return GetError();
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
IOStatus io_s = FileSystemWrapper::RenameFile(s, t, options, dbg);
if (io_s.ok()) {
MutexLock l(&mutex_);
if (db_file_state_.find(s) != db_file_state_.end()) {
db_file_state_[t] = db_file_state_[s];
db_file_state_.erase(s);
}
{
MutexLock l(&mutex_);
if (db_file_state_.find(s) != db_file_state_.end()) {
db_file_state_[t] = db_file_state_[s];
db_file_state_.erase(s);
}
auto sdn = TestFSGetDirAndName(s);
auto tdn = TestFSGetDirAndName(t);
if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) {
auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
assert(tlist.find(tdn.second) == tlist.end());
tlist.insert(tdn.second);
auto sdn = TestFSGetDirAndName(s);
auto tdn = TestFSGetDirAndName(t);
if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) {
auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
assert(tlist.find(tdn.second) == tlist.end());
tlist.insert(tdn.second);
}
}
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
@ -618,6 +708,19 @@ IOStatus FaultInjectionTestFS::InjectWriteError(const std::string& file_name) {
return IOStatus::OK();
}
IOStatus FaultInjectionTestFS::InjectMetadataWriteError() {
{
MutexLock l(&mutex_);
if (!enable_metadata_write_error_injection_ ||
!metadata_write_error_one_in_ ||
!write_error_rand_.OneIn(metadata_write_error_one_in_)) {
return IOStatus::OK();
}
}
TEST_SYNC_POINT("FaultInjectionTestFS::InjectMetadataWriteError:Injected");
return IOStatus::IOError();
}
void FaultInjectionTestFS::PrintFaultBacktrace() {
#if defined(OS_LINUX)
ErrorContext* ctx =

View File

@ -22,7 +22,7 @@
#include <string>
#include "file/filename.h"
#include "include/rocksdb/file_system.h"
#include "rocksdb/file_system.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/thread_local.h"
@ -174,7 +174,10 @@ class FaultInjectionTestFS : public FileSystemWrapper {
filesystem_writable_(false),
thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)),
enable_write_error_injection_(false),
enable_metadata_write_error_injection_(false),
write_error_rand_(0),
write_error_one_in_(0),
metadata_write_error_one_in_(0),
ingest_data_corruption_before_write_(false) {}
virtual ~FaultInjectionTestFS() { error_.PermitUncheckedError(); }
@ -361,10 +364,18 @@ class FaultInjectionTestFS : public FileSystemWrapper {
write_error_allowed_types_ = types;
}
void SetRandomMetadataWriteError(int one_in) {
MutexLock l(&mutex_);
metadata_write_error_one_in_ = one_in;
}
// Inject an write error with randomlized parameter and the predefined
// error type. Only the allowed file types will inject the write error
IOStatus InjectWriteError(const std::string& file_name);
// Ingest error to metadata operations.
IOStatus InjectMetadataWriteError();
// Inject an error. For a READ operation, a status of IOError(), a
// corruption in the contents of scratch, or truncation of slice
// are the types of error with equal probability. For OPEN,
@ -397,6 +408,11 @@ class FaultInjectionTestFS : public FileSystemWrapper {
enable_write_error_injection_ = true;
}
void EnableMetadataWriteErrorInjection() {
MutexLock l(&mutex_);
enable_metadata_write_error_injection_ = true;
}
void DisableWriteErrorInjection() {
MutexLock l(&mutex_);
enable_write_error_injection_ = false;
@ -410,6 +426,11 @@ class FaultInjectionTestFS : public FileSystemWrapper {
}
}
void DisableMetadataWriteErrorInjection() {
MutexLock l(&mutex_);
enable_metadata_write_error_injection_ = false;
}
// We capture a backtrace every time a fault is injected, for debugging
// purposes. This call prints the backtrace to stderr and frees the
// saved callstack
@ -456,8 +477,10 @@ class FaultInjectionTestFS : public FileSystemWrapper {
std::unique_ptr<ThreadLocalPtr> thread_local_error_;
bool enable_write_error_injection_;
bool enable_metadata_write_error_injection_;
Random write_error_rand_;
int write_error_one_in_;
int metadata_write_error_one_in_;
std::vector<FileType> write_error_allowed_types_;
bool ingest_data_corruption_before_write_;
ChecksumType checksum_handoff_func_tpye_;

View File

@ -145,8 +145,10 @@ TEST_F(MemoryTest, MemTableAndTableReadersTotal) {
std::vector<uint64_t> usage_by_type;
std::vector<std::vector<ColumnFamilyHandle*>> vec_handles;
const int kNumDBs = 10;
// These key/value sizes ensure each KV has its own memtable. Note that the
// minimum write_buffer_size allowed is 64 KB.
const int kKeySize = 100;
const int kValueSize = 500;
const int kValueSize = 1 << 16;
Options opt;
opt.create_if_missing = true;
opt.create_missing_column_families = true;