"""Ile-de-France Mobilités (IDFM / RATP) traffic-status and next-departure widgets.

The module exposes:

* ``whenStr`` -- short French rendering of a date relative to "now";
* ``IDFMAPI`` -- thin client with an on-disk cache for schedules and
  line reports, plus a line-icon renderer;
* ``RATPWeatherModule`` -- network "weather" (disruptions) widget and alerts;
* ``RATPNextStopModule`` -- next departures widget.
"""

from datetime import datetime, timedelta, timezone
import hashlib
import html
import json
import logging
import os
import re
import urllib.error
import urllib.parse
import urllib.request

from PIL import Image, ImageDraw, ImageFont

# French weekday names indexed by ``datetime.weekday()`` (0 == Monday).
_WEEKDAYS_FR = (
    "lundi",
    "mardi",
    "mercredi",
    "jeudi",
    "vendredi",
    "samedi",
    "dimanche",
)

# Short mode letters accepted by ``IDFMAPI.get_schedules`` and the path
# component they map to in the schedules API.
_MODE_NAMES = {
    "M": "metros",
    "R": "rers",
    "T": "tramways",
    "B": "buses",
    "N": "noctiliens",
}

# Schedule messages meaning "the vehicle is arriving / at the platform".
_ARRIVING_MESSAGES = frozenset((
    "Train a l'approche",
    "Train à l'approche",
    "Train à quai",
    "Train a quai",
    "A l'approche",
    "A l'arret",
    "A l'arrêt",
    "A quai",
))

# Patterns used to clean disruption messages.  The unescaped "." (any
# character) is kept exactly as in the historical patterns.
_TAG_RE = re.compile('<.*?>')
_PERIOD_RE = re.compile('Période :[^.]+. ')
# NOTE(review): the trailing newline of this pattern looks like the residue of
# HTML markup lost in transit (e.g. a line-break tag) -- confirm upstream.
_DATES_RE = re.compile('Dates? :[^.]+. \n')
_MORE_RE = re.compile(" Plus d'informations sur [^.]+.")

# Boilerplate sentences removed verbatim from disruption messages.
_BOILERPLATE = (
    'Information Ile de France Mobilités :',
    "Les horaires du calculateur d'itinéraire tiennent compte des travaux. ",
    "à la demande de la Préfecture de Police et d'Île-de-France Mobilités, et ",
)


def whenStr(date, now):
    """Return a short French description of ``date`` relative to ``now``.

    * less than one day after ``now``: the time of day, e.g. ``"9h05"``;
    * less than seven days after ``now``: the French weekday name;
    * otherwise: day and abbreviated month (``"%d %b"``, locale dependent).
    """
    if date < now + timedelta(days=1):
        # ``datetime`` exposes ``hour``/``minute`` attributes, not methods.
        return "%dh%02d" % (date.hour, date.minute)
    if date < now + timedelta(days=7):
        return _WEEKDAYS_FR[date.weekday()]
    return date.strftime("%d %b")


def _parse_datetime(value):
    """Parse an API timestamp into a naive local ``datetime``.

    Accepts anything ``datetime.fromisoformat`` understands and falls back to
    the compact ``YYYYMMDDTHHMMSS`` form.  Aware values are converted to local
    time and stripped of their tzinfo so that they can be compared with
    ``datetime.now()``.  An already-parsed ``datetime`` is accepted as is.
    """
    if isinstance(value, datetime):
        parsed = value
    else:
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            parsed = datetime.strptime(value, "%Y%m%dT%H%M%S")
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone().replace(tzinfo=None)
    return parsed


def _download_to(req, cache_file):
    """Fetch ``req`` and store the body in ``cache_file``.

    The body is read completely *before* the cache file is opened, so a failed
    or interrupted download never truncates a previously valid cache.
    Connection and URL/HTTP errors are logged and reported by returning
    ``False``; the caller is expected to fall back on the existing cache.
    """
    try:
        with urllib.request.urlopen(req) as response:
            payload = response.read()
    except (ConnectionResetError, urllib.error.URLError) as e:
        # HTTPError is a subclass of URLError: one clause covers both.
        logging.exception(e)
        return False
    with open(cache_file, 'wb') as fd:
        fd.write(payload)
    return True


def _mtime_utc(path):
    """Return the modification time of ``path`` (aware, UTC) or ``None``."""
    try:
        return datetime.fromtimestamp(os.stat(path).st_mtime, tz=timezone.utc)
    except OSError:
        return None


