Compare commits
27 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
e69ce1041f | ||
|
e3169e3ea8 | ||
|
62976c633e | ||
|
d46141e11d | ||
|
0eeeda89ff | ||
|
3c0bb7fdd4 | ||
|
e1eb14133a | ||
|
650acf64d6 | ||
|
172b9ec84c | ||
|
2407812292 | ||
|
d70f081aa7 | ||
|
b96d31b60f | ||
|
9840ed9eef | ||
|
7ff41fea6e | ||
|
a372c5b00f | ||
|
3a10ffe954 | ||
|
a28b2cf9ed | ||
|
1d65661837 | ||
|
fdc9b5248b | ||
|
d039c41b43 | ||
|
08d5a83499 | ||
|
ab3e6f6476 | ||
|
5042d04a96 | ||
|
7b1a12f182 | ||
|
ada6e87f5a | ||
|
4f3ed8622e | ||
|
222f84e2c3 |
25
HISTORY.md
25
HISTORY.md
@ -1,5 +1,23 @@
|
||||
# Rocksdb Change Log
|
||||
## Unreleased
|
||||
## 6.4.6 (10/16/2019)
|
||||
### Bug Fixes
|
||||
* Fix a bug where, when partitioned filters and prefix search are used in conjunction, ::SeekForPrev could return invalid for an existing prefix. ::SeekForPrev might be called by the user, or internally on ::Prev, or within ::Seek if the return value involves Delete or a Merge operand.
|
||||
|
||||
## 6.4.5 (10/1/2019)
|
||||
### Bug Fixes
|
||||
* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strange results when reseek happens with a different iterator upper bound.
|
||||
* Fix a bug in BlockBasedTableIterator that might return incorrect results when reseek happens with a different iterator upper bound.
|
||||
|
||||
## 6.4.4 (9/17/2019)
|
||||
* Fix a bug introduced in 6.3 which could cause wrong results in a corner case when prefix bloom filter is used and the iterator is reseeked.
|
||||
|
||||
## 6.4.2 (9/3/2019)
|
||||
### Bug Fixes
|
||||
* Fix a bug in file ingestion caused by incorrect file number allocation when the number of column families involved in the ingestion exceeds 2.
|
||||
|
||||
## 6.4.1 (8/20/2019)
|
||||
### Bug Fixes
|
||||
* Fix a bug where the compaction snapshot refresh feature is not disabled as advertised when `snap_refresh_nanos` is set to 0.
|
||||
|
||||
## 6.4.0 (7/30/2019)
|
||||
### Default Option Change
|
||||
@ -22,6 +40,7 @@
|
||||
* Add argument `--secondary_path` to ldb to open the database as the secondary instance. This would keep the original DB intact.
|
||||
* Compression dictionary blocks are now prefetched and pinned in the cache (based on the customer's settings) the same way as index and filter blocks.
|
||||
* Added DBOptions::log_readahead_size which specifies the number of bytes to prefetch when reading the log. This is mostly useful for reading a remotely located log, as it can save the number of round-trips. If 0 (default), then the prefetching is disabled.
|
||||
* Support loading custom objects in unit tests. In the affected unit tests, RocksDB will create custom Env objects based on environment variable TEST_ENV_URI. Users need to make sure custom object types are properly registered. For example, a static library should expose a `RegisterCustomObjects` function. By linking the unit test binary with the static library, the unit test can execute this function.
|
||||
|
||||
### Performance Improvements
|
||||
* Reduce iterator key comparison for upper/lower bound check.
|
||||
@ -31,7 +50,8 @@
|
||||
### Bug Fixes
|
||||
* Fix ingested file and directory not being fsynced.
|
||||
* Return TryAgain status in place of Corruption when new tail is not visible to TransactionLogIterator.
|
||||
|
||||
* Fixed a regression where the fill_cache read option also affected index blocks.
|
||||
* Fixed an issue where using cache_index_and_filter_blocks==false affected partitions of partitioned indexes/filters as well.
|
||||
|
||||
## 6.3.1 (7/24/2019)
|
||||
### Bug Fixes
|
||||
@ -73,7 +93,6 @@
|
||||
* Fix a bug caused by secondary not skipping the beginning of new MANIFEST.
|
||||
* On DB open, delete WAL trash files left behind in wal_dir
|
||||
|
||||
|
||||
## 6.2.0 (4/30/2019)
|
||||
### New Features
|
||||
* Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`.
|
||||
|
15
Makefile
15
Makefile
@ -1662,7 +1662,7 @@ JAVA_INCLUDE = -I$(JAVA_HOME)/include/ -I$(JAVA_HOME)/include/linux
|
||||
ifeq ($(PLATFORM), OS_SOLARIS)
|
||||
ARCH := $(shell isainfo -b)
|
||||
else ifeq ($(PLATFORM), OS_OPENBSD)
|
||||
ifneq (,$(filter $(MACHINE), amd64 arm64 sparc64 aarch64))
|
||||
ifneq (,$(filter $(MACHINE), amd64 arm64 aarch64 sparc64))
|
||||
ARCH := 64
|
||||
else
|
||||
ARCH := 32
|
||||
@ -1671,14 +1671,11 @@ else
|
||||
ARCH := $(shell getconf LONG_BIT)
|
||||
endif
|
||||
|
||||
ifeq (,$(findstring ppc,$(MACHINE)))
|
||||
ifeq (,$(filter $(MACHINE), ppc arm64 aarch64 sparc64))
|
||||
ROCKSDBJNILIB = librocksdbjni-linux$(ARCH).so
|
||||
else
|
||||
ROCKSDBJNILIB = librocksdbjni-linux-$(MACHINE).so
|
||||
endif
|
||||
ifneq (,$(findstring aarch64,$(MACHINE)))
|
||||
ROCKSDBJNILIB = librocksdbjni-linux-$(MACHINE).so
|
||||
endif
|
||||
ROCKSDB_JAR = rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH)-linux$(ARCH).jar
|
||||
ROCKSDB_JAR_ALL = rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH).jar
|
||||
ROCKSDB_JAVADOCS_JAR = rocksdbjni-$(ROCKSDB_MAJOR).$(ROCKSDB_MINOR).$(ROCKSDB_PATCH)-javadoc.jar
|
||||
@ -1875,6 +1872,14 @@ rocksdbjavastaticdockerppc64le:
|
||||
mkdir -p java/target
|
||||
docker run --rm --name rocksdb_linux_ppc64le-be --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host --env DEBUG_LEVEL=$(DEBUG_LEVEL) evolvedbinary/rocksjava:centos7_ppc64le-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh
|
||||
|
||||
rocksdbjavastaticdockerarm64v8:
|
||||
mkdir -p java/target
|
||||
DOCKER_LINUX_ARM64V8_CONTAINER=`docker ps -aqf name=rocksdb_linux_arm64v8-be`; \
|
||||
if [ -z "$$DOCKER_LINUX_ARM64V8_CONTAINER" ]; then \
|
||||
docker container create --attach stdin --attach stdout --attach stderr --volume `pwd`:/rocksdb-host --name rocksdb_linux_arm64v8-be evolvedbinary/rocksjava:centos7_arm64v8-be /rocksdb-host/java/crossbuild/docker-build-linux-centos.sh; \
|
||||
fi
|
||||
docker start -a rocksdb_linux_arm64v8-be
|
||||
|
||||
rocksdbjavastaticpublish: rocksdbjavastaticrelease rocksdbjavastaticpublishcentral
|
||||
|
||||
rocksdbjavastaticpublishdocker: rocksdbjavastaticreleasedocker rocksdbjavastaticpublishcentral
|
||||
|
@ -4,12 +4,31 @@ from __future__ import division
|
||||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
from targets_builder import TARGETSBuilder
|
||||
import json
|
||||
import os
|
||||
import fnmatch
|
||||
import sys
|
||||
|
||||
from util import ColorString
|
||||
|
||||
# This script generates TARGETS file for Buck.
|
||||
# Buck is a build tool specifying dependencies among different build targets.
|
||||
# User can pass extra dependencies as a JSON object via command line, and this
|
||||
# script can include these dependencies in the generated TARGETS file.
|
||||
# Usage:
|
||||
# $python buckifier/buckify_rocksdb.py
|
||||
# (This generates a TARGETS file without user-specified dependencies for unit
|
||||
# tests.)
|
||||
# $python buckifier/buckify_rocksdb.py \
|
||||
# '{"fake": { \
|
||||
# "extra_deps": [":test_dep", "//fakes/module:mock1"], \
|
||||
# "extra_compiler_flags": ["-DROCKSDB_LITE", "-Os"] \
|
||||
# } \
|
||||
# }'
|
||||
# (Generated TARGETS file has test_dep and mock1 as dependencies for RocksDB
|
||||
# unit tests, and will use the extra_compiler_flags to compile the unit test
|
||||
# source.)
|
||||
|
||||
# tests to export as libraries for inclusion in other projects
|
||||
_EXPORTED_TEST_LIBS = ["env_basic_test"]
|
||||
|
||||
@ -86,8 +105,38 @@ def get_tests(repo_path):
|
||||
return tests
|
||||
|
||||
|
||||
# Parse extra dependencies passed by user from command line
|
||||
def get_dependencies():
|
||||
deps_map = {
|
||||
''.encode('ascii'): {
|
||||
'extra_deps'.encode('ascii'): [],
|
||||
'extra_compiler_flags'.encode('ascii'): []
|
||||
}
|
||||
}
|
||||
if len(sys.argv) < 2:
|
||||
return deps_map
|
||||
|
||||
def encode_dict(data):
|
||||
rv = {}
|
||||
for k, v in data.items():
|
||||
if isinstance(k, unicode):
|
||||
k = k.encode('ascii')
|
||||
if isinstance(v, unicode):
|
||||
v = v.encode('ascii')
|
||||
elif isinstance(v, list):
|
||||
v = [x.encode('ascii') for x in v]
|
||||
elif isinstance(v, dict):
|
||||
v = encode_dict(v)
|
||||
rv[k] = v
|
||||
return rv
|
||||
extra_deps = json.loads(sys.argv[1], object_hook=encode_dict)
|
||||
for target_alias, deps in extra_deps.items():
|
||||
deps_map[target_alias] = deps
|
||||
return deps_map
|
||||
|
||||
|
||||
# Prepare TARGETS file for buck
|
||||
def generate_targets(repo_path):
|
||||
def generate_targets(repo_path, deps_map):
|
||||
print(ColorString.info("Generating TARGETS"))
|
||||
# parsed src.mk file
|
||||
src_mk = parse_src_mk(repo_path)
|
||||
@ -121,24 +170,33 @@ def generate_targets(repo_path):
|
||||
["test_util/testutil.cc"],
|
||||
[":rocksdb_lib"])
|
||||
|
||||
print("Extra dependencies:\n{0}".format(str(deps_map)))
|
||||
# test for every test we found in the Makefile
|
||||
for test in sorted(tests):
|
||||
match_src = [src for src in cc_files if ("/%s.c" % test) in src]
|
||||
if len(match_src) == 0:
|
||||
print(ColorString.warning("Cannot find .cc file for %s" % test))
|
||||
continue
|
||||
elif len(match_src) > 1:
|
||||
print(ColorString.warning("Found more than one .cc for %s" % test))
|
||||
print(match_src)
|
||||
continue
|
||||
for target_alias, deps in deps_map.items():
|
||||
for test in sorted(tests):
|
||||
match_src = [src for src in cc_files if ("/%s.c" % test) in src]
|
||||
if len(match_src) == 0:
|
||||
print(ColorString.warning("Cannot find .cc file for %s" % test))
|
||||
continue
|
||||
elif len(match_src) > 1:
|
||||
print(ColorString.warning("Found more than one .cc for %s" % test))
|
||||
print(match_src)
|
||||
continue
|
||||
|
||||
assert(len(match_src) == 1)
|
||||
is_parallel = tests[test]
|
||||
TARGETS.register_test(test, match_src[0], is_parallel)
|
||||
assert(len(match_src) == 1)
|
||||
is_parallel = tests[test]
|
||||
test_target_name = \
|
||||
test if not target_alias else test + "_" + target_alias
|
||||
TARGETS.register_test(
|
||||
test_target_name,
|
||||
match_src[0],
|
||||
is_parallel,
|
||||
deps['extra_deps'],
|
||||
deps['extra_compiler_flags'])
|
||||
|
||||
if test in _EXPORTED_TEST_LIBS:
|
||||
test_library = "%s_lib" % test
|
||||
TARGETS.add_library(test_library, match_src, [":rocksdb_test_lib"])
|
||||
if test in _EXPORTED_TEST_LIBS:
|
||||
test_library = "%s_lib" % test_target_name
|
||||
TARGETS.add_library(test_library, match_src, [":rocksdb_test_lib"])
|
||||
TARGETS.flush_tests()
|
||||
|
||||
print(ColorString.info("Generated TARGETS Summary:"))
|
||||
@ -163,8 +221,9 @@ def exit_with_error(msg):
|
||||
|
||||
|
||||
def main():
|
||||
deps_map = get_dependencies()
|
||||
# Generate TARGETS file for buck
|
||||
ok = generate_targets(get_rocksdb_path())
|
||||
ok = generate_targets(get_rocksdb_path(), deps_map)
|
||||
if not ok:
|
||||
exit_with_error("Failed to generate TARGETS files")
|
||||
|
||||
|
@ -51,14 +51,21 @@ class TARGETSBuilder:
|
||||
pretty_list(deps)))
|
||||
self.total_bin = self.total_bin + 1
|
||||
|
||||
def register_test(self, test_name, src, is_parallel):
|
||||
def register_test(self,
|
||||
test_name,
|
||||
src,
|
||||
is_parallel,
|
||||
extra_deps,
|
||||
extra_compiler_flags):
|
||||
exec_mode = "serial"
|
||||
if is_parallel:
|
||||
exec_mode = "parallel"
|
||||
self.tests_cfg += targets_cfg.test_cfg_template % (
|
||||
test_name,
|
||||
str(src),
|
||||
str(exec_mode))
|
||||
str(exec_mode),
|
||||
extra_deps,
|
||||
extra_compiler_flags)
|
||||
|
||||
self.total_test = self.total_test + 1
|
||||
|
||||
|
@ -140,11 +140,13 @@ test_cfg_template = """ [
|
||||
"%s",
|
||||
"%s",
|
||||
"%s",
|
||||
%s,
|
||||
%s,
|
||||
],
|
||||
"""
|
||||
|
||||
unittests_template = """
|
||||
# [test_name, test_src, test_type]
|
||||
# [test_name, test_src, test_type, extra_deps, extra_compiler_flags]
|
||||
ROCKS_TESTS = [
|
||||
%s]
|
||||
|
||||
@ -153,6 +155,8 @@ ROCKS_TESTS = [
|
||||
# will not be included.
|
||||
[
|
||||
test_binary(
|
||||
extra_compiler_flags = extra_compiler_flags,
|
||||
extra_deps = extra_deps,
|
||||
parallelism = parallelism,
|
||||
rocksdb_arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
|
||||
rocksdb_compiler_flags = ROCKSDB_COMPILER_FLAGS,
|
||||
@ -163,7 +167,7 @@ ROCKS_TESTS = [
|
||||
test_cc = test_cc,
|
||||
test_name = test_name,
|
||||
)
|
||||
for test_name, test_cc, parallelism in ROCKS_TESTS
|
||||
for test_name, test_cc, parallelism, extra_deps, extra_compiler_flags in ROCKS_TESTS
|
||||
if not is_opt_mode
|
||||
]
|
||||
"""
|
||||
|
@ -56,10 +56,10 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
|
||||
if [ -n "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
|
||||
# we need this to build with MySQL. Don't use for other purposes.
|
||||
source "$PWD/build_tools/fbcode_config4.8.1.sh"
|
||||
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_PLATFORM007" ]; then
|
||||
source "$PWD/build_tools/fbcode_config_platform007.sh"
|
||||
else
|
||||
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_5xx" ]; then
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
else
|
||||
source "$PWD/build_tools/fbcode_config_platform007.sh"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
@ -17,9 +17,11 @@
|
||||
#include "memtable/hash_skiplist_rep.h"
|
||||
#include "options/options_parser.h"
|
||||
#include "port/port.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/iterator.h"
|
||||
#include "rocksdb/utilities/object_registry.h"
|
||||
#include "test_util/fault_injection_test_env.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "test_util/testharness.h"
|
||||
@ -60,8 +62,20 @@ class EnvCounter : public EnvWrapper {
|
||||
|
||||
class ColumnFamilyTestBase : public testing::Test {
|
||||
public:
|
||||
ColumnFamilyTestBase(uint32_t format) : rnd_(139), format_(format) {
|
||||
env_ = new EnvCounter(Env::Default());
|
||||
explicit ColumnFamilyTestBase(uint32_t format) : rnd_(139), format_(format) {
|
||||
Env* base_env = Env::Default();
|
||||
#ifndef ROCKSDB_LITE
|
||||
const char* test_env_uri = getenv("TEST_ENV_URI");
|
||||
if (test_env_uri) {
|
||||
Status s = ObjectRegistry::NewInstance()->NewSharedObject(test_env_uri,
|
||||
&env_guard_);
|
||||
base_env = env_guard_.get();
|
||||
EXPECT_OK(s);
|
||||
EXPECT_NE(Env::Default(), base_env);
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
EXPECT_NE(nullptr, base_env);
|
||||
env_ = new EnvCounter(base_env);
|
||||
dbname_ = test::PerThreadDBPath("column_family_test");
|
||||
db_options_.create_if_missing = true;
|
||||
db_options_.fail_if_options_file_error = true;
|
||||
@ -532,6 +546,7 @@ class ColumnFamilyTestBase : public testing::Test {
|
||||
std::string dbname_;
|
||||
DB* db_ = nullptr;
|
||||
EnvCounter* env_;
|
||||
std::shared_ptr<Env> env_guard_;
|
||||
Random rnd_;
|
||||
uint32_t format_;
|
||||
};
|
||||
@ -3312,7 +3327,17 @@ TEST_P(ColumnFamilyTest, MultipleCFPathsTest) {
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
extern "C" {
|
||||
void RegisterCustomObjects(int argc, char** argv);
|
||||
}
|
||||
#else
|
||||
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
|
||||
#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
rocksdb::port::InstallStackTraceHandler();
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
RegisterCustomObjects(argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
@ -39,6 +39,7 @@ class SnapshotListFetchCallback {
|
||||
virtual void Refresh(std::vector<SequenceNumber>* snapshots,
|
||||
SequenceNumber max) = 0;
|
||||
inline bool TimeToRefresh(const size_t key_index) {
|
||||
assert(snap_refresh_nanos_ != 0);
|
||||
// skip the key if key_index % every_nth_key (which is a power of 2) is not 0.
|
||||
if ((key_index & every_nth_key_minus_one_) != 0) {
|
||||
return false;
|
||||
|
@ -964,7 +964,7 @@ TEST_F(CompactionJobTest, SnapshotRefresh) {
|
||||
public:
|
||||
SnapshotListFetchCallbackTest(Env* env, Random64& rand,
|
||||
std::vector<SequenceNumber>* snapshots)
|
||||
: SnapshotListFetchCallback(env, 0 /*no time delay*/,
|
||||
: SnapshotListFetchCallback(env, 1 /*short time delay*/,
|
||||
1 /*fetch after each key*/),
|
||||
rand_(rand),
|
||||
snapshots_(snapshots) {}
|
||||
|
@ -1776,8 +1776,17 @@ INSTANTIATE_TEST_CASE_P(Timestamp, DBBasicTestWithTimestampWithParam,
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
extern "C" {
|
||||
void RegisterCustomObjects(int argc, char** argv);
|
||||
}
|
||||
#else
|
||||
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
|
||||
#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
rocksdb::port::InstallStackTraceHandler();
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
RegisterCustomObjects(argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
@ -3696,9 +3696,9 @@ Status DBImpl::IngestExternalFiles(
|
||||
exec_results.emplace_back(false, Status::OK());
|
||||
}
|
||||
// TODO(yanqin) maybe make jobs run in parallel
|
||||
uint64_t start_file_number = next_file_number;
|
||||
for (size_t i = 1; i != num_cfs; ++i) {
|
||||
uint64_t start_file_number =
|
||||
next_file_number + args[i - 1].external_files.size();
|
||||
start_file_number += args[i - 1].external_files.size();
|
||||
auto* cfd =
|
||||
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
|
||||
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
||||
|
@ -1008,8 +1008,10 @@ Status DBImpl::CompactFilesImpl(
|
||||
c->mutable_cf_options()->paranoid_file_checks,
|
||||
c->mutable_cf_options()->report_bg_io_stats, dbname_,
|
||||
&compaction_job_stats, Env::Priority::USER,
|
||||
immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
|
||||
: nullptr);
|
||||
immutable_db_options_.max_subcompactions <= 1 &&
|
||||
c->mutable_cf_options()->snap_refresh_nanos > 0
|
||||
? &fetch_callback
|
||||
: nullptr);
|
||||
|
||||
// Creating a compaction influences the compaction score because the score
|
||||
// takes running compactions into account (by skipping files that are already
|
||||
@ -2737,8 +2739,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
|
||||
c->mutable_cf_options()->report_bg_io_stats, dbname_,
|
||||
&compaction_job_stats, thread_pri,
|
||||
immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
|
||||
: nullptr);
|
||||
immutable_db_options_.max_subcompactions <= 1 &&
|
||||
c->mutable_cf_options()->snap_refresh_nanos > 0
|
||||
? &fetch_callback
|
||||
: nullptr);
|
||||
compaction_job.Prepare();
|
||||
|
||||
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
|
||||
|
@ -182,6 +182,33 @@ TEST_P(DBIteratorTest, IterSeekBeforePrev) {
|
||||
delete iter;
|
||||
}
|
||||
|
||||
TEST_P(DBIteratorTest, IterReseekNewUpperBound) {
|
||||
Random rnd(301);
|
||||
Options options = CurrentOptions();
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.block_size = 1024;
|
||||
table_options.block_size_deviation = 50;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
options.compression = kNoCompression;
|
||||
Reopen(options);
|
||||
|
||||
ASSERT_OK(Put("a", RandomString(&rnd, 400)));
|
||||
ASSERT_OK(Put("aabb", RandomString(&rnd, 400)));
|
||||
ASSERT_OK(Put("aaef", RandomString(&rnd, 400)));
|
||||
ASSERT_OK(Put("b", RandomString(&rnd, 400)));
|
||||
dbfull()->Flush(FlushOptions());
|
||||
ReadOptions opts;
|
||||
Slice ub = Slice("aa");
|
||||
opts.iterate_upper_bound = &ub;
|
||||
auto iter = NewIterator(opts);
|
||||
iter->Seek(Slice("a"));
|
||||
ub = Slice("b");
|
||||
iter->Seek(Slice("aabc"));
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(iter->key().ToString(), "aaef");
|
||||
delete iter;
|
||||
}
|
||||
|
||||
TEST_P(DBIteratorTest, IterSeekForPrevBeforeNext) {
|
||||
ASSERT_OK(Put("a", "b"));
|
||||
ASSERT_OK(Put("c", "d"));
|
||||
@ -2690,75 +2717,6 @@ TEST_P(DBIteratorTest, AvoidReseekLevelIterator) {
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_P(DBIteratorTest, AvoidReseekChildIterator) {
|
||||
Options options = CurrentOptions();
|
||||
options.compression = CompressionType::kNoCompression;
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.block_size = 800;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
Reopen(options);
|
||||
|
||||
Random rnd(301);
|
||||
std::string random_str = RandomString(&rnd, 180);
|
||||
|
||||
ASSERT_OK(Put("1", random_str));
|
||||
ASSERT_OK(Put("2", random_str));
|
||||
ASSERT_OK(Put("3", random_str));
|
||||
ASSERT_OK(Put("4", random_str));
|
||||
ASSERT_OK(Put("8", random_str));
|
||||
ASSERT_OK(Put("9", random_str));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(Put("5", random_str));
|
||||
ASSERT_OK(Put("6", random_str));
|
||||
ASSERT_OK(Put("7", random_str));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
// These two keys will be kept in memtable.
|
||||
ASSERT_OK(Put("0", random_str));
|
||||
ASSERT_OK(Put("8", random_str));
|
||||
|
||||
int num_iter_wrapper_seek = 0;
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"IteratorWrapper::Seek:0",
|
||||
[&](void* /*arg*/) { num_iter_wrapper_seek++; });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
{
|
||||
std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
|
||||
iter->Seek("1");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
// DBIter always wraps internal iterator with IteratorWrapper,
|
||||
// and in merging iterator each child iterator will be wrapped
|
||||
// with IteratorWrapper.
|
||||
ASSERT_EQ(4, num_iter_wrapper_seek);
|
||||
|
||||
// child position: 1 and 5
|
||||
num_iter_wrapper_seek = 0;
|
||||
iter->Seek("2");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(3, num_iter_wrapper_seek);
|
||||
|
||||
// child position: 2 and 5
|
||||
num_iter_wrapper_seek = 0;
|
||||
iter->Seek("6");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(4, num_iter_wrapper_seek);
|
||||
|
||||
// child position: 8 and 6
|
||||
num_iter_wrapper_seek = 0;
|
||||
iter->Seek("7");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(3, num_iter_wrapper_seek);
|
||||
|
||||
// child position: 8 and 7
|
||||
num_iter_wrapper_seek = 0;
|
||||
iter->Seek("5");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(4, num_iter_wrapper_seek);
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
// MyRocks may change iterate bounds before seek. Simply test to make sure such
|
||||
// usage doesn't break iterator.
|
||||
TEST_P(DBIteratorTest, IterateBoundChangedBeforeSeek) {
|
||||
|
@ -6180,8 +6180,17 @@ TEST_F(DBTest, LargeBlockSizeTest) {
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
extern "C" {
|
||||
void RegisterCustomObjects(int argc, char** argv);
|
||||
}
|
||||
#else
|
||||
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
|
||||
#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
rocksdb::port::InstallStackTraceHandler();
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
RegisterCustomObjects(argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
@ -3772,6 +3772,46 @@ TEST_F(DBTest2, CloseWithUnreleasedSnapshot) {
|
||||
db_ = nullptr;
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, PrefixBloomReseek) {
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
options.prefix_extractor.reset(NewCappedPrefixTransform(3));
|
||||
BlockBasedTableOptions bbto;
|
||||
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
||||
bbto.whole_key_filtering = false;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
DestroyAndReopen(options);
|
||||
|
||||
// Construct two L1 files with keys:
|
||||
// f1:[aaa1 ccc1] f2:[ddd0]
|
||||
ASSERT_OK(Put("aaa1", ""));
|
||||
ASSERT_OK(Put("ccc1", ""));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(Put("ddd0", ""));
|
||||
ASSERT_OK(Flush());
|
||||
CompactRangeOptions cro;
|
||||
cro.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
|
||||
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
|
||||
|
||||
ASSERT_OK(Put("bbb1", ""));
|
||||
|
||||
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||
|
||||
// Seeking into f1, the iterator will check bloom filter which returns the
|
||||
// file iterator to be invalidated, and the cursor will be put into f2, with
|
||||
// the next key to be "ddd0".
|
||||
iter->Seek("bbb1");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ("bbb1", iter->key().ToString());
|
||||
|
||||
// Reseek ccc1, the L1 iterator needs to go back to f1 and reseek.
|
||||
iter->Seek("ccc1");
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ("ccc1", iter->key().ToString());
|
||||
|
||||
delete iter;
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
TEST_F(DBTest2, RowCacheSnapshot) {
|
||||
Options options = CurrentOptions();
|
||||
@ -3819,8 +3859,17 @@ TEST_F(DBTest2, RowCacheSnapshot) {
|
||||
#endif // ROCKSDB_LITE
|
||||
} // namespace rocksdb
|
||||
|
||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
extern "C" {
|
||||
void RegisterCustomObjects(int argc, char** argv);
|
||||
}
|
||||
#else
|
||||
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
|
||||
#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
rocksdb::port::InstallStackTraceHandler();
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
RegisterCustomObjects(argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include "db/db_test_util.h"
|
||||
#include "db/forward_iterator.h"
|
||||
#include "rocksdb/env_encryption.h"
|
||||
#include "rocksdb/utilities/object_registry.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -47,20 +48,32 @@ ROT13BlockCipher rot13Cipher_(16);
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
DBTestBase::DBTestBase(const std::string path)
|
||||
: mem_env_(!getenv("MEM_ENV") ? nullptr : new MockEnv(Env::Default())),
|
||||
#ifndef ROCKSDB_LITE
|
||||
encrypted_env_(
|
||||
!getenv("ENCRYPTED_ENV")
|
||||
? nullptr
|
||||
: NewEncryptedEnv(mem_env_ ? mem_env_ : Env::Default(),
|
||||
new CTREncryptionProvider(rot13Cipher_))),
|
||||
#else
|
||||
: mem_env_(nullptr),
|
||||
encrypted_env_(nullptr),
|
||||
#endif // ROCKSDB_LITE
|
||||
env_(new SpecialEnv(encrypted_env_
|
||||
? encrypted_env_
|
||||
: (mem_env_ ? mem_env_ : Env::Default()))),
|
||||
option_config_(kDefault) {
|
||||
Env* base_env = Env::Default();
|
||||
#ifndef ROCKSDB_LITE
|
||||
const char* test_env_uri = getenv("TEST_ENV_URI");
|
||||
if (test_env_uri) {
|
||||
Status s = ObjectRegistry::NewInstance()->NewSharedObject(test_env_uri,
|
||||
&env_guard_);
|
||||
base_env = env_guard_.get();
|
||||
EXPECT_OK(s);
|
||||
EXPECT_NE(Env::Default(), base_env);
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
EXPECT_NE(nullptr, base_env);
|
||||
if (getenv("MEM_ENV")) {
|
||||
mem_env_ = new MockEnv(base_env);
|
||||
}
|
||||
#ifndef ROCKSDB_LITE
|
||||
if (getenv("ENCRYPTED_ENV")) {
|
||||
encrypted_env_ = NewEncryptedEnv(mem_env_ ? mem_env_ : base_env,
|
||||
new CTREncryptionProvider(rot13Cipher_));
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
env_ = new SpecialEnv(encrypted_env_ ? encrypted_env_
|
||||
: (mem_env_ ? mem_env_ : base_env));
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
dbname_ = test::PerThreadDBPath(env_, path);
|
||||
|
@ -702,6 +702,7 @@ class DBTestBase : public testing::Test {
|
||||
MockEnv* mem_env_;
|
||||
Env* encrypted_env_;
|
||||
SpecialEnv* env_;
|
||||
std::shared_ptr<Env> env_guard_;
|
||||
DB* db_;
|
||||
std::vector<ColumnFamilyHandle*> handles_;
|
||||
|
||||
|
@ -160,7 +160,7 @@ Status ExternalSstFileIngestionJob::Prepare(
|
||||
// We failed, remove all files that we copied into the db
|
||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||
if (f.internal_file_path.empty()) {
|
||||
break;
|
||||
continue;
|
||||
}
|
||||
Status s = env_->DeleteFile(f.internal_file_path);
|
||||
if (!s.ok()) {
|
||||
@ -291,6 +291,9 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
|
||||
// We failed to add the files to the database
|
||||
// remove all the files we copied
|
||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||
if (f.internal_file_path.empty()) {
|
||||
continue;
|
||||
}
|
||||
Status s = env_->DeleteFile(f.internal_file_path);
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_WARN(db_options_.info_log,
|
||||
|
@ -2369,10 +2369,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
||||
new FaultInjectionTestEnv(env_));
|
||||
Options options = CurrentOptions();
|
||||
options.env = fault_injection_env.get();
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||
std::vector<ColumnFamilyHandle*> column_families;
|
||||
column_families.push_back(handles_[0]);
|
||||
column_families.push_back(handles_[1]);
|
||||
column_families.push_back(handles_[2]);
|
||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||
for (auto& ifo : ifos) {
|
||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||
@ -2386,6 +2387,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||
|
||||
// Resize the true_data vector upon construction to avoid re-alloc
|
||||
std::vector<std::map<std::string, std::string>> true_data(
|
||||
column_families.size());
|
||||
@ -2393,8 +2397,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
||||
-1, true, true_data);
|
||||
ASSERT_OK(s);
|
||||
Close();
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
||||
ASSERT_EQ(2, handles_.size());
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||
options);
|
||||
ASSERT_EQ(3, handles_.size());
|
||||
int cf = 0;
|
||||
for (const auto& verify_map : true_data) {
|
||||
for (const auto& elem : verify_map) {
|
||||
@ -2426,10 +2431,11 @@ TEST_P(ExternalSSTFileTest,
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.env = fault_injection_env.get();
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||
const std::vector<std::map<std::string, std::string>> data_before_ingestion =
|
||||
{{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
|
||||
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}};
|
||||
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}},
|
||||
{{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}};
|
||||
for (size_t i = 0; i != handles_.size(); ++i) {
|
||||
int cf = static_cast<int>(i);
|
||||
const auto& orig_data = data_before_ingestion[i];
|
||||
@ -2442,6 +2448,7 @@ TEST_P(ExternalSSTFileTest,
|
||||
std::vector<ColumnFamilyHandle*> column_families;
|
||||
column_families.push_back(handles_[0]);
|
||||
column_families.push_back(handles_[1]);
|
||||
column_families.push_back(handles_[2]);
|
||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||
for (auto& ifo : ifos) {
|
||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||
@ -2455,6 +2462,8 @@ TEST_P(ExternalSSTFileTest,
|
||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||
// Resize the true_data vector upon construction to avoid re-alloc
|
||||
std::vector<std::map<std::string, std::string>> true_data(
|
||||
column_families.size());
|
||||
@ -2508,10 +2517,11 @@ TEST_P(ExternalSSTFileTest,
|
||||
dbfull()->ReleaseSnapshot(read_opts.snapshot);
|
||||
|
||||
Close();
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||
options);
|
||||
// Should see consistent state after ingestion for all column families even
|
||||
// without snapshot.
|
||||
ASSERT_EQ(2, handles_.size());
|
||||
ASSERT_EQ(3, handles_.size());
|
||||
int cf = 0;
|
||||
for (const auto& verify_map : true_data) {
|
||||
for (const auto& elem : verify_map) {
|
||||
@ -2541,10 +2551,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
|
||||
"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||
std::vector<ColumnFamilyHandle*> column_families;
|
||||
column_families.push_back(handles_[0]);
|
||||
column_families.push_back(handles_[1]);
|
||||
column_families.push_back(handles_[2]);
|
||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||
for (auto& ifo : ifos) {
|
||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||
@ -2558,6 +2569,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
|
||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||
|
||||
// Resize the true_data vector upon construction to avoid re-alloc
|
||||
std::vector<std::map<std::string, std::string>> true_data(
|
||||
column_families.size());
|
||||
@ -2577,8 +2591,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
|
||||
|
||||
fault_injection_env->SetFilesystemActive(true);
|
||||
Close();
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
||||
ASSERT_EQ(2, handles_.size());
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||
options);
|
||||
ASSERT_EQ(3, handles_.size());
|
||||
int cf = 0;
|
||||
for (const auto& verify_map : true_data) {
|
||||
for (const auto& elem : verify_map) {
|
||||
@ -2607,10 +2622,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
|
||||
"DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||
std::vector<ColumnFamilyHandle*> column_families;
|
||||
column_families.push_back(handles_[0]);
|
||||
column_families.push_back(handles_[1]);
|
||||
column_families.push_back(handles_[2]);
|
||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||
for (auto& ifo : ifos) {
|
||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||
@ -2624,6 +2640,8 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
|
||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||
// Resize the true_data vector upon construction to avoid re-alloc
|
||||
std::vector<std::map<std::string, std::string>> true_data(
|
||||
column_families.size());
|
||||
@ -2643,8 +2661,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
|
||||
|
||||
fault_injection_env->SetFilesystemActive(true);
|
||||
Close();
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
||||
ASSERT_EQ(2, handles_.size());
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||
options);
|
||||
ASSERT_EQ(3, handles_.size());
|
||||
int cf = 0;
|
||||
for (const auto& verify_map : true_data) {
|
||||
for (const auto& elem : verify_map) {
|
||||
@ -2664,7 +2683,7 @@ TEST_P(ExternalSSTFileTest,
|
||||
Options options = CurrentOptions();
|
||||
options.env = fault_injection_env.get();
|
||||
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||
|
||||
SyncPoint::GetInstance()->ClearTrace();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
@ -2682,6 +2701,7 @@ TEST_P(ExternalSSTFileTest,
|
||||
std::vector<ColumnFamilyHandle*> column_families;
|
||||
column_families.push_back(handles_[0]);
|
||||
column_families.push_back(handles_[1]);
|
||||
column_families.push_back(handles_[2]);
|
||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||
for (auto& ifo : ifos) {
|
||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||
@ -2695,6 +2715,8 @@ TEST_P(ExternalSSTFileTest,
|
||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||
data.push_back(
|
||||
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||
// Resize the true_data vector upon construction to avoid re-alloc
|
||||
std::vector<std::map<std::string, std::string>> true_data(
|
||||
column_families.size());
|
||||
@ -2715,8 +2737,9 @@ TEST_P(ExternalSSTFileTest,
|
||||
fault_injection_env->DropUnsyncedFileData();
|
||||
fault_injection_env->SetFilesystemActive(true);
|
||||
Close();
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
||||
ASSERT_EQ(2, handles_.size());
|
||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||
options);
|
||||
ASSERT_EQ(3, handles_.size());
|
||||
int cf = 0;
|
||||
for (const auto& verify_map : true_data) {
|
||||
for (const auto& elem : verify_map) {
|
||||
|
@ -859,8 +859,7 @@ class LevelIterator final : public InternalIterator {
|
||||
bool skip_filters, int level, RangeDelAggregator* range_del_agg,
|
||||
const std::vector<AtomicCompactionUnitBoundary>*
|
||||
compaction_boundaries = nullptr)
|
||||
: InternalIterator(false),
|
||||
table_cache_(table_cache),
|
||||
: table_cache_(table_cache),
|
||||
read_options_(read_options),
|
||||
env_options_(env_options),
|
||||
icomparator_(icomparator),
|
||||
|
8
defs.bzl
8
defs.bzl
@ -12,7 +12,9 @@ def test_binary(
|
||||
rocksdb_compiler_flags,
|
||||
rocksdb_preprocessor_flags,
|
||||
rocksdb_external_deps,
|
||||
rocksdb_os_deps):
|
||||
rocksdb_os_deps,
|
||||
extra_deps,
|
||||
extra_compiler_flags):
|
||||
TEST_RUNNER = native.package_name() + "/buckifier/rocks_test_runner.sh"
|
||||
|
||||
ttype = "gtest" if parallelism == "parallel" else "simple"
|
||||
@ -23,9 +25,9 @@ def test_binary(
|
||||
srcs = [test_cc],
|
||||
arch_preprocessor_flags = rocksdb_arch_preprocessor_flags,
|
||||
os_preprocessor_flags = rocksdb_os_preprocessor_flags,
|
||||
compiler_flags = rocksdb_compiler_flags,
|
||||
compiler_flags = rocksdb_compiler_flags + extra_compiler_flags,
|
||||
preprocessor_flags = rocksdb_preprocessor_flags,
|
||||
deps = [":rocksdb_test_lib"],
|
||||
deps = [":rocksdb_test_lib"] + extra_deps,
|
||||
os_deps = rocksdb_os_deps,
|
||||
external_deps = rocksdb_external_deps,
|
||||
)
|
||||
|
1
env/io_posix.cc
vendored
1
env/io_posix.cc
vendored
@ -14,6 +14,7 @@
|
||||
#include <algorithm>
|
||||
#if defined(OS_LINUX)
|
||||
#include <linux/fs.h>
|
||||
#include <linux/falloc.h>
|
||||
#endif
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
|
@ -6,7 +6,7 @@
|
||||
|
||||
#define ROCKSDB_MAJOR 6
|
||||
#define ROCKSDB_MINOR 4
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_PATCH 6
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -12,17 +12,17 @@ cd /rocksdb-local
|
||||
if hash scl 2>/dev/null; then
|
||||
if scl --list | grep -q 'devtoolset-7'; then
|
||||
scl enable devtoolset-7 'make jclean clean'
|
||||
scl enable devtoolset-7 'PORTABLE=1 make -j6 rocksdbjavastatic'
|
||||
scl enable devtoolset-7 'PORTABLE=1 make -j2 rocksdbjavastatic'
|
||||
elif scl --list | grep -q 'devtoolset-2'; then
|
||||
scl enable devtoolset-2 'make jclean clean'
|
||||
scl enable devtoolset-2 'PORTABLE=1 make -j6 rocksdbjavastatic'
|
||||
scl enable devtoolset-2 'PORTABLE=1 make -j2 rocksdbjavastatic'
|
||||
else
|
||||
echo "Could not find devtoolset"
|
||||
exit 1;
|
||||
fi
|
||||
else
|
||||
make jclean clean
|
||||
PORTABLE=1 make -j6 rocksdbjavastatic
|
||||
PORTABLE=1 make -j2 rocksdbjavastatic
|
||||
fi
|
||||
|
||||
cp java/target/librocksdbjni-linux*.so java/target/rocksdbjni-*-linux*.jar /rocksdb-host/java/target
|
||||
|
@ -181,8 +181,8 @@ std::unique_ptr<FilterBlockReader> BlockBasedFilterBlockReader::Create(
|
||||
CachableEntry<BlockContents> filter_block;
|
||||
if (prefetch || !use_cache) {
|
||||
const Status s = ReadFilterBlock(table, prefetch_buffer, ReadOptions(),
|
||||
nullptr /* get_context */, lookup_context,
|
||||
&filter_block);
|
||||
use_cache, nullptr /* get_context */,
|
||||
lookup_context, &filter_block);
|
||||
if (!s.ok()) {
|
||||
return std::unique_ptr<FilterBlockReader>();
|
||||
}
|
||||
|
@ -68,6 +68,57 @@ BlockBasedTable::~BlockBasedTable() {
|
||||
|
||||
std::atomic<uint64_t> BlockBasedTable::next_cache_key_id_(0);
|
||||
|
||||
template <typename TBlocklike>
|
||||
class BlocklikeTraits;
|
||||
|
||||
template <>
|
||||
class BlocklikeTraits<BlockContents> {
|
||||
public:
|
||||
static BlockContents* Create(BlockContents&& contents,
|
||||
SequenceNumber /* global_seqno */,
|
||||
size_t /* read_amp_bytes_per_bit */,
|
||||
Statistics* /* statistics */,
|
||||
bool /* using_zstd */) {
|
||||
return new BlockContents(std::move(contents));
|
||||
}
|
||||
|
||||
static uint32_t GetNumRestarts(const BlockContents& /* contents */) {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
class BlocklikeTraits<Block> {
|
||||
public:
|
||||
static Block* Create(BlockContents&& contents, SequenceNumber global_seqno,
|
||||
size_t read_amp_bytes_per_bit, Statistics* statistics,
|
||||
bool /* using_zstd */) {
|
||||
return new Block(std::move(contents), global_seqno, read_amp_bytes_per_bit,
|
||||
statistics);
|
||||
}
|
||||
|
||||
static uint32_t GetNumRestarts(const Block& block) {
|
||||
return block.NumRestarts();
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
class BlocklikeTraits<UncompressionDict> {
|
||||
public:
|
||||
static UncompressionDict* Create(BlockContents&& contents,
|
||||
SequenceNumber /* global_seqno */,
|
||||
size_t /* read_amp_bytes_per_bit */,
|
||||
Statistics* /* statistics */,
|
||||
bool using_zstd) {
|
||||
return new UncompressionDict(contents.data, std::move(contents.allocation),
|
||||
using_zstd);
|
||||
}
|
||||
|
||||
static uint32_t GetNumRestarts(const UncompressionDict& /* dict */) {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
namespace {
|
||||
// Read the block identified by "handle" from "file".
|
||||
// The only relevant option is options.verify_checksums for now.
|
||||
@ -75,15 +126,16 @@ namespace {
|
||||
// On success fill *result and return OK - caller owns *result
|
||||
// @param uncompression_dict Data for presetting the compression library's
|
||||
// dictionary.
|
||||
template <typename TBlocklike>
|
||||
Status ReadBlockFromFile(
|
||||
RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
|
||||
const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
|
||||
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
|
||||
std::unique_ptr<TBlocklike>* result, const ImmutableCFOptions& ioptions,
|
||||
bool do_uncompress, bool maybe_compressed, BlockType block_type,
|
||||
const UncompressionDict& uncompression_dict,
|
||||
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
|
||||
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
|
||||
bool for_compaction = false) {
|
||||
bool for_compaction, bool using_zstd) {
|
||||
assert(result);
|
||||
|
||||
BlockContents contents;
|
||||
@ -93,34 +145,9 @@ Status ReadBlockFromFile(
|
||||
cache_options, memory_allocator, nullptr, for_compaction);
|
||||
Status s = block_fetcher.ReadBlockContents();
|
||||
if (s.ok()) {
|
||||
result->reset(new Block(std::move(contents), global_seqno,
|
||||
read_amp_bytes_per_bit, ioptions.statistics));
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
Status ReadBlockFromFile(
|
||||
RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
|
||||
const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
|
||||
std::unique_ptr<BlockContents>* result, const ImmutableCFOptions& ioptions,
|
||||
bool do_uncompress, bool maybe_compressed, BlockType block_type,
|
||||
const UncompressionDict& uncompression_dict,
|
||||
const PersistentCacheOptions& cache_options,
|
||||
SequenceNumber /* global_seqno */, size_t /* read_amp_bytes_per_bit */,
|
||||
MemoryAllocator* memory_allocator, bool for_compaction = false) {
|
||||
assert(result);
|
||||
|
||||
result->reset(new BlockContents);
|
||||
|
||||
BlockFetcher block_fetcher(
|
||||
file, prefetch_buffer, footer, options, handle, result->get(), ioptions,
|
||||
do_uncompress, maybe_compressed, block_type, uncompression_dict,
|
||||
cache_options, memory_allocator, nullptr, for_compaction);
|
||||
|
||||
const Status s = block_fetcher.ReadBlockContents();
|
||||
if (!s.ok()) {
|
||||
result->reset();
|
||||
result->reset(BlocklikeTraits<TBlocklike>::Create(
|
||||
std::move(contents), global_seqno, read_amp_bytes_per_bit,
|
||||
ioptions.statistics, using_zstd));
|
||||
}
|
||||
|
||||
return s;
|
||||
@ -208,7 +235,7 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
|
||||
protected:
|
||||
static Status ReadIndexBlock(const BlockBasedTable* table,
|
||||
FilePrefetchBuffer* prefetch_buffer,
|
||||
const ReadOptions& read_options,
|
||||
const ReadOptions& read_options, bool use_cache,
|
||||
GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<Block>* index_block);
|
||||
@ -240,6 +267,12 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
|
||||
return table_->get_rep()->index_value_is_full;
|
||||
}
|
||||
|
||||
bool cache_index_blocks() const {
|
||||
assert(table_ != nullptr);
|
||||
assert(table_->get_rep() != nullptr);
|
||||
return table_->get_rep()->table_options.cache_index_and_filter_blocks;
|
||||
}
|
||||
|
||||
Status GetOrReadIndexBlock(bool no_io, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<Block>* index_block) const;
|
||||
@ -258,7 +291,7 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
|
||||
|
||||
Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
|
||||
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
|
||||
const ReadOptions& read_options, GetContext* get_context,
|
||||
const ReadOptions& read_options, bool use_cache, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<Block>* index_block) {
|
||||
PERF_TIMER_GUARD(read_index_block_nanos);
|
||||
@ -273,7 +306,7 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
|
||||
const Status s = table->RetrieveBlock(
|
||||
prefetch_buffer, read_options, rep->footer.index_handle(),
|
||||
UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex,
|
||||
get_context, lookup_context);
|
||||
get_context, lookup_context, /* for_compaction */ false, use_cache);
|
||||
|
||||
return s;
|
||||
}
|
||||
@ -295,7 +328,8 @@ Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock(
|
||||
}
|
||||
|
||||
return ReadIndexBlock(table_, /*prefetch_buffer=*/nullptr, read_options,
|
||||
get_context, lookup_context, index_block);
|
||||
cache_index_blocks(), get_context, lookup_context,
|
||||
index_block);
|
||||
}
|
||||
|
||||
// Index that allows binary search lookup in a two-level index structure.
|
||||
@ -318,7 +352,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
|
||||
CachableEntry<Block> index_block;
|
||||
if (prefetch || !use_cache) {
|
||||
const Status s =
|
||||
ReadIndexBlock(table, prefetch_buffer, ReadOptions(),
|
||||
ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
|
||||
/*get_context=*/nullptr, lookup_context, &index_block);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
@ -509,7 +543,7 @@ class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon {
|
||||
CachableEntry<Block> index_block;
|
||||
if (prefetch || !use_cache) {
|
||||
const Status s =
|
||||
ReadIndexBlock(table, prefetch_buffer, ReadOptions(),
|
||||
ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
|
||||
/*get_context=*/nullptr, lookup_context, &index_block);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
@ -593,7 +627,7 @@ class HashIndexReader : public BlockBasedTable::IndexReaderCommon {
|
||||
CachableEntry<Block> index_block;
|
||||
if (prefetch || !use_cache) {
|
||||
const Status s =
|
||||
ReadIndexBlock(table, prefetch_buffer, ReadOptions(),
|
||||
ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
|
||||
/*get_context=*/nullptr, lookup_context, &index_block);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
@ -1588,7 +1622,8 @@ Status BlockBasedTable::ReadMetaBlock(FilePrefetchBuffer* prefetch_buffer,
|
||||
true /* decompress */, true /*maybe_compressed*/, BlockType::kMetaIndex,
|
||||
UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options,
|
||||
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
|
||||
GetMemoryAllocator(rep_->table_options));
|
||||
GetMemoryAllocator(rep_->table_options), false /* for_compaction */,
|
||||
rep_->blocks_definitely_zstd_compressed);
|
||||
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_ERROR(rep_->ioptions.info_log,
|
||||
@ -1605,38 +1640,6 @@ Status BlockBasedTable::ReadMetaBlock(FilePrefetchBuffer* prefetch_buffer,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
template <typename TBlocklike>
|
||||
class BlocklikeTraits;
|
||||
|
||||
template <>
|
||||
class BlocklikeTraits<BlockContents> {
|
||||
public:
|
||||
static BlockContents* Create(BlockContents&& contents,
|
||||
SequenceNumber /* global_seqno */,
|
||||
size_t /* read_amp_bytes_per_bit */,
|
||||
Statistics* /* statistics */) {
|
||||
return new BlockContents(std::move(contents));
|
||||
}
|
||||
|
||||
static uint32_t GetNumRestarts(const BlockContents& /* contents */) {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
class BlocklikeTraits<Block> {
|
||||
public:
|
||||
static Block* Create(BlockContents&& contents, SequenceNumber global_seqno,
|
||||
size_t read_amp_bytes_per_bit, Statistics* statistics) {
|
||||
return new Block(std::move(contents), global_seqno, read_amp_bytes_per_bit,
|
||||
statistics);
|
||||
}
|
||||
|
||||
static uint32_t GetNumRestarts(const Block& block) {
|
||||
return block.NumRestarts();
|
||||
}
|
||||
};
|
||||
|
||||
template <typename TBlocklike>
|
||||
Status BlockBasedTable::GetDataBlockFromCache(
|
||||
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
|
||||
@ -1708,7 +1711,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
|
||||
std::unique_ptr<TBlocklike> block_holder(
|
||||
BlocklikeTraits<TBlocklike>::Create(
|
||||
std::move(contents), rep_->get_global_seqno(block_type),
|
||||
read_amp_bytes_per_bit, statistics)); // uncompressed block
|
||||
read_amp_bytes_per_bit, statistics,
|
||||
rep_->blocks_definitely_zstd_compressed)); // uncompressed block
|
||||
|
||||
if (block_cache != nullptr && block_holder->own_bytes() &&
|
||||
read_options.fill_cache) {
|
||||
@ -1779,11 +1783,11 @@ Status BlockBasedTable::PutDataBlockToCache(
|
||||
|
||||
block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
|
||||
std::move(uncompressed_block_contents), seq_no, read_amp_bytes_per_bit,
|
||||
statistics));
|
||||
statistics, rep_->blocks_definitely_zstd_compressed));
|
||||
} else {
|
||||
block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
|
||||
std::move(*raw_block_contents), seq_no, read_amp_bytes_per_bit,
|
||||
statistics));
|
||||
statistics, rep_->blocks_definitely_zstd_compressed));
|
||||
}
|
||||
|
||||
// Insert compressed block into compressed block cache.
|
||||
@ -1901,7 +1905,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
|
||||
return iter;
|
||||
}
|
||||
|
||||
UncompressionDict uncompression_dict;
|
||||
CachableEntry<UncompressionDict> uncompression_dict;
|
||||
if (rep_->uncompression_dict_reader) {
|
||||
const bool no_io = (ro.read_tier == kBlockCacheTier);
|
||||
s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
|
||||
@ -1913,9 +1917,14 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
|
||||
}
|
||||
}
|
||||
|
||||
const UncompressionDict& dict = uncompression_dict.GetValue()
|
||||
? *uncompression_dict.GetValue()
|
||||
: UncompressionDict::GetEmptyDict();
|
||||
|
||||
CachableEntry<Block> block;
|
||||
s = RetrieveBlock(prefetch_buffer, ro, handle, uncompression_dict, &block,
|
||||
block_type, get_context, lookup_context, for_compaction);
|
||||
s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
|
||||
get_context, lookup_context, for_compaction,
|
||||
/* use_cache */ true);
|
||||
|
||||
if (!s.ok()) {
|
||||
assert(block.IsEmpty());
|
||||
@ -2078,8 +2087,10 @@ Status BlockBasedTable::GetDataBlockFromCache(
|
||||
GetContext* get_context) const {
|
||||
BlockCacheLookupContext lookup_data_block_context(
|
||||
TableReaderCaller::kUserMultiGet);
|
||||
assert(block_type == BlockType::kData);
|
||||
Status s = RetrieveBlock(nullptr, ro, handle, uncompression_dict, block,
|
||||
block_type, get_context, &lookup_data_block_context);
|
||||
block_type, get_context, &lookup_data_block_context,
|
||||
/* for_compaction */ false, /* use_cache */ true);
|
||||
if (s.IsIncomplete()) {
|
||||
s = Status::OK();
|
||||
}
|
||||
@ -2262,15 +2273,11 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
|
||||
// handles - A vector of block handles. Some of them me be NULL handles
|
||||
// scratch - An optional contiguous buffer to read compressed blocks into
|
||||
void BlockBasedTable::MaybeLoadBlocksToCache(
|
||||
const ReadOptions& options,
|
||||
const MultiGetRange* batch,
|
||||
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
|
||||
const ReadOptions& options, const MultiGetRange* batch,
|
||||
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
|
||||
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
|
||||
autovector<
|
||||
CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
|
||||
char* scratch,
|
||||
const UncompressionDict& uncompression_dict) const {
|
||||
|
||||
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
|
||||
char* scratch, const UncompressionDict& uncompression_dict) const {
|
||||
RandomAccessFileReader* file = rep_->file.get();
|
||||
const Footer& footer = rep_->footer;
|
||||
const ImmutableCFOptions& ioptions = rep_->ioptions;
|
||||
@ -2289,9 +2296,11 @@ void BlockBasedTable::MaybeLoadBlocksToCache(
|
||||
continue;
|
||||
}
|
||||
|
||||
(*statuses)[idx_in_batch] = RetrieveBlock(nullptr, options, handle,
|
||||
uncompression_dict, &(*results)[idx_in_batch], BlockType::kData,
|
||||
mget_iter->get_context, &lookup_data_block_context);
|
||||
(*statuses)[idx_in_batch] =
|
||||
RetrieveBlock(nullptr, options, handle, uncompression_dict,
|
||||
&(*results)[idx_in_batch], BlockType::kData,
|
||||
mget_iter->get_context, &lookup_data_block_context,
|
||||
/* for_compaction */ false, /* use_cache */ true);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -2418,15 +2427,12 @@ Status BlockBasedTable::RetrieveBlock(
|
||||
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
|
||||
CachableEntry<TBlocklike>* block_entry, BlockType block_type,
|
||||
GetContext* get_context, BlockCacheLookupContext* lookup_context,
|
||||
bool for_compaction) const {
|
||||
bool for_compaction, bool use_cache) const {
|
||||
assert(block_entry);
|
||||
assert(block_entry->IsEmpty());
|
||||
|
||||
Status s;
|
||||
if (rep_->table_options.cache_index_and_filter_blocks ||
|
||||
(block_type != BlockType::kFilter &&
|
||||
block_type != BlockType::kCompressionDictionary &&
|
||||
block_type != BlockType::kIndex)) {
|
||||
if (use_cache) {
|
||||
s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle,
|
||||
uncompression_dict, block_entry,
|
||||
block_type, get_context, lookup_context,
|
||||
@ -2467,7 +2473,8 @@ Status BlockBasedTable::RetrieveBlock(
|
||||
block_type == BlockType::kData
|
||||
? rep_->table_options.read_amp_bytes_per_bit
|
||||
: 0,
|
||||
GetMemoryAllocator(rep_->table_options), for_compaction);
|
||||
GetMemoryAllocator(rep_->table_options), for_compaction,
|
||||
rep_->blocks_definitely_zstd_compressed);
|
||||
}
|
||||
|
||||
if (!s.ok()) {
|
||||
@ -2487,14 +2494,21 @@ template Status BlockBasedTable::RetrieveBlock<BlockContents>(
|
||||
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
|
||||
CachableEntry<BlockContents>* block_entry, BlockType block_type,
|
||||
GetContext* get_context, BlockCacheLookupContext* lookup_context,
|
||||
bool for_compaction) const;
|
||||
bool for_compaction, bool use_cache) const;
|
||||
|
||||
template Status BlockBasedTable::RetrieveBlock<Block>(
|
||||
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
|
||||
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
|
||||
CachableEntry<Block>* block_entry, BlockType block_type,
|
||||
GetContext* get_context, BlockCacheLookupContext* lookup_context,
|
||||
bool for_compaction) const;
|
||||
bool for_compaction, bool use_cache) const;
|
||||
|
||||
template Status BlockBasedTable::RetrieveBlock<UncompressionDict>(
|
||||
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
|
||||
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
|
||||
CachableEntry<UncompressionDict>* block_entry, BlockType block_type,
|
||||
GetContext* get_context, BlockCacheLookupContext* lookup_context,
|
||||
bool for_compaction, bool use_cache) const;
|
||||
|
||||
BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState(
|
||||
const BlockBasedTable* table,
|
||||
@ -2709,11 +2723,21 @@ void BlockBasedTableIterator<TBlockIter, TValue>::SeekImpl(
|
||||
// Index contains the first key of the block, and it's >= target.
|
||||
// We can defer reading the block.
|
||||
is_at_first_key_from_index_ = true;
|
||||
// ResetDataIter() will invalidate block_iter_. Thus, there is no need to
|
||||
// call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound
|
||||
// as that will be done later when the data block is actually read.
|
||||
ResetDataIter();
|
||||
} else {
|
||||
// Need to use the data block.
|
||||
if (!same_block) {
|
||||
InitDataBlock();
|
||||
} else {
|
||||
// When the user does a reseek, the iterate_upper_bound might have
|
||||
// changed. CheckDataBlockWithinUpperBound() needs to be called
|
||||
// explicitly if the reseek ends up in the same data block.
|
||||
// If the reseek ends up in a different block, InitDataBlock() will do
|
||||
// the iterator upper bound check.
|
||||
CheckDataBlockWithinUpperBound();
|
||||
}
|
||||
|
||||
if (target) {
|
||||
@ -2724,7 +2748,6 @@ void BlockBasedTableIterator<TBlockIter, TValue>::SeekImpl(
|
||||
FindKeyForward();
|
||||
}
|
||||
|
||||
CheckDataBlockWithinUpperBound();
|
||||
CheckOutOfBound();
|
||||
|
||||
if (target) {
|
||||
@ -3389,7 +3412,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
|
||||
MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
|
||||
sst_file_range.end());
|
||||
|
||||
UncompressionDict uncompression_dict;
|
||||
CachableEntry<UncompressionDict> uncompression_dict;
|
||||
Status uncompression_dict_status;
|
||||
if (rep_->uncompression_dict_reader) {
|
||||
uncompression_dict_status =
|
||||
@ -3399,6 +3422,10 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
|
||||
&uncompression_dict);
|
||||
}
|
||||
|
||||
const UncompressionDict& dict = uncompression_dict.GetValue()
|
||||
? *uncompression_dict.GetValue()
|
||||
: UncompressionDict::GetEmptyDict();
|
||||
|
||||
size_t total_len = 0;
|
||||
ReadOptions ro = read_options;
|
||||
ro.read_tier = kBlockCacheTier;
|
||||
@ -3442,8 +3469,8 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
|
||||
}
|
||||
offset = v.handle.offset();
|
||||
BlockHandle handle = v.handle;
|
||||
Status s = GetDataBlockFromCache(ro, handle, uncompression_dict,
|
||||
&(results.back()), BlockType::kData, miter->get_context);
|
||||
Status s = GetDataBlockFromCache(ro, handle, dict, &(results.back()),
|
||||
BlockType::kData, miter->get_context);
|
||||
if (s.ok() && !results.back().IsEmpty()) {
|
||||
// Found it in the cache. Add NULL handle to indicate there is
|
||||
// nothing to read from disk
|
||||
@ -3474,9 +3501,8 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
|
||||
block_buf.reset(scratch);
|
||||
}
|
||||
}
|
||||
MaybeLoadBlocksToCache(read_options,
|
||||
&data_block_range, &block_handles, &statuses, &results,
|
||||
scratch, uncompression_dict);
|
||||
MaybeLoadBlocksToCache(read_options, &data_block_range, &block_handles,
|
||||
&statuses, &results, scratch, dict);
|
||||
}
|
||||
}
|
||||
|
||||
@ -4089,7 +4115,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
||||
|
||||
// Output compression dictionary
|
||||
if (rep_->uncompression_dict_reader) {
|
||||
UncompressionDict uncompression_dict;
|
||||
CachableEntry<UncompressionDict> uncompression_dict;
|
||||
s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
|
||||
nullptr /* prefetch_buffer */, false /* no_io */,
|
||||
nullptr /* get_context */, nullptr /* lookup_context */,
|
||||
@ -4098,7 +4124,9 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
||||
return s;
|
||||
}
|
||||
|
||||
const Slice& raw_dict = uncompression_dict.GetRawDict();
|
||||
assert(uncompression_dict.GetValue());
|
||||
|
||||
const Slice& raw_dict = uncompression_dict.GetValue()->GetRawDict();
|
||||
out_file->Append(
|
||||
"Compression Dictionary:\n"
|
||||
"--------------------------------------\n");
|
||||
|
@ -299,7 +299,7 @@ class BlockBasedTable : public TableReader {
|
||||
CachableEntry<TBlocklike>* block_entry,
|
||||
BlockType block_type, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
bool for_compaction = false) const;
|
||||
bool for_compaction, bool use_cache) const;
|
||||
|
||||
Status GetDataBlockFromCache(
|
||||
const ReadOptions& ro, const BlockHandle& handle,
|
||||
@ -309,10 +309,10 @@ class BlockBasedTable : public TableReader {
|
||||
|
||||
void MaybeLoadBlocksToCache(
|
||||
const ReadOptions& options, const MultiGetRange* batch,
|
||||
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
|
||||
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
|
||||
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
|
||||
autovector<
|
||||
CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
|
||||
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>*
|
||||
results,
|
||||
char* scratch, const UncompressionDict& uncompression_dict) const;
|
||||
|
||||
// Get the iterator from the index reader.
|
||||
@ -602,8 +602,7 @@ class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
|
||||
const SliceTransform* prefix_extractor,
|
||||
BlockType block_type, TableReaderCaller caller,
|
||||
size_t compaction_readahead_size = 0)
|
||||
: InternalIteratorBase<TValue>(false),
|
||||
table_(table),
|
||||
: table_(table),
|
||||
read_options_(read_options),
|
||||
icomp_(icomp),
|
||||
user_comparator_(icomp.user_comparator()),
|
||||
|
@ -13,7 +13,7 @@ namespace rocksdb {
|
||||
template <typename TBlocklike>
|
||||
Status FilterBlockReaderCommon<TBlocklike>::ReadFilterBlock(
|
||||
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
|
||||
const ReadOptions& read_options, GetContext* get_context,
|
||||
const ReadOptions& read_options, bool use_cache, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<TBlocklike>* filter_block) {
|
||||
PERF_TIMER_GUARD(read_filter_block_nanos);
|
||||
@ -28,7 +28,8 @@ Status FilterBlockReaderCommon<TBlocklike>::ReadFilterBlock(
|
||||
const Status s =
|
||||
table->RetrieveBlock(prefetch_buffer, read_options, rep->filter_handle,
|
||||
UncompressionDict::GetEmptyDict(), filter_block,
|
||||
BlockType::kFilter, get_context, lookup_context);
|
||||
BlockType::kFilter, get_context, lookup_context,
|
||||
/* for_compaction */ false, use_cache);
|
||||
|
||||
return s;
|
||||
}
|
||||
@ -52,6 +53,14 @@ bool FilterBlockReaderCommon<TBlocklike>::whole_key_filtering() const {
|
||||
return table_->get_rep()->whole_key_filtering;
|
||||
}
|
||||
|
||||
template <typename TBlocklike>
|
||||
bool FilterBlockReaderCommon<TBlocklike>::cache_filter_blocks() const {
|
||||
assert(table_);
|
||||
assert(table_->get_rep());
|
||||
|
||||
return table_->get_rep()->table_options.cache_index_and_filter_blocks;
|
||||
}
|
||||
|
||||
template <typename TBlocklike>
|
||||
Status FilterBlockReaderCommon<TBlocklike>::GetOrReadFilterBlock(
|
||||
bool no_io, GetContext* get_context,
|
||||
@ -70,7 +79,8 @@ Status FilterBlockReaderCommon<TBlocklike>::GetOrReadFilterBlock(
|
||||
}
|
||||
|
||||
return ReadFilterBlock(table_, nullptr /* prefetch_buffer */, read_options,
|
||||
get_context, lookup_context, filter_block);
|
||||
cache_filter_blocks(), get_context, lookup_context,
|
||||
filter_block);
|
||||
}
|
||||
|
||||
template <typename TBlocklike>
|
||||
|
@ -31,7 +31,7 @@ class FilterBlockReaderCommon : public FilterBlockReader {
|
||||
protected:
|
||||
static Status ReadFilterBlock(const BlockBasedTable* table,
|
||||
FilePrefetchBuffer* prefetch_buffer,
|
||||
const ReadOptions& read_options,
|
||||
const ReadOptions& read_options, bool use_cache,
|
||||
GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<TBlocklike>* filter_block);
|
||||
@ -39,6 +39,7 @@ class FilterBlockReaderCommon : public FilterBlockReader {
|
||||
const BlockBasedTable* table() const { return table_; }
|
||||
const SliceTransform* table_prefix_extractor() const;
|
||||
bool whole_key_filtering() const;
|
||||
bool cache_filter_blocks() const;
|
||||
|
||||
Status GetOrReadFilterBlock(bool no_io, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
|
@ -134,8 +134,8 @@ std::unique_ptr<FilterBlockReader> FullFilterBlockReader::Create(
|
||||
CachableEntry<BlockContents> filter_block;
|
||||
if (prefetch || !use_cache) {
|
||||
const Status s = ReadFilterBlock(table, prefetch_buffer, ReadOptions(),
|
||||
nullptr /* get_context */, lookup_context,
|
||||
&filter_block);
|
||||
use_cache, nullptr /* get_context */,
|
||||
lookup_context, &filter_block);
|
||||
if (!s.ok()) {
|
||||
return std::unique_ptr<FilterBlockReader>();
|
||||
}
|
||||
|
@ -133,8 +133,8 @@ std::unique_ptr<FilterBlockReader> PartitionedFilterBlockReader::Create(
|
||||
CachableEntry<Block> filter_block;
|
||||
if (prefetch || !use_cache) {
|
||||
const Status s = ReadFilterBlock(table, prefetch_buffer, ReadOptions(),
|
||||
nullptr /* get_context */, lookup_context,
|
||||
&filter_block);
|
||||
use_cache, nullptr /* get_context */,
|
||||
lookup_context, &filter_block);
|
||||
if (!s.ok()) {
|
||||
return std::unique_ptr<FilterBlockReader>();
|
||||
}
|
||||
@ -192,7 +192,12 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
|
||||
index_key_includes_seq(), index_value_is_full());
|
||||
iter.Seek(entry);
|
||||
if (UNLIKELY(!iter.Valid())) {
|
||||
return BlockHandle(0, 0);
|
||||
// entry is larger than all the keys. However its prefix might still be
|
||||
// present in the last partition. If this is called by PrefixMayMatch this
|
||||
// is necessary for correct behavior. Otherwise it is unnecessary but safe.
|
||||
// Assuming this is an unlikely case for full key search, the performance
|
||||
// overhead should be negligible.
|
||||
iter.SeekToLast();
|
||||
}
|
||||
assert(iter.Valid());
|
||||
BlockHandle fltr_blk_handle = iter.value().handle;
|
||||
@ -226,7 +231,8 @@ Status PartitionedFilterBlockReader::GetFilterPartitionBlock(
|
||||
const Status s =
|
||||
table()->RetrieveBlock(prefetch_buffer, read_options, fltr_blk_handle,
|
||||
UncompressionDict::GetEmptyDict(), filter_block,
|
||||
BlockType::kFilter, get_context, lookup_context);
|
||||
BlockType::kFilter, get_context, lookup_context,
|
||||
/* for_compaction */ false, /* use_cache */ true);
|
||||
|
||||
return s;
|
||||
}
|
||||
|
@ -327,7 +327,7 @@ TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
|
||||
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
|
||||
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
|
||||
NewBuilder(pib.get(), prefix_extractor.get()));
|
||||
const std::string pkeys[3] = {"p-key1", "p-key2", "p-key3"};
|
||||
const std::string pkeys[3] = {"p-key10", "p-key20", "p-key30"};
|
||||
builder->Add(pkeys[0]);
|
||||
CutABlock(pib.get(), pkeys[0], pkeys[1]);
|
||||
builder->Add(pkeys[1]);
|
||||
@ -344,6 +344,16 @@ TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
|
||||
/*no_io=*/false, &ikey_slice, /*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr));
|
||||
}
|
||||
// Non-existent keys but with the same prefix
|
||||
const std::string pnonkeys[4] = {"p-key9", "p-key11", "p-key21", "p-key31"};
|
||||
for (auto key : pnonkeys) {
|
||||
auto ikey = InternalKey(key, 0, ValueType::kTypeValue);
|
||||
const Slice ikey_slice = Slice(*ikey.rep());
|
||||
ASSERT_TRUE(reader->PrefixMayMatch(
|
||||
prefix_extractor->Transform(key), prefix_extractor.get(), kNotValid,
|
||||
/*no_io=*/false, &ikey_slice, /*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(PartitionedFilterBlockTest, OneBlockPerKey) {
|
||||
|
@ -21,36 +21,36 @@ Status UncompressionDictReader::Create(
|
||||
assert(!pin || prefetch);
|
||||
assert(uncompression_dict_reader);
|
||||
|
||||
CachableEntry<BlockContents> uncompression_dict_block;
|
||||
CachableEntry<UncompressionDict> uncompression_dict;
|
||||
if (prefetch || !use_cache) {
|
||||
const Status s = ReadUncompressionDictionaryBlock(
|
||||
table, prefetch_buffer, ReadOptions(), nullptr /* get_context */,
|
||||
lookup_context, &uncompression_dict_block);
|
||||
const Status s = ReadUncompressionDictionary(
|
||||
table, prefetch_buffer, ReadOptions(), use_cache,
|
||||
nullptr /* get_context */, lookup_context, &uncompression_dict);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
if (use_cache && !pin) {
|
||||
uncompression_dict_block.Reset();
|
||||
uncompression_dict.Reset();
|
||||
}
|
||||
}
|
||||
|
||||
uncompression_dict_reader->reset(
|
||||
new UncompressionDictReader(table, std::move(uncompression_dict_block)));
|
||||
new UncompressionDictReader(table, std::move(uncompression_dict)));
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status UncompressionDictReader::ReadUncompressionDictionaryBlock(
|
||||
Status UncompressionDictReader::ReadUncompressionDictionary(
|
||||
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
|
||||
const ReadOptions& read_options, GetContext* get_context,
|
||||
const ReadOptions& read_options, bool use_cache, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<BlockContents>* uncompression_dict_block) {
|
||||
CachableEntry<UncompressionDict>* uncompression_dict) {
|
||||
// TODO: add perf counter for compression dictionary read time
|
||||
|
||||
assert(table);
|
||||
assert(uncompression_dict_block);
|
||||
assert(uncompression_dict_block->IsEmpty());
|
||||
assert(uncompression_dict);
|
||||
assert(uncompression_dict->IsEmpty());
|
||||
|
||||
const BlockBasedTable::Rep* const rep = table->get_rep();
|
||||
assert(rep);
|
||||
@ -58,8 +58,9 @@ Status UncompressionDictReader::ReadUncompressionDictionaryBlock(
|
||||
|
||||
const Status s = table->RetrieveBlock(
|
||||
prefetch_buffer, read_options, rep->compression_dict_handle,
|
||||
UncompressionDict::GetEmptyDict(), uncompression_dict_block,
|
||||
BlockType::kCompressionDictionary, get_context, lookup_context);
|
||||
UncompressionDict::GetEmptyDict(), uncompression_dict,
|
||||
BlockType::kCompressionDictionary, get_context, lookup_context,
|
||||
/* for_compaction */ false, use_cache);
|
||||
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_WARN(
|
||||
@ -72,15 +73,14 @@ Status UncompressionDictReader::ReadUncompressionDictionaryBlock(
|
||||
return s;
|
||||
}
|
||||
|
||||
Status UncompressionDictReader::GetOrReadUncompressionDictionaryBlock(
|
||||
Status UncompressionDictReader::GetOrReadUncompressionDictionary(
|
||||
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<BlockContents>* uncompression_dict_block) const {
|
||||
assert(uncompression_dict_block);
|
||||
CachableEntry<UncompressionDict>* uncompression_dict) const {
|
||||
assert(uncompression_dict);
|
||||
|
||||
if (!uncompression_dict_block_.IsEmpty()) {
|
||||
uncompression_dict_block->SetUnownedValue(
|
||||
uncompression_dict_block_.GetValue());
|
||||
if (!uncompression_dict_.IsEmpty()) {
|
||||
uncompression_dict->SetUnownedValue(uncompression_dict_.GetValue());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -89,42 +89,17 @@ Status UncompressionDictReader::GetOrReadUncompressionDictionaryBlock(
|
||||
read_options.read_tier = kBlockCacheTier;
|
||||
}
|
||||
|
||||
return ReadUncompressionDictionaryBlock(table_, prefetch_buffer, read_options,
|
||||
get_context, lookup_context,
|
||||
uncompression_dict_block);
|
||||
}
|
||||
|
||||
Status UncompressionDictReader::GetOrReadUncompressionDictionary(
|
||||
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
UncompressionDict* uncompression_dict) const {
|
||||
CachableEntry<BlockContents> uncompression_dict_block;
|
||||
const Status s = GetOrReadUncompressionDictionaryBlock(
|
||||
prefetch_buffer, no_io, get_context, lookup_context,
|
||||
&uncompression_dict_block);
|
||||
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
assert(uncompression_dict);
|
||||
assert(table_);
|
||||
assert(table_->get_rep());
|
||||
|
||||
UncompressionDict dict(uncompression_dict_block.GetValue()->data,
|
||||
table_->get_rep()->blocks_definitely_zstd_compressed);
|
||||
*uncompression_dict = std::move(dict);
|
||||
uncompression_dict_block.TransferTo(uncompression_dict);
|
||||
|
||||
return Status::OK();
|
||||
return ReadUncompressionDictionary(table_, prefetch_buffer, read_options,
|
||||
cache_dictionary_blocks(), get_context,
|
||||
lookup_context, uncompression_dict);
|
||||
}
|
||||
|
||||
size_t UncompressionDictReader::ApproximateMemoryUsage() const {
|
||||
assert(!uncompression_dict_block_.GetOwnValue() ||
|
||||
uncompression_dict_block_.GetValue() != nullptr);
|
||||
size_t usage = uncompression_dict_block_.GetOwnValue()
|
||||
? uncompression_dict_block_.GetValue()->ApproximateMemoryUsage()
|
||||
: 0;
|
||||
assert(!uncompression_dict_.GetOwnValue() ||
|
||||
uncompression_dict_.GetValue() != nullptr);
|
||||
size_t usage = uncompression_dict_.GetOwnValue()
|
||||
? uncompression_dict_.GetValue()->ApproximateMemoryUsage()
|
||||
: 0;
|
||||
|
||||
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
|
||||
usage += malloc_usable_size(const_cast<UncompressionDictReader*>(this));
|
||||
@ -135,4 +110,11 @@ size_t UncompressionDictReader::ApproximateMemoryUsage() const {
|
||||
return usage;
|
||||
}
|
||||
|
||||
bool UncompressionDictReader::cache_dictionary_blocks() const {
|
||||
assert(table_);
|
||||
assert(table_->get_rep());
|
||||
|
||||
return table_->get_rep()->table_options.cache_index_and_filter_blocks;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -33,32 +33,27 @@ class UncompressionDictReader {
|
||||
Status GetOrReadUncompressionDictionary(
|
||||
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
UncompressionDict* uncompression_dict) const;
|
||||
CachableEntry<UncompressionDict>* uncompression_dict) const;
|
||||
|
||||
size_t ApproximateMemoryUsage() const;
|
||||
|
||||
private:
|
||||
UncompressionDictReader(
|
||||
const BlockBasedTable* t,
|
||||
CachableEntry<BlockContents>&& uncompression_dict_block)
|
||||
: table_(t),
|
||||
uncompression_dict_block_(std::move(uncompression_dict_block)) {
|
||||
UncompressionDictReader(const BlockBasedTable* t,
|
||||
CachableEntry<UncompressionDict>&& uncompression_dict)
|
||||
: table_(t), uncompression_dict_(std::move(uncompression_dict)) {
|
||||
assert(table_);
|
||||
}
|
||||
|
||||
static Status ReadUncompressionDictionaryBlock(
|
||||
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
|
||||
const ReadOptions& read_options, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<BlockContents>* uncompression_dict_block);
|
||||
bool cache_dictionary_blocks() const;
|
||||
|
||||
Status GetOrReadUncompressionDictionaryBlock(
|
||||
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
|
||||
static Status ReadUncompressionDictionary(
|
||||
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
|
||||
const ReadOptions& read_options, bool use_cache, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<BlockContents>* uncompression_dict_block) const;
|
||||
CachableEntry<UncompressionDict>* uncompression_dict);
|
||||
|
||||
const BlockBasedTable* table_;
|
||||
CachableEntry<BlockContents> uncompression_dict_block_;
|
||||
CachableEntry<UncompressionDict> uncompression_dict_;
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -25,8 +25,8 @@ struct IterateResult {
|
||||
template <class TValue>
|
||||
class InternalIteratorBase : public Cleanable {
|
||||
public:
|
||||
InternalIteratorBase() : is_mutable_(true) {}
|
||||
InternalIteratorBase(bool _is_mutable) : is_mutable_(_is_mutable) {}
|
||||
InternalIteratorBase() {}
|
||||
|
||||
virtual ~InternalIteratorBase() {}
|
||||
|
||||
// An iterator is either positioned at a key/value pair, or
|
||||
@ -144,7 +144,6 @@ class InternalIteratorBase : public Cleanable {
|
||||
virtual Status GetProperty(std::string /*prop_name*/, std::string* /*prop*/) {
|
||||
return Status::NotSupported("");
|
||||
}
|
||||
bool is_mutable() const { return is_mutable_; }
|
||||
|
||||
protected:
|
||||
void SeekForPrevImpl(const Slice& target, const Comparator* cmp) {
|
||||
|
@ -73,7 +73,6 @@ class IteratorWrapperBase {
|
||||
}
|
||||
void Prev() { assert(iter_); iter_->Prev(); Update(); }
|
||||
void Seek(const Slice& k) {
|
||||
TEST_SYNC_POINT("IteratorWrapper::Seek:0");
|
||||
assert(iter_);
|
||||
iter_->Seek(k);
|
||||
Update();
|
||||
|
@ -127,29 +127,14 @@ class MergingIterator : public InternalIterator {
|
||||
}
|
||||
|
||||
void Seek(const Slice& target) override {
|
||||
bool is_increasing_reseek = false;
|
||||
if (current_ != nullptr && direction_ == kForward && status_.ok() &&
|
||||
comparator_->Compare(target, key()) >= 0) {
|
||||
is_increasing_reseek = true;
|
||||
}
|
||||
ClearHeaps();
|
||||
status_ = Status::OK();
|
||||
for (auto& child : children_) {
|
||||
// If upper bound never changes, we can skip Seek() for
|
||||
// the !Valid() case too, but people do hack the code to change
|
||||
// upper bound between Seek(), so it's not a good idea to break
|
||||
// the API.
|
||||
// If DBIter is used on top of merging iterator, we probably
|
||||
// can skip mutable child iterators if they are invalid too,
|
||||
// but it's a less clean API. We can optimize for it later if
|
||||
// needed.
|
||||
if (!is_increasing_reseek || !child.Valid() ||
|
||||
comparator_->Compare(target, child.key()) > 0 ||
|
||||
child.iter()->is_mutable()) {
|
||||
{
|
||||
PERF_TIMER_GUARD(seek_child_seek_time);
|
||||
child.Seek(target);
|
||||
PERF_COUNTER_ADD(seek_child_seek_count, 1);
|
||||
}
|
||||
PERF_COUNTER_ADD(seek_child_seek_count, 1);
|
||||
|
||||
if (child.Valid()) {
|
||||
assert(child.status().ok());
|
||||
|
@ -21,7 +21,6 @@
|
||||
#include <string>
|
||||
|
||||
#include "memory/memory_allocator.h"
|
||||
#include "rocksdb/cleanable.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/table.h"
|
||||
#include "util/coding.h"
|
||||
@ -217,14 +216,19 @@ struct CompressionDict {
|
||||
|
||||
// Holds dictionary and related data, like ZSTD's digested uncompression
|
||||
// dictionary.
|
||||
struct UncompressionDict : public Cleanable {
|
||||
// Block containing the data for the compression dictionary. It is non-empty
|
||||
// only if the constructor that takes a string parameter is used.
|
||||
struct UncompressionDict {
|
||||
// Block containing the data for the compression dictionary in case the
|
||||
// constructor that takes a string parameter is used.
|
||||
std::string dict_;
|
||||
|
||||
// Slice pointing to the compression dictionary data. Points to
|
||||
// dict_ if the string constructor is used. In the case of the Slice
|
||||
// constructor, it is a copy of the Slice passed by the caller.
|
||||
// Block containing the data for the compression dictionary in case the
|
||||
// constructor that takes a Slice parameter is used and the passed in
|
||||
// CacheAllocationPtr is not nullptr.
|
||||
CacheAllocationPtr allocation_;
|
||||
|
||||
// Slice pointing to the compression dictionary data. Can point to
|
||||
// dict_, allocation_, or some other memory location, depending on how
|
||||
// the object was constructed.
|
||||
Slice slice_;
|
||||
|
||||
#ifdef ROCKSDB_ZSTD_DDICT
|
||||
@ -232,18 +236,12 @@ struct UncompressionDict : public Cleanable {
|
||||
ZSTD_DDict* zstd_ddict_ = nullptr;
|
||||
#endif // ROCKSDB_ZSTD_DDICT
|
||||
|
||||
// Slice constructor: it is the caller's responsibility to either
|
||||
// a) make sure slice remains valid throughout the lifecycle of this object OR
|
||||
// b) transfer the management of the underlying resource (e.g. cache handle)
|
||||
// to this object, in which case UncompressionDict is self-contained, and the
|
||||
// resource is guaranteed to be released (via the cleanup logic in Cleanable)
|
||||
// when UncompressionDict is destroyed.
|
||||
#ifdef ROCKSDB_ZSTD_DDICT
|
||||
UncompressionDict(Slice slice, bool using_zstd)
|
||||
UncompressionDict(std::string dict, bool using_zstd)
|
||||
#else // ROCKSDB_ZSTD_DDICT
|
||||
UncompressionDict(Slice slice, bool /*using_zstd*/)
|
||||
UncompressionDict(std::string dict, bool /* using_zstd */)
|
||||
#endif // ROCKSDB_ZSTD_DDICT
|
||||
: slice_(std::move(slice)) {
|
||||
: dict_(std::move(dict)), slice_(dict_) {
|
||||
#ifdef ROCKSDB_ZSTD_DDICT
|
||||
if (!slice_.empty() && using_zstd) {
|
||||
zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
|
||||
@ -252,14 +250,25 @@ struct UncompressionDict : public Cleanable {
|
||||
#endif // ROCKSDB_ZSTD_DDICT
|
||||
}
|
||||
|
||||
// String constructor: results in a self-contained UncompressionDict.
|
||||
UncompressionDict(std::string dict, bool using_zstd)
|
||||
: UncompressionDict(Slice(dict), using_zstd) {
|
||||
dict_ = std::move(dict);
|
||||
#ifdef ROCKSDB_ZSTD_DDICT
|
||||
UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
|
||||
bool using_zstd)
|
||||
#else // ROCKSDB_ZSTD_DDICT
|
||||
UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
|
||||
bool /* using_zstd */)
|
||||
#endif // ROCKSDB_ZSTD_DDICT
|
||||
: allocation_(std::move(allocation)), slice_(std::move(slice)) {
|
||||
#ifdef ROCKSDB_ZSTD_DDICT
|
||||
if (!slice_.empty() && using_zstd) {
|
||||
zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
|
||||
assert(zstd_ddict_ != nullptr);
|
||||
}
|
||||
#endif // ROCKSDB_ZSTD_DDICT
|
||||
}
|
||||
|
||||
UncompressionDict(UncompressionDict&& rhs)
|
||||
: dict_(std::move(rhs.dict_)),
|
||||
allocation_(std::move(rhs.allocation_)),
|
||||
slice_(std::move(rhs.slice_))
|
||||
#ifdef ROCKSDB_ZSTD_DDICT
|
||||
,
|
||||
@ -288,6 +297,7 @@ struct UncompressionDict : public Cleanable {
|
||||
}
|
||||
|
||||
dict_ = std::move(rhs.dict_);
|
||||
allocation_ = std::move(rhs.allocation_);
|
||||
slice_ = std::move(rhs.slice_);
|
||||
|
||||
#ifdef ROCKSDB_ZSTD_DDICT
|
||||
@ -298,6 +308,12 @@ struct UncompressionDict : public Cleanable {
|
||||
return *this;
|
||||
}
|
||||
|
||||
// The object is self-contained if the string constructor is used, or the
|
||||
// Slice constructor is invoked with a non-null allocation. Otherwise, it
|
||||
// is the caller's responsibility to ensure that the underlying storage
|
||||
// outlives this object.
|
||||
bool own_bytes() const { return !dict_.empty() || allocation_; }
|
||||
|
||||
const Slice& GetRawDict() const { return slice_; }
|
||||
|
||||
#ifdef ROCKSDB_ZSTD_DDICT
|
||||
@ -310,12 +326,19 @@ struct UncompressionDict : public Cleanable {
|
||||
}
|
||||
|
||||
size_t ApproximateMemoryUsage() const {
|
||||
size_t usage = 0;
|
||||
usage += sizeof(struct UncompressionDict);
|
||||
size_t usage = sizeof(struct UncompressionDict);
|
||||
usage += dict_.size();
|
||||
if (allocation_) {
|
||||
auto allocator = allocation_.get_deleter().allocator;
|
||||
if (allocator) {
|
||||
usage += allocator->UsableSize(allocation_.get(), slice_.size());
|
||||
} else {
|
||||
usage += slice_.size();
|
||||
}
|
||||
}
|
||||
#ifdef ROCKSDB_ZSTD_DDICT
|
||||
usage += ZSTD_sizeof_DDict(zstd_ddict_);
|
||||
#endif // ROCKSDB_ZSTD_DDICT
|
||||
usage += dict_.size();
|
||||
return usage;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user