2022-08-14 13:19:51 +00:00
from datetime import datetime , timedelta , timezone
2022-08-21 02:03:33 +00:00
import hashlib
2022-08-14 13:19:51 +00:00
import json
2023-01-16 12:48:41 +00:00
import logging
2022-08-14 13:19:51 +00:00
import os
2023-01-19 11:04:08 +00:00
import re
2022-08-14 13:19:51 +00:00
import urllib . parse
import urllib . request
from PIL import Image , ImageDraw , ImageFont
2022-08-14 16:24:46 +00:00
class IDFMAPI:
    """Fetch and normalise Paris public-transport schedules and disruptions.

    Every HTTP response is written to a cache file (see ``_cached_file``) in
    the current working directory, so that a failed request can fall back on
    the last payload that was downloaded successfully.
    """

    # Fonts used to render line icons (and by the drawing modules).
    fnt_R_path = "./fonts/Parisine-Regular.ttf"
    fnt_RB_path = "./fonts/Parisine-Bold.ttf"

    # Line name -> IDFM line reference, grouped by transport mode.
    lines = {
        "metros": {
            "1": "C01371",
            "2": "C01372",
            "3": "C01373",
            "4": "C01374",
            "5": "C01375",
            "6": "C01376",
            "7": "C01377",
            "8": "C01378",
            "9": "C01379",
            "10": "C01380",
            "11": "C01381",
            "12": "C01382",
            "13": "C01383",
            "14": "C01384",
            "3B": "C01386",
            "7B": "C01387",
        },
        "buses": {
            "57": "C01094",
            "125": "C01154",
            # "131": "C01159",  (entry disabled in the original source)
            "184": "C01205",
        },
        "rers": {
            "A": "C01742",
            "B": "C01743",
            "C": "C01727",
            "D": "C01728",
            "E": "C01729",
        },
        "tramways": {
            "T2": "C01390",
            "T3A": "C01391",
            "T3B": "C01679",
            "T7": "C01774",
            "T9": "C02317",
        },
    }

    # One-letter mode codes (as used in stop specifications) -> long names.
    _MODE_ALIASES = {
        "M": "metros",
        "R": "rers",
        "T": "tramways",
        "B": "buses",
        "N": "noctiliens",
    }

    # Schedule messages meaning "the vehicle is here or about to arrive".
    _IMMINENT_MESSAGES = frozenset({
        "Train a l'approche",
        "Train à l'approche",
        "Train à quai",
        "Train a quai",
        "A l'approche",
        "A l'arret",
        "A l'arrêt",
        "A quai",
    })

    _TAG_RE = re.compile(r"<.*?>")
    _ANCHOR_RE = re.compile(r"<a.*?>.*?</a>")

    def __init__(self, config, apikey=None):
        """Prepare the client.

        Args:
            config: object exposing ``cache_timeout`` and
                ``max_cache_timeout`` (both used as minutes).
            apikey: API token; when falsy, ``TOKEN_IDFM`` is read from the
                environment.

        Raises:
            KeyError: no ``apikey`` given and ``TOKEN_IDFM`` is not set.
        """
        self.baseurl = "https://prim.iledefrance-mobilites.fr/marketplace"
        self.apikey = apikey or os.environ["TOKEN_IDFM"]
        self._cached_file = ".ratp-%s.cache"
        self.cache_timeout = config.cache_timeout
        self.max_cache_timeout = config.max_cache_timeout

    @staticmethod
    def fromIVtoPRIM(src, line):
        """Convert one disruption of the "IV" feed to the PRIM message layout.

        Args:
            src: mapping with the keys ``severity``, ``id``, ``title`` and
                ``message`` (the latter containing HTML).
            line: line name; a leading "Métro <line> :", "Ligne <line> :" or
                "Tramway <line> :" is removed from the title.

        Returns:
            A dict with ``InfoChannelRef`` ("Perturbation" for severity >= 2,
            "Travaux" for severity 1, "Message" otherwise) and a single
            ``SHORT_MESSAGE`` carrying the id, cleaned title and plain-text
            message.
        """
        # Local import: keeps this block usable without touching the file's
        # import section.
        import html

        severity = src["severity"]
        if severity >= 2:
            channel = "Perturbation"
        elif severity == 1:
            channel = "Travaux"
        else:
            channel = "Message"

        # NOTE(review): the exact spacing of the original prefixes
        # ("Métro 7 : ") is not recoverable from the dump, so whitespace
        # around the line name and the colon is matched loosely.
        title = re.sub(
            r"(?:Métro|Ligne|Tramway)\s*" + re.escape(line) + r"\s*:\s*",
            "",
            src["title"],
        )

        text = src["message"].replace("<br>", " ").replace("</p>", " ")
        text = IDFMAPI._ANCHOR_RE.sub("", text)  # drop links with their label
        text = IDFMAPI._TAG_RE.sub("", text)     # drop any remaining tag
        # Decode every HTML entity rather than a hand-picked few, then
        # normalise the two characters the display font handles badly.
        text = html.unescape(text)
        text = text.replace("\u00a0", " ").replace("\u2019", "'")
        text = text.replace("Information Ile de France Mobilités :", "")

        return {
            "InfoChannelRef": {
                "value": channel,
            },
            "Content": {
                "Message": [{
                    "MessageType": "SHORT_MESSAGE",
                    "MessageText": {
                        "id": src["id"],
                        "title": title,
                        "value": text.strip(),
                    },
                }],
            },
        }

    @staticmethod
    def _download_to_cache(url, cache_file):
        """Download ``url`` into ``cache_file``; return True on success.

        The body is read completely before the cache file is opened, so a
        failed transfer never truncates the previous (fallback) content.
        Connection and HTTP errors are logged, not raised.
        """
        req = urllib.request.Request(url)
        try:
            with urllib.request.urlopen(req) as f:
                payload = f.read()
        except (ConnectionResetError, urllib.error.URLError):
            # HTTPError is a subclass of URLError, so it is covered as well.
            logging.exception("Unable to download %s", url)
            return False
        with open(cache_file, 'wb') as fd:
            fd.write(payload)
        return True

    @staticmethod
    def _cache_mtime(cache_file):
        """Return the UTC modification time of ``cache_file`` or None."""
        try:
            statinfo = os.stat(cache_file)
        except OSError:
            return None
        return datetime.fromtimestamp(statinfo.st_mtime, tz=timezone.utc)

    @staticmethod
    def _parse_metadata_date(stamp):
        """Parse the ISO date of a schedules payload.

        Stamps longer than 25 characters are reduced to their first 19
        characters (date and time to the second) plus their last 6 (UTC
        offset) before being handed to ``datetime.fromisoformat``.
        """
        if len(stamp) > 25:
            stamp = stamp[0:19] + stamp[-6:]
        return datetime.fromisoformat(stamp)

    @staticmethod
    def _humanize_message(message, now):
        """Turn a raw schedule message into a clock time or a delay marker."""
        if message in IDFMAPI._IMMINENT_MESSAGES:
            return now.strftime("%H:%M")
        if message.endswith("mn"):
            try:
                minutes = int(message.split(" ")[0])
            except ValueError:
                # Not "<number> mn" after all: keep the text as it is.
                pass
            else:
                return (now + timedelta(minutes=minutes)).strftime("%H:%M")
        # NOTE(review): the source chained a third replace of "Retardé"
        # (-> "++") which could never match after the first; it was dropped.
        return message.replace("Retardé", "+").replace("Train retardé", "++")

    def get_schedules(self, mode, line, station, way="A+R"):
        """Return the next departures of ``line`` at ``station``.

        Args:
            mode: "M", "R", "T", "B" or "N"; any other value is used as-is.
            line: line name; for mode "T" its first character is stripped
                before querying.
            station: station identifier, inserted verbatim in the URL.
            way: direction selector, inserted verbatim in the URL.

        Returns:
            The schedule entries that have a ``message`` other than
            "Train sans arrêt". Waiting times are rewritten as "HH:MM"
            relative to the date found in the payload metadata.

        Raises:
            OSError: the download failed and no cached payload exists.
        """
        if mode == "T":
            line = line[1:]
        mode = self._MODE_ALIASES.get(mode, mode)

        digest = hashlib.md5((mode + line + station + way).encode()).hexdigest()
        cache_file = self._cached_file % ("schedule-" + mode + "-" + line + "-" + digest)

        self._download_to_cache(
            "https://ratp.p0m.fr/api/schedules/%s/%s/%s/%s" % (mode, line, station, way),
            cache_file,
        )

        with open(cache_file, encoding="utf-8") as f:
            res = json.load(f)

        # Convert waiting times to hours
        now = self._parse_metadata_date(res["_metadata"]["date"])
        schedules = res["result"]["schedules"]
        for schedule in schedules:
            if "message" in schedule:
                schedule["message"] = self._humanize_message(schedule["message"], now)

        return [m for m in schedules if "message" in m and m["message"] != "Train sans arrêt"]

    def get_weather(self):
        """Return ``{mode: {line: disruptions}}`` for every known line.

        Each value is the lazy generator returned by ``get_line_weather``;
        nothing is downloaded until it is iterated.
        """
        return {
            mode: {line: self.get_line_weather(mode, line) for line in mode_lines}
            for mode, mode_lines in IDFMAPI.lines.items()
        }

    def get_line_weather(self, mode, line):
        """Yield the current disruptions of one line in PRIM layout.

        The disruptions feed is downloaded again when the cache is older
        than ``cache_timeout`` minutes. A failed download is tolerated as
        long as the cache is younger than ``max_cache_timeout`` minutes.

        Raises:
            Exception: ("File too old") no usable cache is available.
        """
        cache_file = self._cached_file % ("ratp-disruptions")

        # Read the mod time
        mtime = self._cache_mtime(cache_file)
        if mtime is None or mtime + timedelta(minutes=self.cache_timeout) < datetime.now(tz=timezone.utc):
            # Do the request and save it
            self._download_to_cache("https://api-iv.iledefrance-mobilites.fr/disruptions", cache_file)
            mtime = self._cache_mtime(cache_file) or mtime

        if mtime is None or mtime + timedelta(minutes=self.max_cache_timeout) < datetime.now(tz=timezone.utc):
            raise Exception("File too old")

        # Retrieve cached data
        with open(cache_file, encoding="utf-8") as f:
            res = json.load(f)

        wanted = "line:IDFM:" + IDFMAPI.lines[mode][line]
        for entry in res["currentIT"]:
            if "id" in entry and entry["id"] == wanted:
                for disruption in entry["disruptions"]:
                    yield IDFMAPI.fromIVtoPRIM(disruption, line)

    @staticmethod
    def get_line_icon(mode, line, size, fill="gray"):
        """Draw the pictogram of a line.

        Args:
            mode: long ("metros", "tramways", ...) or one-letter ("M", "T",
                ...) mode name.
            line: text written in the middle of the icon.
            size: icon height in pixels; buses and tramways are 1.38 times
                wider than tall.
            fill: background colour (for tramways: colour of the two bars).

        Returns:
            An RGBA ``PIL.Image`` with a transparent background.
        """
        # Normalise once so that width, shape and text colour all agree.
        mode = IDFMAPI._MODE_ALIASES.get(mode, mode)

        width = int(size * 1.38) if mode in ("buses", "tramways") else size
        image = Image.new('RGBA', (width, size), '#fff0')
        draw = ImageDraw.Draw(image)
        fnt_icon = ImageFont.truetype(IDFMAPI.fnt_RB_path, size - 3)

        if mode == "metros":
            draw.ellipse((0, 0, width, size), fill=fill)
        elif mode == "tramways":
            if fill == "black":
                draw.rectangle((0, 0, width, size), fill=fill)
            bar_fill = fill if fill != "black" else "gray"
            draw.rectangle((0, 0, width, 1), fill=bar_fill)
            draw.rectangle((0, size - 2, width, size), fill=bar_fill)
        elif mode == "rers" and hasattr(draw, "rounded_rectangle"):
            # hasattr() rather than a lookup in draw.__dict__: methods live
            # on the class, so the instance dict never contains them.
            draw.rounded_rectangle((0, 0, width, size), radius=4, fill=fill)
        else:
            draw.rectangle((0, 0, width, size), fill=fill)

        if mode == "tramways":
            text_fill = "white" if fill in ("black", "white") else "black"
        else:
            text_fill = "white" if fill != "white" else "black"
        draw.text((int(width / 2), int(size / 2)), line, fill=text_fill, anchor="mm", font=fnt_icon)

        return image
