Peret checks PGP-signed mails and extracts submissions
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

339 lines
12 KiB

#!/usr/bin/env python3
from datetime import datetime
import email
import email.policy
import os
from os.path import exists
import sys
import stat
import subprocess
# GnuPG home directory used by the checker. The leading "~" is expanded (and
# the directory created when missing) when this file is run as a script.
GNUPG_DIRECTORY = "~/.gnupg-checker"
# Submission deadlines handed to late.check(). They are filled in from the
# command line when run as a script; while HARD_MAX_SUBMISSION is None,
# gen_checks() does not schedule the lateness check at all.
SOFT_MAX_SUBMISSION = None
HARD_MAX_SUBMISSION = None
# Value of the From header of every generated reply.
FROM = "Peret - Automatic Mail Checker <peret@nemunai.re>"
# When True, the reply is also sent to the original sender.
SEND_TO_REALUSER = False
# When True, the reply is printed and a 15-second countdown runs before sending.
REVIEW_BEFORE_SEND = False
# Forwarded to envelope.check(); together with ALTERNATE_RESOLUTIONS it makes
# respondmail() report every resolution instead of narrowing them down.
BETA = False
# When True, respondmail() may report several alternative resolutions.
ALTERNATE_RESOLUTIONS = False
import archive
import envelope
import late
import login
import signature
from test import MailTest
import tests
def signcheck(data):
    """Open a signature-only report, then hand the message on untouched.

    Yields a MailTest banner reminding the sender that this is not the
    submission interface, followed by ``data`` itself so that the next
    check receives it.
    """
    banner = MailTest("Those tests are limited to signature checking. THIS IS NOT THE SUBMISSION INTERFACE.", 2)
    yield banner
    yield data
def relatesTo(data, submissions_dir):
    """Open a submission report, then hand the message on untouched.

    Yields a MailTest banner naming the submission target (the last path
    component of ``submissions_dir``), followed by ``data`` itself so that
    the next check receives it.
    """
    target = os.path.basename(submissions_dir)
    yield MailTest("This is the submission interface for %s." % target, -1)
    yield data
def gen_checks(gpgmail, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
    """Yield, in execution order, the checks to apply to an incoming mail.

    Each yielded item is either a bare callable or a ``(callable, params)``
    tuple; check_mail() invokes it as ``callable(data, *params)``.

    gpgmail -- pre-parsed signed message, or None when parsing failed
    submissions_dir -- directory receiving the submissions; a sibling
        ``<submissions_dir>.sh`` script, when present, is run last
    check_content -- False limits the run to signature/identity checks
    check_submission_hash -- expected hash forwarded to archive.hash_archive
    skip_public_key -- its negation is forwarded to envelope.check
    """
    # Banner announcing which interface the sender reached.
    if not check_content:
        yield signcheck
    else:
        yield (relatesTo, [submissions_dir])

    # Signature handling: reuse the pre-validated message when there is one.
    already_valid = gpgmail is not None and gpgmail.valid
    if already_valid:
        yield (envelope.skip, [gpgmail])
    else:
        yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
        yield (signature.check, [GNUPG_DIRECTORY])

    yield (login.check, ["/home/nemunaire/workspace/peret/SRS2022.csv", "/home/nemunaire/workspace/peret/GISTRE2022.csv"])

    # Everything below only concerns real submissions.
    if not check_content:
        return

    if HARD_MAX_SUBMISSION is not None:
        yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])

    yield archive.find
    yield (archive.hash_archive, [submissions_dir, check_submission_hash])
    yield archive.guess_mime
    yield (archive.extract, [submissions_dir])

    test_script = submissions_dir + ".sh"
    if exists(test_script):
        yield (tests.run_script, [test_script])
