epaper/modules/ratp.py

454 lines
18 KiB
Python

from datetime import datetime, timedelta, timezone
import hashlib
import json
import logging
import os
import os.path
import re
import urllib.parse
import urllib.request
from PIL import Image, ImageDraw, ImageFont
def whenStr(date, now):
    """Return a short French label describing when *date* happens.

    * less than one day after *now*: the time of day, e.g. ``"9h05"``;
    * less than seven days after *now*: the French weekday name;
    * otherwise: day and abbreviated month as given by ``strftime("%d %b")``.
    """
    day_names = (
        "lundi",
        "mardi",
        "mercredi",
        "jeudi",
        "vendredi",
        "samedi",
        "dimanche",
    )

    # Furthest horizon first, so each remaining case is a simple guard.
    if date >= now + timedelta(days=7):
        return date.strftime("%d %b")
    if date >= now + timedelta(days=1):
        return day_names[date.weekday()]
    # Hour is not padded, minutes always use two digits.
    return f"{date.hour}h{date.minute:02d}"
class IDFMAPI:
    """Access to public-transport data: line disruptions and next departures.

    Disruptions come from the ``line_reports`` endpoint under ``baseurl``
    (authenticated with an ``apikey`` header); departures come from
    ``ratp.p0m.fr``.  Every response is stored in a dot-file in the current
    working directory and read back from there, so a failed request falls back
    to the last stored answer.
    """

    # Font file names, resolved against ``config.fonts_dir`` when used.
    fnt_R_path = "Parisine-Regular.ttf"
    fnt_RB_path = "Parisine-Bold.ttf"

    # mode -> line name -> line code (matched as "line:IDFM:<code>").
    lines = {
        "metros": {
            "1": "C01371",
            "2": "C01372",
            "3": "C01373",
            "4": "C01374",
            "5": "C01375",
            "6": "C01376",
            "7": "C01377",
            "8": "C01378",
            "9": "C01379",
            "10": "C01380",
            "11": "C01381",
            "12": "C01382",
            "13": "C01383",
            "14": "C01384",
            "3B": "C01386",
            "7B": "C01387",
        },
        "buses": {
            "57": "C01094",
            "125": "C01154",
            #"131": "C01159",
            "184": "C01205",
        },
        "rers": {
            "A": "C01742",
            "B": "C01743",
            "C": "C01727",
            "D": "C01728",
            "E": "C01729"
        },
        "tramways": {
            "T2": "C01390",
            "T3A": "C01391",
            "T3B": "C01679",
            "T7": "C01774",
            "T9": "C02317",
        },
    }

    # One-letter mode codes accepted by get_schedules -> names used in URLs.
    _MODE_NAMES = {
        "M": "metros",
        "R": "rers",
        "T": "tramways",
        "B": "buses",
        "N": "noctiliens",
    }

    # Schedule messages meaning "the vehicle is here / arriving right now".
    _IMMINENT_MESSAGES = frozenset((
        "Train a l'approche",
        "Train à l'approche",
        "Train à quai",
        "Train a quai",
        "A l'approche",
        "A l'arret",
        "A l'arrêt",
        "A quai",
    ))

    # Literal substitutions applied, in this order, before the regexes.
    _HTML_REPLACEMENTS = (
        ('&nbsp;', ' '),
        ('&#160;', ' '),
        ('&#8217;', "'"),
        ('&#224;', 'à'),
        ('&#233;', 'é'),
        ('&#232;', 'è'),
        ('&#234;', 'ê'),
        ('&#251;', 'û'),
        ('&#206;', 'Î'),
        ('<br>', ' '),
        ('</p>', ' '),
        ('Information Ile de France Mobilités :', ''),
        ("Les horaires du calculateur d'itinéraire tiennent compte des travaux. ", ''),
        ("à la demande de la Préfecture de Police et d'Île-de-France Mobilités, et ", ''),
    )

    # Patterns removed from the text, in this order (tag stripping goes last).
    # NOTE(review): the "." after "[^.]+" is unescaped and therefore matches
    # any character, not only a full stop - confirm whether that is intended.
    _HTML_PATTERNS = (
        re.compile(" Plus d'informations sur [^.]+."),
        re.compile('Dates? :[^.]+. '),
        re.compile('Période :[^.]+. '),
        re.compile('<a.*?>.*?</a>'),
        re.compile('<.*?>'),
    )

    def __init__(self, config, apikey=None):
        """Prepare font paths, API settings and cache settings.

        ``config`` must provide ``fonts_dir``, ``cache_timeout`` and
        ``max_cache_timeout`` (both used as minutes).  When ``apikey`` is
        falsy the key is read from the ``TOKEN_IDFM`` environment variable,
        raising ``KeyError`` if it is not set.
        """
        self.fnt_R_path = os.path.join(config.fonts_dir, IDFMAPI.fnt_R_path)
        self.fnt_RB_path = os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path)

        self.baseurl = "https://prim.iledefrance-mobilites.fr/marketplace/v2"
        self.apikey = apikey or os.environ["TOKEN_IDFM"]

        self._cached_file = ".ratp-%s.cache"
        self.cache_timeout = config.cache_timeout
        self.max_cache_timeout = config.max_cache_timeout

    @staticmethod
    def fromHTMLDisruption(src):
        """Turn the HTML text of a disruption message into plain text.

        A fixed set of entities and boilerplate sentences is replaced, then
        "more information", "Dates :" and "Période :" sentences, ``<a>``
        elements and every remaining tag are removed.  The result is stripped.
        """
        text = src
        for old, new in IDFMAPI._HTML_REPLACEMENTS:
            text = text.replace(old, new)
        for pattern in IDFMAPI._HTML_PATTERNS:
            text = pattern.sub('', text)
        # Left-overs once the link of a "more information" sentence is gone.
        text = text.replace(" Pour plus d'informations,.", '')
        text = text.replace(" Pour plus d'informations, .", '')
        return text.strip()

    @staticmethod
    def _cache_mtime(path):
        """Return the modification time of *path* (aware, UTC) or ``None``."""
        try:
            return datetime.fromtimestamp(os.stat(path).st_mtime, tz=timezone.utc)
        except OSError:
            return None

    def get_schedules(self, mode, line, station, way="A+R"):
        """Return the next departures of *line* at *station*.

        ``mode`` is a one-letter code (``M``, ``R``, ``T``, ``B``, ``N``) or an
        already expanded mode name.  For ``T`` the first character of *line*
        is dropped before building the request.

        The answer is refreshed from the network when possible; on a network
        error the exception is logged and the previously stored answer is
        used (``FileNotFoundError`` if there is none).

        Returns the schedule entries that have a ``message``, excluding
        "Train sans arrêt".  "Arriving now" messages and "<n> mn" messages
        are converted to ``HH:MM`` relative to the response date; delay
        markers are shortened to ``+`` / ``++``.
        """
        if mode == "T":
            line = line[1:]
        mode = IDFMAPI._MODE_NAMES.get(mode, mode)

        digest = hashlib.md5((mode + line + station + way).encode()).hexdigest()
        cache_file = self._cached_file % ("schedule-" + mode + "-" + line + "-" + digest)

        req = urllib.request.Request("https://ratp.p0m.fr/api/schedules/%s/%s/%s/%s" % (mode, line, station, way))
        try:
            # Read the whole body before touching the cache file so that a
            # failed transfer cannot destroy the last good answer.
            with urllib.request.urlopen(req) as f:
                payload = f.read()
        except (ConnectionResetError, urllib.error.URLError) as e:
            # HTTPError is a subclass of URLError, so it is handled here too.
            logging.exception(e)
        else:
            with open(cache_file, 'wb') as fd:
                fd.write(payload)

        with open(cache_file, encoding="utf-8") as f:
            res = json.load(f)

        # Dates longer than 25 characters carry fractional seconds: keep the
        # first 19 characters and the trailing 6-character UTC offset.
        stamp = res["_metadata"]["date"]
        if len(stamp) > 25:
            stamp = stamp[0:19] + stamp[len(stamp) - 6:]
        now = datetime.fromisoformat(stamp)

        schedules = res["result"]["schedules"]
        for schedule in schedules:
            if "message" not in schedule:
                continue
            message = schedule["message"]
            if message in IDFMAPI._IMMINENT_MESSAGES:
                message = now.strftime("%H:%M")
            elif message.endswith(" mn"):
                minutes = int(message.split(" ")[0])
                message = (now + timedelta(minutes=minutes)).strftime("%H:%M")
            schedule["message"] = (
                message.replace(" Retardé", "+")
                .replace("Train retardé", "++")
                .replace("Retardé", "++")
            )

        return [m for m in schedules if "message" in m and m["message"] != "Train sans arrêt"]

    def get_weather(self):
        """Return the current disruptions of every line listed in ``lines``.

        The line reports are downloaded again when the cached copy is older
        than ``cache_timeout`` minutes.  A network error is logged and the
        cached copy is used instead, as long as it is not older than
        ``max_cache_timeout`` minutes.

        Returns ``{mode: {line: iterator of disruption dicts}}``; each
        iterator can be consumed only once.

        Raises ``Exception("File too old")`` when no sufficiently recent
        cached copy exists.
        """
        cache_file = self._cached_file % ("ratp-disruptions")

        mtime = self._cache_mtime(cache_file)
        fresh_until = None if mtime is None else mtime + timedelta(minutes=self.cache_timeout)
        if fresh_until is None or fresh_until < datetime.now(tz=timezone.utc):
            req = urllib.request.Request(self.baseurl + "/navitia/line_reports/line_reports?count=2000")
            req.headers["apikey"] = self.apikey
            try:
                with urllib.request.urlopen(req) as f:
                    payload = f.read()
            except (ConnectionResetError, urllib.error.URLError) as e:
                # Best effort: fall through to the age check on the old copy.
                logging.exception(e)
            else:
                with open(cache_file, 'wb') as fd:
                    fd.write(payload)
            mtime = self._cache_mtime(cache_file)

        if mtime is None or mtime + timedelta(minutes=self.max_cache_timeout) < datetime.now(tz=timezone.utc):
            raise Exception("File too old")

        with open(cache_file, encoding="utf-8") as f:
            res = json.load(f)

        disruptions = {d["id"]: d for d in res["disruptions"]}

        return {
            mode: {
                line: self.get_line_weather(res, disruptions, mode, line)
                for line in IDFMAPI.lines[mode]
            }
            for mode in IDFMAPI.lines
        }

    def get_line_weather(self, res, disruptions, mode, line):
        """Yield the disruptions linked to *line* whose status is not "past".

        ``res`` is the decoded line reports answer and ``disruptions`` maps
        disruption id to disruption dict.  Links whose id is not present in
        ``disruptions`` are skipped.
        """
        wanted_id = "line:IDFM:" + IDFMAPI.lines[mode][line]
        for report in res["line_reports"]:
            reported_line = report.get("line")
            if not reported_line or reported_line.get("id") != wanted_id:
                continue
            for link in reported_line.get("links", []):
                disruption = disruptions.get(link["id"])
                if disruption is not None and disruption["status"] != "past":
                    yield disruption

    @staticmethod
    def get_line_icon(config, mode, line, size, fill="gray"):
        """Draw the pictogram of *line* and return it as an RGBA image.

        Metro lines get a filled ellipse; tramways get a top and a bottom bar
        (plus a filled background when ``fill`` is "black"); RER lines get a
        rounded rectangle when the installed Pillow provides one; everything
        else gets a plain rectangle.  The line name is centred on top using
        the bold font, ``size - 3`` points high.
        """
        # NOTE(review): only the long mode names widen the icon; the
        # one-letter codes "B"/"T" keep a square canvas - confirm intent.
        width = int(size * 1.38) if mode == "buses" or mode == "tramways" else size

        image = Image.new('RGBA', (width, size), '#fff0')
        draw = ImageDraw.Draw(image)
        fnt_icon = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), size-3)

        is_tramway = mode == "T" or mode == "tramways"

        if mode == "M" or mode == "metros":
            draw.ellipse((0, 0, width, size), fill=fill)
        elif is_tramway:
            bar_fill = fill if fill != "black" else "gray"
            if fill == "black":
                draw.rectangle((0, 0, width, size), fill=fill)
            draw.rectangle((0, 0, width, 1), fill=bar_fill)
            draw.rectangle((0, size-2, width, size), fill=bar_fill)
        elif (mode == "R" or mode == "rers") and hasattr(draw, "rounded_rectangle"):
            # hasattr, not draw.__dict__: methods live on the class, so the
            # instance dict never contains them.
            draw.rounded_rectangle((0, 0, width, size), radius=4, fill=fill)
        else:
            draw.rectangle((0, 0, width, size), fill=fill)

        if is_tramway:
            # Tramway icons have no background unless fill is "black", so
            # the text is dark except for the "black" and "white" variants.
            text_fill = "white" if fill in ("black", "white") else "black"
        else:
            text_fill = "white" if fill != "white" else "black"

        draw.text((int(width / 2), int(size / 2)), line, fill=text_fill, anchor="mm", font=fnt_icon)

        return image
