HFCNotificator v2.0.0

Changes from testing builds:
- Removed commit history (for... reasons)
~ Fixed few bugs
- Removed Herobrine

Changes from v1.1.1
+ Rewritten the whole thing
+ Now using MySQL
+ Now using the concept of "organization"
This commit is contained in:
GaMeNu 2023-10-10 22:21:19 +03:00
commit 69e26a1fbc
6 changed files with 694 additions and 0 deletions

263
cog_notificator.py Normal file
View File

@ -0,0 +1,263 @@
import datetime
import json
import os
import requests
import discord
from dotenv import load_dotenv
from discord.ext import commands, tasks
from discord import app_commands
import logging
from db_access import DBAccess
from markdown import md
load_dotenv()
AUTHOR_ID = int(os.getenv('AUTHOR_ID'))
class AlertReqs:
@staticmethod
def request_alert_json() -> dict | None:
req = requests.get('https://www.oref.org.il/WarningMessages/alert/alerts.json', headers={
'Referer': 'https://www.oref.org.il/',
'X-Requested-With': 'XMLHttpRequest',
'Client': 'HFC Notificator bot for Discord'
})
decoded = req.content.decode('utf-8-sig')
if decoded is None or len(decoded) < 3: # Why does it get a '\r\n' wtf
ret_dict = None
else:
ret_dict = json.loads(decoded)
return ret_dict
@staticmethod
def request_history_json() -> dict | None:
req = requests.get('https://www.oref.org.il/WarningMessages/History/AlertsHistory.json')
content = req.text
return json.loads(content)
class Notificator(commands.Cog):
districts: list[dict] = json.loads(requests.get('https://www.oref.org.il//Shared/Ajax/GetDistricts.aspx').text)
def __init__(self, bot: commands.Bot, handler: logging.Handler):
self.bot = bot
self.log = logging.Logger('Notificator')
self.log.addHandler(handler)
self.db = DBAccess()
self.active_districts = []
if not self.check_for_updates.is_running():
self.check_for_updates.start()
@staticmethod
async def setup(bot: commands.Bot, handler: logging.Handler):
notf = Notificator(bot, handler)
if bot.get_cog('Notificator') is None:
await bot.add_cog(notf)
return notf
@commands.Cog.listener()
async def on_ready(self):
if self.check_for_updates.is_running():
return
self.check_for_updates.start()
@tasks.loop(seconds=1)
async def check_for_updates(self):
current_alert: dict = AlertReqs.request_alert_json()
self.log.debug(f'Alert response: {current_alert}')
if current_alert is None:
return
data: list[str] = current_alert["data"]
new_districts: list[str] = []
for district in data:
if district in self.active_districts:
continue
new_districts.append(district)
if len(new_districts) == 0:
return
await self.send_new_alert(current_alert, new_districts)
self.active_districts = data
@staticmethod
def generate_alert_embed(alert_id: int, district: str, arrival_time: int, time: str, lang: str) -> discord.Embed:
e = discord.Embed(color=discord.Color.from_str('#FF0000'))
e.title = f'התראה ב{district}'
if alert_id == 1:
e.add_field(name=district, value='יריית רקטות וטילים', inline=False)
e.add_field(name='זמן מיגון', value=f'{arrival_time} שניות', inline=False)
e.add_field(name='נכון ל', value=time, inline=False)
return e
@staticmethod
def hfc_button_view() -> discord.ui.View:
button = discord.ui.Button(
style=discord.ButtonStyle.link,
label='אתר פיקוד העורף',
url='https://www.oref.org.il'
)
view = discord.ui.View()
view.add_item(button)
return view
async def send_new_alert(self, alert_data: dict, districts: list[str]):
alert_history = AlertReqs.request_history_json()[0:100]
embed_ls: list[discord.Embed] = []
embed_ls_ls: list[list[discord.Embed]] = []
for district in districts:
district_data = self.db.get_district_by_name(district)
alert_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for alert in alert_history:
if alert["data"] == district:
alert_time = alert["alertDate"]
break
embed_ls.append(Notificator.generate_alert_embed(int(alert_data["cat"]), district, district_data.migun_time,
alert_time.__str__(), 'he'))
if len(embed_ls) == 10:
embed_ls_ls.append(embed_ls)
embed_ls = []
if len(embed_ls) > 0:
embed_ls_ls.append(embed_ls)
for channel in self.db.channel_iterator():
if channel.server_id is not None:
dc_ch = self.bot.get_channel(channel.id)
else:
dc_ch = self.bot.get_user(channel.id)
for embed_list in embed_ls_ls:
if dc_ch is None:
continue
await dc_ch.send(embeds=embed_list, view=self.hfc_button_view())
@app_commands.command(name='register', description='Register a channel to receive HFC alerts')
async def register_channel(self, intr: discord.Interaction):
channel_id = intr.channel_id
if intr.channel.guild is not None:
server_id = intr.channel.guild.id
else:
server_id = None
channel_id = intr.user.id
if self.db.get_channel(channel_id) is not None:
try:
await intr.response.send_message(f'Channel #{intr.channel.name} is already receiving HFC alerts.')
except AttributeError:
await intr.response.send_message(f'This channel is already receiving HFC alerts.')
return
if server_id is not None and self.db.get_server(server_id) is None:
self.db.add_server(server_id, 'he')
self.db.add_channel(channel_id, server_id, 'he')
try:
await intr.response.send_message(f'Channel #{intr.channel.name} will now receive HFC alerts.')
except AttributeError:
await intr.response.send_message(f'This channel will now receive HFC alerts.')
@app_commands.command(name='unregister', description='Stop a channel from receiving HFC alerts')
async def unregister_channel(self, intr: discord.Interaction):
channel_id = intr.channel_id
channel = self.db.get_channel(channel_id)
if channel is None:
channel = self.db.get_channel(intr.user.id)
if channel is None:
try:
await intr.response.send_message(f'Channel #{intr.channel.name} is not yet receiving HFC alerts')
except AttributeError:
await intr.response.send_message(f'This channel is not yet receiving HFC alerts')
return
if channel.server_id is not None:
self.db.remove_channel(channel_id)
else:
self.db.remove_channel(intr.user.id)
try:
await intr.response.send_message(f'Channel #{intr.channel.name} will no longer receive HFC alerts')
except AttributeError:
await intr.response.send_message(f'This channel will no longer receive HFC alerts')
@app_commands.command(name='about', description='Info about the bot')
async def about_bot(self, intr: discord.Interaction):
e = discord.Embed(color=discord.Color.orange())
e.title = 'Home Front Command Notificator'
e.description = 'A bot to send Discord messages for HFC alerts'
e.add_field(name='Important info!',
value=f'This bot is {md.b("unofficial")} and is not related to the Home Front Command. Please do not rely on this alone.',
inline=False)
e.add_field(name='What is this?',
value='This is a bot that connects to the HFC\'s servers and sends real-time notifications about alerts in Israel.',
inline=False)
e.add_field(name='Setup',
value='Just invite the bot to a server (see \"Links\" below), and /register a channel to start receiving notifications.',
inline=False)
e.add_field(name='Can I host it?',
value='Yes! This bot was made with self-hosting in mind.\nMore info on the project\'s README page on GitHub',
inline=False)
e.add_field(name='Links',
value=f'{md.hl("GitHub", "https://github.com/GaMeNu/HFCNotificator")}\n'
f'{md.hl("Official Bot Invite Link", "https://discord.com/api/oauth2/authorize?client_id=1160344131067977738&permissions=0&scope=applications.commands%20bot")}\n'
f'{md.hl("HFC Website", "https://www.oref.org.il/")}\n'
f'{md.hl("Bot Profile (for DMs)", "https://discord.com/users/1160344131067977738")}',
inline=True)
e.add_field(name='Created by', value='GaMeNu (@gamenu)\nYrrad8', inline=True)
hfc_button = discord.ui.Button(
style=discord.ButtonStyle.link,
label='HFC Website',
url='https://www.oref.org.il'
)
gh_button = discord.ui.Button(
style=discord.ButtonStyle.link,
label='GitHub Repository',
url='https://github.com/GaMeNu/HFCNotificator'
)
view = discord.ui.View()
view.add_item(hfc_button)
view.add_item(gh_button)
await intr.response.send_message(embed=e, view=view)
@app_commands.command(name='test_alert', description='Send a test alert (available to bot author only)')
async def test_alert(self, intr: discord.Interaction):
if intr.user.id != AUTHOR_ID:
await intr.response.send_message('No access.')
return
await intr.response.send_message('Sending test alert...')
await self.send_new_alert({
"id": "133413211330000000",
"cat": "1",
"title": "ירי רקטות וטילים",
"data": [
"בדיקה"
],
"desc": "היכנסו למרחב המוגן ושהו בו 10 דקות"
}, ['בדיקה'])

