69cf867d47
GitOrigin-RevId: 3e35df493f46f91cd1cf81671819848d9640fedd
150 lines
4.3 KiB
C++
150 lines
4.3 KiB
C++
//
|
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
#pragma once
|
|
#include "td/mtproto/IStreamTransport.h"
|
|
|
|
#include "td/utils/buffer.h"
|
|
#include "td/utils/BufferedFd.h"
|
|
#include "td/utils/common.h"
|
|
#include "td/utils/port/Fd.h"
|
|
#include "td/utils/port/SocketFd.h"
|
|
#include "td/utils/Status.h"
|
|
|
|
#include "td/telegram/StateManager.h"
|
|
|
|
#include <map>
|
|
|
|
namespace td {
|
|
class Storer;
|
|
namespace mtproto {
|
|
class AuthKey;
|
|
struct PacketInfo;
|
|
} // namespace mtproto
|
|
} // namespace td
|
|
|
|
namespace td {
|
|
namespace mtproto {
|
|
class RawConnection {
|
|
public:
|
|
class StatsCallback {
|
|
public:
|
|
virtual ~StatsCallback() = default;
|
|
virtual void on_read(uint64 bytes) = 0;
|
|
virtual void on_write(uint64 bytes) = 0;
|
|
|
|
virtual void on_pong() = 0; // called when we know that connection is alive
|
|
virtual void on_error() = 0; // called on RawConnectin error. Such error should be very rare on good connections.
|
|
virtual void on_mtproto_error() = 0;
|
|
};
|
|
RawConnection() = default;
|
|
RawConnection(SocketFd socket_fd, TransportType transport_type, std::unique_ptr<StatsCallback> stats_callback)
|
|
: socket_fd_(std::move(socket_fd))
|
|
, transport_(create_transport(transport_type))
|
|
, stats_callback_(std::move(stats_callback)) {
|
|
transport_->init(&socket_fd_.input_buffer(), &socket_fd_.output_buffer());
|
|
}
|
|
|
|
void set_connection_token(StateManager::ConnectionToken connection_token) {
|
|
connection_token_ = std::move(connection_token);
|
|
}
|
|
|
|
bool can_send() const {
|
|
return transport_->can_write();
|
|
}
|
|
TransportType get_transport_type() const {
|
|
return transport_->get_type();
|
|
}
|
|
void send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key,
|
|
uint64 quick_ack_token = 0);
|
|
uint64 send_no_crypto(const Storer &storer);
|
|
|
|
Fd &get_pollable() {
|
|
return socket_fd_.get_fd();
|
|
}
|
|
StatsCallback *stats_callback() {
|
|
return stats_callback_.get();
|
|
}
|
|
|
|
class Callback {
|
|
public:
|
|
Callback() = default;
|
|
Callback(const Callback &) = delete;
|
|
Callback &operator=(const Callback &) = delete;
|
|
virtual ~Callback() = default;
|
|
virtual Status on_raw_packet(const PacketInfo &info, BufferSlice packet) = 0;
|
|
virtual Status on_quick_ack(uint64 quick_ack_token) {
|
|
return Status::Error("quick acks unsupported fully, but still used");
|
|
}
|
|
virtual Status before_write() {
|
|
return Status::OK();
|
|
}
|
|
};
|
|
|
|
// NB: After first returned error, all subsequent calls will return error too.
|
|
Status flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT {
|
|
auto status = do_flush(auth_key, callback);
|
|
if (status.is_error()) {
|
|
if (stats_callback_ && status.code() != 2) {
|
|
stats_callback_->on_error();
|
|
}
|
|
has_error_ = true;
|
|
}
|
|
return status;
|
|
}
|
|
|
|
bool has_error() const {
|
|
return has_error_;
|
|
}
|
|
|
|
void close() {
|
|
transport_.reset();
|
|
socket_fd_.close();
|
|
}
|
|
|
|
uint32 extra_{0};
|
|
string debug_str_;
|
|
double rtt_{0};
|
|
|
|
private:
|
|
BufferedFd<SocketFd> socket_fd_;
|
|
unique_ptr<IStreamTransport> transport_;
|
|
std::map<uint32, uint64> quick_ack_to_token_;
|
|
bool has_error_{false};
|
|
|
|
std::unique_ptr<StatsCallback> stats_callback_;
|
|
|
|
StateManager::ConnectionToken connection_token_;
|
|
|
|
Status flush_read(const AuthKey &auth_key, Callback &callback);
|
|
Status flush_write();
|
|
|
|
Status on_quick_ack(uint32 quick_ack, Callback &callback);
|
|
Status on_read_mtproto_error(int32 error_code);
|
|
|
|
Status do_flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT {
|
|
if (has_error_) {
|
|
return Status::Error("Connection has already failed");
|
|
}
|
|
|
|
// read/write
|
|
// EINVAL may be returned in linux kernel < 2.6.28. And on some new kernels too.
|
|
// just close connection and hope that read or write will not return this error too.
|
|
TRY_STATUS(socket_fd_.get_pending_error());
|
|
|
|
TRY_STATUS(flush_read(auth_key, callback));
|
|
TRY_STATUS(callback.before_write());
|
|
TRY_STATUS(flush_write());
|
|
if (can_close(socket_fd_)) {
|
|
return Status::Error("Connection closed");
|
|
}
|
|
return Status::OK();
|
|
}
|
|
};
|
|
|
|
} // namespace mtproto
|
|
} // namespace td
|