#!/usr/bin/env python3
"""Automatic checker for e-mail submissions.

Reads one e-mail on standard input, runs it through the chain of checks
produced by :func:`gen_checks`, and answers the sender (and a report
mailbox) with the outcome.
"""

from datetime import datetime
import email
import email.policy
from email.message import EmailMessage
import os
from os.path import exists
import smtplib
import stat
import subprocess
import sys
import time

GNUPG_DIRECTORY = "~/.gnupg-checker"
SOFT_MAX_SUBMISSION = None
HARD_MAX_SUBMISSION = None
FROM = "Peret - Automatic Mail Checker "
SEND_TO_REALUSER = False
REVIEW_BEFORE_SEND = False
BETA = False
ALTERNATE_RESOLUTIONS = False
STUDENTS_LISTS = [
    "/home/nemunaire/workspace/peret/SRS2022.csv",
    "/home/nemunaire/workspace/peret/GISTRE2022.csv",
]

import archive
import envelope
import late
import login
import signature
from test import MailTest
import tests

# NOTE(review): the message texts below are reproduced exactly as they appear
# in the reviewed source; paragraph breaks may have been lost there -- confirm
# against the deployed wording before changing them.
_SIGNATURE = (
    "Sincerely, -- \n"
    "Automatic e-mail checker running for nemunaire@nemunai.re"
)
_ISSUE_BODY = (
    "Hi! This is the automatic e-mail analyzer in charge of checking your "
    "work. I am currently facing a problem to analyze your e-mail. You'll "
    "automatically receive a new response as soon as the problem is "
    "corrected. You can continue to submit newer submissions if needed, the "
    "last one will be kept, as usual. " + _SIGNATURE
)
_REPORT_INTRO = (
    "Hi! This is the automatic e-mail analyzer in charge of checking your "
    "work. Here is the detailed report for your submission: "
)


def signcheck(data):
    """First check of the signature-only mode: emit a notice, pass *data* on."""
    yield MailTest("Those tests are limited to signature checking. THIS IS NOT THE SUBMISSION INTERFACE.", 2)
    yield data


def relatesTo(data, submissions_dir):
    """First check of the submission mode: name the target, pass *data* on."""
    yield MailTest("This is the submission interface for %s." % os.path.basename(submissions_dir), -1)
    yield data


def gen_checks(gpgmail, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
    """Yield the checks to run, in order.

    Each item is either a callable taking the current data, or a
    ``(callable, extra_args)`` pair whose callable receives the current data
    followed by ``*extra_args``.

    :param gpgmail: pre-parsed signed message, or ``None`` when parsing failed
    :param submissions_dir: directory receiving the submissions; a sibling
        ``<submissions_dir>.sh`` script is run when it exists
    :param check_content: when false, only the signature-related checks run
    :param check_submission_hash: expected archive hash handed to
        ``archive.hash_archive``
    :param skip_public_key: negated and handed to ``envelope.check``
    """
    if check_content:
        yield (relatesTo, [submissions_dir])
    else:
        yield signcheck

    if gpgmail is None or not gpgmail.valid:
        yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
        yield (signature.check, [GNUPG_DIRECTORY])
    else:
        yield (envelope.skip, [gpgmail])

    yield (login.check, STUDENTS_LISTS)

    if check_content:
        if HARD_MAX_SUBMISSION is not None:
            yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])
        yield archive.find
        yield (archive.hash_archive, [submissions_dir, check_submission_hash])
        yield archive.guess_mime
        yield (archive.extract, [submissions_dir])
        script = submissions_dir + ".sh"
        if exists(script):
            yield (tests.run_script, [script])


def _reply_subject(subject):
    """Return *subject* prefixed with "Re: " unless it already starts with it."""
    if subject.lstrip().lower().startswith("re:"):
        return subject
    return "Re: " + subject


def _new_reply(to, subject, ref):
    """Build the header part of an answer addressed to *to*."""
    msg = EmailMessage()
    msg["X-loop"] = "peret"
    msg["From"] = FROM
    msg["To"] = to
    # readmail() substitutes "" for a missing Message-ID: do not emit empty
    # threading headers in that case.
    if ref:
        msg["References"] = ref
        msg["In-Reply-To"] = ref
    msg["Subject"] = _reply_subject(subject)
    return msg


def _report_address(initial_to):
    """Return the report mailbox matching the local part the mail was sent to."""
    if initial_to is None:
        return "peret-report@nemunai.re"
    return initial_to + "-report@nemunai.re"


def _deliver(msg, initial_to):
    """Print and/or send *msg* according to the module-level sending flags."""
    report_to = _report_address(initial_to)
    with smtplib.SMTP("localhost") as smtp:
        smtp.starttls()
        if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
            print(report_to)
            print(msg.as_string())
        if REVIEW_BEFORE_SEND:
            # Leave 15 seconds to interrupt the program before sending.
            for _ in range(15):
                sys.stdout.write(".")
                sys.stdout.flush()
                time.sleep(1)
        if SEND_TO_REALUSER:
            smtp.send_message(msg)
        # NOTE(review): the report copy is sent whatever SEND_TO_REALUSER is.
        smtp.send_message(msg, to_addrs=[report_to])


