Compare commits

...

16 Commits

Author SHA1 Message Date
Peter Dillinger
f55aab3d18 Fix EnvLibrados and add to CI (#9088)
Summary:
This feature was not part of any common or CI build, so no
surprise it broke. Now we can at least ensure compilation. I don't know
how to run the test successfully (missing config file) so it is bypassed
for now.

Fixes https://github.com/facebook/rocksdb/issues/9078

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9088

Test Plan: CI

Reviewed By: mrambacher

Differential Revision: D32009467

Pulled By: pdillinger

fbshipit-source-id: 3e0d1e5fde7f0ece703d48a81479e1cc7392c25c
2021-10-29 09:21:24 -07:00
Andrew Kryczka
0103296f39 update HISTORY.md and version.h for 6.25.3 2021-10-14 10:46:37 -07:00
Andrew Kryczka
fa4e0558bf Fix sequence number bump logic in multi-CF SST ingestion (#9005)
Summary:
The code in `IngestExternalFiles()` that bumps the DB's sequence number
depending on what seqnos were assigned to the files has 3 bugs:

1) There is an assertion that the sequence number is increased in all the
affected column families, but this is unnecessary, it is fine if some files can
stick to a lower sequence number. It is very easy to hit the assertion: it is
sufficient to insert 2 files in 2 CFs, one which overlaps the CF and one that
doesn't (for example the CF is empty). The line added in the
`IngestFilesIntoMultipleColumnFamilies_Success` test makes the assertion fail.

2) SetLastSequence() is called with the sum of all the bumps across CFs, but we
should take the maximum instead, as all CFs start with the current seqno and bump
it independently.

3) The code above is accidentally under a `#ifndef NDEBUG`, so it doesn't run in
optimized builds, so some files may be assigned seqnos from the future.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9005

Test Plan:
Added line in `IngestFilesIntoMultipleColumnFamilies_Success` that
triggers the assertion, verified that the test (and all the others) pass after
the fix.

Reviewed By: ajkr

Differential Revision: D31597892

Pulled By: ot

