SST Partitioner interface that allows to split SST files (#6957)

Summary:
SST Partitioner interface that allows splitting SST files during compactions.

It instructs compaction to start a new output file when needed. When tables are laid out under well-defined key prefixes, it is useful to define a matching partitioning, so that promoting an SST file does not cover a huge key range on the next level (in the worst case, the entire key space).
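
The split rule can be sketched outside RocksDB as follows. This is a minimal standalone model under stated assumptions, not the code in this commit: `PrefixChanged`, `SplitByPrefix`, and `CanMoveWithoutSplit` are hypothetical names, and `std::string_view` stands in for `Slice`. It only models the fixed-prefix decision on an already sorted key stream and ignores file size limits and every other compaction rule.

```cpp
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical helper: a partition boundary exists when the first `len`
// bytes of two adjacent keys differ. Keys shorter than `len` are compared
// in full, because substr clamps to the key length.
bool PrefixChanged(std::string_view prev, std::string_view cur,
                   std::size_t len) {
  return prev.substr(0, len) != cur.substr(0, len);
}

// Hypothetical helper: groups a sorted key stream into "files", starting a
// new group whenever the prefix of the current key differs from the prefix
// of the previous key.
std::vector<std::vector<std::string>> SplitByPrefix(
    const std::vector<std::string>& sorted_keys, std::size_t len) {
  std::vector<std::vector<std::string>> files;
  for (const std::string& key : sorted_keys) {
    if (files.empty() || PrefixChanged(files.back().back(), key, len)) {
      files.emplace_back();  // finish the current group and start a new one
    }
    files.back().push_back(key);
  }
  return files;
}

// Hypothetical helper: a file may be moved to the next level unchanged only
// if its smallest and largest keys fall into the same partition.
bool CanMoveWithoutSplit(std::string_view smallest, std::string_view largest,
                         std::size_t len) {
  return !PrefixChanged(smallest, largest, len);
}
```

With a prefix length of 4, the keys `aaaa1`, `aaaa2`, `bbbb1` fall into two groups. With a prefix length of 1, a file spanning `151` to `200` crosses a boundary, so under this model it cannot be moved without being rewritten.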

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6957

Reviewed By: ajkr

Differential Revision: D22461239

fbshipit-source-id: 9ce07bba08b3ba89c2d45630520368f704d1316e
Tomas Kolda 2020-07-24 13:43:14 -07:00 committed by Facebook GitHub Bot
parent 954ee56571
commit cd4592c220
26 changed files with 551 additions and 21 deletions

@ -562,6 +562,7 @@ set(SOURCES
db/compaction/compaction_picker_fifo.cc
db/compaction/compaction_picker_level.cc
db/compaction/compaction_picker_universal.cc
db/compaction/sst_partitioner.cc
db/convenience.cc
db/db_filesnapshot.cc
db/db_impl/db_impl.cc

@ -30,6 +30,7 @@
* `BackupTableNameOption BackupableDBOptions::share_files_with_checksum_naming` is added, where `BackupTableNameOption` is an `enum` type with two enumerators `kChecksumAndFileSize` and `kOptionalChecksumAndDbSessionId`. By default, `BackupableDBOptions::share_files_with_checksum_naming` is set to `kOptionalChecksumAndDbSessionId`. In the default case, backup table filenames generated by this version of RocksDB are of the form either `<file_number>_<crc32c>_<db_session_id>.sst` or `<file_number>_<db_session_id>.sst` as opposed to `<file_number>_<crc32c>_<file_size>.sst`. Specifically, table filenames are of the form `<file_number>_<crc32c>_<db_session_id>.sst` if `DBOptions::file_checksum_gen_factory` is set to `GetFileChecksumGenCrc32cFactory()`. Futhermore, the checksum value `<crc32c>` appeared in the filenames is hexadecimal-encoded, instead of being decimal-encoded `uint32_t` value. If `DBOptions::file_checksum_gen_factory` is `nullptr`, the table filenames are of the form `<file_number>_<db_session_id>.sst`. The new default behavior fixes the backup file name collision problem, which might be possible at large scale, but the option `kChecksumAndFileSize` is added to allow use of old naming in case it is needed. Moreover, for table files generated prior to this version of RocksDB, using `kOptionalChecksumAndDbSessionId` will fall back on `kChecksumAndFileSize`. In these cases, the checksum value `<crc32c>` in the filenames `<file_number>_<crc32c>_<file_size>.sst` is decimal-encoded `uint32_t` value as before. This default behavior change is not an upgrade issue, because previous versions of RocksDB can read, restore, and delete backups using new names, and it's OK for a backup directory to use a mixture of table file naming schemes. Note that `share_files_with_checksum_naming` comes into effect only when both `share_files_with_checksum` and `share_table_files` are true. 
* `BackupTableNameOption BackupableDBOptions::share_files_with_checksum_naming` is added, where `BackupTableNameOption` is an `enum` type with two enumerators `kChecksumAndFileSize` and `kOptionalChecksumAndDbSessionId`. By default, `BackupableDBOptions::share_files_with_checksum_naming` is set to `kOptionalChecksumAndDbSessionId`. In the default case, backup table filenames generated by this version of RocksDB are of the form either `<file_number>_<crc32c>_<db_session_id>.sst` or `<file_number>_<db_session_id>.sst` as opposed to `<file_number>_<crc32c>_<file_size>.sst`. Specifically, table filenames are of the form `<file_number>_<crc32c>_<db_session_id>.sst` if `DBOptions::file_checksum_gen_factory` is set to `GetFileChecksumGenCrc32cFactory()`. Futhermore, the checksum value `<crc32c>` appeared in the filenames is hexadecimal-encoded, instead of being decimal-encoded `uint32_t` value. If `DBOptions::file_checksum_gen_factory` is `nullptr`, the table filenames are of the form `<file_number>_<db_session_id>.sst`. The new default behavior fixes the backup file name collision problem, which might be possible at large scale, but the option `kChecksumAndFileSize` is added to allow use of old naming in case it is needed. Moreover, for table files generated prior to this version of RocksDB, using `kOptionalChecksumAndDbSessionId` will fall back on `kChecksumAndFileSize`. In these cases, the checksum value `<crc32c>` in the filenames `<file_number>_<crc32c>_<file_size>.sst` is decimal-encoded `uint32_t` value as before. This default behavior change is not an upgrade issue, because previous versions of RocksDB can read, restore, and delete backups using new names, and it's OK for a backup directory to use a mixture of table file naming schemes. Note that `share_files_with_checksum_naming` comes into effect only when both `share_files_with_checksum` and `share_table_files` are true.
* Added auto resume function to automatically recover the DB from background Retryable IO Error. When retryable IOError happens during flush and WAL write, the error is mapped to Hard Error and DB will be in read mode. When retryable IO Error happens during compaction, the error will be mapped to Soft Error. DB is still in write/read mode. Autoresume function will create a thread for a DB to call DB->ResumeImpl() to try the recover for Retryable IO Error during flush and WAL write. Compaction will be rescheduled by itself if retryable IO Error happens. Auto resume may also cause other Retryable IO Error during the recovery, so the recovery will fail. Retry the auto resume may solve the issue, so we use max_bgerror_resume_count to decide how many resume cycles will be tried in total. If it is <=0, auto resume retryable IO Error is disabled. Default is INT_MAX, which will lead to a infinit auto resume. bgerror_resume_retry_interval decides the time interval between two auto resumes.
* Option `max_subcompactions` can be set dynamically using DB::SetDBOptions().
* Added experimental ColumnFamilyOptions::sst_partitioner_factory to determine the partitioning of sst files. This helps compaction to split the files on interesting boundaries (key prefixes) to make propagation of sst files less write amplifying (covering the whole key space).
### Bug Fixes
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.

@ -138,6 +138,7 @@ cpp_library(
"db/compaction/compaction_picker_fifo.cc",
"db/compaction/compaction_picker_level.cc",
"db/compaction/compaction_picker_universal.cc",
"db/compaction/sst_partitioner.cc",
"db/convenience.cc",
"db/db_filesnapshot.cc",
"db/db_impl/db_impl.cc",

@ -7,12 +7,14 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction/compaction.h"
#include <cinttypes>
#include <vector>
#include "db/column_family.h"
#include "db/compaction/compaction.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/sst_partitioner.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"
@ -329,6 +331,8 @@ bool Compaction::IsTrivialMove() const {
// assert inputs_.size() == 1
std::unique_ptr<SstPartitioner> partitioner = CreateSstPartitioner();
for (const auto& file : inputs_.front().files) {
std::vector<FileMetaData*> file_grand_parents;
if (output_level_ + 1 >= number_levels_) {
@ -341,6 +345,13 @@ bool Compaction::IsTrivialMove() const {
if (compaction_size > max_compaction_bytes_) {
return false;
}
if (partitioner.get() != nullptr) {
if (!partitioner->CanDoTrivialMove(file->smallest.user_key(),
file->largest.user_key())) {
return false;
}
}
}
return true;
@ -526,6 +537,21 @@ std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
context);
}
std::unique_ptr<SstPartitioner> Compaction::CreateSstPartitioner() const {
if (!immutable_cf_options_.sst_partitioner_factory) {
return nullptr;
}
SstPartitioner::Context context;
context.is_full_compaction = is_full_compaction_;
context.is_manual_compaction = is_manual_compaction_;
context.output_level = output_level_;
context.smallest_user_key = smallest_user_key_;
context.largest_user_key = largest_user_key_;
return immutable_cf_options_.sst_partitioner_factory->CreatePartitioner(
context);
}
bool Compaction::IsOutputLevelEmpty() const {
return inputs_.back().level != output_level_ || inputs_.back().empty();
}

@ -11,6 +11,7 @@
#include "db/version_set.h"
#include "memory/arena.h"
#include "options/cf_options.h"
#include "rocksdb/sst_partitioner.h"
#include "util/autovector.h"
namespace ROCKSDB_NAMESPACE {
@ -256,6 +257,9 @@ class Compaction {
// Create a CompactionFilter from compaction_filter_factory
std::unique_ptr<CompactionFilter> CreateCompactionFilter() const;
// Create a SstPartitioner from sst_partitioner_factory
std::unique_ptr<SstPartitioner> CreateSstPartitioner() const;
// Is the input level corresponding to output_level_ empty?
bool IsOutputLevelEmpty() const;

@ -46,6 +46,7 @@
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
@ -949,6 +950,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
const auto& c_iter_stats = c_iter->iter_stats();
std::unique_ptr<SstPartitioner> partitioner =
sub_compact->compaction->output_level() == 0
? nullptr
: sub_compact->compaction->CreateSstPartitioner();
std::string last_key_for_partitioner;
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.
@ -1006,20 +1013,29 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
"CompactionJob::Run():PausingManualCompaction:2",
reinterpret_cast<void*>(
const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
if (partitioner.get()) {
last_key_for_partitioner.assign(c_iter->user_key().data_,
c_iter->user_key().size_);
}
c_iter->Next();
if (c_iter->status().IsManualCompactionPaused()) {
break;
}
if (!output_file_ended && c_iter->Valid()) {
if (((partitioner.get() &&
partitioner->ShouldPartition(PartitionerRequest(
last_key_for_partitioner, c_iter->user_key(),
sub_compact->current_output_file_size)) == kRequired) ||
(sub_compact->compaction->output_level() != 0 &&
sub_compact->ShouldStopBefore(
c_iter->key(), sub_compact->current_output_file_size))) &&
sub_compact->builder != nullptr) {
// (2) this key belongs to the next file. For historical reasons, the
// iterator status after advancing will be given to
// FinishCompactionOutputFile().
input_status = input->status();
output_file_ended = true;
}
}
if (output_file_ended) {
const Slice* next_key = nullptr;

@ -1665,6 +1665,32 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOn) {
ASSERT_TRUE(compaction->IsTrivialMove());
}
TEST_F(CompactionPickerTest, IsTrivialMoveOffSstPartitioned) {
mutable_cf_options_.max_bytes_for_level_base = 10000u;
mutable_cf_options_.max_compaction_bytes = 10001u;
ioptions_.level_compaction_dynamic_level_bytes = false;
ioptions_.sst_partitioner_factory = NewSstPartitionerFixedPrefixFactory(1);
NewVersionStorage(6, kCompactionStyleLevel);
// A compaction should be triggered and pick file 2
Add(1, 1U, "100", "150", 3000U);
Add(1, 2U, "151", "200", 3001U);
Add(1, 3U, "201", "250", 3000U);
Add(1, 4U, "251", "300", 3000U);
Add(3, 5U, "120", "130", 7000U);
Add(3, 6U, "170", "180", 7000U);
Add(3, 7U, "220", "230", 7000U);
Add(3, 8U, "270", "280", 7000U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
// No trivial move, because partitioning is applied
ASSERT_TRUE(!compaction->IsTrivialMove());
}
TEST_F(CompactionPickerTest, IsTrivialMoveOff) {
mutable_cf_options_.max_bytes_for_level_base = 1000000u;
mutable_cf_options_.max_compaction_bytes = 10000u;

@ -0,0 +1,44 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "rocksdb/sst_partitioner.h"
#include <algorithm>
namespace ROCKSDB_NAMESPACE {
PartitionerResult SstPartitionerFixedPrefix::ShouldPartition(
const PartitionerRequest& request) {
Slice last_key_fixed(*request.prev_user_key);
if (last_key_fixed.size() > len_) {
last_key_fixed.size_ = len_;
}
Slice current_key_fixed(*request.current_user_key);
if (current_key_fixed.size() > len_) {
current_key_fixed.size_ = len_;
}
return last_key_fixed.compare(current_key_fixed) != 0 ? kRequired
: kNotRequired;
}
bool SstPartitionerFixedPrefix::CanDoTrivialMove(
const Slice& smallest_user_key, const Slice& largest_user_key) {
return ShouldPartition(PartitionerRequest(smallest_user_key, largest_user_key,
0)) == kNotRequired;
}
std::unique_ptr<SstPartitioner>
SstPartitionerFixedPrefixFactory::CreatePartitioner(
const SstPartitioner::Context& /* context */) const {
return std::unique_ptr<SstPartitioner>(new SstPartitionerFixedPrefix(len_));
}
std::shared_ptr<SstPartitionerFactory> NewSstPartitionerFixedPrefixFactory(
size_t prefix_len) {
return std::make_shared<SstPartitionerFixedPrefixFactory>(prefix_len);
}
} // namespace ROCKSDB_NAMESPACE

@ -977,6 +977,60 @@ TEST_F(DBCompactionTest, UserKeyCrossFile2) {
ASSERT_EQ("NOT_FOUND", Get("3"));
}
TEST_F(DBCompactionTest, CompactionSstPartitioner) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
options.level0_file_num_compaction_trigger = 3;
std::shared_ptr<SstPartitionerFactory> factory(
NewSstPartitionerFixedPrefixFactory(4));
options.sst_partitioner_factory = factory;
DestroyAndReopen(options);
// create first file and flush to l0
Put("aaaa1", "A");
Put("bbbb1", "B");
Flush();
dbfull()->TEST_WaitForFlushMemTable();
Put("aaaa1", "A2");
Flush();
dbfull()->TEST_WaitForFlushMemTable();
// move both files down to l1
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
std::vector<LiveFileMetaData> files;
dbfull()->GetLiveFilesMetaData(&files);
ASSERT_EQ(2, files.size());
ASSERT_EQ("A2", Get("aaaa1"));
ASSERT_EQ("B", Get("bbbb1"));
}
TEST_F(DBCompactionTest, CompactionSstPartitionerNonTrivial) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
options.level0_file_num_compaction_trigger = 1;
std::shared_ptr<SstPartitionerFactory> factory(
NewSstPartitionerFixedPrefixFactory(4));
options.sst_partitioner_factory = factory;
DestroyAndReopen(options);
// create first file and flush to l0
Put("aaaa1", "A");
Put("bbbb1", "B");
Flush();
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact(true);
std::vector<LiveFileMetaData> files;
dbfull()->GetLiveFilesMetaData(&files);
ASSERT_EQ(2, files.size());
ASSERT_EQ("A", Get("aaaa1"));
ASSERT_EQ("B", Get("bbbb1"));
}
TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;

@ -10,6 +10,7 @@
#include <stddef.h>
#include <stdint.h>
#include <limits>
#include <memory>
#include <string>
@ -21,6 +22,7 @@
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/listener.h"
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/universal_compaction.h"
#include "rocksdb/version.h"
#include "rocksdb/write_buffer_manager.h"
@ -308,6 +310,15 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Default: nullptr
std::shared_ptr<ConcurrentTaskLimiter> compaction_thread_limiter = nullptr;
// If non-nullptr, use the specified factory for a function to determine the
// partitioning of sst files. This helps compaction to split the files
// on interesting boundaries (key prefixes) to make propagation of sst
// files less write amplifying (covering the whole key space).
// THE FEATURE IS STILL EXPERIMENTAL
//
// Default: nullptr
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;
// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options

@ -0,0 +1,135 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include <memory>
#include <string>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE {
class Slice;
enum PartitionerResult : char {
// Partitioner does not require to create new file
kNotRequired = 0x0,
// Partitioner is requesting forcefully to create new file
kRequired = 0x1
// Additional constants can be added
};
struct PartitionerRequest {
PartitionerRequest(const Slice& prev_user_key_,
const Slice& current_user_key_,
uint64_t current_output_file_size_)
: prev_user_key(&prev_user_key_),
current_user_key(&current_user_key_),
current_output_file_size(current_output_file_size_) {}
const Slice* prev_user_key;
const Slice* current_user_key;
uint64_t current_output_file_size;
};
/*
* A SstPartitioner is a generic pluggable way of defining the partition
* of SST files. Compaction job will split the SST files on partition boundary
* to lower the write amplification during SST file promote to higher level.
*/
class SstPartitioner {
public:
virtual ~SstPartitioner() {}
// Return the name of this partitioner.
virtual const char* Name() const = 0;
// Called for each pair of adjacent keys in a compaction. When the partitioner
// wants to start a new SST file it must return kRequired. The compaction job
// then finishes the current SST file, whose last key is "prev_user_key", and
// starts a new SST file whose first key is "current_user_key". Returns the
// decision on whether a partition boundary was detected.
virtual PartitionerResult ShouldPartition(
const PartitionerRequest& request) = 0;
// Called with the smallest and largest keys of an SST file when compaction
// tries to do a trivial move. Returns true if the partitioner allows the move.
virtual bool CanDoTrivialMove(const Slice& smallest_user_key,
const Slice& largest_user_key) = 0;
// Context information of a compaction run
struct Context {
// Does this compaction run include all data files
bool is_full_compaction;
// Is this compaction requested by the client (true),
// or is it occurring as an automatic compaction process
bool is_manual_compaction;
// Output level for this compaction
int output_level;
// Smallest key for compaction
Slice smallest_user_key;
// Largest key for compaction
Slice largest_user_key;
};
};
class SstPartitionerFactory {
public:
virtual ~SstPartitionerFactory() {}
virtual std::unique_ptr<SstPartitioner> CreatePartitioner(
const SstPartitioner::Context& context) const = 0;
// Returns a name that identifies this partitioner factory.
virtual const char* Name() const = 0;
};
/*
* Fixed key prefix partitioner. It splits the output SST files when prefix
* defined by size changes.
*/
class SstPartitionerFixedPrefix : public SstPartitioner {
public:
explicit SstPartitionerFixedPrefix(size_t len) : len_(len) {}
virtual ~SstPartitionerFixedPrefix() override {}
const char* Name() const override { return "SstPartitionerFixedPrefix"; }
PartitionerResult ShouldPartition(const PartitionerRequest& request) override;
bool CanDoTrivialMove(const Slice& smallest_user_key,
const Slice& largest_user_key) override;
private:
size_t len_;
};
/*
* Factory for fixed prefix partitioner.
*/
class SstPartitionerFixedPrefixFactory : public SstPartitionerFactory {
public:
explicit SstPartitionerFixedPrefixFactory(size_t len) : len_(len) {}
virtual ~SstPartitionerFixedPrefixFactory() {}
const char* Name() const override {
return "SstPartitionerFixedPrefixFactory";
}
std::unique_ptr<SstPartitioner> CreatePartitioner(
const SstPartitioner::Context& /* context */) const override;
private:
size_t len_;
};
extern std::shared_ptr<SstPartitionerFactory>
NewSstPartitionerFixedPrefixFactory(size_t prefix_len);
} // namespace ROCKSDB_NAMESPACE

@ -56,6 +56,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/sst_file_writerjni.cc
rocksjni/sst_file_readerjni.cc
rocksjni/sst_file_reader_iterator.cc
rocksjni/sst_partitioner.cc
rocksjni/statistics.cc
rocksjni/statisticsjni.cc
rocksjni/table.cc
@ -201,9 +202,11 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/Snapshot.java
src/main/java/org/rocksdb/SstFileManager.java
src/main/java/org/rocksdb/SstFileMetaData.java
src/main/java/org/rocksdb/SstFileWriter.java
src/main/java/org/rocksdb/SstFileReader.java
src/main/java/org/rocksdb/SstFileReaderIterator.java
src/main/java/org/rocksdb/SstFileWriter.java
src/main/java/org/rocksdb/SstPartitionerFactory.java
src/main/java/org/rocksdb/SstPartitionerFixedPrefixFactory.java
src/main/java/org/rocksdb/StateType.java
src/main/java/org/rocksdb/StatisticsCollectorCallback.java
src/main/java/org/rocksdb/StatisticsCollector.java
@ -452,6 +455,8 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7"
org.rocksdb.SstFileWriter
org.rocksdb.SstFileReader
org.rocksdb.SstFileReaderIterator
org.rocksdb.SstPartitionerFactory
org.rocksdb.SstPartitionerFixedPrefixFactory
org.rocksdb.Statistics
org.rocksdb.StringAppendOperator
org.rocksdb.TableFormatConfig

@ -63,6 +63,8 @@ NATIVE_JAVA_CLASSES = \
org.rocksdb.SstFileWriter\
org.rocksdb.SstFileReader\
org.rocksdb.SstFileReaderIterator\
org.rocksdb.SstPartitionerFactory\
org.rocksdb.SstPartitionerFixedPrefixFactory\
org.rocksdb.Statistics\
org.rocksdb.ThreadStatus\
org.rocksdb.TimedEnv\
@ -165,6 +167,7 @@ JAVA_TESTS = \
org.rocksdb.SstFileManagerTest\
org.rocksdb.SstFileWriterTest\
org.rocksdb.SstFileReaderTest\
org.rocksdb.SstPartitionerTest\
org.rocksdb.TableFilterTest\
org.rocksdb.TimedEnvTest\
org.rocksdb.TransactionTest\

@ -6,9 +6,12 @@
// This file implements the "bridge" between Java and C++ for
// ROCKSDB_NAMESPACE::Options.
#include "rocksdb/options.h"
#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory>
#include <vector>
@ -19,22 +22,20 @@
#include "include/org_rocksdb_Options.h"
#include "include/org_rocksdb_ReadOptions.h"
#include "include/org_rocksdb_WriteOptions.h"
#include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/portal.h"
#include "rocksjni/statisticsjni.h"
#include "rocksjni/table_filter_jnicallback.h"
#include "rocksdb/comparator.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/portal.h"
#include "rocksjni/statisticsjni.h"
#include "rocksjni/table_filter_jnicallback.h"
#include "utilities/merge_operators.h"
/*
@ -1130,6 +1131,20 @@ void Java_org_rocksdb_Options_setTableFactory(
options->table_factory.reset(table_factory);
}
/*
* Method: setSstPartitionerFactory
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setSstPartitionerFactory(JNIEnv*, jobject,
jlong jhandle,
jlong factory_handle) {
auto* options = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
auto factory = reinterpret_cast<
std::shared_ptr<ROCKSDB_NAMESPACE::SstPartitionerFactory>*>(
factory_handle);
options->sst_partitioner_factory = *factory;
}
/*
* Class: org_rocksdb_Options
* Method: allowMmapReads
@ -3621,6 +3636,19 @@ void Java_org_rocksdb_ColumnFamilyOptions_setTableFactory(
reinterpret_cast<ROCKSDB_NAMESPACE::TableFactory*>(jfactory_handle));
}
/*
* Method: setSstPartitionerFactory
* Signature: (JJ)V
*/
void Java_org_rocksdb_ColumnFamilyOptions_setSstPartitionerFactory(
JNIEnv*, jobject, jlong jhandle, jlong factory_handle) {
auto* options =
reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle);
// factory_handle points to a std::shared_ptr, not to the raw factory.
auto* factory = reinterpret_cast<
std::shared_ptr<ROCKSDB_NAMESPACE::SstPartitionerFactory>*>(
factory_handle);
options->sst_partitioner_factory = *factory;
}
/*
* Method: tableFactoryName
* Signature: (J)Ljava/lang/String

@ -0,0 +1,42 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the "bridge" between Java and C++ and enables
// calling C++ ROCKSDB_NAMESPACE::SstPartitionerFactory methods
// from Java side.
#include "rocksdb/sst_partitioner.h"
#include <jni.h>
#include <memory>
#include "include/org_rocksdb_SstPartitionerFixedPrefixFactory.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksjni/portal.h"
/*
* Class: org_rocksdb_SstPartitionerFixedPrefixFactory
* Method: newSstPartitionerFixedPrefixFactory0
* Signature: (J)J
*/
jlong Java_org_rocksdb_SstPartitionerFixedPrefixFactory_newSstPartitionerFixedPrefixFactory0(
JNIEnv*, jclass, jlong prefix_len) {
auto* ptr = new std::shared_ptr<ROCKSDB_NAMESPACE::SstPartitionerFactory>(
ROCKSDB_NAMESPACE::NewSstPartitionerFixedPrefixFactory(prefix_len));
return reinterpret_cast<jlong>(ptr);
}
/*
* Class: org_rocksdb_SstPartitionerFixedPrefixFactory
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_SstPartitionerFixedPrefixFactory_disposeInternal(
JNIEnv*, jobject, jlong jhandle) {
auto* ptr = reinterpret_cast<
std::shared_ptr<ROCKSDB_NAMESPACE::SstPartitionerFactory>*>(jhandle);
delete ptr; // delete std::shared_ptr
}
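
The handle convention used by the JNI functions above (the `jlong` is the address of a heap-allocated `std::shared_ptr`, not of the factory itself) can be sketched without JNI. This is a minimal illustration, not RocksDB code: `FakeFactory`, `FakeOptions`, and the three helper functions are hypothetical stand-ins, and `std::intptr_t` stands in for `jlong`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

// Hypothetical stand-ins for SstPartitionerFactory and an options struct.
struct FakeFactory {
  std::size_t prefix_len;
};
struct FakeOptions {
  std::shared_ptr<FakeFactory> factory;
};

// Like newSstPartitionerFixedPrefixFactory0: the returned handle is the
// address of a heap-allocated shared_ptr that owns the factory.
std::intptr_t NewFactoryHandle(std::size_t prefix_len) {
  auto* ptr = new std::shared_ptr<FakeFactory>(
      std::make_shared<FakeFactory>(FakeFactory{prefix_len}));
  return reinterpret_cast<std::intptr_t>(ptr);
}

// A setter has to cast the handle back to the same shared_ptr type and copy
// it, so that the options object becomes a second owner of the factory.
void SetFactory(FakeOptions* options, std::intptr_t handle) {
  auto* ptr = reinterpret_cast<std::shared_ptr<FakeFactory>*>(handle);
  options->factory = *ptr;
}

// Like disposeInternal: deletes only the heap-allocated shared_ptr; the
// factory itself is freed when its last owner goes away.
void DisposeFactoryHandle(std::intptr_t handle) {
  delete reinterpret_cast<std::shared_ptr<FakeFactory>*>(handle);
}

// Walks through the lifecycle and returns the owner count after dispose.
long OwnersAfterDispose() {
  FakeOptions options;
  std::intptr_t handle = NewFactoryHandle(4);
  SetFactory(&options, handle);
  assert(options.factory.use_count() == 2);  // handle + options
  DisposeFactoryHandle(handle);
  return options.factory.use_count();  // only options is left as owner
}
```

Because the setter copies the `shared_ptr`, disposing the Java-side handle does not invalidate a factory that an options object still references.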


@ -844,6 +844,18 @@ public class ColumnFamilyOptions extends RocksObject
return forceConsistencyChecks(nativeHandle_);
}
@Override
public ColumnFamilyOptions setSstPartitionerFactory(SstPartitionerFactory sstPartitionerFactory) {
setSstPartitionerFactory(nativeHandle_, sstPartitionerFactory.nativeHandle_);
this.sstPartitionerFactory_ = sstPartitionerFactory;
return this;
}
@Override
public SstPartitionerFactory sstPartitionerFactory() {
return sstPartitionerFactory_;
}
private static native long getColumnFamilyOptionsFromProps(
final long cfgHandle, String optString);
private static native long getColumnFamilyOptionsFromProps(final String optString);
@ -1005,6 +1017,7 @@ public class ColumnFamilyOptions extends RocksObject
private native void setForceConsistencyChecks(final long handle,
final boolean forceConsistencyChecks);
private native boolean forceConsistencyChecks(final long handle);
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
// instance variables
// NOTE: If you add new member variables, please update the copy constructor above!
@ -1018,5 +1031,5 @@ public class ColumnFamilyOptions extends RocksObject
private CompactionOptionsFIFO compactionOptionsFIFO_;
private CompressionOptions bottommostCompressionOptions_;
private CompressionOptions compressionOptions_;
private SstPartitionerFactory sstPartitionerFactory_;
}


@ -437,6 +437,23 @@ public interface ColumnFamilyOptionsInterface<T extends ColumnFamilyOptionsInter
*/
CompressionOptions compressionOptions();
/**
* If non-null, use the specified factory to create the partitioner that
* decides where SST files are split. This lets compaction cut files on
* meaningful boundaries (key prefixes), so that promoting an SST file does
* not cover a huge part of the key space on the next level (in the worst
* case all of it), which would increase write amplification.
* @param factory The factory reference
* @return the reference of the current options.
*/
T setSstPartitionerFactory(SstPartitionerFactory factory);
/**
* Get SST partitioner factory
*
* @return SST partitioner factory
*/
SstPartitionerFactory sstPartitionerFactory();
/**
* Default memtable memory budget used with the following methods:
*


@ -1808,6 +1808,18 @@ public class Options extends RocksObject
return atomicFlush(nativeHandle_);
}
@Override
public Options setSstPartitionerFactory(SstPartitionerFactory sstPartitionerFactory) {
setSstPartitionerFactory(nativeHandle_, sstPartitionerFactory.nativeHandle_);
this.sstPartitionerFactory_ = sstPartitionerFactory;
return this;
}
@Override
public SstPartitionerFactory sstPartitionerFactory() {
return sstPartitionerFactory_;
}
private native static long newOptions();
private native static long newOptions(long dbOptHandle,
long cfOptHandle);
@ -2178,6 +2190,7 @@ public class Options extends RocksObject
private native void setAtomicFlush(final long handle,
final boolean atomicFlush);
private native boolean atomicFlush(final long handle);
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
// instance variables
// NOTE: If you add new member variables, please update the copy constructor above!
@ -2196,4 +2209,5 @@ public class Options extends RocksObject
private Cache rowCache_;
private WalFilter walFilter_;
private WriteBufferManager writeBufferManager_;
private SstPartitionerFactory sstPartitionerFactory_;
}


@ -0,0 +1,15 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
/**
* Handle to a factory for SstPartitioner. It is used in {@link ColumnFamilyOptions}
* and {@link Options}.
*/
public abstract class SstPartitionerFactory extends RocksObject {
protected SstPartitionerFactory(final long nativeHandle) {
super(nativeHandle);
}
}


@ -0,0 +1,19 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
/**
* Fixed prefix factory. It partitions SST files using a fixed-length prefix of the key.
*/
public class SstPartitionerFixedPrefixFactory extends SstPartitionerFactory {
public SstPartitionerFixedPrefixFactory(long prefixLength) {
super(newSstPartitionerFixedPrefixFactory0(prefixLength));
}
private native static long newSstPartitionerFixedPrefixFactory0(long prefixLength);
@Override protected final native void disposeInternal(final long handle);
}


@ -0,0 +1,43 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class SstPartitionerTest {
@ClassRule
public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE =
new RocksNativeLibraryResource();
@Rule public TemporaryFolder dbFolder = new TemporaryFolder();
@Test
public void sstFixedPrefix() throws InterruptedException, RocksDBException {
try (SstPartitionerFixedPrefixFactory factory = new SstPartitionerFixedPrefixFactory(4);
final Options opt =
new Options().setCreateIfMissing(true).setSstPartitionerFactory(factory);
final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) {
// write two keys whose 4-byte prefixes differ, so compaction should split them
db.put("aaaa1".getBytes(), "A".getBytes());
db.put("bbbb1".getBytes(), "B".getBytes());
db.flush(new FlushOptions());
db.put("aaaa1".getBytes(), "A2".getBytes());
db.flush(new FlushOptions());
db.compactRange();
List<LiveFileMetaData> metadata = db.getLiveFilesMetaData();
assertThat(metadata.size()).isEqualTo(2);
}
}
}
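
The expectation of two live files in the test above follows from the prefix rule: after compaction only `aaaa1` and `bbbb1` remain, and their 4-byte prefixes differ. The sketch below illustrates that rule in isolation. It is not RocksDB's implementation (the real partitioner is consulted during compaction, and files can also be cut for other reasons such as size limits); `CountPrefixPartitions` is a hypothetical helper that assumes the keys are already sorted.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Counts how many output files result if a new file is started every time
// the first prefix_len bytes of the key change. Assumes sorted_keys is
// sorted, as keys are in a compaction output stream.
std::size_t CountPrefixPartitions(const std::vector<std::string>& sorted_keys,
                                  std::size_t prefix_len) {
  std::size_t files = 0;
  std::string current_prefix;
  for (const std::string& key : sorted_keys) {
    // substr clamps to the key length, so keys shorter than prefix_len
    // are compared as a whole.
    std::string prefix = key.substr(0, prefix_len);
    if (files == 0 || prefix != current_prefix) {
      ++files;  // prefix changed (or first key): start a new file
      current_prefix = prefix;
    }
  }
  return files;
}
```

With the keys from the test and a prefix length of 4, the helper reports two partitions; adding a key such as `aaaa2` would not create a third.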


@ -763,7 +763,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
cf_options.memtable_insert_with_hint_prefix_extractor.get()),
cf_paths(cf_options.cf_paths),
compaction_thread_limiter(cf_options.compaction_thread_limiter),
file_checksum_gen_factory(db_options.file_checksum_gen_factory.get()),
sst_partitioner_factory(cf_options.sst_partitioner_factory) {}
// Multiply two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) {


@ -119,6 +119,8 @@ struct ImmutableCFOptions {
std::shared_ptr<ConcurrentTaskLimiter> compaction_thread_limiter;
FileChecksumGenFactory* file_checksum_gen_factory;
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory;
};
struct MutableCFOptions {


@ -24,6 +24,7 @@
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/wal_filter.h"
@ -122,6 +123,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(
log, " Options.compaction_filter_factory: %s",
compaction_filter_factory ? compaction_filter_factory->Name() : "None");
ROCKS_LOG_HEADER(
log, " Options.sst_partitioner_factory: %s",
sst_partitioner_factory ? sst_partitioner_factory->Name() : "None");
ROCKS_LOG_HEADER(log, " Options.memtable_factory: %s",
memtable_factory->Name());
ROCKS_LOG_HEADER(log, " Options.table_factory: %s",


@ -387,6 +387,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
{offset_of(&ColumnFamilyOptions::cf_paths), sizeof(std::vector<DbPath>)},
{offset_of(&ColumnFamilyOptions::compaction_thread_limiter),
sizeof(std::shared_ptr<ConcurrentTaskLimiter>)},
{offset_of(&ColumnFamilyOptions::sst_partitioner_factory),
sizeof(std::shared_ptr<SstPartitionerFactory>)},
};
char* options_ptr = new char[sizeof(ColumnFamilyOptions)];
@ -425,6 +427,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
options->purge_redundant_kvs_while_flush = false;
options->max_mem_compaction_level = 0;
options->compaction_filter = nullptr;
options->sst_partitioner_factory = nullptr;
char* new_options_ptr = new char[sizeof(ColumnFamilyOptions)];
ColumnFamilyOptions* new_options = ColumnFamilyOptions* new_options =

src.mk

@ -22,6 +22,7 @@ LIB_SOURCES = \
db/compaction/compaction_picker_fifo.cc \
db/compaction/compaction_picker_level.cc \
db/compaction/compaction_picker_universal.cc \
db/compaction/sst_partitioner.cc \
db/convenience.cc \
db/db_filesnapshot.cc \
db/db_impl/db_impl.cc \
@ -554,6 +555,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/sst_file_writerjni.cc \
java/rocksjni/sst_file_readerjni.cc \
java/rocksjni/sst_file_reader_iterator.cc \
java/rocksjni/sst_partitioner.cc \
java/rocksjni/statistics.cc \
java/rocksjni/statisticsjni.cc \
java/rocksjni/table.cc \