Project done with some hardcoded string
This commit is contained in:
parent
7e929f3a32
commit
42409660a5
137
archive.py
Normal file
137
archive.py
Normal file
@ -0,0 +1,137 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from email.message import Message
|
||||||
|
import hashlib
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
from test import MailTest
|
||||||
|
|
||||||
|
|
||||||
|
def find(cnt):
|
||||||
|
data, login = cnt
|
||||||
|
def found(data):
|
||||||
|
mime = data.get_content_type()
|
||||||
|
if mime == "application/octet-stream":
|
||||||
|
mime = _guess_mime(data.get_payload(decode=True))
|
||||||
|
if mime == "application/x-xz":
|
||||||
|
fname = data.get_filename(login + ".tar.xz")
|
||||||
|
elif mime == "application/x-bzip" or mime == "application/x-bzip2":
|
||||||
|
fname = data.get_filename(login + ".tar.bz2")
|
||||||
|
elif mime == "application/x-gzip" or mime == "application/gzip":
|
||||||
|
fname = data.get_filename(login + ".tar.gz")
|
||||||
|
elif mime == "application/zip":
|
||||||
|
fname = data.get_filename(login + ".zip")
|
||||||
|
elif mime == "application/rar":
|
||||||
|
fname = data.get_filename(login + ".rar")
|
||||||
|
elif mime == "application/x-tar" or mime == "application/tar":
|
||||||
|
fname = data.get_filename(login + ".tar")
|
||||||
|
else:
|
||||||
|
fname = None
|
||||||
|
return data.get_payload(decode=True), fname
|
||||||
|
|
||||||
|
if isinstance(data, Message):
|
||||||
|
if not data.is_multipart():
|
||||||
|
data, fname = found(data)
|
||||||
|
yield MailTest("Tarball found: %s." % fname, -1)
|
||||||
|
else:
|
||||||
|
for part in data.walk():
|
||||||
|
data, fname = found(part)
|
||||||
|
if fname is not None:
|
||||||
|
yield MailTest("Tarball found: %s." % fname, -1)
|
||||||
|
break
|
||||||
|
yield (data, login)
|
||||||
|
|
||||||
|
|
||||||
|
def hash_archive(cnt, dest=None):
|
||||||
|
data, login = cnt
|
||||||
|
sha = hashlib.sha1(data.encode() if isinstance(data, str) else data).hexdigest()
|
||||||
|
yield MailTest("Your tarball SHA-1 is %s." % sha, -1)
|
||||||
|
if dest is not None and os.path.exists(os.path.join(dest, login + "." + sha)):
|
||||||
|
yield MailTest("You have already uploaded this tarball.", 1)
|
||||||
|
yield False
|
||||||
|
else:
|
||||||
|
yield (data, sha, login)
|
||||||
|
|
||||||
|
|
||||||
|
def _guess_mime(data):
|
||||||
|
with subprocess.Popen(["file",
|
||||||
|
"--brief",
|
||||||
|
"--mime-type",
|
||||||
|
"-"], env={"LANG": 'C'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE) as p:
|
||||||
|
p.stdin.write(data.encode() if isinstance(data, str) else data)
|
||||||
|
p.stdin.close()
|
||||||
|
p.wait()
|
||||||
|
if p.returncode == 0:
|
||||||
|
return p.stdout.read().decode().strip()
|
||||||
|
|
||||||
|
|
||||||
|
def guess_mime(cnt):
|
||||||
|
data, sha, login = cnt
|
||||||
|
|
||||||
|
mime = _guess_mime(data)
|
||||||
|
if mime is not None:
|
||||||
|
yield MailTest("Guessed content-type of your submission: %s." % mime,
|
||||||
|
0 if mime.find("application/x-") == 0 else 2)
|
||||||
|
else:
|
||||||
|
mime = "application/x-tar"
|
||||||
|
yield MailTest("Unable to guess content-type of your submission. Assuming: %s." % mime)
|
||||||
|
|
||||||
|
yield data, mime, sha, login
|
||||||
|
|
||||||
|
|
||||||
|
def extract(cnt, dest=None):
|
||||||
|
data, type, sha, login = cnt
|
||||||
|
|
||||||
|
if dest is not None:
|
||||||
|
odest = dest
|
||||||
|
os.makedirs(odest, exist_ok=True)
|
||||||
|
ldest = os.path.join(dest, login)
|
||||||
|
dest = os.path.join(dest, login + "." + sha)
|
||||||
|
if os.path.exists(dest):
|
||||||
|
yield MailTest("You have already uploaded this tarball.", 1)
|
||||||
|
return
|
||||||
|
last_dest = os.readlink(ldest) if os.path.exists(ldest) else None
|
||||||
|
|
||||||
|
try:
|
||||||
|
with tempfile.TemporaryDirectory() as temp:
|
||||||
|
with subprocess.Popen(["tar", "--no-same-owner", "--no-same-permissions",
|
||||||
|
("-xvC" + temp) if dest is not None else "-t",
|
||||||
|
"-z" if type == "application/x-gzip" else "",
|
||||||
|
"-J" if type == "application/x-xz" else "",
|
||||||
|
"-j" if type in ["application/x-bzip", "application/x-bzip2"] else "",
|
||||||
|
"-"], env={"LANG": 'C'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
||||||
|
p.stdin.write(data.encode() if isinstance(data, str) else data)
|
||||||
|
p.stdin.close()
|
||||||
|
p.wait()
|
||||||
|
err = p.stdout.read().decode()
|
||||||
|
err += p.stderr.read().decode()
|
||||||
|
if p.returncode == 0:
|
||||||
|
if dest is not None:
|
||||||
|
nsub = len([x for x in os.listdir(odest) if x.find(os.path.basename(ldest) + ".") == 0]) + 1
|
||||||
|
if nsub > 1:
|
||||||
|
yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, "st" if nsub == 0 else ("nd" if nsub == 1 else ("rd" if nsub == 3 else "th")), datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1)
|
||||||
|
else:
|
||||||
|
yield MailTest("This is your %i%s submission." % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th"))), -1)
|
||||||
|
if dest != ldest:
|
||||||
|
if os.path.lexists(ldest):
|
||||||
|
os.remove(ldest)
|
||||||
|
os.symlink(os.path.basename(dest), ldest)
|
||||||
|
if len(os.listdir(temp)) == 1:
|
||||||
|
shutil.move(os.path.join(temp, os.listdir(temp)[0]), dest)
|
||||||
|
else:
|
||||||
|
shutil.move(temp, dest)
|
||||||
|
yield MailTest("Archive successfully extracted.", details=err)
|
||||||
|
yield dest
|
||||||
|
else:
|
||||||
|
yield MailTest("An error occured during archive extraction:", 1, details=err)
|
||||||
|
|
||||||
|
if not os.path.exists(temp):
|
||||||
|
os.makedirs(temp)
|
||||||
|
except PermissionError:
|
||||||
|
if os.path.lexists(ldest):
|
||||||
|
os.remove(ldest)
|
||||||
|
if last_dest is not None:
|
||||||
|
os.symlink(last_dest, ldest)
|
||||||
|
yield MailTest("Your archive's content has crazy permissions. Please fix it.", 1)
|
229
check.py
Executable file
229
check.py
Executable file
@ -0,0 +1,229 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
import email
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import stat
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
GNUPG_DIRECTORY = "~/.gnupg-checker"
|
||||||
|
SOFT_MAX_SUBMISSION = None
|
||||||
|
HARD_MAX_SUBMISSION = None
|
||||||
|
FROM = "Automatic VIRLI Mail Checker <virli@nemunai.re>"
|
||||||
|
SEND_TO_REALUSER = False
|
||||||
|
BETA = False
|
||||||
|
ALTERNATE_RESOLUTIONS = False
|
||||||
|
|
||||||
|
|
||||||
|
import archive
|
||||||
|
import envelope
|
||||||
|
import late
|
||||||
|
import login
|
||||||
|
import signature
|
||||||
|
from test import MailTest
|
||||||
|
|
||||||
|
def signcheck(data):
|
||||||
|
yield MailTest("Those tests are limited to signature checking. THIS IS NOT THE SUBMISSION INTERFACE.", 2)
|
||||||
|
yield data
|
||||||
|
|
||||||
|
|
||||||
|
def gen_checks(submissions_dir, check_content=False):
|
||||||
|
if HARD_MAX_SUBMISSION is not None and check_content:
|
||||||
|
yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])
|
||||||
|
else:
|
||||||
|
yield signcheck
|
||||||
|
yield (envelope.check, [GNUPG_DIRECTORY, BETA])
|
||||||
|
yield (signature.check, [GNUPG_DIRECTORY])
|
||||||
|
yield (login.check, ["/home/nemunaire/workspace/check_mail/SRS2017.csv"])
|
||||||
|
if check_content:
|
||||||
|
yield archive.find
|
||||||
|
yield ( archive.hash_archive, [submissions_dir] )
|
||||||
|
yield archive.guess_mime
|
||||||
|
yield ( archive.extract, [submissions_dir] )
|
||||||
|
|
||||||
|
|
||||||
|
def respondmail(to, subject, ref, checks):
|
||||||
|
from email.message import EmailMessage
|
||||||
|
|
||||||
|
if not isinstance(checks, list):
|
||||||
|
checks = [c for c in checks]
|
||||||
|
|
||||||
|
# Show only the first message if there is one ACCEPT
|
||||||
|
if len(checks) > 1 and not (ALTERNATE_RESOLUTIONS and BETA):
|
||||||
|
maxitem = checks[0]
|
||||||
|
for item in checks:
|
||||||
|
lvl, tests, final_decision = item
|
||||||
|
if maxitem[0] < lvl:
|
||||||
|
maxitem = item
|
||||||
|
if final_decision == "ACCEPT" or final_decision == "SKIP":
|
||||||
|
return respondmail(to, subject, ref, [(lvl, tests, final_decision)])
|
||||||
|
# Display the most upper error
|
||||||
|
if not ALTERNATE_RESOLUTIONS:
|
||||||
|
return respondmail(to, subject, ref, [maxitem])
|
||||||
|
|
||||||
|
msg = EmailMessage()
|
||||||
|
msg["X-loop"] = "virli"
|
||||||
|
msg["From"] = FROM
|
||||||
|
msg["To"] = to
|
||||||
|
if ref is not None:
|
||||||
|
msg["References"] = ref
|
||||||
|
msg["In-Reply-To"] = ref
|
||||||
|
msg["Subject"] = ("Re: " if not subject.lower().find("re: ") >= 0 else "") + subject
|
||||||
|
|
||||||
|
test = None
|
||||||
|
final_decision = "REJECT"
|
||||||
|
fmt = ""
|
||||||
|
for lvl, tests, final_decision in checks:
|
||||||
|
if fmt != "":
|
||||||
|
fmt += '\n============ There is also another resolution: ============\n\n'
|
||||||
|
for test in tests:
|
||||||
|
fmt += test.valuestr
|
||||||
|
fmt += test.title
|
||||||
|
fmt += '\n'
|
||||||
|
if test is not None and test.details is not None and test.details != "":
|
||||||
|
fmt += '\n'
|
||||||
|
fmt += test.details.strip()
|
||||||
|
fmt += '\n'
|
||||||
|
|
||||||
|
if final_decision == "SKIP":
|
||||||
|
fmt += '\nI stopped here the email analysis.'
|
||||||
|
else:
|
||||||
|
fmt += '\nAfter analyzing your e-mail, I\'ve decided to ' + final_decision + ' it.'
|
||||||
|
|
||||||
|
msg.set_content("""Hi!
|
||||||
|
|
||||||
|
This is the automatic e-mail analyzer in charge of checking your work.
|
||||||
|
|
||||||
|
Here is the detailed report for your submission:
|
||||||
|
|
||||||
|
""" + fmt + """
|
||||||
|
|
||||||
|
Sincerely,
|
||||||
|
|
||||||
|
-- \nAutomatic e-mail checker
|
||||||
|
running for nemunaire@nemunai.re""")
|
||||||
|
|
||||||
|
import smtplib
|
||||||
|
with smtplib.SMTP("localhost") as smtp:
|
||||||
|
if SEND_TO_REALUSER:
|
||||||
|
smtp.send_message(msg)
|
||||||
|
smtp.send_message(msg, to_addrs=["virli-report@nemunai.re"])
|
||||||
|
else:
|
||||||
|
print(msg.as_string())
|
||||||
|
|
||||||
|
|
||||||
|
def readmail(fp):
|
||||||
|
cnt = email.message_from_binary_file(fp)
|
||||||
|
frm = cnt.get("From") or "someone"
|
||||||
|
subject = cnt.get("Subject") or "your mail"
|
||||||
|
ref = cnt.get("Message-ID") or ""
|
||||||
|
|
||||||
|
return cnt, frm, subject, ref
|
||||||
|
|
||||||
|
|
||||||
|
def check_mail(cnt, submissions_dir, check_content=False):
|
||||||
|
results = []
|
||||||
|
|
||||||
|
# sentinel
|
||||||
|
results.append([(None, [cnt])])
|
||||||
|
|
||||||
|
lvl = 0
|
||||||
|
for check in gen_checks(submissions_dir=submissions_dir, check_content=check_content):
|
||||||
|
lvl += 1
|
||||||
|
curr = []
|
||||||
|
curc = []
|
||||||
|
|
||||||
|
for parent in range(len(results[-1])):
|
||||||
|
cnt = results[-1][parent][1][-1]
|
||||||
|
|
||||||
|
if cnt is True or cnt is None or cnt is False:
|
||||||
|
res = []
|
||||||
|
for i in range(len(results) - 1, 0, -1):
|
||||||
|
for m in reversed(results[i][parent][1][:-1]):
|
||||||
|
res.append(m)
|
||||||
|
parent = results[i][parent][0]
|
||||||
|
res.reverse()
|
||||||
|
if cnt is None: final = "REJECT"
|
||||||
|
elif cnt is False: final = "SKIP"
|
||||||
|
else: final = "ACCEPT"
|
||||||
|
yield lvl, res, final
|
||||||
|
continue
|
||||||
|
|
||||||
|
if isinstance(check, tuple):
|
||||||
|
chk, params = check
|
||||||
|
res = chk(cnt, *params)
|
||||||
|
else:
|
||||||
|
res = check(cnt)
|
||||||
|
|
||||||
|
for r in res:
|
||||||
|
curc.append(r)
|
||||||
|
if not isinstance(r, MailTest):
|
||||||
|
curr.append((parent, curc))
|
||||||
|
curc = []
|
||||||
|
|
||||||
|
if len(curc) > 0:
|
||||||
|
curc.append(None)
|
||||||
|
curr.append((parent, curc))
|
||||||
|
curc = []
|
||||||
|
results.append(curr)
|
||||||
|
|
||||||
|
for parent in range(len(results[-1])):
|
||||||
|
res = []
|
||||||
|
cnt = results[-1][parent][1][-1]
|
||||||
|
for i in range(len(results) - 1, 0, -1):
|
||||||
|
for m in reversed(results[i][parent][1][:-1]):
|
||||||
|
res.append(m)
|
||||||
|
parent = results[i][parent][0]
|
||||||
|
res.reverse()
|
||||||
|
if cnt is None: final = "REJECT"
|
||||||
|
elif cnt is False: final = "SKIP"
|
||||||
|
else: final = "ACCEPT"
|
||||||
|
yield len(results), res, final
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
os.umask(~(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR))
|
||||||
|
GNUPG_DIRECTORY = os.path.expanduser(GNUPG_DIRECTORY)
|
||||||
|
if not os.path.isdir(GNUPG_DIRECTORY):
|
||||||
|
os.mkdir(GNUPG_DIRECTORY)
|
||||||
|
|
||||||
|
# Parse command line arguments
|
||||||
|
import argparse
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
|
||||||
|
parser.add_argument("-s", "--sign", action="store_true",
|
||||||
|
help="limit check to signature")
|
||||||
|
|
||||||
|
parser.add_argument("--real-send", action="store_true",
|
||||||
|
help="Effectively sent mail to real users")
|
||||||
|
|
||||||
|
parser.add_argument('--soft-max-submission', default="thursday 8:42",
|
||||||
|
help="allow submission until this hour")
|
||||||
|
|
||||||
|
parser.add_argument('--hard-max-submission', default="thursday 9:21",
|
||||||
|
help="allow submission until this hour")
|
||||||
|
|
||||||
|
parser.add_argument('--submissions', default="/tmp/rendus",
|
||||||
|
help="directory where store submissions")
|
||||||
|
|
||||||
|
parser.add_argument('--beta', action="store_true",
|
||||||
|
help="enable beta features")
|
||||||
|
|
||||||
|
parser.add_argument('--alternate-resolutions', action="store_true",
|
||||||
|
help="enable if you want to display alternate resolutions")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
with subprocess.Popen(["date", "-d", args.soft_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f:
|
||||||
|
SOFT_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode(), "%Y-%m-%dT%H:%M:%S%z")
|
||||||
|
|
||||||
|
with subprocess.Popen(["date", "-d", args.hard_max_submission, "-u", "-Iseconds"], stdout=subprocess.PIPE) as f:
|
||||||
|
HARD_MAX_SUBMISSION = datetime.strptime(f.stdout.read().strip().decode(), "%Y-%m-%dT%H:%M:%S%z")
|
||||||
|
|
||||||
|
ALTERNATE_RESOLUTIONS = args.alternate_resolutions
|
||||||
|
SEND_TO_REALUSER = args.real_send
|
||||||
|
BETA = args.beta
|
||||||
|
|
||||||
|
cnt, frm, subject, ref = readmail(sys.stdin.buffer)
|
||||||
|
respondmail(frm, subject, ref, [c for c in check_mail(cnt, submissions_dir=args.submissions, check_content=not args.sign)])
|
94
envelope.py
Normal file
94
envelope.py
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from test import MailTest
|
||||||
|
|
||||||
|
|
||||||
|
def import_pubkey(key, GNUPG_DIRECTORY):
|
||||||
|
with subprocess.Popen(["gpg",
|
||||||
|
"--homedir=" + GNUPG_DIRECTORY,
|
||||||
|
"--batch",
|
||||||
|
"--import",
|
||||||
|
"-"], env={"LANG": 'C'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
||||||
|
p.stdin.write(key)
|
||||||
|
p.stdin.close()
|
||||||
|
p.wait()
|
||||||
|
gpg_output = p.stderr.read().decode()
|
||||||
|
if p.returncode == 0:
|
||||||
|
yield MailTest("New PGP key successfully imported:", details=gpg_output)
|
||||||
|
yield False
|
||||||
|
else:
|
||||||
|
yield MailTest("An error occurs during PGP key importation:", details=gpg_output)
|
||||||
|
|
||||||
|
|
||||||
|
def assume_rfc3156(msg):
|
||||||
|
if msg.get_param("protocol") is None or msg.get_param("protocol") != "application/pgp-signature" or msg.get_payload(1).get_content_type() != "application/pgp-signature":
|
||||||
|
yield MailTest("Message treated as RFC3156 due to Content-Type, but is not compliant", 1)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Extracting signature
|
||||||
|
try:
|
||||||
|
data = msg.get_payload(0)
|
||||||
|
sign = msg.get_payload(1).get_payload().encode()
|
||||||
|
|
||||||
|
# Except an exception in the two above lines if one part doesn't exist
|
||||||
|
yield MailTest("Message treated as RFC3156: content and signature found")
|
||||||
|
|
||||||
|
yield (data, sign)
|
||||||
|
|
||||||
|
except IndexError:
|
||||||
|
yield MailTest("Message treated as RFC3156 due to Content-Type, but is not compliant", 1)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def assume_oldstyle(payload):
|
||||||
|
yield MailTest("Found BEGIN PGP SIGNED MESSAGE: message treated as old style PGP email.")
|
||||||
|
yield payload
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def check(msg, GNUPG_DIRECTORY, beta=False):
|
||||||
|
ct = msg.get_content_type()
|
||||||
|
|
||||||
|
# First, looking for public key
|
||||||
|
for part in msg.walk():
|
||||||
|
if part.get_content_type() == "application/pgp-keys":
|
||||||
|
yield from import_pubkey(part.get_payload(decode=True), GNUPG_DIRECTORY)
|
||||||
|
return
|
||||||
|
|
||||||
|
if ct == "multipart/signed" and msg.is_multipart():
|
||||||
|
yield from assume_rfc3156(msg)
|
||||||
|
|
||||||
|
else:
|
||||||
|
yield MailTest("This is not a signed e-mail: %s." % ct, 1)
|
||||||
|
|
||||||
|
if ct == "multipart/encrypted":
|
||||||
|
yield MailTest("As an automated service, I can't access my owner's private key. Please resend your email, unencrypted but signed.", -1)
|
||||||
|
return
|
||||||
|
|
||||||
|
from archive import _guess_mime
|
||||||
|
|
||||||
|
# Looking for signed content
|
||||||
|
for part in msg.walk():
|
||||||
|
payload = part.get_payload()
|
||||||
|
if part.get_content_type() == "application/pgp-signature":
|
||||||
|
p = [x for x in msg.walk()]
|
||||||
|
for s in range(len(p) - 1, -1, -1):
|
||||||
|
spart = p[s]
|
||||||
|
if part is not spart and not spart.is_multipart():
|
||||||
|
yield MailTest("Separate signature found. Trying it with part %d (%s) ..." % (s, spart.get_content_type()), -1)
|
||||||
|
yield (spart.get_payload(decode=True), part.get_payload(decode=True))
|
||||||
|
|
||||||
|
elif beta and part.get_content_type() == "application/octet-stream":
|
||||||
|
yield MailTest("Entering BETA feature of walking through mail part, looking for a submission.", 2)
|
||||||
|
print(_guess_mime(part.get_payload(decode=True)))
|
||||||
|
yield from check_binary(part.as_bytes() if part.is_multipart() else part.get_payload(decode=True))
|
||||||
|
|
||||||
|
elif payload is not None and not part.is_multipart() and part.get_payload(decode=True).find(b"-----BEGIN PGP SIGNED MESSAGE-----") >= 0:
|
||||||
|
res = re.match(".*(-----BEGIN PGP SIGNED MESSAGE-----(.*)-----BEGIN PGP SIGNATURE-----(.*)-----END PGP SIGNATURE-----).*", payload, re.DOTALL)
|
||||||
|
if res is not None:
|
||||||
|
yield from assume_oldstyle(payload)
|
||||||
|
else:
|
||||||
|
res = re.match(b".*(-----BEGIN PGP SIGNED MESSAGE-----(.*)-----BEGIN PGP SIGNATURE-----(.*)-----END PGP SIGNATURE-----).*", part.get_payload(decode=True), re.DOTALL)
|
||||||
|
if res is not None:
|
||||||
|
yield from assume_oldstyle(part.get_payload(decode=True))
|
157
gpg_status_parser.py
Normal file
157
gpg_status_parser.py
Normal file
@ -0,0 +1,157 @@
|
|||||||
|
import re
|
||||||
|
|
||||||
|
class GoodSignature:
|
||||||
|
|
||||||
|
def __init__(self, long_keyid_or_fpr, *username):
|
||||||
|
self.long_keyid_or_fpr = long_keyid_or_fpr
|
||||||
|
self.username = " ".join(username)
|
||||||
|
|
||||||
|
|
||||||
|
class ExpiredSignature:
|
||||||
|
|
||||||
|
def __init__(self, long_keyid_or_fpr, *username):
|
||||||
|
self.long_keyid_or_fpr = long_keyid_or_fpr
|
||||||
|
self.username = " ".join(username)
|
||||||
|
|
||||||
|
|
||||||
|
class ExpiredKey:
|
||||||
|
|
||||||
|
def __init__(self, long_keyid_or_fpr, *username):
|
||||||
|
self.long_keyid_or_fpr = long_keyid_or_fpr
|
||||||
|
self.username = " ".join(username)
|
||||||
|
|
||||||
|
|
||||||
|
class RevokedKey:
|
||||||
|
|
||||||
|
def __init__(self, long_keyid_or_fpr, *username):
|
||||||
|
self.long_keyid_or_fpr = long_keyid_or_fpr
|
||||||
|
self.username = " ".join(username)
|
||||||
|
|
||||||
|
|
||||||
|
class BadSignature:
|
||||||
|
|
||||||
|
def __init__(self, long_keyid_or_fpr, *username):
|
||||||
|
self.long_keyid_or_fpr = long_keyid_or_fpr
|
||||||
|
self.username = " ".join(username)
|
||||||
|
|
||||||
|
|
||||||
|
class UncheckableSignature:
|
||||||
|
|
||||||
|
def __init__(self, keyid, pkalgo, hashalgo, sig_class, time, rc):
|
||||||
|
self.keyid = keyid
|
||||||
|
self.pkalgo = pkalgo
|
||||||
|
self.hashalgo = hashalgo
|
||||||
|
self.sig_class = sig_class
|
||||||
|
self.time = time
|
||||||
|
self.rc = rc
|
||||||
|
|
||||||
|
|
||||||
|
class ValidSignature:
|
||||||
|
|
||||||
|
def __init__(self, fingerprint_in_hex, sig_creation_date, sig_timestamp, expire_timestamp, sig_version, reserved, pubkey_algo, hash_algo, sig_class, primary_key_fpr=None):
|
||||||
|
self.fingerprint_in_hex = fingerprint_in_hex
|
||||||
|
self.sig_creation_date = sig_creation_date
|
||||||
|
self.sig_timestamp = sig_timestamp
|
||||||
|
self.expire_timestamp = expire_timestamp
|
||||||
|
self.sig_version = sig_version
|
||||||
|
self.reserved = reserved
|
||||||
|
self.pubkey_algo = pubkey_algo
|
||||||
|
self.hash_algo = hash_algo
|
||||||
|
self.sig_class = sig_class
|
||||||
|
self.primary_key_fpr = primary_key_fpr
|
||||||
|
|
||||||
|
|
||||||
|
class SignatureID:
|
||||||
|
|
||||||
|
def __init__(self, radix64_string, sig_creation_date, sig_timestamp):
|
||||||
|
self.radix64_string = radix64_string
|
||||||
|
self.sig_creation_date = sig_creation_date
|
||||||
|
self.sig_timestamp = sig_timestamp
|
||||||
|
|
||||||
|
|
||||||
|
class EncryptedTo:
|
||||||
|
|
||||||
|
def __init__(self, long_keyid, keytype, keylength):
|
||||||
|
self.long_keyid = long_keyid
|
||||||
|
self.keytype = keytype
|
||||||
|
self.keylength = keylength
|
||||||
|
|
||||||
|
|
||||||
|
class NoPublicKey:
|
||||||
|
|
||||||
|
def __init__(self, long_keyid):
|
||||||
|
self.long_keyid = long_keyid
|
||||||
|
|
||||||
|
|
||||||
|
class _Trust:
|
||||||
|
|
||||||
|
def __init__(self, flag, validation_model=None):
|
||||||
|
self.flag = flag
|
||||||
|
self.validation_model = validation_model
|
||||||
|
|
||||||
|
class TrustUndefined(_Trust):
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
class TrustNever(_Trust):
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "never"
|
||||||
|
|
||||||
|
class TrustMarginal(_Trust):
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "marginal"
|
||||||
|
|
||||||
|
class TrustFully(_Trust):
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "fully"
|
||||||
|
|
||||||
|
class TrustUltimate(_Trust):
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "ultimate"
|
||||||
|
|
||||||
|
|
||||||
|
_keywords = {
|
||||||
|
"GOODSIG": GoodSignature,
|
||||||
|
"EXPSIG": ExpiredSignature,
|
||||||
|
"EXPKEYSIG": ExpiredKey,
|
||||||
|
"REVKEYSIG": RevokedKey,
|
||||||
|
"BADSIG": BadSignature,
|
||||||
|
"ERRSIG": UncheckableSignature,
|
||||||
|
"VALIDSIG": ValidSignature,
|
||||||
|
"SIG_ID": SignatureID,
|
||||||
|
"ENC_TO": EncryptedTo,
|
||||||
|
"NO_PUBKEY": NoPublicKey,
|
||||||
|
|
||||||
|
"TRUST_UNDEFINED": TrustUndefined,
|
||||||
|
"TRUST_NEVER": TrustNever,
|
||||||
|
"TRUST_MARGINAL": TrustMarginal,
|
||||||
|
"TRUST_FULLY": TrustFully,
|
||||||
|
"TRUST_ULTIMATE": TrustUltimate,
|
||||||
|
}
|
||||||
|
|
||||||
|
def parse(fd):
|
||||||
|
line = fd.readline()
|
||||||
|
context = {}
|
||||||
|
while line:
|
||||||
|
res = re.match(r"^\[GNUPG:\] (?P<keyword>\S+)(?: (?P<args>.*))?$", line.decode())
|
||||||
|
if res is not None:
|
||||||
|
keyword = res.group("keyword")
|
||||||
|
args = res.group("args").split(" ") if res.group("args") is not None else []
|
||||||
|
|
||||||
|
if keyword == "NEWSIG":
|
||||||
|
if len(context) > 0:
|
||||||
|
yield context
|
||||||
|
context = {}
|
||||||
|
|
||||||
|
elif keyword in _keywords:
|
||||||
|
if keyword[:5] == "TRUST":
|
||||||
|
context["TRUST"] = _keywords[keyword](*args)
|
||||||
|
context[keyword] = _keywords[keyword](*args)
|
||||||
|
line = fd.readline()
|
||||||
|
|
||||||
|
yield context
|
20
late.py
Normal file
20
late.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
from test import MailTest
|
||||||
|
|
||||||
|
|
||||||
|
def check(cnt, hard_max_submission_date, soft_max_submission_date=None):
|
||||||
|
if soft_max_submission_date is None:
|
||||||
|
soft_max_submission_date = hard_max_submission_date
|
||||||
|
else:
|
||||||
|
soft_max_submission_date = min(soft_max_submission_date, hard_max_submission_date)
|
||||||
|
|
||||||
|
now = datetime.now().replace(tzinfo=timezone.utc)
|
||||||
|
yield MailTest("We are %s, submission permitted until %s" % (now.strftime("%c"), soft_max_submission_date.strftime("%c")), -1)
|
||||||
|
if now > soft_max_submission_date:
|
||||||
|
discard = now > hard_max_submission_date
|
||||||
|
yield MailTest("Submissions have been closed since %s minute(s)" % int((now - soft_max_submission_date).total_seconds()/60), 1 if discard else 2)
|
||||||
|
if discard:
|
||||||
|
return
|
||||||
|
|
||||||
|
yield cnt
|
19
login.py
Normal file
19
login.py
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
import csv
|
||||||
|
from email.utils import parseaddr
|
||||||
|
|
||||||
|
from test import MailTest
|
||||||
|
|
||||||
|
|
||||||
|
def check(cnt, file):
|
||||||
|
data, uname = cnt
|
||||||
|
username, address = parseaddr(uname)
|
||||||
|
|
||||||
|
with open(file, encoding='utf-8') as fd:
|
||||||
|
people = csv.reader(fd)
|
||||||
|
for p in people:
|
||||||
|
if address.lower() == p[4].lower() or uname.lower().find(p[2].lower()) >= 0 or username.lower().replace(" ", "").find(p[0].lower().replace(" ", "")) >= 0 and username.lower().find(p[1].lower()) >= 0:
|
||||||
|
yield MailTest("Recognized as %s: %s %s." % (p[2], p[1], p[0]))
|
||||||
|
yield data, p[2]
|
||||||
|
return
|
||||||
|
|
||||||
|
yield MailTest("The username of your key is not explicit, I can't find you.", 1)
|
128
signature.py
Normal file
128
signature.py
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from test import MailTest
|
||||||
|
import gpg_status_parser
|
||||||
|
|
||||||
|
|
||||||
|
def verify_sign(data, gpg_rcode, gpg_status, gpg_output=""):
|
||||||
|
if len(gpg_status) != 1:
|
||||||
|
yield MailTest("Too much status, please fill a bug report.", 1, details=gpg_output)
|
||||||
|
return
|
||||||
|
|
||||||
|
ctx = gpg_status[0]
|
||||||
|
|
||||||
|
if gpg_rcode == 0 and "VALIDSIG" in ctx:
|
||||||
|
yield MailTest("Signed with key: 0x%s, on %s" % (ctx["VALIDSIG"].fingerprint_in_hex[24:], datetime.fromtimestamp(int(ctx["VALIDSIG"].sig_timestamp))))
|
||||||
|
if "EXPKEYSIG" in ctx:
|
||||||
|
yield MailTest("Your key has expired.", 1)
|
||||||
|
return
|
||||||
|
if "EXPSIG" in ctx:
|
||||||
|
yield MailTest("The signature has expired.", 1)
|
||||||
|
return
|
||||||
|
if "TRUST_NEVER" in ctx:
|
||||||
|
yield MailTest("Your trust level is %s." % ctx["TRUST_NEVER"], 1)
|
||||||
|
return
|
||||||
|
if "GOODSIG" in ctx:
|
||||||
|
yield MailTest("Signature made by %s%s" % (ctx["GOODSIG"].username, (" [%s]" % ctx["TRUST"]) if "TRUST" in ctx else ""), -1)
|
||||||
|
if "TRUST_UNDEFINED" in ctx:
|
||||||
|
yield MailTest("Your trust level is %s. Consider asking other people to sign your key." % ctx["TRUST_UNDEFINED"], 2)
|
||||||
|
yield data, ctx["GOODSIG"].username
|
||||||
|
else:
|
||||||
|
yield MailTest("Bad signature. Here is the gnupg output:", 1, details=gpg_output.strip())
|
||||||
|
|
||||||
|
|
||||||
|
def check(cnt, GNUPG_DIRECTORY):
|
||||||
|
if len(cnt) == 2:
|
||||||
|
yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
|
||||||
|
else:
|
||||||
|
yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
|
||||||
|
|
||||||
|
|
||||||
|
def check_sep(data, sign, GNUPG_DIRECTORY):
|
||||||
|
gpg_output = ""
|
||||||
|
gpg_status = []
|
||||||
|
gpg_rcode = None
|
||||||
|
try:
|
||||||
|
f = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
f.write(sign)
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
with subprocess.Popen(["gpg",
|
||||||
|
"--homedir=" + GNUPG_DIRECTORY,
|
||||||
|
"--status-fd=1",
|
||||||
|
"--auto-key-retrieve",
|
||||||
|
"--auto-key-locate=clear,local,pka,dane,cert,keyserver",
|
||||||
|
"--keyserver=pool.sks-keyservers.net",
|
||||||
|
"--quiet",
|
||||||
|
"--batch",
|
||||||
|
"--verify",
|
||||||
|
f.name,
|
||||||
|
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
||||||
|
if isinstance(data, bytes):
|
||||||
|
bdata = data
|
||||||
|
else:
|
||||||
|
bdata = data.as_bytes()
|
||||||
|
if not bdata.find(b'\r\n') >= 0:
|
||||||
|
bdata.replace(b'\n', b'\r\n') # Windows hack
|
||||||
|
p.stdin.write(bdata)
|
||||||
|
p.stdin.close()
|
||||||
|
|
||||||
|
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
|
||||||
|
p.wait()
|
||||||
|
gpg_output = p.stderr.read()
|
||||||
|
gpg_rcode = p.returncode
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
yield MailTest("An error occured: %s" % e, 1)
|
||||||
|
return
|
||||||
|
finally:
|
||||||
|
os.unlink(f.name)
|
||||||
|
|
||||||
|
yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8'))
|
||||||
|
|
||||||
|
|
||||||
|
def check_merged(bdata, GNUPG_DIRECTORY):
|
||||||
|
f = tempfile.NamedTemporaryFile()
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
gpg_output = ""
|
||||||
|
gpg_status = []
|
||||||
|
gpg_rcode = None
|
||||||
|
try:
|
||||||
|
with subprocess.Popen(["gpg",
|
||||||
|
"--homedir=" + GNUPG_DIRECTORY,
|
||||||
|
"--status-fd=1",
|
||||||
|
"--auto-key-retrieve",
|
||||||
|
"--auto-key-locate=clear,local,pka,dane,cert,keyserver",
|
||||||
|
"--keyserver=pool.sks-keyservers.net",
|
||||||
|
"--quiet",
|
||||||
|
"--batch",
|
||||||
|
"--output",
|
||||||
|
f.name,
|
||||||
|
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
||||||
|
#if not bdata.find('\r\n') >= 0:
|
||||||
|
# bdata = bdata.replace('\n', '\r\n') # Windows hack
|
||||||
|
p.stdin.write(bdata.encode() if isinstance(bdata, str) else bdata)
|
||||||
|
p.stdin.close()
|
||||||
|
|
||||||
|
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
|
||||||
|
p.wait()
|
||||||
|
gpg_output = p.stderr.read()
|
||||||
|
gpg_rcode = p.returncode
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
yield MailTest("An error occured: %s" % e, 1)
|
||||||
|
return
|
||||||
|
|
||||||
|
if os.path.exists(f.name):
|
||||||
|
with open(f.name, 'rb') as fp:
|
||||||
|
bdata = fp.read()
|
||||||
|
os.unlink(f.name)
|
||||||
|
else:
|
||||||
|
bdata = None
|
||||||
|
|
||||||
|
yield from verify_sign(bdata, gpg_rcode, gpg_status, gpg_output.decode('utf-8'))
|
26
test.py
Normal file
26
test.py
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
class MailTest:
|
||||||
|
|
||||||
|
def __init__(self, title, value=0, details=None):
|
||||||
|
self._title = title
|
||||||
|
self._value = value
|
||||||
|
self._details = details
|
||||||
|
|
||||||
|
@property
|
||||||
|
def title(self):
|
||||||
|
return self._title
|
||||||
|
|
||||||
|
@property
|
||||||
|
def value(self):
|
||||||
|
return self._value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def valuestr(self):
|
||||||
|
if self._value < -1: return " "
|
||||||
|
elif self._value < 0: return "[ ] "
|
||||||
|
elif self._value == 0: return "[ OK ] "
|
||||||
|
elif self._value == 2: return "[WARN] "
|
||||||
|
elif self._value == 1: return "[FAIL] "
|
||||||
|
|
||||||
|
@property
|
||||||
|
def details(self):
|
||||||
|
return self._details
|
Reference in New Issue
Block a user