155
create_db.py Normal file
View File

@ -0,0 +1,155 @@
import json
import os
import time
import mysql.connector as mysql
import requests
from dotenv import load_dotenv
from db_access import DBAccess
generate_script = """
-- MySQL Workbench Forward Engineering
SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0;
SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0;
SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION';
-- -----------------------------------------------------
-- Schema hfc_db
-- -----------------------------------------------------
DROP SCHEMA IF EXISTS `hfc_db` ;
-- -----------------------------------------------------
-- Schema hfc_db
-- -----------------------------------------------------
CREATE SCHEMA IF NOT EXISTS `hfc_db` DEFAULT CHARACTER SET utf8 ;
USE `hfc_db` ;
-- -----------------------------------------------------
-- Table `hfc_db`.`areas`
-- -----------------------------------------------------
DROP TABLE IF EXISTS `hfc_db`.`areas` ;
CREATE TABLE IF NOT EXISTS `hfc_db`.`areas` (
`area_id` INT NOT NULL,
`area_name` VARCHAR(64) NOT NULL,
PRIMARY KEY (`area_id`),
UNIQUE INDEX `area_id_UNIQUE` (`area_id` ASC) VISIBLE)
ENGINE = InnoDB;
-- -----------------------------------------------------
-- Table `hfc_db`.`districts`
-- -----------------------------------------------------
DROP TABLE IF EXISTS `hfc_db`.`districts` ;
CREATE TABLE IF NOT EXISTS `hfc_db`.`districts` (
`district_id` INT NOT NULL,
`district_name` VARCHAR(64) NOT NULL,
`area_id` INT NOT NULL,
`migun_time` INT NULL,
PRIMARY KEY (`district_id`),
UNIQUE INDEX `area_code_UNIQUE` (`district_id` ASC) VISIBLE,
INDEX `area_id_idx` (`area_id` ASC) VISIBLE,
CONSTRAINT `area_id`
FOREIGN KEY (`area_id`)
REFERENCES `hfc_db`.`areas` (`area_id`)
ON DELETE NO ACTION
ON UPDATE NO ACTION)
ENGINE = InnoDB;
-- -----------------------------------------------------
-- Table `hfc_db`.`servers`
-- -----------------------------------------------------
DROP TABLE IF EXISTS `hfc_db`.`servers` ;
CREATE TABLE IF NOT EXISTS `hfc_db`.`servers` (
`server_id` BIGINT(8) UNSIGNED NOT NULL,
`server_lang` VARCHAR(15) NOT NULL,
PRIMARY KEY (`server_id`),
UNIQUE INDEX `idservers_UNIQUE` (`server_id` ASC) VISIBLE)
ENGINE = InnoDB;
-- -----------------------------------------------------
-- Table `hfc_db`.`channels`
-- -----------------------------------------------------
DROP TABLE IF EXISTS `hfc_db`.`channels` ;
CREATE TABLE IF NOT EXISTS `hfc_db`.`channels` (
`channel_id` BIGINT(8) UNSIGNED NOT NULL,
`server_id` BIGINT(8) UNSIGNED NULL,
`channel_lang` VARCHAR(15) NOT NULL,
PRIMARY KEY (`channel_id`),
UNIQUE INDEX `channel_id_UNIQUE` (`channel_id` ASC) VISIBLE,
CONSTRAINT `server_id`
FOREIGN KEY (`server_id`)
REFERENCES `hfc_db`.`servers` (`server_id`)
ON DELETE NO ACTION
ON UPDATE NO ACTION)
ENGINE = InnoDB;
USE `hfc_db`;
DELIMITER $$
USE `hfc_db`$$
DROP TRIGGER IF EXISTS `hfc_db`.`channels_BEFORE_INSERT` $$
USE `hfc_db`$$
CREATE DEFINER = CURRENT_USER TRIGGER `hfc_db`.`channels_BEFORE_INSERT` BEFORE INSERT ON `channels` FOR EACH ROW
BEGIN
DECLARE server_lang VARCHAR(15);
-- Get the server_lang for the corresponding server_id
SELECT server_lang INTO server_lang
FROM servers
WHERE server_id = NEW.server_id;
-- Set channel_lang to server_lang if it's NULL
IF NEW.channel_lang IS NULL THEN
SET NEW.channel_lang = IFNULL(server_lang, 'he');
END IF;
END$$
DELIMITER ;
SET SQL_MODE=@OLD_SQL_MODE;
SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS;
SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS;
"""
load_dotenv()
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
db = mysql.connect(
host='localhost',
user=DB_USERNAME,
password=DB_PASSWORD
)
crsr = db.cursor()
crsr.execute(generate_script)
crsr.fetchall()
print(crsr.warnings)
db.close()
districts: list[dict] = json.loads(requests.get('https://www.oref.org.il//Shared/Ajax/GetDistricts.aspx?lang=he').text)
db = DBAccess()
for district in districts:
db.add_district(
district["id"],
district["label"],
district["areaid"],
district["areaname"],
district["migun_time"]
)
db.add_district(99999, 'בדיקה', 999, 'בדיקה', 600)
db.connection.commit()

