Refactor modules that used nemubot XML parser due to previous commit

This commit is contained in:
nemunaire 2015-10-25 18:50:18 +01:00
parent 2b96c32063
commit 59ea2e971b
4 changed files with 108 additions and 94 deletions

View File

@ -28,8 +28,8 @@ def get_book(title):
"""Retrieve a book from its title""" """Retrieve a book from its title"""
response = web.getXML("https://www.goodreads.com/book/title.xml?key=%s&title=%s" % response = web.getXML("https://www.goodreads.com/book/title.xml?key=%s&title=%s" %
(context.config["goodreadskey"], urllib.parse.quote(title))) (context.config["goodreadskey"], urllib.parse.quote(title)))
if response is not None and response.hasNode("book"): if response is not None and len(response.getElementsByTagName("book")):
return response.getNode("book") return response.getElementsByTagName("book")[0]
else: else:
return None return None
@ -38,8 +38,8 @@ def search_books(title):
"""Get a list of book matching given title""" """Get a list of book matching given title"""
response = web.getXML("https://www.goodreads.com/search.xml?key=%s&q=%s" % response = web.getXML("https://www.goodreads.com/search.xml?key=%s&q=%s" %
(context.config["goodreadskey"], urllib.parse.quote(title))) (context.config["goodreadskey"], urllib.parse.quote(title)))
if response is not None and response.hasNode("search"): if response is not None and len(response.getElementsByTagName("search")):
return response.getNode("search").getNode("results").getNodes("work") return response.getElementsByTagName("search")[0].getElementsByTagName("results")[0].getElementsByTagName("work")
else: else:
return [] return []
@ -48,11 +48,11 @@ def search_author(name):
"""Looking for an author""" """Looking for an author"""
response = web.getXML("https://www.goodreads.com/api/author_url/%s?key=%s" % response = web.getXML("https://www.goodreads.com/api/author_url/%s?key=%s" %
(urllib.parse.quote(name), context.config["goodreadskey"])) (urllib.parse.quote(name), context.config["goodreadskey"]))
if response is not None and response.hasNode("author") and response.getNode("author").hasAttribute("id"): if response is not None and len(response.getElementsByTagName("author")) and response.getElementsByTagName("author")[0].hasAttribute("id"):
response = web.getXML("https://www.goodreads.com/author/show/%s.xml?key=%s" % response = web.getXML("https://www.goodreads.com/author/show/%s.xml?key=%s" %
(urllib.parse.quote(response.getNode("author")["id"]), context.config["goodreadskey"])) (urllib.parse.quote(response.getElementsByTagName("author")[0].getAttribute("id")), context.config["goodreadskey"]))
if response is not None and response.hasNode("author"): if response is not None and len(response.getElementsByTagName("author")):
return response.getNode("author") return response.getElementsByTagName("author")[0]
return None return None
@ -71,9 +71,9 @@ def cmd_book(msg):
if book is None: if book is None:
raise IRCException("unable to find book named like this") raise IRCException("unable to find book named like this")
res = Response(channel=msg.channel) res = Response(channel=msg.channel)
res.append_message("%s, writed by %s: %s" % (book.getNode("title").getContent(), res.append_message("%s, written by %s: %s" % (book.getElementsByTagName("title")[0].firstChild.nodeValue,
book.getNode("authors").getNode("author").getNode("name").getContent(), book.getElementsByTagName("author")[0].getElementsByTagName("name")[0].firstChild.nodeValue,
web.striphtml(book.getNode("description").getContent()))) web.striphtml(book.getElementsByTagName("description")[0].firstChild.nodeValue if book.getElementsByTagName("description")[0].firstChild else "")))
return res return res
@ -92,8 +92,8 @@ def cmd_books(msg):
count=" (%d more books)") count=" (%d more books)")
for book in search_books(title): for book in search_books(title):
res.append_message("%s, writed by %s" % (book.getNode("best_book").getNode("title").getContent(), res.append_message("%s, writed by %s" % (book.getElementsByTagName("best_book")[0].getElementsByTagName("title")[0].firstChild.nodeValue,
book.getNode("best_book").getNode("author").getNode("name").getContent())) book.getElementsByTagName("best_book")[0].getElementsByTagName("author")[0].getElementsByTagName("name")[0].firstChild.nodeValue))
return res return res
@ -106,7 +106,10 @@ def cmd_author(msg):
if not len(msg.args): if not len(msg.args):
raise IRCException("please give me an author to search") raise IRCException("please give me an author to search")
ath = search_author(" ".join(msg.args)) name = " ".join(msg.args)
return Response([b.getNode("title").getContent() for b in ath.getNode("books").getNodes("book")], ath = search_author(name)
if ath is None:
raise IRCException("%s does not appear to be a published author." % name)
return Response([b.getElementsByTagName("title")[0].firstChild.nodeValue for b in ath.getElementsByTagName("book")],
channel=msg.channel, channel=msg.channel,
title=ath.getNode("name").getContent()) title=ath.getElementsByTagName("name")[0].firstChild.nodeValue)

View File

@ -61,17 +61,17 @@ def get_unwikitextified(site, wikitext, ssl=False):
def opensearch(site, term, ssl=False): def opensearch(site, term, ssl=False):
# Built URL # Built URL
url = "http%s://%s/w/api.php?format=xml&action=opensearch&search=%s" % ( url = "http%s://%s/w/api.php?format=json&action=opensearch&search=%s" % (
"s" if ssl else "", site, urllib.parse.quote(term)) "s" if ssl else "", site, urllib.parse.quote(term))
# Make the request # Make the request
response = web.getXML(url) response = web.getJSON(url)
if response is not None and response.hasNode("Section"): if response is not None and len(response) >= 4:
for itm in response.getNode("Section").getNodes("Item"): for k in range(len(response[1])):
yield (itm.getNode("Text").getContent(), yield (response[1][k],
itm.getNode("Description").getContent() if itm.hasNode("Description") else "", response[2][k],
itm.getNode("Url").getContent()) response[3][k])
def search(site, term, ssl=False): def search(site, term, ssl=False):
@ -167,6 +167,7 @@ def mediawiki_response(site, term, receivers):
# Try looking at opensearch # Try looking at opensearch
os = [x for x, _, _ in opensearch(site, terms[0])] os = [x for x, _, _ in opensearch(site, terms[0])]
print(os)
# Fallback to global search # Fallback to global search
if not len(os): if not len(os):
os = [x for x, _ in search(site, terms[0]) if x is not None and x != ""] os = [x for x, _ in search(site, terms[0]) if x is not None and x != ""]

View File

