1021 lines
34 KiB
C++
Raw Normal View History

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/Global.h"
#include "td/telegram/MessageId.h"
#include "td/telegram/SecretChatActor.h"
#include "td/telegram/SecretChatId.h"
#include "td/telegram/secret_api.h"
#include "td/telegram/telegram_api.h"
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
#include "td/db/binlog/BinlogInterface.h"
#include "td/db/binlog/detail/BinlogEventsProcessor.h"
#include "td/db/BinlogKeyValue.h"
#include "td/db/DbKey.h"
#include "td/mtproto/DhHandshake.h"
#include "td/mtproto/utils.h"
#include "td/tl/tl_object_parse.h"
#include "td/tl/tl_object_store.h"
#include "td/utils/base64.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/crypto.h"
#include "td/utils/format.h"
#include "td/utils/Gzip.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Random.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/tests.h"
#include "td/utils/tl_helpers.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
#include <cstdio>
#include <ctime>
#include <limits>
#include <map>
#include <memory>
REGISTER_TESTS(secret);
namespace my_api {
using namespace td;
// Locally declared request type: two int32 fields that can be fetched from a TlBufferParser.
class messages_getDhConfig {
 public:
  static const int32 ID = 651135312;

  int32 version_{};
  int32 random_length_{};

  messages_getDhConfig() = default;
  messages_getDhConfig(int32 version_, int32 random_length_);

  // Reads version_ and then random_length_ from the parser, in that order.
  explicit messages_getDhConfig(TlBufferParser &p)
      : version_(TlFetchInt::parse(p)), random_length_(TlFetchInt::parse(p)) {
  }
};
// Base type of the locally parsed input-user objects. fetch() reads a constructor id from the
// parser and returns the matching derived object (or nullptr after flagging a parser error).
class InputUser {
 public:
  // Derived objects are owned and destroyed through tl_object_ptr<InputUser>
  // (see messages_requestEncryption::user_id_), so the destructor has to be virtual;
  // deleting a derived object through a base pointer is undefined behaviour otherwise.
  virtual ~InputUser() = default;

  static tl_object_ptr<InputUser> fetch(TlBufferParser &p);
};
// The only InputUser constructor this file knows how to parse: a user id plus an access hash.
class inputUser final : public InputUser {
 public:
  static const int32 ID = -668391402;

  int32 user_id_{};
  int64 access_hash_{};

  inputUser() = default;

  // Fetches user_id_ (int) followed by access_hash_ (long) from the parser.
  explicit inputUser(TlBufferParser &p) : user_id_(TlFetchInt::parse(p)), access_hash_(TlFetchLong::parse(p)) {
  }
};
// Reads a constructor id from the parser and builds the matching object.
// For any id other than inputUser::ID an error is set on the parser and nullptr is returned.
tl_object_ptr<InputUser> InputUser::fetch(TlBufferParser &p) {
  const int constructor = p.fetch_int();
  if (constructor == inputUser::ID) {
    return make_tl_object<inputUser>(p);
  }
  p.set_error(PSTRING() << "Unknown constructor found " << format::as_hex(constructor));
  return nullptr;
}
// Locally declared request type: an InputUser object, an int32 random id and a byte string g_a_.
class messages_requestEncryption final {
 public:
  static const int32 ID = -162681021;

  tl_object_ptr<InputUser> user_id_;
  int32 random_id_{};
  BufferSlice g_a_;

  messages_requestEncryption();

  // Fetches the fields from the parser in declaration order: user_id_, random_id_, g_a_.
  explicit messages_requestEncryption(TlBufferParser &p)
      : user_id_(TlFetchObject<InputUser>::parse(p))
      , random_id_(TlFetchInt::parse(p))
      , g_a_(TlFetchBytes<BufferSlice>::parse(p)) {
  }
};
// Locally declared peer type: a chat id plus an access hash.
class inputEncryptedChat final {
 public:
  static const int32 ID = -247351839;

  int32 chat_id_{};
  int64 access_hash_{};

  inputEncryptedChat() = default;

  // Fetches chat_id_ (int) followed by access_hash_ (long) from the parser.
  explicit inputEncryptedChat(TlBufferParser &p)
      : chat_id_(TlFetchInt::parse(p)), access_hash_(TlFetchLong::parse(p)) {
  }

  // Builds an object directly from the parser; no constructor id is read here.
  static tl_object_ptr<inputEncryptedChat> fetch(TlBufferParser &p) {
    return make_tl_object<inputEncryptedChat>(p);
  }
};
// Locally declared request type: a boxed inputEncryptedChat peer, a byte string g_b_ and a key fingerprint.
class messages_acceptEncryption final {
 public:
  static const int32 ID = 1035731989;

  tl_object_ptr<inputEncryptedChat> peer_;
  BufferSlice g_b_;
  int64 key_fingerprint_{};

  messages_acceptEncryption() = default;

  // Fetches peer_ (boxed with the inputEncryptedChat constructor id), then g_b_, then key_fingerprint_.
  explicit messages_acceptEncryption(TlBufferParser &p)
      : peer_(TlFetchBoxed<TlFetchObject<inputEncryptedChat>, inputEncryptedChat::ID>::parse(p))
      , g_b_(TlFetchBytes<BufferSlice>::parse(p))
      , key_fingerprint_(TlFetchLong::parse(p)) {
  }
};
// Locally declared request type: a boxed inputEncryptedChat peer, an int64 random id and a data byte string.
class messages_sendEncryptedService final {
 public:
  static const int32 ID = 852769188;

  tl_object_ptr<inputEncryptedChat> peer_;
  int64 random_id_{};
  BufferSlice data_;

  messages_sendEncryptedService() = default;

  // Fetches peer_ (boxed with the inputEncryptedChat constructor id), then random_id_, then data_.
  explicit messages_sendEncryptedService(TlBufferParser &p)
      : peer_(TlFetchBoxed<TlFetchObject<inputEncryptedChat>, inputEncryptedChat::ID>::parse(p))
      , random_id_(TlFetchLong::parse(p))
      , data_(TlFetchBytes<BufferSlice>::parse(p)) {
  }
};
// Locally declared request type: a boxed inputEncryptedChat peer, an int64 random id and a data byte string.
class messages_sendEncrypted final {
 public:
  static const int32 ID = -1451792525;

