rocksdb/java/rocksjni/compression_options.cc
Andrew Kryczka d904233d2f Limit buffering for collecting samples for compression dictionary (#7970)
Summary:
For dictionary compression, we need to collect some representative samples of the data to be compressed, which we use to either generate or train (when `CompressionOptions::zstd_max_train_bytes > 0`) a dictionary. Previously, the strategy was to buffer all the data blocks during flush, and up to the target file size during compaction. That strategy allowed us to randomly pick samples from as wide a range as possible that'd be guaranteed to land in a single output file.

However, some users try to make huge files in memory-constrained environments, where this strategy can cause OOM. This PR introduces an option, `CompressionOptions::max_dict_buffer_bytes`, that limits how much data blocks are buffered before we switch to unbuffered mode (which means creating the per-SST dictionary, writing out the buffered data, and compressing/writing new blocks as soon as they are built). It is not strict as we currently buffer more than just data blocks -- also keys are buffered. But it does make a step towards giving users predictable memory usage.

Related changes include:

- Changed sampling for dictionary compression to select unique data blocks when there is limited availability of data blocks
- Made use of `BlockBuilder::SwapAndReset()` to save an allocation+memcpy when buffering data blocks for building a dictionary
- Changed `ParseBoolean()` to accept an input containing characters after the boolean. This is necessary since, with this PR, a value for `CompressionOptions::enabled` is no longer necessarily the final component in the `CompressionOptions` string.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7970

Test Plan:
- updated `CompressionOptions` unit tests to verify limit is respected (to the extent expected in the current implementation) in various scenarios of flush/compaction to bottommost/non-bottommost level
- looked at jemalloc heap profiles right before and after switching to unbuffered mode during flush/compaction. Verified memory usage in buffering is proportional to the limit set.

Reviewed By: pdillinger

Differential Revision: D26467994

Pulled By: ajkr

fbshipit-source-id: 3da4ef9fba59974e4ef40e40c01611002c861465
2021-02-19 14:09:54 -08:00

186 lines
5.7 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the "bridge" between Java and C++ for
// ROCKSDB_NAMESPACE::CompressionOptions.
#include <jni.h>
#include "include/org_rocksdb_CompressionOptions.h"
#include "rocksdb/advanced_options.h"
/*
* Class: org_rocksdb_CompressionOptions
* Method: newCompressionOptions
* Signature: ()J
*/
jlong Java_org_rocksdb_CompressionOptions_newCompressionOptions(
JNIEnv*, jclass) {
const auto* opt = new ROCKSDB_NAMESPACE::CompressionOptions();
return reinterpret_cast<jlong>(opt);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setWindowBits
* Signature: (JI)V
*/
void Java_org_rocksdb_CompressionOptions_setWindowBits(
JNIEnv*, jobject, jlong jhandle, jint jwindow_bits) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->window_bits = static_cast<int>(jwindow_bits);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: windowBits
* Signature: (J)I
*/
jint Java_org_rocksdb_CompressionOptions_windowBits(
JNIEnv*, jobject, jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<jint>(opt->window_bits);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setLevel
* Signature: (JI)V
*/
void Java_org_rocksdb_CompressionOptions_setLevel(
JNIEnv*, jobject, jlong jhandle, jint jlevel) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->level = static_cast<int>(jlevel);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: level
* Signature: (J)I
*/
jint Java_org_rocksdb_CompressionOptions_level(
JNIEnv*, jobject, jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<jint>(opt->level);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setStrategy
* Signature: (JI)V
*/
void Java_org_rocksdb_CompressionOptions_setStrategy(
JNIEnv*, jobject, jlong jhandle, jint jstrategy) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->strategy = static_cast<int>(jstrategy);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: strategy
* Signature: (J)I
*/
jint Java_org_rocksdb_CompressionOptions_strategy(
JNIEnv*, jobject, jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<jint>(opt->strategy);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setMaxDictBytes
* Signature: (JI)V
*/
void Java_org_rocksdb_CompressionOptions_setMaxDictBytes(
JNIEnv*, jobject, jlong jhandle, jint jmax_dict_bytes) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->max_dict_bytes = static_cast<uint32_t>(jmax_dict_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: maxDictBytes
* Signature: (J)I
*/
jint Java_org_rocksdb_CompressionOptions_maxDictBytes(
JNIEnv*, jobject, jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<jint>(opt->max_dict_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setZstdMaxTrainBytes
* Signature: (JI)V
*/
void Java_org_rocksdb_CompressionOptions_setZstdMaxTrainBytes(
JNIEnv*, jobject, jlong jhandle, jint jzstd_max_train_bytes) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->zstd_max_train_bytes = static_cast<uint32_t>(jzstd_max_train_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: zstdMaxTrainBytes
* Signature: (J)I
*/
jint Java_org_rocksdb_CompressionOptions_zstdMaxTrainBytes(
JNIEnv *, jobject, jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<jint>(opt->zstd_max_train_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setMaxDictBufferBytes
* Signature: (JJ)V
*/
void Java_org_rocksdb_CompressionOptions_setMaxDictBufferBytes(
JNIEnv*, jobject, jlong jhandle, jlong jmax_dict_buffer_bytes) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->max_dict_buffer_bytes = static_cast<uint64_t>(jmax_dict_buffer_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: maxDictBufferBytes
* Signature: (J)J
*/
jlong Java_org_rocksdb_CompressionOptions_maxDictBufferBytes(JNIEnv*, jobject,
jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<jlong>(opt->max_dict_buffer_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setEnabled
* Signature: (JZ)V
*/
void Java_org_rocksdb_CompressionOptions_setEnabled(
JNIEnv*, jobject, jlong jhandle, jboolean jenabled) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->enabled = jenabled == JNI_TRUE;
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: enabled
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_CompressionOptions_enabled(
JNIEnv*, jobject, jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<bool>(opt->enabled);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_CompressionOptions_disposeInternal(
JNIEnv*, jobject, jlong jhandle) {
delete reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
}