//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "db/db_iter.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h"

namespace ROCKSDB_NAMESPACE {

class PessimisticTransactionDB : public TransactionDB {
 public:
  explicit PessimisticTransactionDB(DB* db,
                                    const TransactionDBOptions& txn_db_options);

  explicit PessimisticTransactionDB(StackableDB* db,
                                    const TransactionDBOptions& txn_db_options);

  virtual ~PessimisticTransactionDB();

  virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }

  virtual Status Initialize(
      const std::vector<size_t>& compaction_enabled_cf_indices,
      const std::vector<ColumnFamilyHandle*>& handles);

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override = 0;

  using StackableDB::Put;
  virtual Status Put(const WriteOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& val) override;

  using StackableDB::Delete;
  virtual Status Delete(const WriteOptions& wopts,
                        ColumnFamilyHandle* column_family,
                        const Slice& key) override;

  using StackableDB::SingleDelete;
  virtual Status SingleDelete(const WriteOptions& wopts,
                              ColumnFamilyHandle* column_family,
                              const Slice& key) override;

  using StackableDB::Merge;
  virtual Status Merge(const WriteOptions& options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) override;

  using TransactionDB::Write;
  virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
                                            WriteBatch* updates) {
    // Need to lock all keys in this batch to prevent write conflicts with
    // concurrent transactions.
    Transaction* txn = BeginInternalTransaction(opts);
    txn->DisableIndexing();

    auto txn_impl =
        static_cast_with_check<PessimisticTransaction, Transaction>(txn);

    // Since commitBatch sorts the keys before locking, concurrent Write()
    // operations will not cause a deadlock.
    // In order to avoid a deadlock with a concurrent Transaction, Transactions
    // should use a lock timeout.
    Status s = txn_impl->CommitBatch(updates);

    delete txn;

    return s;
  }

  using StackableDB::CreateColumnFamily;
  virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
                                    const std::string& column_family_name,
                                    ColumnFamilyHandle** handle) override;

  using StackableDB::DropColumnFamily;
  virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;

  Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
                 const std::string& key, bool exclusive);

  void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys);
  void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
              const std::string& key);

  void AddColumnFamily(const ColumnFamilyHandle* handle);

  static TransactionDBOptions ValidateTxnDBOptions(
      const TransactionDBOptions& txn_db_options);

  const TransactionDBOptions& GetTxnDBOptions() const {
    return txn_db_options_;
  }

  void InsertExpirableTransaction(TransactionID tx_id,
                                  PessimisticTransaction* tx);
  void RemoveExpirableTransaction(TransactionID tx_id);

  // If transaction is no longer available, locks can be stolen
  // If transaction is available, try stealing locks directly from transaction
  // It is the caller's responsibility to ensure that the referred transaction
  // is expirable (GetExpirationTime() > 0) and that it is expired.
  bool TryStealingExpiredTransactionLocks(TransactionID tx_id);

  Transaction* GetTransactionByName(const TransactionName& name) override;

  void RegisterTransaction(Transaction* txn);
  void UnregisterTransaction(Transaction* txn);

  // not thread safe. current use case is during recovery (single thread)
  void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;

  TransactionLockMgr::LockStatusData GetLockStatusData() override;

  std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
  void SetDeadlockInfoBufferSize(uint32_t target_size) override;

  // The default implementation does nothing. The actual implementation is moved
  // to the child classes that actually need this information. This was due to
  // an odd performance drop we observed when the added std::atomic member to
  // the base class even when the subclass do not read it in the fast path.
  virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
  virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}

 protected:
  DBImpl* db_impl_;
  std::shared_ptr<Logger> info_log_;
  const TransactionDBOptions txn_db_options_;

  void ReinitializeTransaction(
      Transaction* txn, const WriteOptions& write_options,
      const TransactionOptions& txn_options = TransactionOptions());

  virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);

 private:
  friend class WritePreparedTxnDB;
  friend class WritePreparedTxnDBMock;
  friend class WriteUnpreparedTxn;
  friend class TransactionTest_DoubleCrashInRecovery_Test;
  friend class TransactionTest_DoubleEmptyWrite_Test;
  friend class TransactionTest_DuplicateKeys_Test;
  friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
  friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
  friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
  friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
  TransactionLockMgr lock_mgr_;

  // Must be held when adding/dropping column families.
  InstrumentedMutex column_family_mutex_;
  Transaction* BeginInternalTransaction(const WriteOptions& options);

  // Used to ensure that no locks are stolen from an expirable transaction
  // that has started a commit. Only transactions with an expiration time
  // should be in this map.
  std::mutex map_mutex_;
  std::unordered_map<TransactionID, PessimisticTransaction*>
      expirable_transactions_map_;

  // map from name to two phase transaction instance
  std::mutex name_map_mutex_;
  std::unordered_map<TransactionName, Transaction*> transactions_;

  // Signal that we are testing a crash scenario. Some asserts could be relaxed
  // in such cases.
  virtual void TEST_Crash() {}
};

// A PessimisticTransactionDB that writes the data to the DB after the commit.
// In this way the DB only contains the committed data.
class WriteCommittedTxnDB : public PessimisticTransactionDB {
 public:
  explicit WriteCommittedTxnDB(DB* db,
                               const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options) {}

  explicit WriteCommittedTxnDB(StackableDB* db,
                               const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options) {}

  virtual ~WriteCommittedTxnDB() {}

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  // Optimized version of ::Write that makes use of skip_concurrency_control
  // hint
  using TransactionDB::Write;
  virtual Status Write(const WriteOptions& opts,
                       const TransactionDBWriteOptimizations& optimizations,
                       WriteBatch* updates) override;
  virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
};

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE