from datetime import datetime, timedelta, timezone
import hashlib
import re
import urllib.error
import urllib.request

from icalendar import Calendar, Event, vCalAddress, vText
from PIL import Image, ImageDraw, ImageFont
import pytz

from . import display_longtext

# Every date compared or displayed by this module is expressed in this zone.
LOCAL_TZ = pytz.timezone('Europe/Paris')

# The lazy ".*?" lets the group capture the whole trailing digit run; a greedy
# ".*" would swallow leading digits and keep only the last four.
_TRAIN_NUMBER_RE = re.compile(r'.*Train.*?([0-9]{4,})$')
_STATION_RE = re.compile(r'.*[0-9]{1,2}h[0-9]{2} (.*)$')
_SEAT_RE = re.compile(r'.*Voiture ([0-9]+).*place([0-9]+)')


def _to_aware_datetime(value):
    """Turn a ``date`` or naive ``datetime`` into an aware ``datetime``.

    Plain dates (all-day events) become midnight local time and naive
    datetimes are taken as local time, so the result can be compared with
    the timezone-aware "now". Aware datetimes are returned unchanged.
    """
    if not isinstance(value, datetime):
        value = datetime(value.year, value.month, value.day)
    if value.tzinfo is None:
        value = LOCAL_TZ.localize(value)
    return value


def _event_bounds(component):
    """Return ``(start, end)`` of a VEVENT as timezone-aware datetimes.

    DTEND is optional: fall back to DTSTART + DURATION, then to a one-day
    span for date-only events, then to a zero-length event.
    """
    raw_start = component.decoded("DTSTART")
    start = _to_aware_datetime(raw_start)
    if "DTEND" in component:
        end = _to_aware_datetime(component.decoded("DTEND"))
    elif "DURATION" in component:
        end = start + component.decoded("DURATION")
    elif isinstance(raw_start, datetime):
        end = start
    else:
        end = start + timedelta(days=1)
    return start, end


def _text_width(font, text):
    """Return the rendered width of *text* in pixels.

    ``FreeTypeFont.getsize`` no longer exists in recent Pillow releases, so
    ``getlength`` is preferred and ``getsize`` is only a fallback.
    """
    if hasattr(font, "getlength"):
        return font.getlength(text)
    return font.getsize(text)[0]


