#269 Extracted UpdatesSupplier insede DefaultBotSession and created tests

This commit is contained in:
vsubhuman 2017-08-03 13:29:02 +03:00
parent fefe0cff05
commit 0fb556518a
3 changed files with 201 additions and 60 deletions

View File

@ -28,9 +28,7 @@ import java.io.InvalidObjectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.InvalidParameterException; import java.security.InvalidParameterException;
import java.util.ArrayList; import java.util.*;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -55,6 +53,7 @@ public class DefaultBotSession implements BotSession {
private String token; private String token;
private int lastReceivedUpdate = 0; private int lastReceivedUpdate = 0;
private DefaultBotOptions options; private DefaultBotOptions options;
private UpdatesSupplier updatesSupplier;
@Inject @Inject
public DefaultBotSession() { public DefaultBotSession() {
@ -70,7 +69,7 @@ public class DefaultBotSession implements BotSession {
lastReceivedUpdate = 0; lastReceivedUpdate = 0;
readerThread = new ReaderThread(); readerThread = new ReaderThread(updatesSupplier);
readerThread.setName(callback.getBotUsername() + " Telegram Connection"); readerThread.setName(callback.getBotUsername() + " Telegram Connection");
readerThread.start(); readerThread.start();
@ -100,6 +99,10 @@ public class DefaultBotSession implements BotSession {
} }
} }
public void setUpdatesSupplier(UpdatesSupplier updatesSupplier) {
this.updatesSupplier = updatesSupplier;
}
@Override @Override
public void setOptions(BotOptions options) { public void setOptions(BotOptions options) {
if (this.options != null) { if (this.options != null) {
@ -130,10 +133,16 @@ public class DefaultBotSession implements BotSession {
} }
private class ReaderThread extends Thread implements UpdatesReader { private class ReaderThread extends Thread implements UpdatesReader {
private final UpdatesSupplier updatesSupplier;
private CloseableHttpClient httpclient; private CloseableHttpClient httpclient;
private ExponentialBackOff exponentialBackOff; private ExponentialBackOff exponentialBackOff;
private RequestConfig requestConfig; private RequestConfig requestConfig;
public ReaderThread(UpdatesSupplier updatesSupplier) {
this.updatesSupplier = Optional.ofNullable(updatesSupplier).orElse(this::getUpdatesFromServer);
}
@Override @Override
public synchronized void start() { public synchronized void start() {
httpclient = HttpClientBuilder.create() httpclient = HttpClientBuilder.create()
@ -175,62 +184,23 @@ public class DefaultBotSession implements BotSession {
setPriority(Thread.MIN_PRIORITY); setPriority(Thread.MIN_PRIORITY);
while (running) { while (running) {
try { try {
GetUpdates request = new GetUpdates() List<Update> updates = updatesSupplier.getUpdates();
.setLimit(100) if (updates.isEmpty()) {
.setTimeout(ApiConstants.GETUPDATES_TIMEOUT) synchronized (this) {
.setOffset(lastReceivedUpdate + 1); this.wait(500);
}
if (options.getAllowedUpdates() != null) { } else {
request.setAllowedUpdates(options.getAllowedUpdates()); updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate);
} lastReceivedUpdate = updates.parallelStream()
.map(
String url = options.getBaseUrl() + token + "/" + GetUpdates.PATH; Update::getUpdateId)
//http client .max(Integer::compareTo)
HttpPost httpPost = new HttpPost(url); .orElse(0);
httpPost.addHeader("charset", StandardCharsets.UTF_8.name()); receivedUpdates.addAll(updates);
httpPost.setConfig(requestConfig);
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(request), ContentType.APPLICATION_JSON)); synchronized (receivedUpdates) {
receivedUpdates.notifyAll();
try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
HttpEntity ht = response.getEntity();
BufferedHttpEntity buf = new BufferedHttpEntity(ht);
String responseContent = EntityUtils.toString(buf, StandardCharsets.UTF_8);
if (response.getStatusLine().getStatusCode() >= 500) {
BotLogger.warn(LOGTAG, responseContent);
synchronized (this) {
this.wait(500);
}
} else {
try {
List<Update> updates = request.deserializeResponse(responseContent);
exponentialBackOff.reset();
if (updates.isEmpty()) {
synchronized (this) {
this.wait(500);
}
} else {
updates.removeIf(x -> x.getUpdateId() < lastReceivedUpdate);
lastReceivedUpdate = updates.parallelStream()
.map(
Update::getUpdateId)
.max(Integer::compareTo)
.orElse(0);
receivedUpdates.addAll(updates);
synchronized (receivedUpdates) {
receivedUpdates.notifyAll();
}
}
} catch (JSONException e) {
BotLogger.severe(responseContent, LOGTAG, e);
}
} }
} catch (SocketTimeoutException e) {
BotLogger.fine(LOGTAG, e);
} catch (InvalidObjectException | TelegramApiRequestException e) {
BotLogger.severe(LOGTAG, e);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (!running) { if (!running) {
@ -253,6 +223,55 @@ public class DefaultBotSession implements BotSession {
} }
BotLogger.debug(LOGTAG, "Reader thread has being closed"); BotLogger.debug(LOGTAG, "Reader thread has being closed");
} }
private List<Update> getUpdatesFromServer() throws InterruptedException, Exception {
GetUpdates request = new GetUpdates()
.setLimit(100)
.setTimeout(ApiConstants.GETUPDATES_TIMEOUT)
.setOffset(lastReceivedUpdate + 1);
if (options.getAllowedUpdates() != null) {
request.setAllowedUpdates(options.getAllowedUpdates());
}
String url = options.getBaseUrl() + token + "/" + GetUpdates.PATH;
//http client
HttpPost httpPost = new HttpPost(url);
httpPost.addHeader("charset", StandardCharsets.UTF_8.name());
httpPost.setConfig(requestConfig);
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(request), ContentType.APPLICATION_JSON));
try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
HttpEntity ht = response.getEntity();
BufferedHttpEntity buf = new BufferedHttpEntity(ht);
String responseContent = EntityUtils.toString(buf, StandardCharsets.UTF_8);
if (response.getStatusLine().getStatusCode() >= 500) {
BotLogger.warn(LOGTAG, responseContent);
synchronized (this) {
this.wait(500);
}
} else {
try {
List<Update> updates = request.deserializeResponse(responseContent);
exponentialBackOff.reset();
return updates;
} catch (JSONException e) {
BotLogger.severe(responseContent, LOGTAG, e);
}
}
} catch (SocketTimeoutException e) {
BotLogger.fine(LOGTAG, e);
} catch (InvalidObjectException | TelegramApiRequestException e) {
BotLogger.severe(LOGTAG, e);
}
return Collections.emptyList();
}
}
public interface UpdatesSupplier {
List<Update> getUpdates() throws InterruptedException, Exception;
} }
private List<Update> getUpdateList() { private List<Update> getUpdateList() {

View File

@ -0,0 +1,34 @@
package org.telegram.telegrambots.test.Fakes;
import org.telegram.telegrambots.api.objects.Update;
import org.telegram.telegrambots.bots.DefaultBotOptions;
import org.telegram.telegrambots.bots.TelegramBatchLongPollingBot;
import org.telegram.telegrambots.exceptions.TelegramApiRequestException;
import org.telegram.telegrambots.generics.BotOptions;
import org.telegram.telegrambots.generics.LongPollingBot;
import java.util.List;
public class FakeBatchLongPollingBot extends TelegramBatchLongPollingBot
{
@Override
public void onUpdatesReceived(List<Update> updates) {
}
@Override
public String getBotUsername() {
return "";
}
@Override
public String getBotToken() {
return "";
}
@Override
public void clearWebhook() throws TelegramApiRequestException {
}
}

View File

@ -11,12 +11,22 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.telegram.telegrambots.api.objects.Update;
import org.telegram.telegrambots.bots.DefaultBotOptions; import org.telegram.telegrambots.bots.DefaultBotOptions;
import org.telegram.telegrambots.bots.TelegramBatchLongPollingBot;
import org.telegram.telegrambots.generics.LongPollingBot;
import org.telegram.telegrambots.test.Fakes.FakeBatchLongPollingBot;
import org.telegram.telegrambots.test.Fakes.FakeLongPollingBot; import org.telegram.telegrambots.test.Fakes.FakeLongPollingBot;
import org.telegram.telegrambots.updatesreceivers.DefaultBotSession; import org.telegram.telegrambots.updatesreceivers.DefaultBotSession;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
/** /**
* @author Ruben Bermudez * @author Ruben Bermudez
@ -79,7 +89,85 @@ public class TestDefaultBotSession {
session.stop(); session.stop();
} }
@Test
public void testUpdatesForStandardLongPollingBot() throws Exception {
LongPollingBot bot = Mockito.spy(new FakeLongPollingBot());
session = getDefaultBotSession(bot);
AtomicInteger flag = new AtomicInteger();
Update[] updates = createFakeUpdates(9);
session.setUpdatesSupplier(createFakeUpdatesSupplier(flag, updates));
session.start();
Thread.sleep(1000);
Mockito.verify(bot, Mockito.never()).onUpdateReceived(Matchers.any());
flag.compareAndSet(0, 1);
Thread.sleep(1000);
Mockito.verify(bot).onUpdateReceived(updates[0]);
Mockito.verify(bot).onUpdateReceived(updates[1]);
flag.compareAndSet(2, 3);
Thread.sleep(1000);
Mockito.verify(bot).onUpdateReceived(updates[2]);
Mockito.verify(bot).onUpdateReceived(updates[3]);
Mockito.verify(bot).onUpdateReceived(updates[4]);
flag.compareAndSet(4, 5);
Thread.sleep(1000);
Mockito.verify(bot).onUpdateReceived(updates[5]);
Mockito.verify(bot).onUpdateReceived(updates[6]);
Mockito.verify(bot).onUpdateReceived(updates[7]);
Mockito.verify(bot).onUpdateReceived(updates[8]);
session.stop();
}
@Test
public void testUpdatesForBatchLongPollingBot() throws Exception {
TelegramBatchLongPollingBot bot = Mockito.spy(new FakeBatchLongPollingBot());
session = getDefaultBotSession(bot);
AtomicInteger flag = new AtomicInteger();
Update[] updates = createFakeUpdates(9);
session.setUpdatesSupplier(createFakeUpdatesSupplier(flag, updates));
session.start();
Thread.sleep(1000);
Mockito.verify(bot, Mockito.never()).onUpdateReceived(Matchers.any());
flag.compareAndSet(0, 1);
Thread.sleep(1000);
Mockito.verify(bot).onUpdatesReceived(Arrays.asList(updates[0], updates[1]));
flag.compareAndSet(2, 3);
Thread.sleep(1000);
Mockito.verify(bot).onUpdatesReceived(Arrays.asList(updates[2], updates[3], updates[4]));
flag.compareAndSet(4, 5);
Thread.sleep(1000);
Mockito.verify(bot).onUpdatesReceived(Arrays.asList(updates[5], updates[6], updates[7], updates[8]));
session.stop();
}
private Update[] createFakeUpdates(int count) {
return IntStream.range(0, count).mapToObj(x -> {
Update mock = Mockito.mock(Update.class);
Mockito.when(mock.getUpdateId()).thenReturn(x);
return mock;
}).toArray(Update[]::new);
}
private DefaultBotSession.UpdatesSupplier createFakeUpdatesSupplier(AtomicInteger flag, Update[] updates) {
return new DefaultBotSession.UpdatesSupplier() {
@Override
public List<Update> getUpdates() throws InterruptedException, Exception {
if (flag.compareAndSet(1, 2)) {
return Arrays.asList(updates[0], updates[1]);
} else if (flag.compareAndSet(3, 4)) {
return Arrays.asList(updates[2], updates[3], updates[4]);
} else if (flag.compareAndSet(5, 6)) {
return Arrays.asList(updates[5], updates[6], updates[7], updates[8]);
}
return Collections.emptyList();
}
};
}
private DefaultBotSession getDefaultBotSession() throws IOException { private DefaultBotSession getDefaultBotSession() throws IOException {
return getDefaultBotSession(new FakeLongPollingBot());
}
private DefaultBotSession getDefaultBotSession(LongPollingBot bot) throws IOException {
HttpResponse response = new BasicHttpResponse(new BasicStatusLine( HttpResponse response = new BasicHttpResponse(new BasicStatusLine(
new ProtocolVersion("HTTP", 1, 1), 200, "")); new ProtocolVersion("HTTP", 1, 1), 200, ""));
response.setStatusCode(200); response.setStatusCode(200);
@ -89,7 +177,7 @@ public class TestDefaultBotSession {
Mockito.when(mockHttpClient.execute(Mockito.any(HttpPost.class))) Mockito.when(mockHttpClient.execute(Mockito.any(HttpPost.class)))
.thenReturn(response); .thenReturn(response);
DefaultBotSession session = new DefaultBotSession(); DefaultBotSession session = new DefaultBotSession();
session.setCallback(new FakeLongPollingBot()); session.setCallback(bot);
session.setOptions(new DefaultBotOptions()); session.setOptions(new DefaultBotOptions());
return session; return session;
} }