Clean up some thread usage, seems to close #168.

This commit is contained in:
Niels Ulrik Andersen 2017-11-19 12:41:38 +01:00
parent 49c0fef3c5
commit 4cf29bd75f

View File

@ -69,7 +69,7 @@ public class DefaultBotSession implements BotSession {
lastReceivedUpdate = 0; lastReceivedUpdate = 0;
readerThread = new ReaderThread(updatesSupplier); readerThread = new ReaderThread(updatesSupplier, this);
readerThread.setName(callback.getBotUsername() + " Telegram Connection"); readerThread.setName(callback.getBotUsername() + " Telegram Connection");
readerThread.start(); readerThread.start();
@ -135,12 +135,14 @@ public class DefaultBotSession implements BotSession {
private class ReaderThread extends Thread implements UpdatesReader { private class ReaderThread extends Thread implements UpdatesReader {
private final UpdatesSupplier updatesSupplier; private final UpdatesSupplier updatesSupplier;
private final Object lock;
private CloseableHttpClient httpclient; private CloseableHttpClient httpclient;
private ExponentialBackOff exponentialBackOff; private ExponentialBackOff exponentialBackOff;
private RequestConfig requestConfig; private RequestConfig requestConfig;
public ReaderThread(UpdatesSupplier updatesSupplier) { public ReaderThread(UpdatesSupplier updatesSupplier, Object lock) {
this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer); this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer);
this.lock = lock;
} }
@Override @Override
@ -183,42 +185,46 @@ public class DefaultBotSession implements BotSession {
public void run() { public void run() {
setPriority(Thread.MIN_PRIORITY); setPriority(Thread.MIN_PRIORITY);
while (running) { while (running) {
try { synchronized (lock) {
List<Update> updates = updatesSupplier.getUpdates(); if (running) {
if (updates.isEmpty()) { try {
synchronized (this) { List<Update> updates = updatesSupplier.getUpdates();
this.wait(500); if (updates.isEmpty()) {
} if (running) {
} else { lock.wait(500);
updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); }
lastReceivedUpdate = updates.parallelStream() } else {
.map( updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate);
Update::getUpdateId) lastReceivedUpdate = updates.parallelStream()
.max(Integer::compareTo) .map(
.orElse(0); Update::getUpdateId)
receivedUpdates.addAll(updates); .max(Integer::compareTo)
.orElse(0);
receivedUpdates.addAll(updates);
synchronized (receivedUpdates) { synchronized (receivedUpdates) {
receivedUpdates.notifyAll(); receivedUpdates.notifyAll();
}
}
} catch (InterruptedException e) {
if (!running) {
receivedUpdates.clear();
}
BotLogger.debug(LOGTAG, e);
} catch (Exception global) {
BotLogger.severe(LOGTAG, global);
try {
synchronized (lock) {
lock.wait(exponentialBackOff.nextBackOffMillis());
}
} catch (InterruptedException e) {
if (!running) {
receivedUpdates.clear();
}
BotLogger.debug(LOGTAG, e);
}
} }
} }
} catch (InterruptedException e) {
if (!running) {
receivedUpdates.clear();
}
BotLogger.debug(LOGTAG, e);
} catch (Exception global) {
BotLogger.severe(LOGTAG, global);
try {
synchronized (this) {
this.wait(exponentialBackOff.nextBackOffMillis());
}
} catch (InterruptedException e) {
if (!running) {
receivedUpdates.clear();
}
BotLogger.debug(LOGTAG, e);
}
} }
} }
BotLogger.debug(LOGTAG, "Reader thread has being closed"); BotLogger.debug(LOGTAG, "Reader thread has being closed");
@ -248,8 +254,8 @@ public class DefaultBotSession implements BotSession {
if (response.getStatusLine().getStatusCode() >= 500) { if (response.getStatusLine().getStatusCode() >= 500) {
BotLogger.warn(LOGTAG, responseContent); BotLogger.warn(LOGTAG, responseContent);
synchronized (this) { synchronized (lock) {
this.wait(500); lock.wait(500);
} }
} else { } else {
try { try {
@ -261,9 +267,7 @@ public class DefaultBotSession implements BotSession {
} }
} }
} catch (SocketException e) { } catch (SocketException e) {
if (!e.getMessage().equals("Socket Closed")) { BotLogger.severe(LOGTAG, e);
BotLogger.severe(LOGTAG, e);
}
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
BotLogger.fine(LOGTAG, e); BotLogger.fine(LOGTAG, e);
} catch (InvalidObjectException | TelegramApiRequestException e) { } catch (InvalidObjectException | TelegramApiRequestException e) {