diff --git a/archive.py b/archive.py new file mode 100644 index 0000000..7f2bd68 --- /dev/null +++ b/archive.py @@ -0,0 +1,137 @@ +from datetime import datetime +from email.message import Message +import hashlib +import os +import shutil +import subprocess +import tempfile + +from test import MailTest + + +def find(cnt): + data, login = cnt + def found(data): + mime = data.get_content_type() + if mime == "application/octet-stream": + mime = _guess_mime(data.get_payload(decode=True)) + if mime == "application/x-xz": + fname = data.get_filename(login + ".tar.xz") + elif mime == "application/x-bzip" or mime == "application/x-bzip2": + fname = data.get_filename(login + ".tar.bz2") + elif mime == "application/x-gzip" or mime == "application/gzip": + fname = data.get_filename(login + ".tar.gz") + elif mime == "application/zip": + fname = data.get_filename(login + ".zip") + elif mime == "application/rar": + fname = data.get_filename(login + ".rar") + elif mime == "application/x-tar" or mime == "application/tar": + fname = data.get_filename(login + ".tar") + else: + fname = None + return data.get_payload(decode=True), fname + + if isinstance(data, Message): + if not data.is_multipart(): + data, fname = found(data) + yield MailTest("Tarball found: %s." % fname, -1) + else: + for part in data.walk(): + data, fname = found(part) + if fname is not None: + yield MailTest("Tarball found: %s." % fname, -1) + break + yield (data, login) + + +def hash_archive(cnt, dest=None): + data, login = cnt + sha = hashlib.sha1(data.encode() if isinstance(data, str) else data).hexdigest() + yield MailTest("Your tarball SHA-1 is %s." % sha, -1) + if dest is not None and os.path.exists(os.path.join(dest, login + "." + sha)): + yield MailTest("You have already uploaded this tarball.", 1) + yield False + else: + yield (data, sha, login) + + +def _guess_mime(data): + with subprocess.Popen(["file", + "--brief", + "--mime-type", + "-"], env={"LANG": 'C'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE) as p: + p.stdin.write(data.encode() if isinstance(data, str) else data) + p.stdin.close() + p.wait() + if p.returncode == 0: + return p.stdout.read().decode().strip() + + +def guess_mime(cnt): + data, sha, login = cnt + + mime = _guess_mime(data) + if mime is not None: + yield MailTest("Guessed content-type of your submission: %s." % mime, + 0 if mime.find("application/x-") == 0 else 2) + else: + mime = "application/x-tar" + yield MailTest("Unable to guess content-type of your submission. Assuming: %s." % mime) + + yield data, mime, sha, login + + +def extract(cnt, dest=None): + data, type, sha, login = cnt + + if dest is not None: + odest = dest + os.makedirs(odest, exist_ok=True) + ldest = os.path.join(dest, login) + dest = os.path.join(dest, login + "." + sha) + if os.path.exists(dest): + yield MailTest("You have already uploaded this tarball.", 1) + return + last_dest = os.readlink(ldest) if os.path.exists(ldest) else None + + try: + with tempfile.TemporaryDirectory() as temp: + with subprocess.Popen(["tar", "--no-same-owner", "--no-same-permissions", + ("-xvC" + temp) if dest is not None else "-t", + "-z" if type == "application/x-gzip" else "", + "-J" if type == "application/x-xz" else "", + "-j" if type in ["application/x-bzip", "application/x-bzip2"] else "", + "-"], env={"LANG": 'C'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p: + p.stdin.write(data.encode() if isinstance(data, str) else data) + p.stdin.close() + p.wait() + err = p.stdout.read().decode() + err += p.stderr.read().decode() + if p.returncode == 0: + if dest is not None: + nsub = len([x for x in os.listdir(odest) if x.find(os.path.basename(ldest) + ".") == 0]) + 1 + if nsub > 1: + yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, "st" if nsub == 0 else ("nd" if nsub == 1 else ("rd" if nsub == 3 else "th")), datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1) + else: + yield MailTest("This is your %i%s submission." % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th"))), -1) + if dest != ldest: + if os.path.lexists(ldest): + os.remove(ldest) + os.symlink(os.path.basename(dest), ldest) + if len(os.listdir(temp)) == 1: + shutil.move(os.path.join(temp, os.listdir(temp)[0]), dest) + else: + shutil.move(temp, dest) + yield MailTest("Archive successfully extracted.", details=err) + yield dest + else: + yield MailTest("An error occured during archive extraction:", 1, details=err) + + if not os.path.exists(temp): + os.makedirs(temp) + except PermissionError: + if os.path.lexists(ldest): + os.remove(ldest) + if last_dest is not None: + os.symlink(last_dest, ldest) + yield MailTest("Your archive's content has crazy permissions. Please fix it.", 1) diff --git a/check.py b/check.py new file mode 100755 index 0000000..fb21b2c --- /dev/null +++ b/check.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 + +from datetime import datetime +import email +import os +import sys +import stat +import subprocess + +GNUPG_DIRECTORY = "~/.gnupg-checker" +SOFT_MAX_SUBMISSION = None +HARD_MAX_SUBMISSION = None +FROM = "Automatic VIRLI Mail Checker " +SEND_TO_REALUSER = False +BETA = False +ALTERNATE_RESOLUTIONS = False + + +import archive +import envelope +import late +import login +import signature +from test import MailTest + +def signcheck(data): + yield MailTest("Those tests are limited to signature checking. THIS IS NOT THE SUBMISSION INTERFACE.", 2) + yield data + + +def gen_checks(submissions_dir, check_content=False): + if HARD_MAX_SUBMISSION is not None and check_content: + yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION]) + else: + yield signcheck + yield (envelope.check, [GNUPG_DIRECTORY, BETA]) + yield (signature.check, [GNUPG_DIRECTORY]) + yield (login.check, ["/home/nemunaire/workspace/check_mail/SRS2017.csv"]) + if check_content: + yield archive.find + yield ( archive.hash_archive, [submissions_dir] ) + yield archive.guess_mime + yield ( archive.extract, [submissions_dir] ) + + +def respondmail(to, subject, ref, checks): + from email.message import EmailMessage + + if not isinstance(checks, list): + checks = [c for c in checks] + + # Show only the first message if there is one ACCEPT + if len(checks) > 1 and not (ALTERNATE_RESOLUTIONS and BETA): + maxitem = checks[0] + for item in checks: + lvl, tests, final_decision = item + if maxitem[0] < lvl: + maxitem = item + if final_decision == "ACCEPT" or final_decision == "SKIP": + return respondmail(to, subject, ref, [(lvl, tests, final_decision)]) + # Display the most upper error + if not ALTERNATE_RESOLUTIONS: + return respondmail(to, subject, ref, [maxitem]) + + msg = EmailMessage() + msg["X-loop"] = "virli" + msg["From"] = FROM + msg["To"] = to + if ref is not None: + msg["References"] = ref + msg["In-Reply-To"] = ref + msg["Subject"] = ("Re: " if not subject.lower().find("re: ") >= 0 else "") + subject + + test = None + final_decision = "REJECT" + fmt = "" + for lvl, tests, final_decision in checks: + if fmt != "": + fmt += '\n============ There is also another resolution: ============\n\n' + for test in tests: + fmt += test.valuestr + fmt += test.title + fmt += '\n' + if test is not None and test.details is not None and test.details != "": + fmt += '\n' + fmt += test.details.strip() + fmt += '\n' + + if final_decision == "SKIP": + fmt += '\nI stopped here the email analysis.' + else: + fmt += '\nAfter analyzing your e-mail, I\'ve decided to ' + final_decision + ' it.' + + msg.set_content("""Hi! + +This is the automatic e-mail analyzer in charge of checking your work. + +Here is the detailed report for your submission: + +""" + fmt + """ + +Sincerely, + +-- \nAutomatic e-mail checker +running for nemunaire@nemunai.re""") + + import smtplib + with smtplib.SMTP("localhost") as smtp: + if SEND_TO_REALUSER: + smtp.send_message(msg) + smtp.send_message(msg, to_addrs=["virli-report@nemunai.re"]) + else: + print(msg.as_string()) + + +def readmail(fp): + cnt = email.message_from_binary_file(fp) + frm = cnt.get("From") or "someone" + subject = cnt.get("Subject") or "your mail" + ref = cnt.get("Message-ID") or "" + + return cnt, frm, subject, ref + + +def check_mail(cnt, submissions_dir, check_content=False): + results = [] + + # sentinel + results.append([(None, [cnt])]) + + lvl = 0 + for check in gen_checks(submissions_dir=submissions_dir, check_content=check_content): + lvl += 1 + curr = [] + curc = [] + + for parent in range(len(results[-1])): + cnt = results[-1][parent][1][-1] + + if cnt is True or cnt is None or cnt is False: + res = [] + for i in range(len(results) - 1, 0, -1): + for m in reversed(results[i][parent][1][:-1]): + res.append(m) + parent = results[i][parent][0] + res.reverse() + if cnt is None: final = "REJECT" + elif cnt is False: final = "SKIP" + else: final = "ACCEPT" + yield lvl, res, final + continue + + if isinstance(check, tuple): + chk, params = check + res = chk(cnt, *params) + else: + res = check(cnt) + + for r in res: + curc.append(r) + if not isinstance(r, MailTest): + curr.append((parent, curc)) + curc = [] + + if len(curc) > 0: + curc.append(None) + curr.append((parent, curc)) + curc = [] + results.append(curr) + + for parent in range(len(results[-1])): + res = [] + cnt = results[-1][parent][1][-1] + for i in range(len(results) - 1, 0, -1): + for m in reversed(results[i][parent][1][:-1]): + res.append(m) + parent = results[i][parent][0] + res.reverse() + if cnt is None: final = "REJECT" + elif cnt is False: final = "SKIP" + else: final = "ACCEPT" + yield len(results), res, final + + +if __name__ == '__main__': + os.umask(~(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)) + GNUPG_DIRECTORY = os.path.expanduser(GNUPG_DIRECTORY) + if not os.path.isdir(GNUPG_DIRECTORY): + os.mkdir(GNUPG_DIRECTORY) + + # Parse command line arguments + import argparse + parser = argparse.ArgumentParser() + + parser.add_argument("-s", "--sign", action="store_true", + help="limit check to signature") + + parser.add_argument("--real-send", action="store_true", + help="Effectively sent mail to real users") + + parser.add_argument('--soft-max-submission', default="thursday 8:42", + help="allow submission until this hour") + + parser.add_argument('--hard-max-submission', default="thursday 9:21", + help="allow submission until this hour") + + parser.add_argument('--submissions', default="/tmp/rendus", + help="directory where store submissions") + + parser.add_argument('--beta', action="store_true", + help="enable beta features") + + parser.add_argument('--alternate-resolutions', action="store_true", + help="enable if you want to display alternate resolutions") + + args = parser.parse_args() + + with subprocess.Popen(["date", "-d", args.soft_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f: + SOFT_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode(), "%Y-%m-%dT%H:%M:%S%z") + + with subprocess.Popen(["date", "-d", args.hard_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f: + HARD_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode(), "%Y-%m-%dT%H:%M:%S%z") + + ALTERNATE_RESOLUTIONS = args.alternate_resolutions + SEND_TO_REALUSER = args.real_send + BETA = args.beta + + cnt, frm, subject, ref = readmail(sys.stdin.buffer) + respondmail(frm, subject, ref, [c for c in check_mail(cnt, submissions_dir=args.submissions, check_content=not args.sign)]) diff --git a/envelope.py b/envelope.py new file mode 100644 index 0000000..888f544 --- /dev/null +++ b/envelope.py @@ -0,0 +1,94 @@ +import re +import subprocess + +from test import MailTest + + +def import_pubkey(key, GNUPG_DIRECTORY): + with subprocess.Popen(["gpg", + "--homedir=" + GNUPG_DIRECTORY, + "--batch", + "--import", + "-"], env={"LANG": 'C'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p: + p.stdin.write(key) + p.stdin.close() + p.wait() + gpg_output = p.stderr.read().decode() + if p.returncode == 0: + yield MailTest("New PGP key successfully imported:", details=gpg_output) + yield False + else: + yield MailTest("An error occurs during PGP key importation:", details=gpg_output) + + +def assume_rfc3156(msg): + if msg.get_param("protocol") is None or msg.get_param("protocol") != "application/pgp-signature" or msg.get_payload(1).get_content_type() != "application/pgp-signature": + yield MailTest("Message treated as RFC3156 due to Content-Type, but is not compliant", 1) + return + + # Extracting signature + try: + data = msg.get_payload(0) + sign = msg.get_payload(1).get_payload().encode() + + # Except an exception in the two above lines if one part doesn't exist + yield MailTest("Message treated as RFC3156: content and signature found") + + yield (data, sign) + + except IndexError: + yield MailTest("Message treated as RFC3156 due to Content-Type, but is not compliant", 1) + return + + +def assume_oldstyle(payload): + yield MailTest("Found BEGIN PGP SIGNED MESSAGE: message treated as old style PGP email.") + yield payload + + + +def check(msg, GNUPG_DIRECTORY, beta=False): + ct = msg.get_content_type() + + # First, looking for public key + for part in msg.walk(): + if part.get_content_type() == "application/pgp-keys": + yield from import_pubkey(part.get_payload(decode=True), GNUPG_DIRECTORY) + return + + if ct == "multipart/signed" and msg.is_multipart(): + yield from assume_rfc3156(msg) + + else: + yield MailTest("This is not a signed e-mail: %s." % ct, 1) + + if ct == "multipart/encrypted": + yield MailTest("As an automated service, I can't access my owner's private key. Please resend your email, unencrypted but signed.", -1) + return + + from archive import _guess_mime + + # Looking for signed content + for part in msg.walk(): + payload = part.get_payload() + if part.get_content_type() == "application/pgp-signature": + p = [x for x in msg.walk()] + for s in range(len(p) - 1, -1, -1): + spart = p[s] + if part is not spart and not spart.is_multipart(): + yield MailTest("Separate signature found. Trying it with part %d (%s) ..." % (s, spart.get_content_type()), -1) + yield (spart.get_payload(decode=True), part.get_payload(decode=True)) + + elif beta and part.get_content_type() == "application/octet-stream": + yield MailTest("Entering BETA feature of walking through mail part, looking for a submission.", 2) + print(_guess_mime(part.get_payload(decode=True))) + yield from check_binary(part.as_bytes() if part.is_multipart() else part.get_payload(decode=True)) + + elif payload is not None and not part.is_multipart() and part.get_payload(decode=True).find(b"-----BEGIN PGP SIGNED MESSAGE-----") >= 0: + res = re.match(".*(-----BEGIN PGP SIGNED MESSAGE-----(.*)-----BEGIN PGP SIGNATURE-----(.*)-----END PGP SIGNATURE-----).*", payload, re.DOTALL) + if res is not None: + yield from assume_oldstyle(payload) + else: + res = re.match(b".*(-----BEGIN PGP SIGNED MESSAGE-----(.*)-----BEGIN PGP SIGNATURE-----(.*)-----END PGP SIGNATURE-----).*", part.get_payload(decode=True), re.DOTALL) + if res is not None: + yield from assume_oldstyle(part.get_payload(decode=True)) diff --git a/gpg_status_parser.py b/gpg_status_parser.py new file mode 100644 index 0000000..9e67f7d --- /dev/null +++ b/gpg_status_parser.py @@ -0,0 +1,157 @@ +import re + +class GoodSignature: + + def __init__(self, long_keyid_or_fpr, *username): + self.long_keyid_or_fpr = long_keyid_or_fpr + self.username = " ".join(username) + + +class ExpiredSignature: + + def __init__(self, long_keyid_or_fpr, *username): + self.long_keyid_or_fpr = long_keyid_or_fpr + self.username = " ".join(username) + + +class ExpiredKey: + + def __init__(self, long_keyid_or_fpr, *username): + self.long_keyid_or_fpr = long_keyid_or_fpr + self.username = " ".join(username) + + +class RevokedKey: + + def __init__(self, long_keyid_or_fpr, *username): + self.long_keyid_or_fpr = long_keyid_or_fpr + self.username = " ".join(username) + + +class BadSignature: + + def __init__(self, long_keyid_or_fpr, *username): + self.long_keyid_or_fpr = long_keyid_or_fpr + self.username = " ".join(username) + + +class UncheckableSignature: + + def __init__(self, keyid, pkalgo, hashalgo, sig_class, time, rc): + self.keyid = keyid + self.pkalgo = pkalgo + self.hashalgo = hashalgo + self.sig_class = sig_class + self.time = time + self.rc = rc + + +class ValidSignature: + + def __init__(self, fingerprint_in_hex, sig_creation_date, sig_timestamp, expire_timestamp, sig_version, reserved, pubkey_algo, hash_algo, sig_class, primary_key_fpr=None): + self.fingerprint_in_hex = fingerprint_in_hex + self.sig_creation_date = sig_creation_date + self.sig_timestamp = sig_timestamp + self.expire_timestamp = expire_timestamp + self.sig_version = sig_version + self.reserved = reserved + self.pubkey_algo = pubkey_algo + self.hash_algo = hash_algo + self.sig_class = sig_class + self.primary_key_fpr = primary_key_fpr + + +class SignatureID: + + def __init__(self, radix64_string, sig_creation_date, sig_timestamp): + self.radix64_string = radix64_string + self.sig_creation_date = sig_creation_date + self.sig_timestamp = sig_timestamp + + +class EncryptedTo: + + def __init__(self, long_keyid, keytype, keylength): + self.long_keyid = long_keyid + self.keytype = keytype + self.keylength = keylength + + +class NoPublicKey: + + def __init__(self, long_keyid): + self.long_keyid = long_keyid + + +class _Trust: + + def __init__(self, flag, validation_model=None): + self.flag = flag + self.validation_model = validation_model + +class TrustUndefined(_Trust): + + def __str__(self): + return "unknown" + +class TrustNever(_Trust): + + def __str__(self): + return "never" + +class TrustMarginal(_Trust): + + def __str__(self): + return "marginal" + +class TrustFully(_Trust): + + def __str__(self): + return "fully" + +class TrustUltimate(_Trust): + + def __str__(self): + return "ultimate" + + +_keywords = { + "GOODSIG": GoodSignature, + "EXPSIG": ExpiredSignature, + "EXPKEYSIG": ExpiredKey, + "REVKEYSIG": RevokedKey, + "BADSIG": BadSignature, + "ERRSIG": UncheckableSignature, + "VALIDSIG": ValidSignature, + "SIG_ID": SignatureID, + "ENC_TO": EncryptedTo, + "NO_PUBKEY": NoPublicKey, + + "TRUST_UNDEFINED": TrustUndefined, + "TRUST_NEVER": TrustNever, + "TRUST_MARGINAL": TrustMarginal, + "TRUST_FULLY": TrustFully, + "TRUST_ULTIMATE": TrustUltimate, +} + +def parse(fd): + line = fd.readline() + context = {} + while line: + res = re.match(r"^\[GNUPG:\] (?P\S+)(?: (?P.*))?$", line.decode()) + if res is not None: + keyword = res.group("keyword") + args = res.group("args").split(" ") if res.group("args") is not None else [] + + if keyword == "NEWSIG": + if len(context) > 0: + yield context + context = {} + + elif keyword in _keywords: + if keyword[:5] == "TRUST": + context["TRUST"] = _keywords[keyword](*args) + context[keyword] = _keywords[keyword](*args) + line = fd.readline() + + yield context diff --git a/late.py b/late.py new file mode 100644 index 0000000..4625785 --- /dev/null +++ b/late.py @@ -0,0 +1,20 @@ +from datetime import datetime, timezone + +from test import MailTest + + +def check(cnt, hard_max_submission_date, soft_max_submission_date=None): + if soft_max_submission_date is None: + soft_max_submission_date = hard_max_submission_date + else: + soft_max_submission_date = min(soft_max_submission_date, hard_max_submission_date) + + now = datetime.now().replace(tzinfo=timezone.utc) + yield MailTest("We are %s, submission permitted until %s" % (now.strftime("%c"), soft_max_submission_date.strftime("%c")), -1) + if now > soft_max_submission_date: + discard = now > hard_max_submission_date + yield MailTest("Submissions have been closed since %s minute(s)" % int((now - soft_max_submission_date).total_seconds()/60), 1 if discard else 2) + if discard: + return + + yield cnt diff --git a/login.py b/login.py new file mode 100644 index 0000000..54313d4 --- /dev/null +++ b/login.py @@ -0,0 +1,19 @@ +import csv +from email.utils import parseaddr + +from test import MailTest + + +def check(cnt, file): + data, uname = cnt + username, address = parseaddr(uname) + + with open(file, encoding='utf-8') as fd: + people = csv.reader(fd) + for p in people: + if address.lower() == p[4].lower() or uname.lower().find(p[2].lower()) >= 0 or username.lower().replace(" ", "").find(p[0].lower().replace(" ", "")) >= 0 and username.lower().find(p[1].lower()) >= 0: + yield MailTest("Recognized as %s: %s %s." % (p[2], p[1], p[0])) + yield data, p[2] + return + + yield MailTest("The username of your key is not explicit, I can't find you.", 1) diff --git a/signature.py b/signature.py new file mode 100644 index 0000000..07222d9 --- /dev/null +++ b/signature.py @@ -0,0 +1,128 @@ +from datetime import datetime +import os +import tempfile +import subprocess +import sys + +from test import MailTest +import gpg_status_parser + + +def verify_sign(data, gpg_rcode, gpg_status, gpg_output=""): + if len(gpg_status) != 1: + yield MailTest("Too much status, please fill a bug report.", 1, details=gpg_output) + return + + ctx = gpg_status[0] + + if gpg_rcode == 0 and "VALIDSIG" in ctx: + yield MailTest("Signed with key: 0x%s, on %s" % (ctx["VALIDSIG"].fingerprint_in_hex[24:], datetime.fromtimestamp(int(ctx["VALIDSIG"].sig_timestamp)))) + if "EXPKEYSIG" in ctx: + yield MailTest("Your key has expired.", 1) + return + if "EXPSIG" in ctx: + yield MailTest("The signature has expired.", 1) + return + if "TRUST_NEVER" in ctx: + yield MailTest("Your trust level is %s." % ctx["TRUST_NEVER"], 1) + return + if "GOODSIG" in ctx: + yield MailTest("Signature made by %s%s" % (ctx["GOODSIG"].username, (" [%s]" % ctx["TRUST"]) if "TRUST" in ctx else ""), -1) + if "TRUST_UNDEFINED" in ctx: + yield MailTest("Your trust level is %s. Consider asking other people to sign your key." % ctx["TRUST_UNDEFINED"], 2) + yield data, ctx["GOODSIG"].username + else: + yield MailTest("Bad signature. Here is the gnupg output:", 1, details=gpg_output.strip()) + + +def check(cnt, GNUPG_DIRECTORY): + if len(cnt) == 2: + yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY) + else: + yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY) + + +def check_sep(data, sign, GNUPG_DIRECTORY): + gpg_output = "" + gpg_status = [] + gpg_rcode = None + try: + f = tempfile.NamedTemporaryFile(delete=False) + f.write(sign) + f.close() + + with subprocess.Popen(["gpg", + "--homedir=" + GNUPG_DIRECTORY, + "--status-fd=1", + "--auto-key-retrieve", + "--auto-key-locate=clear,local,pka,dane,cert,keyserver", + "--keyserver=pool.sks-keyservers.net", + "--quiet", + "--batch", + "--verify", + f.name, + "-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p: + if isinstance(data, bytes): + bdata = data + else: + bdata = data.as_bytes() + if not bdata.find(b'\r\n') >= 0: + bdata.replace(b'\n', b'\r\n') # Windows hack + p.stdin.write(bdata) + p.stdin.close() + + gpg_status = [l for l in gpg_status_parser.parse(p.stdout)] + p.wait() + gpg_output = p.stderr.read() + gpg_rcode = p.returncode + + except Exception as e: + yield MailTest("An error occured: %s" % e, 1) + return + finally: + os.unlink(f.name) + + yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8')) + + +def check_merged(bdata, GNUPG_DIRECTORY): + f = tempfile.NamedTemporaryFile() + f.close() + + gpg_output = "" + gpg_status = [] + gpg_rcode = None + try: + with subprocess.Popen(["gpg", + "--homedir=" + GNUPG_DIRECTORY, + "--status-fd=1", + "--auto-key-retrieve", + "--auto-key-locate=clear,local,pka,dane,cert,keyserver", + "--keyserver=pool.sks-keyservers.net", + "--quiet", + "--batch", + "--output", + f.name, + "-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p: + #if not bdata.find('\r\n') >= 0: + # bdata = bdata.replace('\n', '\r\n') # Windows hack + p.stdin.write(bdata.encode() if isinstance(bdata, str) else bdata) + p.stdin.close() + + gpg_status = [l for l in gpg_status_parser.parse(p.stdout)] + p.wait() + gpg_output = p.stderr.read() + gpg_rcode = p.returncode + + except Exception as e: + yield MailTest("An error occured: %s" % e, 1) + return + + if os.path.exists(f.name): + with open(f.name, 'rb') as fp: + bdata = fp.read() + os.unlink(f.name) + else: + bdata = None + + yield from verify_sign(bdata, gpg_rcode, gpg_status, gpg_output.decode('utf-8')) diff --git a/test.py b/test.py new file mode 100644 index 0000000..a844484 --- /dev/null +++ b/test.py @@ -0,0 +1,26 @@ +class MailTest: + + def __init__(self, title, value=0, details=None): + self._title = title + self._value = value + self._details = details + + @property + def title(self): + return self._title + + @property + def value(self): + return self._value + + @property + def valuestr(self): + if self._value < -1: return " " + elif self._value < 0: return "[ ] " + elif self._value == 0: return "[ OK ] " + elif self._value == 2: return "[WARN] " + elif self._value == 1: return "[FAIL] " + + @property + def details(self): + return self._details