Performance improvements, better initialization

This commit is contained in:
Andrea Cavalli 2023-05-11 01:05:00 +02:00
parent a6e075832f
commit 35bdc5653c
13 changed files with 97 additions and 68 deletions

4
.gitignore vendored
View File

@ -95,3 +95,7 @@ parent/.classpath
tdlight-java/target-snapshot/ tdlight-java/target-snapshot/
tdlight-java-8/target-snapshot/ tdlight-java-8/target-snapshot/
/example-tdlight-session/data/db.sqlite
/example-tdlight-session/data/db.sqlite-shm
/example-tdlight-session/data/db.sqlite-wal
/example-tdlight-session/data/td.binlog

View File

@ -6,11 +6,14 @@ import it.tdlight.jni.TdApi.Object;
import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.Update;
import it.tdlight.util.CleanSupport; import it.tdlight.util.CleanSupport;
import it.tdlight.util.CleanSupport.CleanableSupport; import it.tdlight.util.CleanSupport.CleanableSupport;
import java.util.Map;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MarkerFactory;
class AutoCleaningTelegramClient implements TelegramClient { class AutoCleaningTelegramClient implements TelegramClient {
private final InternalClient client;
private final TelegramClient client;
private volatile CleanableSupport cleanable; private volatile CleanableSupport cleanable;
AutoCleaningTelegramClient(InternalClientsState state) { AutoCleaningTelegramClient(InternalClientsState state) {
@ -18,7 +21,12 @@ class AutoCleaningTelegramClient implements TelegramClient {
} }
public void onClientRegistered(int clientId, LongSupplier nextQueryIdSupplier) { public void onClientRegistered(int clientId, LongSupplier nextQueryIdSupplier) {
Runnable shutDown = () -> NativeClientAccess.send(clientId, nextQueryIdSupplier.getAsLong(), new TdApi.Close()); Runnable shutDown = () -> {
Logger logger = LoggerFactory.getLogger(TelegramClient.class);
logger.debug(MarkerFactory.getMarker("TG"), "The client is being shut down automatically");
long reqId = nextQueryIdSupplier.getAsLong();
NativeClientAccess.send(clientId, reqId, new TdApi.Close());
};
Thread shutdownHook = new Thread(shutDown); Thread shutdownHook = new Thread(shutDown);
Runtime.getRuntime().addShutdownHook(shutdownHook); Runtime.getRuntime().addShutdownHook(shutdownHook);
cleanable = CleanSupport.register(this, shutDown); cleanable = CleanSupport.register(this, shutDown);

View File

@ -115,9 +115,9 @@ public class ClientFactory implements AutoCloseable {
} }
if (isClosed) { if (isClosed) {
logger.trace("Removing Client {} from event handlers", clientId); logger.debug("Removing Client {} from event handlers", clientId);
state.removeClientEventHandlers(clientId); state.removeClientEventHandlers(clientId);
logger.trace("Removed Client {} from event handlers", clientId); logger.debug("Removed Client {} from event handlers", clientId);
} }
} }

View File

@ -1,7 +1,9 @@
package it.tdlight; package it.tdlight;
import java.util.Map;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
public interface ClientRegistrationEventHandler { interface ClientRegistrationEventHandler {
void onClientRegistered(int clientId, LongSupplier nextQueryIdSupplier); void onClientRegistered(int clientId, LongSupplier nextQueryIdSupplier);
} }

View File

@ -42,38 +42,42 @@ public final class Init {
*/ */
public static void init() throws UnsupportedNativeLibraryException { public static void init() throws UnsupportedNativeLibraryException {
if (!started) { if (!started) {
boolean shouldStart = false;
synchronized (Init.class) { synchronized (Init.class) {
if (!started) { if (!started) {
Native.loadNativesInternal();
ConstructorDetector.init();
try {
NativeClientAccess.execute(new SetLogVerbosityLevel(3));
Log.setLogMessageHandler(3, (verbosityLevel, message) -> {
switch (verbosityLevel) {
case -1:
case 0:
case 1:
LOG.error(message);
break;
case 2:
LOG.warn(message);
break;
case 3:
LOG.info(message);
break;
case 4:
LOG.debug(message);
break;
default:
LOG.trace(message);
break;
}
});
NativeClientAccess.execute(new SetLogStream(new LogStreamEmpty()));
} catch (Throwable ex) {
LOG.error("Can't set verbosity level on startup", ex);
}
started = true; started = true;
shouldStart = true;
}
}
if (shouldStart) {
Native.loadNativesInternal();
ConstructorDetector.init();
try {
NativeClientAccess.execute(new SetLogVerbosityLevel(3));
Log.setLogMessageHandler(3, (verbosityLevel, message) -> {
switch (verbosityLevel) {
case -1:
case 0:
case 1:
LOG.error(message);
break;
case 2:
LOG.warn(message);
break;
case 3:
LOG.info(message);
break;
case 4:
LOG.debug(message);
break;
default:
LOG.trace(message);
break;
}
});
NativeClientAccess.execute(new SetLogStream(new LogStreamEmpty()));
} catch (Throwable ex) {
LOG.error("Can't set verbosity level on startup", ex);
} }
} }
} }

View File

@ -8,6 +8,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.Marker; import org.slf4j.Marker;
@ -15,8 +17,8 @@ import org.slf4j.MarkerFactory;
final class InternalClient implements ClientEventsHandler, TelegramClient { final class InternalClient implements ClientEventsHandler, TelegramClient {
private static final Marker TG_MARKER = MarkerFactory.getMarker("TG"); static final Marker TG_MARKER = MarkerFactory.getMarker("TG");
private static final Logger logger = LoggerFactory.getLogger(TelegramClient.class); static final Logger logger = LoggerFactory.getLogger(TelegramClient.class);
private ClientRegistrationEventHandler clientRegistrationEventHandler; private ClientRegistrationEventHandler clientRegistrationEventHandler;
private final Map<Long, Handler<?>> handlers = new ConcurrentHashMap<>(); private final Map<Long, Handler<?>> handlers = new ConcurrentHashMap<>();
@ -80,11 +82,11 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
} }
private void handleClose() { private void handleClose() {
logger.trace(TG_MARKER, "Received close"); logger.debug(TG_MARKER, "Received close");
handlers.forEach((eventId, handler) -> handlers.forEach((eventId, handler) ->
handleResponse(eventId, new TdApi.Error(500, "Instance closed"), handler)); handleResponse(eventId, new TdApi.Error(500, "Instance closed"), handler));
handlers.clear(); handlers.clear();
logger.info(TG_MARKER, "Client closed {}", clientId); logger.debug(TG_MARKER, "Client closed {}", clientId);
} }
/** /**
@ -98,7 +100,7 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
handleException(handler.getExceptionHandler(), cause); handleException(handler.getExceptionHandler(), cause);
} }
} else { } else {
logger.error(TG_MARKER, "Unknown event id \"{}\", the event has been dropped! {}", eventId, event); logger.trace(TG_MARKER, "Client {}, request event id is not registered \"{}\", the following response has been dropped. {}", clientId, eventId, event);
} }
} }
@ -106,7 +108,7 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
* Handles a response or an update * Handles a response or an update
*/ */
private void handleEvent(long eventId, TdApi.Object event) { private void handleEvent(long eventId, TdApi.Object event) {
logger.trace(TG_MARKER, "Received response {}: {}", eventId, event); logger.trace(TG_MARKER, "Client {}, response received for request {}: {}", clientId, eventId, event);
if (updatesHandler != null || updateHandler == null) { if (updatesHandler != null || updateHandler == null) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
@ -164,18 +166,23 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
logger.info(TG_MARKER, "Registered new client {}", clientId); logger.info(TG_MARKER, "Registered new client {}", clientId);
// Send a dummy request to start TDLib // Send a dummy request to start TDLib
this.send(new TdApi.GetOption("version"), (result) -> {}, ex -> {}); logger.debug(TG_MARKER, "Sending dummy startup request as client {}", clientId);
TdApi.Function<?> dummyRequest = new TdApi.GetOption("version");
this.send(dummyRequest, null, null);
// test Client.execute
this.execute(new TdApi.GetTextEntities("@telegram /test_command https://telegram.org telegram.me @gif @test"));
} }
@Override @Override
public <R extends TdApi.Object> void send(Function<R> query, public <R extends TdApi.Object> void send(Function<R> query,
ResultHandler<R> resultHandler, ResultHandler<R> resultHandler,
ExceptionHandler exceptionHandler) { ExceptionHandler exceptionHandler) {
logger.trace(TG_MARKER, "Trying to send {}", query); logger.trace(TG_MARKER, "Trying to send async request {}", query);
// Handle special requests // Handle special requests
TdApi.Object specialResult = tryHandleSpecial(query); TdApi.Object specialResult = tryHandleSpecial(query);
if (specialResult != null) { if (specialResult != null) {
logger.trace(TG_MARKER, "Handling special result for async request {}: {}", query, specialResult);
if (resultHandler != null) { if (resultHandler != null) {
resultHandler.onResult(specialResult); resultHandler.onResult(specialResult);
} }
@ -191,11 +198,12 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
@Override @Override
public <R extends TdApi.Object> TdApi.Object execute(Function<R> query) { public <R extends TdApi.Object> TdApi.Object execute(Function<R> query) {
logger.trace(TG_MARKER, "Trying to execute {}", query); logger.trace(TG_MARKER, "Trying to execute sync request {}", query);
// Handle special requests // Handle special requests
TdApi.Object specialResult = tryHandleSpecial(query); TdApi.Object specialResult = tryHandleSpecial(query);
if (specialResult != null) { if (specialResult != null) {
logger.trace(TG_MARKER, "Handling special result for sync request {}: {}", query, specialResult);
return specialResult; return specialResult;
} }

View File

@ -2,6 +2,7 @@ package it.tdlight;
import io.atlassian.util.concurrent.CopyOnWriteMap; import io.atlassian.util.concurrent.CopyOnWriteMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -13,11 +14,11 @@ public class InternalClientsState {
static final int STATE_STOPPED = 4; static final int STATE_STOPPED = 4;
private final AtomicInteger runState = new AtomicInteger(); private final AtomicInteger runState = new AtomicInteger();
private final AtomicLong currentQueryId = new AtomicLong(); private final AtomicLong currentQueryId = new AtomicLong();
private final Map<Integer, ClientEventsHandler> registeredClientEventHandlers = CopyOnWriteMap.newHashMap(); private final Map<Integer, ClientEventsHandler> registeredClientEventHandlers = new ConcurrentHashMap<>();
public long getNextQueryId() { public long getNextQueryId() {
return currentQueryId.updateAndGet(value -> (value >= Long.MAX_VALUE ? 0 : value) + 1); return currentQueryId.updateAndGet(value -> (value == Long.MAX_VALUE ? 0 : value) + 1);
} }
public void registerClient(int clientId, ClientEventsHandler internalClient) { public void registerClient(int clientId, ClientEventsHandler internalClient) {

View File

@ -80,11 +80,11 @@ final class InternalReactiveClient implements ClientEventsHandler, ReactiveTeleg
* This method will be called exactly once * This method will be called exactly once
*/ */
private void handleClose() { private void handleClose() {
logger.trace(TG_MARKER, "Received close"); logger.debug(TG_MARKER, "Received close");
try { try {
Runtime.getRuntime().removeShutdownHook(shutdownHook); Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (IllegalStateException ignored) { } catch (IllegalStateException ignored) {
logger.trace(TG_MARKER, "Can't remove shutdown hook because the JVM is already shutting down"); logger.debug(TG_MARKER, "Can't remove shutdown hook because the JVM is already shutting down");
} }
TdApi.Error instanceClosedError = new Error(500, "Instance closed"); TdApi.Error instanceClosedError = new Error(500, "Instance closed");
handlers.forEach((eventId, handler) -> this.handleResponse(eventId, instanceClosedError, handler)); handlers.forEach((eventId, handler) -> this.handleResponse(eventId, instanceClosedError, handler));
@ -186,7 +186,7 @@ final class InternalReactiveClient implements ClientEventsHandler, ReactiveTeleg
subscriber.onNext(new TdApi.Ok()); subscriber.onNext(new TdApi.Ok());
subscriber.onComplete(); subscriber.onComplete();
} else if (clientId == null) { } else if (clientId == null) {
logger.trace(TG_MARKER, logger.debug(TG_MARKER,
"Can't send a request to TDLib before calling \"createAndRegisterClient\" function!" "Can't send a request to TDLib before calling \"createAndRegisterClient\" function!"
); );
subscriber.onError(new IllegalStateException( subscriber.onError(new IllegalStateException(

View File

@ -15,9 +15,12 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.StringJoiner; import java.util.StringJoiner;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class ResponseReceiver extends Thread implements AutoCloseable { abstract class ResponseReceiver extends Thread implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ResponseReceiver.class);
private static final String FLAG_USE_OPTIMIZED_DISPATCHER = "tdlight.dispatcher.use_optimized_dispatcher"; private static final String FLAG_USE_OPTIMIZED_DISPATCHER = "tdlight.dispatcher.use_optimized_dispatcher";
private static final boolean USE_OPTIMIZED_DISPATCHER private static final boolean USE_OPTIMIZED_DISPATCHER
= Boolean.parseBoolean(System.getProperty(FLAG_USE_OPTIMIZED_DISPATCHER, "true")); = Boolean.parseBoolean(System.getProperty(FLAG_USE_OPTIMIZED_DISPATCHER, "true"));
@ -59,6 +62,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
@Override @Override
public void run() { public void run() {
LOG.debug("ResponseReceiver is now running");
int[] sortIndex; int[] sortIndex;
final SimpleIntQueue closedClients = new SimpleIntQueue(); final SimpleIntQueue closedClients = new SimpleIntQueue();
try { try {
@ -66,6 +70,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
while (!(interrupted = Thread.interrupted()) && registeredClients.length > 0) { while (!(interrupted = Thread.interrupted()) && registeredClients.length > 0) {
// Timeout is expressed in seconds // Timeout is expressed in seconds
int resultsCount = receive(clientIds, eventIds, events, 2.0); int resultsCount = receive(clientIds, eventIds, events, 2.0);
LOG.trace("Received {} events", resultsCount);
if (resultsCount <= 0) { if (resultsCount <= 0) {
SpinWaitSupport.onSpinWait(); SpinWaitSupport.onSpinWait();
@ -207,12 +212,15 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
} }
} }
LOG.trace("ResponseReceiver will no longer process updates");
if (interrupted) { if (interrupted) {
for (int clientId : registeredClients) { for (int clientId : registeredClients) {
eventsHandler.handleClientEvents(clientId, true, clientEventIds, clientEvents, 0, 0); eventsHandler.handleClientEvents(clientId, true, clientEventIds, clientEvents, 0, 0);
} }
} }
} finally { } finally {
LOG.debug("ResponseReceiver stopped");
this.closeWait.countDown(); this.closeWait.countDown();
} }
} }
@ -266,6 +274,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
public void close() throws InterruptedException { public void close() throws InterruptedException {
this.closeWait.await(); this.closeWait.await();
if (registeredClients.length == 0) { if (registeredClients.length == 0) {
LOG.debug("Interrupting response receiver");
ResponseReceiver.this.interrupt(); ResponseReceiver.this.interrupt();
} }
} }

View File

@ -9,6 +9,7 @@ class SequentialRequestsExecutor implements Executor {
private final Executor executor = Executors.newSingleThreadExecutor(r -> { private final Executor executor = Executors.newSingleThreadExecutor(r -> {
final Thread thread = new Thread(r); final Thread thread = new Thread(r);
thread.setName("TDLight user input request");
thread.setDaemon(true); thread.setDaemon(true);
return thread; return thread;
}); });

View File

@ -7,7 +7,6 @@ import it.tdlight.Init;
import it.tdlight.ResultHandler; import it.tdlight.ResultHandler;
import it.tdlight.TelegramClient; import it.tdlight.TelegramClient;
import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.Update;
import it.tdlight.util.MapUtils;
import it.tdlight.util.UnsupportedNativeLibraryException; import it.tdlight.util.UnsupportedNativeLibraryException;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.ChatListArchive; import it.tdlight.jni.TdApi.ChatListArchive;
@ -17,7 +16,6 @@ import it.tdlight.jni.TdApi.User;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.IdentityHashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
@ -45,7 +43,7 @@ public final class SimpleTelegramClient implements Authenticable, MutableTelegra
private final TelegramClient client; private final TelegramClient client;
private ClientInteraction clientInteraction; private ClientInteraction clientInteraction;
private final TDLibSettings settings; private final TDLibSettings settings;
private AuthenticationSupplier<?> authenticationData; private final AuthenticationSupplier<?> authenticationData;
private final CopyOnWriteMap<String, CopyOnWriteMap<CommandHandler, Void>> commandHandlers = CopyOnWriteMap.newHashMap(); private final CopyOnWriteMap<String, CopyOnWriteMap<CommandHandler, Void>> commandHandlers = CopyOnWriteMap.newHashMap();
private final CopyOnWriteMap<ResultHandler<TdApi.Update>, Void> updateHandlers = CopyOnWriteMap.newHashMap(); private final CopyOnWriteMap<ResultHandler<TdApi.Update>, Void> updateHandlers = CopyOnWriteMap.newHashMap();
@ -64,7 +62,8 @@ public final class SimpleTelegramClient implements Authenticable, MutableTelegra
Set<ResultHandler<Update>> updateHandlers, Set<ResultHandler<Update>> updateHandlers,
Set<ExceptionHandler> updateExceptionHandlers, Set<ExceptionHandler> updateExceptionHandlers,
Set<ExceptionHandler> defaultExceptionHandlers, Set<ExceptionHandler> defaultExceptionHandlers,
ClientInteraction clientInteraction) { ClientInteraction clientInteraction,
AuthenticationSupplier<?> authenticationData) {
this.client = clientFactory.createClient(); this.client = clientFactory.createClient();
this.settings = Objects.requireNonNull(settings, "TDLight client settings are null"); this.settings = Objects.requireNonNull(settings, "TDLight client settings are null");
@ -115,6 +114,9 @@ public final class SimpleTelegramClient implements Authenticable, MutableTelegra
this.addUpdateHandler(TdApi.UpdateAuthorizationState.class, this.addUpdateHandler(TdApi.UpdateAuthorizationState.class,
this.meGetter = new AuthorizationStateReadyGetMe(client, mainChatsLoader, archivedChatsLoader)); this.meGetter = new AuthorizationStateReadyGetMe(client, mainChatsLoader, archivedChatsLoader));
this.addUpdateHandler(TdApi.UpdateNewMessage.class, new CommandsHandler(client, this.commandHandlers, this::getMe)); this.addUpdateHandler(TdApi.UpdateNewMessage.class, new CommandsHandler(client, this.commandHandlers, this::getMe));
this.authenticationData = authenticationData;
createDirectories();
client.initialize(this::handleUpdate, this::handleUpdateException, this::handleDefaultException);
} }
private void handleUpdate(TdApi.Object update) { private void handleUpdate(TdApi.Object update) {
@ -196,15 +198,6 @@ public final class SimpleTelegramClient implements Authenticable, MutableTelegra
this.defaultExceptionHandlers.put(defaultExceptionHandlers, null); this.defaultExceptionHandlers.put(defaultExceptionHandlers, null);
} }
/**
* Start the client
*/
public void start(AuthenticationSupplier<?> authenticationData) {
this.authenticationData = authenticationData;
createDirectories();
client.initialize(this::handleUpdate, this::handleUpdateException, this::handleDefaultException);
}
private void createDirectories() { private void createDirectories() {
try { try {
if (Files.notExists(settings.getDatabaseDirectoryPath())) { if (Files.notExists(settings.getDatabaseDirectoryPath())) {

View File

@ -62,17 +62,16 @@ public final class SimpleTelegramClientBuilder implements MutableTelegramClient
* Build and start the client * Build and start the client
* @return Telegram client * @return Telegram client
*/ */
public SimpleTelegramClient build(AuthenticationSupplier authenticationData) { public SimpleTelegramClient build(AuthenticationSupplier<?> authenticationData) {
SimpleTelegramClient client = new SimpleTelegramClient(clientManager, return new SimpleTelegramClient(clientManager,
clientSettings, clientSettings,
commandHandlers, commandHandlers,
updateHandlers, updateHandlers,
updateExceptionHandlers, updateExceptionHandlers,
defaultExceptionHandlers, defaultExceptionHandlers,
clientInteraction clientInteraction,
authenticationData
); );
client.start(authenticationData);
return client;
} }
} }

View File

@ -34,7 +34,7 @@ public final class Native {
* Internal util * Internal util
*/ */
public static void loadNativesInternal() throws UnsupportedNativeLibraryException { public static void loadNativesInternal() throws UnsupportedNativeLibraryException {
loadLibrary("tdlight"); loadLibrary("tdjni");
} }
private static final Logger logger = LoggerFactory.getLogger(Native.class); private static final Logger logger = LoggerFactory.getLogger(Native.class);