2022-08-14 13:19:51 +00:00
class RATPWeatherModule:
    """Dashboard module showing the traffic state ("weather") of the lines."""

    # Lines whose disruptions are reported by gen_alerts() by default.
    DEFAULT_MAJOR_LINES = ("M7", "M5", "M14", "RB", "T3A")

    def __init__(self, major_lines=None):
        """Store the lines to report alerts for.

        Args:
            major_lines: list of "<mode letter><line>" names (e.g. "M7",
                "RB"). When omitted, a fresh copy of
                ``DEFAULT_MAJOR_LINES`` is used, so instances never share
                one mutable default list.
        """
        if major_lines is None:
            major_lines = list(self.DEFAULT_MAJOR_LINES)
        self.major_lines = major_lines

    def _is_major(self, mode, line):
        """Tell whether ``line`` of ``mode`` is one of the major lines."""
        prefix = mode[0].upper()
        if prefix + line in self.major_lines:
            return True
        # Tramway names already carry their mode letter ("T3A"): the
        # prefixed key would be "TT3A", which the default list never has.
        return line.startswith(prefix) and line in self.major_lines

    @staticmethod
    def _alert_icon(mode, line):
        """Return a function rendering the alert icon of a line on demand."""
        def icon(size=64):
            half = int(size / 2)
            image = Image.new('RGB', (size, size), '#000')
            white = Image.new('RGB', (half, half), '#fff')
            with Image.open("icons/" + mode + ".png") as src:
                mode_icon = src.resize((half, half))
            # The mode pictogram serves as a mask: it is stamped in white.
            image.paste(white, (-5, 0), mode_icon)
            line_icon = IDFMAPI.get_line_icon(mode, line, half, fill="white")
            image.paste(line_icon, (half - 5, 0), line_icon)
            return image
        return icon

    def gen_alerts(self, config):
        """Yield one alert per distinct disruption on the major lines.

        Only "Perturbation" and "Travaux" entries with a ``SHORT_MESSAGE``
        are reported; a message id is reported once even when several lines
        carry it.

        Yields:
            dicts with the keys ``title``, ``description`` and ``icon``
            (a callable ``icon(size=64)`` returning a ``PIL.Image``).
        """
        weather = IDFMAPI(config).get_weather()
        seen_ids = set()

        for mode, mode_lines in weather.items():
            for line, infos in mode_lines.items():
                if not self._is_major(mode, line):
                    continue

                for info in infos:
                    if "InfoChannelRef" not in info:
                        continue
                    if info["InfoChannelRef"]["value"] not in ("Perturbation", "Travaux"):
                        continue
                    if "Message" not in info["Content"]:
                        continue

                    for msg in info["Content"]["Message"]:
                        if "MessageType" not in msg or msg["MessageType"] != "SHORT_MESSAGE":
                            continue

                        text = msg["MessageText"]
                        if "id" in text:
                            if text["id"] in seen_ids:
                                continue
                            seen_ids.add(text["id"])

                        yield {
                            "title": text["title"] if "title" in text else "",
                            # NOTE(review): the replaced token is not
                            # recoverable from the dump; presumably a
                            # non-breaking space (entity or character).
                            "description": text["value"].replace("&nbsp;", " ").replace("\u00a0", " "),
                            "icon": self._alert_icon(mode, line),
                        }

    def draw_module(self, config, width, height, line_height=19):
        """Render the state of every known line as rows of line icons.

        An icon is black when the line has a "Perturbation", gray when it
        only has "Travaux" and dark gray otherwise.

        Returns:
            A ``width`` x ``height`` RGB ``PIL.Image``.
        """
        image = Image.new('RGB', (width, height), '#fff')

        weather = IDFMAPI(config).get_weather()

        align_x = 0
        align_y = 0
        for mode in ["metros", "rers", "tramways", "buses"]:
            # NOTE(review): the dump lost the indentation, so the extent of
            # this `if` is an assumption: the mode icon is drawn only for the
            # modes that start a new row (metros, tramways). Confirm against
            # the original file.
            if mode != "rers" and mode != "buses":
                align_x = 0

                # display mode icon
                with Image.open("icons/" + mode + ".png") as src:
                    icon = src.resize((line_height, line_height))
                image.paste(icon, (align_x, align_y), icon)

                align_x += line_height + 10

            for line, infos in weather[mode].items():
                # Wrap to a new row, indented past the mode icon column.
                if align_x + line_height >= width:
                    align_x = line_height + 10
                    align_y += line_height + 6

                states = [
                    info["InfoChannelRef"]["value"]
                    for info in infos
                    if "InfoChannelRef" in info
                ]

                if "Perturbation" in states:
                    fill = "black"
                elif "Travaux" in states:
                    fill = "gray"
                else:
                    fill = "darkgray"

                icon = IDFMAPI.get_line_icon(mode, line, line_height, fill=fill)
                image.paste(icon, (align_x, align_y), icon)

                align_x += icon.width + 5

            if mode != "metros" and mode != "tramways":
                align_y += line_height + 10
            else:
                align_y += 10

        return image