class RATPWeatherModule:
    """Render the traffic state of the network and report major-line alerts."""

    # Lines whose disruptions are turned into alerts when none are given.
    DEFAULT_MAJOR_LINES = ("M7", "M5", "M14", "RB", "T3A")

    def __init__(self, major_lines=None):
        """Remember which lines produce alerts.

        Entries are the upper-cased first letter of the mode followed by the
        line name (``"M7"``, ``"RB"``).  For lines whose name already starts
        with that letter both ``"T3A"`` and ``"TT3A"`` are accepted.
        """
        # A fresh list per instance: a list default would be shared.
        self.major_lines = list(self.DEFAULT_MAJOR_LINES) if major_lines is None else major_lines

    def _is_major_line(self, mode, line):
        """Tell whether *line* of *mode* is one of ``self.major_lines``."""
        prefix = mode[0].upper()
        if prefix + line in self.major_lines:
            return True
        # Tramway names already carry the letter ("T3A"): the prefixed form
        # would be "TT3A", which the default list does not contain.
        return line.upper().startswith(prefix) and line in self.major_lines

    @staticmethod
    def _alert_icon(config, mode, line):
        """Return a function drawing the alert icon of *line* at a given size."""
        def icon(size=64):
            half = int(size / 2)
            image = Image.new('RGB', (size, size), '#000')
            white = Image.new('RGB', (half, half), '#fff')
            mode_icon = Image.open(os.path.join(config.icons_dir, mode + ".png")).resize((half, half))
            # The mode picture is used as a mask for a white square.
            image.paste(white, (-5, 0), mode_icon)
            line_icon = IDFMAPI.get_line_icon(config, mode, line, half, fill="white")
            image.paste(line_icon, (half - 5, 0), line_icon)
            return image
        return icon

    @staticmethod
    def _relevant_periods(disruption, now):
        """Select the application periods worth showing.

        Periods already over, periods starting more than one day from *now*,
        and periods starting more than 16 hours after the first kept one are
        dropped.  Returns ``(periods, oneline)`` where ``periods`` is a list
        of ``(begin, end)`` datetimes and ``oneline`` is true when a kept
        period began more than a day ago, or when the cause is "travaux" and
        a kept period has not started yet.

        The disruption is not modified: the same dict can be reported for
        several lines and must stay parseable.
        """
        yesterday = now - timedelta(days=1)
        tomorrow = now + timedelta(days=1)
        is_works = disruption.get("cause") == "travaux"

        periods = []
        oneline = False
        for ap in disruption["application_periods"]:
            begin = datetime.strptime(ap["begin"], "%Y%m%dT%H%M%S")
            end = datetime.strptime(ap["end"], "%Y%m%dT%H%M%S")

            if end < now or begin > tomorrow:
                continue
            if periods and periods[0][0] + timedelta(hours=16) < begin:
                continue

            periods.append((begin, end))
            if begin < yesterday or (is_works and begin > now):
                oneline = True

        return periods, oneline

    def gen_alerts(self, config):
        """Yield one alert dict per relevant disruption of the major lines.

        Each alert has the keys ``title``, ``subtitle``, ``description``,
        ``icon`` (a function taking an optional size and returning an image)
        and ``oneline``.
        """
        weather = IDFMAPI(config).get_weather()

        for mode in weather:
            for line in weather[mode]:
                if not self._is_major_line(mode, line):
                    continue

                for disruption in weather[mode][line]:
                    if "messages" not in disruption:
                        continue
                    if disruption["status"] != "active" and "application_periods" not in disruption:
                        continue

                    title = ""
                    subtitle = ""
                    content = ""
                    oneline = False

                    if "application_periods" in disruption:
                        now = datetime.now()
                        periods, oneline = self._relevant_periods(disruption, now)

                        if not periods:
                            continue
                        if len(periods) == 1:
                            begin, end = periods[0]
                            if begin > now:
                                subtitle = "De " + whenStr(begin, now) + " à " + whenStr(end, now)
                            elif end > now:
                                subtitle = "Fin " + whenStr(end, now)

                    for msg in disruption["messages"]:
                        channel = msg["channel"]["name"]
                        if channel == "titre":
                            title = msg["text"]
                        elif channel == "moteur":
                            content = IDFMAPI.fromHTMLDisruption(msg["text"])
                        elif not content:
                            # Any other channel only fills in a description
                            # that is still empty.
                            content = IDFMAPI.fromHTMLDisruption(msg["text"])

                    yield {
                        "title": title,
                        "subtitle": subtitle,
                        "description": content,
                        "icon": self._alert_icon(config, mode, line),
                        "oneline": oneline,
                    }

    def draw_module(self, config, width, height, line_height=19):
        """Draw one pictogram per known line, shaded by its traffic state.

        Fill colours: "black" when an active disruption reports NO_SERVICE,
        "teal" for REDUCED_SERVICE / UNKNOWN_EFFECT / OTHER_EFFECT, "gray"
        for any other active disruption with a severity, "darkgray" otherwise.
        Returns a ``width`` x ``height`` RGB image.
        """
        image = Image.new('RGB', (width, height), '#fff')

        weather = IDFMAPI(config).get_weather()

        align_x = 0
        align_y = 0
        for mode in ["metros", "rers", "tramways", "buses"]:
            # RER and bus icons continue after the previous mode's icons.
            if mode != "rers" and mode != "buses":
                align_x = 0

            # display mode icon
            icon = Image.open(os.path.join(config.icons_dir, mode + ".png")).resize((line_height, line_height))
            image.paste(icon, (align_x, align_y), icon)
            align_x += line_height + 10

            for line in weather[mode]:
                # Wrap to a new row, indented past the mode icon.
                if align_x + line_height >= width:
                    align_x = line_height + 10
                    align_y += line_height + 6

                states = []
                for disruption in weather[mode][line]:
                    if disruption["status"] != "active":
                        continue
                    if "severity" not in disruption:
                        continue
                    effect = disruption["severity"]["effect"]
                    cause = disruption.get("cause")
                    impacted_types = [x["pt_object"]["embedded_type"] for x in disruption["impacted_objects"]]
                    if cause == "travaux" and "line" not in impacted_types:
                        # Works not impacting the line itself get a distinct
                        # state, so they only ever produce the "gray" fill.
                        states.append(effect + " " + cause)
                    else:
                        states.append(effect)

                fill = "darkgray"
                if "NO_SERVICE" in states:
                    fill = "black"
                elif "REDUCED_SERVICE" in states or "UNKNOWN_EFFECT" in states or "OTHER_EFFECT" in states:
                    fill = "teal"
                elif len(states) > 0:
                    fill = "gray"

                icon = IDFMAPI.get_line_icon(config, mode, line, line_height, fill=fill)
                image.paste(icon, (align_x, align_y), icon)
                align_x += icon.width + 5

            if mode != "metros" and mode != "tramways":
                align_y += line_height + 10
            else:
                align_y += 10

        return image
class RATPNextStopModule:
    """Render the next departures of a list of stops."""

    def draw_module(self, config, stops, width, height, line_height=17):
        """Draw the upcoming departures of every stop in *stops*.

        Each stop is ``"<mode letter><line>/<station>"`` optionally followed
        by ``"/<way>"``; entries without a ``/`` are ignored.  Departures
        leaving in less than ten minutes are skipped; the rest is grouped by
        destination, one row per destination.

        Returns ``(image, alerts)``: the image is cropped to the rows actually
        drawn (at most *height*), and ``alerts`` holds one alert dict per stop
        whose schedules could not be retrieved or drawn.
        """
        image = Image.new('RGB', (width, height), '#fff')
        draw = ImageDraw.Draw(image)

        alerts = []

        fnt_R = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_R_path), line_height)
        fnt_B = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), line_height)

        spacing = int(line_height / 2.5)
        stop_gap = int(line_height * 0.33)
        max_dest = 64

        align = 0

        now = datetime.now()
        mint = now + timedelta(minutes=10)

        api = IDFMAPI(config)
        for stop in stops:
            tmp = stop.split("/", 2)
            if not tmp[0]:
                # Nothing to identify the line with.
                continue
            mode = tmp[0][0]
            line = tmp[0][1:]
            try:
                if 1 < len(tmp) < 4:
                    prep = {}
                    for s in api.get_schedules(mode, line, *tmp[1:]):
                        # Skip too close items
                        try:
                            clock = datetime.strptime(s["message"][0:5], "%H:%M").time()
                        except (ValueError, TypeError):
                            # Not a time (free-text message): always shown.
                            pass
                        else:
                            t = datetime.combine(now.date(), clock)
                            # Messages carry no date: a time far in the past
                            # is a departure after midnight.
                            if t < now - timedelta(hours=12):
                                t += timedelta(days=1)
                            if t < mint:
                                continue

                        prep.setdefault(s["destination"], []).append(s)

                    # The icon is taller when it spans several rows.
                    icon_size = int(line_height * (1.5 if len(prep) > 1 else 1))
                    icon = IDFMAPI.get_line_icon(config, mode, line, icon_size)
                    image.paste(icon, (0, align), icon)

                    for dest, msgs in prep.items():
                        if len(msgs) == 0:
                            continue

                        align_x = line_height * 2

                        # Truncate the destination until it fits its column.
                        while fnt_B.getlength(dest) > max_dest:
                            dest = dest[:-1]

                        draw.text(
                            (align_x, align),
                            dest,
                            font=fnt_B, anchor="lt", fill="black"
                        )
                        align_x += max_dest + spacing

                        for msg in msgs:
                            draw.text(
                                (align_x, align),
                                msg["message"],
                                font=fnt_R, anchor="lt", fill="black"
                            )
                            align_x += fnt_R.getlength(msg["message"]) + spacing

                        align += line_height

                    align += stop_gap
            except Exception as e:
                # One failing stop must not prevent the others from showing.
                alerts.append({
                    "title": "Impossible de récupérer les horaires du " + mode + line + " pour " + tmp[1],
                    "description": type(e).__name__ + ": " + (e.message if hasattr(e, 'message') else str(e)),
                    "icon": "wi-earthquake.png",
                })

        # Drop the gap added after the last stop; never crop to a negative
        # height when nothing was drawn.
        align -= stop_gap
        image = image.crop((0, 0, width, max(0, min(align, height))))

        return image, alerts