93494cf048
GitOrigin-RevId: f99e2e838220826d3dea7fdc33c43ac9918a5efd
163 lines
5.1 KiB
C++
163 lines
5.1 KiB
C++
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
|
|
#include "td/mtproto/RawConnection.h"
|
|
|
|
#include "td/mtproto/AuthKey.h"
|
|
#include "td/mtproto/Transport.h"
|
|
|
|
#include "td/utils/format.h"
|
|
#include "td/utils/logging.h"
|
|
#include "td/utils/misc.h"
|
|
#include "td/utils/Status.h"
|
|
#include "td/utils/StorerBase.h"
|
|
|
|
#include <utility>
|
|
|
|
namespace td {
|
|
namespace mtproto {
|
|
|
|
void RawConnection::send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key,
|
|
uint64 quick_ack_token) {
|
|
PacketInfo info;
|
|
info.version = 2;
|
|
info.no_crypto_flag = false;
|
|
info.salt = salt;
|
|
info.session_id = session_id;
|
|
|
|
auto packet = BufferWriter{Transport::write(storer, auth_key, &info), transport_->max_prepend_size(),
|
|
transport_->max_append_size()};
|
|
Transport::write(storer, auth_key, &info, packet.as_slice());
|
|
|
|
bool use_quick_ack = false;
|
|
if (quick_ack_token != 0 && transport_->support_quick_ack()) {
|
|
auto tmp = quick_ack_to_token_.insert(std::make_pair(info.message_ack, quick_ack_token));
|
|
if (tmp.second) {
|
|
use_quick_ack = true;
|
|
} else {
|
|
LOG(ERROR) << "Quick ack collision " << tag("quick_ack", info.message_ack);
|
|
}
|
|
}
|
|
|
|
transport_->write(std::move(packet), use_quick_ack);
|
|
}
|
|
|
|
// Serializes `storer` as an unencrypted packet (no_crypto_flag is true, default-constructed
// AuthKey), logs its hex dump and passes it to the transport without requesting a quick ack.
//
// Returns `message_id` from the PacketInfo that was handed to Transport::write
// (presumably filled in by that call).
uint64 RawConnection::send_no_crypto(const Storer &storer) {
  PacketInfo packet_info;
  packet_info.no_crypto_flag = true;

  const AuthKey empty_auth_key = AuthKey();

  // The first Transport::write call (without a destination) supplies the buffer size;
  // the second one writes into the allocated buffer.
  auto buffer = BufferWriter{Transport::write(storer, empty_auth_key, &packet_info), transport_->max_prepend_size(),
                             transport_->max_append_size()};
  Transport::write(storer, empty_auth_key, &packet_info, buffer.as_slice());

  LOG(INFO) << "Send handshake packet: " << format::as_hex_dump<4>(buffer.as_slice());

  transport_->write(std::move(buffer), false);
  return packet_info.message_id;
}
|
|
|
|
// Reads pending data from the socket, then extracts and dispatches every packet the transport
// can currently provide.
//
// A socket read failure is not returned immediately: packets the transport can still provide
// are processed first, and the read status is propagated only after the loop. Any error from
// the transport, from packet decoding, from quick-ack handling or from `callback` aborts
// processing and is returned right away.
Status RawConnection::flush_read(const AuthKey &auth_key, Callback &callback) {
  auto r = socket_fd_.flush_read();
  if (r.is_ok()) {
    if (stats_callback_) {
      stats_callback_->on_read(r.ok());
    }
    callback.on_read(r.ok());
  }

  while (transport_->can_read()) {
    BufferSlice packet;
    uint32 quick_ack = 0;
    TRY_RESULT(wait_size, transport_->read_next(&packet, &quick_ack));

    // Make sure the packet data starts at a 4-byte aligned address; copy it into a fresh buffer otherwise
    if (!is_aligned_pointer<4>(packet.as_slice().ubegin())) {
      BufferSlice new_packet(packet.size());
      new_packet.as_slice().copy_from(packet.as_slice());
      packet = std::move(new_packet);
    }
    LOG_CHECK(is_aligned_pointer<4>(packet.as_slice().ubegin()))
        << packet.as_slice().ubegin() << ' ' << packet.size() << ' ' << wait_size;

    if (wait_size != 0) {
      // No packet is processed in this iteration; reject an implausibly large expected size
      // (limit is 4 MiB + 1 KiB), otherwise stop the loop
      constexpr size_t MAX_PACKET_SIZE = (1 << 22) + 1024;
      if (wait_size > MAX_PACKET_SIZE) {
        return Status::Error(PSLICE() << "Expected packet size is too big: " << wait_size);
      }
      break;
    }

    if (quick_ack != 0) {
      // Propagate the handler's status, exactly as for ReadResult::Quickack below
      TRY_STATUS(on_quick_ack(quick_ack, callback));
      continue;
    }

    PacketInfo info;
    info.version = 2;

    TRY_RESULT(read_result, Transport::read(packet.as_slice(), auth_key, &info));
    switch (read_result.type()) {
      case Transport::ReadResult::Quickack: {
        TRY_STATUS(on_quick_ack(read_result.quick_ack(), callback));
        break;
      }
      case Transport::ReadResult::Error: {
        TRY_STATUS(on_read_mtproto_error(read_result.error()));
        break;
      }
      case Transport::ReadResult::Packet: {
        // If a packet was successfully decrypted, then it is ok to assume that the connection is alive
        if (!auth_key.empty()) {
          if (stats_callback_) {
            stats_callback_->on_pong();
          }
        }

        TRY_STATUS(callback.on_raw_packet(info, packet.from_slice(read_result.packet())));
        break;
      }
      case Transport::ReadResult::Nop:
        break;
      default:
        UNREACHABLE();
    }
  }

  // Report a deferred socket read error, if any
  TRY_STATUS(std::move(r));
  return Status::OK();
}
|
|
|
|
// Converts an MTProto transport error code into an error Status; always returns an error.
//
//   -429: notifies the stats callback (if set) and returns an error with code 500
//   -404: returns an error with code -404
//   any other value: returns an error without an explicit code
Status RawConnection::on_read_mtproto_error(int32 error_code) {
  switch (error_code) {
    case -429:
      if (stats_callback_) {
        stats_callback_->on_mtproto_error();
      }
      return Status::Error(500, PSLICE() << "Mtproto error: " << error_code);
    case -404:
      return Status::Error(-404, PSLICE() << "Mtproto error: " << error_code);
    default:
      return Status::Error(PSLICE() << "Mtproto error: " << error_code);
  }
}
|
|
|
|
// Handles a received quick ack: looks up the token registered for `quick_ack`, removes the
// registration and reports the token to `callback`.
//
// An unknown quick ack is only logged as a warning; the function currently always returns OK.
Status RawConnection::on_quick_ack(uint32 quick_ack, Callback &callback) {
  auto token_it = quick_ack_to_token_.find(quick_ack);
  if (token_it != quick_ack_to_token_.end()) {
    // Copy the token out before the entry is erased
    const auto token = token_it->second;
    quick_ack_to_token_.erase(token_it);
    callback.on_quick_ack(token);
  } else {
    LOG(WARNING) << Status::Error(PSLICE() << "Unknown " << tag("quick_ack", quick_ack));
    // TODO: return Status::Error(PSLICE() << "Unknown " << tag("quick_ack", quick_ack));
  }
  return Status::OK();
}
|
|
|
|
// Flushes pending outgoing data to the socket and, if anything was written, reports the amount
// to the stats callback (when one is set). Returns the socket's error if the flush fails.
Status RawConnection::flush_write() {
  TRY_RESULT(written, socket_fd_.flush_write());
  if (stats_callback_ && written > 0) {
    stats_callback_->on_write(written);
  }
  return Status::OK();
}
|
|
|
|
} // namespace mtproto
|
|
} // namespace td
|