Handle daily events, weekly recurring events optimize events calculation

This commit is contained in:
nemunaire 2022-09-19 16:06:02 +02:00
parent d3c24e6f3e
commit 4d7d1ee0f4

View File

@ -1,15 +1,23 @@
from datetime import datetime, timedelta, timezone
from datetime import date, datetime, timedelta, timezone
from functools import reduce
import hashlib
import re
import time
import urllib.error
import urllib.request
from icalendar import Calendar, Event, vCalAddress, vText
from icalendar import Calendar, Event, vCalAddress, vDDDTypes, vText
from PIL import Image, ImageDraw, ImageFont
import pytz
from . import display_longtext
def next_weekday(d, weekday):
days_ahead = weekday - d.weekday()
if days_ahead < 0: # Target day already happened this week
days_ahead += 7
return d + timedelta(days_ahead)
class IcalModule:
def __init__(self, config):
@ -17,11 +25,17 @@ class IcalModule:
self.delayed_departure = 100
self._cached_events = []
self._cached_file = ".ical-%s.cache"
self.cache_time = 15
def get_events(self, config, toofar=None, now=None):
def __get_events(self, config, toofar=None):
today = datetime.now(tz=pytz.timezone('Europe/Paris')).replace(hour=0, minute=0, second=0)
todayd = date.today()
toofard = None
if toofar is not None:
toofard = date(toofar.year, toofar.month, toofar.day)
events = []
for cal in self.cals:
@ -42,70 +56,116 @@ class IcalModule:
ecal = Calendar.from_ical(c.read())
for component in ecal.walk():
if component.name == "VEVENT":
if component.decoded("DTEND") < today:
continue
if toofar is not None and component.decoded("DTSTART") > toofar:
continue
start = component.decoded("DTSTART")
end = component.decoded("DTEND")
if "RRULE" in component:
rrule = component.decoded("RRULE")
if "UNTIL" in rrule and reduce(lambda p,d: p or d < today, rrule["UNTIL"], False):
continue
if "FREQ" in rrule and "BYDAY" in rrule and "WEEKLY" in rrule["FREQ"]:
weekday = 6
if "MO" in rrule["BYDAY"]:
weekday = 0
elif "TU" in rrule["BYDAY"]:
weekday = 1
elif "WE" in rrule["BYDAY"]:
weekday = 2
elif "TH" in rrule["BYDAY"]:
weekday = 3
elif "FR" in rrule["BYDAY"]:
weekday = 4
elif "SA" in rrule["BYDAY"]:
weekday = 5
train_situations, train_conjoncturels, train_start_station, train_end_station, place = self.is_train_event(component)
if train_start_station is not None:
start = component.decoded("DTSTART")
end = component.decoded("DTEND")
evt = {
"summary": component.get("SUMMARY"),
"start": start,
"end": end,
}
start = next_weekday(today, weekday).replace(hour=start.hour, minute=start.minute, second=start.second, microsecond=start.microsecond)
end = next_weekday(today, weekday).replace(hour=end.hour, minute=end.minute, second=end.second, microsecond=end.microsecond)
new_start = datetime.fromisoformat(train_start_station["depart"]["dateHeureReelle"])
if start.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_start.strftime("%d %H:%M"):
evt["new_start"] = new_start
component["DTSTART"] = vDDDTypes(start)
component["DTEND"] = vDDDTypes(end)
if train_end_station:
new_end = datetime.fromisoformat(train_end_station["arrivee"]["dateHeureReelle"])
if end.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_end.strftime("%d %H:%M"):
evt["new_end"] = new_end
if len(train_conjoncturels) > 0:
evt["alert"] = train_conjoncturels["messagesConjoncturels"][0]["titre"]
elif len(train_situations) > 0:
evt["alert"] = train_situations[0]["libelleSituation"]
if new_start > datetime.now(tz=pytz.timezone('Europe/Paris')):
if "evenement" in train_start_station["depart"]:
evt["alert"] = train_start_station["depart"]["evenement"]["texte"]
if "voie" in train_start_station and "numero" in train_start_station["voie"]:
evt["info"] = "Départ voie " + train_start_station["voie"]["numero"]
if place is not None:
evt["info"] = ((evt["info"] + "\n") if "info" in evt else "") + place
elif train_end_station is not None:
evt["info"] = "Arrivée à " + new_end.strftime("%H:%M")
if "evenement" in train_end_station["arrivee"]:
evt["alert"] = train_end_station["arrivee"]["evenement"]["texte"]
evt["info"] += " (+%d')" % train_end_station["arrivee"]["evenement"]["retard"]["duree"]
if "voie" in train_end_station and "numero" in train_end_station["voie"]:
evt["info"] += " voie " + train_end_station["voie"]["numero"]
events.append(evt)
elif now is not None and component.decoded("DTEND") < now:
continue
if isinstance(end, datetime):
if end < today:
continue
if toofar is not None and start > toofar:
continue
else:
evt = {
"summary": component.get("SUMMARY"),
"start": component.decoded("DTSTART"),
"end": component.decoded("DTEND"),
}
if isinstance(end, date) and end < todayd:
continue
if toofard is not None and start > toofard:
continue
if "location" in component:
evt["location"] = component.decoded("LOCATION").decode()
events.append(evt)
events.append(component)
# Sort events
events.sort(key=lambda e: e["start"])
events.sort(key=lambda e: time.mktime(e.decoded("DTSTART").timetuple()))
return events
def _get_events(self, config, toofar=None):
if len(self._cached_events) == 0:
evts = self.__get_events(config, toofar)
self._cached_events = evts
return self._cached_events
def get_events(self, config, toofar=None, now=None):
for component in self._get_events(config, toofar):
evt = None
train_situations, train_conjoncturels, train_start_station, train_end_station, place = self.is_train_event(component)
if train_start_station is not None:
start = component.decoded("DTSTART")
end = component.decoded("DTEND")
evt = {
"summary": component.get("SUMMARY"),
"start": start,
"end": end,
}
new_start = datetime.fromisoformat(train_start_station["depart"]["dateHeureReelle"])
if start.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_start.strftime("%d %H:%M"):
evt["new_start"] = new_start
if train_end_station:
new_end = datetime.fromisoformat(train_end_station["arrivee"]["dateHeureReelle"])
if end.astimezone(pytz.timezone('Europe/Paris')).strftime("%d %H:%M") != new_end.strftime("%d %H:%M"):
evt["new_end"] = new_end
if len(train_conjoncturels) > 0:
evt["alert"] = train_conjoncturels["messagesConjoncturels"][0]["titre"]
elif len(train_situations) > 0:
evt["alert"] = train_situations[0]["libelleSituation"]
if new_start > datetime.now(tz=pytz.timezone('Europe/Paris')):
if "evenement" in train_start_station["depart"]:
evt["alert"] = train_start_station["depart"]["evenement"]["texte"]
if "voie" in train_start_station and "numero" in train_start_station["voie"]:
evt["info"] = "Départ voie " + train_start_station["voie"]["numero"]
if place is not None:
evt["info"] = ((evt["info"] + "\n") if "info" in evt else "") + place
elif train_end_station is not None:
evt["info"] = "Arrivée à " + new_end.strftime("%H:%M")
if "evenement" in train_end_station["arrivee"]:
evt["alert"] = train_end_station["arrivee"]["evenement"]["texte"]
evt["info"] += " (+%d')" % train_end_station["arrivee"]["evenement"]["retard"]["duree"]
if "voie" in train_end_station and "numero" in train_end_station["voie"]:
evt["info"] += " voie " + train_end_station["voie"]["numero"]
elif now is not None and time.mktime(component.decoded("DTEND").timetuple()) < time.mktime(now.timetuple()):
continue
else:
evt = {
"summary": component.get("SUMMARY"),
"start": component.decoded("DTSTART"),
"end": component.decoded("DTEND"),
}
if "location" in component:
evt["location"] = component.decoded("LOCATION").decode()
if evt is not None:
yield evt
def is_train_event(self, evt):
if "description" not in evt:
return None, None, None, None, None
@ -165,18 +225,18 @@ class IcalModule:
return situations, conjoncturels, start_station, end_station, place
def event_coming(self, config):
now = datetime.now(tz=pytz.timezone('Europe/Paris'))
coming = datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure)
now = time.mktime(datetime.now(tz=pytz.timezone('Europe/Paris')).timetuple())
coming = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure)).timetuple())
for evt in self.get_events(config):
# Looking only the first event
start = evt["start"]
start = time.mktime(evt["start"].timetuple())
return now < start and start < coming
return False
def non_local_event_coming(self, config):
now = datetime.now(tz=pytz.timezone('Europe/Paris'))
coming = datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure)
now = time.mktime(datetime.now(tz=pytz.timezone('Europe/Paris')).timetuple())
coming = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure)).timetuple())
for evt in self.get_events(config):
if "location" in evt and (
@ -188,20 +248,20 @@ class IcalModule:
continue
# Looking only the first event
start = evt["start"]
start = time.mktime(evt["start"].timetuple())
return now < start and start < coming
return False
def local_event_ending(self, config):
now = datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure/3)
coming = datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure/3)
now = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure/3)).timetuple())
coming = time.mktime((datetime.now(tz=pytz.timezone('Europe/Paris')) + timedelta(minutes=self.delayed_departure/3)).timetuple())
for evt in self.get_events(config):
if not("location" in evt and evt["location"].lower().startswith("ici")):
continue
# Looking only the first event
end = evt["end"]
end = time.mktime(evt["end"].timetuple())
return now < end and end < coming
return False
@ -218,20 +278,33 @@ class IcalModule:
last_evt = None
align = 0
for evt in events:
if last_evt is None or last_evt["start"].astimezone(pytz.timezone('Europe/Paris')).day != evt["start"].astimezone(pytz.timezone('Europe/Paris')).day:
last_day = None
if last_evt is not None:
last_day = last_evt["start"].astimezone(pytz.timezone('Europe/Paris')).day if isinstance(last_evt["start"], datetime) else last_evt["start"].timetuple()[2]
cur_day = evt["start"].astimezone(pytz.timezone('Europe/Paris')).day if isinstance(evt["start"], datetime) else evt["start"].timetuple()[2]
if last_day is None or last_day != cur_day:
draw.text(
(width / 2, align),
evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%a %d %B"),
evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%a %d %B") if isinstance(evt["start"], datetime) else evt["start"].strftime("%a %d %B"),
fill="black", anchor="mt", font=fnt_B
)
align += line_height
draw.rectangle((0, align-4, width, align-4), fill="black")
draw.text(
(2, align),
evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M") + " " + evt["summary"],
fill="black", anchor="lt", font=fnt_R
)
if isinstance(evt["start"], datetime):
draw.text(
(2, align),
evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M") + " " + evt["summary"],
fill="black", anchor="lt", font=fnt_R
)
else:
draw.text(
(width/2, align),
evt["summary"],
fill="black", anchor="mt", font=fnt_B
)
if "new_start" in evt:
draw.line(
(2, align + line_height / 2.6, 2 + fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M"))[0], align + line_height / 2.6),
@ -261,6 +334,8 @@ class IcalModule:
evt["info"],
fill="black", anchor="lt", font=fnt_R, maxwidth=width - fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0]
)
elif "end" in evt and not isinstance(evt["end"], datetime):
pass
elif "end" in evt and (("new_start" in evt and now > evt["new_start"]) or ("new_start" not in evt and now > evt["start"])):
align += display_longtext(draw,
(2 + fnt_R.getsize(evt["start"].astimezone(pytz.timezone('Europe/Paris')).strftime("%H:%M "))[0], align+line_height*0.6),