epaper/modules/ratp.py

524 lines
22 KiB
Python

from datetime import datetime, timedelta, timezone
import hashlib
import json
import logging
import os
import os.path
import re
import urllib.parse
import urllib.request
from PIL import Image, ImageDraw, ImageFont
def whenStr(date, now):
if date < now + timedelta(days=1):
if date.minute <= 9:
return f"{date.hour}h0{date.minute}"
else:
return f"{date.hour}h{date.minute}"
elif date < now + timedelta(days=7):
weekday = date.weekday()
if weekday == 0:
return "lundi"
elif weekday == 1:
return "mardi"
elif weekday == 2:
return "mercredi"
elif weekday == 3:
return "jeudi"
elif weekday == 4:
return "vendredi"
elif weekday == 5:
return "samedi"
else:
return "dimanche"
else:
return date.strftime("%d %b")
class IDFMAPI:
fnt_R_path = "Parisine-Regular.ttf"
fnt_RB_path = "Parisine-Bold.ttf"
lines = {
"metros": {
"1": "C01371",
"2": "C01372",
"3": "C01373",
"4": "C01374",
"5": "C01375",
"6": "C01376",
"7": "C01377",
"8": "C01378",
"9": "C01379",
"10": "C01380",
"11": "C01381",
"12": "C01382",
"13": "C01383",
"14": "C01384",
"3B": "C01386",
"7B": "C01387",
},
"buses": {
"57": "C01094",
"125": "C01154",
#"131": "C01159",
"184": "C01205",
},
"rers": {
"A": "C01742",
"B": "C01743",
"C": "C01727",
"D": "C01728",
"E": "C01729"
},
"tramways": {
"T2": "C01390",
"T3A": "C01391",
"T3B": "C01679",
"T7": "C01774",
"T9": "C02317",
},
}
def __init__(self, config, apikey=None):
self.fnt_R_path = os.path.join(config.fonts_dir, IDFMAPI.fnt_R_path)
self.fnt_RB_path = os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path)
self.baseurl = "https://prim.iledefrance-mobilites.fr/marketplace/disruptions_bulk"
self.apikey = apikey or os.environ["TOKEN_IDFM"]
self._cached_file = ".ratp-%s.cache"
self.cache_timeout = config.cache_timeout
self.max_cache_timeout = config.max_cache_timeout
def fromHTMLDisruption(src):
cleanr = re.compile('<.*?>')
cleanrA = re.compile('<a.*?>.*?</a>')
period = re.compile('Période :[^.]+. ')
period2 = re.compile('Dates? :[^.]+. ')
more = re.compile(" Plus d'informations sur [^.]+.")
return re.sub(cleanr, '', re.sub(cleanrA, '', re.sub(period, '', re.sub(period2, '', re.sub(more, '', src.replace('&nbsp;', ' ').replace('&#160;', ' ').replace('&#8217;', "'").replace('&#224;', 'à').replace('&#233;', 'é').replace('&#232;', 'è').replace('&#234;', 'ê').replace('&#251;', 'û').replace('&#206;', 'Î').replace('<br>', ' ').replace('</p>', ' ').replace('Information Ile de France Mobilités :', '').replace("Les horaires du calculateur d'itinéraire tiennent compte des travaux. ", '').replace("à la demande de la Préfecture de Police et d'Île-de-France Mobilités, et ", '')))))).replace(" Pour plus d'informations,.", '').replace(" Pour plus d'informations, .", '').strip()
def get_schedules(self, mode, line, station, way="A+R"):
if mode == "M":
mode = "metros"
elif mode == "R":
mode = "rers"
elif mode == "T":
mode = "tramways"
line = line[1:]
elif mode == "B":
mode = "buses"
elif mode == "N":
mode = "noctiliens"
cache_file = self._cached_file % ("schedule-" + mode + "-" + line + "-" + hashlib.md5((mode + line + station + way).encode()).hexdigest())
req = urllib.request.Request("https://ratp.p0m.fr/api/schedules/%s/%s/%s/%s" % (mode, line, station, way))
try:
with urllib.request.urlopen(req) as f:
with open(cache_file, 'wb') as fd:
fd.write(f.read())
except ConnectionResetError as e:
logging.exception(e)
except urllib.error.URLError as e:
logging.exception(e)
except urllib.error.HTTPError as e:
logging.exception(e)
with open(cache_file) as f:
res = json.load(f)
# Convert time to hours
now = datetime.fromisoformat(res["_metadata"]["date"] if len(res["_metadata"]["date"]) <= 25 else res["_metadata"]["date"][0:19] + res["_metadata"]["date"][len(res["_metadata"]["date"])-6:])
for i in range(len(res["result"]["schedules"])):
if "message" in res["result"]["schedules"][i]:
if res["result"]["schedules"][i]["message"] == "Train a l'approche" or res["result"]["schedules"][i]["message"] == "Train à l'approche" or res["result"]["schedules"][i]["message"] == "Train à quai" or res["result"]["schedules"][i]["message"] == "Train a quai" or res["result"]["schedules"][i]["message"] == "A l'approche" or res["result"]["schedules"][i]["message"] == "A l'arret" or res["result"]["schedules"][i]["message"] == "A l'arrêt" or res["result"]["schedules"][i]["message"] == "A quai":
res["result"]["schedules"][i]["message"] = now.strftime("%H:%M")
elif res["result"]["schedules"][i]["message"].endswith(" mn"):
res["result"]["schedules"][i]["message"] = (now + timedelta(minutes=int(res["result"]["schedules"][i]["message"].split(" ")[0]))).strftime("%H:%M")
res["result"]["schedules"][i]["message"] = res["result"]["schedules"][i]["message"].replace(" Retardé", "+").replace("Train retardé", "++").replace("Retardé", "++")
return [m for m in res["result"]["schedules"] if "message" in m and m["message"] != "Train sans arrêt"]
def get_weather(self):
cache_file = self._cached_file % ("ratp-disruptions")
# Read the mod time
statinfo = None
try:
statinfo = os.stat(cache_file)
except:
pass
if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.cache_timeout) < datetime.now(tz=timezone.utc):
# Do the request and save it
req = urllib.request.Request(self.baseurl + "/disruptions/v2")
req.headers["apikey"] = self.apikey
try:
with urllib.request.urlopen(req) as f:
with open(cache_file, 'wb') as fd:
fd.write(f.read())
except ConnectionResetError:
pass
try:
statinfo = os.stat(cache_file)
except:
pass
if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.max_cache_timeout) < datetime.now(tz=timezone.utc):
raise Exception("File too old")
# Retrieve cached data
res = {}
with open(cache_file) as f:
res = json.load(f)
disruptions = {}
for d in res["disruptions"]:
disruptions[d["id"]] = d
ret = {}
for mode in IDFMAPI.lines:
ret[mode] = {}
for line in IDFMAPI.lines[mode]:
ret[mode][line] = self.get_line_weather(res, disruptions, mode, line)
return ret
def get_line_weather(self, res, disruptions, mode, line):
for l in res["lines"]:
if l["id"] == "line:IDFM:" + IDFMAPI.lines[mode][line]:
for impactedObject in l["impactedObjects"]:
for disruptionId in impactedObject["disruptionIds"]:
status = "past"
for ap in disruptions[disruptionId]["applicationPeriods"]:
end = datetime.strptime(ap["end"], "%Y%m%dT%H%M%S")
if end < datetime.now():
continue
begin = datetime.strptime(ap["begin"], "%Y%m%dT%H%M%S")
if begin > datetime.now():
status = "future"
continue
status = "active"
break
disruptions[disruptionId]["status"] = status
yield disruptions[disruptionId]
return None
def get_line_icon(config, mode, line, size, fill="gray", state=""):
if mode == "M" or mode == "metros":
icon_mode = "metro"
line = line.replace("B", "bis")
elif mode == "T" or mode == "tramways":
icon_mode = "tram"
line = line.replace("T", "").lower()
elif mode == "R" or mode == "rers":
icon_mode = "RER"
img_line = None
if mode == "rers" and fill != "white":
img = Image.new('RGBA', (size * 2, int(size * 2 * 1.25)), '#fff0')
draw = ImageDraw.Draw(img)
fnt_icon = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), size*2-9)
draw.rounded_rectangle((0, 0, size*2-1, size*2-1), 7, outline=fill, width=2)
draw.text((size, size), line, fill=fill, anchor="mm", font=fnt_icon)
elif mode == "buses":
width = int(size * 1.38)
img = Image.new('RGBA', (width, width), '#fff0')
draw = ImageDraw.Draw(img)
fnt_icon = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), size-3)
draw.rectangle((0, 0, width, size), fill)
draw.text((int(width / 2), int(size / 2)), line, fill="white", anchor="mm", font=fnt_icon)
elif fill == "white":
img = Image.open(os.path.join(config.icons_dir, "ratp", "lignes", icon_mode + "_" + line + "_NB" + state + "_RVB.png"))
return img.resize((int(size), int(img.height * size / img.width)))
else:
img_line = Image.open(os.path.join(config.icons_dir, "ratp", "lignes", icon_mode + "_" + line + "_NB_RVB.png"))
img = Image.new(img_line.mode, (img_line.width, int(img_line.height * 1.4)))
if mode != "rers":
img.paste(Image.new("RGB", (img_line.width, img_line.height), fill), (0,0), img_line)
else:
img.paste(img_line, (0,0))
if state != "":
if state.endswith("petit"):
coeff = 2.5
elif state.endswith("moyen"):
coeff = 2
else:
coeff = 1.5
img_perturb = Image.open(os.path.join(config.icons_dir, "ratp", "Perturbation_travaux_NB_RVB.png" if state.startswith("_travaux") else "Perturbation_trafic_NB_RVB.png"))
img_perturb = img_perturb.resize((int(img.width / coeff), int(img_perturb.height * img.width / (coeff * img_perturb.width))))
if img_line is not None:
img.paste(img_perturb, (img.width-img_perturb.width,img_line.height-int(img_perturb.height/coeff)))
else:
img.paste(img_perturb, (img.width-img_perturb.width,img.height-img_perturb.height))
return img.resize((int(size), int(img.height * size / img.width)))
class RATPWeatherModule:
def __init__(self, major_lines=["M7", "M5", "M14", "RB", "T3A"]):
self.major_lines = major_lines
def gen_alerts(self, config):
alerts = []
weather = IDFMAPI(config).get_weather()
id_seens = []
for mode in weather:
for line in weather[mode]:
if mode[0].upper() + line not in self.major_lines:
continue
def alert_icon(mode, line, state):
def icon(size=64):
image = Image.new('RGB', (size, size), '#000')
line_icon = IDFMAPI.get_line_icon(config, mode, line, int(size/2), fill="white", state=state)
if state == "":
white = Image.new('RGB', (line_icon.width, line_icon.height), '#fff')
image.paste(white, (int(size/4),0), line_icon)
else:
image.paste(line_icon, (int(size/4),0), line_icon)
return image
return icon
for disruption in weather[mode][line]:
if "message" not in disruption:
continue
if disruption["id"] in id_seens:
continue
if disruption["status"] != "active" and "applicationPeriods" not in disruption:
continue
subtitle = ""
oneline = False
state_type = ""
state_importance = ""
if "applicationPeriods" in disruption:
now = datetime.now()
yesterday = now + timedelta(days=-1)
nextweek = now + timedelta(days=1)
application_periods = []
for ap in disruption["applicationPeriods"]:
begin = datetime.strptime(ap["begin"], "%Y%m%dT%H%M%S")
end = datetime.strptime(ap["end"], "%Y%m%dT%H%M%S")
if end < now:
continue
elif begin > nextweek:
continue
elif len(application_periods) > 0 and application_periods[0]["begin"] + timedelta(hours=16) < begin:
continue
else:
application_periods.append({"begin": begin, "end": end})
if begin < yesterday:
oneline = True
if "cause" in disruption and disruption["cause"].lower() == "travaux" and begin > now:
oneline = True
state_type = "travaux"
elif "cause" in disruption and disruption["cause"].lower() == "travaux":
state_type = "travaux"
else:
state_type = "trafic"
if "severity" in disruption:
if disruption["severity"] == "NO_SERVICE" or disruption["severity"] == "BLOQUANTE":
state_importance = "grand"
elif disruption["severity"] == "REDUCED_SERVICE" or disruption["severity"] == "UNKNOWN_EFFECT" or disruption["severity"] == "OTHER_EFFECT":
state_importance = "moyen"
else:
state_importance = "petit"
if len(application_periods) == 0:
continue
elif len(application_periods) == 1:
if application_periods[0]["begin"] > now:
subtitle = "De " + whenStr(application_periods[0]["begin"], now) + " à " + whenStr(application_periods[0]["end"], now)
elif application_periods[0]["end"] > now:
subtitle = "Fin " + whenStr(application_periods[0]["end"], now)
id_seens.append(disruption["id"])
yield {
"title": disruption["title"],
"subtitle": subtitle,
"description": IDFMAPI.fromHTMLDisruption(disruption["message"]),
"icon": alert_icon(mode, line, ("_" + state_type + "_" + state_importance) if state_type != "" else ""),
"oneline": oneline,
}
def draw_module(self, config, width, height, line_height=22):
image = Image.new('RGB', (width, height), '#fff')
draw = ImageDraw.Draw(image)
weather = IDFMAPI(config).get_weather()
align_x = 0
align_y = 0
for mode in ["metros", "rers", "tramways", "buses"]:
if mode != "rers" and mode != "buses":
align_x = 0
for line in weather[mode]:
if align_x + line_height >= width:
align_x = 0
align_y += int(1.5 * line_height)
states = []
for disruption in weather[mode][line]:
status = "past"
for ap in disruption["applicationPeriods"]:
end = datetime.strptime(ap["end"], "%Y%m%dT%H%M%S")
if end < datetime.now():
continue
begin = datetime.strptime(ap["begin"], "%Y%m%dT%H%M%S")
if begin > datetime.now():
status = "future"
continue
status = "active"
break
if status != "active":
continue
if "severity" in disruption:
if disruption["cause"] == "TRAVAUX":
states.append(disruption["severity"] + " " + disruption["cause"])
else:
states.append(disruption["severity"])
state = ""
fill = "darkgray"
if "NO_SERVICE" in states:
fill = "black"
state = "_trafic_grand"
elif "REDUCED_SERVICE" in states or "UNKNOWN_EFFECT" in states or "OTHER_EFFECT" in states:
fill = "teal"
state = "_trafic_moyen"
elif "BLOQUANTE TRAVAUX" in states:
fill = "teal"
state = "_travaux_moyen"
elif len(states) > 0:
fill = "gray"
state = "_trafic_petit"
icon = IDFMAPI.get_line_icon(config, mode, line, line_height, fill=fill, state=state)
image.paste(icon, (align_x, align_y), icon)
align_x += icon.width + 5
if mode != "metros" and mode != "tramways":
align_y += int(1.5 * line_height)
else:
align_x += int(1.5 * line_height)
return image
class RATPNextStopModule:
def draw_module(self, config, stops, width, height, line_height=17):
image = Image.new('RGB', (width, height), '#fff')
draw = ImageDraw.Draw(image)
alerts = []
fnt_R = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_R_path), line_height)
fnt_B = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), line_height)
align = 0
mint = datetime.now() + timedelta(minutes=10)
api = IDFMAPI(config)
for stop in stops:
tmp = stop.split("/", 2)
mode = tmp[0][0]
line = tmp[0][1:]
try:
if 1 < len(tmp) < 4:
prep = {}
for s in api.get_schedules(mode, line, *tmp[1:]):
# Skip too close items
try:
t = datetime.strptime(datetime.now().strftime("%Y-%m-%dT") + s["message"][0:5], "%Y-%m-%dT%H:%M")
if t < mint:
continue
except:
pass
if s["destination"] not in prep:
prep[s["destination"]] = []
prep[s["destination"]].append(s)
icon = IDFMAPI.get_line_icon(config, mode, line, int(line_height*(1.5 if len(prep.keys()) > 1 else 1)))
image.paste(icon, (0, align), icon)
max_dest = 64
for dest, msgs in prep.items():
if len(msgs) == 0:
continue
align_x = line_height * 2
sz = fnt_B.getlength(dest)
while sz > max_dest:
dest = dest[:-1]
sz = fnt_B.getlength(dest)
draw.text(
(align_x, align),
dest,
font=fnt_B, anchor="lt", fill="black"
)
align_x += max_dest + int(line_height/2.5)
for msg in [] + msgs:
draw.text(
(align_x, align),
msg["message"],
font=fnt_R, anchor="lt", fill="black"
)
align_x += fnt_R.getlength(msg["message"]) + int(line_height/2.5)
align += line_height
align += int(line_height * 0.33)
except Exception as e:
alerts.append({
"title": "Impossible de récupérer les horaires du " + mode + line + " pour " + tmp[1],
"description": type(e).__name__ + ": " + (e.message if hasattr(e, 'message') else str(e)),
"icon": "weather/wi-earthquake.png",
})
align -= int(line_height * 0.33)
image = image.crop((0,0,width, min(align, height)))
return image, alerts