Compare commits

...

25 Commits
main ... 6.3.fb

Author SHA1 Message Date
Vijay Nadimpalli
efb42cfa47 Making platform 007 (gcc 7) default in build_detect_platform.sh (#5947)
Summary:
Making platform 007 (gcc 7) the default in build_detect_platform.sh.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5947

Differential Revision: D18038837

Pulled By: vjnadimpalli

fbshipit-source-id: 9ac2ddaa93bf328a416faec028970e039886378e
2019-10-30 10:30:21 -07:00
Adam Retter
29b8a70a2e Fixes for building RocksJava releases on arm64v8
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/5674

Differential Revision: D16870338

fbshipit-source-id: c8dac644b1479fa734b491f3a8d50151772290f7
2019-10-23 15:19:39 -07:00
sdong
d47cdbc188 Bump up version to 6.3.6 2019-10-01 17:04:43 -07:00
sdong
56cb92bec6 Revert "Merging iterator to avoid child iterator reseek for some cases (#5286)"
This reverts commit 9fad3e21eb.
2019-10-01 16:48:35 -07:00
sdong
fbf178ec68 Bump up the version to 6.3.5 2019-09-17 17:32:10 -07:00
sdong
648b0141fa Merging iterator to disable reseek optimization in prefix seek (#5815)
Summary:
We are seeing a bug that produces wrong results when the merging iterator's reseek-avoidance feature is combined with a prefix extractor. Disable this optimization for now.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5815

Test Plan: Validated the same MyRocks case was fixed; run all existing tests.

Differential Revision: D17430776

fbshipit-source-id: aef664277ba0ab8a2e68331ff0db6ae682535371
2019-09-17 17:30:56 -07:00
Yanqin Jin
d3f28558e7 Bump patch version to 6.3.4 2019-09-03 13:17:54 -07:00
Yanqin Jin
17a3c25564 Fix a bug in file ingestion (#5760)
Summary:
Before this PR, when the number of column families involved in a file ingestion exceeds 2, a bug in the looping logic prevents the correct file number from being assigned to each ingestion job.
Also skip deleting non-existing hard links during cleanup-after-failure.
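
For illustration, a minimal C++ sketch (not from this PR) of the multi-column-family ingestion path the fix concerns, using the public DB::IngestExternalFiles/IngestExternalFileArg API; the function name, SST paths, and handles are placeholders.
```
// Hypothetical helper: ingest external SST files into three column families
// in one atomic call. Paths and handles are placeholders.
#include <vector>
#include "rocksdb/db.h"

rocksdb::Status IngestIntoThreeCFs(
    rocksdb::DB* db, const std::vector<rocksdb::ColumnFamilyHandle*>& cfs) {
  std::vector<rocksdb::IngestExternalFileArg> args(3);
  args[0].column_family = cfs[0];
  args[0].external_files = {"/tmp/cf0_a.sst", "/tmp/cf0_b.sst"};
  args[1].column_family = cfs[1];
  args[1].external_files = {"/tmp/cf1_a.sst"};
  args[2].column_family = cfs[2];
  args[2].external_files = {"/tmp/cf2_a.sst"};
  for (auto& arg : args) {
    arg.options.allow_global_seqno = true;
  }
  // With more than two column families, the pre-fix loop could assign the
  // wrong start file number to the later ingestion jobs.
  return db->IngestExternalFiles(args);
}
```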

Test plan (devserver)
```
$COMPILE_WITH_ASAN=1 make all
$./external_sst_file_test --gtest_filter=ExternalSSTFileTest/ExternalSSTFileTest.IngestFilesIntoMultipleColumnFamilies_*/*
$make check
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5760

Differential Revision: D17142982

Pulled By: riversand963

fbshipit-source-id: 06c1847a4e7a402647bcf28d124e70f2a0f9daf6
2019-09-03 13:05:22 -07:00
Maysam Yabandeh
4ff493bb38 Disable snapshot refresh feature when snap_refresh_nanos is 0 (#5724)
Summary:
The comments of snap_refresh_nanos advertise that the snapshot refresh feature will be disabled when the option is set to 0. This contract, however, is not honored in the code: https://github.com/facebook/rocksdb/pull/5278
The patch fixes that and also adds an assert to ensure that the feature is not used when the option is zero.
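
As a minimal sketch of the contract being enforced, assuming `snap_refresh_nanos` is exposed as a column family option in this release; the helper name and path are placeholders.
```
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Hypothetical helper; assumes snap_refresh_nanos is a column family option
// in this release. Zero must keep the compaction snapshot refresh off.
void OpenWithSnapshotRefreshDisabled(const std::string& path) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.snap_refresh_nanos = 0;  // 0 disables the feature, per its docs
  rocksdb::DB* db = nullptr;
  if (rocksdb::DB::Open(options, path, &db).ok()) {
    delete db;
  }
}
```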
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5724

Differential Revision: D16918185

Pulled By: maysamyabandeh

fbshipit-source-id: fec167287df7d85093e087fc39c0eb243e3bbd7e
2019-08-20 13:20:32 -07:00
Levi Tamasi
0e7d927217 Update HISTORY.md for 6.3.2 2019-08-16 11:26:11 -07:00
Levi Tamasi
bb2ab26b9f Fix regression affecting partitioned indexes/filters when cache_index_and_filter_blocks is false (#5705)
Summary:
PR https://github.com/facebook/rocksdb/issues/5298 (and subsequent related patches) unintentionally changed the
semantics of cache_index_and_filter_blocks: historically, this option
only affected the main index/filter block; with the changes, it affects
index/filter partitions as well. This can cause performance issues when
cache_index_and_filter_blocks is false since in this case, partitions are
neither cached nor preloaded (i.e. they are loaded on demand upon each
access). The patch reverts to the earlier behavior, that is, partitions
are cached similarly to data blocks regardless of the value of the above
option.
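
For reference, a hedged sketch (not code from the patch) of roughly the configuration the db_bench test plan below exercises, expressed as table options; the helper name is hypothetical.
```
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

// Partitioned index/filters with cache_index_and_filter_blocks disabled,
// roughly matching the db_bench flags in the test plan below.
rocksdb::Options MakePartitionedOptions() {
  rocksdb::BlockBasedTableOptions table_options;
  table_options.index_type =
      rocksdb::BlockBasedTableOptions::kTwoLevelIndexSearch;  // partitioned index
  table_options.partition_filters = true;                     // partitioned filters
  table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(20, false));
  table_options.cache_index_and_filter_blocks = false;
  rocksdb::Options options;
  // With this fix, index/filter partitions are cached like data blocks even
  // though cache_index_and_filter_blocks is false, instead of being re-read
  // on every access.
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  return options;
}
```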
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5705

Test Plan:
make check
./db_bench -benchmarks=fillrandom --statistics --stats_interval_seconds=1 --duration=30 --num=500000000 --bloom_bits=20 --partition_index_and_filters=true --cache_index_and_filter_blocks=false
./db_bench -benchmarks=readrandom --use_existing_db --statistics --stats_interval_seconds=1 --duration=10 --num=500000000 --bloom_bits=20 --partition_index_and_filters=true --cache_index_and_filter_blocks=false --cache_size=8000000000

Relevant statistics from the readrandom benchmark with the old code:

rocksdb.block.cache.index.miss COUNT : 0
rocksdb.block.cache.index.hit COUNT : 0
rocksdb.block.cache.index.add COUNT : 0
rocksdb.block.cache.index.bytes.insert COUNT : 0
rocksdb.block.cache.index.bytes.evict COUNT : 0
rocksdb.block.cache.filter.miss COUNT : 0
rocksdb.block.cache.filter.hit COUNT : 0
rocksdb.block.cache.filter.add COUNT : 0
rocksdb.block.cache.filter.bytes.insert COUNT : 0
rocksdb.block.cache.filter.bytes.evict COUNT : 0

With the new code:

rocksdb.block.cache.index.miss COUNT : 2500
rocksdb.block.cache.index.hit COUNT : 42696
rocksdb.block.cache.index.add COUNT : 2500
rocksdb.block.cache.index.bytes.insert COUNT : 4050048
rocksdb.block.cache.index.bytes.evict COUNT : 0
rocksdb.block.cache.filter.miss COUNT : 2500
rocksdb.block.cache.filter.hit COUNT : 4550493
rocksdb.block.cache.filter.add COUNT : 2500
rocksdb.block.cache.filter.bytes.insert COUNT : 10331040
rocksdb.block.cache.filter.bytes.evict COUNT : 0

Differential Revision: D16817382

Pulled By: ltamasi

fbshipit-source-id: 28a516b0da1f041a03313e0b70b28cf5cf205d00
2019-08-15 12:54:34 -07:00
Levi Tamasi
4ce0542eeb Make the 'block read count' performance counters consistent (#5484)
Summary:
The patch brings the semantics of per-block-type read performance
context counters in sync with the generic block_read_count by only
incrementing the counter if the block was actually read from the file.
It also fixes index_block_read_count, which fell victim to the
refactoring in PR https://github.com/facebook/rocksdb/issues/5298.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5484

Test Plan: Extended the unit tests.

Differential Revision: D15887431

Pulled By: ltamasi

fbshipit-source-id: a3889759d0ac5759d56625d692cd828d1b9207a6
2019-08-15 11:48:40 -07:00
Levi Tamasi
0d3679d0af Revert to respecting only the read_tier read option for index blocks (#5481)
Summary:
PR https://github.com/facebook/rocksdb/issues/5298 subtly changed how read options are applied to the index block
during a Get, MultiGet, or iteration. Earlier, only the read_tier option
applied to the index block read; since PR https://github.com/facebook/rocksdb/issues/5298, fill_cache and
verify_checksums also have an effect. This patch restores the earlier
behavior to prevent surprise memory increases for clients due to the
index block not being cached.
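
A hedged illustration of the resulting behavior (the helper name and key are placeholders): of the read options below, only `read_tier` is honored for the index block read after this revert; `fill_cache` and `verify_checksums` again apply only to data blocks.
```
#include <string>
#include "rocksdb/db.h"

// Hypothetical helper: after this revert, of the read options below only
// read_tier is honored when the index block is read; fill_cache and
// verify_checksums once again affect data blocks only.
std::string GetWithoutCachingDataBlocks(rocksdb::DB* db,
                                        const rocksdb::Slice& key) {
  rocksdb::ReadOptions ro;
  ro.fill_cache = false;                 // data blocks from this Get are not cached
  ro.verify_checksums = true;            // applies to data block reads
  ro.read_tier = rocksdb::kReadAllTier;  // kBlockCacheTier would skip disk entirely
  std::string value;
  db->Get(ro, key, &value);  // index block handling follows read_tier only
  return value;
}
```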
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5481

Test Plan: make check

Differential Revision: D15883082

Pulled By: ltamasi

fbshipit-source-id: 9a065ec3a6db5a365cf6dd5e95190a20c5756356
2019-08-15 11:44:23 -07:00
Levi Tamasi
317a78197b Bump version to 6.3.2 2019-08-15 11:25:42 -07:00
Levi Tamasi
1a523dc802 Fix the changelog for 6.3.0 on the 6.3 branch as well 2019-07-31 10:18:33 -07:00
sdong
5369e08d83 Add HISTORY.md for the bug fix. 2019-07-24 13:01:39 -07:00
sdong
92f04d6c5f Fix regression bug of Auto rolling logger when handling failures (#5622)
Summary:
The auto roll logger fails to handle file creation errors correctly, which may expose users to a segfault. Fix it.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5622

Test Plan: Add a unit test on creating file under a non-existing directory. The test fails without the fix.

Differential Revision: D16460853

fbshipit-source-id: e96da4bef4f16db171ea04a11b2ec5a9448ddbde
2019-07-24 12:57:07 -07:00
Maysam Yabandeh
8e7a29ca1f Disable refresh snapshot feature by default (#5606)
Summary:
There are concerns about the correctness of this patch. Disabling by default until the concerns are resolved.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5606

Differential Revision: D16428064

Pulled By: maysamyabandeh

fbshipit-source-id: a89280f0ea85796c9c9dfbfd9a8e91dad9b000b3
2019-07-22 20:09:26 -07:00
anand76
eff6e2a549 MultiGet parallel IO (#5464)
Summary:
Enhancement to MultiGet batching to read the data blocks required for keys in a batch in parallel from disk. It uses the Env::MultiRead() API to read multiple blocks in one call and reduce latency.
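
A minimal sketch of the batched MultiGet overload this targets (the same overload used by the new test in db_basic_test.cc below); the keys and helper name are placeholders.
```
#include <string>
#include <vector>
#include "rocksdb/db.h"

// Hypothetical helper showing the batched MultiGet overload this targets.
// Data blocks missing from the block cache can now be fetched from disk in
// parallel via Env::MultiRead(). Keys are placeholders and must be sorted
// when sorted_input is true.
void BatchedLookup(rocksdb::DB* db) {
  std::vector<rocksdb::Slice> keys{"key1", "key2", "key3"};
  std::vector<rocksdb::PinnableSlice> values(keys.size());
  std::vector<rocksdb::Status> statuses(keys.size());
  db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(), keys.size(),
               keys.data(), values.data(), statuses.data(),
               /*sorted_input=*/true);
  for (size_t i = 0; i < keys.size(); ++i) {
    if (statuses[i].ok()) {
      std::string v = values[i].ToString();  // copy out of the pinned slice
    }
  }
}
```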
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5464

Test Plan:
1. make check
2. make asan_check
3. make asan_crash

Differential Revision: D15911771

Pulled By: anand1976

fbshipit-source-id: 605036b9af0f90ca0020dc87c3a86b4da6e83394
2019-07-12 15:38:53 -07:00
anand76
b6c329e406 Fix bugs in WAL trash file handling (#5520)
Summary:
1. Cleanup WAL trash files on open
2. Don't apply deletion rate limit if WAL dir is different from db dir
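
A hedged sketch of the setup this fix concerns: a wal_dir separate from the DB directory plus an SstFileManager deletion rate limit. The paths and helper name are examples, not code from the patch.
```
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/sst_file_manager.h"

// Hypothetical setup this fix is about: a WAL directory separate from the DB
// directory, with rate-limited file deletion via an SstFileManager.
rocksdb::Status OpenWithSeparateWalDir(rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.wal_dir = "/data/mydb_wal";  // differs from the DB path below
  rocksdb::Status s;
  options.sst_file_manager.reset(rocksdb::NewSstFileManager(
      rocksdb::Env::Default(), nullptr /*info_log*/, "" /*trash_dir*/,
      10 * 1024 /*rate_bytes_per_sec*/, true /*delete_existing_trash*/, &s));
  if (!s.ok()) {
    return s;
  }
  // After this fix: leftover *.log.trash files in wal_dir are removed on open,
  // and WAL deletions bypass the rate limiter because wal_dir differs from
  // the DB directory.
  return rocksdb::DB::Open(options, "/data/mydb", db);
}
```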
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5520

Test Plan: Add new unit tests and make check

Differential Revision: D16096750

Pulled By: anand1976

fbshipit-source-id: 6f07858ad864b754b711db416f0389c45ede599b
2019-07-12 13:58:35 -07:00
Yanqin Jin
3e1a6dd665 Ref and unref cfd before and after calling WaitForFlushMemTables (#5513)
Summary:
This is to prevent the bg flush thread from unrefing and deleting a cfd that has been dropped by a concurrent thread.
Before RocksDB calls `DBImpl::WaitForFlushMemTables`, we should increase the refcount of each `ColumnFamilyData` so that its ref count will not drop to 0 even if the column family is dropped by another thread. Otherwise the bg flush thread can unref the cfd and delete it, causing a segfault in `WaitForFlushMemTables` upon accessing `cfd`.

Test plan (on devserver):
```
$make clean && COMPILE_WITH_ASAN=1 make -j32
$make check
```
All unit tests must pass.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5513

Differential Revision: D16062898

Pulled By: riversand963

fbshipit-source-id: 37dc511f1dc99f036d0201bbd7f0a8f5677c763d
2019-07-11 23:47:58 -07:00
Yanqin Jin
1dcbe51bdf Add C binding for secondary instance (#5505)
Summary:
Add C bindings for the secondary instance, as well as a unit test.
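
The new C functions wrap the existing C++ secondary-instance API; a minimal sketch of that underlying API follows (the C usage itself is in the c_test.c diff further down). The helper name, paths, and key are placeholders.
```
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Hypothetical helper over the C++ API that the new C functions
// (rocksdb_open_as_secondary, rocksdb_try_catch_up_with_primary) wrap.
rocksdb::Status ReadFromSecondary(const std::string& primary_path,
                                  const std::string& secondary_path) {
  rocksdb::Options options;
  options.max_open_files = -1;  // secondary instances keep all files open
  rocksdb::DB* secondary = nullptr;
  rocksdb::Status s = rocksdb::DB::OpenAsSecondary(options, primary_path,
                                                   secondary_path, &secondary);
  if (!s.ok()) {
    return s;
  }
  s = secondary->TryCatchUpWithPrimary();  // replay what the primary wrote
  std::string value;
  if (s.ok()) {
    s = secondary->Get(rocksdb::ReadOptions(), "some_key" /*placeholder*/, &value);
  }
  delete secondary;
  return s;
}
```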

Test plan (on devserver)
```
$make clean && COMPILE_WITH_ASAN=1 make -j20 all
$./c_test
$make check
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5505

Differential Revision: D16000043

Pulled By: riversand963

fbshipit-source-id: 3361ef6bfdf4ce12438cee7290a0ac203b5250bd
2019-07-11 23:47:29 -07:00
haoyuhuang
6d10405754 Block cache tracer: Do not populate block cache trace record when tracing is disabled. (#5510)
Summary:
This PR makes sure that the trace record is not populated when tracing is disabled.

Before this PR:
DB path: [/data/mysql/rocks_regression_tests/OPTIONS-myrocks-40-33-10000000/2019-06-26-13-04-41/db]
readwhilewriting :       9.803 micros/op 1550408 ops/sec;  107.9 MB/s (5000000 of 5000000 found)
Microseconds per read:
Count: 80000000 Average: 9.8045  StdDev: 12.64
Min: 1  Median: 7.5246  Max: 25343
Percentiles: P50: 7.52 P75: 12.10 P99: 37.44 P99.9: 75.07 P99.99: 133.60

After this PR:
DB path: [/data/mysql/rocks_regression_tests/OPTIONS-myrocks-40-33-10000000/2019-06-26-14-08-21/db]
readwhilewriting :       8.723 micros/op 1662882 ops/sec;  115.8 MB/s (5000000 of 5000000 found)
Microseconds per read:
Count: 80000000 Average: 8.7236  StdDev: 12.19
Min: 1  Median: 6.7262  Max: 25229
Percentiles: P50: 6.73 P75: 10.50 P99: 31.54 P99.9: 74.81 P99.99: 132.82
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5510

Differential Revision: D16016428

Pulled By: HaoyuHuang

fbshipit-source-id: 3b3d11e6accf207d18ec2545b802aa01ee65901f
2019-07-10 12:08:55 -07:00
Vaibhav Gogte
b4f8173984 Export Cache::GetCharge (#5476)
Summary:
Exporting GetCharge to cache.h.
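
A minimal usage sketch of the newly exported accessor, mirroring the unit test added in cache_test.cc below; the helper name is hypothetical.
```
#include <memory>
#include "rocksdb/cache.h"

// Hypothetical helper: GetCharge() returns the charge that was passed to
// Insert() for the entry referenced by the handle.
size_t ChargeOfEntry(const std::shared_ptr<rocksdb::Cache>& cache,
                     const rocksdb::Slice& key) {
  rocksdb::Cache::Handle* handle = cache->Lookup(key);
  if (handle == nullptr) {
    return 0;  // not present in the cache
  }
  size_t charge = cache->GetCharge(handle);
  cache->Release(handle);
  return charge;
}
```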
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5476

Differential Revision: D15881882

Pulled By: riversand963

fbshipit-source-id: 3d99084d10059b4fcaaaba240606ed50bc23351c
2019-06-24 10:19:31 -07:00
Fosco Marotto
d9aecd909f Update version to 6.3.0 2019-06-18 13:59:54 -07:00
52 changed files with 1563 additions and 282 deletions

View File

@ -1,15 +1,44 @@
# Rocksdb Change Log
## Unreleased
## 6.3.6 (10/1/2019)
* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause wrong results when reseek happens with a different iterator upper bound.
## 6.3.5 (9/17/2019)
* Fix a bug introduced in 6.3 which could cause wrong results in a corner case when the prefix bloom filter is used and the iterator is reseeked.
## 6.3.4 (9/3/2019)
### Bug Fixes
* Fix a bug in file ingestion caused by incorrect file number allocation when the number of column families involved in the ingestion exceeds 2.
## 6.3.3 (8/20/2019)
### Bug Fixes
* Fix a bug where the compaction snapshot refresh feature is not disabled as advertised when `snap_refresh_nanos` is set to 0.
## 6.3.2 (8/15/2019)
### Public API Change
* The semantics of the per-block-type block read counts in the performance context now match those of the generic block_read_count.
### Bug Fixes
* Fixed a regression where the fill_cache read option also affected index blocks.
* Fixed an issue where using cache_index_and_filter_blocks==false affected partitions of partitioned indexes as well.
## 6.3.1 (7/24/2019)
### Bug Fixes
* Fix an auto roll logger bug introduced in 6.3.0, which causes a segfault if log file creation fails.
## 6.3.0 (6/18/2019)
### Public API Change
* Now DB::Close() will return an Aborted() error when there are unreleased snapshots. Users can retry after all snapshots are released.
* Index blocks are now handled similarly to data blocks with regards to the block cache: instead of storing objects in the cache, only the blocks themselves are cached. In addition, index blocks no longer get evicted from the cache when a table is closed, can now use the compressed block cache (if any), and can be shared among multiple table readers.
* Partitions of partitioned indexes no longer affect the read amplification statistics.
* Due to a refactoring, block cache eviction statistics for indexes are temporarily broken. We plan to reintroduce them in a later phase.
* Due to the above refactoring, block cache eviction statistics for indexes are temporarily broken. We plan to reintroduce them in a later phase.
* options.keep_log_file_num will be enforced strictly all the time. File names of all log files will be tracked, which may take a significant amount of memory if options.keep_log_file_num is large and either of options.max_log_file_size or options.log_file_time_to_roll is set.
* Add initial support for Get/Put with user timestamps. Users can specify timestamps via ReadOptions and WriteOptions when calling DB::Get and DB::Put.
* Accessing a partition of a partitioned filter or index through a pinned reference is no longer considered a cache hit.
* Add C bindings for secondary instance, i.e. DBImplSecondary.
* Rate-limited deletion of WALs is only enabled if DBOptions::wal_dir is not set, or is explicitly set to the db_name passed to DB::Open and DBOptions::db_paths is empty, or is the same as db_paths[0].path.
### New Features
* Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.
* Add an option `snap_refresh_nanos` (default to 0) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.
* Add an option `unordered_write` which trades snapshot guarantees for higher write throughput. When used with WRITE_PREPARED transactions with two_write_queues=true, it offers higher throughput without compromising the guarantees.
* Allow DBImplSecondary to remove memtables with obsolete data after replaying MANIFEST and WAL.
* Add an option `failed_move_fall_back_to_copy` (default is true) for external SST ingestion. When `move_files` is true and hard link fails, ingestion falls back to copy if `failed_move_fall_back_to_copy` is true. Otherwise, ingestion reports an error.
@ -19,6 +48,7 @@
* DBIter::Next() can skip user key checking if previous entry's seqnum is 0.
* Merging iterator to avoid child iterator reseek for some cases
* Log Writer will flush after finishing the whole record, rather than a fragment.
* Lower MultiGet batching API latency by reading data blocks from disk in parallel
### General Improvements
* Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress.
@ -29,6 +59,7 @@
* Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level.
* Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family.
* Fix a bug caused by secondary not skipping the beginning of new MANIFEST.
* On DB open, delete WAL trash files left behind in wal_dir
## 6.2.0 (4/30/2019)
### New Features

View File

@ -1649,7 +1649,7 @@ JAVA_INCLUDE = -I$(JAVA_HOME)/include/ -I$(JAVA_HOME)/include/linux
ifeq ($(PLATFORM), OS_SOLARIS)
ARCH := $(shell isainfo -b)
else ifeq ($(PLATFORM), OS_OPENBSD)
ifneq (,$(filter $(MACHINE), amd64 arm64 sparc64 aarch64))
ifneq (,$(filter $(MACHINE), amd64 arm64 aarch64 sparc64))
ARCH := 64
else
ARCH := 32
@ -1658,14 +1658,11 @@ else
ARCH := $(shell getconf LONG_BIT)
endif
ifeq (,$(findstring ppc,$(MACHINE)))
ifeq (,$(filter $(MACHINE), ppc arm64 aarch64 sparc64))
ROCKSDBJNILIB = librocksdbjni-linux$(ARCH).so
else
ROCKSDBJNILIB = librocksdbjni-linux-$(MACHINE).so
endif
ifneq (,$(findstring aarch64,$(MACHINE)))
ROCKSDBJNILIB = librocksdbjni-linux-$(MACHINE).so
endif
ROCKSDB_JAR = rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH)-linux$(ARCH).jar
ROCKSDB_JAR_ALL = rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH).jar
ROCKSDB_JAVADOCS_JAR = rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH)-javadoc.jar
@ -1862,6 +1859,14 @@ rocksdbjavastaticdockerppc64le:
mkdir -p java/target
docker run --rm --name rocksdb_linux_ppc64le-be --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host --env DEBUG_LEVEL=$(DEBUG_LEVEL) evolvedbinary/rocksjava:centos7_ppc64le-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh
rocksdbjavastaticdockerarm64v8:
mkdir -p java/target
DOCKER_LINUX_ARM64V8_CONTAINER=`docker ps -aqf name=rocksdb_linux_arm64v8-be`; \
if [ -z "$$DOCKER_LINUX_ARM64V8_CONTAINER" ]; then \
docker container create --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host --name rocksdb_linux_arm64v8-be evolvedbinary/rocksjava:centos7_arm64v8-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh; \
fi
docker start -a rocksdb_linux_arm64v8-be
rocksdbjavastaticpublish: rocksdbjavastaticrelease rocksdbjavastaticpublishcentral
rocksdbjavastaticpublishdocker: rocksdbjavastaticreleasedocker rocksdbjavastaticpublishcentral

View File

@ -56,10 +56,10 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
if [ -n "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
# we need this to build with MySQL. Don't use for other purposes.
source "$PWD/build_tools/fbcode_config4.8.1.sh"
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_PLATFORM007" ]; then
source "$PWD/build_tools/fbcode_config_platform007.sh"
else
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_5xx" ]; then
source "$PWD/build_tools/fbcode_config.sh"
else
source "$PWD/build_tools/fbcode_config_platform007.sh"
fi
fi

cache/cache_test.cc vendored
View File

@ -686,6 +686,14 @@ TEST_P(CacheTest, DefaultShardBits) {
ASSERT_EQ(6, sc->GetNumShardBits());
}
TEST_P(CacheTest, GetCharge) {
Insert(1, 2);
Cache::Handle* h1 = cache_->Lookup(EncodeKey(1));
ASSERT_EQ(2, DecodeValue(cache_->Value(h1)));
ASSERT_EQ(1, cache_->GetCharge(h1));
cache_->Release(h1);
}
#ifdef SUPPORT_CLOCK_CACHE
std::shared_ptr<Cache> (*new_clock_cache_func)(size_t, int,
bool) = NewClockCache;

View File

@ -54,7 +54,8 @@ class ShardedCache : public Cache {
virtual CacheShard* GetShard(int shard) = 0;
virtual const CacheShard* GetShard(int shard) const = 0;
virtual void* Value(Handle* handle) override = 0;
virtual size_t GetCharge(Handle* handle) const = 0;
virtual size_t GetCharge(Handle* handle) const override = 0;
virtual uint32_t GetHash(Handle* handle) const = 0;
virtual void DisownData() override = 0;

db/c.cc
View File

@ -517,6 +517,21 @@ rocksdb_t* rocksdb_open_for_read_only(
return result;
}
rocksdb_t* rocksdb_open_as_secondary(const rocksdb_options_t* options,
const char* name,
const char* secondary_path,
char** errptr) {
DB* db;
if (SaveError(errptr,
DB::OpenAsSecondary(options->rep, std::string(name),
std::string(secondary_path), &db))) {
return nullptr;
}
rocksdb_t* result = new rocksdb_t;
result->rep = db;
return result;
}
rocksdb_backup_engine_t* rocksdb_backup_engine_open(
const rocksdb_options_t* options, const char* path, char** errptr) {
BackupEngine* be;
@ -717,6 +732,37 @@ rocksdb_t* rocksdb_open_for_read_only_column_families(
return result;
}
rocksdb_t* rocksdb_open_as_secondary_column_families(
const rocksdb_options_t* db_options, const char* name,
const char* secondary_path, int num_column_families,
const char** column_family_names,
const rocksdb_options_t** column_family_options,
rocksdb_column_family_handle_t** column_family_handles, char** errptr) {
std::vector<ColumnFamilyDescriptor> column_families;
for (int i = 0; i != num_column_families; ++i) {
column_families.emplace_back(
std::string(column_family_names[i]),
ColumnFamilyOptions(column_family_options[i]->rep));
}
DB* db;
std::vector<ColumnFamilyHandle*> handles;
if (SaveError(errptr, DB::OpenAsSecondary(DBOptions(db_options->rep),
std::string(name),
std::string(secondary_path),
column_families, &handles, &db))) {
return nullptr;
}
for (size_t i = 0; i != handles.size(); ++i) {
rocksdb_column_family_handle_t* c_handle =
new rocksdb_column_family_handle_t;
c_handle->rep = handles[i];
column_family_handles[i] = c_handle;
}
rocksdb_t* result = new rocksdb_t;
result->rep = db;
return result;
}
char** rocksdb_list_column_families(
const rocksdb_options_t* options,
const char* name,
@ -3423,6 +3469,10 @@ void rocksdb_ingest_external_file_cf(
SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep));
}
void rocksdb_try_catch_up_with_primary(rocksdb_t* db, char** errptr) {
SaveError(errptr, db->rep->TryCatchUpWithPrimary());
}
rocksdb_slicetransform_t* rocksdb_slicetransform_create(
void* state,
void (*destructor)(void*),

View File

@ -45,6 +45,7 @@ static char sstfilename[200];
static char dbbackupname[200];
static char dbcheckpointname[200];
static char dbpathname[200];
static char secondary_path[200];
static void StartPhase(const char* name) {
fprintf(stderr, "=== Test %s\n", name);
@ -1722,6 +1723,59 @@ int main(int argc, char** argv) {
CheckNoError(err);
}
// Check that secondary instance works.
StartPhase("open_as_secondary");
{
rocksdb_close(db);
rocksdb_destroy_db(options, dbname, &err);
rocksdb_options_t* db_options = rocksdb_options_create();
rocksdb_options_set_create_if_missing(db_options, 1);
db = rocksdb_open(db_options, dbname, &err);
CheckNoError(err);
rocksdb_t* db1;
rocksdb_options_t* opts = rocksdb_options_create();
rocksdb_options_set_max_open_files(opts, -1);
rocksdb_options_set_create_if_missing(opts, 1);
snprintf(secondary_path, sizeof(secondary_path),
"%s/rocksdb_c_test_secondary-%d", GetTempDir(), ((int)geteuid()));
db1 = rocksdb_open_as_secondary(opts, dbname, secondary_path, &err);
CheckNoError(err);
rocksdb_writeoptions_set_sync(woptions, 0);
rocksdb_writeoptions_disable_WAL(woptions, 1);
rocksdb_put(db, woptions, "key0", 4, "value0", 6, &err);
CheckNoError(err);
rocksdb_flushoptions_t* flush_opts = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flush_opts, 1);
rocksdb_flush(db, flush_opts, &err);
CheckNoError(err);
rocksdb_try_catch_up_with_primary(db1, &err);
CheckNoError(err);
rocksdb_readoptions_t* ropts = rocksdb_readoptions_create();
rocksdb_readoptions_set_verify_checksums(ropts, 1);
rocksdb_readoptions_set_snapshot(ropts, NULL);
CheckGet(db, ropts, "key0", "value0");
CheckGet(db1, ropts, "key0", "value0");
rocksdb_writeoptions_disable_WAL(woptions, 0);
rocksdb_put(db, woptions, "key1", 4, "value1", 6, &err);
CheckNoError(err);
rocksdb_try_catch_up_with_primary(db1, &err);
CheckNoError(err);
CheckGet(db1, ropts, "key0", "value0");
CheckGet(db1, ropts, "key1", "value1");
rocksdb_close(db1);
rocksdb_destroy_db(opts, secondary_path, &err);
CheckNoError(err);
rocksdb_options_destroy(db_options);
rocksdb_options_destroy(opts);
rocksdb_readoptions_destroy(ropts);
rocksdb_flushoptions_destroy(flush_opts);
}
// Simple sanity check that options setting db_paths work.
StartPhase("open_db_paths");
{

View File

@ -39,6 +39,7 @@ class SnapshotListFetchCallback {
virtual void Refresh(std::vector<SequenceNumber>* snapshots,
SequenceNumber max) = 0;
inline bool TimeToRefresh(const size_t key_index) {
assert(snap_refresh_nanos_ != 0);
// skip the key if key_index % every_nth_key (which is a power of 2) is not 0.
if ((key_index & every_nth_key_minus_one_) != 0) {
return false;

View File

@ -964,7 +964,7 @@ TEST_F(CompactionJobTest, SnapshotRefresh) {
public:
SnapshotListFetchCallbackTest(Env* env, Random64& rand,
std::vector<SequenceNumber>* snapshots)
: SnapshotListFetchCallback(env, 0 /*no time delay*/,
: SnapshotListFetchCallback(env, 1 /*short time delay*/,
1 /*fetch after each key*/),
rand_(rand),
snapshots_(snapshots) {}

View File

@ -10,6 +10,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/perf_context.h"
#include "table/block_based/block_builder.h"
#include "test_util/fault_injection_test_env.h"
#if !defined(ROCKSDB_LITE)
#include "test_util/sync_point.h"
@ -1285,6 +1286,294 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
}
}
class DBBasicTestWithParallelIO
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool,bool,bool,bool>> {
public:
DBBasicTestWithParallelIO()
: DBTestBase("/db_basic_test_with_parallel_io") {
bool compressed_cache = std::get<0>(GetParam());
bool uncompressed_cache = std::get<1>(GetParam());
compression_enabled_ = std::get<2>(GetParam());
fill_cache_ = std::get<3>(GetParam());
if (compressed_cache) {
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
compressed_cache_ = std::make_shared<MyBlockCache>(cache);
}
if (uncompressed_cache) {
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
}
env_->count_random_reads_ = true;
Options options = CurrentOptions();
Random rnd(301);
BlockBasedTableOptions table_options;
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
table_options.block_cache = uncompressed_cache_;
table_options.block_cache_compressed = compressed_cache_;
table_options.flush_block_policy_factory.reset(
new MyFlushBlockPolicyFactory());
options.table_factory.reset(new BlockBasedTableFactory(table_options));
if (!compression_enabled_) {
options.compression = kNoCompression;
}
Reopen(options);
std::string zero_str(128, '\0');
for (int i = 0; i < 100; ++i) {
// Make the value compressible. A purely random string doesn't compress
// and the resultant data block will not be compressed
values_.emplace_back(RandomString(&rnd, 128) + zero_str);
assert(Put(Key(i), values_[i]) == Status::OK());
}
Flush();
}
bool CheckValue(int i, const std::string& value) {
if (values_[i].compare(value) == 0) {
return true;
}
return false;
}
int num_lookups() { return uncompressed_cache_->num_lookups(); }
int num_found() { return uncompressed_cache_->num_found(); }
int num_inserts() { return uncompressed_cache_->num_inserts(); }
int num_lookups_compressed() {
return compressed_cache_->num_lookups();
}
int num_found_compressed() {
return compressed_cache_->num_found();
}
int num_inserts_compressed() {
return compressed_cache_->num_inserts();
}
bool fill_cache() { return fill_cache_; }
static void SetUpTestCase() {}
static void TearDownTestCase() {}
private:
class MyFlushBlockPolicyFactory
: public FlushBlockPolicyFactory {
public:
MyFlushBlockPolicyFactory() {}
virtual const char* Name() const override {
return "MyFlushBlockPolicyFactory";
}
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& /*table_options*/,
const BlockBuilder& data_block_builder) const override {
return new MyFlushBlockPolicy(data_block_builder);
}
};
class MyFlushBlockPolicy
: public FlushBlockPolicy {
public:
explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
: num_keys_(0), data_block_builder_(data_block_builder) {}
bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
if (data_block_builder_.empty()) {
// First key in this block
num_keys_ = 1;
return false;
}
// Flush every 10 keys
if (num_keys_ == 10) {
num_keys_ = 1;
return true;
}
num_keys_++;
return false;
}
private:
int num_keys_;
const BlockBuilder& data_block_builder_;
};
class MyBlockCache
: public Cache {
public:
explicit MyBlockCache(std::shared_ptr<Cache>& target)
: target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}
virtual const char* Name() const override { return "MyBlockCache"; }
virtual Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
num_inserts_++;
return target_->Insert(key, value, charge, deleter, handle, priority);
}
virtual Handle* Lookup(const Slice& key,
Statistics* stats = nullptr) override {
num_lookups_++;
Handle* handle = target_->Lookup(key, stats);
if (handle != nullptr) {
num_found_++;
}
return handle;
}
virtual bool Ref(Handle* handle) override {
return target_->Ref(handle);
}
virtual bool Release(Handle* handle, bool force_erase = false) override {
return target_->Release(handle, force_erase);
}
virtual void* Value(Handle* handle) override {
return target_->Value(handle);
}
virtual void Erase(const Slice& key) override {
target_->Erase(key);
}
virtual uint64_t NewId() override {
return target_->NewId();
}
virtual void SetCapacity(size_t capacity) override {
target_->SetCapacity(capacity);
}
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
target_->SetStrictCapacityLimit(strict_capacity_limit);
}
virtual bool HasStrictCapacityLimit() const override {
return target_->HasStrictCapacityLimit();
}
virtual size_t GetCapacity() const override {
return target_->GetCapacity();
}
virtual size_t GetUsage() const override {
return target_->GetUsage();
}
virtual size_t GetUsage(Handle* handle) const override {
return target_->GetUsage(handle);
}
virtual size_t GetPinnedUsage() const override {
return target_->GetPinnedUsage();
}
virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; }
virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
bool thread_safe) override {
return target_->ApplyToAllCacheEntries(callback, thread_safe);
}
virtual void EraseUnRefEntries() override {
return target_->EraseUnRefEntries();
}
int num_lookups() { return num_lookups_; }
int num_found() { return num_found_; }
int num_inserts() { return num_inserts_; }
private:
std::shared_ptr<Cache> target_;
int num_lookups_;
int num_found_;
int num_inserts_;
};
std::shared_ptr<MyBlockCache> compressed_cache_;
std::shared_ptr<MyBlockCache> uncompressed_cache_;
bool compression_enabled_;
std::vector<std::string> values_;
bool fill_cache_;
};
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
std::vector<std::string> key_data(10);
std::vector<Slice> keys;
// We cannot resize a PinnableSlice vector, so just set initial size to
// largest we think we will need
std::vector<PinnableSlice> values(10);
std::vector<Status> statuses;
ReadOptions ro;
ro.fill_cache = fill_cache();
// Warm up the cache first
key_data.emplace_back(Key(0));
keys.emplace_back(Slice(key_data.back()));
key_data.emplace_back(Key(50));
keys.emplace_back(Slice(key_data.back()));
statuses.resize(keys.size());
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(0, values[0].ToString()));
ASSERT_TRUE(CheckValue(50, values[1].ToString()));
int random_reads = env_->random_read_counter_.Read();
key_data[0] = Key(1);
key_data[1] = Key(51);
keys[0] = Slice(key_data[0]);
keys[1] = Slice(key_data[1]);
values[0].Reset();
values[1].Reset();
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(1, values[0].ToString()));
ASSERT_TRUE(CheckValue(51, values[1].ToString()));
int expected_reads = random_reads + (fill_cache() ? 0 : 2);
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
keys.resize(10);
statuses.resize(10);
std::vector<int> key_ints{1,2,15,16,55,81,82,83,84,85};
for (size_t i = 0; i < key_ints.size(); ++i) {
key_data[i] = Key(key_ints[i]);
keys[i] = Slice(key_data[i]);
statuses[i] = Status::OK();
values[i].Reset();
}
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
for (size_t i = 0; i < key_ints.size(); ++i) {
ASSERT_OK(statuses[i]);
ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
}
expected_reads += (fill_cache() ? 2 : 4);
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
}
INSTANTIATE_TEST_CASE_P(
ParallelIO, DBBasicTestWithParallelIO,
// Params are as follows -
// Param 0 - Compressed cache enabled
// Param 1 - Uncompressed cache enabled
// Param 2 - Data compression enabled
// Param 3 - ReadOptions::fill_cache
::testing::Values(std::make_tuple(false, true, true, true),
std::make_tuple(true, true, true, true),
std::make_tuple(false, true, false, true),
std::make_tuple(false, true, true, false),
std::make_tuple(true, true, true, false),
std::make_tuple(false, true, false, false)));
class DBBasicTestWithTimestampWithParam
: public DBTestBase,
public testing::WithParamInterface<bool> {

View File

@ -290,6 +290,39 @@ TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
Close();
}
TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
Options options = CurrentOptions();
options.create_if_missing = true;
CreateAndReopenWithCF({"pikachu"}, options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::FlushMemTable:AfterScheduleFlush",
"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
{"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BackgroundCallFlush:start",
"DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_EQ(2, handles_.size());
ASSERT_OK(Put(1, "key", "value"));
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
port::Thread drop_cf_thr([&]() {
TEST_SYNC_POINT(
"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
handles_.resize(1);
TEST_SYNC_POINT(
"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
});
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
drop_cf_thr.join();
Close();
SyncPoint::GetInstance()->DisableProcessing();
}
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;
@ -545,6 +578,49 @@ TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
handles_.clear();
}
TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
bool atomic_flush = GetParam();
if (!atomic_flush) {
return;
}
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = atomic_flush;
CreateAndReopenWithCF({"pikachu"}, options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
{"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BackgroundCallFlush:start",
"DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_EQ(2, handles_.size());
ASSERT_OK(Put(0, "key", "value"));
ASSERT_OK(Put(1, "key", "value"));
auto* cfd_default =
static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
->cfd();
auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
port::Thread drop_cf_thr([&]() {
TEST_SYNC_POINT(
"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
delete handles_[1];
handles_.resize(1);
TEST_SYNC_POINT(
"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
});
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
flush_opts));
drop_cf_thr.join();
Close();
SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());

View File

@ -3148,6 +3148,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
Env* env = soptions.env;
std::vector<std::string> filenames;
bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
// Reset the logger because it holds a handle to the
// log file and prevents cleanup and directory removal
@ -3170,7 +3171,9 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (type == kMetaDatabase) {
del = DestroyDB(path_to_delete, options);
} else if (type == kTableFile || type == kLogFile) {
del = DeleteDBFile(&soptions, path_to_delete, dbname);
del =
DeleteDBFile(&soptions, path_to_delete, dbname,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
} else {
del = env->DeleteFile(path_to_delete);
}
@ -3204,7 +3207,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (ParseFileName(fname, &number, &type) &&
type == kTableFile) { // Lock file will be deleted at end
std::string table_path = path + "/" + fname;
Status del = DeleteDBFile(&soptions, table_path, dbname);
Status del = DeleteDBFile(&soptions, table_path, dbname,
/*force_bg=*/false, /*force_fg=*/false);
if (result.ok() && !del.ok()) {
result = del;
}
@ -3231,7 +3235,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
for (const auto& file : archiveFiles) {
if (ParseFileName(file, &number, &type) && type == kLogFile) {
Status del =
DeleteDBFile(&soptions, archivedir + "/" + file, archivedir);
DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
if (result.ok() && !del.ok()) {
result = del;
}
@ -3246,7 +3251,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (ParseFileName(file, &number, &type) && type == kLogFile) {
Status del =
DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
soptions.wal_dir);
soptions.wal_dir, /*force_bg=*/false,
/*force_fg=*/!wal_in_db_path);
if (result.ok() && !del.ok()) {
result = del;
}
@ -3651,9 +3657,9 @@ Status DBImpl::IngestExternalFiles(
exec_results.emplace_back(false, Status::OK());
}
// TODO(yanqin) maybe make jobs run in parallel
uint64_t start_file_number = next_file_number;
for (size_t i = 1; i != num_cfs; ++i) {
uint64_t start_file_number =
next_file_number + args[i - 1].external_files.size();
start_file_number += args[i - 1].external_files.size();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);

View File

@ -756,6 +756,16 @@ class DBImpl : public DB {
Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
ColumnFamilyHandle* cfh = nullptr);
Status TEST_FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_opts);
// Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This
// is because in certain cases, we can flush column families, wait for the
// flush to complete, but delete the column family handle before the wait
// finishes. For example in CompactRange.
Status TEST_AtomicFlushMemTables(const autovector<ColumnFamilyData*>& cfds,
const FlushOptions& flush_opts);
// Wait for memtable compaction
Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
@ -1873,6 +1883,8 @@ class DBImpl : public DB {
// results sequentially. Flush results of memtables with lower IDs get
// installed to MANIFEST first.
InstrumentedCondVar atomic_flush_install_cv_;
bool wal_in_db_path_;
};
extern Options SanitizeOptions(const std::string& db, const Options& src);

View File

@ -1008,8 +1008,10 @@ Status DBImpl::CompactFilesImpl(
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER,
immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
: nullptr);
immutable_db_options_.max_subcompactions <= 1 &&
c->mutable_cf_options()->snap_refresh_nanos > 0
? &fetch_callback
: nullptr);
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
@ -1565,6 +1567,16 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
ColumnFamilyData* loop_cfd = elem.first;
loop_cfd->imm()->FlushRequested();
}
// If the caller wants to wait for this flush to complete, it indicates
// that the caller expects the ColumnFamilyData not to be free'ed by
// other threads which may drop the column family concurrently.
// Therefore, we increase the cfd's ref count.
if (flush_options.wait) {
for (auto& elem : flush_req) {
ColumnFamilyData* loop_cfd = elem.first;
loop_cfd->Ref();
}
}
SchedulePendingFlush(flush_req, flush_reason);
MaybeScheduleFlushOrCompaction();
}
@ -1573,7 +1585,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
write_thread_.ExitUnbatched(&w);
}
}
TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
if (s.ok() && flush_options.wait) {
autovector<ColumnFamilyData*> cfds;
autovector<const uint64_t*> flush_memtable_ids;
@ -1583,6 +1596,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
(flush_reason == FlushReason::kErrorRecovery));
for (auto* tmp_cfd : cfds) {
if (tmp_cfd->Unref()) {
// Only one thread can reach here.
InstrumentedMutexLock lock_guard(&mutex_);
delete tmp_cfd;
}
}
}
TEST_SYNC_POINT("FlushMemTableFinished");
return s;
@ -1646,6 +1666,15 @@ Status DBImpl::AtomicFlushMemTables(
for (auto cfd : cfds) {
cfd->imm()->FlushRequested();
}
// If the caller wants to wait for this flush to complete, it indicates
// that the caller expects the ColumnFamilyData not to be free'ed by
// other threads which may drop the column family concurrently.
// Therefore, we increase the cfd's ref count.
if (flush_options.wait) {
for (auto cfd : cfds) {
cfd->Ref();
}
}
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, flush_reason);
MaybeScheduleFlushOrCompaction();
@ -1656,7 +1685,7 @@ Status DBImpl::AtomicFlushMemTables(
}
}
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
if (s.ok() && flush_options.wait) {
autovector<const uint64_t*> flush_memtable_ids;
for (auto& iter : flush_req) {
@ -1664,6 +1693,13 @@ Status DBImpl::AtomicFlushMemTables(
}
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
(flush_reason == FlushReason::kErrorRecovery));
for (auto* cfd : cfds) {
if (cfd->Unref()) {
// Only one thread can reach here.
InstrumentedMutexLock lock_guard(&mutex_);
delete cfd;
}
}
}
return s;
}
@ -2125,6 +2161,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
}
status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer, thread_pri);
TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
// All the CFDs in the FlushReq must have the same flush reason, so just
// grab the first one
*reason = bg_flush_args[0].cfd_->GetFlushReason();
@ -2676,8 +2713,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri,
immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
: nullptr);
immutable_db_options_.max_subcompactions <= 1 &&
c->mutable_cf_options()->snap_refresh_nanos > 0
? &fetch_callback
: nullptr);
compaction_job.Prepare();
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,

View File

@ -122,6 +122,16 @@ Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
return FlushMemTable(cfd, fo, FlushReason::kTest);
}
Status DBImpl::TEST_FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_opts) {
return FlushMemTable(cfd, flush_opts, FlushReason::kTest);
}
Status DBImpl::TEST_AtomicFlushMemTables(
const autovector<ColumnFamilyData*>& cfds, const FlushOptions& flush_opts) {
return AtomicFlushMemTables(cfds, flush_opts, FlushReason::kTest);
}
Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
ColumnFamilyData* cfd;
if (column_family == nullptr) {

View File

@ -258,7 +258,8 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
Status file_deletion_status;
if (type == kTableFile || type == kLogFile) {
file_deletion_status =
DeleteDBFile(&immutable_db_options_, fname, path_to_sync);
DeleteDBFile(&immutable_db_options_, fname, path_to_sync,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path_);
} else {
file_deletion_status = env_->DeleteFile(fname);
}

View File

@ -122,6 +122,25 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
}
#ifndef ROCKSDB_LITE
ImmutableDBOptions immutable_db_options(result);
if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
// Either the WAL dir and db_paths[0]/db_name are not the same, or we
// cannot tell for sure. In either case, assume they're different and
// explicitly cleanup the trash log files (bypass DeleteScheduler)
// Do this first so even if we end up calling
// DeleteScheduler::CleanupDirectory on the same dir later, it will be
// safe
std::vector<std::string> filenames;
result.env->GetChildren(result.wal_dir, &filenames);
for (std::string& filename : filenames) {
if (filename.find(".log.trash",
filename.length() - std::string(".log.trash").length()) !=
std::string::npos) {
std::string trash_file = result.wal_dir + "/" + filename;
result.env->DeleteFile(trash_file);
}
}
}
// When the DB is stopped, it's possible that there are some .trash files that
// were not deleted yet. When we open the DB, we will find these .trash files
// and schedule them to be deleted (or delete immediately if SstFileManager
@ -1294,6 +1313,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
delete impl;
return s;
}
impl->wal_in_db_path_ =
IsWalDirSameAsDBPath(&impl->immutable_db_options_);
impl->mutex_.Lock();
// Handles create_if_missing, error_if_exists
s = impl->Recover(column_families);

View File

@ -2548,75 +2548,6 @@ TEST_P(DBIteratorTest, AvoidReseekLevelIterator) {
SyncPoint::GetInstance()->DisableProcessing();
}
TEST_P(DBIteratorTest, AvoidReseekChildIterator) {
Options options = CurrentOptions();
options.compression = CompressionType::kNoCompression;
BlockBasedTableOptions table_options;
table_options.block_size = 800;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);
Random rnd(301);
std::string random_str = RandomString(&rnd, 180);
ASSERT_OK(Put("1", random_str));
ASSERT_OK(Put("2", random_str));
ASSERT_OK(Put("3", random_str));
ASSERT_OK(Put("4", random_str));
ASSERT_OK(Put("8", random_str));
ASSERT_OK(Put("9", random_str));
ASSERT_OK(Flush());
ASSERT_OK(Put("5", random_str));
ASSERT_OK(Put("6", random_str));
ASSERT_OK(Put("7", random_str));
ASSERT_OK(Flush());
// These two keys will be kept in memtable.
ASSERT_OK(Put("0", random_str));
ASSERT_OK(Put("8", random_str));
int num_iter_wrapper_seek = 0;
SyncPoint::GetInstance()->SetCallBack(
"IteratorWrapper::Seek:0",
[&](void* /*arg*/) { num_iter_wrapper_seek++; });
SyncPoint::GetInstance()->EnableProcessing();
{
std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
iter->Seek("1");
ASSERT_TRUE(iter->Valid());
// DBIter always wraps internal iterator with IteratorWrapper,
// and in merging iterator each child iterator will be wrapped
// with IteratorWrapper.
ASSERT_EQ(4, num_iter_wrapper_seek);
// child position: 1 and 5
num_iter_wrapper_seek = 0;
iter->Seek("2");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(3, num_iter_wrapper_seek);
// child position: 2 and 5
num_iter_wrapper_seek = 0;
iter->Seek("6");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(4, num_iter_wrapper_seek);
// child position: 8 and 6
num_iter_wrapper_seek = 0;
iter->Seek("7");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(3, num_iter_wrapper_seek);
// child position: 8 and 7
num_iter_wrapper_seek = 0;
iter->Seek("5");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(4, num_iter_wrapper_seek);
}
SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(DBIteratorTestInstance, DBIteratorTest,
testing::Values(true, false));

View File

@ -470,6 +470,111 @@ TEST_F(DBSSTTest, RateLimitedWALDelete) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
class DBWALTestWithParam
: public DBSSTTest,
public testing::WithParamInterface<std::tuple<std::string, bool>> {
public:
DBWALTestWithParam() {
wal_dir_ = std::get<0>(GetParam());
wal_dir_same_as_dbname_ = std::get<1>(GetParam());
}
std::string wal_dir_;
bool wal_dir_same_as_dbname_;
};
TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
class MyEnv : public EnvWrapper {
public:
MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
Status DeleteFile(const std::string& fname) {
if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
return Status::OK();
}
return target()->DeleteFile(fname);
}
void set_fake_log_delete(bool fake) { fake_log_delete = fake; }
private:
bool fake_log_delete;
};
std::unique_ptr<MyEnv> env(new MyEnv(Env::Default()));
Destroy(last_options_);
env->set_fake_log_delete(true);
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.compression = kNoCompression;
options.env = env.get();
options.wal_dir = dbname_ + wal_dir_;
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
Status s;
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
ASSERT_OK(s);
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
ASSERT_OK(TryReopen(options));
// Create 4 files in L0
for (char v = 'a'; v <= 'd'; v++) {
ASSERT_OK(Put("Key2", DummyString(1024, v)));
ASSERT_OK(Put("Key3", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Put("Key1", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
ASSERT_EQ("4", FilesPerLevel(0));
Close();
options.sst_file_manager.reset();
std::vector<std::string> filenames;
int trash_log_count = 0;
if (!wal_dir_same_as_dbname_) {
// Forcibly create some trash log files
std::unique_ptr<WritableFile> result;
env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
EnvOptions());
result.reset();
}
env->GetChildren(options.wal_dir, &filenames);
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_GE(trash_log_count, 1);
env->set_fake_log_delete(false);
ASSERT_OK(TryReopen(options));
filenames.clear();
trash_log_count = 0;
env->GetChildren(options.wal_dir, &filenames);
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_EQ(trash_log_count, 0);
Close();
}
INSTANTIATE_TEST_CASE_P(DBWALTestWithParam, DBWALTestWithParam,
::testing::Values(std::make_tuple("", true),
std::make_tuple("_wal_dir", false)));
TEST_F(DBSSTTest, OpenDBWithExistingTrash) {
Options options = CurrentOptions();

View File

@ -3771,6 +3771,46 @@ TEST_F(DBTest2, CloseWithUnreleasedSnapshot) {
delete db_;
db_ = nullptr;
}
TEST_F(DBTest2, PrefixBloomReseek) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.prefix_extractor.reset(NewCappedPrefixTransform(3));
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.whole_key_filtering = false;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
// Construct two L1 files with keys:
// f1:[aaa1 ccc1] f2:[ddd0]
ASSERT_OK(Put("aaa1", ""));
ASSERT_OK(Put("ccc1", ""));
ASSERT_OK(Flush());
ASSERT_OK(Put("ddd0", ""));
ASSERT_OK(Flush());
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_OK(Put("bbb1", ""));
Iterator* iter = db_->NewIterator(ReadOptions());
// Seeking into f1, the iterator will check the bloom filter, which causes the
// file iterator to be invalidated, and the cursor will be placed in f2, with
// the next key being "ddd0".
iter->Seek("bbb1");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bbb1", iter->key().ToString());
// Reseek ccc1, the L1 iterator needs to go back to f1 and reseek.
iter->Seek("ccc1");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("ccc1", iter->key().ToString());
delete iter;
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -121,7 +121,7 @@ Status ExternalSstFileIngestionJob::Prepare(
// We failed, remove all files that we copied into the db
for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path.empty()) {
break;
continue;
}
Status s = env_->DeleteFile(f.internal_file_path);
if (!s.ok()) {
@ -252,6 +252,9 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
// We failed to add the files to the database
// remove all the files we copied
for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path.empty()) {
continue;
}
Status s = env_->DeleteFile(f.internal_file_path);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,

View File

@ -2369,10 +2369,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2386,6 +2387,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@ -2393,8 +2397,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
-1, true, true_data);
ASSERT_OK(s);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
@ -2426,10 +2431,11 @@ TEST_P(ExternalSSTFileTest,
Options options = CurrentOptions();
options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
const std::vector<std::map<std::string, std::string>> data_before_ingestion =
{{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}};
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}},
{{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}};
for (size_t i = 0; i != handles_.size(); ++i) {
int cf = static_cast<int>(i);
const auto& orig_data = data_before_ingestion[i];
@ -2442,6 +2448,7 @@ TEST_P(ExternalSSTFileTest,
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2455,6 +2462,8 @@ TEST_P(ExternalSSTFileTest,
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@ -2508,10 +2517,11 @@ TEST_P(ExternalSSTFileTest,
dbfull()->ReleaseSnapshot(read_opts.snapshot);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
// Should see consistent state after ingestion for all column families even
// without snapshot.
ASSERT_EQ(2, handles_.size());
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
@ -2541,10 +2551,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2558,6 +2569,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@ -2577,8 +2591,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
@ -2607,10 +2622,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
"DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2624,6 +2640,8 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@ -2643,8 +2661,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
@ -2664,7 +2683,7 @@ TEST_P(ExternalSSTFileTest,
Options options = CurrentOptions();
options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
SyncPoint::GetInstance()->ClearTrace();
SyncPoint::GetInstance()->DisableProcessing();
@ -2682,6 +2701,7 @@ TEST_P(ExternalSSTFileTest,
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@ -2695,6 +2715,8 @@ TEST_P(ExternalSSTFileTest,
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@ -2715,8 +2737,9 @@ TEST_P(ExternalSSTFileTest,
fault_injection_env->DropUnsyncedFileData();
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
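The tests above drive atomic ingestion into several column families through a single call. A hedged sketch of that API from a caller's point of view (the handles and SST paths are placeholders, and the files are assumed to have been produced with SstFileWriter beforehand):

#include <string>
#include <vector>
#include "rocksdb/db.h"

// Hypothetical multi-CF ingestion helper; cf1/cf2 and the paths are assumed.
rocksdb::Status IngestIntoTwoColumnFamilies(rocksdb::DB* db,
                                            rocksdb::ColumnFamilyHandle* cf1,
                                            rocksdb::ColumnFamilyHandle* cf2) {
  rocksdb::IngestExternalFileArg arg1;
  arg1.column_family = cf1;
  arg1.external_files = {"/tmp/cf1.sst"};
  arg1.options.allow_global_seqno = true;

  rocksdb::IngestExternalFileArg arg2;
  arg2.column_family = cf2;
  arg2.external_files = {"/tmp/cf2.sst"};
  arg2.options.allow_global_seqno = true;

  // Either every listed column family gets its files or none does.
  return db->IngestExternalFiles({arg1, arg2});
}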

View File

@ -858,8 +858,7 @@ class LevelIterator final : public InternalIterator {
bool skip_filters, int level, RangeDelAggregator* range_del_agg,
const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries =
nullptr)
: InternalIterator(false),
table_cache_(table_cache),
: table_cache_(table_cache),
read_options_(read_options),
env_options_(env_options),
icomparator_(icomparator),

View File

@ -187,7 +187,8 @@ void WalManager::PurgeObsoleteWALFiles() {
continue;
}
if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) {
s = DeleteDBFile(&db_options_, file_path, archival_dir, false);
s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
@ -213,7 +214,8 @@ void WalManager::PurgeObsoleteWALFiles() {
log_file_size = std::max(log_file_size, file_size);
++log_files_num;
} else {
s = DeleteDBFile(&db_options_, file_path, archival_dir, false);
s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Unable to delete file: %s: %s", file_path.c_str(),
@ -253,7 +255,8 @@ void WalManager::PurgeObsoleteWALFiles() {
for (size_t i = 0; i < files_del_num; ++i) {
std::string const file_path = archived_logs[i]->PathName();
s = DeleteDBFile(&db_options_, db_options_.wal_dir + "/" + file_path,
db_options_.wal_dir, false);
db_options_.wal_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());

View File

@ -18,6 +18,7 @@
#include <memory>
#include "db/version_set.h"
#include "file/file_util.h"
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/env.h"
@ -40,7 +41,8 @@ class WalManager {
env_options_(env_options),
env_(db_options.env),
purge_wal_files_last_run_(0),
seq_per_batch_(seq_per_batch) {}
seq_per_batch_(seq_per_batch),
wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)) {}
Status GetSortedWalFiles(VectorLogPtr& files);
@ -97,6 +99,8 @@ class WalManager {
bool seq_per_batch_;
bool wal_in_db_path_;
// obsolete files will be deleted at this interval (in seconds) if ttl
// deletion is enabled and archive size_limit is disabled.
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;

View File

@ -88,12 +88,12 @@ Status CreateFile(Env* env, const std::string& destination,
}
Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname, const std::string& dir_to_sync,
const bool force_bg) {
const std::string& fname, const std::string& dir_to_sync,
const bool force_bg, const bool force_fg) {
#ifndef ROCKSDB_LITE
SstFileManagerImpl* sfm =
static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());
if (sfm) {
if (sfm && !force_fg) {
return sfm->ScheduleFileDeletion(fname, dir_to_sync, force_bg);
} else {
return db_options->env->DeleteFile(fname);
@ -101,10 +101,21 @@ Status DeleteDBFile(const ImmutableDBOptions* db_options,
#else
(void)dir_to_sync;
(void)force_bg;
(void)force_fg;
// SstFileManager is not supported in ROCKSDB_LITE
// Delete file immediately
return db_options->env->DeleteFile(fname);
#endif
}
bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options) {
bool same = false;
Status s = db_options->env->AreFilesSame(db_options->wal_dir,
db_options->db_paths[0].path, &same);
if (s.IsNotSupported()) {
same = db_options->wal_dir == db_options->db_paths[0].path;
}
return same;
}
} // namespace rocksdb
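IsWalDirSameAsDBPath and the force_fg flag exist so WAL files living outside the DB path are deleted in the foreground rather than handed to the SstFileManager's rate-limited background deleter. A hedged sketch of the kind of configuration this distinction applies to (paths are placeholders):

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/sst_file_manager.h"

// Hypothetical setup with a dedicated WAL directory; here wal_in_db_path_
// would be false, so archived WALs are deleted directly by the Env.
rocksdb::Status OpenWithSeparateWalDir(rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.wal_dir = "/fast-disk/wal";  // placeholder path
  options.sst_file_manager.reset(
      rocksdb::NewSstFileManager(rocksdb::Env::Default()));
  return rocksdb::DB::Open(options, "/data/db", db);  // placeholder path
}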

View File

@ -24,7 +24,9 @@ extern Status CreateFile(Env* env, const std::string& destination,
extern Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname,
const std::string& path_to_sync,
const bool force_bg = false);
const std::string& path_to_sync, const bool force_bg,
const bool force_fg);
extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options);
} // namespace rocksdb

View File

@ -138,6 +138,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_t* rocksdb_open_for_read_only(
const rocksdb_options_t* options, const char* name,
unsigned char error_if_log_file_exist, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_t* rocksdb_open_as_secondary(
const rocksdb_options_t* options, const char* name,
const char* secondary_path, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_backup_engine_t* rocksdb_backup_engine_open(
const rocksdb_options_t* options, const char* path, char** errptr);
@ -218,6 +222,13 @@ rocksdb_open_for_read_only_column_families(
rocksdb_column_family_handle_t** column_family_handles,
unsigned char error_if_log_file_exist, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_t* rocksdb_open_as_secondary_column_families(
const rocksdb_options_t* options, const char* name,
const char* secondary_path, int num_column_families,
const char** column_family_names,
const rocksdb_options_t** column_family_options,
rocksdb_column_family_handle_t** colummn_family_handles, char** errptr);
extern ROCKSDB_LIBRARY_API char** rocksdb_list_column_families(
const rocksdb_options_t* options, const char* name, size_t* lencf,
char** errptr);
@ -1375,6 +1386,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_ingest_external_file_cf(
const char* const* file_list, const size_t list_len,
const rocksdb_ingestexternalfileoptions_t* opt, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_try_catch_up_with_primary(
rocksdb_t* db, char** errptr);
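A hedged sketch of how the secondary-instance entry points declared above fit together when called from client code (paths are placeholders, and error handling is abbreviated):

#include <stdio.h>
#include "rocksdb/c.h"

// Hypothetical usage; a secondary instance requires max_open_files == -1.
int open_secondary_example(void) {
  char* err = nullptr;
  rocksdb_options_t* opts = rocksdb_options_create();
  rocksdb_options_set_max_open_files(opts, -1);
  rocksdb_t* db = rocksdb_open_as_secondary(opts, "/path/to/primary",
                                            "/path/to/secondary", &err);
  if (err != nullptr) {
    fprintf(stderr, "open failed: %s\n", err);
    rocksdb_options_destroy(opts);
    return 1;
  }
  rocksdb_try_catch_up_with_primary(db, &err);  // replay the primary's WAL
  rocksdb_close(db);
  rocksdb_options_destroy(opts);
  return 0;
}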
/* SliceTransform */
extern ROCKSDB_LIBRARY_API rocksdb_slicetransform_t*

View File

@ -226,6 +226,9 @@ class Cache {
// returns the memory size for the entries in use by the system
virtual size_t GetPinnedUsage() const = 0;
// returns the charge for the specific entry in the cache.
virtual size_t GetCharge(Handle* handle) const = 0;
// Call this on shutdown if you want to speed it up. Cache will disown
// any underlying data and will not free it on delete. This call will leak
// memory - call this only if you're shutting down the process.
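GetCharge, added above, makes it possible to account for an entry you already hold a handle to. A minimal hedged sketch (the cache is assumed to have been created with NewLRUCache or similar):

#include <string>
#include "rocksdb/cache.h"

// Hypothetical helper: returns the charge recorded for a cached key, or 0.
size_t ChargeOf(rocksdb::Cache* cache, const std::string& key) {
  rocksdb::Cache::Handle* handle = cache->Lookup(key);
  if (handle == nullptr) {
    return 0;
  }
  size_t charge = cache->GetCharge(handle);
  cache->Release(handle);
  return charge;
}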

View File

@ -275,10 +275,10 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// this option helps reducing the cpu usage of long-running compactions. The
// feature is disabled when max_subcompactions is greater than one.
//
// Default: 0.1s
// Default: 0
//
// Dynamically changeable through SetOptions() API
uint64_t snap_refresh_nanos = 100 * 1000 * 1000; // 0.1s
uint64_t snap_refresh_nanos = 0;
// Disable automatic compactions. Manual compactions can still
// be issued on this column family
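With the snap_refresh_nanos default flipped to 0 above, the periodic snapshot refresh is off unless a user opts back in. A hedged sketch of doing so through the dynamically-changeable options path mentioned in the comment (db and cf_handle are assumed to exist; the value simply mirrors the old 0.1s default):

#include "rocksdb/db.h"

// Hypothetical opt-in; 100000000 ns == 0.1s, the previous default.
rocksdb::Status EnableSnapRefresh(rocksdb::DB* db,
                                  rocksdb::ColumnFamilyHandle* cf_handle) {
  return db->SetOptions(cf_handle, {{"snap_refresh_nanos", "100000000"}});
}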

View File

@ -5,8 +5,8 @@
#pragma once
#define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 2
#define ROCKSDB_PATCH 0
#define ROCKSDB_MINOR 3
#define ROCKSDB_PATCH 6
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -12,17 +12,17 @@ cd /rocksdb-local
if hash scl 2>/dev/null; then
if scl --list | grep -q 'devtoolset-7'; then
scl enable devtoolset-7 'make jclean clean'
scl enable devtoolset-7 'PORTABLE=1 make -j6 rocksdbjavastatic'
scl enable devtoolset-7 'PORTABLE=1 make -j2 rocksdbjavastatic'
elif scl --list | grep -q 'devtoolset-2'; then
scl enable devtoolset-2 'make jclean clean'
scl enable devtoolset-2 'PORTABLE=1 make -j6 rocksdbjavastatic'
scl enable devtoolset-2 'PORTABLE=1 make -j2 rocksdbjavastatic'
else
echo "Could not find devtoolset"
exit 1;
fi
else
make jclean clean
PORTABLE=1 make -j6 rocksdbjavastatic
PORTABLE=1 make -j2 rocksdbjavastatic
fi
cp java/target/librocksdbjni-linux*.so java/target/rocksdbjni-*-linux*.jar /rocksdb-host/java/target

View File

@ -46,9 +46,8 @@ AutoRollLogger::AutoRollLogger(Env* env, const std::string& dbname,
}
GetExistingFiles();
ResetLogger();
s = TrimOldLogFiles();
if (!status_.ok()) {
status_ = s;
if (status_.ok()) {
status_ = TrimOldLogFiles();
}
}

View File

@ -653,6 +653,15 @@ TEST_F(AutoRollLoggerTest, LogFileExistence) {
delete db;
}
TEST_F(AutoRollLoggerTest, FileCreateFailure) {
Options options;
options.max_log_file_size = 100 * 1024 * 1024;
options.db_log_dir = "/a/dir/does/not/exist/at/all";
std::shared_ptr<Logger> logger;
ASSERT_NOK(CreateLoggerFromOptions("", options, &logger));
ASSERT_TRUE(!logger);
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -80,14 +80,14 @@ Status ReadBlockFromFile(
RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, bool maybe_compressed,
bool do_uncompress, bool maybe_compressed, BlockType block_type,
const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator) {
BlockContents contents;
BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
&contents, ioptions, do_uncompress,
maybe_compressed, uncompression_dict,
maybe_compressed, block_type, uncompression_dict,
cache_options, memory_allocator);
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
@ -159,6 +159,13 @@ bool PrefixExtractorChanged(const TableProperties* table_properties,
}
}
CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
CacheAllocationPtr heap_buf;
heap_buf = AllocateBlock(buf.size(), allocator);
memcpy(heap_buf.get(), buf.data(), buf.size());
return heap_buf;
}
} // namespace
// Encapsulates common functionality for the various index reader
@ -176,7 +183,7 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
protected:
static Status ReadIndexBlock(const BlockBasedTable* table,
FilePrefetchBuffer* prefetch_buffer,
const ReadOptions& read_options,
const ReadOptions& read_options, bool use_cache,
GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block);
@ -210,8 +217,13 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
return properties == nullptr || !properties->index_value_is_delta_encoded;
}
Status GetOrReadIndexBlock(const ReadOptions& read_options,
GetContext* get_context,
bool cache_index_blocks() const {
assert(table_ != nullptr);
assert(table_->get_rep() != nullptr);
return table_->get_rep()->table_options.cache_index_and_filter_blocks;
}
Status GetOrReadIndexBlock(bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) const;
@ -229,7 +241,7 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
const ReadOptions& read_options, GetContext* get_context,
const ReadOptions& read_options, bool use_cache, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) {
PERF_TIMER_GUARD(read_index_block_nanos);
@ -244,13 +256,13 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
const Status s = table->RetrieveBlock(
prefetch_buffer, read_options, rep->footer.index_handle(),
UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex,
get_context, lookup_context);
get_context, lookup_context, use_cache);
return s;
}
Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock(
const ReadOptions& read_options, GetContext* get_context,
bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) const {
assert(index_block != nullptr);
@ -260,8 +272,14 @@ Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock(
return Status::OK();
}
ReadOptions read_options;
if (no_io) {
read_options.read_tier = kBlockCacheTier;
}
return ReadIndexBlock(table_, /*prefetch_buffer=*/nullptr, read_options,
get_context, lookup_context, index_block);
cache_index_blocks(), get_context, lookup_context,
index_block);
}
// Index that allows binary search lookup in a two-level index structure.
@ -283,7 +301,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
CachableEntry<Block> index_block;
if (prefetch || !use_cache) {
const Status s =
ReadIndexBlock(table, prefetch_buffer, ReadOptions(),
ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
/*get_context=*/nullptr, lookup_context, &index_block);
if (!s.ok()) {
return s;
@ -304,9 +322,10 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
const ReadOptions& read_options, bool /* disable_prefix_seek */,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) override {
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s = GetOrReadIndexBlock(read_options, get_context,
lookup_context, &index_block);
const Status s =
GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);
@ -366,7 +385,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
Statistics* kNullStats = nullptr;
CachableEntry<Block> index_block;
Status s = GetOrReadIndexBlock(ReadOptions(), nullptr /* get_context */,
Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */,
&lookup_context, &index_block);
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
@ -416,7 +435,8 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
// filter blocks
s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
&block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context);
&block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context,
/*contents=*/nullptr);
assert(s.ok() || block.GetValue() == nullptr);
if (s.ok() && block.GetValue() != nullptr) {
@ -469,7 +489,7 @@ class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon {
CachableEntry<Block> index_block;
if (prefetch || !use_cache) {
const Status s =
ReadIndexBlock(table, prefetch_buffer, ReadOptions(),
ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
/*get_context=*/nullptr, lookup_context, &index_block);
if (!s.ok()) {
return s;
@ -489,9 +509,10 @@ class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon {
const ReadOptions& read_options, bool /* disable_prefix_seek */,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) override {
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s = GetOrReadIndexBlock(read_options, get_context,
lookup_context, &index_block);
const Status s =
GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);
@ -549,7 +570,7 @@ class HashIndexReader : public BlockBasedTable::IndexReaderCommon {
CachableEntry<Block> index_block;
if (prefetch || !use_cache) {
const Status s =
ReadIndexBlock(table, prefetch_buffer, ReadOptions(),
ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
/*get_context=*/nullptr, lookup_context, &index_block);
if (!s.ok()) {
return s;
@ -597,8 +618,8 @@ class HashIndexReader : public BlockBasedTable::IndexReaderCommon {
BlockFetcher prefixes_block_fetcher(
file, prefetch_buffer, footer, ReadOptions(), prefixes_handle,
&prefixes_contents, ioptions, true /*decompress*/,
true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options, memory_allocator);
true /*maybe_compressed*/, BlockType::kHashIndexPrefixes,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
s = prefixes_block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
@ -607,8 +628,8 @@ class HashIndexReader : public BlockBasedTable::IndexReaderCommon {
BlockFetcher prefixes_meta_block_fetcher(
file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle,
&prefixes_meta_contents, ioptions, true /*decompress*/,
true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options, memory_allocator);
true /*maybe_compressed*/, BlockType::kHashIndexMetadata,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
s = prefixes_meta_block_fetcher.ReadBlockContents();
if (!s.ok()) {
// TODO: log error
@ -631,9 +652,10 @@ class HashIndexReader : public BlockBasedTable::IndexReaderCommon {
const ReadOptions& read_options, bool disable_prefix_seek,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) override {
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s = GetOrReadIndexBlock(read_options, get_context,
lookup_context, &index_block);
const Status s =
GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);
@ -1366,7 +1388,8 @@ Status BlockBasedTable::ReadCompressionDictBlock(
rep_->file.get(), prefetch_buffer, rep_->footer, read_options,
rep_->compression_dict_handle, compression_dict_cont.get(),
rep_->ioptions, false /* decompress */, false /*maybe_compressed*/,
UncompressionDict::GetEmptyDict(), cache_options);
BlockType::kCompressionDictionary, UncompressionDict::GetEmptyDict(),
cache_options);
s = compression_block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -1576,7 +1599,7 @@ Status BlockBasedTable::ReadMetaBlock(FilePrefetchBuffer* prefetch_buffer,
Status s = ReadBlockFromFile(
rep_->file.get(), prefetch_buffer, rep_->footer, ReadOptions(),
rep_->footer.metaindex_handle(), &meta, rep_->ioptions,
true /* decompress */, true /*maybe_compressed*/,
true /* decompress */, true /*maybe_compressed*/, BlockType::kMetaIndex,
UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
GetMemoryAllocator(rep_->table_options));
@ -1718,8 +1741,6 @@ Status BlockBasedTable::PutDataBlockToCache(
: Cache::Priority::LOW;
assert(cached_block);
assert(cached_block->IsEmpty());
assert(raw_block_comp_type == kNoCompression ||
block_cache_compressed != nullptr);
Status s;
Statistics* statistics = ioptions.statistics;
@ -1811,8 +1832,9 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
BlockFetcher block_fetcher(
rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(),
filter_handle, &block, rep->ioptions, false /* decompress */,
false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
rep->persistent_cache_options, GetMemoryAllocator(rep->table_options));
false /*maybe_compressed*/, BlockType::kFilter,
UncompressionDict::GetEmptyDict(), rep->persistent_cache_options,
GetMemoryAllocator(rep->table_options));
Status s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -1933,7 +1955,6 @@ CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
? Cache::Priority::HIGH
: Cache::Priority::LOW);
if (s.ok()) {
PERF_COUNTER_ADD(filter_block_read_count, 1);
UpdateCacheInsertionMetrics(BlockType::kFilter, get_context, usage);
} else {
RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ADD_FAILURES);
@ -1943,7 +1964,8 @@ CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
}
}
if (block_cache_tracer_ && lookup_context) {
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
lookup_context) {
// Avoid making copy of block_key and cf_name when constructing the access
// record.
BlockCacheTraceRecord access_record(
@ -2014,7 +2036,6 @@ CachableEntry<UncompressionDict> BlockBasedTable::GetUncompressionDict(
: Cache::Priority::LOW);
if (s.ok()) {
PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
UpdateCacheInsertionMetrics(BlockType::kCompressionDictionary,
get_context, usage);
dict = uncompression_dict.release();
@ -2025,7 +2046,8 @@ CachableEntry<UncompressionDict> BlockBasedTable::GetUncompressionDict(
}
}
}
if (block_cache_tracer_ && lookup_context) {
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
lookup_context) {
// Avoid making copy of block_key and cf_name when constructing the access
// record.
BlockCacheTraceRecord access_record(
@ -2087,7 +2109,8 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
CachableEntry<Block> block;
s = RetrieveBlock(prefetch_buffer, ro, handle, uncompression_dict, &block,
block_type, get_context, lookup_context);
block_type, get_context, lookup_context,
/* use_cache */ true);
if (!s.ok()) {
assert(block.IsEmpty());
@ -2151,11 +2174,110 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
return iter;
}
// Convert an uncompressed data block (i.e. CachableEntry<Block>)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, create a new iterator
// If input_iter is not null, update this iter and return it
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(
const ReadOptions& ro, CachableEntry<Block>& block, TBlockIter* input_iter,
Status s) const {
PERF_TIMER_GUARD(new_table_block_iter_nanos);
TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
if (!s.ok()) {
iter->Invalidate(s);
return iter;
}
assert(block.GetValue() != nullptr);
constexpr bool kTotalOrderSeek = true;
// Block contents are pinned and it is still pinned after the iterator
// is destroyed as long as cleanup functions are moved to another object,
// when:
// 1. block cache handle is set to be released in cleanup function, or
// 2. it's pointing to immortal source. If own_bytes is true then we are
// not reading data from the original source, whether immortal or not.
// Otherwise, the block is pinned iff the source is immortal.
const bool block_contents_pinned =
block.IsCached() ||
(!block.GetValue()->own_bytes() && rep_->immortal_table);
iter = block.GetValue()->NewIterator<TBlockIter>(
&rep_->internal_comparator, rep_->internal_comparator.user_comparator(),
iter, rep_->ioptions.statistics, kTotalOrderSeek, false, true,
block_contents_pinned);
if (!block.IsCached()) {
if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
// insert a dummy record to block cache to track the memory usage
Cache* const block_cache = rep_->table_options.block_cache.get();
Cache::Handle* cache_handle = nullptr;
// There are two other types of cache keys: 1) SST cache key added in
// `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
// `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
// from SST cache key (31 bytes), and use non-zero prefix to
// differentiate from `write_buffer_manager`
const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
// Prefix: use rep_->cache_key_prefix padded by 0s
memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
assert(rep_->cache_key_prefix_size != 0);
assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
next_cache_key_id_++);
assert(end - cache_key <=
static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
s = block_cache->Insert(unique_key, nullptr,
block.GetValue()->ApproximateMemoryUsage(),
nullptr, &cache_handle);
if (s.ok()) {
assert(cache_handle != nullptr);
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
cache_handle);
}
}
} else {
iter->SetCacheHandle(block.GetCacheHandle());
}
block.TransferTo(iter);
return iter;
}
// Lookup the cache for the given data block referenced by an index iterator
// value (i.e. BlockHandle). If it exists in the cache, initialize block to
// the contents of the data block.
Status BlockBasedTable::GetDataBlockFromCache(
const ReadOptions& ro, const BlockHandle& handle,
const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block, BlockType block_type,
GetContext* get_context) const {
BlockCacheLookupContext lookup_data_block_context(
BlockCacheLookupCaller::kUserMGet);
assert(block_type == BlockType::kData);
Status s = RetrieveBlock(nullptr, ro, handle, uncompression_dict, block,
block_type, get_context, &lookup_data_block_context,
/* use_cache */ true);
if (s.IsIncomplete()) {
s = Status::OK();
}
return s;
}
// If contents is nullptr, this function looks up the block caches for the
// data block referenced by handle, and reads the block from disk if necessary.
// If contents is non-null, it skips the cache lookup and disk read, since
// the caller has already read it. In both cases, if ro.fill_cache is true,
// it inserts the block into the block cache.
Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context) const {
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents) const {
assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep_->table_options.block_cache.get();
@ -2187,14 +2309,17 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
compressed_cache_key);
}
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
ro, block_entry, uncompression_dict, block_type,
get_context);
if (block_entry->GetValue()) {
// TODO(haoyu): Differentiate cache hit on uncompressed block cache and
// compressed block cache.
is_cache_hit = true;
if (!contents) {
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
ro, block_entry, uncompression_dict, block_type,
get_context);
if (block_entry->GetValue()) {
// TODO(haoyu): Differentiate cache hit on uncompressed block cache and
// compressed block cache.
is_cache_hit = true;
}
}
// Can't find the block from the cache. If I/O is allowed, read from the
// file.
if (block_entry->GetValue() == nullptr && !no_io && ro.fill_cache) {
@ -2204,17 +2329,20 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
block_cache_compressed == nullptr && rep_->blocks_maybe_compressed;
CompressionType raw_block_comp_type;
BlockContents raw_block_contents;
{
if (!contents) {
StopWatch sw(rep_->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
BlockFetcher block_fetcher(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle,
&raw_block_contents, rep_->ioptions,
do_decompress /* do uncompress */, rep_->blocks_maybe_compressed,
uncompression_dict, rep_->persistent_cache_options,
block_type, uncompression_dict, rep_->persistent_cache_options,
GetMemoryAllocator(rep_->table_options),
GetMemoryAllocatorForCompressedBlock(rep_->table_options));
s = block_fetcher.ReadBlockContents();
raw_block_comp_type = block_fetcher.get_compression_type();
contents = &raw_block_contents;
} else {
raw_block_comp_type = contents->get_compression_type();
}
if (s.ok()) {
@ -2222,7 +2350,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// If filling cache is allowed and a cache is configured, try to put the
// block to the cache.
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
block_entry, &raw_block_contents,
block_entry, contents,
raw_block_comp_type, uncompression_dict, seq_no,
GetMemoryAllocator(rep_->table_options),
block_type, get_context);
@ -2231,7 +2359,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
}
// Fill lookup_context.
if (block_cache_tracer_ && lookup_context) {
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
lookup_context) {
size_t usage = 0;
uint64_t nkeys = 0;
if (block_entry->GetValue()) {
@ -2286,22 +2415,189 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
return s;
}
// This function reads multiple data blocks from disk using Env::MultiRead()
// and optionally inserts them into the block cache. It uses the scratch
// buffer provided by the caller, which is contiguous. If scratch is a nullptr
// it allocates a separate buffer for each block. Typically, if the blocks
// need to be uncompressed and there is no compressed block cache, callers
// can allocate a temporary scratch buffer in order to minimize memory
// allocations.
// If options.fill_cache is true, it inserts the blocks into cache. If it's
// false and scratch is non-null and the blocks are uncompressed, it copies
// the buffers to heap. In any case, the CachableEntry<Block> returned will
// own the data bytes.
// batch - A MultiGetRange with only those keys with unique data blocks not
// found in cache
// handles - A vector of block handles. Some of them may be NULL handles
// scratch - An optional contiguous buffer to read compressed blocks into
void BlockBasedTable::MaybeLoadBlocksToCache(
const ReadOptions& options,
const MultiGetRange* batch,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
autovector<
CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
char* scratch,
const UncompressionDict& uncompression_dict) const {
RandomAccessFileReader* file = rep_->file.get();
const Footer& footer = rep_->footer;
const ImmutableCFOptions& ioptions = rep_->ioptions;
SequenceNumber global_seqno = rep_->get_global_seqno(BlockType::kData);
size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);
if (file->use_direct_io() || ioptions.allow_mmap_reads) {
size_t idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
BlockCacheLookupContext lookup_data_block_context(
BlockCacheLookupCaller::kUserMGet);
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}
(*statuses)[idx_in_batch] =
RetrieveBlock(nullptr, options, handle, uncompression_dict,
&(*results)[idx_in_batch], BlockType::kData,
mget_iter->get_context, &lookup_data_block_context,
/* use_cache */ true);
}
return;
}
autovector<ReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
size_t buf_offset = 0;
size_t idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}
ReadRequest req;
req.len = handle.size() + kBlockTrailerSize;
if (scratch == nullptr) {
req.scratch = new char[req.len];
} else {
req.scratch = scratch + buf_offset;
buf_offset += req.len;
}
req.offset = handle.offset();
req.status = Status::OK();
read_reqs.emplace_back(req);
}
file->MultiRead(&read_reqs[0], read_reqs.size());
size_t read_req_idx = 0;
idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}
ReadRequest& req = read_reqs[read_req_idx++];
Status s = req.status;
if (s.ok()) {
if (req.result.size() != handle.size() + kBlockTrailerSize) {
s = Status::Corruption("truncated block read from " +
rep_->file->file_name() + " offset " +
ToString(handle.offset()) + ", expected " +
ToString(handle.size() + kBlockTrailerSize) +
" bytes, got " + ToString(req.result.size()));
}
}
BlockContents raw_block_contents;
if (s.ok()) {
if (scratch == nullptr) {
// We allocated a buffer for this block. Give ownership of it to
// BlockContents so it can free the memory
assert(req.result.data() == req.scratch);
std::unique_ptr<char[]> raw_block(req.scratch);
raw_block_contents = BlockContents(std::move(raw_block),
handle.size());
} else {
// We used the scratch buffer, so no need to free anything
raw_block_contents = BlockContents(Slice(req.scratch,
handle.size()));
}
#ifndef NDEBUG
raw_block_contents.is_raw_block = true;
#endif
if (options.verify_checksums) {
PERF_TIMER_GUARD(block_checksum_time);
const char* data = req.result.data();
uint32_t expected = DecodeFixed32(data + handle.size() + 1);
s = rocksdb::VerifyChecksum(footer.checksum(), req.result.data(),
handle.size() + 1, expected);
}
}
if (s.ok()) {
if (options.fill_cache) {
BlockCacheLookupContext lookup_data_block_context(
BlockCacheLookupCaller::kUserMGet);
CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
// MaybeReadBlockAndLoadToCache will insert into the block caches if
// necessary. Since we're passing the raw block contents, it will
// avoid looking up the block cache
s = MaybeReadBlockAndLoadToCache(nullptr, options, handle,
uncompression_dict, block_entry, BlockType::kData,
mget_iter->get_context, &lookup_data_block_context,
&raw_block_contents);
} else {
CompressionType compression_type =
raw_block_contents.get_compression_type();
BlockContents contents;
if (compression_type != kNoCompression) {
UncompressionContext context(compression_type);
UncompressionInfo info(context, uncompression_dict, compression_type);
s = UncompressBlockContents(info, req.result.data(), handle.size(),
&contents, footer.version(), rep_->ioptions,
memory_allocator);
} else {
if (scratch != nullptr) {
// If we used the scratch buffer, then the contents need to be
// copied to heap
Slice raw = Slice(req.result.data(), handle.size());
contents = BlockContents(CopyBufferToHeap(
GetMemoryAllocator(rep_->table_options), raw),
handle.size());
} else {
contents = std::move(raw_block_contents);
}
}
if (s.ok()) {
(*results)[idx_in_batch].SetOwnedValue(new Block(std::move(contents),
global_seqno, read_amp_bytes_per_bit, ioptions.statistics));
}
}
}
(*statuses)[idx_in_batch] = s;
}
}
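MaybeLoadBlocksToCache above serves the batched form of DB::MultiGet, which can coalesce data block reads for keys that land in the same SST file. A hedged caller-side sketch (the DB pointer, column family handle, and keys are placeholders):

#include <string>
#include <vector>
#include "rocksdb/db.h"

// Hypothetical batched lookup exercising the MultiRead-based path.
void BatchedLookup(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
  std::vector<rocksdb::Slice> keys = {"k1", "k2", "k3"};
  std::vector<rocksdb::PinnableSlice> values(keys.size());
  std::vector<rocksdb::Status> statuses(keys.size());
  db->MultiGet(rocksdb::ReadOptions(), cf, keys.size(), keys.data(),
               values.data(), statuses.data());
}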
Status BlockBasedTable::RetrieveBlock(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context) const {
GetContext* get_context, BlockCacheLookupContext* lookup_context,
bool use_cache) const {
assert(block_entry);
assert(block_entry->IsEmpty());
Status s;
if (rep_->table_options.cache_index_and_filter_blocks ||
(block_type != BlockType::kFilter &&
block_type != BlockType::kCompressionDictionary &&
block_type != BlockType::kIndex)) {
if (use_cache) {
s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle,
uncompression_dict, block_entry,
block_type, get_context, lookup_context);
block_type, get_context, lookup_context,
/*contents=*/nullptr);
if (!s.ok()) {
return s;
@ -2328,7 +2624,7 @@ Status BlockBasedTable::RetrieveBlock(
s = ReadBlockFromFile(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, &block,
rep_->ioptions, rep_->blocks_maybe_compressed,
rep_->blocks_maybe_compressed, uncompression_dict,
rep_->blocks_maybe_compressed, block_type, uncompression_dict,
rep_->persistent_cache_options, rep_->get_global_seqno(block_type),
block_type == BlockType::kData
? rep_->table_options.read_amp_bytes_per_bit
@ -3023,7 +3319,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
s = biter.status();
}
// Write the block cache access record.
if (block_cache_tracer_) {
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
// Avoid making copy of block_key, cf_name, and referenced_key when
// constructing the access record.
BlockCacheTraceRecord access_record(
@ -3103,8 +3399,90 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
iiter_unique_ptr.reset(iiter);
}
DataBlockIter biter;
uint64_t offset = std::numeric_limits<uint64_t>::max();
autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
static const size_t kMultiGetReadStackBufSize = 8192;
char stack_buf[kMultiGetReadStackBufSize];
std::unique_ptr<char[]> block_buf;
{
MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
sst_file_range.end());
BlockCacheLookupContext lookup_compression_dict_context(
BlockCacheLookupCaller::kUserMGet);
auto uncompression_dict_storage = GetUncompressionDict(nullptr, no_io,
sst_file_range.begin()->get_context,
&lookup_compression_dict_context);
const UncompressionDict& uncompression_dict =
uncompression_dict_storage.GetValue() == nullptr
? UncompressionDict::GetEmptyDict()
: *uncompression_dict_storage.GetValue();
size_t total_len = 0;
ReadOptions ro = read_options;
ro.read_tier = kBlockCacheTier;
for (auto miter = data_block_range.begin();
miter != data_block_range.end(); ++miter) {
iiter->Seek(miter->ikey);
if (!iiter->Valid()) {
*(miter->s) = iiter->status();
data_block_range.SkipKey(miter);
sst_file_range.SkipKey(miter);
continue;
}
statuses.emplace_back();
results.emplace_back();
if (iiter->value().offset() == offset) {
// We're going to reuse the block for this key later on. No need to
// look it up now. Place a null handle
block_handles.emplace_back(BlockHandle::NullBlockHandle());
continue;
}
offset = iiter->value().offset();
BlockHandle handle = iiter->value();
Status s = GetDataBlockFromCache(ro, handle, uncompression_dict,
&(results.back()), BlockType::kData, miter->get_context);
if (s.ok() && !results.back().IsEmpty()) {
// Found it in the cache. Add NULL handle to indicate there is
// nothing to read from disk
block_handles.emplace_back(BlockHandle::NullBlockHandle());
} else {
block_handles.emplace_back(handle);
total_len += handle.size();
}
}
if (total_len) {
char* scratch = nullptr;
// If the blocks need to be uncompressed and we don't need the
// compressed blocks, then we can use a contiguous block of
// memory to read in all the blocks as it will be temporary
// storage
// 1. If blocks are compressed and compressed block cache is there,
// alloc heap bufs
// 2. If blocks are uncompressed, alloc heap bufs
// 3. If blocks are compressed and no compressed block cache, use
// stack buf
if (rep_->table_options.block_cache_compressed == nullptr &&
rep_->blocks_maybe_compressed) {
if (total_len <= kMultiGetReadStackBufSize) {
scratch = stack_buf;
} else {
scratch = new char[total_len];
block_buf.reset(scratch);
}
}
MaybeLoadBlocksToCache(read_options,
&data_block_range, &block_handles, &statuses, &results,
scratch, uncompression_dict);
}
}
DataBlockIter first_biter;
DataBlockIter next_biter;
size_t idx_in_batch = 0;
for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
++miter) {
Status s;
@ -3112,77 +3490,90 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
const Slice& key = miter->ikey;
bool matched = false; // if such user key matched a key in SST
bool done = false;
for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
bool first_block = true;
do {
DataBlockIter* biter = nullptr;
bool reusing_block = true;
uint64_t referenced_data_size = 0;
bool does_referenced_key_exist = false;
BlockCacheLookupContext lookup_data_block_context(
BlockCacheLookupCaller::kUserMGet);
if (iiter->value().offset() != offset) {
offset = iiter->value().offset();
biter.Invalidate(Status::OK());
if (first_block) {
if (!block_handles[idx_in_batch].IsNull() ||
!results[idx_in_batch].IsEmpty()) {
first_biter.Invalidate(Status::OK());
NewDataBlockIterator<DataBlockIter>(
read_options, results[idx_in_batch], &first_biter,
statuses[idx_in_batch]);
reusing_block = false;
}
biter = &first_biter;
idx_in_batch++;
} else {
next_biter.Invalidate(Status::OK());
NewDataBlockIterator<DataBlockIter>(
read_options, iiter->value(), &biter, BlockType::kData,
read_options, iiter->value(), &next_biter, BlockType::kData,
/*key_includes_seq=*/false,
/*index_key_is_full=*/true, get_context,
&lookup_data_block_context, Status(), nullptr);
biter = &next_biter;
reusing_block = false;
}
if (read_options.read_tier == kBlockCacheTier &&
biter.status().IsIncomplete()) {
biter->status().IsIncomplete()) {
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for
// whether we can guarantee the key is not there when "no_io" is set
get_context->MarkKeyMayExist();
break;
}
if (!biter.status().ok()) {
s = biter.status();
if (!biter->status().ok()) {
s = biter->status();
break;
}
bool may_exist = biter.SeekForGet(key);
bool may_exist = biter->SeekForGet(key);
if (!may_exist) {
// HashSeek cannot find the key in this block and the iter is not at
// the end of the block, i.e. the key cannot be in the following blocks
// either. In this case, the seek_key cannot be found, so we break
// from the top level for-loop.
done = true;
} else {
// Call the *saver function on each entry/block until it returns false
for (; biter.Valid(); biter.Next()) {
ParsedInternalKey parsed_key;
Cleanable dummy;
Cleanable* value_pinner = nullptr;
break;
}
if (!ParseInternalKey(biter.key(), &parsed_key)) {
s = Status::Corruption(Slice());
}
if (biter.IsValuePinned()) {
if (reusing_block) {
Cache* block_cache = rep_->table_options.block_cache.get();
assert(biter.cache_handle() != nullptr);
block_cache->Ref(biter.cache_handle());
dummy.RegisterCleanup(&ReleaseCachedEntry, block_cache,
biter.cache_handle());
value_pinner = &dummy;
} else {
value_pinner = &biter;
}
}
// Call the *saver function on each entry/block until it returns false
for (; biter->Valid(); biter->Next()) {
ParsedInternalKey parsed_key;
Cleanable dummy;
Cleanable* value_pinner = nullptr;
if (!get_context->SaveValue(parsed_key, biter.value(), &matched,
value_pinner)) {
does_referenced_key_exist = true;
referenced_data_size = biter.key().size() + biter.value().size();
done = true;
break;
if (!ParseInternalKey(biter->key(), &parsed_key)) {
s = Status::Corruption(Slice());
}
if (biter->IsValuePinned()) {
if (reusing_block) {
Cache* block_cache = rep_->table_options.block_cache.get();
assert(biter->cache_handle() != nullptr);
block_cache->Ref(biter->cache_handle());
dummy.RegisterCleanup(&ReleaseCachedEntry, block_cache,
biter->cache_handle());
value_pinner = &dummy;
} else {
value_pinner = biter;
}
}
s = biter.status();
if (!get_context->SaveValue(
parsed_key, biter->value(), &matched, value_pinner)) {
does_referenced_key_exist = true;
referenced_data_size = biter->key().size() + biter->value().size();
done = true;
break;
}
s = biter->status();
}
// Write the block cache access.
if (block_cache_tracer_) {
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
// Avoid making copy of block_key, cf_name, and referenced_key when
// constructing the access record.
BlockCacheTraceRecord access_record(
@ -3200,11 +3591,18 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
access_record, lookup_data_block_context.block_key,
rep_->cf_name_for_tracing(), key);
}
s = biter->status();
if (done) {
// Avoid the extra Next which is expensive in two-level indexes
break;
}
}
if (first_block) {
iiter->Seek(key);
}
first_block = false;
iiter->Next();
} while (iiter->Valid());
if (matched && filter != nullptr && !filter->IsBlockBased()) {
RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_TRUE_POSITIVE);
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
@ -3328,7 +3726,7 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, false /*maybe_compressed*/,
false /* decompress */, false /*maybe_compressed*/, BlockType::kData,
UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -3338,6 +3736,38 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
return s;
}
BlockType BlockBasedTable::GetBlockTypeForMetaBlockByName(
const Slice& meta_block_name) {
if (meta_block_name.starts_with(kFilterBlockPrefix) ||
meta_block_name.starts_with(kFullFilterBlockPrefix) ||
meta_block_name.starts_with(kPartitionedFilterBlockPrefix)) {
return BlockType::kFilter;
}
if (meta_block_name == kPropertiesBlock) {
return BlockType::kProperties;
}
if (meta_block_name == kCompressionDictBlock) {
return BlockType::kCompressionDictionary;
}
if (meta_block_name == kRangeDelBlock) {
return BlockType::kRangeDeletion;
}
if (meta_block_name == kHashIndexPrefixesBlock) {
return BlockType::kHashIndexPrefixes;
}
if (meta_block_name == kHashIndexPrefixesMetadataBlock) {
return BlockType::kHashIndexMetadata;
}
assert(false);
return BlockType::kInvalid;
}
Status BlockBasedTable::VerifyChecksumInMetaBlocks(
InternalIteratorBase<Slice>* index_iter) {
Status s;
@ -3350,13 +3780,15 @@ Status BlockBasedTable::VerifyChecksumInMetaBlocks(
Slice input = index_iter->value();
s = handle.DecodeFrom(&input);
BlockContents contents;
const Slice meta_block_name = index_iter->key();
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, false /*maybe_compressed*/,
GetBlockTypeForMetaBlockByName(meta_block_name),
UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (s.IsCorruption() && index_iter->key() == kPropertiesBlock) {
if (s.IsCorruption() && meta_block_name == kPropertiesBlock) {
TableProperties* table_properties;
s = TryReadPropertiesWithGlobalSeqno(nullptr /* prefetch_buffer */,
index_iter->value(),
@ -3655,7 +4087,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
ReadOptions(), handle, &block, rep_->ioptions,
false /*decompress*/, false /*maybe_compressed*/,
UncompressionDict::GetEmptyDict(),
BlockType::kFilter, UncompressionDict::GetEmptyDict(),
rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {

View File

@ -236,6 +236,12 @@ class BlockBasedTable : public TableReader {
BlockCacheLookupContext* lookup_context, Status s,
FilePrefetchBuffer* prefetch_buffer) const;
// input_iter: if it is not null, update this one and return it as Iterator
template <typename TBlockIter>
TBlockIter* NewDataBlockIterator(const ReadOptions& ro,
CachableEntry<Block>& block,
TBlockIter* input_iter, Status s) const;
class PartitionedIndexIteratorState;
friend class PartitionIndexReader;
@ -273,7 +279,8 @@ class BlockBasedTable : public TableReader {
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context) const;
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents) const;
// Similar to the above, with one crucial difference: it will retrieve the
// block from the file even if there are no caches configured (assuming the
@ -283,7 +290,22 @@ class BlockBasedTable : public TableReader {
const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context,
BlockCacheLookupContext* lookup_context) const;
BlockCacheLookupContext* lookup_context,
bool use_cache) const;
Status GetDataBlockFromCache(
const ReadOptions& ro, const BlockHandle& handle,
const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context) const;
void MaybeLoadBlocksToCache(
const ReadOptions& options, const MultiGetRange* batch,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
autovector<
CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
char* scratch, const UncompressionDict& uncompression_dict) const;
// For the following two functions:
// if `no_io == true`, we will not try to read filter/index from sst file
@ -407,6 +429,8 @@ class BlockBasedTable : public TableReader {
const BlockBasedTableOptions& table_options, const int level,
BlockCacheLookupContext* lookup_context);
static BlockType GetBlockTypeForMetaBlockByName(const Slice& meta_block_name);
Status VerifyChecksumInMetaBlocks(InternalIteratorBase<Slice>* index_iter);
Status VerifyChecksumInBlocks(InternalIteratorBase<BlockHandle>* index_iter);
@ -604,8 +628,7 @@ class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
BlockType block_type, bool key_includes_seq = true,
bool index_key_is_full = true,
bool for_compaction = false)
: InternalIteratorBase<TValue>(false),
table_(table),
: table_(table),
read_options_(read_options),
icomp_(icomp),
user_comparator_(icomp.user_comparator()),

View File

@ -5,6 +5,8 @@
#pragma once
#include <cstdint>
namespace rocksdb {
// Represents the types of blocks used in the block based table format.
@ -17,8 +19,12 @@ enum class BlockType : uint8_t {
kProperties,
kCompressionDictionary,
kRangeDeletion,
kHashIndexPrefixes,
kHashIndexMetadata,
kMetaIndex,
kIndex,
// Note: keep kInvalid the last value when adding new enum values.
kInvalid
};
} // namespace rocksdb

View File

@ -220,6 +220,26 @@ Status BlockFetcher::ReadBlockContents() {
&slice_, used_buf_);
}
PERF_COUNTER_ADD(block_read_count, 1);
// TODO: introduce dedicated perf counter for range tombstones
switch (block_type_) {
case BlockType::kFilter:
PERF_COUNTER_ADD(filter_block_read_count, 1);
break;
case BlockType::kCompressionDictionary:
PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
break;
case BlockType::kIndex:
PERF_COUNTER_ADD(index_block_read_count, 1);
break;
// Nothing to do here as we don't have counters for the other types.
default:
break;
}
PERF_COUNTER_ADD(block_read_byte, block_size_ + kBlockTrailerSize);
if (!status_.ok()) {
return status_;
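The per-block-type counters bumped above surface through the perf context. A hedged sketch of sampling them around a single read (db and key are placeholders):

#include <cstdint>
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/perf_level.h"

// Hypothetical sampling of the block-read perf counters for one Get().
void SampleBlockReadCounters(rocksdb::DB* db, const rocksdb::Slice& key) {
  rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableCount);
  rocksdb::get_perf_context()->Reset();
  std::string value;
  rocksdb::Status s = db->Get(rocksdb::ReadOptions(), key, &value);
  uint64_t filter_reads =
      rocksdb::get_perf_context()->filter_block_read_count;
  uint64_t index_reads = rocksdb::get_perf_context()->index_block_read_count;
  (void)s;
  (void)filter_reads;
  (void)index_reads;
  rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);
}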

View File

@ -10,6 +10,7 @@
#pragma once
#include "memory/memory_allocator.h"
#include "table/block_based/block.h"
#include "table/block_based/block_type.h"
#include "table/format.h"
namespace rocksdb {
@ -39,7 +40,7 @@ class BlockFetcher {
FilePrefetchBuffer* prefetch_buffer, const Footer& footer,
const ReadOptions& read_options, const BlockHandle& handle,
BlockContents* contents, const ImmutableCFOptions& ioptions,
bool do_uncompress, bool maybe_compressed,
bool do_uncompress, bool maybe_compressed, BlockType block_type,
const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options,
MemoryAllocator* memory_allocator = nullptr,
@ -53,6 +54,7 @@ class BlockFetcher {
ioptions_(ioptions),
do_uncompress_(do_uncompress),
maybe_compressed_(maybe_compressed),
block_type_(block_type),
uncompression_dict_(uncompression_dict),
cache_options_(cache_options),
memory_allocator_(memory_allocator),
@ -72,6 +74,7 @@ class BlockFetcher {
const ImmutableCFOptions& ioptions_;
bool do_uncompress_;
bool maybe_compressed_;
BlockType block_type_;
const UncompressionDict& uncompression_dict_;
const PersistentCacheOptions& cache_options_;
MemoryAllocator* memory_allocator_;

View File

@ -26,7 +26,9 @@
#include "options/cf_options.h"
#include "port/port.h" // noexcept
#include "table/persistent_cache_options.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/xxhash.h"
namespace rocksdb {

View File

@ -20,8 +20,7 @@ class PinnedIteratorsManager;
template <class TValue>
class InternalIteratorBase : public Cleanable {
public:
InternalIteratorBase() : is_mutable_(true) {}
InternalIteratorBase(bool _is_mutable) : is_mutable_(_is_mutable) {}
InternalIteratorBase() {}
virtual ~InternalIteratorBase() {}
// An iterator is either positioned at a key/value pair, or
@ -120,7 +119,6 @@ class InternalIteratorBase : public Cleanable {
virtual Status GetProperty(std::string /*prop_name*/, std::string* /*prop*/) {
return Status::NotSupported("");
}
bool is_mutable() const { return is_mutable_; }
protected:
void SeekForPrevImpl(const Slice& target, const Comparator* cmp) {
@ -132,7 +130,6 @@ class InternalIteratorBase : public Cleanable {
Prev();
}
}
bool is_mutable_;
private:
// No copying allowed

View File

@ -69,12 +69,7 @@ class IteratorWrapperBase {
assert(!valid_ || iter_->status().ok());
}
void Prev() { assert(iter_); iter_->Prev(); Update(); }
void Seek(const Slice& k) {
TEST_SYNC_POINT("IteratorWrapper::Seek:0");
assert(iter_);
iter_->Seek(k);
Update();
}
void Seek(const Slice& k) { assert(iter_); iter_->Seek(k); Update(); }
void SeekForPrev(const Slice& k) {
assert(iter_);
iter_->SeekForPrev(k);

View File

@ -127,29 +127,14 @@ class MergingIterator : public InternalIterator {
}
void Seek(const Slice& target) override {
bool is_increasing_reseek = false;
if (current_ != nullptr && direction_ == kForward && status_.ok() &&
comparator_->Compare(target, key()) >= 0) {
is_increasing_reseek = true;
}
ClearHeaps();
status_ = Status::OK();
for (auto& child : children_) {
// If upper bound never changes, we can skip Seek() for
// the !Valid() case too, but people do hack the code to change
// upper bound between Seek(), so it's not a good idea to break
// the API.
// If DBIter is used on top of merging iterator, we probably
// can skip mutable child iterators if they are invalid too,
// but it's a less clean API. We can optimize for it later if
// needed.
if (!is_increasing_reseek || !child.Valid() ||
comparator_->Compare(target, child.key()) > 0 ||
child.iter()->is_mutable()) {
{
PERF_TIMER_GUARD(seek_child_seek_time);
child.Seek(target);
PERF_COUNTER_ADD(seek_child_seek_count, 1);
}
PERF_COUNTER_ADD(seek_child_seek_count, 1);
if (child.Valid()) {
assert(child.status().ok());
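
The removed fast path tried to skip re-seeking child iterators when successive Seek() targets were non-decreasing, which produced wrong results in combination with a prefix extractor; after this revert every Seek() unconditionally reaches all children. A public-API sketch of the access pattern involved, with hypothetical keys and a fixed-prefix extractor:

```
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"

void RepeatedPrefixSeeks(const std::string& path) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(4));

  rocksdb::DB* db = nullptr;
  if (!rocksdb::DB::Open(options, path, &db).ok()) {
    return;
  }

  rocksdb::ReadOptions read_options;
  read_options.prefix_same_as_start = true;
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(read_options));

  // Monotonically increasing seek targets: exactly the case where the old
  // code avoided re-seeking child iterators.
  for (const char* target : {"key1", "key2", "key3"}) {
    for (it->Seek(target); it->Valid(); it->Next()) {
      // consume it->key() / it->value()
    }
  }

  it.reset();
  delete db;
}
```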

View File

@ -216,7 +216,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
BlockFetcher block_fetcher(
file, prefetch_buffer, footer, read_options, handle, &block_contents,
ioptions, false /* decompress */, false /*maybe_compressed*/,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
BlockType::kProperties, UncompressionDict::GetEmptyDict(), cache_options,
memory_allocator);
s = block_fetcher.ReadBlockContents();
// property block is never compressed. Need to add uncompress logic if we are
// to compress it..
@ -375,8 +376,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
BlockFetcher block_fetcher(
file, nullptr /* prefetch_buffer */, footer, read_options,
metaindex_handle, &metaindex_contents, ioptions, false /* decompress */,
false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options, memory_allocator);
false /*maybe_compressed*/, BlockType::kMetaIndex,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
@ -446,7 +447,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
file, nullptr /* prefetch_buffer */, footer, read_options,
metaindex_handle, &metaindex_contents, ioptions,
false /* do decompression */, false /*maybe_compressed*/,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
BlockType::kMetaIndex, UncompressionDict::GetEmptyDict(), cache_options,
memory_allocator);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
@ -467,7 +469,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer, uint64_t file_size,
uint64_t table_magic_number,
const ImmutableCFOptions& ioptions,
const std::string& meta_block_name,
const std::string& meta_block_name, BlockType block_type,
BlockContents* contents, bool /*compression_type_missing*/,
MemoryAllocator* memory_allocator) {
Status status;
@ -488,6 +490,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options,
metaindex_handle, &metaindex_contents, ioptions,
false /* decompress */, false /*maybe_compressed*/,
BlockType::kMetaIndex,
UncompressionDict::GetEmptyDict(), cache_options,
memory_allocator);
status = block_fetcher.ReadBlockContents();
@ -515,7 +518,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
// Reading metablock
BlockFetcher block_fetcher2(
file, prefetch_buffer, footer, read_options, block_handle, contents,
ioptions, false /* decompress */, false /*maybe_compressed*/,
ioptions, false /* decompress */, false /*maybe_compressed*/, block_type,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
return block_fetcher2.ReadBlockContents();
}

View File

@ -16,6 +16,7 @@
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/block_type.h"
#include "table/format.h"
#include "util/kv_map.h"
@ -143,7 +144,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer, uint64_t file_size,
uint64_t table_magic_number,
const ImmutableCFOptions& ioptions,
const std::string& meta_block_name,
const std::string& meta_block_name, BlockType block_type,
BlockContents* contents,
bool compression_type_missing = false,
MemoryAllocator* memory_allocator = nullptr);

View File

@ -299,7 +299,7 @@ Status PlainTableReader::PopulateIndex(TableProperties* props,
Status s = ReadMetaBlock(file_info_.file.get(), nullptr /* prefetch_buffer */,
file_size_, kPlainTableMagicNumber, ioptions_,
PlainTableIndexBuilder::kPlainTableIndexBlock,
&index_block_contents,
BlockType::kIndex, &index_block_contents,
true /* compression_type_missing */);
bool index_in_file = s.ok();
@ -310,7 +310,8 @@ Status PlainTableReader::PopulateIndex(TableProperties* props,
if (index_in_file) {
s = ReadMetaBlock(file_info_.file.get(), nullptr /* prefetch_buffer */,
file_size_, kPlainTableMagicNumber, ioptions_,
BloomBlockBuilder::kBloomBlock, &bloom_block_contents,
BloomBlockBuilder::kBloomBlock, BlockType::kFilter,
&bloom_block_contents,
true /* compression_type_missing */);
bloom_in_file = s.ok() && bloom_block_contents.data.size() > 0;
}

View File

@ -2268,6 +2268,8 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) {
if (index_and_filter_in_cache) {
// data, index and filter block
ASSERT_EQ(get_perf_context()->block_read_count, 3);
ASSERT_EQ(get_perf_context()->index_block_read_count, 1);
ASSERT_EQ(get_perf_context()->filter_block_read_count, 1);
} else {
// just the data block
ASSERT_EQ(get_perf_context()->block_read_count, 1);
@ -2293,9 +2295,12 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) {
if (bloom_filter_type == 0) {
// with block-based, we read index and then the filter
ASSERT_EQ(get_perf_context()->block_read_count, 2);
ASSERT_EQ(get_perf_context()->index_block_read_count, 1);
ASSERT_EQ(get_perf_context()->filter_block_read_count, 1);
} else {
// with full-filter, we read filter first and then we stop
ASSERT_EQ(get_perf_context()->block_read_count, 1);
ASSERT_EQ(get_perf_context()->filter_block_read_count, 1);
}
} else {
// filter is already in memory and it figures out that the key doesn't
@ -3565,7 +3570,7 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
ASSERT_OK(ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size,
&footer, kBlockBasedTableMagicNumber));
auto BlockFetchHelper = [&](const BlockHandle& handle,
auto BlockFetchHelper = [&](const BlockHandle& handle, BlockType block_type,
BlockContents* contents) {
ReadOptions read_options;
read_options.verify_checksums = false;
@ -3574,8 +3579,8 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
BlockFetcher block_fetcher(
file, nullptr /* prefetch_buffer */, footer, read_options, handle,
contents, ioptions, false /* decompress */,
false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options);
false /*maybe_compressed*/, block_type,
UncompressionDict::GetEmptyDict(), cache_options);
ASSERT_OK(block_fetcher.ReadBlockContents());
};
@ -3584,7 +3589,8 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
auto metaindex_handle = footer.metaindex_handle();
BlockContents metaindex_contents;
BlockFetchHelper(metaindex_handle, &metaindex_contents);
BlockFetchHelper(metaindex_handle, BlockType::kMetaIndex,
&metaindex_contents);
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);
@ -3601,7 +3607,8 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
ASSERT_OK(properties_handle.DecodeFrom(&v));
BlockContents properties_contents;
BlockFetchHelper(properties_handle, &properties_contents);
BlockFetchHelper(properties_handle, BlockType::kProperties,
&properties_contents);
Block properties_block(std::move(properties_contents),
kDisableGlobalSequenceNumber);
@ -3660,8 +3667,9 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
BlockFetcher block_fetcher(
table_reader.get(), nullptr /* prefetch_buffer */, footer, ReadOptions(),
metaindex_handle, &metaindex_contents, ioptions, false /* decompress */,
false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
pcache_opts, nullptr /*memory_allocator*/);
false /*maybe_compressed*/, BlockType::kMetaIndex,
UncompressionDict::GetEmptyDict(), pcache_opts,
nullptr /*memory_allocator*/);
ASSERT_OK(block_fetcher.ReadBlockContents());
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);

View File

@ -209,6 +209,10 @@ class BlockCacheTracer {
// Stop writing block cache accesses to the trace_writer.
void EndTrace();
bool is_tracing_enabled() const {
return writer_.load(std::memory_order_relaxed);
}
Status WriteBlockAccess(const BlockCacheTraceRecord& record,
const Slice& block_key, const Slice& cf_name,
const Slice& referenced_key);
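
is_tracing_enabled() exposes the atomic writer_ check so callers can avoid assembling trace parameters, and calling WriteBlockAccess() at all, when no trace writer is installed. A minimal caller-side sketch; the helper name is an illustrative assumption:

```
#include "trace_replay/block_cache_tracer.h"

namespace rocksdb {

// Hypothetical wrapper: write the access record only if a trace is running.
Status MaybeTraceBlockAccess(BlockCacheTracer* tracer,
                             const BlockCacheTraceRecord& record,
                             const Slice& block_key, const Slice& cf_name,
                             const Slice& referenced_key) {
  if (tracer == nullptr || !tracer->is_tracing_enabled()) {
    return Status::OK();  // tracing is off; skip the write path entirely
  }
  return tracer->WriteBlockAccess(record, block_key, cf_name, referenced_key);
}

}  // namespace rocksdb
```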

View File

@ -192,6 +192,49 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
return s;
}
Status RandomAccessFileReader::MultiRead(ReadRequest* read_reqs,
size_t num_reqs) const {
Status s;
uint64_t elapsed = 0;
assert(!use_direct_io());
assert(!for_compaction_);
{
StopWatch sw(env_, stats_, hist_type_,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
IOSTATS_TIMER_GUARD(read_nanos);
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif // ROCKSDB_LITE
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->MultiRead(read_reqs, num_reqs);
}
for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(read_reqs[i].offset,
read_reqs[i].result.size(), start_ts, finish_ts,
read_reqs[i].status);
}
#endif // ROCKSDB_LITE
IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
}
SetPerfLevel(prev_perf_level);
}
if (stats_ != nullptr && file_read_hist_ != nullptr) {
file_read_hist_->Add(elapsed);
}
return s;
}
Status WritableFileWriter::Append(const Slice& data) {
const char* src = data.data();
size_t left = data.size();

View File

@ -160,6 +160,8 @@ class RandomAccessFileReader {
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const;
Status MultiRead(ReadRequest* reqs, size_t num_reqs) const;
Status Prefetch(uint64_t offset, size_t n) const {
return file_->Prefetch(offset, n);
}
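
RandomAccessFileReader::MultiRead forwards a whole batch of ReadRequests to the underlying file and wraps the call in the usual timing and IO statistics. A minimal sketch of building such a batch against the Env API; the offset/result/status fields appear in the implementation above, while len and scratch are assumed from the ReadRequest definition in env.h:

```
#include <memory>
#include <string>
#include <vector>

#include "rocksdb/env.h"

rocksdb::Status ReadTwoRanges(rocksdb::Env* env, const std::string& fname) {
  std::unique_ptr<rocksdb::RandomAccessFile> file;
  rocksdb::Status s =
      env->NewRandomAccessFile(fname, &file, rocksdb::EnvOptions());
  if (!s.ok()) {
    return s;
  }

  std::vector<char> buf0(4096), buf1(4096);
  rocksdb::ReadRequest reqs[2];
  reqs[0].offset = 0;
  reqs[0].len = buf0.size();
  reqs[0].scratch = buf0.data();
  reqs[1].offset = 1 << 20;
  reqs[1].len = buf1.size();
  reqs[1].scratch = buf1.data();

  // The base-class default falls back to issuing the reads one by one.
  s = file->MultiRead(reqs, 2);
  for (const auto& req : reqs) {
    if (!req.status.ok()) {
      return req.status;
    }
    // req.result is a Slice over the bytes actually read into the scratch buffer.
  }
  return s;
}
```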

View File

@ -1758,7 +1758,8 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
blob_files_.erase(bfile->BlobFileNumber());
Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
bfile->PathName(), blob_dir_, true);
bfile->PathName(), blob_dir_, true,
/*force_fg=*/false);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"File failed to be deleted as obsolete %s",
@ -1848,7 +1849,8 @@ Status DestroyBlobDB(const std::string& dbname, const Options& options,
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kBlobFile) {
Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true);
Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
/*force_fg=*/false);
if (status.ok() && !del.ok()) {
status = del;
}

View File

@ -235,6 +235,8 @@ class SimCacheImpl : public SimCache {
return cache_->GetUsage(handle);
}
size_t GetCharge(Handle* handle) const override { return cache_->GetCharge(handle); }
size_t GetPinnedUsage() const override { return cache_->GetPinnedUsage(); }
void DisownData() override {