2016-10-11 22:33:40 +00:00
|
|
|
from datetime import datetime
|
|
|
|
import os
|
|
|
|
import tempfile
|
|
|
|
import subprocess
|
|
|
|
import sys
|
|
|
|
|
|
|
|
from test import MailTest
|
|
|
|
import gpg_status_parser
|
|
|
|
|
|
|
|
|
|
|
|
def verify_sign(data, gpg_rcode, gpg_status, gpg_output=""):
    """Turn a parsed gpg verification result into a stream of test results.

    This is a generator.  It yields ``MailTest`` objects describing what
    was found and, when the signature is accepted, finishes with a
    ``(data, username)`` tuple.

    Args:
        data: the signed payload; passed through untouched in the final
            ``(data, username)`` tuple.
        gpg_rcode: exit status of the gpg process.
        gpg_status: list of parsed status contexts; exactly one entry is
            expected.  Each context is used as a mapping keyed by gpg
            status keyword (``"VALIDSIG"``, ``"GOODSIG"``, ...).
        gpg_output: human-readable gpg output, attached as ``details`` to
            the error results.
    """
    # Anything other than exactly one status context cannot be interpreted.
    if len(gpg_status) != 1:
        yield MailTest("Too much status, please fill a bug report.", 1, details=gpg_output)
        return

    ctx = gpg_status[0]

    # gpg failed, or did not report a valid signature: nothing more to check.
    if gpg_rcode != 0 or "VALIDSIG" not in ctx:
        yield MailTest("Bad signature. Here is the gnupg output:", 1, details=gpg_output.strip())
        return

    # The last 16 hex digits of the fingerprint are shown as the key id.
    key_id = ctx["VALIDSIG"].fingerprint_in_hex[24:]
    signed_on = datetime.fromtimestamp(int(ctx["VALIDSIG"].sig_timestamp))
    yield MailTest("Signed with key: 0x%s, on %s" % (key_id, signed_on))

    # Statuses that reject the signature outright, checked in this order.
    fatal_statuses = (
        ("EXPKEYSIG", "Your key has expired."),
        ("EXPSIG", "The signature has expired."),
        ("REVKEYSIG", "Your key is revoked."),
    )
    for status, message in fatal_statuses:
        if status in ctx:
            yield MailTest(message, 1)
            return

    if "TRUST_NEVER" in ctx:
        yield MailTest("Your trust level is %s." % ctx["TRUST_NEVER"], 1)
        return

    if "GOODSIG" not in ctx:
        return

    signer = ctx["GOODSIG"].username
    if "TRUST" in ctx:
        trust_suffix = " [%s]" % ctx["TRUST"]
    else:
        trust_suffix = ""
    yield MailTest("Signature made by %s%s" % (signer, trust_suffix), -1)

    if "TRUST_UNDEFINED" in ctx:
        yield MailTest("Your trust level is %s. Consider asking other people to sign your key." % ctx["TRUST_UNDEFINED"], 2)

    # Final item: the verified payload together with the signer identity.
    yield data, signer
