Peret checks PGP-signed mails and extracts submissions
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

265 lines
9.3 KiB

#!/usr/bin/env python3
from datetime import datetime
import email
import email.policy
import os
import sys
import stat
import subprocess
# GnuPG home directory handed to the envelope and signature checks; "~" is
# expanded and the directory is created when the script starts.
GNUPG_DIRECTORY = "~/.gnupg-checker"
# Submission deadlines forwarded to late.check. Both stay None unless the
# script computes them from --soft-max-submission / --hard-max-submission;
# the lateness check is only scheduled when HARD_MAX_SUBMISSION is set.
SOFT_MAX_SUBMISSION = None
HARD_MAX_SUBMISSION = None
# Value of the From header on every generated reply.
FROM = "Peret - Automatic Mail Checker <peret@nemunai.re>"
# When False the reply is only printed and sent to the report mailbox, not to
# the original sender (set by --real-send).
SEND_TO_REALUSER = False
# When True the reply is printed and a 15-second pause precedes sending
# (set by --review-before-send).
REVIEW_BEFORE_SEND = False
# Forwarded to envelope.check; combined with ALTERNATE_RESOLUTIONS it also
# makes the reply list every resolution (set by --beta).
BETA = False
# When True, several failing resolutions may be reported in one reply
# (set by --alternate-resolutions).
ALTERNATE_RESOLUTIONS = False
import archive
import envelope
import late
import login
import signature
from test import MailTest
def signcheck(data):
    """Announce signature-only mode, then hand the mail on untouched.

    Yields a ``MailTest`` notice (value 2) followed by ``data`` itself, so the
    next check in the chain receives the same object it was given.
    """
    notice = MailTest(
        "Those tests are limited to signature checking. THIS IS NOT THE SUBMISSION INTERFACE.",
        2,
    )
    yield notice
    yield data
def relatesTo(data, submissions_dir):
    """Tell the sender which submission interface handled the mail.

    Yields a ``MailTest`` notice (value -1) naming the last component of
    ``submissions_dir``, followed by ``data`` unchanged.
    """
    # os.path.basename("/tmp/rendus/") is "" -- drop trailing separators first
    # so a directory given with a final slash is still named in the notice.
    name = os.path.basename(submissions_dir.rstrip(os.sep))
    yield MailTest("This is the submission interface for %s." % name, -1)
    yield data
def gen_checks(submissions_dir, check_content=False, check_submission_hash=None,
               skip_public_key=True,
               logins_file="/home/nemunaire/workspace/check_mail/SRS2017.csv"):
    """Yield, in order, the checks to run against an incoming mail.

    Each item is either a bare callable, invoked as ``check(data)``, or a
    ``(callable, params)`` pair, invoked as ``check(data, *params)``.

    Parameters:
        submissions_dir: directory handed to the archive checks and named in
            the introductory notice.
        check_content: when true, run the full submission pipeline (notice,
            optional lateness check, archive handling); otherwise only the
            signature-related checks, preceded by the ``signcheck`` notice.
        check_submission_hash: forwarded to ``archive.hash_archive``.
        skip_public_key: its negation is forwarded to ``envelope.check``.
        logins_file: path forwarded to ``login.check``; defaults to the
            historical hard-coded location.
            NOTE(review): presumably a CSV of known logins -- confirm against
            login.check.
    """
    if check_content:
        yield (relatesTo, [submissions_dir])
        # The lateness check is only meaningful once a hard deadline is known.
        if HARD_MAX_SUBMISSION is not None:
            yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])
    else:
        yield signcheck

    yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
    yield (signature.check, [GNUPG_DIRECTORY])
    yield (login.check, [logins_file])

    if check_content:
        yield archive.find
        yield (archive.hash_archive, [submissions_dir, check_submission_hash])
        yield archive.guess_mime
        yield (archive.extract, [submissions_dir])
def _select_resolutions(checks):
    """Reduce ``checks`` to the resolution(s) that should be reported.

    With a single resolution, or when both ALTERNATE_RESOLUTIONS and BETA are
    set, everything is kept. Otherwise the first ACCEPT/SKIP resolution wins;
    failing that, only the resolution that reached the highest level is kept
    unless ALTERNATE_RESOLUTIONS asks for all of them.
    """
    if len(checks) <= 1 or (ALTERNATE_RESOLUTIONS and BETA):
        return checks

    deepest = checks[0]
    for item in checks:
        lvl, _tests, final_decision = item
        if deepest[0] < lvl:
            deepest = item
        if final_decision in ("ACCEPT", "SKIP"):
            return [item]

    # Only failures: show the one that went furthest, unless every
    # alternative was requested.
    if not ALTERNATE_RESOLUTIONS:
        return [deepest]
    return checks


def _format_report(checks):
    """Render the selected resolutions as the textual body of the report."""
    if not checks:
        # Nothing was evaluated: keep the historical default decision.
        return "\nAfter analyzing your e-mail, I've decided to REJECT it."

    parts = []
    for index, (_lvl, tests, final_decision) in enumerate(checks):
        if index > 0:
            parts.append('\n============ There is also another resolution: ============\n\n')

        last = None
        for last in tests:
            parts.append(last.valuestr + last.title + '\n')

        # Only the last test of a resolution gets its details printed.
        if last is not None and last.details:
            parts.append('\n' + last.details.strip() + '\n')

        if final_decision == "SKIP":
            parts.append('\nI stopped here the email analysis.')
        else:
            parts.append('\nAfter analyzing your e-mail, I\'ve decided to ' + final_decision + ' it.')
    return "".join(parts)


def respondmail(to, subject, ref, checks, initial_to=None):
    """Build the analysis report and send it by mail.

    Parameters:
        to: address the reply is sent to (when SEND_TO_REALUSER is set).
        subject: subject of the analysed mail; prefixed with "Re: " unless it
            already starts with one.
        ref: Message-ID of the analysed mail, used for threading headers;
            skipped when empty or None.
        checks: iterable of ``(level, tests, final_decision)`` resolutions.
        initial_to: local part of the address the mail was sent to; selects
            the ``<initial_to>-report@nemunai.re`` mailbox for the copy
            (``peret-report`` when None).

    The report copy is always sent; the reply to ``to`` only when
    SEND_TO_REALUSER is set.
    """
    from email.message import EmailMessage
    import smtplib
    import time

    # Selecting in place (instead of recursing) keeps ``initial_to`` attached
    # to the message whatever the number of resolutions.
    checks = _select_resolutions(list(checks))

    msg = EmailMessage()
    msg["X-loop"] = "peret"
    msg["From"] = FROM
    msg["To"] = to
    if ref:
        msg["References"] = ref
        msg["In-Reply-To"] = ref
    # Only a leading "Re:" marks a reply; "re: " elsewhere in the subject
    # (e.g. "Hardware: ...") must not suppress the prefix.
    already_reply = subject.lower().startswith("re:")
    msg["Subject"] = ("" if already_reply else "Re: ") + subject

    fmt = _format_report(checks)

    msg.set_content("""Hi!
This is the automatic e-mail analyzer in charge of checking your work.
Here is the detailed report for your submission:
""" + fmt + """
Sincerely,
-- \nAutomatic e-mail checker
running for nemunaire@nemunai.re""")

    if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
        print(msg.as_string())

    # Leave the operator time to abort; done before connecting so no SMTP
    # session sits idle during the countdown.
    if REVIEW_BEFORE_SEND:
        for _ in range(15):
            sys.stdout.write(".")
            sys.stdout.flush()
            time.sleep(1)

    report_to = ("peret" if initial_to is None else initial_to) + "-report@nemunai.re"
    with smtplib.SMTP("localhost") as smtp:
        smtp.starttls()
        if SEND_TO_REALUSER:
            smtp.send_message(msg)
        smtp.send_message(msg, to_addrs=[report_to])
def readmail(fp):
    """Parse a raw e-mail and extract the fields needed to answer it.

    Parameters:
        fp: binary file object positioned at the start of the message.

    Returns:
        A tuple ``(cnt, frm, subject, ref, to)`` where ``cnt`` is the parsed
        message, ``frm`` the From header (``"someone"`` if absent),
        ``subject`` the Subject header (``"your mail"`` if absent), ``ref``
        the Message-ID (``""`` if absent) and ``to`` the local part of the
        first usable address found in To, then Cc (``None`` if there is none).
    """
    from email.utils import getaddresses

    cnt = email.message_from_binary_file(fp, policy=email.policy.default)
    frm = cnt.get("From") or "someone"
    subject = cnt.get("Subject") or "your mail"
    ref = cnt.get("Message-ID") or ""

    # Parse real addresses instead of splitting the raw header: a missing
    # header no longer raises, and "Name <local@domain>" yields "local"
    # rather than "Name <local".
    to = None
    for header in ("To", "Cc"):
        values = [str(value) for value in cnt.get_all(header, [])]
        for _name, address in getaddresses(values):
            local_part = address.split("@", 1)[0]
            if local_part:
                to = local_part
                break
        if to is not None:
            break

    return cnt, frm, subject, ref, to
def _final_decision(outcome):
    """Map the last value of a branch to its decision string."""
    if outcome is None:
        return "REJECT"
    if outcome is False:
        return "SKIP"
    return "ACCEPT"


def _collect_tests(results, parent):
    """Gather, in execution order, the tests leading to ``results[-1][parent]``.

    Walks from the last level back to (but excluding) the sentinel level,
    following each entry's parent index and dropping the trailing
    payload/terminator of every step.
    """
    collected = []
    for level in range(len(results) - 1, 0, -1):
        origin, items = results[level][parent]
        collected.extend(reversed(items[:-1]))
        parent = origin
    collected.reverse()
    return collected


