Compare commits

..

No commits in common. "8f7d6e79c58a7aec7e53cdd8e2ca0f0d25818d08" and "65fb49d0804c9af121d52413b9e81aaa3a466de6" have entirely different histories.

6 changed files with 79 additions and 160 deletions

View File

@ -123,7 +123,7 @@ def extract(cnt, dest=None):
if dest is not None: if dest is not None:
nsub = len([x for x in os.listdir(odest) if x.find(os.path.basename(ldest) + ".") == 0]) + 1 nsub = len([x for x in os.listdir(odest) if x.find(os.path.basename(ldest) + ".") == 0]) + 1
if nsub > 1: if nsub > 1:
yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th")), datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1) yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, "st" if nsub == 0 else ("nd" if nsub == 1 else ("rd" if nsub == 3 else "th")), datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1)
else: else:
yield MailTest("This is your %i%s submission." % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th"))), -1) yield MailTest("This is your %i%s submission." % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th"))), -1)
if dest != ldest: if dest != ldest:

106
check.py
View File

@ -34,72 +34,23 @@ def relatesTo(data, submissions_dir):
yield data yield data
def gen_checks(gpgmail, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True): def gen_checks(submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
if check_content: if check_content:
yield (relatesTo, [submissions_dir]) yield (relatesTo, [submissions_dir])
else:
yield signcheck
if gpgmail is None or not gpgmail.valid:
yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
yield (signature.check, [GNUPG_DIRECTORY])
else:
yield (envelope.skip, [gpgmail])
yield (login.check, ["/home/nemunaire/workspace/peret/SRS2022.csv", "/home/nemunaire/workspace/peret/GISTRE2022.csv"])
if check_content:
if HARD_MAX_SUBMISSION is not None: if HARD_MAX_SUBMISSION is not None:
yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION]) yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])
else:
yield signcheck
yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
yield (signature.check, [GNUPG_DIRECTORY])
yield (login.check, ["/home/nemunaire/workspace/check_mail/SRS2017.csv"])
if check_content:
yield archive.find yield archive.find
yield ( archive.hash_archive, [submissions_dir, check_submission_hash] ) yield ( archive.hash_archive, [submissions_dir, check_submission_hash] )
yield archive.guess_mime yield archive.guess_mime
yield ( archive.extract, [submissions_dir] ) yield ( archive.extract, [submissions_dir] )
def respondissueemail(to, subject, ref, initial_to=None):
from email.message import EmailMessage
msg = EmailMessage()
msg["X-loop"] = "peret"
msg["From"] = FROM
msg["To"] = to
if ref is not None:
msg["References"] = ref
msg["In-Reply-To"] = ref
msg["Subject"] = ("Re: " if not subject.lower().find("re: ") >= 0 else "") + subject
msg.set_content("""Hi!
This is the automatic e-mail analyzer in charge of checking your work.
I am currently facing a problem to analyze your e-mail.
You'll automatically receive a new response as soon as the problem is corrected.
You can continue to submit newer submissions if needed, the last one will be kept, as usual.
Sincerely,
-- \nAutomatic e-mail checker
running for nemunaire@nemunai.re""")
import smtplib
with smtplib.SMTP("localhost") as smtp:
smtp.starttls()
if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
print("peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re")
print(msg.as_string())
if REVIEW_BEFORE_SEND:
import time
for i in range(15):
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(1)
if SEND_TO_REALUSER:
smtp.send_message(msg)
smtp.send_message(msg, to_addrs=["peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re"])
def respondmail(to, subject, ref, checks, initial_to=None): def respondmail(to, subject, ref, checks, initial_to=None):
from email.message import EmailMessage from email.message import EmailMessage
@ -114,10 +65,10 @@ def respondmail(to, subject, ref, checks, initial_to=None):
if maxitem[0] < lvl: if maxitem[0] < lvl:
maxitem = item maxitem = item
if final_decision == "ACCEPT" or final_decision == "SKIP": if final_decision == "ACCEPT" or final_decision == "SKIP":
return respondmail(to, subject, ref, [(lvl, tests, final_decision)], initial_to) return respondmail(to, subject, ref, [(lvl, tests, final_decision)])
# Display the most upper error # Display the most upper error
if not ALTERNATE_RESOLUTIONS: if not ALTERNATE_RESOLUTIONS:
return respondmail(to, subject, ref, [maxitem], initial_to) return respondmail(to, subject, ref, [maxitem])
msg = EmailMessage() msg = EmailMessage()
msg["X-loop"] = "peret" msg["X-loop"] = "peret"
@ -134,12 +85,7 @@ def respondmail(to, subject, ref, checks, initial_to=None):
for lvl, tests, final_decision in checks: for lvl, tests, final_decision in checks:
if fmt != "": if fmt != "":
fmt += '\n============ There is also another resolution: ============\n\n' fmt += '\n============ There is also another resolution: ============\n\n'
lasttest = None
for test in tests: for test in tests:
if lasttest is not None and lasttest == test.title and not ALTERNATE_RESOLUTIONS:
continue
else:
lasttest = test.title
fmt += test.valuestr fmt += test.valuestr
fmt += test.title fmt += test.title
fmt += '\n' fmt += '\n'
@ -170,7 +116,6 @@ running for nemunaire@nemunai.re""")
with smtplib.SMTP("localhost") as smtp: with smtplib.SMTP("localhost") as smtp:
smtp.starttls() smtp.starttls()
if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND: if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
print("peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re")
print(msg.as_string()) print(msg.as_string())
if REVIEW_BEFORE_SEND: if REVIEW_BEFORE_SEND:
import time import time
@ -184,32 +129,23 @@ running for nemunaire@nemunai.re""")
def readmail(fp): def readmail(fp):
import gnupg cnt = email.message_from_binary_file(fp, policy=email.policy.default)
import gnupg_mail
theEMail = fp.read()
try:
gpgmail = gnupg_mail.Message(theEMail.decode(), settings=gnupg_mail.Settings(log_level='debug',require_signed=True), gpg=gnupg.GPG(gnupghome=GNUPG_DIRECTORY))
except:
gpgmail = None
cnt = email.message_from_bytes(theEMail, policy=email.policy.default)
frm = cnt.get("From") or "someone" frm = cnt.get("From") or "someone"
subject = cnt.get("Subject") or "your mail" subject = cnt.get("Subject") or "your mail"
ref = cnt.get("Message-ID") or "" ref = cnt.get("Message-ID") or ""
to = cnt.get("To").split("@", 1)[0] or cnt.get("Cc").split("@", 1)[0] or None to = cnt.get("To").split("@", 1)[0] or cnt.get("Cc").split("@", 1)[0] or None
return gpgmail, cnt, frm, subject, ref, to return cnt, frm, subject, ref, to
def check_mail(gpgmail, cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True): def check_mail(cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
results = [] results = []
# sentinel # sentinel
results.append([(None, [cnt])]) results.append([(None, [cnt])])
lvl = 0 lvl = 0
for check in gen_checks(gpgmail, submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key): for check in gen_checks(submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key):
lvl += 1 lvl += 1
curr = [] curr = []
curc = [] curc = []
@ -272,9 +208,6 @@ if __name__ == '__main__':
import argparse import argparse
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("-R", "--refresh-keys", action="store_true",
help="refresh GnuPG keyring")
parser.add_argument("-s", "--sign", action="store_true", parser.add_argument("-s", "--sign", action="store_true",
help="limit check to signature") help="limit check to signature")
@ -302,9 +235,6 @@ if __name__ == '__main__':
parser.add_argument('--skip-public-key', action="store_true", parser.add_argument('--skip-public-key', action="store_true",
help="enable if you want to skip public key discovery through attachments") help="enable if you want to skip public key discovery through attachments")
parser.add_argument('--issue-thunderbird91', action="store_true",
help="enable issue report for thunderbird91")
parser.add_argument('--beta', action="store_true", parser.add_argument('--beta', action="store_true",
help="enable beta features") help="enable beta features")
@ -313,9 +243,6 @@ if __name__ == '__main__':
args = parser.parse_args() args = parser.parse_args()
if args.refresh_keys:
subprocess.Popen(["gpg", "--homedir=" + GNUPG_DIRECTORY, "--batch", "--refresh-keys"])
if not args.skip_max_submission: if not args.skip_max_submission:
with subprocess.Popen(["date", "-d", args.soft_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f: with subprocess.Popen(["date", "-d", args.soft_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f:
SOFT_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode().replace(":", ""), "%Y-%m-%dT%H%M%S%z") SOFT_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode().replace(":", ""), "%Y-%m-%dT%H%M%S%z")
@ -328,8 +255,5 @@ if __name__ == '__main__':
REVIEW_BEFORE_SEND = args.review_before_send REVIEW_BEFORE_SEND = args.review_before_send
BETA = args.beta BETA = args.beta
gpgmail, cnt, frm, subject, ref, to = readmail(sys.stdin.buffer) cnt, frm, subject, ref, to = readmail(sys.stdin.buffer)
if args.issue_thunderbird91: respondmail(frm, subject, ref, [c for c in check_mail(cnt, submissions_dir=args.submissions, check_content=not args.sign, check_submission_hash=args.expected_submission_hash, skip_public_key=args.skip_public_key)], to)
respondissueemail(frm, subject, ref, initial_to=to)
else:
respondmail(frm, subject, ref, [c for c in check_mail(gpgmail, cnt, submissions_dir=args.submissions, check_content=not args.sign, check_submission_hash=args.expected_submission_hash, skip_public_key=args.skip_public_key)], initial_to=to)

View File

@ -18,8 +18,7 @@ def import_pubkey(key, GNUPG_DIRECTORY):
yield MailTest("New PGP key successfully imported:", details=gpg_output) yield MailTest("New PGP key successfully imported:", details=gpg_output)
yield False yield False
else: else:
yield MailTest("An error occurs during PGP key importation:", 1, details=gpg_output) yield MailTest("An error occurs during PGP key importation:", details=gpg_output)
def assume_rfc3156(msg): def assume_rfc3156(msg):
@ -44,7 +43,7 @@ def assume_rfc3156(msg):
def assume_oldstyle(payload): def assume_oldstyle(payload):
yield MailTest("Found BEGIN PGP SIGNED MESSAGE: message treated as old style PGP email.") yield MailTest("Found BEGIN PGP SIGNED MESSAGE: message treated as old style PGP email.")
yield payload.encode() yield payload
@ -107,23 +106,4 @@ def check(msg, GNUPG_DIRECTORY, accept_public_key=True, beta=False):
yield MailTest("Standalone non-armored signature file discovered. Avoid using binary signature over SMTP (see RFC2015 #2. PGP data formats).", 2) yield MailTest("Standalone non-armored signature file discovered. Avoid using binary signature over SMTP (see RFC2015 #2. PGP data formats).", 2)
yield (lpart.get_payload(decode=True), part.get_payload(decode=True)) yield (lpart.get_payload(decode=True), part.get_payload(decode=True))
elif lpart is not None and part.get_filename() is not None and lpart.get_filename() is not None and lpart.get_filename()[:len(part.get_filename())] == part.get_filename():
yield MailTest("Standalone non-armored signature file discovered. Avoid using binary signature over SMTP (see RFC2015 #2. PGP data formats).", 2)
yield (part.get_payload(decode=True), lpart.get_payload(decode=True))
elif part.get_filename() is not None and (part.get_filename()[len(part.get_filename())-4:] == ".gpg" or part.get_filename()[len(part.get_filename())-4:] == ".asc") or (
payload is not None and not part.is_multipart() and part.get_payload(decode=True).find(b"-----BEGIN PGP MESSAGE-----") >= 0):
yield MailTest("Standalone PGP message discovered.")
yield part.get_payload(decode=True)
lpart = part lpart = part
def skip(msg, gpgmail):
ct = msg.get_content_type()
data = msg.get_payload(0)
_, verify = gpgmail.gnupg
yield MailTest("Message treated as OpenPGP signed message")
yield (data, verify.username)

View File

@ -37,14 +37,13 @@ class BadSignature:
class UncheckableSignature: class UncheckableSignature:
def __init__(self, keyid, pkalgo, hashalgo, sig_class, time, rc, longkeyid): def __init__(self, keyid, pkalgo, hashalgo, sig_class, time, rc):
self.keyid = keyid self.keyid = keyid
self.pkalgo = pkalgo self.pkalgo = pkalgo
self.hashalgo = hashalgo self.hashalgo = hashalgo
self.sig_class = sig_class self.sig_class = sig_class
self.time = time self.time = time
self.rc = rc self.rc = rc
self.longkeyid = longkeyid
class ValidSignature: class ValidSignature:

View File

@ -9,11 +9,10 @@ LOGIN_FIELD = 2
EMAILADDR_FIELD = 3 EMAILADDR_FIELD = 3
def check(cnt, *files): def check(cnt, file):
data, uname = cnt data, uname = cnt
username, address = parseaddr(uname) username, address = parseaddr(uname)
for file in files:
with open(file, encoding='utf-8') as fd: with open(file, encoding='utf-8') as fd:
people = csv.reader(fd) people = csv.reader(fd)
for p in people: for p in people:

View File

@ -39,40 +39,13 @@ def verify_sign(data, gpg_rcode, gpg_status, gpg_output=""):
def check(cnt, GNUPG_DIRECTORY): def check(cnt, GNUPG_DIRECTORY):
for server in ["pool.sks-keyservers.net", "keys.openpgp.org"]:
if len(cnt) == 2: if len(cnt) == 2:
yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server) yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
else: else:
yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server) yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
def check_sign(cmd, bdata, fname, GNUPG_DIRECTORY, keyserver, windows_hack=False): def check_sep(data, sign, GNUPG_DIRECTORY):
with subprocess.Popen(["gpg",
"--homedir=" + GNUPG_DIRECTORY,
"--status-fd=1",
"--auto-key-retrieve",
"--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
"--keyserver=" + keyserver,
"--quiet",
"--batch",
cmd,
fname,
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
p.stdin.write(bdata)
p.stdin.close()
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
p.wait()
gpg_output = p.stderr.read()
gpg_rcode = p.returncode
if gpg_rcode != 0 and not windows_hack:
return check_sign(cmd, bdata.replace(b'\n', b'\r\n'), fname, GNUPG_DIRECTORY, keyserver, True) # Windows hack
return gpg_status, gpg_output, gpg_rcode
def check_sep(data, sign, GNUPG_DIRECTORY, keyserver):
gpg_output = "" gpg_output = ""
gpg_status = [] gpg_status = []
gpg_rcode = None gpg_rcode = None
@ -81,7 +54,31 @@ def check_sep(data, sign, GNUPG_DIRECTORY, keyserver):
f.write(sign) f.write(sign)
f.close() f.close()
gpg_status, gpg_output, gpg_rcode = check_sign("--verify", data if isinstance(data, bytes) else data.as_bytes(), f.name, GNUPG_DIRECTORY, keyserver) with subprocess.Popen(["gpg",
"--homedir=" + GNUPG_DIRECTORY,
"--status-fd=1",
"--auto-key-retrieve",
"--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
"--keyserver=pool.sks-keyservers.net",
"--quiet",
"--batch",
"--verify",
f.name,
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
if isinstance(data, bytes):
bdata = data
else:
bdata = data.as_bytes()
if not bdata.find(b'\r\n') >= 0:
bdata.replace(b'\n', b'\r\n') # Windows hack
p.stdin.write(bdata)
p.stdin.close()
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
p.wait()
gpg_output = p.stderr.read()
gpg_rcode = p.returncode
except Exception as e: except Exception as e:
yield MailTest("An error occured: %s" % e, 1) yield MailTest("An error occured: %s" % e, 1)
return return
@ -91,7 +88,7 @@ def check_sep(data, sign, GNUPG_DIRECTORY, keyserver):
yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace')) yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))
def check_merged(bdata, GNUPG_DIRECTORY, keyserver): def check_merged(bdata, GNUPG_DIRECTORY):
f = tempfile.NamedTemporaryFile() f = tempfile.NamedTemporaryFile()
f.close() f.close()
@ -99,7 +96,27 @@ def check_merged(bdata, GNUPG_DIRECTORY, keyserver):
gpg_status = [] gpg_status = []
gpg_rcode = None gpg_rcode = None
try: try:
gpg_status, gpg_output, gpg_rcode = check_sign("--output", bdata, f.name, GNUPG_DIRECTORY, keyserver) with subprocess.Popen(["gpg",
"--homedir=" + GNUPG_DIRECTORY,
"--status-fd=1",
"--auto-key-retrieve",
"--auto-key-locate=clear,local,pka,dane,cert,keyserver",
"--keyserver=pool.sks-keyservers.net",
"--quiet",
"--batch",
"--output",
f.name,
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
#if not bdata.find('\r\n') >= 0:
# bdata = bdata.replace('\n', '\r\n') # Windows hack
p.stdin.write(bdata.encode() if isinstance(bdata, str) else bdata)
p.stdin.close()
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
p.wait()
gpg_output = p.stderr.read()
gpg_rcode = p.returncode
except Exception as e: except Exception as e:
yield MailTest("An error occured: %s" % e, 1) yield MailTest("An error occured: %s" % e, 1)
return return