|
|
|
|
|
|
|
|
|
|
|
|
def check(cnt, GNUPG_DIRECTORY):
    """Dispatch a signature check to the detached or the merged variant.

    Args:
        cnt: when it has exactly two items they are taken as
            ``(data, signature)`` and verified with ``check_sep``;
            anything else is handed as a whole to ``check_merged``.
        GNUPG_DIRECTORY: gpg home directory, forwarded to the checker.

    Yields:
        Whatever the selected checker yields.
    """
    if len(cnt) != 2:
        yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
        return

    data, sign = cnt
    yield from check_sep(data, sign, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
|
|
|
|
|
|
|
|
|
|
|
|
def check_sep(data, sign, GNUPG_DIRECTORY):
    """Verify a detached signature with gpg and report the outcome.

    The signature is written to a temporary file, the signed data is fed
    to ``gpg --verify <sigfile> -`` on stdin, and the parsed status lines
    are handed to ``verify_sign``.

    Args:
        data: the signed content, either ``bytes`` or an object exposing
            ``as_bytes()`` (presumably an ``email.message.Message`` --
            confirm against callers).
        sign: the detached signature, as bytes.
        GNUPG_DIRECTORY: gpg home directory (``--homedir``).

    Yields:
        A single error ``MailTest`` if anything raised while running gpg,
        otherwise everything ``verify_sign`` yields.
    """
    gpg_output = b""
    gpg_status = []
    gpg_rcode = None
    sig_path = None
    error = None

    try:
        # delete=False: gpg must be able to open the file by name after we
        # have closed it; it is removed in the ``finally`` clause below.
        with tempfile.NamedTemporaryFile(delete=False) as f:
            sig_path = f.name
            f.write(sign)

        if isinstance(data, bytes):
            bdata = data
        else:
            bdata = data.as_bytes()
        # Signed mail content is canonicalised to CRLF line endings before
        # signing; restore them if they were lost.  bytes.replace() returns
        # a new object, so the result has to be re-bound.
        if b'\r\n' not in bdata:
            bdata = bdata.replace(b'\n', b'\r\n')

        with subprocess.Popen(["gpg",
                               "--homedir=" + GNUPG_DIRECTORY,
                               "--status-fd=1",
                               "--auto-key-retrieve",
                               "--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
                               "--keyserver=pool.sks-keyservers.net",
                               "--quiet",
                               "--batch",
                               "--verify",
                               sig_path,
                               "-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
            p.stdin.write(bdata)
            p.stdin.close()

            # Status lines arrive on stdout because of --status-fd=1.
            gpg_status = list(gpg_status_parser.parse(p.stdout))
            p.wait()
            gpg_output = p.stderr.read()
            gpg_rcode = p.returncode

    except Exception as e:
        # Reported after cleanup, so the temporary file is never left on
        # disk while the generator is suspended.
        error = e
    finally:
        # sig_path stays None when the temporary file could not be created.
        if sig_path is not None:
            os.unlink(sig_path)

    if error is not None:
        yield MailTest("An error occured: %s" % error, 1)
        return

    yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))
|
2016-10-11 22:33:40 +00:00
|
|
|
|
|
|
|
|
|
|
|
def check_merged(bdata, GNUPG_DIRECTORY):
    """Verify an inline-signed blob with gpg and report the outcome.

    The blob is piped to gpg, which writes the extracted content to a file
    inside a private temporary directory.  That content and the parsed
    status lines are then handed to ``verify_sign``.

    Args:
        bdata: the signed blob, ``str`` or ``bytes`` (``str`` is encoded
            with the default encoding before being sent to gpg).
        GNUPG_DIRECTORY: gpg home directory (``--homedir``).

    Yields:
        A single error ``MailTest`` if anything raised while running gpg
        or reading its output, otherwise everything ``verify_sign`` yields.
        The payload given to ``verify_sign`` is ``None`` when gpg produced
        no output file.
    """
    gpg_output = b""
    gpg_status = []
    gpg_rcode = None
    payload = None
    error = None

    try:
        # Writing into a freshly created private directory avoids reusing
        # the name of a deleted temporary file (which another process could
        # claim first) and guarantees the output is removed on every path.
        with tempfile.TemporaryDirectory() as workdir:
            out_path = os.path.join(workdir, "output")

            # NOTE(review): check_sep also lists "wkd" in --auto-key-locate;
            # confirm whether its absence here is intentional.
            with subprocess.Popen(["gpg",
                                   "--homedir=" + GNUPG_DIRECTORY,
                                   "--status-fd=1",
                                   "--auto-key-retrieve",
                                   "--auto-key-locate=clear,local,pka,dane,cert,keyserver",
                                   "--keyserver=pool.sks-keyservers.net",
                                   "--quiet",
                                   "--batch",
                                   "--output",
                                   out_path,
                                   "-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
                p.stdin.write(bdata.encode() if isinstance(bdata, str) else bdata)
                p.stdin.close()

                # Status lines arrive on stdout because of --status-fd=1.
                gpg_status = list(gpg_status_parser.parse(p.stdout))
                p.wait()
                gpg_output = p.stderr.read()
                gpg_rcode = p.returncode

            # gpg only creates the file when it could extract the content.
            if os.path.exists(out_path):
                with open(out_path, 'rb') as fp:
                    payload = fp.read()

    except Exception as e:
        # Reported once the temporary directory has been cleaned up.
        error = e

    if error is not None:
        yield MailTest("An error occured: %s" % error, 1)
        return

    yield from verify_sign(payload, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))
|