@ -1,7 +1,7 @@
# coding=utf-8
"""Gets information about velib stations""" """Gets information about velib stations"""
# PYTHON STUFFS #######################################################
import re import re
from nemubot import context from nemubot import context
@ -9,11 +9,11 @@ from nemubot.exception import IRCException
from nemubot.hooks import hook from nemubot.hooks import hook
from nemubot.tools import web from nemubot.tools import web
nemubotversion = 4.0
from more import Response from more import Response
# LOADING #############################################################
URL_API = None # http://www.velib.paris.fr/service/stationdetails/paris/%s URL_API = None # http://www.velib.paris.fr/service/stationdetails/paris/%s
def load(context): def load(context):
@ -29,25 +29,14 @@ def load(context):
# context.add_event(evt) # context.add_event(evt)
def help_full(): # MODULE CORE #########################################################
return ("!velib /number/ ...: gives available bikes and slots at "
"the station /number/.")
def station_status(station): def station_status(station):
"""Gets available and free status of a given station""" """Gets available and free status of a given station"""
response = web.getXML(URL_API % station) response = web.getXML(URL_API % station)
if response is not None: if response is not None:
available = response.getNode("available").getContent() available = int(response.getElementsByTagName("available")[0].firstChild.nodeValue)
if available is not None and len(available) > 0: free = int(response.getElementsByTagName("free")[0].firstChild.nodeValue)
available = int(available)
else:
available = 0
free = response.getNode("free").getContent()
if free is not None and len(free) > 0:
free = int(free)
else:
free = 0
return (available, free) return (available, free)
else: else:
return (None, None) return (None, None)
@ -69,27 +58,30 @@ def print_station_status(msg, station):
"""Send message with information about the given station""" """Send message with information about the given station"""
(available, free) = station_status(station) (available, free) = station_status(station)
if available is not None and free is not None: if available is not None and free is not None:
return Response("à la station %s : %d vélib et %d points d'attache" return Response("À la station %s : %d vélib et %d points d'attache"
" disponibles." % (station, available, free), " disponibles." % (station, available, free),
channel=msg.channel, nick=msg.nick) channel=msg.channel)
raise IRCException("station %s inconnue." % station) raise IRCException("station %s inconnue." % station)
@hook("cmd_hook", "velib") # MODULE INTERFACE ####################################################
@hook("cmd_hook", "velib",
help="gives available bikes and slots at the given station",
help_usage={
"STATION_ID": "gives available bikes and slots at the station STATION_ID"
})
def ask_stations(msg): def ask_stations(msg):
"""Hook entry from !velib"""
if len(msg.args) > 4: if len(msg.args) > 4:
raise IRCException("demande-moi moins de stations à la fois.") raise IRCException("demande-moi moins de stations à la fois.")
elif not len(msg.args):
elif len(msg.args):
for station in msg.args:
if re.match("^[0-9]{4,5}$", station):
return print_station_status(msg, station)
elif station in context.data.index:
return print_station_status(msg,
context.data.index[station]["number"])
else:
raise IRCException("numéro de station invalide.")
else:
raise IRCException("pour quelle station ?") raise IRCException("pour quelle station ?")
for station in msg.args:
if re.match("^[0-9]{4,5}$", station):
return print_station_status(msg, station)
elif station in context.data.index:
return print_station_status(msg,
context.data.index[station]["number"])
else:
raise IRCException("numéro de station invalide.")

View File

