from datetime import datetime, timedelta, timezone import hashlib import json import logging import os import os.path import re import urllib.parse import urllib.request from PIL import Image, ImageDraw, ImageFont def whenStr(date, now): if date < now + timedelta(days=1): if date.minute <= 9: return f"{date.hour}h0{date.minute}" else: return f"{date.hour}h{date.minute}" elif date < now + timedelta(days=7): weekday = date.weekday() if weekday == 0: return "lundi" elif weekday == 1: return "mardi" elif weekday == 2: return "mercredi" elif weekday == 3: return "jeudi" elif weekday == 4: return "vendredi" elif weekday == 5: return "samedi" else: return "dimanche" else: return date.strftime("%d %b") class IDFMAPI: fnt_R_path = "Parisine-Regular.ttf" fnt_RB_path = "Parisine-Bold.ttf" lines = { "metros": { "1": "C01371", "2": "C01372", "3": "C01373", "4": "C01374", "5": "C01375", "6": "C01376", "7": "C01377", "8": "C01378", "9": "C01379", "10": "C01380", "11": "C01381", "12": "C01382", "13": "C01383", "14": "C01384", "3B": "C01386", "7B": "C01387", }, "buses": { "57": "C01094", "125": "C01154", #"131": "C01159", "184": "C01205", }, "rers": { "A": "C01742", "B": "C01743", "C": "C01727", "D": "C01728", "E": "C01729" }, "tramways": { "T2": "C01390", "T3A": "C01391", "T3B": "C01679", "T7": "C01774", "T9": "C02317", }, } def __init__(self, config, apikey=None): self.fnt_R_path = os.path.join(config.fonts_dir, IDFMAPI.fnt_R_path) self.fnt_RB_path = os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path) self.baseurl = "https://prim.iledefrance-mobilites.fr/marketplace/disruptions_bulk" self.apikey = apikey or os.environ["TOKEN_IDFM"] self._cached_file = ".ratp-%s.cache" self.cache_timeout = config.cache_timeout self.max_cache_timeout = config.max_cache_timeout def fromHTMLDisruption(src): cleanr = re.compile('<.*?>') cleanrA = re.compile('.*?') period = re.compile('Période :[^.]+. ') period2 = re.compile('Dates? :[^.]+. ') more = re.compile(" Plus d'informations sur [^.]+.") return re.sub(cleanr, '', re.sub(cleanrA, '', re.sub(period, '', re.sub(period2, '', re.sub(more, '', src.replace(' ', ' ').replace(' ', ' ').replace('’', "'").replace('à', 'à').replace('é', 'é').replace('è', 'è').replace('ê', 'ê').replace('û', 'û').replace('Î', 'Î').replace('
', ' ').replace('

', ' ').replace('Information Ile de France Mobilités :', '').replace("Les horaires du calculateur d'itinéraire tiennent compte des travaux. ", '').replace("à la demande de la Préfecture de Police et d'Île-de-France Mobilités, et ", '')))))).replace(" Pour plus d'informations,.", '').replace(" Pour plus d'informations, .", '').strip() def get_schedules(self, mode, line, station, way="A+R"): if mode == "M": mode = "metros" elif mode == "R": mode = "rers" elif mode == "T": mode = "tramways" line = line[1:] elif mode == "B": mode = "buses" elif mode == "N": mode = "noctiliens" cache_file = self._cached_file % ("schedule-" + mode + "-" + line + "-" + hashlib.md5((mode + line + station + way).encode()).hexdigest()) req = urllib.request.Request("https://ratp.p0m.fr/api/schedules/%s/%s/%s/%s" % (mode, line, station, way)) try: with urllib.request.urlopen(req) as f: with open(cache_file, 'wb') as fd: fd.write(f.read()) except ConnectionResetError as e: logging.exception(e) except urllib.error.URLError as e: logging.exception(e) except urllib.error.HTTPError as e: logging.exception(e) with open(cache_file) as f: res = json.load(f) # Convert time to hours now = datetime.fromisoformat(res["_metadata"]["date"] if len(res["_metadata"]["date"]) <= 25 else res["_metadata"]["date"][0:19] + res["_metadata"]["date"][len(res["_metadata"]["date"])-6:]) for i in range(len(res["result"]["schedules"])): if "message" in res["result"]["schedules"][i]: if res["result"]["schedules"][i]["message"] == "Train a l'approche" or res["result"]["schedules"][i]["message"] == "Train à l'approche" or res["result"]["schedules"][i]["message"] == "Train à quai" or res["result"]["schedules"][i]["message"] == "Train a quai" or res["result"]["schedules"][i]["message"] == "A l'approche" or res["result"]["schedules"][i]["message"] == "A l'arret" or res["result"]["schedules"][i]["message"] == "A l'arrêt" or res["result"]["schedules"][i]["message"] == "A quai": res["result"]["schedules"][i]["message"] = now.strftime("%H:%M") elif res["result"]["schedules"][i]["message"].endswith(" mn"): res["result"]["schedules"][i]["message"] = (now + timedelta(minutes=int(res["result"]["schedules"][i]["message"].split(" ")[0]))).strftime("%H:%M") res["result"]["schedules"][i]["message"] = res["result"]["schedules"][i]["message"].replace(" Retardé", "+").replace("Train retardé", "++").replace("Retardé", "++") return [m for m in res["result"]["schedules"] if "message" in m and m["message"] != "Train sans arrêt"] def get_weather(self): cache_file = self._cached_file % ("ratp-disruptions") # Read the mod time statinfo = None try: statinfo = os.stat(cache_file) except: pass if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.cache_timeout) < datetime.now(tz=timezone.utc): # Do the request and save it req = urllib.request.Request(self.baseurl + "/disruptions/v2") req.headers["apikey"] = self.apikey try: with urllib.request.urlopen(req) as f: with open(cache_file, 'wb') as fd: fd.write(f.read()) except ConnectionResetError: pass try: statinfo = os.stat(cache_file) except: pass if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.max_cache_timeout) < datetime.now(tz=timezone.utc): raise Exception("File too old") # Retrieve cached data res = {} with open(cache_file) as f: res = json.load(f) disruptions = {} for d in res["disruptions"]: disruptions[d["id"]] = d ret = {} for mode in IDFMAPI.lines: ret[mode] = {} for line in IDFMAPI.lines[mode]: ret[mode][line] = self.get_line_weather(res, disruptions, mode, line) return ret def get_line_weather(self, res, disruptions, mode, line): for l in res["lines"]: if l["id"] == "line:IDFM:" + IDFMAPI.lines[mode][line]: disruptionIds = [] disruptionTypes = {} for impactedObject in l["impactedObjects"]: for disruptionId in impactedObject["disruptionIds"]: if disruptionId not in disruptionIds: disruptionTypes[disruptionId] = impactedObject["type"] disruptionIds.append(disruptionId) for disruptionId in disruptionIds: impactType = disruptionTypes[disruptionId] status = "past" for ap in disruptions[disruptionId]["applicationPeriods"]: end = datetime.strptime(ap["end"], "%Y%m%dT%H%M%S") if end < datetime.now(): continue begin = datetime.strptime(ap["begin"], "%Y%m%dT%H%M%S") if begin > datetime.now(): status = "future" continue status = "active" break if "cause" in disruptions and disruptions["cause"] == "INFORMATION": continue disruptions[disruptionId]["status"] = status disruptions[disruptionId]["scope"] = impactType yield disruptions[disruptionId] return None def get_line_icon(config, mode, line, size, fill="gray", state=""): if mode == "M" or mode == "metros": icon_mode = "metro" mode = "metros" line = line.replace("B", "bis") elif mode == "T" or mode == "tramways": icon_mode = "tram" mode = "tramways" line = line.replace("T", "").lower() elif mode == "R" or mode == "rers": icon_mode = "RER" mode = "rers" elif mode == "B": mode = "buses" img_line = None if mode == "rers" and fill != "white": img = Image.new('RGBA', (size * 2, int(size * 2 * 1.25)), '#fff0') draw = ImageDraw.Draw(img) fnt_icon = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), size*2-9) draw.rounded_rectangle((0, 0, size*2-1, size*2-1), 7, outline=fill, width=2) draw.text((size, size), line, fill=fill, anchor="mm", font=fnt_icon) elif mode == "buses": width = int(size * 1.38) img = Image.new('RGBA', (width, width), '#fff0') draw = ImageDraw.Draw(img) fnt_icon = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), size-3) draw.rectangle((0, 0, width, size), fill) draw.text((int(width / 2), int(size / 2)), line, fill="white", anchor="mm", font=fnt_icon) elif fill == "white": img = Image.open(os.path.join(config.icons_dir, "ratp", "lignes", icon_mode + "_" + line + "_NB" + state + "_RVB.png")) return img.resize((int(size), int(img.height * size / img.width))) else: img_line = Image.open(os.path.join(config.icons_dir, "ratp", "lignes", icon_mode + "_" + line + "_NB_RVB.png")) img = Image.new(img_line.mode, (img_line.width, int(img_line.height * 1.4))) if mode != "rers": img.paste(Image.new("RGB", (img_line.width, img_line.height), fill), (0,0), img_line) else: img.paste(img_line, (0,0)) if state != "": if state.endswith("petit"): coeff = 2.5 elif state.endswith("moyen"): coeff = 2 else: coeff = 1.5 img_perturb = Image.open(os.path.join(config.icons_dir, "ratp", "Perturbation_travaux_NB_RVB.png" if state.startswith("_travaux") else "Perturbation_trafic_NB_RVB.png")) img_perturb = img_perturb.resize((int(img.width / coeff), int(img_perturb.height * img.width / (coeff * img_perturb.width)))) if img_line is not None: img.paste(img_perturb, (img.width-img_perturb.width,img_line.height-int(img_perturb.height/coeff))) else: img.paste(img_perturb, (img.width-img_perturb.width,img.height-img_perturb.height)) return img.resize((int(size), int(img.height * size / img.width))) class RATPWeatherModule: def __init__(self, major_lines=["M7", "M5", "M14", "RB", "T3A"]): self.major_lines = major_lines def gen_alerts(self, config): alerts = [] weather = IDFMAPI(config).get_weather() id_seens = [] for mode in weather: for line in weather[mode]: if mode[0].upper() + line not in self.major_lines: continue def alert_icon(mode, line, state): def icon(size=64): image = Image.new('RGB', (size, size), '#000') line_icon = IDFMAPI.get_line_icon(config, mode, line, int(size/2), fill="white", state=state) if state == "": white = Image.new('RGB', (line_icon.width, line_icon.height), '#fff') image.paste(white, (int(size/4),0), line_icon) else: image.paste(line_icon, (int(size/4),0), line_icon) return image return icon for disruption in weather[mode][line]: if "message" not in disruption: continue if disruption["id"] in id_seens: continue if disruption["status"] != "active" and "applicationPeriods" not in disruption: continue subtitle = "" oneline = False state_type = "" state_importance = "" if "applicationPeriods" in disruption: now = datetime.now() yesterday = now + timedelta(days=-1) nextweek = now + timedelta(days=1) application_periods = [] for ap in disruption["applicationPeriods"]: begin = datetime.strptime(ap["begin"], "%Y%m%dT%H%M%S") end = datetime.strptime(ap["end"], "%Y%m%dT%H%M%S") if end < now: continue elif begin > nextweek: continue elif len(application_periods) > 0 and application_periods[0]["begin"] + timedelta(hours=16) < begin: continue else: application_periods.append({"begin": begin, "end": end}) if begin < yesterday: oneline = True if "cause" in disruption and disruption["cause"].lower() == "travaux" and begin > now: oneline = True state_type = "travaux" elif "cause" in disruption and disruption["cause"].lower() == "travaux": state_type = "travaux" else: state_type = "trafic" if "severity" in disruption: if disruption["severity"] == "NO_SERVICE" or disruption["severity"] == "BLOQUANTE": state_importance = "grand" elif disruption["severity"] == "REDUCED_SERVICE" or disruption["severity"] == "UNKNOWN_EFFECT" or disruption["severity"] == "OTHER_EFFECT": state_importance = "moyen" else: state_importance = "petit" if len(application_periods) == 0: continue elif len(application_periods) == 1: if application_periods[0]["begin"] > now: subtitle = "De " + whenStr(application_periods[0]["begin"], now) + " à " + whenStr(application_periods[0]["end"], now) elif application_periods[0]["end"] > now: subtitle = "Fin " + whenStr(application_periods[0]["end"], now) id_seens.append(disruption["id"]) yield { "title": disruption["title"], "subtitle": subtitle, "description": IDFMAPI.fromHTMLDisruption(disruption["message"]), "icon": alert_icon(mode, line, ("_" + state_type + "_" + state_importance) if state_type != "" else ""), "oneline": oneline, } def draw_module(self, config, width, height, line_height=22): image = Image.new('RGB', (width, height), '#fff') draw = ImageDraw.Draw(image) weather = IDFMAPI(config).get_weather() align_x = 0 align_y = 0 for mode in ["metros", "rers", "tramways", "buses"]: if mode != "rers" and mode != "buses": align_x = 0 for line in weather[mode]: if align_x + line_height >= width: align_x = 0 align_y += int(1.5 * line_height) states = [] for disruption in weather[mode][line]: status = "past" for ap in disruption["applicationPeriods"]: end = datetime.strptime(ap["end"], "%Y%m%dT%H%M%S") if end < datetime.now(): continue begin = datetime.strptime(ap["begin"], "%Y%m%dT%H%M%S") if begin > datetime.now(): status = "future" continue status = "active" break if status != "active": continue if "severity" in disruption: if disruption["cause"] == "TRAVAUX": states.append(disruption["severity"] + " " + disruption["cause"]) else: states.append(disruption["severity"]) state = "" fill = "darkgray" if disruption["scope"] == "line" and ("NO_SERVICE" in states or "BLOQUANTE" in states): fill = "black" state = "_trafic_grand" elif "PERTURBEE" in states: fill = "brown" state = "_trafic_moyen" elif "REDUCED_SERVICE" in states or "UNKNOWN_EFFECT" in states or "OTHER_EFFECT" in states or "NO_SERVICE" in states or "BLOQUANTE" in states: fill = "teal" state = "_trafic_moyen" elif "BLOQUANTE TRAVAUX" in states: fill = "teal" state = "_travaux_moyen" elif "INFORMATION" in states: state = "_travaux_petit" elif len(states) > 0: fill = "gray" state = "_trafic_petit" icon = IDFMAPI.get_line_icon(config, mode, line, line_height, fill=fill, state=state) image.paste(icon, (align_x, align_y), icon) align_x += icon.width + 5 if mode != "metros" and mode != "tramways": align_y += int(1.5 * line_height) else: align_x += int(1.5 * line_height) return image class RATPNextStopModule: def draw_module(self, config, stops, width, height, line_height=17): image = Image.new('RGB', (width, height), '#fff') draw = ImageDraw.Draw(image) alerts = [] fnt_R = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_R_path), line_height) fnt_B = ImageFont.truetype(os.path.join(config.fonts_dir, IDFMAPI.fnt_RB_path), line_height) align = 0 mint = datetime.now() + timedelta(minutes=10) api = IDFMAPI(config) for stop in stops: tmp = stop.split("/", 2) mode = tmp[0][0] line = tmp[0][1:] try: if 1 < len(tmp) < 4: prep = {} for s in api.get_schedules(mode, line, *tmp[1:]): # Skip too close items try: t = datetime.strptime(datetime.now().strftime("%Y-%m-%dT") + s["message"][0:5], "%Y-%m-%dT%H:%M") if t < mint: continue except: pass if s["destination"] not in prep: prep[s["destination"]] = [] prep[s["destination"]].append(s) icon = IDFMAPI.get_line_icon(config, mode, line, int(line_height*(1.5 if len(prep.keys()) > 1 else 1))) image.paste(icon, (0, align), icon) max_dest = 64 for dest, msgs in prep.items(): if len(msgs) == 0: continue align_x = line_height * 2 sz = fnt_B.getlength(dest) while sz > max_dest: dest = dest[:-1] sz = fnt_B.getlength(dest) draw.text( (align_x, align), dest, font=fnt_B, anchor="lt", fill="black" ) align_x += max_dest + int(line_height/2.5) for msg in [] + msgs: draw.text( (align_x, align), msg["message"], font=fnt_R, anchor="lt", fill="black" ) align_x += fnt_R.getlength(msg["message"]) + int(line_height/2.5) align += line_height align += int(line_height * 0.33) except Exception as e: alerts.append({ "title": "Impossible de récupérer les horaires du " + mode + line + " pour " + tmp[1], "description": type(e).__name__ + ": " + (e.message if hasattr(e, 'message') else str(e)), "icon": "weather/wi-earthquake.png", }) align -= int(line_height * 0.33) if align < 0: align = 0 image = image.crop((0,0,width, min(align, height))) return image, alerts