  tl_object_ptr<inputEncryptedChat> peer_;
  int64 random_id_{};
  BufferSlice data_;

  messages_sendEncrypted() = default;

  // Fetches peer_ (boxed with the inputEncryptedChat constructor id), then random_id_, then data_.
  explicit messages_sendEncrypted(TlBufferParser &p)
      : peer_(TlFetchBoxed<TlFetchObject<inputEncryptedChat>, inputEncryptedChat::ID>::parse(p))
      , random_id_(TlFetchLong::parse(p))
      , data_(TlFetchBytes<BufferSlice>::parse(p)) {
  }
};
// Reads a constructor id from the parser, parses the matching request type and passes it
// (as an lvalue) to f. An unknown id is logged and treated as unreachable.
template <class F>
static void downcast_call(TlBufferParser &p, F &&f) {
  const auto constructor_id = p.fetch_int();
  switch (constructor_id) {
    case messages_getDhConfig::ID: {
      auto query = make_tl_object<messages_getDhConfig>(p);
      return f(*query);
    }
    case messages_requestEncryption::ID: {
      auto query = make_tl_object<messages_requestEncryption>(p);
      return f(*query);
    }
    case messages_acceptEncryption::ID: {
      auto query = make_tl_object<messages_acceptEncryption>(p);
      return f(*query);
    }
    case messages_sendEncrypted::ID: {
      auto query = make_tl_object<messages_sendEncrypted>(p);
      return f(*query);
    }
    case messages_sendEncryptedService::ID: {
      auto query = make_tl_object<messages_sendEncryptedService>(p);
      return f(*query);
    }
    default:
      LOG(ERROR) << "Unknown constructor " << constructor_id;
      UNREACHABLE();
  }
}
// Locally declared answer type: generator g_, prime p_, version_ and a random byte string.
// It can only be stored (serialized), not fetched.
class messages_dhConfig final {
 public:
  static const int32 ID = 740433629;

  int32 g_{};
  BufferSlice p_;
  int32 version_{};
  BufferSlice random_;

  messages_dhConfig() = default;

  messages_dhConfig(int32 g_, BufferSlice &&p_, int32 version_, BufferSlice &&random_)
      : g_(g_), p_(std::move(p_)), version_(version_), random_(std::move(random_)) {
  }

  int32 get_id() const {
    return ID;
  }

  // Both storer flavours write the same field sequence.
  void store(TlStorerCalcLength &s) const {
    store_fields(s);
  }
  void store(TlStorerUnsafe &s) const {
    store_fields(s);
  }

 private:
  // Writes g_, p_, version_ and random_ to the storer, in that order.
  template <class StorerT>
  void store_fields(StorerT &s) const {
    TlStoreBinary::store(g_, s);
    TlStoreString::store(p_, s);
    TlStoreBinary::store(version_, s);
    TlStoreString::store(random_, s);
  }
};
// Locally declared answer type describing an encrypted chat. It can only be stored (serialized), not fetched.
class encryptedChat final {
 public:
  static const int32 ID = -94974410;

  int32 id_{};
  int64 access_hash_{};
  int32 date_{};
  int32 admin_id_{};
  int32 participant_id_{};
  BufferSlice g_a_or_b_;
  int64 key_fingerprint_{};

  encryptedChat() = default;

  encryptedChat(int32 id_, int64 access_hash_, int32 date_, int32 admin_id_, int32 participant_id_,
                BufferSlice &&g_a_or_b_, int64 key_fingerprint_)
      : id_(id_)
      , access_hash_(access_hash_)
      , date_(date_)
      , admin_id_(admin_id_)
      , participant_id_(participant_id_)
      , g_a_or_b_(std::move(g_a_or_b_))
      , key_fingerprint_(key_fingerprint_) {
  }

  int32 get_id() const {
    return ID;
  }

  // Both storer flavours write the same field sequence.
  void store(TlStorerCalcLength &s) const {
    store_fields(s);
  }
  void store(TlStorerUnsafe &s) const {
    store_fields(s);
  }

 private:
  // Writes every field to the storer in declaration order.
  template <class StorerT>
  void store_fields(StorerT &s) const {
    TlStoreBinary::store(id_, s);
    TlStoreBinary::store(access_hash_, s);
    TlStoreBinary::store(date_, s);
    TlStoreBinary::store(admin_id_, s);
    TlStoreBinary::store(participant_id_, s);
    TlStoreString::store(g_a_or_b_, s);
    TlStoreBinary::store(key_fingerprint_, s);
  }
};
// Locally declared answer type carrying a single int32 date. It can only be stored (serialized), not fetched.
class messages_sentEncryptedMessage final {
 public:
  static const int32 ID = 1443858741;

  int32 date_{};

  messages_sentEncryptedMessage() = default;

  explicit messages_sentEncryptedMessage(int32 date_) : date_(date_) {
  }

  int32 get_id() const {
    return ID;
  }

  // Both storer flavours write just date_.
  void store(TlStorerCalcLength &s) const {
    TlStoreBinary::store(date_, s);
  }
  void store(TlStorerUnsafe &s) const {
    TlStoreBinary::store(date_, s);
  }
};
} // namespace my_api
namespace td {
// Generator value copied into the DhConfig returned by FakeSecretChatContext::dh_config()
// and into the messages_dhConfig answer built by Master.
static int32 g = 3;
// Prime used together with g; kept in base64url form and decoded with base64url_decode at each use.
static string prime_base64 =
"xxyuucaxyQSObFIvcPE_c5gNQCOOPiHBSTTQN1Y9kw9IGYoKp8FAWCKUk9IlMPTb-jNvbgrJJROVQ67UTM58NyD9UfaUWHBaxozU_mtrE6vcl0ZRKW"
"kyhFTxj6-MWV9kJHf-lrsqlB1bzR1KyMxJiAcI-ps3jjxPOpBgvuZ8-aSkppWBEFGQfhYnU7VrD2tBDbp02KhLKhSzFE4O8ShHVP0X7ZUNWWW0ud1G"
"WC2xF40WnGvEZbDW_5yjko_vW5rk5Bj8Feg-vqD4f6n_Xu1wBQ3tKEn0e_lZ2VaFDOkphR8NgRX2NbEF7i5OFdBLJFS_b0-t8DSxBAMRnNjjuS_MW"
"w";
// DhCallback backed by an in-memory map from prime string to verdict (1 = good, 0 = bad).
class FakeDhCallback : public DhCallback {
 public:
  // Returns the stored verdict for the prime, or -1 when nothing has been recorded for it.
  int is_good_prime(Slice prime_str) const override {
    const auto found = cache.find(prime_str.str());
    return found != cache.end() ? found->second : -1;
  }

