Merge branch 'SocketException' of https://github.com/myplacedk/TelegramBots into myplacedk-SocketException

This commit is contained in:
Ruben Bermudez 2018-02-13 21:51:07 +01:00
commit 7d6fdcbf69

View File

@ -69,7 +69,7 @@ public class DefaultBotSession implements BotSession {
lastReceivedUpdate = 0; lastReceivedUpdate = 0;
readerThread = new ReaderThread(updatesSupplier); readerThread = new ReaderThread(updatesSupplier, this);
readerThread.setName(callback.getBotUsername() + " Telegram Connection"); readerThread.setName(callback.getBotUsername() + " Telegram Connection");
readerThread.start(); readerThread.start();
@ -135,12 +135,14 @@ public class DefaultBotSession implements BotSession {
private class ReaderThread extends Thread implements UpdatesReader { private class ReaderThread extends Thread implements UpdatesReader {
private final UpdatesSupplier updatesSupplier; private final UpdatesSupplier updatesSupplier;
private final Object lock;
private CloseableHttpClient httpclient; private CloseableHttpClient httpclient;
private ExponentialBackOff exponentialBackOff; private ExponentialBackOff exponentialBackOff;
private RequestConfig requestConfig; private RequestConfig requestConfig;
public ReaderThread(UpdatesSupplier updatesSupplier) { public ReaderThread(UpdatesSupplier updatesSupplier, Object lock) {
this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer); this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer);
this.lock = lock;
} }
@Override @Override
@ -183,48 +185,52 @@ public class DefaultBotSession implements BotSession {
public void run() { public void run() {
setPriority(Thread.MIN_PRIORITY); setPriority(Thread.MIN_PRIORITY);
while (running) { while (running) {
try { synchronized (lock) {
List<Update> updates = updatesSupplier.getUpdates(); if (running) {
if (updates.isEmpty()) { try {
synchronized (this) { List<Update> updates = updatesSupplier.getUpdates();
this.wait(500); if (updates.isEmpty()) {
} lock.wait(500);
} else { } else {
updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate);
lastReceivedUpdate = updates.parallelStream() lastReceivedUpdate = updates.parallelStream()
.map( .map(
Update::getUpdateId) Update::getUpdateId)
.max(Integer::compareTo) .max(Integer::compareTo)
.orElse(0); .orElse(0);
receivedUpdates.addAll(updates); receivedUpdates.addAll(updates);
synchronized (receivedUpdates) { synchronized (receivedUpdates) {
receivedUpdates.notifyAll(); receivedUpdates.notifyAll();
}
}
} catch (InterruptedException e) {
if (!running) {
receivedUpdates.clear();
}
BotLogger.debug(LOGTAG, e);
interrupt();
} catch (Exception global) {
BotLogger.severe(LOGTAG, global);
try {
synchronized (lock) {
lock.wait(exponentialBackOff.nextBackOffMillis());
}
} catch (InterruptedException e) {
if (!running) {
receivedUpdates.clear();
}
BotLogger.debug(LOGTAG, e);
interrupt();
}
} }
} }
} catch (InterruptedException e) {
if (!running) {
receivedUpdates.clear();
}
BotLogger.debug(LOGTAG, e);
} catch (Exception global) {
BotLogger.severe(LOGTAG, global);
try {
synchronized (this) {
this.wait(exponentialBackOff.nextBackOffMillis());
}
} catch (InterruptedException e) {
if (!running) {
receivedUpdates.clear();
}
BotLogger.debug(LOGTAG, e);
}
} }
} }
BotLogger.debug(LOGTAG, "Reader thread has being closed"); BotLogger.debug(LOGTAG, "Reader thread has being closed");
} }
private List<Update> getUpdatesFromServer() throws InterruptedException, Exception { private List<Update> getUpdatesFromServer() throws IOException {
GetUpdates request = new GetUpdates() GetUpdates request = new GetUpdates()
.setLimit(100) .setLimit(100)
.setTimeout(ApiConstants.GETUPDATES_TIMEOUT) .setTimeout(ApiConstants.GETUPDATES_TIMEOUT)
@ -248,8 +254,8 @@ public class DefaultBotSession implements BotSession {
if (response.getStatusLine().getStatusCode() >= 500) { if (response.getStatusLine().getStatusCode() >= 500) {
BotLogger.warn(LOGTAG, responseContent); BotLogger.warn(LOGTAG, responseContent);
synchronized (this) { synchronized (lock) {
this.wait(500); lock.wait(500);
} }
} else { } else {
try { try {
@ -260,14 +266,13 @@ public class DefaultBotSession implements BotSession {
BotLogger.severe(responseContent, LOGTAG, e); BotLogger.severe(responseContent, LOGTAG, e);
} }
} }
} catch (SocketException e) { } catch (SocketException | InvalidObjectException | TelegramApiRequestException e) {
if (!e.getMessage().equals("Socket Closed")) { BotLogger.severe(LOGTAG, e);
BotLogger.severe(LOGTAG, e);
}
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
BotLogger.fine(LOGTAG, e); BotLogger.fine(LOGTAG, e);
} catch (InvalidObjectException | TelegramApiRequestException e) { } catch (InterruptedException e) {
BotLogger.severe(LOGTAG, e); BotLogger.fine(LOGTAG, e);
interrupt();
} }
return Collections.emptyList(); return Collections.emptyList();
} }
@ -275,7 +280,7 @@ public class DefaultBotSession implements BotSession {
public interface UpdatesSupplier { public interface UpdatesSupplier {
List<Update> getUpdates() throws InterruptedException, Exception; List<Update> getUpdates() throws Exception;
} }
private List<Update> getUpdateList() { private List<Update> getUpdateList() {
@ -306,6 +311,7 @@ public class DefaultBotSession implements BotSession {
callback.onUpdatesReceived(updates); callback.onUpdatesReceived(updates);
} catch (InterruptedException e) { } catch (InterruptedException e) {
BotLogger.debug(LOGTAG, e); BotLogger.debug(LOGTAG, e);
interrupt();
} catch (Exception e) { } catch (Exception e) {
BotLogger.severe(LOGTAG, e); BotLogger.severe(LOGTAG, e);
} }