Merge pull request #284 from vantuz-subhuman/#269

#269 Fixed update queue polling order and added `TelegramBatchLongPollingBot`
This commit is contained in:
Ruben Bermudez 2017-10-06 18:23:10 +02:00 committed by GitHub
commit 9317c6f5a9
4 changed files with 214 additions and 63 deletions

View File

@ -3,6 +3,8 @@ package org.telegram.telegrambots.generics;
import org.telegram.telegrambots.api.objects.Update;
import org.telegram.telegrambots.exceptions.TelegramApiRequestException;
import java.util.List;
/**
* @author Ruben Bermudez
* @version 1.0
@ -16,6 +18,15 @@ public interface LongPollingBot {
*/
void onUpdateReceived(Update update);
/**
* This method is called when receiving updates via GetUpdates method.
* If not reimplemented - it just sends updates by one into {@link #onUpdateReceived(Update)}
* @param updates list of Update received
*/
default void onUpdatesReceived(List<Update> updates) {
updates.forEach(this::onUpdateReceived);
}
/**
* Return bot username of this bot
*/

View File

@ -27,7 +27,7 @@ import java.io.InvalidObjectException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidParameterException;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
@ -52,6 +52,7 @@ public class DefaultBotSession implements BotSession {
private String token;
private int lastReceivedUpdate = 0;
private DefaultBotOptions options;
private UpdatesSupplier updatesSupplier;
@Inject
public DefaultBotSession() {
@ -67,7 +68,7 @@ public class DefaultBotSession implements BotSession {
lastReceivedUpdate = 0;
readerThread = new ReaderThread();
readerThread = new ReaderThread(updatesSupplier);
readerThread.setName(callback.getBotUsername() + " Telegram Connection");
readerThread.start();
@ -97,6 +98,10 @@ public class DefaultBotSession implements BotSession {
}
}
public void setUpdatesSupplier(UpdatesSupplier updatesSupplier) {
this.updatesSupplier = updatesSupplier;
}
@Override
public void setOptions(BotOptions options) {
if (this.options != null) {
@ -127,10 +132,16 @@ public class DefaultBotSession implements BotSession {
}
private class ReaderThread extends Thread implements UpdatesReader {
private final UpdatesSupplier updatesSupplier;
private CloseableHttpClient httpclient;
private ExponentialBackOff exponentialBackOff;
private RequestConfig requestConfig;
public ReaderThread(UpdatesSupplier updatesSupplier) {
this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer);
}
@Override
public synchronized void start() {
httpclient = HttpClientBuilder.create()
@ -172,62 +183,23 @@ public class DefaultBotSession implements BotSession {
setPriority(Thread.MIN_PRIORITY);
while (running) {
try {
GetUpdates request = new GetUpdates()
.setLimit(100)
.setTimeout(ApiConstants.GETUPDATES_TIMEOUT)
.setOffset(lastReceivedUpdate + 1);
if (options.getAllowedUpdates() != null) {
request.setAllowedUpdates(options.getAllowedUpdates());
}
String url = options.getBaseUrl() + token + "/" + GetUpdates.PATH;
//http client
HttpPost httpPost = new HttpPost(url);
httpPost.addHeader("charset", StandardCharsets.UTF_8.name());
httpPost.setConfig(requestConfig);
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(request), ContentType.APPLICATION_JSON));
try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
HttpEntity ht = response.getEntity();
BufferedHttpEntity buf = new BufferedHttpEntity(ht);
String responseContent = EntityUtils.toString(buf, StandardCharsets.UTF_8);
if (response.getStatusLine().getStatusCode() >= 500) {
BotLogger.warn(LOGTAG, responseContent);
synchronized (this) {
this.wait(500);
}
} else {
try {
List<Update> updates = request.deserializeResponse(responseContent);
exponentialBackOff.reset();
if (updates.isEmpty()) {
synchronized (this) {
this.wait(500);
}
} else {
updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate);
lastReceivedUpdate = updates.parallelStream()
.map(
Update::getUpdateId)
.max(Integer::compareTo)
.orElse(0);
receivedUpdates.addAll(updates);
synchronized (receivedUpdates) {
receivedUpdates.notifyAll();
}
}
} catch (JSONException e) {
BotLogger.severe(responseContent, LOGTAG, e);
}
List<Update> updates = updatesSupplier.getUpdates();
if (updates.isEmpty()) {
synchronized (this) {
this.wait(500);
}
} else {
updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate);
lastReceivedUpdate = updates.parallelStream()
.map(
Update::getUpdateId)
.max(Integer::compareTo)
.orElse(0);
receivedUpdates.addAll(updates);
synchronized (receivedUpdates) {
receivedUpdates.notifyAll();
}
} catch (SocketTimeoutException e) {
BotLogger.fine(LOGTAG, e);
} catch (InvalidObjectException | TelegramApiRequestException e) {
BotLogger.severe(LOGTAG, e);
}
} catch (InterruptedException e) {
if (!running) {
@ -250,6 +222,64 @@ public class DefaultBotSession implements BotSession {
}
BotLogger.debug(LOGTAG, "Reader thread has being closed");
}
private List<Update> getUpdatesFromServer() throws InterruptedException, Exception {
GetUpdates request = new GetUpdates()
.setLimit(100)
.setTimeout(ApiConstants.GETUPDATES_TIMEOUT)
.setOffset(lastReceivedUpdate + 1);
if (options.getAllowedUpdates() != null) {
request.setAllowedUpdates(options.getAllowedUpdates());
}
String url = options.getBaseUrl() + token + "/" + GetUpdates.PATH;
//http client
HttpPost httpPost = new HttpPost(url);
httpPost.addHeader("charset", StandardCharsets.UTF_8.name());
httpPost.setConfig(requestConfig);
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(request), ContentType.APPLICATION_JSON));
try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
HttpEntity ht = response.getEntity();
BufferedHttpEntity buf = new BufferedHttpEntity(ht);
String responseContent = EntityUtils.toString(buf, StandardCharsets.UTF_8);
if (response.getStatusLine().getStatusCode() >= 500) {
BotLogger.warn(LOGTAG, responseContent);
synchronized (this) {
this.wait(500);
}
} else {
try {
List<Update> updates = request.deserializeResponse(responseContent);
exponentialBackOff.reset();
return updates;
} catch (JSONException e) {
BotLogger.severe(responseContent, LOGTAG, e);
}
}
} catch (SocketTimeoutException e) {
BotLogger.fine(LOGTAG, e);
} catch (InvalidObjectException | TelegramApiRequestException e) {
BotLogger.severe(LOGTAG, e);
}
return Collections.emptyList();
}
}
public interface UpdatesSupplier {
List<Update> getUpdates() throws InterruptedException, Exception;
}
private List<Update> getUpdateList() {
List<Update> updates = new ArrayList<>();
for (Iterator<Update> it = receivedUpdates.iterator(); it.hasNext();) {
updates.add(it.next());
it.remove();
}
return updates;
}
private class HandlerThread extends Thread implements UpdatesHandler {
@ -258,17 +288,17 @@ public class DefaultBotSession implements BotSession {
setPriority(Thread.MIN_PRIORITY);
while (running) {
try {
Update update = receivedUpdates.pollLast();
if (update == null) {
List<Update> updates = getUpdateList();
if (updates.isEmpty()) {
synchronized (receivedUpdates) {
receivedUpdates.wait();
update = receivedUpdates.pollLast();
if (update == null) {
updates = getUpdateList();
if (updates.isEmpty()) {
continue;
}
}
}
callback.onUpdateReceived(update);
callback.onUpdatesReceived(updates);
} catch (InterruptedException e) {
BotLogger.debug(LOGTAG, e);
} catch (Exception e) {

View File

@ -0,0 +1,24 @@
package org.telegram.telegrambots.test;
import org.junit.Test;
import org.mockito.Mockito;
import org.telegram.telegrambots.api.objects.Update;
import org.telegram.telegrambots.bots.TelegramLongPollingBot;
import static java.util.Arrays.asList;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
public class TelegramLongPollingBotTest {
@Test
public void testOnUpdateReceived() throws Exception {
TelegramLongPollingBot bot = Mockito.mock(TelegramLongPollingBot.class);
Mockito.doCallRealMethod().when(bot).onUpdatesReceived(any());
Update update1 = new Update();
Update update2 = new Update();
bot.onUpdatesReceived(asList(update1, update2));
Mockito.verify(bot).onUpdateReceived(update1);
Mockito.verify(bot).onUpdateReceived(update2);
}
}

View File

@ -11,12 +11,20 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.telegram.telegrambots.api.objects.Update;
import org.telegram.telegrambots.bots.DefaultBotOptions;
import org.telegram.telegrambots.generics.LongPollingBot;
import org.telegram.telegrambots.test.Fakes.FakeLongPollingBot;
import org.telegram.telegrambots.updatesreceivers.DefaultBotSession;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
/**
* @author Ruben Bermudez
@ -79,7 +87,85 @@ public class TestDefaultBotSession {
session.stop();
}
@Test
public void testUpdates() throws Exception {
LongPollingBot bot = Mockito.spy(new FakeLongPollingBot());
session = getDefaultBotSession(bot);
AtomicInteger flag = new AtomicInteger();
Update[] updates = createFakeUpdates(9);
session.setUpdatesSupplier(createFakeUpdatesSupplier(flag, updates));
session.start();
Thread.sleep(1000);
Mockito.verify(bot, Mockito.never()).onUpdateReceived(Matchers.any());
flag.compareAndSet(0, 1);
Thread.sleep(1000);
Mockito.verify(bot).onUpdateReceived(updates[0]);
Mockito.verify(bot).onUpdateReceived(updates[1]);
flag.compareAndSet(2, 3);
Thread.sleep(1000);
Mockito.verify(bot).onUpdateReceived(updates[2]);
Mockito.verify(bot).onUpdateReceived(updates[3]);
Mockito.verify(bot).onUpdateReceived(updates[4]);
flag.compareAndSet(4, 5);
Thread.sleep(1000);
Mockito.verify(bot).onUpdateReceived(updates[5]);
Mockito.verify(bot).onUpdateReceived(updates[6]);
Mockito.verify(bot).onUpdateReceived(updates[7]);
Mockito.verify(bot).onUpdateReceived(updates[8]);
session.stop();
}
@Test
public void testBatchUpdates() throws Exception {
LongPollingBot bot = Mockito.spy(new FakeLongPollingBot());
session = getDefaultBotSession(bot);
AtomicInteger flag = new AtomicInteger();
Update[] updates = createFakeUpdates(9);
session.setUpdatesSupplier(createFakeUpdatesSupplier(flag, updates));
session.start();
Thread.sleep(1000);
Mockito.verify(bot, Mockito.never()).onUpdateReceived(Matchers.any());
flag.compareAndSet(0, 1);
Thread.sleep(1000);
Mockito.verify(bot).onUpdatesReceived(Arrays.asList(updates[0], updates[1]));
flag.compareAndSet(2, 3);
Thread.sleep(1000);
Mockito.verify(bot).onUpdatesReceived(Arrays.asList(updates[2], updates[3], updates[4]));
flag.compareAndSet(4, 5);
Thread.sleep(1000);
Mockito.verify(bot).onUpdatesReceived(Arrays.asList(updates[5], updates[6], updates[7], updates[8]));
session.stop();
}
private Update[] createFakeUpdates(int count) {
return IntStream.range(0, count).mapToObj(x -> {
Update mock = Mockito.mock(Update.class);
Mockito.when(mock.getUpdateId()).thenReturn(x);
return mock;
}).toArray(Update[]::new);
}
private DefaultBotSession.UpdatesSupplier createFakeUpdatesSupplier(AtomicInteger flag, Update[] updates) {
return new DefaultBotSession.UpdatesSupplier() {
@Override
public List<Update> getUpdates() throws InterruptedException, Exception {
if (flag.compareAndSet(1, 2)) {
return Arrays.asList(updates[0], updates[1]);
} else if (flag.compareAndSet(3, 4)) {
return Arrays.asList(updates[2], updates[3], updates[4]);
} else if (flag.compareAndSet(5, 6)) {
return Arrays.asList(updates[5], updates[6], updates[7], updates[8]);
}
return Collections.emptyList();
}
};
}
private DefaultBotSession getDefaultBotSession() throws IOException {
return getDefaultBotSession(new FakeLongPollingBot());
}
private DefaultBotSession getDefaultBotSession(LongPollingBot bot) throws IOException {
HttpResponse response = new BasicHttpResponse(new BasicStatusLine(
new ProtocolVersion("HTTP", 1, 1), 200, ""));
response.setStatusCode(200);
@ -89,7 +175,7 @@ public class TestDefaultBotSession {
Mockito.when(mockHttpClient.execute(Mockito.any(HttpPost.class)))
.thenReturn(response);
DefaultBotSession session = new DefaultBotSession();
session.setCallback(new FakeLongPollingBot());
session.setCallback(bot);
session.setOptions(new DefaultBotOptions());
return session;
}