diff --git a/HISTORY.md b/HISTORY.md index b64e12b42..a8b89f54f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * We have refactored our system of stalling writes. Any stall-related statistics' meanings are changed. Instead of per-write stall counts, we now count stalls per-epoch, where epochs are periods between flushes and compactions. You'll find more information in our Tuning Perf Guide once we release RocksDB 3.6. * When disableDataSync=true, we no longer sync the MANIFEST file. * Add identity_as_first_hash property to CuckooTable. SST file needs to be rebuilt to be opened by reader properly. +* Change target_file_size_base type to uint64_t from int. ----- Past Releases ----- diff --git a/Makefile b/Makefile index 260a51d1a..9d626e17f 100644 --- a/Makefile +++ b/Makefile @@ -198,7 +198,7 @@ endif # PLATFORM_SHARED_EXT .PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests \ release tags valgrind_check whitebox_crash_test format static_lib shared_lib all \ - dbg install uninstall + dbg rocksdbjavastatic rocksdbjava install uninstall all: $(LIBRARY) $(PROGRAMS) $(TESTS) @@ -268,7 +268,7 @@ unity: unity.cc unity.o clean: -rm -f $(PROGRAMS) $(TESTS) $(LIBRARY) $(SHARED) $(MEMENVLIBRARY) build_config.mk unity.cc -rm -rf ios-x86/* ios-arm/* - -find . -name "*.[od]" -exec rm {} \; + -find . -name "*.[oda]" -exec rm {} \; -find . -type f -regex ".*\.\(\(gcda\)\|\(gcno\)\)" -exec rm {} \; tags: ctags * -R @@ -518,6 +518,37 @@ ROCKSDBJNILIB = librocksdbjni.jnilib JAVA_INCLUDE = -I/System/Library/Frameworks/JavaVM.framework/Headers/ endif +libz.a: + -rm -rf zlib-1.2.8 + curl -O http://zlib.net/zlib-1.2.8.tar.gz + tar xvzf zlib-1.2.8.tar.gz + cd zlib-1.2.8 && CFLAGS='-fPIC' ./configure --static && make + cp zlib-1.2.8/libz.a . + +libbz2.a: + -rm -rf bzip2-1.0.6 + curl -O http://www.bzip.org/1.0.6/bzip2-1.0.6.tar.gz + tar xvzf bzip2-1.0.6.tar.gz + cd bzip2-1.0.6 && make CFLAGS='-fPIC -Wall -Winline -O2 -g -D_FILE_OFFSET_BITS=64' + cp bzip2-1.0.6/libbz2.a . + +libsnappy.a: + -rm -rf snappy-1.1.1 + curl -O https://snappy.googlecode.com/files/snappy-1.1.1.tar.gz + tar xvzf snappy-1.1.1.tar.gz + cd snappy-1.1.1 && ./configure --with-pic --enable-static + cd snappy-1.1.1 && make + cp snappy-1.1.1/.libs/libsnappy.a . + + +rocksdbjavastatic: libz.a libbz2.a libsnappy.a + OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j + cd java;$(MAKE) java; + rm -f ./java/$(ROCKSDBJNILIB) + $(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o ./java/$(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(LIBOBJECTS) $(COVERAGEFLAGS) libz.a libbz2.a libsnappy.a + cd java;jar -cf $(ROCKSDB_JAR) org/rocksdb/*.class org/rocksdb/util/*.class HISTORY*.md $(ROCKSDBJNILIB) + + rocksdbjava: OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32 cd java;$(MAKE) java; diff --git a/db/column_family.cc b/db/column_family.cc index ff6b8fe6c..8b4e007ed 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -86,6 +86,10 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); } +const Comparator* ColumnFamilyHandleImpl::user_comparator() const { + return cfd()->user_comparator(); +} + ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, const ColumnFamilyOptions& src) { ColumnFamilyOptions result = src; @@ -726,4 +730,13 @@ uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) { return column_family_id; } +const Comparator* GetColumnFamilyUserComparator( + ColumnFamilyHandle* column_family) { + if (column_family != nullptr) { + auto cfh = reinterpret_cast(column_family); + return cfh->user_comparator(); + } + return nullptr; +} + } // namespace rocksdb diff --git a/db/column_family.h b/db/column_family.h index f1ef13cf1..65b4b53ba 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -49,6 +49,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle { // destroy without mutex virtual ~ColumnFamilyHandleImpl(); virtual ColumnFamilyData* cfd() const { return cfd_; } + virtual const Comparator* user_comparator() const; virtual uint32_t GetID() const; @@ -448,4 +449,7 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { extern uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family); +extern const Comparator* GetColumnFamilyUserComparator( + ColumnFamilyHandle* column_family); + } // namespace rocksdb diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 04d5c6f47..7cd965c20 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -575,7 +575,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, return nullptr; } Version::FileSummaryStorage tmp; - LogToBuffer(log_buffer, "[%s] Universal: candidate files(%zu): %s\n", + LogToBuffer(log_buffer, 3072, "[%s] Universal: candidate files(%zu): %s\n", version->cfd_->GetName().c_str(), version->files_[level].size(), version->LevelFileSummary(&tmp, 0)); diff --git a/db/db_bench.cc b/db/db_bench.cc index 08e61e46b..d90c628a9 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -307,7 +307,7 @@ DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL"); DEFINE_int32(num_levels, 7, "The total number of levels"); -DEFINE_int32(target_file_size_base, 2 * 1048576, "Target file size at level-1"); +DEFINE_int64(target_file_size_base, 2 * 1048576, "Target file size at level-1"); DEFINE_int32(target_file_size_multiplier, 1, "A multiplier to compute target level-N file size (N >= 2)"); diff --git a/db/db_impl.cc b/db/db_impl.cc index 6038c2ce5..260939810 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3117,9 +3117,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, const uint64_t start_micros = env_->NowMicros(); unique_ptr input(versions_->MakeInputIterator(compact->compaction)); input->SeekToFirst(); - shared_ptr backup_input( - versions_->MakeInputIterator(compact->compaction)); - backup_input->SeekToFirst(); Status status; ParsedInternalKey ikey; @@ -3132,14 +3129,30 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, auto compaction_filter_v2 = compaction_filter_from_factory_v2.get(); - // temp_backup_input always point to the start of the current buffer - // temp_backup_input = backup_input; - // iterate through input, - // 1) buffer ineligible keys and value keys into 2 separate buffers; - // 2) send value_buffer to compaction filter and alternate the values; - // 3) merge value_buffer with ineligible_value_buffer; - // 4) run the modified "compaction" using the old for loop. - if (compaction_filter_v2) { + if (!compaction_filter_v2) { + status = ProcessKeyValueCompaction( + is_snapshot_supported, + visible_at_tip, + earliest_snapshot, + latest_snapshot, + deletion_state, + bottommost_level, + imm_micros, + input.get(), + compact, + false, + log_buffer); + } else { + // temp_backup_input always point to the start of the current buffer + // temp_backup_input = backup_input; + // iterate through input, + // 1) buffer ineligible keys and value keys into 2 separate buffers; + // 2) send value_buffer to compaction filter and alternate the values; + // 3) merge value_buffer with ineligible_value_buffer; + // 4) run the modified "compaction" using the old for loop. + shared_ptr backup_input( + versions_->MakeInputIterator(compact->compaction)); + backup_input->SeekToFirst(); while (backup_input->Valid() && !shutting_down_.Acquire_Load() && !cfd->IsDropped()) { // FLUSH preempts compaction @@ -3267,21 +3280,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, log_buffer); } // checking for compaction filter v2 - if (!compaction_filter_v2) { - status = ProcessKeyValueCompaction( - is_snapshot_supported, - visible_at_tip, - earliest_snapshot, - latest_snapshot, - deletion_state, - bottommost_level, - imm_micros, - input.get(), - compact, - false, - log_buffer); - } - if (status.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) { status = Status::ShutdownInProgress( "Database shutdown or Column family drop during compaction"); diff --git a/db/version_set.h b/db/version_set.h index 353adbfec..211fca179 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -183,10 +183,10 @@ class Version { // Return a human-readable short (single-line) summary of the number // of files per level. Uses *scratch as backing store. struct LevelSummaryStorage { - char buffer[100]; + char buffer[1000]; }; struct FileSummaryStorage { - char buffer[1000]; + char buffer[3000]; }; const char* LevelSummary(LevelSummaryStorage* scratch) const; // Return a human-readable short (single-line) summary of files diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index d8fa52d40..ba7451078 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -289,6 +289,9 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl { explicit ColumnFamilyHandleImplDummy(int id) : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {} uint32_t GetID() const override { return id_; } + const Comparator* user_comparator() const override { + return BytewiseComparator(); + } private: uint32_t id_; @@ -320,7 +323,7 @@ TEST(WriteBatchTest, ColumnFamiliesBatchTest) { } TEST(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) { - WriteBatchWithIndex batch(BytewiseComparator(), 20); + WriteBatchWithIndex batch; ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8); batch.Put(&zero, Slice("foo"), Slice("bar")); batch.Put(&two, Slice("twofoo"), Slice("bar2")); diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index de4480cff..54b676626 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -77,6 +77,8 @@ struct ImmutableCFOptions { std::vector compression_per_level; CompressionOptions compression_opts; + + Options::AccessHint access_hint_on_compaction_start; }; } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 2c9734d24..a60f94268 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -58,6 +58,7 @@ enum CompactionStyle : char { kCompactionStyleFIFO = 0x2, // FIFO compaction style }; + struct CompactionOptionsFIFO { // once the total sum of table files reaches this, we will delete the oldest // table file @@ -287,7 +288,7 @@ struct ColumnFamilyOptions { // and each file on level-3 will be 200MB. // by default target_file_size_base is 2MB. - int target_file_size_base; + uint64_t target_file_size_base; // by default target_file_size_multiplier is 1, which means // by default files in different levels will have similar size. int target_file_size_multiplier; @@ -783,12 +784,13 @@ struct DBOptions { // Specify the file access pattern once a compaction is started. // It will be applied to all input files of a compaction. // Default: NORMAL - enum { - NONE, - NORMAL, - SEQUENTIAL, - WILLNEED - } access_hint_on_compaction_start; + enum AccessHint { + NONE, + NORMAL, + SEQUENTIAL, + WILLNEED + }; + AccessHint access_hint_on_compaction_start; // Use adaptive mutex, which spins in the user space before resorting // to kernel. This could reduce context switch when the mutex is not diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index c09f53d11..85c80850f 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -11,8 +11,9 @@ #pragma once -#include "rocksdb/status.h" +#include "rocksdb/comparator.h" #include "rocksdb/slice.h" +#include "rocksdb/status.h" #include "rocksdb/write_batch.h" namespace rocksdb { @@ -56,12 +57,14 @@ class WBWIIterator { // A user can call NewIterator() to create an iterator. class WriteBatchWithIndex { public: - // index_comparator indicates the order when iterating data in the write - // batch. Technically, it doesn't have to be the same as the one used in - // the DB. + // backup_index_comparator: the backup comparator used to compare keys + // within the same column family, if column family is not given in the + // interface, or we can't find a column family from the column family handle + // passed in, backup_index_comparator will be used for the column family. // reserved_bytes: reserved bytes in underlying WriteBatch - explicit WriteBatchWithIndex(const Comparator* index_comparator, - size_t reserved_bytes = 0); + explicit WriteBatchWithIndex( + const Comparator* backup_index_comparator = BytewiseComparator(), + size_t reserved_bytes = 0); virtual ~WriteBatchWithIndex(); WriteBatch* GetWriteBatch(); diff --git a/java/org/rocksdb/NativeLibraryLoader.java b/java/org/rocksdb/NativeLibraryLoader.java new file mode 100644 index 000000000..440056582 --- /dev/null +++ b/java/org/rocksdb/NativeLibraryLoader.java @@ -0,0 +1,58 @@ +package org.rocksdb; + +import java.io.*; + + +/** + * This class is used to load the RocksDB shared library from within the jar. + * The shared library is extracted to a temp folder and loaded from there. + */ +public class NativeLibraryLoader { + private static String sharedLibraryName = "librocksdbjni.so"; + private static String tempFilePrefix = "librocksdbjni"; + private static String tempFileSuffix = ".so"; + + public static void loadLibraryFromJar(String tmpDir) + throws IOException { + File temp; + if(tmpDir == null || tmpDir.equals("")) + temp = File.createTempFile(tempFilePrefix, tempFileSuffix); + else + temp = new File(tmpDir + "/" + sharedLibraryName); + + temp.deleteOnExit(); + + if (!temp.exists()) { + throw new RuntimeException("File " + temp.getAbsolutePath() + " does not exist."); + } + + byte[] buffer = new byte[102400]; + int readBytes; + + InputStream is = ClassLoader.getSystemClassLoader().getResourceAsStream(sharedLibraryName); + if (is == null) { + throw new RuntimeException(sharedLibraryName + " was not found inside JAR."); + } + + OutputStream os = null; + try { + os = new FileOutputStream(temp); + while ((readBytes = is.read(buffer)) != -1) { + os.write(buffer, 0, readBytes); + } + } finally { + if(os != null) + os.close(); + + if(is != null) + is.close(); + } + + System.load(temp.getAbsolutePath()); + } + /** + * Private constructor to disallow instantiation + */ + private NativeLibraryLoader() { + } +} diff --git a/java/org/rocksdb/Options.java b/java/org/rocksdb/Options.java index 876a06285..33ca19d9d 100644 --- a/java/org/rocksdb/Options.java +++ b/java/org/rocksdb/Options.java @@ -13,6 +13,9 @@ package org.rocksdb; * native resources will be released as part of the process. */ public class Options extends RocksObject { + static { + RocksDB.loadLibrary(); + } static final long DEFAULT_CACHE_SIZE = 8 << 20; static final int DEFAULT_NUM_SHARD_BITS = -1; /** diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index 829ac48df..a16586551 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -11,6 +11,7 @@ import java.util.HashMap; import java.io.Closeable; import java.io.IOException; import org.rocksdb.util.Environment; +import org.rocksdb.NativeLibraryLoader; /** * A RocksDB is a persistent ordered map from keys to values. It is safe for @@ -23,11 +24,19 @@ public class RocksDB extends RocksObject { private static final String[] compressionLibs_ = { "snappy", "z", "bzip2", "lz4", "lz4hc"}; + static { + RocksDB.loadLibrary(); + } + /** * Loads the necessary library files. * Calling this method twice will have no effect. + * By default the method extracts the shared library for loading at + * java.io.tmpdir, however, you can override this temporary location by + * setting the environment variable ROCKSDB_SHAREDLIB_DIR. */ public static synchronized void loadLibrary() { + String tmpDir = System.getenv("ROCKSDB_SHAREDLIB_DIR"); // loading possibly necessary libraries. for (String lib : compressionLibs_) { try { @@ -36,8 +45,14 @@ public class RocksDB extends RocksObject { // since it may be optional, we ignore its loading failure here. } } - // However, if any of them is required. We will see error here. - System.loadLibrary("rocksdbjni"); + try + { + NativeLibraryLoader.loadLibraryFromJar(tmpDir); + } + catch (IOException e) + { + throw new RuntimeException("Unable to load the RocksDB shared library" + e); + } } /** diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 2e883632f..09328dc3b 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -498,7 +498,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // pre-load these blocks, which will kept in member variables in Rep // and with a same life-time as this table object. IndexReader* index_reader = nullptr; - // TODO: we never really verify check sum for index block s = new_table->CreateIndexReader(&index_reader, meta_iter.get()); if (s.ok()) { @@ -533,8 +532,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, } void BlockBasedTable::SetupForCompaction() { - /* - switch (.access_hint_on_compaction_start) { + switch (rep_->ioptions.access_hint_on_compaction_start) { case Options::NONE: break; case Options::NORMAL: @@ -550,7 +548,6 @@ void BlockBasedTable::SetupForCompaction() { assert(false); } compaction_optimized_ = true; - */ } std::shared_ptr BlockBasedTable::GetTableProperties() diff --git a/table/cuckoo_table_reader_test.cc b/table/cuckoo_table_reader_test.cc index 3b170b638..6dd5e5525 100644 --- a/table/cuckoo_table_reader_test.cc +++ b/table/cuckoo_table_reader_test.cc @@ -522,7 +522,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) { float time_per_op = (env->NowMicros() - start_time) * 1.0 / num; fprintf(stderr, "Time taken per op is %.3fus (%.1f Mqps) with batch size of %u, " - "# of found keys %ld\n", + "# of found keys %" PRId64 "\n", time_per_op, 1.0 / time_per_op, batch_size, found_count); } } // namespace. diff --git a/table/full_filter_block_test.cc b/table/full_filter_block_test.cc index 12e783b4a..7bf61f238 100644 --- a/table/full_filter_block_test.cc +++ b/table/full_filter_block_test.cc @@ -30,7 +30,8 @@ class TestFilterBitsBuilder : public FilterBitsBuilder { for (size_t i = 0; i < hash_entries_.size(); i++) { EncodeFixed32(data + i * 4, hash_entries_[i]); } - buf->reset(data); + const char* const_data = data; + buf->reset(const_data); return Slice(data, len); } diff --git a/util/env_test.cc b/util/env_test.cc index 3e811a98d..1779f1aa0 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -768,6 +768,41 @@ TEST(EnvPosixTest, LogBufferTest) { ASSERT_EQ(10, test_logger.char_x_count); } +class TestLogger2 : public Logger { + public: + explicit TestLogger2(size_t max_log_size) : max_log_size_(max_log_size) {} + virtual void Logv(const char* format, va_list ap) override { + char new_format[2000]; + std::fill_n(new_format, sizeof(new_format), '2'); + { + va_list backup_ap; + va_copy(backup_ap, ap); + int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap); + // 48 bytes for extra information + bytes allocated + ASSERT_TRUE( + n <= 48 + static_cast(max_log_size_ - sizeof(struct timeval))); + ASSERT_TRUE(n > static_cast(max_log_size_ - sizeof(struct timeval))); + va_end(backup_ap); + } + } + size_t max_log_size_; +}; + +TEST(EnvPosixTest, LogBufferMaxSizeTest) { + char bytes9000[9000]; + std::fill_n(bytes9000, sizeof(bytes9000), '1'); + bytes9000[sizeof(bytes9000) - 1] = '\0'; + + for (size_t max_log_size = 256; max_log_size <= 1024; + max_log_size += 1024 - 256) { + TestLogger2 test_logger(max_log_size); + test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger); + LogToBuffer(&log_buffer, max_log_size, "%s", bytes9000); + log_buffer.FlushBufferToLog(); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/util/log_buffer.cc b/util/log_buffer.cc index 726c01442..ddddaec9f 100644 --- a/util/log_buffer.cc +++ b/util/log_buffer.cc @@ -13,17 +13,17 @@ LogBuffer::LogBuffer(const InfoLogLevel log_level, Logger*info_log) : log_level_(log_level), info_log_(info_log) {} -void LogBuffer::AddLogToBuffer(const char* format, va_list ap) { +void LogBuffer::AddLogToBuffer(size_t max_log_size, const char* format, + va_list ap) { if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) { // Skip the level because of its level. return; } - const size_t kLogSizeLimit = 512; - char* alloc_mem = arena_.AllocateAligned(kLogSizeLimit); + char* alloc_mem = arena_.AllocateAligned(max_log_size); BufferedLog* buffered_log = new (alloc_mem) BufferedLog(); char* p = buffered_log->message; - char* limit = alloc_mem + kLogSizeLimit - 1; + char* limit = alloc_mem + max_log_size - 1; // store the time gettimeofday(&(buffered_log->now_tv), nullptr); @@ -61,11 +61,22 @@ void LogBuffer::FlushBufferToLog() { logs_.clear(); } -void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { +void LogToBuffer(LogBuffer* log_buffer, size_t max_log_size, const char* format, + ...) { if (log_buffer != nullptr) { va_list ap; va_start(ap, format); - log_buffer->AddLogToBuffer(format, ap); + log_buffer->AddLogToBuffer(max_log_size, format, ap); + va_end(ap); + } +} + +void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { + const size_t kDefaultMaxLogSize = 512; + if (log_buffer != nullptr) { + va_list ap; + va_start(ap, format); + log_buffer->AddLogToBuffer(kDefaultMaxLogSize, format, ap); va_end(ap); } } diff --git a/util/log_buffer.h b/util/log_buffer.h index 2a24bf854..2d790086e 100644 --- a/util/log_buffer.h +++ b/util/log_buffer.h @@ -21,8 +21,9 @@ class LogBuffer { // info_log: logger to write the logs to LogBuffer(const InfoLogLevel log_level, Logger* info_log); - // Add a log entry to the buffer. - void AddLogToBuffer(const char* format, va_list ap); + // Add a log entry to the buffer. Use default max_log_size. + // max_log_size indicates maximize log size, including some metadata. + void AddLogToBuffer(size_t max_log_size, const char* format, va_list ap); size_t IsEmpty() const { return logs_.empty(); } @@ -44,6 +45,10 @@ class LogBuffer { // Add log to the LogBuffer for a delayed info logging. It can be used when // we want to add some logs inside a mutex. +// max_log_size indicates maximize log size, including some metadata. +extern void LogToBuffer(LogBuffer* log_buffer, size_t max_log_size, + const char* format, ...); +// Same as previous function, but with default max log size. extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...); } // namespace rocksdb diff --git a/util/logging.cc b/util/logging.cc index 4dfb9a449..98d96b82b 100644 --- a/util/logging.cc +++ b/util/logging.cc @@ -45,7 +45,7 @@ int AppendHumanBytes(uint64_t bytes, char* output, int len) { void AppendNumberTo(std::string* str, uint64_t num) { char buf[30]; - snprintf(buf, sizeof(buf), "%llu", (unsigned long long) num); + snprintf(buf, sizeof(buf), "%" PRIu64, num); str->append(buf); } diff --git a/util/logging.h b/util/logging.h index ce0269726..7ca8ae0a3 100644 --- a/util/logging.h +++ b/util/logging.h @@ -19,7 +19,6 @@ namespace rocksdb { class Slice; -class WritableFile; // Append a human-readable size in bytes int AppendHumanBytes(uint64_t bytes, char* output, int len); diff --git a/util/options.cc b/util/options.cc index f0042cbda..28120659b 100644 --- a/util/options.cc +++ b/util/options.cc @@ -59,7 +59,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) use_fsync(options.use_fsync), compression(options.compression), compression_per_level(options.compression_per_level), - compression_opts(options.compression_opts) {} + compression_opts(options.compression_opts), + access_hint_on_compaction_start(options.access_hint_on_compaction_start) {} ColumnFamilyOptions::ColumnFamilyOptions() : comparator(BytewiseComparator()), @@ -273,8 +274,8 @@ void DBOptions::Dump(Logger* log) const { Log(log, " Options.disableDataSync: %d", disableDataSync); Log(log, " Options.use_fsync: %d", use_fsync); Log(log, " Options.max_log_file_size: %zu", max_log_file_size); - Log(log, "Options.max_manifest_file_size: %lu", - (unsigned long)max_manifest_file_size); + Log(log, "Options.max_manifest_file_size: %" PRIu64, + max_manifest_file_size); Log(log, " Options.log_file_time_to_roll: %zu", log_file_time_to_roll); Log(log, " Options.keep_log_file_num: %zu", keep_log_file_num); Log(log, " Options.allow_os_buffer: %d", allow_os_buffer); @@ -290,16 +291,16 @@ void DBOptions::Dump(Logger* log) const { table_cache_numshardbits); Log(log, " Options.table_cache_remove_scan_count_limit: %d", table_cache_remove_scan_count_limit); - Log(log, " Options.delete_obsolete_files_period_micros: %lu", - (unsigned long)delete_obsolete_files_period_micros); + Log(log, " Options.delete_obsolete_files_period_micros: %" PRIu64, + delete_obsolete_files_period_micros); Log(log, " Options.max_background_compactions: %d", max_background_compactions); Log(log, " Options.max_background_flushes: %d", max_background_flushes); - Log(log, " Options.WAL_ttl_seconds: %lu", - (unsigned long)WAL_ttl_seconds); - Log(log, " Options.WAL_size_limit_MB: %lu", - (unsigned long)WAL_size_limit_MB); + Log(log, " Options.WAL_ttl_seconds: %" PRIu64, + WAL_ttl_seconds); + Log(log, " Options.WAL_size_limit_MB: %" PRIu64, + WAL_size_limit_MB); Log(log, " Options.manifest_preallocation_size: %zu", manifest_preallocation_size); Log(log, " Options.allow_os_buffer: %d", @@ -322,8 +323,8 @@ void DBOptions::Dump(Logger* log) const { use_adaptive_mutex); Log(log, " Options.rate_limiter: %p", rate_limiter.get()); - Log(log, " Options.bytes_per_sync: %lu", - (unsigned long)bytes_per_sync); + Log(log, " Options.bytes_per_sync: %" PRIu64, + bytes_per_sync); } // DBOptions::Dump void ColumnFamilyOptions::Dump(Logger* log) const { @@ -371,20 +372,20 @@ void ColumnFamilyOptions::Dump(Logger* log) const { level0_stop_writes_trigger); Log(log," Options.max_mem_compaction_level: %d", max_mem_compaction_level); - Log(log," Options.target_file_size_base: %d", + Log(log," Options.target_file_size_base: %" PRIu64, target_file_size_base); Log(log," Options.target_file_size_multiplier: %d", target_file_size_multiplier); - Log(log," Options.max_bytes_for_level_base: %lu", - (unsigned long)max_bytes_for_level_base); + Log(log," Options.max_bytes_for_level_base: %" PRIu64, + max_bytes_for_level_base); Log(log," Options.max_bytes_for_level_multiplier: %d", max_bytes_for_level_multiplier); for (int i = 0; i < num_levels; i++) { Log(log,"Options.max_bytes_for_level_multiplier_addtl[%d]: %d", i, max_bytes_for_level_multiplier_additional[i]); } - Log(log," Options.max_sequential_skip_in_iterations: %lu", - (unsigned long)max_sequential_skip_in_iterations); + Log(log," Options.max_sequential_skip_in_iterations: %" PRIu64, + max_sequential_skip_in_iterations); Log(log," Options.expanded_compaction_factor: %d", expanded_compaction_factor); Log(log," Options.source_compaction_factor: %d", diff --git a/util/options_helper.cc b/util/options_helper.cc index db066f747..d552a2b9e 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -177,7 +177,7 @@ bool GetOptionsFromStrings( } else if (o.first == "max_mem_compaction_level") { new_options->max_mem_compaction_level = ParseInt(o.second); } else if (o.first == "target_file_size_base") { - new_options->target_file_size_base = ParseInt(o.second); + new_options->target_file_size_base = ParseUint64(o.second); } else if (o.first == "target_file_size_multiplier") { new_options->target_file_size_multiplier = ParseInt(o.second); } else if (o.first == "max_bytes_for_level_base") { diff --git a/util/options_test.cc b/util/options_test.cc index f640b991f..eee285e2a 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -177,7 +177,7 @@ TEST(OptionsTest, GetOptionsFromStringsTest) { ASSERT_EQ(new_opt.level0_slowdown_writes_trigger, 9); ASSERT_EQ(new_opt.level0_stop_writes_trigger, 10); ASSERT_EQ(new_opt.max_mem_compaction_level, 11); - ASSERT_EQ(new_opt.target_file_size_base, 12); + ASSERT_EQ(new_opt.target_file_size_base, static_cast(12)); ASSERT_EQ(new_opt.target_file_size_multiplier, 13); ASSERT_EQ(new_opt.max_bytes_for_level_base, 14U); ASSERT_EQ(new_opt.max_bytes_for_level_multiplier, 15); diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 68b3d3970..2caa2e4cc 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -20,7 +20,6 @@ class ReadableWriteBatch : public WriteBatch { Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, Slice* value, Slice* blob) const; }; -} // namespace // Key used by skip list, as the binary searchable index of WriteBatchWithIndex. struct WriteBatchIndexEntry { @@ -38,44 +37,28 @@ struct WriteBatchIndexEntry { class WriteBatchEntryComparator { public: - WriteBatchEntryComparator(const Comparator* comparator, + WriteBatchEntryComparator(const Comparator* default_comparator, const ReadableWriteBatch* write_batch) - : comparator_(comparator), write_batch_(write_batch) {} + : default_comparator_(default_comparator), write_batch_(write_batch) {} // Compare a and b. Return a negative value if a is less than b, 0 if they // are equal, and a positive value if a is greater than b int operator()(const WriteBatchIndexEntry* entry1, const WriteBatchIndexEntry* entry2) const; + void SetComparatorForCF(uint32_t column_family_id, + const Comparator* comparator) { + cf_comparator_map_[column_family_id] = comparator; + } + private: - const Comparator* comparator_; + const Comparator* default_comparator_; + std::unordered_map cf_comparator_map_; const ReadableWriteBatch* write_batch_; }; typedef SkipList WriteBatchEntrySkipList; -struct WriteBatchWithIndex::Rep { - Rep(const Comparator* index_comparator, size_t reserved_bytes = 0) - : write_batch(reserved_bytes), - comparator(index_comparator, &write_batch), - skip_list(comparator, &arena) {} - ReadableWriteBatch write_batch; - WriteBatchEntryComparator comparator; - Arena arena; - WriteBatchEntrySkipList skip_list; - - WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) { - return GetEntryWithCfId(GetColumnFamilyID(column_family)); - } - - WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) { - auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry)); - auto* index_entry = new (mem) - WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id); - return index_entry; - } -}; - class WBWIIteratorImpl : public WBWIIterator { public: WBWIIteratorImpl(uint32_t column_family_id, @@ -138,6 +121,35 @@ class WBWIIteratorImpl : public WBWIIterator { } } }; +} // namespace + +struct WriteBatchWithIndex::Rep { + Rep(const Comparator* index_comparator, size_t reserved_bytes = 0) + : write_batch(reserved_bytes), + comparator(index_comparator, &write_batch), + skip_list(comparator, &arena) {} + ReadableWriteBatch write_batch; + WriteBatchEntryComparator comparator; + Arena arena; + WriteBatchEntrySkipList skip_list; + + WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) { + uint32_t cf_id = GetColumnFamilyID(column_family); + const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); + if (cf_cmp != nullptr) { + comparator.SetComparatorForCF(cf_id, cf_cmp); + } + + return GetEntryWithCfId(cf_id); + } + + WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) { + auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry)); + auto* index_entry = new (mem) + WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id); + return index_entry; + } +}; Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, @@ -179,9 +191,9 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, return Status::OK(); } -WriteBatchWithIndex::WriteBatchWithIndex(const Comparator* index_comparator, - size_t reserved_bytes) - : rep(new Rep(index_comparator, reserved_bytes)) {} +WriteBatchWithIndex::WriteBatchWithIndex( + const Comparator* default_index_comparator, size_t reserved_bytes) + : rep(new Rep(default_index_comparator, reserved_bytes)) {} WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; } @@ -287,7 +299,14 @@ int WriteBatchEntryComparator::operator()( key2 = *(entry2->search_key); } - int cmp = comparator_->Compare(key1, key2); + int cmp; + auto comparator_for_cf = cf_comparator_map_.find(entry1->column_family); + if (comparator_for_cf != cf_comparator_map_.end()) { + cmp = comparator_for_cf->second->Compare(key1, key2); + } else { + cmp = default_comparator_->Compare(key1, key2); + } + if (cmp != 0) { return cmp; } else if (entry1->offset > entry2->offset) { diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index fdceed4c4..ad8c110c1 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -19,12 +19,16 @@ namespace rocksdb { namespace { class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl { public: - explicit ColumnFamilyHandleImplDummy(int id) - : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {} + explicit ColumnFamilyHandleImplDummy(int id, const Comparator* comparator) + : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), + id_(id), + comparator_(comparator) {} uint32_t GetID() const override { return id_; } + const Comparator* user_comparator() const override { return comparator_; } private: uint32_t id_; + const Comparator* comparator_; }; struct Entry { @@ -90,8 +94,9 @@ TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { index_map[e.value].push_back(&e); } - WriteBatchWithIndex batch(BytewiseComparator(), 20); - ColumnFamilyHandleImplDummy data(6), index(8); + WriteBatchWithIndex batch(nullptr, 20); + ColumnFamilyHandleImplDummy data(6, BytewiseComparator()); + ColumnFamilyHandleImplDummy index(8, BytewiseComparator()); for (auto& e : entries) { if (e.type == kPutRecord) { batch.Put(&data, e.key, e.value); @@ -230,6 +235,107 @@ TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { } } +class ReverseComparator : public Comparator { + public: + ReverseComparator() {} + + virtual const char* Name() const override { + return "rocksdb.ReverseComparator"; + } + + virtual int Compare(const Slice& a, const Slice& b) const override { + return 0 - BytewiseComparator()->Compare(a, b); + } + + virtual void FindShortestSeparator(std::string* start, + const Slice& limit) const {} + virtual void FindShortSuccessor(std::string* key) const {} +}; + +TEST(WriteBatchWithIndexTest, TestComparatorForCF) { + ReverseComparator reverse_cmp; + ColumnFamilyHandleImplDummy cf1(6, nullptr); + ColumnFamilyHandleImplDummy reverse_cf(66, &reverse_cmp); + ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator()); + WriteBatchWithIndex batch(BytewiseComparator(), 20); + + batch.Put(&cf1, "ddd", ""); + batch.Put(&cf2, "aaa", ""); + batch.Put(&cf2, "eee", ""); + batch.Put(&cf1, "ccc", ""); + batch.Put(&reverse_cf, "a11", ""); + batch.Put(&cf1, "bbb", ""); + batch.Put(&reverse_cf, "a33", ""); + batch.Put(&reverse_cf, "a22", ""); + + { + std::unique_ptr iter(batch.NewIterator(&cf1)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bbb", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("ccc", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("ddd", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + { + std::unique_ptr iter(batch.NewIterator(&cf2)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("aaa", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("eee", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + { + std::unique_ptr iter(batch.NewIterator(&reverse_cf)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("z"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a33", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a22", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a11", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("a22"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a22", iter->Entry().key.ToString()); + + iter->Seek("a13"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a11", iter->Entry().key.ToString()); + } +} + } // namespace int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }