diskless message cache #3

Manually merged
andreacavalli merged 4 commits from :fork into fork 2019-12-15 01:48:06 +01:00
2 changed files with 45 additions and 24 deletions

View File

@ -104,7 +104,7 @@ Status init_messages_db(SqliteDb &db, int32 version) {
TRY_STATUS(
db.exec("CREATE TABLE IF NOT EXISTS messages (dialog_id INT8, message_id INT8, "
"unique_message_id INT4, sender_user_id INT4, random_id INT8, data BLOB, "
"ttl_expires_at INT4, index_mask INT4, search_id INT8, text STRING, notification_id INT4, PRIMARY KEY "
"ttl_expires_at INT4, index_mask INT4, search_id INT8, text STRING, notification_id INT4, seqno INT32, PRIMARY KEY "
"(dialog_id, message_id))"));
TRY_STATUS(
@ -165,36 +165,40 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
}
Status init() {
TRY_RESULT(
seqno_ = 0;
db_memory_ = SqliteDb::open_with_key(":memory:", DbKey::empty()).move_as_ok();
TRY_STATUS(init_messages_db(db_memory_, db_.user_version().move_as_ok()));
TRY_RESULT(
add_message_stmt,
db_.get_statement("INSERT OR REPLACE INTO messages VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"));
TRY_RESULT(delete_message_stmt, db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
db_memory_.get_statement("INSERT OR REPLACE INTO messages VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)"));
TRY_RESULT(delete_message_stmt, db_memory_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
TRY_RESULT(delete_all_dialog_messages_stmt,
db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id <= ?2"));
db_memory_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id <= ?2"));
TRY_RESULT(delete_dialog_messages_from_user_stmt,
db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND sender_user_id == ?2"));
db_memory_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND sender_user_id == ?2"));
TRY_RESULT(get_message_stmt,
db_.get_statement("SELECT data FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
db_memory_.get_statement("SELECT data FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
TRY_RESULT(get_message_by_random_id_stmt,
db_.get_statement("SELECT data FROM messages WHERE dialog_id = ?1 AND random_id = ?2"));
db_memory_.get_statement("SELECT data FROM messages WHERE dialog_id = ?1 AND random_id = ?2"));
TRY_RESULT(get_message_by_unique_message_id_stmt,
db_.get_statement("SELECT dialog_id, data FROM messages WHERE unique_message_id = ?1"));
db_memory_.get_statement("SELECT dialog_id, data FROM messages WHERE unique_message_id = ?1"));
TRY_RESULT(get_expiring_messages_stmt,
db_.get_statement("SELECT dialog_id, data FROM messages WHERE ?1 < ttl_expires_at AND ttl_expires_at <= "
db_memory_.get_statement("SELECT dialog_id, data FROM messages WHERE ?1 < ttl_expires_at AND ttl_expires_at <= "
"?2"));
TRY_RESULT(get_expiring_messages_helper_stmt,
db_.get_statement("SELECT MAX(ttl_expires_at), COUNT(*) FROM (SELECT ttl_expires_at FROM messages WHERE "
db_memory_.get_statement("SELECT MAX(ttl_expires_at), COUNT(*) FROM (SELECT ttl_expires_at FROM messages WHERE "
"?1 < ttl_expires_at LIMIT ?2) AS T"));
TRY_RESULT(get_messages_asc_stmt,
db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND "
db_memory_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND "
"message_id > ?2 ORDER BY message_id ASC LIMIT ?3"));
TRY_RESULT(get_messages_desc_stmt, db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 "
TRY_RESULT(get_messages_desc_stmt, db_memory_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 "
"AND message_id < ?2 ORDER BY message_id DESC LIMIT ?3"));
TRY_RESULT(get_messages_from_notification_id_stmt,
db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 "
db_memory_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 "
"AND notification_id < ?2 ORDER BY notification_id DESC LIMIT ?3"));
// TRY_RESULT(
// get_messages_fts_stmt,
@ -204,13 +208,13 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
for (int32 i = 0; i < MESSAGES_DB_INDEX_COUNT; i++) {
TRY_RESULT(get_messages_from_index_desc_stmt,
db_.get_statement(PSLICE() << "SELECT data, message_id FROM messages WHERE dialog_id = ?1 "
db_memory_.get_statement(PSLICE() << "SELECT data, message_id FROM messages WHERE dialog_id = ?1 "
"AND message_id < ?2 AND (index_mask & "
<< (1 << i) << ") != 0 ORDER BY message_id DESC LIMIT ?3"));
get_messages_from_index_stmts_[i].desc_stmt_ = std::move(get_messages_from_index_desc_stmt);
TRY_RESULT(get_messages_from_index_asc_stmt,
db_.get_statement(PSLICE() << "SELECT data, message_id FROM messages WHERE dialog_id = ?1 "
db_memory_.get_statement(PSLICE() << "SELECT data, message_id FROM messages WHERE dialog_id = ?1 "
"AND message_id > ?2 AND (index_mask & "
<< (1 << i) << ") != 0 ORDER BY message_id ASC LIMIT ?3"));
get_messages_from_index_stmts_[i].asc_stmt_ = std::move(get_messages_from_index_asc_stmt);
@ -221,7 +225,7 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
for (int i = static_cast<int>(SearchMessagesFilter::Call) - 1, pos = 0;
i < static_cast<int>(SearchMessagesFilter::MissedCall); i++, pos++) {
TRY_RESULT(get_messages_from_index_stmt,
db_.get_statement(PSLICE() << "SELECT dialog_id, data FROM messages "
db_memory_.get_statement(PSLICE() << "SELECT dialog_id, data FROM messages "
"WHERE unique_message_id < ?1 AND (index_mask & "
<< (1 << i) << ") != 0 ORDER BY unique_message_id DESC LIMIT ?2"));
get_calls_stmts_[pos] = std::move(get_messages_from_index_stmt);
@ -272,6 +276,15 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
SCOPE_EXIT {
add_message_stmt_.reset();
};
seqno_++;
if (seqno_ % 8128 == 0) {
TRY_STATUS(db_memory_.exec("DELETE FROM messages WHERE seqno < " + to_string(seqno_ - 7168)));
}
add_message_stmt_.bind_int32(12, seqno_).ensure();
add_message_stmt_.bind_int64(1, dialog_id.get()).ensure();
add_message_stmt_.bind_int64(2, message_id.get()).ensure();
@ -728,6 +741,9 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
private:
SqliteDb db_;
SqliteDb db_memory_;
int32_t seqno_;
SqliteStatement add_message_stmt_;

View File

@ -50,17 +50,22 @@ SqliteDb::~SqliteDb() = default;
Status SqliteDb::init(CSlice path, bool *was_created) {
// If database does not exist, delete all other files which may left
// from older database
bool is_db_exists = stat(path).is_ok();
if (!is_db_exists) {
TRY_STATUS(destroy(path));
if (path != ":memory:") {
bool is_db_exists = stat(path).is_ok();
if (!is_db_exists) {
TRY_STATUS(destroy(path));
}
if (was_created != nullptr) {
*was_created = !is_db_exists;
}
}
if (was_created != nullptr) {
*was_created = !is_db_exists;
}
sqlite3 *db;
CHECK(sqlite3_threadsafe() != 0);
int rc = sqlite3_open_v2(path.c_str(), &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE /*| SQLITE_OPEN_SHAREDCACHE*/,
int rc = sqlite3_open_v2(path.c_str(), &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_SHAREDCACHE,
nullptr);
if (rc != SQLITE_OK) {
auto res = Status::Error(PSLICE() << "Failed to open database: " << detail::RawSqliteDb::last_error(db));