def _text_width(font, text):
    """Return the rendered width of ``text`` in pixels.

    ``ImageFont.getsize`` was removed in Pillow 10; ``getlength`` is preferred
    and ``getsize`` is only used with older Pillow releases.
    """
    if hasattr(font, "getlength"):
        return int(font.getlength(text))
    return font.getsize(text)[0]


class IDFMAPI:
    """Client for IDFM line reports and RATP schedules with a file cache."""

    fnt_R_path = "./fonts/Parisine-Regular.ttf"
    fnt_RB_path = "./fonts/Parisine-Bold.ttf"

    # Displayed line name -> IDFM line identifier, grouped by mode.
    lines = {
        "metros": {
            "1": "C01371",
            "2": "C01372",
            "3": "C01373",
            "4": "C01374",
            "5": "C01375",
            "6": "C01376",
            "7": "C01377",
            "8": "C01378",
            "9": "C01379",
            "10": "C01380",
            "11": "C01381",
            "12": "C01382",
            "13": "C01383",
            "14": "C01384",
            "3B": "C01386",
            "7B": "C01387",
        },
        "buses": {
            "57": "C01094",
            "125": "C01154",
            # "131": "C01159",  (disabled)
            "184": "C01205",
        },
        "rers": {
            "A": "C01742",
            "B": "C01743",
            "C": "C01727",
            "D": "C01728",
            "E": "C01729",
        },
        "tramways": {
            "T2": "C01390",
            "T3A": "C01391",
            "T3B": "C01679",
            "T7": "C01774",
            "T9": "C02317",
        },
    }

    def __init__(self, config, apikey=None):
        """Create a client.

        ``config`` must expose ``cache_timeout`` and ``max_cache_timeout``
        (both in minutes).  ``apikey`` defaults to the ``TOKEN_IDFM``
        environment variable; a ``KeyError`` is raised when neither is set.
        """
        self.baseurl = "https://prim.iledefrance-mobilites.fr/marketplace"
        self.apikey = apikey or os.environ["TOKEN_IDFM"]
        self._cached_file = ".ratp-%s.cache"
        self.cache_timeout = config.cache_timeout
        self.max_cache_timeout = config.max_cache_timeout

    @staticmethod
    def fromHTMLDisruption(src):
        """Turn the HTML text of a disruption message into plain text.

        HTML entities are decoded, typographic apostrophes and non-breaking
        spaces are normalised, boilerplate sentences and "Période"/"Dates"/
        "Plus d'informations" fragments are dropped, remaining tags are
        stripped and the result is trimmed.
        """
        # The historical chain of ``str.replace`` calls decoded a handful of
        # entities by hand; ``html.unescape`` covers all of them.
        text = html.unescape(src).replace('\xa0', ' ').replace('’', "'")
        # Must run before newlines are flattened: the pattern ends with one.
        text = _DATES_RE.sub('', text)
        text = text.replace('\n', ' ')
        for phrase in _BOILERPLATE:
            text = text.replace(phrase, '')
        text = _MORE_RE.sub('', text)
        text = _PERIOD_RE.sub('', text)
        # NOTE(review): a second pattern ('.*?', a no-op as written) used to
        # be applied here; it presumably targeted anchor elements before the
        # generic tag stripping -- confirm against upstream before restoring.
        return _TAG_RE.sub('', text).strip()

    def get_schedules(self, mode, line, station, way="A+R"):
        """Return the next departures of ``line`` at ``station``.

        ``mode`` is a one-letter code (``M``, ``R``, ``T``, ``B``, ``N``) or
        an already expanded mode name.  For ``T`` the first character of
        ``line`` is dropped before querying.  The response is cached on disk;
        when the request fails the previous cache is used, and the ``open``
        error propagates if there is none.

        Relative messages ("3 mn", "A quai", ...) are converted to ``HH:MM``
        using the timestamp of the response, delay markers are shortened to
        ``+``/``++`` and entries without a message or with
        "Train sans arrêt" are filtered out.
        """
        if mode == "T":
            line = line[1:]
        mode = _MODE_NAMES.get(mode, mode)

        digest = hashlib.md5((mode + line + station + way).encode()).hexdigest()
        cache_file = self._cached_file % ("schedule-" + mode + "-" + line + "-" + digest)

        req = urllib.request.Request(
            "https://ratp.p0m.fr/api/schedules/%s/%s/%s/%s" % (mode, line, station, way)
        )
        _download_to(req, cache_file)

        with open(cache_file) as f:
            res = json.load(f)

        # Reference time of the response.  Over-long timestamps are reduced
        # to "YYYY-MM-DDTHH:MM:SS" + UTC offset so fromisoformat accepts them.
        stamp = res["_metadata"]["date"]
        if len(stamp) > 25:
            stamp = stamp[0:19] + stamp[len(stamp) - 6:]
        now = datetime.fromisoformat(stamp)

        schedules = res["result"]["schedules"]
        for schedule in schedules:
            if "message" not in schedule:
                continue
            message = schedule["message"]
            if message in _ARRIVING_MESSAGES:
                message = now.strftime("%H:%M")
            elif message.endswith(" mn"):
                minutes = int(message.split(" ")[0])
                message = (now + timedelta(minutes=minutes)).strftime("%H:%M")
            schedule["message"] = (
                message.replace(" Retardé", "+")
                .replace("Train retardé", "++")
                .replace("Retardé", "++")
            )

        return [
            m for m in schedules
            if "message" in m and m["message"] != "Train sans arrêt"
        ]

    def get_weather(self):
        """Return the current disruptions of every known line.

        The result maps each mode of ``IDFMAPI.lines`` to a dict mapping the
        line name to the list of its non-past disruptions.

        The line reports are cached on disk and refreshed when older than
        ``cache_timeout`` minutes.  If the refresh fails the stale cache is
        used as long as it is younger than ``max_cache_timeout`` minutes;
        otherwise ``Exception("File too old")`` is raised.
        """
        cache_file = self._cached_file % ("ratp-disruptions")
        current = datetime.now(tz=timezone.utc)

        mtime = _mtime_utc(cache_file)
        if mtime is None or mtime + timedelta(minutes=self.cache_timeout) < current:
            # Do the request and save it
            req = urllib.request.Request(
                self.baseurl + "/navitia/line_reports/coverage/fr-idf/line_reports?count=2000"
            )
            req.headers["apikey"] = self.apikey
            _download_to(req, cache_file)
            mtime = _mtime_utc(cache_file) or mtime

        if mtime is None or mtime + timedelta(minutes=self.max_cache_timeout) < datetime.now(tz=timezone.utc):
            raise Exception("File too old")

        # Retrieve cached data
        with open(cache_file) as f:
            res = json.load(f)

        disruptions = {d["id"]: d for d in res["disruptions"]}

        # Lists (rather than one-shot generators) so that a caller may iterate
        # over the same line more than once.
        return {
            mode: {
                line: list(self.get_line_weather(res, disruptions, mode, line))
                for line in IDFMAPI.lines[mode]
            }
            for mode in IDFMAPI.lines
        }

    def get_line_weather(self, res, disruptions, mode, line):
        """Yield the non-past disruptions linked to ``line``.

        ``res`` is the decoded line-reports document and ``disruptions`` the
        index of its disruptions by id.  Links that do not point to a known
        disruption are skipped.
        """
        wanted = "line:IDFM:" + IDFMAPI.lines[mode][line]
        for report in res["line_reports"]:
            reported_line = report.get("line")
            if not reported_line or reported_line.get("id") != wanted:
                continue
            for link in reported_line.get("links", []):
                disruption = disruptions.get(link["id"])
                if disruption is not None and disruption["status"] != "past":
                    yield disruption

    @staticmethod
    def get_line_icon(mode, line, size, fill="gray"):
        """Render the pictogram of ``line`` as an RGBA image ``size`` px high.

        Metros get a disc, tramways two horizontal bars (a filled rectangle
        when ``fill`` is ``"black"``), RERs a rounded rectangle when Pillow
        supports it, and everything else a plain rectangle.
        """
        # NOTE(review): only the long mode names widen the icon; the short
        # codes ("B", "T") produce a square one -- kept as is.
        width = int(size * 1.38) if mode == "buses" or mode == "tramways" else size

        image = Image.new('RGBA', (width, size), '#fff0')
        draw = ImageDraw.Draw(image)
        fnt_icon = ImageFont.truetype(IDFMAPI.fnt_RB_path, size - 3)

        if mode == "M" or mode == "metros":
            draw.ellipse((0, 0, width, size), fill=fill)
        elif mode == "T" or mode == "tramways":
            if fill == "black":
                draw.rectangle((0, 0, width, size), fill=fill)
            bar_fill = fill if fill != "black" else "gray"
            draw.rectangle((0, 0, width, 1), fill=bar_fill)
            draw.rectangle((0, size - 2, width, size), fill=bar_fill)
        elif (mode == "R" or mode == "rers") and hasattr(draw, "rounded_rectangle"):
            # ``hasattr`` rather than a lookup in the instance ``__dict__``:
            # methods live on the class, so the latter never matched.
            draw.rounded_rectangle((0, 0, width, size), radius=4, fill=fill)
        else:
            draw.rectangle((0, 0, width, size), fill=fill)

        if mode == "tramways":
            text_fill = "white" if fill in ("black", "white") else "black"
        else:
            text_fill = "white" if fill != "white" else "black"

        draw.text(
            (int(width / 2), int(size / 2)),
            line,
            fill=text_fill,
            anchor="mm",
            font=fnt_icon,
        )
        return image


