From 4cf29bd75f84c82f0a2dec2fc3732b2afe4a13ef Mon Sep 17 00:00:00 2001 From: Niels Ulrik Andersen Date: Sun, 19 Nov 2017 12:41:38 +0100 Subject: [PATCH 1/2] Clean up some thread usage, seems to close #168. --- .../updatesreceivers/DefaultBotSession.java | 84 ++++++++++--------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java index 8de4ba24..55455b6b 100644 --- a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java +++ b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java @@ -69,7 +69,7 @@ public class DefaultBotSession implements BotSession { lastReceivedUpdate = 0; - readerThread = new ReaderThread(updatesSupplier); + readerThread = new ReaderThread(updatesSupplier, this); readerThread.setName(callback.getBotUsername() + " Telegram Connection"); readerThread.start(); @@ -135,12 +135,14 @@ public class DefaultBotSession implements BotSession { private class ReaderThread extends Thread implements UpdatesReader { private final UpdatesSupplier updatesSupplier; + private final Object lock; private CloseableHttpClient httpclient; private ExponentialBackOff exponentialBackOff; private RequestConfig requestConfig; - public ReaderThread(UpdatesSupplier updatesSupplier) { + public ReaderThread(UpdatesSupplier updatesSupplier, Object lock) { this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer); + this.lock = lock; } @Override @@ -183,42 +185,46 @@ public class DefaultBotSession implements BotSession { public void run() { setPriority(Thread.MIN_PRIORITY); while (running) { - try { - List updates = updatesSupplier.getUpdates(); - if (updates.isEmpty()) { - synchronized (this) { - this.wait(500); - } - } else { - updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); - lastReceivedUpdate = updates.parallelStream() - .map( - Update::getUpdateId) - .max(Integer::compareTo) - .orElse(0); - receivedUpdates.addAll(updates); + synchronized (lock) { + if (running) { + try { + List updates = updatesSupplier.getUpdates(); + if (updates.isEmpty()) { + if (running) { + lock.wait(500); + } + } else { + updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); + lastReceivedUpdate = updates.parallelStream() + .map( + Update::getUpdateId) + .max(Integer::compareTo) + .orElse(0); + receivedUpdates.addAll(updates); - synchronized (receivedUpdates) { - receivedUpdates.notifyAll(); + synchronized (receivedUpdates) { + receivedUpdates.notifyAll(); + } + } + } catch (InterruptedException e) { + if (!running) { + receivedUpdates.clear(); + } + BotLogger.debug(LOGTAG, e); + } catch (Exception global) { + BotLogger.severe(LOGTAG, global); + try { + synchronized (lock) { + lock.wait(exponentialBackOff.nextBackOffMillis()); + } + } catch (InterruptedException e) { + if (!running) { + receivedUpdates.clear(); + } + BotLogger.debug(LOGTAG, e); + } } } - } catch (InterruptedException e) { - if (!running) { - receivedUpdates.clear(); - } - BotLogger.debug(LOGTAG, e); - } catch (Exception global) { - BotLogger.severe(LOGTAG, global); - try { - synchronized (this) { - this.wait(exponentialBackOff.nextBackOffMillis()); - } - } catch (InterruptedException e) { - if (!running) { - receivedUpdates.clear(); - } - BotLogger.debug(LOGTAG, e); - } } } BotLogger.debug(LOGTAG, "Reader thread has being closed"); @@ -248,8 +254,8 @@ public class DefaultBotSession implements BotSession { if (response.getStatusLine().getStatusCode() >= 500) { BotLogger.warn(LOGTAG, responseContent); - synchronized (this) { - this.wait(500); + synchronized (lock) { + lock.wait(500); } } else { try { @@ -261,9 +267,7 @@ public class DefaultBotSession implements BotSession { } } } catch (SocketException e) { - if (!e.getMessage().equals("Socket Closed")) { - BotLogger.severe(LOGTAG, e); - } + BotLogger.severe(LOGTAG, e); } catch (SocketTimeoutException e) { BotLogger.fine(LOGTAG, e); } catch (InvalidObjectException | TelegramApiRequestException e) { From e2aed55b11a7011f8bc3eb0c47f38388f4072988 Mon Sep 17 00:00:00 2001 From: Niels Ulrik Andersen Date: Sun, 19 Nov 2017 16:59:23 +0100 Subject: [PATCH 2/2] Clean up a bit in DefaultBotSession --- .../updatesreceivers/DefaultBotSession.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java index 55455b6b..2e310b33 100644 --- a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java +++ b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java @@ -190,9 +190,7 @@ public class DefaultBotSession implements BotSession { try { List updates = updatesSupplier.getUpdates(); if (updates.isEmpty()) { - if (running) { - lock.wait(500); - } + lock.wait(500); } else { updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); lastReceivedUpdate = updates.parallelStream() @@ -211,6 +209,7 @@ public class DefaultBotSession implements BotSession { receivedUpdates.clear(); } BotLogger.debug(LOGTAG, e); + interrupt(); } catch (Exception global) { BotLogger.severe(LOGTAG, global); try { @@ -222,6 +221,7 @@ public class DefaultBotSession implements BotSession { receivedUpdates.clear(); } BotLogger.debug(LOGTAG, e); + interrupt(); } } } @@ -230,7 +230,7 @@ public class DefaultBotSession implements BotSession { BotLogger.debug(LOGTAG, "Reader thread has being closed"); } - private List getUpdatesFromServer() throws InterruptedException, Exception { + private List getUpdatesFromServer() throws IOException { GetUpdates request = new GetUpdates() .setLimit(100) .setTimeout(ApiConstants.GETUPDATES_TIMEOUT) @@ -266,12 +266,13 @@ public class DefaultBotSession implements BotSession { BotLogger.severe(responseContent, LOGTAG, e); } } - } catch (SocketException e) { + } catch (SocketException | InvalidObjectException | TelegramApiRequestException e) { BotLogger.severe(LOGTAG, e); } catch (SocketTimeoutException e) { BotLogger.fine(LOGTAG, e); - } catch (InvalidObjectException | TelegramApiRequestException e) { - BotLogger.severe(LOGTAG, e); + } catch (InterruptedException e) { + BotLogger.fine(LOGTAG, e); + interrupt(); } return Collections.emptyList(); } @@ -279,7 +280,7 @@ public class DefaultBotSession implements BotSession { public interface UpdatesSupplier { - List getUpdates() throws InterruptedException, Exception; + List getUpdates() throws Exception; } private List getUpdateList() { @@ -310,6 +311,7 @@ public class DefaultBotSession implements BotSession { callback.onUpdatesReceived(updates); } catch (InterruptedException e) { BotLogger.debug(LOGTAG, e); + interrupt(); } catch (Exception e) { BotLogger.severe(LOGTAG, e); }