epaper/modules/ical.py

384 lines
18 KiB
Python

from collections.abc import Iterable
from datetime import date, datetime, timedelta, timezone
from functools import reduce
import hashlib
import logging
import re
import time
import urllib.error
import urllib.request
from icalendar import Calendar, Event, vCalAddress, vDDDTypes, vText
from PIL import Image, ImageDraw, ImageFont
import pytz
from . import display_longtext
def next_weekday(d, weekday):
days_ahead = weekday - d.weekday()
if days_ahead < 0: # Target day already happened this week
days_ahead += 7
return d + timedelta(days_ahead)
class IcalModule:
def __init__(self, config):
self.cals = config.cals
self.delayed_departure = 100
self._cached_events = []
self._cached_file = ".ical-%s.cache"
self.cache_time = 15
def __get_events(self, config, toofar=None):
today = datetime.now(tz=pytz.timezone('Europe/Paris')).replace(hour=0, minute=0, second=0)
todayd = date.today()
toofard = None
if toofar is not None:
toofard = date(toofar.year, toofar.month, toofar.day)
events = []
for cal in self.cals:
cache_file = self._cached_file % (hashlib.md5(cal.encode()).hexdigest())
try:
with urllib.request.urlopen(cal) as c:
with open(cache_file, 'wb') as fd:
fd.write(c.read())
except ConnectionResetError as e:
logging.exception(e)
except urllib.error.URLError as e:
logging.exception(e)
with open(cache_file) as c:
ecal = Calendar.from_ical(c.read())
for component in ecal.walk():
if component.name == "VEVENT" and "DTEND" in component:
start = component.decoded("DTSTART")
end = component.decoded("DTEND")
if "RRULE" in component:
rrule = component.decoded("RRULE")
if "UNTIL" in rrule and reduce(lambda p,d: p or d < today, rrule["UNTIL"], False):
continue
if "FREQ" in rrule and "BYDAY" in rrule and "WEEKLY" in rrule["FREQ"]:
weekday = 6
if "MO" in rrule["BYDAY"]:
weekday = 0
elif "TU" in rrule["BYDAY"]:
weekday = 1
elif "WE" in rrule["BYDAY"]:
weekday = 2
elif "TH" in rrule["BYDAY"]:
weekday = 3
elif "FR" in rrule["BYDAY"]:
weekday = 4
elif "SA" in rrule["BYDAY"]:
weekday = 5
diff = end - start
if isinstance(start, datetime):
start = next_weekday(today, weekday).replace(hour=start.hour, minute=start.minute, second=start.second, microsecond=start.microsecond)
else:
start = next_weekday(todayd, weekday)
end = start + diff
exclusions = []
if "EXDATE" in component:
exdates = component.decoded("EXDATE")
if isinstance(exdates, Iterable):
for exdate in exdates:
exclusions.append(exdate.dts[0].dt)
else:
exclusions.append(exdates.dts[0].dt)
while start < component.decoded("DTSTART"):
start += timedelta(days=7)
while start in exclusions:
start += timedelta(days=7)
component["DTSTART"] = vDDDTypes(start)
component["DTEND"] = vDDDTypes(end)
if isinstance(end, datetime):
if end.astimezone(pytz.timezone('Europe/Paris')) < today:
continue
if toofar is not None and start.astimezone(pytz.timezone('Europe/Paris')) > toofar:
continue
else:
if isinstance(end, date) and end < todayd:
continue
if toofard is not None and start > toofard:
continue
events.append(component)
# Sort events
events.sort(key=lambda e: time.mktime(e.decoded("DTSTART").timetuple()))
return events
def _get_events(self, config, toofar=None):
if len(self._cached_events) == 0:
evts = self.__get_events(config, toofar)
self._cached_events = evts
return self._cached_events
def get_events(self, config, toofar=None, now=None):
for component in self._get_events(config, toofar):
evt = None
train_situations, train_conjoncturels, train_start_station, train_end_station, place = self.is_train_event(config, component)
if train_start_station is not None:
start = component.decoded("DTSTART")
end = component.decoded("DTEND")
evt = {
"summary": component.get("SUMMARY"),
"start": start,
"end": end,
}
new_start = datetime.fromisoformat(train_start_station["depart"]["dateHeureReelle"])
if start.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_start.strftime("%d %H:%M"):
evt["new_start"] = new_start
if train_end_station:
new_end = datetime.fromisoformat(train_end_station["arrivee"]["dateHeureReelle"])
if end.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_end.strftime("%d %H:%M"):
evt["new_end"] = new_end
if len(train_conjoncturels) > 0:
evt["alert"] = train_conjoncturels["messagesConjoncturels"][0]["titre"]
elif len(train_situations) > 0:
evt["alert"] = train_situations[0]["libelleSituation"]
if new_start > datetime.now(tz=pytz.timezone('Europe/Paris')):
if "evenement" in train_start_station["depart"]:
evt["alert"] = train_start_station["depart"]["evenement"]["texte"]
if "voie" in train_start_station and "numero" in train_start_station["voie"]:
evt["info"] = "Départ voie " + train_start_station["voie"]["numero"]
if place is not None:
evt["info"] = ((evt["info"] + "\n") if "info" in evt else "") + place
elif train_end_station is not None:
evt["info"] = "Arrivée à " + new_end.strftime("%H:%M")
if "evenement" in train_end_station["arrivee"]:
evt["alert"] = train_end_station["arrivee"]["evenement"]["texte"]
evt["info"] += " (+%d')" % train_end_station["arrivee"]["evenement"]["retard"]["duree"]
if "voie" in train_end_station and "numero" in train_end_station["voie"]:
evt["info"] += " voie " + train_end_station["voie"]["numero"]
elif now is not None and time.mktime(component.decoded("DTEND").timetuple()) < time.mktime(now.timetuple()):
continue
else:
evt = {
"summary": component.get("SUMMARY"),
"start": component.decoded("DTSTART"),
"end": component.decoded("DTEND"),
}
if "location" in component:
evt["location"] = component.decoded("LOCATION").decode()
if evt is not None:
yield evt
def is_train_event(self, config, evt):
if "description" not in evt:
return None, None, None, None, None
numero_train = None
ville_depart = None
ville_arrivee = None
place = None
for line in evt.decoded("description").decode().split("\n"):
res = re.match(r'.*Train.*([0-9]{4,})$', line)
if res is not None:
numero_train = res[1]
res = re.match(r'.*[0-9]{1,2}h[0-9]{2} (.*)$', line)
if res is not None:
if ville_depart is not None:
ville_arrivee = res[1]
else:
ville_depart = res[1]
res = re.match(r'.*Voiture ([0-9]+).*place([0-9]+)', line)
if res is not None:
place = "voit. " + res[1] + " pl. " + res[2]
if numero_train is None:
return None, None, None, None, place
start_time = evt.decoded("DTSTART")
now = datetime.now(tz=pytz.timezone('Europe/Paris'))
from .sncf import SNCFAPI
status = SNCFAPI(config).get_train_status(numero_train, start_time)
if status is None:
return None, None, None, None, place
situations = []
if "situation" in status and len(status["situation"]) > 0:
situations = status["situation"]
conjoncturels = []
if "listeMessagesConjoncturels" in status and len(status["listeMessagesConjoncturels"]) > 0:
conjoncturels = status["listeMessagesConjoncturels"]
if ville_arrivee is None:
return situations, conjoncturels, None, None, place
start_station = None
end_station = None
for arret in status["listeArretsDesserte"]["arret"]:
if arret["emplacement"]["libelle"].startswith(ville_arrivee):
end_station = arret
if arret["emplacement"]["libelle"].startswith(ville_depart):
start_station = arret
return situations, conjoncturels, start_station, end_station, place
def event_coming(self, config):
now = time.mktime(datetime.now(tz=pytz.timezone('Europe/Paris')).timetuple())
coming = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure)).timetuple())
for evt in self.get_events(config, toofar=datetime.fromtimestamp(coming, tz=pytz.timezone('Europe/Paris'))):
# Looking only the first event
start = time.mktime(evt["start"].timetuple())
return now < start and start < coming
return False
def non_local_event_coming(self, config):
now = time.mktime(datetime.now(tz=pytz.timezone('Europe/Paris')).timetuple())
coming = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure)).timetuple())
for evt in self.get_events(config, toofar=datetime.fromtimestamp(coming, tz=pytz.timezone('Europe/Paris'))):
if "location" in evt and (
evt["location"].lower().startswith("http") or
evt["location"].lower().startswith("ici") or
evt["location"].lower().endswith("gentilly") or
evt["location"].lower().endswith("gentilly, france")
):
continue
# Looking only the first event
start = time.mktime(evt["start"].timetuple())
return now < start and start < coming
return False
def local_event_ending(self, config):
now = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure/3)).timetuple())
coming = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure/3)).timetuple())
for evt in self.get_events(config, toofar=datetime.fromtimestamp(coming, tz=pytz.timezone('Europe/Paris'))):
if not("location" in evt and evt["location"].lower().startswith("ici")):
continue
# Looking only the first event
end = time.mktime(evt["end"].timetuple())
return now < end and end < coming
return False
def draw_module(self, config, width, height, line_height=19):
now = datetime.now(tz=pytz.timezone('Europe/Paris'))
events = self.get_events(config, now + timedelta(weeks=1), now)
image = Image.new('RGB', (width, height), '#fff')
draw = ImageDraw.Draw(image)
fnt_R = ImageFont.truetype(config.fnt_R_path, int(line_height*0.7))
fnt_B = ImageFont.truetype(config.fnt_RB_path, int(line_height*0.7))
last_evt = None
align = 0
for evt in events:
last_day = None
if last_evt is not None:
last_day = last_evt["start"].astimezone(pytz.timezone('Europe/Paris')).day if isinstance(last_evt["start"], datetime) else last_evt["start"].timetuple()[2]
cur_day = evt["start"].astimezone(pytz.timezone('Europe/Paris')).day if isinstance(evt["start"], datetime) else evt["start"].timetuple()[2]
if last_day is None or last_day != cur_day:
draw.text(
(width / 2, align),
evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%a %d %B") if isinstance(evt["start"], datetime) else evt["start"].strftime("%a %d %B"),
fill="black", anchor="mt", font=fnt_B
)
align += line_height
draw.rectangle((0, align-4, width, align-4), fill="black")
if isinstance(evt["start"], datetime):
draw.text(
(2, align),
evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M") + " " + evt["summary"],
fill="black", anchor="lt", font=fnt_R
)
else:
draw.text(
(width/2, align),
evt["summary"],
fill="black", anchor="mt", font=fnt_B
)
if "new_start" in evt:
draw.line(
(2, align + line_height / 2.6, 2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"))[0], align + line_height / 2.6),
fill="black"
)
draw.text(
(0, align+line_height*0.6),
evt["new_start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"),
fill="black", anchor="lt", font=fnt_B
)
if "alert" in evt:
draw.text(
(2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6),
evt["alert"],
fill="black", anchor="lt", font=fnt_R
)
align += line_height * 0.7
if "location" in evt and now.day == evt["start"].day:
if isinstance(evt["start"], datetime):
align += display_longtext(draw,
(2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6),
evt["location"],
fill="black", anchor="lt", font=fnt_R
)
elif evt["location"].lower() != "ici":
align += display_longtext(draw,
(width/2, align+line_height*0.6),
evt["location"],
fill="black", anchor="mt", font=fnt_R
)
if "info" in evt:
if isinstance(evt["start"], datetime):
align += display_longtext(draw,
(2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6),
evt["info"],
fill="black", anchor="lt", font=fnt_R, maxwidth=width - fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0]
)
else:
align += display_longtext(draw,
(width/2, align+line_height*0.6),
evt["info"],
fill="black", anchor="mt", font=fnt_R
)
elif "end" in evt and not isinstance(evt["end"], datetime):
pass
elif "end" in evt and (("new_start" in evt and now > evt["new_start"].astimezone(pytz.timezone('Europe/Paris'))) or ("new_start" not in evt and now > evt["start"].astimezone(pytz.timezone('Europe/Paris')))):
align += display_longtext(draw,
(2 + fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6),
"Fin à " + (evt["new_end"] if "new_end" in evt else evt["end"]).astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"),
fill="black", anchor="lt", font=fnt_R, maxwidth=width - fnt_R.getbbox(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0]
)
align += line_height
last_evt = evt
return image