TelegramBots/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java

308 lines
11 KiB
Java
Raw Normal View History

2016-01-14 23:09:19 +01:00
package org.telegram.telegrambots.updatesreceivers;
2016-01-14 01:14:53 +01:00
import com.fasterxml.jackson.databind.ObjectMapper;
2017-07-20 22:37:19 +02:00
import com.google.inject.Inject;
2016-01-14 01:14:53 +01:00
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
2016-01-14 01:14:53 +01:00
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.json.JSONException;
import org.telegram.telegrambots.ApiConstants;
2016-04-11 02:53:53 +02:00
import org.telegram.telegrambots.api.methods.updates.GetUpdates;
2016-01-14 23:09:19 +01:00
import org.telegram.telegrambots.api.objects.Update;
import org.telegram.telegrambots.bots.DefaultBotOptions;
import org.telegram.telegrambots.bots.TelegramBatchLongPollingBot;
2016-09-25 15:33:20 +02:00
import org.telegram.telegrambots.exceptions.TelegramApiRequestException;
2017-07-20 22:37:19 +02:00
import org.telegram.telegrambots.generics.*;
2016-05-27 02:04:21 +02:00
import org.telegram.telegrambots.logging.BotLogger;
2016-01-14 01:14:53 +01:00
import java.io.IOException;
import java.io.InvalidObjectException;
2017-07-20 22:37:19 +02:00
import java.net.SocketTimeoutException;
2016-04-12 19:53:21 +02:00
import java.nio.charset.StandardCharsets;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
2016-01-14 01:14:53 +01:00
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
2017-02-25 13:53:01 +01:00
import static org.telegram.telegrambots.Constants.SOCKET_TIMEOUT;
2016-01-14 01:14:53 +01:00
/**
* @author Ruben Bermudez
* @version 1.0
2016-12-26 03:46:46 +01:00
* Thread to request updates with active wait
2016-01-14 01:14:53 +01:00
*/
public class DefaultBotSession implements BotSession {
2016-12-26 03:46:46 +01:00
private static final String LOGTAG = "BOTSESSION";
private volatile boolean running = false;
2016-04-11 02:53:53 +02:00
private final ConcurrentLinkedDeque<Update> receivedUpdates = new ConcurrentLinkedDeque<>();
private final ObjectMapper objectMapper = new ObjectMapper();
2016-01-14 01:14:53 +01:00
private ReaderThread readerThread;
private HandlerThread handlerThread;
private LongPollingBot callback;
private String token;
private int lastReceivedUpdate = 0;
private DefaultBotOptions options;
2016-05-27 02:04:21 +02:00
@Inject
public DefaultBotSession() {
}
2016-05-27 02:04:21 +02:00
@Override
2016-12-26 03:46:46 +01:00
public synchronized void start() {
if (running) {
throw new IllegalStateException("Session already running");
}
running = true;
2016-12-26 03:46:46 +01:00
lastReceivedUpdate = 0;
readerThread = new ReaderThread();
readerThread.setName(callback.getBotUsername() + " Telegram Connection");
2016-05-27 02:04:21 +02:00
readerThread.start();
2016-12-26 03:46:46 +01:00
handlerThread = new HandlerThread();
handlerThread.setName(callback.getBotUsername() + " Telegram Executor");
2016-05-27 02:04:21 +02:00
handlerThread.start();
2016-01-14 01:14:53 +01:00
}
2016-05-27 02:04:21 +02:00
@Override
2016-12-26 03:46:46 +01:00
public synchronized void stop() {
if (!running) {
throw new IllegalStateException("Session already stopped");
}
2016-05-27 02:04:21 +02:00
running = false;
2016-12-26 03:46:46 +01:00
2016-05-27 02:04:21 +02:00
if (readerThread != null) {
readerThread.interrupt();
}
2016-12-26 03:46:46 +01:00
2016-05-27 02:04:21 +02:00
if (handlerThread != null) {
handlerThread.interrupt();
}
2016-12-26 03:46:46 +01:00
2016-09-25 15:33:20 +02:00
if (callback != null) {
callback.onClosing();
}
}
2016-01-14 01:14:53 +01:00
@Override
public void setOptions(BotOptions options) {
if (this.options != null) {
throw new InvalidParameterException("BotOptions has already been set");
}
this.options = (DefaultBotOptions) options;
}
@Override
public void setToken(String token) {
if (this.token != null) {
throw new InvalidParameterException("Token has already been set");
}
this.token = token;
}
@Override
public void setCallback(LongPollingBot callback) {
if (this.callback != null) {
throw new InvalidParameterException("Callback has already been set");
}
this.callback = callback;
}
@Override
2016-12-26 03:46:46 +01:00
public synchronized boolean isRunning() {
return running;
}
private class ReaderThread extends Thread implements UpdatesReader {
private CloseableHttpClient httpclient;
2017-02-25 20:11:42 +01:00
private ExponentialBackOff exponentialBackOff;
private RequestConfig requestConfig;
@Override
public synchronized void start() {
httpclient = HttpClientBuilder.create()
.setSSLHostnameVerifier(new NoopHostnameVerifier())
.setConnectionTimeToLive(70, TimeUnit.SECONDS)
.setMaxConnTotal(100)
.build();
requestConfig = options.getRequestConfig();
2017-02-25 20:11:42 +01:00
exponentialBackOff = options.getExponentialBackOff();
if (exponentialBackOff == null) {
exponentialBackOff = new ExponentialBackOff();
}
2016-12-26 03:46:46 +01:00
2017-02-25 13:53:01 +01:00
if (requestConfig == null) {
requestConfig = RequestConfig.copy(RequestConfig.custom().build())
.setSocketTimeout(SOCKET_TIMEOUT)
.setConnectTimeout(SOCKET_TIMEOUT)
.setConnectionRequestTimeout(SOCKET_TIMEOUT).build();
}
2016-12-26 03:46:46 +01:00
super.start();
}
@Override
public void interrupt() {
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException e) {
BotLogger.warn(LOGTAG, e);
}
}
super.interrupt();
}
2016-05-27 02:04:21 +02:00
@Override
2016-01-14 01:14:53 +01:00
public void run() {
setPriority(Thread.MIN_PRIORITY);
2016-05-27 02:04:21 +02:00
while (running) {
2016-01-14 01:14:53 +01:00
try {
2016-12-03 21:17:03 +01:00
GetUpdates request = new GetUpdates()
.setLimit(100)
.setTimeout(ApiConstants.GETUPDATES_TIMEOUT)
.setOffset(lastReceivedUpdate + 1);
if (options.getAllowedUpdates() != null) {
request.setAllowedUpdates(options.getAllowedUpdates());
}
2017-05-11 03:02:54 +02:00
String url = options.getBaseUrl() + token + "/" + GetUpdates.PATH;
//http client
HttpPost httpPost = new HttpPost(url);
2016-04-12 19:53:21 +02:00
httpPost.addHeader("charset", StandardCharsets.UTF_8.name());
httpPost.setConfig(requestConfig);
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(request), ContentType.APPLICATION_JSON));
2016-01-14 01:14:53 +01:00
try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
HttpEntity ht = response.getEntity();
BufferedHttpEntity buf = new BufferedHttpEntity(ht);
String responseContent = EntityUtils.toString(buf, StandardCharsets.UTF_8);
2017-02-25 20:11:42 +01:00
if (response.getStatusLine().getStatusCode() >= 500) {
BotLogger.warn(LOGTAG, responseContent);
synchronized (this) {
this.wait(500);
}
} else {
try {
List<Update> updates = request.deserializeResponse(responseContent);
exponentialBackOff.reset();
if (updates.isEmpty()) {
synchronized (this) {
this.wait(500);
}
} else {
updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate);
lastReceivedUpdate = updates.parallelStream()
.map(
Update::getUpdateId)
.max(Integer::compareTo)
.orElse(0);
receivedUpdates.addAll(updates);
synchronized (receivedUpdates) {
receivedUpdates.notifyAll();
}
2016-01-14 01:14:53 +01:00
}
2017-02-25 20:11:42 +01:00
} catch (JSONException e) {
BotLogger.severe(responseContent, LOGTAG, e);
2016-01-14 01:14:53 +01:00
}
}
2017-07-20 22:37:19 +02:00
} catch (SocketTimeoutException e) {
BotLogger.fine(LOGTAG, e);
2016-09-25 15:33:20 +02:00
} catch (InvalidObjectException | TelegramApiRequestException e) {
2016-05-01 22:43:42 +02:00
BotLogger.severe(LOGTAG, e);
2016-01-14 01:14:53 +01:00
}
2016-09-25 15:33:20 +02:00
} catch (InterruptedException e) {
2016-10-02 22:22:24 +02:00
if (!running) {
receivedUpdates.clear();
}
2016-09-25 15:33:20 +02:00
BotLogger.debug(LOGTAG, e);
2016-05-01 22:43:42 +02:00
} catch (Exception global) {
BotLogger.severe(LOGTAG, global);
try {
synchronized (this) {
2017-02-25 20:11:42 +01:00
this.wait(exponentialBackOff.nextBackOffMillis());
}
2016-05-01 22:43:42 +02:00
} catch (InterruptedException e) {
2016-10-02 22:22:24 +02:00
if (!running) {
receivedUpdates.clear();
}
2016-09-25 15:33:20 +02:00
BotLogger.debug(LOGTAG, e);
}
2016-01-14 01:14:53 +01:00
}
}
2016-09-25 15:33:20 +02:00
BotLogger.debug(LOGTAG, "Reader thread has being closed");
2016-01-14 01:14:53 +01:00
}
}
private List<Update> getUpdateList() {
List<Update> updates = new ArrayList<>();
for (Iterator<Update> it = receivedUpdates.iterator(); it.hasNext();) {
updates.add(it.next());
it.remove();
}
return updates;
}
private class HandlerThread extends Thread implements UpdatesHandler {
2016-01-14 01:14:53 +01:00
@Override
public void run() {
setPriority(Thread.MIN_PRIORITY);
2016-05-27 02:04:21 +02:00
while (running) {
2016-01-14 01:14:53 +01:00
try {
if (callback instanceof TelegramBatchLongPollingBot) {
List<Update> updates = getUpdateList();
if (updates.isEmpty()) {
synchronized (receivedUpdates) {
receivedUpdates.wait();
updates = getUpdateList();
if (updates.isEmpty()) {
continue;
}
}
}
((TelegramBatchLongPollingBot) callback).onUpdatesReceived(updates);
} else {
Update update = receivedUpdates.pollFirst();
if (update == null) {
synchronized (receivedUpdates) {
receivedUpdates.wait();
update = receivedUpdates.pollFirst();
if (update == null) {
continue;
}
2016-01-14 01:14:53 +01:00
}
}
callback.onUpdateReceived(update);
2016-01-14 01:14:53 +01:00
}
2016-09-25 15:33:20 +02:00
} catch (InterruptedException e) {
BotLogger.debug(LOGTAG, e);
2016-05-01 22:43:42 +02:00
} catch (Exception e) {
BotLogger.severe(LOGTAG, e);
2016-01-14 01:14:53 +01:00
}
}
2016-09-25 15:33:20 +02:00
BotLogger.debug(LOGTAG, "Handler thread has being closed");
2016-01-14 01:14:53 +01:00
}
}
}