// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once #include "td/utils/port/UdpSocketFd.h" #include "td/utils/buffer.h" #include "td/utils/optional.h" #include "td/utils/VectorQueue.h" namespace td { #if TD_PORT_POSIX namespace detail { class UdpWriter { public: static Status write_once(UdpSocketFd& fd, VectorQueue& queue) TD_WARN_UNUSED_RESULT { std::array messages; auto to_send = queue.as_span(); size_t to_send_n = std::min(messages.size(), to_send.size()); to_send.truncate(to_send_n); for (size_t i = 0; i < to_send_n; i++) { messages[i].to = &to_send[i].address; messages[i].data = to_send[i].data.as_slice(); } size_t cnt; auto status = fd.send_messages(::td::Span(messages).truncate(to_send_n), cnt); queue.pop_n(cnt); return status; } }; class UdpReaderHelper { public: void init_inbound_message(UdpSocketFd::InboundMessage& message) { message.from = &message_.address; message.error = &message_.error; if (buffer_.size() < MAX_PACKET_SIZE) { buffer_ = BufferSlice(RESERVED_SIZE); } CHECK(buffer_.size() >= MAX_PACKET_SIZE); message.data = buffer_.as_slice().truncate(MAX_PACKET_SIZE); } UdpMessage extract_udp_message(UdpSocketFd::InboundMessage& message) { message_.data = buffer_.from_slice(message.data); auto size = message_.data.size(); size = (size + 7) & ~7; CHECK(size <= MAX_PACKET_SIZE); buffer_.confirm_read(size); return std::move(message_); } private: enum : size_t { MAX_PACKET_SIZE = 2048, RESERVED_SIZE = MAX_PACKET_SIZE * 8 }; UdpMessage message_; BufferSlice buffer_; }; //One for thread is enough class UdpReader { public: UdpReader() { for (size_t i = 0; i < messages_.size(); i++) { helpers_[i].init_inbound_message(messages_[i]); } } Status read_once(UdpSocketFd& fd, VectorQueue& queue) TD_WARN_UNUSED_RESULT { for (size_t i = 0; i < messages_.size(); i++) { CHECK(messages_[i].data.size() == 2048); } size_t cnt = 0; auto status = fd.receive_messages(messages_, cnt); for (size_t i = 0; i < cnt; i++) { queue.push(helpers_[i].extract_udp_message(messages_[i])); helpers_[i].init_inbound_message(messages_[i]); } for (size_t i = cnt; i < messages_.size(); i++) { CHECK(messages_[i].data.size() == 2048) << " cnt = " << cnt << " i = " << i << " size = " << messages_[i].data.size() << " status = " << status; ; } if (status.is_error() && !UdpSocketFd::is_critical_read_error(status)) { queue.push(UdpMessage{{}, {}, std::move(status)}); } return status; } private: enum : size_t { BUFFER_SIZE = 16 }; std::array messages_; std::array helpers_; }; } // namespace detail #endif class BufferedUdp : public UdpSocketFd { public: BufferedUdp(UdpSocketFd fd) : UdpSocketFd(std::move(fd)) { } #if TD_PORT_POSIX Result> receive() { if (input_.empty() && can_read(*this)) { TRY_STATUS(flush_read_once()); } if (input_.empty()) { return optional(); } return input_.pop(); } void send(UdpMessage message) { output_.push(std::move(message)); } Status flush_send() { Status status; while (status.is_ok() && can_write(*this) && !output_.empty()) { status = flush_send_once(); } return status; } #endif UdpSocketFd move_as_udp_socket_fd() { return std::move(as_fd()); } UdpSocketFd& as_fd() { return *static_cast(this); } private: #if TD_PORT_POSIX VectorQueue input_; VectorQueue output_; VectorQueue& input() { return input_; } VectorQueue& output() { return output_; } Status flush_send_once() TD_WARN_UNUSED_RESULT { return detail::UdpWriter::write_once(as_fd(), output_); } Status flush_read_once() TD_WARN_UNUSED_RESULT { init_thread_local(udp_reader_); return udp_reader_->read_once(as_fd(), input_); } static TD_THREAD_LOCAL detail::UdpReader* udp_reader_; #endif }; } // namespace td