Compare commits


29 Commits

Author SHA1 Message Date
Vijay Nadimpalli
0113c61ce9 Making platform 007 (gcc 7) default in build_detect_platform.sh (#5947)
Summary:
Making platform 007 (gcc 7) default in build_detect_platform.sh.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5947

Differential Revision: D18038837

Pulled By: vjnadimpalli

fbshipit-source-id: 9ac2ddaa93bf328a416faec028970e039886378e
2019-10-30 09:58:04 -07:00
Andrew Kryczka
405017bb8c Add latest toolchain (gcc-8, etc.) build support for fbcode users (#4923)
Summary:
- When building with internal dependencies, specify this toolchain by setting `ROCKSDB_FBCODE_BUILD_WITH_PLATFORM007=1`
- It is not enabled by default. However, it is enabled for TSAN builds in CI since there is a known problem with TSAN in gcc-5: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=71090
- I did not add support for Lua since (1) we agreed to deprecate it, and (2) we only have an internal build for v5.3 with this toolchain, which has breaking changes compared to our current version (v5.2).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4923

Differential Revision: D13827226

Pulled By: ajkr

fbshipit-source-id: 9aa3388ed3679777cfb15ef8cbcb83c07f62f947
2019-10-30 09:57:53 -07:00
Fosco Marotto
cfdea78fde Update history and version for 5.16.6 release. 2018-11-12 11:55:03 -08:00
Siying Dong
fef38d9477 Fix WriteBatchWithIndex's SeekForPrev() (#4559)
Summary:
WriteBatchWithIndex's SeekForPrev() has a bug: we internally place the position just before the seek key rather than at or after it. This makes the iterator miss a result that is the same as the seek key. Fix it by positioning the iterator at a key equal to or smaller than the seek key.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4559

Differential Revision: D10468534

Pulled By: siying

fbshipit-source-id: 2fb371ae809c561b60a1c11cef71e1c66fea1f19
2018-10-19 15:44:58 -07:00
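A minimal usage sketch of the behavior this fix targets, assuming the public `WriteBatchWithIndex`/`WBWIIterator` API (`Put`, `NewIterator`, `SeekForPrev`, `Entry`); it is illustrative and not taken from the PR:
```
#include <cassert>
#include <memory>

#include "rocksdb/utilities/write_batch_with_index.h"

int main() {
  rocksdb::WriteBatchWithIndex batch;
  batch.Put("a", "1");
  batch.Put("c", "3");

  std::unique_ptr<rocksdb::WBWIIterator> it(batch.NewIterator());
  it->SeekForPrev("c");
  // With the fix, SeekForPrev("c") lands on "c" itself; before the fix the
  // iterator was positioned just before "c" (i.e. at "a"), missing the match.
  assert(it->Valid() && it->Entry().key == "c");
  return 0;
}
```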
Andrew Kryczka
4d461733e4 update HISTORY.md and version number 2018-10-16 10:41:36 -07:00
anand1976
1e7cca20fb Properly determine a truncated CompactRange stop key (#4496)
Summary:
When a CompactRange() call for a level is truncated before the end key
is reached, because it exceeds max_compaction_bytes, we need to properly
set the compaction_end parameter to indicate the stop key. The next
CompactRange will use that as the begin key. We set it to the smallest
key of the next file in the level after expanding inputs to get a clean
cut.

Previously, we were setting it before expanding inputs. So we could end
up recompacting some files. In a pathological case, where a single key
has many entries spanning all the files in the level (possibly due to
merge operands without a partial merge operator, thus resulting in
compaction output identical to the input), this would result in
an endless loop over the same set of files.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4496

Differential Revision: D10395026

Pulled By: anand1976

fbshipit-source-id: f0c2f89fee29b4b3be53b6467b53abba8e9146a9
2018-10-16 10:40:21 -07:00
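A simplified sketch of the idea, with hypothetical types (`FileMeta`, `PickResumeKey`) standing in for the compaction picker's internals; the real code operates on `CompactionInputFiles` and `InternalKey`:
```
#include <cstdint>
#include <string>
#include <vector>

struct FileMeta {
  std::string smallest;  // smallest key in the file
  uint64_t size;         // compensated file size
};

// Returns the key at which the next CompactRange call should resume, or an
// empty string if the whole range fits under `limit`. `files` is sorted.
std::string PickResumeKey(const std::vector<FileMeta>& files, uint64_t limit) {
  uint64_t total = 0;
  size_t picked = files.size();
  for (size_t i = 0; i < files.size(); ++i) {
    total += files[i].size;
    if (total >= limit) {
      picked = i + 1;  // keep files[0..i], truncate the rest
      break;
    }
  }
  if (picked == files.size()) {
    return "";  // covered the whole range
  }
  // The fix: RocksDB first expands the truncated input set to a clean cut
  // (ExpandInputsToCleanCut) and only then records the resume key as the
  // smallest key after the expansion, so the same files are not recompacted.
  return files[picked].smallest;
}
```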
Andrew Kryczka
ab21dc6323 Avoid per-key linear scan over snapshots in compaction (#4495)
Summary:
`CompactionIterator::snapshots_` is ordered by ascending seqnum, just like `DBImpl`'s linked list of snapshots from which it was copied. This PR exploits this ordering to make `findEarliestVisibleSnapshot` do binary search rather than linear scan. This can make flush/compaction significantly faster when many snapshots exist since that function is called on every single key.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4495

Differential Revision: D10386470

Pulled By: ajkr

fbshipit-source-id: 29734991631227b6b7b677e156ac567690118a8b
2018-10-16 10:33:02 -07:00
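The core of the change as a self-contained sketch, modeled on the new `findEarliestVisibleSnapshot` shown in this diff (the `snapshot_checker_` loop is omitted):
```
#include <algorithm>
#include <cstdint>
#include <vector>

using SequenceNumber = uint64_t;
constexpr SequenceNumber kMaxSequenceNumber = UINT64_MAX;

// `snapshots` is sorted ascending, mirroring DBImpl's snapshot list. Returns
// the earliest snapshot >= `in`; writes the latest snapshot < `in` (or 0)
// to *prev_snapshot. O(log n) instead of the old O(n) per-key scan.
SequenceNumber FindEarliestVisibleSnapshot(
    const std::vector<SequenceNumber>& snapshots, SequenceNumber in,
    SequenceNumber* prev_snapshot) {
  auto it = std::lower_bound(snapshots.begin(), snapshots.end(), in);
  *prev_snapshot = (it == snapshots.begin()) ? 0 : *std::prev(it);
  return (it == snapshots.end()) ? kMaxSequenceNumber : *it;
}
```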
Anand Ananthabhotla
fa37825fce Update version to 5.16.4
2018-10-10 14:10:08 -07:00
Anand Ananthabhotla
4f6da54e8d Handle mixed slowdown/no_slowdown writer properly (#4475)
Summary:
There is a bug when the write queue leader is blocked on a write
delay/stop, and the queue has writers with WriteOptions::no_slowdown set
to true. They are not woken up until the write stall is cleared.

The fix introduces a dummy writer inserted at the tail to indicate a
write stall and prevent further inserts into the queue, and a condition
variable that writers who can tolerate slowdown wait on before adding
themselves to the queue. The leader calls WriteThread::BeginWriteStall()
to add the dummy writer and then walk the queue to fail any writers with
no_slowdown set. Once the stall clears, the leader calls
WriteThread::EndWriteStall() to remove the dummy writer and signal the
condition variable.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4475

Differential Revision: D10285827

Pulled By: anand1976

fbshipit-source-id: 747465e5e7f07a829b1fb0bc1afcd7b93f4ab1a9
2018-10-10 12:59:34 -07:00
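A simplified, hypothetical sketch of the scheme (this is not RocksDB's `WriteThread`); a plain flag plus condition variable stands in for the dummy writer at the queue tail:
```
#include <condition_variable>
#include <mutex>

class WriteQueue {
 public:
  void BeginWriteStall() {
    std::lock_guard<std::mutex> lk(mu_);
    stalled_ = true;  // plays the role of the dummy writer at the tail
  }
  void EndWriteStall() {
    { std::lock_guard<std::mutex> lk(mu_); stalled_ = false; }
    stall_cleared_.notify_all();
  }
  // Returns false if the write must fail fast (no_slowdown during a stall);
  // otherwise blocks until the stall clears, then lets the writer join.
  bool JoinQueue(bool no_slowdown) {
    std::unique_lock<std::mutex> lk(mu_);
    if (stalled_ && no_slowdown) {
      return false;  // caller returns Status::Incomplete("Write stall")
    }
    stall_cleared_.wait(lk, [this] { return !stalled_; });
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable stall_cleared_;
  bool stalled_ = false;
};
```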
Andrew Kryczka
98f11d01bd Fix CompactFiles support for kDisableCompressionOption (#4438)
Summary:
Previously `CompactFiles` with `CompressionType::kDisableCompressionOption` caused the program to crash on an assertion failure. This PR fixes the crash by adding support for that setting. Now, that setting will cause RocksDB to choose compression according to the column family's options.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4438

Differential Revision: D10115761

Pulled By: ajkr

fbshipit-source-id: a553c6fa76fa5b6f73b0d165d95640da6f454122
2018-10-01 17:05:59 -07:00
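A usage sketch of the now-supported setting; the helper name and the assumption that the L0 file list was collected via an `EventListener` (as the new `SentinelCompressionType` test in this diff does) are illustrative:
```
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

rocksdb::Status CompactL0ToL1(rocksdb::DB* db,
                              const std::vector<std::string>& l0_files) {
  rocksdb::CompactionOptions copts;
  copts.compression = rocksdb::CompressionType::kDisableCompressionOption;
  // With the fix, the output's compression follows the column family options
  // (e.g. compression_per_level) instead of hitting an assertion failure.
  return db->CompactFiles(copts, l0_files, /*output_level=*/1);
}
```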
Yi Wu
7b378c0de2 Bump version to 5.16.2 2018-09-21 09:13:34 -07:00
Yi Wu
e90493a865 BlobDB: handle IO error on read (#4410)
Summary:
Fix an IO error on read not being handled, which crashed the DB. With the fix we properly return the error.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4410

Differential Revision: D9979246

Pulled By: yiwu-arbug

fbshipit-source-id: 111a85675067a29c03cb60e9a34103f4ff636694
2018-09-21 09:11:16 -07:00
Maysam Yabandeh
e36e75d7c8 Fix bug in partition filters with format_version=4 (#4381)
Summary:
Value delta encoding in format_version 4 requires the difference between the sizes of two consecutive handles to be sent to BlockBuilder::Add. This applies not only to indexes on blocks but also to the indexes on indexes and filters in partitioned indexes and filters, respectively. The patch fixes a bug where the partitioned filters would encode the entire size of the handle rather than its difference from the previous handle's size.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4381

Differential Revision: D9879505

Pulled By: maysamyabandeh

fbshipit-source-id: 27a22e49b482b927fbd5629dc310c46d63d4b6d1
2018-09-21 09:11:04 -07:00
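A toy illustration (not RocksDB code) of the delta-encoding rule the fix restores for filter partitions:
```
#include <cstdint>
#include <vector>

// With value delta encoding, each entry stores the difference between the
// current handle size and the previous one rather than the full size.
std::vector<int64_t> EncodeSizeDeltas(const std::vector<uint64_t>& sizes) {
  std::vector<int64_t> deltas;
  uint64_t last = 0;
  for (uint64_t s : sizes) {
    deltas.push_back(static_cast<int64_t>(s) - static_cast<int64_t>(last));
    last = s;
  }
  return deltas;
}
// EncodeSizeDeltas({4096, 4100, 4080}) -> {4096, 4, -20}; the buggy filter
// partition path effectively emitted {4096, 4100, 4080} instead.
```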
Sagar Vemuri
895a4ad185 Bump version and update history 2018-09-17 12:20:30 -07:00
Sagar Vemuri
377d4d2c31 Fix sync-point comment in Block destructor (#4380)
Summary:
This is a follow-up to #4370. The earlier comment is not correct.

Thanks to ajkr for pointing this out.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4380

Differential Revision: D9874667

Pulled By: sagar0

fbshipit-source-id: f4e092d86b29c715258210b770643d367e38caae
2018-09-17 12:02:56 -07:00
Sagar Vemuri
5271d0750a Remove sync point from Block destructor (#4370)
Summary:
The sync point in the Block destructor triggered an AddressSanitizer failure:
```
AddressSanitizer: heap-use-after-free in std::__atomic_base<bool>::load(std::memory_order) const
==1798517==ABORTING
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4370

Differential Revision: D9844146

Pulled By: sagar0

fbshipit-source-id: 18a2970b1d504b4f6c8fb04857f26e0f32124dd1
2018-09-17 10:48:21 -07:00
Yanqin Jin
29713489b6 Fix Makefile target 'jtest' on PowerPC (#4357)
Summary:
Before the fix:
On a PowerPC machine, run the following
```
$ make jtest
```
The command will fail due to "undefined symbol: crc32c_ppc". It was caused by
the 'rocksdbjava' Makefile target not including the crc32c_ppc object files when
generating the shared lib. The fix is simple.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4357

Differential Revision: D9779474

Pulled By: riversand963

fbshipit-source-id: 3c5ec9068c2b9c796e6500f71cd900267064fd51
2018-09-17 10:47:45 -07:00
Yanqin Jin
abcad80559 Remove warnings caused by unused variables in jni (#4345)
Summary:
Test plan
```
$make clean jclean
$make -j32 rocksdbjavastatic
$make -j32 rocksdbjava
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4345

Differential Revision: D9661256

Pulled By: riversand963

fbshipit-source-id: aed316c53b29d02fbdd3fa1063a3e832b8a66469
2018-09-17 10:47:14 -07:00
Anand Ananthabhotla
b6a8f16b5e Remove trace_analyzer_tool.cc from rocksdb_lib target
2018-09-14 18:12:11 -07:00
Maysam Yabandeh
eddc3ebd90 Reduce IndexBlockIter size (#4358)
Summary:
With #3983 the size of IndexBlockIter was increased. This had resulted in a regression in P50 latencies in one of our benchmarks. The patch reduces the IndexBlockIter size by eliminating the active_comparator_ field from the class.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4358

Differential Revision: D9781737

Pulled By: maysamyabandeh

fbshipit-source-id: 71e2b28d90ff0813db9e04b737ae73e185583c52
2018-09-12 10:14:07 -07:00
Anand Ananthabhotla
73fcd6b8ff Fix a lint error due to unspecified move evaluation order (#4348)
Summary:
In C++11, the order in which the argument and the move are evaluated in a statement such
as the one below is unspecified:
  foo(a.b).bar(std::move(a))
The compiler is free to evaluate std::move(a) and move from `a` first, in which case the value subsequently read from a.b is unspecified.

In C++17, this will be safe if a draft proposal around function
chaining rules is accepted.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4348

Differential Revision: D9688810

Pulled By: anand1976

fbshipit-source-id: e4651d0ca03dcf007e50371a0fc72c0d1e710fb4
2018-09-06 16:41:46 -07:00
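An illustration of the hazard with hypothetical types `A` and `Chain`; this is a sketch of the language issue, not the code touched by the PR:
```
#include <string>
#include <utility>

struct A {
  std::string b;
};

struct Chain {
  explicit Chain(std::string s) : s_(std::move(s)) {}
  void bar(A /*a*/) {}  // by-value parameter: the argument is moved in
  std::string s_;
};

void unsafe(A a) {
  // Before C++17, the read of a.b and the move-construction of bar's
  // parameter from std::move(a) are unsequenced, so a.b may be read from a
  // moved-from (valid but unspecified) string.
  Chain(a.b).bar(std::move(a));
}

void safe(A a) {
  std::string b = a.b;                    // read a.b first,
  Chain(std::move(b)).bar(std::move(a));  // then move from `a`
}
```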
Philip Jameson
7981e040a7 Grab straggler files to explicitly import AutoHeaders
Summary: There were a few files that were missed when AutoHeaders were moved to their own file. Add explicit loads.

Reviewed By: yfeldblum

Differential Revision: D9499942

fbshipit-source-id: 942bf3a683b8961e1b6244136f6337477dcc45af
2018-09-04 13:09:04 -07:00
Mikhail Antonov
5279417e73 Avoiding write stall caused by manual flushes (#4297)
Summary:
Basically, at the moment it seems it is possible to cause a write stall by calling flush (either manually via DB::Flush(), or from Backup Engine directly calling FlushMemTable()) while a background flush may already be happening.

One of the ways to fix it: in DBImpl::CompactRange() we already check for a possible stall and delay the flush if needed before we actually proceed to call FlushMemTable(). We can simply move this delay logic to a separate method and call it from FlushMemTable.

This is a draft patch, for a first look; need to check tests/update SyncPoints and most certainly would need to add an allow_write_stall option to FlushOptions().
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4297

Differential Revision: D9420705

Pulled By: mikhail-antonov

fbshipit-source-id: f81d206b55e1d7b39e4dc64242fdfbceeea03fcc
2018-08-31 14:18:59 -07:00
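A usage sketch of the `FlushOptions::allow_write_stall` knob this patch proposes (the helper name is hypothetical):
```
#include "rocksdb/db.h"
#include "rocksdb/options.h"

rocksdb::Status FlushNow(rocksdb::DB* db) {
  rocksdb::FlushOptions fo;
  fo.wait = true;
  // Default is false: Flush() first waits until flushing would not push the
  // column family into a write stall. Setting it to true restores the old
  // "flush immediately" behavior.
  fo.allow_write_stall = true;
  return db->Flush(fo);
}
```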
Andrew Kryczka
8c7b501d0f Reduce empty SST creation/deletion in compaction (#4336)
Summary:
This is a followup to #4311. Checking `!RangeDelAggregator::IsEmpty()` before opening a dedicated range tombstone SST did not properly prevent empty SSTs from being generated. That's because it relies on `CollapsedRangeDelMap::Size`, which had an underflow bug when the map was empty. This PR fixes that underflow bug.

Also fixed an uninitialized variable in db_stress.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4336

Differential Revision: D9600080

Pulled By: ajkr

fbshipit-source-id: bc6980ca79d2cd01b825ebc9dbccd51c1a70cfc7
2018-08-31 12:35:59 -07:00
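A generic illustration of the unsigned-underflow pattern (not the actual `CollapsedRangeDelMap` code):
```
#include <cstddef>
#include <map>

// Subtracting from the size() of an empty container wraps around to a huge
// value, so an emptiness check built on that count never fires.
size_t IntervalCount(const std::map<int, int>& rep) {
  return rep.empty() ? 0 : rep.size() - 1;  // guard before subtracting
}

bool IsEmpty(const std::map<int, int>& rep) { return IntervalCount(rep) == 0; }
```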
Fenggang Wu
3c1781d6af DataBlockHashIndex: avoiding expensive iiter->Next when handling hash kNoEntry (#4296)
Summary:
When the HashIndex lookup returns `kNoEntry`, previously we invalidated the
`biter` by setting `current_=restarts_`, so that the search could continue to the next
block in case the result resides there.

There is one problem: when we are searching for a missing key, if the search
finds a `kNoEntry` and continues to the next block, there is also a
non-trivial chance that the HashIndex there returns `kNoEntry` too, and the
expensive index iterator `Next()` happens several times for nothing.

The solution is that if the hash table returns `kNoEntry`, `SeekForGetImpl()` just searches the last restart interval for the key. It will stop at the first key that is larger than the seek_key, or at the end of the block, and each case is handled correctly.

Microbenchmark script:
```
TEST_TMPDIR=/dev/shm ./db_bench --benchmarks=fillseq,readtocache,readmissing \
          --cache_size=20000000000  --use_data_block_hash_index={true|false}
```

`readmissing` performance (lower is better):
```
binary:                      3.6098 micros/op
hash (before applying diff): 4.1048 micros/op
hash (after  applying diff): 3.3502 micros/op
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4296

Differential Revision: D9419159

Pulled By: fgwu

fbshipit-source-id: 21e3eedcccbc47a249aa8eb4bf405c9def0b8a05
2018-08-30 15:45:26 -07:00
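A sketch of enabling the data-block hash index that this change optimizes, assuming the `BlockBasedTableOptions` fields exposed around this RocksDB version (`data_block_index_type`, `data_block_hash_table_util_ratio`):
```
#include "rocksdb/options.h"
#include "rocksdb/table.h"

rocksdb::Options WithDataBlockHashIndex(rocksdb::Options options) {
  rocksdb::BlockBasedTableOptions table_options;
  table_options.data_block_index_type =
      rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
  table_options.data_block_hash_table_util_ratio = 0.75;  // hash table load
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  return options;
}
```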
Fenggang Wu
9d5eb4063c DataBlockHashIndex: fix comment in NumRestarts() (#4286)
Summary:
Improve the description of the backward compatibility check in NumRestarts()
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4286

Differential Revision: D9412490

Pulled By: fgwu

fbshipit-source-id: ea7dd5c61d8ff8eacef623b729d4e4fd53cca066
2018-08-30 15:34:53 -07:00
Sagar Vemuri
0e35aa9930 Download bzip2 packages from Internet Archive (#4306)
Summary:
Since bzip.org is no longer maintained, download the bzip2 packages from a snapshot taken by the Internet Archive until we figure out a more credible source.

Fixes issue: #4305
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4306

Differential Revision: D9514868

Pulled By: sagar0

fbshipit-source-id: 57c6a141a62e652f94377efc7ca9916b458e68d5
2018-08-30 15:33:45 -07:00
Yi Wu
023e9bf214 BlobDB: Implement DisableFileDeletions (#4314)
Summary:
`DB::DisableFileDeletions` and `DB::EnableFileDeletions` are used by applications to stop RocksDB background jobs from deleting files while they are doing replication. Implement these methods for BlobDB. `DeleteObsoleteFiles` now needs to check `disable_file_deletions_` before starting, and holds `delete_file_mutex_` the whole time it is running. `DisableFileDeletions` needs to wait on `delete_file_mutex_` for any running `DeleteObsoleteFiles` job and then set the `disable_file_deletions_` flag.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4314

Differential Revision: D9501373

Pulled By: yiwu-arbug

fbshipit-source-id: 81064c1228f1724eff46da22b50ff765b16292cd
2018-08-27 11:22:31 -07:00
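A simplified, hypothetical sketch of the coordination described above (not `BlobDBImpl`'s actual code):
```
#include <mutex>

class FileDeleter {
 public:
  void DeleteObsoleteFiles() {
    std::lock_guard<std::mutex> lk(delete_file_mutex_);  // held for whole run
    if (disable_file_deletions_) {
      return;  // deletions paused, e.g. while replication copies files
    }
    // ... find and delete obsolete blob files ...
  }
  void DisableFileDeletions() {
    // Blocks until any in-flight DeleteObsoleteFiles() run finishes.
    std::lock_guard<std::mutex> lk(delete_file_mutex_);
    disable_file_deletions_ = true;
  }
  void EnableFileDeletions() {
    std::lock_guard<std::mutex> lk(delete_file_mutex_);
    disable_file_deletions_ = false;
  }

 private:
  std::mutex delete_file_mutex_;
  bool disable_file_deletions_ = false;
};
```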
Andrew Kryczka
096e5f5d02 Reduce empty SST creation/deletion during compaction (#4311)
Summary:
I have a PR to start calling `OnTableFileCreated` for empty SSTs: #4307. However, it is a behavior change, so it should not go into a patch release.

This PR adds back a check to make sure range deletions at least exist before starting file creation. This PR should be safe to backport to earlier versions.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4311

Differential Revision: D9493734

Pulled By: ajkr

fbshipit-source-id: f0d43cda4cfd904f133cfe3a6eb622f52a9ccbe8
2018-08-24 14:09:39 -07:00
57 changed files with 1354 additions and 306 deletions

View File

@ -612,6 +612,7 @@ set(SOURCES
utilities/blob_db/blob_compaction_filter.cc
utilities/blob_db/blob_db.cc
utilities/blob_db/blob_db_impl.cc
utilities/blob_db/blob_db_impl_filesnapshot.cc
utilities/blob_db/blob_dump_tool.cc
utilities/blob_db/blob_file.cc
utilities/blob_db/blob_log_reader.cc

View File

@ -1,12 +1,38 @@
# Rocksdb Change Log
## Unreleased
### Public API Change
### New Features
## 5.16.6 (11/24/2018)
### Bug Fixes
* Fix the bug that WriteBatchWithIndex's SeekForPrev() doesn't see the entries with the same key.
## 5.16.5 (10/16/2018)
### Bug Fixes
* Fix slow flush/compaction when DB contains many snapshots. The problem became noticeable to us in DBs with 100,000+ snapshots, though it will affect others at different thresholds.
* Properly set the stop key for a truncated manual CompactRange
## 5.16.4 (10/10/2018)
### Bug Fixes
* Fix corner case where a write group leader blocked due to write stall blocks other writers in queue with WriteOptions::no_slowdown set.
## 5.16.3 (10/1/2018)
### Bug Fixes
* Fix a crash caused when `CompactFiles` is run with `CompactionOptions::compression == CompressionType::kDisableCompressionOption`. Now that setting causes the compression type to be chosen according to the column family-wide compression options.
## 5.16.2 (9/21/2018)
### Bug Fixes
* Fix bug in partition filters with format_version=4.
## 5.16.1 (9/17/2018)
### Bug Fixes
* Remove trace_analyzer_tool from rocksdb_lib target in TARGETS file.
* Fix RocksDB Java build and tests.
* Remove sync point in Block destructor.
## 5.16.0 (8/21/2018)
### Public API Change
* The merge operands are passed to `MergeOperator::ShouldMerge` in the reversed order relative to how they were merged (passed to FullMerge or FullMergeV2) for performance reasons
* `OnTableFileCreated` will now be called for empty files generated during compaction. In that case, `TableFileCreationInfo::file_path` will be "(nil)" and `TableFileCreationInfo::file_size` will be zero.
* Add `FlushOptions::allow_write_stall`, which controls whether Flush calls start working immediately even if they cause user writes to stall, or wait until the flush can be performed without causing a write stall (similar to `CompactRangeOptions::allow_write_stall`). Note that the default value is false, meaning we add delay to Flush calls until stalling can be avoided when possible. This is a behavior change compared to previous RocksDB versions, where Flush calls didn't check whether they might cause a stall.
* GetAllKeyVersions() to take an extra argument of `max_num_ikeys`.
### New Features
@ -16,6 +42,7 @@
### Bug Fixes
* Fix a bug in misreporting the estimated partition index size in properties block.
* Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.
## 5.15.0 (7/17/2018)
### Public API Change

View File

@ -1621,7 +1621,7 @@ ZLIB_SHA256 ?= c3e5e9fdd5004dcb542feda5ee4f0ff0744628baf8ed2dd5d66f8ca1197cb1a1
ZLIB_DOWNLOAD_BASE ?= http://zlib.net
BZIP2_VER ?= 1.0.6
BZIP2_SHA256 ?= a2848f34fcd5d6cf47def00461fcb528a0484d8edef8208d6d2e2909dc61d9cd
BZIP2_DOWNLOAD_BASE ?= http://www.bzip.org
BZIP2_DOWNLOAD_BASE ?= https://web.archive.org/web/20180624184835/http://www.bzip.org
SNAPPY_VER ?= 1.1.4
SNAPPY_SHA256 ?= 134bfe122fd25599bb807bb8130e7ba6d9bdb851e0b16efcb83ac4f5d0b70057
SNAPPY_DOWNLOAD_BASE ?= https://github.com/google/snappy/releases/download
@ -1856,7 +1856,7 @@ $(java_libobjects): jl/%.o: %.cc
rocksdbjava: $(java_all_libobjects)
$(AM_V_GEN)cd java;$(MAKE) javalib;
$(AM_V_at)rm -f ./java/target/$(ROCKSDBJNILIB)
$(AM_V_at)$(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o ./java/target/$(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(java_libobjects) $(JAVA_LDFLAGS) $(COVERAGEFLAGS)
$(AM_V_at)$(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o ./java/target/$(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(java_all_libobjects) $(JAVA_LDFLAGS) $(COVERAGEFLAGS)
$(AM_V_at)cd java;jar -cf target/$(ROCKSDB_JAR) HISTORY*.md
$(AM_V_at)cd java/target;jar -uf $(ROCKSDB_JAR) $(ROCKSDBJNILIB)
$(AM_V_at)cd java/target/classes;jar -uf ../$(ROCKSDB_JAR) org/rocksdb/*.class org/rocksdb/util/*.class

TARGETS
View File

@ -1,3 +1,5 @@
load("@fbcode_macros//build_defs:auto_headers.bzl", "AutoHeaders")
REPO_PATH = package_name() + "/"
BUCK_BINS = "buck-out/gen/" + REPO_PATH
@ -195,7 +197,6 @@ cpp_library(
"tools/ldb_cmd.cc",
"tools/ldb_tool.cc",
"tools/sst_dump_tool.cc",
"tools/trace_analyzer_tool.cc",
"util/arena.cc",
"util/auto_roll_logger.cc",
"util/bloom.cc",
@ -233,6 +234,7 @@ cpp_library(
"utilities/blob_db/blob_compaction_filter.cc",
"utilities/blob_db/blob_db.cc",
"utilities/blob_db/blob_db_impl.cc",
"utilities/blob_db/blob_db_impl_filesnapshot.cc",
"utilities/blob_db/blob_dump_tool.cc",
"utilities/blob_db/blob_file.cc",
"utilities/blob_db/blob_log_format.cc",
@ -303,6 +305,7 @@ cpp_library(
srcs = [
"db/db_test_util.cc",
"table/mock_table.cc",
"tools/trace_analyzer_tool.cc",
"util/fault_injection_test_env.cc",
"util/testharness.cc",
"util/testutil.cc",
@ -376,11 +379,6 @@ ROCKS_TESTS = [
"table/block_based_filter_block_test.cc",
"serial",
],
[
"data_block_hash_index_test",
"table/data_block_hash_index_test.cc",
"serial",
],
[
"block_test",
"table/block_test.cc",
@ -506,6 +504,11 @@ ROCKS_TESTS = [
"table/cuckoo_table_reader_test.cc",
"serial",
],
[
"data_block_hash_index_test",
"table/data_block_hash_index_test.cc",
"serial",
],
[
"date_tiered_test",
"utilities/date_tiered/date_tiered_test.cc",
@ -956,11 +959,6 @@ ROCKS_TESTS = [
"tools/sst_dump_test.cc",
"serial",
],
[
"trace_analyzer_test",
"tools/trace_analyzer_test.cc",
"serial",
],
[
"statistics_test",
"monitoring/statistics_test.cc",
@ -996,6 +994,11 @@ ROCKS_TESTS = [
"util/timer_queue_test.cc",
"serial",
],
[
"trace_analyzer_test",
"tools/trace_analyzer_test.cc",
"serial",
],
[
"transaction_test",
"utilities/transactions/transaction_test.cc",

View File

@ -2,7 +2,10 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
rocksdb_target_header = """REPO_PATH = package_name() + "/"
rocksdb_target_header = """
load("@fbcode_macros//build_defs:auto_headers.bzl", "AutoHeaders")
REPO_PATH = package_name() + "/"
BUCK_BINS = "buck-out/gen/" + REPO_PATH

View File

@ -53,11 +53,13 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
FBCODE_BUILD="true"
# If we're compiling with TSAN we need pic build
PIC_BUILD=$COMPILE_WITH_TSAN
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
source "$PWD/build_tools/fbcode_config.sh"
else
if [ -n "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
# we need this to build with MySQL. Don't use for other purposes.
source "$PWD/build_tools/fbcode_config4.8.1.sh"
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_5xx" ]; then
source "$PWD/build_tools/fbcode_config.sh"
else
source "$PWD/build_tools/fbcode_config_platform007.sh"
fi
fi

View File

@ -0,0 +1,18 @@
GCC_BASE=/mnt/gvfs/third-party2/gcc/6e8e715624fd15256a7970073387793dfcf79b46/7.x/centos7-native/b2ef2b6
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/ef37e1faa1c29782abfac1ae65a291b9b7966f6d/stable/centos7-native/c9f9104
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/c67031f0f739ac61575a061518d6ef5038f99f90/7.x/platform007/5620abc
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/60d6f124a78798b73944f5ba87c2306ae3460153/2.26/platform007/f259413
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/22c2d65676fb7c23cfa797c4f6937f38b026f3cf/1.2.8/platform007/ca4da3d
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
LZ4_BASE=/mnt/gvfs/third-party2/lz4/907b498203d297947f3bb70b9466f47e100f1873/r131/platform007/ca4da3d
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/3ee276cbacfad3074e3f07bf826ac47f06970f4e/1.3.5/platform007/15a3614
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/9c910d36d6235cc40e8ff559358f1833452300ca/master/platform007/5b0f53e
NUMA_BASE=/mnt/gvfs/third-party2/numa/9cbf2460284c669ed19c3ccb200a71f7dd7e53c7/2.0.11/platform007/ca4da3d
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/bf3d7497fe4e6d007354f0adffa16ce3003f8338/1.3/platform007/6f3e0a9
TBB_BASE=/mnt/gvfs/third-party2/tbb/ff4e0b093534704d8abab678a4fd7f5ea7b094c7/2018_U5/platform007/ca4da3d
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/b5c4a61a5c483ba24722005ae07895971a2ac707/fb/platform007/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/92ff90349e2f43ea0a8246d8b1cf17b6869013e3/2.29.1/centos7-native/da39a3e
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/f3f697a28122e6bcd513273dd9c1ff23852fc59f/3.13.0/platform007/ca4da3d
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832

View File

@ -0,0 +1,157 @@
#!/bin/sh
#
# Set environment variables so that we can compile rocksdb using
# fbcode settings. It uses the latest g++ and clang compilers and also
# uses jemalloc
# Environment variables that change the behavior of this script:
# PIC_BUILD -- if true, it will only take pic versions of libraries from fbcode. libraries that don't have pic variant will not be included
BASEDIR=`dirname $BASH_SOURCE`
source "$BASEDIR/dependencies_platform007.sh"
CFLAGS=""
# libgcc
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
# glibc
GLIBC_INCLUDE="$GLIBC_BASE/include"
GLIBC_LIBS=" -L $GLIBC_BASE/lib"
# snappy
SNAPPY_INCLUDE=" -I $SNAPPY_BASE/include/"
if test -z $PIC_BUILD; then
SNAPPY_LIBS=" $SNAPPY_BASE/lib/libsnappy.a"
else
SNAPPY_LIBS=" $SNAPPY_BASE/lib/libsnappy_pic.a"
fi
CFLAGS+=" -DSNAPPY"
if test -z $PIC_BUILD; then
# location of zlib headers and libraries
ZLIB_INCLUDE=" -I $ZLIB_BASE/include/"
ZLIB_LIBS=" $ZLIB_BASE/lib/libz.a"
CFLAGS+=" -DZLIB"
# location of bzip headers and libraries
BZIP_INCLUDE=" -I $BZIP2_BASE/include/"
BZIP_LIBS=" $BZIP2_BASE/lib/libbz2.a"
CFLAGS+=" -DBZIP2"
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
CFLAGS+=" -DLZ4"
fi
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
if test -z $PIC_BUILD; then
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
else
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
fi
CFLAGS+=" -DZSTD"
# location of gflags headers and libraries
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
if test -z $PIC_BUILD; then
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags.a"
else
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
fi
CFLAGS+=" -DGFLAGS=gflags"
# location of jemalloc
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
JEMALLOC_LIB=" $JEMALLOC_BASE/lib/libjemalloc.a"
if test -z $PIC_BUILD; then
# location of numa
NUMA_INCLUDE=" -I $NUMA_BASE/include/"
NUMA_LIB=" $NUMA_BASE/lib/libnuma.a"
CFLAGS+=" -DNUMA"
# location of libunwind
LIBUNWIND="$LIBUNWIND_BASE/lib/libunwind.a"
fi
# location of TBB
TBB_INCLUDE=" -isystem $TBB_BASE/include/"
if test -z $PIC_BUILD; then
TBB_LIBS="$TBB_BASE/lib/libtbb.a"
else
TBB_LIBS="$TBB_BASE/lib/libtbb_pic.a"
fi
CFLAGS+=" -DTBB"
# use Intel SSE support for checksum calculations
export USE_SSE=1
export PORTABLE=1
BINUTILS="$BINUTILS_BASE/bin"
AR="$BINUTILS/ar"
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"
STDLIBS="-L $GCC_BASE/lib64"
CLANG_BIN="$CLANG_BASE/bin"
CLANG_LIB="$CLANG_BASE/lib"
CLANG_SRC="$CLANG_BASE/../../src"
CLANG_ANALYZER="$CLANG_BIN/clang++"
CLANG_SCAN_BUILD="$CLANG_SRC/llvm/tools/clang/tools/scan-build/bin/scan-build"
if [ -z "$USE_CLANG" ]; then
# gcc
CC="$GCC_BASE/bin/gcc"
CXX="$GCC_BASE/bin/g++"
CFLAGS+=" -B$BINUTILS/gold"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
JEMALLOC=1
else
# clang
CLANG_INCLUDE="$CLANG_LIB/clang/stable/include"
CC="$CLANG_BIN/clang"
CXX="$CLANG_BIN/clang++"
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $CLANG_INCLUDE"
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE/linux "
CFLAGS+=" -isystem $KERNEL_HEADERS_INCLUDE "
CFLAGS+=" -Wno-expansion-to-defined "
CXXFLAGS="-nostdinc++"
fi
CFLAGS+=" $DEPS_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
CXXFLAGS+=" $CFLAGS"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
EXEC_LDFLAGS+=" $LIBUNWIND"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
# required by libtbb
EXEC_LDFLAGS+=" -ldl"
PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"
VALGRIND_VER="$VALGRIND_BASE/bin/"
# lua not supported because it's on track for deprecation, I think
LUA_PATH=
LUA_LIB=
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD LUA_PATH LUA_LIB

View File

@ -85,8 +85,9 @@ NON_SHM="TMPD=/tmp/rocksdb_test_tmp"
GCC_481="ROCKSDB_FBCODE_BUILD_WITH_481=1"
ASAN="COMPILE_WITH_ASAN=1"
CLANG="USE_CLANG=1"
LITE="OPT=\"-DROCKSDB_LITE -g\""
TSAN="COMPILE_WITH_TSAN=1"
# in gcc-5 there are known problems with TSAN like https://gcc.gnu.org/bugzilla/show_bug.cgi?id=71090.
# using platform007 gives us gcc-8 or higher which has that bug fixed.
TSAN="ROCKSDB_FBCODE_BUILD_WITH_PLATFORM007=1 COMPILE_WITH_TSAN=1"
UBSAN="COMPILE_WITH_UBSAN=1"
TSAN_CRASH='CRASH_TEST_EXT_ARGS="--compression_type=zstd --log2_keys_per_lock=22"'
NON_TSAN_CRASH="CRASH_TEST_EXT_ARGS=--compression_type=zstd"

View File

@ -53,6 +53,45 @@ function get_lib_base()
log_variable $__res_var
}
###########################################################
# platform007 dependencies #
###########################################################
OUTPUT="$BASEDIR/dependencies_platform007.sh"
rm -f "$OUTPUT"
touch "$OUTPUT"
echo "Writing dependencies to $OUTPUT"
# Compilers locations
GCC_BASE=`readlink -f $TP2_LATEST/gcc/7.x/centos7-native/*/`
CLANG_BASE=`readlink -f $TP2_LATEST/llvm-fb/stable/centos7-native/*/`
log_variable GCC_BASE
log_variable CLANG_BASE
# Libraries locations
get_lib_base libgcc 7.x platform007
get_lib_base glibc 2.26 platform007
get_lib_base snappy LATEST platform007
get_lib_base zlib LATEST platform007
get_lib_base bzip2 LATEST platform007
get_lib_base lz4 LATEST platform007
get_lib_base zstd LATEST platform007
get_lib_base gflags LATEST platform007
get_lib_base jemalloc LATEST platform007
get_lib_base numa LATEST platform007
get_lib_base libunwind LATEST platform007
get_lib_base tbb LATEST platform007
get_lib_base kernel-headers fb platform007
get_lib_base binutils LATEST centos7-native
get_lib_base valgrind LATEST platform007
get_lib_base lua 5.3.4 platform007
git diff $OUTPUT
###########################################################
# 5.x dependencies #
###########################################################

View File

@ -308,6 +308,47 @@ TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
delete db;
}
TEST_F(CompactFilesTest, SentinelCompressionType) {
// Check that passing `CompressionType::kDisableCompressionOption` to
// `CompactFiles` causes it to use the column family compression options.
for (auto compaction_style :
{CompactionStyle::kCompactionStyleLevel,
CompactionStyle::kCompactionStyleUniversal,
CompactionStyle::kCompactionStyleNone}) {
DestroyDB(db_name_, Options());
Options options;
options.compaction_style = compaction_style;
// L0: Snappy, L1: ZSTD, L2: Snappy
options.compression_per_level = {CompressionType::kSnappyCompression,
CompressionType::kZlibCompression,
CompressionType::kSnappyCompression};
options.create_if_missing = true;
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
DB* db = nullptr;
ASSERT_OK(DB::Open(options, db_name_, &db));
db->Put(WriteOptions(), "key", "val");
db->Flush(FlushOptions());
auto l0_files = collector->GetFlushedFiles();
ASSERT_EQ(1, l0_files.size());
// L0->L1 compaction, so output should be ZSTD-compressed
CompactionOptions compaction_opts;
compaction_opts.compression = CompressionType::kDisableCompressionOption;
ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1));
rocksdb::TablePropertiesCollection all_tables_props;
ASSERT_OK(db->GetPropertiesOfAllTables(&all_tables_props));
for (const auto& name_and_table_props : all_tables_props) {
ASSERT_EQ(CompressionTypeToString(CompressionType::kZlibCompression),
name_and_table_props.second->compression_name);
}
delete db;
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -77,6 +77,12 @@ CompactionIterator::CompactionIterator(
earliest_snapshot_ = snapshots_->at(0);
latest_snapshot_ = snapshots_->back();
}
#ifndef NDEBUG
// findEarliestVisibleSnapshot assumes this ordering.
for (size_t i = 1; i < snapshots_->size(); ++i) {
assert(snapshots_->at(i - 1) <= snapshots_->at(i));
}
#endif
if (compaction_filter_ != nullptr) {
if (compaction_filter_->IgnoreSnapshots()) {
ignore_snapshots_ = true;
@ -603,18 +609,23 @@ void CompactionIterator::PrepareOutput() {
inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot) {
assert(snapshots_->size());
SequenceNumber prev = kMaxSequenceNumber;
for (const auto cur : *snapshots_) {
assert(prev == kMaxSequenceNumber || prev <= cur);
if (cur >= in && (snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(in, cur))) {
*prev_snapshot = prev == kMaxSequenceNumber ? 0 : prev;
auto snapshots_iter = std::lower_bound(
snapshots_->begin(), snapshots_->end(), in);
if (snapshots_iter == snapshots_->begin()) {
*prev_snapshot = 0;
} else {
*prev_snapshot = *std::prev(snapshots_iter);
assert(*prev_snapshot < in);
}
for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
auto cur = *snapshots_iter;
assert(in <= cur);
if (snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(in, cur)) {
return cur;
}
prev = cur;
assert(prev < kMaxSequenceNumber);
*prev_snapshot = cur;
}
*prev_snapshot = prev;
return kMaxSequenceNumber;
}

View File

@ -1077,7 +1077,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
if (status.ok() && sub_compact->builder == nullptr &&
sub_compact->outputs.size() == 0) {
sub_compact->outputs.size() == 0 &&
!range_del_agg->IsEmpty()) {
// handle subcompaction containing only range deletions
status = OpenCompactionOutputFile(sub_compact);
}

View File

@ -219,7 +219,8 @@ void CompactionPicker::GetRange(const std::vector<CompactionInputFiles>& inputs,
bool CompactionPicker::ExpandInputsToCleanCut(const std::string& /*cf_name*/,
VersionStorageInfo* vstorage,
CompactionInputFiles* inputs) {
CompactionInputFiles* inputs,
InternalKey** next_smallest) {
// This isn't good compaction
assert(!inputs->empty());
@ -242,7 +243,8 @@ bool CompactionPicker::ExpandInputsToCleanCut(const std::string& /*cf_name*/,
GetRange(*inputs, &smallest, &largest);
inputs->clear();
vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
hint_index, &hint_index);
hint_index, &hint_index, true,
next_smallest);
} while (inputs->size() > old_size);
// we started off with inputs non-empty and the previous loop only grew
@ -315,13 +317,29 @@ Compaction* CompactionPicker::CompactFiles(
// shouldn't have been released since.
assert(!FilesRangeOverlapWithCompaction(input_files, output_level));
auto c =
new Compaction(vstorage, ioptions_, mutable_cf_options, input_files,
output_level, compact_options.output_file_size_limit,
mutable_cf_options.max_compaction_bytes, output_path_id,
compact_options.compression, ioptions_.compression_opts,
compact_options.max_subcompactions,
/* grandparents */ {}, true);
CompressionType compression_type;
if (compact_options.compression == kDisableCompressionOption) {
int base_level;
if (ioptions_.compaction_style == kCompactionStyleLevel) {
base_level = vstorage->base_level();
} else {
base_level = 1;
}
compression_type =
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, base_level);
} else {
// TODO(ajkr): `CompactionOptions` offers configurable `CompressionType`
// without configurable `CompressionOptions`, which is inconsistent.
compression_type = compact_options.compression;
}
auto c = new Compaction(
vstorage, ioptions_, mutable_cf_options, input_files, output_level,
compact_options.output_file_size_limit,
mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
GetCompressionOptions(ioptions_, vstorage, output_level),
compact_options.max_subcompactions,
/* grandparents */ {}, true);
RegisterCompaction(c);
return c;
}
@ -633,7 +651,6 @@ Compaction* CompactionPicker::CompactRange(
uint64_t s = inputs[i]->compensated_file_size;
total += s;
if (total >= limit) {
**compaction_end = inputs[i + 1]->smallest;
covering_the_whole_range = false;
inputs.files.resize(i + 1);
break;
@ -642,7 +659,10 @@ Compaction* CompactionPicker::CompactRange(
}
assert(output_path_id < static_cast<uint32_t>(ioptions_.cf_paths.size()));
if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs) == false) {
InternalKey key_storage;
InternalKey* next_smallest = &key_storage;
if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs, &next_smallest) ==
false) {
// manual compaction is now multi-threaded, so it can
// happen that ExpandWhileOverlapping fails
// we handle it higher in RunManualCompaction
@ -650,8 +670,10 @@ Compaction* CompactionPicker::CompactRange(
return nullptr;
}
if (covering_the_whole_range) {
if (covering_the_whole_range || !next_smallest) {
*compaction_end = nullptr;
} else {
**compaction_end = *next_smallest;
}
CompactionInputFiles output_level_inputs;

View File

@ -151,7 +151,8 @@ class CompactionPicker {
// Will return false if it is impossible to apply this compaction.
bool ExpandInputsToCleanCut(const std::string& cf_name,
VersionStorageInfo* vstorage,
CompactionInputFiles* inputs);
CompactionInputFiles* inputs,
InternalKey** next_smallest = nullptr);
// Returns true if any one of the parent files are being compacted
bool IsRangeInCompaction(VersionStorageInfo* vstorage,

View File

@ -22,11 +22,12 @@ class DBBloomFilterTest : public DBTestBase {
class DBBloomFilterTestWithParam
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool>> {
public testing::WithParamInterface<std::tuple<bool, bool, uint32_t>> {
// public testing::WithParamInterface<bool> {
protected:
bool use_block_based_filter_;
bool partition_filters_;
uint32_t format_version_;
public:
DBBloomFilterTestWithParam() : DBTestBase("/db_bloom_filter_tests") {}
@ -36,9 +37,12 @@ class DBBloomFilterTestWithParam
void SetUp() override {
use_block_based_filter_ = std::get<0>(GetParam());
partition_filters_ = std::get<1>(GetParam());
format_version_ = std::get<2>(GetParam());
}
};
class DBBloomFilterTestDefFormatVersion : public DBBloomFilterTestWithParam {};
class SliceTransformLimitedDomainGeneric : public SliceTransform {
const char* Name() const override {
return "SliceTransformLimitedDomainGeneric";
@ -62,7 +66,7 @@ class SliceTransformLimitedDomainGeneric : public SliceTransform {
// KeyMayExist can lead to a few false positives, but not false negatives.
// To make test deterministic, use a much larger number of bits per key-20 than
// bits in the key, so that false positives are eliminated
TEST_P(DBBloomFilterTestWithParam, KeyMayExist) {
TEST_P(DBBloomFilterTestDefFormatVersion, KeyMayExist) {
do {
ReadOptions ropts;
std::string value;
@ -401,6 +405,11 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
table_options.format_version = format_version_;
if (format_version_ >= 4) {
// value delta encoding challenged more with index interval > 1
table_options.index_block_restart_interval = 8;
}
table_options.metadata_block_size = 32;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
@ -456,10 +465,26 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
} while (ChangeCompactOptions());
}
INSTANTIATE_TEST_CASE_P(DBBloomFilterTestWithParam, DBBloomFilterTestWithParam,
::testing::Values(std::make_tuple(true, false),
std::make_tuple(false, true),
std::make_tuple(false, false)));
INSTANTIATE_TEST_CASE_P(
FormatDef, DBBloomFilterTestDefFormatVersion,
::testing::Values(std::make_tuple(true, false, test::kDefaultFormatVersion),
std::make_tuple(false, true, test::kDefaultFormatVersion),
std::make_tuple(false, false,
test::kDefaultFormatVersion)));
INSTANTIATE_TEST_CASE_P(
FormatDef, DBBloomFilterTestWithParam,
::testing::Values(std::make_tuple(true, false, test::kDefaultFormatVersion),
std::make_tuple(false, true, test::kDefaultFormatVersion),
std::make_tuple(false, false,
test::kDefaultFormatVersion)));
INSTANTIATE_TEST_CASE_P(
FormatLatest, DBBloomFilterTestWithParam,
::testing::Values(std::make_tuple(true, false, test::kLatestFormatVersion),
std::make_tuple(false, true, test::kLatestFormatVersion),
std::make_tuple(false, false,
test::kLatestFormatVersion)));
TEST_F(DBBloomFilterTest, BloomFilterRate) {
while (ChangeFilterOptions()) {

View File

@ -116,6 +116,22 @@ private:
std::vector<std::atomic<int>> compaction_completed_;
};
class SstStatsCollector : public EventListener {
public:
SstStatsCollector() : num_ssts_creation_started_(0) {}
void OnTableFileCreationStarted(const TableFileCreationBriefInfo& /* info */) override {
++num_ssts_creation_started_;
}
int num_ssts_creation_started() {
return num_ssts_creation_started_;
}
private:
std::atomic<int> num_ssts_creation_started_;
};
static const int kCDTValueSize = 1000;
static const int kCDTKeysPerBuffer = 4;
static const int kCDTNumLevels = 8;
@ -3485,12 +3501,13 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
// ensure the auto compaction doesn't finish until manual compaction has
// had a chance to be delayed.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait", "CompactionJob::Run():End"}});
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"CompactionJob::Run():End"}});
} else {
// ensure the auto-compaction doesn't finish until manual compaction has
// continued without delay.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
{{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -3538,12 +3555,13 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
// ensure the flush doesn't finish until manual compaction has had a
// chance to be delayed.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait", "FlushJob::WriteLevel0Table"}});
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"FlushJob::WriteLevel0Table"}});
} else {
// ensure the flush doesn't finish until manual compaction has continued
// without delay.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWaitDone",
{{"DBImpl::FlushMemTable:StallWaitDone",
"FlushJob::WriteLevel0Table"}});
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -3553,6 +3571,7 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
FlushOptions flush_opts;
flush_opts.wait = false;
flush_opts.allow_write_stall = true;
dbfull()->Flush(flush_opts);
}
@ -3588,7 +3607,7 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
// The auto-compaction waits until the manual compaction finishes to ensure
// the signal comes from closing CF/DB, not from compaction making progress.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait",
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
{"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
"CompactionJob::Run():End"}});
@ -3639,18 +3658,21 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
// began. So it unblocks CompactRange and precludes its flush. Throughout the
// test, stall conditions are upheld via high L0 file count.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait",
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
{"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
"DBImpl::CompactRange:StallWaitDone"},
{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
"DBImpl::FlushMemTable:StallWaitDone"},
{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
//used for the delayable flushes
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
for (int j = 0; j < 2; ++j) {
ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
}
Flush();
dbfull()->Flush(flush_opts);
}
auto manual_compaction_thread = port::Thread([this]() {
CompactRangeOptions cro;
@ -3660,7 +3682,7 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
Put(ToString(0), RandomString(&rnd, 1024));
Flush();
dbfull()->Flush(flush_opts);
Put(ToString(0), RandomString(&rnd, 1024));
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
manual_compaction_thread.join();
@ -3816,6 +3838,30 @@ TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
bg_thread.join();
}
TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
Options options = CurrentOptions();
SstStatsCollector* collector = new SstStatsCollector();
options.level0_file_num_compaction_trigger = 2;
options.listeners.emplace_back(collector);
Reopen(options);
// Make sure the L0 files overlap to prevent trivial move.
ASSERT_OK(Put("a", "val"));
ASSERT_OK(Put("b", "val"));
ASSERT_OK(Flush());
ASSERT_OK(Delete("a"));
ASSERT_OK(Delete("b"));
ASSERT_OK(Flush());
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
// Expect one file creation to start for each flush, and zero for compaction
// since no keys are written.
ASSERT_EQ(2, collector->num_ssts_creation_started());
}
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
::testing::Values(std::make_tuple(1, true),
std::make_tuple(1, false),
@ -3913,6 +3959,50 @@ INSTANTIATE_TEST_CASE_P(
CompactionPri::kOldestSmallestSeqFirst,
CompactionPri::kMinOverlappingRatio));
class NoopMergeOperator : public MergeOperator {
public:
NoopMergeOperator() {}
virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
MergeOperationOutput* merge_out) const override {
std::string val("bar");
merge_out->new_value = val;
return true;
}
virtual const char* Name() const override { return "Noop"; }
};
TEST_F(DBCompactionTest, PartialManualCompaction) {
Options opts = CurrentOptions();
opts.num_levels = 3;
opts.level0_file_num_compaction_trigger = 10;
opts.compression = kNoCompression;
opts.merge_operator.reset(new NoopMergeOperator());
opts.target_file_size_base = 10240;
DestroyAndReopen(opts);
Random rnd(301);
for (auto i = 0; i < 8; ++i) {
for (auto j = 0; j < 10; ++j) {
Merge("foo", RandomString(&rnd, 1024));
}
Flush();
}
MoveFilesToLevel(2);
std::string prop;
EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
ASSERT_OK(dbfull()->SetOptions(
{{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
dbfull()->CompactRange(cro, nullptr, nullptr);
}
#endif // !defined(ROCKSDB_LITE)
} // namespace rocksdb

View File

@ -35,6 +35,7 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
Reopen(options);
FlushOptions no_wait;
no_wait.wait = false;
no_wait.allow_write_stall=true;
SyncPoint::GetInstance()->LoadDependency(
{{"VersionSet::LogAndApply:WriteManifest",

View File

@ -396,7 +396,7 @@ class DBImpl : public DB {
Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable(bool wait = true,
Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
ColumnFamilyHandle* cfh = nullptr);
// Wait for memtable compaction
@ -811,6 +811,7 @@ class DBImpl : public DB {
friend struct SuperVersion;
friend class CompactedDBImpl;
friend class DBTest_ConcurrentFlushWAL_Test;
friend class DBTest_MixedSlowdownOptionsStop_Test;
#ifndef NDEBUG
friend class DBTest2_ReadCallbackTest_Test;
friend class WriteCallbackTest_WriteWithCallbackTest_Test;
@ -918,6 +919,10 @@ class DBImpl : public DB {
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
FlushReason flush_reason, bool writes_stopped = false);
// Wait until flushing this column family won't stall writes
Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
bool* flush_needed);
// Wait for memtable flushed.
// If flush_memtable_id is non-null, wait until the memtable with the ID
// gets flush. Otherwise, wait until the column family don't have any

View File

@ -324,60 +324,12 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
CleanupSuperVersion(super_version);
}
if (!options.allow_write_stall && flush_needed) {
InstrumentedMutexLock l(&mutex_);
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
do {
if (write_stall_condition != WriteStallCondition::kNormal) {
TEST_SYNC_POINT("DBImpl::CompactRange:StallWait");
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] CompactRange waiting on stall conditions to clear",
cfd->GetName().c_str());
bg_cv_.Wait();
}
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
uint64_t earliest_memtable_id =
std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
if (earliest_memtable_id > orig_active_memtable_id) {
// We waited so long that the memtable we were originally waiting on was
// flushed.
flush_needed = false;
break;
}
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
const auto* vstorage = cfd->current()->storage_info();
// Skip stalling check if we're below auto-flush and auto-compaction
// triggers. If it stalled in these conditions, that'd mean the stall
// triggers are so low that stalling is needed for any background work. In
// that case we shouldn't wait since background work won't be scheduled.
if (cfd->imm()->NumNotFlushed() <
cfd->ioptions()->min_write_buffer_number_to_merge &&
vstorage->l0_delay_trigger_count() <
mutable_cf_options.level0_file_num_compaction_trigger) {
break;
}
// check whether one extra immutable memtable or an extra L0 file would
// cause write stalling mode to be entered. It could still enter stall
// mode due to pending compaction bytes, but that's less common
write_stall_condition =
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed() + 1,
vstorage->l0_delay_trigger_count() + 1,
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
.first;
} while (write_stall_condition != WriteStallCondition::kNormal);
}
TEST_SYNC_POINT("DBImpl::CompactRange:StallWaitDone");
Status s;
if (flush_needed) {
s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction);
FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
false /* writes_stopped*/);
if (!s.ok()) {
LogFlush(immutable_db_options_.info_log);
return s;
@ -1077,6 +1029,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
FlushReason flush_reason, bool writes_stopped) {
Status s;
uint64_t flush_memtable_id = 0;
if (!flush_options.allow_write_stall) {
bool flush_needed = true;
s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
if (!s.ok() || !flush_needed) {
return s;
}
}
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
@ -1115,6 +1075,69 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
return s;
}
// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
// cause write stall, for example if one memtable is being flushed already.
// This method tries to avoid write stall (similar to CompactRange() behavior)
// it emulates how the SuperVersion / LSM would change if flush happens, checks
// it against various constrains and delays flush if it'd cause write stall.
// Called should check status and flush_needed to see if flush already happened.
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
bool* flush_needed) {
{
*flush_needed = true;
InstrumentedMutexLock l(&mutex_);
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
do {
if (write_stall_condition != WriteStallCondition::kNormal) {
TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] WaitUntilFlushWouldNotStallWrites"
" waiting on stall conditions to clear",
cfd->GetName().c_str());
bg_cv_.Wait();
}
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
uint64_t earliest_memtable_id =
std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
if (earliest_memtable_id > orig_active_memtable_id) {
// We waited so long that the memtable we were originally waiting on was
// flushed.
*flush_needed = false;
return Status::OK();
}
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
const auto* vstorage = cfd->current()->storage_info();
// Skip stalling check if we're below auto-flush and auto-compaction
// triggers. If it stalled in these conditions, that'd mean the stall
// triggers are so low that stalling is needed for any background work. In
// that case we shouldn't wait since background work won't be scheduled.
if (cfd->imm()->NumNotFlushed() <
cfd->ioptions()->min_write_buffer_number_to_merge &&
vstorage->l0_delay_trigger_count() <
mutable_cf_options.level0_file_num_compaction_trigger) {
break;
}
// check whether one extra immutable memtable or an extra L0 file would
// cause write stalling mode to be entered. It could still enter stall
// mode due to pending compaction bytes, but that's less common
write_stall_condition =
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed() + 1,
vstorage->l0_delay_trigger_count() + 1,
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
.first;
} while (write_stall_condition != WriteStallCondition::kNormal);
}
return Status::OK();
}
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id) {
Status s;

View File

@ -100,9 +100,11 @@ Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {
return SwitchMemtable(cfd, &write_context);
}
Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) {
Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
ColumnFamilyHandle* cfh) {
FlushOptions fo;
fo.wait = wait;
fo.allow_write_stall = allow_write_stall;
ColumnFamilyData* cfd;
if (cfh == nullptr) {
cfd = default_cf_handle_->cfd();

View File

@ -1146,10 +1146,14 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
if (delay > 0) {
if (write_options.no_slowdown) {
return Status::Incomplete();
return Status::Incomplete("Write stall");
}
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
// Notify write_thread_ about the stall so it can setup a barrier and
// fail any pending writers with no_slowdown
write_thread_.BeginWriteStall();
TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
mutex_.Unlock();
// We will delay the write until we have slept for delay ms or
// we don't need a delay anymore
@ -1166,15 +1170,21 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
env_->SleepForMicroseconds(kDelayInterval);
}
mutex_.Lock();
write_thread_.EndWriteStall();
}
while (!error_handler_.IsDBStopped() && write_controller_.IsStopped()) {
if (write_options.no_slowdown) {
return Status::Incomplete();
return Status::Incomplete("Write stall");
}
delayed = true;
// Notify write_thread_ about the stall so it can setup a barrier and
// fail any pending writers with no_slowdown
write_thread_.BeginWriteStall();
TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
bg_cv_.Wait();
write_thread_.EndWriteStall();
}
}
assert(!delayed || !write_options.no_slowdown);

View File

@ -262,6 +262,196 @@ TEST_F(DBTest, SkipDelay) {
}
}
TEST_F(DBTest, MixedSlowdownOptions) {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000;
CreateAndReopenWithCF({"pikachu"}, options);
std::vector<port::Thread> threads;
std::atomic<int> thread_num(0);
std::function<void()> write_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = false;
ASSERT_OK(dbfull()->Put(wo, key, "bar"));
};
std::function<void()> write_no_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = true;
ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
};
// Use a small number to ensure a large delay that is still effective
// when we do Put
// TODO(myabandeh): this is time dependent and could potentially make
// the test flaky
auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
std::atomic<int> sleep_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:BeginWriteStallDone",
[&](void* /*arg*/) {
sleep_count.fetch_add(1);
if (threads.empty()) {
for (int i = 0; i < 2; ++i) {
threads.emplace_back(write_slowdown_func);
}
for (int i = 0; i < 2; ++i) {
threads.emplace_back(write_no_slowdown_func);
}
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
wo.sync = false;
wo.disableWAL = false;
wo.no_slowdown = false;
dbfull()->Put(wo, "foo", "bar");
// We need the 2nd write to trigger delay. This is because delay is
// estimated based on the last write size which is 0 for the first write.
ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
token.reset();
for (auto& t : threads) {
t.join();
}
ASSERT_GE(sleep_count.load(), 1);
wo.no_slowdown = true;
ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
}
TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000;
CreateAndReopenWithCF({"pikachu"}, options);
std::vector<port::Thread> threads;
std::atomic<int> thread_num(0);
std::function<void()> write_no_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = true;
ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
};
// Use a small number to ensure a large delay that is still effective
// when we do Put
// TODO(myabandeh): this is time dependent and could potentially make
// the test flaky
auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
std::atomic<int> sleep_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Sleep",
[&](void* /*arg*/) {
sleep_count.fetch_add(1);
if (threads.empty()) {
for (int i = 0; i < 2; ++i) {
threads.emplace_back(write_no_slowdown_func);
}
// Sleep for 3s to allow the threads to insert themselves into the
// write queue
env_->SleepForMicroseconds(3000000ULL);
}
});
std::atomic<int> wait_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Wait",
[&](void* /*arg*/) { wait_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
wo.sync = false;
wo.disableWAL = false;
wo.no_slowdown = false;
dbfull()->Put(wo, "foo", "bar");
// We need the 2nd write to trigger delay. This is because delay is
// estimated based on the last write size which is 0 for the first write.
ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
token.reset();
for (auto& t : threads) {
t.join();
}
ASSERT_EQ(sleep_count.load(), 1);
ASSERT_GE(wait_count.load(), 0);
}
TEST_F(DBTest, MixedSlowdownOptionsStop) {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000;
CreateAndReopenWithCF({"pikachu"}, options);
std::vector<port::Thread> threads;
std::atomic<int> thread_num(0);
std::function<void()> write_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = false;
ASSERT_OK(dbfull()->Put(wo, key, "bar"));
};
std::function<void()> write_no_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = true;
ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
};
std::function<void()> wakeup_writer = [&]() {
dbfull()->mutex_.Lock();
dbfull()->bg_cv_.SignalAll();
dbfull()->mutex_.Unlock();
};
// Use a small number to ensure a large delay that is still effective
// when we do Put
// TODO(myabandeh): this is time dependent and could potentially make
// the test flaky
auto token = dbfull()->TEST_write_controler().GetStopToken();
std::atomic<int> wait_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Wait",
[&](void* /*arg*/) {
wait_count.fetch_add(1);
if (threads.empty()) {
for (int i = 0; i < 2; ++i) {
threads.emplace_back(write_slowdown_func);
}
for (int i = 0; i < 2; ++i) {
threads.emplace_back(write_no_slowdown_func);
}
// Sleep for 3s to allow the threads to insert themselves into the
// write queue
env_->SleepForMicroseconds(3000000ULL);
}
token.reset();
threads.emplace_back(wakeup_writer);
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
wo.sync = false;
wo.disableWAL = false;
wo.no_slowdown = false;
dbfull()->Put(wo, "foo", "bar");
// We need the 2nd write to trigger delay. This is because delay is
// estimated based on the last write size which is 0 for the first write.
ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
token.reset();
for (auto& t : threads) {
t.join();
}
ASSERT_GE(wait_count.load(), 1);
wo.no_slowdown = true;
ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, LevelLimitReopen) {
@ -4331,7 +4521,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
// Clean up memtable and L0. Block compaction threads. If we continue to
// write and flush memtables, we should see puts stop after 8 memtable
// flushes since level0_stop_writes_trigger = 8.
dbfull()->TEST_FlushMemTable(true);
dbfull()->TEST_FlushMemTable(true, true);
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Block compaction
test::SleepingBackgroundTask sleeping_task_low;
@ -4344,7 +4534,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
WriteOptions wo;
while (count < 64) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
dbfull()->TEST_FlushMemTable(true);
dbfull()->TEST_FlushMemTable(true, true);
count++;
if (dbfull()->TEST_write_controler().IsStopped()) {
sleeping_task_low.WakeUp();
@ -4372,7 +4562,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
count = 0;
while (count < 64) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
dbfull()->TEST_FlushMemTable(true);
dbfull()->TEST_FlushMemTable(true, true);
count++;
if (dbfull()->TEST_write_controler().IsStopped()) {
sleeping_task_low.WakeUp();
@ -5512,7 +5702,7 @@ TEST_F(DBTest, SoftLimit) {
for (int i = 0; i < 72; i++) {
Put(Key(i), std::string(5000, 'x'));
if (i % 10 == 0) {
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
}
dbfull()->TEST_WaitForCompact();
@ -5522,7 +5712,7 @@ TEST_F(DBTest, SoftLimit) {
for (int i = 0; i < 72; i++) {
Put(Key(i), std::string(5000, 'x'));
if (i % 10 == 0) {
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
}
dbfull()->TEST_WaitForCompact();
@ -5541,7 +5731,7 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(i), std::string(5000, 'x'));
Put(Key(100 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
@ -5576,7 +5766,7 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(10 + i), std::string(5000, 'x'));
Put(Key(90 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
// Wake up sleep task to enable compaction to run and waits
@ -5597,7 +5787,7 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(20 + i), std::string(5000, 'x'));
Put(Key(80 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
// Wake up sleep task to enable compaction to run and waits
// for it to go to sleep state again to make sure one compaction

View File

@ -2649,7 +2649,7 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
#endif
}
TEST_F(DBTest2, IteratorPinnedMemory) {
TEST_F(DBTest2, DISABLED_IteratorPinnedMemory) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();

View File

@ -451,13 +451,14 @@ Options DBTestBase::GetOptions(
}
case kBlockBasedTableWithPartitionedIndexFormat4: {
table_options.format_version = 4;
// Format 3 changes the binary index format. Since partitioned index is a
// Format 4 changes the binary index format. Since partitioned index is a
// super-set of simple indexes, we are also using kTwoLevelIndexSearch to
// test this format.
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
// The top-level index in partition filters are also affected by format 3.
// The top-level index in partition filters are also affected by format 4.
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
table_options.partition_filters = true;
table_options.index_block_restart_interval = 8;
break;
}
case kBlockBasedTableWithIndexRestartInterval: {

View File

@ -109,8 +109,6 @@ struct OptionsOverride {
// These will be used only if filter_policy is set
bool partition_filters = false;
uint64_t metadata_block_size = 1024;
BlockBasedTableOptions::IndexType index_type =
BlockBasedTableOptions::IndexType::kBinarySearch;
// Used as a bit mask of individual enums in which to skip an XF test point
int skip_policy = 0;

View File

@ -417,7 +417,9 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
++i) {
Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
db_->Flush(FlushOptions(), handles_[1]);
FlushOptions fo;
fo.allow_write_stall = true;
db_->Flush(fo, handles_[1]);
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
}
ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);

View File

@ -344,7 +344,7 @@ class CollapsedRangeDelMap : public RangeDelMap {
}
}
size_t Size() const override { return rep_.size() - 1; }
size_t Size() const override { return rep_.empty() ? 0 : rep_.size() - 1; }
void InvalidatePosition() override { iter_ = rep_.end(); }
@ -493,7 +493,8 @@ Status RangeDelAggregator::AddTombstones(
tombstone.end_key_ = largest->user_key();
}
}
GetRangeDelMap(tombstone.seq_).AddTombstone(std::move(tombstone));
auto seq = tombstone.seq_;
GetRangeDelMap(seq).AddTombstone(std::move(tombstone));
input->Next();
}
if (!first_iter) {

View File

@ -2032,7 +2032,7 @@ bool VersionStorageInfo::OverlapInLevel(int level,
void VersionStorageInfo::GetOverlappingInputs(
int level, const InternalKey* begin, const InternalKey* end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
bool expand_range) const {
bool expand_range, InternalKey** next_smallest) const {
if (level >= num_non_empty_levels_) {
// this level is empty, no overlapping inputs
return;
@ -2051,11 +2051,17 @@ void VersionStorageInfo::GetOverlappingInputs(
}
const Comparator* user_cmp = user_comparator_;
if (level > 0) {
GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
hint_index, file_index);
GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
file_index, false, next_smallest);
return;
}
if (next_smallest) {
// next_smallest key only makes sense for non-level 0, where files are
// non-overlapping
*next_smallest = nullptr;
}
for (size_t i = 0; i < level_files_brief_[level].num_files; ) {
FdWithKeyRange* f = &(level_files_brief_[level].files[i++]);
const Slice file_start = ExtractUserKey(f->smallest_key);
@ -2183,7 +2189,7 @@ int sstableKeyCompare(const Comparator* user_cmp,
void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
int level, const InternalKey* begin, const InternalKey* end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
bool within_interval) const {
bool within_interval, InternalKey** next_smallest) const {
assert(level > 0);
int min = 0;
int mid = 0;
@ -2219,6 +2225,9 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
// If there were no overlapping files, return immediately.
if (!foundOverlap) {
if (next_smallest) {
*next_smallest = nullptr;
}
return;
}
// returns the index where an overlap is found
@ -2239,6 +2248,15 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
for (int i = start_index; i <= end_index; i++) {
inputs->push_back(files_[level][i]);
}
if (next_smallest != nullptr) {
// Provide the next key outside the range covered by inputs
if (++end_index < static_cast<int>(files_[level].size())) {
**next_smallest = files_[level][end_index]->smallest;
} else {
*next_smallest = nullptr;
}
}
}
// Store in *start_index and *end_index the range of all files in
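
The next_smallest parameter added above lets a truncated CompactRange report where to resume. A minimal standalone sketch of the idea over a sorted, non-overlapping level, using plain strings and std::optional in place of InternalKey (this is not the VersionStorageInfo code):

#include <cstddef>
#include <optional>
#include <string>
#include <vector>

struct FileRange {
  std::string smallest;
  std::string largest;
};

struct OverlapResult {
  std::vector<std::size_t> inputs;           // indices of overlapping files
  std::optional<std::string> next_smallest;  // resume key, if any file follows
};

OverlapResult GetOverlapping(const std::vector<FileRange>& files,
                             const std::string& begin,
                             const std::string& end) {
  OverlapResult r;
  std::size_t i = 0;
  // Skip files that end entirely before the range.
  while (i < files.size() && files[i].largest < begin) ++i;
  // Collect files whose smallest key is not past the range end.
  while (i < files.size() && files[i].smallest <= end) {
    r.inputs.push_back(i);
    ++i;
  }
  if (i < files.size()) {
    r.next_smallest = files[i].smallest;  // first key not covered by inputs
  }
  return r;
}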

View File

@ -188,9 +188,11 @@ class VersionStorageInfo {
std::vector<FileMetaData*>* inputs,
int hint_index = -1, // index of overlap file
int* file_index = nullptr, // return index of overlap file
bool expand_range = true) // if set, returns files which overlap the
const; // range and overlap each other. If false,
bool expand_range = true, // if set, returns files which overlap the
// range and overlap each other. If false,
// then just files intersecting the range
InternalKey** next_smallest = nullptr) // if non-null, returns the
const; // smallest key of next file not included
void GetCleanInputsWithinInterval(
int level, const InternalKey* begin, // nullptr means before all keys
const InternalKey* end, // nullptr means after all keys
@ -200,14 +202,15 @@ class VersionStorageInfo {
const;
void GetOverlappingInputsRangeBinarySearch(
int level, // level > 0
int level, // level > 0
const InternalKey* begin, // nullptr means before all keys
const InternalKey* end, // nullptr means after all keys
std::vector<FileMetaData*>* inputs,
int hint_index, // index of overlap file
int* file_index, // return index of overlap file
bool within_interval = false) // if set, force the inputs within interval
const;
bool within_interval = false, // if set, force the inputs within interval
InternalKey** next_smallest = nullptr) // if non-null, returns the
const; // smallest key of next file not included
void ExtendFileRangeOverlappingInterval(
int level,

View File

@ -24,7 +24,10 @@ WriteThread::WriteThread(const ImmutableDBOptions& db_options)
enable_pipelined_write_(db_options.enable_pipelined_write),
newest_writer_(nullptr),
newest_memtable_writer_(nullptr),
last_sequence_(0) {}
last_sequence_(0),
write_stall_dummy_(),
stall_mu_(),
stall_cv_(&stall_mu_) {}
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
// We're going to block. Lazily create the mutex. We guarantee
@ -219,6 +222,28 @@ bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
assert(w->state == STATE_INIT);
Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
// If a write stall is in effect and w->no_slowdown is not true,
// block here until the stall is cleared. If it is true, return
// immediately.
if (writers == &write_stall_dummy_) {
if (w->no_slowdown) {
w->status = Status::Incomplete("Write stall");
SetState(w, STATE_COMPLETED);
return false;
}
// Since no_slowdown is false, wait here to be notified of the write
// stall clearing
{
MutexLock lock(&stall_mu_);
writers = newest_writer->load(std::memory_order_relaxed);
if (writers == &write_stall_dummy_) {
stall_cv_.Wait();
// Load newest_writers_ again since it may have changed
writers = newest_writer->load(std::memory_order_relaxed);
continue;
}
}
}
w->link_older = writers;
if (newest_writer->compare_exchange_weak(writers, w)) {
return (writers == nullptr);
@ -303,12 +328,44 @@ void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
SetState(w, STATE_COMPLETED);
}
void WriteThread::BeginWriteStall() {
LinkOne(&write_stall_dummy_, &newest_writer_);
// Walk writer list until w->write_group != nullptr. The current write group
// will not have a mix of slowdown/no_slowdown, so it's ok to stop at that
// point.
Writer* w = write_stall_dummy_.link_older;
Writer* prev = &write_stall_dummy_;
while (w != nullptr && w->write_group == nullptr) {
if (w->no_slowdown) {
prev->link_older = w->link_older;
w->status = Status::Incomplete("Write stall");
SetState(w, STATE_COMPLETED);
w = prev->link_older;
} else {
prev = w;
w = w->link_older;
}
}
}
void WriteThread::EndWriteStall() {
MutexLock lock(&stall_mu_);
assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
newest_writer_.exchange(write_stall_dummy_.link_older);
// Wake up writers
stall_cv_.SignalAll();
}
static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
assert(w->batch != nullptr);
bool linked_as_leader = LinkOne(w, &newest_writer_);
if (linked_as_leader) {
SetState(w, STATE_GROUP_LEADER);
}

View File

@ -342,6 +342,13 @@ class WriteThread {
return last_sequence_;
}
// Insert a dummy writer at the tail of the write queue to indicate a write
// stall, and fail any writers in the queue with no_slowdown set to true
void BeginWriteStall();
// Remove the dummy writer and wake up waiting writers
void EndWriteStall();
private:
// See AwaitState.
const uint64_t max_yield_usec_;
@ -365,6 +372,17 @@ class WriteThread {
// is not necessary visible to reads because the writer can be ongoing.
SequenceNumber last_sequence_;
// A dummy writer to indicate a write stall condition. This will be inserted
// at the tail of the writer queue by the leader, so newer writers can just
// check for this and bail
Writer write_stall_dummy_;
// Mutex and condvar for writers to block on a write stall. During a write
// stall, writers with no_slowdown set to false will wait on this rather
// than on the writer queue.
port::Mutex stall_mu_;
port::CondVar stall_cv_;
// Waits for w->state & goal_mask using w->StateMutex(). Returns
// the state that satisfies goal_mask.
uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);
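
The BeginWriteStall/EndWriteStall comments above describe a sentinel (the "dummy writer") pushed onto the lock-free writer list so that new arrivals either fail fast (no_slowdown) or block on a condition variable until the stall clears. Below is a simplified standalone sketch of that pattern, not the actual WriteThread code; the sweep that fails already-queued no_slowdown writers is omitted.

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>

struct Writer {
  bool no_slowdown = false;
  bool failed = false;  // set when rejected because of a stall
  Writer* link_older = nullptr;
};

class TinyWriteQueue {
 public:
  // Returns true if the writer was linked, false if it was rejected.
  bool Link(Writer* w) {
    Writer* head = newest_.load(std::memory_order_relaxed);
    while (true) {
      if (head == &stall_sentinel_) {
        if (w->no_slowdown) {
          w->failed = true;  // the caller sees Status::Incomplete in RocksDB
          return false;
        }
        // Block until EndStall() removes the sentinel and signals.
        std::unique_lock<std::mutex> lock(stall_mu_);
        head = newest_.load(std::memory_order_relaxed);
        if (head == &stall_sentinel_) {
          stall_cv_.wait(lock);
          head = newest_.load(std::memory_order_relaxed);
          continue;
        }
      }
      w->link_older = head;
      if (newest_.compare_exchange_weak(head, w)) {
        return true;
      }
    }
  }

  void BeginStall() {
    Writer* head = newest_.load(std::memory_order_relaxed);
    do {
      stall_sentinel_.link_older = head;
    } while (!newest_.compare_exchange_weak(head, &stall_sentinel_));
  }

  void EndStall() {
    std::lock_guard<std::mutex> lock(stall_mu_);
    // Link() never places a writer on top of the sentinel, so it is still
    // the newest node here.
    assert(newest_.load(std::memory_order_relaxed) == &stall_sentinel_);
    newest_.store(stall_sentinel_.link_older);
    stall_cv_.notify_all();
  }

 private:
  Writer stall_sentinel_;
  std::atomic<Writer*> newest_{nullptr};
  std::mutex stall_mu_;
  std::condition_variable stall_cv_;
};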

View File

@ -1183,8 +1183,13 @@ struct FlushOptions {
// If true, the flush will wait until the flush is done.
// Default: true
bool wait;
FlushOptions() : wait(true) {}
// If true, the flush will proceed immediately even if it means writes will
// stall for the duration of the flush; if false, the operation will wait
// until it is possible to flush without causing a stall, or until the
// required flush is performed by someone else (a foreground call or a
// background thread).
// Default: false
bool allow_write_stall;
FlushOptions() : wait(true), allow_write_stall(false) {}
};
// Create a Logger from provided DBOptions
@ -1196,6 +1201,9 @@ extern Status CreateLoggerFromOptions(const std::string& dbname,
struct CompactionOptions {
// Compaction output compression type
// Default: snappy
// If set to `kDisableCompressionOption`, RocksDB will choose compression type
// according to the `ColumnFamilyOptions`, taking into account the output
// level if `compression_per_level` is specified.
CompressionType compression;
// Compaction will create files of size `output_file_size_limit`.
// Default: MAX, which means that compaction will create a single file
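
A hedged usage sketch of the FlushOptions field documented above: by default (allow_write_stall = false) a manual flush waits until it can run without stalling writes, while setting it to true forces the flush immediately.

#include "rocksdb/db.h"

rocksdb::Status FlushNow(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cfh) {
  rocksdb::FlushOptions fo;
  fo.wait = true;               // block until the flush finishes
  fo.allow_write_stall = true;  // flush right away, even if writes stall
  return db->Flush(fo, cfh);
}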

View File

@ -6,7 +6,7 @@
#define ROCKSDB_MAJOR 5
#define ROCKSDB_MINOR 16
#define ROCKSDB_PATCH 0
#define ROCKSDB_PATCH 6
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -612,7 +612,11 @@ void txn_write_kv_parts_helper(JNIEnv* env,
const jint& jkey_parts_len,
const jobjectArray& jvalue_parts,
const jint& jvalue_parts_len) {
#ifndef DEBUG
(void) jvalue_parts_len;
#else
assert(jkey_parts_len == jvalue_parts_len);
#endif
auto key_parts = std::vector<rocksdb::Slice>();
auto value_parts = std::vector<rocksdb::Slice>();

View File

@ -306,7 +306,11 @@ rocksdb::Status WriteBatchHandlerJniCallback::PutBlobIndexCF(uint32_t column_fam
}
rocksdb::Status WriteBatchHandlerJniCallback::MarkBeginPrepare(bool unprepare) {
#ifndef DEBUG
(void) unprepare;
#else
assert(!unprepare);
#endif
m_env->CallVoidMethod(m_jcallback_obj, m_jMarkBeginPrepareMethodId);
// check for Exception, in-particular RocksDBException

src.mk
View File

@ -122,6 +122,7 @@ LIB_SOURCES = \
table/plain_table_reader.cc \
table/sst_file_writer.cc \
table/table_properties.cc \
tools/trace_analyzer_tool.cc \
table/two_level_iterator.cc \
tools/dump/db_dump_tool.cc \
util/arena.cc \
@ -161,6 +162,7 @@ LIB_SOURCES = \
utilities/blob_db/blob_compaction_filter.cc \
utilities/blob_db/blob_db.cc \
utilities/blob_db/blob_db_impl.cc \
utilities/blob_db/blob_db_impl_filesnapshot.cc \
utilities/blob_db/blob_file.cc \
utilities/blob_db/blob_log_format.cc \
utilities/blob_db/blob_log_reader.cc \

View File

@ -235,7 +235,7 @@ void DataBlockIter::Seek(const Slice& target) {
//
// If the return value is FALSE, iter location is undefined, and it means:
// 1) there is no key in this block falling into the range:
// ["seek_user_key @ type | seqno", "seek_user_key @ type | 0"],
// ["seek_user_key @ type | seqno", "seek_user_key @ kTypeDeletion | 0"],
// inclusive; AND
// 2) the last key of this block has a greater user_key than seek_user_key
//
@ -243,13 +243,21 @@ void DataBlockIter::Seek(const Slice& target) {
// 1) If iter is valid, it is set to a location as if set by BinarySeek. In
// this case, it points to the first key_ with a larger user_key or a
// matching user_key with a seqno no greater than the seeking seqno.
// 2) If the iter is invalid, it means either the block has no such user_key,
// or the block ends with a matching user_key but with a larger seqno.
// 2) If the iter is invalid, it means that either all the user_keys are less
// than the seek_user_key, or the block ends with a matching user_key but
// with a smaller [ type | seqno ] (i.e. a larger seqno, or the same seqno
// but larger type).
bool DataBlockIter::SeekForGetImpl(const Slice& target) {
Slice user_key = ExtractUserKey(target);
uint32_t map_offset = restarts_ + num_restarts_ * sizeof(uint32_t);
uint8_t entry = data_block_hash_index_->Lookup(data_, map_offset, user_key);
if (entry == kCollision) {
// HashSeek not effective, falling back
Seek(target);
return true;
}
if (entry == kNoEntry) {
// Even if we cannot find the user_key in this block, the result may
// exist in the next block. Consider this example:
@ -260,16 +268,13 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
//
// If seek_key = axy@60, the search will start from Block N.
// Even if the user_key is not found in the hash map, the caller still
// has to continue searching the next block. So we invalidate the
// iterator to tell the caller to go on.
current_ = restarts_; // Invalidate the iter
return true;
}
if (entry == kCollision) {
// HashSeek not effective, falling back
Seek(target);
return true;
// has to continue searching the next block.
//
// In this case, we pretend the key is in the last restart interval.
// The while-loop below will search the last restart interval for the
// key. It will stop at the first key that is larger than the seek_key,
// or at the end of the block if none is larger.
entry = static_cast<uint8_t>(num_restarts_ - 1);
}
uint32_t restart_index = entry;
@ -299,24 +304,26 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
}
if (current_ == restarts_) {
// Search reaches the end of the block. There are two possibilities;
// Search reaches the end of the block. There are three possibilities:
// 1) there is only one user_key match in the block (otherwise collision).
// the matching user_key resides in the last restart interval.
// it is the last key of the restart interval and of the block too.
// ParseNextDataKey() skipped it as its seqno is newer.
// the matching user_key resides in the last restart interval, and it
// is the last key of the restart interval and of the block as well.
// ParseNextDataKey() skipped it as its [ type | seqno ] is smaller.
//
// 2) The seek_key is a false positive and got hashed to the last restart
// interval.
// All existing keys in the restart interval are less than seek_key.
// 2) The seek_key is not found in the HashIndex Lookup(), i.e. kNoEntry,
// AND all existing user_keys in the restart interval are smaller than
// seek_user_key.
//
// The result may exist in the next block in either case, so may_exist is
// returned as true.
// 3) The seek_key is a false positive and happens to be hashed to the
// last restart interval, AND all existing user_keys in the restart
// interval are smaller than seek_user_key.
//
// The result may exist in the next block in each case, so we return true.
return true;
}
if (user_comparator_->Compare(key_.GetUserKey(), user_key) != 0) {
// the key is not in this block and cannot be in the next block either.
// return false to tell the caller to break from the top-level for-loop
return false;
}
@ -349,10 +356,10 @@ void IndexBlockIter::Seek(const Slice& target) {
ok = PrefixSeek(target, &index);
} else if (value_delta_encoded_) {
ok = BinarySeek<DecodeKeyV4>(seek_key, 0, num_restarts_ - 1, &index,
active_comparator_);
comparator_);
} else {
ok = BinarySeek<DecodeKey>(seek_key, 0, num_restarts_ - 1, &index,
active_comparator_);
comparator_);
}
if (!ok) {
@ -725,13 +732,16 @@ uint32_t Block::NumRestarts() const {
uint32_t block_footer = DecodeFixed32(data_ + size_ - sizeof(uint32_t));
uint32_t num_restarts = block_footer;
if (size_ > kMaxBlockSizeSupportedByHashIndex) {
// We ensure a block with HashIndex is less than 64KiB in BlockBuilder.
// Therefore the footer cannot be encoded as a packed index type and
// In BlockBuilder, we have ensured a block with HashIndex is less than
// kMaxBlockSizeSupportedByHashIndex (64KiB).
//
// Therefore, if we encounter a block with a size > 64KiB, the block
// cannot have HashIndex. So the footer will be directly interpreted as
// num_restarts.
// Such a check ensures a legacy block with a very large num_restarts,
// i.e. >= 0x10000000, can be interpreted correctly as having no HashIndex.
// If a legacy block has a num_restarts >= 0x10000000, size_ will be
// much larger than 64KiB.
//
// Such a check is for backward compatibility. We can ensure a legacy block
// with a very large num_restarts, i.e. >= 0x80000000, is interpreted
// correctly as having no HashIndex even if the MSB of num_restarts is set.
return num_restarts;
}
BlockBasedTableOptions::DataBlockIndexType index_type;
@ -752,7 +762,11 @@ BlockBasedTableOptions::DataBlockIndexType Block::IndexType() const {
return index_type;
}
Block::~Block() { TEST_SYNC_POINT("Block::~Block"); }
Block::~Block() {
// This sync point can be re-enabled if RocksDB can control the
// initialization order of any/all static options created by the user.
// TEST_SYNC_POINT("Block::~Block");
}
Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
size_t read_amp_bytes_per_bit, Statistics* statistics)
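
A hedged sketch of the footer rule described in the comment above. The exact bit layout is an assumption here: the MSB of the 32-bit footer marks the data block hash index, the low 31 bits hold num_restarts, and any block larger than 64KiB stores a plain restart count.

#include <cstddef>
#include <cstdint>

constexpr uint32_t kHashIndexFlag = 0x80000000u;
constexpr std::size_t kMaxBlockSizeSupportedByHashIndexSketch = 1u << 16;  // 64 KiB

struct FooterInfo {
  bool has_hash_index;
  uint32_t num_restarts;
};

FooterInfo DecodeBlockFooter(uint32_t block_footer, std::size_t block_size) {
  if (block_size > kMaxBlockSizeSupportedByHashIndexSketch) {
    // Blocks this large can never carry a hash index, so the footer is the
    // plain restart count, even if its MSB happens to be set (legacy blocks).
    return {false, block_footer};
  }
  return {(block_footer & kHashIndexFlag) != 0, block_footer & ~kHashIndexFlag};
}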

View File

@ -468,10 +468,10 @@ class IndexBlockIter final : public BlockIter<BlockHandle> {
BlockPrefixIndex* prefix_index, bool key_includes_seq,
bool value_is_full, bool block_contents_pinned,
DataBlockHashIndex* /*data_block_hash_index*/) {
InitializeBase(comparator, data, restarts, num_restarts,
kDisableGlobalSequenceNumber, block_contents_pinned);
InitializeBase(key_includes_seq ? comparator : user_comparator, data,
restarts, num_restarts, kDisableGlobalSequenceNumber,
block_contents_pinned);
key_includes_seq_ = key_includes_seq;
active_comparator_ = key_includes_seq_ ? comparator_ : user_comparator;
key_.SetIsUserKey(!key_includes_seq_);
prefix_index_ = prefix_index;
value_delta_encoded_ = !value_is_full;
@ -517,8 +517,6 @@ class IndexBlockIter final : public BlockIter<BlockHandle> {
// Key is in InternalKey format
bool key_includes_seq_;
bool value_delta_encoded_;
// key_includes_seq_ ? comparator_ : user_comparator_
const Comparator* active_comparator_;
BlockPrefixIndex* prefix_index_;
// Whether the value is delta encoded. In that case the value is assumed to be
// BlockHandle. The first value in each restart interval is the full encoded
@ -535,11 +533,11 @@ class IndexBlockIter final : public BlockIter<BlockHandle> {
inline int CompareBlockKey(uint32_t block_index, const Slice& target);
inline int Compare(const Slice& a, const Slice& b) const {
return active_comparator_->Compare(a, b);
return comparator_->Compare(a, b);
}
inline int Compare(const IterKey& ikey, const Slice& b) const {
return active_comparator_->Compare(ikey.GetKey(), b);
return comparator_->Compare(ikey.GetKey(), b);
}
inline bool ParseNextIndexKey();

View File

@ -54,13 +54,6 @@ void BlockHandle::EncodeTo(std::string* dst) const {
PutVarint64Varint64(dst, offset_, size_);
}
void BlockHandle::EncodeSizeTo(std::string* dst) const {
// Sanity check that all fields have been set
assert(offset_ != ~static_cast<uint64_t>(0));
assert(size_ != ~static_cast<uint64_t>(0));
PutVarint64(dst, size_);
}
Status BlockHandle::DecodeFrom(Slice* input) {
if (GetVarint64(input, &offset_) &&
GetVarint64(input, &size_)) {

View File

@ -55,7 +55,6 @@ class BlockHandle {
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* input);
Status DecodeSizeFrom(uint64_t offset, Slice* input);
void EncodeSizeTo(std::string* dst) const;
// Return a string that contains the copy of handle.
std::string ToString(bool hex = true) const;

View File

@ -79,7 +79,10 @@ Slice PartitionedFilterBlockBuilder::Finish(
std::string handle_encoding;
last_partition_block_handle.EncodeTo(&handle_encoding);
std::string handle_delta_encoding;
last_partition_block_handle.EncodeSizeTo(&handle_delta_encoding);
PutVarsignedint64(
&handle_delta_encoding,
last_partition_block_handle.size() - last_encoded_handle_.size());
last_encoded_handle_ = last_partition_block_handle;
const Slice handle_delta_encoding_slice(handle_delta_encoding);
index_on_filter_block_builder_.Add(last_entry.key, handle_encoding,
&handle_delta_encoding_slice);

View File

@ -66,6 +66,7 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
uint32_t filters_in_partition_;
// Number of keys added
size_t num_added_;
BlockHandle last_encoded_handle_;
};
class PartitionedFilterBlockReader : public FilterBlockReader,

View File

@ -50,7 +50,9 @@ class MockedBlockBasedTable : public BlockBasedTable {
}
};
class PartitionedFilterBlockTest : public testing::Test {
class PartitionedFilterBlockTest
: public testing::Test,
virtual public ::testing::WithParamInterface<uint32_t> {
public:
BlockBasedTableOptions table_options_;
InternalKeyComparator icomp = InternalKeyComparator(BytewiseComparator());
@ -60,6 +62,8 @@ class PartitionedFilterBlockTest : public testing::Test {
table_options_.no_block_cache = true; // Otherwise BlockBasedTable::Close
// will access variable that are not
// initialized in our mocked version
table_options_.format_version = GetParam();
table_options_.index_block_restart_interval = 3;
}
std::shared_ptr<Cache> cache_;
@ -279,14 +283,19 @@ class PartitionedFilterBlockTest : public testing::Test {
}
};
TEST_F(PartitionedFilterBlockTest, EmptyBuilder) {
INSTANTIATE_TEST_CASE_P(FormatDef, PartitionedFilterBlockTest,
testing::Values(test::kDefaultFormatVersion));
INSTANTIATE_TEST_CASE_P(FormatLatest, PartitionedFilterBlockTest,
testing::Values(test::kLatestFormatVersion));
TEST_P(PartitionedFilterBlockTest, EmptyBuilder) {
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
std::unique_ptr<PartitionedFilterBlockBuilder> builder(NewBuilder(pib.get()));
const bool empty = true;
VerifyReader(builder.get(), pib.get(), empty);
}
TEST_F(PartitionedFilterBlockTest, OneBlock) {
TEST_P(PartitionedFilterBlockTest, OneBlock) {
uint64_t max_index_size = MaxIndexSize();
for (uint64_t i = 1; i < max_index_size + 1; i++) {
table_options_.metadata_block_size = i;
@ -294,7 +303,7 @@ TEST_F(PartitionedFilterBlockTest, OneBlock) {
}
}
TEST_F(PartitionedFilterBlockTest, TwoBlocksPerKey) {
TEST_P(PartitionedFilterBlockTest, TwoBlocksPerKey) {
uint64_t max_index_size = MaxIndexSize();
for (uint64_t i = 1; i < max_index_size + 1; i++) {
table_options_.metadata_block_size = i;
@ -304,7 +313,7 @@ TEST_F(PartitionedFilterBlockTest, TwoBlocksPerKey) {
// This reproduces the bug that a prefix is the same among multiple consecutive
// blocks but the bug would add it only to the first block.
TEST_F(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
// some small number to cause partition cuts
table_options_.metadata_block_size = 1;
std::unique_ptr<const SliceTransform> prefix_extractor
@ -330,7 +339,7 @@ TEST_F(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
}
}
TEST_F(PartitionedFilterBlockTest, OneBlockPerKey) {
TEST_P(PartitionedFilterBlockTest, OneBlockPerKey) {
uint64_t max_index_size = MaxIndexSize();
for (uint64_t i = 1; i < max_index_size + 1; i++) {
table_options_.metadata_block_size = i;
@ -338,7 +347,7 @@ TEST_F(PartitionedFilterBlockTest, OneBlockPerKey) {
}
}
TEST_F(PartitionedFilterBlockTest, PartitionCount) {
TEST_P(PartitionedFilterBlockTest, PartitionCount) {
int num_keys = sizeof(keys) / sizeof(*keys);
table_options_.metadata_block_size =
std::max(MaxIndexSize(), MaxFilterSize());

View File

@ -197,6 +197,15 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
return s;
}
Status FaultInjectionTestEnv::NewRandomAccessFile(
const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) {
if (!IsFilesystemActive()) {
return GetError();
}
return target()->NewRandomAccessFile(fname, result, soptions);
}
Status FaultInjectionTestEnv::DeleteFile(const std::string& f) {
if (!IsFilesystemActive()) {
return GetError();

View File

@ -111,6 +111,10 @@ class FaultInjectionTestEnv : public EnvWrapper {
unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) override;
virtual Status DeleteFile(const std::string& f) override;
virtual Status RenameFile(const std::string& s,

View File

@ -304,13 +304,17 @@ void BlobDBImpl::CloseRandomAccessLocked(
open_file_count_--;
}
std::shared_ptr<RandomAccessFileReader> BlobDBImpl::GetOrOpenRandomAccessReader(
const std::shared_ptr<BlobFile>& bfile, Env* env,
const EnvOptions& env_options) {
Status BlobDBImpl::GetBlobFileReader(
const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<RandomAccessFileReader>* reader) {
assert(reader != nullptr);
bool fresh_open = false;
auto rar = bfile->GetOrOpenRandomAccessReader(env, env_options, &fresh_open);
if (fresh_open) open_file_count_++;
return rar;
Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
if (s.ok() && fresh_open) {
assert(*reader != nullptr);
open_file_count_++;
}
return s;
}
std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
@ -621,39 +625,6 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
return db_->Write(options, blob_inserter.batch());
}
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
bool flush_memtable) {
// Hold a lock in the beginning to avoid updates to base DB during the call
ReadLock rl(&mutex_);
Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
if (!s.ok()) {
return s;
}
ret.reserve(ret.size() + blob_files_.size());
for (auto bfile_pair : blob_files_) {
auto blob_file = bfile_pair.second;
ret.emplace_back(blob_file->PathName());
}
return Status::OK();
}
void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
// Hold a lock in the beginning to avoid updates to base DB during the call
ReadLock rl(&mutex_);
db_->GetLiveFilesMetaData(metadata);
for (auto bfile_pair : blob_files_) {
auto blob_file = bfile_pair.second;
LiveFileMetaData filemetadata;
filemetadata.size = blob_file->GetFileSize();
filemetadata.name = blob_file->PathName();
auto cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
filemetadata.column_family_name = cfh->GetName();
metadata->emplace_back(filemetadata);
}
}
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value) {
return PutUntil(options, key, value, kNoExpiration);
@ -1031,8 +1002,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
}
// takes locks when called
std::shared_ptr<RandomAccessFileReader> reader =
GetOrOpenRandomAccessReader(bfile, env_, env_options_);
std::shared_ptr<RandomAccessFileReader> reader;
s = GetBlobFileReader(bfile, &reader);
if (!s.ok()) {
return s;
}
assert(blob_index.offset() > key.size() + sizeof(uint32_t));
uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);
@ -1680,16 +1654,21 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
}
std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
if (aborted) return std::make_pair(false, -1);
if (aborted) {
return std::make_pair(false, -1);
}
{
ReadLock rl(&mutex_);
if (obsolete_files_.empty()) return std::make_pair(true, -1);
MutexLock delete_file_lock(&delete_file_mutex_);
if (disable_file_deletions_ > 0) {
return std::make_pair(true, -1);
}
std::list<std::shared_ptr<BlobFile>> tobsolete;
{
WriteLock wl(&mutex_);
if (obsolete_files_.empty()) {
return std::make_pair(true, -1);
}
tobsolete.swap(obsolete_files_);
}

View File

@ -155,12 +155,6 @@ class BlobDBImpl : public BlobDB {
virtual Status Close() override;
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
bool flush_memtable = true) override;
virtual void GetLiveFilesMetaData(
std::vector<LiveFileMetaData>* ) override;
using BlobDB::PutWithTTL;
Status PutWithTTL(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) override;
@ -175,6 +169,15 @@ class BlobDBImpl : public BlobDB {
const DBOptions& db_options,
const ColumnFamilyOptions& cf_options);
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
bool flush_memtable = true) override;
virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
~BlobDBImpl();
Status Open(std::vector<ColumnFamilyHandle*>* handles);
@ -293,11 +296,8 @@ class BlobDBImpl : public BlobDB {
// Open all blob files found in blob_dir.
Status OpenAllBlobFiles();
// hold write mutex on file and call
// creates a Random Access reader for GET call
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
const std::shared_ptr<BlobFile>& bfile, Env* env,
const EnvOptions& env_options);
Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<RandomAccessFileReader>* reader);
// hold write mutex on file and call.
// Close the above Random Access reader
@ -408,6 +408,26 @@ class BlobDBImpl : public BlobDB {
std::list<std::shared_ptr<BlobFile>> obsolete_files_;
// DeleteObsoleteFiles, DisableFileDeletions and EnableFileDeletions block
// on the mutex to avoid contention.
//
// While DeleteObsoleteFiles holds both mutex_ and delete_file_mutex_, note
// the difference: mutex_ only needs to be held when accessing the
// data structure, and delete_file_mutex_ needs to be held for the whole
// duration of DeleteObsoleteFiles to avoid it running simultaneously with
// DisableFileDeletions.
//
// If both mutex_ and delete_file_mutex_ need to be held, it is advised
// to hold delete_file_mutex_ first to avoid deadlock.
mutable port::Mutex delete_file_mutex_;
// Each call of DisableFileDeletions will increase disable_file_deletions_
// by 1. EnableFileDeletions will either decrease the count by 1 or reset
// it to zero, depending on the force flag.
//
// REQUIRES: access with delete_file_mutex_ held.
int disable_file_deletions_ = 0;
uint32_t debug_level_;
};

View File

@ -0,0 +1,97 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db_impl.h"
#include "util/logging.h"
#include "util/mutexlock.h"
// BlobDBImpl methods to get snapshot of files, e.g. for replication.
namespace rocksdb {
namespace blob_db {
Status BlobDBImpl::DisableFileDeletions() {
// Disable base DB file deletions.
Status s = db_impl_->DisableFileDeletions();
if (!s.ok()) {
return s;
}
int count = 0;
{
// Hold delete_file_mutex_ to make sure no DeleteObsoleteFiles job
// is running.
MutexLock l(&delete_file_mutex_);
count = ++disable_file_deletions_;
}
ROCKS_LOG_INFO(db_options_.info_log,
"Disalbed blob file deletions. count: %d", count);
return Status::OK();
}
Status BlobDBImpl::EnableFileDeletions(bool force) {
// Enable base DB file deletions.
Status s = db_impl_->EnableFileDeletions(force);
if (!s.ok()) {
return s;
}
int count = 0;
{
MutexLock l(&delete_file_mutex_);
if (force) {
disable_file_deletions_ = 0;
} else if (disable_file_deletions_ > 0) {
count = --disable_file_deletions_;
}
assert(count >= 0);
}
ROCKS_LOG_INFO(db_options_.info_log, "Enabled blob file deletions. count: %d",
count);
// Consider triggering DeleteObsoleteFiles once after deletions are re-enabled,
// if we are to make the DeleteObsoleteFiles re-run interval configurable.
return Status::OK();
}
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
bool flush_memtable) {
// Hold a lock in the beginning to avoid updates to base DB during the call
ReadLock rl(&mutex_);
Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
if (!s.ok()) {
return s;
}
ret.reserve(ret.size() + blob_files_.size());
for (auto bfile_pair : blob_files_) {
auto blob_file = bfile_pair.second;
ret.emplace_back(blob_file->PathName());
}
return Status::OK();
}
void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
// Hold a lock in the beginning to avoid updates to base DB during the call
ReadLock rl(&mutex_);
db_->GetLiveFilesMetaData(metadata);
for (auto bfile_pair : blob_files_) {
auto blob_file = bfile_pair.second;
LiveFileMetaData filemetadata;
filemetadata.size = blob_file->GetFileSize();
filemetadata.name = blob_file->PathName();
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
filemetadata.column_family_name = cfh->GetName();
metadata->emplace_back(filemetadata);
}
}
} // namespace blob_db
} // namespace rocksdb
#endif // !ROCKSDB_LITE
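
A minimal sketch of the counting scheme used by DisableFileDeletions and EnableFileDeletions above: disables nest, and enabling either decrements the count once or, with force, resets it to zero.

#include <mutex>

class DeletionGate {
 public:
  void Disable() {
    std::lock_guard<std::mutex> l(mu_);
    ++disabled_count_;
  }
  void Enable(bool force) {
    std::lock_guard<std::mutex> l(mu_);
    if (force) {
      disabled_count_ = 0;
    } else if (disabled_count_ > 0) {
      --disabled_count_;
    }
  }
  bool DeletionsAllowed() {
    std::lock_guard<std::mutex> l(mu_);
    return disabled_count_ == 0;
  }

 private:
  std::mutex mu_;
  int disabled_count_ = 0;
};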

View File

@ -16,6 +16,7 @@
#include "port/port.h"
#include "rocksdb/utilities/debug.h"
#include "util/cast_util.h"
#include "util/fault_injection_test_env.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
@ -40,6 +41,7 @@ class BlobDBTest : public testing::Test {
BlobDBTest()
: dbname_(test::PerThreadDBPath("blob_db_test")),
mock_env_(new MockTimeEnv(Env::Default())),
fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
blob_db_(nullptr) {
Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
assert(s.ok());
@ -236,6 +238,7 @@ class BlobDBTest : public testing::Test {
const std::string dbname_;
std::unique_ptr<MockTimeEnv> mock_env_;
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
BlobDB *blob_db_;
}; // class BlobDBTest
@ -354,6 +357,23 @@ TEST_F(BlobDBTest, GetExpiration) {
ASSERT_EQ(300 /* = 100 + 200 */, expiration);
}
TEST_F(BlobDBTest, GetIOError) {
Options options;
options.env = fault_injection_env_.get();
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;  // Make sure the value is written to a blob file
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
PinnableSlice value;
ASSERT_OK(Put("foo", "bar"));
fault_injection_env_->SetFilesystemActive(false, Status::IOError());
Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
ASSERT_TRUE(s.IsIOError());
// Reactivate file system to allow test to close DB.
fault_injection_env_->SetFilesystemActive(true);
}
TEST_F(BlobDBTest, WriteBatch) {
Random rnd(301);
BlobDBOptions bdb_options;
@ -461,7 +481,6 @@ TEST_F(BlobDBTest, DecompressAfterReopen) {
Reopen(bdb_options);
VerifyDB(data);
}
#endif
TEST_F(BlobDBTest, MultipleWriters) {
@ -1415,6 +1434,47 @@ TEST_F(BlobDBTest, EvictExpiredFile) {
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
}
TEST_F(BlobDBTest, DisableFileDeletions) {
BlobDBOptions bdb_options;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::map<std::string, std::string> data;
for (bool force : {true, false}) {
ASSERT_OK(Put("foo", "v", &data));
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
auto blob_file = blob_files[0];
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
// Call DisableFileDeletions twice.
ASSERT_OK(blob_db_->DisableFileDeletions());
ASSERT_OK(blob_db_->DisableFileDeletions());
// File deletions should be disabled.
blob_db_impl()->TEST_DeleteObsoleteFiles();
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
VerifyDB(data);
// Enable file deletions once. If force=true, file deletion is enabled.
// Otherwise it needs to be enabled a second time.
ASSERT_OK(blob_db_->EnableFileDeletions(force));
blob_db_impl()->TEST_DeleteObsoleteFiles();
if (!force) {
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
VerifyDB(data);
// Call EnableFileDeletions a second time.
ASSERT_OK(blob_db_->EnableFileDeletions(false));
blob_db_impl()->TEST_DeleteObsoleteFiles();
}
// Regardless of value of `force`, file should be deleted by now.
ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
VerifyDB({});
}
}
} // namespace blob_db
} // namespace rocksdb

View File

@ -191,36 +191,48 @@ void BlobFile::CloseRandomAccessLocked() {
last_access_ = -1;
}
std::shared_ptr<RandomAccessFileReader> BlobFile::GetOrOpenRandomAccessReader(
Env* env, const EnvOptions& env_options, bool* fresh_open) {
Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
std::shared_ptr<RandomAccessFileReader>* reader,
bool* fresh_open) {
assert(reader != nullptr);
assert(fresh_open != nullptr);
*fresh_open = false;
int64_t current_time = 0;
env->GetCurrentTime(&current_time);
last_access_.store(current_time);
Status s;
{
ReadLock lockbfile_r(&mutex_);
if (ra_file_reader_) return ra_file_reader_;
if (ra_file_reader_) {
*reader = ra_file_reader_;
return s;
}
}
WriteLock lockbfile_w(&mutex_);
if (ra_file_reader_) return ra_file_reader_;
// Double check.
if (ra_file_reader_) {
*reader = ra_file_reader_;
return s;
}
std::unique_ptr<RandomAccessFile> rfile;
Status s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to open blob file for random-read: %s status: '%s'"
" exists: '%s'",
PathName().c_str(), s.ToString().c_str(),
env->FileExists(PathName()).ToString().c_str());
return nullptr;
return s;
}
ra_file_reader_ = std::make_shared<RandomAccessFileReader>(std::move(rfile),
PathName());
*reader = ra_file_reader_;
*fresh_open = true;
return ra_file_reader_;
return s;
}
Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
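
A generic sketch of the double-checked pattern GetReader() uses above: try to reuse an already-open reader under a shared lock, then re-check under an exclusive lock before opening. Reader and the open callback are stand-ins for RandomAccessFileReader and the Env::NewRandomAccessFile call.

#include <memory>
#include <mutex>
#include <shared_mutex>

struct Reader {};

class LazyReaderHolder {
 public:
  template <typename OpenFn>
  std::shared_ptr<Reader> Get(OpenFn open, bool* fresh_open) {
    *fresh_open = false;
    {
      std::shared_lock<std::shared_mutex> read_lock(mu_);
      if (reader_) {
        return reader_;  // fast path: already open
      }
    }
    std::unique_lock<std::shared_mutex> write_lock(mu_);
    if (reader_) {
      return reader_;  // double check: another thread opened it meanwhile
    }
    reader_ = open();  // may return nullptr if the open failed
    *fresh_open = (reader_ != nullptr);
    return reader_;
  }

 private:
  std::shared_mutex mu_;
  std::shared_ptr<Reader> reader_;
};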

View File

@ -181,6 +181,10 @@ class BlobFile {
// footer_valid_ to false and return Status::OK.
Status ReadMetadata(Env* env, const EnvOptions& env_options);
Status GetReader(Env* env, const EnvOptions& env_options,
std::shared_ptr<RandomAccessFileReader>* reader,
bool* fresh_open);
private:
std::shared_ptr<Reader> OpenRandomAccessReader(
Env* env, const DBOptions& db_options,
@ -190,9 +194,6 @@ class BlobFile {
Status WriteFooterAndCloseLocked();
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
Env* env, const EnvOptions& env_options, bool* fresh_open);
void CloseRandomAccessLocked();
// this is used, when you are reading only the footer of a

View File

@ -1717,7 +1717,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
}
// flush only cfa memtable
s = db_impl->TEST_FlushMemTable(true, cfa);
s = db_impl->TEST_FlushMemTable(true, false, cfa);
ASSERT_OK(s);
switch (txn_db_options.write_policy) {
@ -1736,7 +1736,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
}
// flush only cfb memtable
s = db_impl->TEST_FlushMemTable(true, cfb);
s = db_impl->TEST_FlushMemTable(true, false, cfb);
ASSERT_OK(s);
// should show not dependency on logs
@ -3786,7 +3786,7 @@ TEST_P(TransactionTest, SavepointTest3) {
s = txn1->Put("A", "");
ASSERT_OK(s);
s = txn1->PopSavePoint(); // Still no SavePoint present
ASSERT_TRUE(s.IsNotFound());
@ -3796,21 +3796,21 @@ TEST_P(TransactionTest, SavepointTest3) {
ASSERT_OK(s);
s = txn1->PopSavePoint(); // Remove 1
ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());
ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());
// Verify that "A" is still locked
// Verify that "A" is still locked
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn2);
s = txn2->Put("A", "a2");
ASSERT_TRUE(s.IsTimedOut());
delete txn2;
txn1->SetSavePoint(); // 2
s = txn1->Put("B", "b");
ASSERT_OK(s);
txn1->SetSavePoint(); // 3
s = txn1->Put("B", "b2");
@ -3820,7 +3820,7 @@ TEST_P(TransactionTest, SavepointTest3) {
s = txn1->PopSavePoint();
ASSERT_OK(s);
s = txn1->PopSavePoint();
ASSERT_TRUE(s.IsNotFound());
@ -3834,12 +3834,12 @@ TEST_P(TransactionTest, SavepointTest3) {
s = db->Get(read_options, "A", &value);
ASSERT_OK(s);
ASSERT_EQ("a", value);
// txn1 should have set "B" to just "b"
s = db->Get(read_options, "B", &value);
ASSERT_OK(s);
ASSERT_EQ("b", value);
s = db->Get(read_options, "C", &value);
ASSERT_TRUE(s.IsNotFound());
}
@ -5511,7 +5511,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
db->FlushWAL(true);
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -5549,7 +5549,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -5582,7 +5582,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -5609,7 +5609,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -5636,7 +5636,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");

View File

@ -352,14 +352,16 @@ class WBWIIteratorImpl : public WBWIIterator {
}
virtual void SeekToFirst() override {
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_, 0, 0);
WriteBatchIndexEntry search_entry(
nullptr /* search_key */, column_family_id_,
true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
}
virtual void SeekToLast() override {
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_ + 1, 0, 0);
WriteBatchIndexEntry search_entry(
nullptr /* search_key */, column_family_id_ + 1,
true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
if (!skip_list_iter_.Valid()) {
skip_list_iter_.SeekToLast();
@ -369,12 +371,16 @@ class WBWIIteratorImpl : public WBWIIterator {
}
virtual void Seek(const Slice& key) override {
WriteBatchIndexEntry search_entry(&key, column_family_id_);
WriteBatchIndexEntry search_entry(&key, column_family_id_,
true /* is_forward_direction */,
false /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
}
virtual void SeekForPrev(const Slice& key) override {
WriteBatchIndexEntry search_entry(&key, column_family_id_);
WriteBatchIndexEntry search_entry(&key, column_family_id_,
false /* is_forward_direction */,
false /* is_seek_to_first */);
skip_list_iter_.SeekForPrev(&search_entry);
}

View File

@ -85,6 +85,20 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
return Status::OK();
}
// If both `entry1` and `entry2` point to real entries in the write batch, we
// compare the entries as follows:
// 1. First compare the column family; the one with the larger CF is larger;
// 2. Within the same CF, decode the entries to find their keys; the entry
// with the larger key is larger;
// 3. If two entries are of the same CF and key, the one with the larger
// offset is larger.
// Sometimes either `entry1` or `entry2` is a dummy entry, which is actually
// a search key. In this case, in step 2, we don't decode the entry but use
// the value in WriteBatchIndexEntry::search_key.
// One special case is when WriteBatchIndexEntry::key_size is kFlagMinInCf.
// This indicates that we are going to seek to the first entry of the column
// family. Once we see this, this entry is smaller than all the real entries
// of the column family.
int WriteBatchEntryComparator::operator()(
const WriteBatchIndexEntry* entry1,
const WriteBatchIndexEntry* entry2) const {
@ -94,9 +108,10 @@ int WriteBatchEntryComparator::operator()(
return -1;
}
if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
// Deal with special case of seeking to the beginning of a column family
if (entry1->is_min_in_cf()) {
return -1;
} else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
} else if (entry2->is_min_in_cf()) {
return 1;
}
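
A standalone sketch of the ordering rules in the comment above, over a simplified entry type; the real comparator decodes keys out of the write batch buffer and honors per-column-family comparators.

#include <cstddef>
#include <cstdint>
#include <string>

struct SimpleEntry {
  uint32_t column_family;
  bool min_in_cf;  // stands in for key_size == kFlagMinInCf
  std::string key;
  std::size_t offset;
};

int CompareEntries(const SimpleEntry& a, const SimpleEntry& b) {
  if (a.column_family != b.column_family) {  // rule 1: CF first
    return a.column_family < b.column_family ? -1 : 1;
  }
  if (a.min_in_cf || b.min_in_cf) {  // seek-to-first dummies sort smallest
    if (a.min_in_cf && b.min_in_cf) return 0;
    return a.min_in_cf ? -1 : 1;
  }
  int cmp = a.key.compare(b.key);  // rule 2: then the key
  if (cmp != 0) return cmp < 0 ? -1 : 1;
  if (a.offset != b.offset) {  // rule 3: then the offset
    return a.offset < b.offset ? -1 : 1;
  }
  return 0;
}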

View File

@ -31,21 +31,52 @@ struct WriteBatchIndexEntry {
key_offset(ko),
key_size(ksz),
search_key(nullptr) {}
WriteBatchIndexEntry(const Slice* sk, uint32_t c)
: offset(0),
column_family(c),
// Create a dummy entry to be used as a search key. This index entry won't be
// backed by an entry from the write batch, but by a pointer to the search
// key. Alternatively, a special flag can indicate a seek to the first entry
// of the column family.
// @_search_key: the search key
// @_column_family: column family
// @is_forward_direction: true for Seek(). False for SeekForPrev()
// @is_seek_to_first: true if we seek to the beginning of the column family
// _search_key should be null in this case.
WriteBatchIndexEntry(const Slice* _search_key, uint32_t _column_family,
bool is_forward_direction, bool is_seek_to_first)
// For SeekForPrev(), we need to make the dummy entry larger than any
// entry that has the same search key. Otherwise, we'll miss those entries.
: offset(is_forward_direction ? 0 : port::kMaxSizet),
column_family(_column_family),
key_offset(0),
key_size(0),
search_key(sk) {}
key_size(is_seek_to_first ? kFlagMinInCf : 0),
search_key(_search_key) {
assert(_search_key != nullptr || is_seek_to_first);
}
// If this flag appears in the offset, it indicates a key that is smaller
// than any other entry for the same column family
static const size_t kFlagMin = port::kMaxSizet;
// If this flag appears in the key_size, it indicates a
// key that is smaller than any other entry for the same column family.
static const size_t kFlagMinInCf = port::kMaxSizet;
size_t offset; // offset of an entry in write batch's string buffer.
uint32_t column_family; // column family of the entry.
bool is_min_in_cf() const {
assert(key_size != kFlagMinInCf ||
(key_offset == 0 && search_key == nullptr));
return key_size == kFlagMinInCf;
}
// Offset of an entry in the write batch's string buffer. If this is a dummy
// lookup key (in which case search_key != nullptr), offset is set to either
// 0 or max, only for comparison purposes. Because entries with the same key
// are ordered by offset, offset = 0 makes a seek key smaller than or equal
// to all the entries with that key, so that Seek() will find all the entries
// of the same key. Similarly, offset = MAX makes the entry just larger than
// all entries with the search key, so SeekForPrev() will see all the keys
// with the same key.
size_t offset;
uint32_t column_family; // column family of the entry.
size_t key_offset; // offset of the key in write batch's string buffer.
size_t key_size; // size of the key.
size_t key_size; // size of the key. kFlagMinInCf indicates
// that this is a dummy lookup entry for
// SeekToFirst() to the beginning of the column
// family. We use the flag here to save a boolean
// in the struct.
const Slice* search_key; // if not null, instead of reading keys from
// write batch, use it to compare. This is used
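
A small self-contained sketch of the offset trick described above, using a std::set ordered by (key, offset) in place of the skip list: a probe with offset 0 sorts before every real entry with the same key (Seek), while a probe with the maximum offset sorts after all of them (SeekForPrev).

#include <cassert>
#include <cstddef>
#include <iterator>
#include <limits>
#include <set>
#include <string>
#include <utility>

int main() {
  using Entry = std::pair<std::string, std::size_t>;  // (user key, offset)
  std::set<Entry> index = {{"a", 10}, {"b", 20}, {"b", 30}, {"c", 40}};

  // Seek("b"): the probe sorts before {"b", 20} and {"b", 30}.
  auto seek = index.lower_bound({"b", 0});
  assert(seek->second == 20);

  // SeekForPrev("b"): the probe sorts after every "b" entry; step back one.
  auto probe = index.lower_bound({"b", std::numeric_limits<std::size_t>::max()});
  auto seek_for_prev = std::prev(probe);
  assert(seek_for_prev->second == 30);
  return 0;
}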

View File

@ -621,7 +621,7 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
for (int i = 0; i < 128; i++) {
// Random walk and make sure iter and result_iter returns the
// same key and value
int type = rnd.Uniform(5);
int type = rnd.Uniform(6);
ASSERT_OK(iter->status());
switch (type) {
case 0:
@ -642,7 +642,15 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
result_iter->Seek(key);
break;
}
case 3:
case 3: {
// SeekForPrev to random key
auto key_idx = rnd.Uniform(static_cast<int>(source_strings.size()));
auto key = source_strings[key_idx];
iter->SeekForPrev(key);
result_iter->SeekForPrev(key);
break;
}
case 4:
// Next
if (is_valid) {
iter->Next();
@ -652,7 +660,7 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
}
break;
default:
assert(type == 4);
assert(type == 5);
// Prev
if (is_valid) {
iter->Prev();