Introducing gnupg_mail to parse mail

This commit is contained in:
nemunaire 2021-09-19 19:44:50 +02:00
parent efa4cb6824
commit d13d93c39c
2 changed files with 88 additions and 12 deletions

View File

@ -34,23 +34,72 @@ def relatesTo(data, submissions_dir):
yield data yield data
def gen_checks(submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True): def gen_checks(gpgmail, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
if check_content: if check_content:
yield (relatesTo, [submissions_dir]) yield (relatesTo, [submissions_dir])
if HARD_MAX_SUBMISSION is not None:
yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])
else: else:
yield signcheck yield signcheck
yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
yield (signature.check, [GNUPG_DIRECTORY]) if not gpgmail.valid:
yield (login.check, ["/home/nemunaire/workspace/check_mail/SRS2017.csv"]) yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
yield (signature.check, [GNUPG_DIRECTORY])
else:
yield (envelope.skip, [gpgmail])
yield (login.check, ["/home/nemunaire/workspace/peret/SRS2022.csv", "/home/nemunaire/workspace/peret/GISTRE2022.csv"])
if check_content: if check_content:
if HARD_MAX_SUBMISSION is not None:
yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])
yield archive.find yield archive.find
yield ( archive.hash_archive, [submissions_dir, check_submission_hash] ) yield ( archive.hash_archive, [submissions_dir, check_submission_hash] )
yield archive.guess_mime yield archive.guess_mime
yield ( archive.extract, [submissions_dir] ) yield ( archive.extract, [submissions_dir] )
def respondissueemail(to, subject, ref, initial_to=None):
from email.message import EmailMessage
msg = EmailMessage()
msg["X-loop"] = "peret"
msg["From"] = FROM
msg["To"] = to
if ref is not None:
msg["References"] = ref
msg["In-Reply-To"] = ref
msg["Subject"] = ("Re: " if not subject.lower().find("re: ") >= 0 else "") + subject
msg.set_content("""Hi!
This is the automatic e-mail analyzer in charge of checking your work.
I am currently facing a problem to analyze your e-mail.
You'll automatically receive a new response as soon as the problem is corrected.
You can continue to submit newer submissions if needed, the last one will be kept, as usual.
Sincerely,
-- \nAutomatic e-mail checker
running for nemunaire@nemunai.re""")
import smtplib
with smtplib.SMTP("localhost") as smtp:
smtp.starttls()
if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
print("peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re")
print(msg.as_string())
if REVIEW_BEFORE_SEND:
import time
for i in range(15):
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(1)
if SEND_TO_REALUSER:
smtp.send_message(msg)
smtp.send_message(msg, to_addrs=["peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re"])
def respondmail(to, subject, ref, checks, initial_to=None): def respondmail(to, subject, ref, checks, initial_to=None):
from email.message import EmailMessage from email.message import EmailMessage
@ -85,7 +134,12 @@ def respondmail(to, subject, ref, checks, initial_to=None):
for lvl, tests, final_decision in checks: for lvl, tests, final_decision in checks:
if fmt != "": if fmt != "":
fmt += '\n============ There is also another resolution: ============\n\n' fmt += '\n============ There is also another resolution: ============\n\n'
lasttest = None
for test in tests: for test in tests:
if lasttest is not None and lasttest == test.title and not ALTERNATE_RESOLUTIONS:
continue
else:
lasttest = test.title
fmt += test.valuestr fmt += test.valuestr
fmt += test.title fmt += test.title
fmt += '\n' fmt += '\n'
@ -130,23 +184,29 @@ running for nemunaire@nemunai.re""")
def readmail(fp): def readmail(fp):
cnt = email.message_from_binary_file(fp, policy=email.policy.default) import gnupg
import gnupg_mail
theEMail = fp.read()
gpgmail = gnupg_mail.Message(theEMail.decode(), settings=gnupg_mail.Settings(log_level='debug',require_signed=True), gpg=gnupg.GPG(gnupghome=GNUPG_DIRECTORY))
cnt = email.message_from_bytes(theEMail, policy=email.policy.default)
frm = cnt.get("From") or "someone" frm = cnt.get("From") or "someone"
subject = cnt.get("Subject") or "your mail" subject = cnt.get("Subject") or "your mail"
ref = cnt.get("Message-ID") or "" ref = cnt.get("Message-ID") or ""
to = cnt.get("To").split("@", 1)[0] or cnt.get("Cc").split("@", 1)[0] or None to = cnt.get("To").split("@", 1)[0] or cnt.get("Cc").split("@", 1)[0] or None
return cnt, frm, subject, ref, to return gpgmail, cnt, frm, subject, ref, to
def check_mail(cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True): def check_mail(gpgmail, cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
results = [] results = []
# sentinel # sentinel
results.append([(None, [cnt])]) results.append([(None, [cnt])])
lvl = 0 lvl = 0
for check in gen_checks(submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key): for check in gen_checks(gpgmail, submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key):
lvl += 1 lvl += 1
curr = [] curr = []
curc = [] curc = []
@ -239,6 +299,9 @@ if __name__ == '__main__':
parser.add_argument('--skip-public-key', action="store_true", parser.add_argument('--skip-public-key', action="store_true",
help="enable if you want to skip public key discovery through attachments") help="enable if you want to skip public key discovery through attachments")
parser.add_argument('--issue-thunderbird91', action="store_true",
help="enable issue report for thunderbird91")
parser.add_argument('--beta', action="store_true", parser.add_argument('--beta', action="store_true",
help="enable beta features") help="enable beta features")
@ -262,5 +325,8 @@ if __name__ == '__main__':
REVIEW_BEFORE_SEND = args.review_before_send REVIEW_BEFORE_SEND = args.review_before_send
BETA = args.beta BETA = args.beta
cnt, frm, subject, ref, to = readmail(sys.stdin.buffer) gpgmail, cnt, frm, subject, ref, to = readmail(sys.stdin.buffer)
respondmail(frm, subject, ref, [c for c in check_mail(cnt, submissions_dir=args.submissions, check_content=not args.sign, check_submission_hash=args.expected_submission_hash, skip_public_key=args.skip_public_key)], initial_to=to) if args.issue_thunderbird91:
respondissueemail(frm, subject, ref, initial_to=to)
else:
respondmail(frm, subject, ref, [c for c in check_mail(gpgmail, cnt, submissions_dir=args.submissions, check_content=not args.sign, check_submission_hash=args.expected_submission_hash, skip_public_key=args.skip_public_key)], initial_to=to)

View File

@ -117,3 +117,13 @@ def check(msg, GNUPG_DIRECTORY, accept_public_key=True, beta=False):
yield part.get_payload(decode=True) yield part.get_payload(decode=True)
lpart = part lpart = part
def skip(msg, gpgmail):
ct = msg.get_content_type()
data = msg.get_payload(0)
_, verify = gpgmail.gnupg
yield MailTest("Message treated as OpenPGP signed message")
yield (data, verify.username)