def loop(bot: aptbot.bot.Bot, modules: dict[str, ModuleType]):
- update_modules(modules)
+ load_modules(modules)
while True:
messages = bot.receive_messages()
for message in messages:
- if message.channel:
- if message.command:
- print(
- f"#{message.channel} ({message.command.value}) | {message.nick}: {message.value}")
- try:
- method = Thread(
- target=modules[message.channel].main,
- args=(bot, message, )
- )
- except KeyError:
- pass
- else:
- method.daemon = True
- method.start()
+ if not message.channel:
+ continue
+ # if message.command:
+ # print(
+ # f"#{message.channel} ({message.command.value}) | \
+# {message.nick}: {message.value}"
+ # )
+ try:
+ method = Thread(
+ target=modules[message.channel].main,
+ args=(bot, message, )
+ )
+ except KeyError:
+ pass
+ else:
+ method.daemon = True
+ method.start()
time.sleep(0.01)
-def update_modules(modules: dict[str, ModuleType]):
+def load_modules(modules: dict[str, ModuleType]):
modules.clear()
channels = filter(lambda x: not x.startswith('.'), os.listdir(CONFIG_PATH))
channels = list(channels)
account_path = os.path.join(CONFIG_PATH, f"{channel}")
sys.path.append(account_path)
module_path = os.path.join(
- account_path, f"message_interpreter.py")
+ account_path, f"main.py")
spec = importlib.util.spec_from_file_location(
- "message_interpreter",
+ "main",
module_path,
)
- if spec and spec.loader:
- foo = importlib.util.module_from_spec(spec)
- try:
- spec.loader.exec_module(foo)
- except Exception as e:
- print(traceback.format_exc())
- print(f"Problem Loading Module: {e}")
- else:
- modules[channel] = foo
+ if not spec or not spec.loader:
+ print("Problem loading spec")
+ sys.path.remove(account_path)
+ continue
+ module = importlib.util.module_from_spec(spec)
+ try:
+ spec.loader.exec_module(module)
+ except Exception as e:
+ print(traceback.format_exc())
+ print(f"Problem Loading Module: {e}")
+ else:
+ modules[channel] = module
sys.path.remove(account_path)
NICK = os.getenv("APTBOT_NICK")
OAUTH = os.getenv("APTBOT_OAUTH")
CLIENT_ID = os.getenv("APTBOT_CLIENT_ID")
+ print(f"NICK: {NICK}")
if NICK and OAUTH and CLIENT_ID:
bot = aptbot.bot.Bot(NICK, OAUTH, CLIENT_ID)
else:
elif aptbot.args_logic.Commands.KILL.value in command:
sys.exit()
elif aptbot.args_logic.Commands.UPDATE.value in command:
- update_modules(modules)
+ load_modules(modules)
elif aptbot.args_logic.Commands.PART.value in command:
bot.leave_channel(channel)
os.makedirs(account_path, exist_ok=True)
# print(os.listdir("."))
- # shutil.copy("message_interpreter.py", account_path)
+ # shutil.copy("main.py", account_path)
try:
- f = open(os.path.join(account_path, "message_interpreter.py"), "r")
+ f = open(os.path.join(account_path, "main.py"), "r")
except FileNotFoundError:
- f = open(os.path.join(account_path, "message_interpreter.py"), "a")
+ f = open(os.path.join(account_path, "main.py"), "a")
f.write("""from aptbot.bot import Bot, Message, Commands
def main(bot, message: Message):
pass""")
def connect(self):
self._irc.connect((self._server, self._port))
- time.sleep(3)
+ time.sleep(1.5)
self.send_command(f"PASS oauth:{self._oauth_token}")
self.send_command(f"NICK {self._nick}")
self.send_command(f"CAP REQ :twitch.tv/membership")
f"#{channel} ({Commands.PRIVMSG.value}) | {self._nick}: {t}")
self.send_command(
f"{Commands.PRIVMSG.value} #{channel} :{t}")
+ time.sleep(1.5)
else:
print(f"#{channel} ({Commands.PRIVMSG.value}) | {self._nick}: {text}")
self.send_command(f"{Commands.PRIVMSG.value} #{channel} :{text}")
@staticmethod
def parse_message(received_msg: str) -> Message:
+ print(received_msg)
message = Message()
value_start = received_msg.find(
def receive_messages(self) -> list[Message]:
messages = []
- # i = 0
- # while i < 5:
- # if i:
- # try:
- # self.connect()
- # self.join_channels(self.connected_channels)
- # except OSError as e:
- # print(f"Connection failed {e}")
- # try:
- # received_msgs = self.irc.recv(2048).decode()
- # except ConnectionResetError as e:
- # print(f"There was an error connecting with error {e}")
- # print("Trying to connect again")
- # time.sleep(2**i+1)
- # i += 1
- # else:
- # print("broke")
- # break
- # else:
- # sys.exit(1)
received_msgs = self._irc.recv(2048).decode()
for received_msgs in received_msgs.split("\r\n"):
messages.append(self._handle_message(received_msgs))
setuptools.setup(
name="aptbot",
- version="0.0.1",
+ version="0.0.2",
author="Georgios Atheridis",
author_email="atheridis@tutamail.com",
description="A chatbot for twitch.tv",
],
packages=setuptools.find_packages(),
entry_points={
- 'console_scripts': [
- 'aptbot=aptbot.__main__:main',
+ "console_scripts": [
+ "aptbot=aptbot.__main__:main",
],
},
install_requires=[
- 'dotenv',
+ "python-dotenv",
+ "urllib3",
],
python_requires=">=3.7",
)
--- /dev/null
+import ttv_api.channel
+import ttv_api.users
+
+
+def test_get_channel():
+ channel = ttv_api.channel.get_channels("141981764")
+ if isinstance(channel, list):
+ assert channel[0].broadcaster_id == "141981764"
+ assert channel[0].broadcaster_login == "twitchdev"
+ else:
+ raise Exception("Didn't find channel")
+
+
+def test_get_user():
+ user = ttv_api.users.get_users(user_logins=[
+ "skgyo1238r923hrugo", "f43uif3ef", "fweuifhewuidw"])
+ if isinstance(user, list):
+ print(user)
+ # assert user[0].user_id == "141981764"
+ # assert user[0].login == "twitchdev"
+ else:
+ raise Exception("Didn't find channel")
+
+
+if __name__ == "__main__":
+ test_get_channel()
+ test_get_user()
+ print("Everytinhg passed")
--- /dev/null
+import os
+import urllib3
+import json
+from datetime import datetime
+from typing import Optional, Union
+from dataclasses import dataclass
+from enum import Enum
+# from dotenv import load_dotenv
+
+# load_dotenv()
+
+NICK = os.getenv("APTBOT_NICK")
+OAUTH = os.getenv("APTBOT_OAUTH")
+CLIENT_ID = os.getenv("APTBOT_CLIENT_ID")
+
+HEADER = {
+ "Authorization": f"Bearer {OAUTH}",
+ "Client-Id": f"{CLIENT_ID}",
+ "Content-Type": "application/json",
+}
+
+BASE_URL = "https://api.twitch.tv/helix/"
+
+
+class URL(Enum):
+ ads = BASE_URL + "channels/commercial"
+ analytics = BASE_URL + "analytics"
+ channels = BASE_URL + "channels"
+ emotes_channel = BASE_URL + "chat/emotes"
+ emotes_global = BASE_URL + "chat/emotes/global"
+ streams = BASE_URL + "streams"
+ users = BASE_URL + "users"
--- /dev/null
+from ttv_api import *
+
+
+@dataclass
+class Channel:
+ broadcaster_id: str
+ broadcaster_login: str
+ broadcaster_name: str
+ game_name: str
+ game_id: str
+ broadcaster_language: str
+ title: str
+ delay: int
+
+
+def get_channels(*channel_ids: str) -> Optional[list[Channel]]:
+ params = "?"
+ for channel_id in channel_ids:
+ params += f"broadcaster_id={channel_id}&"
+
+ http = urllib3.PoolManager()
+ r = http.request(
+ "GET",
+ URL.channels.value + params,
+ headers=HEADER,
+ )
+ if r.status != 200:
+ return None
+
+ data = json.loads(r.data.decode("utf-8"))["data"]
+ channels = []
+
+ for channel in data:
+ channels.append(
+ Channel(
+ channel["broadcaster_id"],
+ channel["broadcaster_login"],
+ channel["broadcaster_name"],
+ channel["game_name"],
+ channel["game_id"],
+ channel["broadcaster_language"],
+ channel["title"],
+ channel["delay"],
+ )
+ )
+ return channels
--- /dev/null
+from ttv_api import *
+
+
+@dataclass
+class Emote:
+ emote_id: str
+ name: str
+ tier: Optional[str]
+ emote_type: Optional[str]
+ emote_set_id: Optional[str]
+ emote_format: list[str]
+ scale: list[str]
+ theme_mode: list[str]
+
+
+def get_global_emotes() -> Optional[list[Emote]]:
+ http = urllib3.PoolManager()
+ r = http.request(
+ "GET",
+ URL.emotes_channel.value,
+ headers=HEADER,
+ )
+
+ if r.status != 200:
+ return None
+
+ data = json.loads(r.data.decode("utf-8"))["data"]
+ emotes = []
+
+ for emote in data:
+ emotes.append(
+ Emote(
+ emote["id"],
+ emote["name"],
+ None,
+ None,
+ None,
+ emote["format"],
+ emote["scale"],
+ emote["theme_mode"],
+ )
+ )
+
+ return emotes
+
+
+def get_channel_emotes(channel_id: str) -> Optional[list[Emote]]:
+ params = f"?broadcaster_id={channel_id}"
+
+ http = urllib3.PoolManager()
+ r = http.request(
+ "GET",
+ URL.emotes_channel.value + params,
+ headers=HEADER,
+ )
+
+ if r.status != 200:
+ return None
+
+ data = json.loads(r.data.decode("utf-8"))["data"]
+ emotes = []
+
+ for emote in data:
+ emotes.append(
+ Emote(
+ emote["id"],
+ emote["name"],
+ emote["tier"],
+ emote["emote_type"],
+ emote["emote_set_it"],
+ emote["format"],
+ emote["scale"],
+ emote["theme_mode"],
+ )
+ )
+
+ return emotes
+
+
+def get_emote_image(emote_id: str, emote_format: str, theme_mode: str, scale: str):
+ return f"https://static-cdn.jtvnw.net/emoticons/v2/{emote_id}/{emote_format}/{theme_mode}/{scale}"
--- /dev/null
+from ttv_api import *
+
+
+@dataclass
+class Stream:
+ stream_id: str
+ user_id: str
+ login: str
+ display_name: str
+ game_id: str
+ game_name: str
+ stream_type: str
+ stream_title: str
+ viewer_count: int
+ started_at: datetime
+ language: str
+ thumbnail_url: str
+ tag_ids: str
+ is_mature: bool
+
+
+def get_streams(
+ user_ids: list[str] = [],
+ user_logins: list[str] = [],
+ game_ids: list[str] = [],
+ languages: list[str] = [],
+ max_streams: int = 100,
+) -> Optional[list[Stream]]:
+
+ streams = []
+ params = "?first=100&"
+ for user_id in user_ids:
+ params += f"user_id={user_id}&"
+ for user_login in user_logins:
+ params += f"user_login={user_login}&"
+ for game_id in game_ids:
+ params += f"game_id={game_id}&"
+ for language in languages:
+ params += f"language={language}&"
+
+ while True:
+ http = urllib3.PoolManager()
+ r = http.request(
+ "GET",
+ URL.streams.value + params,
+ headers=HEADER,
+ )
+ if r.status != 200:
+ return None
+
+ requested_data = json.loads(r.data.decode("utf-8"))
+ data = requested_data["data"]
+
+ for stream in data:
+ dt = datetime.strptime(stream["started_at"], "%Y-%m-%dT%H:%M:%SZ")
+ streams.append(
+ Stream(
+ stream["id"],
+ stream["user_id"],
+ stream["user_login"],
+ stream["user_name"],
+ stream["game_id"],
+ stream["game_name"],
+ stream["type"],
+ stream["title"],
+ stream["viewer_count"],
+ dt,
+ stream["language"],
+ stream["thumbnail_url"],
+ stream["tag_ids"],
+ stream["is_mature"],
+ )
+ )
+ if len(streams) >= max_streams:
+ return streams
+
+ try:
+ cursor = requested_data["pagination"]["cursor"]
+ except KeyError:
+ return streams
+ else:
+ params = f"?first=100&after={cursor}"
--- /dev/null
+from ttv_api import *
+
+
+@dataclass
+class User:
+ user_id: str
+ login: str
+ display_name: str
+ user_type: str
+ broadcaster_type: str
+ description: str
+ profile_image_url: str
+ offline_image_url: str
+ view_count: int
+ created_at: datetime
+
+
+def get_users(user_ids: list[str] = [], user_logins: list[str] = []) -> Optional[list[User]]:
+ params = "?"
+ for user_id in user_ids:
+ params += f"id={user_id}&"
+ for user_login in user_logins:
+ params += f"login={user_login}&"
+
+ http = urllib3.PoolManager()
+ r = http.request(
+ "GET",
+ URL.users.value + params,
+ headers=HEADER,
+ )
+ if r.status != 200:
+ return None
+
+ data = json.loads(r.data.decode("utf-8"))["data"]
+ users = []
+
+ for user in data:
+ dt = datetime.strptime(user["created_at"], "%Y-%m-%dT%H:%M:%SZ")
+ users.append(
+ User(
+ user["id"],
+ user["login"],
+ user["display_name"],
+ user["type"],
+ user["broadcaster_type"],
+ user["description"],
+ user["profile_image_url"],
+ user["offline_image_url"],
+ user["view_count"],
+ dt,
+ )
+ )
+ return users