added some ttv api, changed initial python file name that gets run for each message...
authorGeorgios Atheridis <atheridis@tutamail.com>
Tue, 29 Mar 2022 11:12:28 +0000 (14:12 +0300)
committerGeorgios Atheridis <atheridis@tutamail.com>
Tue, 29 Mar 2022 11:12:28 +0000 (14:12 +0300)
aptbot/__main__.py
aptbot/args_logic.py
aptbot/bot.py
setup.py
test/ttv_api/channel.py [new file with mode: 0644]
ttv_api/__init__.py [new file with mode: 0644]
ttv_api/channel.py [new file with mode: 0644]
ttv_api/emotes.py [new file with mode: 0644]
ttv_api/stream.py [new file with mode: 0644]
ttv_api/users.py [new file with mode: 0644]

index e891b421c29010463defee1dfd3cfcdccfb1c117..0a620760b8c4f5fb15ca9795164054806e7a488e 100644 (file)
@@ -17,28 +17,31 @@ load_dotenv()
 
 
 def loop(bot: aptbot.bot.Bot, modules: dict[str, ModuleType]):
-    update_modules(modules)
+    load_modules(modules)
     while True:
         messages = bot.receive_messages()
         for message in messages:
-            if message.channel:
-                if message.command:
-                    print(
-                        f"#{message.channel} ({message.command.value}) | {message.nick}: {message.value}")
-                try:
-                    method = Thread(
-                        target=modules[message.channel].main,
-                        args=(bot, message, )
-                    )
-                except KeyError:
-                    pass
-                else:
-                    method.daemon = True
-                    method.start()
+            if not message.channel:
+                continue
+            # if message.command:
+            #     print(
+            #         f"#{message.channel} ({message.command.value}) | \
+# {message.nick}: {message.value}"
+            #     )
+            try:
+                method = Thread(
+                    target=modules[message.channel].main,
+                    args=(bot, message, )
+                )
+            except KeyError:
+                pass
+            else:
+                method.daemon = True
+                method.start()
         time.sleep(0.01)
 
 
-def update_modules(modules: dict[str, ModuleType]):
+def load_modules(modules: dict[str, ModuleType]):
     modules.clear()
     channels = filter(lambda x: not x.startswith('.'), os.listdir(CONFIG_PATH))
     channels = list(channels)
@@ -47,20 +50,23 @@ def update_modules(modules: dict[str, ModuleType]):
         account_path = os.path.join(CONFIG_PATH, f"{channel}")
         sys.path.append(account_path)
         module_path = os.path.join(
-            account_path, f"message_interpreter.py")
+            account_path, f"main.py")
         spec = importlib.util.spec_from_file_location(
-            "message_interpreter",
+            "main",
             module_path,
         )
-        if spec and spec.loader:
-            foo = importlib.util.module_from_spec(spec)
-            try:
-                spec.loader.exec_module(foo)
-            except Exception as e:
-                print(traceback.format_exc())
-                print(f"Problem Loading Module: {e}")
-            else:
-                modules[channel] = foo
+        if not spec or not spec.loader:
+            print("Problem loading spec")
+            sys.path.remove(account_path)
+            continue
+        module = importlib.util.module_from_spec(spec)
+        try:
+            spec.loader.exec_module(module)
+        except Exception as e:
+            print(traceback.format_exc())
+            print(f"Problem Loading Module: {e}")
+        else:
+            modules[channel] = module
         sys.path.remove(account_path)
 
 
@@ -75,6 +81,7 @@ def listener():
     NICK = os.getenv("APTBOT_NICK")
     OAUTH = os.getenv("APTBOT_OAUTH")
     CLIENT_ID = os.getenv("APTBOT_CLIENT_ID")
+    print(f"NICK: {NICK}")
     if NICK and OAUTH and CLIENT_ID:
         bot = aptbot.bot.Bot(NICK, OAUTH, CLIENT_ID)
     else:
@@ -107,7 +114,7 @@ def listener():
             elif aptbot.args_logic.Commands.KILL.value in command:
                 sys.exit()
             elif aptbot.args_logic.Commands.UPDATE.value in command:
-                update_modules(modules)
+                load_modules(modules)
             elif aptbot.args_logic.Commands.PART.value in command:
                 bot.leave_channel(channel)
 
