Move TQueue::run_gc to ClientManager.

This commit is contained in:
levlam 2022-10-06 21:42:33 +03:00
parent 337b657f9c
commit 022bed651c
3 changed files with 20 additions and 20 deletions

View File

@ -332,6 +332,7 @@ void ClientManager::start_up() {
LOG(WARNING) << "Loaded " << loaded_event_count << " TQueue events in " << (td::Time::now() - load_start_time)
<< " seconds";
last_tqueue_gc_time_ = td::Time::now();
}
// init webhook_db
@ -437,6 +438,21 @@ void ClientManager::raw_event(const td::Event::Raw &event) {
void ClientManager::timeout_expired() {
send_closure(watchdog_id_, &Watchdog::kick);
set_timeout_in(WATCHDOG_TIMEOUT / 2);
double now = td::Time::now();
if (now > last_tqueue_gc_time_ + 60.0) {
auto unix_time = parameters_->shared_data_->get_unix_time(now);
LOG(INFO) << "Run TQueue GC at " << unix_time;
last_tqueue_gc_time_ = now;
auto deleted_events = parameters_->shared_data_->tqueue_->run_gc(unix_time);
LOG(INFO) << "TQueue GC deleted " << deleted_events << " events";
tqueue_deleted_events_ += deleted_events;
if (tqueue_deleted_events_ > last_tqueue_deleted_events_ + 10000) {
LOG(WARNING) << "TQueue GC already deleted " << tqueue_deleted_events_ << " events since the start";
last_tqueue_deleted_events_ = tqueue_deleted_events_;
}
}
}
void ClientManager::hangup_shared() {

View File

@ -70,6 +70,9 @@ class ClientManager final : public td::Actor {
td::vector<td::Promise<td::Unit>> close_promises_;
td::ActorOwn<Watchdog> watchdog_id_;
double last_tqueue_gc_time_ = 0.0;
td::int64 tqueue_deleted_events_ = 0;
td::int64 last_tqueue_deleted_events_ = 0;
static constexpr double WATCHDOG_TIMEOUT = 0.5;

View File

@ -16,7 +16,6 @@
#include "td/telegram/ClientActor.h"
#include "td/db/binlog/Binlog.h"
#include "td/db/TQueue.h"
#include "td/net/GetHostByNameActor.h"
#include "td/net/HttpInboundConnection.h"
@ -161,7 +160,7 @@ static void dump_statistics(const std::shared_ptr<SharedData> &shared_data,
auto query_list_size = shared_data->query_list_size_.load(std::memory_order_relaxed);
auto query_count = shared_data->query_count_.load(std::memory_order_relaxed);
LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size);
/*
/*
td::uint64 i = 0;
bool was_gap = false;
for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) {
@ -551,9 +550,6 @@ int main(int argc, char *argv[]) {
double next_watchdog_kick_time = start_time;
double next_cron_time = start_time;
double last_dump_time = start_time - 1000.0;
double last_tqueue_gc_time = start_time - 1000.0;
td::int64 tqueue_deleted_events = 0;
td::int64 last_tqueue_deleted_events = 0;
bool close_flag = false;
std::atomic_bool can_quit{false};
ServerCpuStat::instance(); // create ServerCpuStat instance
@ -619,21 +615,6 @@ int main(int argc, char *argv[]) {
next_watchdog_kick_time = now + WATCHDOG_TIMEOUT / 2;
}
if (now > last_tqueue_gc_time + 60.0 && false) {
auto unix_time = shared_data->get_unix_time(now);
LOG(INFO) << "Run TQueue GC at " << unix_time;
last_tqueue_gc_time = now;
auto guard = sched.get_main_guard();
auto deleted_events = shared_data->tqueue_->run_gc(unix_time);
LOG(INFO) << "TQueue GC deleted " << deleted_events << " events";
tqueue_deleted_events += deleted_events;
if (tqueue_deleted_events > last_tqueue_deleted_events + 10000) {
LOG(WARNING) << "TQueue GC already deleted " << tqueue_deleted_events << " events since the start";
last_tqueue_deleted_events = tqueue_deleted_events;
}
}
if (now > last_dump_time + 300.0) {
last_dump_time = now;
dump_statistics(shared_data, net_query_stats);