fbshipit-source-id: c2d3237f90290df1178736ace8653a9623f5a770
2021-10-14 10:45:19 -07:00
Giuseppe Ottaviano
3ebe8658d0 Fix race in WriteBufferManager (#9009)
Summary:
EndWriteStall has a data race: `queue_.empty()` is checked outside of the
mutex, so once we enter the critical section another thread may already have
cleared the list, and accessing the `front()` is undefined behavior (and causes
interesting crashes under high concurrency).

This PR fixes the bug, and also rewrites the logic to make it easier to reason
about it. It also fixes another subtle bug: if some writers are stalled and
`SetBufferSize(0)` is called, which disables the WBM, the writer are not
unblocked because of an early `enabled()` check in `EndWriteStall()`.

It doesn't significantly change the locking behavior, as before writers won't
lock unless entering a stall condition, and `FreeMem` almost always locks if
stalling is allowed, but that is inevitable with the current design. Liveness is
guaranteed by the fact that if some writes are blocked, eventually all writes
will be blocked due to `stall_active_`, and eventually all memory is freed.

While at it, do a couple of optimizations:

- In `WBMStallInterface::Signal()` signal the CV only after releasing the
  lock. Signaling under the lock is a common pitfall, as it causes the woken-up
  thread to immediately go back to sleep because the mutex is still locked by
  the awaker.

- Move all allocations and deallocations outside of the lock.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9009

Test Plan:
```
USE_CLANG=1 make -j64 all check
```

Reviewed By: akankshamahajan15

Differential Revision: D31550668

Pulled By: ot

fbshipit-source-id: 5125387c3dc7ecaaa2b8bbc736e58c4156698580
2021-10-14 10:44:46 -07:00
Andrew Kryczka
430fd40e87 update HISTORY.md and version.h for 6.25.2 2021-10-11 16:42:46 -07:00
Andrew Kryczka
2df8905531 Protect existing files in FaultInjectionTest{Env,FS}::ReopenWritableFile() (#8995)
Summary:
`FaultInjectionTest{Env,FS}::ReopenWritableFile()` functions were accidentally deleting WALs from previous `db_stress` runs causing verification to fail. They were operating under the assumption that `ReopenWritableFile()` would delete any existing file. It was a reasonable assumption considering the `{Env,FileSystem}::ReopenWritableFile()` documentation stated that would happen. The only problem was neither the implementations we offer nor the "real" clients in RocksDB code followed that contract. So, this PR updates the contract as well as fixing the fault injection client usage.

The fault injection change exposed that `ExternalSSTFileBasicTest.SyncFailure` was relying on a fault injection `Env` dropping unsynced data written by a regular `Env`. I changed that test to make its `SstFileWriter` use fault injection `Env`, and also implemented `LinkFile()` in fault injection so the unsynced data is tracked under the new name.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8995

Test Plan:
- Verified it fixes the following failure:

```
$ ./db_stress --clear_column_family_one_in=0 --column_families=1 --db=/dev/shm/rocksdb_crashtest_whitebox --delpercent=5 --expected_values_dir=/dev/shm/rocksdb_crashtest_expected --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=100000 --max_key_len=3 --nooverwritepercent=1 --ops_per_thread=1000 --prefixpercent=0 --readpercent=60 --reopen=0 --target_file_size_base=1048576 --test_batches_snapshots=0 --write_buffer_size=1048576 --writepercent=35 --value_size_mult=33 -threads=1
...
$ ./db_stress --avoid_flush_during_recovery=1 --clear_column_family_one_in=0 --column_families=1 --db=/dev/shm/rocksdb_crashtest_whitebox --delpercent=5 --destroy_db_initially=0 --expected_values_dir=/dev/shm/rocksdb_crashtest_expected --iterpercent=10 --key_len_percent_dist=1,30,69 --max_bytes_for_level_base=4194304 --max_key=100000 --max_key_len=3 --nooverwritepercent=1 --open_files=-1 --open_metadata_write_fault_one_in=8 --open_write_fault_one_in=16 --ops_per_thread=1000 --prefix_size=-1 --prefixpercent=0 --readpercent=50 --sync=1 --target_file_size_base=1048576 --test_batches_snapshots=0 --write_buffer_size=1048576 --writepercent=35 --value_size_mult=33 -threads=1
...
Verification failed for column family 0 key 000000000000001300000000000000857878787878 (1143): Value not found: NotFound:
Crash-recovery verification failed :(
...
```

- `make check -j48`

Reviewed By: ltamasi

Differential Revision: D31495388

Pulled By: ajkr

fbshipit-source-id: 7886ccb6a07cb8b78ad7b6c1c341ccf40bb68385
2021-10-11 16:39:36 -07:00
Hui Xiao
ffd4e9675e Update HISTORY.md for #8428 (#9001)
Summary:
Context:
HISTORY.md was not properly updated along with the change in https://github.com/facebook/rocksdb/pull/8428, where we introduced a change of accounting compression dictionary buffering memory and an extra condition of triggering data unbuffering.
Updated HISTORY.md for https://github.com/facebook/rocksdb/pull/8428 in 6.25.0 HISTORY.md section.
Updated blog post https://rocksdb.org/blog/2021/05/31/dictionary-compression.html.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9001

Reviewed By: ajkr

Differential Revision: D31517836

Pulled By: hx235

fbshipit-source-id: 01f6b30de4e1ff6b315aa8221139d9b700c7c629
2021-10-11 16:38:13 -07:00
Andrew Kryczka
ab2aceb4f5 Cancel manual compactions waiting on automatic compactions to drain (#8991)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/8991

Test Plan: the new test hangs forever without this fix and passes with this fix.

Reviewed By: hx235

Differential Revision: D31456419

Pulled By: ajkr

fbshipit-source-id: a82c0e5560b6e6153089dccd8e46163c61b07bff
2021-10-11 16:36:54 -07:00
Yanqin Jin
307a65525a Bump version 2021-09-28 13:24:51 -07:00
Yanqin Jin
9e15f7bff3 Sort per-file blob read requests by offset (#8953)
Summary:
`RandomAccessFileReader::MultiRead()` tries to merge requests in direct IO, assuming input IO requests are
sorted by offsets.

Add a test in direct IO mode.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8953

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D31183546

Pulled By: riversand963

fbshipit-source-id: 5d043ec68e2daa47a3149066150afd41ee3d73e6
2021-09-28 13:24:17 -07:00
Hui Xiao
14e3f8cd9b Return Status::NotSupported() in RateLimiter::GetTotalPendingRequests default impl (#8950)
Summary:
Context:
After more discussion, a fix in https://github.com/facebook/rocksdb/issues/8938 might turn out to be too restrictive for the case where `GetTotalPendingRequests` might be invoked on RateLimiter classes that does not support the recently added API `RateLimiter::GetTotalPendingRequests` (https://github.com/facebook/rocksdb/issues/8890) due to the `assert(false)` in https://github.com/facebook/rocksdb/issues/8938. Furthermore, sentinel value like `-1` proposed in https://github.com/facebook/rocksdb/issues/8938 is easy to be ignored and unchecked. Therefore we decided to adopt `Status::NotSupported()`, which is also a convention of adding new API to public header in RocksDB.
- Changed return value type of  `RateLimiter::GetTotalPendingRequests` in related declaration/definition
- Passed in pointer argument to hold the output instead of returning it as before
- Adapted to the changes above in calling `RateLimiter::GetTotalPendingRequests` in test
- Minor improvement to `TEST_F(RateLimiterTest, GetTotalPendingRequests)`:  added failure message for assertion and replaced repetitive statements with a loop

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8950

Reviewed By: ajkr, pdillinger

Differential Revision: D31128450

Pulled By: hx235

fbshipit-source-id: 282ac9c4f3dacaa0aec6d0a993161f77ad47a040
2021-09-22 21:41:04 -07:00
sdong
0fb79b0aca Add HISTORY.md entry to a recent bug fix. (#8948)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/8948

Reviewed By: anand1976

Differential Revision: D31127368

fbshipit-source-id: a374cb0baf88c3e15cd587a8f31e8a2d84432928
2021-09-22 16:25:45 -07:00
sdong
ff033921d8 RandomAccessFileReader::MultiRead() should not return read bytes not read (#8941)
Summary:
Right now, if underlying read returns fewer bytes than asked for, RandomAccessFileReader::MultiRead() still returns those in the buffer to upper layer. This can be a surprise to upper layer.
This is unlikely to cause incorrect data. To cause incorrect data, checksum checking in upper layer should pass with short reads, whose chance is low.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8941

Test Plan: Run stress tests for a while

Reviewed By: anand1976

Differential Revision: D31085780

fbshipit-source-id: 999adf2d6c2712f1323d14bb68b678df59969973
2021-09-22 15:26:10 -07:00
Peter Dillinger
33f11b2625 Clean up HISTORY 2021-09-21 21:59:24 -07:00
Hui Xiao
c92f7a29aa Make RateLimiter::GetTotalPendingRequest() non pure virtual for backward compability (#8938)
Summary:
Context/Summary:
https://github.com/facebook/rocksdb/pull/8890 added a public API `RateLimiter::GetTotalPendingRequest()` but mistakenly marked it as pure virtual, forcing RateLimiter's derived classes to implement this function and breaking backward compatibility.

This PR makes `RateLimiter::GetTotalPendingRequest()` as non-pure virtual method by providing a trivial implementation in rate_limiter.h

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8938

Test Plan: Passing existing tests

Reviewed By: pdillinger

Differential Revision: D31100661

Pulled By: hx235

fbshipit-source-id: 06eff1005156a6e5a881e393b2c5b2ad706897d8
2021-09-21 21:48:55 -07:00
Peter Dillinger
e74dfee7fc Finish BackupEngine migration to IOStatus (#8940)
Summary:
Updates a few remaining functions that should have been updated
from Status -> IOStatus, and adds to HISTORY for the overall change
including https://github.com/facebook/rocksdb/issues/8820.

This change is for inclusion in version 6.25.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8940

Test Plan: CI

Reviewed By: zhichao-cao

Differential Revision: D31085029

Pulled By: pdillinger

fbshipit-source-id: 91557c6a39ef1d90357d4f4dcd79af0645d87c7b
2021-09-21 21:48:34 -07:00
35 changed files with 777 additions and 224 deletions

View File

@ -98,6 +98,13 @@ commands:
command: | command: |
sudo apt-get update -y && sudo apt-get install -y libbenchmark-dev sudo apt-get update -y && sudo apt-get install -y libbenchmark-dev
install-librados:
steps:
- run:
name: Install librados
command: |
sudo apt-get update -y && sudo apt-get install -y librados-dev
upgrade-cmake: upgrade-cmake:
steps: steps:
- run: - run:
@ -171,14 +178,15 @@ jobs:
- run: make V=1 J=32 -j32 check | .circleci/cat_ignore_eagain - run: make V=1 J=32 -j32 check | .circleci/cat_ignore_eagain
- post-steps - post-steps
build-linux-mem-env: build-linux-mem-env-librados:
machine: machine:
image: ubuntu-1604:202104-01 image: ubuntu-1604:202104-01
resource_class: 2xlarge resource_class: 2xlarge
steps: steps:
- pre-steps - pre-steps
- install-gflags - install-gflags
- run: MEM_ENV=1 make V=1 J=32 -j32 check | .circleci/cat_ignore_eagain - install-librados
- run: MEM_ENV=1 ROCKSDB_USE_LIBRADOS=1 make V=1 J=32 -j32 check | .circleci/cat_ignore_eagain
- post-steps - post-steps
build-linux-encrypted-env: build-linux-encrypted-env:
@ -698,9 +706,9 @@ workflows:
jobs: jobs:
- build-linux-cmake - build-linux-cmake
- build-linux-cmake-ubuntu-20 - build-linux-cmake-ubuntu-20
build-linux-mem-env: build-linux-mem-env-librados:
jobs: jobs:
- build-linux-mem-env - build-linux-mem-env-librados
build-linux-encrypted-env: build-linux-encrypted-env:
jobs: jobs:
- build-linux-encrypted-env - build-linux-encrypted-env

View File

@ -1,8 +1,18 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## 6.25.3 (2021-10-14)
### Bug Fixes ### Bug Fixes
### New Features * Fixed bug in calls to `IngestExternalFiles()` with files for multiple column families. The bug could have introduced a delay in ingested file keys becoming visible after `IngestExternalFiles()` returned. Furthermore, mutations to ingested file keys while they were invisible could have been dropped (not necessarily immediately).
### Public API change * Fixed a possible race condition impacting users of `WriteBufferManager` who constructed it with `allow_stall == true`. The race condition led to undefined behavior (in our experience, typically a process crash).
* Fixed a bug where stalled writes would remain stalled forever after the user calls `WriteBufferManager::SetBufferSize()` with `new_size == 0` to dynamically disable memory limiting.
## 6.25.2 (2021-10-11)
### Bug Fixes
* Fix `DisableManualCompaction()` to cancel compactions even when they are waiting on automatic compactions to drain due to `CompactRangeOptions::exclusive_manual_compactions == true`.
* Fix contract of `Env::ReopenWritableFile()` and `FileSystem::ReopenWritableFile()` to specify any existing file must not be deleted or truncated.
## 6.25.1 (2021-09-28)
### Bug Fixes
* Fixes a bug in directed IO mode when calling MultiGet() for blobs in the same blob file. The bug is caused by not sorting the blob read requests by file offsets.
## 6.25.0 (2021-09-20) ## 6.25.0 (2021-09-20)
### Bug Fixes ### Bug Fixes
@ -18,6 +28,7 @@
* Fix WAL log data corruption when using DBOptions.manual_wal_flush(true) and WriteOptions.sync(true) together. The sync WAL should work with locked log_write_mutex_. * Fix WAL log data corruption when using DBOptions.manual_wal_flush(true) and WriteOptions.sync(true) together. The sync WAL should work with locked log_write_mutex_.
* Add checks for validity of the IO uring completion queue entries, and fail the BlockBasedTableReader MultiGet sub-batch if there's an invalid completion * Add checks for validity of the IO uring completion queue entries, and fail the BlockBasedTableReader MultiGet sub-batch if there's an invalid completion
* Add an interface RocksDbIOUringEnable() that, if defined by the user, will allow them to enable/disable the use of IO uring by RocksDB * Add an interface RocksDbIOUringEnable() that, if defined by the user, will allow them to enable/disable the use of IO uring by RocksDB
* Fix the bug that when direct I/O is used and MultiRead() returns a short result, RandomAccessFileReader::MultiRead() still returns full size buffer, with returned short value together with some data in original buffer. This bug is unlikely cause incorrect results, because (1) since FileSystem layer is expected to retry on short result, returning short results is only possible when asking more bytes in the end of the file, which RocksDB doesn't do when using MultiRead(); (2) checksum is unlikely to match.
### New Features ### New Features
* RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions. * RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.
@ -31,14 +42,16 @@
* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file. * Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
* Batch blob read requests for `DB::MultiGet` using `MultiRead`. * Batch blob read requests for `DB::MultiGet` using `MultiRead`.
* Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result. * Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result.
* Add built-in rate limiter's implementation of `RateLimiter::GetTotalPendingRequest(int64_t* total_pending_requests, const Env::IOPriority pri)` for the total number of requests that are pending for bytes in the rate limiter.
* Charge memory usage during data buffering, from which training samples are gathered for dictionary compression, to block cache. Unbuffering data can now be triggered if the block cache becomes full and `strict_capacity_limit=true` for the block cache, in addition to existing conditions that can trigger unbuffering.
### Public API change ### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API * Remove obsolete implementation details FullKey and ParseFullKey from public API
* Add a public API RateLimiter::GetTotalPendingRequests() for the total number of requests that are pending for bytes in the rate limiter.
* Change `SstFileMetaData::size` from `size_t` to `uint64_t`. * Change `SstFileMetaData::size` from `size_t` to `uint64_t`.
* Made Statistics extend the Customizable class and added a CreateFromString method. Implementations of Statistics need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method. * Made Statistics extend the Customizable class and added a CreateFromString method. Implementations of Statistics need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.
* Extended `FlushJobInfo` and `CompactionJobInfo` in listener.h to provide information about the blob files generated by a flush/compaction and garbage collected during compaction in Integrated BlobDB. Added struct members `blob_file_addition_infos` and `blob_file_garbage_infos` that contain this information. * Extended `FlushJobInfo` and `CompactionJobInfo` in listener.h to provide information about the blob files generated by a flush/compaction and garbage collected during compaction in Integrated BlobDB. Added struct members `blob_file_addition_infos` and `blob_file_garbage_infos` that contain this information.
* Extended parameter `output_file_names` of `CompactFiles` API to also include paths of the blob files generated by the compaction in Integrated BlobDB. * Extended parameter `output_file_names` of `CompactFiles` API to also include paths of the blob files generated by the compaction in Integrated BlobDB.
* Most `BackupEngine` functions now return `IOStatus` instead of `Status`. Most existing code should be compatible with this change but some calls might need to be updated.
## 6.24.0 (2021-08-20) ## 6.24.0 (2021-08-20)
### Bug Fixes ### Bug Fixes

View File

@ -222,6 +222,7 @@ am__v_AR_1 =
ifdef ROCKSDB_USE_LIBRADOS ifdef ROCKSDB_USE_LIBRADOS
LIB_SOURCES += utilities/env_librados.cc LIB_SOURCES += utilities/env_librados.cc
TEST_MAIN_SOURCES += utilities/env_librados_test.cc
LDFLAGS += -lrados LDFLAGS += -lrados
endif endif

View File

@ -127,6 +127,197 @@ TEST_F(DBBlobBasicTest, MultiGetBlobs) {
} }
} }
#ifndef ROCKSDB_LITE
TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) {
Options options = GetDefaultOptions();
// First, create an external SST file ["b"].
const std::string file_path = dbname_ + "/test.sst";
{
SstFileWriter sst_file_writer(EnvOptions(), GetDefaultOptions());
Status s = sst_file_writer.Open(file_path);
ASSERT_OK(s);
ASSERT_OK(sst_file_writer.Put("b", "b_value"));
ASSERT_OK(sst_file_writer.Finish());
}
options.enable_blob_files = true;
options.min_blob_size = 1000;
options.use_direct_reads = true;
options.allow_ingest_behind = true;
// Open DB with fixed-prefix sst-partitioner so that compaction will cut
// new table file when encountering a new key whose 1-byte prefix changes.
constexpr size_t key_len = 1;
options.sst_partitioner_factory =
NewSstPartitionerFixedPrefixFactory(key_len);
Status s = TryReopen(options);
if (s.IsInvalidArgument()) {
ROCKSDB_GTEST_SKIP("This test requires direct IO support");
return;
}
ASSERT_OK(s);
constexpr size_t num_keys = 3;
constexpr size_t blob_size = 3000;
constexpr char first_key[] = "a";
const std::string first_blob(blob_size, 'a');
ASSERT_OK(Put(first_key, first_blob));
constexpr char second_key[] = "b";
const std::string second_blob(2 * blob_size, 'b');
ASSERT_OK(Put(second_key, second_blob));
constexpr char third_key[] = "d";
const std::string third_blob(blob_size, 'd');
ASSERT_OK(Put(third_key, third_blob));
// first_blob, second_blob and third_blob in the same blob file.
// SST Blob file
// L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'|
// | | | ^ ^ ^
// | | | | | |
// | | +---------|-------|--------+
// | +-----------------|-------+
// +-------------------------+
ASSERT_OK(Flush());
constexpr char fourth_key[] = "c";
const std::string fourth_blob(blob_size, 'c');
ASSERT_OK(Put(fourth_key, fourth_blob));
// fourth_blob in another blob file.
// SST Blob file SST Blob file
// L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'| ["c"] |'cccc'|
// | | | ^ ^ ^ | ^
// | | | | | | | |
// | | +---------|-------|--------+ +-------+
// | +-----------------|-------+
// +-------------------------+
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
// Due to the above sst partitioner, we get 4 L1 files. The blob files are
// unchanged.
// |'aaaa', 'bbbb', 'dddd'| |'cccc'|
// ^ ^ ^ ^
// | | | |
// L0 | | | |
// L1 ["a"] ["b"] ["c"] | | ["d"] |
// | | | | | |
// | | +---------|-------|---------------+
// | +-----------------|-------+
// +-------------------------+
ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
{
// Ingest the external SST file into bottommost level.
std::vector<std::string> ext_files{file_path};
IngestExternalFileOptions opts;
opts.ingest_behind = true;
ASSERT_OK(
db_->IngestExternalFile(db_->DefaultColumnFamily(), ext_files, opts));
}
// Now the database becomes as follows.
// |'aaaa', 'bbbb', 'dddd'| |'cccc'|
// ^ ^ ^ ^
// | | | |
// L0 | | | |
// L1 ["a"] ["b"] ["c"] | | ["d"] |
// | | | | | |
// | | +---------|-------|---------------+
// | +-----------------|-------+
// +-------------------------+
//
// L6 ["b"]
{
// Compact ["b"] to bottommost level.
Slice begin = Slice(second_key);
Slice end = Slice(second_key);
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, &begin, &end));
}
// |'aaaa', 'bbbb', 'dddd'| |'cccc'|
// ^ ^ ^ ^
// | | | |
// L0 | | | |
// L1 ["a"] ["c"] | | ["d"] |
// | | | | |
// | +---------|-------|---------------+
// | +-----------------|-------+
// +-------|-----------------+
// |
// L6 ["b"]
ASSERT_EQ(3, NumTableFilesAtLevel(/*level=*/1));
ASSERT_EQ(1, NumTableFilesAtLevel(/*level=*/6));
bool called = false;
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"RandomAccessFileReader::MultiRead:AlignedReqs", [&](void* arg) {
auto* aligned_reqs = static_cast<std::vector<FSReadRequest>*>(arg);
assert(aligned_reqs);
ASSERT_EQ(1, aligned_reqs->size());
called = true;
});
SyncPoint::GetInstance()->EnableProcessing();
std::array<Slice, num_keys> keys{{first_key, third_key, second_key}};
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
// The MultiGet(), when constructing the KeyContexts, will process the keys
// in such order: a, d, b. The reason is that ["a"] and ["d"] are in L1,
// while ["b"] resides in L6.
// Consequently, the original FSReadRequest list prepared by
// Version::MultiGetblob() will be for "a", "d" and "b". It is unsorted as
// follows:
//
// ["a", offset=30, len=3033],
// ["d", offset=9096, len=3033],
// ["b", offset=3063, len=6033]
//
// If we do not sort them before calling MultiRead() in DirectIO, then the
// underlying IO merging logic will yield two requests.
//
// [offset=0, len=4096] (for "a")
// [offset=0, len=12288] (result of merging the request for "d" and "b")
//
// We need to sort them in Version::MultiGetBlob() so that the underlying
// IO merging logic in DirectIO mode works as expected. The correct
// behavior will be one aligned request:
//
// [offset=0, len=12288]
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_TRUE(called);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_blob);
ASSERT_OK(statuses[1]);
ASSERT_EQ(values[1], third_blob);
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], second_blob);
}
}
#endif // !ROCKSDB_LITE
TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) { TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.enable_blob_files = true; options.enable_blob_files = true;

View File

@ -6817,6 +6817,49 @@ TEST_F(DBCompactionTest, FIFOWarm) {
Destroy(options); Destroy(options);
} }
TEST_F(DBCompactionTest,
DisableManualCompactionDoesNotWaitForDrainingAutomaticCompaction) {
// When `CompactRangeOptions::exclusive_manual_compaction == true`, we wait
// for automatic compactions to drain before starting the manual compaction.
// This test verifies `DisableManualCompaction()` can cancel such a compaction
// without waiting for the drain to complete.
const int kNumL0Files = 4;
// Enforces manual compaction enters wait loop due to pending automatic
// compaction.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkCompaction", "DBImpl::RunManualCompaction:NotScheduled"},
{"DBImpl::RunManualCompaction:WaitScheduled",
"BackgroundCallCompaction:0"}});
// The automatic compaction will cancel the waiting manual compaction.
// Completing this implies the cancellation did not wait on automatic
// compactions to finish.
bool callback_completed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BackgroundCallCompaction:0", [&](void* /*arg*/) {
db_->DisableManualCompaction();
callback_completed = true;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = kNumL0Files;
Reopen(options);
for (int i = 0; i < kNumL0Files; ++i) {
ASSERT_OK(Put(Key(1), "value1"));
ASSERT_OK(Put(Key(2), "value2"));
ASSERT_OK(Flush());
}
CompactRangeOptions cro;
cro.exclusive_manual_compaction = true;
ASSERT_TRUE(db_->CompactRange(cro, nullptr, nullptr).IsIncomplete());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_TRUE(callback_completed);
}
#endif // !defined(ROCKSDB_LITE) #endif // !defined(ROCKSDB_LITE)
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -4679,14 +4679,11 @@ Status DBImpl::IngestExternalFiles(
if (status.ok()) { if (status.ok()) {
int consumed_seqno_count = int consumed_seqno_count =
ingestion_jobs[0].ConsumedSequenceNumbersCount(); ingestion_jobs[0].ConsumedSequenceNumbersCount();
#ifndef NDEBUG
for (size_t i = 1; i != num_cfs; ++i) { for (size_t i = 1; i != num_cfs; ++i) {
assert(!!consumed_seqno_count == consumed_seqno_count =
!!ingestion_jobs[i].ConsumedSequenceNumbersCount()); std::max(consumed_seqno_count,
consumed_seqno_count += ingestion_jobs[i].ConsumedSequenceNumbersCount());
ingestion_jobs[i].ConsumedSequenceNumbersCount();
} }
#endif
if (consumed_seqno_count > 0) { if (consumed_seqno_count > 0) {
const SequenceNumber last_seqno = versions_->LastSequence(); const SequenceNumber last_seqno = versions_->LastSequence();
versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count); versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);

View File

@ -1099,8 +1099,10 @@ class DBImpl : public DB {
// Called from WriteBufferManager. This function changes the state_ // Called from WriteBufferManager. This function changes the state_
// to State::RUNNING indicating the stall is cleared and DB can proceed. // to State::RUNNING indicating the stall is cleared and DB can proceed.
void Signal() override { void Signal() override {
MutexLock lock(&state_mutex_); {
state_ = State::RUNNING; MutexLock lock(&state_mutex_);
state_ = State::RUNNING;
}
state_cv_.Signal(); state_cv_.Signal();
} }

View File

@ -1714,8 +1714,11 @@ Status DBImpl::RunManualCompaction(
// When a manual compaction arrives, temporarily disable scheduling of // When a manual compaction arrives, temporarily disable scheduling of
// non-manual compactions and wait until the number of scheduled compaction // non-manual compactions and wait until the number of scheduled compaction
// jobs drops to zero. This is needed to ensure that this manual compaction // jobs drops to zero. This used to be needed to ensure that this manual
// can compact any range of keys/files. // compaction can compact any range of keys/files. Now it is optional
// (see `CompactRangeOptions::exclusive_manual_compaction`). The use case for
// `exclusive_manual_compaction=true` (the default) is unclear beyond not
// trusting the new code.
// //
// HasPendingManualCompaction() is true when at least one thread is inside // HasPendingManualCompaction() is true when at least one thread is inside
// RunManualCompaction(), i.e. during that time no other compaction will // RunManualCompaction(), i.e. during that time no other compaction will
@ -1729,8 +1732,20 @@ Status DBImpl::RunManualCompaction(
AddManualCompaction(&manual); AddManualCompaction(&manual);
TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
if (exclusive) { if (exclusive) {
// Limitation: there's no way to wake up the below loop when user sets
// `*manual.canceled`. So `CompactRangeOptions::exclusive_manual_compaction`
// and `CompactRangeOptions::canceled` might not work well together.
while (bg_bottom_compaction_scheduled_ > 0 || while (bg_bottom_compaction_scheduled_ > 0 ||
bg_compaction_scheduled_ > 0) { bg_compaction_scheduled_ > 0) {
if (manual_compaction_paused_ > 0 ||
(manual.canceled != nullptr && *manual.canceled == true)) {
// Pretend the error came from compaction so the below cleanup/error
// handling code can process it.
manual.done = true;
manual.status =
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
break;
}
TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled"); TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
ROCKS_LOG_INFO( ROCKS_LOG_INFO(
immutable_db_options_.info_log, immutable_db_options_.info_log,
@ -2223,6 +2238,10 @@ Status DBImpl::EnableAutoCompaction(
void DBImpl::DisableManualCompaction() { void DBImpl::DisableManualCompaction() {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
manual_compaction_paused_.fetch_add(1, std::memory_order_release); manual_compaction_paused_.fetch_add(1, std::memory_order_release);
// Wake up manual compactions waiting to start.
bg_cv_.SignalAll();
// Wait for any pending manual compactions to finish (typically through // Wait for any pending manual compactions to finish (typically through
// failing with `Status::Incomplete`) prior to returning. This way we are // failing with `Status::Incomplete`) prior to returning. This way we are
// guaranteed no pending manual compaction will commit while manual // guaranteed no pending manual compaction will commit while manual

View File

@ -3937,6 +3937,56 @@ TEST_F(DBTest, DISABLED_RateLimitingTest) {
ASSERT_LT(ratio, 0.6); ASSERT_LT(ratio, 0.6);
} }
// This is a mocked customed rate limiter without implementing optional APIs
// (e.g, RateLimiter::GetTotalPendingRequests())
class MockedRateLimiterWithNoOptionalAPIImpl : public RateLimiter {
public:
MockedRateLimiterWithNoOptionalAPIImpl() {}
~MockedRateLimiterWithNoOptionalAPIImpl() override {}
void SetBytesPerSecond(int64_t bytes_per_second) override {
(void)bytes_per_second;
}
using RateLimiter::Request;
void Request(const int64_t bytes, const Env::IOPriority pri,
Statistics* stats) override {
(void)bytes;
(void)pri;
(void)stats;
}
int64_t GetSingleBurstBytes() const override { return 200; }
int64_t GetTotalBytesThrough(
const Env::IOPriority pri = Env::IO_TOTAL) const override {
(void)pri;
return 0;
}
int64_t GetTotalRequests(
const Env::IOPriority pri = Env::IO_TOTAL) const override {
(void)pri;
return 0;
}
int64_t GetBytesPerSecond() const override { return 0; }
};
// To test that customed rate limiter not implementing optional APIs (e.g,
// RateLimiter::GetTotalPendingRequests()) works fine with RocksDB basic
// operations (e.g, Put, Get, Flush)
TEST_F(DBTest, CustomedRateLimiterWithNoOptionalAPIImplTest) {
Options options = CurrentOptions();
options.rate_limiter.reset(new MockedRateLimiterWithNoOptionalAPIImpl());
DestroyAndReopen(options);
ASSERT_OK(Put("abc", "def"));
ASSERT_EQ(Get("abc"), "def");
ASSERT_OK(Flush());
ASSERT_EQ(Get("abc"), "def");
}
TEST_F(DBTest, TableOptionsSanitizeTest) { TEST_F(DBTest, TableOptionsSanitizeTest) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;

View File

@ -1147,7 +1147,7 @@ TEST_F(ExternalSSTFileBasicTest, SyncFailure) {
} }
Options sst_file_writer_options; Options sst_file_writer_options;
sst_file_writer_options.env = env_; sst_file_writer_options.env = fault_injection_test_env_.get();
std::unique_ptr<SstFileWriter> sst_file_writer( std::unique_ptr<SstFileWriter> sst_file_writer(
new SstFileWriter(EnvOptions(), sst_file_writer_options)); new SstFileWriter(EnvOptions(), sst_file_writer_options));
std::string file_name = std::string file_name =

View File

@ -139,7 +139,7 @@ class ExternalSstFileIngestionJob {
IngestedFileInfo* file_to_ingest, IngestedFileInfo* file_to_ingest,
SuperVersion* sv); SuperVersion* sv);
// Assign `file_to_ingest` the appropriate sequence number and the lowest // Assign `file_to_ingest` the appropriate sequence number and the lowest
// possible level that it can be ingested to according to compaction_style. // possible level that it can be ingested to according to compaction_style.
// REQUIRES: Mutex held // REQUIRES: Mutex held
Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv, Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv,

View File

@ -2421,6 +2421,12 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = fault_injection_env.get(); options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu", "eevee"}, options); CreateAndReopenWithCF({"pikachu", "eevee"}, options);
// Exercise different situations in different column families: two are empty
// (so no new sequence number is needed), but at least one overlaps with the
// DB and needs to bump the sequence number.
ASSERT_OK(db_->Put(WriteOptions(), "foo1", "oldvalue"));
std::vector<ColumnFamilyHandle*> column_families; std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]); column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]); column_families.push_back(handles_[1]);

View File

@ -1866,7 +1866,7 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
void Version::MultiGetBlob( void Version::MultiGetBlob(
const ReadOptions& read_options, MultiGetRange& range, const ReadOptions& read_options, MultiGetRange& range,
const std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) { std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) {
if (read_options.read_tier == kBlockCacheTier) { if (read_options.read_tier == kBlockCacheTier) {
Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
for (const auto& elem : blob_rqs) { for (const auto& elem : blob_rqs) {
@ -1916,7 +1916,14 @@ void Version::MultiGetBlob(
const CompressionType compression = const CompressionType compression =
blob_file_reader.GetValue()->GetCompressionType(); blob_file_reader.GetValue()->GetCompressionType();
// TODO: sort blobs_in_file by file offset. // sort blobs_in_file by file offset.
std::sort(
blobs_in_file.begin(), blobs_in_file.end(),
[](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
assert(lhs.first.file_number() == rhs.first.file_number());
return lhs.first.offset() < rhs.first.offset();
});
autovector<std::reference_wrapper<const KeyContext>> blob_read_key_contexts; autovector<std::reference_wrapper<const KeyContext>> blob_read_key_contexts;
autovector<std::reference_wrapper<const Slice>> user_keys; autovector<std::reference_wrapper<const Slice>> user_keys;
autovector<uint64_t> offsets; autovector<uint64_t> offsets;

View File

@ -713,11 +713,11 @@ class Version {
const BlobIndex& blob_index, PinnableSlice* value, const BlobIndex& blob_index, PinnableSlice* value,
uint64_t* bytes_read) const; uint64_t* bytes_read) const;
using BlobReadRequests = std::vector< using BlobReadRequest =
std::pair<BlobIndex, std::reference_wrapper<const KeyContext>>>; std::pair<BlobIndex, std::reference_wrapper<const KeyContext>>;
void MultiGetBlob( using BlobReadRequests = std::vector<BlobReadRequest>;
const ReadOptions& read_options, MultiGetRange& range, void MultiGetBlob(const ReadOptions& read_options, MultiGetRange& range,
const std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs); std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs);
// Loads some stats information from files. Call without mutex held. It needs // Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set. // to be called before applying the version to the version set.

View File

@ -19,7 +19,10 @@ static bool ValidateUint32Range(const char* flagname, uint64_t value) {
return true; return true;
} }
DEFINE_uint64(seed, 2341234, "Seed for PRNG"); DEFINE_uint64(seed, 2341234,
"Seed for PRNG. When --nooverwritepercent is "
"nonzero and --expected_values_dir is nonempty, this value "
"must be fixed across invocations.");
static const bool FLAGS_seed_dummy __attribute__((__unused__)) = static const bool FLAGS_seed_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range); RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);
@ -453,7 +456,8 @@ DEFINE_string(
"provided and non-empty, the DB state will be verified against these " "provided and non-empty, the DB state will be verified against these "
"values after recovery. --max_key and --column_family must be kept the " "values after recovery. --max_key and --column_family must be kept the "
"same across invocations of this program that use the same " "same across invocations of this program that use the same "
"--expected_values_path."); "--expected_values_path. See --seed and --nooverwritepercent for further "
"requirements.");
DEFINE_bool(verify_checksum, false, DEFINE_bool(verify_checksum, false,
"Verify checksum for every block read from storage"); "Verify checksum for every block read from storage");
@ -644,7 +648,8 @@ static const bool FLAGS_delrangepercent_dummy __attribute__((__unused__)) =
DEFINE_int32(nooverwritepercent, 60, DEFINE_int32(nooverwritepercent, 60,
"Ratio of keys without overwrite to total workload (expressed as " "Ratio of keys without overwrite to total workload (expressed as "
" a percentage)"); "a percentage). When --expected_values_dir is nonempty, must "
"keep this value constant across invocations.");
static const bool FLAGS_nooverwritepercent_dummy __attribute__((__unused__)) = static const bool FLAGS_nooverwritepercent_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_nooverwritepercent, &ValidateInt32Percent); RegisterFlagValidator(&FLAGS_nooverwritepercent, &ValidateInt32Percent);

View File

@ -296,8 +296,14 @@ IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
r.status = fs_r.status; r.status = fs_r.status;
if (r.status.ok()) { if (r.status.ok()) {
uint64_t offset = r.offset - fs_r.offset; uint64_t offset = r.offset - fs_r.offset;
size_t len = std::min(r.len, static_cast<size_t>(fs_r.len - offset)); if (fs_r.result.size() <= offset) {
r.result = Slice(fs_r.scratch + offset, len); // No byte in the read range is returned.
r.result = Slice();
} else {
size_t len = std::min(
r.len, static_cast<size_t>(fs_r.result.size() - offset));
r.result = Slice(fs_r.scratch + offset, len);
}
} else { } else {
r.result = Slice(); r.result = Slice();
} }

View File

@ -266,11 +266,12 @@ class Env {
std::unique_ptr<WritableFile>* result, std::unique_ptr<WritableFile>* result,
const EnvOptions& options) = 0; const EnvOptions& options) = 0;
// Create an object that writes to a new file with the specified // Create an object that writes to a file with the specified name.
// name. Deletes any existing file with the same name and creates a // `WritableFile::Append()`s will append after any existing content. If the
// new file. On success, stores a pointer to the new file in // file does not already exist, creates it.
// *result and returns OK. On failure stores nullptr in *result and //
// returns non-OK. // On success, stores a pointer to the file in *result and returns OK. On
// failure stores nullptr in *result and returns non-OK.
// //
// The returned file will only be accessed by one thread at a time. // The returned file will only be accessed by one thread at a time.
virtual Status ReopenWritableFile(const std::string& /*fname*/, virtual Status ReopenWritableFile(const std::string& /*fname*/,

View File

@ -309,11 +309,12 @@ class FileSystem {
std::unique_ptr<FSWritableFile>* result, std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) = 0; IODebugContext* dbg) = 0;
// Create an object that writes to a new file with the specified // Create an object that writes to a file with the specified name.
// name. Deletes any existing file with the same name and creates a // `FSWritableFile::Append()`s will append after any existing content. If the
// new file. On success, stores a pointer to the new file in // file does not already exist, creates it.
// *result and returns OK. On failure stores nullptr in *result and //
// returns non-OK. // On success, stores a pointer to the file in *result and returns OK. On
// failure stores nullptr in *result and returns non-OK.
// //
// The returned file will only be accessed by one thread at a time. // The returned file will only be accessed by one thread at a time.
virtual IOStatus ReopenWritableFile( virtual IOStatus ReopenWritableFile(

View File

@ -1739,6 +1739,9 @@ struct CompactRangeOptions {
Slice* full_history_ts_low = nullptr; Slice* full_history_ts_low = nullptr;
// Allows cancellation of an in-progress manual compaction. // Allows cancellation of an in-progress manual compaction.
//
// Cancellation can be delayed waiting on automatic compactions when used
// together with `exclusive_manual_compaction == true`.
std::atomic<bool>* canceled = nullptr; std::atomic<bool>* canceled = nullptr;
}; };

View File

@ -11,6 +11,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -90,8 +91,18 @@ class RateLimiter {
const Env::IOPriority pri = Env::IO_TOTAL) const = 0; const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
// Total # of requests that are pending for bytes in rate limiter // Total # of requests that are pending for bytes in rate limiter
virtual int64_t GetTotalPendingRequests( // For convenience, this function is supported by the RateLimiter returned
const Env::IOPriority pri = Env::IO_TOTAL) const = 0; // by NewGenericRateLimiter but is not required by RocksDB.
//
// REQUIRED: total_pending_request != nullptr
virtual Status GetTotalPendingRequests(
int64_t* total_pending_requests,
const Env::IOPriority pri = Env::IO_TOTAL) const {
assert(total_pending_requests != nullptr);
(void)total_pending_requests;
(void)pri;
return Status::NotSupported();
}
virtual int64_t GetBytesPerSecond() const = 0; virtual int64_t GetBytesPerSecond() const = 0;

View File

@ -392,10 +392,10 @@ class BackupEngineReadOnlyBase {
virtual Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info, virtual Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info,
bool include_file_details = false) const = 0; bool include_file_details = false) const = 0;
// Returns info about backups in backup_info // Returns info about non-corrupt backups in backup_infos.
// Setting include_file_details=true provides information about each // Setting include_file_details=true provides information about each
// backed-up file in BackupInfo::file_details and more. // backed-up file in BackupInfo::file_details and more.
virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info, virtual void GetBackupInfo(std::vector<BackupInfo>* backup_infos,
bool include_file_details = false) const = 0; bool include_file_details = false) const = 0;
// Returns info about corrupt backups in corrupt_backups. // Returns info about corrupt backups in corrupt_backups.
@ -475,13 +475,13 @@ class BackupEngineAppendOnlyBase {
// Captures the state of the database by creating a new (latest) backup. // Captures the state of the database by creating a new (latest) backup.
// On success (OK status), the BackupID of the new backup is saved to // On success (OK status), the BackupID of the new backup is saved to
// *new_backup_id when not nullptr. // *new_backup_id when not nullptr.
virtual Status CreateNewBackup(const CreateBackupOptions& options, DB* db, virtual IOStatus CreateNewBackup(const CreateBackupOptions& options, DB* db,
BackupID* new_backup_id = nullptr) { BackupID* new_backup_id = nullptr) {
return CreateNewBackupWithMetadata(options, db, "", new_backup_id); return CreateNewBackupWithMetadata(options, db, "", new_backup_id);
} }
// keep here for backward compatibility. // keep here for backward compatibility.
virtual Status CreateNewBackup( virtual IOStatus CreateNewBackup(
DB* db, bool flush_before_backup = false, DB* db, bool flush_before_backup = false,
std::function<void()> progress_callback = []() {}) { std::function<void()> progress_callback = []() {}) {
CreateBackupOptions options; CreateBackupOptions options;
@ -575,12 +575,12 @@ class BackupEngine : public BackupEngineReadOnlyBase,
// BackupEngineOptions have to be the same as the ones used in previous // BackupEngineOptions have to be the same as the ones used in previous
// BackupEngines for the same backup directory. // BackupEngines for the same backup directory.
static Status Open(const BackupEngineOptions& options, Env* db_env, static IOStatus Open(const BackupEngineOptions& options, Env* db_env,
BackupEngine** backup_engine_ptr); BackupEngine** backup_engine_ptr);
// keep for backward compatibility. // keep for backward compatibility.
static Status Open(Env* db_env, const BackupEngineOptions& options, static IOStatus Open(Env* db_env, const BackupEngineOptions& options,
BackupEngine** backup_engine_ptr) { BackupEngine** backup_engine_ptr) {
return BackupEngine::Open(options, db_env, backup_engine_ptr); return BackupEngine::Open(options, db_env, backup_engine_ptr);
} }
@ -601,11 +601,11 @@ class BackupEngineReadOnly : public BackupEngineReadOnlyBase {
public: public:
virtual ~BackupEngineReadOnly() {} virtual ~BackupEngineReadOnly() {}
static Status Open(const BackupEngineOptions& options, Env* db_env, static IOStatus Open(const BackupEngineOptions& options, Env* db_env,
BackupEngineReadOnly** backup_engine_ptr); BackupEngineReadOnly** backup_engine_ptr);
// keep for backward compatibility. // keep for backward compatibility.
static Status Open(Env* db_env, const BackupEngineOptions& options, static IOStatus Open(Env* db_env, const BackupEngineOptions& options,
BackupEngineReadOnly** backup_engine_ptr) { BackupEngineReadOnly** backup_engine_ptr) {
return BackupEngineReadOnly::Open(options, db_env, backup_engine_ptr); return BackupEngineReadOnly::Open(options, db_env, backup_engine_ptr);
} }
}; };

View File

@ -76,7 +76,8 @@ class EnvLibrados : public EnvWrapper {
// Store in *result the names of the children of the specified directory. // Store in *result the names of the children of the specified directory.
// The names are relative to "dir". // The names are relative to "dir".
// Original contents of *results are dropped. // Original contents of *results are dropped.
Status GetChildren(const std::string& dir, std::vector<std::string>* result); Status GetChildren(const std::string& dir,
std::vector<std::string>* result) override;
// Delete the named file. // Delete the named file.
Status DeleteFile(const std::string& fname) override; Status DeleteFile(const std::string& fname) override;
@ -116,18 +117,16 @@ class EnvLibrados : public EnvWrapper {
// to go away. // to go away.
// //
// May create the named file if it does not already exist. // May create the named file if it does not already exist.
Status LockFile(const std::string& fname, FileLock** lock); Status LockFile(const std::string& fname, FileLock** lock) override;
// Release the lock acquired by a previous successful call to LockFile. // Release the lock acquired by a previous successful call to LockFile.
// REQUIRES: lock was returned by a successful LockFile() call // REQUIRES: lock was returned by a successful LockFile() call
// REQUIRES: lock has not already been unlocked. // REQUIRES: lock has not already been unlocked.
Status UnlockFile(FileLock* lock); Status UnlockFile(FileLock* lock) override;
// Get full directory name for this db. // Get full directory name for this db.
Status GetAbsolutePath(const std::string& db_path, std::string* output_path); Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override;
// Generate unique id
std::string GenerateUniqueId();
// Get default EnvLibrados // Get default EnvLibrados
static EnvLibrados* Default(); static EnvLibrados* Default();

View File

@ -11,7 +11,7 @@
#define ROCKSDB_MAJOR 6 #define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 25 #define ROCKSDB_MINOR 25
#define ROCKSDB_PATCH 0 #define ROCKSDB_PATCH 3
// Do not use these. We made the mistake of declaring macros starting with // Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these // double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -85,9 +85,7 @@ class WriteBufferManager {
buffer_size_.store(new_size, std::memory_order_relaxed); buffer_size_.store(new_size, std::memory_order_relaxed);
mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed); mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
// Check if stall is active and can be ended. // Check if stall is active and can be ended.
if (allow_stall_) { MaybeEndWriteStall();
EndWriteStall();
}
} }
// Below functions should be called by RocksDB internally. // Below functions should be called by RocksDB internally.
@ -118,17 +116,12 @@ class WriteBufferManager {
// pass allow_stall = true during WriteBufferManager instance creation. // pass allow_stall = true during WriteBufferManager instance creation.
// //
// Should only be called by RocksDB internally . // Should only be called by RocksDB internally .
bool ShouldStall() { bool ShouldStall() const {
if (allow_stall_ && enabled()) { if (!allow_stall_ || !enabled()) {
if (IsStallActive()) { return false;
return true;
}
if (IsStallThresholdExceeded()) {
stall_active_.store(true, std::memory_order_relaxed);
return true;
}
} }
return false;
return IsStallActive() || IsStallThresholdExceeded();
} }
// Returns true if stall is active. // Returns true if stall is active.
@ -137,7 +130,9 @@ class WriteBufferManager {
} }
// Returns true if stalling condition is met. // Returns true if stalling condition is met.
bool IsStallThresholdExceeded() { return memory_usage() >= buffer_size_; } bool IsStallThresholdExceeded() const {
return memory_usage() >= buffer_size_;
}
void ReserveMem(size_t mem); void ReserveMem(size_t mem);
@ -151,8 +146,9 @@ class WriteBufferManager {
// Should only be called by RocksDB internally. // Should only be called by RocksDB internally.
void BeginWriteStall(StallInterface* wbm_stall); void BeginWriteStall(StallInterface* wbm_stall);
// Remove DB instances from queue and signal them to continue. // If stall conditions have resolved, remove DB instances from queue and
void EndWriteStall(); // signal them to continue.
void MaybeEndWriteStall();
void RemoveDBFromQueue(StallInterface* wbm_stall); void RemoveDBFromQueue(StallInterface* wbm_stall);
@ -167,9 +163,11 @@ class WriteBufferManager {
std::mutex cache_rev_mng_mu_; std::mutex cache_rev_mng_mu_;
std::list<StallInterface*> queue_; std::list<StallInterface*> queue_;
// Protects the queue_ // Protects the queue_ and stall_active_.
std::mutex mu_; std::mutex mu_;
bool allow_stall_; bool allow_stall_;
// Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
// while holding mu_, but it can be read without a lock.
std::atomic<bool> stall_active_; std::atomic<bool> stall_active_;
void ReserveMemWithCache(size_t mem); void ReserveMemWithCache(size_t mem);

View File

@ -39,7 +39,12 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
WriteBufferManager::~WriteBufferManager() = default; WriteBufferManager::~WriteBufferManager() {
#ifndef NDEBUG
std::unique_lock<std::mutex> lock(mu_);
assert(queue_.empty());
#endif
}
std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const { std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
if (cache_rev_mng_ != nullptr) { if (cache_rev_mng_ != nullptr) {
@ -98,9 +103,7 @@ void WriteBufferManager::FreeMem(size_t mem) {
memory_used_.fetch_sub(mem, std::memory_order_relaxed); memory_used_.fetch_sub(mem, std::memory_order_relaxed);
} }
// Check if stall is active and can be ended. // Check if stall is active and can be ended.
if (allow_stall_) { MaybeEndWriteStall();
EndWriteStall();
}
} }
void WriteBufferManager::FreeMemWithCache(size_t mem) { void WriteBufferManager::FreeMemWithCache(size_t mem) {
@ -127,47 +130,74 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) {
void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) { void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
assert(wbm_stall != nullptr); assert(wbm_stall != nullptr);
if (wbm_stall) { assert(allow_stall_);
// Allocate outside of the lock.
std::list<StallInterface*> new_node = {wbm_stall};
{
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
queue_.push_back(wbm_stall); // Verify if the stall conditions are stil active.
if (ShouldStall()) {
stall_active_.store(true, std::memory_order_relaxed);
queue_.splice(queue_.end(), std::move(new_node));
}
} }
// In case thread enqueue itself and memory got freed in parallel, end the
// stall. // If the node was not consumed, the stall has ended already and we can signal
if (!ShouldStall()) { // the caller.
EndWriteStall(); if (!new_node.empty()) {
new_node.front()->Signal();
} }
} }
// Called when memory is freed in FreeMem. // Called when memory is freed in FreeMem or the buffer size has changed.
void WriteBufferManager::EndWriteStall() { void WriteBufferManager::MaybeEndWriteStall() {
if (enabled() && !IsStallThresholdExceeded()) { // Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
{ // the writers.
std::unique_lock<std::mutex> lock(mu_); if (!allow_stall_) {
stall_active_.store(false, std::memory_order_relaxed); return;
if (queue_.empty()) {
return;
}
}
// Get the instances from the list and call WBMStallInterface::Signal to
// change the state to running and unblock the DB instances.
// Check ShouldStall() incase stall got active by other DBs.
while (!ShouldStall() && !queue_.empty()) {
std::unique_lock<std::mutex> lock(mu_);
StallInterface* wbm_stall = queue_.front();
queue_.pop_front();
wbm_stall->Signal();
}
} }
if (IsStallThresholdExceeded()) {
return; // Stall conditions have not resolved.
}
// Perform all deallocations outside of the lock.
std::list<StallInterface*> cleanup;
std::unique_lock<std::mutex> lock(mu_);
if (!stall_active_.load(std::memory_order_relaxed)) {
return; // Nothing to do.
}
// Unblock new writers.
stall_active_.store(false, std::memory_order_relaxed);
// Unblock the writers in the queue.
for (StallInterface* wbm_stall : queue_) {
wbm_stall->Signal();
}
cleanup = std::move(queue_);
} }
void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) { void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
assert(wbm_stall != nullptr); assert(wbm_stall != nullptr);
// Deallocate the removed nodes outside of the lock.
std::list<StallInterface*> cleanup;
if (enabled() && allow_stall_) { if (enabled() && allow_stall_) {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
queue_.remove(wbm_stall); for (auto it = queue_.begin(); it != queue_.end();) {
wbm_stall->Signal(); auto next = std::next(it);
if (*it == wbm_stall) {
cleanup.splice(cleanup.end(), queue_, std::move(it));
}
it = next;
}
} }
wbm_stall->Signal();
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -78,6 +78,9 @@ default_params = {
"max_key": 100000000, "max_key": 100000000,
"max_write_buffer_number": 3, "max_write_buffer_number": 3,
"mmap_read": lambda: random.randint(0, 1), "mmap_read": lambda: random.randint(0, 1),
# Setting `nooverwritepercent > 0` is only possible because we do not vary
# the random seed, so the same keys are chosen by every run for disallowing
# overwrites.
"nooverwritepercent": 1, "nooverwritepercent": 1,
"open_files": lambda : random.choice([-1, -1, 100, 500000]), "open_files": lambda : random.choice([-1, -1, 100, 500000]),
"optimize_filters_for_memory": lambda: random.randint(0, 1), "optimize_filters_for_memory": lambda: random.randint(0, 1),

View File

@ -17,6 +17,7 @@
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h" #include "rocksdb/rate_limiter.h"
#include "rocksdb/status.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
@ -72,17 +73,21 @@ class GenericRateLimiter : public RateLimiter {
return total_requests_[pri]; return total_requests_[pri];
} }
virtual int64_t GetTotalPendingRequests( virtual Status GetTotalPendingRequests(
int64_t* total_pending_requests,
const Env::IOPriority pri = Env::IO_TOTAL) const override { const Env::IOPriority pri = Env::IO_TOTAL) const override {
assert(total_pending_requests != nullptr);
MutexLock g(&request_mutex_); MutexLock g(&request_mutex_);
if (pri == Env::IO_TOTAL) { if (pri == Env::IO_TOTAL) {
int64_t total_pending_requests_sum = 0; int64_t total_pending_requests_sum = 0;
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
total_pending_requests_sum += static_cast<int64_t>(queue_[i].size()); total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
} }
return total_pending_requests_sum; *total_pending_requests = total_pending_requests_sum;
} else {
*total_pending_requests = static_cast<int64_t>(queue_[pri].size());
} }
return static_cast<int64_t>(queue_[pri].size()); return Status::OK();
} }
virtual int64_t GetBytesPerSecond() const override { virtual int64_t GetBytesPerSecond() const override {

View File

@ -100,9 +100,11 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter( std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
10 /* fairness */)); 10 /* fairness */));
int64_t total_pending_requests = 0;
for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast<Env::IOPriority>(i)), ASSERT_OK(limiter->GetTotalPendingRequests(
0); &total_pending_requests, static_cast<Env::IOPriority>(i)));
ASSERT_EQ(total_pending_requests, 0);
} }
// This is a variable for making sure the following callback is called // This is a variable for making sure the following callback is called
// and the assertions in it are indeed excuted // and the assertions in it are indeed excuted
@ -113,11 +115,23 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
// We temporarily unlock the mutex so that the following // We temporarily unlock the mutex so that the following
// GetTotalPendingRequests() can acquire it // GetTotalPendingRequests() can acquire it
request_mutex->Unlock(); request_mutex->Unlock();
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1); for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0); EXPECT_OK(limiter->GetTotalPendingRequests(
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0); &total_pending_requests, static_cast<Env::IOPriority>(i)))
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0); << "Failed to return total pending requests for priority level = "
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1); << static_cast<Env::IOPriority>(i);
if (i == Env::IO_USER || i == Env::IO_TOTAL) {
EXPECT_EQ(total_pending_requests, 1)
<< "Failed to correctly return total pending requests for "
"priority level = "
<< static_cast<Env::IOPriority>(i);
} else {
EXPECT_EQ(total_pending_requests, 0)
<< "Failed to correctly return total pending requests for "
"priority level = "
<< static_cast<Env::IOPriority>(i);
}
}
// We lock the mutex again so that the request thread can resume running // We lock the mutex again so that the request thread can resume running
// with the mutex locked // with the mutex locked
request_mutex->Lock(); request_mutex->Lock();
@ -128,11 +142,16 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
limiter->Request(200, Env::IO_USER, nullptr /* stats */, limiter->Request(200, Env::IO_USER, nullptr /* stats */,
RateLimiter::OpType::kWrite); RateLimiter::OpType::kWrite);
ASSERT_EQ(nonzero_pending_requests_verified, true); ASSERT_EQ(nonzero_pending_requests_verified, true);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0); for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0); EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests,
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0); static_cast<Env::IOPriority>(i)))
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0); << "Failed to return total pending requests for priority level = "
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0); << static_cast<Env::IOPriority>(i);
EXPECT_EQ(total_pending_requests, 0)
<< "Failed to correctly return total pending requests for priority "
"level = "
<< static_cast<Env::IOPriority>(i);
}
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearCallBack( SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::Request:PostEnqueueRequest"); "GenericRateLimiter::Request:PostEnqueueRequest");

View File

@ -895,7 +895,7 @@ class BackupEngineImplThreadSafe : public BackupEngine,
} }
// Not public API but needed // Not public API but needed
Status Initialize() { IOStatus Initialize() {
// No locking needed // No locking needed
return impl_.Initialize(); return impl_.Initialize();
} }
@ -912,8 +912,8 @@ class BackupEngineImplThreadSafe : public BackupEngine,
BackupEngineImpl impl_; BackupEngineImpl impl_;
}; };
Status BackupEngine::Open(const BackupEngineOptions& options, Env* env, IOStatus BackupEngine::Open(const BackupEngineOptions& options, Env* env,
BackupEngine** backup_engine_ptr) { BackupEngine** backup_engine_ptr) {
std::unique_ptr<BackupEngineImplThreadSafe> backup_engine( std::unique_ptr<BackupEngineImplThreadSafe> backup_engine(
new BackupEngineImplThreadSafe(options, env)); new BackupEngineImplThreadSafe(options, env));
auto s = backup_engine->Initialize(); auto s = backup_engine->Initialize();
@ -922,7 +922,7 @@ Status BackupEngine::Open(const BackupEngineOptions& options, Env* env,
return s; return s;
} }
*backup_engine_ptr = backup_engine.release(); *backup_engine_ptr = backup_engine.release();
return Status::OK(); return IOStatus::OK();
} }
BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options, BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options,
@ -2986,10 +2986,11 @@ IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
return io_s; return io_s;
} }
Status BackupEngineReadOnly::Open(const BackupEngineOptions& options, Env* env, IOStatus BackupEngineReadOnly::Open(const BackupEngineOptions& options,
BackupEngineReadOnly** backup_engine_ptr) { Env* env,
BackupEngineReadOnly** backup_engine_ptr) {
if (options.destroy_old_data) { if (options.destroy_old_data) {
return Status::InvalidArgument( return IOStatus::InvalidArgument(
"Can't destroy old data with ReadOnly BackupEngine"); "Can't destroy old data with ReadOnly BackupEngine");
} }
std::unique_ptr<BackupEngineImplThreadSafe> backup_engine( std::unique_ptr<BackupEngineImplThreadSafe> backup_engine(
@ -3000,7 +3001,7 @@ Status BackupEngineReadOnly::Open(const BackupEngineOptions& options, Env* env,
return s; return s;
} }
*backup_engine_ptr = backup_engine.release(); *backup_engine_ptr = backup_engine.release();
return Status::OK(); return IOStatus::OK();
} }
void TEST_EnableWriteFutureSchemaVersion2( void TEST_EnableWriteFutureSchemaVersion2(

View File

@ -172,7 +172,7 @@ public:
* *
* @return [description] * @return [description]
*/ */
Status InvalidateCache(size_t offset, size_t length) { Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
return Status::OK(); return Status::OK();
} }
}; };
@ -237,8 +237,7 @@ public:
}; };
//enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED }; //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };
void Hint(AccessPattern pattern) { void Hint(AccessPattern /*pattern*/) { /* Do nothing */
/* Do nothing */
} }
/** /**
@ -250,7 +249,7 @@ public:
* *
* @return [description] * @return [description]
*/ */
Status InvalidateCache(size_t offset, size_t length) { Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
return Status::OK(); return Status::OK();
} }
}; };
@ -315,6 +314,7 @@ class LibradosWritableFile : public WritableFile {
Sync(); Sync();
} }
using WritableFile::Append;
/** /**
* @brief append data to file * @brief append data to file
* @details * @details
@ -324,7 +324,7 @@ class LibradosWritableFile : public WritableFile {
* @param data [description] * @param data [description]
* @return [description] * @return [description]
*/ */
Status Append(const Slice& data) { Status Append(const Slice& data) override {
// append buffer // append buffer
LOG_DEBUG("[IN] %i | %s\n", (int)data.size(), data.data()); LOG_DEBUG("[IN] %i | %s\n", (int)data.size(), data.data());
int r = 0; int r = 0;
@ -341,14 +341,14 @@ class LibradosWritableFile : public WritableFile {
return err_to_status(r); return err_to_status(r);
} }
using WritableFile::PositionedAppend;
/** /**
* @brief not supported * @brief not supported
* @details [long description] * @details [long description]
* @return [description] * @return [description]
*/ */
Status PositionedAppend( Status PositionedAppend(const Slice& /* data */,
const Slice& /* data */, uint64_t /* offset */) override {
uint64_t /* offset */) {
return Status::NotSupported(); return Status::NotSupported();
} }
@ -359,7 +359,7 @@ class LibradosWritableFile : public WritableFile {
* @param size [description] * @param size [description]
* @return [description] * @return [description]
*/ */
Status Truncate(uint64_t size) { Status Truncate(uint64_t size) override {
LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size, (long long)_file_size, (long long)_buffer_size); LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size, (long long)_file_size, (long long)_buffer_size);
int r = 0; int r = 0;
@ -391,7 +391,7 @@ class LibradosWritableFile : public WritableFile {
* @details [long description] * @details [long description]
* @return [description] * @return [description]
*/ */
Status Close() { Status Close() override {
LOG_DEBUG("%s | %lld | %lld\n", _hint.c_str(), (long long)_buffer_size, (long long)_file_size); LOG_DEBUG("%s | %lld | %lld\n", _hint.c_str(), (long long)_buffer_size, (long long)_file_size);
return Sync(); return Sync();
} }
@ -402,7 +402,7 @@ class LibradosWritableFile : public WritableFile {
* *
* @return [description] * @return [description]
*/ */
Status Flush() { Status Flush() override {
librados::AioCompletion *write_completion = librados::Rados::aio_create_completion(); librados::AioCompletion *write_completion = librados::Rados::aio_create_completion();
int r = 0; int r = 0;
@ -425,7 +425,7 @@ class LibradosWritableFile : public WritableFile {
* @details initiate an aio write and wait for result * @details initiate an aio write and wait for result
* @return [description] * @return [description]
*/ */
Status Sync() { // sync data Status Sync() override { // sync data
int r = 0; int r = 0;
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
@ -441,18 +441,14 @@ class LibradosWritableFile : public WritableFile {
* @details [long description] * @details [long description]
* @return true if Sync() and Fsync() are safe to call concurrently with Append()and Flush(). * @return true if Sync() and Fsync() are safe to call concurrently with Append()and Flush().
*/ */
bool IsSyncThreadSafe() const { bool IsSyncThreadSafe() const override { return true; }
return true;
}
/** /**
* @brief Indicates the upper layers if the current WritableFile implementation uses direct IO. * @brief Indicates the upper layers if the current WritableFile implementation uses direct IO.
* @details [long description] * @details [long description]
* @return [description] * @return [description]
*/ */
bool use_direct_io() const { bool use_direct_io() const override { return false; }
return false;
}
/** /**
* @brief Get file size * @brief Get file size
@ -460,7 +456,7 @@ class LibradosWritableFile : public WritableFile {
* This API will use cached file_size. * This API will use cached file_size.
* @return [description] * @return [description]
*/ */
uint64_t GetFileSize() { uint64_t GetFileSize() override {
LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size, (long long)_file_size); LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size, (long long)_file_size);
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
@ -478,7 +474,7 @@ class LibradosWritableFile : public WritableFile {
* *
* @return [description] * @return [description]
*/ */
size_t GetUniqueId(char* id, size_t max_size) const { size_t GetUniqueId(char* id, size_t max_size) const override {
// All fid has the same db_id prefix, so we need to ignore db_id prefix // All fid has the same db_id prefix, so we need to ignore db_id prefix
size_t s = std::min(max_size, _fid.size()); size_t s = std::min(max_size, _fid.size());
strncpy(id, _fid.c_str() + (_fid.size() - s), s); strncpy(id, _fid.c_str() + (_fid.size() - s), s);
@ -495,11 +491,10 @@ class LibradosWritableFile : public WritableFile {
* *
* @return [description] * @return [description]
*/ */
Status InvalidateCache(size_t offset, size_t length) { Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
return Status::OK(); return Status::OK();
} }
using WritableFile::RangeSync;
/** /**
* @brief No RangeSync support, just call Sync() * @brief No RangeSync support, just call Sync()
* @details [long description] * @details [long description]
@ -509,12 +504,11 @@ class LibradosWritableFile : public WritableFile {
* *
* @return [description] * @return [description]
*/ */
Status RangeSync(off_t offset, off_t nbytes) { Status RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/) override {
return Sync(); return Sync();
} }
protected: protected:
using WritableFile::Allocate;
/** /**
* @brief noop * @brief noop
* @details [long description] * @details [long description]
@ -524,7 +518,7 @@ protected:
* *
* @return [description] * @return [description]
*/ */
Status Allocate(off_t offset, off_t len) { Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override {
return Status::OK(); return Status::OK();
} }
}; };
@ -533,16 +527,14 @@ protected:
// Directory object represents collection of files and implements // Directory object represents collection of files and implements
// filesystem operations that can be executed on directories. // filesystem operations that can be executed on directories.
class LibradosDirectory : public Directory { class LibradosDirectory : public Directory {
librados::IoCtx * _io_ctx;
std::string _fid; std::string _fid;
public:
explicit LibradosDirectory(librados::IoCtx * io_ctx, std::string fid): public:
_io_ctx(io_ctx), _fid(fid) {} explicit LibradosDirectory(librados::IoCtx* /*io_ctx*/, std::string fid)
: _fid(fid) {}
// Fsync directory. Can be called concurrently from multiple threads. // Fsync directory. Can be called concurrently from multiple threads.
Status Fsync() { Status Fsync() { return Status::OK(); }
return Status::OK();
}
}; };
// Identifies a locked file. // Identifies a locked file.
@ -552,8 +544,8 @@ class LibradosFileLock : public FileLock {
const std::string _obj_name; const std::string _obj_name;
const std::string _lock_name; const std::string _lock_name;
const std::string _cookie; const std::string _cookie;
int lock_state;
public: public:
LibradosFileLock( LibradosFileLock(
librados::IoCtx * io_ctx, librados::IoCtx * io_ctx,
const std::string obj_name): const std::string obj_name):
@ -870,11 +862,9 @@ librados::IoCtx* EnvLibrados::_GetIoctx(const std::string& fpath) {
* @param options [description] * @param options [description]
* @return [description] * @return [description]
*/ */
Status EnvLibrados::NewSequentialFile( Status EnvLibrados::NewSequentialFile(const std::string& fname,
const std::string& fname, std::unique_ptr<SequentialFile>* result,
std::unique_ptr<SequentialFile>* result, const EnvOptions& /*options*/) {
const EnvOptions& options)
{
LOG_DEBUG("[IN]%s\n", fname.c_str()); LOG_DEBUG("[IN]%s\n", fname.c_str());
std::string dir, file, fid; std::string dir, file, fid;
split(fname, &dir, &file); split(fname, &dir, &file);
@ -914,10 +904,8 @@ Status EnvLibrados::NewSequentialFile(
* @return [description] * @return [description]
*/ */
Status EnvLibrados::NewRandomAccessFile( Status EnvLibrados::NewRandomAccessFile(
const std::string& fname, const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
std::unique_ptr<RandomAccessFile>* result, const EnvOptions& /*options*/) {
const EnvOptions& options)
{
LOG_DEBUG("[IN]%s\n", fname.c_str()); LOG_DEBUG("[IN]%s\n", fname.c_str());
std::string dir, file, fid; std::string dir, file, fid;
split(fname, &dir, &file); split(fname, &dir, &file);
@ -1374,6 +1362,8 @@ Status EnvLibrados::LinkFile(
const std::string& src, const std::string& src,
const std::string& target_in) const std::string& target_in)
{ {
(void)src;
(void)target_in;
LOG_DEBUG("[IO]%s => %s\n", src.c_str(), target_in.c_str()); LOG_DEBUG("[IO]%s => %s\n", src.c_str(), target_in.c_str());
return Status::NotSupported(); return Status::NotSupported();
} }
@ -1455,10 +1445,9 @@ Status EnvLibrados::UnlockFile(FileLock* lock)
* *
* @return [description] * @return [description]
*/ */
Status EnvLibrados::GetAbsolutePath( Status EnvLibrados::GetAbsolutePath(const std::string& db_path,
const std::string& db_path, std::string* /*output_path*/) {
std::string* output_path) (void)db_path;
{
LOG_DEBUG("[IO]%s\n", db_path.c_str()); LOG_DEBUG("[IO]%s\n", db_path.c_str());
return Status::NotSupported(); return Status::NotSupported();
} }

View File

@ -1133,6 +1133,11 @@ TEST_F(EnvLibradosMutipoolTest, DBTransactionDB) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
if (getenv("CIRCLECI")) {
fprintf(stderr,
"TODO: get env_librados_test working in CI. Skipping for now.\n");
return 0;
}
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
@ -1140,7 +1145,7 @@ int main(int argc, char** argv) {
#include <stdio.h> #include <stdio.h>
int main(int argc, char** argv) { int main(int argc, char** argv) {
fprintf(stderr, "SKIPPED as EnvMirror is not supported in ROCKSDB_LITE\n"); fprintf(stderr, "SKIPPED as EnvLibrados is not supported in ROCKSDB_LITE\n");
return 0; return 0;
} }

View File

@ -288,7 +288,7 @@ Status FaultInjectionTestEnv::NewWritableFile(
// again then it will be truncated - so forget our saved state. // again then it will be truncated - so forget our saved state.
UntrackFile(fname); UntrackFile(fname);
MutexLock l(&mutex_); MutexLock l(&mutex_);
open_files_.insert(fname); open_managed_files_.insert(fname);
auto dir_and_name = GetDirAndName(fname); auto dir_and_name = GetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second); list.insert(dir_and_name.second);
@ -302,17 +302,49 @@ Status FaultInjectionTestEnv::ReopenWritableFile(
if (!IsFilesystemActive()) { if (!IsFilesystemActive()) {
return GetError(); return GetError();
} }
Status s = target()->ReopenWritableFile(fname, result, soptions);
bool exists;
Status s, exists_s = target()->FileExists(fname);
if (exists_s.IsNotFound()) {
exists = false;
} else if (exists_s.ok()) {
exists = true;
} else {
s = exists_s;
exists = false;
}
if (s.ok()) { if (s.ok()) {
result->reset(new TestWritableFile(fname, std::move(*result), this)); s = target()->ReopenWritableFile(fname, result, soptions);
// WritableFileWriter* file is opened }
// again then it will be truncated - so forget our saved state.
UntrackFile(fname); // Only track files we created. Files created outside of this
MutexLock l(&mutex_); // `FaultInjectionTestEnv` are not eligible for tracking/data dropping
open_files_.insert(fname); // (for example, they may contain data a previous db_stress run expects to
auto dir_and_name = GetDirAndName(fname); // be recovered). This could be extended to track/drop data appended once
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; // the file is under `FaultInjectionTestEnv`'s control.
list.insert(dir_and_name.second); if (s.ok()) {
bool should_track;
{
MutexLock l(&mutex_);
if (db_file_state_.find(fname) != db_file_state_.end()) {
// It was written by this `Env` earlier.
assert(exists);
should_track = true;
} else if (!exists) {
// It was created by this `Env` just now.
should_track = true;
open_managed_files_.insert(fname);
auto dir_and_name = GetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
} else {
should_track = false;
}
}
if (should_track) {
result->reset(new TestWritableFile(fname, std::move(*result), this));
}
} }
return s; return s;
} }
@ -330,7 +362,7 @@ Status FaultInjectionTestEnv::NewRandomRWFile(
// again then it will be truncated - so forget our saved state. // again then it will be truncated - so forget our saved state.
UntrackFile(fname); UntrackFile(fname);
MutexLock l(&mutex_); MutexLock l(&mutex_);
open_files_.insert(fname); open_managed_files_.insert(fname);
auto dir_and_name = GetDirAndName(fname); auto dir_and_name = GetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second); list.insert(dir_and_name.second);
@ -394,17 +426,43 @@ Status FaultInjectionTestEnv::RenameFile(const std::string& s,
return ret; return ret;
} }
Status FaultInjectionTestEnv::LinkFile(const std::string& s,
const std::string& t) {
if (!IsFilesystemActive()) {
return GetError();
}
Status ret = EnvWrapper::LinkFile(s, t);
if (ret.ok()) {
MutexLock l(&mutex_);
if (db_file_state_.find(s) != db_file_state_.end()) {
db_file_state_[t] = db_file_state_[s];
}
auto sdn = GetDirAndName(s);
auto tdn = GetDirAndName(t);
if (dir_to_new_files_since_last_sync_[sdn.first].find(sdn.second) !=
dir_to_new_files_since_last_sync_[sdn.first].end()) {
auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
assert(tlist.find(tdn.second) == tlist.end());
tlist.insert(tdn.second);
}
}
return ret;
}
void FaultInjectionTestEnv::WritableFileClosed(const FileState& state) { void FaultInjectionTestEnv::WritableFileClosed(const FileState& state) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
if (open_files_.find(state.filename_) != open_files_.end()) { if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
db_file_state_[state.filename_] = state; db_file_state_[state.filename_] = state;
open_files_.erase(state.filename_); open_managed_files_.erase(state.filename_);
} }
} }
void FaultInjectionTestEnv::WritableFileSynced(const FileState& state) { void FaultInjectionTestEnv::WritableFileSynced(const FileState& state) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
if (open_files_.find(state.filename_) != open_files_.end()) { if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
if (db_file_state_.find(state.filename_) == db_file_state_.end()) { if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
db_file_state_.insert(std::make_pair(state.filename_, state)); db_file_state_.insert(std::make_pair(state.filename_, state));
} else { } else {
@ -415,7 +473,7 @@ void FaultInjectionTestEnv::WritableFileSynced(const FileState& state) {
void FaultInjectionTestEnv::WritableFileAppended(const FileState& state) { void FaultInjectionTestEnv::WritableFileAppended(const FileState& state) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
if (open_files_.find(state.filename_) != open_files_.end()) { if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
if (db_file_state_.find(state.filename_) == db_file_state_.end()) { if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
db_file_state_.insert(std::make_pair(state.filename_, state)); db_file_state_.insert(std::make_pair(state.filename_, state));
} else { } else {
@ -485,6 +543,6 @@ void FaultInjectionTestEnv::UntrackFile(const std::string& f) {
dir_to_new_files_since_last_sync_[dir_and_name.first].erase( dir_to_new_files_since_last_sync_[dir_and_name.first].erase(
dir_and_name.second); dir_and_name.second);
db_file_state_.erase(f); db_file_state_.erase(f);
open_files_.erase(f); open_managed_files_.erase(f);
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -175,6 +175,8 @@ class FaultInjectionTestEnv : public EnvWrapper {
virtual Status RenameFile(const std::string& s, virtual Status RenameFile(const std::string& s,
const std::string& t) override; const std::string& t) override;
virtual Status LinkFile(const std::string& s, const std::string& t) override;
// Undef to eliminate clash on Windows // Undef to eliminate clash on Windows
#undef GetFreeSpace #undef GetFreeSpace
virtual Status GetFreeSpace(const std::string& path, virtual Status GetFreeSpace(const std::string& path,
@ -237,13 +239,13 @@ class FaultInjectionTestEnv : public EnvWrapper {
SetFilesystemActiveNoLock(active, error); SetFilesystemActiveNoLock(active, error);
error.PermitUncheckedError(); error.PermitUncheckedError();
} }
void AssertNoOpenFile() { assert(open_files_.empty()); } void AssertNoOpenFile() { assert(open_managed_files_.empty()); }
Status GetError() { return error_; } Status GetError() { return error_; }
private: private:
port::Mutex mutex_; port::Mutex mutex_;
std::map<std::string, FileState> db_file_state_; std::map<std::string, FileState> db_file_state_;
std::set<std::string> open_files_; std::set<std::string> open_managed_files_;
std::unordered_map<std::string, std::set<std::string>> std::unordered_map<std::string, std::set<std::string>>
dir_to_new_files_since_last_sync_; dir_to_new_files_since_last_sync_;
bool filesystem_active_; // Record flushes, syncs, writes bool filesystem_active_; // Record flushes, syncs, writes

View File

@ -443,7 +443,7 @@ IOStatus FaultInjectionTestFS::NewWritableFile(
UntrackFile(fname); UntrackFile(fname);
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
open_files_.insert(fname); open_managed_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname); auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
// The new file could overwrite an old one. Here we simplify // The new file could overwrite an old one. Here we simplify
@ -476,19 +476,50 @@ IOStatus FaultInjectionTestFS::ReopenWritableFile(
return in_s; return in_s;
} }
} }
IOStatus io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg);
bool exists;
IOStatus io_s,
exists_s = target()->FileExists(fname, IOOptions(), nullptr /* dbg */);
if (exists_s.IsNotFound()) {
exists = false;
} else if (exists_s.ok()) {
exists = true;
} else {
io_s = exists_s;
exists = false;
}
if (io_s.ok()) { if (io_s.ok()) {
result->reset( io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg);
new TestFSWritableFile(fname, file_opts, std::move(*result), this)); }
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state. // Only track files we created. Files created outside of this
UntrackFile(fname); // `FaultInjectionTestFS` are not eligible for tracking/data dropping
// (for example, they may contain data a previous db_stress run expects to
// be recovered). This could be extended to track/drop data appended once
// the file is under `FaultInjectionTestFS`'s control.
if (io_s.ok()) {
bool should_track;
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
open_files_.insert(fname); if (db_file_state_.find(fname) != db_file_state_.end()) {
auto dir_and_name = TestFSGetDirAndName(fname); // It was written by this `FileSystem` earlier.
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; assert(exists);
list[dir_and_name.second] = kNewFileNoOverwrite; should_track = true;
} else if (!exists) {
// It was created by this `FileSystem` just now.
should_track = true;
open_managed_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list[dir_and_name.second] = kNewFileNoOverwrite;
} else {
should_track = false;
}
}
if (should_track) {
result->reset(
new TestFSWritableFile(fname, file_opts, std::move(*result), this));
} }
{ {
IOStatus in_s = InjectMetadataWriteError(); IOStatus in_s = InjectMetadataWriteError();
@ -523,7 +554,7 @@ IOStatus FaultInjectionTestFS::NewRandomRWFile(
UntrackFile(fname); UntrackFile(fname);
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
open_files_.insert(fname); open_managed_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname); auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
// It could be overwriting an old file, but we simplify the // It could be overwriting an old file, but we simplify the
@ -655,17 +686,62 @@ IOStatus FaultInjectionTestFS::RenameFile(const std::string& s,
return io_s; return io_s;
} }
IOStatus FaultInjectionTestFS::LinkFile(const std::string& s,
const std::string& t,
const IOOptions& options,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
{
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
// Using the value in `dir_to_new_files_since_last_sync_` for the source file
// may be a more reasonable choice.
std::string previous_contents = kNewFileNoOverwrite;
IOStatus io_s = FileSystemWrapper::LinkFile(s, t, options, dbg);
if (io_s.ok()) {
{
MutexLock l(&mutex_);
if (db_file_state_.find(s) != db_file_state_.end()) {
db_file_state_[t] = db_file_state_[s];
}
auto sdn = TestFSGetDirAndName(s);
auto tdn = TestFSGetDirAndName(t);
if (dir_to_new_files_since_last_sync_[sdn.first].find(sdn.second) !=
dir_to_new_files_since_last_sync_[sdn.first].end()) {
auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
assert(tlist.find(tdn.second) == tlist.end());
tlist[tdn.second] = previous_contents;
}
}
IOStatus in_s = InjectMetadataWriteError();
if (!in_s.ok()) {
return in_s;
}
}
return io_s;
}
void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) { void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
if (open_files_.find(state.filename_) != open_files_.end()) { if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
db_file_state_[state.filename_] = state; db_file_state_[state.filename_] = state;
open_files_.erase(state.filename_); open_managed_files_.erase(state.filename_);
} }
} }
void FaultInjectionTestFS::WritableFileSynced(const FSFileState& state) { void FaultInjectionTestFS::WritableFileSynced(const FSFileState& state) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
if (open_files_.find(state.filename_) != open_files_.end()) { if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
if (db_file_state_.find(state.filename_) == db_file_state_.end()) { if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
db_file_state_.insert(std::make_pair(state.filename_, state)); db_file_state_.insert(std::make_pair(state.filename_, state));
} else { } else {
@ -676,7 +752,7 @@ void FaultInjectionTestFS::WritableFileSynced(const FSFileState& state) {
void FaultInjectionTestFS::WritableFileAppended(const FSFileState& state) { void FaultInjectionTestFS::WritableFileAppended(const FSFileState& state) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
if (open_files_.find(state.filename_) != open_files_.end()) { if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
if (db_file_state_.find(state.filename_) == db_file_state_.end()) { if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
db_file_state_.insert(std::make_pair(state.filename_, state)); db_file_state_.insert(std::make_pair(state.filename_, state));
} else { } else {
@ -755,7 +831,7 @@ void FaultInjectionTestFS::UntrackFile(const std::string& f) {
dir_to_new_files_since_last_sync_[dir_and_name.first].erase( dir_to_new_files_since_last_sync_[dir_and_name.first].erase(
dir_and_name.second); dir_and_name.second);
db_file_state_.erase(f); db_file_state_.erase(f);
open_files_.erase(f); open_managed_files_.erase(f);
} }
IOStatus FaultInjectionTestFS::InjectThreadSpecificReadError( IOStatus FaultInjectionTestFS::InjectThreadSpecificReadError(

View File

@ -236,6 +236,10 @@ class FaultInjectionTestFS : public FileSystemWrapper {
const IOOptions& options, const IOOptions& options,
IODebugContext* dbg) override; IODebugContext* dbg) override;
virtual IOStatus LinkFile(const std::string& src, const std::string& target,
const IOOptions& options,
IODebugContext* dbg) override;
// Undef to eliminate clash on Windows // Undef to eliminate clash on Windows
#undef GetFreeSpace #undef GetFreeSpace
virtual IOStatus GetFreeSpace(const std::string& path, virtual IOStatus GetFreeSpace(const std::string& path,
@ -321,7 +325,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
MutexLock l(&mutex_); MutexLock l(&mutex_);
filesystem_writable_ = writable; filesystem_writable_ = writable;
} }
void AssertNoOpenFile() { assert(open_files_.empty()); } void AssertNoOpenFile() { assert(open_managed_files_.empty()); }
IOStatus GetError() { return error_; } IOStatus GetError() { return error_; }
@ -500,7 +504,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
private: private:
port::Mutex mutex_; port::Mutex mutex_;
std::map<std::string, FSFileState> db_file_state_; std::map<std::string, FSFileState> db_file_state_;
std::set<std::string> open_files_; std::set<std::string> open_managed_files_;
// directory -> (file name -> file contents to recover) // directory -> (file name -> file contents to recover)
// When data is recovered from unsyned parent directory, the files with // When data is recovered from unsyned parent directory, the files with
// empty file contents to recover is deleted. Those with non-empty ones // empty file contents to recover is deleted. Those with non-empty ones