diff --git a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java index 8de4ba24..2e310b33 100644 --- a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java +++ b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java @@ -69,7 +69,7 @@ public class DefaultBotSession implements BotSession { lastReceivedUpdate = 0; - readerThread = new ReaderThread(updatesSupplier); + readerThread = new ReaderThread(updatesSupplier, this); readerThread.setName(callback.getBotUsername() + " Telegram Connection"); readerThread.start(); @@ -135,12 +135,14 @@ public class DefaultBotSession implements BotSession { private class ReaderThread extends Thread implements UpdatesReader { private final UpdatesSupplier updatesSupplier; + private final Object lock; private CloseableHttpClient httpclient; private ExponentialBackOff exponentialBackOff; private RequestConfig requestConfig; - public ReaderThread(UpdatesSupplier updatesSupplier) { + public ReaderThread(UpdatesSupplier updatesSupplier, Object lock) { this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer); + this.lock = lock; } @Override @@ -183,48 +185,52 @@ public class DefaultBotSession implements BotSession { public void run() { setPriority(Thread.MIN_PRIORITY); while (running) { - try { - List<Update> updates = updatesSupplier.getUpdates(); - if (updates.isEmpty()) { - synchronized (this) { - this.wait(500); - } - } else { - updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); - lastReceivedUpdate = updates.parallelStream() - .map( - Update::getUpdateId) - .max(Integer::compareTo) - .orElse(0); - receivedUpdates.addAll(updates); + synchronized (lock) { + if (running) { + try { + List<Update> updates = updatesSupplier.getUpdates(); + if (updates.isEmpty()) { + lock.wait(500); + } else { + updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); + lastReceivedUpdate = updates.parallelStream() + .map( + Update::getUpdateId) + .max(Integer::compareTo) + .orElse(0); + receivedUpdates.addAll(updates); - synchronized (receivedUpdates) { - receivedUpdates.notifyAll(); + synchronized (receivedUpdates) { + receivedUpdates.notifyAll(); + } + } + } catch (InterruptedException e) { + if (!running) { + receivedUpdates.clear(); + } + BotLogger.debug(LOGTAG, e); + interrupt(); + } catch (Exception global) { + BotLogger.severe(LOGTAG, global); + try { + synchronized (lock) { + lock.wait(exponentialBackOff.nextBackOffMillis()); + } + } catch (InterruptedException e) { + if (!running) { + receivedUpdates.clear(); + } + BotLogger.debug(LOGTAG, e); + interrupt(); + } } } - } catch (InterruptedException e) { - if (!running) { - receivedUpdates.clear(); - } - BotLogger.debug(LOGTAG, e); - } catch (Exception global) { - BotLogger.severe(LOGTAG, global); - try { - synchronized (this) { - this.wait(exponentialBackOff.nextBackOffMillis()); - } - } catch (InterruptedException e) { - if (!running) { - receivedUpdates.clear(); - } - BotLogger.debug(LOGTAG, e); - } } } BotLogger.debug(LOGTAG, "Reader thread has being closed"); } - private List<Update> getUpdatesFromServer() throws InterruptedException, Exception { + private List<Update> getUpdatesFromServer() throws IOException { GetUpdates request = new GetUpdates() .setLimit(100) .setTimeout(ApiConstants.GETUPDATES_TIMEOUT) @@ -248,8 +254,8 @@ public class DefaultBotSession implements BotSession { if (response.getStatusLine().getStatusCode() >= 500) { BotLogger.warn(LOGTAG, responseContent); - synchronized (this) { - this.wait(500); + synchronized (lock) { + lock.wait(500); } } else { try { @@ -260,14 +266,13 @@ public class DefaultBotSession implements BotSession { BotLogger.severe(responseContent, LOGTAG, e); } } - } catch (SocketException e) { - if (!e.getMessage().equals("Socket Closed")) { - BotLogger.severe(LOGTAG, e); - } + } catch (SocketException | InvalidObjectException | TelegramApiRequestException e) { + BotLogger.severe(LOGTAG, e); } catch (SocketTimeoutException e) { BotLogger.fine(LOGTAG, e); - } catch (InvalidObjectException | TelegramApiRequestException e) { - BotLogger.severe(LOGTAG, e); + } catch (InterruptedException e) { + BotLogger.fine(LOGTAG, e); + interrupt(); } return Collections.emptyList(); } @@ -275,7 +280,7 @@ public class DefaultBotSession implements BotSession { public interface UpdatesSupplier { - List<Update> getUpdates() throws InterruptedException, Exception; + List<Update> getUpdates() throws Exception; } private List<Update> getUpdateList() { @@ -306,6 +311,7 @@ public class DefaultBotSession implements BotSession { callback.onUpdatesReceived(updates); } catch (InterruptedException e) { BotLogger.debug(LOGTAG, e); + interrupt(); } catch (Exception e) { BotLogger.severe(LOGTAG, e); }