index c6b8526d592d05838e9fc90abe9640346e613335..da9cfafb3250f44520db076b089a90d8c0a01a6e 100644 (file)
@@ -23,11 +23,11 @@ def add_account(s: socket.socket, acc: str):
     os.makedirs(account_path, exist_ok=True)
 
     # print(os.listdir("."))
-    # shutil.copy("message_interpreter.py", account_path)
+    # shutil.copy("main.py", account_path)
     try:
-        f = open(os.path.join(account_path, "message_interpreter.py"), "r")
+        f = open(os.path.join(account_path, "main.py"), "r")
     except FileNotFoundError:
-        f = open(os.path.join(account_path, "message_interpreter.py"), "a")
+        f = open(os.path.join(account_path, "main.py"), "a")
         f.write("""from aptbot.bot import Bot, Message, Commands
 def main(bot, message: Message):
     pass""")
index 5b5c0e088e63dab051fc1244682ab7913c6c71c1..85b018ce89f5182e407161d62e8d0cd5f2a3062f 100644 (file)
@@ -46,7 +46,7 @@ class Bot:
 
     def connect(self):
         self._irc.connect((self._server, self._port))
-        time.sleep(3)
+        time.sleep(1.5)
         self.send_command(f"PASS oauth:{self._oauth_token}")
         self.send_command(f"NICK {self._nick}")
         self.send_command(f"CAP REQ :twitch.tv/membership")
@@ -72,12 +72,14 @@ class Bot:
                     f"#{channel} ({Commands.PRIVMSG.value}) | {self._nick}: {t}")
                 self.send_command(
                     f"{Commands.PRIVMSG.value} #{channel} :{t}")
+                time.sleep(1.5)
         else:
             print(f"#{channel} ({Commands.PRIVMSG.value}) | {self._nick}: {text}")
             self.send_command(f"{Commands.PRIVMSG.value} #{channel} :{text}")
 
     @staticmethod
     def parse_message(received_msg: str) -> Message:
+        print(received_msg)
         message = Message()
 
         value_start = received_msg.find(
@@ -121,26 +123,6 @@ class Bot:
 
     def receive_messages(self) -> list[Message]:
         messages = []
-        # i = 0
-        # while i < 5:
-        #     if i:
-        #         try:
-        #             self.connect()
-        #             self.join_channels(self.connected_channels)
-        #         except OSError as e:
-        #             print(f"Connection failed {e}")
-        #     try:
-        #         received_msgs = self.irc.recv(2048).decode()
-        #     except ConnectionResetError as e:
-        #         print(f"There was an error connecting with error {e}")
-        #         print("Trying to connect again")
-        #         time.sleep(2**i+1)
-        #         i += 1
-        #     else:
-        #         print("broke")
-        #         break
-        # else:
-        #     sys.exit(1)
         received_msgs = self._irc.recv(2048).decode()
         for received_msgs in received_msgs.split("\r\n"):
             messages.append(self._handle_message(received_msgs))
index 6171504dfd2433661ca7b4230159435fae54b222..4ee98d9a1d4b059520c8adac368282798ed0b771 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@ with open("README.md", "r", encoding="utf-8") as fh:
 
 setuptools.setup(
     name="aptbot",
-    version="0.0.1",
+    version="0.0.2",
     author="Georgios Atheridis",
     author_email="atheridis@tutamail.com",
     description="A chatbot for twitch.tv",
@@ -18,12 +18,13 @@ setuptools.setup(
     ],
     packages=setuptools.find_packages(),
     entry_points={
-        'console_scripts': [
-            'aptbot=aptbot.__main__:main',
+        "console_scripts": [
+            "aptbot=aptbot.__main__:main",
         ],
     },
     install_requires=[
-        'dotenv',
+        "python-dotenv",
+        "urllib3",
     ],
     python_requires=">=3.7",
 )
diff --git a/test/ttv_api/channel.py b/test/ttv_api/channel.py
new file mode 100644 (file)
index 0000000..cffb1f5
--- /dev/null
@@ -0,0 +1,28 @@
+import ttv_api.channel
+import ttv_api.users
+
+
+def test_get_channel():
+    channel = ttv_api.channel.get_channels("141981764")
+    if isinstance(channel, list):
+        assert channel[0].broadcaster_id == "141981764"
+        assert channel[0].broadcaster_login == "twitchdev"
+    else:
+        raise Exception("Didn't find channel")
+
+
+def test_get_user():
+    user = ttv_api.users.get_users(user_logins=[
+                                   "skgyo1238r923hrugo", "f43uif3ef", "fweuifhewuidw"])
+    if isinstance(user, list):
+        print(user)
+        # assert user[0].user_id == "141981764"
+        # assert user[0].login == "twitchdev"
+    else:
+        raise Exception("Didn't find channel")
+
+
+if __name__ == "__main__":
+    test_get_channel()
+    test_get_user()
+    print("Everytinhg passed")
diff --git a/ttv_api/__init__.py b/ttv_api/__init__.py
new file mode 100644 (file)
index 0000000..3f55784
--- /dev/null
@@ -0,0 +1,32 @@
+import os
+import urllib3
+import json
+from datetime import datetime
+from typing import Optional, Union
+from dataclasses import dataclass
+from enum import Enum
+# from dotenv import load_dotenv
+
+# load_dotenv()
+
+NICK = os.getenv("APTBOT_NICK")
+OAUTH = os.getenv("APTBOT_OAUTH")
+CLIENT_ID = os.getenv("APTBOT_CLIENT_ID")
+
+HEADER = {
+    "Authorization": f"Bearer {OAUTH}",
+    "Client-Id": f"{CLIENT_ID}",
+    "Content-Type": "application/json",
+}
+
+BASE_URL = "https://api.twitch.tv/helix/"
+
+
+class URL(Enum):
+    ads = BASE_URL + "channels/commercial"
+    analytics = BASE_URL + "analytics"
+    channels = BASE_URL + "channels"
+    emotes_channel = BASE_URL + "chat/emotes"
+    emotes_global = BASE_URL + "chat/emotes/global"
+    streams = BASE_URL + "streams"
+    users = BASE_URL + "users"
diff --git a/ttv_api/channel.py b/ttv_api/channel.py
new file mode 100644 (file)
index 0000000..260f2d7
--- /dev/null
@@ -0,0 +1,46 @@
+from ttv_api import *
+
+
+@dataclass
+class Channel:
+    broadcaster_id: str
+    broadcaster_login: str
+    broadcaster_name: str
+    game_name: str
+    game_id: str
+    broadcaster_language: str
+    title: str
+    delay: int
+
+
+def get_channels(*channel_ids: str) -> Optional[list[Channel]]:
+    params = "?"
+    for channel_id in channel_ids:
+        params += f"broadcaster_id={channel_id}&"
+
+    http = urllib3.PoolManager()
+    r = http.request(
+        "GET",
+        URL.channels.value + params,
+        headers=HEADER,
+    )
+    if r.status != 200:
+        return None
+
+    data = json.loads(r.data.decode("utf-8"))["data"]
+    channels = []
+
+    for channel in data:
+        channels.append(
+            Channel(
+                channel["broadcaster_id"],
+                channel["broadcaster_login"],
+                channel["broadcaster_name"],
+                channel["game_name"],
+                channel["game_id"],
+                channel["broadcaster_language"],
+                channel["title"],
+                channel["delay"],
+            )
+        )
+    return channels
diff --git a/ttv_api/emotes.py b/ttv_api/emotes.py
new file mode 100644 (file)
index 0000000..c1fe6ce
--- /dev/null
@@ -0,0 +1,81 @@
+from ttv_api import *
+
+
+@dataclass
+class Emote:
+    emote_id: str
+    name: str
+    tier: Optional[str]
+    emote_type: Optional[str]
+    emote_set_id: Optional[str]
+    emote_format: list[str]
+    scale: list[str]
+    theme_mode: list[str]
+
+
+def get_global_emotes() -> Optional[list[Emote]]:
+    http = urllib3.PoolManager()
+    r = http.request(
+        "GET",
+        URL.emotes_channel.value,
+        headers=HEADER,
+    )
+
+    if r.status != 200:
+        return None
+
+    data = json.loads(r.data.decode("utf-8"))["data"]
+    emotes = []
+
+    for emote in data:
+        emotes.append(
+            Emote(
+                emote["id"],
+                emote["name"],
+                None,
+                None,
+                None,
+                emote["format"],
+                emote["scale"],
+                emote["theme_mode"],
+            )
+        )
+
+    return emotes
+
+
+def get_channel_emotes(channel_id: str) -> Optional[list[Emote]]:
+    params = f"?broadcaster_id={channel_id}"
+
+    http = urllib3.PoolManager()
+    r = http.request(
+        "GET",
+        URL.emotes_channel.value + params,
+        headers=HEADER,
+    )
+
+    if r.status != 200:
+        return None
+
+    data = json.loads(r.data.decode("utf-8"))["data"]
+    emotes = []
+
+    for emote in data:
+        emotes.append(
+            Emote(
+                emote["id"],
+                emote["name"],
+                emote["tier"],
+                emote["emote_type"],
+                emote["emote_set_it"],
+                emote["format"],
+                emote["scale"],
+                emote["theme_mode"],
+            )
+        )
+
+    return emotes
+
+
+def get_emote_image(emote_id: str, emote_format: str, theme_mode: str, scale: str):
+    return f"https://static-cdn.jtvnw.net/emoticons/v2/{emote_id}/{emote_format}/{theme_mode}/{scale}"
diff --git a/ttv_api/stream.py b/ttv_api/stream.py
new file mode 100644 (file)
index 0000000..3a5727e
--- /dev/null
@@ -0,0 +1,82 @@
+from ttv_api import *
+
+
+@dataclass
+class Stream:
+    stream_id: str
+    user_id: str
+    login: str
+    display_name: str
+    game_id: str
+    game_name: str
+    stream_type: str
+    stream_title: str
+    viewer_count: int
+    started_at: datetime
+    language: str
+    thumbnail_url: str
+    tag_ids: str
+    is_mature: bool
+
+
+def get_streams(
+    user_ids: list[str] = [],
+    user_logins: list[str] = [],
+    game_ids: list[str] = [],
+    languages: list[str] = [],
+    max_streams: int = 100,
+) -> Optional[list[Stream]]:
+
+    streams = []
+    params = "?first=100&"
+    for user_id in user_ids:
+        params += f"user_id={user_id}&"
+    for user_login in user_logins:
+        params += f"user_login={user_login}&"
+    for game_id in game_ids:
+        params += f"game_id={game_id}&"
+    for language in languages:
+        params += f"language={language}&"
+
+    while True:
+        http = urllib3.PoolManager()
+        r = http.request(
+            "GET",
+            URL.streams.value + params,
+            headers=HEADER,
+        )
+        if r.status != 200:
+            return None
+
+        requested_data = json.loads(r.data.decode("utf-8"))
+        data = requested_data["data"]
+
+        for stream in data:
+            dt = datetime.strptime(stream["started_at"], "%Y-%m-%dT%H:%M:%SZ")
+            streams.append(
+                Stream(
+                    stream["id"],
+                    stream["user_id"],
+                    stream["user_login"],
+                    stream["user_name"],
+                    stream["game_id"],
+                    stream["game_name"],
+                    stream["type"],
+                    stream["title"],
+                    stream["viewer_count"],
+                    dt,
+                    stream["language"],
+                    stream["thumbnail_url"],
+                    stream["tag_ids"],
+                    stream["is_mature"],
+                )
+            )
+            if len(streams) >= max_streams:
+                return streams
+
+        try:
+            cursor = requested_data["pagination"]["cursor"]
+        except KeyError:
+            return streams
+        else:
+            params = f"?first=100&after={cursor}"
diff --git a/ttv_api/users.py b/ttv_api/users.py
new file mode 100644 (file)
index 0000000..2297018
--- /dev/null
@@ -0,0 +1,53 @@
+from ttv_api import *
+
+
+@dataclass
+class User:
+    user_id: str
+    login: str
+    display_name: str
+    user_type: str
+    broadcaster_type: str
+    description: str
+    profile_image_url: str
+    offline_image_url: str
+    view_count: int
+    created_at: datetime
+
+
+def get_users(user_ids: list[str] = [], user_logins: list[str] = []) -> Optional[list[User]]:
+    params = "?"
+    for user_id in user_ids:
+        params += f"id={user_id}&"
+    for user_login in user_logins:
+        params += f"login={user_login}&"
+
+    http = urllib3.PoolManager()
+    r = http.request(
+        "GET",
+        URL.users.value + params,
+        headers=HEADER,
+    )
+    if r.status != 200:
+        return None
+
+    data = json.loads(r.data.decode("utf-8"))["data"]
+    users = []
+
+    for user in data:
+        dt = datetime.strptime(user["created_at"], "%Y-%m-%dT%H:%M:%SZ")
+        users.append(
+            User(
+                user["id"],
+                user["login"],
+                user["display_name"],
+                user["type"],
+                user["broadcaster_type"],
+                user["description"],
+                user["profile_image_url"],
+                user["offline_image_url"],
+                user["view_count"],
+                dt,
+            )
+        )
+    return users