def respondissueemail(to, subject, ref, initial_to=None):
    """Tell a sender that their e-mail cannot be analyzed for the moment.

    to -- address of the original sender (used as the To header)
    subject -- subject of the original mail; "Re: " is prepended unless the
        subject already starts with a reply marker
    ref -- Message-ID of the original mail, used for the threading headers;
        None or an empty string leaves them out
    initial_to -- local part of the address the original mail was sent to;
        selects the "<initial_to>-report" mailbox receiving the copy
        (the "peret-report" mailbox when None)

    The message is sent through the SMTP server on localhost. It reaches the
    original sender only when SEND_TO_REALUSER is set; the report mailbox
    always gets a copy.
    """
    from email.message import EmailMessage
    import smtplib
    import time

    msg = EmailMessage()
    msg["X-loop"] = "peret"
    msg["From"] = FROM
    msg["To"] = to
    # readmail() hands over "" when the mail has no Message-ID: do not emit
    # empty threading headers in that case.
    if ref:
        msg["References"] = ref
        msg["In-Reply-To"] = ref
    # Only a leading reply marker counts; a "re: " found in the middle of the
    # subject (e.g. "Structure: ...") must not suppress the prefix.
    is_reply = subject.lstrip().lower().startswith("re:")
    msg["Subject"] = subject if is_reply else "Re: " + subject

    msg.set_content("""Hi!
This is the automatic e-mail analyzer in charge of checking your work.
I am currently facing a problem to analyze your e-mail.
You'll automatically receive a new response as soon as the problem is corrected.
You can continue to submit newer submissions if needed, the last one will be kept, as usual.
Sincerely,
-- \nAutomatic e-mail checker
running for nemunaire@nemunai.re""")

    if initial_to is None:
        report_addr = "peret-report@nemunai.re"
    else:
        report_addr = initial_to + "-report@nemunai.re"

    with smtplib.SMTP("localhost") as smtp:
        smtp.starttls()

        # Echo the outgoing mail when it is not really delivered or when a
        # human wants to look at it first.
        if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
            print(report_addr)
            print(msg.as_string())

        if REVIEW_BEFORE_SEND:
            # 15-second countdown before the message actually leaves.
            for _ in range(15):
                sys.stdout.write(".")
                sys.stdout.flush()
                time.sleep(1)

        if SEND_TO_REALUSER:
            smtp.send_message(msg)
        smtp.send_message(msg, to_addrs=[report_addr])
def _select_resolutions(checks):
    """Narrow the list of (level, tests, decision) resolutions to report."""
    if len(checks) <= 1 or (ALTERNATE_RESOLUTIONS and BETA):
        return checks

    deepest = checks[0]
    for item in checks:
        lvl, _branch, decision = item
        if deepest[0] < lvl:
            deepest = item
        if decision in ("ACCEPT", "SKIP"):
            # The first resolution that does not reject is the only one shown.
            return [item]

    # Every resolution rejects: show the one that went the furthest, unless
    # alternative resolutions were explicitly requested.
    if not ALTERNATE_RESOLUTIONS:
        return [deepest]
    return checks


def _verdict_sentence(final_decision):
    """Return the closing sentence of a resolution for the given decision."""
    if final_decision == "SKIP":
        return '\nI stopped here the email analysis.'
    return '\nAfter analyzing your e-mail, I\'ve decided to ' + final_decision + ' it.'


def respondmail(to, subject, ref, checks, initial_to=None):
    """Send the analysis report of a mail back to its sender.

    to -- address of the original sender (used as the To header)
    subject -- subject of the original mail; "Re: " is prepended unless the
        subject already starts with a reply marker
    ref -- Message-ID of the original mail, used for the threading headers;
        None or an empty string leaves them out
    checks -- iterable of (level, tests, final_decision) resolutions as
        produced by check_mail(); each test exposes ``title``, ``valuestr``
        and ``details``
    initial_to -- local part of the address the original mail was sent to;
        selects the "<initial_to>-report" mailbox receiving the copy
        (the "peret-report" mailbox when None)

    Unless ALTERNATE_RESOLUTIONS and BETA are both set, only one resolution
    is normally reported: the first ACCEPT/SKIP one, otherwise the rejection
    that reached the highest level. The message is sent through the SMTP
    server on localhost. It reaches the original sender only when
    SEND_TO_REALUSER is set; the report mailbox always gets a copy.
    """
    from email.message import EmailMessage
    import smtplib
    import time

    if not isinstance(checks, list):
        checks = list(checks)
    checks = _select_resolutions(checks)

    msg = EmailMessage()
    msg["X-loop"] = "peret"
    msg["From"] = FROM
    msg["To"] = to
    # readmail() hands over "" when the mail has no Message-ID: do not emit
    # empty threading headers in that case.
    if ref:
        msg["References"] = ref
        msg["In-Reply-To"] = ref
    # Only a leading reply marker counts; a "re: " found in the middle of the
    # subject (e.g. "Structure: ...") must not suppress the prefix.
    is_reply = subject.lstrip().lower().startswith("re:")
    msg["Subject"] = subject if is_reply else "Re: " + subject

    # NOTE(review): the source this was restored from had lost its
    # indentation; the details paragraph and the verdict sentence are assumed
    # to be emitted once per resolution -- confirm against the upstream file.
    fmt = ""
    for _lvl, branch, final_decision in checks:
        if fmt != "":
            fmt += '\n============ There is also another resolution: ============\n\n'

        previous_title = None
        test = None  # reset so details never leak from a previous resolution
        for test in branch:
            # Collapse consecutive tests sharing the same title, except when
            # alternative resolutions are displayed.
            if previous_title is not None and previous_title == test.title and not ALTERNATE_RESOLUTIONS:
                continue
            previous_title = test.title
            fmt += test.valuestr
            fmt += test.title
            fmt += '\n'

        # Only the last test of the branch gets its details printed.
        if test is not None and test.details is not None and test.details != "":
            fmt += '\n'
            fmt += test.details.strip()
            fmt += '\n'

        fmt += _verdict_sentence(final_decision)

    if not checks:
        # Nothing could be analyzed at all: still give an explicit verdict.
        fmt += _verdict_sentence("REJECT")

    msg.set_content("""Hi!
This is the automatic e-mail analyzer in charge of checking your work.
Here is the detailed report for your submission:
""" + fmt + """
Sincerely,
-- \nAutomatic e-mail checker
running for nemunaire@nemunai.re""")

    if initial_to is None:
        report_addr = "peret-report@nemunai.re"
    else:
        report_addr = initial_to + "-report@nemunai.re"

    with smtplib.SMTP("localhost") as smtp:
        smtp.starttls()

        # Echo the outgoing mail when it is not really delivered or when a
        # human wants to look at it first.
        if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
            print(report_addr)
            print(msg.as_string())

        if REVIEW_BEFORE_SEND:
            # 15-second countdown before the message actually leaves.
            for _ in range(15):
                sys.stdout.write(".")
                sys.stdout.flush()
                time.sleep(1)

        if SEND_TO_REALUSER:
            smtp.send_message(msg)
        smtp.send_message(msg, to_addrs=[report_addr])
