// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // #ifndef ROCKSDB_LITE #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS #endif #include "utilities/blob_db/blob_db.h" #include #include "db/write_batch_internal.h" #include "monitoring/instrumented_mutex.h" #include "options/cf_options.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/convenience.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/utilities/stackable_db.h" #include "table/block.h" #include "table/block_based_table_builder.h" #include "table/block_builder.h" #include "util/file_reader_writer.h" #include "util/filename.h" #include "utilities/blob_db/blob_db_impl.h" namespace rocksdb { namespace blob_db { port::Mutex listener_mutex; typedef std::shared_ptr FlushBeginListener_t; typedef std::shared_ptr ReconcileWalFilter_t; typedef std::shared_ptr CompactionListener_t; // to ensure the lifetime of the listeners std::vector> all_blobdb_listeners; std::vector all_wal_filters; Status BlobDB::OpenAndLoad(const Options& options, const BlobDBOptions& bdb_options, const std::string& dbname, BlobDB** blob_db, Options* changed_options) { *changed_options = options; *blob_db = nullptr; FlushBeginListener_t fblistener = std::make_shared(); ReconcileWalFilter_t rw_filter = std::make_shared(); CompactionListener_t ce_listener = std::make_shared(); { MutexLock l(&listener_mutex); all_blobdb_listeners.push_back(fblistener); all_blobdb_listeners.push_back(ce_listener); all_wal_filters.push_back(rw_filter); } changed_options->listeners.emplace_back(fblistener); changed_options->listeners.emplace_back(ce_listener); changed_options->wal_filter = rw_filter.get(); DBOptions db_options(*changed_options); // we need to open blob db first so that recovery can happen BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); fblistener->SetImplPtr(bdb); ce_listener->SetImplPtr(bdb); rw_filter->SetImplPtr(bdb); Status s = bdb->OpenPhase1(); if (!s.ok()) return s; *blob_db = bdb; return s; } Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options, const std::string& dbname, BlobDB** blob_db) { *blob_db = nullptr; DBOptions db_options(options); ColumnFamilyOptions cf_options(options); std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); std::vector handles; Status s = BlobDB::Open(db_options, bdb_options, dbname, column_families, &handles, blob_db); if (s.ok()) { assert(handles.size() == 1); // i can delete the handle since DBImpl is always holding a reference to // default column family delete handles[0]; } return s; } Status BlobDB::Open(const DBOptions& db_options_input, const BlobDBOptions& bdb_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, BlobDB** blob_db, bool no_base_db) { *blob_db = nullptr; Status s; DBOptions db_options(db_options_input); if (db_options.info_log == nullptr) { s = CreateLoggerFromOptions(dbname, db_options, &db_options.info_log); if (!s.ok()) { return s; } } FlushBeginListener_t fblistener = std::make_shared(); CompactionListener_t ce_listener = std::make_shared(); ReconcileWalFilter_t rw_filter = std::make_shared(); db_options.listeners.emplace_back(fblistener); db_options.listeners.emplace_back(ce_listener); db_options.wal_filter = rw_filter.get(); { MutexLock l(&listener_mutex); all_blobdb_listeners.push_back(fblistener); all_blobdb_listeners.push_back(ce_listener); all_wal_filters.push_back(rw_filter); } // we need to open blob db first so that recovery can happen BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); fblistener->SetImplPtr(bdb); ce_listener->SetImplPtr(bdb); rw_filter->SetImplPtr(bdb); s = bdb->OpenPhase1(); if (!s.ok()) { return s; } if (no_base_db) { return s; } DB* db = nullptr; s = DB::Open(db_options, dbname, column_families, handles, &db); if (!s.ok()) { return s; } // set the implementation pointer s = bdb->LinkToBaseDB(db); if (!s.ok()) { delete bdb; bdb = nullptr; } *blob_db = bdb; bdb_options.Dump(db_options.info_log.get()); return s; } BlobDB::BlobDB(DB* db) : StackableDB(db) {} void BlobDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s", blob_dir.c_str()); ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d", path_relative); ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d", is_fifo); ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64, blob_dir_size); ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32, ttl_range_secs); ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64, bytes_per_sync); ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64, blob_file_size); ROCKS_LOG_HEADER(log, "blob_db_options.num_concurrent_simple_blobs: %" PRIu32, num_concurrent_simple_blobs); ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p", ttl_extractor.get()); ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d", static_cast(compression)); ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d", disable_background_tasks); } } // namespace blob_db } // namespace rocksdb #endif