# if message.command:
# print(
# f"#{message.channel} ({message.command.value}) | \
-# {message.nick}: {message.value}"
+ # {message.nick}: {message.value}"
# )
try:
method = Thread(
target=modules[message.channel].main,
- args=(bot, message, )
+ args=(
+ bot,
+ message,
+ ),
)
except KeyError:
pass
load_modules(modules)
message_handler_thread = Thread(
target=handle_message,
- args=(bot, modules, )
+ args=(
+ bot,
+ modules,
+ ),
)
message_handler_thread.daemon = True
message_handler_thread.start()
for channel in modules:
update_channel = Thread(
target=modules[channel].start,
- args=(bot, aptbot.bot.Message({}, "", None, channel, ""), )
+ args=(
+ bot,
+ aptbot.bot.Message({}, "", None, channel, ""),
+ ),
)
update_channel.daemon = True
update_channel.start()
def load_modules(modules: dict[str, ModuleType]):
modules.clear()
channels = [
- c for c in os.listdir(CONFIG_PATH) if os.path.isdir(os.path.join(CONFIG_PATH, c))
+ c
+ for c in os.listdir(CONFIG_PATH)
+ if os.path.isdir(os.path.join(CONFIG_PATH, c))
]
- channels = filter(lambda x: not x.startswith('.'), channels)
+ channels = filter(lambda x: not x.startswith("."), channels)
for channel in channels:
account_path = os.path.join(CONFIG_PATH, f"{channel}")
sys.path.append(account_path)
- module_path = os.path.join(
- account_path, f"main.py")
+ module_path = os.path.join(account_path, f"main.py")
spec = importlib.util.spec_from_file_location(
"main",
module_path,
def initialize(bot: aptbot.bot.Bot):
channels = [
- c for c in os.listdir(CONFIG_PATH) if os.path.isdir(os.path.join(CONFIG_PATH, c))
+ c
+ for c in os.listdir(CONFIG_PATH)
+ if os.path.isdir(os.path.join(CONFIG_PATH, c))
]
- channels = filter(lambda x: not x.startswith('.'), channels)
+ channels = filter(lambda x: not x.startswith("."), channels)
for channel in channels:
- if not channel.startswith('.'):
+ if not channel.startswith("."):
bot.join_channel(channel)
sys.exit(1)
bot.connect()
modules = {}
- message_loop = Thread(target=start, args=(bot, modules, ))
+ message_loop = Thread(
+ target=start,
+ args=(
+ bot,
+ modules,
+ ),
+ )
message_loop.daemon = True
message_loop.start()
s = socket.socket()
time.sleep(1)
-def send(func,):
+def send(func):
def inner(*args, **kwargs):
s = socket.socket()
s.connect((LOCALHOST, PORT))
func(s, *args, **kwargs)
s.close()
+
return inner
self._connected_channels = []
def send_command(self, command: str):
- # if "PASS" not in command:
- # print(f"< {command}")
+ if "PASS" not in command:
+ print(f"< {command}")
self._irc.send((command + "\r\n").encode())
def connect(self):
self._irc.connect((self._server, self._port))
- time.sleep(1.5)
self.send_command(f"PASS oauth:{self._oauth_token}")
self.send_command(f"NICK {self._nick}")
self.send_command(f"CAP REQ :twitch.tv/membership")
message = Message()
value_start = received_msg.find(
- ':',
- received_msg.find(' ', received_msg.find(' ') + 1)
+ ":", received_msg.find(" ", received_msg.find(" ") + 1)
)
if value_start != -1:
message.value = received_msg[value_start:][1:]
- received_msg = received_msg[:value_start - 1]
+ received_msg = received_msg[: value_start - 1]
- parts = received_msg.split(' ')
+ parts = received_msg.split(" ")
for part in parts:
- if part.startswith('@'):
+ if part.startswith("@"):
part = part[1:]
- for tag in part.split(';'):
- tag = tag.split('=')
+ for tag in part.split(";"):
+ tag = tag.split("=")
try:
message.tags[tag[0]] = tag[1]
except IndexError:
message.tags[tag[0]] = ""
- elif part.startswith(':'):
+ elif part.startswith(":"):
part = part[1:]
- if '!' in part:
- message.nick = part.split('!')[0]
+ if "!" in part:
+ message.nick = part.split("!")[0]
elif part in set(command.value for command in Commands):
message.command = Commands(part)
- elif part.startswith('#'):
+ elif part.startswith("#"):
part = part[1:]
message.channel = part
- message.value = ' '.join(message.value.split())
+ message.value = " ".join(message.value.split())
if not message.tags.get("reply-parent-msg-body", None):
+ print(message)
return message
rep = message.tags["reply-parent-msg-body"]
if i + 1 == len(rep):
new_rep += rep[i]
break
- if rep[i+1] == "\\":
+ if rep[i + 1] == "\\":
new_rep += "\\"
- elif rep[i+1] == "s":
+ elif rep[i + 1] == "s":
new_rep += " "
ignore_next = True
- message.tags["reply-parent-msg-body"] = ' '.join(new_rep.split())
+ message.tags["reply-parent-msg-body"] = " ".join(new_rep.split())
+ print(message)
return message
def _handle_message(self, received_msg: str) -> Message: