You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
352 lines
12 KiB
Python
352 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from datetime import datetime
|
|
import email
|
|
import email.policy
|
|
import os
|
|
from os.path import exists
|
|
import sys
|
|
import stat
|
|
import subprocess
|
|
|
|
# Home directory of the GnuPG keyring used by the checker. The leading "~" is
# expanded (and the directory created if missing) when run as a script.
GNUPG_DIRECTORY = "~/.gnupg-checker"
# Submission deadlines handed to late.check. They stay None unless the script
# entry point computes them; gen_checks() skips the lateness check while
# HARD_MAX_SUBMISSION is None.
SOFT_MAX_SUBMISSION = None
HARD_MAX_SUBMISSION = None
# Value of the From header of every generated answer.
FROM = "Peret - Automatic Mail Checker <peret@nemunai.re>"
# When False, the answer is printed on stdout instead of being sent to the
# address it is addressed to (--real-send).
SEND_TO_REALUSER = False
# When True, the answer is printed and a 15 second pause is observed before
# sending (--review-before-send).
REVIEW_BEFORE_SEND = False
# Forwarded to envelope.check; together with ALTERNATE_RESOLUTIONS it also
# disables the narrowing of the report to a single resolution (--beta).
BETA = False
# When True, the report may list several resolutions (--alternate-resolutions).
ALTERNATE_RESOLUTIONS = False
# Students lists passed as extra arguments to login.check; replaced by the
# -l/--students-list values when given.
STUDENTS_LISTS = ["/home/nemunaire/workspace/peret/SRS2022.csv", "/home/nemunaire/workspace/peret/GISTRE2022.csv"]
|
|
|
|
import archive
|
|
import envelope
|
|
import late
|
|
import login
|
|
import signature
|
|
from test import MailTest
|
|
import tests
|
|
|
|
def signcheck(data):
    """First check of the signature-only mode.

    Emits a banner MailTest reminding that this is not the submission
    interface, then passes ``data`` through untouched to the next check.
    """
    banner = MailTest(
        "Those tests are limited to signature checking. THIS IS NOT THE SUBMISSION INTERFACE.",
        2,
    )
    yield banner
    yield data
|
|
|
|
def relatesTo(data, submissions_dir):
    """First check of the submission mode.

    Emits a banner MailTest naming the submission (the last path component
    of ``submissions_dir``), then passes ``data`` through untouched to the
    next check.
    """
    submission_name = os.path.basename(submissions_dir)
    banner = MailTest("This is the submission interface for %s." % submission_name, -1)
    yield banner
    yield data
|
|
|
|
|
|
def gen_checks(gpgmail, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
    """Yield, in execution order, the checks to apply to an incoming mail.

    Each yielded item is either a callable taking the current data, or a
    ``(callable, extra_args)`` pair whose extra arguments are passed after
    the data.

    Parameters:
        gpgmail: pre-parsed signed message, or None; when it is present and
            its ``valid`` attribute is true the envelope/signature checks
            are replaced by envelope.skip.
        submissions_dir: directory of the submission; ``submissions_dir +
            ".sh"`` is run as an extra check when that file exists.
        check_content: False limits the checks to the signature and login.
        check_submission_hash: forwarded to archive.hash_archive.
        skip_public_key: its negation is forwarded to envelope.check.
    """
    # Banner telling the sender which interface is answering.
    if not check_content:
        yield signcheck
    else:
        yield (relatesTo, [submissions_dir])

    # Signature: reuse the already validated message when there is one.
    already_verified = gpgmail is not None and gpgmail.valid
    if already_verified:
        yield (envelope.skip, [gpgmail])
    else:
        yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
        yield (signature.check, [GNUPG_DIRECTORY])

    yield (login.check, STUDENTS_LISTS)

    if not check_content:
        return

    # NOTE(review): the archive checks are taken to run whenever
    # check_content is set, with only late.check depending on the deadline --
    # confirm against the original indentation.
    if HARD_MAX_SUBMISSION is not None:
        yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])

    yield archive.find
    yield (archive.hash_archive, [submissions_dir, check_submission_hash])
    yield archive.guess_mime
    yield (archive.extract, [submissions_dir])

    test_script = submissions_dir + ".sh"
    if exists(test_script):
        yield (tests.run_script, [test_script])
