Compare commits

...

19 Commits

Author SHA1 Message Date
Siying Dong
06ac8ba7f9
Fix a backporting error in options_settable_test.cc
options_settable_test.cc has unit tests backported after compression_opt changed. Fix it.
2020-04-16 11:17:32 -07:00
akankshamahajan
00b4ed2f90 Add a feature to HISTORY.md 2020-04-15 16:45:01 -07:00
Zhichao Cao
2b7ee8e9c6 Add NewFileChecksumGenCrc32cFactory to file checksum (#6688)
Summary:
Add NewFileChecksumGenCrc32cFactory to file checksum public interface such that applications can use the build in crc32 checksum factory.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6688

Test Plan: pass make asan_check

Reviewed By: riversand963

Differential Revision: D21006859

Pulled By: zhichao-cao

fbshipit-source-id: ea8a45196a8b77c310728ab05f6cc0f49f3baef0
2020-04-15 16:23:21 -07:00
anand76
79f39ff931 Update HISTORY.md 2020-04-15 15:32:28 -07:00
anand76
b54712ed7d Log CompactOnDeletionCollectorFactory parameters on DB open (#6686)
Summary:
Log it in the info log to help in troubleshooting. It is logged as follows -
```
2020/04/10-10:51:39.886662 7ffff7fef340                   Options.table_properties_collectors: CompactOnDeletionCollector (Sliding window size = 100 Deletion trigger = 90);
```

Tests:
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6686

Reviewed By: ltamasi

Differential Revision: D21002442

Pulled By: anand1976

fbshipit-source-id: 7adf0dbae7f1febcb00ce61fea5097118ede5c6a
2020-04-14 16:01:38 -07:00
Akanksha Mahajan
272d006037 Report kFilesMarkedForCompaction for delete triggered compactions (#6680)
Summary:
Summary : Set manual_compaction false in case of DeleteTriggeredCompaction object so that kFilesMarkedForComapaction can be reported.
          Added a DeletionTriggeredUniversalCompactionMarking test case for Deletion Triggered compaction in case of Universal Compaction.

Test Plan : make check -j64
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6680

Test Plan: make check -j64

Reviewed By: anand1976

Differential Revision: D20945946

Pulled By: akankshamahajan15

fbshipit-source-id: af84e417bd7127652aaae9143c560d1ab3815d25
2020-04-14 13:21:40 -07:00
Zhichao Cao
51b25da6f8 Updated to RocksDB 6.9.1 2020-04-09 14:54:33 -07:00
Cheng Chang
8f2ef73d4b Add two more optimization improvements to HISTORY (#6679)
Summary:
Although these optimizations are not user facing, still feel it's valuable to call out in HISTORY.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6679

Test Plan: no need

Reviewed By: zhichao-cao

Differential Revision: D20945916

Pulled By: cheng-chang

fbshipit-source-id: f3e790c07f3bcc4a8a74246c4fa232800ddd4438
2020-04-09 14:42:56 -07:00
Cheng Chang
a964d457a9 Fix result slice's address for direct io read (#6672)
Summary:
When aligned_buf is provided, the result slice's starting address should take offset advance into account.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6672

Test Plan: make check

Reviewed By: anand1976

Differential Revision: D20934198

Pulled By: cheng-chang

fbshipit-source-id: c3475c9c132b92c50d8c7c399fca2e9e76870803
2020-04-09 14:37:57 -07:00
Yi Wu
0279d2711f Fix wrong key being read on ingested file with global seqno and delta encoding (#6669)
Summary:
On reading an ingested SST file, `DataBlockIter` will replace seqno encoded in a key with global seqno. However, if the original seqno was part of the prefix used for the next key, the global seqno is by mistake used as part of the prefix to construct the next key, causing wrong result being returned. Although at this point it is only software error while data in the file is not corrupted, the issue can further cause compaction output out of order and corrupted result when the ingested SST participated in compaction. Fixing the issue by save the actual seqno and restore it before the key being used as prefix to construct next key.

The unit test is by Little-Wallace from https://github.com/facebook/rocksdb/issues/6666. Fixing https://github.com/facebook/rocksdb/issues/6666.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6669

Test Plan:
New unit test

Signed-off-by: Yi Wu <yiwu@pingcap.com>

Reviewed By: cheng-chang

Differential Revision: D20931808

Pulled By: ajkr

fbshipit-source-id: f01959c35d6a493954dca981663766c7a5a9e8ab
2020-04-09 14:37:23 -07:00
sdong
20bc5166b3 Fix memory corruption caused by new test in options_settable_test (#6676)
Summary:
https://github.com/facebook/rocksdb/pull/6668 added some new test code but it has a risk of memory corruption. Fix it
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6676

Test Plan: Run the test under ASAN and see it passes.

Reviewed By: ajkr

Differential Revision: D20937108

fbshipit-source-id: 22cc96bb02030df0a37a02e67a2cc37ca31ba22d
2020-04-09 13:27:21 -07:00
sdong
cc0b5a4c14 compression related options are not copied back from MutableCFOptions… (#6668)
Summary:
… to CFOptions
https://github.com/facebook/rocksdb/pull/6615 made several compression related options dynamically changeable. They are moved to MutableCFOptions. However, they are not copied back to ColumnFamilyOptions, so the changed values are not written to option files and for some other uses. Fix it by copying them back.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6668

Test Plan: Add a unit test to make sure that when a MutableCFOptions is converted to CFOptions and back to MutableCFOptions, they stay the same. This test would fail without the fix.

Reviewed By: ajkr

Differential Revision: D20923999

fbshipit-source-id: c3bccd6923b00d677764e2269bed6a95ad7ed780
2020-04-08 17:32:33 -07:00
Andrew Gallagher
145d48a02e Fix jemalloc forward declarations (#6613)
Summary:
Add `nothrow` attribute to match declarations in jemalloc.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6613

Reviewed By: igorsugak

Differential Revision: D20749490

fbshipit-source-id: 9ac8df27f7b4268f27b32b130c23ce8a1f772b3a
2020-04-07 16:24:40 -07:00
Zhichao Cao
b3f017fc31 Fix the multi-thread Manifest write dependency in error_handler_fs_test (#6637)
Summary:
In CompactionManifestWriteRetryableError in error_handler_fs_test, the manifest write of flush should pass with no fs error. After flush, fs is set to error status and the manifest write of compaction should fail due to the IO Error. Currently, the manifest write of flush is not synced with the compaction in order, which might cause manifest write fails, which will cause test failure. Fixed by adding the LoadDependency of sync-point after flush and before compaction.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6637

Test Plan: pass error_hanlder_fs_tes. Pass make asan_check

Reviewed By: anand1976

Differential Revision: D20826969

Pulled By: zhichao-cao

fbshipit-source-id: fb2e702caa19bd63c82570320536b7acda870ff1
2020-04-06 16:45:53 -07:00
Zhichao Cao
3eb30d64d3 Remove redundant in HISTORY (#6627)
Summary:
Remove redundant description in HISTORY

no code change
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6627

Test Plan: make check

Reviewed By: anand1976

Differential Revision: D20797269

Pulled By: zhichao-cao

fbshipit-source-id: dee4c9a22f6d241c985f250c0f11bfaa9198f4c1
2020-04-02 16:44:49 -07:00
sdong
06c2935d83 Fix a bug that crashes the service when write buffer manager fails to insert to block cache (#6619)
Summary:
https://github.com/facebook/rocksdb/issues/6247 reports that when write buffer manager fails to insert the dummy entry to block cache, null pointer is still stored and used to release the handle and cause corruption. Fix the bug by not releasing it with null handle.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6619

Test Plan: Add a unit test that fails without the fix.

Reviewed By: ajkr

Differential Revision: D20776769

fbshipit-source-id: 4127fbd9f295a0a3e45774746ffcd91f939f6287
2020-04-01 11:38:30 -07:00
Levi Tamasi
b3621aa555 Revert the recent cache deleter change (#6620)
Summary:
Revert "Use function objects as deleters in the block cache (https://github.com/facebook/rocksdb/issues/6545)"

    This reverts commit 6301dbe7a7.

    Revert "Call out the cache deleter related interface change in HISTORY.md (https://github.com/facebook/rocksdb/issues/6606)"

    This reverts commit 3a35542f86.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6620

Test Plan: `make check`

Reviewed By: zhichao-cao

Differential Revision: D20773311

Pulled By: ltamasi

fbshipit-source-id: 7637a761f718f323ef0e7da959462e8fb06e7a2b
2020-03-31 16:21:36 -07:00
sdong
a8c6000d86 Make options.bottommost_compression, compression_opts and bottommost_compression_opts dynamically changeable. (#6615)
Summary:
These three options should be made dynamically changeable. Simply add them to MutableCFOptions and made the change.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6615

Test Plan: Add a unit test to make sure that SetOptions() can change the options.

Reviewed By: riversand963

Differential Revision: D20755951

fbshipit-source-id: 8165f4fd7a7a665cc7fb049698935022a5d2e7ff
2020-03-31 12:49:12 -07:00
Yanqin Jin
b490f3d753 Use flush time for the props.creation_time for FIFO compaction (#6612)
Summary:
For FIFO compaction, we use flush time instead of oldest key time as the
creation time. This is to prevent FIFO compaction dropping files whose oldest
key time is older than TTL but which has newer keys than TTL.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6612

Test Plan: make check

Reviewed By: siying

Differential Revision: D20748217

Pulled By: riversand963

fbshipit-source-id: 3f7b00a847020760537cdddd12f6fe039e5bc663
2020-03-31 12:48:42 -07:00
52 changed files with 686 additions and 376 deletions

View File

@ -1,25 +1,44 @@
# Rocksdb Change Log
## Unreleased
### Public API Change
* Add NewFileChecksumGenCrc32cFactory to the file checksum public API, such that the builtin Crc32c based file checksum generator factory can be used by applications.
### New Features
* Log CompactOnDeletionCollectorFactory window_size and deletion_trigger parameters in the info log file for troubleshooting purposes.
* Report kFilesMarkedForCompaction for delete triggered compactions in case of Universal and Levelled Compactions.
## 6.9.1 (04/09/2020)
### Bug Fixes
* Fix wrong result being read from ingested file. May happen when a key in the file happen to be prefix of another key also in the file. The issue can further cause more data corruption. The issue exists with rocksdb >= 5.0.0 since DB::IngestExternalFile() was introduced.
* Fix a bug when making options.bottommost_compression, options.compression_opts and options.bottommost_compression_opts dynamically changeable: the modified values are not written to option files or returned back to users when being queried.
## 6.9.0 (03/29/2020)
### Behavior changes
* Since RocksDB 6.8, ttl-based FIFO compaction can drop a file whose oldest key becomes older than options.ttl while others have not. This fix reverts this and makes ttl-based FIFO compaction use the file's flush time as the criterion. This fix also requires that max_open_files = -1 and compaction_options_fifo.allow_compaction = false to function properly.
### Public API Change
* Fix spelling so that API now has correctly spelled transaction state name `COMMITTED`, while the old misspelled `COMMITED` is still available as an alias.
* Updated default format_version in BlockBasedTableOptions from 2 to 4. SST files generated with the new default can be read by RocksDB versions 5.16 and newer, and use more efficient encoding of keys in index blocks.
* `Cache::Insert` now expects clients to pass in function objects implementing the `Cache::Deleter` interface as deleters instead of plain function pointers.
* A new parameter `CreateBackupOptions` is added to both `BackupEngine::CreateNewBackup` and `BackupEngine::CreateNewBackupWithMetadata`, you can decrease CPU priority of `BackupEngine`'s background threads by setting `decrease_background_thread_cpu_priority` and `background_thread_cpu_priority` in `CreateBackupOptions`.
* Updated the public API of SST file checksum. Introduce the FileChecksumGenFactory to create the FileChecksumGenerator for each SST file, such that the FileChecksumGenerator is not shared and it can be more general for checksum implementations. Changed the FileChecksumGenerator interface from Value, Extend, and GetChecksum to Update, Finalize, and GetChecksum. Temproal data should be maintained by the FileChecksumGenerator object itself and finally it can return the checksum string.
* Updated the public API of SST file checksum. Introduce the FileChecksumGenFactory to create the FileChecksumGenerator for each SST file, such that the FileChecksumGenerator is not shared and it can be more general for checksum implementations. Changed the FileChecksumGenerator interface from Value, Extend, and GetChecksum to Update, Finalize, and GetChecksum. Finalize should be only called once after all data is processed to generate the final checksum. Temproal data should be maintained by the FileChecksumGenerator object itself and finally it can return the checksum string.
### Bug Fixes
* Fix a bug where range tombstone blocks in ingested files were cached incorrectly during ingestion. If range tombstones were read from those incorrectly cached blocks, the keys they covered would be exposed.
* Fix a data race that might cause crash when calling DB::GetCreationTimeOfOldestFile() by a small chance. The bug was introduced in 6.6 Release.
* Fix a bug where a boolean value optimize_filters_for_hits was for max threads when calling load table handles after a flush or compaction. The value is correct to 1. The bug should not cause user visible problems.
* Fix a bug which might crash the service when write buffer manager fails to insert the dummy handle to the block cache.
### Performance Improvements
* In CompactRange, for levels starting from 0, if the level does not have any file with any key falling in the specified range, the level is skipped. So instead of always compacting from level 0, the compaction starts from the first level with keys in the specified range until the last such level.
* Reduced memory copy when reading sst footer and blobdb in direct IO mode.
* When restarting a database with large numbers of sst files, large amount of CPU time is spent on getting logical block size of the sst files, which slows down the starting progress, this inefficiency is optimized away with an internal cache for the logical block sizes.
### New Features
* Basic support for user timestamp in iterator. Seek/SeekToFirst/Next and lower/upper bounds are supported. Reverse iteration is not supported. Merge is not considered.
* When file lock failure when the lock is held by the current process, return acquiring time and thread ID in the error message.
* Added a new option, best_efforts_recovery (default: false), to allow database to open in a db dir with missing table files. During best efforts recovery, missing table files are ignored, and database recovers to the most recent state without missing table file. Cross-column-family consistency is not guaranteed even if WAL is enabled.
* options.bottommost_compression, options.compression_opts and options.bottommost_compression_opts are now dynamically changeable.
## 6.8.0 (02/24/2020)
### Java API Changes

10
cache/cache_bench.cc vendored
View File

@ -15,7 +15,6 @@ int main() {
#include <sys/types.h>
#include <cinttypes>
#include "cache/simple_deleter.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
@ -50,6 +49,9 @@ namespace ROCKSDB_NAMESPACE {
class CacheBench;
namespace {
void deleter(const Slice& /*key*/, void* value) {
delete reinterpret_cast<char *>(value);
}
// State shared by all concurrent executions of the same benchmark.
class SharedState {
@ -147,8 +149,7 @@ class CacheBench {
// Cast uint64* to be char*, data would be copied to cache
Slice key(reinterpret_cast<char*>(&rand_key), 8);
// do insert
cache_->Insert(key, new char[10], 1,
SimpleDeleter<char[]>::GetInstance());
cache_->Insert(key, new char[10], 1, &deleter);
}
}
@ -226,8 +227,7 @@ class CacheBench {
int32_t prob_op = thread->rnd.Uniform(100);
if (prob_op >= 0 && prob_op < FLAGS_insert_percent) {
// do insert
cache_->Insert(key, new char[10], 1,
SimpleDeleter<char[]>::GetInstance());
cache_->Insert(key, new char[10], 1, &deleter);
} else if (prob_op -= FLAGS_insert_percent &&
prob_op < FLAGS_lookup_percent) {
// do lookup

91
cache/cache_test.cc vendored
View File

@ -16,7 +16,6 @@
#include <vector>
#include "cache/clock_cache.h"
#include "cache/lru_cache.h"
#include "cache/simple_deleter.h"
#include "test_util/testharness.h"
#include "util/coding.h"
#include "util/string_util.h"
@ -41,17 +40,21 @@ static int DecodeValue(void* v) {
const std::string kLRU = "lru";
const std::string kClock = "clock";
void dumbDeleter(const Slice& /*key*/, void* /*value*/) {}
void eraseDeleter(const Slice& /*key*/, void* value) {
Cache* cache = reinterpret_cast<Cache*>(value);
cache->Erase("foo");
}
class CacheTest : public testing::TestWithParam<std::string> {
public:
static CacheTest* current_;
class Deleter : public Cache::Deleter {
public:
void operator()(const Slice& key, void* v) override {
static void Deleter(const Slice& key, void* v) {
current_->deleted_keys_.push_back(DecodeKey(key));
current_->deleted_values_.push_back(DecodeValue(v));
}
};
static const int kCacheSize = 1000;
static const int kNumShardBits = 4;
@ -61,7 +64,6 @@ class CacheTest : public testing::TestWithParam<std::string> {
std::vector<int> deleted_keys_;
std::vector<int> deleted_values_;
Deleter deleter_;
std::shared_ptr<Cache> cache_;
std::shared_ptr<Cache> cache2_;
@ -115,7 +117,8 @@ class CacheTest : public testing::TestWithParam<std::string> {
void Insert(std::shared_ptr<Cache> cache, int key, int value,
int charge = 1) {
cache->Insert(EncodeKey(key), EncodeValue(value), charge, &deleter_);
cache->Insert(EncodeKey(key), EncodeValue(value), charge,
&CacheTest::Deleter);
}
void Erase(std::shared_ptr<Cache> cache, int key) {
@ -164,9 +167,9 @@ TEST_P(CacheTest, UsageTest) {
for (int i = 1; i < 100; ++i) {
std::string key(i, 'a');
auto kv_size = key.size() + 5;
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, nullptr);
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter);
precise_cache->Insert(key, reinterpret_cast<void*>(value), kv_size,
nullptr);
dumbDeleter);
usage += kv_size;
ASSERT_EQ(usage, cache->GetUsage());
ASSERT_LT(usage, precise_cache->GetUsage());
@ -180,9 +183,10 @@ TEST_P(CacheTest, UsageTest) {
// make sure the cache will be overloaded
for (uint64_t i = 1; i < kCapacity; ++i) {
auto key = ToString(i);
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5, nullptr);
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
precise_cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
nullptr);
dumbDeleter);
}
// the usage should be close to the capacity
@ -211,11 +215,11 @@ TEST_P(CacheTest, PinnedUsageTest) {
auto kv_size = key.size() + 5;
Cache::Handle* handle;
Cache::Handle* handle_in_precise_cache;
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, nullptr,
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter,
&handle);
assert(handle);
precise_cache->Insert(key, reinterpret_cast<void*>(value), kv_size, nullptr,
&handle_in_precise_cache);
precise_cache->Insert(key, reinterpret_cast<void*>(value), kv_size,
dumbDeleter, &handle_in_precise_cache);
assert(handle_in_precise_cache);
pinned_usage += kv_size;
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
@ -250,9 +254,10 @@ TEST_P(CacheTest, PinnedUsageTest) {
// check that overloading the cache does not change the pinned usage
for (uint64_t i = 1; i < 2 * kCapacity; ++i) {
auto key = ToString(i);
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5, nullptr);
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
precise_cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
nullptr);
dumbDeleter);
}
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
ASSERT_EQ(precise_cache_pinned_usage, precise_cache->GetPinnedUsage());
@ -445,25 +450,15 @@ TEST_P(CacheTest, EvictionPolicyRef) {
TEST_P(CacheTest, EvictEmptyCache) {
// Insert item large than capacity to trigger eviction on empty cache.
auto cache = NewCache(1, 0, false);
ASSERT_OK(cache->Insert("foo", nullptr, 10, nullptr));
ASSERT_OK(cache->Insert("foo", nullptr, 10, dumbDeleter));
}
TEST_P(CacheTest, EraseFromDeleter) {
// Have deleter which will erase item from cache, which will re-enter
// the cache at that point.
class EraseDeleter : public Cache::Deleter {
public:
void operator()(const Slice& /*key*/, void* value) override {
Cache* const cache = static_cast<Cache*>(value);
cache->Erase("foo");
}
};
EraseDeleter erase_deleter;
std::shared_ptr<Cache> cache = NewCache(10, 0, false);
ASSERT_OK(cache->Insert("foo", nullptr, 1, nullptr));
ASSERT_OK(cache->Insert("bar", cache.get(), 1, &erase_deleter));
ASSERT_OK(cache->Insert("foo", nullptr, 1, dumbDeleter));
ASSERT_OK(cache->Insert("bar", cache.get(), 1, eraseDeleter));
cache->Erase("bar");
ASSERT_EQ(nullptr, cache->Lookup("foo"));
ASSERT_EQ(nullptr, cache->Lookup("bar"));
@ -532,11 +527,17 @@ class Value {
size_t v_;
};
namespace {
void deleter(const Slice& /*key*/, void* value) {
delete static_cast<Value *>(value);
}
} // namespace
TEST_P(CacheTest, ReleaseAndErase) {
std::shared_ptr<Cache> cache = NewCache(5, 0, false);
Cache::Handle* handle;
Status s =
cache->Insert(EncodeKey(100), EncodeValue(100), 1, &deleter_, &handle);
Status s = cache->Insert(EncodeKey(100), EncodeValue(100), 1,
&CacheTest::Deleter, &handle);
ASSERT_TRUE(s.ok());
ASSERT_EQ(5U, cache->GetCapacity());
ASSERT_EQ(1U, cache->GetUsage());
@ -550,8 +551,8 @@ TEST_P(CacheTest, ReleaseAndErase) {
TEST_P(CacheTest, ReleaseWithoutErase) {
std::shared_ptr<Cache> cache = NewCache(5, 0, false);
Cache::Handle* handle;
Status s =
cache->Insert(EncodeKey(100), EncodeValue(100), 1, &deleter_, &handle);
Status s = cache->Insert(EncodeKey(100), EncodeValue(100), 1,
&CacheTest::Deleter, &handle);
ASSERT_TRUE(s.ok());
ASSERT_EQ(5U, cache->GetCapacity());
ASSERT_EQ(1U, cache->GetUsage());
@ -573,8 +574,7 @@ TEST_P(CacheTest, SetCapacity) {
// Insert 5 entries, but not releasing.
for (size_t i = 0; i < 5; i++) {
std::string key = ToString(i+1);
Status s = cache->Insert(key, new Value(i + 1), 1,
SimpleDeleter<Value>::GetInstance(), &handles[i]);
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
}
ASSERT_EQ(5U, cache->GetCapacity());
@ -589,8 +589,7 @@ TEST_P(CacheTest, SetCapacity) {
// and usage should be 7
for (size_t i = 5; i < 10; i++) {
std::string key = ToString(i+1);
Status s = cache->Insert(key, new Value(i + 1), 1,
SimpleDeleter<Value>::GetInstance(), &handles[i]);
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
}
ASSERT_EQ(10U, cache->GetCapacity());
@ -618,8 +617,7 @@ TEST_P(LRUCacheTest, SetStrictCapacityLimit) {
Status s;
for (size_t i = 0; i < 10; i++) {
std::string key = ToString(i + 1);
s = cache->Insert(key, new Value(i + 1), 1,
SimpleDeleter<Value>::GetInstance(), &handles[i]);
s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_OK(s);
ASSERT_NE(nullptr, handles[i]);
}
@ -630,8 +628,7 @@ TEST_P(LRUCacheTest, SetStrictCapacityLimit) {
Value* extra_value = new Value(0);
cache->SetStrictCapacityLimit(true);
Cache::Handle* handle;
s = cache->Insert(extra_key, extra_value, 1,
SimpleDeleter<Value>::GetInstance(), &handle);
s = cache->Insert(extra_key, extra_value, 1, &deleter, &handle);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_EQ(nullptr, handle);
ASSERT_EQ(10, cache->GetUsage());
@ -644,18 +641,15 @@ TEST_P(LRUCacheTest, SetStrictCapacityLimit) {
std::shared_ptr<Cache> cache2 = NewCache(5, 0, true);
for (size_t i = 0; i < 5; i++) {
std::string key = ToString(i + 1);
s = cache2->Insert(key, new Value(i + 1), 1,
SimpleDeleter<Value>::GetInstance(), &handles[i]);
s = cache2->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_OK(s);
ASSERT_NE(nullptr, handles[i]);
}
s = cache2->Insert(extra_key, extra_value, 1,
SimpleDeleter<Value>::GetInstance(), &handle);
s = cache2->Insert(extra_key, extra_value, 1, &deleter, &handle);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_EQ(nullptr, handle);
// test insert without handle
s = cache2->Insert(extra_key, extra_value, 1,
SimpleDeleter<Value>::GetInstance());
s = cache2->Insert(extra_key, extra_value, 1, &deleter);
// AS if the key have been inserted into cache but get evicted immediately.
ASSERT_OK(s);
ASSERT_EQ(5, cache2->GetUsage());
@ -677,8 +671,7 @@ TEST_P(CacheTest, OverCapacity) {
// Insert n+1 entries, but not releasing.
for (size_t i = 0; i < n + 1; i++) {
std::string key = ToString(i+1);
Status s = cache->Insert(key, new Value(i + 1), 1,
SimpleDeleter<Value>::GetInstance(), &handles[i]);
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
}

25
cache/clock_cache.cc vendored
View File

@ -175,13 +175,11 @@ namespace {
// Cache entry meta data.
struct CacheHandle {
using Deleter = Cache::Deleter;
Slice key;
uint32_t hash;
void* value;
size_t charge;
Deleter* deleter;
void (*deleter)(const Slice&, void* value);
// Flags and counters associated with the cache handle:
// lowest bit: n-cache bit
@ -195,7 +193,8 @@ struct CacheHandle {
CacheHandle(const CacheHandle& a) { *this = a; }
CacheHandle(const Slice& k, void* v, Deleter* del)
CacheHandle(const Slice& k, void* v,
void (*del)(const Slice& key, void* value))
: key(k), value(v), deleter(del) {}
CacheHandle& operator=(const CacheHandle& a) {
@ -270,8 +269,8 @@ class ClockCacheShard final : public CacheShard {
void SetCapacity(size_t capacity) override;
void SetStrictCapacityLimit(bool strict_capacity_limit) override;
Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
Deleter* deleter, Cache::Handle** handle,
Cache::Priority priority) override;
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle, Cache::Priority priority) override;
Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
// If the entry in in cache, increase reference count and return true.
// Return false otherwise.
@ -340,8 +339,9 @@ class ClockCacheShard final : public CacheShard {
bool EvictFromCache(size_t charge, CleanupContext* context);
CacheHandle* Insert(const Slice& key, uint32_t hash, void* value,
size_t charge, Deleter* deleter, bool hold_reference,
CleanupContext* context);
size_t change,
void (*deleter)(const Slice& key, void* value),
bool hold_reference, CleanupContext* context);
// Guards list_, head_, and recycle_. In addition, updating table_ also has
// to hold the mutex, to avoid the cache being in inconsistent state.
@ -561,9 +561,9 @@ void ClockCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
std::memory_order_relaxed);
}
CacheHandle* ClockCacheShard::Insert(const Slice& key, uint32_t hash,
void* value, size_t charge,
Deleter* deleter, bool hold_reference,
CacheHandle* ClockCacheShard::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), bool hold_reference,
CleanupContext* context) {
size_t total_charge =
CacheHandle::CalcTotalCharge(key, charge, metadata_charge_policy_);
@ -610,7 +610,8 @@ CacheHandle* ClockCacheShard::Insert(const Slice& key, uint32_t hash,
}
Status ClockCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
size_t charge, Deleter* deleter,
size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** out_handle,
Cache::Priority /*priority*/) {
CleanupContext context;

3
cache/lru_cache.cc vendored
View File

@ -337,7 +337,8 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) {
}
Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
size_t charge, Deleter* deleter,
size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle, Cache::Priority priority) {
// Allocate the memory here outside of the mutex
// If the cache is full, we'll have to release it

8
cache/lru_cache.h vendored
View File

@ -48,10 +48,8 @@ namespace ROCKSDB_NAMESPACE {
// (to move into state 3).
struct LRUHandle {
using Deleter = Cache::Deleter;
void* value;
Deleter* deleter;
void (*deleter)(const Slice&, void* value);
LRUHandle* next_hash;
LRUHandle* next;
LRUHandle* prev;
@ -211,7 +209,9 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
// Like Cache methods, but with an extra "hash" parameter.
virtual Status Insert(const Slice& key, uint32_t hash, void* value,
size_t charge, Deleter* deleter, Cache::Handle** handle,
size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle,
Cache::Priority priority) override;
virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
virtual bool Ref(Cache::Handle* handle) override;

View File

@ -44,8 +44,8 @@ void ShardedCache::SetStrictCapacityLimit(bool strict_capacity_limit) {
}
Status ShardedCache::Insert(const Slice& key, void* value, size_t charge,
Deleter* deleter, Handle** handle,
Priority priority) {
void (*deleter)(const Slice& key, void* value),
Handle** handle, Priority priority) {
uint32_t hash = HashSlice(key);
return GetShard(Shard(hash))
->Insert(key, hash, value, charge, deleter, handle, priority);

11
cache/sharded_cache.h vendored
View File

@ -21,14 +21,13 @@ namespace ROCKSDB_NAMESPACE {
// Single cache shard interface.
class CacheShard {
public:
using Deleter = Cache::Deleter;
CacheShard() = default;
virtual ~CacheShard() = default;
virtual Status Insert(const Slice& key, uint32_t hash, void* value,
size_t charge, Deleter* deleter, Cache::Handle** handle,
Cache::Priority priority) = 0;
size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle, Cache::Priority priority) = 0;
virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0;
virtual bool Ref(Cache::Handle* handle) = 0;
virtual bool Release(Cache::Handle* handle, bool force_erase = false) = 0;
@ -71,8 +70,8 @@ class ShardedCache : public Cache {
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override;
virtual Status Insert(const Slice& key, void* value, size_t charge,
Deleter* deleter, Handle** handle,
Priority priority) override;
void (*deleter)(const Slice& key, void* value),
Handle** handle, Priority priority) override;
virtual Handle* Lookup(const Slice& key, Statistics* stats) override;
virtual bool Ref(Handle* handle) override;
virtual bool Release(Handle* handle, bool force_erase = false) override;

View File

@ -1,47 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/cache.h"
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
template <typename T>
class SimpleDeleter : public Cache::Deleter {
public:
static SimpleDeleter* GetInstance() {
static auto deleter = new SimpleDeleter;
return deleter;
}
void operator()(const Slice& /* key */, void* value) override {
T* const t = static_cast<T*>(value);
delete t;
}
private:
SimpleDeleter() = default;
};
template <typename T>
class SimpleDeleter<T[]> : public Cache::Deleter {
public:
static SimpleDeleter* GetInstance() {
static auto deleter = new SimpleDeleter;
return deleter;
}
void operator()(const Slice& /* key */, void* value) override {
T* const t = static_cast<T*>(value);
delete[] t;
}
private:
SimpleDeleter() = default;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -314,11 +314,11 @@ class CompactionJobTest : public testing::Test {
num_input_files += level_files.size();
}
Compaction compaction(cfd->current()->storage_info(), *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(),
compaction_input_files, output_level, 1024 * 1024,
10 * 1024 * 1024, 0, kNoCompression,
cfd->ioptions()->compression_opts, 0, {}, true);
Compaction compaction(
cfd->current()->storage_info(), *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(), compaction_input_files, output_level,
1024 * 1024, 10 * 1024 * 1024, 0, kNoCompression,
cfd->GetLatestMutableCFOptions()->compression_opts, 0, {}, true);
compaction.SetInputVersion(cfd->current());
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());

View File

@ -110,9 +110,9 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
// If bottommost_compression is set and we are compacting to the
// bottommost level then we should use it.
if (ioptions.bottommost_compression != kDisableCompressionOption &&
if (mutable_cf_options.bottommost_compression != kDisableCompressionOption &&
level >= (vstorage->num_non_empty_levels() - 1)) {
return ioptions.bottommost_compression;
return mutable_cf_options.bottommost_compression;
}
// If the user has specified a different compression level for each level,
// then pick the compression for that level.
@ -132,22 +132,22 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
}
}
CompressionOptions GetCompressionOptions(const ImmutableCFOptions& ioptions,
CompressionOptions GetCompressionOptions(const MutableCFOptions& cf_options,
const VersionStorageInfo* vstorage,
int level,
const bool enable_compression) {
if (!enable_compression) {
return ioptions.compression_opts;
return cf_options.compression_opts;
}
// If bottommost_compression is set and we are compacting to the
// bottommost level then we should use the specified compression options
// for the bottmomost_compression.
if (ioptions.bottommost_compression != kDisableCompressionOption &&
if (cf_options.bottommost_compression != kDisableCompressionOption &&
level >= (vstorage->num_non_empty_levels() - 1) &&
ioptions.bottommost_compression_opts.enabled) {
return ioptions.bottommost_compression_opts;
cf_options.bottommost_compression_opts.enabled) {
return cf_options.bottommost_compression_opts;
}
return ioptions.compression_opts;
return cf_options.compression_opts;
}
CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
@ -359,7 +359,7 @@ Compaction* CompactionPicker::CompactFiles(
vstorage, ioptions_, mutable_cf_options, input_files, output_level,
compact_options.output_file_size_limit,
mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
GetCompressionOptions(ioptions_, vstorage, output_level),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
compact_options.max_subcompactions,
/* grandparents */ {}, true);
RegisterCompaction(c);
@ -634,7 +634,7 @@ Compaction* CompactionPicker::CompactRange(
compact_range_options.target_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, 1),
GetCompressionOptions(ioptions_, vstorage, output_level),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
compact_range_options.max_subcompactions, /* grandparents */ {},
/* is manual */ true);
RegisterCompaction(c);
@ -787,7 +787,7 @@ Compaction* CompactionPicker::CompactRange(
compact_range_options.target_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
vstorage->base_level()),
GetCompressionOptions(ioptions_, vstorage, output_level),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
compact_range_options.max_subcompactions, std::move(grandparents),
/* is manual compaction */ true);

View File

@ -305,9 +305,9 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
int level, int base_level,
const bool enable_compression = true);
CompressionOptions GetCompressionOptions(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
int level,
CompressionOptions GetCompressionOptions(
const MutableCFOptions& mutable_cf_options,
const VersionStorageInfo* vstorage, int level,
const bool enable_compression = true);
} // namespace ROCKSDB_NAMESPACE

View File

@ -71,11 +71,14 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
if (current_time > mutable_cf_options.ttl) {
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
FileMetaData* f = *ritr;
uint64_t creation_time = f->TryGetFileCreationTime();
if (creation_time == kUnknownFileCreationTime ||
if (f->fd.table_reader && f->fd.table_reader->GetTableProperties()) {
uint64_t creation_time =
f->fd.table_reader->GetTableProperties()->creation_time;
if (creation_time == 0 ||
creation_time >= (current_time - mutable_cf_options.ttl)) {
break;
}
}
total_size -= f->compensated_file_size;
inputs[0].files.push_back(f);
}
@ -92,17 +95,21 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
}
for (const auto& f : inputs[0].files) {
uint64_t creation_time = 0;
if (f && f->fd.table_reader && f->fd.table_reader->GetTableProperties()) {
creation_time = f->fd.table_reader->GetTableProperties()->creation_time;
}
ROCKS_LOG_BUFFER(log_buffer,
"[%s] FIFO compaction: picking file %" PRIu64
" with creation time %" PRIu64 " for deletion",
cf_name.c_str(), f->fd.GetNumber(),
f->TryGetFileCreationTime());
cf_name.c_str(), f->fd.GetNumber(), creation_time);
}
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
{}, /* is manual */ false, vstorage->CompactionScore(0),
kNoCompression, mutable_cf_options.compression_opts,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
return c;
}
@ -142,7 +149,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
16 * 1024 * 1024 /* output file size limit */,
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compression,
ioptions_.compression_opts, 0 /* max_subcompactions */, {},
mutable_cf_options.compression_opts, 0 /* max_subcompactions */, {},
/* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ false,
CompactionReason::kFIFOReduceNumFiles);
@ -190,8 +197,9 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
{}, /* is manual */ false, vstorage->CompactionScore(0),
kNoCompression, mutable_cf_options.compression_opts,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
return c;
}

View File

@ -250,7 +250,6 @@ void LevelCompactionBuilder::SetupInitialFiles() {
cf_name_, vstorage_, &start_level_, &output_level_,
&start_level_inputs_);
if (!start_level_inputs_.empty()) {
is_manual_ = true;
compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
return;
}
@ -384,7 +383,7 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
GetPathId(ioptions_, mutable_cf_options_, output_level_),
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level_, vstorage_->base_level()),
GetCompressionOptions(ioptions_, vstorage_, output_level_),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_),
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
start_level_score_, false /* deletion_compaction */, compaction_reason_);

View File

@ -751,7 +751,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
1, enable_compression),
GetCompressionOptions(ioptions_, vstorage_, start_level,
GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
enable_compression),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);
@ -959,8 +959,8 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1),
GetCompressionOptions(ioptions_, vstorage_, output_level),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
}
@ -1029,7 +1029,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
1, true /* enable_compression */),
GetCompressionOptions(ioptions_, vstorage_, start_level,
GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
true /* enable_compression */),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);

View File

@ -2053,7 +2053,8 @@ class DBBasicTestWithParallelIO
virtual const char* Name() const override { return "MyBlockCache"; }
virtual Status Insert(const Slice& key, void* value, size_t charge,
Deleter* deleter, Handle** handle = nullptr,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
num_inserts_++;
return target_->Insert(key, value, charge, deleter, handle, priority);

View File

@ -443,8 +443,9 @@ class MockCache : public LRUCache {
false /*strict_capacity_limit*/, 0.0 /*high_pri_pool_ratio*/) {
}
Status Insert(const Slice& key, void* value, size_t charge, Deleter* deleter,
Handle** handle, Priority priority) override {
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), Handle** handle,
Priority priority) override {
if (priority == Priority::LOW) {
low_pri_insert_count++;
} else {

View File

@ -5217,31 +5217,6 @@ TEST_P(DBCompactionTestWithParam,
}
}
TEST_F(DBCompactionTest, FifoCompactionGetFileCreationTime) {
MockEnv mock_env(env_);
do {
Options options = CurrentOptions();
options.table_factory.reset(new BlockBasedTableFactory());
options.env = &mock_env;
options.ttl = static_cast<uint64_t>(24) * 3600;
options.compaction_style = kCompactionStyleFIFO;
constexpr size_t kNumFiles = 24;
options.max_open_files = 20;
constexpr size_t kNumKeysPerFile = 10;
DestroyAndReopen(options);
for (size_t i = 0; i < kNumFiles; ++i) {
for (size_t j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(Put(std::to_string(j), "value_" + std::to_string(i)));
}
ASSERT_OK(Flush());
}
mock_env.FakeSleepForMicroseconds(
static_cast<uint64_t>(1000 * 1000 * (1 + options.ttl)));
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
} while (ChangeOptions());
}
#endif // !defined(ROCKSDB_LITE)
} // namespace ROCKSDB_NAMESPACE

View File

@ -1248,7 +1248,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
mutable_cf_options.sample_for_compression,
cfd->ioptions()->compression_opts, paranoid_file_checks,
mutable_cf_options.compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, write_hint);

View File

@ -861,6 +861,62 @@ TEST_F(DBOptionsTest, FIFOTtlBackwardCompatible) {
#endif // ROCKSDB_LITE
TEST_F(DBOptionsTest, ChangeCompression) {
if (!Snappy_Supported() || !LZ4_Supported()) {
return;
}
Options options;
options.write_buffer_size = 10 << 10; // 10KB
options.level0_file_num_compaction_trigger = 2;
options.create_if_missing = true;
options.compression = CompressionType::kLZ4Compression;
options.bottommost_compression = CompressionType::kNoCompression;
options.bottommost_compression_opts.level = 2;
ASSERT_OK(TryReopen(options));
CompressionType compression_used = CompressionType::kLZ4Compression;
CompressionOptions compression_opt_used;
bool compacted = false;
SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* c = reinterpret_cast<Compaction*>(arg);
compression_used = c->output_compression();
compression_opt_used = c->output_compression_opts();
compacted = true;
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kNoCompression, compression_used);
ASSERT_EQ(options.compression_opts.level, compression_opt_used.level);
compression_used = CompressionType::kLZ4Compression;
compacted = false;
ASSERT_OK(dbfull()->SetOptions(
{{"bottommost_compression", "kSnappyCompression"},
{"bottommost_compression_opts", "0:6:0:0:0:true"}}));
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kSnappyCompression, compression_used);
ASSERT_EQ(6, compression_opt_used.level);
SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -45,7 +45,8 @@ void VerifyTableProperties(DB* db, uint64_t expected_entries_size) {
}
} // namespace
class DBTablePropertiesTest : public DBTestBase {
class DBTablePropertiesTest : public DBTestBase,
public testing::WithParamInterface<std::string> {
public:
DBTablePropertiesTest() : DBTestBase("/db_table_properties_test") {}
TablePropertiesCollection TestGetPropertiesOfTablesInRange(
@ -251,7 +252,20 @@ TEST_F(DBTablePropertiesTest, GetColumnFamilyNameProperty) {
}
}
TEST_F(DBTablePropertiesTest, DeletionTriggeredCompactionMarking) {
class DeletionTriggeredCompactionTestListener : public EventListener {
public:
void OnCompactionBegin(DB* , const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.compaction_reason,
CompactionReason::kFilesMarkedForCompaction);
}
void OnCompactionCompleted(DB* , const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.compaction_reason,
CompactionReason::kFilesMarkedForCompaction);
}
};
TEST_P(DBTablePropertiesTest, DeletionTriggeredCompactionMarking) {
int kNumKeys = 1000;
int kWindowSize = 100;
int kNumDelsTrigger = 90;
@ -260,6 +274,10 @@ TEST_F(DBTablePropertiesTest, DeletionTriggeredCompactionMarking) {
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(compact_on_del);
if(GetParam() == "kCompactionStyleUniversal") {
opts.compaction_style = kCompactionStyleUniversal;
}
Reopen(opts);
// add an L1 file to prevent tombstones from dropping due to obsolescence
@ -268,6 +286,11 @@ TEST_F(DBTablePropertiesTest, DeletionTriggeredCompactionMarking) {
Flush();
MoveFilesToLevel(1);
DeletionTriggeredCompactionTestListener *listener =
new DeletionTriggeredCompactionTestListener();
opts.listeners.emplace_back(listener);
Reopen(opts);
for (int i = 0; i < kNumKeys; ++i) {
if (i >= kNumKeys - kWindowSize &&
i < kNumKeys - kWindowSize + kNumDelsTrigger) {
@ -280,7 +303,6 @@ TEST_F(DBTablePropertiesTest, DeletionTriggeredCompactionMarking) {
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(1), 0);
// Change the window size and deletion trigger and ensure new values take
// effect
@ -302,7 +324,6 @@ TEST_F(DBTablePropertiesTest, DeletionTriggeredCompactionMarking) {
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(1), 0);
// Change the window size to disable delete triggered compaction
kWindowSize = 0;
@ -322,9 +343,16 @@ TEST_F(DBTablePropertiesTest, DeletionTriggeredCompactionMarking) {
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
INSTANTIATE_TEST_CASE_P(
DBTablePropertiesTest,
DBTablePropertiesTest,
::testing::Values(
"kCompactionStyleLevel",
"kCompactionStyleUniversal"
));
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

View File

@ -5503,7 +5503,7 @@ TEST_F(DBTest, EmptyCompactedDB) {
#endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE
TEST_F(DBTest, SuggestCompactRangeTest) {
TEST_F(DBTest, DISABLED_SuggestCompactRangeTest) {
class CompactionFilterFactoryGetContext : public CompactionFilterFactory {
public:
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
@ -5611,6 +5611,7 @@ TEST_F(DBTest, SuggestCompactRangeTest) {
ASSERT_EQ(1, NumTableFilesAtLevel(1));
}
TEST_F(DBTest, PromoteL0) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;

View File

@ -469,6 +469,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
Status s;
std::string old_manifest;
std::string new_manifest;
std::atomic<bool> fail_manifest(false);
DestroyAndReopen(options);
old_manifest = GetManifestNameFromLiveFiles();
@ -482,15 +483,33 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
listener->EnableAutoRecovery(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
// Wait for flush of 2nd L0 file before starting compaction
{{"DBImpl::FlushMemTable:FlushMemTableFinished",
"BackgroundCallCompaction:0"},
// Wait for compaction to detect manifest write error
{"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
// Make compaction thread wait for error to be cleared
{"CompactionManifestWriteError:1",
"DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"}});
// trigger manifest write failure in compaction thread
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
"BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WriteManifest", [&](void*) {
if (fail_manifest.load()) {
fault_fs->SetFilesystemActive(false,error_msg); }
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Put(Key(1), "val");
s = Flush();
ASSERT_EQ(s, Status::OK());
TEST_SYNC_POINT("CompactionManifestWriteError:0");
TEST_SYNC_POINT("CompactionManifestWriteError:1");
s = dbfull()->TEST_WaitForCompact();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);

View File

@ -6,7 +6,9 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
@ -2799,6 +2801,47 @@ TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) {
GenerateAndAddExternalFile(options, data);
}
TEST_P(ExternalSSTFileTest, DeltaEncodingWhileGlobalSeqnoPresents) {
Options options = CurrentOptions();
DestroyAndReopen(options);
constexpr size_t kValueSize = 8;
Random rnd(301);
std::string value(RandomString(&rnd, kValueSize));
// Write some key to make global seqno larger than zero
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("ab" + Key(i), value));
}
// Get a Snapshot to make RocksDB assign global seqno to ingested sst files.
auto snap = dbfull()->GetSnapshot();
std::string fname = sst_files_dir_ + "test_file";
rocksdb::SstFileWriter writer(EnvOptions(), options);
ASSERT_OK(writer.Open(fname));
std::string key1 = "ab";
std::string key2 = "ab";
// Make the prefix of key2 is same with key1 add zero seqno. The tail of every
// key is composed as (seqno << 8 | value_type), and here `1` represents
// ValueType::kTypeValue
PutFixed64(&key2, PackSequenceAndType(0, kTypeValue));
key2 += "cdefghijkl";
ASSERT_OK(writer.Put(key1, value));
ASSERT_OK(writer.Put(key2, value));
ExternalSstFileInfo info;
ASSERT_OK(writer.Finish(&info));
ASSERT_OK(dbfull()->IngestExternalFile({info.file_path},
IngestExternalFileOptions()));
dbfull()->ReleaseSnapshot(snap);
ASSERT_EQ(value, Get(key1));
// You will get error here
ASSERT_EQ(value, Get(key2));
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
testing::Values(std::make_tuple(false, false),
std::make_tuple(false, true),

View File

@ -375,6 +375,11 @@ Status FlushJob::WriteLevel0Table() {
meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);
meta_.file_creation_time = current_time;
uint64_t creation_time = (cfd_->ioptions()->compaction_style ==
CompactionStyle::kCompactionStyleFIFO)
? current_time
: meta_.oldest_ancester_time;
IOStatus io_s;
s = BuildTable(
dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(),
@ -384,12 +389,11 @@ Status FlushJob::WriteLevel0Table() {
cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_,
output_compression_, mutable_cf_options_.sample_for_compression,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, &io_s, event_logger_,
job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */,
meta_.oldest_ancester_time, oldest_key_time, write_hint,
current_time);
creation_time, oldest_key_time, write_hint, current_time);
if (!io_s.ok()) {
io_status_ = io_s;
}

View File

@ -9,7 +9,6 @@
#include "db/table_cache.h"
#include "cache/simple_deleter.h"
#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/snapshot_impl.h"
@ -34,6 +33,12 @@ namespace ROCKSDB_NAMESPACE {
namespace {
template <class T>
static void DeleteEntry(const Slice& /*key*/, void* value) {
T* typed_value = reinterpret_cast<T*>(value);
delete typed_value;
}
static void UnrefEntry(void* arg1, void* arg2) {
Cache* cache = reinterpret_cast<Cache*>(arg1);
Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
@ -161,8 +166,8 @@ Status TableCache::FindTable(const FileOptions& file_options,
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
s = cache_->Insert(key, table_reader.get(), 1,
SimpleDeleter<TableReader>::GetInstance(), handle);
s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
handle);
if (s.ok()) {
// Release ownership of table reader.
table_reader.release();
@ -420,7 +425,7 @@ Status TableCache::Get(const ReadOptions& options,
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
SimpleDeleter<std::string>::GetInstance());
&DeleteEntry<std::string>);
}
#endif // ROCKSDB_LITE
@ -540,7 +545,7 @@ Status TableCache::MultiGet(const ReadOptions& options,
row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(row_cache_entry));
ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
SimpleDeleter<std::string>::GetInstance());
&DeleteEntry<std::string>);
}
}
}

View File

@ -84,7 +84,7 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
if (aligned_buf == nullptr) {
buf.Read(scratch, offset_advance, res_len);
} else {
scratch = buf.BufferStart();
scratch = buf.BufferStart() + offset_advance;
aligned_buf->reset(buf.Release());
}
}

View File

@ -96,6 +96,34 @@ class RandomAccessFileReaderTest : public testing::Test {
}
};
TEST_F(RandomAccessFileReaderTest, ReadDirectIO) {
if (!IsDirectIOSupported()) {
printf("Direct IO is not supported, skip this test\n");
return;
}
std::string fname = "read-direct-io";
Random rand(0);
std::string content;
test::RandomString(&rand, static_cast<int>(alignment()), &content);
Write(fname, content);
FileOptions opts;
opts.use_direct_reads = true;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
ASSERT_TRUE(r->use_direct_io());
size_t offset = alignment() / 2;
size_t len = alignment() / 3;
Slice result;
AlignedBuf buf;
for (bool for_compaction : {true, false}) {
ASSERT_OK(r->Read(offset, len, &result, nullptr, &buf, for_compaction));
ASSERT_EQ(result.ToString(), content.substr(offset, len));
}
}
TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
if (!IsDirectIOSupported()) {
printf("Direct IO is not supported, skip this test\n");

View File

@ -645,6 +645,7 @@ struct AdvancedColumnFamilyOptions {
bool report_bg_io_stats = false;
// Files older than TTL will go through the compaction process.
// Pre-req: This needs max_open_files to be set to -1.
// In Level: Non-bottom-level files older than TTL will go through the
// compation process.
// In FIFO: Files older than TTL will be deleted.
@ -672,6 +673,7 @@ struct AdvancedColumnFamilyOptions {
// Supported in Level and FIFO compaction.
// In FIFO compaction, this option has the same meaning as TTL and whichever
// stricter will be used.
// Pre-req: max_open_file == -1.
// unit: seconds. Ex: 7 days = 7 * 24 * 60 * 60
//
// Values:

View File

@ -132,13 +132,6 @@ extern std::shared_ptr<Cache> NewClockCache(
kDefaultCacheMetadataChargePolicy);
class Cache {
public:
class Deleter {
public:
virtual ~Deleter() = default;
virtual void operator()(const Slice& key, void* value) = 0;
};
// Depending on implementation, cache entries with high priority could be less
// likely to get evicted than low priority entries.
enum class Priority { HIGH, LOW };
@ -175,10 +168,10 @@ class Cache {
// insert. In case of error value will be cleanup.
//
// When the inserted entry is no longer needed, the key and
// value will be passed to "deleter". It is the caller's responsibility to
// ensure that the deleter outlives the cache entries referring to it.
// value will be passed to "deleter".
virtual Status Insert(const Slice& key, void* value, size_t charge,
Deleter* deleter, Handle** handle = nullptr,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) = 0;
// If the cache has no mapping for "key", returns nullptr.

View File

@ -97,4 +97,10 @@ class FileChecksumList {
// Create a new file checksum list.
extern FileChecksumList* NewFileChecksumList();
// Return a shared_ptr of the builtin Crc32 based file checksum generatory
// factory object, which can be shared to create the Crc32c based checksum
// generator object.
extern std::shared_ptr<FileChecksumGenFactory>
GetFileChecksumGenCrc32cFactory();
} // namespace ROCKSDB_NAMESPACE

View File

@ -136,6 +136,11 @@ class TablePropertiesCollectorFactory {
// The name of the properties collector can be used for debugging purpose.
virtual const char* Name() const = 0;
// Can be overridden by sub-classes to return the Name, followed by
// configuration info that will // be logged to the info log when the
// DB is opened
virtual std::string ToString() const { return Name(); }
};
// TableProperties contains a bunch of read-only properties of its associated

View File

@ -18,9 +18,9 @@ namespace ROCKSDB_NAMESPACE {
class CompactOnDeletionCollectorFactory
: public TablePropertiesCollectorFactory {
public:
virtual ~CompactOnDeletionCollectorFactory() {}
~CompactOnDeletionCollectorFactory() {}
virtual TablePropertiesCollector* CreateTablePropertiesCollector(
TablePropertiesCollector* CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context context) override;
// Change the value of sliding_window_size "N"
@ -34,10 +34,12 @@ class CompactOnDeletionCollectorFactory
deletion_trigger_.store(deletion_trigger);
}
virtual const char* Name() const override {
const char* Name() const override {
return "CompactOnDeletionCollector";
}
std::string ToString() const override;
private:
friend std::shared_ptr<CompactOnDeletionCollectorFactory>
NewCompactOnDeletionCollectorFactory(size_t sliding_window_size,

View File

@ -6,7 +6,7 @@
#define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 9
#define ROCKSDB_PATCH 0
#define ROCKSDB_PATCH 1
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -69,9 +69,11 @@ WriteBufferManager::~WriteBufferManager() {
#ifndef ROCKSDB_LITE
if (cache_rep_) {
for (auto* handle : cache_rep_->dummy_handles_) {
if (handle != nullptr) {
cache_rep_->cache_->Release(handle, true);
}
}
}
#endif // ROCKSDB_LITE
}
@ -88,9 +90,16 @@ void WriteBufferManager::ReserveMemWithCache(size_t mem) {
while (new_mem_used > cache_rep_->cache_allocated_size_) {
// Expand size by at least 256KB.
// Add a dummy record to the cache
Cache::Handle* handle;
Cache::Handle* handle = nullptr;
cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
kSizeDummyEntry, nullptr, &handle);
// We keep the handle even if insertion fails and a null handle is
// returned, so that when memory shrinks, we don't release extra
// entries from cache.
// Ideallly we should prevent this allocation from happening if
// this insertion fails. However, the callers to this code path
// are not able to handle failures properly. We'll need to improve
// it in the future.
cache_rep_->dummy_handles_.push_back(handle);
cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
}
@ -119,7 +128,11 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) {
if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
assert(!cache_rep_->dummy_handles_.empty());
cache_rep_->cache_->Release(cache_rep_->dummy_handles_.back(), true);
auto* handle = cache_rep_->dummy_handles_.back();
// If insert failed, handle is null so we should not release.
if (handle != nullptr) {
cache_rep_->cache_->Release(handle, true);
}
cache_rep_->dummy_handles_.pop_back();
cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
}

View File

@ -146,6 +146,35 @@ TEST_F(WriteBufferManagerTest, NoCapCacheCost) {
ASSERT_GE(cache->GetPinnedUsage(), 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 1024 * 1024 + 10000);
}
TEST_F(WriteBufferManagerTest, CacheFull) {
// 15MB cache size with strict capacity
LRUCacheOptions lo;
lo.capacity = 12 * 1024 * 1024;
lo.num_shard_bits = 0;
lo.strict_capacity_limit = true;
std::shared_ptr<Cache> cache = NewLRUCache(lo);
std::unique_ptr<WriteBufferManager> wbf(new WriteBufferManager(0, cache));
wbf->ReserveMem(10 * 1024 * 1024);
size_t prev_pinned = cache->GetPinnedUsage();
ASSERT_GE(prev_pinned, 10 * 1024 * 1024);
// Some insert will fail
wbf->ReserveMem(10 * 1024 * 1024);
ASSERT_LE(cache->GetPinnedUsage(), 12 * 1024 * 1024);
// Increase capacity so next insert will succeed
cache->SetCapacity(30 * 1024 * 1024);
wbf->ReserveMem(10 * 1024 * 1024);
ASSERT_GT(cache->GetPinnedUsage(), 20 * 1024 * 1024);
// Gradually release 20 MB
for (int i = 0; i < 40; i++) {
wbf->FreeMem(512 * 1024);
}
ASSERT_GE(cache->GetPinnedUsage(), 10 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 20 * 1024 * 1024);
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

View File

@ -57,9 +57,6 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
cf_options.purge_redundant_kvs_while_flush),
use_fsync(db_options.use_fsync),
compression_per_level(cf_options.compression_per_level),
bottommost_compression(cf_options.bottommost_compression),
bottommost_compression_opts(cf_options.bottommost_compression_opts),
compression_opts(cf_options.compression_opts),
level_compaction_dynamic_level_bytes(
cf_options.level_compaction_dynamic_level_bytes),
access_hint_on_compaction_start(

View File

@ -90,12 +90,6 @@ struct ImmutableCFOptions {
std::vector<CompressionType> compression_per_level;
CompressionType bottommost_compression;
CompressionOptions bottommost_compression_opts;
CompressionOptions compression_opts;
bool level_compaction_dynamic_level_bytes;
Options::AccessHint access_hint_on_compaction_start;
@ -166,6 +160,9 @@ struct MutableCFOptions {
paranoid_file_checks(options.paranoid_file_checks),
report_bg_io_stats(options.report_bg_io_stats),
compression(options.compression),
bottommost_compression(options.bottommost_compression),
compression_opts(options.compression_opts),
bottommost_compression_opts(options.bottommost_compression_opts),
sample_for_compression(options.sample_for_compression) {
RefreshDerivedOptions(options.num_levels, options.compaction_style);
}
@ -198,6 +195,7 @@ struct MutableCFOptions {
paranoid_file_checks(false),
report_bg_io_stats(false),
compression(Snappy_Supported() ? kSnappyCompression : kNoCompression),
bottommost_compression(kDisableCompressionOption),
sample_for_compression(0) {}
explicit MutableCFOptions(const Options& options);
@ -253,6 +251,10 @@ struct MutableCFOptions {
bool paranoid_file_checks;
bool report_bg_io_stats;
CompressionType compression;
CompressionType bottommost_compression;
CompressionOptions compression_opts;
CompressionOptions bottommost_compression_opts;
uint64_t sample_for_compression;
// Derived options

View File

@ -310,14 +310,13 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log,
"Options.compaction_options_fifo.allow_compaction: %d",
compaction_options_fifo.allow_compaction);
std::string collector_names;
std::ostringstream collector_info;
for (const auto& collector_factory : table_properties_collector_factories) {
collector_names.append(collector_factory->Name());
collector_names.append("; ");
collector_info << collector_factory->ToString() << ';';
}
ROCKS_LOG_HEADER(
log, " Options.table_properties_collectors: %s",
collector_names.c_str());
collector_info.str().c_str());
ROCKS_LOG_HEADER(log,
" Options.inplace_update_support: %d",
inplace_update_support);

View File

@ -209,6 +209,10 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
cf_opts.paranoid_file_checks = mutable_cf_options.paranoid_file_checks;
cf_opts.report_bg_io_stats = mutable_cf_options.report_bg_io_stats;
cf_opts.compression = mutable_cf_options.compression;
cf_opts.compression_opts = mutable_cf_options.compression_opts;
cf_opts.bottommost_compression = mutable_cf_options.bottommost_compression;
cf_opts.bottommost_compression_opts =
mutable_cf_options.bottommost_compression_opts;
cf_opts.sample_for_compression = mutable_cf_options.sample_for_compression;
cf_opts.table_factory = options.table_factory;
@ -259,6 +263,8 @@ std::unordered_map<std::string, CompressionType>
const std::string kNameComparator = "comparator";
const std::string kNameEnv = "env";
const std::string kNameMergeOperator = "merge_operator";
const std::string kOptNameBMCompOpts = "bottommost_compression_opts";
const std::string kOptNameCompOpts = "compression_opts";
template <typename T>
Status GetStringFromStruct(
@ -786,6 +792,66 @@ bool SerializeSingleOptionHelper(const char* opt_address,
return true;
}
Status ParseCompressionOptions(const std::string& value,
const std::string& name,
CompressionOptions& compression_opts) {
size_t start = 0;
size_t end = value.find(':');
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.window_bits = ParseInt(value.substr(start, end - start));
start = end + 1;
end = value.find(':', start);
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.level = ParseInt(value.substr(start, end - start));
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
end = value.find(':', start);
compression_opts.strategy =
ParseInt(value.substr(start, value.size() - start));
// max_dict_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.max_dict_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// zstd_max_train_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.zstd_max_train_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// enabled is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start));
}
return Status::OK();
}
Status GetMutableOptionsFromStrings(
const MutableCFOptions& base_options,
const std::unordered_map<std::string, std::string>& options_map,
@ -793,30 +859,50 @@ Status GetMutableOptionsFromStrings(
assert(new_options);
*new_options = base_options;
for (const auto& o : options_map) {
auto& option_name = o.first;
auto& option_value = o.second;
try {
auto iter = cf_options_type_info.find(o.first);
if (option_name == kOptNameBMCompOpts) {
Status s =
ParseCompressionOptions(option_value, option_name,
new_options->bottommost_compression_opts);
if (!s.ok()) {
return s;
}
} else if (option_name == kOptNameCompOpts) {
Status s = ParseCompressionOptions(option_value, option_name,
new_options->compression_opts);
if (!s.ok()) {
return s;
}
} else {
auto iter = cf_options_type_info.find(option_name);
if (iter == cf_options_type_info.end()) {
return Status::InvalidArgument("Unrecognized option: " + o.first);
return Status::InvalidArgument("Unrecognized option: " + option_name);
}
const auto& opt_info = iter->second;
if (!opt_info.is_mutable) {
return Status::InvalidArgument("Option not changeable: " + o.first);
return Status::InvalidArgument("Option not changeable: " +
option_name);
}
if (opt_info.verification == OptionVerificationType::kDeprecated) {
// log warning when user tries to set a deprecated option but don't fail
// the call for compatibility.
ROCKS_LOG_WARN(info_log, "%s is a deprecated option and cannot be set",
o.first.c_str());
// log warning when user tries to set a deprecated option but don't
// fail the call for compatibility.
ROCKS_LOG_WARN(info_log,
"%s is a deprecated option and cannot be set",
option_name.c_str());
continue;
}
bool is_ok = ParseOptionHelper(
reinterpret_cast<char*>(new_options) + opt_info.mutable_offset,
opt_info.type, o.second);
opt_info.type, option_value);
if (!is_ok) {
return Status::InvalidArgument("Error parsing " + o.first);
return Status::InvalidArgument("Error parsing " + option_name);
}
}
} catch (std::exception& e) {
return Status::InvalidArgument("Error parsing " + o.first + ":" +
return Status::InvalidArgument("Error parsing " + option_name + ":" +
std::string(e.what()));
}
}
@ -929,65 +1015,6 @@ Status StringToMap(const std::string& opts_str,
return Status::OK();
}
Status ParseCompressionOptions(const std::string& value, const std::string& name,
CompressionOptions& compression_opts) {
size_t start = 0;
size_t end = value.find(':');
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.window_bits = ParseInt(value.substr(start, end - start));
start = end + 1;
end = value.find(':', start);
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.level = ParseInt(value.substr(start, end - start));
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
end = value.find(':', start);
compression_opts.strategy =
ParseInt(value.substr(start, value.size() - start));
// max_dict_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.max_dict_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// zstd_max_train_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.zstd_max_train_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// enabled is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start));
}
return Status::OK();
}
Status ParseColumnFamilyOption(const std::string& name,
const std::string& org_value,
ColumnFamilyOptions* new_options,
@ -1986,8 +2013,8 @@ std::unordered_map<std::string, OptionTypeInfo>
false, 0}},
{"bottommost_compression",
{offset_of(&ColumnFamilyOptions::bottommost_compression),
OptionType::kCompressionType, OptionVerificationType::kNormal, false,
0}},
OptionType::kCompressionType, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, bottommost_compression)}},
{kNameComparator,
{offset_of(&ColumnFamilyOptions::comparator), OptionType::kComparator,
OptionVerificationType::kByName, false, 0}},

View File

@ -42,13 +42,14 @@ const char kSpecialChar = 'z';
typedef std::vector<std::pair<size_t, size_t>> OffsetGap;
void FillWithSpecialChar(char* start_ptr, size_t total_size,
const OffsetGap& blacklist) {
const OffsetGap& blacklist,
char special_char = kSpecialChar) {
size_t offset = 0;
for (auto& pair : blacklist) {
std::memset(start_ptr + offset, kSpecialChar, pair.first - offset);
std::memset(start_ptr + offset, special_char, pair.first - offset);
offset = pair.first + pair.second;
}
std::memset(start_ptr + offset, kSpecialChar, total_size - offset);
std::memset(start_ptr + offset, special_char, total_size - offset);
}
int NumUnsetBytes(char* start_ptr, size_t total_size,
@ -71,6 +72,26 @@ int NumUnsetBytes(char* start_ptr, size_t total_size,
return total_unset_bytes_base;
}
// Return true iff two structs are the same except blacklist fields.
bool CompareBytes(char* start_ptr1, char* start_ptr2, size_t total_size,
const OffsetGap& blacklist) {
size_t offset = 0;
for (auto& pair : blacklist) {
for (; offset < pair.first; offset++) {
if (*(start_ptr1 + offset) != *(start_ptr2 + offset)) {
return false;
}
}
offset = pair.first + pair.second;
}
for (; offset < total_size; offset++) {
if (*(start_ptr1 + offset) != *(start_ptr2 + offset)) {
return false;
}
}
return true;
}
// If the test fails, likely a new option is added to BlockBasedTableOptions
// but it cannot be set through GetBlockBasedTableOptionsFromString(), or the
// test is not updated accordingly.
@ -373,6 +394,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
ColumnFamilyOptions* options = new (options_ptr) ColumnFamilyOptions();
FillWithSpecialChar(options_ptr, sizeof(ColumnFamilyOptions),
kColumnFamilyOptionsBlacklist);
// It based on the behavior of compiler that padding bytes are not changed
// when copying the struct. It's prone to failure when compiler behavior
// changes. We verify there is unset bytes to detect the case.
@ -395,8 +417,6 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
// GetColumnFamilyOptionsFromString():
options->rate_limit_delay_max_milliseconds = 33;
options->compaction_options_universal = CompactionOptionsUniversal();
options->compression_opts = CompressionOptions();
options->bottommost_compression_opts = CompressionOptions();
options->hard_rate_limit = 0;
options->soft_rate_limit = 0;
options->purge_redundant_kvs_while_flush = false;
@ -434,6 +454,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"max_bytes_for_level_multiplier=60;"
"memtable_factory=SkipListFactory;"
"compression=kNoCompression;"
"compression_opts=5:6:7:8:9:true;"
"bottommost_compression_opts=4:5:6:7:8:true;"
"bottommost_compression=kDisableCompressionOption;"
"level0_stop_writes_trigger=33;"
"num_levels=99;"
@ -470,11 +492,58 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
NumUnsetBytes(new_options_ptr, sizeof(ColumnFamilyOptions),
kColumnFamilyOptionsBlacklist));
ColumnFamilyOptions rnd_filled_options = *new_options;
options->~ColumnFamilyOptions();
new_options->~ColumnFamilyOptions();
delete[] options_ptr;
delete[] new_options_ptr;
// Test copying to mutabable and immutable options and copy back the mutable
// part.
const OffsetGap kMutableCFOptionsBlacklist = {
{offset_of(&MutableCFOptions::prefix_extractor),
sizeof(std::shared_ptr<const SliceTransform>)},
{offset_of(&MutableCFOptions::max_bytes_for_level_multiplier_additional),
sizeof(std::vector<int>)},
{offset_of(&MutableCFOptions::max_file_size),
sizeof(std::vector<uint64_t>)},
};
// For all memory used for options, pre-fill every char. Otherwise, the
// padding bytes might be different so that byte-wise comparison doesn't
// general equal results even if objects are equal.
const char kMySpecialChar = 'x';
char* mcfo1_ptr = new char[sizeof(MutableCFOptions)];
FillWithSpecialChar(mcfo1_ptr, sizeof(MutableCFOptions),
kMutableCFOptionsBlacklist, kMySpecialChar);
char* mcfo2_ptr = new char[sizeof(MutableCFOptions)];
FillWithSpecialChar(mcfo2_ptr, sizeof(MutableCFOptions),
kMutableCFOptionsBlacklist, kMySpecialChar);
// A clean column family options is constructed after filling the same special
// char as the initial one. So that the padding bytes are the same.
char* cfo_clean_ptr = new char[sizeof(ColumnFamilyOptions)];
FillWithSpecialChar(cfo_clean_ptr, sizeof(ColumnFamilyOptions),
kColumnFamilyOptionsBlacklist);
rnd_filled_options.num_levels = 66;
ColumnFamilyOptions* cfo_clean = new (cfo_clean_ptr) ColumnFamilyOptions();
MutableCFOptions* mcfo1 =
new (mcfo1_ptr) MutableCFOptions(rnd_filled_options);
ColumnFamilyOptions cfo_back = BuildColumnFamilyOptions(*cfo_clean, *mcfo1);
MutableCFOptions* mcfo2 = new (mcfo2_ptr) MutableCFOptions(cfo_back);
ASSERT_TRUE(CompareBytes(mcfo1_ptr, mcfo2_ptr, sizeof(MutableCFOptions),
kMutableCFOptionsBlacklist));
cfo_clean->~ColumnFamilyOptions();
mcfo1->~MutableCFOptions();
mcfo2->~MutableCFOptions();
delete[] mcfo1_ptr;
delete[] mcfo2_ptr;
delete[] cfo_clean_ptr;
}
#endif // !__clang__
#endif // OS_LINUX || OS_WIN

View File

@ -40,21 +40,21 @@ static inline bool HasJemalloc() { return true; }
// Declare non-standard jemalloc APIs as weak symbols. We can null-check these
// symbols to detect whether jemalloc is linked with the binary.
extern "C" void* mallocx(size_t, int) __attribute__((__weak__));
extern "C" void* rallocx(void*, size_t, int) __attribute__((__weak__));
extern "C" size_t xallocx(void*, size_t, size_t, int) __attribute__((__weak__));
extern "C" size_t sallocx(const void*, int) __attribute__((__weak__));
extern "C" void dallocx(void*, int) __attribute__((__weak__));
extern "C" void sdallocx(void*, size_t, int) __attribute__((__weak__));
extern "C" size_t nallocx(size_t, int) __attribute__((__weak__));
extern "C" void* mallocx(size_t, int) __attribute__((__nothrow__, __weak__));
extern "C" void* rallocx(void*, size_t, int) __attribute__((__nothrow__, __weak__));
extern "C" size_t xallocx(void*, size_t, size_t, int) __attribute__((__nothrow__, __weak__));
extern "C" size_t sallocx(const void*, int) __attribute__((__nothrow__, __weak__));
extern "C" void dallocx(void*, int) __attribute__((__nothrow__, __weak__));
extern "C" void sdallocx(void*, size_t, int) __attribute__((__nothrow__, __weak__));
extern "C" size_t nallocx(size_t, int) __attribute__((__nothrow__, __weak__));
extern "C" int mallctl(const char*, void*, size_t*, void*, size_t)
__attribute__((__weak__));
__attribute__((__nothrow__, __weak__));
extern "C" int mallctlnametomib(const char*, size_t*, size_t*)
__attribute__((__weak__));
__attribute__((__nothrow__, __weak__));
extern "C" int mallctlbymib(const size_t*, size_t, void*, size_t*, void*,
size_t) __attribute__((__weak__));
size_t) __attribute__((__nothrow__, __weak__));
extern "C" void malloc_stats_print(void (*)(void*, const char*), void*,
const char*) __attribute__((__weak__));
const char*) __attribute__((__nothrow__, __weak__));
extern "C" size_t malloc_usable_size(JEMALLOC_USABLE_SIZE_CONST void*)
JEMALLOC_CXX_THROW __attribute__((__weak__));

View File

@ -525,6 +525,9 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
key_.SetKey(Slice(p, non_shared), false /* copy */);
key_pinned_ = true;
} else {
if (global_seqno_ != kDisableGlobalSequenceNumber) {
key_.UpdateInternalKey(stored_seqno_, stored_value_type_);
}
// This key share `shared` bytes with prev key, we need to decode it
key_.TrimAppend(shared, p, non_shared);
key_pinned_ = false;
@ -536,11 +539,12 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
// type is kTypeValue, kTypeMerge, kTypeDeletion, or kTypeRangeDeletion.
assert(GetInternalKeySeqno(key_.GetInternalKey()) == 0);
ValueType value_type = ExtractValueType(key_.GetKey());
assert(value_type == ValueType::kTypeValue ||
value_type == ValueType::kTypeMerge ||
value_type == ValueType::kTypeDeletion ||
value_type == ValueType::kTypeRangeDeletion);
uint64_t packed = ExtractInternalKeyFooter(key_.GetKey());
UnPackSequenceAndType(packed, &stored_seqno_, &stored_value_type_);
assert(stored_value_type_ == ValueType::kTypeValue ||
stored_value_type_ == ValueType::kTypeMerge ||
stored_value_type_ == ValueType::kTypeDeletion ||
stored_value_type_ == ValueType::kTypeRangeDeletion);
if (key_pinned_) {
// TODO(tec): Investigate updating the seqno in the loaded block
@ -552,7 +556,7 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
key_pinned_ = false;
}
key_.UpdateInternalKey(global_seqno_, value_type);
key_.UpdateInternalKey(global_seqno_, stored_value_type_);
}
value_ = Slice(p + non_shared, value_length);

View File

@ -319,6 +319,11 @@ class BlockIter : public InternalIteratorBase<TValue> {
// e.g. PinnableSlice, the pointer to the bytes will still be valid.
bool block_contents_pinned_;
SequenceNumber global_seqno_;
// Save the actual sequence before replaced by global seqno, which potentially
// is used as part of prefix of delta encoding.
SequenceNumber stored_seqno_ = 0;
// Save the value type of key_. Used to restore stored_seqno_.
ValueType stored_value_type_ = kMaxValue;
private:
// Store the cache handle, if the block is cached. We need this since the

View File

@ -18,7 +18,6 @@
#include <unordered_map>
#include <utility>
#include "cache/simple_deleter.h"
#include "db/dbformat.h"
#include "index_builder.h"
@ -796,6 +795,11 @@ Status BlockBasedTableBuilder::status() const { return rep_->status; }
IOStatus BlockBasedTableBuilder::io_status() const { return rep_->io_status; }
static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
BlockContents* bc = reinterpret_cast<BlockContents*>(value);
delete bc;
}
//
// Make a copy of the block contents and insert into compressed block cache
//
@ -830,7 +834,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
block_cache_compressed->Insert(
key, block_contents_to_cache,
block_contents_to_cache->ApproximateMemoryUsage(),
SimpleDeleter<BlockContents>::GetInstance());
&DeleteCachedBlockContents);
// Invalidate OS cache.
r->file->InvalidateCache(static_cast<size_t>(r->offset), size);

View File

@ -14,8 +14,6 @@
#include <utility>
#include <vector>
#include "cache/simple_deleter.h"
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
@ -181,6 +179,13 @@ Status ReadBlockFromFile(
return s;
}
// Delete the entry resided in the cache.
template <class Entry>
void DeleteCachedEntry(const Slice& /*key*/, void* value) {
auto entry = reinterpret_cast<Entry*>(value);
delete entry;
}
// Release the cached entry and decrement its ref count.
// Do not force erase
void ReleaseCachedEntry(void* arg, void* h) {
@ -1166,8 +1171,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr;
s = block_cache->Insert(block_cache_key, block_holder.get(), charge,
SimpleDeleter<TBlocklike>::GetInstance(),
&cache_handle);
&DeleteCachedEntry<TBlocklike>, &cache_handle);
if (s.ok()) {
assert(cache_handle != nullptr);
block->SetCachedValue(block_holder.release(), block_cache,
@ -1256,7 +1260,7 @@ Status BlockBasedTable::PutDataBlockToCache(
s = block_cache_compressed->Insert(
compressed_block_cache_key, block_cont_for_comp_cache,
block_cont_for_comp_cache->ApproximateMemoryUsage(),
SimpleDeleter<BlockContents>::GetInstance());
&DeleteCachedEntry<BlockContents>);
if (s.ok()) {
// Avoid the following code to delete this cached block.
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
@ -1271,8 +1275,8 @@ Status BlockBasedTable::PutDataBlockToCache(
size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr;
s = block_cache->Insert(block_cache_key, block_holder.get(), charge,
SimpleDeleter<TBlocklike>::GetInstance(),
&cache_handle, priority);
&DeleteCachedEntry<TBlocklike>, &cache_handle,
priority);
if (s.ok()) {
assert(cache_handle != nullptr);
cached_block->SetCachedValue(block_holder.release(), block_cache,

View File

@ -7,8 +7,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/partitioned_index_reader.h"
#include "cache/simple_deleter.h"
#include "table/block_based/partitioned_index_iterator.h"
namespace ROCKSDB_NAMESPACE {

View File

@ -190,20 +190,21 @@ Status SstFileWriter::Open(const std::string& file_path) {
CompressionType compression_type;
CompressionOptions compression_opts;
if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
compression_type = r->ioptions.bottommost_compression;
if (r->ioptions.bottommost_compression_opts.enabled) {
compression_opts = r->ioptions.bottommost_compression_opts;
if (r->mutable_cf_options.bottommost_compression !=
kDisableCompressionOption) {
compression_type = r->mutable_cf_options.bottommost_compression;
if (r->mutable_cf_options.bottommost_compression_opts.enabled) {
compression_opts = r->mutable_cf_options.bottommost_compression_opts;
} else {
compression_opts = r->ioptions.compression_opts;
compression_opts = r->mutable_cf_options.compression_opts;
}
} else if (!r->ioptions.compression_per_level.empty()) {
// Use the compression of the last level if we have per level compression
compression_type = *(r->ioptions.compression_per_level.rbegin());
compression_opts = r->ioptions.compression_opts;
compression_opts = r->mutable_cf_options.compression_opts;
} else {
compression_type = r->mutable_cf_options.compression;
compression_opts = r->ioptions.compression_opts;
compression_opts = r->mutable_cf_options.compression_opts;
}
uint64_t sample_for_compression =
r->mutable_cf_options.sample_for_compression;

View File

@ -344,9 +344,7 @@ TEST_F(LdbCmdTest, DumpFileChecksumCRC32) {
Options opts;
opts.env = env.get();
opts.create_if_missing = true;
FileChecksumGenCrc32cFactory* file_checksum_gen_factory =
new FileChecksumGenCrc32cFactory();
opts.file_checksum_gen_factory.reset(file_checksum_gen_factory);
opts.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
DB* db = nullptr;
std::string dbname = test::TmpDir();

View File

@ -77,4 +77,10 @@ FileChecksumList* NewFileChecksumList() {
return checksum_list;
}
std::shared_ptr<FileChecksumGenFactory> GetFileChecksumGenCrc32cFactory() {
static std::shared_ptr<FileChecksumGenFactory> default_crc32c_gen_factory(
new FileChecksumGenCrc32cFactory());
return default_crc32c_gen_factory;
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -168,15 +168,19 @@ class SimCacheImpl : public SimCache {
cache_->SetStrictCapacityLimit(strict_capacity_limit);
}
Status Insert(const Slice& key, void* value, size_t charge, Deleter* deleter,
Handle** handle, Priority priority) override {
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), Handle** handle,
Priority priority) override {
// The handle and value passed in are for real cache, so we pass nullptr
// to key_only_cache_ for both instead. Also, the deleter should be invoked
// only once (on the actual value), so we pass nullptr to key_only_cache for
// that one as well.
// to key_only_cache_ for both instead. Also, the deleter function pointer
// will be called by user to perform some external operation which should
// be applied only once. Thus key_only_cache accepts an empty function.
// *Lambda function without capture can be assgined to a function pointer
Handle* h = key_only_cache_->Lookup(key);
if (h == nullptr) {
key_only_cache_->Insert(key, nullptr, charge, nullptr, nullptr, priority);
key_only_cache_->Insert(key, nullptr, charge,
[](const Slice& /*k*/, void* /*v*/) {}, nullptr,
priority);
} else {
key_only_cache_->Release(h);
}

View File

@ -7,6 +7,7 @@
#include "utilities/table_properties_collectors/compact_on_deletion_collector.h"
#include <memory>
#include <sstream>
#include "rocksdb/utilities/table_properties_collectors.h"
namespace ROCKSDB_NAMESPACE {
@ -78,6 +79,13 @@ CompactOnDeletionCollectorFactory::CreateTablePropertiesCollector(
sliding_window_size_.load(), deletion_trigger_.load());
}
std::string CompactOnDeletionCollectorFactory::ToString() const {
std::ostringstream cfg;
cfg << Name() << " (Sliding window size = " << sliding_window_size_.load()
<< " Deletion trigger = " << deletion_trigger_.load() << ')';
return cfg.str();
}
std::shared_ptr<CompactOnDeletionCollectorFactory>
NewCompactOnDeletionCollectorFactory(
size_t sliding_window_size,