  // Records the prime as good.
  void add_good_prime(Slice prime_str) const override {
    cache[prime_str.str()] = 1;
  }

  // Records the prime as bad.
  void add_bad_prime(Slice prime_str) const override {
    cache[prime_str.str()] = 0;
  }

  // Mutable because the DhCallback interface methods above are const.
  mutable std::map<string, int> cache;
};
// In-memory BinlogInterface used by the test; it is also an Actor so that syncs can be deferred.
// Added events wait in pending_events_ until do_force_sync() moves a randomly sized prefix of them
// into events_processor_; restart() discards everything that is still pending.
class FakeBinlog
: public BinlogInterface
, public Actor {
public:
// Registers this object as an actor named "FakeBinlog"; the returned handle is released.
FakeBinlog() {
register_actor("FakeBinlog", this).release();
}
// Attaches the promise to the last pending entry (creating an empty entry if there is none),
// marks that entry as requiring a sync and schedules one.
void force_sync(Promise<> promise) override {
if (pending_events_.empty()) {
pending_events_.emplace_back();
}
pending_events_.back().promises_.push_back(std::move(promise));
pending_events_.back().sync_flag = true;
request_sync();
}
// Schedules do_force_sync() unless a sync is already scheduled: either after a short random
// timeout (0.001 .. 0.00595 s, when Random::fast(0, 4) yields 0) or through yield()/wakeup().
void request_sync() {
if (!has_request_sync) {
has_request_sync = true;
if (Random::fast(0, 4) == 0) {
set_timeout_in(Random::fast(0, 99) / 100.0 * 0.005 + 0.001);
} else {
yield();
}
}
}
// Nothing to flush in this implementation.
void force_flush() override {
}
// Returns the current id and advances the counter by one.
uint64 next_id() override {
auto res = last_id_;
last_id_++;
return res;
}
// Returns the current id and advances the counter by shift.
uint64 next_id(int32 shift) override {
auto res = last_id_;
last_id_ += shift;
return res;
}
// Calls f for every event held by events_processor_, logging each event id first.
template <class F>
void for_each(const F &f) {
events_processor_.for_each([&](auto &x) {
LOG(INFO) << "REPLAY: " << x.id_;
f(x);
});
}
// Cancels any scheduled sync and drops all pending (not yet saved) events together with their promises.
void restart() {
has_request_sync = false;
cancel_timeout();
// This loop currently has no effect: its only statement is the commented-out log line.
for (auto &pending : pending_events_) {
auto &event = pending.event;
if (!event.empty()) {
// LOG(ERROR) << "FORGET EVENT: " << event.id_ << " " << event;
}
}
pending_events_.clear();
}
// Not implemented: the key and the promise are ignored.
void change_key(DbKey key, Promise<> promise) override {
}
protected:
// Not implemented: the promise is ignored.
void close_impl(Promise<> promise) override {
}
// Not implemented: the promise is ignored.
void close_and_destroy_impl(Promise<> promise) override {
}
// Wraps the raw bytes into a BinlogEvent and queues it with its promise; nothing is saved yet.
// The id parameter is not used here.
void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) override {
auto event = BinlogEvent(std::move(raw_event), info);
LOG(INFO) << "ADD EVENT: " << event.id_ << " " << event;
pending_events_.emplace_back();
pending_events_.back().event = std::move(event);
pending_events_.back().promises_.push_back(std::move(promise));
}
// Saves a random non-empty prefix of the pending entries: their events go to events_processor_,
// the entries are erased, and only then are the collected promises fulfilled.
// If a remaining entry still has sync_flag set, another sync is requested.
void do_force_sync() {
if (pending_events_.empty()) {
return;
}
cancel_timeout();
has_request_sync = false;
// Index of the last entry to save, chosen uniformly from the pending range.
auto pos = static_cast<size_t>(Random::fast_uint64() % pending_events_.size());
// pos = pending_events_.size() - 1;
std::vector<Promise<>> promises;
for (size_t i = 0; i <= pos; i++) {
auto &pending = pending_events_[i];
auto event = std::move(pending.event);
// Entries created by force_sync() alone carry no event, only promises.
if (!event.empty()) {
LOG(INFO) << "SAVE EVENT: " << event.id_ << " " << event;
events_processor_.add_event(std::move(event)).ensure();
}
append(promises, std::move(pending.promises_));
}
pending_events_.erase(pending_events_.begin(), pending_events_.begin() + pos + 1);
for (auto &promise : promises) {
promise.set_value(Unit());
}
for (auto &event : pending_events_) {
if (event.sync_flag) {
request_sync();
break;
}
}
}
// Timer path of request_sync().
void timeout_expired() override {
do_force_sync();
}
// yield() path of request_sync(); ignored when no sync is currently requested.
void wakeup() override {
if (has_request_sync) {
do_force_sync();
}
}
// True while a sync has been scheduled and not yet performed or cancelled.
bool has_request_sync = false;
// Next id handed out by next_id().
uint64 last_id_ = 1;
// Holds the events that have actually been saved.
detail::BinlogEventsProcessor events_processor_;
// An event waiting to be saved, the promises to fulfil once it is, and whether a sync was requested for it.
struct PendingEvent {
BinlogEvent event;
bool sync_flag = false;
std::vector<Promise<>> promises_;
};
std::vector<PendingEvent> pending_events_;
};
// Key-value storage type used by the test, instantiated over the generic BinlogInterface.
using FakeKeyValue = BinlogKeyValue<BinlogInterface>;
// Forward declaration: FakeSecretChatContext holds an ActorShared<Master>, Master is defined further down.
class Master;
// SecretChatActor::Context implementation for the test. Storage objects are supplied by the creator,
// and network queries plus message notifications are forwarded to the Master actor
// (see the out-of-class method definitions after Master).
class FakeSecretChatContext : public SecretChatActor::Context {
public:
// Stores the supplied binlog, key-value storage, close flag and link to Master,
// and creates a SecretChatDb on top of the key-value storage.
FakeSecretChatContext(std::shared_ptr<BinlogInterface> binlog, std::shared_ptr<KeyValueSyncInterface> key_value,
std::shared_ptr<bool> close_flag, ActorShared<Master> master)
: binlog_(std::move(binlog))
, key_value_(std::move(key_value))
, close_flag_(std::move(close_flag))
, master_(std::move(master)) {
secret_chat_db_ = std::make_shared<SecretChatDb>(key_value_, 1);
net_query_creator_.stop_check(); // :(
}
DhCallback *dh_callback() override {
return &fake_dh_callback_;
}
// Returns the creator shared by all contexts (static member).
NetQueryCreator &net_query_creator() override {
return net_query_creator_;
}
// Current wall-clock time from std::time, narrowed to int32.
int32 unix_time() override {
return static_cast<int32>(std::time(nullptr));
}
// Reads the flag shared with the creator of this context.
bool close_flag() override {
return *close_flag_;
}
BinlogInterface *binlog() override {
return binlog_.get();
}
SecretChatDb *secret_chat_db() override {
return secret_chat_db_.get();
}
// Returns a DhConfig (version 12, generator g, prime decoded from prime_base64) that is built once
// on first use and then shared by every caller.
std::shared_ptr<DhConfig> dh_config() override {
static auto config = [] {
DhConfig dh_config;
dh_config.version = 12;
dh_config.g = g;
dh_config.prime = base64url_decode(prime_base64).move_as_ok();
return std::make_shared<DhConfig>(dh_config);
}();
return config;
}
// Ignored: dh_config() always returns the fixed configuration above.
void set_dh_config(std::shared_ptr<DhConfig> dh_config) override {
// empty
}
// Every boolean option reads as false.
bool get_config_option_boolean(const string &name) const override {
return false;
}
// We don't want to expose the whole NetQueryDispatcher, MessagesManager and ContactsManager.
// This way it is clearer which parts of MessagesManager are really used, and it is much easier to create tests.
void send_net_query(NetQueryPtr query, ActorShared<NetQueryCallback> callback, bool ordered) override;
// Ignored by the test.
void on_update_secret_chat(int64 access_hash, UserId user_id, SecretChatState state, bool is_outbound, int32 ttl,
int32 date, string key_hash, int32 layer) override {
}
void on_inbound_message(UserId user_id, MessageId message_id, int32 date,
tl_object_ptr<telegram_api::encryptedFile> file,
tl_object_ptr<secret_api::decryptedMessage> message, Promise<>) override;
void on_send_message_error(int64 random_id, Status error, Promise<>) override;
void on_send_message_ack(int64 random_id) override;
void on_send_message_ok(int64 random_id, MessageId message_id, int32 date,
tl_object_ptr<telegram_api::EncryptedFile> file, Promise<>) override;
void on_delete_messages(std::vector<int64> random_id, Promise<>) override;
void on_flush_history(MessageId, Promise<>) override;
void on_read_message(int64, Promise<>) override;
// Ignored by the test; the promise is not used.
void on_screenshot_taken(UserId user_id, MessageId message_id, int32 date, int64 random_id,
Promise<> promise) override {
}
// Ignored by the test; the promise is not used.
void on_set_ttl(UserId user_id, MessageId message_id, int32 date, int32 ttl, int64 random_id,
Promise<> promise) override {
}
private:
FakeDhCallback fake_dh_callback_;
// One creator shared by all context instances.
static NetQueryCreator net_query_creator_;
std::shared_ptr<BinlogInterface> binlog_;
std::shared_ptr<KeyValueSyncInterface> key_value_;
std::shared_ptr<bool> close_flag_;
ActorShared<Master> master_;
std::shared_ptr<SecretChatDb> secret_chat_db_;
};
// Definition of the static creator declared in the class.
NetQueryCreator FakeSecretChatContext::net_query_creator_;
class Master : public Actor {
public:
// Keeps the pointer through which the final test status is reported (written in on_inbound_message).
explicit Master(Status *status)
    : status_(status) {
}
// Wrapper around one participant's SecretChatActor. It owns the participant's fake binlog and
// key-value storage, periodically destroys the actor (timeout_expired) and recreates it from the
// binlog contents (on_closed), and queues events for the actor while it is not ready.
class SecretChatProxy : public Actor {
public:
// Creates fresh storage, remembers the parent's actor id and link token so that a new
// ActorShared<Master> can be built after a restart, creates the SecretChatActor and marks it ready.
SecretChatProxy(string name, ActorShared<Master> parent) : name_(std::move(name)) {
binlog_ = std::make_shared<FakeBinlog>();
key_value_ = std::make_shared<FakeKeyValue>();
key_value_->external_init_begin(LogEvent::HandlerType::BinlogPmcMagic);
key_value_->external_init_finish(binlog_);
close_flag_ = std::make_shared<bool>(false);
parent_ = parent.get();
parent_token_ = parent.token();
actor_ = create_actor<SecretChatActor>(
PSLICE() << "SecretChat " << name_, 123,
td::make_unique<FakeSecretChatContext>(binlog_, key_value_, close_flag_, std::move(parent)), true);
on_binlog_replay_finish();
}
// The wrapped actor; public because Master sends closures to it directly.
ActorOwn<SecretChatActor> actor_;
// Delivers an encrypted message to the wrapped actor. crc must equal crc64 of data.
// If the acknowledgement promise fails, the same data is sent to this proxy again.
void add_inbound_message(int32 chat_id, BufferSlice data, uint64 crc) {
CHECK(crc64(data.as_slice()) == crc);
auto event = make_unique<logevent::InboundSecretMessage>();
event->qts = 0;
event->chat_id = chat_id;
event->date = 0;
event->encrypted_message = std::move(data);
// The lambda keeps its own copy of the payload so that it can resend it on failure.
event->qts_ack = PromiseCreator::lambda(
[actor_id = actor_id(this), chat_id, data = event->encrypted_message.copy(), crc](Result<> result) mutable {
if (result.is_ok()) {
LOG(INFO) << "FINISH add_inbound_message " << tag("crc", crc);
return;
}
LOG(INFO) << "RESEND add_inbound_message " << tag("crc", crc) << result.error();
send_closure(actor_id, &SecretChatProxy::add_inbound_message, chat_id, std::move(data), crc);
});
add_event(Event::delayed_closure(&SecretChatActor::add_inbound_message, std::move(event)));
}
// Asks the wrapped actor to send a message. A serialized copy is kept so that the message can be
// re-parsed and sent again if either the send or the subsequent binlog sync reports an error.
void send_message(tl_object_ptr<secret_api::DecryptedMessage> message) {
BufferSlice serialized_message(serialize(*message));
auto resend_promise = PromiseCreator::lambda(
[actor_id = actor_id(this), serialized_message = std::move(serialized_message)](Result<> result) mutable {
TlBufferParser parser(&serialized_message);
auto message = secret_api::decryptedMessage::fetch(parser);
if (result.is_ok()) {
LOG(INFO) << "FINISH send_message " << tag("message", to_string(message));
return;
}
LOG(INFO) << "RESEND send_message " << tag("message", to_string(message)) << result.error();
CHECK(serialize(*message) == serialized_message.as_slice());
send_closure(actor_id, &SecretChatProxy::send_message, std::move(message));
});
// On success the binlog is synced for the generation captured here; sync_binlog() rejects the
// request if the binlog has been restarted in the meantime.
auto sync_promise = PromiseCreator::lambda([actor_id = actor_id(this), generation = this->binlog_generation_,
resend_promise = std::move(resend_promise)](Result<> result) mutable {
if (result.is_error()) {
resend_promise.set_error(result.move_as_error());
return;
}
send_closure(actor_id, &SecretChatProxy::sync_binlog, generation, std::move(resend_promise));
});
add_event(
Event::delayed_closure(&SecretChatActor::send_message, std::move(message), nullptr, std::move(sync_promise)));
}
// Incremented on every restart in on_closed().
int32 binlog_generation_ = 0;
// Forces a binlog sync, or fails the promise if binlog_generation differs from the current generation.
void sync_binlog(int32 binlog_generation, Promise<> promise) {
if (binlog_generation != binlog_generation_) {
return promise.set_error(Status::Error("Binlog generation mismatch"));
}
binlog_->force_sync(std::move(promise));
}
// Restarts the participant (sent by Master::hangup_shared): drops unsaved binlog events, rebuilds the
// key-value storage from the saved ones, creates a new SecretChatActor and replays the remaining
// saved events to it.
void on_closed() {
LOG(INFO) << "CLOSED";
ready_ = false;
*close_flag_ = false;
key_value_ = std::make_shared<FakeKeyValue>();
key_value_->external_init_begin(LogEvent::HandlerType::BinlogPmcMagic);
std::vector<BinlogEvent> events;
binlog_generation_++;
binlog_->restart();
// Key-value events go straight into the new storage; everything else is collected for replay.
binlog_->for_each([&](const BinlogEvent &event) {
if (event.type_ == LogEvent::HandlerType::BinlogPmcMagic) {
key_value_->external_init_handle(event);
} else {
events.push_back(event.clone());
}
});
key_value_->external_init_finish(binlog_);
actor_ = create_actor<SecretChatActor>(
PSLICE() << "SecretChat " << name_, 123,
td::make_unique<FakeSecretChatContext>(binlog_, key_value_, close_flag_,
ActorShared<Master>(parent_, parent_token_)),
true);
for (auto &event : events) {
CHECK(event.type_ == LogEvent::HandlerType::SecretChats);
auto r_message = logevent::SecretChatEvent::from_buffer_slice(event.data_as_buffer_slice());
LOG_IF(FATAL, r_message.is_error()) << "Failed to deserialize event: " << r_message.error();
auto message = r_message.move_as_ok();
message->set_logevent_id(event.id_);
LOG(INFO) << "Process binlog event " << *message;
// Ownership is transferred to a pointer of the concrete event type selected by get_type().
switch (message->get_type()) {
case logevent::SecretChatEvent::Type::InboundSecretMessage:
send_closure_later(actor_, &SecretChatActor::replay_inbound_message,
unique_ptr<logevent::InboundSecretMessage>(
static_cast<logevent::InboundSecretMessage *>(message.release())));
break;
case logevent::SecretChatEvent::Type::OutboundSecretMessage:
send_closure_later(actor_, &SecretChatActor::replay_outbound_message,
unique_ptr<logevent::OutboundSecretMessage>(
static_cast<logevent::OutboundSecretMessage *>(message.release())));
break;
default:
UNREACHABLE();
}
};
start_test();
on_binlog_replay_finish();
}
// Marks the proxy ready, tells the actor that replay is finished and flushes the events queued meanwhile.
void on_binlog_replay_finish() {
ready_ = true;
LOG(INFO) << "Finish replay binlog";
send_closure(actor_, &SecretChatActor::binlog_replay_finish);
for (auto &event : pending_events_) {
send_event(actor_, std::move(event));
}
pending_events_.clear();
}
// Arms the restart timer (0.3 .. 0.594 s) and resets the per-run event counter.
void start_test() {
set_timeout_in(Random::fast(50, 99) * 0.3 / 50);
events_cnt_ = 0;
}
private:
string name_;
// Parent actor id and link token, kept to rebuild ActorShared<Master> in on_closed().
ActorId<Master> parent_;
uint64 parent_token_;
std::shared_ptr<FakeBinlog> binlog_;
std::shared_ptr<FakeKeyValue> key_value_;
std::shared_ptr<bool> close_flag_;
// Number of events passed to add_event() since the last start_test().
int events_cnt_ = 0;
// Events received while the actor was not ready.
std::vector<Event> pending_events_;
bool ready_ = false;
bool is_active() {
return !actor_.empty() && ready_;
}
// Sends the event to the actor right away when it is ready, otherwise queues it.
void add_event(Event event) {
events_cnt_++;
if (is_active()) {
LOG(INFO) << "EMIT";
send_event(actor_, std::move(event));
} else {
LOG(INFO) << "DELAY";
pending_events_.push_back(std::move(event));
}
}
// Number of consecutive runs that saw fewer than 4 events.
int32 bad_cnt_ = 0;
// Restart timer: sets the close flag and destroys the actor. The run counts as "bad" when fewer than
// 4 events were added; 10 bad runs in a row fail the CHECK.
// NOTE(review): presumably destroying the actor leads to Master::hangup_shared and thus on_closed() - confirm.
void timeout_expired() override {
LOG(INFO) << "TIMEOUT EXPIRED";
if (events_cnt_ < 4) {
bad_cnt_++;
CHECK(bad_cnt_ < 10);
} else {
bad_cnt_ = 0;
}
*close_flag_ = true;
actor_.reset();
}
};
// Maps a participant id to its proxy: 1 is alice, every other value is bob.
auto &get_by_id(uint64 id) {
  return id == 1 ? alice_ : bob_;
}
// Proxy selected by the link token of the event currently being handled.
auto &from() {
  const auto sender_token = get_link_token();
  return get_by_id(sender_token);
}
// Proxy of the other participant: the tokens are 1 and 2, so the peer's token is 3 minus the current one.
auto &to() {
  const auto peer_token = 3 - get_link_token();
  return get_by_id(peer_token);
}
// Installs a Global context, creates both participant proxies (link tokens 1 and 2) and asks
// alice's SecretChatActor to create the chat; the result is forwarded to got_secret_chat_id().
void start_up() override {
  auto old_context = set_context(std::make_shared<Global>());

  alice_ = create_actor<SecretChatProxy>("SecretChatProxy alice", "alice", actor_shared(this, 1));
  bob_ = create_actor<SecretChatProxy>("SecretChatProxy bob", "bob", actor_shared(this, 2));

  auto on_chat_created = PromiseCreator::lambda([actor_id = actor_id(this)](Result<SecretChatId> res) {
    send_closure(actor_id, &Master::got_secret_chat_id, std::move(res), 0);
  });
  send_closure(alice_->get_actor_unsafe()->actor_, &SecretChatActor::create_chat, 2, 0, 123,
               std::move(on_chat_created));
}
// Checks that chat creation succeeded and logs the resulting id.
// The unnamed second parameter is needed to work around a clang bug.
void got_secret_chat_id(Result<SecretChatId> res, int) {
  CHECK(res.is_ok());
  const auto id = res.move_as_ok();
  LOG(INFO) << "SecretChatId = " << id;
}
// Tells whether the query is eligible for an injected failure. The first 20 calls always answer false;
// afterwards only messages_sendEncrypted and messages_sendEncryptedFile queries are eligible.
bool can_fail(NetQueryPtr &query) {
  static int grace_calls_left = 20;
  if (grace_calls_left > 0) {
    grace_calls_left--;
    return false;
  }
  return query->tl_constructor() == telegram_api::messages_sendEncrypted::ID ||
         query->tl_constructor() == telegram_api::messages_sendEncryptedFile::ID;
}
// Handles a network query on behalf of the fake server. Eligible queries (see can_fail) are failed
// at random with a resendable 429 error; everything else is parsed with my_api::downcast_call and
// dispatched to the matching process_net_query overload. The ordered flag is not used here.
void send_net_query(NetQueryPtr query, ActorShared<NetQueryCallback> callback, bool ordered) {
// Random::fast is only consulted when the query is eligible (short-circuit evaluation).
if (can_fail(query) && Random::fast(0, 1) == 0) {
LOG(INFO) << "Fail query " << query;
// The resend promise re-enters send_net_query with the same link token and callback target.
auto resend_promise =
PromiseCreator::lambda([id = actor_shared(this, get_link_token()), callback_actor = callback.get(),
callback_token = callback.token()](Result<NetQueryPtr> r_net_query) mutable {
if (r_net_query.is_error()) {
// No resend requested: drop the shared handle via release().
id.release();
return;
}
send_closure(std::move(id), &Master::send_net_query, r_net_query.move_as_ok(),
ActorShared<NetQueryCallback>(callback_actor, callback_token), true);
});
query->set_error(Status::Error(429, "Test error"));
send_closure(std::move(callback), &NetQueryCallback::on_result_resendable, std::move(query),
std::move(resend_promise));
return;
} else {
LOG(INFO) << "Do not fail " << query;
}
// Work on a copy of the query bytes, unpacking it first when the query is gzip-compressed.
auto query_slice = query->query().clone();
if (query->gzip_flag() == NetQuery::GzipFlag::On) {
query_slice = gzdecode(query_slice.as_slice());
}
TlBufferParser parser(&query_slice);
//auto object = telegram_api::Function::fetch(parser);
//LOG(INFO) << query_slice.size();
//parser.get_status().ensure();
my_api::downcast_call(parser, [&](auto &object) {
this->process_net_query(std::move(object), std::move(query), std::move(callback));
});
}
// Fallback overload for query types that have no dedicated handler: logs a fatal error.
template <class T>
void process_net_query(T &&object, NetQueryPtr query, ActorShared<NetQueryCallback> callback) {
  LOG(FATAL) << "Unsupported query: " << to_string(object);
}
// Answers a getDhConfig query with a serialized messages_dhConfig built from g, prime_base64 and
// version 12. The request's own fields are not inspected.
void process_net_query(my_api::messages_getDhConfig &&get_dh_config, NetQueryPtr net_query,
                       ActorShared<NetQueryCallback> callback) {
  my_api::messages_dhConfig config;
  config.g_ = g;
  config.version_ = 12;
  config.p_ = BufferSlice(base64url_decode(prime_base64).move_as_ok());

  // Serialize into a buffer of exactly the size the storer reports.
  auto storer = TLObjectStorer<my_api::messages_dhConfig>(config);
  BufferSlice answer(storer.size());
  auto real_size = storer.store(answer.as_slice().ubegin());
  CHECK(real_size == answer.size());

  net_query->set_ok(std::move(answer));
  send_closure(std::move(callback), &NetQueryCallback::on_result, std::move(net_query));
}
// Handles requestEncryption, which must come from participant 1 (alice): alice's actor gets an
// encryptedChatWaiting update, bob's actor gets an encryptedChatRequested update carrying a copy of
// g_a_. The query is cleared and the callback is not invoked.
void process_net_query(my_api::messages_requestEncryption &&request_encryption, NetQueryPtr net_query,
                       ActorShared<NetQueryCallback> callback) {
  CHECK(get_link_token() == 1);

  send_closure(alice_->get_actor_unsafe()->actor_, &SecretChatActor::update_chat,
               make_tl_object<telegram_api::encryptedChatWaiting>(123, 321, 0, 1, 2));

  send_closure(
      bob_->get_actor_unsafe()->actor_, &SecretChatActor::update_chat,
      make_tl_object<telegram_api::encryptedChatRequested>(123, 321, 0, 1, 2, request_encryption.g_a_.clone()));

  net_query->clear();
}
// Handles acceptEncryption, which must come from participant 2 (bob): alice's actor gets an
// encryptedChat update with bob's g_b_, bob's query is answered with a serialized encryptedChat,
// and then the actual test is started (restart timers, first ping, periodic timeout).
void process_net_query(my_api::messages_acceptEncryption &&request_encryption, NetQueryPtr net_query,
                       ActorShared<NetQueryCallback> callback) {
  CHECK(get_link_token() == 2);

  send_closure(alice_->get_actor_unsafe()->actor_, &SecretChatActor::update_chat,
               make_tl_object<telegram_api::encryptedChat>(123, 321, 0, 1, 2, request_encryption.g_b_.clone(),
                                                           request_encryption.key_fingerprint_));

  // The answer for bob carries an empty g_a_or_b_ field.
  my_api::encryptedChat encrypted_chat(123, 321, 0, 1, 2, BufferSlice(), request_encryption.key_fingerprint_);
  auto storer = TLObjectStorer<my_api::encryptedChat>(encrypted_chat);
  BufferSlice answer(storer.size());
  auto real_size = storer.store(answer.as_slice().ubegin());
  CHECK(real_size == answer.size());

  net_query->set_ok(std::move(answer));
  send_closure(std::move(callback), &NetQueryCallback::on_result, std::move(net_query));

  send_closure(alice_, &SecretChatProxy::start_test);
  send_closure(bob_, &SecretChatProxy::start_test);

  // Start the ping countdown at 5000 from participant 1 and arm the periodic timeout.
  send_ping(1, 5000);
  set_timeout_in(1);
}
// Periodic timer: each participant sends one extra non-ping message, then the timer is re-armed for 1 second.
void timeout_expired() override {
  send_message(1, "oppa");
  send_message(2, "appo");
  set_timeout_in(1);
}
// Sends the text "PING: <cnt>" from participant id. Every 200th counter value is logged at ERROR
// level, all others at INFO level.
void send_ping(int id, int cnt) {
  const bool is_milestone = cnt % 200 == 0;
  if (is_milestone) {
    LOG(ERROR) << "Send ping " << tag("id", id) << tag("cnt", cnt);
  } else {
    LOG(INFO) << "Send ping " << tag("id", id) << tag("cnt", cnt);
  }
  string text = PSTRING() << "PING: " << cnt;
  send_message(id, std::move(text));
}
// Sends a text message from participant id under a fresh secure random id and remembers it in
// sent_messages_ so that it can be sent again after a send error.
void send_message(int id, string text) {
  auto random_id = Random::secure_int64();
  LOG(INFO) << "Send message: " << tag("id", id) << tag("text", text) << tag("random_id", random_id);
  sent_messages_[random_id] = Message{id, text};
  send_closure(get_by_id(id), &SecretChatProxy::send_message,
               secret_api::make_object<secret_api::decryptedMessage>(0, random_id, 0, text, Auto(), Auto(), Auto(),
                                                                     Auto(), 0));
}
// sendEncryptedService queries take the same path as ordinary encrypted messages.
void process_net_query(my_api::messages_sendEncryptedService &&message, NetQueryPtr net_query,
                       ActorShared<NetQueryCallback> callback) {
  process_net_query_send_enrypted(std::move(message.data_), std::move(net_query), std::move(callback));
}
// sendEncrypted queries: hand the payload to the shared delivery routine.
void process_net_query(my_api::messages_sendEncrypted &&message, NetQueryPtr net_query,
                       ActorShared<NetQueryCallback> callback) {
  process_net_query_send_enrypted(std::move(message.data_), std::move(net_query), std::move(callback));
}
// Answers the sender with a serialized messages_sentEncryptedMessage (date 0) and then delivers
// the encrypted payload, together with its crc64, to the other participant's proxy.
void process_net_query_send_enrypted(BufferSlice data, NetQueryPtr net_query,
                                     ActorShared<NetQueryCallback> callback) {
  my_api::messages_sentEncryptedMessage sent_message;
  sent_message.date_ = 0;

  auto storer = TLObjectStorer<my_api::messages_sentEncryptedMessage>(sent_message);
  BufferSlice answer(storer.size());
  auto real_size = storer.store(answer.as_slice().ubegin());
  CHECK(real_size == answer.size());

  net_query->set_ok(std::move(answer));
  send_closure(std::move(callback), &NetQueryCallback::on_result, std::move(net_query));

  // We can't lose updates yet :(
  auto crc = crc64(data.as_slice());
  LOG(INFO) << "Send SecretChatProxy::add_inbound_message" << tag("crc", crc);
  send_closure(to(), &SecretChatProxy::add_inbound_message, narrow_cast<int32>(3 - get_link_token()), std::move(data),
               crc);
}
// Smallest ping counter seen so far; used to ignore pings that are not strictly smaller.
int32 last_ping_ = std::numeric_limits<int32>::max();

// Called when a participant has received a decrypted text. Texts of the form "PING: <n>" drive the
// test: n == 0 finishes the scheduler and reports success, a new smaller n is answered with a ping
// of n - 1 from the receiving participant; anything else is ignored.
void on_inbound_message(string message, Promise<> promise) {
  promise.set_value(Unit());
  LOG(INFO) << "GOT INBOUND MESSAGE: " << message << " " << get_link_token();

  int32 cnt;
  const int parsed_fields = std::sscanf(message.c_str(), "PING: %d", &cnt);
  if (parsed_fields != 1) {
    // Not a ping.
    return;
  }

  if (cnt == 0) {
    Scheduler::instance()->finish();
    *status_ = Status::OK();
    return;
  }

  if (cnt >= last_ping_) {
    // This counter value (or a smaller one) has already been handled.
    return;
  }

  last_ping_ = cnt;
  send_ping(narrow_cast<int32>(get_link_token()), cnt - 1);
}
// Called when sending a message failed: if the random id is known, the same text is sent again
// (under a new random id) from the same participant. The old entry stays in sent_messages_.
void on_send_message_error(int64 random_id, Status error, Promise<> promise) {
  promise.set_value(Unit());
  LOG(INFO) << "Receive send message error: " << tag("random_id", random_id) << error;

  const auto found = sent_messages_.find(random_id);
  if (found == sent_messages_.end()) {
    LOG(INFO) << "TODO: try to fix errors about message after it is sent";
    return;
  }

  // Copy the entry before send_message() inserts a new one into the map.
  auto failed_message = found->second;
  // sent_messages_.erase(found);
  send_message(failed_message.id, failed_message.text);
}
// Called when a message was sent successfully; only logs. An unknown random id produces the TODO
// log line, and known entries are intentionally kept in sent_messages_.
void on_send_message_ok(int64 random_id, Promise<> promise) {
  promise.set_value(Unit());
  LOG(INFO) << "Receive send message ok: " << tag("random_id", random_id);

  const bool is_known = sent_messages_.find(random_id) != sent_messages_.end();
  if (!is_known) {
    LOG(INFO) << "TODO: try to fix errors about message after it is sent";
    return;
  }
  // The entry is not erased here.
}
private:
// Where the final test result is written (set to OK when the ping counter reaches 0).
Status *status_;
// Participant proxies; alice is addressed with id/link token 1, bob with 2.
ActorOwn<SecretChatProxy> alice_;
ActorOwn<SecretChatProxy> bob_;
// A sent message: the id of the sending participant and the message text.
struct Message {
int32 id;
string text;
};
// Every message sent so far, keyed by its random id; entries are never erased.
std::map<int64, Message> sent_messages_;
// A shared link to Master was dropped: tell the proxy identified by the link token to restart.
void hangup_shared() override {
  LOG(INFO) << "GOT HANGUP: " << get_link_token();
  auto &closed_proxy = from();
  send_closure(closed_proxy, &SecretChatProxy::on_closed);
}
};
// Forwards the query, its callback and the ordered flag to Master::send_net_query.
void FakeSecretChatContext::send_net_query(NetQueryPtr query, ActorShared<NetQueryCallback> callback, bool ordered) {
  send_closure(master_, &Master::send_net_query, std::move(query), std::move(callback), ordered);
}
// Forwards only the message text and the promise to Master::on_inbound_message;
// the user, message id, date and file are ignored.
void FakeSecretChatContext::on_inbound_message(UserId user_id, MessageId message_id, int32 date,
                                               tl_object_ptr<telegram_api::encryptedFile> file,
                                               tl_object_ptr<secret_api::decryptedMessage> message,
                                               Promise<> promise) {
  send_closure(master_, &Master::on_inbound_message, message->message_, std::move(promise));
}
// Forwards the failure to Master::on_send_message_error.
void FakeSecretChatContext::on_send_message_error(int64 random_id, Status error, Promise<> promise) {
  send_closure(master_, &Master::on_send_message_error, random_id, std::move(error), std::move(promise));
}
// Acknowledgements are ignored by the test.
void FakeSecretChatContext::on_send_message_ack(int64 random_id) {
}
// Forwards only the random id and the promise to Master::on_send_message_ok;
// the message id, date and file are ignored.
void FakeSecretChatContext::on_send_message_ok(int64 random_id, MessageId message_id, int32 date,
                                               tl_object_ptr<telegram_api::EncryptedFile> file,
                                               Promise<> promise) {
  send_closure(master_, &Master::on_send_message_ok, random_id, std::move(promise));
}
// Deletions are accepted without doing anything: the promise is fulfilled immediately.
void FakeSecretChatContext::on_delete_messages(std::vector<int64> random_id, Promise<> promise) {
  promise.set_value(Unit());
}
// Not supported by the test context: the promise always fails with "Unsupported".
void FakeSecretChatContext::on_flush_history(MessageId, Promise<> promise) {
  promise.set_error(Status::Error("Unsupported"));
}
// Not supported by the test context: the promise always fails with "Unsupported".
void FakeSecretChatContext::on_read_message(int64, Promise<> promise) {
  promise.set_error(Status::Error("Unsupported"));
}
// Runs the Master actor on a scheduler with no extra threads until the scheduler is finished
// (Master finishes it when the ping counter reaches 0) and checks the reported status.
TEST(Secret, go) {
  SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));

  ConcurrentScheduler sched;
  int threads_n = 0;
  sched.init(threads_n);

  Status result;
  sched.create_actor_unsafe<Master>(0, "HandshakeTestActor", &result).release();

  sched.start();
  while (sched.run_main(10)) {
    // keep running the main scheduler until it reports that it is done
  }
  sched.finish();

  if (result.is_error()) {
    LOG(ERROR) << result;
  }
  ASSERT_TRUE(result.is_ok());
}
} // namespace td