Skip to content
Snippets Groups Projects
Commit 86a49d13 authored by fmk17's avatar fmk17
Browse files

Add scheduling for notifications

parent d0d4b43f
Branches
No related tags found
No related merge requests found
.env
__pycache__
*.log
db
......@@ -6,8 +6,8 @@ import requests
import re
import itertools
Day = namedtuple('Day', 'date, date_raw, meals')
Meal = namedtuple('Meal', 'name, items')
Day = namedtuple('Day', 'date, date_raw, menus')
Menu = namedtuple('Menu', 'meal, items')
class Location(Enum):
CENTRAL = 'https://pra.ufpr.br/ru/ru-central/'
......@@ -15,10 +15,15 @@ class Location(Enum):
BOTANICO = 'https://pra.ufpr.br/ru/cardapio-ru-jardim-botanico/'
AGRARIAS = 'https://pra.ufpr.br/ru/cardapio-ru-agrarias/'
class Meal(Enum):
BREAKFAST = "CAFÉ DA MANHÃ"
LUNCH = "ALMOÇO"
DINNER = "JANTAR"
cached_update_times = dict()
cached_responses = dict()
def get_meals_by_days(location: Location):
def get_menus_by_days(location: Location):
global cached_responses
global cached_update_times
......@@ -40,11 +45,11 @@ def get_meals_by_days(location: Location):
if date_re is None: break
d, m, y = map(int, date_re.groups())
table_children = iter(table.select('td'))
meals = []
menus = []
for title_node, items in zip(table_children, table_children):
meals.append(Meal(name=title_node.text, items=list(items.stripped_strings)))
menus.append(Menu(meal=Meal(title_node.text), items=list(items.stripped_strings)))
days.append(
Day(date=date(y, m, d), date_raw=date_text, meals=meals)
Day(date=date(y, m, d), date_raw=date_text, menus=menus)
)
cached_responses[location] = days
......
from crawler import Location, Meal
from collections import namedtuple
import sqlite3
Schedule = namedtuple('Schedule', 'time, day_week, location, meal, user_id, created_at')
connection = sqlite3.connect("db", isolation_level=None, check_same_thread=False)
# Create tables
with connection:
connection.execute('''
CREATE TABLE IF NOT EXISTS schedule (
time TEXT NOT NULL,
day_week INT NOT NULL,
location TEXT NOT NULL,
meal TEXT NOT NULL,
user_id INT NOT NULL,
created_at DATETIME NOT NULL
)
''')
def get_schedules_for_user(user_id):
cur = connection.execute('''
SELECT time, day_week, location, meal, user_id, created_at
FROM schedule
WHERE user_id = ?
''', (user_id,))
rows = cur.fetchall()
return [Schedule(
row[0],
row[1],
Location[row[2]],
Meal[row[3]],
row[4],
row[5]
) for row in rows]
def insert_schedule(schedule):
connection.execute('''
INSERT INTO schedule
(time, day_week, location, meal, user_id, created_at)
VALUES
(?, ?, ?, ?, ?, ?)
''', (
schedule.time,
schedule.day_week,
schedule.location.name,
schedule.meal.name,
schedule.user_id,
schedule.created_at
))
def get_schedules_matching_time(time):
cur = connection.execute('''
SELECT time, day_week, location, meal, user_id, created_at
FROM schedule
WHERE time = ? and day_week = ?
''', (time.strftime('%H:%M'), time.weekday()))
rows = cur.fetchall()
return [Schedule(
row[0],
row[1],
Location[row[2]],
Meal[row[3]],
row[4],
row[5]
) for row in rows]
def delete_all_schedules_from_user(user_id):
connection.execute('''
DELETE FROM schedule
WHERE user_id = ?
''', (user_id,))
#!/usr/bin/env python3
from dotenv import load_dotenv
from telegram import Update, ReplyKeyboardMarkup
from telegram.ext import Updater, CommandHandler, CallbackContext
from crawler import get_meals_by_days, Location
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.constants import PARSEMODE_HTML
from telegram.ext import Updater, CommandHandler, CallbackContext, CallbackQueryHandler
from crawler import get_menus_by_days, Location, Meal
from database import get_schedules_matching_time, insert_schedule, get_schedules_for_user, Schedule, delete_all_schedules_from_user
from datetime import datetime, timedelta, date
from collections import defaultdict
from copy import deepcopy
from pprint import pformat
from dataclasses import dataclass, field
import json
import logging
import os
......@@ -10,35 +18,28 @@ load_dotenv()
file_handler = logging.FileHandler('bot.log')
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_handler.setLevel(logging.INFO)
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(name)s (%(module)s:%(funcName)s:%(lineno)d) %(message)s',
datefmt='%Y-%m-%mT%H:%M:%S',
datefmt='%Y-%m-%dT%H:%M:%S',
handlers=[file_handler, stream_handler])
logger = logging.getLogger("bot")
def start(update: Update, context: CallbackContext) -> None:
"""Sends explanation on how to use the bot."""
update.message.reply_html('''
Olá, eu sou o RU UFPR Bot, o robô de <a href="https://gitlab.c3sl.ufpr.br/caad/ru-bot-telegram">código aberto</a> mantido pelo <a href="https://caad.inf.ufpr.br/">CAAD (Centro Acadêmico Alexandre Direne)</a> que te mostra o cardápio do Restaurante Universitário da UFPR!
''', reply_markup=ReplyKeyboardMarkup(
[
[
"/agendar · Agendar envio de cardápio"
],
[
"/cardapio_central · Ver o cardápio do RU Central",
"/cardapio_poli · Ver o cardápio do RU Centro Politécnico",
],
[
"/cardapio_botanico · Ver o cardápio do RU Jardim Botânico",
"/cardapio_agrarias · Ver o cardápio do RU Agrárias",
],
]
))
update.message.reply_text('''
Olá, eu sou o RU UFPR Bot, o robô de <a href="https://gitlab.c3sl.ufpr.br/caad/ru-bot-telegram">código aberto</a> mantido pelo <a href="https://caad.inf.ufpr.br/">CAAD (Centro Acadêmico Alexandre Direne)</a> que te mostra o cardápio do Restaurante Universitário da UFPR! Aqui vão os meus comandos:
- /agendar · Configura notificações de cardápio
- /cardapio_central · Mostra o cardápio do RU Central
- /cardapio_poli · Mostra o cardápio do RU Centro Politécnico
- /cardapio_botanico · Mostra o cardápio do RU Jardim Botânico
- /cardapio_agrarias · Mostra o cardápio do RU Agrárias
''', parse_mode=PARSEMODE_HTML)
logger.info(f"User {update.effective_user.id} {update.effective_user.first_name} {update.effective_user.last_name} {update.effective_user.username} used /start")
COMMAND_TO_LOCATION = {
'cardapio_central': Location.CENTRAL,
......@@ -57,28 +58,229 @@ LOCATION_TO_HEADER = {
def cardapio(update: Update, context: CallbackContext) -> None:
command = next(iter(update.message.parse_entities(types=["bot_command"]).values()))[1:]
location = COMMAND_TO_LOCATION[command]
days, update_time = get_meals_by_days(location)
days, update_time = get_menus_by_days(location)
header = f'<b>{LOCATION_TO_HEADER[location]}</b>'
if len(days):
body = '\n\n'.join(
f'<b>{day.date_raw}</b>\n' + '\n'.join(
f' <b>{meal.name}</b>\n' + '\n'.join(f' {item}' for item in meal.items) for meal in day.meals
f' <b>{menu.meal.value}</b>\n' + '\n'.join(f' {item}' for item in menu.items) for menu in day.menus
) for day in days
)
else:
body = '<b>Cardápio indisponível</b>'
updated_on = f'<i>Atualizado às ' + update_time.strftime('%H:%M:%S') + ' de hoje</i>'
update.message.reply_html(header + '\n\n' + body + '\n\n' + updated_on)
update.message.reply_text(header + '\n\n' + body + '\n\n' + updated_on, parse_mode=PARSEMODE_HTML)
logger.info(f"User {update.effective_user.id} {update.effective_user.first_name} {update.effective_user.last_name} {update.effective_user.username} used /{command}")
DAY_WEEK_NUMBER_TO_STRING = {
0: "SEG", 1: "TER", 2: "QUA", 3: "QUI", 4: "SEX", 5: "SÁB", 6: "DOM"
}
DAY_WEEK_NUMBER_TO_LONG_STRING = {
0: "Segunda-feira", 1: "Terça-feira", 2: "Quarta-feira", 3: "Quinta-feira", 4: "Sexta-feira", 5: "Sábado", 6: "Domingo"
}
@dataclass
class AgendarAntesAbertura:
step: int
location: Location = None
days_week_by_meal: dict[Meal, set] = field(default_factory=dict)
class RemoveTodasNotificacoes:
pass
def agendar(user_id):
schedules = get_schedules_for_user(user_id)
keyboard = [
[InlineKeyboardButton('Agendar notificações para antes da abertura...', callback_data=AgendarAntesAbertura(-1))],
#[InlineKeyboardButton('Agendar notificações diárias...', callback_data="agendar_diario")],
#[InlineKeyboardButton('Agendar uma notificação individual...', callback_data="agendar_individual")],
]
if len(schedules):
keyboard += [
#[InlineKeyboardButton('Remover uma notificação individual...', callback_data="remover_agendamento_individual")],
[InlineKeyboardButton('Remover todas as notificações...', callback_data=RemoveTodasNotificacoes())],
]
keyboard = InlineKeyboardMarkup(keyboard)
if len(schedules):
schedules_by_location = defaultdict(list)
for schedule in schedules:
schedules_by_location[schedule.location].append(schedule)
day_week_by_time_meal_by_location = dict()
for key, schedules in schedules_by_location.items():
by_time_meal = defaultdict(set)
for schedule in schedules:
by_time_meal[(schedule.time, schedule.meal)].add(schedule.day_week)
day_week_by_time_meal_by_location[key] = by_time_meal
return (
f'<b>Notificações</b>\n\n' +
'\n\n'.join(
f'<b>{LOCATION_TO_HEADER[location]}</b>\n' +
'\n'.join(
f' {time} · {meal.value} · {", ".join(DAY_WEEK_NUMBER_TO_STRING[day_week] for day_week in day_weeks)}'
for (time, meal), day_weeks in day_week_by_time_meal.items()
)
for location, day_week_by_time_meal
in day_week_by_time_meal_by_location.items()
),
keyboard
)
else:
return (
f'<b>Notificações</b>\n\nAtualmente você não tem nenhuma notificação agendada.',
keyboard
)
def agendar_reply(update: Update, context: CallbackContext) -> None:
body, keyboard = agendar(update.effective_user.id)
update.message.reply_text(body, reply_markup=keyboard, parse_mode=PARSEMODE_HTML)
def agendar_callback(update: Update, context: CallbackContext) -> None:
body, keyboard = agendar(update.effective_user.id)
update.callback_query.message.edit_text(body, reply_markup=keyboard, parse_mode=PARSEMODE_HTML)
VALID_WEEK_DAYS_BY_LOCATION = {
Location.CENTRAL: {0, 1, 2, 3, 4, 5, 6},
Location.POLITECNICO: {0, 1, 2, 3, 4},
}
TIME_BEFORE_OPENING_BY_MEAL = {
Meal.BREAKFAST: '06:15',
Meal.LUNCH: '10:30',
Meal.DINNER: '17:00',
}
def agendar_antes_abertura(update: Update, context: CallbackContext) -> None:
data = update.callback_query.data
if data.step == -1:
buttons = []
for location, valid_days_week in VALID_WEEK_DAYS_BY_LOCATION.items():
new_data = deepcopy(data)
new_data.location = location
new_data.step += 1
for meal, old_days_week in new_data.days_week_by_meal.items():
new_data.days_week_by_meal[meal] = old_days_week.intersection(valid_days_week)
buttons.append([
InlineKeyboardButton(LOCATION_TO_HEADER[location], callback_data=new_data)
])
update.callback_query.message.edit_text(
'<b>Notificações</b>\n\nDe qual local você quer receber notificações?',
reply_markup=InlineKeyboardMarkup(buttons),
parse_mode=PARSEMODE_HTML
)
return
if data.step == 3:
for meal, days_week in data.days_week_by_meal.items():
for day_week in days_week:
time = TIME_BEFORE_OPENING_BY_MEAL[meal]
schedule = Schedule(time=time, day_week=day_week, location=data.location, meal=meal, user_id=update.effective_user.id, created_at=datetime.now())
logger.info(f"User {update.effective_user.id} {update.effective_user.first_name} {update.effective_user.last_name} {update.effective_user.username} added schedule {pformat(schedule)}")
insert_schedule(schedule)
body, keyboard = agendar(update.effective_user.id)
update.callback_query.message.edit_text(body, reply_markup=keyboard, parse_mode=PARSEMODE_HTML)
return
meal_step = list(Meal.__members__.values())[data.step]
if meal_step not in data.days_week_by_meal:
data.days_week_by_meal[meal_step] = { 0, 1, 2, 3, 4 }
days_week_step = data.days_week_by_meal[meal_step]
buttons = []
for day_week in VALID_WEEK_DAYS_BY_LOCATION[data.location]:
if day_week in days_week_step:
new_data = deepcopy(data)
new_data.days_week_by_meal[meal_step].remove(day_week)
buttons.append([InlineKeyboardButton(f'Remover {DAY_WEEK_NUMBER_TO_LONG_STRING[day_week]}', callback_data=new_data)])
else:
new_data = deepcopy(data)
new_data.days_week_by_meal[meal_step].add(day_week)
buttons.append([InlineKeyboardButton(f'Adicionar {DAY_WEEK_NUMBER_TO_LONG_STRING[day_week]}', callback_data=new_data)])
next_data = deepcopy(data)
next_data.step += 1
prev_data = deepcopy(data)
prev_data.step -= 1
update.callback_query.message.edit_text(
f'<b>Notificações</b>\n\n<b>{LOCATION_TO_HEADER[data.location]}</b>\n' +
'\n'.join(f' {meal.value} · {", ".join(DAY_WEEK_NUMBER_TO_STRING[day_week] for day_week in day_weeks)}'
for meal, day_weeks in data.days_week_by_meal.items()) + '\n\n' +
f'Adicione ou remova dias de semana para as notificações do {meal_step.value}, vá para o próximo quando terminar.',
reply_markup=InlineKeyboardMarkup([
*buttons,
[
InlineKeyboardButton('« Voltar', callback_data=prev_data),
InlineKeyboardButton('Próximo »', callback_data=next_data)
],
]),
parse_mode=PARSEMODE_HTML
)
class RemoveTodasNotificacoesOk:
pass
class Agendar:
pass
def remover_todas_notificacoes(update: Update, context: CallbackContext) -> None:
update.callback_query.message.edit_text(
'<b>Notificações</b>\n\nVocê tem certeza que quer remover todas as notificações?',
reply_markup=InlineKeyboardMarkup([
[InlineKeyboardButton('Sim, remover todas as notificações', callback_data=RemoveTodasNotificacoesOk())],
[InlineKeyboardButton('Não, mantenha minhas notificações', callback_data=Agendar())]
]),
parse_mode=PARSEMODE_HTML
)
def remover_todas_notificacoes_ok(update: Update, context: CallbackContext) -> None:
delete_all_schedules_from_user(update.effective_user.id)
body, keyboard = agendar(update.effective_user.id)
update.callback_query.message.edit_text(body, reply_markup=keyboard, parse_mode=PARSEMODE_HTML)
logger.info(f"User {update.effective_user.id} {update.effective_user.first_name} {update.effective_user.last_name} {update.effective_user.username} removed all notifications")
def send_scheduled(context: CallbackContext) -> None:
time = context.job.next_t - timedelta(minutes=EVERY_X_MINUTES)
schedules = get_schedules_matching_time(time)
for schedule in schedules:
days, update_time = get_menus_by_days(schedule.location)
header = f'<b>{LOCATION_TO_HEADER[schedule.location]}</b>'
days = [day for day in days if date.today() == day.date]
if len(days):
day = days[0]
body = next(
f'<b>{day.date_raw}</b>\n<b> {menu.meal.value}</b>\n' + '\n'.join(f' {item}' for item in menu.items)
for menu in day.menus if menu.meal == schedule.meal
)
else:
body = '<b>Cardápio indisponível</b>'
updated_on = f'<i>Atualizado às ' + update_time.strftime('%H:%M:%S') + ' de hoje</i>'
context.dispatcher.bot.send_message(
schedule.user_id,
header + '\n\n' + body + '\n\n' + updated_on,
parse_mode=PARSEMODE_HTML
)
logger.info(f"User {update.effective_user.id} {update.effective_user.first_name} {update.effective_user.last_name} {update.effective_user.username} received schedule {pformat(schedule)}")
EVERY_X_MINUTES = 15
def main() -> None:
logger.info("Starting bot...")
updater = Updater(os.environ['TELEGRAM_BOT_TOKEN'])
updater = Updater(os.environ['TELEGRAM_BOT_TOKEN'], arbitrary_callback_data=True)
for command in COMMAND_TO_LOCATION.keys():
updater.dispatcher.add_handler(CommandHandler(command, cardapio))
updater.dispatcher.add_handler(CommandHandler('start', start))
updater.dispatcher.add_handler(CommandHandler('agendar', agendar_reply))
updater.dispatcher.add_handler(CallbackQueryHandler(remover_todas_notificacoes, pattern=RemoveTodasNotificacoes))
updater.dispatcher.add_handler(CallbackQueryHandler(remover_todas_notificacoes_ok, pattern=RemoveTodasNotificacoesOk))
updater.dispatcher.add_handler(CallbackQueryHandler(agendar_callback, pattern=Agendar))
updater.dispatcher.add_handler(CallbackQueryHandler(agendar_antes_abertura, pattern=AgendarAntesAbertura))
tm = datetime.now()
tm = tm - timedelta(minutes=tm.minute % EVERY_X_MINUTES, seconds=tm.second, microseconds=tm.microsecond)
tm = tm + timedelta(minutes=EVERY_X_MINUTES)
updater.dispatcher.job_queue.run_repeating(send_scheduled, timedelta(minutes=EVERY_X_MINUTES), first=tm)
updater.start_polling()
logger.info("Connected")
updater.idle()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment