Use ClientManager for Java example interface implementation.

GitOrigin-RevId: 4280b6407a1c1a18bf2a6e952f6761847b69cb83
This commit is contained in:
levlam 2020-10-11 21:28:33 +03:00
parent 9856b0e46e
commit 919848f0fe
2 changed files with 98 additions and 156 deletions

View File

@ -8,14 +8,11 @@ package org.drinkless.tdlib;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Main class for interaction with the TDLib.
*/
public final class Client implements Runnable {
public final class Client {
/**
* Interface for handler for results of queries to TDLib and incoming updates from TDLib.
*/
@ -55,25 +52,11 @@ public final class Client implements Runnable {
* @throws NullPointerException if query is null.
*/
public void send(TdApi.Function query, ResultHandler resultHandler, ExceptionHandler exceptionHandler) {
if (query == null) {
throw new NullPointerException("query is null");
}
readLock.lock();
try {
if (isClientDestroyed) {
if (resultHandler != null) {
handleResult(new TdApi.Error(500, "Client is closed"), resultHandler, exceptionHandler);
}
return;
}
long queryId = currentQueryId.incrementAndGet();
long queryId = currentQueryId.incrementAndGet();
if (resultHandler != null) {
handlers.put(queryId, new Handler(resultHandler, exceptionHandler));
nativeClientSend(nativeClientId, queryId, query);
} finally {
readLock.unlock();
}
nativeClientSend(nativeClientId, queryId, query);
}
/**
@ -97,22 +80,9 @@ public final class Client implements Runnable {
* @throws NullPointerException if query is null.
*/
public static TdApi.Object execute(TdApi.Function query) {
if (query == null) {
throw new NullPointerException("query is null");
}
return nativeClientExecute(query);
}
/**
* Overridden method from Runnable, do not call it directly.
*/
@Override
public void run() {
while (!stopFlag) {
receiveQueries(300.0 /*seconds*/);
}
}
/**
* Creates new Client.
*
@ -123,54 +93,79 @@ public final class Client implements Runnable {
*/
public static Client create(ResultHandler updateHandler, ExceptionHandler updateExceptionHandler, ExceptionHandler defaultExceptionHandler) {
Client client = new Client(updateHandler, updateExceptionHandler, defaultExceptionHandler);
new Thread(client, "TDLib thread").start();
synchronized (responseReceiver) {
if (!responseReceiver.isRun) {
responseReceiver.isRun = true;
Thread receiverThread = new Thread(responseReceiver, "TDLib thread");
receiverThread.setDaemon(true);
receiverThread.start();
}
}
return client;
}
/**
* Closes Client.
*/
public void close() {
writeLock.lock();
try {
if (isClientDestroyed) {
return;
private static class ResponseReceiver implements Runnable {
public boolean isRun = false;
@Override
public void run() {
while (true) {
int resultN = nativeClientReceive(clientIds, eventIds, events, 100000.0 /*seconds*/);
for (int i = 0; i < resultN; i++) {
processResult(clientIds[i], eventIds[i], events[i]);
events[i] = null;
}
}
if (!stopFlag) {
send(new TdApi.Close(), null);
}
isClientDestroyed = true;
while (!stopFlag) {
Thread.yield();
}
while (!handlers.isEmpty()) {
receiveQueries(300.0);
}
updateHandlers.remove(nativeClientId);
defaultExceptionHandlers.remove(nativeClientId);
destroyNativeClient(nativeClientId);
} finally {
writeLock.unlock();
}
private void processResult(int clientId, long id, TdApi.Object object) {
boolean isClosed = false;
if (id == 0 && object instanceof TdApi.UpdateAuthorizationState) {
TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) object).authorizationState;
if (authorizationState instanceof TdApi.AuthorizationStateClosed) {
isClosed = true;
}
}
Handler handler = id == 0 ? updateHandlers.get(clientId) : handlers.remove(id);
if (handler != null) {
try {
handler.resultHandler.onResult(object);
} catch (Throwable cause) {
ExceptionHandler exceptionHandler = handler.exceptionHandler;
if (exceptionHandler == null) {
exceptionHandler = defaultExceptionHandlers.get(clientId);
}
if (exceptionHandler != null) {
try {
exceptionHandler.onException(cause);
} catch (Throwable ignored) {
}
}
}
}
if (isClosed) {
updateHandlers.remove(clientId); // there will be no more updates
defaultExceptionHandlers.remove(clientId); // ignore further exceptions
}
}
private static final int MAX_EVENTS = 1000;
private final int[] clientIds = new int[MAX_EVENTS];
private final long[] eventIds = new long[MAX_EVENTS];
private final TdApi.Object[] events = new TdApi.Object[MAX_EVENTS];
}
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private final int nativeClientId;
private volatile boolean stopFlag = false;
private volatile boolean isClientDestroyed = false;
private final long nativeClientId;
private static final ConcurrentHashMap<Integer, ExceptionHandler> defaultExceptionHandlers = new ConcurrentHashMap<Integer, ExceptionHandler>();
private static final ConcurrentHashMap<Integer, Handler> updateHandlers = new ConcurrentHashMap<Integer, Handler>();
private static final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>();
private static final AtomicLong currentQueryId = new AtomicLong();
private static final ConcurrentHashMap<Long, ExceptionHandler> defaultExceptionHandlers = new ConcurrentHashMap<Long, ExceptionHandler>();
private static final ConcurrentHashMap<Long, Handler> updateHandlers = new ConcurrentHashMap<Long, Handler>();
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>();
private final AtomicLong currentQueryId = new AtomicLong();
private static final int MAX_EVENTS = 1000;
private final long[] eventIds = new long[MAX_EVENTS];
private final TdApi.Object[] events = new TdApi.Object[MAX_EVENTS];
private static final ResponseReceiver responseReceiver = new ResponseReceiver();
private static class Handler {
final ResultHandler resultHandler;
@ -184,76 +179,24 @@ public final class Client implements Runnable {
private Client(ResultHandler updateHandler, ExceptionHandler updateExceptionHandler, ExceptionHandler defaultExceptionHandler) {
nativeClientId = createNativeClient();
updateHandlers.put(nativeClientId, new Handler(updateHandler, updateExceptionHandler));
if (updateHandler != null) {
updateHandlers.put(nativeClientId, new Handler(updateHandler, updateExceptionHandler));
}
if (defaultExceptionHandler != null) {
defaultExceptionHandlers.put(nativeClientId, defaultExceptionHandler);
defaultExceptionHandlers.put(nativeClientId, defaultExceptionHandler);
}
}
@Override
protected void finalize() throws Throwable {
try {
close();
} finally {
super.finalize();
}
send(new TdApi.Close(), null, null);
}
private void processResult(long id, TdApi.Object object) {
if (object instanceof TdApi.UpdateAuthorizationState) {
if (((TdApi.UpdateAuthorizationState) object).authorizationState instanceof TdApi.AuthorizationStateClosed) {
stopFlag = true;
}
}
Handler handler;
if (id == 0) {
// update handler stays forever
handler = updateHandlers.get(nativeClientId);
} else {
handler = handlers.remove(id);
}
if (handler == null) {
return;
}
private static native int createNativeClient();
handleResult(object, handler.resultHandler, handler.exceptionHandler);
}
private static native void nativeClientSend(int nativeClientId, long eventId, TdApi.Function function);
private void handleResult(TdApi.Object object, ResultHandler resultHandler, ExceptionHandler exceptionHandler) {
if (resultHandler == null) {
return;
}
try {
resultHandler.onResult(object);
} catch (Throwable cause) {
if (exceptionHandler == null) {
exceptionHandler = defaultExceptionHandlers.get(nativeClientId);
}
if (exceptionHandler != null) {
try {
exceptionHandler.onException(cause);
} catch (Throwable ignored) {
}
}
}
}
private void receiveQueries(double timeout) {
int resultN = nativeClientReceive(nativeClientId, eventIds, events, timeout);
for (int i = 0; i < resultN; i++) {
processResult(eventIds[i], events[i]);
events[i] = null;
}
}
private static native long createNativeClient();
private static native void nativeClientSend(long nativeClientId, long eventId, TdApi.Function function);
private static native int nativeClientReceive(long nativeClientId, long[] eventIds, TdApi.Object[] events, double timeout);
private static native int nativeClientReceive(int[] clientIds, long[] eventIds, TdApi.Object[] events, double timeout);
private static native TdApi.Object nativeClientExecute(TdApi.Function function);
private static native void destroyNativeClient(long nativeClientId);
}

View File

@ -26,31 +26,35 @@ static td::td_api::object_ptr<td::td_api::Function> fetch_function(JNIEnv *env,
return result;
}
static td::Client *get_client(jlong client_id) {
return reinterpret_cast<td::Client *>(static_cast<std::uintptr_t>(client_id));
static td::ClientManager *get_manager() {
return td::ClientManager::get_manager_singleton();
}
static jlong Client_createNativeClient(JNIEnv *env, jclass clazz) {
return static_cast<jlong>(reinterpret_cast<std::uintptr_t>(new td::Client()));
static jint Client_createNativeClient(JNIEnv *env, jclass clazz) {
return static_cast<jint>(get_manager()->create_client());
}
static void Client_nativeClientSend(JNIEnv *env, jclass clazz, jlong client_id, jlong id, jobject function) {
get_client(client_id)->send({static_cast<std::uint64_t>(id), fetch_function(env, function)});
static void Client_nativeClientSend(JNIEnv *env, jclass clazz, jint client_id, jlong id, jobject function) {
get_manager()->send(static_cast<std::int32_t>(client_id), static_cast<std::uint64_t>(id),
fetch_function(env, function));
}
static jint Client_nativeClientReceive(JNIEnv *env, jclass clazz, jlong client_id, jlongArray ids, jobjectArray events,
jdouble timeout) {
auto client = get_client(client_id);
jsize events_size = env->GetArrayLength(ids); // ids and events size must be of equal size
static jint Client_nativeClientReceive(JNIEnv *env, jclass clazz, jintArray client_ids, jlongArray ids,
jobjectArray events, jdouble timeout) {
jsize events_size = env->GetArrayLength(ids); // client_ids, ids and events must be of equal size
if (events_size == 0) {
return 0;
}
jsize result_size = 0;
auto response = client->receive(timeout);
auto *manager = get_manager();
auto response = manager->receive(timeout);
while (response.object) {
jlong result_id = static_cast<jlong>(response.id);
env->SetLongArrayRegion(ids, result_size, 1, &result_id);
jint client_id = static_cast<jint>(response.client_id);
env->SetIntArrayRegion(client_ids, result_size, 1, &client_id);
jlong request_id = static_cast<jlong>(response.request_id);
env->SetLongArrayRegion(ids, result_size, 1, &request_id);
jobject object;
response.object->store(env, object);
@ -62,21 +66,17 @@ static jint Client_nativeClientReceive(JNIEnv *env, jclass clazz, jlong client_i
break;
}
response = client->receive(0);
response = manager->receive(0);
}
return result_size;
}
static jobject Client_nativeClientExecute(JNIEnv *env, jclass clazz, jobject function) {
jobject result;
td::Client::execute({0, fetch_function(env, function)}).object->store(env, result);
td::ClientManager::execute(fetch_function(env, function))->store(env, result);
return result;
}
static void Client_destroyNativeClient(JNIEnv *env, jclass clazz, jlong client_id) {
delete get_client(client_id);
}
static void Log_setVerbosityLevel(JNIEnv *env, jclass clazz, jint new_log_verbosity_level) {
td::Log::set_verbosity_level(static_cast<int>(new_log_verbosity_level));
}
@ -136,11 +136,10 @@ static jint register_native(JavaVM *vm) {
#define TD_OBJECT "L" PACKAGE_NAME "/TdApi$Object;"
#define TD_FUNCTION "L" PACKAGE_NAME "/TdApi$Function;"
register_method(client_class, "createNativeClient", "()J", Client_createNativeClient);
register_method(client_class, "nativeClientSend", "(JJ" TD_FUNCTION ")V", Client_nativeClientSend);
register_method(client_class, "nativeClientReceive", "(J[J[" TD_OBJECT "D)I", Client_nativeClientReceive);
register_method(client_class, "createNativeClient", "()I", Client_createNativeClient);
register_method(client_class, "nativeClientSend", "(IJ" TD_FUNCTION ")V", Client_nativeClientSend);
register_method(client_class, "nativeClientReceive", "([I[J[" TD_OBJECT "D)I", Client_nativeClientReceive);
register_method(client_class, "nativeClientExecute", "(" TD_FUNCTION ")" TD_OBJECT, Client_nativeClientExecute);
register_method(client_class, "destroyNativeClient", "(J)V", Client_destroyNativeClient);
register_method(log_class, "setVerbosityLevel", "(I)V", Log_setVerbosityLevel);
register_method(log_class, "setFilePath", "(Ljava/lang/String;)Z", Log_setFilePath);