from datetime import datetime, timedelta, timezone import hashlib import json import logging import os import re import urllib.parse import urllib.request from PIL import Image, ImageDraw, ImageFont class IDFMAPI: fnt_R_path = "./fonts/Parisine-Regular.ttf" fnt_RB_path = "./fonts/Parisine-Bold.ttf" lines = { "metros": { "1": "C01371", "2": "C01372", "3": "C01373", "4": "C01374", "5": "C01375", "6": "C01376", "7": "C01377", "8": "C01378", "9": "C01379", "10": "C01380", "11": "C01381", "12": "C01382", "13": "C01383", "14": "C01384", "3B": "C01386", "7B": "C01387", }, "buses": { "57": "C01094", "125": "C01154", #"131": "C01159", "184": "C01205", }, "rers": { "A": "C01742", "B": "C01743", "C": "C01727", "D": "C01728", "E": "C01729" }, "tramways": { "T2": "C01390", "T3A": "C01391", "T3B": "C01679", "T7": "C01774", "T9": "C02317", }, } def __init__(self, config, apikey=None): self.baseurl = "https://prim.iledefrance-mobilites.fr/marketplace" self.apikey = apikey or os.environ["TOKEN_IDFM"] self._cached_file = ".ratp-%s.cache" self.cache_timeout = config.cache_timeout self.max_cache_timeout = config.max_cache_timeout def fromIVtoPRIM(src, line): cleanr = re.compile('<.*?>') cleanrA = re.compile('.*?') return { "InfoChannelRef": { "value": "Perturbation" if src["severity"] >= 2 else "Travaux", }, "Content": { "Message": [{ "MessageType": "SHORT_MESSAGE", "MessageText": { "id": src["id"], "title": src["title"].replace("Métro " + line + " : ", '').replace("Ligne " + line + " : ", ''), "value": re.sub(cleanr, '', re.sub(cleanrA, '', src["message"].replace(' ', ' ').replace('’', "'").replace('à', 'à').replace('é', 'é').replace('è', 'è').replace('
', ' ').replace('

', ' ').replace('Information Ile de France Mobilités :', ''))).strip(), } }], } } def get_schedules(self, mode, line, station, way="A+R"): if mode == "M": mode = "metros" elif mode == "R": mode = "rers" elif mode == "T": mode = "tramways" elif mode == "B": mode = "buses" elif mode == "N": mode = "noctiliens" cache_file = self._cached_file % ("schedule-" + mode + "-" + line + "-" + hashlib.md5((mode + line + station + way).encode()).hexdigest()) req = urllib.request.Request("https://ratp.p0m.fr/api/schedules/%s/%s/%s/%s" % (mode, line, station, way)) try: with urllib.request.urlopen(req) as f: with open(cache_file, 'wb') as fd: fd.write(f.read()) except ConnectionResetError as e: logging.exception(e) except urllib.error.URLError as e: logging.exception(e) except urllib.error.HTTPError as e: logging.exception(e) with open(cache_file) as f: res = json.load(f) # Convert time to hours now = datetime.fromisoformat(res["_metadata"]["date"] if len(res["_metadata"]["date"]) <= 25 else res["_metadata"]["date"][0:19] + res["_metadata"]["date"][len(res["_metadata"]["date"])-6:]) for i in range(len(res["result"]["schedules"])): if "message" in res["result"]["schedules"][i]: if res["result"]["schedules"][i]["message"] == "Train a l'approche" or res["result"]["schedules"][i]["message"] == "Train à l'approche" or res["result"]["schedules"][i]["message"] == "Train à quai" or res["result"]["schedules"][i]["message"] == "Train a quai" or res["result"]["schedules"][i]["message"] == "A l'approche" or res["result"]["schedules"][i]["message"] == "A l'arret": res["result"]["schedules"][i]["message"] = now.strftime("%H:%M") elif res["result"]["schedules"][i]["message"].endswith(" mn"): res["result"]["schedules"][i]["message"] = (now + timedelta(minutes=int(res["result"]["schedules"][i]["message"].split(" ")[0]))).strftime("%H:%M") res["result"]["schedules"][i]["message"] = res["result"]["schedules"][i]["message"].replace(" Retardé", "+").replace("Train retardé", "...") return [m for m in res["result"]["schedules"] if "message" in m and m["message"] != "Train sans arrêt"] def get_weather(self): ret = {} for mode in IDFMAPI.lines: ret[mode] = {} for line in IDFMAPI.lines[mode]: ret[mode][line] = self.get_line_weather(mode, line) return ret def get_line_weather(self, mode, line): cache_file = self._cached_file % ("ratp-disruptions") # Read the mod time statinfo = None try: statinfo = os.stat(cache_file) except: pass if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.cache_timeout) < datetime.now(tz=timezone.utc): # Do the request and save it req = urllib.request.Request("https://api-iv.iledefrance-mobilites.fr/disruptions") try: with urllib.request.urlopen(req) as f: with open(cache_file, 'wb') as fd: fd.write(f.read()) except ConnectionResetError: pass try: statinfo = os.stat(cache_file) except: pass if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.max_cache_timeout) < datetime.now(tz=timezone.utc): raise Exception("File too old") # Retrieve cached data res = {} with open(cache_file) as f: res = json.load(f) for l in res["currentIT"]: if "id" in l: if l["id"] == "line:IDFM:" + IDFMAPI.lines[mode][line]: for d in l["disruptions"]: yield IDFMAPI.fromIVtoPRIM(d, line) return None def get_line_icon(mode, line, size, fill="gray"): width = int(size * 1.38) if mode == "buses" or mode == "tramways" else size image = Image.new('RGBA', (width, size), '#fff0') draw = ImageDraw.Draw(image) fnt_icon = ImageFont.truetype(IDFMAPI.fnt_RB_path, size-3) if mode == "M" or mode == "metros": draw.ellipse((0, 0, width, size), fill=fill) elif mode == "T" or mode == "tramways": if fill == "black": draw.rectangle((0, 0, width, size), fill=fill) draw.rectangle((0, 0, width, 1), fill=fill if fill != "black" else "gray") draw.rectangle((0, size-2, width, size), fill=fill if fill != "black" else "gray") elif (mode == "R" or mode == "rers") and "rounded_rectangle" in draw.__dict__: draw.rounded_rectangle((0, 0, width, size), radius=4, fill=fill) else: draw.rectangle((0, 0, width, size), fill=fill) draw.text((int(width / 2), int(size / 2)), line, fill="white" if (fill == "black" and mode == "tramways") or (fill != "white" and mode != "tramways") else "black", anchor="mm", font=fnt_icon) return image class RATPWeatherModule: def __init__(self, major_lines=["M7", "M5", "M14", "RB", "T3A"]): self.major_lines = major_lines def gen_alerts(self, config): alerts = [] weather = IDFMAPI(config).get_weather() id_seens = [] for mode in weather: for line in weather[mode]: if mode[0].upper() + line not in self.major_lines: continue def alert_icon(mode, line): def icon(size=64): image = Image.new('RGB', (size, size), '#000') white = Image.new('RGB', (int(size / 2), int(size / 2)), '#fff') mode_icon = Image.open("icons/" + mode + ".png").resize((int(size/2), int(size/2))) image.paste(white, (-5,0), mode_icon) line_icon = IDFMAPI.get_line_icon(mode, line, int(size/2), fill="white") image.paste(line_icon, (int(size/2) - 5,0), line_icon) return image return icon for info in weather[mode][line]: if "InfoChannelRef" not in info or (info["InfoChannelRef"]["value"] != "Perturbation" and info["InfoChannelRef"]["value"] != "Travaux"): continue if "Message" not in info["Content"]: continue for msg in info["Content"]["Message"]: if "MessageType" not in msg or msg["MessageType"] != "SHORT_MESSAGE": continue if "id" in msg["MessageText"]: if msg["MessageText"]["id"] in id_seens: continue id_seens.append(msg["MessageText"]["id"]) yield { "title": msg["MessageText"]["title"] if "title" in msg["MessageText"] else "", "description": msg["MessageText"]["value"].replace(" ", " "), "icon": alert_icon(mode, line), } def draw_module(self, config, width, height, line_height=19): image = Image.new('RGB', (width, height), '#fff') draw = ImageDraw.Draw(image) weather = IDFMAPI(config).get_weather() align_x = 0 align_y = 0 for mode in ["metros", "rers", "tramways", "buses"]: if mode != "rers" and mode != "buses": align_x = 0 # display mode icon icon = Image.open("icons/" + mode + ".png").resize((line_height, line_height)) image.paste(icon, (align_x,align_y), icon) align_x += line_height + 10 for line in weather[mode]: if align_x + line_height >= width: align_x = line_height + 10 align_y += line_height + 6 states = [] for info in weather[mode][line]: if "InfoChannelRef" in info: states.append(info["InfoChannelRef"]["value"]) fill = "gray" if "Perturbation" not in states else "black" icon = IDFMAPI.get_line_icon(mode, line, line_height, fill=fill) image.paste(icon, (align_x, align_y), icon) align_x += icon.width + 5 if mode != "metros" and mode != "tramways": align_y += line_height + 10 else: align_y += 10 return image class RATPNextStopModule: def draw_module(self, config, stops, width, height, line_height=17): image = Image.new('RGB', (width, height), '#fff') draw = ImageDraw.Draw(image) alerts = [] fnt_R = ImageFont.truetype(IDFMAPI.fnt_R_path, line_height) fnt_B = ImageFont.truetype(IDFMAPI.fnt_RB_path, line_height) align = 0 api = IDFMAPI(config) for stop in stops: tmp = stop.split("/", 2) mode = tmp[0][0] line = tmp[0][1:] try: if 1 < len(tmp) < 4: prep = {} for s in api.get_schedules(mode, line, *tmp[1:]): if s["destination"] not in prep: prep[s["destination"]] = [] prep[s["destination"]].append(s) icon = IDFMAPI.get_line_icon(mode, line, int(line_height*(1.5 if len(prep.keys()) > 1 else 1))) image.paste(icon, (0, align), icon) max_dest = 64 for dest, msgs in prep.items(): if len(msgs) == 0: continue align_x = line_height * 2 sz = fnt_B.getsize(dest)[0] while sz > max_dest: dest = dest[:-1] sz = fnt_B.getsize(dest)[0] draw.text( (align_x, align), dest, font=fnt_B, anchor="lt", fill="black" ) align_x += max_dest + int(line_height/2.5) for msg in [] + msgs: draw.text( (align_x, align), msg["message"], font=fnt_R, anchor="lt", fill="black" ) align_x += fnt_R.getsize(msg["message"])[0] + int(line_height/2.5) align += line_height align += int(line_height * 0.33) except Exception as e: alerts.append({ "title": "Impossible de récupérer les horaires du " + mode + line + " pour " + tmp[1], "description": type(e).__name__ + ": " + (e.message if hasattr(e, 'message') else str(e)), "icon": "wi-earthquake.png", }) align -= int(line_height * 0.33) image = image.crop((0,0,width, min(align, height))) return image, alerts