Merge remote-tracking branch 'td/master'

This commit is contained in:
Andrea Cavalli 2021-10-29 00:31:20 +02:00
commit 9b0d54de26
47 changed files with 447 additions and 459 deletions

View File

@ -30,7 +30,7 @@ class HttpClient final : public HttpOutboundConnection::Callback {
addr.init_ipv4_port("127.0.0.1", 8082).ensure(); addr.init_ipv4_port("127.0.0.1", 8082).ensure();
auto fd = SocketFd::open(addr); auto fd = SocketFd::open(addr);
LOG_CHECK(fd.is_ok()) << fd.error(); LOG_CHECK(fd.is_ok()) << fd.error();
connection_ = create_actor<HttpOutboundConnection>("Connect", fd.move_as_ok(), SslStream{}, connection_ = create_actor<HttpOutboundConnection>("Connect", BufferedFd<SocketFd>(fd.move_as_ok()), SslStream{},
std::numeric_limits<size_t>::max(), 0, 0, std::numeric_limits<size_t>::max(), 0, 0,
ActorOwn<HttpOutboundConnection::Callback>(actor_id(this))); ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
yield(); yield();

View File

@ -56,8 +56,8 @@ class Server final : public TcpListener::Callback {
LOG(ERROR) << "ACCEPT " << cnt++; LOG(ERROR) << "ACCEPT " << cnt++;
pos_++; pos_++;
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0); auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
create_actor_on_scheduler<HttpInboundConnection>("HttpInboundConnection", scheduler_id, std::move(fd), 1024 * 1024, create_actor_on_scheduler<HttpInboundConnection>("HttpInboundConnection", scheduler_id,
0, 0, BufferedFd<SocketFd>(std::move(fd)), 1024 * 1024, 0, 0,
create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id)) create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id))
.release(); .release();
} }

View File

@ -5,7 +5,6 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
// //
#include "td/net/HttpHeaderCreator.h" #include "td/net/HttpHeaderCreator.h"
#include "td/net/HttpInboundConnection.h"
#include "td/net/TcpListener.h" #include "td/net/TcpListener.h"
#include "td/actor/actor.h" #include "td/actor/actor.h"
@ -22,7 +21,6 @@
namespace td { namespace td {
// HttpInboundConnection header
static int cnt = 0; static int cnt = 0;
class HelloWorld final : public Actor { class HelloWorld final : public Actor {
public: public:
@ -107,7 +105,7 @@ class Server final : public TcpListener::Callback {
LOG(ERROR) << "ACCEPT " << cnt++; LOG(ERROR) << "ACCEPT " << cnt++;
pos_++; pos_++;
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0); auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
create_actor_on_scheduler<HelloWorld>("HttpInboundConnection", scheduler_id, std::move(fd)).release(); create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id, std::move(fd)).release();
} }
void hangup() final { void hangup() final {
// may be it should be default?.. // may be it should be default?..

View File

@ -93,7 +93,7 @@ class Server final : public TcpListener::Callback {
void accept(SocketFd fd) final { void accept(SocketFd fd) final {
pos_++; pos_++;
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0); auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
create_actor_on_scheduler<HttpEchoConnection>("HttpInboundConnection", scheduler_id, std::move(fd)).release(); create_actor_on_scheduler<HttpEchoConnection>("HttpEchoConnection", scheduler_id, std::move(fd)).release();
} }
void hangup() final { void hangup() final {
LOG(ERROR) << "Hanging up.."; LOG(ERROR) << "Hanging up..";

View File

@ -17,15 +17,16 @@
// TODO: all return values must be checked // TODO: all return values must be checked
#include <atomic> #include <atomic>
#include <cstdio>
#include <cstdlib> #include <cstdlib>
#include <vector> #include <vector>
#if TD_PORT_POSIX
#include <pthread.h> #include <pthread.h>
#include <sched.h> #include <sched.h>
#include <semaphore.h> #include <semaphore.h>
#include <sys/syscall.h> #include <sys/syscall.h>
#include <unistd.h> #include <unistd.h>
#endif
#if TD_LINUX #if TD_LINUX
#include <sys/eventfd.h> #include <sys/eventfd.h>
@ -34,17 +35,31 @@
#define MODE std::memory_order_relaxed #define MODE std::memory_order_relaxed
// void set_affinity(int mask) { // void set_affinity(int mask) {
// int err, syscallres; // pid_t pid = gettid();
// pid_t pid = gettid(); // int syscallres = syscall(__NR_sched_setaffinity, pid, sizeof(mask), &mask);
// syscallres = syscall(__NR_sched_setaffinity, pid, sizeof(mask), &mask); // if (syscallres) {
// if (syscallres) { // perror("Failed to set affinity");
// err = errno; // }
// perror("oppa"); // }
//}
//}
using qvalue_t = int; using qvalue_t = int;
class Backoff {
int cnt = 0;
public:
bool next() {
cnt++;
if (cnt < 50) {
return true;
} else {
td::this_thread::yield();
return cnt < 500;
}
}
};
#if TD_PORT_POSIX
// Just for testing, not production // Just for testing, not production
class PipeQueue { class PipeQueue {
int input; int input;
@ -77,24 +92,6 @@ class PipeQueue {
} }
}; };
class Backoff {
int cnt;
public:
Backoff() : cnt(0) {
}
bool next() {
cnt++;
if (cnt < 50) {
return true;
} else {
sched_yield();
return cnt < 500;
}
}
};
class VarQueue { class VarQueue {
std::atomic<qvalue_t> data{0}; std::atomic<qvalue_t> data{0};
@ -178,6 +175,7 @@ class SemQueue {
return get(); return get();
} }
}; };
#endif
#if TD_LINUX #if TD_LINUX
class EventfdQueue { class EventfdQueue {
@ -320,8 +318,7 @@ class BufferQueue {
return; return;
} }
if (!update_writer()) { if (!update_writer()) {
std::fprintf(stderr, "put strong failed\n"); LOG(FATAL) << "Put strong failed";
std::exit(0);
} }
put_unsafe(val); put_unsafe(val);
} }
@ -474,10 +471,9 @@ class FdQueue {
td::int64 x; td::int64 x;
wait_flag.store(1, MODE); wait_flag.store(1, MODE);
__sync_synchronize(); __sync_synchronize();
// std::fprintf(stderr, "!\n");
// while (res == -1 && read(fd, &x, sizeof(x)) == sizeof(x)) { // while (res == -1 && read(fd, &x, sizeof(x)) == sizeof(x)) {
// res = q.try_get(); // res = q.try_get();
//} // }
do { do {
__sync_synchronize(); __sync_synchronize();
res = q.try_get(); res = q.try_get();
@ -493,6 +489,7 @@ class FdQueue {
}; };
#endif #endif
#if TD_PORT_POSIX
class SemBackoffQueue { class SemBackoffQueue {
sem_t sem; sem_t sem;
VarQueue q; VarQueue q;
@ -568,15 +565,17 @@ class QueueBenchmark2 final : public td::Benchmark {
int server_active_connections; int server_active_connections;
int client_active_connections; int client_active_connections;
std::vector<td::int64> server_conn; td::vector<td::int64> server_conn;
std::vector<td::int64> client_conn; td::vector<td::int64> client_conn;
td::string name;
public: public:
explicit QueueBenchmark2(int connections_n = 1) : connections_n(connections_n) { QueueBenchmark2(int connections_n, td::string name) : connections_n(connections_n), name(std::move(name)) {
} }
std::string get_description() const final { td::string get_description() const final {
return "QueueBenchmark2"; return name;
} }
void start_up() final { void start_up() final {
@ -592,16 +591,8 @@ class QueueBenchmark2 final : public td::Benchmark {
void server_process(qvalue_t value) { void server_process(qvalue_t value) {
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24); auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24);
// std::fprintf(stderr, "-->%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != server_conn[co]++) { CHECK(no == server_conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(server_conn[co] - 1));
std::fprintf(stderr, "Server BUG\n");
while (true) {
}
}
// std::fprintf(stderr, "no = %d/%d\n", no, queries_n);
// std::fprintf(stderr, "answer: %d %d\n", no, co);
client.writer_put(value); client.writer_put(value);
client.writer_flush(); client.writer_flush();
@ -611,15 +602,12 @@ class QueueBenchmark2 final : public td::Benchmark {
} }
void *server_run(void *) { void *server_run(void *) {
server_conn = std::vector<td::int64>(connections_n); server_conn = td::vector<td::int64>(connections_n);
server_active_connections = connections_n; server_active_connections = connections_n;
while (server_active_connections > 0) { while (server_active_connections > 0) {
int cnt = server.reader_wait(); int cnt = server.reader_wait();
if (cnt == 0) { CHECK(cnt != 0);
std::fprintf(stderr, "ERROR!\n");
std::exit(0);
}
while (cnt-- > 0) { while (cnt-- > 0) {
server_process(server.reader_get_unsafe()); server_process(server.reader_get_unsafe());
server.reader_flush(); server.reader_flush();
@ -633,17 +621,9 @@ class QueueBenchmark2 final : public td::Benchmark {
void client_process(qvalue_t value) { void client_process(qvalue_t value) {
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24); auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24);
// std::fprintf(stderr, "<--%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != client_conn[co]++) { CHECK(no == client_conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(client_conn[co] - 1));
std::fprintf(stderr, "BUG\n");
while (true) {
}
std::exit(0);
}
if (no + 1 < queries_n) { if (no + 1 < queries_n) {
// std::fprintf(stderr, "query: %d %d\n", no + 1, co);
server.writer_put(value + 1); server.writer_put(value + 1);
server.writer_flush(); server.writer_flush();
} else { } else {
@ -652,12 +632,9 @@ class QueueBenchmark2 final : public td::Benchmark {
} }
void *client_run(void *) { void *client_run(void *) {
client_conn = std::vector<td::int64>(connections_n); client_conn = td::vector<td::int64>(connections_n);
client_active_connections = connections_n; client_active_connections = connections_n;
if (queries_n >= (1 << 24)) { CHECK(queries_n < (1 << 24));
std::fprintf(stderr, "Too big queries_n\n");
std::exit(0);
}
for (int i = 0; i < connections_n; i++) { for (int i = 0; i < connections_n; i++) {
server.writer_put(static_cast<qvalue_t>(i) << 24); server.writer_put(static_cast<qvalue_t>(i) << 24);
@ -666,10 +643,7 @@ class QueueBenchmark2 final : public td::Benchmark {
while (client_active_connections > 0) { while (client_active_connections > 0) {
int cnt = client.reader_wait(); int cnt = client.reader_wait();
if (cnt == 0) { CHECK(cnt != 0);
std::fprintf(stderr, "ERROR!\n");
std::exit(0);
}
while (cnt-- > 0) { while (cnt-- > 0) {
client_process(client.reader_get_unsafe()); client_process(client.reader_get_unsafe());
client.reader_flush(); client.reader_flush();
@ -709,12 +683,14 @@ class QueueBenchmark final : public td::Benchmark {
const int connections_n; const int connections_n;
int queries_n; int queries_n;
td::string name;
public: public:
explicit QueueBenchmark(int connections_n = 1) : connections_n(connections_n) { QueueBenchmark(int connections_n, td::string name) : connections_n(connections_n), name(std::move(name)) {
} }
std::string get_description() const final { td::string get_description() const final {
return "QueueBenchmark"; return name;
} }
void start_up() final { void start_up() final {
@ -728,21 +704,14 @@ class QueueBenchmark final : public td::Benchmark {
} }
void *server_run(void *) { void *server_run(void *) {
std::vector<td::int64> conn(connections_n); td::vector<td::int64> conn(connections_n);
int active_connections = connections_n; int active_connections = connections_n;
while (active_connections > 0) { while (active_connections > 0) {
qvalue_t value = server.get(); qvalue_t value = server.get();
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(value >> 24); auto co = static_cast<int>(value >> 24);
// std::fprintf(stderr, "-->%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != conn[co]++) { CHECK(no == conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1));
std::fprintf(stderr, "Server BUG\n");
while (true) {
}
}
// std::fprintf(stderr, "no = %d/%d\n", no, queries_n);
client.put(value); client.put(value);
if (no + 1 >= queries_n) { if (no + 1 >= queries_n) {
active_connections--; active_connections--;
@ -752,11 +721,8 @@ class QueueBenchmark final : public td::Benchmark {
} }
void *client_run(void *) { void *client_run(void *) {
std::vector<td::int64> conn(connections_n); td::vector<td::int64> conn(connections_n);
if (queries_n >= (1 << 24)) { CHECK(queries_n < (1 << 24));
std::fprintf(stderr, "Too big queries_n\n");
std::exit(0);
}
for (int i = 0; i < connections_n; i++) { for (int i = 0; i < connections_n; i++) {
server.put(static_cast<qvalue_t>(i) << 24); server.put(static_cast<qvalue_t>(i) << 24);
} }
@ -765,15 +731,8 @@ class QueueBenchmark final : public td::Benchmark {
qvalue_t value = client.get(); qvalue_t value = client.get();
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(value >> 24); auto co = static_cast<int>(value >> 24);
// std::fprintf(stderr, "<--%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != conn[co]++) { CHECK(no == conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1));
std::fprintf(stderr, "BUG\n");
while (true) {
}
std::exit(0);
}
if (no + 1 < queries_n) { if (no + 1 < queries_n) {
server.put(value + 1); server.put(value + 1);
} else { } else {
@ -785,11 +744,8 @@ class QueueBenchmark final : public td::Benchmark {
} }
void *client_run2(void *) { void *client_run2(void *) {
std::vector<td::int64> conn(connections_n); td::vector<td::int64> conn(connections_n);
if (queries_n >= (1 << 24)) { CHECK(queries_n < (1 << 24));
std::fprintf(stderr, "Too big queries_n\n");
std::exit(0);
}
for (int query = 0; query < queries_n; query++) { for (int query = 0; query < queries_n; query++) {
for (int i = 0; i < connections_n; i++) { for (int i = 0; i < connections_n; i++) {
server.put((static_cast<td::int64>(i) << 24) + query); server.put((static_cast<td::int64>(i) << 24) + query);
@ -798,15 +754,8 @@ class QueueBenchmark final : public td::Benchmark {
qvalue_t value = client.get(); qvalue_t value = client.get();
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(value >> 24); auto co = static_cast<int>(value >> 24);
// std::fprintf(stderr, "<--%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != conn[co]++) { CHECK(no == conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1));
std::fprintf(stderr, "BUG\n");
while (true) {
}
std::exit(0);
}
} }
} }
// system("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq"); // system("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq");
@ -848,7 +797,6 @@ class RingBenchmark final : public td::Benchmark {
void *run() { void *run() {
qvalue_t value; qvalue_t value;
// std::fprintf(stderr, "start %d\n", int_id);
do { do {
int cnt = queue.reader_wait(); int cnt = queue.reader_wait();
CHECK(cnt == 1); CHECK(cnt == 1);
@ -888,7 +836,6 @@ class RingBenchmark final : public td::Benchmark {
pthread_create(&q[i].id, nullptr, run_gateway, &q[i]); pthread_create(&q[i].id, nullptr, run_gateway, &q[i]);
} }
std::fprintf(stderr, "run %d\n", n);
if (n < 1000) { if (n < 1000) {
n = 1000; n = 1000;
} }
@ -900,11 +847,14 @@ class RingBenchmark final : public td::Benchmark {
} }
} }
}; };
#endif
void test_queue() { /*
std::vector<td::thread> threads; #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
static void test_queue() {
td::vector<td::thread> threads;
static constexpr size_t THREAD_COUNT = 100; static constexpr size_t THREAD_COUNT = 100;
std::vector<td::MpscPollableQueue<int>> queues(THREAD_COUNT); td::vector<td::MpscPollableQueue<int>> queues(THREAD_COUNT);
for (auto &q : queues) { for (auto &q : queues) {
q.init(); q.init();
} }
@ -920,58 +870,66 @@ void test_queue() {
}); });
} }
while (true) { for (size_t iter = 0; iter < THREAD_COUNT; iter++) {
td::usleep_for(100); td::usleep_for(100);
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
queues[td::Random::fast(0, THREAD_COUNT - 1)].writer_put(1); queues[td::Random::fast(0, THREAD_COUNT - 1)].writer_put(1);
} }
} }
for (size_t i = 0; i < THREAD_COUNT; i++) {
threads[i].join();
}
} }
#endif
*/
int main() { int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
//test_queue(); // test_queue();
#define BENCH_Q2(Q, N) \ #endif
std::fprintf(stderr, "!%s %d:\t", #Q, N); \
td::bench(QueueBenchmark2<Q>(N));
#define BENCH_Q(Q, N) \
std::fprintf(stderr, "%s %d:\t", #Q, N); \
td::bench(QueueBenchmark<Q>(N));
#define BENCH_R(Q) \ #if TD_PORT_POSIX
std::fprintf(stderr, "%s:\t", #Q); \
td::bench(RingBenchmark<Q>());
// TODO: yield makes it extremely slow. Yet some backoff may be necessary. // TODO: yield makes it extremely slow. Yet some backoff may be necessary.
// BENCH_R(SemQueue); // td::bench(RingBenchmark<SemQueue>());
// BENCH_R(td::PollQueue<qvalue_t>); // td::bench(RingBenchmark<td::PollQueue<qvalue_t>>());
BENCH_Q2(td::PollQueue<qvalue_t>, 1); #define BENCH_Q2(Q, N) td::bench(QueueBenchmark2<Q<qvalue_t>>(N, #Q "(" #N ")"))
BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 1);
BENCH_Q2(td::PollQueue<qvalue_t>, 100);
BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 100);
BENCH_Q2(td::PollQueue<qvalue_t>, 10);
BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 10);
BENCH_Q(VarQueue, 1); #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
// BENCH_Q(FdQueue, 1); BENCH_Q2(td::InfBackoffQueue, 1);
// BENCH_Q(BufferedFdQueue, 1); BENCH_Q2(td::MpscPollableQueue, 1);
BENCH_Q2(td::PollQueue, 1);
BENCH_Q2(td::InfBackoffQueue, 10);
BENCH_Q2(td::MpscPollableQueue, 10);
BENCH_Q2(td::PollQueue, 10);
BENCH_Q2(td::InfBackoffQueue, 100);
BENCH_Q2(td::MpscPollableQueue, 100);
BENCH_Q2(td::PollQueue, 100);
BENCH_Q2(td::PollQueue, 4);
BENCH_Q2(td::PollQueue, 10);
BENCH_Q2(td::PollQueue, 100);
#endif
#define BENCH_Q(Q, N) td::bench(QueueBenchmark<Q>(N, #Q "(" #N ")"))
#if TD_LINUX
BENCH_Q(BufferQueue, 1);
BENCH_Q(BufferedFdQueue, 1);
BENCH_Q(FdQueue, 1);
#endif
BENCH_Q(PipeQueue, 1); BENCH_Q(PipeQueue, 1);
BENCH_Q(SemCheatQueue, 1); BENCH_Q(SemCheatQueue, 1);
BENCH_Q(SemQueue, 1); BENCH_Q(SemQueue, 1);
BENCH_Q(VarQueue, 1);
// BENCH_Q2(td::PollQueue<qvalue_t>, 100); #if TD_LINUX
// BENCH_Q2(td::PollQueue<qvalue_t>, 10); BENCH_Q(BufferQueue, 4);
// BENCH_Q2(td::PollQueue<qvalue_t>, 4); BENCH_Q(BufferQueue, 10);
// BENCH_Q2(td::InfBackoffQueue<qvalue_t>, 100); BENCH_Q(BufferQueue, 100);
#endif
// BENCH_Q2(td::InfBackoffQueue<qvalue_t>, 1); #endif
// BENCH_Q(SemCheatQueue, 1);
// BENCH_Q(BufferedFdQueue, 100);
// BENCH_Q(BufferedFdQueue, 10);
// BENCH_Q(BufferQueue, 4);
// BENCH_Q(BufferQueue, 100);
// BENCH_Q(BufferQueue, 10);
// BENCH_Q(BufferQueue, 1);
} }

View File

@ -525,7 +525,7 @@ function onOptionsChanged() {
} }
if (os_linux && linux_distro === 'Other') { if (os_linux && linux_distro === 'Other') {
var jdk = target === 'JNI' ? ', JDK ' : ''; var jdk = target === 'JNI' ? ', JDK ' : '';
var compiler = use_clang ? 'clang >= 3.4' : 'g++ >= 4.9.2'; var compiler = use_clang ? 'clang >= 3.4, libc++' : 'g++ >= 4.9.2';
pre_text.push('Install Git, ' + compiler + ', make, CMake >= 3.0.2, OpenSSL-dev, zlib-dev, gperf, PHP' + jdk + ' using your package manager.'); pre_text.push('Install Git, ' + compiler + ', make, CMake >= 3.0.2, OpenSSL-dev, zlib-dev, gperf, PHP' + jdk + ' using your package manager.');
} }
if (os_linux && os.includes('Node.js')) { if (os_linux && os.includes('Node.js')) {

View File

@ -112,8 +112,7 @@ class QueryImpl {
void do_store(StorerT &storer) const { void do_store(StorerT &storer) const {
storer.store_binary(query_.message_id); storer.store_binary(query_.message_id);
storer.store_binary(query_.seq_no); storer.store_binary(query_.seq_no);
Slice header = this->header_; Slice invoke_header;
Slice invoke_header = Slice();
// TODO(refactor): // TODO(refactor):
// invokeAfterMsg#cb9f372d {X:Type} msg_id:long query:!X = X; // invokeAfterMsg#cb9f372d {X:Type} msg_id:long query:!X = X;
@ -138,7 +137,7 @@ class QueryImpl {
const Storer &data_storer = const Storer &data_storer =
query_.gzip_flag ? static_cast<const Storer &>(gzip_storer) : static_cast<const Storer &>(plain_storer); query_.gzip_flag ? static_cast<const Storer &>(gzip_storer) : static_cast<const Storer &>(plain_storer);
auto invoke_header_storer = create_storer(invoke_header); auto invoke_header_storer = create_storer(invoke_header);
auto header_storer = create_storer(header); auto header_storer = create_storer(header_);
auto suff_storer = create_storer(invoke_header_storer, data_storer); auto suff_storer = create_storer(invoke_header_storer, data_storer);
auto all_storer = create_storer(header_storer, suff_storer); auto all_storer = create_storer(header_storer, suff_storer);

View File

@ -63,7 +63,7 @@ class RawConnection {
virtual ~Callback() = default; virtual ~Callback() = default;
virtual Status on_raw_packet(const PacketInfo &info, BufferSlice packet) = 0; virtual Status on_raw_packet(const PacketInfo &info, BufferSlice packet) = 0;
virtual Status on_quick_ack(uint64 quick_ack_token) { virtual Status on_quick_ack(uint64 quick_ack_token) {
return Status::Error("Quick acks unsupported fully, but still used"); return Status::Error("Quick acknowledgements are unsupported by the callback");
} }
virtual Status before_write() { virtual Status before_write() {
return Status::OK(); return Status::OK();

View File

@ -173,6 +173,7 @@ namespace mtproto {
*/ */
unique_ptr<RawConnection> SessionConnection::move_as_raw_connection() { unique_ptr<RawConnection> SessionConnection::move_as_raw_connection() {
was_moved_ = true;
return std::move(raw_connection_); return std::move(raw_connection_);
} }
@ -1007,7 +1008,10 @@ void SessionConnection::send_before(double tm) {
} }
Status SessionConnection::do_flush() { Status SessionConnection::do_flush() {
CHECK(raw_connection_); LOG_CHECK(raw_connection_) << was_moved_ << ' ' << state_ << ' ' << static_cast<int32>(mode_) << ' '
<< connected_flag_ << ' ' << is_main_ << ' ' << need_destroy_auth_key_ << ' '
<< sent_destroy_auth_key_ << ' ' << callback_ << ' ' << (Time::now() - created_at_) << ' '
<< (Time::now() - last_read_at_);
CHECK(state_ != Closed); CHECK(state_ != Closed);
if (state_ == Init) { if (state_ == Init) {
TRY_STATUS(init()); TRY_STATUS(init());

View File

@ -69,8 +69,13 @@ class SessionConnection final
: public Named : public Named
, private RawConnection::Callback { , private RawConnection::Callback {
public: public:
enum class Mode { Tcp, Http, HttpLongPoll }; enum class Mode : int32 { Tcp, Http, HttpLongPoll };
SessionConnection(Mode mode, unique_ptr<RawConnection> raw_connection, AuthData *auth_data); SessionConnection(Mode mode, unique_ptr<RawConnection> raw_connection, AuthData *auth_data);
SessionConnection(const SessionConnection &) = delete;
SessionConnection &operator=(const SessionConnection &) = delete;
SessionConnection(SessionConnection &&) = delete;
SessionConnection &operator=(SessionConnection &&) = delete;
~SessionConnection() = default;
PollableFdInfo &get_poll_info(); PollableFdInfo &get_poll_info();
unique_ptr<RawConnection> move_as_raw_connection(); unique_ptr<RawConnection> move_as_raw_connection();
@ -87,7 +92,6 @@ class SessionConnection final
void set_online(bool online_flag, bool is_main); void set_online(bool online_flag, bool is_main);
// Callback
class Callback { class Callback {
public: public:
Callback() = default; Callback() = default;
@ -132,6 +136,7 @@ class SessionConnection final
bool online_flag_ = false; bool online_flag_ = false;
bool is_main_ = false; bool is_main_ = false;
bool was_moved_ = false;
int rtt() const { int rtt() const {
return max(2, static_cast<int>(raw_connection_->extra().rtt * 1.5 + 1)); return max(2, static_cast<int>(raw_connection_->extra().rtt * 1.5 + 1));

View File

@ -129,7 +129,7 @@ class ClientManager::Impl final {
} }
auto response = receiver_.receive(0); auto response = receiver_.receive(0);
if (response.client_id == 0 && concurrent_scheduler_ != nullptr) { if (response.client_id == 0 && response.request_id == 0 && concurrent_scheduler_ != nullptr) {
concurrent_scheduler_->run_main(0); concurrent_scheduler_->run_main(0);
response = receiver_.receive(0); response = receiver_.receive(0);
} else { } else {
@ -188,7 +188,9 @@ class ClientManager::Impl final {
while (!tds_.empty() && !ExitGuard::is_exited()) { while (!tds_.empty() && !ExitGuard::is_exited()) {
receive(0.1); receive(0.1);
} }
concurrent_scheduler_->finish(); if (concurrent_scheduler_ != nullptr) {
concurrent_scheduler_->finish();
}
} }
private: private:

View File

@ -3129,20 +3129,23 @@ tl_object_ptr<td_api::chatStatisticsSupergroup> ContactsManager::convert_megagro
td::remove_if(obj->top_inviters_, td::remove_if(obj->top_inviters_,
[](auto &obj) { return !UserId(obj->user_id_).is_valid() || obj->invitations_ < 0; }); [](auto &obj) { return !UserId(obj->user_id_).is_valid() || obj->invitations_ < 0; });
auto top_senders = transform(std::move(obj->top_posters_), [this](auto &&top_poster) { auto top_senders =
return td_api::make_object<td_api::chatStatisticsMessageSenderInfo>( transform(std::move(obj->top_posters_), [this](tl_object_ptr<telegram_api::statsGroupTopPoster> &&top_poster) {
this->get_user_id_object(UserId(top_poster->user_id_), "get_top_senders"), top_poster->messages_, return td_api::make_object<td_api::chatStatisticsMessageSenderInfo>(
top_poster->avg_chars_); get_user_id_object(UserId(top_poster->user_id_), "get_top_senders"), top_poster->messages_,
}); top_poster->avg_chars_);
auto top_administrators = transform(std::move(obj->top_admins_), [this](auto &&top_admin) { });
return td_api::make_object<td_api::chatStatisticsAdministratorActionsInfo>( auto top_administrators =
this->get_user_id_object(UserId(top_admin->user_id_), "get_top_administrators"), top_admin->deleted_, transform(std::move(obj->top_admins_), [this](tl_object_ptr<telegram_api::statsGroupTopAdmin> &&top_admin) {
top_admin->kicked_, top_admin->banned_); return td_api::make_object<td_api::chatStatisticsAdministratorActionsInfo>(
}); get_user_id_object(UserId(top_admin->user_id_), "get_top_administrators"), top_admin->deleted_,
auto top_inviters = transform(std::move(obj->top_inviters_), [this](auto &&top_inviter) { top_admin->kicked_, top_admin->banned_);
return td_api::make_object<td_api::chatStatisticsInviterInfo>( });
this->get_user_id_object(UserId(top_inviter->user_id_), "get_top_inviters"), top_inviter->invitations_); auto top_inviters =
}); transform(std::move(obj->top_inviters_), [this](tl_object_ptr<telegram_api::statsGroupTopInviter> &&top_inviter) {
return td_api::make_object<td_api::chatStatisticsInviterInfo>(
get_user_id_object(UserId(top_inviter->user_id_), "get_top_inviters"), top_inviter->invitations_);
});
return make_tl_object<td_api::chatStatisticsSupergroup>( return make_tl_object<td_api::chatStatisticsSupergroup>(
convert_date_range(obj->period_), convert_stats_absolute_value(obj->members_), convert_date_range(obj->period_), convert_stats_absolute_value(obj->members_),
@ -14772,17 +14775,18 @@ void ContactsManager::get_dialog_participant(DialogId dialog_id, DialogId partic
} }
switch (dialog_id.get_type()) { switch (dialog_id.get_type()) {
case DialogType::User: case DialogType::User: {
if (participant_dialog_id == DialogId(get_my_id())) { auto my_user_id = get_my_id();
return promise.set_value( auto peer_user_id = dialog_id.get_user_id();
DialogParticipant{participant_dialog_id, dialog_id.get_user_id(), 0, DialogParticipantStatus::Member()}); if (participant_dialog_id == DialogId(my_user_id)) {
return promise.set_value(DialogParticipant::private_member(my_user_id, peer_user_id));
} }
if (participant_dialog_id == dialog_id) { if (participant_dialog_id == dialog_id) {
return promise.set_value( return promise.set_value(DialogParticipant::private_member(peer_user_id, my_user_id));
DialogParticipant{participant_dialog_id, get_my_id(), 0, DialogParticipantStatus::Member()});
} }
return promise.set_error(Status::Error(400, "Member not found")); return promise.set_error(Status::Error(400, "Member not found"));
}
case DialogType::Chat: case DialogType::Chat:
if (participant_dialog_id.get_type() != DialogType::User) { if (participant_dialog_id.get_type() != DialogType::User) {
return promise.set_value(DialogParticipant::left(participant_dialog_id)); return promise.set_value(DialogParticipant::left(participant_dialog_id));
@ -14791,15 +14795,13 @@ void ContactsManager::get_dialog_participant(DialogId dialog_id, DialogId partic
case DialogType::Channel: case DialogType::Channel:
return get_channel_participant(dialog_id.get_channel_id(), participant_dialog_id, std::move(promise)); return get_channel_participant(dialog_id.get_channel_id(), participant_dialog_id, std::move(promise));
case DialogType::SecretChat: { case DialogType::SecretChat: {
auto my_user_id = get_my_id();
auto peer_user_id = get_secret_chat_user_id(dialog_id.get_secret_chat_id()); auto peer_user_id = get_secret_chat_user_id(dialog_id.get_secret_chat_id());
if (participant_dialog_id == DialogId(get_my_id())) { if (participant_dialog_id == DialogId(my_user_id)) {
return promise.set_value(DialogParticipant{participant_dialog_id, return promise.set_value(DialogParticipant::private_member(my_user_id, peer_user_id));
peer_user_id.is_valid() ? peer_user_id : get_my_id(), 0,
DialogParticipantStatus::Member()});
} }
if (participant_dialog_id == DialogId(peer_user_id)) { if (peer_user_id.is_valid() && participant_dialog_id == DialogId(peer_user_id)) {
return promise.set_value( return promise.set_value(DialogParticipant::private_member(peer_user_id, my_user_id));
DialogParticipant{participant_dialog_id, get_my_id(), 0, DialogParticipantStatus::Member()});
} }
return promise.set_error(Status::Error(400, "Member not found")); return promise.set_error(Status::Error(400, "Member not found"));
@ -14815,42 +14817,18 @@ DialogParticipants ContactsManager::search_private_chat_participants(UserId my_u
const string &query, int32 limit, const string &query, int32 limit,
DialogParticipantsFilter filter) const { DialogParticipantsFilter filter) const {
vector<DialogId> dialog_ids; vector<DialogId> dialog_ids;
switch (filter.type) { if (filter.is_dialog_participant_suitable(td_, DialogParticipant::private_member(my_user_id, peer_user_id))) {
case DialogParticipantsFilter::Type::Contacts: dialog_ids.push_back(DialogId(my_user_id));
if (peer_user_id.is_valid() && is_user_contact(peer_user_id)) { }
dialog_ids.push_back(DialogId(peer_user_id)); if (peer_user_id.is_valid() && peer_user_id != my_user_id &&
} filter.is_dialog_participant_suitable(td_, DialogParticipant::private_member(peer_user_id, my_user_id))) {
break; dialog_ids.push_back(DialogId(peer_user_id));
case DialogParticipantsFilter::Type::Administrators:
break;
case DialogParticipantsFilter::Type::Members:
case DialogParticipantsFilter::Type::Mention:
dialog_ids.push_back(DialogId(my_user_id));
if (peer_user_id.is_valid() && peer_user_id != my_user_id) {
dialog_ids.push_back(DialogId(peer_user_id));
}
break;
case DialogParticipantsFilter::Type::Restricted:
break;
case DialogParticipantsFilter::Type::Banned:
break;
case DialogParticipantsFilter::Type::Bots:
if (td_->auth_manager_->is_bot()) {
dialog_ids.push_back(DialogId(my_user_id));
}
if (peer_user_id.is_valid() && is_user_bot(peer_user_id) && peer_user_id != my_user_id) {
dialog_ids.push_back(DialogId(peer_user_id));
}
break;
default:
UNREACHABLE();
} }
auto result = search_among_dialogs(dialog_ids, query, limit); auto result = search_among_dialogs(dialog_ids, query, limit);
return {result.first, transform(result.second, [&](DialogId dialog_id) { return {result.first, transform(result.second, [&](DialogId dialog_id) {
return DialogParticipant( auto user_id = dialog_id.get_user_id();
dialog_id, dialog_id == DialogId(my_user_id) && peer_user_id.is_valid() ? peer_user_id : my_user_id, 0, return DialogParticipant::private_member(user_id, user_id == my_user_id ? peer_user_id : my_user_id);
DialogParticipantStatus::Member());
})}; })};
} }
@ -14873,55 +14851,14 @@ void ContactsManager::search_dialog_participants(DialogId dialog_id, const strin
case DialogType::Chat: case DialogType::Chat:
return search_chat_participants(dialog_id.get_chat_id(), query, limit, filter, std::move(promise)); return search_chat_participants(dialog_id.get_chat_id(), query, limit, filter, std::move(promise));
case DialogType::Channel: { case DialogType::Channel: {
td_api::object_ptr<td_api::SupergroupMembersFilter> request_filter; auto channel_id = dialog_id.get_channel_id();
string additional_query; if (filter.has_query()) {
int32 additional_limit = 0; return get_channel_participants(channel_id, filter.get_supergroup_members_filter_object(query), string(), 0,
switch (filter.type) { limit, 0, std::move(promise));
case DialogParticipantsFilter::Type::Contacts: } else {
request_filter = td_api::make_object<td_api::supergroupMembersFilterContacts>(); return get_channel_participants(channel_id, filter.get_supergroup_members_filter_object(string()), query, 0,
break; 100, limit, std::move(promise));
case DialogParticipantsFilter::Type::Administrators:
request_filter = td_api::make_object<td_api::supergroupMembersFilterAdministrators>();
break;
case DialogParticipantsFilter::Type::Members:
request_filter = td_api::make_object<td_api::supergroupMembersFilterSearch>(query);
break;
case DialogParticipantsFilter::Type::Restricted:
request_filter = td_api::make_object<td_api::supergroupMembersFilterRestricted>(query);
break;
case DialogParticipantsFilter::Type::Banned:
request_filter = td_api::make_object<td_api::supergroupMembersFilterBanned>(query);
break;
case DialogParticipantsFilter::Type::Mention:
request_filter =
td_api::make_object<td_api::supergroupMembersFilterMention>(query, filter.top_thread_message_id.get());
break;
case DialogParticipantsFilter::Type::Bots:
request_filter = td_api::make_object<td_api::supergroupMembersFilterBots>();
break;
default:
UNREACHABLE();
} }
switch (filter.type) {
case DialogParticipantsFilter::Type::Contacts:
case DialogParticipantsFilter::Type::Administrators:
case DialogParticipantsFilter::Type::Bots:
additional_query = query;
additional_limit = limit;
limit = 100;
break;
case DialogParticipantsFilter::Type::Members:
case DialogParticipantsFilter::Type::Restricted:
case DialogParticipantsFilter::Type::Banned:
case DialogParticipantsFilter::Type::Mention:
// query is passed to the server request
break;
default:
UNREACHABLE();
}
return get_channel_participants(dialog_id.get_channel_id(), std::move(request_filter),
std::move(additional_query), 0, limit, additional_limit, std::move(promise));
} }
case DialogType::SecretChat: { case DialogType::SecretChat: {
auto peer_user_id = get_secret_chat_user_id(dialog_id.get_secret_chat_id()); auto peer_user_id = get_secret_chat_user_id(dialog_id.get_secret_chat_id());
@ -15001,32 +14938,9 @@ void ContactsManager::do_search_chat_participants(ChatId chat_id, const string &
return promise.set_error(Status::Error(500, "Can't find basic group full info")); return promise.set_error(Status::Error(500, "Can't find basic group full info"));
} }
auto is_dialog_participant_suitable = [this, filter](const DialogParticipant &participant) {
switch (filter.type) {
case DialogParticipantsFilter::Type::Contacts:
return participant.dialog_id.get_type() == DialogType::User &&
is_user_contact(participant.dialog_id.get_user_id());
case DialogParticipantsFilter::Type::Administrators:
return participant.status.is_administrator();
case DialogParticipantsFilter::Type::Members:
return participant.status.is_member(); // should be always true
case DialogParticipantsFilter::Type::Restricted:
return participant.status.is_restricted(); // should be always false
case DialogParticipantsFilter::Type::Banned:
return participant.status.is_banned(); // should be always false
case DialogParticipantsFilter::Type::Mention:
return true;
case DialogParticipantsFilter::Type::Bots:
return participant.dialog_id.get_type() == DialogType::User && is_user_bot(participant.dialog_id.get_user_id());
default:
UNREACHABLE();
return false;
}
};
vector<DialogId> dialog_ids; vector<DialogId> dialog_ids;
for (const auto &participant : chat_full->participants) { for (const auto &participant : chat_full->participants) {
if (is_dialog_participant_suitable(participant)) { if (filter.is_dialog_participant_suitable(td_, participant)) {
dialog_ids.push_back(participant.dialog_id); dialog_ids.push_back(participant.dialog_id);
} }
} }

View File

@ -770,30 +770,30 @@ td_api::object_ptr<td_api::chatMembers> DialogParticipants::get_chat_members_obj
tl_object_ptr<telegram_api::ChannelParticipantsFilter> tl_object_ptr<telegram_api::ChannelParticipantsFilter>
ChannelParticipantsFilter::get_input_channel_participants_filter() const { ChannelParticipantsFilter::get_input_channel_participants_filter() const {
switch (type) { switch (type_) {
case Type::Recent: case Type::Recent:
return make_tl_object<telegram_api::channelParticipantsRecent>(); return make_tl_object<telegram_api::channelParticipantsRecent>();
case Type::Contacts: case Type::Contacts:
return make_tl_object<telegram_api::channelParticipantsContacts>(query); return make_tl_object<telegram_api::channelParticipantsContacts>(query_);
case Type::Administrators: case Type::Administrators:
return make_tl_object<telegram_api::channelParticipantsAdmins>(); return make_tl_object<telegram_api::channelParticipantsAdmins>();
case Type::Search: case Type::Search:
return make_tl_object<telegram_api::channelParticipantsSearch>(query); return make_tl_object<telegram_api::channelParticipantsSearch>(query_);
case Type::Mention: { case Type::Mention: {
int32 flags = 0; int32 flags = 0;
if (!query.empty()) { if (!query_.empty()) {
flags |= telegram_api::channelParticipantsMentions::Q_MASK; flags |= telegram_api::channelParticipantsMentions::Q_MASK;
} }
if (top_thread_message_id.is_valid()) { if (top_thread_message_id_.is_valid()) {
flags |= telegram_api::channelParticipantsMentions::TOP_MSG_ID_MASK; flags |= telegram_api::channelParticipantsMentions::TOP_MSG_ID_MASK;
} }
return make_tl_object<telegram_api::channelParticipantsMentions>( return make_tl_object<telegram_api::channelParticipantsMentions>(
flags, query, top_thread_message_id.get_server_message_id().get()); flags, query_, top_thread_message_id_.get_server_message_id().get());
} }
case Type::Restricted: case Type::Restricted:
return make_tl_object<telegram_api::channelParticipantsBanned>(query); return make_tl_object<telegram_api::channelParticipantsBanned>(query_);
case Type::Banned: case Type::Banned:
return make_tl_object<telegram_api::channelParticipantsKicked>(query); return make_tl_object<telegram_api::channelParticipantsKicked>(query_);
case Type::Bots: case Type::Bots:
return make_tl_object<telegram_api::channelParticipantsBots>(); return make_tl_object<telegram_api::channelParticipantsBots>();
default: default:
@ -804,67 +804,67 @@ ChannelParticipantsFilter::get_input_channel_participants_filter() const {
ChannelParticipantsFilter::ChannelParticipantsFilter(const tl_object_ptr<td_api::SupergroupMembersFilter> &filter) { ChannelParticipantsFilter::ChannelParticipantsFilter(const tl_object_ptr<td_api::SupergroupMembersFilter> &filter) {
if (filter == nullptr) { if (filter == nullptr) {
type = Type::Recent; type_ = Type::Recent;
return; return;
} }
switch (filter->get_id()) { switch (filter->get_id()) {
case td_api::supergroupMembersFilterRecent::ID: case td_api::supergroupMembersFilterRecent::ID:
type = Type::Recent; type_ = Type::Recent;
return; return;
case td_api::supergroupMembersFilterContacts::ID: case td_api::supergroupMembersFilterContacts::ID:
type = Type::Contacts; type_ = Type::Contacts;
query = static_cast<const td_api::supergroupMembersFilterContacts *>(filter.get())->query_; query_ = static_cast<const td_api::supergroupMembersFilterContacts *>(filter.get())->query_;
return; return;
case td_api::supergroupMembersFilterAdministrators::ID: case td_api::supergroupMembersFilterAdministrators::ID:
type = Type::Administrators; type_ = Type::Administrators;
return; return;
case td_api::supergroupMembersFilterSearch::ID: case td_api::supergroupMembersFilterSearch::ID:
type = Type::Search; type_ = Type::Search;
query = static_cast<const td_api::supergroupMembersFilterSearch *>(filter.get())->query_; query_ = static_cast<const td_api::supergroupMembersFilterSearch *>(filter.get())->query_;
return; return;
case td_api::supergroupMembersFilterMention::ID: { case td_api::supergroupMembersFilterMention::ID: {
auto mention_filter = static_cast<const td_api::supergroupMembersFilterMention *>(filter.get()); auto mention_filter = static_cast<const td_api::supergroupMembersFilterMention *>(filter.get());
type = Type::Mention; type_ = Type::Mention;
query = mention_filter->query_; query_ = mention_filter->query_;
top_thread_message_id = MessageId(mention_filter->message_thread_id_); top_thread_message_id_ = MessageId(mention_filter->message_thread_id_);
if (!top_thread_message_id.is_valid() || !top_thread_message_id.is_server()) { if (!top_thread_message_id_.is_valid() || !top_thread_message_id_.is_server()) {
top_thread_message_id = MessageId(); top_thread_message_id_ = MessageId();
} }
return; return;
} }
case td_api::supergroupMembersFilterRestricted::ID: case td_api::supergroupMembersFilterRestricted::ID:
type = Type::Restricted; type_ = Type::Restricted;
query = static_cast<const td_api::supergroupMembersFilterRestricted *>(filter.get())->query_; query_ = static_cast<const td_api::supergroupMembersFilterRestricted *>(filter.get())->query_;
return; return;
case td_api::supergroupMembersFilterBanned::ID: case td_api::supergroupMembersFilterBanned::ID:
type = Type::Banned; type_ = Type::Banned;
query = static_cast<const td_api::supergroupMembersFilterBanned *>(filter.get())->query_; query_ = static_cast<const td_api::supergroupMembersFilterBanned *>(filter.get())->query_;
return; return;
case td_api::supergroupMembersFilterBots::ID: case td_api::supergroupMembersFilterBots::ID:
type = Type::Bots; type_ = Type::Bots;
return; return;
default: default:
UNREACHABLE(); UNREACHABLE();
type = Type::Recent; type_ = Type::Recent;
} }
} }
StringBuilder &operator<<(StringBuilder &string_builder, const ChannelParticipantsFilter &filter) { StringBuilder &operator<<(StringBuilder &string_builder, const ChannelParticipantsFilter &filter) {
switch (filter.type) { switch (filter.type_) {
case ChannelParticipantsFilter::Type::Recent: case ChannelParticipantsFilter::Type::Recent:
return string_builder << "Recent"; return string_builder << "Recent";
case ChannelParticipantsFilter::Type::Contacts: case ChannelParticipantsFilter::Type::Contacts:
return string_builder << "Contacts \"" << filter.query << '"'; return string_builder << "Contacts \"" << filter.query_ << '"';
case ChannelParticipantsFilter::Type::Administrators: case ChannelParticipantsFilter::Type::Administrators:
return string_builder << "Administrators"; return string_builder << "Administrators";
case ChannelParticipantsFilter::Type::Search: case ChannelParticipantsFilter::Type::Search:
return string_builder << "Search \"" << filter.query << '"'; return string_builder << "Search \"" << filter.query_ << '"';
case ChannelParticipantsFilter::Type::Mention: case ChannelParticipantsFilter::Type::Mention:
return string_builder << "Mention \"" << filter.query << "\" in thread of " << filter.top_thread_message_id; return string_builder << "Mention \"" << filter.query_ << "\" in thread of " << filter.top_thread_message_id_;
case ChannelParticipantsFilter::Type::Restricted: case ChannelParticipantsFilter::Type::Restricted:
return string_builder << "Restricted \"" << filter.query << '"'; return string_builder << "Restricted \"" << filter.query_ << '"';
case ChannelParticipantsFilter::Type::Banned: case ChannelParticipantsFilter::Type::Banned:
return string_builder << "Banned \"" << filter.query << '"'; return string_builder << "Banned \"" << filter.query_ << '"';
case ChannelParticipantsFilter::Type::Bots: case ChannelParticipantsFilter::Type::Bots:
return string_builder << "Bots"; return string_builder << "Bots";
default: default:
@ -874,7 +874,7 @@ StringBuilder &operator<<(StringBuilder &string_builder, const ChannelParticipan
} }
StringBuilder &operator<<(StringBuilder &string_builder, const DialogParticipantsFilter &filter) { StringBuilder &operator<<(StringBuilder &string_builder, const DialogParticipantsFilter &filter) {
switch (filter.type) { switch (filter.type_) {
case DialogParticipantsFilter::Type::Contacts: case DialogParticipantsFilter::Type::Contacts:
return string_builder << "Contacts"; return string_builder << "Contacts";
case DialogParticipantsFilter::Type::Administrators: case DialogParticipantsFilter::Type::Administrators:
@ -895,34 +895,108 @@ StringBuilder &operator<<(StringBuilder &string_builder, const DialogParticipant
} }
} }
DialogParticipantsFilter get_dialog_participants_filter(const tl_object_ptr<td_api::ChatMembersFilter> &filter) { DialogParticipantsFilter::DialogParticipantsFilter(const tl_object_ptr<td_api::ChatMembersFilter> &filter) {
if (filter == nullptr) { if (filter == nullptr) {
return DialogParticipantsFilter{DialogParticipantsFilter::Type::Members}; type_ = Type::Members;
return;
} }
switch (filter->get_id()) { switch (filter->get_id()) {
case td_api::chatMembersFilterContacts::ID: case td_api::chatMembersFilterContacts::ID:
return DialogParticipantsFilter{DialogParticipantsFilter::Type::Contacts}; type_ = Type::Contacts;
break;
case td_api::chatMembersFilterAdministrators::ID: case td_api::chatMembersFilterAdministrators::ID:
return DialogParticipantsFilter{DialogParticipantsFilter::Type::Administrators}; type_ = Type::Administrators;
break;
case td_api::chatMembersFilterMembers::ID: case td_api::chatMembersFilterMembers::ID:
return DialogParticipantsFilter{DialogParticipantsFilter::Type::Members}; type_ = Type::Members;
break;
case td_api::chatMembersFilterRestricted::ID: case td_api::chatMembersFilterRestricted::ID:
return DialogParticipantsFilter{DialogParticipantsFilter::Type::Restricted}; type_ = Type::Restricted;
break;
case td_api::chatMembersFilterBanned::ID: case td_api::chatMembersFilterBanned::ID:
return DialogParticipantsFilter{DialogParticipantsFilter::Type::Banned}; type_ = Type::Banned;
break;
case td_api::chatMembersFilterMention::ID: { case td_api::chatMembersFilterMention::ID: {
auto mention_filter = static_cast<const td_api::chatMembersFilterMention *>(filter.get()); auto mention_filter = static_cast<const td_api::chatMembersFilterMention *>(filter.get());
auto top_thread_message_id = MessageId(mention_filter->message_thread_id_); top_thread_message_id_ = MessageId(mention_filter->message_thread_id_);
if (!top_thread_message_id.is_valid() || !top_thread_message_id.is_server()) { if (!top_thread_message_id_.is_valid() || !top_thread_message_id_.is_server()) {
top_thread_message_id = MessageId(); top_thread_message_id_ = MessageId();
} }
return DialogParticipantsFilter{DialogParticipantsFilter::Type::Mention, top_thread_message_id}; type_ = Type::Mention;
break;
} }
case td_api::chatMembersFilterBots::ID: case td_api::chatMembersFilterBots::ID:
return DialogParticipantsFilter{DialogParticipantsFilter::Type::Bots}; type_ = Type::Bots;
break;
default: default:
UNREACHABLE(); UNREACHABLE();
return DialogParticipantsFilter{DialogParticipantsFilter::Type::Members}; type_ = Type::Members;
break;
}
}
td_api::object_ptr<td_api::SupergroupMembersFilter> DialogParticipantsFilter::get_supergroup_members_filter_object(
const string &query) const {
switch (type_) {
case Type::Contacts:
return td_api::make_object<td_api::supergroupMembersFilterContacts>();
case Type::Administrators:
return td_api::make_object<td_api::supergroupMembersFilterAdministrators>();
case Type::Members:
return td_api::make_object<td_api::supergroupMembersFilterSearch>(query);
case Type::Restricted:
return td_api::make_object<td_api::supergroupMembersFilterRestricted>(query);
case Type::Banned:
return td_api::make_object<td_api::supergroupMembersFilterBanned>(query);
case Type::Mention:
return td_api::make_object<td_api::supergroupMembersFilterMention>(query, top_thread_message_id_.get());
case Type::Bots:
return td_api::make_object<td_api::supergroupMembersFilterBots>();
default:
UNREACHABLE();
return nullptr;
}
}
bool DialogParticipantsFilter::has_query() const {
switch (type_) {
case Type::Members:
case Type::Restricted:
case Type::Banned:
case Type::Mention:
return true;
case Type::Contacts:
case Type::Administrators:
case Type::Bots:
return false;
default:
UNREACHABLE();
return false;
}
}
bool DialogParticipantsFilter::is_dialog_participant_suitable(const Td *td,
const DialogParticipant &participant) const {
switch (type_) {
case Type::Contacts:
return participant.dialog_id.get_type() == DialogType::User &&
td->contacts_manager_->is_user_contact(participant.dialog_id.get_user_id());
case Type::Administrators:
return participant.status.is_administrator();
case Type::Members:
return participant.status.is_member();
case Type::Restricted:
return participant.status.is_restricted();
case Type::Banned:
return participant.status.is_banned();
case Type::Mention:
return true;
case Type::Bots:
return participant.dialog_id.get_type() == DialogType::User &&
td->contacts_manager_->is_user_bot(participant.dialog_id.get_user_id());
default:
UNREACHABLE();
return false;
} }
} }

View File

@ -408,6 +408,11 @@ struct DialogParticipant {
return {dialog_id, UserId(), 0, DialogParticipantStatus::Left()}; return {dialog_id, UserId(), 0, DialogParticipantStatus::Left()};
} }
static DialogParticipant private_member(UserId user_id, UserId other_user_id) {
auto inviter_user_id = other_user_id.is_valid() ? other_user_id : user_id;
return {DialogId(user_id), inviter_user_id, 0, DialogParticipantStatus::Member()};
}
bool is_valid() const; bool is_valid() const;
template <class StorerT> template <class StorerT>
@ -448,9 +453,10 @@ struct DialogParticipants {
}; };
class ChannelParticipantsFilter { class ChannelParticipantsFilter {
enum class Type : int32 { Recent, Contacts, Administrators, Search, Mention, Restricted, Banned, Bots } type; enum class Type : int32 { Recent, Contacts, Administrators, Search, Mention, Restricted, Banned, Bots };
string query; Type type_;
MessageId top_thread_message_id; string query_;
MessageId top_thread_message_id_;
friend StringBuilder &operator<<(StringBuilder &string_builder, const ChannelParticipantsFilter &filter); friend StringBuilder &operator<<(StringBuilder &string_builder, const ChannelParticipantsFilter &filter);
@ -460,51 +466,55 @@ class ChannelParticipantsFilter {
tl_object_ptr<telegram_api::ChannelParticipantsFilter> get_input_channel_participants_filter() const; tl_object_ptr<telegram_api::ChannelParticipantsFilter> get_input_channel_participants_filter() const;
bool is_administrators() const { bool is_administrators() const {
return type == Type::Administrators; return type_ == Type::Administrators;
} }
bool is_bots() const { bool is_bots() const {
return type == Type::Bots; return type_ == Type::Bots;
} }
bool is_recent() const { bool is_recent() const {
return type == Type::Recent; return type_ == Type::Recent;
} }
bool is_contacts() const { bool is_contacts() const {
return type == Type::Contacts; return type_ == Type::Contacts;
} }
bool is_search() const { bool is_search() const {
return type == Type::Search; return type_ == Type::Search;
} }
bool is_restricted() const { bool is_restricted() const {
return type == Type::Restricted; return type_ == Type::Restricted;
} }
bool is_banned() const { bool is_banned() const {
return type == Type::Banned; return type_ == Type::Banned;
} }
}; };
StringBuilder &operator<<(StringBuilder &string_builder, const ChannelParticipantsFilter &filter); StringBuilder &operator<<(StringBuilder &string_builder, const ChannelParticipantsFilter &filter);
class DialogParticipantsFilter { class DialogParticipantsFilter {
public:
enum class Type : int32 { Contacts, Administrators, Members, Restricted, Banned, Mention, Bots }; enum class Type : int32 { Contacts, Administrators, Members, Restricted, Banned, Mention, Bots };
Type type; Type type_;
MessageId top_thread_message_id; MessageId top_thread_message_id_;
explicit DialogParticipantsFilter(Type type, MessageId top_thread_message_id = MessageId()) friend StringBuilder &operator<<(StringBuilder &string_builder, const DialogParticipantsFilter &filter);
: type(type), top_thread_message_id(top_thread_message_id) {
} public:
explicit DialogParticipantsFilter(const tl_object_ptr<td_api::ChatMembersFilter> &filter);
td_api::object_ptr<td_api::SupergroupMembersFilter> get_supergroup_members_filter_object(const string &query) const;
bool has_query() const;
bool is_dialog_participant_suitable(const Td *td, const DialogParticipant &participant) const;
}; };
StringBuilder &operator<<(StringBuilder &string_builder, const DialogParticipantsFilter &filter); StringBuilder &operator<<(StringBuilder &string_builder, const DialogParticipantsFilter &filter);
DialogParticipantsFilter get_dialog_participants_filter(const tl_object_ptr<td_api::ChatMembersFilter> &filter);
DialogParticipantStatus get_dialog_participant_status(const tl_object_ptr<td_api::ChatMemberStatus> &status); DialogParticipantStatus get_dialog_participant_status(const tl_object_ptr<td_api::ChatMemberStatus> &status);
DialogParticipantStatus get_dialog_participant_status(bool can_be_edited, DialogParticipantStatus get_dialog_participant_status(bool can_be_edited,

View File

@ -2676,8 +2676,8 @@ void GroupCallManager::try_load_group_call_administrators(InputGroupCallId input
std::move(result)); std::move(result));
}); });
td_->contacts_manager_->search_dialog_participants( td_->contacts_manager_->search_dialog_participants(
dialog_id, string(), 100, DialogParticipantsFilter(DialogParticipantsFilter::Type::Administrators), dialog_id, string(), 100,
std::move(promise)); DialogParticipantsFilter(td_api::make_object<td_api::chatMembersFilterAdministrators>()), std::move(promise));
} }
void GroupCallManager::finish_load_group_call_administrators(InputGroupCallId input_group_call_id, void GroupCallManager::finish_load_group_call_administrators(InputGroupCallId input_group_call_id,

View File

@ -2740,12 +2740,12 @@ class BlockFromRepliesQuery final : public Td::ResultHandler {
explicit BlockFromRepliesQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) { explicit BlockFromRepliesQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
} }
void send(MessageId message_id, bool delete_message, bool delete_all_messages, bool report_spam) { void send(MessageId message_id, bool need_delete_message, bool need_delete_all_messages, bool report_spam) {
int32 flags = 0; int32 flags = 0;
if (delete_message) { if (need_delete_message) {
flags |= telegram_api::contacts_blockFromReplies::DELETE_MESSAGE_MASK; flags |= telegram_api::contacts_blockFromReplies::DELETE_MESSAGE_MASK;
} }
if (delete_all_messages) { if (need_delete_all_messages) {
flags |= telegram_api::contacts_blockFromReplies::DELETE_HISTORY_MASK; flags |= telegram_api::contacts_blockFromReplies::DELETE_HISTORY_MASK;
} }
if (report_spam) { if (report_spam) {
@ -16718,8 +16718,8 @@ void MessagesManager::on_get_common_dialogs(UserId user_id, int64 offset_chat_id
common_dialogs.total_count = total_count; common_dialogs.total_count = total_count;
} }
void MessagesManager::block_message_sender_from_replies(MessageId message_id, bool delete_message, void MessagesManager::block_message_sender_from_replies(MessageId message_id, bool need_delete_message,
bool delete_all_messages, bool report_spam, bool need_delete_all_messages, bool report_spam,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
auto dialog_id = DialogId(ContactsManager::get_replies_bot_user_id()); auto dialog_id = DialogId(ContactsManager::get_replies_bot_user_id());
Dialog *d = get_dialog_force(dialog_id, "block_message_sender_from_replies"); Dialog *d = get_dialog_force(dialog_id, "block_message_sender_from_replies");
@ -16744,20 +16744,19 @@ void MessagesManager::block_message_sender_from_replies(MessageId message_id, bo
} }
bool need_update_dialog_pos = false; bool need_update_dialog_pos = false;
vector<int64> deleted_message_ids; vector<int64> deleted_message_ids;
if (delete_message) { if (need_delete_message) {
auto p = this->delete_message(d, message_id, true, &need_update_dialog_pos, "block_message_sender_from_replies"); auto p = delete_message(d, message_id, true, &need_update_dialog_pos, "block_message_sender_from_replies");
CHECK(p.get() == m); CHECK(p.get() == m);
deleted_message_ids.push_back(p->message_id.get()); deleted_message_ids.push_back(p->message_id.get());
} }
if (delete_all_messages && sender_user_id.is_valid()) { if (need_delete_all_messages && sender_user_id.is_valid()) {
vector<MessageId> message_ids; vector<MessageId> message_ids;
find_messages(d->messages.get(), message_ids, [sender_user_id](const Message *m) { find_messages(d->messages.get(), message_ids, [sender_user_id](const Message *m) {
return !m->is_outgoing && m->forward_info != nullptr && m->forward_info->sender_user_id == sender_user_id; return !m->is_outgoing && m->forward_info != nullptr && m->forward_info->sender_user_id == sender_user_id;
}); });
for (auto user_message_id : message_ids) { for (auto user_message_id : message_ids) {
auto p = this->delete_message(d, user_message_id, true, &need_update_dialog_pos, auto p = delete_message(d, user_message_id, true, &need_update_dialog_pos, "block_message_sender_from_replies 2");
"block_message_sender_from_replies 2");
deleted_message_ids.push_back(p->message_id.get()); deleted_message_ids.push_back(p->message_id.get());
} }
} }
@ -16768,7 +16767,7 @@ void MessagesManager::block_message_sender_from_replies(MessageId message_id, bo
send_update_delete_messages(dialog_id, std::move(deleted_message_ids), true, false); send_update_delete_messages(dialog_id, std::move(deleted_message_ids), true, false);
block_message_sender_from_replies_on_server(message_id, delete_message, delete_all_messages, report_spam, 0, block_message_sender_from_replies_on_server(message_id, need_delete_message, need_delete_all_messages, report_spam, 0,
std::move(promise)); std::move(promise));
} }
@ -16803,24 +16802,25 @@ class MessagesManager::BlockMessageSenderFromRepliesOnServerLogEvent {
}; };
uint64 MessagesManager::save_block_message_sender_from_replies_on_server_log_event(MessageId message_id, uint64 MessagesManager::save_block_message_sender_from_replies_on_server_log_event(MessageId message_id,
bool delete_message, bool need_delete_message,
bool delete_all_messages, bool need_delete_all_messages,
bool report_spam) { bool report_spam) {
BlockMessageSenderFromRepliesOnServerLogEvent log_event{message_id, delete_message, delete_all_messages, report_spam}; BlockMessageSenderFromRepliesOnServerLogEvent log_event{message_id, need_delete_message, need_delete_all_messages,
report_spam};
return binlog_add(G()->td_db()->get_binlog(), LogEvent::HandlerType::BlockMessageSenderFromRepliesOnServer, return binlog_add(G()->td_db()->get_binlog(), LogEvent::HandlerType::BlockMessageSenderFromRepliesOnServer,
get_log_event_storer(log_event)); get_log_event_storer(log_event));
} }
void MessagesManager::block_message_sender_from_replies_on_server(MessageId message_id, bool delete_message, void MessagesManager::block_message_sender_from_replies_on_server(MessageId message_id, bool need_delete_message,
bool delete_all_messages, bool report_spam, bool need_delete_all_messages, bool report_spam,
uint64 log_event_id, Promise<Unit> &&promise) { uint64 log_event_id, Promise<Unit> &&promise) {
if (log_event_id == 0) { if (log_event_id == 0) {
log_event_id = save_block_message_sender_from_replies_on_server_log_event(message_id, delete_message, log_event_id = save_block_message_sender_from_replies_on_server_log_event(message_id, need_delete_message,
delete_all_messages, report_spam); need_delete_all_messages, report_spam);
} }
td_->create_handler<BlockFromRepliesQuery>(get_erase_log_event_promise(log_event_id, std::move(promise))) td_->create_handler<BlockFromRepliesQuery>(get_erase_log_event_promise(log_event_id, std::move(promise)))
->send(message_id, delete_message, delete_all_messages, report_spam); ->send(message_id, need_delete_message, need_delete_all_messages, report_spam);
} }
void MessagesManager::get_blocked_dialogs(int32 offset, int32 limit, void MessagesManager::get_blocked_dialogs(int32 offset, int32 limit,

View File

@ -546,7 +546,7 @@ class MessagesManager final : public Actor {
std::pair<int32, vector<DialogId>> get_common_dialogs(UserId user_id, DialogId offset_dialog_id, int32 limit, std::pair<int32, vector<DialogId>> get_common_dialogs(UserId user_id, DialogId offset_dialog_id, int32 limit,
bool force, Promise<Unit> &&promise); bool force, Promise<Unit> &&promise);
void block_message_sender_from_replies(MessageId message_id, bool delete_message, bool delete_all_messages, void block_message_sender_from_replies(MessageId message_id, bool need_delete_message, bool need_delete_all_messages,
bool report_spam, Promise<Unit> &&promise); bool report_spam, Promise<Unit> &&promise);
void get_blocked_dialogs(int32 offset, int32 limit, Promise<td_api::object_ptr<td_api::messageSenders>> &&promise); void get_blocked_dialogs(int32 offset, int32 limit, Promise<td_api::object_ptr<td_api::messageSenders>> &&promise);
@ -1988,8 +1988,9 @@ class MessagesManager final : public Actor {
void delete_all_call_messages_from_server(bool revoke, uint64 log_event_id, Promise<Unit> &&promise); void delete_all_call_messages_from_server(bool revoke, uint64 log_event_id, Promise<Unit> &&promise);
void block_message_sender_from_replies_on_server(MessageId message_id, bool delete_message, bool delete_all_messages, void block_message_sender_from_replies_on_server(MessageId message_id, bool need_delete_message,
bool report_spam, uint64 log_event_id, Promise<Unit> &&promise); bool need_delete_all_messages, bool report_spam, uint64 log_event_id,
Promise<Unit> &&promise);
void delete_all_channel_messages_from_user_on_server(ChannelId channel_id, UserId user_id, uint64 log_event_id, void delete_all_channel_messages_from_user_on_server(ChannelId channel_id, UserId user_id, uint64 log_event_id,
Promise<Unit> &&promise); Promise<Unit> &&promise);
@ -3054,8 +3055,10 @@ class MessagesManager final : public Actor {
static uint64 save_delete_all_call_messages_from_server_log_event(bool revoke); static uint64 save_delete_all_call_messages_from_server_log_event(bool revoke);
static uint64 save_block_message_sender_from_replies_on_server_log_event(MessageId message_id, bool delete_message, static uint64 save_block_message_sender_from_replies_on_server_log_event(MessageId message_id,
bool delete_all_messages, bool report_spam); bool need_delete_message,
bool need_delete_all_messages,
bool report_spam);
static uint64 save_delete_all_channel_messages_from_user_on_server_log_event(ChannelId channel_id, UserId user_id); static uint64 save_delete_all_channel_messages_from_user_on_server_log_event(ChannelId channel_id, UserId user_id);

View File

@ -1565,8 +1565,8 @@ void SecretChatActor::on_outbound_send_message_result(NetQueryPtr query, Promise
LOG(INFO) << "Outbound secret message [send_message] failed, rewrite it with dummy " LOG(INFO) << "Outbound secret message [send_message] failed, rewrite it with dummy "
<< tag("log_event_id", state->message->log_event_id()) << tag("error", error); << tag("log_event_id", state->message->log_event_id()) << tag("error", error);
state->send_result_ = [this, random_id = state->message->random_id, error_code = error.code(), state->send_result_ = [this, random_id = state->message->random_id, error_code = error.code(),
error_message = error.message()](Promise<> promise) { error_message = error.message().str()](Promise<> promise) {
this->context_->on_send_message_error(random_id, Status::Error(error_code, error_message), std::move(promise)); context_->on_send_message_error(random_id, Status::Error(error_code, error_message), std::move(promise));
}; };
state->send_result_(std::move(send_message_error_promise)); state->send_result_(std::move(send_message_error_promise));
} else { } else {
@ -1598,7 +1598,7 @@ void SecretChatActor::on_outbound_send_message_result(NetQueryPtr query, Promise
state->send_result_ = [this, random_id = state->message->random_id, state->send_result_ = [this, random_id = state->message->random_id,
message_id = MessageId(ServerMessageId(state->message->message_id)), message_id = MessageId(ServerMessageId(state->message->message_id)),
date = sent->date_](Promise<> promise) { date = sent->date_](Promise<> promise) {
this->context_->on_send_message_ok(random_id, message_id, date, nullptr, std::move(promise)); context_->on_send_message_ok(random_id, message_id, date, nullptr, std::move(promise));
}; };
state->send_result_(std::move(send_message_finish_promise)); state->send_result_(std::move(send_message_finish_promise));
return; return;
@ -1611,7 +1611,7 @@ void SecretChatActor::on_outbound_send_message_result(NetQueryPtr query, Promise
state->send_result_ = [this, random_id = state->message->random_id, state->send_result_ = [this, random_id = state->message->random_id,
message_id = MessageId(ServerMessageId(state->message->message_id)), message_id = MessageId(ServerMessageId(state->message->message_id)),
date = sent->date_](Promise<> promise) { date = sent->date_](Promise<> promise) {
this->context_->on_send_message_ok(random_id, message_id, date, nullptr, std::move(promise)); context_->on_send_message_ok(random_id, message_id, date, nullptr, std::move(promise));
}; };
} else { } else {
state->message->file = log_event::EncryptedInputFile::from_input_encrypted_file( state->message->file = log_event::EncryptedInputFile::from_input_encrypted_file(
@ -1619,8 +1619,8 @@ void SecretChatActor::on_outbound_send_message_result(NetQueryPtr query, Promise
state->send_result_ = [this, random_id = state->message->random_id, state->send_result_ = [this, random_id = state->message->random_id,
message_id = MessageId(ServerMessageId(state->message->message_id)), message_id = MessageId(ServerMessageId(state->message->message_id)),
date = sent->date_, file = *file](Promise<> promise) { date = sent->date_, file = *file](Promise<> promise) {
this->context_->on_send_message_ok(random_id, message_id, date, make_unique<EncryptedFile>(file), context_->on_send_message_ok(random_id, message_id, date, make_unique<EncryptedFile>(file),
std::move(promise)); std::move(promise));
}; };
} }
state->send_result_(std::move(send_message_finish_promise)); state->send_result_(std::move(send_message_finish_promise));

View File

@ -3096,9 +3096,10 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
return send_result(id, td_api::make_object<td_api::updates>(std::move(updates))); return send_result(id, td_api::make_object<td_api::updates>(std::move(updates)));
} }
case td_api::close::ID: case td_api::close::ID:
// need to send response synchronously before actual closing // need to send response before actual closing
send_result(id, td_api::make_object<td_api::ok>()); send_closure(actor_id(this), &Td::send_result, id, td_api::make_object<td_api::ok>());
return close(); send_closure(actor_id(this), &Td::close);
return;
default: default:
break; break;
} }
@ -3134,8 +3135,9 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
} }
case td_api::destroy::ID: case td_api::destroy::ID:
// need to send response synchronously before actual destroying // need to send response synchronously before actual destroying
send_result(id, td_api::make_object<td_api::ok>()); send_closure(actor_id(this), &Td::send_result, id, td_api::make_object<td_api::ok>());
return destroy(); send_closure(actor_id(this), &Td::destroy);
return;
default: default:
if (is_preinitialization_request(function_id)) { if (is_preinitialization_request(function_id)) {
break; break;
@ -4540,13 +4542,13 @@ void Td::on_request(uint64 id, const td_api::logOut &request) {
void Td::on_request(uint64 id, const td_api::close &request) { void Td::on_request(uint64 id, const td_api::close &request) {
// send response before actually closing // send response before actually closing
send_closure(actor_id(this), &Td::send_result, id, td_api::make_object<td_api::ok>()); send_closure(actor_id(this), &Td::send_result, id, td_api::make_object<td_api::ok>());
close(); send_closure(actor_id(this), &Td::close);
} }
void Td::on_request(uint64 id, const td_api::destroy &request) { void Td::on_request(uint64 id, const td_api::destroy &request) {
// send response before actually destroying // send response before actually destroying
send_closure(actor_id(this), &Td::send_result, id, td_api::make_object<td_api::ok>()); send_closure(actor_id(this), &Td::send_result, id, td_api::make_object<td_api::ok>());
destroy(); send_closure(actor_id(this), &Td::destroy);
} }
void Td::on_request(uint64 id, td_api::checkAuthenticationBotToken &request) { void Td::on_request(uint64 id, td_api::checkAuthenticationBotToken &request) {
@ -6311,8 +6313,7 @@ void Td::on_request(uint64 id, td_api::searchChatMembers &request) {
} }
}); });
contacts_manager_->search_dialog_participants(DialogId(request.chat_id_), request.query_, request.limit_, contacts_manager_->search_dialog_participants(DialogId(request.chat_id_), request.query_, request.limit_,
get_dialog_participants_filter(request.filter_), DialogParticipantsFilter(request.filter_), std::move(query_promise));
std::move(query_promise));
} }
void Td::on_request(uint64 id, td_api::getChatAdministrators &request) { void Td::on_request(uint64 id, td_api::getChatAdministrators &request) {

View File

@ -302,8 +302,9 @@ void FileLoader::on_result(NetQueryPtr query) {
if (next) { if (next) {
if (ordered_flag_) { if (ordered_flag_) {
auto seq_no = part.id; auto seq_no = part.id;
ordered_parts_.add(seq_no, std::make_pair(part, std::move(query)), ordered_parts_.add(
[this](auto seq_no, auto &&p) { this->on_part_query(p.first, std::move(p.second)); }); seq_no, std::make_pair(part, std::move(query)),
[this](uint64 seq_no, std::pair<Part, NetQueryPtr> &&p) { on_part_query(p.first, std::move(p.second)); });
} else { } else {
on_part_query(part, std::move(query)); on_part_query(part, std::move(query));
} }

View File

@ -145,7 +145,7 @@ Result<string> get_suggested_file_name(CSlice directory, Slice file_name) {
file_name = cleaned_name; file_name = cleaned_name;
if (directory.empty()) { if (directory.empty()) {
directory = "./"; directory = CSlice("./");
} }
auto dir_stat = stat(directory); auto dir_stat = stat(directory);

View File

@ -827,7 +827,7 @@ FileManager::FileManager(unique_ptr<Context> context) : context_(std::move(conte
#endif #endif
}; };
G()->td_db()->with_db_path([this](CSlice path) { this->bad_paths_.insert(path.str()); }); G()->td_db()->with_db_path([bad_paths = &bad_paths_](CSlice path) { bad_paths->insert(path.str()); });
} }
void FileManager::init_actor() { void FileManager::init_actor() {
@ -3739,9 +3739,10 @@ void FileManager::on_error_impl(FileNodePtr node, Query::Type type, bool was_act
if (FileReferenceManager::is_file_reference_error(status)) { if (FileReferenceManager::is_file_reference_error(status)) {
string file_reference; string file_reference;
Slice prefix = "#BASE64"; Slice prefix = "#BASE64";
auto pos = status.message().rfind('#'); Slice error_message = status.message();
if (pos < status.message().size() && begins_with(status.message().substr(pos), prefix)) { auto pos = error_message.rfind('#');
auto r_file_reference = base64_decode(status.message().substr(pos + prefix.size())); if (pos < error_message.size() && begins_with(error_message.substr(pos), prefix)) {
auto r_file_reference = base64_decode(error_message.substr(pos + prefix.size()));
if (r_file_reference.is_ok()) { if (r_file_reference.is_ok()) {
file_reference = r_file_reference.move_as_ok(); file_reference = r_file_reference.move_as_ok();
} else { } else {

View File

@ -323,7 +323,8 @@ class FileView {
return false; return false;
} }
auto type = remote_location().get_source().get_type(); auto type = remote_location().get_source().get_type();
return type != PhotoSizeSource::Type::Legacy && type != PhotoSizeSource::Type::FullLegacy; return type != PhotoSizeSource::Type::Legacy && type != PhotoSizeSource::Type::FullLegacy &&
type != PhotoSizeSource::Type::Thumbnail;
} }
string get_persistent_file_id() const; string get_persistent_file_id() const;
@ -560,10 +561,10 @@ class FileManager final : public FileLoadManager::Callback {
mutable FileLocationSource file_location_source_; mutable FileLocationSource file_location_source_;
FileId file_id_; FileId file_id_;
bool operator==(const RemoteInfo &other) const { bool operator==(const RemoteInfo &other) const {
return this->remote_ == other.remote_; return remote_ == other.remote_;
} }
bool operator<(const RemoteInfo &other) const { bool operator<(const RemoteInfo &other) const {
return this->remote_ < other.remote_; return remote_ < other.remote_;
} }
}; };
Enumerator<RemoteInfo> remote_location_info_; Enumerator<RemoteInfo> remote_location_info_;

View File

@ -130,7 +130,7 @@ ConnectionCreator::ConnectionCreator(ActorShared<> parent) : parent_(std::move(p
ConnectionCreator::ConnectionCreator(ConnectionCreator &&other) = default; ConnectionCreator::ConnectionCreator(ConnectionCreator &&other) = default;
ConnectionCreator &ConnectionCreator::operator=(ConnectionCreator &&other) noexcept = default; ConnectionCreator &ConnectionCreator::operator=(ConnectionCreator &&other) = default;
ConnectionCreator::~ConnectionCreator() = default; ConnectionCreator::~ConnectionCreator() = default;

View File

@ -55,7 +55,7 @@ class ConnectionCreator final : public NetQueryCallback {
public: public:
explicit ConnectionCreator(ActorShared<> parent); explicit ConnectionCreator(ActorShared<> parent);
ConnectionCreator(ConnectionCreator &&other); ConnectionCreator(ConnectionCreator &&other);
ConnectionCreator &operator=(ConnectionCreator &&other) noexcept; ConnectionCreator &operator=(ConnectionCreator &&other);
~ConnectionCreator() final; ~ConnectionCreator() final;
void on_dc_options(DcOptions new_dc_options); void on_dc_options(DcOptions new_dc_options);

View File

@ -28,16 +28,16 @@ void NetQueryDelayer::delay(NetQueryPtr query) {
if (code < 0) { if (code < 0) {
// skip // skip
} else if (code == 500) { } else if (code == 500) {
auto msg = query->error().message(); auto error_message = query->error().message();
if (msg == "WORKER_BUSY_TOO_LONG_RETRY") { if (error_message == "WORKER_BUSY_TOO_LONG_RETRY") {
timeout = 1; // it is dangerous to resend query without timeout, so use 1 timeout = 1; // it is dangerous to resend query without timeout, so use 1
} }
} else if (code == 420) { } else if (code == 420) {
auto msg = query->error().message(); auto error_message = query->error().message();
for (auto prefix : for (auto prefix :
{Slice("FLOOD_WAIT_"), Slice("SLOWMODE_WAIT_"), Slice("2FA_CONFIRM_WAIT_"), Slice("TAKEOUT_INIT_DELAY_")}) { {Slice("FLOOD_WAIT_"), Slice("SLOWMODE_WAIT_"), Slice("2FA_CONFIRM_WAIT_"), Slice("TAKEOUT_INIT_DELAY_")}) {
if (begins_with(msg, prefix)) { if (begins_with(error_message, prefix)) {
timeout = clamp(to_integer<int>(msg.substr(prefix.size())), 1, 14 * 24 * 60 * 60); timeout = clamp(to_integer<int>(error_message.substr(prefix.size())), 1, 14 * 24 * 60 * 60);
break; break;
} }
} }

View File

@ -294,15 +294,15 @@ NetQueryDispatcher::NetQueryDispatcher() = default;
NetQueryDispatcher::~NetQueryDispatcher() = default; NetQueryDispatcher::~NetQueryDispatcher() = default;
void NetQueryDispatcher::try_fix_migrate(NetQueryPtr &net_query) { void NetQueryDispatcher::try_fix_migrate(NetQueryPtr &net_query) {
auto msg = net_query->error().message(); auto error_message = net_query->error().message();
static constexpr CSlice prefixes[] = {"PHONE_MIGRATE_", "NETWORK_MIGRATE_", "USER_MIGRATE_"}; static constexpr CSlice prefixes[] = {"PHONE_MIGRATE_", "NETWORK_MIGRATE_", "USER_MIGRATE_"};
for (auto &prefix : prefixes) { for (auto &prefix : prefixes) {
if (msg.substr(0, prefix.size()) == prefix) { if (error_message.substr(0, prefix.size()) == prefix) {
auto new_main_dc_id = to_integer<int32>(msg.substr(prefix.size())); auto new_main_dc_id = to_integer<int32>(error_message.substr(prefix.size()));
set_main_dc_id(new_main_dc_id); set_main_dc_id(new_main_dc_id);
if (!net_query->dc_id().is_main()) { if (!net_query->dc_id().is_main()) {
LOG(ERROR) << msg << " from query to non-main dc " << net_query->dc_id(); LOG(ERROR) << "Receive " << error_message << " for query to non-main DC" << net_query->dc_id();
net_query->resend(DcId::internal(new_main_dc_id)); net_query->resend(DcId::internal(new_main_dc_id));
} else { } else {
net_query->resend(); net_query->resend();

View File

@ -149,11 +149,12 @@ class Scheduler {
class ServiceActor final : public Actor { class ServiceActor final : public Actor {
public: public:
void set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues); void set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues);
void start_up() final;
private: private:
std::shared_ptr<MpscPollableQueue<EventFull>> inbound_; std::shared_ptr<MpscPollableQueue<EventFull>> inbound_;
bool subscribed_{false}; bool subscribed_{false};
void start_up() final;
void loop() final; void loop() final;
void tear_down() final; void tear_down() final;
}; };

View File

@ -54,6 +54,10 @@ void Scheduler::set_scheduler(Scheduler *scheduler) {
scheduler_ = scheduler; scheduler_ = scheduler;
} }
void Scheduler::ServiceActor::set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues) {
inbound_ = std::move(queues);
}
void Scheduler::ServiceActor::start_up() { void Scheduler::ServiceActor::start_up() {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
CHECK(!inbound_); CHECK(!inbound_);

View File

@ -27,11 +27,6 @@
namespace td { namespace td {
/*** ServiceActor ***/
inline void Scheduler::ServiceActor::set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues) {
inbound_ = std::move(queues);
}
/*** EventGuard ***/ /*** EventGuard ***/
class EventGuard { class EventGuard {
public: public:

View File

@ -31,16 +31,20 @@ static char buf2[BUF_SIZE];
static td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1)); static td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
static td::StringBuilder sb2(td::MutableSlice(buf2, BUF_SIZE - 1)); static td::StringBuilder sb2(td::MutableSlice(buf2, BUF_SIZE - 1));
static std::shared_ptr<td::MpscPollableQueue<td::EventFull>> create_queue() { static td::vector<std::shared_ptr<td::MpscPollableQueue<td::EventFull>>> create_queues() {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
return {};
#else
auto res = std::make_shared<td::MpscPollableQueue<td::EventFull>>(); auto res = std::make_shared<td::MpscPollableQueue<td::EventFull>>();
res->init(); res->init();
return res; return {res};
#endif
} }
TEST(Actors, SendLater) { TEST(Actors, SendLater) {
sb.clear(); sb.clear();
td::Scheduler scheduler; td::Scheduler scheduler;
scheduler.init(0, {create_queue()}, nullptr); scheduler.init(0, create_queues(), nullptr);
auto guard = scheduler.get_guard(); auto guard = scheduler.get_guard();
class Worker final : public td::Actor { class Worker final : public td::Actor {
@ -96,7 +100,7 @@ class XReceiver final : public td::Actor {
TEST(Actors, simple_pass_event_arguments) { TEST(Actors, simple_pass_event_arguments) {
td::Scheduler scheduler; td::Scheduler scheduler;
scheduler.init(0, {create_queue()}, nullptr); scheduler.init(0, create_queues(), nullptr);
auto guard = scheduler.get_guard(); auto guard = scheduler.get_guard();
auto id = td::create_actor<XReceiver>("XR").release(); auto id = td::create_actor<XReceiver>("XR").release();
@ -201,7 +205,7 @@ class PrintChar final : public td::Actor {
// //
TEST(Actors, simple_hand_yield) { TEST(Actors, simple_hand_yield) {
td::Scheduler scheduler; td::Scheduler scheduler;
scheduler.init(0, {create_queue()}, nullptr); scheduler.init(0, create_queues(), nullptr);
sb.clear(); sb.clear();
int cnt = 1000; int cnt = 1000;
{ {
@ -350,7 +354,7 @@ class MasterActor final : public MsgActor {
TEST(Actors, call_after_destruct) { TEST(Actors, call_after_destruct) {
td::Scheduler scheduler; td::Scheduler scheduler;
scheduler.init(0, {create_queue()}, nullptr); scheduler.init(0, create_queues(), nullptr);
{ {
auto guard = scheduler.get_guard(); auto guard = scheduler.get_guard();
td::create_actor<MasterActor>("Master").release(); td::create_actor<MasterActor>("Master").release();

View File

@ -16,7 +16,7 @@
namespace td { namespace td {
namespace detail { namespace detail {
HttpConnectionBase::HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size, HttpConnectionBase::HttpConnectionBase(State state, BufferedFd<SocketFd> fd, SslStream ssl_stream, size_t max_post_size,
size_t max_files, int32 idle_timeout, int32 slow_scheduler_id) size_t max_files, int32 idle_timeout, int32 slow_scheduler_id)
: state_(state) : state_(state)
, fd_(std::move(fd)) , fd_(std::move(fd))

View File

@ -32,7 +32,7 @@ class HttpConnectionBase : public Actor {
protected: protected:
enum class State { Read, Write, Close }; enum class State { Read, Write, Close };
HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files, HttpConnectionBase(State state, BufferedFd<SocketFd> fd, SslStream ssl_stream, size_t max_post_size, size_t max_files,
int32 idle_timeout, int32 slow_scheduler_id); int32 idle_timeout, int32 slow_scheduler_id);
private: private:

View File

@ -12,8 +12,9 @@
namespace td { namespace td {
HttpInboundConnection::HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout, HttpInboundConnection::HttpInboundConnection(BufferedFd<SocketFd> fd, size_t max_post_size, size_t max_files,
ActorShared<Callback> callback, int32 slow_scheduler_id) int32 idle_timeout, ActorShared<Callback> callback,
int32 slow_scheduler_id)
: HttpConnectionBase(State::Read, std::move(fd), SslStream(), max_post_size, max_files, idle_timeout, : HttpConnectionBase(State::Read, std::move(fd), SslStream(), max_post_size, max_files, idle_timeout,
slow_scheduler_id) slow_scheduler_id)
, callback_(std::move(callback)) { , callback_(std::move(callback)) {

View File

@ -27,7 +27,7 @@ class HttpInboundConnection final : public detail::HttpConnectionBase {
// void write_ok(); // void write_ok();
// void write_error(Status error); // void write_error(Status error);
HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout, HttpInboundConnection(BufferedFd<SocketFd> fd, size_t max_post_size, size_t max_files, int32 idle_timeout,
ActorShared<Callback> callback, int32 slow_scheduler_id = -1); ActorShared<Callback> callback, int32 slow_scheduler_id = -1);
private: private:

View File

@ -24,8 +24,8 @@ class HttpOutboundConnection final : public detail::HttpConnectionBase {
virtual void handle(unique_ptr<HttpQuery> query) = 0; virtual void handle(unique_ptr<HttpQuery> query) = 0;
virtual void on_connection_error(Status error) = 0; // TODO rename to on_error virtual void on_connection_error(Status error) = 0; // TODO rename to on_error
}; };
HttpOutboundConnection(SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files, int32 idle_timeout, HttpOutboundConnection(BufferedFd<SocketFd> fd, SslStream ssl_stream, size_t max_post_size, size_t max_files,
ActorShared<Callback> callback, int32 slow_scheduler_id = -1) int32 idle_timeout, ActorShared<Callback> callback, int32 slow_scheduler_id = -1)
: HttpConnectionBase(HttpConnectionBase::State::Write, std::move(fd), std::move(ssl_stream), max_post_size, : HttpConnectionBase(HttpConnectionBase::State::Write, std::move(fd), std::move(ssl_stream), max_post_size,
max_files, idle_timeout, slow_scheduler_id) max_files, idle_timeout, slow_scheduler_id)
, callback_(std::move(callback)) { , callback_(std::move(callback)) {

View File

@ -79,14 +79,14 @@ Status Wget::try_init() {
return Status::Error("Sockets are not supported"); return Status::Error("Sockets are not supported");
} }
if (url.protocol_ == HttpUrl::Protocol::Http) { if (url.protocol_ == HttpUrl::Protocol::Http) {
connection_ = create_actor<HttpOutboundConnection>("Connect", std::move(fd), SslStream{}, connection_ = create_actor<HttpOutboundConnection>("Connect", BufferedFd<SocketFd>(std::move(fd)), SslStream{},
std::numeric_limits<std::size_t>::max(), 0, 0, std::numeric_limits<std::size_t>::max(), 0, 0,
ActorOwn<HttpOutboundConnection::Callback>(actor_id(this))); ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
} else { } else {
TRY_RESULT(ssl_stream, SslStream::create(url.host_, CSlice() /* certificate */, verify_peer_)); TRY_RESULT(ssl_stream, SslStream::create(url.host_, CSlice() /* certificate */, verify_peer_));
connection_ = create_actor<HttpOutboundConnection>("Connect", std::move(fd), std::move(ssl_stream), connection_ = create_actor<HttpOutboundConnection>(
std::numeric_limits<std::size_t>::max(), 0, 0, "Connect", BufferedFd<SocketFd>(std::move(fd)), std::move(ssl_stream), std::numeric_limits<std::size_t>::max(),
ActorOwn<HttpOutboundConnection::Callback>(actor_id(this))); 0, 0, ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
} }
send_closure(connection_, &HttpOutboundConnection::write_next, BufferSlice(header)); send_closure(connection_, &HttpOutboundConnection::write_next, BufferSlice(header));

View File

@ -35,7 +35,7 @@
#if TD_LINUX #if TD_LINUX
#include <linux/errqueue.h> #include <linux/errqueue.h>
#endif #endif
#endif // TD_PORT_POSIX #endif
#include <array> #include <array>
#include <atomic> #include <atomic>

View File

@ -65,4 +65,4 @@ TEST(EpochBaseMemoryReclamation, stress) {
} }
CHECK(ebmr.to_delete_size_unsafe() == 0); CHECK(ebmr.to_delete_size_unsafe() == 0);
} }
#endif //!TD_THREAD_UNSUPPORTED #endif

View File

@ -57,4 +57,4 @@ TEST(HazardPointers, stress) {
} }
CHECK(hazard_pointers.to_delete_size_unsafe() == 0); CHECK(hazard_pointers.to_delete_size_unsafe() == 0);
} }
#endif //!TD_THREAD_UNSUPPORTED #endif

View File

@ -156,6 +156,7 @@ TEST(Misc, TsList) {
} }
} }
#if !TD_THREAD_UNSUPPORTED
TEST(Misc, TsListConcurrent) { TEST(Misc, TsListConcurrent) {
td::TsList<ListData> root; td::TsList<ListData> root;
td::vector<td::thread> threads; td::vector<td::thread> threads;
@ -165,3 +166,4 @@ TEST(Misc, TsListConcurrent) {
[&] { do_run_list_test<td::TsListNode<ListData>, td::TsList<ListData>, td::TsListNode<ListData>>(root, id); }); [&] { do_run_list_test<td::TsListNode<ListData>, td::TsList<ListData>, td::TsListNode<ListData>>(root, id); });
} }
} }
#endif

View File

@ -83,7 +83,7 @@ TEST(OneValue, stress) {
thread.join(); thread.join();
} }
} }
#endif //!TD_THREAD_UNSUPPORTED #endif
TEST(MpmcQueueBlock, simple) { TEST(MpmcQueueBlock, simple) {
// Test doesn't work now and it is ok, try_pop, logic changed // Test doesn't work now and it is ok, try_pop, logic changed
@ -204,4 +204,4 @@ TEST(MpmcQueue, multi_thread) {
} }
LOG_CHECK(q.hazard_pointers_to_delele_size_unsafe() == 0) << q.hazard_pointers_to_delele_size_unsafe(); LOG_CHECK(q.hazard_pointers_to_delele_size_unsafe() == 0) << q.hazard_pointers_to_delele_size_unsafe();
} }
#endif //!TD_THREAD_UNSUPPORTED #endif

View File

@ -140,4 +140,4 @@ TEST(MpmcEagerWaiter, stress_multi) {
TEST(MpmcSleepyWaiter, stress_multi) { TEST(MpmcSleepyWaiter, stress_multi) {
test_waiter_stress<td::MpmcSleepyWaiter>(); test_waiter_stress<td::MpmcSleepyWaiter>();
} }
#endif // !TD_THREAD_UNSUPPORTED #endif

View File

@ -113,4 +113,4 @@ TEST(MpscLinkQueue, multi_thread) {
thread.join(); thread.join();
} }
} }
#endif //!TD_THREAD_UNSUPPORTED #endif

View File

@ -26,6 +26,7 @@ TEST(StealingQueue, very_simple) {
ASSERT_EQ(1, x); ASSERT_EQ(1, x);
} }
#if !TD_THREAD_UNSUPPORTED
TEST(AtomicRead, simple) { TEST(AtomicRead, simple) {
td::Stage run; td::Stage run;
td::Stage check; td::Stage check;
@ -176,3 +177,4 @@ TEST(StealingQueue, simple) {
thread.join(); thread.join();
} }
} }
#endif

View File

@ -219,6 +219,7 @@ TEST(Port, SignalsAndThread) {
} }
} }
#if !TD_EVENTFD_UNSUPPORTED
TEST(Port, EventFdAndSignals) { TEST(Port, EventFdAndSignals) {
td::set_signal_handler(td::SignalType::User, [](int signal) {}).ensure(); td::set_signal_handler(td::SignalType::User, [](int signal) {}).ensure();
SCOPE_EXIT { SCOPE_EXIT {
@ -258,3 +259,4 @@ TEST(Port, EventFdAndSignals) {
flag.clear(); flag.clear();
} }
#endif #endif
#endif

View File

@ -936,7 +936,11 @@ TEST(Client, Multi) {
TEST(Client, Manager) { TEST(Client, Manager) {
td::vector<td::thread> threads; td::vector<td::thread> threads;
td::ClientManager client; td::ClientManager client;
#if !TD_EVENTFD_UNSUPPORTED // Client must be used from a single thread if there is no EventFd
int threads_n = 4; int threads_n = 4;
#else
int threads_n = 1;
#endif
int clients_n = 1000; int clients_n = 1000;
client.send(0, 3, td::make_tl_object<td::td_api::testSquareInt>(3)); client.send(0, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
client.send(-1, 3, td::make_tl_object<td::td_api::testSquareInt>(3)); client.send(-1, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
@ -969,6 +973,7 @@ TEST(Client, Manager) {
} }
} }
#if !TD_EVENTFD_UNSUPPORTED // Client must be used from a single thread if there is no EventFd
TEST(Client, Close) { TEST(Client, Close) {
std::atomic<bool> stop_send{false}; std::atomic<bool> stop_send{false};
std::atomic<bool> can_stop_receive{false}; std::atomic<bool> can_stop_receive{false};
@ -1093,6 +1098,7 @@ TEST(Client, ManagerClose) {
ASSERT_TRUE(request_ids.empty()); ASSERT_TRUE(request_ids.empty());
} }
#endif #endif
#endif
TEST(Client, ManagerCloseOneThread) { TEST(Client, ManagerCloseOneThread) {
td::ClientManager client_manager; td::ClientManager client_manager;