# File: inline.py
# Description: Inline wrapper
# Author: Arves100
# Date: 25-12-2019 ~ 28-03-2020 . 29-04-2020
from telegram.ext import Updater, InlineQueryHandler, CommandHandler
|
||
import logging
|
||
from telegram import InlineQueryResultArticle, InputTextMessageContent, Bot
|
||
import uuid
|
||
import requests
|
||
from telegram.utils.request import Request
|
||
import json
|
||
|
||
# Version string shown in the "Inline bot v. ..." startup banner printed by InlineBot.start().
BOT_VERSION = "0.4"
|
||
|
||
class Command:
    """Describe one command the bot can offer as an inline result.

    All four fields are plain values stored as given:

    * ``name`` -- sent to the API as the command and used as the result title.
    * ``desc`` -- shown as the result description.
    * ``mode`` -- selects how the reply message is built; the bot recognises
      ``"imageurl"`` and ``"genericurl"``.
    * ``headerField`` -- stored and exposed through its accessors only; the
      visible code never reads it.
    """

    def __init__(self, name="", desc="", mode="", hf=""):
        """Create a command.

        The original class declared two ``__init__`` methods; Python keeps only
        the last one, so the no-argument form was unreachable. Defaulting every
        parameter to the empty string restores ``Command()`` while keeping the
        four-argument call unchanged.

        Args:
            name: Command name.
            desc: Human-readable description.
            mode: Reply mode.
            hf: Value stored as ``headerField``.
        """
        self.desc = desc
        self.name = name
        self.headerField = hf
        self.mode = mode

    def setMode(self, mode):
        """Replace the reply mode."""
        self.mode = mode

    def getMode(self):
        """Return the reply mode."""
        return self.mode

    def setDesc(self, desc):
        """Replace the description."""
        self.desc = desc

    def getDesc(self):
        """Return the description."""
        return self.desc

    def getName(self):
        """Return the command name."""
        return self.name

    def setName(self, name):
        """Replace the command name."""
        self.name = name

    def setHeaderField(self, param):
        """Replace the stored header field."""
        self.headerField = param

    def getHeaderField(self):
        """Return the stored header field."""
        return self.headerField
|
||
|
||
class JSONRequest(object):
    """Request payload for the API.

    The instance ``__dict__`` is what gets JSON-encoded by the caller, so the
    attribute names below are the field names sent on the wire.
    """

    def __init__(self, config, command, senderId, text):
        """Fill in the payload fields.

        Args:
            config: Object providing ``getApiId()`` and ``getApiPassword()``.
            command: Command name to send.
            senderId: Identifier of the user who issued the query.
            text: Query text.
        """
        # Insertion order matches the original attribute order so the encoded
        # JSON keeps the same key sequence.
        vars(self).update({
            "botName": config.getApiId(),
            "botPassword": config.getApiPassword(),
            "command": command,
            "replyToUserId": 0,
            "senderUserId": senderId,
            # NOTE(review): hard-coded chat id; its meaning is not visible here.
            "chatId": 987120603,
            "queryText": text,
            "rawSingleArgument": False,
        })
|
||
|
||
class JSONResponse(object):
    """Decoded API reply: two status flags plus an image URL and an HTML text."""

    def __init__(self, auth, success, img, dsc):
        """Store the reply fields unchanged.

        Args:
            auth: Authorization flag of the reply.
            success: Success flag of the reply.
            img: Image URL of the reply.
            dsc: HTML description of the reply.
        """
        self.auth, self.success = auth, success
        self.imgUrl, self.htmlDesc = img, dsc

    def isAuthorized(self):
        """Return the authorization flag."""
        return self.auth

    def succeeded(self):
        """Return the success flag."""
        return self.success

    def getHtmlDesc(self):
        """Return the HTML description."""
        return self.htmlDesc

    def getImageUrl(self):
        """Return the image URL."""
        return self.imgUrl
|
||
|
||
class Config:
    """Bot settings loaded from an INI file.

    Every value is kept as the string configparser returns; callers convert
    where needed (the connection pool size is passed through ``int()`` by the
    bot before use).
    """

    def __init__(self):
        """Start with every setting empty.

        ``botConnectionPool`` and ``userAgent`` used to be created only by
        ``read()``, so their getters raised AttributeError on a fresh instance.
        They are now initialised here like the other settings.
        """
        self.apiUrl = ""
        self.apiId = ""
        self.apiPassword = ""
        self.botToken = ""
        self.botConnectionPool = ""
        self.userAgent = ""

    def read(self, name):
        """Load all settings from the INI file ``name``.

        Expected layout: section ``Api`` with ``Url``, ``Id``, ``Password`` and
        ``UserAgent``; section ``Bot`` with ``Token`` and ``ConnectionPool``.

        Raises:
            KeyError: If a section or option is missing. This is also what
                happens for an unreadable file, because
                ``ConfigParser.read`` skips such files silently.
        """
        import configparser

        cfg = configparser.ConfigParser()
        cfg.read(name)

        api = cfg["Api"]
        bot = cfg["Bot"]

        self.apiUrl = api["Url"]
        self.apiId = api["Id"]
        self.apiPassword = api["Password"]
        self.botToken = bot["Token"]
        self.botConnectionPool = bot["ConnectionPool"]
        self.userAgent = api["UserAgent"]

    def getApiUrl(self):
        """Return the API endpoint URL."""
        return self.apiUrl

    def getApiId(self):
        """Return the API identifier."""
        return self.apiId

    def getApiPassword(self):
        """Return the API password."""
        return self.apiPassword

    def getBotToken(self):
        """Return the bot token."""
        return self.botToken

    def getBotConnectionPool(self):
        """Return the connection pool setting (a string)."""
        return self.botConnectionPool

    def getUserAgent(self):
        """Return the User-Agent string used for API requests."""
        return self.userAgent
