diff --git a/telegrambots-extensions/src/main/java/org/telegram/telegrambots/extensions/bots/timedbot/TimedSendLongPollingBot.java b/telegrambots-extensions/src/main/java/org/telegram/telegrambots/extensions/bots/timedbot/TimedSendLongPollingBot.java index 00a240a3..97ee169c 100644 --- a/telegrambots-extensions/src/main/java/org/telegram/telegrambots/extensions/bots/timedbot/TimedSendLongPollingBot.java +++ b/telegrambots-extensions/src/main/java/org/telegram/telegrambots/extensions/bots/timedbot/TimedSendLongPollingBot.java @@ -11,9 +11,9 @@ import java.util.concurrent.atomic.AtomicBoolean; * Created by Daniil Nikanov aka JetCoder */ -@SuppressWarnings("unused") public abstract class TimedSendLongPollingBot extends TelegramLongPollingBot { + private static final long MANY_CHATS_SEND_INTERVAL = 33; private static final long ONE_CHAT_SEND_INTERVAL = 1000; private static final long CHAT_INACTIVE_INTERVAL = 1000 * 60 * 10; private final Timer mSendTimer = new Timer(true); @@ -21,29 +21,85 @@ public abstract class TimedSendLongPollingBot extends TelegramLongPollingBot private final ArrayList mSendQueues = new ArrayList<>(); private final AtomicBoolean mSendRequested = new AtomicBoolean(false); + private final class MessageSenderTask extends TimerTask + { + @Override + public void run() + { + //mSendRequested used for optimisation to not traverse all mMessagesMap 30 times per second all the time + if (!mSendRequested.getAndSet(false)) + return; + + long currentTime = System.currentTimeMillis(); + mSendQueues.clear(); + boolean processNext = false; + + //First step - find all chats in which already allowed to send message (passed more than 1000 ms from previuos send) + Iterator> it = mMessagesMap.entrySet().iterator(); + while (it.hasNext()) + { + MessageQueue queue = it.next().getValue(); + int state = queue.getCurrentState(currentTime); //Actual check here + if (state == MessageQueue.GET_MESSAGE) + { + mSendQueues.add(queue); + processNext = true; + } + else if (state == MessageQueue.WAIT) + { + processNext = true; + } + else if (state == MessageQueue.DELETE) + { + it.remove(); + } + } + + //If any of chats are in state WAIT or GET_MESSAGE, request another iteration + if (processNext) + mSendRequested.set(true); + + //Second step - find oldest waiting queue and peek it's message + MessageQueue sendQueue = null; + long oldestPutTime = Long.MAX_VALUE; + for (MessageQueue queue : mSendQueues) { + long putTime = queue.getPutTime(); + if (putTime < oldestPutTime) { + oldestPutTime = putTime; + sendQueue = queue; + } + } + if (sendQueue == null) //Possible if on first step wasn't found any chats in state GET_MESSAGE + return; + + //Invoke the send callback. ChatId is passed for possible additional processing + sendMessageCallback(sendQueue.getChatId(), sendQueue.getMessage(currentTime)); + } + } + private static class MessageQueue { - static final int EMPTY = 0; //Queue is empty - static final int WAIT = 1; //Queue has message(s) but not yet allowed to send - static final int DELETE = 2; //No one message of given queue was sent longer than CHAT_INACTIVE_INTERVAL, delete for optimisation - static final int GET_MESSAGE = 3; //Queue has message(s) and ready to send + public static final int EMPTY = 0; //Queue is empty + public static final int WAIT = 1; //Queue has message(s) but not yet allowed to send + public static final int DELETE = 2; //No one message of given queue was sent longer than CHAT_INACTIVE_INTERVAL, delete for optimisation + public static final int GET_MESSAGE = 3; //Queue has message(s) and ready to send private final ConcurrentLinkedQueue mQueue = new ConcurrentLinkedQueue<>(); private final Long mChatId; private long mLastSendTime; //Time of last peek from queue private volatile long mLastPutTime; //Time of last put into queue - MessageQueue(Long chatId) + public MessageQueue(Long chatId) { mChatId = chatId; } - synchronized void putMessage(Object msg) + public synchronized void putMessage(Object msg) { mQueue.add(msg); mLastPutTime = System.currentTimeMillis(); } - synchronized int getCurrentState(long currentTime) + public synchronized int getCurrentState(long currentTime) { //currentTime is passed as parameter for optimisation to do not recall currentTimeMillis() many times long interval = currentTime - mLastSendTime; @@ -58,23 +114,29 @@ public abstract class TimedSendLongPollingBot extends TelegramLongPollingBot return WAIT; } - synchronized Object getMessage(long currentTime) + public synchronized Object getMessage(long currentTime) { mLastSendTime = currentTime; return mQueue.poll(); } - long getPutTime() + public long getPutTime() { return mLastPutTime; } - Long getChatId() + public Long getChatId() { return mChatId; } } + //Constructor + protected TimedSendLongPollingBot() + { + mSendTimer.schedule(new MessageSenderTask(), MANY_CHATS_SEND_INTERVAL, MANY_CHATS_SEND_INTERVAL); + } + //Something like destructor public void finish() { @@ -149,5 +211,5 @@ public abstract class TimedSendLongPollingBot extends TelegramLongPollingBot } } */ - abstract void sendMessageCallback(Long chatId, Object messageRequest); + public abstract void sendMessageCallback(Long chatId, Object messageRequest); }