"""Verify OpenPGP signatures by running the ``gpg`` command line tool.

``check`` is the entry point; the other functions run gpg on detached or
inline signatures and turn its machine-readable status into ``MailTest``
results.
"""

from datetime import datetime
import io
import os
import subprocess
import sys
import tempfile

from test import MailTest
import gpg_status_parser

# Keyservers queried, in order, by check().
_KEYSERVERS = ("pool.sks-keyservers.net", "keys.openpgp.org")


def verify_sign(data, gpg_rcode, gpg_status, gpg_output=""):
    """Translate the outcome of a gpg run into test results.

    Args:
        data: payload handed back to the caller once the signature is accepted.
        gpg_rcode: exit status of the gpg process.
        gpg_status: list of parsed status contexts; exactly one is expected.
        gpg_output: decoded gpg stderr, attached as details to failure reports.

    Yields:
        ``MailTest`` objects describing each finding. When gpg succeeded with
        a VALIDSIG and a GOODSIG status, the last item is the tuple
        ``(data, username)`` instead of a ``MailTest``.
    """
    if len(gpg_status) != 1:
        # NOTE(review): an empty status list is reported with this same
        # message; confirm whether that case deserves its own wording.
        yield MailTest("Too much status, please fill a bug report.", 1, details=gpg_output)
        return

    ctx = gpg_status[0]
    if gpg_rcode != 0 or "VALIDSIG" not in ctx:
        yield MailTest("Bad signature. Here is the gnupg output:", 1, details=gpg_output.strip())
        return

    valid = ctx["VALIDSIG"]
    yield MailTest("Signed with key: 0x%s, on %s" % (
        valid.fingerprint_in_hex[24:],
        datetime.fromtimestamp(int(valid.sig_timestamp)),
    ))

    # Statuses that invalidate the signature; the first match ends the report.
    for status, message in (
        ("EXPKEYSIG", "Your key has expired."),
        ("EXPSIG", "The signature has expired."),
        ("REVKEYSIG", "Your key is revoked."),
    ):
        if status in ctx:
            yield MailTest(message, 1)
            return

    if "TRUST_NEVER" in ctx:
        yield MailTest("Your trust level is %s." % ctx["TRUST_NEVER"], 1)
        return

    if "GOODSIG" in ctx:
        signer = ctx["GOODSIG"].username
        trust = (" [%s]" % ctx["TRUST"]) if "TRUST" in ctx else ""
        yield MailTest("Signature made by %s%s" % (signer, trust), -1)

        if "TRUST_UNDEFINED" in ctx:
            yield MailTest("Your trust level is %s. Consider asking other people to sign your key." % ctx["TRUST_UNDEFINED"], 2)

        yield data, signer


def check(cnt, GNUPG_DIRECTORY):
    """Check the signature carried by ``cnt`` against each known keyserver.

    Args:
        cnt: either a two-item sequence ``(data, signature)`` for a detached
            signature, or a single bytes blob with the signature inline.
        GNUPG_DIRECTORY: gpg home directory passed as ``--homedir``.

    Yields:
        Everything produced by ``check_sep`` or ``check_merged``, once per
        keyserver, in keyserver order.
    """
    for server in _KEYSERVERS:
        if len(cnt) == 2:
            yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server)
        else:
            yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server)


def check_sign(cmd, bdata, fname, GNUPG_DIRECTORY, keyserver, windows_hack=False):
    """Run gpg on ``bdata`` (fed through stdin) and collect its result.

    Args:
        cmd: gpg option placed right before ``fname`` (``--verify`` or
            ``--output`` in this file).
        bdata: bytes written to gpg's standard input.
        fname: file name given as argument to ``cmd``.
        GNUPG_DIRECTORY: gpg home directory passed as ``--homedir``.
        keyserver: keyserver used for automatic key retrieval.
        windows_hack: True when this call is already the CRLF retry, so no
            further retry is attempted.

    Returns:
        ``(gpg_status, gpg_output, gpg_rcode)``: the list of parsed status
        contexts, raw stderr bytes and the process exit status.
    """
    argv = [
        "gpg",
        "--homedir=" + GNUPG_DIRECTORY,
        "--status-fd=1",
        "--auto-key-retrieve",
        "--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
        "--keyserver=" + keyserver,
        "--quiet",
        "--batch",
        cmd,
        fname,
        "-",
    ]
    with subprocess.Popen(argv,
                          env={"LANG": 'en_US.UTF-8'},
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as p:
        # communicate() feeds stdin and drains stdout/stderr concurrently;
        # reading one pipe to EOF before the other can deadlock once gpg
        # fills the pipe nobody is reading.
        raw_status, gpg_output = p.communicate(bdata)
        gpg_rcode = p.returncode

    # The parser previously received the binary stdout pipe; BytesIO offers
    # the same line-oriented binary reading interface.
    # NOTE(review): assumes the parser only reads/iterates its argument.
    gpg_status = list(gpg_status_parser.parse(io.BytesIO(raw_status)))

    if gpg_rcode != 0 and not windows_hack:
        # Windows hack: retry with CRLF line endings. Normalise first so that
        # existing CRLF pairs are not turned into CR CR LF.
        crlf_data = bdata.replace(b'\r\n', b'\n').replace(b'\n', b'\r\n')
        if crlf_data != bdata:
            try:
                return check_sign(cmd, crlf_data, fname, GNUPG_DIRECTORY, keyserver, True)
            except Exception:
                # Best effort only: fall back to the result of the first run.
                pass

    return gpg_status, gpg_output, gpg_rcode


def check_sep(data, sign, GNUPG_DIRECTORY, keyserver):
    """Verify a detached signature ``sign`` over ``data``.

    Args:
        data: signed content, either bytes or an object exposing
            ``as_bytes()``.
        sign: signature bytes, written to a temporary file for gpg.
        GNUPG_DIRECTORY: gpg home directory passed as ``--homedir``.
        keyserver: keyserver used for automatic key retrieval.

    Yields:
        A single failing ``MailTest`` when gpg could not be run, otherwise
        whatever ``verify_sign`` yields.
    """
    sig_path = None
    failure = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as sig_file:
            sig_path = sig_file.name
            sig_file.write(sign)

        payload = data if isinstance(data, bytes) else data.as_bytes()
        gpg_status, gpg_output, gpg_rcode = check_sign("--verify", payload, sig_path, GNUPG_DIRECTORY, keyserver)
    except Exception as e:
        failure = e
    finally:
        # sig_path stays None when the temporary file could not be created.
        if sig_path is not None:
            os.unlink(sig_path)

    # Report only after cleanup so the temporary file never outlives a
    # consumer that stops iterating early.
    if failure is not None:
        yield MailTest("An error occured: %s" % failure, 1)
        return

    yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))


def check_merged(bdata, GNUPG_DIRECTORY, keyserver):
    """Verify a message whose signature is embedded in ``bdata``.

    gpg is asked to write its output to a file; the content of that file (or
    None when gpg produced none) is what ``verify_sign`` receives as data.

    Args:
        bdata: bytes of the signed message.
        GNUPG_DIRECTORY: gpg home directory passed as ``--homedir``.
        keyserver: keyserver used for automatic key retrieval.

    Yields:
        A single failing ``MailTest`` when gpg could not be run, otherwise
        whatever ``verify_sign`` yields.
    """
    try:
        # A private directory gives gpg a fresh, not-yet-existing output path
        # without the race of creating and deleting a temporary file, and is
        # removed on every exit path.
        with tempfile.TemporaryDirectory() as workdir:
            out_path = os.path.join(workdir, "output")
            gpg_status, gpg_output, gpg_rcode = check_sign("--output", bdata, out_path, GNUPG_DIRECTORY, keyserver)
            try:
                with open(out_path, 'rb') as fp:
                    extracted = fp.read()
            except FileNotFoundError:
                extracted = None
    except Exception as e:
        yield MailTest("An error occured: %s" % e, 1)
        return

    yield from verify_sign(extracted, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))