You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
351 lines
12 KiB
351 lines
12 KiB
#!/usr/bin/env python3 |
|
|
|
from datetime import datetime |
|
import email |
|
import email.policy |
|
import os |
|
from os.path import exists |
|
import sys |
|
import stat |
|
import subprocess |
|
|
|
GNUPG_DIRECTORY = "~/.gnupg-checker" |
|
SOFT_MAX_SUBMISSION = None |
|
HARD_MAX_SUBMISSION = None |
|
FROM = "Peret - Automatic Mail Checker <peret@nemunai.re>" |
|
SEND_TO_REALUSER = False |
|
REVIEW_BEFORE_SEND = False |
|
BETA = False |
|
ALTERNATE_RESOLUTIONS = False |
|
STUDENTS_LISTS = ["/home/nemunaire/workspace/peret/SRS2022.csv", "/home/nemunaire/workspace/peret/GISTRE2022.csv"] |
|
|
|
import archive |
|
import envelope |
|
import late |
|
import login |
|
import signature |
|
from test import MailTest |
|
import tests |
|
|
|
def signcheck(data): |
|
yield MailTest("Those tests are limited to signature checking. THIS IS NOT THE SUBMISSION INTERFACE.", 2) |
|
yield data |
|
|
|
def relatesTo(data, submissions_dir): |
|
yield MailTest("This is the submission interface for %s." % os.path.basename(submissions_dir), -1) |
|
yield data |
|
|
|
|
|
def gen_checks(gpgmail, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True): |
|
if check_content: |
|
yield (relatesTo, [submissions_dir]) |
|
else: |
|
yield signcheck |
|
|
|
if gpgmail is None or not gpgmail.valid: |
|
yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA]) |
|
yield (signature.check, [GNUPG_DIRECTORY]) |
|
else: |
|
yield (envelope.skip, [gpgmail]) |
|
|
|
yield (login.check, STUDENTS_LISTS) |
|
if check_content: |
|
if HARD_MAX_SUBMISSION is not None: |
|
yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION]) |
|
yield archive.find |
|
yield ( archive.hash_archive, [submissions_dir, check_submission_hash] ) |
|
yield archive.guess_mime |
|
yield ( archive.extract, [submissions_dir] ) |
|
if exists(submissions_dir + ".sh"): |
|
yield ( tests.run_script, [submissions_dir + ".sh"] ) |
|
|
|
|
|
def respondissueemail(to, subject, ref, initial_to=None): |
|
from email.message import EmailMessage |
|
|
|
msg = EmailMessage() |
|
msg["X-loop"] = "peret" |
|
msg["From"] = FROM |
|
msg["To"] = to |
|
if ref is not None: |
|
msg["References"] = ref |
|
msg["In-Reply-To"] = ref |
|
msg["Subject"] = ("Re: " if not subject.lower().find("re: ") >= 0 else "") + subject |
|
|
|
msg.set_content("""Hi! |
|
|
|
This is the automatic e-mail analyzer in charge of checking your work. |
|
|
|
I am currently facing a problem to analyze your e-mail. |
|
|
|
You'll automatically receive a new response as soon as the problem is corrected. |
|
|
|
You can continue to submit newer submissions if needed, the last one will be kept, as usual. |
|
|
|
Sincerely, |
|
|
|
-- \nAutomatic e-mail checker |
|
running for nemunaire@nemunai.re""") |
|
|
|
import smtplib |
|
with smtplib.SMTP("localhost") as smtp: |
|
smtp.starttls() |
|
if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND: |
|
print("peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re") |
|
print(msg.as_string()) |
|
if REVIEW_BEFORE_SEND: |
|
import time |
|
for i in range(15): |
|
sys.stdout.write(".") |
|
sys.stdout.flush() |
|
time.sleep(1) |
|
if SEND_TO_REALUSER: |
|
smtp.send_message(msg) |
|
smtp.send_message(msg, to_addrs=["peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re"]) |
|
|
|
|
|
def respondmail(to, subject, ref, checks, initial_to=None, ps=""): |
|
from email.message import EmailMessage |
|
|
|
if not isinstance(checks, list): |
|
checks = [c for c in checks] |
|
|
|
# Show only the first message if there is one ACCEPT |
|
if len(checks) > 1 and not (ALTERNATE_RESOLUTIONS and BETA): |
|
maxitem = checks[0] |
|
for item in checks: |
|
lvl, tests, final_decision = item |
|
if maxitem[0] < lvl: |
|
maxitem = item |
|
if final_decision == "ACCEPT" or final_decision == "SKIP": |
|
return respondmail(to, subject, ref, [(lvl, tests, final_decision)], initial_to) |
|
# Display the most upper error |
|
if not ALTERNATE_RESOLUTIONS: |
|
return respondmail(to, subject, ref, [maxitem], initial_to) |
|
|
|
msg = EmailMessage() |
|
msg["X-loop"] = "peret" |
|
msg["From"] = FROM |
|
msg["To"] = to |
|
if ref is not None: |
|
msg["References"] = ref |
|
msg["In-Reply-To"] = ref |
|
msg["Subject"] = ("Re: " if not subject.lower().find("re: ") >= 0 else "") + subject |
|
|
|
test = None |
|
final_decision = "REJECT" |
|
fmt = "" |
|
for lvl, tests, final_decision in checks: |
|
if fmt != "": |
|
fmt += '\n============ There is also another resolution: ============\n\n' |
|
lasttest = None |
|
for test in tests: |
|
if lasttest is not None and lasttest == test.title and not ALTERNATE_RESOLUTIONS: |
|
continue |
|
else: |
|
lasttest = test.title |
|
fmt += test.valuestr |
|
fmt += test.title |
|
fmt += '\n' |
|
if test is not None and test.details is not None and test.details != "": |
|
fmt += '\n' |
|
fmt += test.details.strip() |
|
fmt += '\n' |
|
|
|
if final_decision == "SKIP": |
|
fmt += '\nI stopped here the email analysis.' |
|
else: |
|
fmt += '\nAfter analyzing your e-mail, I\'ve decided to ' + final_decision + ' it.' |
|
|
|
if ps is not None and len(ps) > 0 and final_decision == "ACCEPT": |
|
fmt += '\n\n' + ps |
|
|
|
msg.set_content("""Hi! |
|
|
|
This is the automatic e-mail analyzer in charge of checking your work. |
|
|
|
Here is the detailed report for your submission: |
|
|
|
""" + fmt + """ |
|
|
|
Sincerely, |
|
|
|
-- \nAutomatic e-mail checker |
|
running for nemunaire@nemunai.re""") |
|
|
|
import smtplib |
|
with smtplib.SMTP("localhost") as smtp: |
|
smtp.starttls() |
|
if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND: |
|
print("peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re") |
|
print(msg.as_string()) |
|
if REVIEW_BEFORE_SEND: |
|
import time |
|
for i in range(15): |
|
sys.stdout.write(".") |
|
sys.stdout.flush() |
|
time.sleep(1) |
|
if SEND_TO_REALUSER: |
|
smtp.send_message(msg) |
|
smtp.send_message(msg, to_addrs=["peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re"]) |
|
|
|
|
|
def readmail(fp): |
|
import gnupg |
|
import gnupg_mail |
|
|
|
theEMail = fp.read() |
|
try: |
|
gpgmail = gnupg_mail.Message(theEMail.decode('utf-8', errors='backslashreplace'), settings=gnupg_mail.Settings(require_signed=True), gpg=gnupg.GPG(gnupghome=GNUPG_DIRECTORY)) |
|
except: |
|
gpgmail = None |
|
|
|
cnt = email.message_from_bytes(theEMail, policy=email.policy.default) |
|
frm = cnt.get("From") or "someone" |
|
subject = cnt.get("Subject") or "your mail" |
|
ref = cnt.get("Message-ID") or "" |
|
to = cnt.get("To").split("@", 1)[0] or cnt.get("Cc").split("@", 1)[0] or None |
|
|
|
return gpgmail, cnt, frm, subject, ref, to |
|
|
|
|
|
def check_mail(gpgmail, cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True): |
|
results = [] |
|
|
|
# sentinel |
|
results.append([(None, [cnt])]) |
|
|
|
lvl = 0 |
|
for check in gen_checks(gpgmail, submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key): |
|
lvl += 1 |
|
curr = [] |
|
curc = [] |
|
|
|
for parent in range(len(results[-1])): |
|
cnt = results[-1][parent][1][-1] |
|
|
|
if cnt is True or cnt is None or cnt is False: |
|
res = [] |
|
for i in range(len(results) - 1, 0, -1): |
|
for m in reversed(results[i][parent][1][:-1]): |
|
res.append(m) |
|
parent = results[i][parent][0] |
|
res.reverse() |
|
if cnt is None: final = "REJECT" |
|
elif cnt is False: final = "SKIP" |
|
else: final = "ACCEPT" |
|
yield lvl, res, final |
|
continue |
|
|
|
if isinstance(check, tuple): |
|
chk, params = check |
|
res = chk(cnt, *params) |
|
else: |
|
res = check(cnt) |
|
|
|
for r in res: |
|
curc.append(r) |
|
if not isinstance(r, MailTest): |
|
curr.append((parent, curc)) |
|
curc = [] |
|
|
|
if len(curc) > 0: |
|
curc.append(None) |
|
curr.append((parent, curc)) |
|
curc = [] |
|
results.append(curr) |
|
|
|
for parent in range(len(results[-1])): |
|
res = [] |
|
cnt = results[-1][parent][1][-1] |
|
for i in range(len(results) - 1, 0, -1): |
|
for m in reversed(results[i][parent][1][:-1]): |
|
res.append(m) |
|
parent = results[i][parent][0] |
|
res.reverse() |
|
if cnt is None: final = "REJECT" |
|
elif cnt is False: final = "SKIP" |
|
else: final = "ACCEPT" |
|
yield len(results), res, final |
|
|
|
|
|
if __name__ == '__main__': |
|
os.umask(~(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)) |
|
GNUPG_DIRECTORY = os.path.expanduser(GNUPG_DIRECTORY) |
|
if not os.path.isdir(GNUPG_DIRECTORY): |
|
os.mkdir(GNUPG_DIRECTORY) |
|
|
|
# Parse command line arguments |
|
import argparse |
|
parser = argparse.ArgumentParser() |
|
|
|
parser.add_argument("-R", "--refresh-keys", action="store_true", |
|
help="refresh GnuPG keyring") |
|
|
|
parser.add_argument("-s", "--sign", action="store_true", |
|
help="limit check to signature") |
|
|
|
parser.add_argument("--real-send", action="store_true", |
|
help="Effectively sent mail to real users") |
|
|
|
parser.add_argument('--soft-max-submission', default="thursday 8:42", |
|
help="allow submission until this hour") |
|
|
|
parser.add_argument('--hard-max-submission', default="thursday 9:21", |
|
help="allow submission until this hour") |
|
|
|
parser.add_argument('--skip-max-submission', action="store_true", |
|
help="skip submission date check") |
|
|
|
parser.add_argument('--submissions', default="/tmp/rendus", |
|
help="directory where store submissions") |
|
|
|
parser.add_argument('--expected-submission-hash', |
|
help="imposed tarball hash") |
|
|
|
parser.add_argument('--review-before-send', action="store_true", |
|
help="Review the e-mail to be sent before sending it") |
|
|
|
parser.add_argument('--skip-public-key', action="store_true", |
|
help="enable if you want to skip public key discovery through attachments") |
|
|
|
parser.add_argument('--issue-thunderbird91', action="store_true", |
|
help="enable issue report for thunderbird91") |
|
|
|
parser.add_argument('--beta', action="store_true", |
|
help="enable beta features") |
|
|
|
parser.add_argument('--alternate-resolutions', action="store_true", |
|
help="enable if you want to display alternate resolutions") |
|
|
|
parser.add_argument('-l', '--students-list', nargs='*', |
|
help="students list(s) to use for the check") |
|
|
|
parser.add_argument('-ps', '--ps', |
|
help="Include a PostScriptum info when valid") |
|
|
|
args = parser.parse_args() |
|
|
|
if args.refresh_keys: |
|
subprocess.Popen(["gpg", "--homedir=" + GNUPG_DIRECTORY, "--batch", "--refresh-keys"]) |
|
|
|
if not args.skip_max_submission: |
|
with subprocess.Popen(["date", "-d", args.soft_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f: |
|
SOFT_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode().replace(":", ""), "%Y-%m-%dT%H%M%S%z") |
|
|
|
with subprocess.Popen(["date", "-d", args.hard_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f: |
|
HARD_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode().replace(":", ""), "%Y-%m-%dT%H%M%S%z") |
|
|
|
if args.students_list is not None: |
|
STUDENTS_LISTS = args.students_list |
|
|
|
ALTERNATE_RESOLUTIONS = args.alternate_resolutions |
|
SEND_TO_REALUSER = args.real_send |
|
REVIEW_BEFORE_SEND = args.review_before_send |
|
BETA = args.beta |
|
|
|
gpgmail, cnt, frm, subject, ref, to = readmail(sys.stdin.buffer) |
|
if args.issue_thunderbird91: |
|
respondissueemail(frm, subject, ref, initial_to=to) |
|
else: |
|
respondmail(frm, subject, ref, [c for c in check_mail(gpgmail, cnt, submissions_dir=args.submissions, check_content=not args.sign, check_submission_hash=args.expected_submission_hash, skip_public_key=args.skip_public_key)], initial_to=to, ps=args.ps)
|
|
|