class RATPWeatherModule:
    """Widget summarising the state of the network and its alerts."""

    def __init__(self, major_lines=None):
        """``major_lines`` lists the lines for which alerts are generated.

        Each entry is the upper-cased first letter of the mode followed by the
        line name (``"M7"``, ``"RB"``, ``"B57"``).  Defaults to
        ``["M7", "M5", "M14", "RB", "T3A"]``.
        """
        if major_lines is None:
            major_lines = ["M7", "M5", "M14", "RB", "T3A"]
        self.major_lines = major_lines

    def _is_major(self, mode, line):
        """Tell whether alerts are wanted for ``line`` of ``mode``."""
        prefix = mode[0].upper()
        if prefix + line in self.major_lines:
            return True
        # Tramway names already carry their mode letter ("T3A"): accept that
        # spelling as well as the prefixed one ("TT3A").
        return line.startswith(prefix) and line in self.major_lines

    @staticmethod
    def _alert_icon(mode, line):
        """Return a ``icon(size=64)`` callable drawing the alert pictogram."""
        def icon(size=64):
            half = int(size / 2)
            image = Image.new('RGB', (size, size), '#000')
            white = Image.new('RGB', (half, half), '#fff')
            mode_icon = Image.open("icons/" + mode + ".png").resize((half, half))
            image.paste(white, (-5, 0), mode_icon)
            line_icon = IDFMAPI.get_line_icon(mode, line, half, fill="white")
            image.paste(line_icon, (half - 5, 0), line_icon)
            return image

        return icon

    @staticmethod
    def _period_subtitle(disruption, now):
        """Describe when ``disruption`` applies.

        Returns ``None`` when no application period is relevant (already over,
        starting more than a day from now, or starting more than 16 hours
        after the first retained period), otherwise a possibly empty
        subtitle.
        """
        horizon = now + timedelta(days=1)
        periods = []
        for ap in disruption["application_periods"]:
            # Parse into locals: the disruption dict is shared between the
            # lines it impacts and must not be modified.
            begin = _parse_datetime(ap["begin"])
            end = _parse_datetime(ap["end"])
            if end < now or begin > horizon:
                continue
            if periods and periods[0][0] + timedelta(hours=16) < begin:
                continue
            periods.append((begin, end))

        if not periods:
            return None
        if len(periods) == 1:
            begin, end = periods[0]
            if begin > now:
                return "De " + whenStr(begin, now) + " à " + whenStr(end, now)
            if end > now:
                return "Fin " + whenStr(end, now)
        return ""

    def gen_alerts(self, config):
        """Yield one alert dict per relevant disruption of the major lines.

        Each alert has the keys ``title``, ``subtitle``, ``description`` and
        ``icon`` (a callable taking an optional ``size``).
        """
        weather = IDFMAPI(config).get_weather()

        for mode in weather:
            for line in weather[mode]:
                if not self._is_major(mode, line):
                    continue

                for disruption in weather[mode][line]:
                    if "messages" not in disruption:
                        continue
                    if disruption["status"] != "active" and "application_periods" not in disruption:
                        continue

                    title = ""
                    subtitle = ""
                    content = ""

                    if "application_periods" in disruption:
                        subtitle = self._period_subtitle(disruption, datetime.now())
                        if subtitle is None:
                            continue

                    for msg in disruption["messages"]:
                        channel = msg["channel"]["name"]
                        if channel == "titre":
                            title = msg["text"]
                        elif channel == "moteur":
                            content = IDFMAPI.fromHTMLDisruption(msg["text"])
                        elif not content:
                            # Any other channel is only a fallback.
                            content = IDFMAPI.fromHTMLDisruption(msg["text"])

                    yield {
                        "title": title,
                        "subtitle": subtitle,
                        "description": content,
                        "icon": self._alert_icon(mode, line),
                    }

    def draw_module(self, config, width, height, line_height=19):
        """Draw one pictogram per line, coloured according to its state.

        ``black`` means no service, ``teal`` a reduced/unknown/other effect,
        ``gray`` any other active disruption and ``darkgray`` nothing active.
        """
        image = Image.new('RGB', (width, height), '#fff')

        weather = IDFMAPI(config).get_weather()

        align_x = 0
        align_y = 0
        for mode in ["metros", "rers", "tramways", "buses"]:
            if mode != "rers" and mode != "buses":
                align_x = 0

            # display mode icon
            icon = Image.open("icons/" + mode + ".png").resize((line_height, line_height))
            image.paste(icon, (align_x, align_y), icon)
            align_x += line_height + 10

            for line in weather[mode]:
                if align_x + line_height >= width:
                    # Wrap, keeping the column of the mode icon free.
                    align_x = line_height + 10
                    align_y += line_height + 6

                states = []
                for disruption in weather[mode][line]:
                    if disruption["status"] != "active":
                        continue
                    if "severity" not in disruption:
                        continue
                    effect = disruption["severity"]["effect"]
                    cause = disruption.get("cause", "")
                    impacted = [
                        x["pt_object"]["embedded_type"]
                        for x in disruption["impacted_objects"]
                    ]
                    if cause == "travaux" and "line" not in impacted:
                        # Works that do not hit the whole line are tagged so
                        # that they do not count as a plain service effect.
                        states.append(effect + " " + cause)
                    else:
                        states.append(effect)

                fill = "darkgray"
                if "NO_SERVICE" in states:
                    fill = "black"
                elif "REDUCED_SERVICE" in states or "UNKNOWN_EFFECT" in states or "OTHER_EFFECT" in states:
                    fill = "teal"
                elif states:
                    fill = "gray"

                icon = IDFMAPI.get_line_icon(mode, line, line_height, fill=fill)
                image.paste(icon, (align_x, align_y), icon)
                align_x += icon.width + 5

            # NOTE(review): nesting reconstructed from a whitespace-mangled
            # source; assumed to run once per mode -- confirm.
            if mode != "metros" and mode != "tramways":
                align_y += line_height + 10
            else:
                align_y += 10

        return image


