BufferdStdin: support posix

GitOrigin-RevId: eac3b2429f6680fb6246fecefc8bed7b4c48bd14
This commit is contained in:
Arseny Smirnov 2018-09-10 18:09:08 +03:00
parent 6329ebf6bc
commit 982369c495
3 changed files with 65 additions and 72 deletions

View File

@ -633,28 +633,19 @@ class CliClient final : public Actor {
close_flag_ = true;
dump_memory_usage();
td_.reset();
#if TD_WINDOWS
stdin_reader_.reset();
#else
unsubscribe(stdin_.get_poll_info().get_pollable_fd_ref());
is_stdin_reader_stopped_ = true;
#endif
yield();
}
#ifdef USE_READLINE
FileFd stdin_;
#else
using StreamConnection = BufferedFd<FileFd>;
StreamConnection stdin_;
#endif
BufferedStdin stdin_;
static CliClient *instance_;
#ifdef USE_READLINE
/* Callback function called for each line when accept-line executed, EOF
* seen, or EOF character read. This sets a flag and returns; it could
* also call exit. */
static void cb_linehandler(char *line) {
static void static_add_cmd(char *line) {
/* Can use ^D (stty eof) to exit. */
if (line == nullptr) {
LOG(FATAL) << "closed";
@ -666,6 +657,9 @@ class CliClient final : public Actor {
instance_->add_cmd(line);
rl_free(line);
}
static int static_getc(FILE *) {
return instance_->stdin_getc();
}
#endif
unique_ptr<TdCallback> make_td_callback() {
@ -746,63 +740,14 @@ class CliClient final : public Actor {
init_td();
#if TD_WINDOWS
auto stdin_id = Scheduler::instance()->sched_count() - 1;
class StdinReader : public Actor {
public:
explicit StdinReader(ActorShared<CliClient> parent) : parent_(std::move(parent)) {
}
void start_up() override {
stdin_ = &Stdin();
set_timeout_in(0.001);
}
void timeout_expired() override {
std::array<char, 100> buf;
auto t_res = stdin_->read(MutableSlice(buf.data(), buf.size()));
if (t_res.is_error()) {
LOG(FATAL) << "Can't read from stdin";
}
auto res = t_res.ok();
VLOG(fd) << res << " " << string(buf.data(), res);
data_.append(string(buf.data(), res));
process();
set_timeout_in(0.05);
}
private:
FileFd *stdin_ = nullptr;
string data_;
ActorShared<CliClient> parent_;
void process() {
while (true) {
auto pos = data_.find('\n');
if (pos == string::npos) {
break;
}
auto cmd = string(data_.data(), pos);
while (!cmd.empty() && cmd.back() == '\r') {
cmd.pop_back();
}
send_closure(parent_, &CliClient::on_cmd, cmd);
data_ = data_.substr(pos + 1);
}
}
};
stdin_reader_ = create_actor_on_scheduler<StdinReader>("stdin_reader", stdin_id, actor_shared(this, 1));
#else
Stdin().get_native_fd().set_is_blocking(false).ensure();
#ifdef USE_READLINE
deactivate_readline();
rl_callback_handler_install(prompt, cb_linehandler);
rl_getc_function = static_getc;
rl_callback_handler_install(prompt, static_add_cmd);
rl_attempted_completion_function = tg_cli_completion;
reactivate_readline();
stdin_ = std::move(Stdin());
#else
stdin_ = StreamConnection(std::move(Stdin()));
#endif
subscribe(stdin_.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
#endif
if (get_chat_list_) {
send_request(make_tl_object<td_api::getChats>(std::numeric_limits<int64>::max(), 0, 100));
@ -3403,15 +3348,12 @@ class CliClient final : public Actor {
inited_ = true;
init();
}
#if !TD_WINDOWS
stdin_.flush_read().ensure();
#ifdef USE_READLINE
if (can_read(stdin_)) {
while (!stdin_.input_buffer().empty()) {
rl_callback_read_char();
stdin_.get_poll_info().clear_flags(PollFlags::Read());
}
#else
auto r = stdin_.flush_read();
CHECK(r.is_ok());
while (true) {
auto cmd = process_stdin(&stdin_.input_buffer());
if (cmd.is_error()) {
@ -3426,7 +3368,6 @@ class CliClient final : public Actor {
cmd_queue_.pop();
on_cmd(std::move(cmd));
}
#endif
if (ready_to_stop_ && close_flag_ && is_stdin_reader_stopped_) {
#ifdef USE_READLINE
@ -3480,6 +3421,15 @@ class CliClient final : public Actor {
void add_cmd(string cmd) {
cmd_queue_.push(std::move(cmd));
}
int stdin_getc() {
auto slice = stdin_.input_buffer().prepare_read();
if (slice.empty()) {
return 0;
}
int res = slice[0];
stdin_.input_buffer().confirm_read(1);
return res;
}
std::unordered_map<int32, double> being_downloaded_files_;
@ -3496,10 +3446,6 @@ class CliClient final : public Actor {
bool disable_network_ = false;
int api_id_ = 0;
std::string api_hash_;
#if TD_WINDOWS
ActorOwn<> stdin_reader_;
#endif
};
CliClient *CliClient::instance_ = nullptr;
@ -3534,6 +3480,7 @@ void main(int argc, char **argv) {
std::locale::global(new_locale);
SCOPE_EXIT {
std::locale::global(std::locale::classic());
log_interface = default_log_interface;
};
CliLog cli_log;

View File

@ -7,6 +7,7 @@
#pragma once
#include "td/utils/port/FileFd.h"
#include "td/utils/buffer.h"
namespace td {
@ -14,4 +15,48 @@ FileFd &Stdin();
FileFd &Stdout();
FileFd &Stderr();
class BufferedStdin {
public:
BufferedStdin() {
file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw()));
file_fd_.get_native_fd().set_is_blocking(false);
}
~BufferedStdin() {
file_fd_.move_as_native_fd().release();
}
ChainBufferReader &input_buffer() {
return reader_;
}
PollableFdInfo &get_poll_info() {
return file_fd_.get_poll_info();
}
const PollableFdInfo &get_poll_info() const {
return file_fd_.get_poll_info();
}
Result<size_t> flush_read(size_t max_read = std::numeric_limits<size_t>::max()) TD_WARN_UNUSED_RESULT {
size_t result = 0;
while (::td::can_read(*this) && max_read) {
MutableSlice slice = writer_.prepare_append().truncate(max_read);
TRY_RESULT(x, file_fd_.read(slice));
slice.truncate(x);
writer_.confirm_append(x);
result += x;
max_read -= x;
}
if (result) {
reader_.sync_with_writer();
}
return result;
}
private:
FileFd file_fd_;
ChainBufferWriter writer_;
ChainBufferReader reader_ = writer_.extract_reader();
};
} // namespace td

View File

@ -12,6 +12,7 @@
#if TD_PORT_POSIX
#include <unistd.h>
#include <fcntl.h>
#endif
namespace td {