def respondissueemail(to, subject, ref, initial_to=None):
    """Answer that the analysis is temporarily failing.

    :param to: recipient of the answer
    :param subject: subject of the received e-mail
    :param ref: Message-ID of the received e-mail, used for threading
    :param initial_to: local part of the address the e-mail was sent to
    """
    msg = _new_reply(to, subject, ref)
    msg.set_content(_ISSUE_BODY)
    _deliver(msg, initial_to)


def _select_resolutions(checks):
    """Reduce *checks* to the resolution(s) that have to be reported.

    The first ACCEPT or SKIP resolution wins; otherwise the resolution that
    reached the highest level is kept, unless alternate resolutions are
    requested, in which case all of them are reported.
    """
    if len(checks) <= 1 or (ALTERNATE_RESOLUTIONS and BETA):
        return checks

    deepest = checks[0]
    for item in checks:
        if deepest[0] < item[0]:
            deepest = item
        if item[2] == "ACCEPT" or item[2] == "SKIP":
            return [item]

    if not ALTERNATE_RESOLUTIONS:
        return [deepest]
    return checks


def respondmail(to, subject, ref, checks, initial_to=None, ps=""):
    """Send the detailed report of *checks* to *to*.

    :param to: recipient of the answer
    :param subject: subject of the received e-mail
    :param ref: Message-ID of the received e-mail, used for threading
    :param checks: iterable of ``(level, tests, final_decision)`` as yielded
        by :func:`check_mail`
    :param initial_to: local part of the address the e-mail was sent to
    :param ps: text appended to the report when the decision is ACCEPT
    """
    if not isinstance(checks, list):
        checks = list(checks)

    # The selection is done in place of the former recursive call, which
    # forgot to forward ``ps``.
    checks = _select_resolutions(checks)

    msg = _new_reply(to, subject, ref)

    test = None
    final_decision = "REJECT"
    parts = []
    for lvl, tests, final_decision in checks:
        if parts:
            parts.append('\n============ There is also another resolution: ============\n\n')
        lasttest = None
        for test in tests:
            # Consecutive tests sharing a title are reported once.
            if lasttest is not None and lasttest == test.title and not ALTERNATE_RESOLUTIONS:
                continue
            lasttest = test.title
            parts.append(test.valuestr)
            parts.append(test.title)
            parts.append('\n')
        # Only the details of the last test seen are shown.
        if test is not None and test.details is not None and test.details != "":
            parts.append('\n')
            parts.append(test.details.strip())
            parts.append('\n')

        if final_decision == "SKIP":
            parts.append('\nI stopped here the email analysis.')
        else:
            parts.append('\nAfter analyzing your e-mail, I\'ve decided to ' + final_decision + ' it.')

    if ps and final_decision == "ACCEPT":
        parts.append('\n\n' + ps)

    msg.set_content(_REPORT_INTRO + "".join(parts) + " " + _SIGNATURE)
    _deliver(msg, initial_to)


def _local_part(header):
    """Return what precedes the first "@" of *header*, "" when it is missing."""
    if header is None:
        return ""
    return str(header).split("@", 1)[0]


def readmail(fp):
    """Read and parse the e-mail available on the binary stream *fp*.

    :returns: ``(gpgmail, cnt, frm, subject, ref, to)`` where ``gpgmail`` is
        the ``gnupg_mail`` message (``None`` when it cannot be built), ``cnt``
        the parsed message, ``frm``/``subject``/``ref`` the From, Subject and
        Message-ID headers (with fallbacks) and ``to`` the local part of the
        To (else Cc) header, or ``None``.
    """
    import gnupg
    import gnupg_mail

    raw = fp.read()
    try:
        gpgmail = gnupg_mail.Message(
            raw.decode('utf-8', errors='backslashreplace'),
            settings=gnupg_mail.Settings(require_signed=True),
            gpg=gnupg.GPG(gnupghome=GNUPG_DIRECTORY),
        )
    except Exception:
        # Best effort: the regular envelope/signature checks run instead.
        gpgmail = None

    cnt = email.message_from_bytes(raw, policy=email.policy.default)
    frm = cnt.get("From") or "someone"
    subject = cnt.get("Subject") or "your mail"
    ref = cnt.get("Message-ID") or ""
    to = _local_part(cnt.get("To")) or _local_part(cnt.get("Cc")) or None

    return gpgmail, cnt, frm, subject, ref, to


def _branch_history(results, leaf):
    """Collect, oldest first, the items recorded along the branch of *leaf*.

    *leaf* indexes ``results[-1]``; each entry is ``(parent_index, chain)``
    and the last element of a chain (the data handed to the next check, or a
    terminal marker) is left out.
    """
    history = []
    parent = leaf
    for depth in range(len(results) - 1, 0, -1):
        upper, chain = results[depth][parent]
        history.extend(reversed(chain[:-1]))
        parent = upper
    history.reverse()
    return history


def _verdict(marker):
    """Translate the last element of a chain into the final decision."""
    if marker is None:
        return "REJECT"
    if marker is False:
        return "SKIP"
    return "ACCEPT"