class RATPNextStopModule:
    """Widget listing the next departures of a set of stops."""

    def draw_module(self, config, stops, width, height, line_height=17):
        """Draw the next departures of ``stops``.

        Each stop is ``"<mode letter><line>/<station>[/<way>]"``.  Returns the
        image, cropped to the drawn height, and the list of alerts describing
        the stops whose schedules could not be retrieved.
        """
        image = Image.new('RGB', (width, height), '#fff')
        draw = ImageDraw.Draw(image)

        alerts = []

        fnt_R = ImageFont.truetype(IDFMAPI.fnt_R_path, line_height)
        fnt_B = ImageFont.truetype(IDFMAPI.fnt_RB_path, line_height)

        gap = int(line_height / 2.5)
        spacing = int(line_height * 0.33)
        max_dest = 64

        align = 0
        api = IDFMAPI(config)
        for stop in stops:
            tmp = stop.split("/", 2)
            if not tmp[0]:
                # No mode/line to work with.
                continue
            mode = tmp[0][0]
            line = tmp[0][1:]

            try:
                if 1 < len(tmp) < 4:
                    # Group the departures by destination.
                    prep = {}
                    for s in api.get_schedules(mode, line, *tmp[1:]):
                        prep.setdefault(s["destination"], []).append(s)

                    icon_size = int(line_height * (1.5 if len(prep) > 1 else 1))
                    icon = IDFMAPI.get_line_icon(mode, line, icon_size)
                    image.paste(icon, (0, align), icon)

                    for dest, msgs in prep.items():
                        if not msgs:
                            continue

                        align_x = line_height * 2

                        # Truncate the destination until it fits its column.
                        while _text_width(fnt_B, dest) > max_dest:
                            dest = dest[:-1]

                        draw.text(
                            (align_x, align),
                            dest,
                            font=fnt_B, anchor="lt", fill="black"
                        )
                        align_x += max_dest + gap

                        for msg in msgs:
                            draw.text(
                                (align_x, align),
                                msg["message"],
                                font=fnt_R, anchor="lt", fill="black"
                            )
                            align_x += _text_width(fnt_R, msg["message"]) + gap

                        align += line_height

                    align += spacing
            except Exception as e:
                alerts.append({
                    "title": "Impossible de récupérer les horaires du " + mode + line + " pour " + tmp[1],
                    "description": type(e).__name__ + ": " + (e.message if hasattr(e, 'message') else str(e)),
                    "icon": "wi-earthquake.png",
                })

        # Drop the spacing added after the last stop; never crop to a
        # negative height when nothing was drawn.
        align -= spacing
        image = image.crop((0, 0, width, max(0, min(align, height))))

        return image, alerts