173
db_access.py Normal file
View File

@ -0,0 +1,173 @@
import os
from dotenv import load_dotenv
from mysql import connector as mysql
load_dotenv()
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
class Area:
def __init__(self, id: int, name: str):
self.id = id
self.name = name
class District:
def __init__(self, id: int, name: str, area_id: int, migun_time: int):
self.id = id
self.name = name
self.area_id = area_id
self.migun_time = migun_time
class Channel:
def __init__(self, id: int, server_id: int, channel_lang: str):
self.id = id
self.server_id = server_id
self.channel_lang = channel_lang
class Server:
def __init__(self, id: int, lang: str):
self.id = id
self.lang = lang
class ChannelIterator:
def __init__(self, cursor: mysql.connection.MySQLCursor):
self.cursor = cursor
def __iter__(self):
return self
def __next__(self) -> Channel:
res = self.cursor.fetchone()
if res is None:
raise StopIteration
return Channel(res[0], res[1], res[2])
class DBAccess:
def __init__(self):
self.connection = mysql.connect(
host='localhost',
user=DB_USERNAME,
password=DB_PASSWORD,
database='hfc_db'
)
def add_area(self, area_id: int, area_name: str):
with self.connection.cursor() as crsr:
crsr.execute(f'REPLACE INTO areas (area_id, area_name) VALUES (%s, %s)', (area_id, area_name))
self.connection.commit()
def add_district(self, district_id: int, district_name: str, area_id: int, area_name: str, migun_time: int):
with self.connection.cursor() as crsr:
crsr.execute(f'SELECT * FROM areas WHERE area_id=%s', (area_id,))
crsr.fetchall()
if crsr.rowcount == 0:
self.add_area(area_id, area_name)
crsr.execute(f'REPLACE INTO districts (district_id, district_name, area_id, migun_time) VALUES (%s, %s, %s, %s)', (district_id, district_name, area_id, migun_time))
self.connection.commit()
def add_server(self, server_id: int, server_lang: str):
with self.connection.cursor() as crsr:
crsr.execute(f'INSERT IGNORE INTO servers (server_id, server_lang) VALUES (%s, %s)', (server_id, server_lang))
self.connection.commit()
def add_channel(self, channel_id: int, server_id: int | None, channel_lang: str | None):
with self.connection.cursor() as crsr:
if server_id is not None:
self.add_server(server_id, channel_lang)
crsr.execute(f'REPLACE INTO channels (channel_id, server_id, channel_lang) VALUES (%s, %s, %s)', (channel_id, server_id, channel_lang))
self.connection.commit()
def get_area(self, id: int) -> Area | None:
with self.connection.cursor() as crsr:
crsr.execute('SELECT * FROM areas WHERE area_id=%s', (id,))
res = crsr.fetchone()
if res is not None:
return Area(res[0], res[1])
else:
return None
def get_district(self, id: int) -> District | None:
with self.connection.cursor() as crsr:
crsr.execute('SELECT * FROM districts WHERE district_id=%s', (id,))
res = crsr.fetchone()
if res is not None:
return District(res[0], res[1], res[2], res[3])
else:
return None
def get_district_area(self, district: District) -> Area | None:
return self.get_area(district.area_id)
def get_server(self, id: int) -> Server | None:
with self.connection.cursor() as crsr:
crsr.execute('SELECT * FROM servers WHERE server_id=%s', (id,))
res = crsr.fetchone()
if res is not None:
return Server(res[0], res[1])
else:
return None
def get_channel(self, id: int) -> Channel | None:
with self.connection.cursor() as crsr:
crsr.execute('SELECT * FROM channels WHERE channel_id=%s', (id,))
res = crsr.fetchone()
if res is not None:
return Channel(res[0], res[1], res[2])
else:
return None
def get_channel_server(self, channel: Channel) -> Server:
return self.get_server(channel.server_id)
def channel_iterator(self):
crsr = self.connection.cursor()
crsr.execute('SELECT * FROM channels')
return ChannelIterator(crsr)
def remove_channel(self, id: int):
print(id)
with self.connection.cursor() as crsr:
crsr.execute('DELETE FROM channels WHERE channel_id=%s', (id,))
self.connection.commit()
def remove_server(self, id: int):
with self.connection.cursor() as crsr:
crsr.execute('DELETE FROM channels WHERE server_id=%s', (id,))
crsr.execute('DELETE FROM servers WHERE server_id=%s', (id,))
self.connection.commit()
def remove_district(self, id: int):
with self.connection.cursor() as crsr:
crsr.execute('DELETE FROM districts WHERE district_id=%s', (id,))
self.connection.commit()
def remove_area(self, id: int):
with self.connection.cursor() as crsr:
crsr.execute('DELETE FROM districts WHERE area_id=%s', (id,))
crsr.execute('DELETE FROM areas WHERE area_id=%s', (id,))
self.connection.commit()
def get_district_by_name(self, name: str):
with self.connection.cursor() as crsr:
crsr.execute('SELECT * FROM districts WHERE district_name=%s', (name,))
res = crsr.fetchone()
crsr.fetchall()
return District(res[0], res[1], res[2], res[3])