def check_mail(cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
    """Run every check on ``cnt`` and yield each possible resolution.

    Checks come from ``gen_checks``. A check is a generator producing
    ``MailTest`` items and, between them, non-``MailTest`` values: each such
    value closes a branch and becomes the payload given to the next check.
    A payload of ``True``, ``False`` or ``None`` ends its branch; a check
    whose output ends on a ``MailTest`` is terminated with ``None``.

    Yields:
        ``(level, tests, decision)`` tuples, where ``level`` is the depth at
        which the branch was reported, ``tests`` the ``MailTest`` items met
        along the branch and ``decision`` one of ``"ACCEPT"``, ``"SKIP"``
        or ``"REJECT"``.
    """
    # results[k] holds the branches alive after k checks as
    # (index of parent branch in results[k-1], [tests..., payload]).
    # The first level is a sentinel carrying the original message.
    results = [[(None, [cnt])]]

    lvl = 0
    for check in gen_checks(submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key):
        lvl += 1
        curr = []

        for parent, (_origin, items) in enumerate(results[-1]):
            payload = items[-1]

            # Finished branch: report it now and do not carry it further.
            if payload is True or payload is None or payload is False:
                yield lvl, _collect_tests(results, parent), _final_decision(payload)
                continue

            if isinstance(check, tuple):
                chk, params = check
                res = chk(payload, *params)
            else:
                res = check(payload)

            curc = []
            for r in res:
                curc.append(r)
                # A non-test value closes the current branch.
                if not isinstance(r, MailTest):
                    curr.append((parent, curc))
                    curc = []

            # Tests without a closing payload: the branch is a rejection.
            if curc:
                curc.append(None)
                curr.append((parent, curc))

        results.append(curr)

    # Branches that survived every check.
    for parent, (_origin, items) in enumerate(results[-1]):
        yield len(results), _collect_tests(results, parent), _final_decision(items[-1])
def _parse_deadline(expression):
    """Convert a ``date -d`` expression into a timezone-aware UTC datetime.

    Raises ``subprocess.CalledProcessError`` when ``date`` rejects the
    expression, instead of failing later on an empty string.
    """
    output = subprocess.check_output(["date", "-d", expression, "-u", "-Iseconds"])
    # All colons are removed (time and UTC offset alike), hence the
    # colon-free format below.
    return datetime.strptime(output.strip().decode().replace(":", ""), "%Y-%m-%dT%H%M%S%z")


if __name__ == '__main__':
    # Everything created from here on is accessible to the owner only.
    os.umask(stat.S_IRWXG | stat.S_IRWXO)

    GNUPG_DIRECTORY = os.path.expanduser(GNUPG_DIRECTORY)
    os.makedirs(GNUPG_DIRECTORY, exist_ok=True)

    # Parse command line arguments
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("-R", "--refresh-keys", action="store_true",
                        help="refresh GnuPG keyring")
    parser.add_argument("-s", "--sign", action="store_true",
                        help="limit check to signature")
    parser.add_argument("--real-send", action="store_true",
                        help="Effectively sent mail to real users")
    parser.add_argument('--soft-max-submission', default="thursday 8:42",
                        help="allow submission until this hour")
    parser.add_argument('--hard-max-submission', default="thursday 9:21",
                        help="allow submission until this hour")
    parser.add_argument('--skip-max-submission', action="store_true",
                        help="skip submission date check")
    parser.add_argument('--submissions', default="/tmp/rendus",
                        help="directory where store submissions")
    parser.add_argument('--expected-submission-hash',
                        help="imposed tarball hash")
    parser.add_argument('--review-before-send', action="store_true",
                        help="Review the e-mail to be sent before sending it")
    parser.add_argument('--skip-public-key', action="store_true",
                        help="enable if you want to skip public key discovery through attachments")
    parser.add_argument('--beta', action="store_true",
                        help="enable beta features")
    parser.add_argument('--alternate-resolutions', action="store_true",
                        help="enable if you want to display alternate resolutions")
    args = parser.parse_args()

    if args.refresh_keys:
        # Started without waiting for completion, as before.
        subprocess.Popen(["gpg", "--homedir=" + GNUPG_DIRECTORY, "--batch", "--refresh-keys"])

    # Both deadlines stay None when the date check is skipped, which keeps
    # the lateness check out of the pipeline.
    if not args.skip_max_submission:
        SOFT_MAX_SUBMISSION = _parse_deadline(args.soft_max_submission)
        HARD_MAX_SUBMISSION = _parse_deadline(args.hard_max_submission)

    ALTERNATE_RESOLUTIONS = args.alternate_resolutions
    SEND_TO_REALUSER = args.real_send
    REVIEW_BEFORE_SEND = args.review_before_send
    BETA = args.beta

    # The mail to analyse is read from standard input.
    cnt, frm, subject, ref, to = readmail(sys.stdin.buffer)
    resolutions = list(check_mail(
        cnt,
        submissions_dir=args.submissions,
        check_content=not args.sign,
        check_submission_hash=args.expected_submission_hash,
        skip_public_key=args.skip_public_key,
    ))
    respondmail(frm, subject, ref, resolutions, to)