class IcalModule:
    """Display module listing the upcoming events of remote iCalendar feeds.

    Events whose description looks like a train ticket are enriched with
    the live status returned by ``SNCFAPI``.
    """

    def __init__(self, config):
        """Read the calendar URLs from ``config.cals``."""
        self.cals = config.cals
        # Look-ahead window, in minutes, used by event_coming().
        self.delayed_departure = 100
        # Cache file name template; the placeholder is the MD5 of the URL.
        self._cached_file = ".ical-%s.cache"
        # NOTE(review): not read anywhere in this file; presumably a cache
        # lifetime -- confirm the unit before wiring it in.
        self.cache_time = 15

    def _load_calendar(self, cal):
        """Download calendar URL *cal* and return it parsed.

        The last successful download is kept on disk and reused when the
        server cannot be reached. Returns ``None`` when the download fails
        and no cached copy exists.
        """
        cache_file = self._cached_file % (hashlib.md5(cal.encode()).hexdigest())

        try:
            with urllib.request.urlopen(cal) as response:
                payload = response.read()
        except (ConnectionResetError, urllib.error.URLError):
            payload = None

        if payload is not None:
            # The body is fully read before the cache is opened for writing,
            # so an interrupted transfer cannot truncate the previous copy.
            with open(cache_file, 'wb') as fd:
                fd.write(payload)
        else:
            try:
                # Bytes, not text: decoding must not depend on the locale.
                with open(cache_file, 'rb') as fd:
                    payload = fd.read()
            except FileNotFoundError:
                return None

        return Calendar.from_ical(payload)

    def _build_train_event(self, component, start, end, situations,
                           conjoncturels, start_station, end_station, place):
        """Build the event dict of a train journey from its live status.

        On top of ``summary``/``start``/``end`` the dict may carry
        ``new_start``/``new_end`` (real times when they differ from the
        calendar), ``alert`` (disruption text) and ``info`` (platform, seat
        or arrival details).
        """
        evt = {
            "summary": component.get("SUMMARY"),
            "start": start,
            "end": end,
        }

        new_start = datetime.fromisoformat(start_station["depart"]["dateHeureReelle"])
        if start.astimezone(LOCAL_TZ).strftime("%d %H:%M") != new_start.strftime("%d %H:%M"):
            evt["new_start"] = new_start

        # Reset for every event so a value from a previous one never leaks.
        new_end = None
        if end_station:
            new_end = datetime.fromisoformat(end_station["arrivee"]["dateHeureReelle"])
            if end.astimezone(LOCAL_TZ).strftime("%d %H:%M") != new_end.strftime("%d %H:%M"):
                evt["new_end"] = new_end

        if len(conjoncturels) > 0:
            evt["alert"] = conjoncturels["messagesConjoncturels"][0]["titre"]
        elif len(situations) > 0:
            evt["alert"] = situations[0]["libelleSituation"]

        if new_start > datetime.now(tz=LOCAL_TZ):
            # Not departed yet: show departure platform and seat.
            # NOTE(review): the delay is read from "arrivee" of the departure
            # stop, as before -- confirm against the API schema.
            if "evenement" in start_station["arrivee"]:
                evt["alert"] = start_station["arrivee"]["evenement"]["texte"]

            if "voie" in start_station and "numero" in start_station["voie"]:
                evt["info"] = "Départ voie " + start_station["voie"]["numero"]

            if place is not None:
                evt["info"] = ((evt["info"] + "\n") if "info" in evt else "") + place

        elif new_end is not None:
            # Already departed: show arrival time, delay and platform.
            evt["info"] = "Arrivée à " + new_end.strftime("%H:%M")

            if "evenement" in end_station["arrivee"]:
                evt["alert"] = end_station["arrivee"]["evenement"]["texte"]
                evt["info"] += " (+%d')" % end_station["arrivee"]["evenement"]["retard"]["duree"]

            if "voie" in end_station and "numero" in end_station["voie"]:
                evt["info"] += " voie " + end_station["voie"]["numero"]

        return evt

    def get_events(self, config, toofar=None, now=None):
        """Return the events of every configured calendar, sorted by start.

        Args:
            config: unused here; kept for interface compatibility.
            toofar: optional aware datetime; events starting later are dropped.
            now: optional aware datetime; ordinary (non-train) events that
                ended before it are dropped.

        Returns:
            A list of dicts with at least ``summary``, ``start`` and ``end``.
            Ordinary events may carry ``location``; train events may carry
            ``new_start``, ``new_end``, ``alert`` and ``info``. Events that
            ended before today's midnight are never returned.
        """
        today = datetime.now(tz=LOCAL_TZ).replace(hour=0, minute=0, second=0, microsecond=0)

        events = []
        for cal in self.cals:
            ecal = self._load_calendar(cal)
            if ecal is None:
                # Unreachable server and nothing cached: skip this calendar
                # rather than failing all the others.
                continue

            for component in ecal.walk():
                if component.name != "VEVENT" or "DTSTART" not in component:
                    continue

                start, end = _event_bounds(component)
                if end < today:
                    continue
                if toofar is not None and start > toofar:
                    continue

                (train_situations, train_conjoncturels, train_start_station,
                 train_end_station, place) = self.is_train_event(component)

                if train_start_station is not None:
                    events.append(self._build_train_event(
                        component, start, end,
                        train_situations, train_conjoncturels,
                        train_start_station, train_end_station, place,
                    ))
                elif now is not None and end < now:
                    continue
                else:
                    evt = {
                        "summary": component.get("SUMMARY"),
                        "start": start,
                        "end": end,
                    }
                    if "location" in component:
                        evt["location"] = component.decoded("LOCATION").decode()
                    events.append(evt)

        # Sort events
        events.sort(key=lambda e: e["start"])

        return events

    def is_train_event(self, evt):
        """Detect a train ticket in *evt* and fetch the train's live status.

        The description is scanned line by line for a train number, for
        "<hour>h<minutes> <city>" lines (first one is the departure, the
        last one the arrival) and for a coach/seat line.

        Returns:
            A 5-tuple ``(situations, conjoncturels, start_station,
            end_station, place)``. The first four are ``None`` when the
            event has no usable train number or no status is available;
            the stations are ``None`` when no arrival city was found.
        """
        if "description" not in evt:
            return None, None, None, None, None

        numero_train = None
        ville_depart = None
        ville_arrivee = None
        place = None
        for line in evt.decoded("description").decode().split("\n"):
            res = _TRAIN_NUMBER_RE.match(line)
            if res is not None:
                numero_train = res[1]

            res = _STATION_RE.match(line)
            if res is not None:
                if ville_depart is not None:
                    ville_arrivee = res[1]
                else:
                    ville_depart = res[1]

            res = _SEAT_RE.match(line)
            if res is not None:
                place = "voit. " + res[1] + " pl. " + res[2]

        if numero_train is None:
            return None, None, None, None, place

        start_time = evt.decoded("DTSTART")

        # Imported here, as in the original, so the SNCF client is only
        # loaded when a train event is actually found.
        from .sncf import SNCFAPI
        status = SNCFAPI().get_train_status(numero_train, start_time)

        if status is None:
            return None, None, None, None, place

        situations = []
        if "situation" in status and len(status["situation"]) > 0:
            situations = status["situation"]

        conjoncturels = []
        if "listeMessagesConjoncturels" in status and len(status["listeMessagesConjoncturels"]) > 0:
            conjoncturels = status["listeMessagesConjoncturels"]

        if ville_arrivee is None:
            return situations, conjoncturels, None, None, place

        start_station = None
        end_station = None
        for arret in status["listeArretsDesserte"]["arret"]:
            if arret["emplacement"]["libelle"].startswith(ville_arrivee):
                end_station = arret
            if arret["emplacement"]["libelle"].startswith(ville_depart):
                start_station = arret

        return situations, conjoncturels, start_station, end_station, place

    def event_coming(self, config):
        """Tell whether the earliest event starts within the look-ahead window.

        Returns ``True`` when the first event returned by ``get_events``
        starts after now and less than ``delayed_departure`` minutes from
        now, ``False`` otherwise (including when there is no event).
        """
        now = datetime.now(tz=LOCAL_TZ)
        coming = now + timedelta(minutes=self.delayed_departure)

        events = self.get_events(config)
        if not events:
            return False

        # Looking only the first event.
        # NOTE(review): events that already started today are still listed
        # first and therefore mask later ones -- confirm this is intended.
        start = events[0]["start"]
        return now < start < coming

    def draw_module(self, config, width, height, line_height=19):
        """Render the events of the coming week as a ``width`` x ``height`` image.

        Args:
            config: provides ``fnt_R_path`` and ``fnt_RB_path`` font paths.
            width: image width in pixels.
            height: image height in pixels.
            line_height: height of one text line in pixels.

        Returns:
            A ``PIL.Image`` (RGB, white background) holding the agenda.
        """
        now = datetime.now(tz=LOCAL_TZ)
        events = self.get_events(config, now + timedelta(weeks=1), now)

        image = Image.new('RGB', (width, height), '#fff')
        draw = ImageDraw.Draw(image)

        fnt_R = ImageFont.truetype(config.fnt_R_path, int(line_height*0.7))
        fnt_B = ImageFont.truetype(config.fnt_RB_path, int(line_height*0.7))

        last_day = None
        align = 0
        for evt in events:
            local_start = evt["start"].astimezone(LOCAL_TZ)
            start_label = local_start.strftime("%H:%M")
            # Width of the hour column; details are drawn to its right.
            label_width = _text_width(fnt_R, start_label + " ")
            indent = 2 + label_width

            # Day header, once per calendar day (full date, not only the day
            # of the month, so two months cannot be mixed up).
            if last_day != local_start.date():
                draw.text(
                    (width / 2, align),
                    local_start.strftime("%a %d %B"),
                    fill="black", anchor="mt", font=fnt_B
                )
                align += line_height
                draw.rectangle((0, align-4, width, align-4), fill="black")

            draw.text(
                (2, align),
                start_label + " " + evt["summary"],
                fill="black", anchor="lt", font=fnt_R
            )

            if "new_start" in evt:
                # Strike the planned hour through and print the real one below.
                draw.line(
                    (2, align + line_height / 2.6,
                     2 + _text_width(fnt_R, start_label), align + line_height / 2.6),
                    fill="black"
                )
                draw.text(
                    (0, align+line_height*0.6),
                    evt["new_start"].astimezone(LOCAL_TZ).strftime("%H:%M"),
                    fill="black", anchor="lt", font=fnt_B
                )

            if "alert" in evt:
                draw.text(
                    (indent, align+line_height*0.6),
                    evt["alert"],
                    fill="black", anchor="lt", font=fnt_R
                )
                align += line_height * 0.7

            # The location is only shown for today's events; both sides are
            # compared in local time.
            if "location" in evt and now.date() == local_start.date():
                align += display_longtext(
                    draw,
                    (indent, align+line_height*0.6),
                    evt["location"],
                    fill="black", anchor="lt", font=fnt_R
                )

            if "info" in evt:
                align += display_longtext(
                    draw,
                    (indent, align+line_height*0.6),
                    evt["info"],
                    fill="black", anchor="lt", font=fnt_R,
                    maxwidth=width - label_width
                )
            elif "end" in evt and now > evt.get("new_start", evt["start"]):
                # Event in progress: show when it ends.
                align += display_longtext(
                    draw,
                    (indent, align+line_height*0.6),
                    "Fin à " + evt.get("new_end", evt["end"]).astimezone(LOCAL_TZ).strftime("%H:%M"),
                    fill="black", anchor="lt", font=fnt_R,
                    maxwidth=width - label_width
                )

            align += line_height
            last_day = local_start.date()

        return image