@ -1,16 +1,20 @@
# coding=utf-8 """Performing search and calculation"""
# PYTHON STUFFS #######################################################
from urllib.parse import quote from urllib.parse import quote
import re
from nemubot import context from nemubot import context
from nemubot.exception import IRCException from nemubot.exception import IRCException
from nemubot.hooks import hook from nemubot.hooks import hook
from nemubot.tools import web from nemubot.tools import web
nemubotversion = 4.0
from more import Response from more import Response
# LOADING #############################################################
URL_API = "http://api.wolframalpha.com/v2/query?input=%%s&appid=%s" URL_API = "http://api.wolframalpha.com/v2/query?input=%%s&appid=%s"
def load(context): def load(context):
@ -24,76 +28,90 @@ def load(context):
URL_API = URL_API % quote(context.config["apikey"]).replace("%", "%%") URL_API = URL_API % quote(context.config["apikey"]).replace("%", "%%")
class WFASearch: # MODULE CORE #########################################################
class WFAResults:
def __init__(self, terms): def __init__(self, terms):
self.terms = terms
self.wfares = web.getXML(URL_API % quote(terms)) self.wfares = web.getXML(URL_API % quote(terms))
@property @property
def success(self): def success(self):
try: try:
return self.wfares["success"] == "true" return self.wfares.documentElement.hasAttribute("success") and self.wfares.documentElement.getAttribute("success") == "true"
except: except:
return False return False
@property @property
def error(self): def error(self):
if self.wfares is None: if self.wfares is None:
return "An error occurs during computation." return "An error occurs during computation."
elif self.wfares["error"] == "true": elif self.wfares.documentElement.hasAttribute("error") and self.wfares.documentElement.getAttribute("error") == "true":
return ("An error occurs during computation: " + return ("An error occurs during computation: " +
self.wfares.getNode("error").getNode("msg").getContent()) self.wfares.getElementsByTagName("error")[0].getElementsByTagName("msg")[0].firstChild.nodeValue)
elif self.wfares.hasNode("didyoumeans"): elif len(self.wfares.getElementsByTagName("didyoumeans")):
start = "Did you mean: " start = "Did you mean: "
tag = "didyoumean" tag = "didyoumean"
end = "?" end = "?"
elif self.wfares.hasNode("tips"): elif len(self.wfares.getElementsByTagName("tips")):
start = "Tips: " start = "Tips: "
tag = "tip" tag = "tip"
end = "" end = ""
elif self.wfares.hasNode("relatedexamples"): elif len(self.wfares.getElementsByTagName("relatedexamples")):
start = "Related examples: " start = "Related examples: "
tag = "relatedexample" tag = "relatedexample"
end = "" end = ""
elif self.wfares.hasNode("futuretopic"): elif len(self.wfares.getElementsByTagName("futuretopic")):
return self.wfares.getNode("futuretopic")["msg"] return self.wfares.getElementsByTagName("futuretopic")[0].getAttribute("msg")
else: else:
return "An error occurs during computation" return "An error occurs during computation"
proposal = list() proposal = list()
for dym in self.wfares.getNode(tag + "s").getNodes(tag): for dym in self.wfares.getElementsByTagName(tag):
if tag == "tip": if tag == "tip":
proposal.append(dym["text"]) proposal.append(dym.getAttribute("text"))
elif tag == "relatedexample": elif tag == "relatedexample":
proposal.append(dym["desc"]) proposal.append(dym.getAttribute("desc"))
else: else:
proposal.append(dym.getContent()) proposal.append(dym.firstChild.nodeValue)
return start + ', '.join(proposal) + end return start + ', '.join(proposal) + end
@property @property
def nextRes(self): def results(self):
try: for node in self.wfares.getElementsByTagName("pod"):
for node in self.wfares.getNodes("pod"): for subnode in node.getElementsByTagName("subpod"):
for subnode in node.getNodes("subpod"): if subnode.getElementsByTagName("plaintext")[0].firstChild:
if subnode.getFirstNode("plaintext").getContent() != "": yield (node.getAttribute("title") +
yield (node["title"] + " " + subnode["title"] + ": " + ((" / " + subnode.getAttribute("title")) if subnode.getAttribute("title") else "") + ": " +
subnode.getFirstNode("plaintext").getContent()) "; ".join(subnode.getElementsByTagName("plaintext")[0].firstChild.nodeValue.split("\n")))
except IndexError:
pass
@hook("cmd_hook", "calculate") # MODULE INTERFACE ####################################################
@hook("cmd_hook", "calculate",
help="Perform search and calculation using WolframAlpha",
help_usage={
"TERM": "Look at the given term on WolframAlpha",
"CALCUL": "Perform the computation over WolframAlpha service",
})
def calculate(msg): def calculate(msg):
if not len(msg.args): if not len(msg.args):
raise IRCException("Indicate a calcul to compute") raise IRCException("Indicate a calcul to compute")
s = WFASearch(' '.join(msg.args)) s = WFAResults(' '.join(msg.args))
if s.success: if not s.success:
res = Response(channel=msg.channel, nomore="No more results") raise IRCException(s.error)
for result in s.nextRes:
res.append_message(result) res = Response(channel=msg.channel, nomore="No more results")
if (len(res.messages) > 0):
res.messages.pop(0) for result in s.results:
return res res.append_message(re.sub(r' +', ' ', result))
else: if len(res.messages):
return Response(s.error, msg.channel) res.messages.pop(0)
return res