def readmail(fp):
    """Read one raw e-mail from ``fp`` and extract what the checker needs.

    fp -- binary file object positioned at the start of the message

    Returns a tuple ``(gpgmail, cnt, frm, subject, ref, to)``:
      gpgmail -- gnupg_mail.Message built from the raw mail, or None when it
          could not be built
      cnt -- the message parsed with email.policy.default
      frm -- From header, or "someone" when absent
      subject -- Subject header, or "your mail" when absent
      ref -- Message-ID header, or "" when absent
      to -- text before the first "@" of the To header (falling back to the
          Cc header), or None when neither provides one
    """
    import gnupg
    import gnupg_mail

    raw = fp.read()

    try:
        gpgmail = gnupg_mail.Message(
            raw.decode(),
            settings=gnupg_mail.Settings(log_level='debug', require_signed=True),
            gpg=gnupg.GPG(gnupghome=GNUPG_DIRECTORY),
        )
    except Exception:
        # Best effort: an unparsable or unsigned mail is handled later by the
        # regular checks. KeyboardInterrupt/SystemExit are left to propagate.
        gpgmail = None

    cnt = email.message_from_bytes(raw, policy=email.policy.default)
    frm = cnt.get("From") or "someone"
    subject = cnt.get("Subject") or "your mail"
    ref = cnt.get("Message-ID") or ""

    # A mail may lack To and/or Cc: look at each header only when present.
    to = None
    for header in ("To", "Cc"):
        value = cnt.get(header)
        if not value:
            continue
        local_part = value.split("@", 1)[0]
        if local_part:
            to = local_part
            break

    return gpgmail, cnt, frm, subject, ref, to
def _final_decision(outcome):
    """Map the last item of a branch to its decision label."""
    if outcome is None:
        return "REJECT"
    if outcome is False:
        return "SKIP"
    return "ACCEPT"


def _branch_tests(results, parent):
    """Return, oldest first, the tests recorded along one branch.

    ``parent`` is the index of the branch end in ``results[-1]``; the walk
    follows the stored parent indexes up to (but excluding) the sentinel
    level ``results[0]``. The last item of each step is the data handed to
    the next check (or the outcome), not a test, so it is left out.
    """
    collected = []
    for depth in range(len(results) - 1, 0, -1):
        origin, items = results[depth][parent]
        collected.extend(reversed(items[:-1]))
        parent = origin
    collected.reverse()
    return collected


