313 lines
11 KiB
Python
313 lines
11 KiB
Python
from datetime import datetime, timedelta, timezone
|
||
import hashlib
|
||
import json
|
||
import os
|
||
import urllib.parse
|
||
import urllib.request
|
||
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
|
||
class IDFMAPI:
|
||
|
||
fnt_R_path = "./fonts/Parisine-Regular.ttf"
|
||
fnt_RB_path = "./fonts/Parisine-Bold.ttf"
|
||
|
||
lines = {
|
||
"metros": {
|
||
"1": "C01371",
|
||
"2": "C01372",
|
||
"3": "C01373",
|
||
"4": "C01374",
|
||
"5": "C01375",
|
||
"6": "C01376",
|
||
"7": "C01377",
|
||
"8": "C01378",
|
||
"9": "C01379",
|
||
"10": "C01380",
|
||
"11": "C01381",
|
||
"12": "C01382",
|
||
"13": "C01383",
|
||
"14": "C01384",
|
||
"3B": "C01386",
|
||
"7B": "C01387",
|
||
},
|
||
"buses": {
|
||
"57": "C01094",
|
||
"125": "C01154",
|
||
#"131": "C01159",
|
||
"184": "C01205",
|
||
},
|
||
"rers": {
|
||
"A": "C01742",
|
||
"B": "C01743",
|
||
"C": "C01727",
|
||
"D": "C01728",
|
||
"E": "C01729"
|
||
},
|
||
"tramways": {
|
||
"T2": "C01390",
|
||
"T3A": "C01391",
|
||
"T3B": "C01679",
|
||
"T7": "C01774",
|
||
"T9": "C02317",
|
||
},
|
||
}
|
||
|
||
|
||
|
||
def __init__(self, apikey=None):
|
||
self.baseurl = "https://prim.iledefrance-mobilites.fr/marketplace"
|
||
self.apikey = apikey or os.environ["TOKEN_IDFM"]
|
||
|
||
self._cached_file = ".ratp-%s.cache"
|
||
self.cache_time = 5
|
||
|
||
def get_schedules(self, mode, line, station, way="A+R"):
|
||
if mode == "M":
|
||
mode = "metros"
|
||
elif mode == "R":
|
||
mode = "rers"
|
||
elif mode == "T":
|
||
mode = "tramways"
|
||
elif mode == "B":
|
||
mode = "buses"
|
||
elif mode == "N":
|
||
mode = "noctiliens"
|
||
|
||
cache_file = self._cached_file % ("schedule-" + mode + "-" + line + "-" + hashlib.md5((mode + line + station + way).encode()).hexdigest())
|
||
|
||
req = urllib.request.Request("https://ratp.p0m.fr/api/schedules/%s/%s/%s/%s" % (mode, line, station, way))
|
||
try:
|
||
with urllib.request.urlopen(req) as f:
|
||
with open(cache_file, 'wb') as fd:
|
||
fd.write(f.read())
|
||
except ConnectionResetError:
|
||
pass
|
||
except urllib.error.URLError:
|
||
pass
|
||
except urllib.error.HTTPError:
|
||
pass
|
||
|
||
with open(cache_file) as f:
|
||
res = json.load(f)
|
||
|
||
# Convert time to hours
|
||
now = datetime.fromisoformat(res["_metadata"]["date"])
|
||
|
||
for i in range(len(res["result"]["schedules"])):
|
||
if "message" in res["result"]["schedules"][i]:
|
||
if res["result"]["schedules"][i]["message"] == "Train a l'approche" or res["result"]["schedules"][i]["message"] == "Train à l'approche" or res["result"]["schedules"][i]["message"] == "Train à quai" or res["result"]["schedules"][i]["message"] == "Train a quai" or res["result"]["schedules"][i]["message"] == "A l'approche" or res["result"]["schedules"][i]["message"] == "A l'arret":
|
||
res["result"]["schedules"][i]["message"] = now.strftime("%H:%M")
|
||
elif res["result"]["schedules"][i]["message"].endswith(" mn"):
|
||
res["result"]["schedules"][i]["message"] = (now + timedelta(minutes=int(res["result"]["schedules"][i]["message"].split(" ")[0]))).strftime("%H:%M")
|
||
res["result"]["schedules"][i]["message"] = res["result"]["schedules"][i]["message"].replace(" Retardé", "+").replace("Train retardé", "...")
|
||
|
||
return [m for m in res["result"]["schedules"] if "message" in m and m["message"] != "Train sans arrêt"]
|
||
|
||
def get_weather(self):
|
||
ret = {}
|
||
|
||
for mode in IDFMAPI.lines:
|
||
ret[mode] = {}
|
||
for line in IDFMAPI.lines[mode]:
|
||
ret[mode][line] = self.get_line_weather(mode, line)
|
||
|
||
return ret
|
||
|
||
def get_line_weather(self, mode, line):
|
||
cache_file = self._cached_file % ("traffic-" + IDFMAPI.lines[mode][line])
|
||
# Read the mod time
|
||
statinfo = None
|
||
try:
|
||
statinfo = os.stat(cache_file)
|
||
except:
|
||
pass
|
||
|
||
if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.cache_time) < datetime.now(tz=timezone.utc):
|
||
# Do the request and save it
|
||
req = urllib.request.Request(self.baseurl + "/general-message?LineRef=STIF:Line::" + IDFMAPI.lines[mode][line] + ":", headers={'apikey': self.apikey})
|
||
try:
|
||
with urllib.request.urlopen(req) as f:
|
||
with open(cache_file, 'wb') as fd:
|
||
fd.write(f.read())
|
||
except ConnectionResetError:
|
||
pass
|
||
|
||
# Retrieve cached data
|
||
res = {}
|
||
with open(cache_file) as f:
|
||
res = json.load(f)
|
||
|
||
return res["Siri"]["ServiceDelivery"]["GeneralMessageDelivery"][0]["InfoMessage"]
|
||
|
||
def get_line_icon(mode, line, size, fill="gray"):
|
||
width = int(size * 1.38) if mode == "buses" or mode == "tramways" else size
|
||
image = Image.new('RGBA', (width, size), '#fff0')
|
||
draw = ImageDraw.Draw(image)
|
||
|
||
fnt_icon = ImageFont.truetype(IDFMAPI.fnt_RB_path, size-3)
|
||
|
||
if mode == "M" or mode == "metros":
|
||
draw.ellipse((0, 0, width, size), fill=fill)
|
||
elif mode == "T" or mode == "tramways":
|
||
if fill == "black":
|
||
draw.rectangle((0, 0, width, size), fill=fill)
|
||
draw.rectangle((0, 0, width, 1), fill=fill if fill != "black" else "gray")
|
||
draw.rectangle((0, size-2, width, size), fill=fill if fill != "black" else "gray")
|
||
elif (mode == "R" or mode == "rers") and "rounded_rectangle" in draw.__dict__:
|
||
draw.rounded_rectangle((0, 0, width, size), radius=4, fill=fill)
|
||
else:
|
||
draw.rectangle((0, 0, width, size), fill=fill)
|
||
draw.text((int(width / 2), int(size / 2)), line, fill="white" if (fill == "black" and mode == "tramways") or (fill != "white" and mode != "tramways") else "black", anchor="mm", font=fnt_icon)
|
||
|
||
return image
|
||
|
||
|
||
|
||
class RATPWeatherModule:
|
||
|
||
def __init__(self):
|
||
self.major_lines = ["M7", "M5", "M14", "RB", "T3A"]
|
||
|
||
def gen_alerts(self):
|
||
alerts = []
|
||
|
||
weather = IDFMAPI().get_weather()
|
||
for mode in weather:
|
||
for line in weather[mode]:
|
||
if mode[0].upper() + line not in self.major_lines:
|
||
continue
|
||
|
||
def alert_icon(mode, line):
|
||
def icon(size=64):
|
||
image = Image.new('RGB', (size, size), '#000')
|
||
|
||
white = Image.new('RGB', (int(size / 2), int(size / 2)), '#fff')
|
||
mode_icon = Image.open("icons/" + mode + ".png").resize((int(size/2), int(size/2)))
|
||
image.paste(white, (-5,0), mode_icon)
|
||
|
||
line_icon = IDFMAPI.get_line_icon(mode, line, int(size/2), fill="white")
|
||
image.paste(line_icon, (int(size/2) - 5,0), line_icon)
|
||
|
||
return image
|
||
return icon
|
||
|
||
for info in weather[mode][line]:
|
||
if info["InfoChannelRef"]["value"] != "Perturbation":
|
||
continue
|
||
|
||
if "Message" not in info["Content"]:
|
||
continue
|
||
|
||
for msg in info["Content"]["Message"]:
|
||
if msg["MessageType"] != "SHORT_MESSAGE":
|
||
continue
|
||
|
||
yield {
|
||
"description": msg["MessageText"]["value"].replace(" ", " "),
|
||
"icon": alert_icon(mode, line),
|
||
}
|
||
|
||
def draw_module(self, config, width, height, line_height=19):
|
||
image = Image.new('RGB', (width, height), '#fff')
|
||
draw = ImageDraw.Draw(image)
|
||
|
||
weather = IDFMAPI().get_weather()
|
||
|
||
align_x = 0
|
||
align_y = 0
|
||
for mode in ["metros", "rers", "tramways", "buses"]:
|
||
if mode != "rers" and mode != "buses":
|
||
align_x = 0
|
||
|
||
# display mode icon
|
||
icon = Image.open("icons/" + mode + ".png").resize((line_height, line_height))
|
||
image.paste(icon, (align_x,align_y), icon)
|
||
|
||
align_x += line_height + 10
|
||
|
||
for line in weather[mode]:
|
||
if align_x + line_height >= width:
|
||
align_x = line_height + 10
|
||
align_y += line_height + 6
|
||
|
||
states = []
|
||
for info in weather[mode][line]:
|
||
states.append(info["InfoChannelRef"]["value"])
|
||
|
||
fill = "gray" if "Perturbation" not in states else "black"
|
||
|
||
icon = IDFMAPI.get_line_icon(mode, line, line_height, fill=fill)
|
||
image.paste(icon, (align_x, align_y), icon)
|
||
|
||
align_x += icon.width + 5
|
||
|
||
if mode != "metros" and mode != "tramways":
|
||
align_y += line_height + 10
|
||
else:
|
||
align_y += 10
|
||
|
||
return image
|
||
|
||
|
||
class RATPNextStopModule:
|
||
|
||
def draw_module(self, config, stops, width, height, line_height=17):
|
||
image = Image.new('RGB', (width, height), '#fff')
|
||
draw = ImageDraw.Draw(image)
|
||
|
||
fnt_R = ImageFont.truetype(IDFMAPI.fnt_R_path, line_height)
|
||
fnt_B = ImageFont.truetype(IDFMAPI.fnt_RB_path, line_height)
|
||
|
||
align = 0
|
||
|
||
api = IDFMAPI()
|
||
for stop in stops:
|
||
tmp = stop.split("/")
|
||
mode = tmp[0][0]
|
||
line = tmp[0][1:]
|
||
if 1 < len(tmp) < 3:
|
||
prep = {}
|
||
for s in api.get_schedules(mode, line, *tmp[1:]):
|
||
if s["destination"] not in prep:
|
||
prep[s["destination"]] = []
|
||
prep[s["destination"]].append(s)
|
||
|
||
icon = IDFMAPI.get_line_icon(mode, line, int(line_height*(1.5 if len(prep.keys()) > 1 else 1)))
|
||
image.paste(icon, (0, align), icon)
|
||
|
||
max_dest = 64
|
||
for dest, msgs in prep.items():
|
||
if len(msgs) == 0:
|
||
continue
|
||
|
||
align_x = line_height * 2
|
||
|
||
sz = fnt_B.getsize(dest)[0]
|
||
while sz > max_dest:
|
||
dest = dest[:-1]
|
||
sz = fnt_B.getsize(dest)[0]
|
||
|
||
draw.text(
|
||
(align_x, align),
|
||
dest,
|
||
font=fnt_B, anchor="lt", fill="black"
|
||
)
|
||
|
||
align_x += max_dest + int(line_height/2.5)
|
||
|
||
for msg in [] + msgs + msgs:
|
||
draw.text(
|
||
(align_x, align),
|
||
msg["message"],
|
||
font=fnt_R, anchor="lt", fill="black"
|
||
)
|
||
align_x += fnt_R.getsize(msg["message"])[0] + int(line_height/2.5)
|
||
|
||
align += line_height
|
||
align += int(line_height * 0.33)
|
||
|
||
image = image.crop((0,0,width, min(align, height)))
|
||
|
||
return image
|