428 lines
17 KiB
Python
428 lines
17 KiB
Python
from datetime import datetime, timedelta, timezone
import hashlib
import html
import json
import logging
import os
import re
import urllib.error
import urllib.parse
import urllib.request

from PIL import Image, ImageDraw, ImageFont
|
|
|
|
def whenStr(date, now):
    """Return a short French label describing *date* relative to *now*.

    Args:
        date: the ``datetime`` to describe.
        now: the reference ``datetime``; must be comparable with *date*
            (both naive or both timezone-aware).

    Returns:
        * ``"<hour>h<minute>"`` (minutes zero-padded, e.g. ``"9h05"``) when
          *date* is earlier than ``now + 1 day`` -- this includes dates that
          are already in the past;
        * the lowercase French weekday name when *date* is earlier than
          ``now + 7 days``;
        * ``date.strftime("%d %b")`` otherwise (month abbreviation follows
          the process locale).
    """
    # Indexed by datetime.weekday(): 0 is Monday, 6 is Sunday.
    weekdays = (
        "lundi",
        "mardi",
        "mercredi",
        "jeudi",
        "vendredi",
        "samedi",
        "dimanche",
    )

    if date < now + timedelta(days=1):
        # Zero-pad the minutes so 09:05 reads "9h05" rather than "9h5".
        return f"{date.hour}h{date.minute:02d}"
    if date < now + timedelta(days=7):
        return weekdays[date.weekday()]
    return date.strftime("%d %b")
|
|
|
|
class IDFMAPI:
    """Fetch, cache and render public-transport information.

    Two remote sources are used:

    * ``get_schedules`` queries ``https://ratp.p0m.fr/api/schedules/...``;
    * ``get_weather`` queries the ``line_reports`` endpoint below
      ``self.baseurl`` and sends ``self.apikey`` in an ``apikey`` header.

    Every response is written to a ``.ratp-<name>.cache`` file in the current
    working directory and then re-read from that file, so a failed request
    falls back to the previously cached payload when one exists.
    """

    # Font files used for line icons and schedule text.
    fnt_R_path = "./fonts/Parisine-Regular.ttf"
    fnt_RB_path = "./fonts/Parisine-Bold.ttf"

    # Per-mode mapping of displayed line name -> line code.  get_line_weather()
    # matches a report whose line id equals "line:IDFM:" + code.
    lines = {
        "metros": {
            "1": "C01371",
            "2": "C01372",
            "3": "C01373",
            "4": "C01374",
            "5": "C01375",
            "6": "C01376",
            "7": "C01377",
            "8": "C01378",
            "9": "C01379",
            "10": "C01380",
            "11": "C01381",
            "12": "C01382",
            "13": "C01383",
            "14": "C01384",
            "3B": "C01386",
            "7B": "C01387",
        },
        "buses": {
            "57": "C01094",
            "125": "C01154",
            #"131": "C01159",
            "184": "C01205",
        },
        "rers": {
            "A": "C01742",
            "B": "C01743",
            "C": "C01727",
            "D": "C01728",
            "E": "C01729"
        },
        "tramways": {
            "T2": "C01390",
            "T3A": "C01391",
            "T3B": "C01679",
            "T7": "C01774",
            "T9": "C02317",
        },
    }

    # One-letter mode codes accepted by get_schedules() -> API path segment.
    _MODE_NAMES = {
        "M": "metros",
        "R": "rers",
        "T": "tramways",
        "B": "buses",
        "N": "noctiliens",
    }

    # Schedule messages that get_schedules() rewrites to the current time.
    _IMMINENT_MESSAGES = frozenset({
        "Train a l'approche",
        "Train à l'approche",
        "Train à quai",
        "Train a quai",
        "A l'approche",
        "A l'arret",
        "A l'arrêt",
        "A quai",
    })

    # Patterns stripped from disruption messages by fromHTMLDisruption().
    _RE_TAG = re.compile('<.*?>')
    _RE_ANCHOR = re.compile('<a.*?>.*?</a>')
    _RE_PERIOD = re.compile('Période :[^.]+. ')
    _RE_DATES = re.compile('Dates? :[^.]+. ')
    _RE_MORE = re.compile(" Plus d'informations sur [^.]+.")

    def __init__(self, config, apikey=None):
        """Store endpoint, credentials and cache settings.

        Args:
            config: object exposing ``cache_timeout`` and
                ``max_cache_timeout``; both are used as a number of minutes
                by ``get_weather``.
            apikey: API key; when falsy, ``os.environ["TOKEN_IDFM"]`` is used.

        Raises:
            KeyError: if no *apikey* is given and ``TOKEN_IDFM`` is unset.
        """
        self.baseurl = "https://prim.iledefrance-mobilites.fr/marketplace"
        self.apikey = apikey or os.environ["TOKEN_IDFM"]

        # Template for cache file names; "%s" receives a per-request name.
        self._cached_file = ".ratp-%s.cache"
        self.cache_timeout = config.cache_timeout
        self.max_cache_timeout = config.max_cache_timeout

    @staticmethod
    def fromHTMLDisruption(src):
        """Turn an HTML disruption message into compact plain text.

        HTML character references are decoded, non-breaking spaces and
        typographic apostrophes are normalised, ``<br>``/``</p>`` become
        spaces, a few boilerplate sentences and the "Période :" / "Date(s) :"
        / "Plus d'informations sur" fragments are dropped, ``<a>`` elements
        are removed with their content, and every remaining tag is stripped.

        Args:
            src: the message as an HTML string.

        Returns:
            The cleaned text with surrounding whitespace removed.
        """
        # Decode entities first: the literal replacements and patterns below
        # are written with real accented characters.
        text = html.unescape(src)

        literal_replacements = (
            ("\xa0", " "),
            ("\u202f", " "),
            ("’", "'"),
            ("<br>", " "),
            ("</p>", " "),
            ("Information Ile de France Mobilités :", ""),
            ("Les horaires du calculateur d'itinéraire tiennent compte des travaux. ", ""),
            ("à la demande de la Préfecture de Police et d'Île-de-France Mobilités, et ", ""),
        )
        for old, new in literal_replacements:
            text = text.replace(old, new)

        # Order matters: anchors must be removed whole before the generic tag
        # pattern would strip only their delimiters.
        for pattern in (
            IDFMAPI._RE_MORE,
            IDFMAPI._RE_DATES,
            IDFMAPI._RE_PERIOD,
            IDFMAPI._RE_ANCHOR,
            IDFMAPI._RE_TAG,
        ):
            text = pattern.sub('', text)

        return text.strip()

    def get_schedules(self, mode, line, station, way="A+R"):
        """Return the upcoming passages for a line at a station.

        Args:
            mode: one-letter code (``"M"``, ``"R"``, ``"T"``, ``"B"``,
                ``"N"``) or any other string, which is used verbatim in the
                request path.
            line: line name.  For mode ``"T"`` the first character is
                dropped before the request.
            station: station identifier, inserted verbatim in the URL.
            way: direction selector, inserted verbatim in the URL.

        Returns:
            The list of schedule dicts that carry a ``"message"`` other than
            ``"Train sans arrêt"``.  Messages announcing an imminent train
            are replaced by the payload's current time (``HH:MM``),
            ``"<n> mn"`` messages by that time plus *n* minutes, and delay
            wording is shortened to ``+`` / ``++``.

        Raises:
            OSError: if the request failed and no cache file exists.
            ValueError, KeyError: if the cached payload is not in the
                expected shape.
        """
        if mode == "T":
            # NOTE(review): presumably callers pass tram lines as "T3A" and
            # the API expects "3A" -- confirm against the callers.
            line = line[1:]
        mode = self._MODE_NAMES.get(mode, mode)

        cache_file = self._cached_file % ("schedule-" + mode + "-" + line + "-" + hashlib.md5((mode + line + station + way).encode()).hexdigest())

        req = urllib.request.Request("https://ratp.p0m.fr/api/schedules/%s/%s/%s/%s" % (mode, line, station, way))
        try:
            with urllib.request.urlopen(req) as f:
                payload = f.read()
        except (ConnectionResetError, urllib.error.URLError) as e:
            # HTTPError is a URLError subclass, so it is covered here too.
            # The previously cached answer (if any) is used below.
            logging.exception(e)
        else:
            # Only touch the cache once the whole body has been received, so
            # a failed download never truncates a usable cache file.
            with open(cache_file, 'wb') as fd:
                fd.write(payload)

        with open(cache_file) as f:
            res = json.load(f)

        # Timestamps longer than 25 characters are reduced to their first 19
        # characters plus their last 6 before being parsed.
        stamp = res["_metadata"]["date"]
        if len(stamp) > 25:
            stamp = stamp[0:19] + stamp[-6:]
        now = datetime.fromisoformat(stamp)

        schedules = res["result"]["schedules"]
        for schedule in schedules:
            if "message" not in schedule:
                continue

            message = schedule["message"]
            if message in self._IMMINENT_MESSAGES:
                message = now.strftime("%H:%M")
            elif message.endswith(" mn"):
                minutes = int(message.split(" ")[0])
                message = (now + timedelta(minutes=minutes)).strftime("%H:%M")

            schedule["message"] = message.replace(" Retardé", "+").replace("Train retardé", "++").replace("Retardé", "++")

        return [m for m in schedules if "message" in m and m["message"] != "Train sans arrêt"]

    @staticmethod
    def _cache_older_than(path, minutes):
        """Return True when *path* is missing or was modified more than *minutes* ago."""
        try:
            mtime = os.stat(path).st_mtime
        except OSError:
            return True
        expiry = datetime.fromtimestamp(mtime, tz=timezone.utc) + timedelta(minutes=minutes)
        return expiry < datetime.now(tz=timezone.utc)

    def get_weather(self):
        """Return the current disruptions for every line listed in ``lines``.

        The line-reports payload is downloaded when the cache file is older
        than ``cache_timeout`` minutes.  A failed download is logged and the
        existing cache is used instead, as long as it is not older than
        ``max_cache_timeout`` minutes.

        Returns:
            ``{mode: {line: iterator}}`` where each iterator is the generator
            returned by ``get_line_weather`` (lazy and single-use).

        Raises:
            Exception: ``"File too old"`` when no sufficiently recent cache
                file is available.
        """
        cache_file = self._cached_file % ("ratp-disruptions")

        if self._cache_older_than(cache_file, self.cache_timeout):
            req = urllib.request.Request(self.baseurl + "/navitia/line_reports/coverage/fr-idf/line_reports?count=2000")
            # Set directly on the header dict to keep the exact header name.
            req.headers["apikey"] = self.apikey
            try:
                with urllib.request.urlopen(req) as f:
                    payload = f.read()
            except (ConnectionResetError, urllib.error.URLError) as e:
                logging.exception(e)
            else:
                # Write only after a complete read: opening the file first
                # would refresh its mtime even when the download fails.
                with open(cache_file, 'wb') as fd:
                    fd.write(payload)

        if self._cache_older_than(cache_file, self.max_cache_timeout):
            raise Exception("File too old")

        with open(cache_file) as f:
            res = json.load(f)

        disruptions = {d["id"]: d for d in res["disruptions"]}

        return {
            mode: {
                line: self.get_line_weather(res, disruptions, mode, line)
                for line in mode_lines
            }
            for mode, mode_lines in IDFMAPI.lines.items()
        }

    def get_line_weather(self, res, disruptions, mode, line):
        """Yield the disruptions linked to one line whose status is not ``"past"``.

        Args:
            res: decoded line-reports payload; ``res["line_reports"]`` is
                scanned.
            disruptions: mapping of disruption id -> disruption dict.
            mode: key of ``IDFMAPI.lines``.
            line: key of ``IDFMAPI.lines[mode]``.

        Yields:
            Disruption dicts taken from *disruptions* (not copies).  Links
            whose id is absent from *disruptions* are skipped.
        """
        wanted_id = "line:IDFM:" + IDFMAPI.lines[mode][line]

        for report in res["line_reports"]:
            if "line" not in report:
                continue
            if report["line"].get("id") != wanted_id:
                continue

            for link in report["line"].get("links", []):
                disruption = disruptions.get(link["id"])
                if disruption is None:
                    # The link does not point to a known disruption.
                    continue
                if disruption["status"] != "past":
                    yield disruption

    @staticmethod
    def get_line_icon(mode, line, size, fill="gray"):
        """Draw the badge of a line and return it as an RGBA image.

        Args:
            mode: transport mode, either the long name (``"metros"``,
                ``"tramways"``, ``"rers"``, ``"buses"``) or a one-letter code.
                NOTE(review): the wider canvas and the tramway text colour
                are only applied for the long names -- confirm whether the
                one-letter codes should get them too.
            line: text written at the centre of the badge.
            size: badge height in pixels; the width is ``int(size * 1.38)``
                for ``"buses"`` and ``"tramways"`` and *size* otherwise.
            fill: colour of the badge shape.

        Returns:
            A ``PIL.Image.Image`` with a transparent background.
        """
        width = int(size * 1.38) if mode in ("buses", "tramways") else size
        image = Image.new('RGBA', (width, size), '#fff0')
        draw = ImageDraw.Draw(image)

        fnt_icon = ImageFont.truetype(IDFMAPI.fnt_RB_path, size-3)

        if mode in ("M", "metros"):
            draw.ellipse((0, 0, width, size), fill=fill)
        elif mode in ("T", "tramways"):
            # Tramways are two horizontal bars; a black fill additionally
            # gets a solid background and gray bars.
            if fill == "black":
                draw.rectangle((0, 0, width, size), fill=fill)
            bar_fill = "gray" if fill == "black" else fill
            draw.rectangle((0, 0, width, 1), fill=bar_fill)
            draw.rectangle((0, size-2, width, size), fill=bar_fill)
        elif mode in ("R", "rers") and hasattr(draw, "rounded_rectangle"):
            # hasattr(): the method lives on the class, never in the
            # instance __dict__.  Older Pillow versions lack it and fall
            # through to the plain rectangle.
            draw.rounded_rectangle((0, 0, width, size), radius=4, fill=fill)
        else:
            draw.rectangle((0, 0, width, size), fill=fill)

        if mode == "tramways":
            text_fill = "white" if fill in ("black", "white") else "black"
        else:
            text_fill = "white" if fill != "white" else "black"
        draw.text((int(width / 2), int(size / 2)), line, fill=text_fill, anchor="mm", font=fnt_icon)

        return image
|
|
|
|
|
|
|
|
class RATPWeatherModule:
    """Dashboard module showing the traffic state of the transport network.

    ``gen_alerts`` produces textual alerts for a configurable set of "major"
    lines; ``draw_module`` renders one coloured badge per known line.
    """

    # Used when the constructor is called without ``major_lines``.
    DEFAULT_MAJOR_LINES = ("M7", "M5", "M14", "RB", "T3A")

    # Format of the "begin"/"end" values of an application period.
    _PERIOD_FORMAT = "%Y%m%dT%H%M%S"

    def __init__(self, major_lines=None):
        """Remember which lines deserve an alert.

        Args:
            major_lines: list of line keys such as ``"M7"`` or ``"RB"``
                (first letter of the mode, upper-cased, followed by the line
                name).  Defaults to a fresh copy of ``DEFAULT_MAJOR_LINES``.
        """
        # A new list per instance: a shared default list would leak
        # modifications from one instance to the next.
        if major_lines is None:
            major_lines = list(self.DEFAULT_MAJOR_LINES)
        self.major_lines = major_lines

    def _is_major(self, mode, line):
        """Tell whether *line* of *mode* is listed in ``self.major_lines``."""
        prefix = mode[0].upper()
        if prefix + line in self.major_lines:
            return True
        # Tramway names already start with their mode letter ("T3A"), so the
        # prefixed key would be "TT3A"; accept the bare name in that case.
        return line.startswith(prefix) and line in self.major_lines

    @staticmethod
    def _alert_icon(mode, line):
        """Return a callable ``icon(size=64)`` drawing the alert pictogram."""
        def icon(size=64):
            half = int(size / 2)
            image = Image.new('RGB', (size, size), '#000')

            # The mode pictogram is used as a mask for a white square.
            white = Image.new('RGB', (half, half), '#fff')
            mode_icon = Image.open("icons/" + mode + ".png").resize((half, half))
            image.paste(white, (-5,0), mode_icon)

            line_icon = IDFMAPI.get_line_icon(mode, line, half, fill="white")
            image.paste(line_icon, (half - 5,0), line_icon)

            return image
        return icon

    @classmethod
    def _relevant_periods(cls, raw_periods, now):
        """Parse application periods and keep those worth announcing.

        A period is dropped when it has already ended, when it starts more
        than one day after *now*, or when it starts more than 16 hours after
        the first kept period.

        Returns:
            A list of ``(begin, end)`` ``datetime`` pairs.
        """
        horizon = now + timedelta(days=1)
        kept = []
        for period in raw_periods:
            # Parsed into locals rather than written back into the dict: the
            # same disruption dict may be processed again for another line,
            # and strptime() would then be handed a datetime.
            begin = datetime.strptime(period["begin"], cls._PERIOD_FORMAT)
            end = datetime.strptime(period["end"], cls._PERIOD_FORMAT)

            if end < now or begin > horizon:
                continue
            if kept and kept[0][0] + timedelta(hours=16) < begin:
                continue
            kept.append((begin, end))
        return kept

    @staticmethod
    def _extract_texts(messages):
        """Return ``(title, content)`` picked from a disruption's messages.

        The ``"titre"`` channel gives the title and the ``"moteur"`` channel
        the content; any other channel is used as content only while no
        content has been found yet.
        """
        title = ""
        content = ""
        for msg in messages:
            channel = msg["channel"]["name"]
            if channel == "titre":
                title = msg["text"]
            elif channel == "moteur":
                content = IDFMAPI.fromHTMLDisruption(msg["text"])
            elif not content:
                content = IDFMAPI.fromHTMLDisruption(msg["text"])
        return title, content

    def gen_alerts(self, config):
        """Yield one alert per relevant disruption on the major lines.

        Args:
            config: forwarded to ``IDFMAPI``.

        Yields:
            Dicts with the keys ``"title"``, ``"subtitle"``,
            ``"description"`` and ``"icon"`` (a callable taking an optional
            ``size`` and returning an image).
        """
        weather = IDFMAPI(config).get_weather()

        for mode, mode_lines in weather.items():
            for line, line_disruptions in mode_lines.items():
                if not self._is_major(mode, line):
                    continue

                for disruption in line_disruptions:
                    if "messages" not in disruption:
                        continue
                    if disruption["status"] != "active" and "application_periods" not in disruption:
                        continue

                    subtitle = ""
                    if "application_periods" in disruption:
                        now = datetime.now()
                        periods = self._relevant_periods(disruption["application_periods"], now)

                        if not periods:
                            continue
                        if len(periods) == 1:
                            begin, end = periods[0]
                            if begin > now:
                                subtitle = "De " + whenStr(begin, now) + " à " + whenStr(end, now)
                            elif end > now:
                                subtitle = "Fin " + whenStr(end, now)

                    title, content = self._extract_texts(disruption["messages"])

                    yield {
                        "title": title,
                        "subtitle": subtitle,
                        "description": content,
                        "icon": self._alert_icon(mode, line),
                    }

    @staticmethod
    def _line_fill(line_disruptions):
        """Choose the badge colour of a line from its active disruptions."""
        states = []
        for disruption in line_disruptions:
            if disruption["status"] != "active":
                continue
            if "severity" not in disruption:
                continue

            effect = disruption["severity"]["effect"]
            whole_line_hit = "line" in [x["pt_object"]["embedded_type"] for x in disruption["impacted_objects"]]
            if disruption.get("cause") == "travaux" and not whole_line_hit:
                # Works that do not impact the line object itself get a
                # distinct state, so they never match the effect names
                # tested below.
                states.append(effect + " " + disruption["cause"])
            else:
                states.append(effect)

        if "NO_SERVICE" in states:
            return "black"
        if "REDUCED_SERVICE" in states or "UNKNOWN_EFFECT" in states or "OTHER_EFFECT" in states:
            return "teal"
        if states:
            return "gray"
        return "darkgray"

    def draw_module(self, config, width, height, line_height=19):
        """Render the network overview.

        Args:
            config: forwarded to ``IDFMAPI``.
            width: image width in pixels; badges wrap before reaching it.
            height: image height in pixels.
            line_height: height of one badge in pixels.

        Returns:
            An RGB ``PIL.Image.Image`` of size ``(width, height)``.
        """
        image = Image.new('RGB', (width, height), '#fff')

        weather = IDFMAPI(config).get_weather()

        align_x = 0
        align_y = 0
        for mode in ["metros", "rers", "tramways", "buses"]:
            # RER and bus badges continue after the previous mode's badges
            # instead of restarting at the left edge.
            if mode != "rers" and mode != "buses":
                align_x = 0

            # display mode icon
            icon = Image.open("icons/" + mode + ".png").resize((line_height, line_height))
            image.paste(icon, (align_x,align_y), icon)

            align_x += line_height + 10

            for line in weather[mode]:
                if align_x + line_height >= width:
                    # Wrap, keeping the column under the mode icon empty.
                    align_x = line_height + 10
                    align_y += line_height + 6

                fill = self._line_fill(weather[mode][line])

                icon = IDFMAPI.get_line_icon(mode, line, line_height, fill=fill)
                image.paste(icon, (align_x, align_y), icon)

                align_x += icon.width + 5

            # NOTE(review): nesting of this step was reconstructed from a
            # paste without indentation -- confirm it runs once per mode.
            if mode != "metros" and mode != "tramways":
                align_y += line_height + 10
            else:
                align_y += 10

        return image
|
|
|
|
|
|
class RATPNextStopModule:
    """Dashboard module listing the next passages at a set of stops."""

    def draw_module(self, config, stops, width, height, line_height=17):
        """Render the next passages for every requested stop.

        Args:
            config: forwarded to ``IDFMAPI``.
            stops: iterable of ``"<mode letter><line>/<station>[/<way>]"``
                strings, e.g. ``"M7/some-station"``.  An entry without any
                ``"/"`` is ignored.
            width: image width in pixels.
            height: maximum image height in pixels.
            line_height: font size and row height in pixels.

        Returns:
            ``(image, alerts)``: the rendered image, cropped to the rows
            actually drawn (never taller than *height*), and a list of alert
            dicts (``"title"``, ``"description"``, ``"icon"``) describing the
            stops whose schedules could not be drawn.
        """
        image = Image.new('RGB', (width, height), '#fff')
        draw = ImageDraw.Draw(image)

        alerts = []

        fnt_R = ImageFont.truetype(IDFMAPI.fnt_R_path, line_height)
        fnt_B = ImageFont.truetype(IDFMAPI.fnt_RB_path, line_height)

        # Horizontal space reserved for a destination name, and the gap
        # inserted between two texts on a row.
        max_dest = 64
        gap = int(line_height/2.5)

        align = 0

        api = IDFMAPI(config)
        for stop in stops:
            tmp = stop.split("/", 2)
            mode = tmp[0][0]
            line = tmp[0][1:]
            try:
                if 1 < len(tmp) < 4:
                    # Group the passages by destination, keeping their order.
                    prep = {}
                    for s in api.get_schedules(mode, line, *tmp[1:]):
                        prep.setdefault(s["destination"], []).append(s)

                    # A taller badge when the stop has several destination rows.
                    icon = IDFMAPI.get_line_icon(mode, line, int(line_height*(1.5 if len(prep) > 1 else 1)))
                    image.paste(icon, (0, align), icon)

                    for dest, msgs in prep.items():
                        if not msgs:
                            continue

                        align_x = line_height * 2

                        # Trim the destination until it fits its column.
                        # getlength() replaces getsize(), which no longer
                        # exists in Pillow 10.
                        while fnt_B.getlength(dest) > max_dest:
                            dest = dest[:-1]

                        draw.text(
                            (align_x, align),
                            dest,
                            font=fnt_B, anchor="lt", fill="black"
                        )

                        align_x += max_dest + gap

                        for msg in msgs:
                            draw.text(
                                (align_x, align),
                                msg["message"],
                                font=fnt_R, anchor="lt", fill="black"
                            )
                            align_x += int(round(fnt_R.getlength(msg["message"]))) + gap

                        align += line_height

                    # Vertical gap between two stops.
                    # NOTE(review): nesting reconstructed from a paste
                    # without indentation -- confirm it belongs here.
                    align += int(line_height * 0.33)
            except Exception as e:
                # One failing stop must not prevent the others from being
                # drawn; report it as an alert instead.
                alerts.append({
                    "title": "Impossible de récupérer les horaires du " + mode + line + " pour " + tmp[1],
                    "description": type(e).__name__ + ": " + (e.message if hasattr(e, 'message') else str(e)),
                    "icon": "wi-earthquake.png",
                })

        # Drop the gap added after the last stop, and never ask crop() for a
        # negative height when nothing was drawn.
        align -= int(line_height * 0.33)
        image = image.crop((0,0,width, max(0, min(align, height))))

        return image, alerts
|