epaper/modules/ical.py

241 lines
11 KiB
Python

from datetime import datetime, timedelta, timezone
import hashlib
import re
import urllib.error
import urllib.request
from icalendar import Calendar, Event, vCalAddress, vText
from PIL import Image, ImageDraw, ImageFont
import pytz
from . import display_longtext
class IcalModule:
def __init__(self, config):
self.cals = config.cals
self._cached_file = ".ical-%s.cache"
self.cache_time = 15
def get_events(self, config, toofar=None, now=None):
today = datetime.now(tz=pytz.timezone('Europe/Paris')).replace(hour=0, minute=0, second=0)
events = []
for cal in self.cals:
cache_file = self._cached_file % (hashlib.md5(cal.encode()).hexdigest())
try:
with urllib.request.urlopen(cal) as c:
with open(cache_file, 'wb') as fd:
fd.write(c.read())
except ConnectionResetError:
pass
except urllib.error.URLError:
pass
with open(cache_file) as c:
ecal = Calendar.from_ical(c.read())
for component in ecal.walk():
if component.name == "VEVENT":
if component.decoded("DTEND") < today:
continue
if toofar is not None and component.decoded("DTSTART") > toofar:
continue
train_situations, train_conjoncturels, train_start_station, train_end_station, place = self.is_train_event(component)
if train_start_station is not None:
start = component.decoded("DTSTART")
end = component.decoded("DTEND")
evt = {
"summary": component.get("SUMMARY"),
"start": start,
"end": end,
}
new_start = datetime.fromisoformat(train_start_station["depart"]["dateHeureReelle"])
if start.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_start.strftime("%d %H:%M"):
evt["new_start"] = new_start
if train_end_station:
new_end = datetime.fromisoformat(train_end_station["arrivee"]["dateHeureReelle"])
if end.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_end.strftime("%d %H:%M"):
evt["new_end"] = new_end
if len(train_conjoncturels) > 0:
evt["alert"] = train_conjoncturels["messagesConjoncturels"][0]["titre"]
elif len(train_situations) > 0:
evt["alert"] = train_situations[0]["libelleSituation"]
if new_start > datetime.now(tz=pytz.timezone('Europe/Paris')):
if "evenement" in train_start_station["arrivee"]:
evt["alert"] = train_start_station["arrivee"]["evenement"]["texte"]
if "voie" in train_start_station and "numero" in train_start_station["voie"]:
evt["info"] = "Départ voie " + train_start_station["voie"]["numero"]
if place is not None:
evt["info"] = ((evt["info"] + "\n") if "info" in evt else "") + place
elif train_end_station is not None:
evt["info"] = "Arrivée à " + new_end.strftime("%H:%M")
if "evenement" in train_end_station["arrivee"]:
evt["alert"] = train_end_station["arrivee"]["evenement"]["texte"]
evt["info"] += " (+%d')" % train_end_station["arrivee"]["evenement"]["retard"]["duree"]
if "voie" in train_end_station and "numero" in train_end_station["voie"]:
evt["info"] += " voie " + train_end_station["voie"]["numero"]
events.append(evt)
elif now is not None and component.decoded("DTEND") < now:
continue
else:
evt = {
"summary": component.get("SUMMARY"),
"start": component.decoded("DTSTART"),
"end": component.decoded("DTEND"),
}
if "location" in component:
evt["location"] = component.decoded("LOCATION").decode()
events.append(evt)
# Sort events
events.sort(key=lambda e: e["start"])
return events
def is_train_event(self, evt):
if "description" not in evt:
return None, None, None, None, None
numero_train = None
ville_depart = None
ville_arrivee = None
place = None
for line in evt.decoded("description").decode().split("\n"):
res = re.match(r'.*Train.*([0-9]{4,})$', line)
if res is not None:
numero_train = res[1]
res = re.match(r'.*[0-9]{1,2}h[0-9]{2} (.*)$', line)
if res is not None:
if ville_depart is not None:
ville_arrivee = res[1]
else:
ville_depart = res[1]
res = re.match(r'.*Voiture ([0-9]+).*place([0-9]+)', line)
if res is not None:
place = "voit. " + res[1] + " pl. " + res[2]
if numero_train is None:
return None, None, None, None, place
start_time = evt.decoded("DTSTART")
now = datetime.now(tz=pytz.timezone('Europe/Paris'))
from .sncf import SNCFAPI
status = SNCFAPI().get_train_status(numero_train, start_time)
if status is None:
return None, None, None, None, place
situations = []
if "situation" in status and len(status["situation"]) > 0:
situations = status["situation"]
conjoncturels = []
if "listeMessagesConjoncturels" in status and len(status["listeMessagesConjoncturels"]) > 0:
conjoncturels = status["listeMessagesConjoncturels"]
if ville_arrivee is None:
return situations, conjoncturels, None, None, place
start_station = None
end_station = None
for arret in status["listeArretsDesserte"]["arret"]:
if arret["emplacement"]["libelle"].startswith(ville_arrivee):
end_station = arret
if arret["emplacement"]["libelle"].startswith(ville_depart):
start_station = arret
return situations, conjoncturels, start_station, end_station, place
def event_coming(self, config):
now = datetime.now(tz=pytz.timezone('Europe/Paris'))
coming = datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=80)
for evt in self.get_events(config):
# Looking only the first event
start = evt["start"]
return now < start and start < coming
return False
def draw_module(self, config, width, height, line_height=19):
now = datetime.now(tz=pytz.timezone('Europe/Paris'))
events = self.get_events(config, now + timedelta(weeks=1), now)
image = Image.new('RGB', (width, height), '#fff')
draw = ImageDraw.Draw(image)
fnt_R = ImageFont.truetype(config.fnt_R_path, int(line_height*0.7))
fnt_B = ImageFont.truetype(config.fnt_RB_path, int(line_height*0.7))
last_evt = None
align = 0
for evt in events:
if last_evt is None or last_evt["start"].astimezone(pytz.timezone('Europe/Paris')).day != evt["start"].astimezone(pytz.timezone('Europe/Paris')).day:
draw.text(
(width / 2, align),
evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%a %d %B"),
fill="black", anchor="mt", font=fnt_B
)
align += line_height
draw.rectangle((0, align-4, width, align-4), fill="black")
draw.text(
(2, align),
evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M") + " " + evt["summary"],
fill="black", anchor="lt", font=fnt_R
)
if "new_start" in evt:
draw.line(
(2, align + line_height / 2.6, 2 + fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"))[0], align + line_height / 2.6),
fill="black"
)
draw.text(
(0, align+line_height*0.6),
evt["new_start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"),
fill="black", anchor="lt", font=fnt_B
)
if "alert" in evt:
draw.text(
(2 + fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6),
evt["alert"],
fill="black", anchor="lt", font=fnt_R
)
align += line_height * 0.7
if "location" in evt and now.day == evt["start"].day:
align += display_longtext(draw,
(2 + fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6),
evt["location"],
fill="black", anchor="lt", font=fnt_R
)
if "info" in evt:
align += display_longtext(draw,
(2 + fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6),
evt["info"],
fill="black", anchor="lt", font=fnt_R, maxwidth=width - fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0]
)
elif "end" in evt and (("new_start" in evt and now > evt["new_start"]) or ("new_start" not in evt and now > evt["start"])):
align += display_longtext(draw,
(2 + fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6),
"Fin à " + (evt["new_end"] if "new_end" in evt else evt["end"]).astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"),
fill="black", anchor="lt", font=fnt_R, maxwidth=width - fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0]
)
align += line_height
last_evt = evt
return image