0xdd quick ack
GitOrigin-RevId: 3e35df493f46f91cd1cf81671819848d9640fedd
This commit is contained in:
parent
3827fb91eb
commit
69cf867d47
@ -76,51 +76,69 @@ Status RawConnection::flush_read(const AuthKey &auth_key, Callback &callback) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (quick_ack != 0) {
|
if (quick_ack != 0) {
|
||||||
auto it = quick_ack_to_token_.find(quick_ack);
|
on_quick_ack(quick_ack, callback);
|
||||||
if (it == quick_ack_to_token_.end()) {
|
|
||||||
LOG(WARNING) << Status::Error(PSLICE() << "Unknown " << tag("quick_ack", quick_ack));
|
|
||||||
continue;
|
|
||||||
// TODO: return Status::Error(PSLICE() << "Unknown " << tag("quick_ack", quick_ack));
|
|
||||||
}
|
|
||||||
auto token = it->second;
|
|
||||||
quick_ack_to_token_.erase(it);
|
|
||||||
callback.on_quick_ack(token);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
MutableSlice data = packet.as_slice();
|
|
||||||
PacketInfo info;
|
PacketInfo info;
|
||||||
info.version = 2;
|
info.version = 2;
|
||||||
|
|
||||||
int32 error_code = 0;
|
TRY_RESULT(read_result, mtproto::Transport::read(packet.as_slice(), auth_key, &info));
|
||||||
TRY_STATUS(mtproto::Transport::read(data, auth_key, &info, &data, &error_code));
|
switch (read_result.type()) {
|
||||||
|
case mtproto::Transport::ReadResult::Quickack: {
|
||||||
if (error_code) {
|
TRY_STATUS(on_quick_ack(read_result.quick_ack(), callback));
|
||||||
if (error_code == -429) {
|
break;
|
||||||
if (stats_callback_) {
|
}
|
||||||
stats_callback_->on_mtproto_error();
|
case mtproto::Transport::ReadResult::Error: {
|
||||||
|
TRY_STATUS(on_read_mtproto_error(read_result.error()));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case mtproto::Transport::ReadResult::Packet: {
|
||||||
|
// If a packet was successfully decrypted, then it is ok to assume that the connection is alive
|
||||||
|
if (!auth_key.empty()) {
|
||||||
|
if (stats_callback_) {
|
||||||
|
stats_callback_->on_pong();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return Status::Error(500, PSLICE() << "Mtproto error: " << error_code);
|
|
||||||
}
|
|
||||||
if (error_code == -404) {
|
|
||||||
return Status::Error(-404, PSLICE() << "Mtproto error: " << error_code);
|
|
||||||
}
|
|
||||||
return Status::Error(PSLICE() << "Mtproto error: " << error_code);
|
|
||||||
}
|
|
||||||
|
|
||||||
// If a packet was successfully decrypted, then it is ok to assume that the connection is alive
|
TRY_STATUS(callback.on_raw_packet(info, packet.from_slice(read_result.packet())));
|
||||||
if (!auth_key.empty()) {
|
break;
|
||||||
if (stats_callback_) {
|
|
||||||
stats_callback_->on_pong();
|
|
||||||
}
|
}
|
||||||
|
case mtproto::Transport::ReadResult::Nop:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
TRY_STATUS(callback.on_raw_packet(info, packet.from_slice(data)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TRY_STATUS(std::move(r));
|
TRY_STATUS(std::move(r));
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status RawConnection::on_read_mtproto_error(int32 error_code) {
|
||||||
|
if (error_code == -429) {
|
||||||
|
if (stats_callback_) {
|
||||||
|
stats_callback_->on_mtproto_error();
|
||||||
|
}
|
||||||
|
return Status::Error(500, PSLICE() << "Mtproto error: " << error_code);
|
||||||
|
}
|
||||||
|
if (error_code == -404) {
|
||||||
|
return Status::Error(-404, PSLICE() << "Mtproto error: " << error_code);
|
||||||
|
}
|
||||||
|
return Status::Error(PSLICE() << "Mtproto error: " << error_code);
|
||||||
|
}
|
||||||
|
|
||||||
|
Status RawConnection::on_quick_ack(uint32 quick_ack, Callback &callback) {
|
||||||
|
auto it = quick_ack_to_token_.find(quick_ack);
|
||||||
|
if (it == quick_ack_to_token_.end()) {
|
||||||
|
LOG(WARNING) << Status::Error(PSLICE() << "Unknown " << tag("quick_ack", quick_ack));
|
||||||
|
return Status::OK();
|
||||||
|
// TODO: return Status::Error(PSLICE() << "Unknown " << tag("quick_ack", quick_ack));
|
||||||
|
}
|
||||||
|
auto token = it->second;
|
||||||
|
quick_ack_to_token_.erase(it);
|
||||||
|
callback.on_quick_ack(token);
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
Status RawConnection::flush_write() {
|
Status RawConnection::flush_write() {
|
||||||
TRY_RESULT(size, socket_fd_.flush_write());
|
TRY_RESULT(size, socket_fd_.flush_write());
|
||||||
if (size > 0 && stats_callback_) {
|
if (size > 0 && stats_callback_) {
|
||||||
|
@ -122,6 +122,9 @@ class RawConnection {
|
|||||||
Status flush_read(const AuthKey &auth_key, Callback &callback);
|
Status flush_read(const AuthKey &auth_key, Callback &callback);
|
||||||
Status flush_write();
|
Status flush_write();
|
||||||
|
|
||||||
|
Status on_quick_ack(uint32 quick_ack, Callback &callback);
|
||||||
|
Status on_read_mtproto_error(int32 error_code);
|
||||||
|
|
||||||
Status do_flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT {
|
Status do_flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT {
|
||||||
if (has_error_) {
|
if (has_error_) {
|
||||||
return Status::Error("Connection has already failed");
|
return Status::Error("Connection has already failed");
|
||||||
|
@ -285,29 +285,41 @@ Result<uint64> Transport::read_auth_key_id(Slice message) {
|
|||||||
return as<uint64>(message.begin());
|
return as<uint64>(message.begin());
|
||||||
}
|
}
|
||||||
|
|
||||||
Status Transport::read(MutableSlice message, const AuthKey &auth_key, PacketInfo *info, MutableSlice *data,
|
Result<Transport::ReadResult> Transport::read(MutableSlice message, const AuthKey &auth_key, PacketInfo *info) {
|
||||||
int32 *error_code) {
|
if (message.size() < 16) {
|
||||||
if (message.size() < 8) {
|
if (message.size() < 4) {
|
||||||
if (message.size() >= 4) {
|
return Status::Error(PSLICE() << "Invalid mtproto message: smaller than 4 bytes [size=" << message.size() << "]");
|
||||||
*error_code = as<int32>(message.begin());
|
|
||||||
return Status::OK();
|
|
||||||
}
|
}
|
||||||
return Status::Error(PSLICE() << "Invalid mtproto message: smaller than 8 bytes [size=" << message.size() << "]");
|
|
||||||
|
auto code = as<int32>(message.begin());
|
||||||
|
if (code == 0) {
|
||||||
|
return ReadResult::make_nop();
|
||||||
|
} else if (code == -1) {
|
||||||
|
if (message.size() >= 8) {
|
||||||
|
return ReadResult::make_quick_ack(as<uint32>(message.begin() + 4));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return ReadResult::make_error(code);
|
||||||
|
}
|
||||||
|
return Status::Error(PSLICE() << "Invalid small mtproto message");
|
||||||
}
|
}
|
||||||
|
|
||||||
info->auth_key_id = as<int64>(message.begin());
|
info->auth_key_id = as<int64>(message.begin());
|
||||||
info->no_crypto_flag = info->auth_key_id == 0;
|
info->no_crypto_flag = info->auth_key_id == 0;
|
||||||
|
MutableSlice data;
|
||||||
if (info->type == PacketInfo::EndToEnd) {
|
if (info->type == PacketInfo::EndToEnd) {
|
||||||
return read_e2e_crypto(message, auth_key, info, data);
|
TRY_STATUS(read_e2e_crypto(message, auth_key, info, &data));
|
||||||
}
|
}
|
||||||
if (info->no_crypto_flag) {
|
if (info->no_crypto_flag) {
|
||||||
return read_no_crypto(message, info, data);
|
TRY_STATUS(read_no_crypto(message, info, &data));
|
||||||
} else {
|
} else {
|
||||||
if (auth_key.empty()) {
|
if (auth_key.empty()) {
|
||||||
return Status::Error("Failed to decrypt mtproto message: auth key is empty");
|
return Status::Error("Failed to decrypt mtproto message: auth key is empty");
|
||||||
}
|
}
|
||||||
return read_crypto(message, auth_key, info, data);
|
TRY_STATUS(read_crypto(message, auth_key, info, &data));
|
||||||
}
|
}
|
||||||
}
|
return ReadResult::make_packet(data);
|
||||||
|
} // namespace mtproto
|
||||||
|
|
||||||
size_t Transport::write(const Storer &storer, const AuthKey &auth_key, PacketInfo *info, MutableSlice dest) {
|
size_t Transport::write(const Storer &storer, const AuthKey &auth_key, PacketInfo *info, MutableSlice dest) {
|
||||||
if (info->type == PacketInfo::EndToEnd) {
|
if (info->type == PacketInfo::EndToEnd) {
|
||||||
|
@ -137,6 +137,57 @@ struct PacketInfo {
|
|||||||
|
|
||||||
class Transport {
|
class Transport {
|
||||||
public:
|
public:
|
||||||
|
class ReadResult {
|
||||||
|
public:
|
||||||
|
enum Type { Packet, Nop, Error, Quickack };
|
||||||
|
|
||||||
|
static ReadResult make_nop() {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
static ReadResult make_error(int32 error_code) {
|
||||||
|
ReadResult res;
|
||||||
|
res.type_ = Error;
|
||||||
|
res.error_code_ = error_code;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
static ReadResult make_packet(MutableSlice packet) {
|
||||||
|
CHECK(!packet.empty());
|
||||||
|
ReadResult res;
|
||||||
|
res.type_ = Packet;
|
||||||
|
res.packet_ = packet;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
static ReadResult make_quick_ack(uint32 quick_ack) {
|
||||||
|
ReadResult res;
|
||||||
|
res.type_ = Quickack;
|
||||||
|
res.quick_ack_ = quick_ack;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
Type type() const {
|
||||||
|
return type_;
|
||||||
|
}
|
||||||
|
|
||||||
|
MutableSlice packet() const {
|
||||||
|
CHECK(type_ == Packet);
|
||||||
|
return packet_;
|
||||||
|
}
|
||||||
|
uint32 quick_ack() const {
|
||||||
|
CHECK(type_ == Quickack);
|
||||||
|
return quick_ack_;
|
||||||
|
}
|
||||||
|
int32 error() const {
|
||||||
|
CHECK(type_ == Error);
|
||||||
|
return error_code_;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Type type_ = Nop;
|
||||||
|
MutableSlice packet_;
|
||||||
|
int32 error_code_;
|
||||||
|
uint32 quick_ack_;
|
||||||
|
};
|
||||||
|
|
||||||
static Result<uint64> read_auth_key_id(Slice message);
|
static Result<uint64> read_auth_key_id(Slice message);
|
||||||
|
|
||||||
// Reads mtproto packet from [message] and saves into [data].
|
// Reads mtproto packet from [message] and saves into [data].
|
||||||
@ -145,8 +196,7 @@ class Transport {
|
|||||||
// Returns size of mtproto packet.
|
// Returns size of mtproto packet.
|
||||||
// If dest.size() >= size, the packet is also written into [dest].
|
// If dest.size() >= size, the packet is also written into [dest].
|
||||||
// If auth_key is nonempty, encryption will be used.
|
// If auth_key is nonempty, encryption will be used.
|
||||||
static Status read(MutableSlice message, const AuthKey &auth_key, PacketInfo *info, MutableSlice *data,
|
static Result<ReadResult> read(MutableSlice message, const AuthKey &auth_key, PacketInfo *info) TD_WARN_UNUSED_RESULT;
|
||||||
int32 *error_code) TD_WARN_UNUSED_RESULT;
|
|
||||||
|
|
||||||
static size_t write(const Storer &storer, const AuthKey &auth_key, PacketInfo *info,
|
static size_t write(const Storer &storer, const AuthKey &auth_key, PacketInfo *info,
|
||||||
MutableSlice dest = MutableSlice());
|
MutableSlice dest = MutableSlice());
|
||||||
|
@ -805,7 +805,7 @@ Result<std::tuple<uint64, BufferSlice, int32>> SecretChatActor::decrypt(BufferSl
|
|||||||
|
|
||||||
BufferSlice encrypted_message_copy;
|
BufferSlice encrypted_message_copy;
|
||||||
int32 mtproto_version = -1;
|
int32 mtproto_version = -1;
|
||||||
int32 error_code = 0;
|
Result<mtproto::Transport::ReadResult> r_read_result;
|
||||||
for (size_t i = 0; i < versions.size(); i++) {
|
for (size_t i = 0; i < versions.size(); i++) {
|
||||||
bool is_last = i + 1 == versions.size();
|
bool is_last = i + 1 == versions.size();
|
||||||
encrypted_message_copy = encrypted_message.copy();
|
encrypted_message_copy = encrypted_message.copy();
|
||||||
@ -817,19 +817,31 @@ Result<std::tuple<uint64, BufferSlice, int32>> SecretChatActor::decrypt(BufferSl
|
|||||||
mtproto_version = versions[i];
|
mtproto_version = versions[i];
|
||||||
info.version = mtproto_version;
|
info.version = mtproto_version;
|
||||||
info.is_creator = auth_state_.x == 0;
|
info.is_creator = auth_state_.x == 0;
|
||||||
auto status = mtproto::Transport::read(data, *auth_key, &info, &data, &error_code);
|
r_read_result = mtproto::Transport::read(data, *auth_key, &info);
|
||||||
if (is_last) {
|
if (!is_last && r_read_result.is_error()) {
|
||||||
TRY_STATUS(std::move(status));
|
LOG(WARNING) << tag("mtproto", mtproto_version) << " decryption failed " << r_read_result.error();
|
||||||
} else if (status.is_error()) {
|
|
||||||
LOG(WARNING) << tag("mtproto", mtproto_version) << " decryption failed " << status;
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
TRY_RESULT(read_result, std::move(r_read_result));
|
||||||
if (error_code) {
|
switch (read_result.type()) {
|
||||||
return Status::Error(PSLICE() << "Got mtproto error code: " << error_code);
|
case mtproto::Transport::ReadResult::Quickack: {
|
||||||
|
return Status::Error("Got quickack instead of a message");
|
||||||
|
}
|
||||||
|
case mtproto::Transport::ReadResult::Error: {
|
||||||
|
return Status::Error(PSLICE() << "Got mtproto error code instead of a message: " << read_result.error());
|
||||||
|
}
|
||||||
|
case mtproto::Transport::ReadResult::Nop: {
|
||||||
|
return Status::Error("Got nop instead of a message");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case mtproto::Transport::ReadResult::Packet: {
|
||||||
|
data = read_result.packet();
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto len = as<int32>(data.begin());
|
auto len = as<int32>(data.begin());
|
||||||
data = data.substr(4, len);
|
data = data.substr(4, len);
|
||||||
if (!is_aligned_pointer<4>(data.data())) {
|
if (!is_aligned_pointer<4>(data.data())) {
|
||||||
|
@ -14,7 +14,7 @@ int32 VERBOSITY_NAME(binlog) = VERBOSITY_NAME(DEBUG) + 8;
|
|||||||
Status BinlogEvent::init(BufferSlice &&raw_event, bool check_crc) {
|
Status BinlogEvent::init(BufferSlice &&raw_event, bool check_crc) {
|
||||||
TlParser parser(raw_event.as_slice());
|
TlParser parser(raw_event.as_slice());
|
||||||
size_ = parser.fetch_int();
|
size_ = parser.fetch_int();
|
||||||
CHECK(size_ == raw_event.size());
|
CHECK(size_ == raw_event.size()) << size_ << " " << raw_event.size();
|
||||||
id_ = parser.fetch_long();
|
id_ = parser.fetch_long();
|
||||||
type_ = parser.fetch_int();
|
type_ = parser.fetch_int();
|
||||||
flags_ = parser.fetch_int();
|
flags_ = parser.fetch_int();
|
||||||
|
Loading…
Reference in New Issue
Block a user