forked from HonusBot/HonusBot
253 lines
7.0 KiB
Python
253 lines
7.0 KiB
Python
# File: inline.py
|
|
# Description: Inline wrapper
|
|
# Author: Arves100
|
|
# Date: 25-12-2019 ~ 28-03-2020 . 29-04-2020
|
|
from telegram.ext import Updater, InlineQueryHandler, CommandHandler
|
|
import logging
|
|
from telegram import InlineQueryResultArticle, InputTextMessageContent, Bot
|
|
import uuid
|
|
import requests
|
|
from telegram.utils.request import Request
|
|
import json
|
|
|
|
BOT_VERSION = "0.4"
|
|
|
|
class Command:
|
|
def __init__(self):
|
|
self.desc = ""
|
|
self.name = ""
|
|
self.headerField = ""
|
|
self.mode = ""
|
|
|
|
def __init__(self, name, desc, mode, hf):
|
|
self.desc = desc
|
|
self.name = name
|
|
self.headerField = hf
|
|
self.mode = mode
|
|
|
|
def setMode(self, mode):
|
|
self.mode = mode
|
|
|
|
def getMode(self):
|
|
return self.mode
|
|
|
|
def setDesc(self, desc):
|
|
self.desc = desc
|
|
|
|
def getDesc(self):
|
|
return self.desc
|
|
|
|
def getName(self):
|
|
return self.name
|
|
|
|
def setName(self, name):
|
|
self.name = name
|
|
|
|
def setHeaderField(self, param):
|
|
self.headerField = param
|
|
|
|
def getHeaderField(self):
|
|
return self.headerField
|
|
|
|
class JSONRequest(object):
|
|
def __init__(self, config, command, senderId, text):
|
|
self.botName = config.getApiId()
|
|
self.botPassword = config.getApiPassword()
|
|
self.command = command
|
|
self.replyToUserId = 0
|
|
self.senderUserId = senderId
|
|
self.chatId = 0
|
|
self.queryText = text
|
|
self.rawSingleArgument = False
|
|
|
|
class JSONResponse(object):
|
|
def __init__(self, auth, success, img, dsc):
|
|
self.auth = auth
|
|
self.success = success
|
|
self.imgUrl = img
|
|
self.htmlDesc = dsc
|
|
|
|
def isAuthorized(self):
|
|
return self.auth
|
|
|
|
def succeeded(self):
|
|
return self.success
|
|
|
|
def getHtmlDesc(self):
|
|
return self.htmlDesc
|
|
|
|
def getImageUrl(self):
|
|
return self.imgUrl
|
|
|
|
class Config:
|
|
def __init__(self):
|
|
self.apiUrl = ""
|
|
self.apiId = ""
|
|
self.apiPassword = ""
|
|
self.botToken = ""
|
|
|
|
def read(self, name):
|
|
import configparser
|
|
cfg = configparser.ConfigParser()
|
|
cfg.read(name)
|
|
self.apiUrl = cfg["Api"]["Url"]
|
|
self.apiId = cfg["Api"]["Id"]
|
|
self.apiPassword = cfg["Api"]["Password"]
|
|
self.botToken = cfg["Bot"]["Token"]
|
|
self.botConnectionPool = cfg["Bot"]["ConnectionPool"]
|
|
self.userAgent = cfg["Api"]["UserAgent"]
|
|
|
|
def getApiUrl(self):
|
|
return self.apiUrl
|
|
|
|
def getApiId(self):
|
|
return self.apiId
|
|
|
|
def getApiPassword(self):
|
|
return self.apiPassword
|
|
|
|
def getBotToken(self):
|
|
return self.botToken
|
|
|
|
def getBotConnectionPool(self):
|
|
return self.botConnectionPool
|
|
|
|
def getUserAgent(self):
|
|
return self.userAgent
|
|
|
|
def jsonasResponse(dct):
|
|
if not "imageUrl" in dct or not "htmlDescription" in dct:
|
|
raise Exception("Invalid JSON")
|
|
|
|
return JSONResponse(dct["authorized"], dct["success"], dct["imageUrl"], dct["htmlDescription"])
|
|
|
|
class InlineBot:
|
|
def __init__(self):
|
|
self.errorText = "Sowwy mawster, an ewror occuwed. ;;w;;"
|
|
self.commands = []
|
|
self.config = None
|
|
|
|
self.__loadConfig()
|
|
self.__makeCommands()
|
|
|
|
def __loadConfig(self):
|
|
self.config = Config()
|
|
self.config.read("config.ini")
|
|
|
|
def __makeCommands(self):
|
|
self.commands.append(Command("Pony", "yay, ponies!", "imageurl", "imageUrl"))
|
|
self.commands.append(Command("Clop", "NSFW ponies", "imageurl", "imageUrl"))
|
|
self.commands.append(Command("Furry", "Furries", "imageurl", "imageUrl"))
|
|
self.commands.append(Command("Loli", "Cute lolis", "imageurl", "imageUrl"))
|
|
self.commands.append(Command("Yiff", "Yiff", "imageurl", "imageUrl"))
|
|
self.commands.append(Command("Lewd", "Say hi to the police", "imageurl", "imageUrl"))
|
|
self.commands.append(Command("Rasoio", "Dai facciamogli lo scherzo del rasoio!1!!", "genericurl", "htmlDescription"))
|
|
self.cancerCommand = Command("Cancer", "Search cancer", "imageurl", "imageUrl")
|
|
|
|
def callRequest(self, command, senderid, text = ""):
|
|
if not command:
|
|
return None
|
|
|
|
r = requests.post(self.config.getApiUrl(), headers = { "Content-Type" : "application/json", "User-Agent" : self.config.getUserAgent() }, data = json.JSONEncoder().encode(JSONRequest(self.config, command.getName(), senderid, text).__dict__))
|
|
|
|
if not r:
|
|
self.logger.warning('Request api fail')
|
|
return None
|
|
|
|
if len(r.text) < 1:
|
|
self.logger.warning('Request text api fail')
|
|
return None
|
|
|
|
resp = None
|
|
try:
|
|
resp = json.loads(r.text, object_hook = jsonasResponse)
|
|
except:
|
|
self.logger.warning('JSON deserialization failed')
|
|
return None ## Invalid json
|
|
|
|
if not resp.isAuthorized() or not resp.succeeded():
|
|
self.logger.warning('Unathorized/Unsuccessfull')
|
|
return None
|
|
|
|
return resp
|
|
|
|
def start(self):
|
|
self.prequest = Request(con_pool_size=int(self.config.getBotConnectionPool()))
|
|
self.bot = Bot(token=self.config.getBotToken(), request=self.prequest)
|
|
self.updater = Updater(bot=self.bot,use_context=True)
|
|
|
|
print("Inline bot v. {}".format(BOT_VERSION))
|
|
|
|
self.dispatcher = self.updater.dispatcher
|
|
|
|
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
self.dispatcher.add_error_handler(self.errorHandler)
|
|
|
|
inline_caps_handler = InlineQueryHandler(self.inlineQuery)
|
|
self.dispatcher.add_handler(inline_caps_handler)
|
|
|
|
print("Started!")
|
|
|
|
self.updater.start_polling()
|
|
self.updater.idle()
|
|
|
|
def errorHandler(self, update, context):
|
|
"""Log Errors caused by Updates."""
|
|
self.logger.error('Update "%s" caused error "%s"', update, context.error)
|
|
|
|
def commandStart(self, update, context):
|
|
context.bot.send_message(chat_id=update.effective_chat.id, text="Inline cancer, clop, furry spammer (h0nus)",parse_mode="HTML")
|
|
|
|
def makeResult(self, command, senderid, querySearch = ""):
|
|
if command == None:
|
|
return InlineQueryResultArticle(
|
|
id=uuid.uuid4().hex,
|
|
type="article",
|
|
title="Error",
|
|
input_message_content=InputTextMessageContent("Error", "HTML"),
|
|
description=self.errorText
|
|
)
|
|
|
|
resp = self.callRequest(command, senderid, querySearch)
|
|
contentText = ""
|
|
|
|
if resp == None:
|
|
return InlineQueryResultArticle(
|
|
id=uuid.uuid4().hex,
|
|
type="article",
|
|
title="Error",
|
|
input_message_content=InputTextMessageContent("Error", "HTML"),
|
|
description=self.errorText
|
|
)
|
|
|
|
if command.getMode() == "imageurl":
|
|
contentText = InputTextMessageContent("<a href=\"" + resp.getImageUrl() + "\">HonusBot</a>", "HTML")
|
|
elif command.getMode() == "genericurl":
|
|
contentText = InputTextMessageContent("<a href=\"" + resp.getHtmlDesc() + "\">link</a>", "HTML")
|
|
else:
|
|
contentText = InputTextMessageContent("Error", "HTML")
|
|
|
|
return InlineQueryResultArticle(
|
|
id=uuid.uuid4().hex,
|
|
type="article",
|
|
title=command.getName(),
|
|
input_message_content=contentText,
|
|
description=command.getDesc()
|
|
)
|
|
|
|
def inlineQuery(self, update, context):
|
|
senderid = str(update.inline_query.from_user.id)
|
|
query = update.inline_query.query
|
|
results = list()
|
|
if not query:
|
|
for command in self.commands:
|
|
results.append(self.makeResult(command, senderid))
|
|
else:
|
|
self.cancerCommand.setDesc("Search cancer about " + query)
|
|
results.append(self.makeResult(self.cancerCommand, senderid, query))
|
|
context.bot.answer_inline_query(update.inline_query.id, results, 1, False)
|
|
|
|
InlineBot().start()
|