39
main.py Normal file
View File

@ -0,0 +1,39 @@
import discord
from discord.ext import commands, tasks
from dotenv import load_dotenv
import logging
import os
from cog_notificator import Notificator
# Set up constants and logger
logger = logging.Logger('General Log')
load_dotenv()
TOKEN = os.getenv('TOKEN')
AUTHOR_ID = int(os.getenv('AUTHOR_ID'))
handler = logging.StreamHandler()
bot = commands.Bot('!', intents=discord.Intents.all())
tree = bot.tree
@bot.event
async def on_message(msg: discord.Message):
# Special command to sync messages
if msg.content == '/sync_cmds' and msg.author.id == AUTHOR_ID:
print('syncing')
await msg.reply('Syncing...', delete_after=3)
await Notificator.setup(bot, handler)
await tree.sync()
print('synced')
await msg.reply('Synced!', delete_after=3)
@bot.event
async def on_ready():
await Notificator.setup(bot, handler)
await bot.change_presence(activity=discord.Activity(name='for HFC alerts.', type=discord.ActivityType.watching))
bot.run(token=TOKEN, log_handler=handler)

61
markdown.py Normal file
View File

@ -0,0 +1,61 @@
class md:
h1_ = '#'
h2_ = '##'
h3_ = '###'
q_ = '>'
bq_ = '>>>'
b_ = '**'
i_ = '_'
u_ = '__'
s_ = '~~'
c_ = '`'
bc_ = '```'
@staticmethod
def h1(text: str):
return f'{md.h1_} {text}'
@staticmethod
def h2(text: str):
return f'{md.h2_} {text}'
@staticmethod
def h3(text: str):
return f'{md.h3_} {text}'
@staticmethod
def q(text: str):
return f'{md.q_} {text}'
@staticmethod
def bq(text: str):
return f'{md.bq_} {text}'
@staticmethod
def b(text: str):
return f'{md.b_}{text}{md.b_}'
@staticmethod
def i(text: str):
return f'{md.i_}{text}{md.i_}'
@staticmethod
def u(text: str):
return f'{md.u_}{text}{md.u_}'
@staticmethod
def s(text: str):
return f'{md.s_}{text}{md.s_}'
@staticmethod
def c(text: str):
return f'{md.c_}{text}{md.c_}'
@staticmethod
def bc(text: str, lang: str = ''):
return f'{md.bc_}{lang}\n{text}\n{md.bc_}'
@staticmethod
def hl(label: str, link: str):
return f'[{label}]({link})'

3
translations/he_IL.json Normal file
View File

@ -0,0 +1,3 @@
{
"alert_at": "התראה ב",
}