Finally fixed (thanks to yrrad8) Issue #005
Thanks also for the documentation help!
This commit is contained in:
GaMeNu 2023-10-22 23:41:11 +03:00
parent e0380f3a20
commit dfd454d120
3 changed files with 127 additions and 87 deletions

View File

@ -30,6 +30,7 @@ Get the latest alerts from up to a certain time back.
```
discord.py
mysql-connector-python
requests
python-dotenv
```
#### Other requirements

View File

@ -1,19 +1,12 @@
import asyncio
import datetime
import json
import os
import random
from _xxsubinterpreters import channel_recv
import requests
import discord
from dotenv import load_dotenv
from discord.ext import commands, tasks
from discord import app_commands
import logging
import db_access
from db_access import *
from markdown import md
@ -83,7 +76,27 @@ class Alert:
data.get('desc'))
class DistrictEmbed(discord.Embed):
class AlertEmbed:
def __init__(self, district: db_access.District, description: str):
self.embed = discord.Embed(color=discord.Color.from_str('#FF0000'))
self.district = district
self.embed.title = f'התראה ב{district}'
self.embed.add_field(name='נכון ל', value=datetime.datetime.now().strftime("%H:%M:%S\n%d/%m/%Y"), inline=False)
self.embed.add_field(name='מידע נוסף', value=description)
@staticmethod
def generic_alert(district: db_access.District, description: str):
ret_alem = AlertEmbed(district, description)
class DistrictEmbedTemp(discord.Embed):
def __init__(self, district_id: int, **kwargs):
self.district_id = district_id
super().__init__(**kwargs)
@ -139,24 +152,41 @@ class Notificator(commands.Cog):
#
# Off I go to make a utility function!
ch = self.bot.get_channel(intr.channel_id)
if ch is not None and self.db.is_registered_channel(ch.id):
# 17:42 update: Turns out I am very dumb and if the channel is not registered I don't return None but rather keep going
# Thanks yrrad8! (/srs)
if self.db.is_registered_channel(intr.channel_id):
return True
ch = self.bot.get_user(intr.user.id)
if ch is not None and self.db.is_registered_channel(ch.id):
if self.db.is_registered_channel(intr.user.id):
return False
return None
def get_matching_channel_id(self, intr: discord.Interaction) -> int | None:
channel_type = self.in_registered_channel(intr)
if channel_type is None:
return None
elif channel_type:
return intr.channel_id
else:
return intr.user.id
def get_matching_channel(self, intr: discord.Interaction) -> db_access.Channel:
"""
Gets the matching Channel ID for Server Channel or DM. Returns None if UNREGISTERED or not found
:param intr: Command interaction from discord
:return: registered channel ID
"""
channel = self.db.get_channel(intr.channel_id)
if channel is None:
channel = self.db.get_channel(intr.user.id)
return channel
@staticmethod
async def has_permission(intr: discord.Interaction) -> bool:
"""
Check if current user have an admin permissions
:param intr: Command interaction from discord
:return: Boolean: Have a permissions
"""
if intr.guild is not None and not intr.user.guild_permissions.manage_channels:
return False
return True
@tasks.loop(seconds=1)
async def check_for_updates(self):
@ -212,9 +242,22 @@ class Notificator(commands.Cog):
@staticmethod
def generate_alert_embed(alert_object: Alert, district: str, arrival_time: int | None, time: str,
lang: str, district_id: int) -> DistrictEmbed:
lang: str, district_id: int) -> DistrictEmbedTemp:
"""
Generate alert embed
:param alert_object: Alert content
:param district: District alert
:param arrival_time: Time to get to a safe space
:param time: Alert Time
:param lang: Alert language
:param district_id: IDK what this param doing; Me neither LOL
:return: Alert embed for current alert
"""
# TODO: fix tha param description
# TODO: fix this entire function
# TODO: Using 1 generate alert function is probably bad, should probably split into a utility class
e = DistrictEmbed(district_id=district_id, color=discord.Color.from_str('#FF0000'))
e = DistrictEmbedTemp(district_id=district_id, color=discord.Color.from_str('#FF0000'))
e.title = f'התראה ב{district}'
e.add_field(name=district, value=alert_object.title, inline=False)
match alert_object.category:
@ -249,7 +292,7 @@ class Notificator(commands.Cog):
alert_history = None
self.log.info(f'Sending alerts to channels')
embed_ls: list[DistrictEmbed] = []
embed_ls: list[DistrictEmbedTemp] = []
new_alert = Alert.from_dict(alert_data)
@ -277,18 +320,18 @@ class Notificator(commands.Cog):
alert_time_str = alert_time.strftime("%H:%M:%S\n%d/%m/%Y")
if district_data is not None:
embed_ls.append(Notificator.generate_alert_embed(new_alert, district, district_data.migun_time,
alert_time_str, 'he', district_data.id))
alert_time_str, 'he', district_data.district_id))
else:
embed_ls.append(Notificator.generate_alert_embed(new_alert, district, None, alert_time_str, 'he', district_data.id))
for channel_tup in self.db.get_all_channels():
channel = Channel.from_tuple(channel_tup)
if channel.server_id is not None:
dc_ch = self.bot.get_channel(channel.id)
dc_ch = self.bot.get_channel(channel.district_id)
else:
dc_ch = self.bot.get_user(channel.id)
dc_ch = self.bot.get_user(channel.district_id)
channel_districts = self.db.get_channel_district_ids(channel.id)
channel_districts = self.db.get_channel_district_ids(channel.district_id)
for emb in embed_ls:
if dc_ch is None:
@ -299,19 +342,23 @@ class Notificator(commands.Cog):
await dc_ch.send(embed=emb, view=self.hfc_button_view())
await asyncio.sleep(0.01)
except BaseException as e:
self.log.warning(f'Failed to send alert in channel id={channel.id}:\n'
self.log.warning(f'Failed to send alert in channel id={channel.district_id}:\n'
f'{e}')
@app_commands.command(name='register',
description='Register a channel to receive HFC alerts (Requires Manage Channels)')
@app_commands.checks.has_permissions(manage_channels=True)
async def register_channel(self, intr: discord.Interaction):
channel_id = intr.channel_id
if intr.channel.guild is not None:
server_id = intr.channel.guild.id
if not await Notificator.has_permission(intr):
await intr.response.send_message('Error: You are missing the Manage Channels permission.')
return
if intr.guild is not None:
channel_id = intr.channel_id
server_id = intr.guild.id
else:
server_id = None
channel_id = intr.user.id
server_id = None
await self.attempt_registration(intr, channel_id, server_id)
@ -325,13 +372,16 @@ class Notificator(commands.Cog):
if server_id is not None and self.db.get_server(server_id) is None:
self.db.add_server(server_id, 'he')
self.db.add_channel(channel_id, server_id, 'he')
try:
await intr.response.send_message(f'Channel #{intr.channel.name} will now receive HFC alerts.')
except AttributeError:
await intr.response.send_message(f'This channel will now receive HFC alerts.')
ch = self.bot.get_channel(channel_id)
try:
ch = self.bot.get_channel(channel_id)
perms = ch.overwrites_for(self.bot.user)
perms.update(send_messages=True)
await ch.set_permissions(target=ch.guild.me, overwrite=perms,
@ -340,20 +390,25 @@ class Notificator(commands.Cog):
await intr.followup.send(
f'Could not allow bot to send messages to this channel! Please add the bot to this channel and allow it to send messages.\n'
f'Error info: {e.__str__()}')
@register_channel.error
async def register_channel_error(self, intr: discord.Interaction, error):
if isinstance(error, app_commands.MissingPermissions):
if intr.guild is not None:
await intr.response.send_message('Error: You are missing the Manage Channels permission.')
return
await self.attempt_registration(intr, intr.user.id, None)
except AttributeError:
pass
@app_commands.command(name='unregister',
description='Stop a channel from receiving HFC alerts (Requires Manage Channels)')
@app_commands.checks.has_permissions(manage_channels=True)
async def unregister_channel(self, intr: discord.Interaction, confirmation: str = None):
channel_id = intr.channel_id
if not await Notificator.has_permission(intr):
await intr.response.send_message('Error: You are missing the Manage Channels permission.')
return
channel = self.get_matching_channel(intr)
if channel is None:
try:
await intr.response.send_message(f'Channel #{intr.channel.name} is not yet receiving HFC alerts')
except AttributeError:
await intr.response.send_message(f'This channel is not yet receiving HFC alerts')
return
conf_str = intr.user.name
@ -371,31 +426,15 @@ class Notificator(commands.Cog):
await self.attempt_unregistration(intr, channel)
async def attempt_unregistration(self, intr, channel):
if channel is None:
try:
await intr.response.send_message(f'Channel #{intr.channel.name} is not yet receiving HFC alerts')
except AttributeError:
await intr.response.send_message(f'This channel is not yet receiving HFC alerts')
return
async def attempt_unregistration(self, intr, channel: db_access.Channel):
self.db.remove_channel(channel.id)
if channel.server_id is not None:
self.db.remove_channel(channel.id)
else:
self.db.remove_channel(intr.user.id)
try:
await intr.response.send_message(f'Channel #{intr.channel.name} will no longer receive HFC alerts')
except AttributeError:
await intr.response.send_message(f'This channel will no longer receive HFC alerts')
@unregister_channel.error
async def unregister_channel_error(self, intr: discord.Interaction, error):
if isinstance(error, app_commands.MissingPermissions):
if intr.guild is not None:
await intr.response.send_message('Error: You are missing the Manage Channels permission.')
return
await self.attempt_unregistration(intr, self.db.get_channel(intr.user.id))
@app_commands.command(name='latest',
description='Get all alerts up to a certain time back (may be slightly outdated)')
@app_commands.describe(time='Amount of time back',
@ -477,8 +516,6 @@ class Notificator(commands.Cog):
if alert_counter % alerts_in_page != 0:
max_page += 1
print(max_page, page_number)
if time_back_amount <= 0:
raise ValueError("Time can't be lower than 1.")
@ -557,6 +594,10 @@ class Notificator(commands.Cog):
await intr.response.send_message(embed=e, view=view)
@app_commands.command(name='send_alert', description='Send a custom alert (available to bot author only)')
@app_commands.describe(title='Alert title',
desc='Alert description',
districts='Active alert districts',
cat='Alert category')
async def test_alert(self,
intr: discord.Interaction,
title: str = 'בדיקת מערכת שליחת התראות',
@ -578,13 +619,6 @@ class Notificator(commands.Cog):
"desc": desc
}, districts_ls)
@staticmethod
async def has_permission(intr: discord.Interaction) -> bool:
if intr.guild is not None and not intr.user.guild_permissions.manage_channels:
await intr.response.send_message('Error: You are missing the Manage Channels permission.')
return False
return True
@staticmethod
def locations_page(data_list: list, page: int, res_in_page: int = 50) -> str:
"""
@ -643,10 +677,11 @@ class Notificator(commands.Cog):
async def location_add(self, intr: discord.Interaction, locations: str):
if not await self.has_permission(intr):
await intr.response.send_message('Error: You are missing the Manage Channels permission.')
return
channel_id = self.get_matching_channel_id(intr)
if channel_id is None:
channel = self.get_matching_channel(intr)
if channel.id is None:
await intr.response.send_message('Could not find this channel. Are you sure it is registered?')
return
@ -660,7 +695,7 @@ class Notificator(commands.Cog):
return
try:
self.db.add_channel_districts(channel_id, location_ids)
self.db.add_channel_districts(channel.id, location_ids)
except ValueError as e:
await intr.response.send_message(e.__str__())
return
@ -672,10 +707,11 @@ class Notificator(commands.Cog):
async def location_remove(self, intr: discord.Interaction, locations: str):
if not await self.has_permission(intr):
await intr.response.send_message('Error: You are missing the Manage Channels permission.')
return
channel_id = self.get_matching_channel_id(intr)
if channel_id is None:
channel = self.get_matching_channel(intr)
if channel is None:
await intr.response.send_message('Could not find this channel. Are you sure it is registered?')
return
@ -688,17 +724,18 @@ class Notificator(commands.Cog):
await intr.response.send_message(f'District ID {md.b(f"{location}")} is not a valid district ID.')
return
self.db.remove_channel_districts(channel_id, location_ids)
self.db.remove_channel_districts(channel.id, location_ids)
await intr.response.send_message('Successfully removed all IDs')
@location_group.command(name='clear', description='Clear all registered locations (get alerts on all locations)')
async def location_clear(self, intr: discord.Interaction, confirmation: str = None):
if not await self.has_permission(intr):
await intr.response.send_message('Error: You are missing the Manage Channels permission.')
return
channel_id = self.get_matching_channel_id(intr)
if channel_id is None:
channel = self.get_matching_channel(intr)
if channel is None:
await intr.response.send_message('Could not find this channel. Are you sure it is registered?')
return
@ -719,8 +756,8 @@ class Notificator(commands.Cog):
@location_group.command(name='registered', description='List all locations registered to this channel, by IDs and names.')
async def location_registered(self, intr: discord.Interaction, page: int = 1):
channel_id = self.get_matching_channel_id(intr)
if channel_id is None:
channel = self.get_matching_channel(intr)
if channel is None:
await intr.response.send_message('Could not find this channel. Are you sure it is registered?')
return
@ -733,7 +770,7 @@ class Notificator(commands.Cog):
# Then it sorts it with the key being district_name
districts = sorted([self.db.get_district(district_id).to_tuple()
for district_id
in self.db.get_channel_district_ids(channel_id)],
in self.db.get_channel_district_ids(channel.id)],
key=lambda tup: tup[1])
page = self.locations_page(districts, page - 1)

View File

@ -25,7 +25,7 @@ class Area:
class District:
def __init__(self, id: int, name: str, area_id: int, migun_time: int):
self.id = id
self.district_id = id
self.name = name
self.area_id = area_id
self.migun_time = migun_time
@ -35,7 +35,7 @@ class District:
return District(tup[0], tup[1], tup[2], tup[3])
def to_tuple(self) -> tuple:
return self.id, self.name, self.area_id, self.migun_time
return self.district_id, self.name, self.area_id, self.migun_time
class Channel:
@ -206,6 +206,8 @@ class DBAccess:
Reason: Cannot create more queries while an iterator is active due to unread results.
"""
raise NotImplementedError("This function has been deprecated!")
with self.connection.cursor() as crsr:
crsr.execute('SELECT * FROM channels')
@ -220,7 +222,6 @@ class DBAccess:
return res
def remove_channel(self, id: int):
print(id)
with self.connection.cursor() as crsr:
crsr.execute('DELETE FROM channels WHERE channel_id=%s', (id,))
self.connection.commit()
@ -267,6 +268,8 @@ class DBAccess:
Reason: Cannot create more queries while an iterator is active due to unread results.
"""
raise NotImplementedError("This function has been deprecated!")
with self.connection.cursor() as crsr:
crsr.execute('SELECT * FROM district')
@ -312,7 +315,6 @@ class DBAccess:
def get_channel_district_ids(self, channel_id: int) -> list:
with self.connection.cursor() as crsr:
crsr.nextset()
crsr.execute('SELECT locations '
'FROM channels '