diff --git a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java index 8de4ba24..55455b6b 100644 --- a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java +++ b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java @@ -69,7 +69,7 @@ public class DefaultBotSession implements BotSession { lastReceivedUpdate = 0; - readerThread = new ReaderThread(updatesSupplier); + readerThread = new ReaderThread(updatesSupplier, this); readerThread.setName(callback.getBotUsername() + " Telegram Connection"); readerThread.start(); @@ -135,12 +135,14 @@ public class DefaultBotSession implements BotSession { private class ReaderThread extends Thread implements UpdatesReader { private final UpdatesSupplier updatesSupplier; + private final Object lock; private CloseableHttpClient httpclient; private ExponentialBackOff exponentialBackOff; private RequestConfig requestConfig; - public ReaderThread(UpdatesSupplier updatesSupplier) { + public ReaderThread(UpdatesSupplier updatesSupplier, Object lock) { this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer); + this.lock = lock; } @Override @@ -183,42 +185,46 @@ public class DefaultBotSession implements BotSession { public void run() { setPriority(Thread.MIN_PRIORITY); while (running) { - try { - List updates = updatesSupplier.getUpdates(); - if (updates.isEmpty()) { - synchronized (this) { - this.wait(500); - } - } else { - updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); - lastReceivedUpdate = updates.parallelStream() - .map( - Update::getUpdateId) - .max(Integer::compareTo) - .orElse(0); - receivedUpdates.addAll(updates); + synchronized (lock) { + if (running) { + try { + List updates = updatesSupplier.getUpdates(); + if (updates.isEmpty()) { + if (running) { + lock.wait(500); + } + } else { + updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); + lastReceivedUpdate = updates.parallelStream() + .map( + Update::getUpdateId) + .max(Integer::compareTo) + .orElse(0); + receivedUpdates.addAll(updates); - synchronized (receivedUpdates) { - receivedUpdates.notifyAll(); + synchronized (receivedUpdates) { + receivedUpdates.notifyAll(); + } + } + } catch (InterruptedException e) { + if (!running) { + receivedUpdates.clear(); + } + BotLogger.debug(LOGTAG, e); + } catch (Exception global) { + BotLogger.severe(LOGTAG, global); + try { + synchronized (lock) { + lock.wait(exponentialBackOff.nextBackOffMillis()); + } + } catch (InterruptedException e) { + if (!running) { + receivedUpdates.clear(); + } + BotLogger.debug(LOGTAG, e); + } } } - } catch (InterruptedException e) { - if (!running) { - receivedUpdates.clear(); - } - BotLogger.debug(LOGTAG, e); - } catch (Exception global) { - BotLogger.severe(LOGTAG, global); - try { - synchronized (this) { - this.wait(exponentialBackOff.nextBackOffMillis()); - } - } catch (InterruptedException e) { - if (!running) { - receivedUpdates.clear(); - } - BotLogger.debug(LOGTAG, e); - } } } BotLogger.debug(LOGTAG, "Reader thread has being closed"); @@ -248,8 +254,8 @@ public class DefaultBotSession implements BotSession { if (response.getStatusLine().getStatusCode() >= 500) { BotLogger.warn(LOGTAG, responseContent); - synchronized (this) { - this.wait(500); + synchronized (lock) { + lock.wait(500); } } else { try { @@ -261,9 +267,7 @@ public class DefaultBotSession implements BotSession { } } } catch (SocketException e) { - if (!e.getMessage().equals("Socket Closed")) { - BotLogger.severe(LOGTAG, e); - } + BotLogger.severe(LOGTAG, e); } catch (SocketTimeoutException e) { BotLogger.fine(LOGTAG, e); } catch (InvalidObjectException | TelegramApiRequestException e) {