|
|
|
|
|
|
def respondissueemail(to, subject, ref, initial_to=None):
    """Answer a mail with a notice that the checker cannot analyze it for now.

    Parameters:
        to: address being answered (To header of the answer).
        subject: subject of the received mail; "Re: " is prepended unless
            the subject already contains "re: " (case-insensitive).
        ref: Message-ID of the received mail, copied to the References and
            In-Reply-To headers. None or an empty string leaves both
            headers out instead of emitting them empty.
        initial_to: local part of the address the mail was sent to; the
            copy goes to "<initial_to>-report@nemunai.re", or to
            "peret-report@nemunai.re" when it is None.

    The message is printed on stdout when SEND_TO_REALUSER is off or
    REVIEW_BEFORE_SEND is on; the latter also waits 15 seconds before
    sending.
    """
    from email.message import EmailMessage
    import smtplib

    msg = EmailMessage()
    msg["X-loop"] = "peret"
    msg["From"] = FROM
    msg["To"] = to
    if ref:
        msg["References"] = ref
        msg["In-Reply-To"] = ref
    msg["Subject"] = ("Re: " if "re: " not in subject.lower() else "") + subject

    msg.set_content("""Hi!

This is the automatic e-mail analyzer in charge of checking your work.

I am currently facing a problem to analyze your e-mail.

You'll automatically receive a new response as soon as the problem is corrected.

You can continue to submit newer submissions if needed, the last one will be kept, as usual.

Sincerely,

-- \nAutomatic e-mail checker
running for nemunaire@nemunai.re""")

    report_address = "peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re"

    with smtplib.SMTP("localhost") as smtp:
        smtp.starttls()
        if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
            print(report_address)
            print(msg.as_string())
        if REVIEW_BEFORE_SEND:
            import time
            # Leave the operator 15 seconds to interrupt before sending.
            for _ in range(15):
                sys.stdout.write(".")
                sys.stdout.flush()
                time.sleep(1)
        if SEND_TO_REALUSER:
            smtp.send_message(msg)
        # NOTE(review): the report copy is sent whatever SEND_TO_REALUSER
        # says -- confirm against the original indentation.
        smtp.send_message(msg, to_addrs=[report_address])
