from collections.abc import Iterable from datetime import date, datetime, timedelta, timezone from functools import reduce import hashlib import logging import re import time import urllib.error import urllib.request from icalendar import Calendar, Event, vCalAddress, vDDDTypes, vText from PIL import Image, ImageDraw, ImageFont import pytz from . import display_longtext def next_weekday(d, weekday): days_ahead = weekday - d.weekday() if days_ahead < 0: # Target day already happened this week days_ahead += 7 return d + timedelta(days_ahead) class IcalModule: def __init__(self, config): self.cals = config.cals self.delayed_departure = 100 self._cached_events = [] self._cached_file = ".ical-%s.cache" self.cache_time = 15 def __get_events(self, config, toofar=None): today = datetime.now(tz=pytz.timezone('Europe/Paris')).replace(hour=0, minute=0, second=0) todayd = date.today() toofard = None if toofar is not None: toofard = date(toofar.year, toofar.month, toofar.day) events = [] for cal in self.cals: cache_file = self._cached_file % (hashlib.md5(cal.encode()).hexdigest()) try: with urllib.request.urlopen(cal) as c: with open(cache_file, 'wb') as fd: fd.write(c.read()) except ConnectionResetError as e: logging.exception(e) except urllib.error.URLError as e: logging.exception(e) with open(cache_file) as c: ecal = Calendar.from_ical(c.read()) for component in ecal.walk(): if component.name == "VEVENT" and "DTEND" in component: start = component.decoded("DTSTART") end = component.decoded("DTEND") if "RRULE" in component: rrule = component.decoded("RRULE") if "UNTIL" in rrule and reduce(lambda p,d: p or d < today, rrule["UNTIL"], False): continue if "FREQ" in rrule and "BYDAY" in rrule and "WEEKLY" in rrule["FREQ"]: weekday = 6 if "MO" in rrule["BYDAY"]: weekday = 0 elif "TU" in rrule["BYDAY"]: weekday = 1 elif "WE" in rrule["BYDAY"]: weekday = 2 elif "TH" in rrule["BYDAY"]: weekday = 3 elif "FR" in rrule["BYDAY"]: weekday = 4 elif "SA" in rrule["BYDAY"]: weekday = 5 diff = end - start if isinstance(start, datetime): start = next_weekday(today, weekday).replace(hour=start.hour, minute=start.minute, second=start.second, microsecond=start.microsecond) else: start = next_weekday(todayd, weekday) end = start + diff exclusions = [] if "EXDATE" in component: exdates = component.decoded("EXDATE") if isinstance(exdates, Iterable): for exdate in exdates: exclusions.append(exdate.dts[0].dt) else: exclusions.append(exdates.dts[0].dt) while start < component.decoded("DTSTART"): start += timedelta(days=7) while start in exclusions: start += timedelta(days=7) component["DTSTART"] = vDDDTypes(start) component["DTEND"] = vDDDTypes(end) if isinstance(end, datetime): if end.astimezone(pytz.timezone('Europe/Paris')) < today: continue if toofar is not None and start.astimezone(pytz.timezone('Europe/Paris')) > toofar: continue else: if isinstance(end, date) and end < todayd: continue if toofard is not None and start > toofard: continue events.append(component) # Sort events events.sort(key=lambda e: time.mktime(e.decoded("DTSTART").timetuple())) return events def _get_events(self, config, toofar=None): if len(self._cached_events) == 0: evts = self.__get_events(config, toofar) self._cached_events = evts return self._cached_events def get_events(self, config, toofar=None, now=None): for component in self._get_events(config, toofar): evt = None train_situations, train_conjoncturels, train_start_station, train_end_station, place = self.is_train_event(config, component) if train_start_station is not None: start = component.decoded("DTSTART") end = component.decoded("DTEND") evt = { "summary": component.get("SUMMARY"), "start": start, "end": end, } new_start = datetime.fromisoformat(train_start_station["depart"]["dateHeureReelle"]) if start.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_start.strftime("%d %H:%M"): evt["new_start"] = new_start if train_end_station: new_end = datetime.fromisoformat(train_end_station["arrivee"]["dateHeureReelle"]) if end.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_end.strftime("%d %H:%M"): evt["new_end"] = new_end if len(train_conjoncturels) > 0: evt["alert"] = train_conjoncturels["messagesConjoncturels"][0]["titre"] elif len(train_situations) > 0: evt["alert"] = train_situations[0]["libelleSituation"] if new_start > datetime.now(tz=pytz.timezone('Europe/Paris')): if "evenement" in train_start_station["depart"]: evt["alert"] = train_start_station["depart"]["evenement"]["texte"] if "voie" in train_start_station and "numero" in train_start_station["voie"]: evt["info"] = "Départ voie " + train_start_station["voie"]["numero"] if place is not None: evt["info"] = ((evt["info"] + "\n") if "info" in evt else "") + place elif train_end_station is not None: evt["info"] = "Arrivée à " + new_end.strftime("%H:%M") if "evenement" in train_end_station["arrivee"]: evt["alert"] = train_end_station["arrivee"]["evenement"]["texte"] evt["info"] += " (+%d')" % train_end_station["arrivee"]["evenement"]["retard"]["duree"] if "voie" in train_end_station and "numero" in train_end_station["voie"]: evt["info"] += " voie " + train_end_station["voie"]["numero"] elif now is not None and time.mktime(component.decoded("DTEND").timetuple()) < time.mktime(now.timetuple()): continue else: evt = { "summary": component.get("SUMMARY"), "start": component.decoded("DTSTART"), "end": component.decoded("DTEND"), } if "location" in component: evt["location"] = component.decoded("LOCATION").decode() if evt is not None: yield evt def is_train_event(self, config, evt): if "description" not in evt: return None, None, None, None, None numero_train = None ville_depart = None ville_arrivee = None place = None for line in evt.decoded("description").decode().split("\n"): res = re.match(r'.*Train.*([0-9]{4,})$', line) if res is not None: numero_train = res[1] res = re.match(r'.*[0-9]{1,2}h[0-9]{2} (.*)$', line) if res is not None: if ville_depart is not None: ville_arrivee = res[1] else: ville_depart = res[1] res = re.match(r'.*Voiture ([0-9]+).*place([0-9]+)', line) if res is not None: place = "voit. " + res[1] + " pl. " + res[2] if numero_train is None: return None, None, None, None, place start_time = evt.decoded("DTSTART") now = datetime.now(tz=pytz.timezone('Europe/Paris')) from .sncf import SNCFAPI status = SNCFAPI(config).get_train_status(numero_train, start_time) if status is None: return None, None, None, None, place situations = [] if "situation" in status and len(status["situation"]) > 0: situations = status["situation"] conjoncturels = [] if "listeMessagesConjoncturels" in status and len(status["listeMessagesConjoncturels"]) > 0: conjoncturels = status["listeMessagesConjoncturels"] if ville_arrivee is None: return situations, conjoncturels, None, None, place start_station = None end_station = None for arret in status["listeArretsDesserte"]["arret"]: if arret["emplacement"]["libelle"].startswith(ville_arrivee): end_station = arret if arret["emplacement"]["libelle"].startswith(ville_depart): start_station = arret return situations, conjoncturels, start_station, end_station, place def event_coming(self, config): now = time.mktime(datetime.now(tz=pytz.timezone('Europe/Paris')).timetuple()) coming = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure)).timetuple()) for evt in self.get_events(config, toofar=datetime.fromtimestamp(coming, tz=pytz.timezone('Europe/Paris'))): # Looking only the first event start = time.mktime(evt["start"].timetuple()) return now < start and start < coming return False def non_local_event_coming(self, config): now = time.mktime(datetime.now(tz=pytz.timezone('Europe/Paris')).timetuple()) coming = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure)).timetuple()) for evt in self.get_events(config, toofar=datetime.fromtimestamp(coming, tz=pytz.timezone('Europe/Paris'))): if "location" in evt and ( evt["location"].lower().startswith("http") or evt["location"].lower().startswith("ici") or evt["location"].lower().endswith("gentilly") or evt["location"].lower().endswith("gentilly, france") ): continue # Looking only the first event start = time.mktime(evt["start"].timetuple()) return now < start and start < coming return False def local_event_ending(self, config): now = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure/3)).timetuple()) coming = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure/3)).timetuple()) for evt in self.get_events(config, toofar=datetime.fromtimestamp(coming, tz=pytz.timezone('Europe/Paris'))): if not("location" in evt and evt["location"].lower().startswith("ici")): continue # Looking only the first event end = time.mktime(evt["end"].timetuple()) return now < end and end < coming return False def draw_module(self, config, width, height, line_height=19): now = datetime.now(tz=pytz.timezone('Europe/Paris')) events = self.get_events(config, now + timedelta(weeks=1), now) image = Image.new('RGB', (width, height), '#fff') draw = ImageDraw.Draw(image) fnt_R = ImageFont.truetype(config.fnt_R_path, int(line_height*0.7)) fnt_B = ImageFont.truetype(config.fnt_RB_path, int(line_height*0.7)) last_evt = None align = 0 for evt in events: last_day = None if last_evt is not None: last_day = last_evt["start"].astimezone(pytz.timezone('Europe/Paris')).day if isinstance(last_evt["start"], datetime) else last_evt["start"].timetuple()[2] cur_day = evt["start"].astimezone(pytz.timezone('Europe/Paris')).day if isinstance(evt["start"], datetime) else evt["start"].timetuple()[2] if last_day is None or last_day != cur_day: draw.text( (width / 2, align), evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%a %d %B") if isinstance(evt["start"], datetime) else evt["start"].strftime("%a %d %B"), fill="black", anchor="mt", font=fnt_B ) align += line_height draw.rectangle((0, align-4, width, align-4), fill="black") if isinstance(evt["start"], datetime): draw.text( (2, align), evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M") + " " + evt["summary"], fill="black", anchor="lt", font=fnt_R ) else: draw.text( (width/2, align), evt["summary"], fill="black", anchor="mt", font=fnt_B ) if "new_start" in evt: draw.line( (2, align + line_height / 2.6, 2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"))[0], align + line_height / 2.6), fill="black" ) draw.text( (0, align+line_height*0.6), evt["new_start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"), fill="black", anchor="lt", font=fnt_B ) if "alert" in evt: draw.text( (2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6), evt["alert"], fill="black", anchor="lt", font=fnt_R ) align += line_height * 0.7 if "location" in evt and now.day == evt["start"].day: if isinstance(evt["start"], datetime): align += display_longtext(draw, (2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6), evt["location"], fill="black", anchor="lt", font=fnt_R ) elif evt["location"].lower() != "ici": align += display_longtext(draw, (width/2, align+line_height*0.6), evt["location"], fill="black", anchor="mt", font=fnt_R ) if "info" in evt: if isinstance(evt["start"], datetime): align += display_longtext(draw, (2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6), evt["info"], fill="black", anchor="lt", font=fnt_R, maxwidth=width - fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0] ) else: align += display_longtext(draw, (width/2, align+line_height*0.6), evt["info"], fill="black", anchor="mt", font=fnt_R ) elif "end" in evt and not isinstance(evt["end"], datetime): pass elif "end" in evt and (("new_start" in evt and now > evt["new_start"].astimezone(pytz.timezone('Europe/Paris'))) or ("new_start" not in evt and now > evt["start"].astimezone(pytz.timezone('Europe/Paris')))): align += display_longtext(draw, (2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6), "Fin à " + (evt["new_end"] if "new_end" in evt else evt["end"]).astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"), fill="black", anchor="lt", font=fnt_R, maxwidth=width - fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0] ) align += line_height last_evt = evt return image