# aptbot
+
A chatbot for twitch.tv
-import os
-
-if "XDG_CONFIG_HOME" in os.environ:
- CONFIG_HOME = os.environ["XDG_CONFIG_HOME"]
-elif "APPDATA" in os.environ:
- CONFIG_HOME = os.environ["APPDATA"]
-else:
- CONFIG_HOME = os.path.join(os.environ["HOME"], ".config")
-
-CONFIG_PATH = os.path.join(CONFIG_HOME, f"aptbot")
-
-
-PORT = 26538
-LOCALHOST = "127.0.0.1"
+++ /dev/null
-import importlib
-import importlib.util
-import os
-import socket
-import sys
-import time
-import traceback
-from threading import Thread
-from types import ModuleType
-
-from dotenv import load_dotenv
-
-import aptbot.args
-import aptbot.args_logic
-import aptbot.bot
-from aptbot import *
-
-load_dotenv()
-
-
-def handle_message(bot: aptbot.bot.Bot, modules: dict[str, ModuleType]):
- while True:
- messages = bot.receive_messages()
- for message in messages:
- if not message.channel:
- continue
- # if message.command:
- # print(
- # f"#{message.channel} ({message.command.value}) | \
- # {message.nick}: {message.value}"
- # )
- try:
- method = Thread(
- target=modules[message.channel].main,
- args=(
- bot,
- message,
- ),
- )
- except KeyError:
- pass
- else:
- method.daemon = True
- method.start()
-
-
-def start(bot: aptbot.bot.Bot, modules: dict[str, ModuleType]):
- load_modules(modules)
- message_handler_thread = Thread(
- target=handle_message,
- args=(
- bot,
- modules,
- ),
- )
- message_handler_thread.daemon = True
- message_handler_thread.start()
- for channel in modules:
- update_channel = Thread(
- target=modules[channel].start,
- args=(
- bot,
- aptbot.bot.Message({}, "", None, channel, ""),
- ),
- )
- update_channel.daemon = True
- update_channel.start()
-
-
-def load_modules(modules: dict[str, ModuleType]):
- modules.clear()
- channels = [
- c
- for c in os.listdir(CONFIG_PATH)
- if os.path.isdir(os.path.join(CONFIG_PATH, c))
- ]
- channels = filter(lambda x: not x.startswith("."), channels)
- for channel in channels:
- account_path = os.path.join(CONFIG_PATH, f"{channel}")
- sys.path.append(account_path)
- module_path = os.path.join(account_path, f"main.py")
- spec = importlib.util.spec_from_file_location(
- "main",
- module_path,
- )
- if not spec or not spec.loader:
- print("Problem loading spec")
- sys.path.remove(account_path)
- continue
- module = importlib.util.module_from_spec(spec)
- try:
- spec.loader.exec_module(module)
- except Exception as e:
- print(traceback.format_exc())
- print(f"Problem Loading Module: {e}")
- else:
- modules[channel] = module
- sys.path.remove(account_path)
-
-
-def initialize(bot: aptbot.bot.Bot):
- channels = [
- c
- for c in os.listdir(CONFIG_PATH)
- if os.path.isdir(os.path.join(CONFIG_PATH, c))
- ]
- channels = filter(lambda x: not x.startswith("."), channels)
- for channel in channels:
- if not channel.startswith("."):
- bot.join_channel(channel)
-
-
-def listener():
- NICK = os.getenv("APTBOT_NICK")
- OAUTH = os.getenv("APTBOT_PASS")
- if NICK and OAUTH:
- bot = aptbot.bot.Bot(NICK, OAUTH)
- else:
- sys.exit(1)
- bot.connect()
- modules = {}
- message_loop = Thread(
- target=start,
- args=(
- bot,
- modules,
- ),
- )
- message_loop.daemon = True
- message_loop.start()
- s = socket.socket()
- s.bind((LOCALHOST, PORT))
- s.listen(5)
- initialize(bot)
-
- while True:
- c, _ = s.accept()
- msg = c.recv(1024).decode()
- msg = msg.split("===")
- try:
- command = msg[0]
- channel = msg[1]
- msg = msg[2]
- except IndexError:
- pass
- else:
- if aptbot.args_logic.Commands.JOIN.value in command:
- bot.join_channel(channel)
- elif aptbot.args_logic.Commands.SEND.value in command:
- bot.send_privmsg(channel, msg)
- elif aptbot.args_logic.Commands.KILL.value in command:
- sys.exit()
- elif aptbot.args_logic.Commands.UPDATE.value in command:
- load_modules(modules)
- elif aptbot.args_logic.Commands.PART.value in command:
- bot.leave_channel(channel)
-
- time.sleep(1)
-
-
-def send(func):
- def inner(*args, **kwargs):
- s = socket.socket()
- s.connect((LOCALHOST, PORT))
- func(s, *args, **kwargs)
- s.close()
-
- return inner
-
-
-def main():
- argsv = aptbot.args.parse_arguments()
- os.makedirs(CONFIG_PATH, exist_ok=True)
- if argsv.enable:
- listener()
-
- s = socket.socket()
- try:
- s.connect((LOCALHOST, PORT))
- except ConnectionRefusedError:
- pass
-
- if argsv.add_account:
- aptbot.args_logic.add_account(s, argsv.add_account)
- if argsv.disable_account:
- aptbot.args_logic.disable_account(s, argsv.disable_account)
- if argsv.send_message:
- aptbot.args_logic.send_msg(s, argsv.send_message)
- if argsv.disable:
- aptbot.args_logic.disable(s)
- if argsv.update:
- aptbot.args_logic.update(s)
- s.close()
-
-
-if __name__ == "__main__":
- main()
-import socket
import os
+import shutil
+import socket
from enum import Enum
-from aptbot import CONFIG_PATH
+
+from .constants import CONFIG_PATH
-class Commands(Enum):
+class BotCommands(Enum):
JOIN = "JOIN"
PART = "PART"
SEND = "SEND"
pass
os.makedirs(account_path, exist_ok=True)
- # print(os.listdir("."))
- # shutil.copy("main.py", account_path)
- try:
- f = open(os.path.join(account_path, "main.py"), "r")
- except FileNotFoundError:
- f = open(os.path.join(account_path, "main.py"), "a")
- f.write("""from aptbot.bot import Bot, Message, Commands
-def main(bot, message: Message):
- pass""")
- f.close()
- else:
- f.close()
-
- command = Commands.JOIN.value
+ shutil.copyfile(
+ os.path.join(os.path.dirname(os.path.realpath(__file__)), "resources/main.py"),
+ os.path.join(account_path, "main.py"),
+ )
+
+ command = BotCommands.JOIN.value
channel = acc
msg = ""
- s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
+ try:
+ s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
+ except BrokenPipeError:
+ pass
def send_msg(s: socket.socket, msg: str):
- command = Commands.SEND.value
- channel = msg.split(' ')[0]
- msg = msg[len(channel) + 1:]
+ command = BotCommands.SEND.value
+ channel = msg.split(" ")[0]
+ msg = msg[len(channel) + 1 :]
s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
def disable(s: socket.socket):
- command = Commands.KILL.value
+ command = BotCommands.KILL.value
channel = ""
msg = ""
s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
except FileNotFoundError:
print(f"Account {acc} is already disabled.")
- command = Commands.PART.value
+ command = BotCommands.PART.value
channel = ""
msg = ""
- s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
+ try:
+ s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
+ except BrokenPipeError:
+ pass
def update(s: socket.socket):
- command = Commands.UPDATE.value
+ command = BotCommands.UPDATE.value
channel = ""
msg = ""
s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
+import logging
import re
-import time
+import socket
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Union
-import websocket
+logger = logging.getLogger(__name__)
class Commands(Enum):
class Bot:
def __init__(self, nick: str, oauth_token: str):
- self._irc = websocket.WebSocket()
- self._server = "wss://irc-ws.chat.twitch.tv:443"
+ self._irc = socket.socket()
+ self._server = "irc.chat.twitch.tv"
+ self._port = 6667
self._nick = nick
self._oauth_token = oauth_token
self._connected_channels = []
def send_command(self, command: str):
if "PASS" not in command:
- print(f"< {command}")
+ logger.debug(f"< {command}")
self._irc.send((command + "\r\n").encode())
def connect(self):
- self._irc.connect(self._server)
+ self._irc.connect((self._server, self._port))
self.send_command(f"PASS oauth:{self._oauth_token}")
self.send_command(f"NICK {self._nick}")
self.send_command(f"CAP REQ :twitch.tv/membership")
replied_command = ""
if isinstance(text, list):
for t in text:
- # print(
- # f"#{channel} ({Commands.PRIVMSG.value}) | {self._nick}: {t}")
command = replied_command + f"{Commands.PRIVMSG.value} #{channel} :{t}"
self.send_command(command)
else:
- # print(f"#{channel} ({Commands.PRIVMSG.value}) | {self._nick}: {text}")
command = replied_command + f"{Commands.PRIVMSG.value} #{channel} :{text}"
self.send_command(command)
@staticmethod
- def parse_message(received_msg: str) -> Message:
- # print(received_msg)
- message = Message()
-
- value_start = received_msg.find(
- ":", received_msg.find(" ", received_msg.find(" ") + 1)
- )
- if value_start != -1:
- message.value = received_msg[value_start:][1:]
- received_msg = received_msg[: value_start - 1]
-
- parts = received_msg.split(" ")
-
- for part in parts:
- if part.startswith("@"):
- part = part[1:]
- for tag in part.split(";"):
- tag = tag.split("=")
- try:
- message.tags[tag[0]] = tag[1]
- except IndexError:
- message.tags[tag[0]] = ""
- elif part.startswith(":"):
- part = part[1:]
- if "!" in part:
- message.nick = part.split("!")[0]
- elif part in set(command.value for command in Commands):
- message.command = Commands(part)
- elif part.startswith("#"):
- part = part[1:]
- message.channel = part
-
- message.value = " ".join(message.value.split())
-
- if not message.tags.get("reply-parent-msg-body", None):
- # print(message)
- try:
- print(f"#{message.channel} | {message.tags['display-name']}: {message.value}")
- except KeyError:
- pass
- return message
-
- rep = message.tags["reply-parent-msg-body"]
- new_rep = ""
+ def _replace_escaped_space_in_tags(tag_value: str) -> str:
+ new_tag_value = ""
ignore_next = False
- for i in range(len(rep)):
+ for i in range(len(tag_value)):
if ignore_next:
ignore_next = False
continue
- if not rep[i] == "\\":
- new_rep += rep[i]
+ if not tag_value[i] == "\\":
+ new_tag_value += tag_value[i]
ignore_next = False
continue
- if i + 1 == len(rep):
- new_rep += rep[i]
+ if i + 1 == len(tag_value):
+ new_tag_value += tag_value[i]
break
- if rep[i + 1] == "\\":
- new_rep += "\\"
- elif rep[i + 1] == "s":
- new_rep += " "
+ if tag_value[i + 1] == "\\":
+ new_tag_value += "\\"
+ elif tag_value[i + 1] == "s":
+ new_tag_value += " "
ignore_next = True
- message.tags["reply-parent-msg-body"] = " ".join(new_rep.split())
+ @staticmethod
+ def parse_message(received_msg: str) -> Message:
+ split = re.search(
+ r"(?:(?:@(.+))\s)?:(?:(?:(\w+)!\w+@\w+\.)?.+)\s(\w+)\s\#(\w+)\s:?(.+)?",
+ received_msg,
+ )
- # print(message)
+ if not split:
+ return Message()
+
+ tags = {}
+ if split[1]:
+ for tag in split[1].split(";"):
+ tag_name, tag_value = tag.split("=")
+ if tag.split("=")[0] == "reply-parent-msg-body":
+ tag_value = Bot._replace_escaped_space_in_tags(tag.split("=")[1])
+ tags[tag_name] = " ".join(tag_value.split())
+
+ nick = split[2]
try:
- print(f"#{message.channel} | {message.tags['display-name']}: {message.value}")
+ command = Commands[split[3]]
except KeyError:
- pass
- return message
+ return Message()
+ channel = split[4]
+ value = " ".join(split[5].split())
+
+ return Message(
+ tags=tags,
+ nick=nick,
+ command=command,
+ channel=channel,
+ value=value,
+ )
def _handle_message(self, received_msg: str) -> Message:
+ logger.debug(received_msg)
if received_msg == "PING :tmi.twitch.tv":
self.send_command("PONG :tmi.twitch.tv")
return Message()
def receive_messages(self) -> list[Message]:
messages = []
- received_msgs = self._irc.recv()
- for received_msgs in received_msgs.split("\r\n"):
+ received_msgs = self._irc.recv(2048)
+ for received_msgs in received_msgs.decode("utf-8").split("\r\n"):
messages.append(self._handle_message(received_msgs))
return messages
--- /dev/null
+import os
+
+if os.name == "posix":
+ if "XDG_CONFIG_HOME" in os.environ and "XDG_CACHE_HOME" in os.environ:
+ CONFIG_PATH = os.path.join(os.environ["XDG_CONFIG_HOME"], f"aptbot")
+ CONFIG_LOGS = os.path.join(os.environ["XDG_CACHE_HOME"], "aptbot")
+ else:
+ CONFIG_PATH = os.path.join(os.environ["HOME"], ".config/aptbot")
+ CONFIG_LOGS = os.path.join(os.environ["HOME"], ".cache/aptbot/logs")
+elif os.name == "nt":
+ if "APPDATA" in os.environ:
+ CONFIG_PATH = os.path.join(os.environ["APPDATA"], "aptbot/accounts")
+ CONFIG_LOGS = os.path.join(os.environ["APPDATA"], "aptbot/logs")
+
+PORT = 26538
+LOCALHOST = "127.0.0.1"
+
+os.makedirs(CONFIG_LOGS, exist_ok=True)
+CONFIG_FILE = os.path.join(CONFIG_LOGS, "aptbot.log")
+# open(CONFIG_FILE, "a").close()
+
+LOGGING_DICT = {
+ "version": 1,
+ "formatters": {
+ "simple": {"format": "[%(levelname)s] %(asctime)s: %(name)s; %(message)s"}
+ },
+ "handlers": {
+ "console": {
+ "class": "logging.StreamHandler",
+ "level": "DEBUG",
+ "formatter": "simple",
+ "stream": "ext://sys.stdout",
+ },
+ "file": {
+ "class": "logging.handlers.TimedRotatingFileHandler",
+ "level": "DEBUG",
+ "formatter": "simple",
+ "filename": CONFIG_FILE,
+ "when": "w0",
+ "utc": True,
+ "backupCount": 3,
+ },
+ },
+ "loggers": {
+ "basicLogger": {
+ "level": "DEBUG",
+ "handlers": ["console", "file"],
+ "propagate": "no",
+ }
+ },
+ "root": {
+ "level": "DEBUG",
+ "handlers": ["console", "file"],
+ },
+ "disable_existing_loggers": False,
+}
--- /dev/null
+import importlib
+import importlib.util
+import logging
+import logging.config
+import os
+import socket
+import sys
+import time
+import traceback
+from threading import Thread
+from types import ModuleType
+
+from dotenv import load_dotenv
+
+from . import args_logic
+from .args import parse_arguments
+from .bot import Bot, Message
+from .constants import CONFIG_LOGS, CONFIG_PATH, LOCALHOST, LOGGING_DICT, PORT
+
+logging.config.dictConfig(LOGGING_DICT)
+logger = logging.getLogger(__name__)
+
+load_dotenv()
+
+
+def handle_message(bot: Bot, modules: dict[str, ModuleType]):
+ while True:
+ messages = bot.receive_messages()
+ for message in messages:
+ if not message.channel:
+ continue
+ try:
+ method = Thread(
+ target=modules[message.channel].main,
+ args=(
+ bot,
+ message,
+ ),
+ )
+ except KeyError:
+ pass
+ else:
+ method.daemon = True
+ method.start()
+
+
+def start(bot: Bot, modules: dict[str, ModuleType]):
+ load_modules(modules)
+ message_handler_thread = Thread(
+ target=handle_message,
+ args=(
+ bot,
+ modules,
+ ),
+ )
+ message_handler_thread.daemon = True
+ message_handler_thread.start()
+ for channel in modules:
+ update_channel = Thread(
+ target=modules[channel].start,
+ args=(
+ bot,
+ Message({}, "", None, channel, ""),
+ ),
+ )
+ update_channel.daemon = True
+ update_channel.start()
+
+
+def load_modules(modules: dict[str, ModuleType]):
+ modules.clear()
+ channels = [
+ c
+ for c in os.listdir(CONFIG_PATH)
+ if os.path.isdir(os.path.join(CONFIG_PATH, c))
+ ]
+ channels = filter(lambda x: not x.startswith("."), channels)
+ for channel in channels:
+ account_path = os.path.join(CONFIG_PATH, f"{channel}")
+ sys.path.append(account_path)
+ module_path = os.path.join(account_path, "main.py")
+ spec = importlib.util.spec_from_file_location(
+ "main",
+ module_path,
+ )
+ if not spec or not spec.loader:
+ print("Problem loading spec")
+ sys.path.remove(account_path)
+ continue
+ module = importlib.util.module_from_spec(spec)
+ try:
+ spec.loader.exec_module(module)
+ except Exception as e:
+ logger.exception(f"Problem Loading Module: {e}")
+ logger.exception(traceback.format_exc())
+ else:
+ modules[channel] = module
+ sys.path.remove(account_path)
+
+
+def initialize(bot: Bot):
+ channels = [
+ c
+ for c in os.listdir(CONFIG_PATH)
+ if os.path.isdir(os.path.join(CONFIG_PATH, c))
+ ]
+ channels = filter(lambda x: not x.startswith("."), channels)
+ for channel in channels:
+ if not channel.startswith("."):
+ bot.join_channel(channel)
+
+
+def listener():
+ NICK = os.getenv("APTBOT_NICK")
+ OAUTH = os.getenv("APTBOT_PASS")
+ if NICK and OAUTH:
+ bot = Bot(NICK, OAUTH)
+ else:
+ print(
+ "Please set the environment variables:\nAPTBOT_NICK\nAPTBOT_PASS",
+ file=sys.stderr,
+ )
+ time.sleep(3)
+ sys.exit(1)
+ bot.connect()
+ modules = {}
+ message_loop = Thread(
+ target=start,
+ args=(
+ bot,
+ modules,
+ ),
+ )
+ message_loop.daemon = True
+ message_loop.start()
+ s = socket.socket()
+ s.bind((LOCALHOST, PORT))
+ s.listen(5)
+ initialize(bot)
+
+ while True:
+ c, _ = s.accept()
+ msg = c.recv(1024).decode()
+ msg = msg.split("===")
+ try:
+ command = msg[0]
+ channel = msg[1]
+ msg = msg[2]
+ except IndexError:
+ pass
+ else:
+ if args_logic.BotCommands.JOIN.value in command:
+ bot.join_channel(channel)
+ elif args_logic.BotCommands.SEND.value in command:
+ bot.send_privmsg(channel, msg)
+ elif args_logic.BotCommands.KILL.value in command:
+ sys.exit()
+ elif args_logic.BotCommands.UPDATE.value in command:
+ load_modules(modules)
+ elif args_logic.BotCommands.PART.value in command:
+ bot.leave_channel(channel)
+
+ time.sleep(1)
+
+
+def send(func):
+ def inner(*args, **kwargs):
+ s = socket.socket()
+ s.connect((LOCALHOST, PORT))
+ func(s, *args, **kwargs)
+ s.close()
+
+ return inner
+
+
+def main():
+ argsv = parse_arguments()
+ os.makedirs(CONFIG_PATH, exist_ok=True)
+ os.makedirs(CONFIG_LOGS, exist_ok=True)
+ if argsv.enable:
+ listener()
+
+ s = socket.socket()
+ try:
+ s.connect((LOCALHOST, PORT))
+ except ConnectionRefusedError:
+ pass
+
+ if argsv.add_account:
+ args_logic.add_account(s, argsv.add_account)
+ if argsv.disable_account:
+ args_logic.disable_account(s, argsv.disable_account)
+ if argsv.send_message:
+ args_logic.send_msg(s, argsv.send_message)
+ if argsv.disable:
+ args_logic.disable(s)
+ if argsv.update:
+ args_logic.update(s)
+ s.close()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+from aptbot.bot import Bot, Commands, Message
+
+
+def start(bot: Bot, message: Message):
+ pass
+
+
+def main(bot: Bot, message: Message):
+ # Check whether the message sent is a message by a user in chat
+ # and not some notification.
+ if message.command == Commands.PRIVMSG:
+ # Check the content of the message and if the first word is '!hello'
+ # send a reply by creating a new thread.
+ # You can also use `message.nick` instead of `message.tags['display-name']`
+ # but then the message sent back
+ # will contain the name of the user in all lowercase
+ if message.value.split()[0] == "!hello":
+ bot.send_privmsg(
+ message.channel,
+ f"hello {message.tags['display-name']}",
+ reply=message.tags["id"],
+ )
--- /dev/null
+# Examples
+
+Add an account using `aptbot --add-account "account_name"`.
+A directory will be created in `~/.config/aptbot/` on Linux
+or `%APPDATA%\aptbot\` on Windows with the twitch id of that account.
+
+The contents of each example directory here should mimic the contents of the
+directory with the twitch id.
+
+Each account directory should contain a `main.py` file
+with the most minimal code being:
+
+```python
+from aptbot import Bot, Message, Commands
+
+# Gets ran at the beginning, when the account connects.
+# Can be used for an infinite loop within the account,
+# so the bot send messages, even when chat is dead.
+def start(bot: Bot, message: Message):
+ pass
+
+# Gets ran every time the IRC channel sends a message.
+# This can either be a message from a user
+# a raid, when a mod deletes a message, etc.
+def main(bot: Bot, message: Message):
+ pass
+```
+++ /dev/null
-from aptbot.bot import Bot, Message, Commands
-import os
-import importlib
-import importlib.util
-from importlib import reload
-import traceback
-
-import tools.raid
-reload(tools.raid)
-
-
-PATH = os.path.dirname(os.path.realpath(__file__))
-
-commands = [
- c for c in os.listdir(PATH) if os.path.isfile(os.path.join(PATH, c))
-]
-commands.remove(os.path.split(__file__)[1])
-specs = {}
-for command in commands:
- if command.split('.')[0]:
- specs[command.split('.')[0]] = (
- importlib.util.spec_from_file_location(
- f"{command.split('.')[0]}",
- os.path.join(PATH, command)
- )
- )
-
-modules = {}
-for command in specs:
- modules[command] = importlib.util.module_from_spec(specs[command])
- if specs[command] and specs[command].loader:
- try:
- specs[command].loader.exec_module(modules[command])
- except Exception as e:
- print()
- print(traceback.format_exc())
- print(f"Problem Loading Module: {e}")
-
-
-def main(bot: Bot, message: Message):
- prefix = '?'
- command = message.value.split(' ')[0]
- if message.command == Commands.PRIVMSG and command.startswith(prefix):
- try:
- modules[command[1:]].main(bot, message)
- except KeyError:
- pass
-
- tools.raid.raid(bot, message)
+++ /dev/null
-from aptbot.bot import Message, Commands, Bot
-
-
-def main(bot: Bot, message: Message):
- msg = message.nick + " you have been scammed KEKW"
- bot.send_privmsg(message.channel, msg)
+++ /dev/null
-from aptbot.bot import Message, Commands, Bot
-
-
-def main(bot: Bot, message: Message):
- msg = ' '.join(message.value.split(' ')[1:])
- msg = (msg + ' ') * 10
- bot.send_privmsg(message.channel, msg)
+++ /dev/null
-from aptbot.bot import Bot, Message, Commands
-
-
-def raid(bot: Bot, message: Message):
- if message.command == Commands.USERNOTICE:
- if message.tags["msg-id"] == "raid":
- raider_name = message.tags["msg-param-displayName"]
- raider_login = message.tags["msg-param-login"]
- raider_id = message.tags["user-id"]
- raider_game = ""
- if raider_id:
- raider_channel_info = "channel info here"
- viewers = message.tags["msg-param-viewerCount"]
- viewers = f"{viewers} viewer" if viewers == "1" else f"{viewers} viewers"
- msg_reply = f"POGGERS {raider_name} has raided {message.channel} with {viewers}!!! Why don\'t you check them out at https://twitch.tv/{raider_login}"
- if raider_game:
- msg_reply += f' they were just playing {raider_game}.'
- bot.send_privmsg(message.channel, msg_reply)
--- /dev/null
+from aptbot.bot import Bot, Commands, Message
+
+
+def start(bot: Bot, message: Message):
+ pass
+
+
+def main(bot: Bot, message: Message):
+ # Check whether the message sent is a message by a user in chat
+ # and not some notification.
+ if message.command == Commands.PRIVMSG:
+ # Check the content of the message and if the first word is '!hello'
+ # send a reply by creating a new thread.
+ # You can also use `message.nick` instead of `message.tags['display-name']`
+ # but then the message sent back
+ # will contain the name of the user in all lowercase
+ if message.value.split()[0] == "!hello":
+ bot.send_privmsg(
+ message.channel,
+ f"hello {message.tags['display-name']}",
+ reply=message.tags["id"],
+ )
--- /dev/null
+black==22.3.0
+click==8.1.3
+flake8==4.0.1
+flake8-black==0.3.3
+mccabe==0.6.1
+mypy-extensions==0.4.3
+pathspec==0.9.0
+platformdirs==2.5.2
+pycodestyle==2.8.0
+pyflakes==2.4.0
+python-dotenv==0.20.0
+tomli==2.0.1
+urllib3==1.26.9
setuptools.setup(
name="aptbot",
- version="0.0.2",
+ version="0.1.0",
author="Georgios Atheridis",
author_email="atheridis@tutamail.com",
description="A chatbot for twitch.tv",
long_description_content_type="text/markdown",
url="https://github.com/atheridis/aptbot",
classifiers=[
- "License :: OSI Approved :: MIT License"
+ "License :: OSI Approved :: MIT License",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 3.9",
],
packages=setuptools.find_packages(),
+ package_data={"aptbot": ["resources/main.py"]},
entry_points={
"console_scripts": [
- "aptbot=aptbot.__main__:main",
+ "aptbot=aptbot.main:main",
],
},
install_requires=[
"python-dotenv",
"urllib3",
- "websocket-client"
],
- python_requires=">=3.7",
+ python_requires=">=3.9",
)