|
||
|
||
def jsonasResponse(dct):
    """Convert a decoded JSON object into a ``JSONResponse``.

    Intended for use as a ``json.loads`` ``object_hook``.

    All four keys that are read are now validated. Previously only
    ``imageUrl`` and ``htmlDescription`` were checked, so a reply lacking
    ``authorized`` or ``success`` escaped as a bare KeyError instead of the
    "Invalid JSON" error.

    Args:
        dct: Mapping produced by the JSON decoder.

    Returns:
        A ``JSONResponse`` built from the four fields.

    Raises:
        ValueError: If any required key is missing. ValueError is a subclass
            of Exception, so handlers written for the previous bare
            ``Exception`` still catch it.
    """
    required = ("authorized", "success", "imageUrl", "htmlDescription")
    if any(key not in dct for key in required):
        raise ValueError("Invalid JSON")

    return JSONResponse(dct["authorized"], dct["success"], dct["imageUrl"], dct["htmlDescription"])
|
||
|
||
class InlineBot:
    """Telegram inline bot that answers inline queries with results from an HTTP API.

    With an empty query every entry of ``commands`` is offered; with a
    non-empty query every entry of ``inlinecommands`` is offered, using the
    query text as the search string.
    """

    # Seconds to wait for the API before giving up. Without a timeout a stalled
    # backend would block the handler indefinitely.
    REQUEST_TIMEOUT = 10

    # Defined on the class so logging works even if a method runs before start().
    logger = logging.getLogger(__name__)

    def __init__(self):
        """Load ``config.ini`` and build the command lists."""
        self.errorText = "Sowwy mawster, an ewror occuwed. ;;w;;"
        self.commands = []
        self.inlinecommands = []
        self.config = None

        self.__loadConfig()
        self.__makeCommands()

    def __loadConfig(self):
        """Read the bot settings from ``config.ini`` in the working directory."""
        self.config = Config()
        self.config.read("config.ini")

    def __makeCommands(self):
        """Populate the empty-query and search command lists."""
        # (name, description, mode, header field)
        plain = (
            ("Pony", "yay, ponies!", "imageurl", "imageUrl"),
            ("Clop", "NSFW ponies", "imageurl", "imageUrl"),
            ("Furry", "Furries", "imageurl", "imageUrl"),
            ("Loli", "Cute lolis", "imageurl", "imageUrl"),
            ("Yiff", "Yiff", "imageurl", "imageUrl"),
            ("Lewd", "Say hi to the police", "imageurl", "imageUrl"),
            ("Rasoio", "Dai facciamogli lo scherzo del rasoio!1!!", "genericurl", "htmlDescription"),
            ("Pillon", "Sal🅱️ini ti blocca i porno, che fai?", "genericurl", "htmlDescription"),
        )
        searchable = (
            ("Cancer", "Search on tbib.org", "imageurl", "imageUrl"),
            ("R34", "Search on rule34.xxx", "imageurl", "imageUrl"),
            ("Safe", "Search on e926", "imageurl", "imageUrl"),
        )
        self.commands.extend(Command(*spec) for spec in plain)
        self.inlinecommands.extend(Command(*spec) for spec in searchable)

    def callRequest(self, command, senderid, text=""):
        """POST one command to the API and return its decoded reply.

        Args:
            command: ``Command`` to execute; a falsy value yields ``None``.
            senderid: Identifier of the requesting user.
            text: Optional query text.

        Returns:
            A ``JSONResponse`` when the call succeeded and the reply is both
            authorized and successful, otherwise ``None``. Network failures,
            HTTP error statuses, empty bodies and malformed JSON all map to
            ``None`` and are logged as warnings.
        """
        if not command:
            return None

        payload = JSONRequest(self.config, command.getName(), senderid, text).__dict__
        headers = {
            "Content-Type": "application/json",
            "User-Agent": self.config.getUserAgent(),
        }

        try:
            r = requests.post(
                self.config.getApiUrl(),
                headers=headers,
                data=json.dumps(payload),
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as err:
            # Connection errors and timeouts used to escape to the dispatcher,
            # leaving the inline query unanswered.
            self.logger.warning('Request api fail: %s', err)
            return None

        # A requests Response is falsy for 4xx/5xx status codes.
        if not r:
            self.logger.warning('Request api fail')
            return None

        if len(r.text) < 1:
            self.logger.warning('Request text api fail')
            return None

        try:
            resp = json.loads(r.text, object_hook=jsonasResponse)
        except Exception:
            # Covers both the decoder's own errors and those raised by the hook.
            self.logger.warning('JSON deserialization failed')
            return None

        # A top-level array/scalar never passes through the object hook.
        if not isinstance(resp, JSONResponse):
            self.logger.warning('JSON deserialization failed')
            return None

        if not resp.isAuthorized() or not resp.succeeded():
            self.logger.warning('Unathorized/Unsuccessfull')
            return None

        return resp

    def start(self):
        """Create the Telegram bot, register the handlers and poll until stopped."""
        self.prequest = Request(con_pool_size=int(self.config.getBotConnectionPool()))
        self.bot = Bot(token=self.config.getBotToken(), request=self.prequest)
        self.updater = Updater(bot=self.bot, use_context=True)

        print("Inline bot v. {}".format(BOT_VERSION))

        self.dispatcher = self.updater.dispatcher

        logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)

        self.dispatcher.add_error_handler(self.errorHandler)
        self.dispatcher.add_handler(InlineQueryHandler(self.inlineQuery))

        print("Started!")

        self.updater.start_polling()
        self.updater.idle()

    def errorHandler(self, update, context):
        """Log Errors caused by Updates."""
        self.logger.error('Update "%s" caused error "%s"', update, context.error)

    def commandStart(self, update, context):
        """Send the bot's introduction text to the chat of ``update``.

        NOTE(review): start() does not register this handler; confirm whether
        a CommandHandler for it is intended.
        """
        context.bot.send_message(chat_id=update.effective_chat.id, text="Inline cancer, clop, furry spammer (h0nus)", parse_mode="HTML")

    def _errorResult(self):
        """Build the generic "Error" article shown when a command cannot be served."""
        return InlineQueryResultArticle(
            id=uuid.uuid4().hex,
            type="article",
            title="Error",
            input_message_content=InputTextMessageContent("Error", "HTML"),
            description=self.errorText
        )

    def _buildContent(self, command, resp):
        """Return the message content for ``resp``, or ``None`` if the field the mode needs is empty."""
        mode = command.getMode()

        if mode == "imageurl":
            url = resp.getImageUrl()
            if not url:
                self.logger.warning('Response has no image url')
                return None
            # NOTE(review): the URL is inserted unescaped; confirm the API
            # always returns values that are safe inside an HTML attribute.
            return InputTextMessageContent("<a href=\"{}\">HonusBot</a>".format(url), "HTML")

        if mode == "genericurl":
            text = resp.getHtmlDesc()
            if not text:
                self.logger.warning('Response has no html description')
                return None
            return InputTextMessageContent(text, "HTML")

        return InputTextMessageContent("Error", "HTML")

    def makeResult(self, command, senderid, querySearch="", description=None):
        """Run ``command`` against the API and wrap the reply as an inline article.

        Args:
            command: ``Command`` to run, or ``None``.
            senderid: Identifier of the requesting user.
            querySearch: Optional search text forwarded to the API.
            description: Optional description for this result only; defaults
                to the command's own description.

        Returns:
            An ``InlineQueryResultArticle``. If the command is ``None``, the
            API call fails, or the reply lacks the content its mode needs, the
            generic error article is returned instead.
        """
        if command is None:
            return self._errorResult()

        resp = self.callRequest(command, senderid, querySearch)
        if resp is None:
            return self._errorResult()

        content = self._buildContent(command, resp)
        if content is None:
            return self._errorResult()

        return InlineQueryResultArticle(
            id=uuid.uuid4().hex,
            type="article",
            title=command.getName(),
            input_message_content=content,
            description=command.getDesc() if description is None else description
        )

    def inlineQuery(self, update, context):
        """Answer an inline query with one article per applicable command."""
        senderid = str(update.inline_query.from_user.id)
        query = update.inline_query.query

        if not query:
            results = [self.makeResult(command, senderid) for command in self.commands]
        else:
            # Pass the per-query description along instead of overwriting the
            # shared Command objects.
            description = "Search: " + query
            results = [
                self.makeResult(command, senderid, query, description)
                for command in self.inlinecommands
            ]

        context.bot.answer_inline_query(update.inline_query.id, results, cache_time=1, is_personal=False)
|
||
|
||
# Script entry point: constructing the bot reads config.ini, and start() then
# polls Telegram until the process is stopped.
# NOTE(review): this also runs when the module is imported; confirm whether an
# `if __name__ == "__main__":` guard is wanted.
InlineBot().start()
|