FROM
auto_messages
LEFT JOIN auto_message_values USING (name)
+ ORDER BY
+ last_used ASC
"""
)
- fetched = c.fetchall()
+ fetched = c.fetchone()
if not fetched:
conn.close()
return
- for auto_message in fetched:
- name, cooldown, end_time, last_used, value = auto_message
- print(auto_message)
- if time.time() < last_used + cooldown:
- continue
- if time.time() > start_stream_ts + end_time and end_time != 0:
- continue
- if value:
- tools.smart_privmsg.send(bot, message, value)
- else:
+ name, cooldown, end_time, last_used, value = fetched
+ print(fetched)
+ if time.time() < last_used + cooldown:
+ return
+ if time.time() > start_stream_ts + end_time and end_time != 0:
+ return
+ if value:
+ tools.smart_privmsg.send(bot, message, value)
+ else:
+ try:
auto_message_modules[name].main(bot, message)
- c.execute(
- "REPLACE INTO auto_messages VALUES (?, ?, ?, ?)",
- (
- name,
- cooldown,
- end_time,
- int(time.time()),
+ except KeyError:
+ c.execute(
+ """
+ DELETE FROM
+ auto_messages
+ WHERE
+ name = ?
+ """,
+ (name, )
)
+ conn.commit()
+ return
+
+ c.execute(
+ "UPDATE auto_messages SET last_used = ? WHERE name = ?",
+ (
+ int(time.time()),
+ name,
)
- conn.commit()
- time.sleep(1.5)
+ )
+ conn.commit()
conn.close()
conn = sqlite3.connect(os.path.join(PATH, "database.db"))
c = conn.cursor()
- command = message.value.split(' ')[0]
+ try:
+ replied_message = message.tags["reply-parent-msg-body"]
+ except KeyError:
+ replied_message = None
+
+ if replied_message:
+ command = message.value.split(' ')[1]
+ else:
+ command = message.value.split(' ')[0]
prefix = command[0]
command = command[1:]
user_id = message.tags["user-id"]
value,
commands.user_cooldown,
commands.global_cooldown,
- CASE WHEN cooldowns.user_cooldown >= (commands.last_used + commands.global_cooldown) THEN
- cooldowns.user_cooldown
- ELSE
- (commands.last_used + commands.global_cooldown)
+ CASE
+ WHEN ? <= 10 THEN
+ 0
+ WHEN cooldowns.user_cooldown >= (commands.last_used + commands.global_cooldown) THEN
+ cooldowns.user_cooldown
+ ELSE
+ (commands.last_used + commands.global_cooldown)
END AS avail_time
FROM
AND permission >= ?
""",
(
+ user_perm,
user_id,
command,
prefix,
)
)
fetched = c.fetchall()
- print(fetched)
if not fetched:
conn.close()
return
+++ /dev/null
-from aptbot.bot import Message, Commands, Bot
-
-COOLDOWN = 25 * 60
-END_TIME = 2 * 60 * 60
-
-
-def main(bot: Bot, message: Message):
- msg = "ELLO it's me Murphy. I managed to find my way in to twitch chat PepeLaugh"
- bot.send_privmsg(message.channel, msg)
conn = sqlite3.connect(os.path.join(PATH, "database.db"))
c = conn.cursor()
+ c.execute(
+ "SELECT value FROM commands LEFT JOIN command_values USING(command) WHERE command = ? AND prefix = ?",
+ (
+ command_name,
+ command_prefix,
+ )
+ )
+ try:
+ if not c.fetchone()[0]:
+ bot.send_privmsg(
+ message.channel,
+ f"The command {command_prefix}{command_name} already exists"
+ )
+ return
+ except TypeError:
+ pass
try:
c.execute(
"INSERT INTO commands VALUES (?, ?, ?, ?, ?, ?, ?)",
)
)
except sqlite3.IntegrityError:
- bot.send_privmsg(
- message.channel,
- f"The command {command_name} already exists."
- )
+ pass
except Exception as e:
bot.send_privmsg(
message.channel,
f"There was an error adding the command: {e}"
)
- else:
- c.execute(
- "INSERT INTO command_values VALUES (?, ?)",
- (
- command_name,
- command_value,
- )
- )
- bot.send_privmsg(
- message.channel,
- f"Successfully added {command_name}."
+ conn.close()
+ return
+ c.execute(
+ "INSERT INTO command_values VALUES (?, ?)",
+ (
+ command_name,
+ command_value,
)
+ )
+ bot.send_privmsg(
+ message.channel,
+ f"Successfully added {command_name}."
+ )
conn.commit()
conn.close()
--- /dev/null
+from aptbot.bot import Message, Commands, Bot
+import tools.smart_privmsg
+import random
+
+PERMISSION = 99
+PREFIX = '?'
+DESCRIPTION = r"Tosses a coin, there's a 50% chance it's heads and a 50% chance it's tails"
+USER_COOLDOWN = 10
+GLOBAL_COOLDOWN = 5
+
+
+def main(bot: Bot, message: Message):
+ r = random.random()
+ if r < 0.02:
+ tools.smart_privmsg.send(bot, message, f"the coin landed on it's side, try again.", reply=message.tags["id"])
+ elif r < 0.51:
+ tools.smart_privmsg.send(bot, message, f"heads", reply=message.tags["id"])
+ else:
+ tools.smart_privmsg.send(bot, message, f"tails", reply=message.tags["id"])
+
+
commands.append(f"{command[0]}{command[1]}")
commands = ' '.join(commands)
- print(f"commands is: {commands}")
tools.smart_privmsg.send(bot, message, commands)
--- /dev/null
+from aptbot.bot import Message, Commands, Bot
+
+PERMISSION = 10
+PREFIX = '\\'
+DESCRIPTION = "Reply to a message with \\delete to delete that message"
+USER_COOLDOWN = 0
+GLOBAL_COOLDOWN = 0
+
+def main(bot: Bot, message: Message):
+ try:
+ replied_msg_id = message.tags["reply-parent-msg-id"]
+ except KeyError:
+ return
+ delete = f"/delete {replied_msg_id}"
+ bot.send_privmsg(message.channel, delete)
+
conn = sqlite3.connect(os.path.join(PATH, "database.db"))
c = conn.cursor()
c.execute(
- "SELECT value FROM commands WHERE command = ? AND prefix = ?",
+ "SELECT value FROM commands LEFT JOIN command_values USING(command) WHERE command = ? AND prefix = ?",
(
command_name,
command_prefix,
try:
c.execute(
- "UPDATE commands SET value = ? WHERE command = ?",
+ "UPDATE command_values SET value = ? WHERE command = ?",
(
command_value,
command_name,
conn = sqlite3.connect(os.path.join(PATH, "database.db"))
c = conn.cursor()
c.execute(
- "SELECT value FROM commands WHERE command = ? AND prefix = ?",
+ "SELECT value FROM command_values WHERE command = ?",
(
command_name,
- command_prefix,
)
)
command_path = os.path.join(COMMANDS_PATH, f"{command_name}.py")
def main(bot: Bot, message: Message):
- msg = ' '.join(message.value.split(' ')[1:])
+ try:
+ replied_message = message.tags["reply-parent-msg-body"]
+ except KeyError:
+ replied_message = None
+ if replied_message:
+ msg = replied_message
+ else:
+ msg = ' '.join(message.value.split(' ')[1:])
new_msg = ""
while len(new_msg) + len(msg) < MAX_LENGTH:
new_msg += msg + " "
def main(bot: Bot, message: Message):
- msg = ' '.join(message.value.split(' ')[1:])
+ try:
+ replied_message = message.tags["reply-parent-msg-body"]
+ except KeyError:
+ replied_message = None
+ if replied_message:
+ msg = replied_message
+ replied_msg_id = message.tags["reply-parent-msg-id"]
+ else:
+ msg = ' '.join(message.value.split(' ')[1:])
+ replied_msg_id = None
trans = scripts.translator.translate(msg)
- tools.smart_privmsg.send(bot, message, trans)
+ print(trans)
+ tools.smart_privmsg.send(bot, message, trans, reply=replied_msg_id)
+from aptbot.bot import Message, Commands
import sqlite3
import os
import ttv_api.users
print(e)
admin_id = ttv_api.users.get_users(user_logins=["skgyorugo"])
+ aptbot_id = ttv_api.users.get_users(user_logins=["murphyai"])
broadcaster_id = ttv_api.users.get_users(user_logins=[streamer_login])
if admin_id:
try:
(admin_id[0].user_id, 0))
except sqlite3.IntegrityError as e:
print(e)
+ if aptbot_id:
+ try:
+ c.execute("INSERT INTO users VALUES (?, ?)",
+ (aptbot_id[0].user_id, 0))
+ except sqlite3.IntegrityError as e:
+ print(e)
if broadcaster_id:
try:
c.execute("INSERT INTO users VALUES (?, ?)",
conn.commit()
conn.close()
+def add_message_to_chat_history(message: Message):
+ if message.command != Commands.PRIVMSG:
+ return
+ conn = sqlite3.connect(os.path.join(PATH, "chat_history.db"))
+ c = conn.cursor()
+
+ try:
+ bits = message.tags["bits"]
+ except KeyError:
+ bits = None
+ try:
+ rp_display_name = message.tags["reply-parent-display-name"]
+ rp_msg_body = message.tags["reply-parent-msg-body"]
+ rp_msg_id = message.tags["reply-parent-msg-id"]
+ rp_user_id = int( message.tags["reply-parent-user-id"] )
+ rp_user_login = message.tags["reply-parent-user-login"]
+ except KeyError:
+ rp_display_name = None
+ rp_msg_body = None
+ rp_msg_id = None
+ rp_user_id = None
+ rp_user_login = None
+
+ c.execute(
+ """
+ INSERT INTO chat (
+ "id",
+ "nick",
+ "channel",
+ "message",
+ "tmi-sent-ts",
+ "badge-info",
+ "badges",
+ "bits",
+ "color",
+ "display-name",
+ "first-msg",
+ "mod",
+ "room-id",
+ "user-id",
+ "user-type",
+ "turbo",
+ "subscriber",
+ "reply-parent-display-name",
+ "reply-parent-msg-body",
+ "reply-parent-msg-id",
+ "reply-parent-user-id",
+ "reply-parent-user-login"
+ ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
+ """,
+ (
+ message.tags["id"],
+ message.nick,
+ message.channel,
+ message.value,
+ int(message.tags["tmi-sent-ts"]),
+ message.tags["badge-info"],
+ message.tags["badges"],
+ bits,
+ message.tags["color"],
+ message.tags["display-name"],
+ int(message.tags["first-msg"]),
+ int( message.tags["mod"] ),
+ int( message.tags["room-id"] ),
+ int( message.tags["user-id"] ),
+ message.tags["user-type"],
+ int( message.tags["turbo"] ),
+ int( message.tags["subscriber"] ),
+ rp_display_name,
+ rp_msg_body,
+ rp_msg_id,
+ rp_user_id,
+ rp_user_login,
+ )
+ )
+ conn.commit()
+ conn.close()
+
+def create_chat_history_database():
+ conn = sqlite3.connect(os.path.join(PATH, "chat_history.db"))
+ c = conn.cursor()
+
+ c.execute(
+ """
+ CREATE TABLE IF NOT EXISTS "chat" (
+ "id" TEXT NOT NULL,
+ "nick" TEXT NOT NULL,
+ "channel" TEXT NOT NULL,
+ "message" TEXT NOT NULL,
+ "tmi-sent-ts" INTEGER NOT NULL,
+ "badge-info" TEXT NOT NULL,
+ "badges" TEXT NOT NULL,
+ "bits" TEXT,
+ "color" TEXT NOT NULL,
+ "display-name" TEXT NOT NULL,
+ "first-msg" INTEGER NOT NULL,
+ "mod" INTEGER NOT NULL,
+ "room-id" INTEGER NOT NULL,
+ "user-id" INTEGER NOT NULL,
+ "user-type" TEXT NOT NULL,
+ "turbo" INTEGER NOT NULL,
+ "subscriber" INTEGER NOT NULL,
+ "reply-parent-display-name" TEXT,
+ "reply-parent-msg-body" TEXT,
+ "reply-parent-msg-id" TEXT,
+ "reply-parent-user-id" INTEGER,
+ "reply-parent-user-login" TEXT,
+ PRIMARY KEY("id")
+ );
+ """
+ )
+ conn.commit()
+ conn.close()
+
def update_auto_messages_in_database(modules, auto_messages):
conn = sqlite3.connect(os.path.join(PATH, "database.db"))
import tools.permissions
import analyze_command
import scripts.unit_converter
+import scripts.alwase
+import scripts.chatting
import database_manager
import analyze_auto_message
+import time
from importlib import reload
reload(tools.raid)
reload(tools.permissions)
reload(analyze_command)
reload(scripts.unit_converter)
+reload(scripts.alwase)
+reload(scripts.chatting)
reload(database_manager)
reload(analyze_auto_message)
database_manager.create_database()
database_manager.create_variables_db()
+database_manager.create_chat_history_database()
database_manager.update_commands_in_database(commands_modules, commands)
database_manager.update_auto_messages_in_database(
auto_message_modules, auto_messages)
+def start(bot: Bot, message: Message):
+ while True:
+ analyze_auto_message.do_auto_message(bot, message, auto_message_modules)
+ time.sleep(30)
+
+
def main(bot: Bot, message: Message):
if message.command == Commands.PRIVMSG:
+ database_manager.add_message_to_chat_history(message)
analyze_command.do_command(bot, message, commands_modules)
scripts.unit_converter.send_metric(bot, message)
-
- analyze_auto_message.do_auto_message(bot, message, auto_message_modules)
+ scripts.alwase.alwase(bot, message)
+ scripts.chatting.chatting(bot, message)
+ scripts.chatting.chatting_annoy(bot, message)
tools.raid.raid(bot, message)
--- /dev/null
+from aptbot.bot import Bot, Message, Commands
+import tools.smart_privmsg
+import random
+
+def alwase(bot: Bot, message: Message):
+ if "always" not in message.value.lower():
+ return
+ try:
+ replied_msg_id = message.tags["id"]
+ except KeyError:
+ replied_msg_id = None
+ msgs = ["It's alwase Madge", "Why don't you spell it alwase? Madge", "Spell it alwase or peepoArriveBan"]
+ msg = random.choice(msgs)
+ tools.smart_privmsg.send(bot, message, msg, reply=replied_msg_id)
--- /dev/null
+from aptbot.bot import Bot, Message, Commands
+import tools.smart_privmsg
+import random
+
+
+def chatting(bot: Bot, message: Message):
+ if "Chatting" not in message.value:
+ return
+ if random.random() > 0.4:
+ return
+ msg = ""
+ if "reply-parent-msg-body" in message.tags:
+ if message.value.split(' ')[1] == "Chatting":
+ msg = ' '.join(message.value.split(' ')[1:])
+ else:
+ if message.value.split(' ')[0] == "Chatting":
+ msg = message.value
+ if msg:
+ tools.smart_privmsg.send(bot, message, msg)
+
+
+def chatting_annoy(bot: Bot, message: Message):
+ nicks = {}
+ if message.nick.lower() not in nicks:
+ return
+ msg = "Chatting " + message.value
+ tools.smart_privmsg.send(bot, message, msg)
elif cm[i] > 0:
text += f"{ft_inch[i][0]:.1f}ft. and {ft_inch[i][1]:.1f}in. is {cm[i]:.1f}cm. | "
if text:
- tools.smart_privmsg.send(bot, message, text)
+ try:
+ replied_msg_id = message.tags["id"]
+ except KeyError:
+ replied_msg_id = None
+ tools.smart_privmsg.send(bot, message, text, reply=replied_msg_id)
def _tometric(text: str) -> tuple[list[tuple[int, int]], list[float]]:
return word_list
-def send_safe(bot: Bot, channel: str, messages: Union[str, list]):
+def send_safe(bot: Bot, channel: str, messages: Union[str, list], reply=None):
if isinstance(messages, list):
for i in range(len(messages)):
- if messages[i].startswith('/') or messages[i].startswith('!'):
+ if messages[i].startswith('/') or messages[i].startswith('!') or messages[i].startswith('\\') or messages[i].startswith('?'):
messages[i] = messages[i][1:]
else:
- if messages.startswith('/') or messages.startswith('!'):
+ if messages.startswith('/') or messages.startswith('!') or messages.startswith('\\') or messages.startswith('?'):
messages = messages[1:]
- bot.send_privmsg(channel, messages)
+ bot.send_privmsg(channel, messages, reply)
-def send(bot: Bot, message_data: Message, message: str, to_remove: int = 1, safe_send: bool = True):
+def send(bot: Bot, message_data: Message, message: str, to_remove: int = 1, safe_send: bool = True, reply=None):
# for msg in _split_message(' '.join(message_data.value.split(' ')[1:])):
# message = message.replace("{message}", msg)
# message = message.replace("{nick}", message_data.nick)
messages = _split_message(message)
if safe_send:
- send_safe(bot, message_data.channel, messages)
+ send_safe(bot, message_data.channel, messages, reply)
else:
- bot.send_privmsg(message_data.channel, messages)
+ bot.send_privmsg(message_data.channel, messages, reply)
max_last_checked[0],
)
)
- print(f"I checked {max_last_checked}")
fetched = c.fetchone()
if fetched:
start_stream_ts, last_checked, _ = fetched
- print(f"stream ts = {start_stream_ts}")
- print(
- f"last_checked {last_checked} + check_streamtime_cd {CHECK_STREAMTIME_CD} \n time.time {time.time()}")
if time.time() < last_checked + CHECK_STREAMTIME_CD:
return start_stream_ts
if not fetched:
start_stream_ts = int(stream_info[0].started_at.timestamp())
- print("TEST")
c.execute(
"REPLACE INTO stream_info VALUES (?, ?, ?)",
(
conn.close()
return start_stream_ts
- print("MORE TEST")
start_stream_ts, last_checked, _ = fetched
c.execute(
"REPLACE INTO stream_info VALUES (?, ?, ?)",
from yt_api import *
+import html
@dataclass
"GET",
get_url,
)
- print(f"the r.status is {r.status}")
if r.status != 200:
return None
data = json.loads(r.data.decode("utf-8"))["items"][0]
video_id = data["id"]["videoId"]
- video_title = data["snippet"]["title"]
+ video_title = html.unescape(data["snippet"]["title"])
return Video(video_title, video_id)