|
|
|
|
|
|
def respondmail(to, subject, ref, checks, initial_to=None, ps=""):
    """Build the analysis report for a mail and send it back.

    Parameters:
        to: address being answered (To header of the answer).
        subject: subject of the received mail; "Re: " is prepended unless
            the subject already contains "re: " (case-insensitive).
        ref: Message-ID of the received mail, copied to the References and
            In-Reply-To headers. None or an empty string leaves both
            headers out instead of emitting them empty.
        checks: iterable of ``(level, tests, final_decision)`` tuples, one
            per resolution; ``tests`` holds objects exposing ``title``,
            ``valuestr`` and ``details``.
        initial_to: local part of the address the mail was sent to; the
            copy goes to "<initial_to>-report@nemunai.re", or to
            "peret-report@nemunai.re" when it is None.
        ps: optional post-scriptum appended to the report when the last
            reported decision is "ACCEPT".

    When several resolutions are given (and alternate resolutions are not
    fully enabled), the report is narrowed to the first ACCEPT/SKIP one, or
    otherwise to the one with the highest level. ``ps`` is carried along
    when narrowing so that it still appears in the final report.
    """
    from email.message import EmailMessage
    import smtplib

    if not isinstance(checks, list):
        checks = list(checks)

    # Show only the first message if there is one ACCEPT
    if len(checks) > 1 and not (ALTERNATE_RESOLUTIONS and BETA):
        maxitem = checks[0]
        for item in checks:
            lvl, branch_tests, final_decision = item
            if maxitem[0] < lvl:
                maxitem = item
            if final_decision == "ACCEPT" or final_decision == "SKIP":
                return respondmail(to, subject, ref, [(lvl, branch_tests, final_decision)], initial_to, ps)
        # Display the most upper error
        if not ALTERNATE_RESOLUTIONS:
            return respondmail(to, subject, ref, [maxitem], initial_to, ps)

    msg = EmailMessage()
    msg["X-loop"] = "peret"
    msg["From"] = FROM
    msg["To"] = to
    if ref:
        msg["References"] = ref
        msg["In-Reply-To"] = ref
    msg["Subject"] = ("Re: " if "re: " not in subject.lower() else "") + subject

    final_decision = "REJECT"
    fmt = ""
    for lvl, branch_tests, final_decision in checks:
        if fmt != "":
            fmt += '\n============ There is also another resolution: ============\n\n'

        # Reset per resolution so that an empty one does not repeat the
        # details of the previous resolution.
        test = None
        lasttest = None
        for test in branch_tests:
            # Consecutive tests sharing a title are reported once, unless
            # alternate resolutions are displayed.
            if lasttest is not None and lasttest == test.title and not ALTERNATE_RESOLUTIONS:
                continue
            lasttest = test.title
            fmt += test.valuestr
            fmt += test.title
            fmt += '\n'

        # NOTE(review): details are printed for the last test of the
        # resolution only, and the decision line once per resolution --
        # confirm against the original indentation.
        if test is not None and test.details is not None and test.details != "":
            fmt += '\n'
            fmt += test.details.strip()
            fmt += '\n'

        if final_decision == "SKIP":
            fmt += '\nI stopped here the email analysis.'
        else:
            fmt += '\nAfter analyzing your e-mail, I\'ve decided to ' + final_decision + ' it.'

    if ps is not None and len(ps) > 0 and final_decision == "ACCEPT":
        fmt += '\n\n' + ps

    msg.set_content("""Hi!

This is the automatic e-mail analyzer in charge of checking your work.

Here is the detailed report for your submission:

""" + fmt + """

Sincerely,

-- \nAutomatic e-mail checker
running for nemunaire@nemunai.re""")

    report_address = "peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re"

    with smtplib.SMTP("localhost") as smtp:
        smtp.starttls()
        if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
            print(report_address)
            print(msg.as_string())
        if REVIEW_BEFORE_SEND:
            import time
            # Leave the operator 15 seconds to interrupt before sending.
            for _ in range(15):
                sys.stdout.write(".")
                sys.stdout.flush()
                time.sleep(1)
        if SEND_TO_REALUSER:
            smtp.send_message(msg)
        # NOTE(review): the report copy is sent whatever SEND_TO_REALUSER
        # says -- confirm against the original indentation.
        smtp.send_message(msg, to_addrs=[report_address])
|
|
|
|
|
|
def readmail(fp):
    """Read a raw e-mail from ``fp`` and extract what is needed to answer it.

    Parameters:
        fp: binary file object holding the whole message.

    Returns:
        A ``(gpgmail, cnt, frm, subject, ref, to)`` tuple:
        gpgmail is the gnupg_mail.Message built from the mail, or None when
        building it failed; cnt is the message parsed with the default
        email policy; frm, subject and ref are the From, Subject and
        Message-ID headers (defaulting to "someone", "your mail" and "");
        to is the text before the first "@" of the To header, else of the
        Cc header, else None.
    """
    import gnupg
    import gnupg_mail

    theEMail = fp.read()
    try:
        gpgmail = gnupg_mail.Message(theEMail.decode('utf-8', errors='backslashreplace'), settings=gnupg_mail.Settings(require_signed=True), gpg=gnupg.GPG(gnupghome=GNUPG_DIRECTORY))
    except Exception:
        # Best effort: without a usable signed message the regular
        # envelope/signature checks are run on the plain mail instead.
        gpgmail = None

    cnt = email.message_from_bytes(theEMail, policy=email.policy.default)
    frm = cnt.get("From") or "someone"
    subject = cnt.get("Subject") or "your mail"
    ref = cnt.get("Message-ID") or ""

    # A mail may lack To and/or Cc (e.g. Bcc-only delivery): skip missing
    # headers instead of calling .split() on None.
    to = None
    for header_name in ("To", "Cc"):
        header_value = cnt.get(header_name)
        if not header_value:
            continue
        local_part = header_value.split("@", 1)[0]
        if local_part:
            to = local_part
            break

    return gpgmail, cnt, frm, subject, ref, to
