
286 lines
10 KiB

"""Postal tracking module"""
# PYTHON STUFF ############################################
import json
import urllib.parse
from bs4 import BeautifulSoup
import re
from nemubot.hooks import hook
from nemubot.exception import IMException
from import getURLContent, getJSON
from more import Response
# POSTAGE SERVICE PARSERS ############################################
def get_tnt_info(track_id):
values = []
data = getURLContent('' % track_id)
soup = BeautifulSoup(data)
status_list = soup.find('div', class_='result__content')
if not status_list:
return None
last_status = status_list.find('div', class_='roster')
if last_status:
for info in last_status.find_all('div', class_='roster__item'):
if len(values) == 3:
return (values[0], values[1], values[2])
def get_colissimo_info(colissimo_id):
colissimo_data = getURLContent("" % colissimo_id)
soup = BeautifulSoup(colissimo_data)
dataArray = soup.find(class_='dataArray')
if dataArray and dataArray.tbody and
date ="Date").get_text()
libelle = re.sub(r'[\n\t\r]', '',"Libelle").get_text())
site ="site").get_text().strip()
return (date, libelle, site.strip())
def get_chronopost_info(track_id):
data = urllib.parse.urlencode({'listeNumeros': track_id})
track_baseurl = ""
track_data = getURLContent(track_baseurl, data.encode('utf-8'))
soup = BeautifulSoup(track_data)
infoClass = soup.find(class_='numeroColi2')
if infoClass and infoClass.get_text():
info = infoClass.get_text().split("\n")
if len(info) >= 1:
info = info[1].strip().split("\"")
if len(info) >= 2:
date = info[2]
libelle = info[1]
return (date, libelle)
def get_colisprive_info(track_id):
data = urllib.parse.urlencode({'numColis': track_id})
track_baseurl = ""
track_data = getURLContent(track_baseurl, data.encode('utf-8'))
soup = BeautifulSoup(track_data)
dataArray = soup.find(class_='BandeauInfoColis')
if (dataArray and dataArray.find(class_='divStatut')
and dataArray.find(class_='divStatut').find(class_='tdText')):
status = dataArray.find(class_='divStatut') \
return status
def get_laposte_info(laposte_id):
data = urllib.parse.urlencode({'id': laposte_id})
laposte_baseurl = ""
laposte_data = getURLContent(laposte_baseurl, data.encode('utf-8'))
soup = BeautifulSoup(laposte_data)
search_res = soup.find(class_='resultat_rech_simple_table')
if (soup.find(class_='resultat_rech_simple_table').thead
and soup.find(class_='resultat_rech_simple_table')
and len(search_res.find_all('td')) > 3):
field = search_res.find('td')
poste_id = field.get_text()
field = field.find_next('td')
poste_type = field.get_text()
field = field.find_next('td')
poste_date = field.get_text()
field = field.find_next('td')
poste_location = field.get_text()
field = field.find_next('td')
poste_status = field.get_text()
return (poste_type.lower(), poste_id.strip(), poste_status.lower(),
poste_location, poste_date)
def get_postnl_info(postnl_id):
data = urllib.parse.urlencode({'barcodes': postnl_id})
postnl_baseurl = ""
postnl_data = getURLContent(postnl_baseurl, data.encode('utf-8'))
soup = BeautifulSoup(postnl_data)
if (soup.find(id='datatables')
and soup.find(id='datatables').tbody
and soup.find(id='datatables')
search_res = soup.find(id='datatables')
if len(search_res.find_all('td')) >= 3:
field = field.find_next('td')
post_date = field.get_text()
field = field.find_next('td')
post_status = field.get_text()
field = field.find_next('td')
post_destination = field.get_text()
return (post_status.lower(), post_destination, post_date)
def get_fedex_info(fedex_id, lang="en_US"):
data = urllib.parse.urlencode({
'data': json.dumps({
"TrackPackagesRequest": {
"appType": "WTRK",
"appDeviceType": "DESKTOP",
"uniqueKey": "",
"processingParameters": {},
"trackingInfoList": [
"trackNumberInfo": {
"trackingNumber": str(fedex_id),
"trackingQualifier": "",
"trackingCarrier": ""
'action': "trackpackages",
'locale': lang,
'version': 1,
'format': "json"
fedex_baseurl = ""
fedex_data = getJSON(fedex_baseurl, data.encode('utf-8'))
if ("TrackPackagesResponse" in fedex_data and
"packageList" in fedex_data["TrackPackagesResponse"] and
len(fedex_data["TrackPackagesResponse"]["packageList"]) and
not fedex_data["TrackPackagesResponse"]["errorList"][0]["code"] and
not fedex_data["TrackPackagesResponse"]["packageList"][0]["errorList"][0]["code"]
return fedex_data["TrackPackagesResponse"]["packageList"][0]
# TRACKING HANDLERS ###################################################
def handle_tnt(tracknum):
info = get_tnt_info(tracknum)
if info:
status, date, place = info
placestr = ''
if place:
placestr = ' à \x02{place}\x0f'
return ('Le colis \x02{trackid}\x0f a actuellement le status: '
'\x02{status}\x0F mis à jour le \x02{date}\x0f{place}.'
.format(trackid=tracknum, status=status,
date=re.sub(r'\s+', ' ', date), place=placestr))
def handle_laposte(tracknum):
info = get_laposte_info(tracknum)
if info:
poste_type, poste_id, poste_status, poste_location, poste_date = info
return ("Le courrier de type \x02%s\x0F : \x02%s\x0F est actuellement "
"\x02%s\x0F dans la zone \x02%s\x0F (Mis à jour le \x02%s\x0F"
")." % (poste_type, poste_id, poste_status,
poste_location, poste_date))
def handle_postnl(tracknum):
info = get_postnl_info(tracknum)
if info:
post_status, post_destination, post_date = info
return ("PostNL \x02%s\x0F est actuellement "
"\x02%s\x0F vers le pays \x02%s\x0F (Mis à jour le \x02%s\x0F"
")." % (tracknum, post_status, post_destination, post_date))
def handle_colissimo(tracknum):
info = get_colissimo_info(tracknum)
if info:
date, libelle, site = info
return ("Colissimo: \x02%s\x0F : \x02%s\x0F Dernière mise à jour le "
"\x02%s\x0F au site \x02%s\x0F."
% (tracknum, libelle, date, site))
def handle_chronopost(tracknum):
info = get_chronopost_info(tracknum)
if info:
date, libelle = info
return ("Colis Chronopost: \x02%s\x0F : \x02%s\x0F. Dernière mise à "
"jour \x02%s\x0F." % (tracknum, libelle, date))
def handle_coliprive(tracknum):
info = get_colisprive_info(tracknum)
if info:
return ("Colis Privé: \x02%s\x0F : \x02%s\x0F." % (tracknum, info))
def handle_fedex(tracknum):
info = get_fedex_info(tracknum)
if info:
if info["displayActDeliveryDateTime"] != "":
return ("{trackingCarrierDesc}: \x02{statusWithDetails}\x0F: in \x02{statusLocationCity}, {statusLocationCntryCD}\x0F, delivered on: {displayActDeliveryDateTime}.".format(**info))
elif info["statusLocationCity"] != "":
return ("{trackingCarrierDesc}: \x02{statusWithDetails}\x0F: estimated delivery: {displayEstDeliveryDateTime}.".format(**info))
return ("{trackingCarrierDesc}: \x02{statusWithDetails}\x0F: in \x02{statusLocationCity}, {statusLocationCntryCD}\x0F, estimated delivery: {displayEstDeliveryDateTime}.".format(**info))
'laposte': handle_laposte,
'postnl': handle_postnl,
'colissimo': handle_colissimo,
'chronopost': handle_chronopost,
'coliprive': handle_coliprive,
'tnt': handle_tnt,
'fedex': handle_fedex,
# HOOKS ##############################################################
help="Track postage delivery",
"TRACKING_ID [...]": "Track the specified postage IDs on various tracking services."
"tracker=TRK": "Precise the tracker (default: all) among: " + ', '.join(TRACKING_HANDLERS)
def get_tracking_info(msg):
if not len(msg.args):
raise IMException("Renseignez un identifiant d'envoi.")
res = Response(, count=" (%d suivis supplémentaires)")
if 'tracker' in msg.kwargs:
if msg.kwargs['tracker'] in TRACKING_HANDLERS:
trackers = {
msg.kwargs['tracker']: TRACKING_HANDLERS[msg.kwargs['tracker']]
raise IMException("No tracker named \x02{tracker}\x0F, please use"
" one of the following: \x02{trackers}\x0F"
trackers=', '
for tracknum in msg.args:
for name, tracker in trackers.items():
ret = tracker(tracknum)
if ret:
if not ret:
res.append_message("L'identifiant \x02{id}\x0F semble incorrect,"
" merci de vérifier son exactitude."
return res