2022-08-21 02:03:33 +00:00
class RATPNextStopModule:
    """Dashboard module listing the next departures at configured stops."""

    @staticmethod
    def _text_width(font, text):
        """Return the rendered width of ``text`` in pixels.

        ``ImageFont.getsize`` no longer exists in Pillow 10; ``getbbox`` is
        used when the font provides it, ``getsize`` otherwise.
        """
        if hasattr(font, "getbbox"):
            left, _, right, _ = font.getbbox(text)
            return right - left
        return font.getsize(text)[0]

    def draw_module(self, config, stops, width, height, line_height=17):
        """Render the upcoming departures of each stop.

        Args:
            config: configuration handed to ``IDFMAPI``.
            stops: iterable of "<mode letter><line>/<station>[/<way>]"
                strings.
            width: image width in pixels.
            height: maximum image height in pixels.
            line_height: font size and height of one text row.

        Returns:
            ``(image, alerts)``: the image cropped to the rows actually
            drawn, and one alert dict (``title``, ``description``, ``icon``)
            per stop whose schedules could not be obtained or drawn.
        """
        image = Image.new('RGB', (width, height), '#fff')
        draw = ImageDraw.Draw(image)

        alerts = []

        fnt_R = ImageFont.truetype(IDFMAPI.fnt_R_path, line_height)
        fnt_B = ImageFont.truetype(IDFMAPI.fnt_RB_path, line_height)

        gap = int(line_height / 2.5)           # horizontal space between texts
        stop_spacing = int(line_height * 0.33)  # vertical space between stops
        max_dest = 64                           # pixel width of the destination column

        align = 0

        api = IDFMAPI(config)
        for stop in stops:
            tmp = stop.split("/", 2)
            mode = tmp[0][0]
            line = tmp[0][1:]

            # Without a station there is nothing to query.
            if len(tmp) < 2:
                continue

            try:
                # Group the departures by destination, keeping their order.
                prep = {}
                for s in api.get_schedules(mode, line, *tmp[1:]):
                    prep.setdefault(s["destination"], []).append(s)

                # The icon spans one row and a half when there are several
                # destinations.
                icon = IDFMAPI.get_line_icon(mode, line, int(line_height * (1.5 if len(prep) > 1 else 1)))
                image.paste(icon, (0, align), icon)

                for dest, msgs in prep.items():
                    if len(msgs) == 0:
                        continue

                    align_x = line_height * 2

                    # Shorten the destination until it fits its column.
                    while self._text_width(fnt_B, dest) > max_dest:
                        dest = dest[:-1]

                    draw.text(
                        (align_x, align),
                        dest,
                        font=fnt_B, anchor="lt", fill="black"
                    )
                    align_x += max_dest + gap

                    for msg in msgs:
                        draw.text(
                            (align_x, align),
                            msg["message"],
                            font=fnt_R, anchor="lt", fill="black"
                        )
                        align_x += self._text_width(fnt_R, msg["message"]) + gap

                    align += line_height

                align += stop_spacing
            except Exception as e:
                # Best effort: a failing stop becomes an alert instead of
                # aborting the whole module.
                alerts.append({
                    "title": "Impossible de récupérer les horaires du " + mode + line + " pour " + tmp[1],
                    "description": type(e).__name__ + ": " + (e.message if hasattr(e, 'message') else str(e)),
                    "icon": "wi-earthquake.png",
                })

        # Drop the spacing added after the last stop. When nothing was drawn
        # this would go negative and make the crop box invalid, hence the
        # clamp at 0.
        align -= stop_spacing
        image = image.crop((0, 0, width, max(0, min(align, height))))

        return image, alerts