diff --git a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java index c12c0eb1..d968329c 100644 --- a/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java +++ b/telegrambots/src/main/java/org/telegram/telegrambots/updatesreceivers/DefaultBotSession.java @@ -28,9 +28,7 @@ import java.io.InvalidObjectException; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.security.InvalidParameterException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.util.*; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; @@ -55,6 +53,7 @@ public class DefaultBotSession implements BotSession { private String token; private int lastReceivedUpdate = 0; private DefaultBotOptions options; + private UpdatesSupplier updatesSupplier; @Inject public DefaultBotSession() { @@ -70,7 +69,7 @@ public class DefaultBotSession implements BotSession { lastReceivedUpdate = 0; - readerThread = new ReaderThread(); + readerThread = new ReaderThread(updatesSupplier); readerThread.setName(callback.getBotUsername() + " Telegram Connection"); readerThread.start(); @@ -100,6 +99,10 @@ public class DefaultBotSession implements BotSession { } } + public void setUpdatesSupplier(UpdatesSupplier updatesSupplier) { + this.updatesSupplier = updatesSupplier; + } + @Override public void setOptions(BotOptions options) { if (this.options != null) { @@ -130,10 +133,16 @@ public class DefaultBotSession implements BotSession { } private class ReaderThread extends Thread implements UpdatesReader { + + private final UpdatesSupplier updatesSupplier; private CloseableHttpClient httpclient; private ExponentialBackOff exponentialBackOff; private RequestConfig requestConfig; + public ReaderThread(UpdatesSupplier updatesSupplier) { + this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer); + } + @Override public synchronized void start() { httpclient = HttpClientBuilder.create() @@ -175,62 +184,23 @@ public class DefaultBotSession implements BotSession { setPriority(Thread.MIN_PRIORITY); while (running) { try { - GetUpdates request = new GetUpdates() - .setLimit(100) - .setTimeout(ApiConstants.GETUPDATES_TIMEOUT) - .setOffset(lastReceivedUpdate + 1); - - if (options.getAllowedUpdates() != null) { - request.setAllowedUpdates(options.getAllowedUpdates()); - } - - String url = options.getBaseUrl() + token + "/" + GetUpdates.PATH; - //http client - HttpPost httpPost = new HttpPost(url); - httpPost.addHeader("charset", StandardCharsets.UTF_8.name()); - httpPost.setConfig(requestConfig); - httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(request), ContentType.APPLICATION_JSON)); - - try (CloseableHttpResponse response = httpclient.execute(httpPost)) { - HttpEntity ht = response.getEntity(); - BufferedHttpEntity buf = new BufferedHttpEntity(ht); - String responseContent = EntityUtils.toString(buf, StandardCharsets.UTF_8); - - if (response.getStatusLine().getStatusCode() >= 500) { - BotLogger.warn(LOGTAG, responseContent); - synchronized (this) { - this.wait(500); - } - } else { - try { - List updates = request.deserializeResponse(responseContent); - exponentialBackOff.reset(); - - if (updates.isEmpty()) { - synchronized (this) { - this.wait(500); - } - } else { - updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); - lastReceivedUpdate = updates.parallelStream() - .map( - Update::getUpdateId) - .max(Integer::compareTo) - .orElse(0); - receivedUpdates.addAll(updates); - - synchronized (receivedUpdates) { - receivedUpdates.notifyAll(); - } - } - } catch (JSONException e) { - BotLogger.severe(responseContent, LOGTAG, e); - } + List updates = updatesSupplier.getUpdates(); + if (updates.isEmpty()) { + synchronized (this) { + this.wait(500); + } + } else { + updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate); + lastReceivedUpdate = updates.parallelStream() + .map( + Update::getUpdateId) + .max(Integer::compareTo) + .orElse(0); + receivedUpdates.addAll(updates); + + synchronized (receivedUpdates) { + receivedUpdates.notifyAll(); } - } catch (SocketTimeoutException e) { - BotLogger.fine(LOGTAG, e); - } catch (InvalidObjectException | TelegramApiRequestException e) { - BotLogger.severe(LOGTAG, e); } } catch (InterruptedException e) { if (!running) { @@ -253,6 +223,55 @@ public class DefaultBotSession implements BotSession { } BotLogger.debug(LOGTAG, "Reader thread has being closed"); } + + private List getUpdatesFromServer() throws InterruptedException, Exception { + GetUpdates request = new GetUpdates() + .setLimit(100) + .setTimeout(ApiConstants.GETUPDATES_TIMEOUT) + .setOffset(lastReceivedUpdate + 1); + + if (options.getAllowedUpdates() != null) { + request.setAllowedUpdates(options.getAllowedUpdates()); + } + + String url = options.getBaseUrl() + token + "/" + GetUpdates.PATH; + //http client + HttpPost httpPost = new HttpPost(url); + httpPost.addHeader("charset", StandardCharsets.UTF_8.name()); + httpPost.setConfig(requestConfig); + httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(request), ContentType.APPLICATION_JSON)); + + try (CloseableHttpResponse response = httpclient.execute(httpPost)) { + HttpEntity ht = response.getEntity(); + BufferedHttpEntity buf = new BufferedHttpEntity(ht); + String responseContent = EntityUtils.toString(buf, StandardCharsets.UTF_8); + + if (response.getStatusLine().getStatusCode() >= 500) { + BotLogger.warn(LOGTAG, responseContent); + synchronized (this) { + this.wait(500); + } + } else { + try { + List updates = request.deserializeResponse(responseContent); + exponentialBackOff.reset(); + return updates; + } catch (JSONException e) { + BotLogger.severe(responseContent, LOGTAG, e); + } + } + } catch (SocketTimeoutException e) { + BotLogger.fine(LOGTAG, e); + } catch (InvalidObjectException | TelegramApiRequestException e) { + BotLogger.severe(LOGTAG, e); + } + return Collections.emptyList(); + } + } + + public interface UpdatesSupplier { + + List getUpdates() throws InterruptedException, Exception; } private List getUpdateList() { diff --git a/telegrambots/src/test/java/org/telegram/telegrambots/test/Fakes/FakeBatchLongPollingBot.java b/telegrambots/src/test/java/org/telegram/telegrambots/test/Fakes/FakeBatchLongPollingBot.java new file mode 100644 index 00000000..46f89377 --- /dev/null +++ b/telegrambots/src/test/java/org/telegram/telegrambots/test/Fakes/FakeBatchLongPollingBot.java @@ -0,0 +1,34 @@ +package org.telegram.telegrambots.test.Fakes; + +import org.telegram.telegrambots.api.objects.Update; +import org.telegram.telegrambots.bots.DefaultBotOptions; +import org.telegram.telegrambots.bots.TelegramBatchLongPollingBot; +import org.telegram.telegrambots.exceptions.TelegramApiRequestException; +import org.telegram.telegrambots.generics.BotOptions; +import org.telegram.telegrambots.generics.LongPollingBot; + +import java.util.List; + +public class FakeBatchLongPollingBot extends TelegramBatchLongPollingBot +{ + + @Override + public void onUpdatesReceived(List updates) { + + } + + @Override + public String getBotUsername() { + return ""; + } + + @Override + public String getBotToken() { + return ""; + } + + @Override + public void clearWebhook() throws TelegramApiRequestException { + + } +} diff --git a/telegrambots/src/test/java/org/telegram/telegrambots/test/TestDefaultBotSession.java b/telegrambots/src/test/java/org/telegram/telegrambots/test/TestDefaultBotSession.java index c37fd5a6..39c6a83f 100644 --- a/telegrambots/src/test/java/org/telegram/telegrambots/test/TestDefaultBotSession.java +++ b/telegrambots/src/test/java/org/telegram/telegrambots/test/TestDefaultBotSession.java @@ -11,12 +11,22 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Matchers; import org.mockito.Mockito; +import org.telegram.telegrambots.api.objects.Update; import org.telegram.telegrambots.bots.DefaultBotOptions; +import org.telegram.telegrambots.bots.TelegramBatchLongPollingBot; +import org.telegram.telegrambots.generics.LongPollingBot; +import org.telegram.telegrambots.test.Fakes.FakeBatchLongPollingBot; import org.telegram.telegrambots.test.Fakes.FakeLongPollingBot; import org.telegram.telegrambots.updatesreceivers.DefaultBotSession; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; /** * @author Ruben Bermudez @@ -79,7 +89,85 @@ public class TestDefaultBotSession { session.stop(); } + @Test + public void testUpdatesForStandardLongPollingBot() throws Exception { + LongPollingBot bot = Mockito.spy(new FakeLongPollingBot()); + session = getDefaultBotSession(bot); + AtomicInteger flag = new AtomicInteger(); + Update[] updates = createFakeUpdates(9); + session.setUpdatesSupplier(createFakeUpdatesSupplier(flag, updates)); + session.start(); + Thread.sleep(1000); + Mockito.verify(bot, Mockito.never()).onUpdateReceived(Matchers.any()); + flag.compareAndSet(0, 1); + Thread.sleep(1000); + Mockito.verify(bot).onUpdateReceived(updates[0]); + Mockito.verify(bot).onUpdateReceived(updates[1]); + flag.compareAndSet(2, 3); + Thread.sleep(1000); + Mockito.verify(bot).onUpdateReceived(updates[2]); + Mockito.verify(bot).onUpdateReceived(updates[3]); + Mockito.verify(bot).onUpdateReceived(updates[4]); + flag.compareAndSet(4, 5); + Thread.sleep(1000); + Mockito.verify(bot).onUpdateReceived(updates[5]); + Mockito.verify(bot).onUpdateReceived(updates[6]); + Mockito.verify(bot).onUpdateReceived(updates[7]); + Mockito.verify(bot).onUpdateReceived(updates[8]); + session.stop(); + } + + @Test + public void testUpdatesForBatchLongPollingBot() throws Exception { + TelegramBatchLongPollingBot bot = Mockito.spy(new FakeBatchLongPollingBot()); + session = getDefaultBotSession(bot); + AtomicInteger flag = new AtomicInteger(); + Update[] updates = createFakeUpdates(9); + session.setUpdatesSupplier(createFakeUpdatesSupplier(flag, updates)); + session.start(); + Thread.sleep(1000); + Mockito.verify(bot, Mockito.never()).onUpdateReceived(Matchers.any()); + flag.compareAndSet(0, 1); + Thread.sleep(1000); + Mockito.verify(bot).onUpdatesReceived(Arrays.asList(updates[0], updates[1])); + flag.compareAndSet(2, 3); + Thread.sleep(1000); + Mockito.verify(bot).onUpdatesReceived(Arrays.asList(updates[2], updates[3], updates[4])); + flag.compareAndSet(4, 5); + Thread.sleep(1000); + Mockito.verify(bot).onUpdatesReceived(Arrays.asList(updates[5], updates[6], updates[7], updates[8])); + session.stop(); + } + + private Update[] createFakeUpdates(int count) { + return IntStream.range(0, count).mapToObj(x -> { + Update mock = Mockito.mock(Update.class); + Mockito.when(mock.getUpdateId()).thenReturn(x); + return mock; + }).toArray(Update[]::new); + } + + private DefaultBotSession.UpdatesSupplier createFakeUpdatesSupplier(AtomicInteger flag, Update[] updates) { + return new DefaultBotSession.UpdatesSupplier() { + @Override + public List getUpdates() throws InterruptedException, Exception { + if (flag.compareAndSet(1, 2)) { + return Arrays.asList(updates[0], updates[1]); + } else if (flag.compareAndSet(3, 4)) { + return Arrays.asList(updates[2], updates[3], updates[4]); + } else if (flag.compareAndSet(5, 6)) { + return Arrays.asList(updates[5], updates[6], updates[7], updates[8]); + } + return Collections.emptyList(); + } + }; + } + private DefaultBotSession getDefaultBotSession() throws IOException { + return getDefaultBotSession(new FakeLongPollingBot()); + } + + private DefaultBotSession getDefaultBotSession(LongPollingBot bot) throws IOException { HttpResponse response = new BasicHttpResponse(new BasicStatusLine( new ProtocolVersion("HTTP", 1, 1), 200, "")); response.setStatusCode(200); @@ -89,7 +177,7 @@ public class TestDefaultBotSession { Mockito.when(mockHttpClient.execute(Mockito.any(HttpPost.class))) .thenReturn(response); DefaultBotSession session = new DefaultBotSession(); - session.setCallback(new FakeLongPollingBot()); + session.setCallback(bot); session.setOptions(new DefaultBotOptions()); return session; }