nemubot/tools/web.py

199 lines
6.2 KiB
Python
Raw Normal View History

2012-10-19 16:41:17 +00:00
# coding=utf-8
# Nemubot is a modulable IRC bot, built around XML configuration files.
# Copyright (C) 2012 Mercier Pierre-Olivier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import http.client
2012-11-06 03:26:38 +00:00
import json
2012-10-19 16:41:17 +00:00
import re
2012-11-04 15:26:20 +00:00
import socket
2012-10-19 16:41:17 +00:00
from urllib.parse import quote
2012-11-04 15:26:20 +00:00
import xmlparser
# Parse URL
2012-10-19 16:41:17 +00:00
def parseURL(url):
"""Separate protocol, domain, port and page request"""
2012-11-06 03:14:11 +00:00
res = re.match("^([a-zA-Z0-9+.-]+):(//)?(.*)$", url)
2012-10-19 16:41:17 +00:00
if res is not None:
2012-11-06 03:14:11 +00:00
scheme = res.group(1)
if scheme == "https":
port = 443
elif scheme == "http":
port = 80
elif scheme == "ftp":
port = 21
elif scheme == "telnet":
port = 23
2012-11-04 15:26:20 +00:00
else:
port = 0
2012-11-06 03:14:11 +00:00
if res.group(2) == "//":
# Waiting Common Internet Scheme Syntax
ciss = re.match("^(([^:]+)(:([^@]+))@)?([^:/]+)(:([0-9]+))?(/.*)$", res.group(3))
user = ciss.group(2)
password = ciss.group(4)
host = ciss.group(5)
if ciss.group(7) is not None and ciss.group(7) != "" and int(ciss.group(7)) > 0:
port = int(ciss.group(7))
path = ciss.group(8)
return (scheme, (user, password), host, port, path)
else:
return (scheme, (None, None), None, None, res.group(3))
2012-10-19 16:41:17 +00:00
else:
2012-11-06 03:14:11 +00:00
return (None, (None, None), None, None, None)
2012-10-19 16:41:17 +00:00
2012-11-06 03:14:11 +00:00
def isURL(url):
"""Return True if the URL can be parsed"""
(scheme, (user, password), host, port, path) = parseURL(url)
return scheme is not None
2012-10-19 16:41:17 +00:00
2012-11-06 03:14:11 +00:00
def getScheme(url):
2012-11-04 15:26:20 +00:00
"""Return the protocol of a given URL"""
2012-11-06 03:14:11 +00:00
(scheme, (user, password), host, port, path) = parseURL(url)
return scheme
def getHost(url):
"""Return the domain of a given URL"""
(scheme, (user, password), host, port, path) = parseURL(url)
return host
2012-10-19 16:41:17 +00:00
2012-11-04 15:26:20 +00:00
def getPort(url):
"""Return the port of a given URL"""
2012-11-06 03:14:11 +00:00
(scheme, (user, password), host, port, path) = parseURL(url)
2012-11-04 15:26:20 +00:00
return port
2012-11-06 03:14:11 +00:00
def getPath(url):
"""Return the page request of a given URL"""
(scheme, (user, password), host, port, path) = parseURL(url)
return path
def getUser(url):
"""Return the page request of a given URL"""
(scheme, (user, password), host, port, path) = parseURL(url)
return user
def getPassword(url):
2012-11-04 15:26:20 +00:00
"""Return the page request of a given URL"""
2012-11-06 03:14:11 +00:00
(scheme, (user, password), host, port, path) = parseURL(url)
return password
2012-11-04 15:26:20 +00:00
# Get real pages
def getURLContent(url, timeout=15):
2012-10-19 16:41:17 +00:00
"""Return page content corresponding to URL or None if any error occurs"""
2012-11-06 03:14:11 +00:00
(scheme, (user, password), host, port, path) = parseURL(url)
if port is None:
return None
conn = http.client.HTTPConnection(host, port=port, timeout=timeout)
2012-10-19 16:41:17 +00:00
try:
2012-11-06 03:14:11 +00:00
conn.request("GET", path, None, {"User-agent": "Nemubot v3"})
except socket.timeout:
return None
2012-10-19 16:41:17 +00:00
except socket.gaierror:
2012-11-04 15:26:20 +00:00
print ("<tools.web> Unable to receive page %s from %s on %d."
2012-11-06 03:14:11 +00:00
% (path, host, port))
2012-11-04 15:26:20 +00:00
return None
2012-10-19 16:41:17 +00:00
2012-11-06 03:14:11 +00:00
try:
res = conn.getresponse()
data = res.read()
except http.client.BadStatusLine:
return None
finally:
conn.close()
2012-10-19 16:41:17 +00:00
2012-11-04 15:26:20 +00:00
if res.status == http.client.OK or res.status == http.client.SEE_OTHER:
return data
2012-11-06 03:14:11 +00:00
elif res.status == http.client.FOUND or res.status == http.client.MOVED_PERMANENTLY:
return getURLContent(res.getheader("Location"), timeout)
2012-11-04 15:26:20 +00:00
else:
return None
def getXML(url, timeout=15):
"""Get content page and return XML parsed content"""
cnt = getURLContent(url, timeout)
if cnt is None:
return None
else:
return xmlparser.parse_string(cnt)
2012-11-06 03:26:38 +00:00
def getJSON(url, timeout=15):
"""Get content page and return JSON content"""
cnt = getURLContent(url, timeout)
if cnt is None:
return None
else:
return json.loads(cnt.decode())
2012-11-06 03:14:11 +00:00
def traceURL(url, timeout=5, stack=None):
"""Follow redirections and return the redirections stack"""
if stack is None:
stack = list()
stack.append(url)
(scheme, (user, password), host, port, path) = parseURL(url)
if port is None or port == 0:
return stack
conn = http.client.HTTPConnection(host, port=port, timeout=timeout)
try:
conn.request("HEAD", path, None, {"User-agent": "Nemubot v3"})
except socket.timeout:
stack.append("Timeout")
return stack
except socket.gaierror:
print ("<tools.web> Unable to receive page %s from %s on %d."
% (path, host, port))
return None
try:
res = conn.getresponse()
except http.client.BadStatusLine:
return None
finally:
conn.close()
if res.status == http.client.OK:
return stack
elif res.status == http.client.FOUND or res.status == http.client.MOVED_PERMANENTLY or res.status == http.client.SEE_OTHER:
url = res.getheader("Location")
if url in stack:
stack.append(url)
return stack
else:
return traceURL(url, timeout, stack)
else:
return None
2012-11-04 15:26:20 +00:00
# Other utils
def striphtml(data):
"""Remove HTML tags from text"""
p = re.compile(r'<.*?>')
return p.sub('', data).replace("&#x28;", "/(").replace("&#x29;", ")/").replace("&#x22;", "\"")
# Tests when called alone
2012-10-19 16:41:17 +00:00
if __name__ == "__main__":
2012-11-06 03:14:11 +00:00
print(parseURL("http://www.nemunai.re/"))
print(parseURL("http://www.nemunai.re/?p0m"))
2012-11-04 15:26:20 +00:00
print(parseURL("http://www.nemunai.re/?p0m"))
print(parseURL("http://www.nemunai.re:42/?p0m"))
2012-11-06 03:14:11 +00:00
print(parseURL("ftp://www.nemunai.re:42/?p0m"))
2012-11-04 15:26:20 +00:00
print(parseURL("http://www.nemunai.re/?p0m"))
2012-11-06 03:14:11 +00:00
print(parseURL("magnet:ceciestunmagnet!"))