2022-08-14 17:51:51 +00:00
|
|
|
from datetime import datetime, timedelta, timezone
|
2022-08-19 14:59:00 +00:00
|
|
|
import hashlib
|
2022-08-21 00:13:11 +00:00
|
|
|
import re
|
2022-08-19 14:59:00 +00:00
|
|
|
import urllib.error
|
2022-08-14 17:51:51 +00:00
|
|
|
import urllib.request
|
|
|
|
|
|
|
|
from icalendar import Calendar, Event, vCalAddress, vText
|
|
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
|
|
import pytz
|
|
|
|
|
2022-08-21 00:13:11 +00:00
|
|
|
from . import display_longtext
|
|
|
|
|
2022-08-14 17:51:51 +00:00
|
|
|
class IcalModule:
    """Dashboard module that renders upcoming iCalendar events as an image.

    Calendars are downloaded from the URLs listed in ``config.cals``.  Every
    successful download is mirrored into a per-URL cache file so the last
    good copy can still be displayed when a download fails.  Events whose
    description contains a train number are enriched with the status data
    returned by ``SNCFAPI.get_train_status``.
    """

    # Patterns applied to every line of an event description.
    # The middle ``.*?`` is lazy on purpose: a greedy ``.*`` would swallow the
    # leading digits and keep only the last four of a longer train number.
    _TRAIN_NUMBER_RE = re.compile(r'.*Train.*?([0-9]{4,})$')
    _STATION_RE = re.compile(r'.*[0-9]{1,2}h[0-9]{2} (.*)$')
    _SEAT_RE = re.compile(r'.*Voiture ([0-9]+).*place\s*([0-9]+)')

    def __init__(self, config):
        """Remember the calendar URLs found in ``config.cals``."""
        self.cals = config.cals

        # Cache file name template; ``%s`` receives the MD5 hex digest of the
        # calendar URL.
        self._cached_file = ".ical-%s.cache"
        # NOTE(review): not consulted anywhere in this class; presumably a
        # cache lifetime in minutes -- confirm before relying on it.
        self.cache_time = 15

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_datetime(value, tz):
        """Return *value* as a timezone-aware ``datetime``.

        The icalendar library decodes DATE values (all-day events) to plain
        ``date`` objects and floating times to naive ``datetime`` objects.
        Neither can be compared with, or sorted against, aware datetimes, so
        both are interpreted as wall-clock time in *tz*.  Values that are
        already aware are returned unchanged.
        """
        if not isinstance(value, datetime):
            value = datetime(value.year, value.month, value.day)
        if value.tzinfo is None or value.utcoffset() is None:
            # pytz zones must be attached with localize(); other tzinfo
            # implementations can simply be set with replace().
            if hasattr(tz, "localize"):
                return tz.localize(value)
            return value.replace(tzinfo=tz)
        return value

    @classmethod
    def _event_bounds(cls, component, tz):
        """Return the ``(start, end)`` of a VEVENT as aware datetimes.

        When DTEND is absent the end is derived from DURATION if present;
        otherwise a date-only event lasts one day and a timed event ends when
        it starts (RFC 5545, section 3.6.1).
        """
        raw_start = component.decoded("DTSTART")
        start = cls._as_datetime(raw_start, tz)

        if "DTEND" in component:
            end = cls._as_datetime(component.decoded("DTEND"), tz)
        elif "DURATION" in component:
            end = start + component.decoded("DURATION")
        elif not isinstance(raw_start, datetime):
            end = start + timedelta(days=1)
        else:
            end = start
        return start, end

    @staticmethod
    def _text_width(font, text):
        """Return the width in pixels of *text* rendered with *font*.

        ``ImageFont.getsize`` was removed in Pillow 10; ``getlength`` is used
        when the font provides it and ``getsize`` remains as a fallback for
        older Pillow releases.
        """
        if hasattr(font, "getlength"):
            return int(round(font.getlength(text)))
        return font.getsize(text)[0]

    @classmethod
    def _parse_train_description(cls, description):
        """Extract train booking details from an event description.

        Each line is matched against three patterns: a train number, a
        ``HHhMM <station>`` line and a ``Voiture N ... place M`` line.  The
        first station line found is the departure; any later one replaces
        the arrival.

        Returns:
            ``(numero_train, ville_depart, ville_arrivee, place)``; every
            element is ``None`` when the corresponding line was not found.
        """
        numero_train = None
        ville_depart = None
        ville_arrivee = None
        place = None

        for line in description.split("\n"):
            # A trailing carriage return would defeat the ``$`` anchors.
            line = line.rstrip("\r")

            res = cls._TRAIN_NUMBER_RE.match(line)
            if res is not None:
                numero_train = res[1]

            res = cls._STATION_RE.match(line)
            if res is not None:
                if ville_depart is not None:
                    ville_arrivee = res[1]
                else:
                    ville_depart = res[1]

            res = cls._SEAT_RE.match(line)
            if res is not None:
                place = "voit. " + res[1] + " pl. " + res[2]

        return numero_train, ville_depart, ville_arrivee, place

    def _load_calendar(self, cal):
        """Fetch calendar URL *cal*, refresh its cache file and parse it.

        Returns the parsed ``Calendar``, or ``None`` when the download failed
        and no cached copy exists yet.
        """
        cache_file = self._cached_file % (hashlib.md5(cal.encode()).hexdigest())

        payload = None
        try:
            with urllib.request.urlopen(cal) as response:
                # Read the whole body before touching the cache so a failed
                # transfer cannot truncate the last good copy.
                payload = response.read()
        except (ConnectionResetError, urllib.error.URLError):
            # Best effort: fall back to whatever is already cached.
            pass

        if payload is not None:
            with open(cache_file, 'wb') as fd:
                fd.write(payload)

        try:
            # Binary mode: the parser accepts bytes, which avoids decoding
            # the file with the locale's default encoding.
            with open(cache_file, 'rb') as fd:
                return Calendar.from_ical(fd.read())
        except FileNotFoundError:
            return None

    def _train_event(self, component, start, end, train_status, tz):
        """Build the event dict for a VEVENT recognised as a train journey.

        *train_status* is the 5-tuple returned by :meth:`is_train_event`.
        The station dicts are read through the keys ``depart`` / ``arrivee``
        (``dateHeureReelle``, optional ``evenement``) and optional ``voie``.
        """
        situations, conjoncturels, start_station, end_station, place = train_status

        evt = {
            "summary": component.get("SUMMARY"),
            "start": start,
            "end": end,
        }

        # Only record the real-time hours when they differ from the calendar.
        new_start = datetime.fromisoformat(start_station["depart"]["dateHeureReelle"])
        if start.astimezone(tz).strftime("%d %H:%M") != new_start.strftime("%d %H:%M"):
            evt["new_start"] = new_start

        new_end = None
        if end_station:
            new_end = datetime.fromisoformat(end_station["arrivee"]["dateHeureReelle"])
            if end.astimezone(tz).strftime("%d %H:%M") != new_end.strftime("%d %H:%M"):
                evt["new_end"] = new_end

        if conjoncturels:
            evt["alert"] = conjoncturels["messagesConjoncturels"][0]["titre"]
        elif situations:
            evt["alert"] = situations[0]["libelleSituation"]

        if new_start > datetime.now(tz=tz):
            # Train not departed yet: show platform and seat.
            # NOTE(review): the departure branch reads the "arrivee" entry of
            # the start station, as the original did -- confirm whether
            # "depart" was intended.  A missing key is tolerated.
            arrival_at_start = start_station.get("arrivee", {})
            if "evenement" in arrival_at_start:
                evt["alert"] = arrival_at_start["evenement"]["texte"]
            if "voie" in start_station and "numero" in start_station["voie"]:
                evt["info"] = "Départ voie " + start_station["voie"]["numero"]
            if place is not None:
                evt["info"] = ((evt["info"] + "\n") if "info" in evt else "") + place
        elif new_end is not None:
            # Train already departed: show the arrival time instead.
            evt["info"] = "Arrivée à " + new_end.strftime("%H:%M")
            if "evenement" in end_station["arrivee"]:
                evt["alert"] = end_station["arrivee"]["evenement"]["texte"]
                evt["info"] += " (+%d')" % end_station["arrivee"]["evenement"]["retard"]["duree"]
            if "voie" in end_station and "numero" in end_station["voie"]:
                evt["info"] += " voie " + end_station["voie"]["numero"]

        return evt

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_events(self, config, toofar=None, now=None):
        """Collect the events of every configured calendar.

        Args:
            config: unused here; kept for interface compatibility.
            toofar: when given, events starting after this aware datetime
                are skipped.
            now: when given, non-train events that ended before this aware
                datetime are skipped.

        Returns:
            A list of dicts sorted by ``"start"``.  Each dict has the keys
            ``summary``, ``start`` and ``end``; train events may also carry
            ``new_start``, ``new_end``, ``alert`` and ``info``.
        """
        tz = pytz.timezone('Europe/Paris')
        today = datetime.now(tz=tz).replace(hour=0, minute=0, second=0, microsecond=0)

        events = []
        for cal in self.cals:
            ecal = self._load_calendar(cal)
            if ecal is None:
                # Nothing downloaded and nothing cached for this calendar.
                continue

            for component in ecal.walk():
                if component.name != "VEVENT":
                    continue

                start, end = self._event_bounds(component, tz)
                if end < today:
                    continue
                if toofar is not None and start > toofar:
                    continue

                train_status = self.is_train_event(component)
                # The third element is the departure station.
                if train_status[2] is not None:
                    events.append(self._train_event(component, start, end, train_status, tz))
                elif now is not None and end < now:
                    continue
                else:
                    events.append({
                        "summary": component.get("SUMMARY"),
                        "start": start,
                        "end": end,
                    })

        # Sort events
        events.sort(key=lambda e: e["start"])

        return events

    def is_train_event(self, evt):
        """Look up train status data for calendar component *evt*.

        Returns:
            A 5-tuple ``(situations, conjoncturels, start_station,
            end_station, place)``.  The first four are ``None`` when the
            description holds no train number or no status is available;
            the stations are ``None`` when no arrival city was found or no
            stop of the train matches the cities from the description.
        """
        if "description" not in evt:
            return None, None, None, None, None

        description = evt.decoded("description")
        if isinstance(description, bytes):
            description = description.decode()

        numero_train, ville_depart, ville_arrivee, place = self._parse_train_description(description)
        if numero_train is None:
            return None, None, None, None, place

        # Function-scope import, kept as in the original code.
        from .sncf import SNCFAPI
        status = SNCFAPI().get_train_status(numero_train, evt.decoded("DTSTART"))
        if status is None:
            return None, None, None, None, place

        situations = []
        if "situation" in status and len(status["situation"]) > 0:
            situations = status["situation"]

        conjoncturels = []
        if "listeMessagesConjoncturels" in status and len(status["listeMessagesConjoncturels"]) > 0:
            conjoncturels = status["listeMessagesConjoncturels"]

        if ville_arrivee is None:
            return situations, conjoncturels, None, None, place

        # Match the cities from the description against the train's stops.
        start_station = None
        end_station = None
        for arret in status["listeArretsDesserte"]["arret"]:
            libelle = arret["emplacement"]["libelle"]
            if libelle.startswith(ville_arrivee):
                end_station = arret
            if libelle.startswith(ville_depart):
                start_station = arret

        return situations, conjoncturels, start_station, end_station, place

    def draw_module(self, config, width, height, line_height=19):
        """Render the events of the coming week into a new RGB image.

        Args:
            config: provides ``fnt_R_path`` and ``fnt_RB_path`` (regular and
                bold font files) and is forwarded to :meth:`get_events`.
            width: image width in pixels.
            height: image height in pixels.
            line_height: vertical step between text lines, in pixels.

        Returns:
            The ``PIL.Image.Image`` that was drawn.
        """
        tz = pytz.timezone('Europe/Paris')
        now = datetime.now(tz=tz)
        events = self.get_events(config, now + timedelta(weeks=1), now)

        image = Image.new('RGB', (width, height), '#fff')
        draw = ImageDraw.Draw(image)

        fnt_R = ImageFont.truetype(config.fnt_R_path, int(line_height*0.7))
        fnt_B = ImageFont.truetype(config.fnt_RB_path, int(line_height*0.7))

        last_day = None
        align = 0
        for evt in events:
            local_start = evt["start"].astimezone(tz)
            start_label = local_start.strftime("%H:%M")
            # Width of "HH:MM ", i.e. the column where detail lines begin.
            label_width = self._text_width(fnt_R, start_label + " ")

            # Day header; full dates are compared so that two events on the
            # same day-of-month of different months get separate headers.
            if last_day != local_start.date():
                draw.text(
                    (width / 2, align),
                    local_start.strftime("%a %d %B"),
                    fill="black", anchor="mt", font=fnt_B
                )
                align += line_height
                draw.rectangle((0, align-4, width, align-4), fill="black")
            last_day = local_start.date()

            draw.text(
                (2, align),
                start_label + " " + (evt["summary"] or ""),
                fill="black", anchor="lt", font=fnt_R
            )

            if "new_start" in evt:
                # Strike through the planned hour and print the real one below.
                strike_y = align + line_height / 2.6
                draw.line(
                    (2, strike_y, 2 + self._text_width(fnt_R, start_label), strike_y),
                    fill="black"
                )
                draw.text(
                    (0, align+line_height*0.6),
                    evt["new_start"].astimezone(tz).strftime("%H:%M"),
                    fill="black", anchor="lt", font=fnt_B
                )

            if "alert" in evt:
                draw.text(
                    (2 + label_width, align+line_height*0.6),
                    evt["alert"],
                    fill="black", anchor="lt", font=fnt_R
                )
                align += line_height * 0.7

            if "info" in evt:
                align += display_longtext(draw,
                    (2 + label_width, align+line_height*0.5),
                    evt["info"],
                    fill="black", anchor="lt", font=fnt_R, maxwidth=width - label_width
                )
            elif "end" in evt and now > evt.get("new_start", evt["start"]):
                # Event in progress: show when it finishes.
                finish = evt.get("new_end", evt["end"])
                align += display_longtext(draw,
                    (2 + label_width, align+line_height*0.5),
                    "Fin à " + finish.astimezone(tz).strftime("%H:%M"),
                    fill="black", anchor="lt", font=fnt_R, maxwidth=width - label_width
                )

            align += line_height

        return image
|