def check_mail(gpgmail, cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
    """Run every check on *cnt* and yield one result per resolution.

    A check is a generator yielding ``MailTest`` items and other values; each
    non-``MailTest`` value closes a chain and is either the data handed to
    the next check or a terminal marker (``True``, ``False`` or ``None``).
    A check ending without such a value is closed with ``None``.

    :returns: generator of ``(level, tests, final_decision)`` where
        ``final_decision`` is "ACCEPT", "SKIP" or "REJECT"
    """
    # results[n] lists the chains produced at level n as (parent, chain);
    # the first entry is a sentinel holding the initial data.
    results = [[(None, [cnt])]]

    lvl = 0
    for check in gen_checks(gpgmail, submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key):
        lvl += 1
        curr = []

        for parent, (_, chain) in enumerate(results[-1]):
            last = chain[-1]

            # Terminated branch: report it and do not go further.
            if last is True or last is None or last is False:
                yield lvl, _branch_history(results, parent), _verdict(last)
                continue

            if isinstance(check, tuple):
                chk, params = check
                produced = chk(last, *params)
            else:
                produced = check(last)

            curc = []
            for r in produced:
                curc.append(r)
                if not isinstance(r, MailTest):
                    curr.append((parent, curc))
                    curc = []

            if curc:
                # No outcome was given after the last tests.
                curc.append(None)
                curr.append((parent, curc))

        results.append(curr)

    for parent, (_, chain) in enumerate(results[-1]):
        yield len(results), _branch_history(results, parent), _verdict(chain[-1])


def _parse_deadline(spec):
    """Resolve *spec* with date(1) and return it as an aware UTC datetime."""
    out = subprocess.run(
        ["date", "-d", spec, "-u", "-Iseconds"],
        stdout=subprocess.PIPE,
        check=True,
    ).stdout
    return datetime.strptime(out.strip().decode().replace(":", ""), "%Y-%m-%dT%H%M%S%z")


if __name__ == '__main__':
    # Files created from here are only accessible to the current user.
    os.umask(0o777 & ~(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR))
    GNUPG_DIRECTORY = os.path.expanduser(GNUPG_DIRECTORY)
    os.makedirs(GNUPG_DIRECTORY, exist_ok=True)

    # Parse command line arguments
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-R", "--refresh-keys", action="store_true",
                        help="refresh GnuPG keyring")
    parser.add_argument("-s", "--sign", action="store_true",
                        help="limit check to signature")
    parser.add_argument("--real-send", action="store_true",
                        help="Effectively sent mail to real users")
    parser.add_argument('--soft-max-submission', default="thursday 8:42",
                        help="allow submission until this hour")
    parser.add_argument('--hard-max-submission', default="thursday 9:21",
                        help="allow submission until this hour")
    parser.add_argument('--skip-max-submission', action="store_true",
                        help="skip submission date check")
    parser.add_argument('--submissions', default="/tmp/rendus",
                        help="directory where store submissions")
    parser.add_argument('--expected-submission-hash',
                        help="imposed tarball hash")
    parser.add_argument('--review-before-send', action="store_true",
                        help="Review the e-mail to be sent before sending it")
    parser.add_argument('--skip-public-key', action="store_true",
                        help="enable if you want to skip public key discovery through attachments")
    parser.add_argument('--issue-thunderbird91', action="store_true",
                        help="enable issue report for thunderbird91")
    parser.add_argument('--beta', action="store_true",
                        help="enable beta features")
    parser.add_argument('--alternate-resolutions', action="store_true",
                        help="enable if you want to display alternate resolutions")
    parser.add_argument('-l', '--students-list', nargs='*',
                        help="students list(s) to use for the check")
    parser.add_argument('-ps', '--ps',
                        help="Include a PostScriptum info when valid")
    args = parser.parse_args()

    if args.refresh_keys:
        # Started without waiting for its completion, as before.
        subprocess.Popen(["gpg", "--homedir=" + GNUPG_DIRECTORY, "--batch", "--refresh-keys"])

    if not args.skip_max_submission:
        SOFT_MAX_SUBMISSION = _parse_deadline(args.soft_max_submission)
        HARD_MAX_SUBMISSION = _parse_deadline(args.hard_max_submission)

    if args.students_list is not None:
        STUDENTS_LISTS = args.students_list

    ALTERNATE_RESOLUTIONS = args.alternate_resolutions
    SEND_TO_REALUSER = args.real_send
    REVIEW_BEFORE_SEND = args.review_before_send
    BETA = args.beta

    gpgmail, cnt, frm, subject, ref, to = readmail(sys.stdin.buffer)

    if args.issue_thunderbird91:
        respondissueemail(frm, subject, ref, initial_to=to)
    else:
        respondmail(
            frm,
            subject,
            ref,
            list(check_mail(
                gpgmail,
                cnt,
                submissions_dir=args.submissions,
                check_content=not args.sign,
                check_submission_hash=args.expected_submission_hash,
                skip_public_key=args.skip_public_key,
            )),
            initial_to=to,
            ps=args.ps,
        )