Use IDFM website API instead of PRIM for disruptions

This commit is contained in:
nemunaire 2022-12-03 12:39:58 +01:00
parent 0472a98007
commit 381b02a15c

View File

@ -62,6 +62,21 @@ class IDFMAPI:
self._cached_file = ".ratp-%s.cache"
self.cache_time = 5
def fromIVtoPRIM(src):
return {
"InfoChannelRef": {
"value": "Perturbation" if src["severity"] >= 2 else "Travaux",
},
"Content": {
"Message": [{
"MessageType": "SHORT_MESSAGE",
"MessageText": {
"value": src["title"],
}
}],
}
}
def get_schedules(self, mode, line, station, way="A+R"):
if mode == "M":
mode = "metros"
@ -115,7 +130,7 @@ class IDFMAPI:
return ret
def get_line_weather(self, mode, line):
cache_file = self._cached_file % ("traffic-" + IDFMAPI.lines[mode][line])
cache_file = self._cached_file % ("ratp-disruptions")
# Read the mod time
statinfo = None
try:
@ -125,7 +140,7 @@ class IDFMAPI:
if statinfo is None or datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc) + timedelta(minutes=self.cache_time) < datetime.now(tz=timezone.utc):
# Do the request and save it
req = urllib.request.Request(self.baseurl + "/general-message?LineRef=STIF:Line::" + IDFMAPI.lines[mode][line] + ":", headers={'apikey': self.apikey})
req = urllib.request.Request("https://api-iv.iledefrance-mobilites.fr/disruptions")
try:
with urllib.request.urlopen(req) as f:
with open(cache_file, 'wb') as fd:
@ -138,7 +153,19 @@ class IDFMAPI:
with open(cache_file) as f:
res = json.load(f)
return res["Siri"]["ServiceDelivery"]["GeneralMessageDelivery"][0]["InfoMessage"]
for l in res["currentIT"]:
if "id" in l:
if l["id"] == "line:IDFM:" + IDFMAPI.lines[mode][line]:
for d in l["disruptions"]:
yield IDFMAPI.fromIVtoPRIM(d)
else:
for d in l["disruptions"]:
for n in d["networks"]:
for l in n["lines"]:
if l["id"] == "line:IDFM:" + line:
yield IDFMAPI.fromIVtoPRIM(d)
return None
def get_line_icon(mode, line, size, fill="gray"):
width = int(size * 1.38) if mode == "buses" or mode == "tramways" else size