|
|
|
|
|
|
def check_mail(gpgmail, cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
    """Run the checks from gen_checks() on a mail and yield one verdict per branch.

    Every check is called on the data left by the previous one and is
    iterated as a stream: MailTest items are report lines, any other item
    closes a branch. A single check may thus open several branches, each
    of which is handed separately to the next check.

    The item closing a branch is either the data for the next check or a
    terminal marker: None (REJECT), False (SKIP) or True (ACCEPT). A check
    whose stream ends right after MailTest items is treated as if it had
    closed the branch with None.

    Parameters:
        gpgmail, submissions_dir, check_content, check_submission_hash,
        skip_public_key: forwarded unchanged to gen_checks().
        cnt: data given to the first check.

    Yields:
        ``(level, tests, decision)`` tuples. ``tests`` lists the MailTest
        items met along the branch, first check first. ``decision`` is
        "REJECT", "SKIP" or "ACCEPT". ``level`` is one more than the
        1-based position of the check that closed the branch.
    """
    # results[k] holds the branches produced by check number k as
    # (parent, items) pairs: parent is the index, in results[k - 1], of the
    # branch the check was run on; items is the MailTest list followed by
    # the closing item.
    results = []

    # sentinel: level 0 is a single parentless branch carrying the input
    results.append([(None, [cnt])])

    lvl = 0
    for check in gen_checks(gpgmail, submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key):
        lvl += 1
        curr = []  # branches produced at this level
        curc = []  # items of the branch being collected

        for parent in range(len(results[-1])):
            # Closing item of the parent branch: data or terminal marker.
            cnt = results[-1][parent][1][-1]

            if cnt is True or cnt is None or cnt is False:
                # Terminated branch: report it now and do not extend it.
                # Walk back to level 1 through the parent indexes, gathering
                # the MailTest items (closing items excluded) in reverse.
                res = []
                for i in range(len(results) - 1, 0, -1):
                    for m in reversed(results[i][parent][1][:-1]):
                        res.append(m)
                    # Rebinding the loop variable is harmless here: the
                    # outer for resets it on the next iteration.
                    parent = results[i][parent][0]
                res.reverse()
                if cnt is None: final = "REJECT"
                elif cnt is False: final = "SKIP"
                else: final = "ACCEPT"
                yield lvl, res, final
                continue

            # A tuple carries extra positional arguments for the check.
            if isinstance(check, tuple):
                chk, params = check
                res = chk(cnt, *params)
            else:
                res = check(cnt)

            for r in res:
                curc.append(r)
                if not isinstance(r, MailTest):
                    # Non-MailTest item: the branch is complete.
                    curr.append((parent, curc))
                    curc = []

            if len(curc) > 0:
                # Trailing MailTest items without a closing one: close the
                # branch with None, i.e. a rejection.
                curc.append(None)
                curr.append((parent, curc))
                curc = []
        results.append(curr)

    # Report the branches left after the last check. Any closing item other
    # than None and False counts as an acceptance here.
    for parent in range(len(results[-1])):
        res = []
        cnt = results[-1][parent][1][-1]
        for i in range(len(results) - 1, 0, -1):
            for m in reversed(results[i][parent][1][:-1]):
                res.append(m)
            parent = results[i][parent][0]
        res.reverse()
        if cnt is None: final = "REJECT"
        elif cnt is False: final = "SKIP"
        else: final = "ACCEPT"
        yield len(results), res, final
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: configure the module from the command line, read
    # one mail on stdin and answer it.

    # Everything created from here is private to the running user.
    os.umask(~stat.S_IRWXU)
    GNUPG_DIRECTORY = os.path.expanduser(GNUPG_DIRECTORY)
    if not os.path.isdir(GNUPG_DIRECTORY):
        os.mkdir(GNUPG_DIRECTORY)

    # Parse command line arguments
    import argparse

    # (flags, keyword arguments) for each option, in help order.
    cli_options = [
        (("-R", "--refresh-keys"),
         {"action": "store_true", "help": "refresh GnuPG keyring"}),
        (("-s", "--sign"),
         {"action": "store_true", "help": "limit check to signature"}),
        (("--real-send",),
         {"action": "store_true", "help": "Effectively sent mail to real users"}),
        (('--soft-max-submission',),
         {"default": "thursday 8:42", "help": "allow submission until this hour"}),
        (('--hard-max-submission',),
         {"default": "thursday 9:21", "help": "allow submission until this hour"}),
        (('--skip-max-submission',),
         {"action": "store_true", "help": "skip submission date check"}),
        (('--submissions',),
         {"default": "/tmp/rendus", "help": "directory where store submissions"}),
        (('--expected-submission-hash',),
         {"help": "imposed tarball hash"}),
        (('--review-before-send',),
         {"action": "store_true", "help": "Review the e-mail to be sent before sending it"}),
        (('--skip-public-key',),
         {"action": "store_true", "help": "enable if you want to skip public key discovery through attachments"}),
        (('--issue-thunderbird91',),
         {"action": "store_true", "help": "enable issue report for thunderbird91"}),
        (('--beta',),
         {"action": "store_true", "help": "enable beta features"}),
        (('--alternate-resolutions',),
         {"action": "store_true", "help": "enable if you want to display alternate resolutions"}),
        (('-l', '--students-list'),
         {"nargs": '*', "help": "students list(s) to use for the check"}),
        (('-ps', '--ps'),
         {"help": "Include a PostScriptum info when valid"}),
    ]

    parser = argparse.ArgumentParser()
    for option_flags, option_settings in cli_options:
        parser.add_argument(*option_flags, **option_settings)
    args = parser.parse_args()

    if args.refresh_keys:
        subprocess.Popen(["gpg", "--homedir=" + GNUPG_DIRECTORY, "--batch", "--refresh-keys"])

    def _resolve_deadline(description):
        # date(1) turns the human-readable description into an ISO 8601 UTC
        # timestamp; colons are dropped so that "%z" accepts the offset.
        with subprocess.Popen(["date", "-d", description, "-u", "-Iseconds"], stdout=subprocess.PIPE) as date_cmd:
            stamp = date_cmd.stdout.read().strip().decode().replace(":", "")
        return datetime.strptime(stamp, "%Y-%m-%dT%H%M%S%z")

    if not args.skip_max_submission:
        SOFT_MAX_SUBMISSION = _resolve_deadline(args.soft_max_submission)
        HARD_MAX_SUBMISSION = _resolve_deadline(args.hard_max_submission)

    if args.students_list is not None:
        STUDENTS_LISTS = args.students_list

    # Behaviour switches read by the functions above.
    BETA = args.beta
    ALTERNATE_RESOLUTIONS = args.alternate_resolutions
    REVIEW_BEFORE_SEND = args.review_before_send
    SEND_TO_REALUSER = args.real_send

    gpgmail, cnt, frm, subject, ref, to = readmail(sys.stdin.buffer)
    if args.issue_thunderbird91:
        respondissueemail(frm, subject, ref, initial_to=to)
    else:
        verdicts = list(check_mail(
            gpgmail,
            cnt,
            submissions_dir=args.submissions,
            check_content=not args.sign,
            check_submission_hash=args.expected_submission_hash,
            skip_public_key=args.skip_public_key,
        ))
        respondmail(frm, subject, ref, verdicts, initial_to=to, ps=args.ps)
|