2023-10-20 16:07:16 +03:00
import asyncio
2023-10-10 22:21:19 +03:00
import datetime
2023-10-25 21:16:31 +03:00
import re
2023-10-29 15:56:52 +02:00
import sys
2023-10-20 01:20:24 +03:00
2023-10-10 22:21:19 +03:00
import requests
import discord
from discord . ext import commands , tasks
from discord import app_commands
2023-10-20 16:07:16 +03:00
import db_access
2023-10-29 15:56:52 +02:00
import errlogging
2023-10-20 16:07:16 +03:00
from db_access import *
2023-10-10 22:21:19 +03:00
from markdown import md
load_dotenv ( )
AUTHOR_ID = int ( os . getenv ( ' AUTHOR_ID ' ) )
2023-10-26 11:31:47 +03:00
# 2023-10-26 I have decided to start documenting the project.
2023-10-10 22:21:19 +03:00
class AlertReqs :
2023-10-26 11:31:47 +03:00
"""
A class that handles all requests from HFC ' s website
"""
2023-10-25 12:49:05 +03:00
def __init__ ( self ) :
self . session = requests . Session ( )
self . session . verify = True
def request_alert_json ( self ) - > dict | None :
2023-10-12 01:10:59 +03:00
"""
Request a json of the current running alert
: return : JSON object as Python dict , or None if there ' s no alert running
: raises requests . exceptions . Timeout : If request times out ( 5 seconds )
"""
2023-10-25 12:49:05 +03:00
req = self . session . get ( ' https://www.oref.org.il/WarningMessages/alert/alerts.json ' , headers = {
2023-10-10 22:21:19 +03:00
' Referer ' : ' https://www.oref.org.il/ ' ,
' X-Requested-With ' : ' XMLHttpRequest ' ,
2023-10-25 12:49:05 +03:00
' Connection ' : ' keep-alive ' ,
' Client ' : ' HFC Notificator bot for Discord ' ,
' Nonexistent-Header ' : ' Yes '
2023-10-11 17:16:33 +03:00
} , timeout = 5 )
2023-10-10 22:21:19 +03:00
decoded = req . content . decode ( ' utf-8-sig ' )
if decoded is None or len ( decoded ) < 3 : # Why does it get a '\r\n' wtf
2023-10-15 11:47:45 +03:00
ret_dict = { }
2023-10-10 22:21:19 +03:00
else :
2023-10-15 11:47:45 +03:00
try :
ret_dict = json . loads ( decoded )
except ( json . decoder . JSONDecodeError , json . JSONDecodeError ) :
ret_dict = None
2023-10-10 22:21:19 +03:00
return ret_dict
2023-10-26 11:31:47 +03:00
def request_history_json ( self ) - > dict | None :
2023-10-12 01:10:59 +03:00
"""
Request a json of the alert history from last day
: return : JSON object as Python dict
: raises requests . exceptions . Timeout : If request times out ( 5 seconds )
"""
2023-10-26 11:31:47 +03:00
req = self . session . get ( ' https://www.oref.org.il/WarningMessages/History/AlertsHistory.json ' , timeout = 5 )
2023-10-10 22:21:19 +03:00
content = req . text
2023-10-15 11:47:45 +03:00
try :
ret_dict = json . loads ( content )
except ( json . JSONDecodeError , json . decoder . JSONDecodeError ) :
ret_dict = None
return ret_dict
2023-10-10 22:21:19 +03:00
2023-10-11 12:45:28 +03:00
class Alert :
2023-10-26 11:31:47 +03:00
"""
Represents an HFC Alert
"""
2023-10-11 12:45:28 +03:00
def __init__ ( self , id : int , cat : int , title : str , districts : list [ str ] , desc : str ) :
2023-10-26 11:31:47 +03:00
"""
Init an Alert instance
: param id : Alert ID
: param cat : Alert category
: param title : Alert title
: param districts : districts the alert is running for
: param desc : Alert description / extra info
"""
2023-10-11 12:45:28 +03:00
self . id = id
self . category = cat
self . title = title
self . districts = districts
self . description = desc
2023-10-23 01:04:03 +03:00
@classmethod
def from_dict ( cls , data : dict ) :
2023-10-26 11:31:47 +03:00
"""
Return a new Alert instance from an Alert - formatted dict ( matching HFC alert requests )
Dict format :
{
" id " : int ,
" cat " : int ,
" title " : str ,
" data " : list [ str ] ,
" desc " : str
}
: param data : A dict of matching format
: return : The new Alert instance
"""
2023-10-23 01:04:03 +03:00
return cls ( int ( data . get ( ' id ' , ' 0 ' ) ) ,
int ( data . get ( ' cat ' , ' 0 ' ) ) ,
data . get ( ' title ' ) ,
data . get ( ' data ' ) ,
data . get ( ' desc ' ) )
2023-10-11 19:17:29 +03:00
2023-10-11 12:45:28 +03:00
2023-10-22 23:41:11 +03:00
class AlertEmbed :
2023-10-26 11:31:47 +03:00
"""
A class representing an AlertEmbed
: var embed : discord . Embed ready for sending
: var alert : Alert containing alert data
: var district : db_access . AreaDistrict | str containing district data
"""
2023-10-22 23:41:11 +03:00
2023-10-24 00:54:45 +03:00
def __init__ ( self , alert : Alert | dict , district : db_access . AreaDistrict | str ) :
2023-10-23 01:04:03 +03:00
"""
Initiating the AlertEmbed class directly is equivalent to AlertEmbed . generic_alert , but is not recommended .
Please use AlertEmbed . generic_alert instead .
2023-10-26 11:31:47 +03:00
: var embed : discord . Embed ready for sending
: var alert : Alert containing alert data
: var district : db_access . AreaDistrict | str containing district data
2023-10-23 01:04:03 +03:00
"""
2023-10-22 23:41:11 +03:00
self . embed = discord . Embed ( color = discord . Color . from_str ( ' #FF0000 ' ) )
self . district = district
2023-10-23 01:04:03 +03:00
if isinstance ( alert , dict ) :
self . alert = Alert . from_dict ( alert )
else :
self . alert = alert
2023-10-22 23:41:11 +03:00
2023-10-24 00:54:45 +03:00
if isinstance ( self . district , AreaDistrict ) :
2023-10-24 00:59:10 +03:00
self . embed . title = f ' התראה ב { self . district . name } '
2023-10-24 00:54:45 +03:00
self . embed . add_field ( name = self . alert . title , value = f ' איזור { self . district . area . name } ' )
2023-10-24 00:59:10 +03:00
2023-10-24 00:54:45 +03:00
else :
2023-10-24 00:59:10 +03:00
self . embed . title = f ' התראה ב { self . district } '
self . embed . add_field ( name = self . alert . title , value = ' ' )
2023-10-24 00:54:45 +03:00
2023-10-22 23:41:11 +03:00
self . embed . add_field ( name = ' נכון ל ' , value = datetime . datetime . now ( ) . strftime ( " % H: % M: % S \n %d / % m/ % Y " ) , inline = False )
2023-10-23 01:04:03 +03:00
self . embed . add_field ( name = ' מידע נוסף ' , value = self . alert . description )
2023-10-22 23:41:11 +03:00
2023-10-23 01:04:03 +03:00
@classmethod
2023-10-24 01:02:55 +03:00
def generic_alert ( cls , alert : Alert | dict , district : db_access . AreaDistrict | str ) :
2023-10-26 11:31:47 +03:00
"""
Returns a new Generic AlertEmbed
: param alert : Alert object
: param district : AreaDistrict object ( District containing Area , check db_access . AreaDistrict )
: return : New AlertEmbed instance
"""
2023-10-23 01:04:03 +03:00
ret_alem = cls ( alert , district )
return ret_alem
@classmethod
2023-10-24 01:02:55 +03:00
def missile_alert ( cls , alert : Alert | dict , district : db_access . AreaDistrict | str ) :
2023-10-26 11:31:47 +03:00
"""
Returns a new Missile AlertEmbed
Similar to Generic AlertEmbed , but contains Migun Time
: param alert : Alert object
: param district : AreaDistrict object ( District containing Area , check db_access . AreaDistrict )
: return : New AlertEmbed instance
"""
2023-10-23 01:04:03 +03:00
ret_alem = cls . generic_alert ( alert , district )
if ( not isinstance ( district , str ) ) and ( district . migun_time is not None ) :
2023-10-24 00:54:45 +03:00
ret_alem . embed . insert_field_at ( index = 1 , name = ' זמן מיגון ' , value = f ' { district . migun_time } שניות ' , inline = False )
2023-10-23 01:04:03 +03:00
return ret_alem
2023-10-24 00:54:45 +03:00
ret_alem . embed . insert_field_at ( index = 1 , name = ' זמן מיגון ' , value = ' שגיאה באחזרת המידע ' , inline = False )
2023-10-23 01:04:03 +03:00
return ret_alem
@classmethod
2023-10-24 01:02:55 +03:00
def auto_alert ( cls , alert : Alert | dict , district : db_access . AreaDistrict | str ) :
2023-10-23 01:04:03 +03:00
"""
Tired of having to CHOOSE an alert type all the time ? Well this is JUST for you !
Introducing . . . auto_alert ! Just init it like any other alert , and it will return the fitting alert right then and there * !
* " then and there " does not include any computer , end - user , developer , or any other type of tomfoolery .
( Hopefully now I ' ll never have to write documentation again >:) )
: param alert : Alert object or alert dict .
: param district : District object ( from db_access )
: return : AlertEmbed object
"""
if isinstance ( alert , dict ) :
alert_obj = Alert . from_dict ( alert )
else :
alert_obj = alert
match alert_obj . category :
case 1 :
return cls . missile_alert ( alert_obj , district )
case _ :
return cls . generic_alert ( alert_obj , district )
2023-10-22 23:41:11 +03:00
2023-10-12 10:10:30 +03:00
# noinspection PyUnresolvedReferences
2023-10-10 22:21:19 +03:00
class Notificator ( commands . Cog ) :
2023-10-26 11:31:47 +03:00
"""
Oh god .
"""
2023-10-20 01:41:05 +03:00
location_group = app_commands . Group ( name = ' locations ' ,
description = ' Commands related adding, removing, or setting locations. ' )
2023-10-10 22:21:19 +03:00
districts : list [ dict ] = json . loads ( requests . get ( ' https://www.oref.org.il//Shared/Ajax/GetDistricts.aspx ' ) . text )
def __init__ ( self , bot : commands . Bot , handler : logging . Handler ) :
2023-10-26 11:31:47 +03:00
"""
Create the Cog
: param bot : Discord commands bot client
: param handler : Logging handler
"""
2023-10-10 22:21:19 +03:00
self . bot = bot
self . log = logging . Logger ( ' Notificator ' )
self . log . addHandler ( handler )
self . db = DBAccess ( )
self . active_districts = [ ]
2023-10-12 22:31:59 +03:00
self . reset_district_checker = 0
2023-10-25 12:49:05 +03:00
self . alert_reqs = AlertReqs ( )
2023-10-10 22:21:19 +03:00
if not self . check_for_updates . is_running ( ) :
self . check_for_updates . start ( )
@staticmethod
async def setup ( bot : commands . Bot , handler : logging . Handler ) :
2023-10-26 11:31:47 +03:00
"""
Set up the cog
: param bot : commands . Bot client
: param handler : logging handler
: return : the cog instance that was created and added to the bot
"""
2023-10-10 22:21:19 +03:00
notf = Notificator ( bot , handler )
if bot . get_cog ( ' Notificator ' ) is None :
await bot . add_cog ( notf )
return notf
@commands.Cog.listener ( )
async def on_ready ( self ) :
2023-10-26 11:31:47 +03:00
"""
Start API update task when ready
"""
2023-10-10 22:21:19 +03:00
if self . check_for_updates . is_running ( ) :
return
self . check_for_updates . start ( )
2023-10-22 14:03:07 +03:00
def in_registered_channel ( self , intr : discord . Interaction ) - > bool | None :
"""
2023-10-22 23:41:19 +03:00
an info about current channel
: param intr : Command interaction from discord
: return : Boolean :
True - is a registered server channel , False - is a registered DM , None - was not found ( may not be registered )
2023-10-22 14:03:07 +03:00
"""
# OPTIONS:
# Channel ID not None + DB not None: IS Channel and IS Registered => matching output and end
# Channel ID not None + DB None: IS Channel and NOT Registered => matching output and end
# Channel ID None cases:
# User ID not None + DB not None: IS DM and IS Registered
# User ID not None + DB None: IS DM and NOT Registered
#
# Off I go to make a utility function!
2023-10-22 23:41:11 +03:00
# 17:42 update: Turns out I am very dumb and if the channel is not registered I don't return None but rather keep going
# Thanks yrrad8! (/srs)
if self . db . is_registered_channel ( intr . channel_id ) :
2023-10-22 14:03:07 +03:00
return True
2023-10-22 23:41:11 +03:00
if self . db . is_registered_channel ( intr . user . id ) :
2023-10-22 14:03:07 +03:00
return False
return None
2023-10-22 23:41:11 +03:00
def get_matching_channel ( self , intr : discord . Interaction ) - > db_access . Channel :
"""
Gets the matching Channel ID for Server Channel or DM . Returns None if UNREGISTERED or not found
: param intr : Command interaction from discord
: return : registered channel ID
"""
channel = self . db . get_channel ( intr . channel_id )
if channel is None :
channel = self . db . get_channel ( intr . user . id )
return channel
@staticmethod
async def has_permission ( intr : discord . Interaction ) - > bool :
"""
Check if current user have an admin permissions
: param intr : Command interaction from discord
: return : Boolean : Have a permissions
"""
if intr . guild is not None and not intr . user . guild_permissions . manage_channels :
return False
return True
2023-10-10 22:21:19 +03:00
@tasks.loop ( seconds = 1 )
async def check_for_updates ( self ) :
2023-10-26 11:31:47 +03:00
"""
API Updating loop , contacts the API every second to get the latest info
"""
2023-10-11 17:16:33 +03:00
try :
2023-10-26 11:31:47 +03:00
# Request the data
2023-10-25 12:49:05 +03:00
current_alert : dict = self . alert_reqs . request_alert_json ( )
2023-10-11 17:16:33 +03:00
except requests . exceptions . Timeout as error :
self . log . error ( f ' Request timed out: { error } ' )
return
2023-10-10 22:21:19 +03:00
self . log . debug ( f ' Alert response: { current_alert } ' )
2023-10-12 22:31:59 +03:00
2023-10-15 11:47:45 +03:00
if current_alert is None or len ( current_alert ) == 0 :
2023-10-26 11:31:47 +03:00
# Alert is empty
2023-10-15 11:47:45 +03:00
if current_alert is None :
2023-10-26 11:31:47 +03:00
# None - an error while retrieving alert
2023-10-15 11:47:45 +03:00
self . log . warning ( ' Error while current alert data. ' )
2023-10-13 11:39:56 +03:00
2023-10-26 11:31:47 +03:00
# from here, len(current_alert) == 0
# current_alert is an empty dict.
2023-10-12 22:31:59 +03:00
if len ( self . active_districts ) == 0 :
2023-10-26 11:31:47 +03:00
# Alert is empty and there are no active districts
2023-10-12 22:31:59 +03:00
return
2023-10-13 11:39:56 +03:00
2023-10-26 11:31:47 +03:00
# Check if we should reset active districts. 3 empty alerts in a row will trigger a reset
2023-10-12 22:31:59 +03:00
self . reset_district_checker + = 1
if self . reset_district_checker == 3 :
2023-10-13 11:39:56 +03:00
print ( ' reset ' )
2023-10-12 22:31:59 +03:00
self . active_districts = [ ]
self . reset_district_checker = 0
2023-10-10 22:21:19 +03:00
return
2023-10-26 11:31:47 +03:00
# data = active districts
2023-10-10 22:21:19 +03:00
data : list [ str ] = current_alert [ " data " ]
new_districts : list [ str ] = [ ]
for district in data :
2023-10-26 11:31:47 +03:00
# Get all new_districts
2023-10-10 22:21:19 +03:00
if district in self . active_districts :
continue
new_districts . append ( district )
if len ( new_districts ) == 0 :
2023-10-26 11:31:47 +03:00
# There are no new districts.
2023-10-10 22:21:19 +03:00
return
2023-10-26 11:31:47 +03:00
2023-10-12 13:01:53 +03:00
try :
await self . send_new_alert ( current_alert , new_districts )
2023-10-30 13:30:36 +02:00
except Exception as e :
2023-10-12 13:01:53 +03:00
self . log . error ( f ' Could not send message! \n Error info: { e . __str__ ( ) } ' )
2023-10-26 11:31:47 +03:00
2023-10-10 22:21:19 +03:00
self . active_districts = data
2023-10-26 11:31:47 +03:00
self . reset_district_checker = 0
2023-10-10 22:21:19 +03:00
2023-10-13 11:39:56 +03:00
@check_for_updates.after_loop
2023-10-29 15:56:52 +02:00
async def after_update_loop ( self ) :
2023-10-13 11:39:56 +03:00
# Attempt to force stupid "Unread Result" down its own throat
# and just reset the connection.
# I'm not dealing with Unread Results
2023-10-12 12:55:26 +03:00
self . db . connection . close ( )
self . db = DBAccess ( )
2023-10-12 13:01:53 +03:00
if not self . check_for_updates . is_running ( ) :
self . check_for_updates . start ( )
2023-10-12 12:55:26 +03:00
2023-10-29 15:56:52 +02:00
@check_for_updates.error
async def update_loop_error ( self , err ) :
errlogging . new_errlog ( sys . exc_info ( ) [ 1 ] )
await self . after_update_loop ( )
2023-10-10 22:21:19 +03:00
@staticmethod
def hfc_button_view ( ) - > discord . ui . View :
2023-10-26 11:31:47 +03:00
"""
Get a discord UI View containing a button with a link to the HFC Website
: returns : discord . ui . View containing a discord . ui . Button ( ) with link to HFC website
"""
2023-10-10 22:21:19 +03:00
button = discord . ui . Button (
style = discord . ButtonStyle . link ,
label = ' אתר פיקוד העורף ' ,
url = ' https://www.oref.org.il '
)
view = discord . ui . View ( )
view . add_item ( button )
return view
2023-10-30 13:30:36 +02:00
@errlogging.async_errlog
2023-10-11 12:45:28 +03:00
async def send_new_alert ( self , alert_data : dict , new_districts : list [ str ] ) :
2023-10-22 23:41:19 +03:00
"""
Push an alert to all registered channels
: param alert_data : Alert data dict ( see test_alert for format )
: param new_districts : Currently active districts ( districts that were not already active )
: return :
"""
2023-10-29 15:56:52 +02:00
2023-10-12 12:00:44 +03:00
self . log . info ( f ' Sending alerts to channels ' )
2023-10-10 22:21:19 +03:00
2023-10-23 01:04:03 +03:00
embed_ls : list [ AlertEmbed ] = [ ]
2023-10-11 12:45:28 +03:00
for district in new_districts :
2023-10-23 01:04:03 +03:00
district_data = self . db . get_district_by_name ( district )
2023-10-11 17:16:33 +03:00
if district_data is not None :
2023-10-24 00:54:45 +03:00
embed_ls . append ( AlertEmbed . auto_alert ( alert_data , AreaDistrict . from_district ( district_data , self . db . get_area ( district_data . area_id ) ) ) )
2023-10-11 17:16:33 +03:00
else :
2023-10-23 01:04:03 +03:00
embed_ls . append ( AlertEmbed . auto_alert ( alert_data , district ) )
2023-10-10 22:21:19 +03:00
2023-10-25 21:49:17 +03:00
asyncio . create_task ( self . send_alerts_to_channels ( embed_ls ) )
2023-10-30 13:30:36 +02:00
@errlogging.async_errlog
2023-10-25 21:49:17 +03:00
async def send_alerts_to_channels ( self , embed_ls ) :
2023-10-26 11:31:47 +03:00
"""
Send the embeds in embed_ls to all channels
: param embed_ls : List of AlertEmbeds to send to channels
"""
2023-10-30 13:30:36 +02:00
raise Exception ( ' sdgh ' )
2023-10-20 16:07:16 +03:00
for channel_tup in self . db . get_all_channels ( ) :
channel = Channel . from_tuple ( channel_tup )
2023-10-10 22:21:19 +03:00
if channel . server_id is not None :
2023-10-24 00:54:45 +03:00
dc_ch = self . bot . get_channel ( channel . id )
2023-10-10 22:21:19 +03:00
else :
2023-10-24 00:54:45 +03:00
dc_ch = self . bot . get_user ( channel . id )
2023-10-20 16:07:16 +03:00
for emb in embed_ls :
2023-10-25 10:10:42 +03:00
# Skipping conditions
2023-10-10 22:21:19 +03:00
if dc_ch is None :
2023-10-25 10:10:42 +03:00
# Channel could not be found
2023-10-10 22:21:19 +03:00
continue
2023-10-25 10:10:42 +03:00
if len ( channel . locations ) != 0 :
# Channel has specific locations registered
if isinstance ( emb . district , AreaDistrict ) and ( emb . district . district_id not in channel . locations ) :
# District is registered but isn't in channel's registered location list
continue
if isinstance ( emb . district , str ) :
# District is not registered.
continue
2023-10-12 12:00:44 +03:00
try :
2023-10-23 01:04:03 +03:00
await dc_ch . send ( embed = emb . embed , view = self . hfc_button_view ( ) )
2023-10-26 19:47:18 +03:00
await asyncio . sleep ( 0.02 )
2023-10-30 13:30:36 +02:00
except Exception as e :
2023-10-22 23:41:11 +03:00
self . log . warning ( f ' Failed to send alert in channel id= { channel . district_id } : \n '
2023-10-12 12:09:08 +03:00
f ' { e } ' )
2023-10-12 12:00:44 +03:00
2023-10-11 19:17:29 +03:00
@app_commands.command ( name = ' register ' ,
description = ' Register a channel to receive HFC alerts (Requires Manage Channels) ' )
2023-10-10 22:21:19 +03:00
async def register_channel ( self , intr : discord . Interaction ) :
2023-10-22 23:41:11 +03:00
if not await Notificator . has_permission ( intr ) :
await intr . response . send_message ( ' Error: You are missing the Manage Channels permission. ' )
return
if intr . guild is not None :
channel_id = intr . channel_id
server_id = intr . guild . id
2023-10-10 22:21:19 +03:00
else :
channel_id = intr . user . id
2023-10-22 23:41:11 +03:00
server_id = None
2023-10-10 22:21:19 +03:00
2023-10-11 14:32:05 +03:00
await self . attempt_registration ( intr , channel_id , server_id )
async def attempt_registration ( self , intr , channel_id , server_id ) :
2023-10-10 22:21:19 +03:00
if self . db . get_channel ( channel_id ) is not None :
try :
await intr . response . send_message ( f ' Channel # { intr . channel . name } is already receiving HFC alerts. ' )
except AttributeError :
await intr . response . send_message ( f ' This channel is already receiving HFC alerts. ' )
return
if server_id is not None and self . db . get_server ( server_id ) is None :
self . db . add_server ( server_id , ' he ' )
2023-10-22 23:41:11 +03:00
2023-10-10 22:21:19 +03:00
self . db . add_channel ( channel_id , server_id , ' he ' )
try :
await intr . response . send_message ( f ' Channel # { intr . channel . name } will now receive HFC alerts. ' )
except AttributeError :
await intr . response . send_message ( f ' This channel will now receive HFC alerts. ' )
2023-10-22 23:41:11 +03:00
ch = self . bot . get_channel ( channel_id )
2023-10-12 12:00:44 +03:00
try :
perms = ch . overwrites_for ( self . bot . user )
perms . update ( send_messages = True )
2023-10-12 12:55:26 +03:00
await ch . set_permissions ( target = ch . guild . me , overwrite = perms ,
reason = ' Update perms to allow bot to send messages in channel. ' )
2023-10-12 12:00:44 +03:00
except discord . errors . Forbidden as e :
2023-10-12 12:55:26 +03:00
await intr . followup . send (
f ' Could not allow bot to send messages to this channel! Please add the bot to this channel and allow it to send messages. \n '
f ' Error info: { e . __str__ ( ) } ' )
2023-10-22 23:41:11 +03:00
except AttributeError :
pass
2023-10-11 14:32:05 +03:00
2023-10-11 19:17:29 +03:00
@app_commands.command ( name = ' unregister ' ,
description = ' Stop a channel from receiving HFC alerts (Requires Manage Channels) ' )
2023-10-20 16:07:16 +03:00
async def unregister_channel ( self , intr : discord . Interaction , confirmation : str = None ) :
2023-10-22 23:41:11 +03:00
if not await Notificator . has_permission ( intr ) :
await intr . response . send_message ( ' Error: You are missing the Manage Channels permission. ' )
return
channel = self . get_matching_channel ( intr )
if channel is None :
try :
await intr . response . send_message ( f ' Channel # { intr . channel . name } is not yet receiving HFC alerts ' )
except AttributeError :
await intr . response . send_message ( f ' This channel is not yet receiving HFC alerts ' )
return
2023-10-10 22:21:19 +03:00
2023-10-20 13:18:05 +03:00
conf_str = intr . user . name
if confirmation is None :
2023-10-20 16:07:16 +03:00
await intr . response . send_message (
f ' Are you sure you want to unregister the channel? \n This action will also clear all related data. \n { md . b ( " Warning: " ) } this action cannot be reversed! \n Please type your username ( " { conf_str } " ) in the confirmation argument to confirm. ' )
2023-10-20 13:18:05 +03:00
return
if confirmation != conf_str :
await intr . response . send_message ( f ' Invalid confirmation string! ' )
return
2023-10-11 14:32:05 +03:00
await self . attempt_unregistration ( intr , channel )
2023-10-10 22:21:19 +03:00
2023-10-22 23:41:11 +03:00
async def attempt_unregistration ( self , intr , channel : db_access . Channel ) :
self . db . remove_channel ( channel . id )
2023-10-10 22:21:19 +03:00
try :
await intr . response . send_message ( f ' Channel # { intr . channel . name } will no longer receive HFC alerts ' )
except AttributeError :
await intr . response . send_message ( f ' This channel will no longer receive HFC alerts ' )
2023-10-12 01:10:59 +03:00
@app_commands.command ( name = ' latest ' ,
description = ' Get all alerts up to a certain time back (may be slightly outdated) ' )
2023-10-12 09:58:52 +03:00
@app_commands.describe ( time = ' Amount of time back ' ,
unit = " The unit of time, can be ' h ' (hors), ' m ' (minutes), or ' s ' (seconds) " ,
page = ' Results page ' )
2023-10-12 01:10:59 +03:00
async def latest_alerts ( self , intr : discord . Interaction , time : int , unit : str , page : int = 1 ) :
2023-10-12 09:58:52 +03:00
"""
Get all alerts up to a certain time back ( this may be slightly outdated )
: param intr : command
: param time : Amount of time back
: param unit : The unit of time , can be ' h ' ( hors ) , ' m ' ( minutes ) , or ' s ' ( seconds )
: param page : Results page
: return :
"""
2023-10-12 01:10:59 +03:00
units = [ ' h ' , ' hours ' , ' m ' , ' minutes ' , ' s ' , ' seconds ' ]
if unit not in units :
await intr . response . send_message ( f ' Invalid time unit, please use one of the following: \n '
2023-10-12 09:58:52 +03:00
f ' { " , " . join ( units ) } ' )
2023-10-12 01:10:59 +03:00
return
time_s = time
if unit in [ ' h ' , ' hours ' ] :
time_s * = 3600
elif unit in [ ' m ' , ' minutes ' ] :
time_s * = 60
if time_s > 86400 :
await intr . response . send_message ( ' You can currently only view history up to 1 day back. \n '
f ' Please use the { md . u ( md . hl ( " Home Front Command Website " , " https://www.oref.org.il/ " ) ) } to view alerts further back ' )
return
page_number = page - 1
alert_count = 20
try :
history_page = Notificator . get_alert_history_page ( time_s , page_number , alert_count )
except requests . exceptions . Timeout :
await intr . response . send_message ( ' Request timed out. ' )
return
2023-10-12 09:58:52 +03:00
except ValueError as e :
await intr . response . send_message ( e . __str__ ( ) )
2023-10-12 01:10:59 +03:00
return
if history_page == ' ' :
history_page = ' No results found. '
view = self . hfc_button_view ( )
await intr . response . send_message ( history_page ,
view = view )
@staticmethod
2023-10-12 09:58:52 +03:00
def get_alert_history_page ( time_back_amount : int , page_number : int , alerts_in_page : int ) - > str :
2023-10-12 10:10:30 +03:00
"""
max_page is EXCLUSIVE !
: param time_back_amount : amount of time back
: param page_number : the page number ( starting at 0 )
: param alerts_in_page : The number of alerts in one page
: return : page as str
"""
2023-10-27 17:43:12 +03:00
2023-10-27 17:39:08 +03:00
alert_history = AlertReqs ( ) . request_history_json ( )
2023-10-12 01:10:59 +03:00
current_time = datetime . datetime . now ( )
2023-10-12 09:58:52 +03:00
time_back = datetime . timedelta ( seconds = time_back_amount )
2023-10-12 01:10:59 +03:00
alert_counter = 0
for alert in alert_history :
# This can be merged with the other loop to optimize performance.
# Especially considering Python is a slow language.
# Too bad!
alert_date = datetime . datetime . strptime ( alert [ " alertDate " ] , " % Y- % m- %d % H: % M: % S " )
if abs ( current_time - alert_date ) > time_back :
break
alert_counter + = 1
max_page = alert_counter / / alerts_in_page
if alert_counter % alerts_in_page != 0 :
max_page + = 1
2023-10-12 09:58:52 +03:00
if time_back_amount < = 0 :
raise ValueError ( " Time can ' t be lower than 1. " )
2023-10-12 01:10:59 +03:00
2023-10-12 10:10:30 +03:00
if max_page == 0 :
raise ValueError ( " No results found. " )
if page_number > = max_page :
raise ValueError ( " Page number is too high. " )
2023-10-12 09:58:52 +03:00
if page_number < 0 :
2023-10-12 10:10:30 +03:00
raise ValueError ( " Page number is too low. " )
2023-10-12 09:58:52 +03:00
2023-10-12 12:55:26 +03:00
page_info = f ' Page { page_number + 1 } / { alert_counter / / alerts_in_page + 1 } \n \n '
2023-10-12 09:58:52 +03:00
ret_str = ' '
2023-10-12 01:16:55 +03:00
2023-10-12 01:10:59 +03:00
for alert in alert_history [ ( page_number * alerts_in_page ) : ( ( page_number + 1 ) * alerts_in_page ) ] :
alert_date = datetime . datetime . strptime ( alert [ " alertDate " ] , " % Y- % m- %d % H: % M: % S " )
if abs ( current_time - alert_date ) > time_back :
break
ret_str + = f ' התראה ב { md . b ( alert [ " data " ] ) } \n ' \
f ' { md . u ( alert [ " title " ] ) } \n ' \
f ' בשעה { alert [ " alertDate " ] } \n \n '
2023-10-12 09:58:52 +03:00
if ret_str == ' ' :
ret_str = ' No results found '
else :
ret_str = page_info + ret_str
2023-10-12 01:10:59 +03:00
return ret_str
2023-10-10 22:21:19 +03:00
@app_commands.command ( name = ' about ' , description = ' Info about the bot ' )
async def about_bot ( self , intr : discord . Interaction ) :
e = discord . Embed ( color = discord . Color . orange ( ) )
e . title = ' Home Front Command Notificator '
e . description = ' A bot to send Discord messages for HFC alerts '
e . add_field ( name = ' Important info! ' ,
value = f ' This bot is { md . b ( " unofficial " ) } and is not related to the Home Front Command. Please do not rely on this alone. ' ,
inline = False )
e . add_field ( name = ' What is this? ' ,
value = ' This is a bot that connects to the HFC \' s servers and sends real-time notifications about alerts in Israel. ' ,
inline = False )
e . add_field ( name = ' Setup ' ,
2023-10-11 14:32:05 +03:00
value = ' Just invite the bot to a server (see Links below), and /register a channel to start receiving notifications. \n '
' Alternatively, you can /register a DM directly with the bot. \n '
2023-10-12 18:31:25 +03:00
' Please do note that the main instance of the bot is hosted on a private machine, so it may be a bit slow. \n '
2023-10-11 19:17:29 +03:00
' Feel free to host your own instance! ' ,
2023-10-10 22:21:19 +03:00
inline = False )
e . add_field ( name = ' Can I host it? ' ,
2023-10-11 13:07:28 +03:00
value = ' Yes! Everything is available in the GitHub repository. \n More info on the project \' s README page (See Links below). ' ,
2023-10-10 22:21:19 +03:00
inline = False )
e . add_field ( name = ' Links ' ,
2023-10-11 19:17:29 +03:00
value = md . bq ( f ' { md . hl ( " GitHub " , " https://github.com/GaMeNu/HFCNotificator " ) } \n '
f ' { md . hl ( " Official Bot Invite Link " , " https://discord.com/api/oauth2/authorize?client_id=1160344131067977738&permissions=0&scope=applications.commands % 20bot " ) } \n '
f ' { md . hl ( " HFC Website " , " https://www.oref.org.il/ " ) } \n '
2023-10-25 21:16:31 +03:00
f ' { md . hl ( " Bot Profile (for DMs) " , " https://discord.com/users/1160344131067977738 " ) } \n '
f ' { md . hl ( " Support Server " , " https://discord.gg/K3E4a5ekNy " ) } ' ) ,
2023-10-10 22:21:19 +03:00
inline = True )
2023-10-11 19:17:29 +03:00
e . add_field ( name = ' Created by ' , value = md . bq ( ' GaMeNu (@gamenu) \n '
' Yrrad8 ' ) ,
inline = True )
2023-10-10 22:21:19 +03:00
hfc_button = discord . ui . Button (
style = discord . ButtonStyle . link ,
label = ' HFC Website ' ,
url = ' https://www.oref.org.il '
)
gh_button = discord . ui . Button (
style = discord . ButtonStyle . link ,
label = ' GitHub Repository ' ,
url = ' https://github.com/GaMeNu/HFCNotificator '
)
view = discord . ui . View ( )
view . add_item ( hfc_button )
view . add_item ( gh_button )
await intr . response . send_message ( embed = e , view = view )
2023-10-13 17:38:49 +03:00
@app_commands.command ( name = ' send_alert ' , description = ' Send a custom alert (available to bot author only) ' )
2023-10-22 23:41:11 +03:00
@app_commands.describe ( title = ' Alert title ' ,
desc = ' Alert description ' ,
districts = ' Active alert districts ' ,
cat = ' Alert category ' )
2023-10-13 17:38:49 +03:00
async def test_alert ( self ,
intr : discord . Interaction ,
title : str = ' בדיקת מערכת שליחת התראות ' ,
desc : str = ' התעלמו מהתראה זו ' ,
2023-10-20 16:07:16 +03:00
districts : str = ' בדיקה ' ,
cat : int = 99 ) :
2023-10-22 23:41:19 +03:00
"""
A function to send a test alert
: param intr : Command interaction from discord
: param title : Title of the alert
: param desc : Description of the alert
: param districts : Districts of the alert
: param cat : Category of the alert
: return :
"""
2023-10-10 22:21:19 +03:00
if intr . user . id != AUTHOR_ID :
await intr . response . send_message ( ' No access. ' )
return
await intr . response . send_message ( ' Sending test alert... ' )
2023-10-13 17:38:49 +03:00
districts_ls = [ word . strip ( ) for word in districts . split ( ' , ' ) ]
2023-10-10 22:21:19 +03:00
await self . send_new_alert ( {
" id " : " 133413211330000000 " ,
2023-10-20 16:07:16 +03:00
" cat " : str ( cat ) ,
2023-10-13 17:38:49 +03:00
" title " : title ,
2023-10-13 17:45:21 +03:00
" data " : districts_ls ,
2023-10-13 17:38:49 +03:00
" desc " : desc
2023-10-13 17:45:21 +03:00
} , districts_ls )
2023-10-13 17:38:49 +03:00
2023-10-20 01:20:24 +03:00
@staticmethod
def locations_page ( data_list : list , page : int , res_in_page : int = 50 ) - > str :
"""
Page starts at 0
max_page is EXCLUSIVE
: param data_list : custom data list to get page info of
: param page : District page
: param res_in_page : Amount of districts to put in one pages
: return :
"""
dist_ls = data_list
dist_len = len ( dist_ls )
if dist_len == 0 :
return ' No results found. '
max_page = dist_len / / res_in_page
if dist_len % res_in_page != 0 :
max_page + = 1
if page > = max_page :
raise ValueError ( ' Page number is too high. ' )
if page < 0 :
raise ValueError ( ' Page number is too low. ' )
2023-10-22 00:28:07 +03:00
page_content = f ' Page { md . b ( f " { page + 1 } / { max_page } " ) } \n \n '
2023-10-20 01:20:24 +03:00
start_i = page * res_in_page
end_i = min ( start_i + res_in_page , dist_len )
for district in dist_ls [ start_i : end_i ] :
page_content + = f ' { district [ 0 ] } - { district [ 1 ] } \n '
return page_content
2023-10-25 21:16:31 +03:00
@location_group.command ( name = ' list ' , description = ' List all available locations, by IDs and names. Sorted alphabetically ' )
@app_commands.describe ( search = ' Search tokens, separated by spaces ' )
async def locations_list ( self , intr : discord . Interaction , search : str | None = None , page : int = 1 ) :
# decide the search_results
if search is not None :
search_results = self . db . search_districts ( * re . split ( r " \ s+ " , search ) )
else :
search_results = self . db . get_all_districts ( )
2023-10-20 01:20:24 +03:00
try :
2023-10-25 21:16:31 +03:00
# Turn into a display-able page
page = self . locations_page ( sorted ( search_results , key = lambda tup : tup [ 1 ] ) , page - 1 )
2023-10-20 01:20:24 +03:00
except ValueError as e :
await intr . response . send_message ( e . __str__ ( ) )
return
if len ( page ) > 2000 :
2023-10-20 16:07:16 +03:00
await intr . response . send_message (
' Page content exceeds character limit. \n Please contact the bot authors with the command you \' ve tried to run. ' )
2023-10-20 01:20:24 +03:00
return
await intr . response . send_message ( page )
@location_group.command ( name = ' add ' , description = ' Add a location(s) to the location list ' )
@app_commands.describe ( locations = ' A list of comma-separated Area IDs ' )
async def location_add ( self , intr : discord . Interaction , locations : str ) :
2023-10-22 14:03:07 +03:00
2023-10-20 13:18:05 +03:00
if not await self . has_permission ( intr ) :
2023-10-22 23:41:11 +03:00
await intr . response . send_message ( ' Error: You are missing the Manage Channels permission. ' )
2023-10-20 01:20:24 +03:00
return
2023-10-22 23:41:11 +03:00
channel = self . get_matching_channel ( intr )
if channel . id is None :
2023-10-22 14:03:07 +03:00
await intr . response . send_message ( ' Could not find this channel. Are you sure it is registered? ' )
return
2023-10-20 01:20:24 +03:00
locations_ls = [ word . strip ( ) for word in locations . split ( ' , ' ) ]
location_ids = [ ]
for location in locations_ls :
try :
location_ids . append ( int ( location ) )
except ValueError :
await intr . response . send_message ( f ' District ID { md . b ( f " { location } " ) } is not a valid district ID. ' )
return
2023-10-20 13:18:05 +03:00
try :
2023-10-22 23:41:11 +03:00
self . db . add_channel_districts ( channel . id , location_ids )
2023-10-20 13:18:05 +03:00
except ValueError as e :
await intr . response . send_message ( e . __str__ ( ) )
return
await intr . response . send_message ( ' Successfully added all IDs ' )
2023-10-20 01:20:24 +03:00
@location_group.command ( name = ' remove ' , description = ' Remove a location(s) to the location list ' )
@app_commands.describe ( locations = ' A list of comma-separated Area IDs ' )
async def location_remove ( self , intr : discord . Interaction , locations : str ) :
2023-10-20 13:18:05 +03:00
if not await self . has_permission ( intr ) :
2023-10-22 23:41:11 +03:00
await intr . response . send_message ( ' Error: You are missing the Manage Channels permission. ' )
2023-10-20 01:20:24 +03:00
return
2023-10-22 23:41:11 +03:00
channel = self . get_matching_channel ( intr )
if channel is None :
2023-10-22 14:03:07 +03:00
await intr . response . send_message ( ' Could not find this channel. Are you sure it is registered? ' )
return
2023-10-20 01:20:24 +03:00
locations_ls = [ word . strip ( ) for word in locations . split ( ' , ' ) ]
location_ids = [ ]
for location in locations_ls :
try :
location_ids . append ( int ( location ) )
except ValueError :
await intr . response . send_message ( f ' District ID { md . b ( f " { location } " ) } is not a valid district ID. ' )
return
2023-10-22 23:41:11 +03:00
self . db . remove_channel_districts ( channel . id , location_ids )
2023-10-20 13:18:05 +03:00
await intr . response . send_message ( ' Successfully removed all IDs ' )
@location_group.command ( name = ' clear ' , description = ' Clear all registered locations (get alerts on all locations) ' )
async def location_clear ( self , intr : discord . Interaction , confirmation : str = None ) :
if not await self . has_permission ( intr ) :
2023-10-22 23:41:11 +03:00
await intr . response . send_message ( ' Error: You are missing the Manage Channels permission. ' )
2023-10-20 13:18:05 +03:00
return
2023-10-22 23:41:11 +03:00
channel = self . get_matching_channel ( intr )
if channel is None :
2023-10-22 14:03:07 +03:00
await intr . response . send_message ( ' Could not find this channel. Are you sure it is registered? ' )
return
2023-10-20 13:18:05 +03:00
2023-10-22 14:03:07 +03:00
conf_str = intr . user . name
2023-10-20 13:18:05 +03:00
if confirmation is None :
2023-10-20 16:07:16 +03:00
await intr . response . send_message (
f ' Are you sure you want to clear all registered locations? \n { md . b ( " Warning: " ) } this action cannot be reversed! \n Please type your username ( " { conf_str } " ) in the confirmation argument to confirm. ' )
2023-10-20 13:18:05 +03:00
return
if confirmation != conf_str :
await intr . response . send_message ( f ' Invalid confirmation string! ' )
return
self . db . clear_channel_districts ( channel . id )
2023-10-20 16:07:16 +03:00
await intr . response . send_message (
f ' Cleared all registered locations. \n Channel will now receive alerts from every location. ' )
2023-10-20 01:20:24 +03:00
2023-10-25 21:16:31 +03:00
@location_group.command ( name = ' registered ' , description = ' List all locations registered to this channel, by IDs and names. Sorted alphabetically ' )
@app_commands.describe ( search = ' Search tokens, separated by spaces ' )
async def location_registered ( self , intr : discord . Interaction , search : str | None = None , page : int = 1 ) :
2023-10-20 01:20:24 +03:00
2023-10-22 23:41:11 +03:00
channel = self . get_matching_channel ( intr )
if channel is None :
2023-10-22 14:03:07 +03:00
await intr . response . send_message ( ' Could not find this channel. Are you sure it is registered? ' )
return
2023-10-20 01:20:24 +03:00
2023-10-25 21:16:31 +03:00
if search is None :
search_results = [ dist . to_tuple ( ) for dist in self . db . district_ids_to_districts ( * self . db . get_channel_district_ids ( channel . id ) ) ]
else :
search_results = [ dist . to_tuple ( ) for dist in self . db . search_channel_districts ( channel . id , * re . split ( r " \ s+ " , search ) ) ]
districts = sorted ( search_results , key = lambda tup : tup [ 1 ] )
2023-10-20 01:20:24 +03:00
2023-10-20 16:07:16 +03:00
page = self . locations_page ( districts , page - 1 )
2023-10-20 01:20:24 +03:00
if len ( page ) > 2000 :
await intr . response . send_message (
' Page content exceeds character limit. \n Please contact the bot authors with the command you \' ve tried to run. ' )
return
await intr . response . send_message ( page )