from datetime import datetime, timedelta, timezone import hashlib import json import logging import os import os.path import re import urllib.parse import urllib.request from PIL import Image, ImageDraw, ImageFont def whenStr(date, now): if date < now + timedelta(days=1): return f"{date.hour}h{date.minute}" elif date < now + timedelta(days=7): weekday = date.weekday() if weekday == 0: return "lundi" elif weekday == 1: return "mardi" elif weekday == 2: return "mercredi" elif weekday == 3: return "jeudi" elif weekday == 4: return "vendredi" elif weekday == 5: return "samedi" else: return "dimanche" else: return date.strftime("%d %b") class IDFMAPI: fnt_R_path = "Parisine-Regular.ttf" fnt_RB_path = "Parisine-Bold.ttf" lines = { "metros": { "1": "C01371", "2": "C01372", "3": "C01373", "4": "C01374", "5": "C01375", "6": "C01376", "7": "C01377", "8": "C01378", "9": "C01379", "10": "C01380", "11": "C01381", "12": "C01382", "13": "C01383", "14": "C01384", "3B": "C01386", "7B": "C01387", }, "buses": { "57": "C01094", "125": "C01154", #"131": "C01159", "184": "C01205", }, "rers": { "A": "C01742", "B": "C01743", "C": "C01727", "D": "C01728", "E": "C01729" }, "tramways": { "T2": "C01390", "T3A": "C01391", "T3B": "C01679", "T7": "C01774", "T9": "C02317", }, } def __init__(self, config, apikey=None): self.fnt_R_path = os.path.join(config.fonts_dir, IDFMAPI.fnt_R_path) self.fnt_RB_path = os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path) self.baseurl = "https://prim.iledefrance-mobilites.fr/marketplace" self.apikey = apikey or os.environ["TOKEN_IDFM"] self._cached_file = ".ratp-%s.cache" self.cache_timeout = config.cache_timeout self.max_cache_timeout = config.max_cache_timeout def fromHTMLDisruption(src): cleanr = re.compile('<.*?>') cleanrA = re.compile('.*?') period = re.compile('Période :[^.]+. ') period2 = re.compile('Dates? :[^.]+. ') more = re.compile(" Plus d'informations sur [^.]+.") return re.sub(cleanr, '', re.sub(cleanrA, '', re.sub(period, '', re.sub(period2, '', re.sub(more, '', src.replace(' ', ' ').replace(' ', ' ').replace('’', "'").replace('à', 'à').replace('é', 'é').replace('è', 'è').replace('ê', 'ê').replace('û', 'û').replace('Î', 'Î').replace('
', ' ').replace('

', ' ').replace('Information Ile de France Mobilités :', '').replace("Les horaires du calculateur d'itinéraire tiennent compte des travaux. ", '').replace("à la demande de la Préfecture de Police et d'Île-de-France Mobilités, et ", '')))))).strip() def get_schedules(self, mode, line, station, way="A+R"): if mode == "M": mode = "metros" elif mode == "R": mode = "rers" elif mode == "T": mode = "tramways" line = line[1:] elif mode == "B": mode = "buses" elif mode == "N": mode = "noctiliens" cache_file = self._cached_file % ("schedule-" + mode + "-" + line + "-" + hashlib.md5((mode + line + station + way).encode()).hexdigest()) req = urllib.request.Request("https://ratp.p0m.fr/api/schedules/%s/%s/%s/%s" % (mode, line, station, way)) try: with urllib.request.urlopen(req) as f: with open(cache_file, 'wb') as fd: fd.write(f.read()) except ConnectionResetError as e: logging.exception(e) except urllib.error.URLError as e: logging.exception(e) except urllib.error.HTTPError as e: logging.exception(e) with open(cache_file) as f: res = json.load(f) # Convert time to hours now = datetime.fromisoformat(res["_metadata"]["date"] if len(res["_metadata"]["date"]) <= 25 else res["_metadata"]["date"][0:19] + res["_metadata"]["date"][len(res["_metadata"]["date"])-6:]) for i in range(len(res["result"]["schedules"])): if "message" in res["result"]["schedules"][i]: if res["result"]["schedules"][i]["message"] == "Train a l'approche" or res["result"]["schedules"][i]["message"] == "Train à l'approche" or res["result"]["schedules"][i]["message"] == "Train à quai" or res["result"]["schedules"][i]["message"] == "Train a quai" or res["result"]["schedules"][i]["message"] == "A l'approche" or res["result"]["schedules"][i]["message"] == "A l'arret" or res["result"]["schedules"][i]["message"] == "A l'arrêt" or res["result"]["schedules"][i]["message"] == "A quai": res["result"]["schedules"][i]["message"] = now.strftime("%H:%M") elif res["result"]["schedules"][i]["message"].endswith(" mn"): res["result"]["schedules"][i]["message"] = (now + timedelta(minutes=int(res["result"]["schedules"][i]["message"].split(" ")[0]))).strftime("%H:%M") res["result"]["schedules"][i]["message"] = res["result"]["schedules"][i]["message"].replace(" Retardé", "+").replace("Train retardé", "++").replace("Retardé", "++") return [m for m in res["result"]["schedules"] if "message" in m and m["message"] != "Train sans arrêt"] def get_weather(self): cache_file = self._cached_file % ("ratp-disruptions") # Read the mod time statinfo = None try: statinfo = os.stat(cache_file) except: pass if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.cache_timeout) < datetime.now(tz=timezone.utc): # Do the request and save it req = urllib.request.Request(self.baseurl + "/navitia/line_reports/coverage/fr-idf/line_reports?count=2000") req.headers["apikey"] = self.apikey try: with urllib.request.urlopen(req) as f: with open(cache_file, 'wb') as fd: fd.write(f.read()) except ConnectionResetError: pass try: statinfo = os.stat(cache_file) except: pass if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.max_cache_timeout) < datetime.now(tz=timezone.utc): raise Exception("File too old") # Retrieve cached data res = {} with open(cache_file) as f: res = json.load(f) disruptions = {} for d in res["disruptions"]: disruptions[d["id"]] = d ret = {} for mode in IDFMAPI.lines: ret[mode] = {} for line in IDFMAPI.lines[mode]: ret[mode][line] = self.get_line_weather(res, disruptions, mode, line) return ret def get_line_weather(self, res, disruptions, mode, line): for l in res["line_reports"]: if "line" not in l: continue if "id" in l["line"]: if l["line"]["id"] == "line:IDFM:" + IDFMAPI.lines[mode][line]: for link in l["line"]["links"]: if disruptions[link["id"]]["status"] != "past": yield disruptions[link["id"]] return None def get_line_icon(config, mode, line, size, fill="gray"): width = int(size * 1.38) if mode == "buses" or mode == "tramways" else size image = Image.new('RGBA', (width, size), '#fff0') draw = ImageDraw.Draw(image) fnt_icon = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), size-3) if mode == "M" or mode == "metros": draw.ellipse((0, 0, width, size), fill=fill) elif mode == "T" or mode == "tramways": if fill == "black": draw.rectangle((0, 0, width, size), fill=fill) draw.rectangle((0, 0, width, 1), fill=fill if fill != "black" else "gray") draw.rectangle((0, size-2, width, size), fill=fill if fill != "black" else "gray") elif (mode == "R" or mode == "rers") and "rounded_rectangle" in draw.__dict__: draw.rounded_rectangle((0, 0, width, size), radius=4, fill=fill) else: draw.rectangle((0, 0, width, size), fill=fill) draw.text((int(width / 2), int(size / 2)), line, fill="white" if (fill == "black" and mode == "tramways") or (fill != "white" and mode != "tramways") or (fill == "white" and mode == "tramways") else "black", anchor="mm", font=fnt_icon) return image class RATPWeatherModule: def __init__(self, major_lines=["M7", "M5", "M14", "RB", "T3A"]): self.major_lines = major_lines def gen_alerts(self, config): alerts = [] weather = IDFMAPI(config).get_weather() id_seens = [] for mode in weather: for line in weather[mode]: if mode[0].upper() + line not in self.major_lines: continue def alert_icon(mode, line): def icon(size=64): image = Image.new('RGB', (size, size), '#000') white = Image.new('RGB', (int(size / 2), int(size / 2)), '#fff') mode_icon = Image.open(os.path.join(config.icons_dir, mode + ".png")).resize((int(size/2), int(size/2))) image.paste(white, (-5,0), mode_icon) line_icon = IDFMAPI.get_line_icon(config, mode, line, int(size/2), fill="white") image.paste(line_icon, (int(size/2) - 5,0), line_icon) return image return icon for disruption in weather[mode][line]: if "messages" not in disruption: continue if disruption["status"] != "active" and "application_periods" not in disruption: continue title = "" subtitle = "" content = "" if "application_periods" in disruption: now = datetime.now() nextweek = now + timedelta(days=1) application_periods = [] for ap in disruption["application_periods"]: ap["begin"] = datetime.strptime(ap["begin"], "%Y%m%dT%H%M%S") ap["end"] = datetime.strptime(ap["end"], "%Y%m%dT%H%M%S") if ap["end"] < now: continue elif ap["begin"] > nextweek: continue elif len(application_periods) > 0 and application_periods[0]["begin"] + timedelta(hours=16) < ap["begin"]: continue else: application_periods.append(ap) if len(application_periods) == 0: continue elif len(application_periods) == 1: if application_periods[0]["begin"] > now: subtitle = "De " + whenStr(application_periods[0]["begin"], now) + " à " + whenStr(application_periods[0]["end"], now) elif application_periods[0]["end"] > now: subtitle = "Fin " + whenStr(application_periods[0]["end"], now) for msg in disruption["messages"]: if msg["channel"]["name"] == "titre": title = msg["text"] elif msg["channel"]["name"] == "moteur": content = IDFMAPI.fromHTMLDisruption(msg["text"]) elif len(content) == "": content = IDFMAPI.fromHTMLDisruption(msg["text"]) yield { "title": title, "subtitle": subtitle, "description": content, "icon": alert_icon(mode, line), } def draw_module(self, config, width, height, line_height=19): image = Image.new('RGB', (width, height), '#fff') draw = ImageDraw.Draw(image) weather = IDFMAPI(config).get_weather() align_x = 0 align_y = 0 for mode in ["metros", "rers", "tramways", "buses"]: if mode != "rers" and mode != "buses": align_x = 0 # display mode icon icon = Image.open(os.path.join(config.icons_dir, mode + ".png")).resize((line_height, line_height)) image.paste(icon, (align_x,align_y), icon) align_x += line_height + 10 for line in weather[mode]: if align_x + line_height >= width: align_x = line_height + 10 align_y += line_height + 6 states = [] for disruption in weather[mode][line]: if disruption["status"] != "active": continue if "severity" in disruption: if disruption["cause"] == "travaux" and "line" not in [x["pt_object"]["embedded_type"] for x in disruption["impacted_objects"]]: states.append(disruption["severity"]["effect"] + " " + disruption["cause"]) else: states.append(disruption["severity"]["effect"]) fill = "darkgray" if "NO_SERVICE" in states: fill = "black" elif "REDUCED_SERVICE" in states or "UNKNOWN_EFFECT" in states or "OTHER_EFFECT" in states: fill = "teal" elif len(states) > 0: fill = "gray" icon = IDFMAPI.get_line_icon(config, mode, line, line_height, fill=fill) image.paste(icon, (align_x, align_y), icon) align_x += icon.width + 5 if mode != "metros" and mode != "tramways": align_y += line_height + 10 else: align_y += 10 return image class RATPNextStopModule: def draw_module(self, config, stops, width, height, line_height=17): image = Image.new('RGB', (width, height), '#fff') draw = ImageDraw.Draw(image) alerts = [] fnt_R = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_R_path), line_height) fnt_B = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), line_height) align = 0 api = IDFMAPI(config) for stop in stops: tmp = stop.split("/", 2) mode = tmp[0][0] line = tmp[0][1:] try: if 1 < len(tmp) < 4: prep = {} for s in api.get_schedules(mode, line, *tmp[1:]): if s["destination"] not in prep: prep[s["destination"]] = [] prep[s["destination"]].append(s) icon = IDFMAPI.get_line_icon(config, mode, line, int(line_height*(1.5 if len(prep.keys()) > 1 else 1))) image.paste(icon, (0, align), icon) max_dest = 64 for dest, msgs in prep.items(): if len(msgs) == 0: continue align_x = line_height * 2 sz = fnt_B.getsize(dest)[0] while sz > max_dest: dest = dest[:-1] sz = fnt_B.getsize(dest)[0] draw.text( (align_x, align), dest, font=fnt_B, anchor="lt", fill="black" ) align_x += max_dest + int(line_height/2.5) for msg in [] + msgs: draw.text( (align_x, align), msg["message"], font=fnt_R, anchor="lt", fill="black" ) align_x += fnt_R.getsize(msg["message"])[0] + int(line_height/2.5) align += line_height align += int(line_height * 0.33) except Exception as e: alerts.append({ "title": "Impossible de récupérer les horaires du " + mode + line + " pour " + tmp[1], "description": type(e).__name__ + ": " + (e.message if hasattr(e, 'message') else str(e)), "icon": "wi-earthquake.png", }) align -= int(line_height * 0.33) image = image.crop((0,0,width, min(align, height))) return image, alerts