def check_mail(gpgmail, cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
    """Run every check from gen_checks() and yield the possible resolutions.

    gpgmail -- pre-parsed signed message (or None), forwarded to gen_checks()
    cnt -- parsed e-mail fed to the first check
    submissions_dir, check_content, check_submission_hash, skip_public_key --
        forwarded unchanged to gen_checks()

    Yields ``(level, tests, final_decision)`` tuples where ``tests`` is the
    list of MailTest objects met along one branch and ``final_decision`` is
    "ACCEPT", "SKIP" or "REJECT".

    A check is a generator producing MailTest objects interleaved with
    non-MailTest items. Each non-MailTest item closes one alternative: it is
    either the data to hand to the next check, or a terminal outcome (True,
    False or None). A check ending without such an item is closed with None.
    ``results[k]`` holds the alternatives alive after the k-th check as
    ``(parent_index, [tests..., item])`` tuples, ``parent_index`` pointing
    into ``results[k - 1]``.
    """
    # Level 0 is a sentinel holding only the initial message.
    results = [[(None, [cnt])]]

    lvl = 0
    for check in gen_checks(gpgmail, submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key):
        lvl += 1
        curr = []

        for parent, (_origin, items) in enumerate(results[-1]):
            cnt = items[-1]

            # The branch already reached a terminal outcome: report it now; it
            # is not carried over to the next level.
            if cnt is True or cnt is None or cnt is False:
                yield lvl, _branch_tests(results, parent), _final_decision(cnt)
                continue

            if isinstance(check, tuple):
                chk, params = check
                res = chk(cnt, *params)
            else:
                res = check(cnt)

            curc = []
            for r in res:
                curc.append(r)
                if not isinstance(r, MailTest):
                    # A non-test item closes the current alternative.
                    curr.append((parent, curc))
                    curc = []

            if len(curc) > 0:
                # Tests without a closing item: the check gave up, so the
                # branch is closed with None.
                curc.append(None)
                curr.append((parent, curc))

        results.append(curr)

    # Branches still alive after the last check.
    for parent, (_origin, items) in enumerate(results[-1]):
        yield len(results), _branch_tests(results, parent), _final_decision(items[-1])
# Script entry point: configure the module-level settings from the command
# line, read one e-mail from standard input and answer it.
if __name__ == '__main__':
    # Mask out every permission bit except the owner's read/write/execute.
    os.umask(~(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR))
    GNUPG_DIRECTORY = os.path.expanduser(GNUPG_DIRECTORY)
    if not os.path.isdir(GNUPG_DIRECTORY):
        os.mkdir(GNUPG_DIRECTORY)
    # Parse command line arguments
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("-R", "--refresh-keys", action="store_true",
                        help="refresh GnuPG keyring")
    parser.add_argument("-s", "--sign", action="store_true",
                        help="limit check to signature")
    parser.add_argument("--real-send", action="store_true",
                        help="Effectively sent mail to real users")
    parser.add_argument('--soft-max-submission', default="thursday 8:42",
                        help="allow submission until this hour")
    parser.add_argument('--hard-max-submission', default="thursday 9:21",
                        help="allow submission until this hour")
    parser.add_argument('--skip-max-submission', action="store_true",
                        help="skip submission date check")
    parser.add_argument('--submissions', default="/tmp/rendus",
                        help="directory where store submissions")
    parser.add_argument('--expected-submission-hash',
                        help="imposed tarball hash")
    parser.add_argument('--review-before-send', action="store_true",
                        help="Review the e-mail to be sent before sending it")
    parser.add_argument('--skip-public-key', action="store_true",
                        help="enable if you want to skip public key discovery through attachments")
    parser.add_argument('--issue-thunderbird91', action="store_true",
                        help="enable issue report for thunderbird91")
    parser.add_argument('--beta', action="store_true",
                        help="enable beta features")
    parser.add_argument('--alternate-resolutions', action="store_true",
                        help="enable if you want to display alternate resolutions")
    args = parser.parse_args()
    if args.refresh_keys:
        # Started without waiting for its completion.
        subprocess.Popen(["gpg", "--homedir=" + GNUPG_DIRECTORY, "--batch", "--refresh-keys"])
    if not args.skip_max_submission:
        # The external `date` command turns the human-readable deadline into a
        # UTC ISO-8601 timestamp; colons are stripped so that both the time and
        # the UTC offset match the strptime format below.
        with subprocess.Popen(["date", "-d", args.soft_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f:
            SOFT_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode().replace(":", ""), "%Y-%m-%dT%H%M%S%z")
        with subprocess.Popen(["date", "-d", args.hard_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f:
            HARD_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode().replace(":", ""), "%Y-%m-%dT%H%M%S%z")
    # Publish the remaining options through the module-level settings read by
    # the functions above.
    ALTERNATE_RESOLUTIONS = args.alternate_resolutions
    SEND_TO_REALUSER = args.real_send
    REVIEW_BEFORE_SEND = args.review_before_send
    BETA = args.beta
    # The e-mail to analyze is read, as bytes, from standard input.
    gpgmail, cnt, frm, subject, ref, to = readmail(sys.stdin.buffer)
    if args.issue_thunderbird91:
        # Only send the "cannot analyze for now" notice; no check is run.
        respondissueemail(frm, subject, ref, initial_to=to)
    else:
        respondmail(frm, subject, ref, [c for c in check_mail(gpgmail, cnt, submissions_dir=args.submissions, check_content=not args.sign, check_submission_hash=args.expected_submission_hash, skip_public_key=args.skip_public_key)], initial_to=to)