from dotenv import load_dotenv
from types import ModuleType
+load_dotenv()
if "XDG_CONFIG_HOME" in os.environ:
CONFIG_HOME = os.environ["XDG_CONFIG_HOME"]
CONFIG_PATH = os.path.join(CONFIG_HOME, f"aptbot")
-load_dotenv()
-
PORT = 26538
LOCALHOST = "127.0.0.1"
messages = bot.receive_messages()
for message in messages:
if message.channel:
- method = Thread(
- target=modules[message.channel].main,
- args=(bot, message, )
- )
- method.daemon = True
- method.start()
- print(message)
- time.sleep(0.001)
+ if message.command:
+ print(
+ f"#{message.channel} ({message.command.value}) | {message.nick}: {message.value}")
+ try:
+ method = Thread(
+ target=modules[message.channel].main,
+ args=(bot, message, )
+ )
+ except KeyError:
+ pass
+ else:
+ method.daemon = True
+ method.start()
+ time.sleep(0.01)
def update_modules(modules: dict[str, ModuleType]):
modules.clear()
- # modules = {}
- channels = os.listdir(CONFIG_PATH)
+ channels = filter(lambda x: not x.startswith('.'), os.listdir(CONFIG_PATH))
+ channels = list(channels)
+ # print(channels)
for channel in channels:
account_path = os.path.join(CONFIG_PATH, f"{channel}")
- module_path = os.path.join(
- account_path, f"message_interpreter.py")
- spec = importlib.util.spec_from_file_location(
- "message_interpreter",
- module_path,
- )
- account_path = os.path.join(CONFIG_PATH, f"{channel}")
+ sys.path.append(account_path)
module_path = os.path.join(
account_path, f"message_interpreter.py")
spec = importlib.util.spec_from_file_location(
print(f"Problem Loading Module: {e}")
else:
modules[channel] = foo
+ sys.path.remove(account_path)
def initialize(bot: aptbot.bot.Bot):
channels = os.listdir(CONFIG_PATH)
for channel in channels:
- bot.join_channel(channel)
+ if not channel.startswith('.'):
+ bot.join_channel(channel)
def listener():
sys.exit()
elif "UPDATE" in command:
update_modules(modules)
+ elif "PART" in command:
+ bot.leave_channel(channel)
time.sleep(1)
def add_account(s: socket.socket, acc: str):
account_path = os.path.join(CONFIG_PATH, f"{acc}")
+ hidden_account_path = os.path.join(CONFIG_PATH, f".{acc}")
+ try:
+ os.rename(hidden_account_path, account_path)
+ except FileNotFoundError:
+ pass
os.makedirs(account_path, exist_ok=True)
- f = open(os.path.join(account_path, "message_interpreter.py"), "a")
- f.write("""from aptbot.bot import Bot, Message, Commands
+ # print(os.listdir("."))
+ # shutil.copy("message_interpreter.py", account_path)
+ try:
+ f = open(os.path.join(account_path, "message_interpreter.py"), "r")
+ except FileNotFoundError:
+ f = open(os.path.join(account_path, "message_interpreter.py"), "a")
+ f.write("""from aptbot.bot import Bot, Message, Commands
def main(bot, message: Message):
pass""")
- f.close()
+ f.close()
+ else:
+ f.close()
command = "JOIN"
channel = acc
def send_msg(s: socket.socket, msg: str):
command = "SEND"
- channel = "skgyorugo"
- msg = msg
+ channel = msg.split(' ')[0]
+ msg = msg[len(channel) + 1:]
s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
+def disable_account(s: socket.socket, acc: str):
+ account_path = os.path.join(CONFIG_PATH, f"{acc}")
+ hidden_account_path = os.path.join(CONFIG_PATH, f".{acc}")
+ try:
+ os.rename(account_path, hidden_account_path)
+ except FileNotFoundError:
+ print(f"Account {acc} is already disabled.")
+
+ command = "PART"
+ channel = ""
+ msg = ""
+ s.send(bytes(f"{command}==={channel}==={msg}", "utf-8"))
+
+
def update(s: socket.socket):
command = "UPDATE"
channel = ""
listener()
s = socket.socket()
- s.connect((LOCALHOST, PORT))
+ try:
+ s.connect((LOCALHOST, PORT))
+ except ConnectionRefusedError:
+ pass
if argsv.add_account:
add_account(s, argsv.add_account)
+ if argsv.disable_account:
+ disable_account(s, argsv.disable_account)
if argsv.send_message:
send_msg(s, argsv.send_message)
if argsv.disable:
import socket
import time
-import sys
from enum import Enum
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from typing import Optional
@dataclass
class Message:
- tags: dict[str, str]
- nick: str
- command: Optional[Commands]
- channel: str
- value: str
+ tags: dict[str, str] = field(default_factory=dict)
+ nick: str = ""
+ command: Optional[Commands] = None
+ channel: str = ""
+ value: str = ""
class Bot:
def __init__(self, nick: str, oauth_token: str, client_id: str):
self.irc = socket.socket()
- self.server = "irc.twitch.tv"
+ self.server = "irc.chat.twitch.tv"
self.port = 6667
self.nick = nick
self.oauth_token = oauth_token
self.connected_channels = []
def send_command(self, command: str):
- print(f"< {command}")
+ if "PASS" not in command:
+ print(f"< {command}")
self.irc.send((command + "\r\n").encode())
def connect(self):
self.irc.connect((self.server, self.port))
+ time.sleep(3)
self.send_command(f"PASS oauth:{self.oauth_token}")
self.send_command(f"NICK {self.nick}")
self.send_command(f"CAP REQ :twitch.tv/membership")
self.connected_channels.remove(channel)
def send_privmsg(self, channel: str, text: str):
+ print(f"#{channel} ({Commands.PRIVMSG.value}) | {self.nick}: {text}")
self.send_command(f"{Commands.PRIVMSG.value} #{channel} :{text}")
@staticmethod
def parse_message(received_msg: str) -> Message:
- message = Message({}, "", None, "", "")
+ message = Message()
value_start = received_msg.find(
':',
def _handle_message(self, received_msg: str) -> Message:
if received_msg == "PING :tmi.twitch.tv":
self.send_command("PONG :tmi.twitch.tv")
- return Message({}, "", None, "", "")
+ return Message()
elif not received_msg:
- return Message({}, "", None, "", "")
+ return Message()
return Bot.parse_message(received_msg)
def receive_messages(self) -> list[Message]:
messages = []
- i = 0
- while i < 5:
- if i:
- try:
- self.connect()
- self.join_channels(self.connected_channels)
- except OSError as e:
- print(f"Connection failed {e}")
- try:
- received_msgs = self.irc.recv(2048).decode()
- except ConnectionResetError as e:
- print(f"There was an error connecting with error {e}")
- print("Trying to connect again")
- time.sleep(2**i+1)
- i += 1
- else:
- print("broke")
- break
- else:
- sys.exit(1)
+ # i = 0
+ # while i < 5:
+ # if i:
+ # try:
+ # self.connect()
+ # self.join_channels(self.connected_channels)
+ # except OSError as e:
+ # print(f"Connection failed {e}")
+ # try:
+ # received_msgs = self.irc.recv(2048).decode()
+ # except ConnectionResetError as e:
+ # print(f"There was an error connecting with error {e}")
+ # print("Trying to connect again")
+ # time.sleep(2**i+1)
+ # i += 1
+ # else:
+ # print("broke")
+ # break
+ # else:
+ # sys.exit(1)
+ received_msgs = self.irc.recv(2048).decode()
for received_msgs in received_msgs.split("\r\n"):
messages.append(self._handle_message(received_msgs))
return messages