Peret checks PGP signed mails and extract submissions
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

151 lines
6.4 KiB

from datetime import datetime
from email.message import Message
import hashlib
import os
import shutil
import subprocess
import tempfile
from test import MailTest
def find(cnt):
data, login = cnt
def found(data):
mime = data.get_content_type()
if mime == "application/octet-stream":
mime = _guess_mime(data.get_payload(decode=True))
if mime == "application/x-xz":
fname = data.get_filename(login + ".tar.xz")
elif mime == "application/x-bzip" or mime == "application/x-bzip2":
fname = data.get_filename(login + ".tar.bz2")
elif mime == "application/x-gzip" or mime == "application/gzip":
fname = data.get_filename(login + ".tar.gz")
elif mime == "application/zip":
fname = data.get_filename(login + ".zip")
elif mime == "application/rar":
fname = data.get_filename(login + ".rar")
elif mime == "application/x-tar" or mime == "application/tar":
fname = data.get_filename(login + ".tar")
else:
fname = None
return data.get_payload(decode=True), fname
if isinstance(data, Message):
if not data.is_multipart():
data, fname = found(data)
yield MailTest("Tarball found: %s." % fname, -1)
else:
for part in data.walk():
data, fname = found(part)
if fname is not None:
yield MailTest("Tarball found: %s." % fname, -1)
break
yield (data, login)
def hash_archive(cnt, dest=None, imposed_hash=None):
data, login = cnt
sha = hashlib.sha1(data.encode() if isinstance(data, str) else data).hexdigest()
yield MailTest("Your tarball SHA-1 is %s." % sha, -1)
if dest is not None and os.path.exists(os.path.join(dest, login + "." + sha)):
yield MailTest("You already have uploaded this tarball.", 1)
yield False
elif imposed_hash is not None and imposed_hash != sha:
yield MailTest("This is not the expected hash. Your tarball's hash must be: %s." % imposed_hash, 1)
else:
yield (data, sha, login)
def _guess_mime(data):
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'wb') as fp:
fp.write(data.encode() if isinstance(data, str) else data)
with subprocess.Popen(["file",
"--brief",
"--mime-type",
path], env={"LANG": 'C'}, stdout=subprocess.PIPE) as p:
p.wait()
os.unlink(path)
if p.returncode == 0:
return p.stdout.read().decode('utf-8', 'replace').strip()
def guess_mime(cnt):
data, sha, login = cnt
mime = _guess_mime(data)
if mime is not None:
yield MailTest("Guessed content-type of your submission: %s." % mime,
0 if mime.find("application/") == 0 else 2)
else:
mime = "application/tar"
yield MailTest("Unable to guess content-type of your submission. Assuming: %s." % mime)
yield data, mime, sha, login
def extract(cnt, dest=None):
data, type, sha, login = cnt
if dest is not None:
odest = dest
os.makedirs(odest, exist_ok=True)
ldest = os.path.join(dest, login)
dest = os.path.join(dest, login + "." + sha)
if os.path.exists(dest):
yield MailTest("You already have uploaded this tarball.", 1)
return
last_dest = os.readlink(ldest) if os.path.exists(ldest) else None
try:
with tempfile.TemporaryDirectory() as temp:
with subprocess.Popen(["tar", "--no-same-owner", "--no-same-permissions",
("-xvC" + temp) if dest is not None else "-t",
"-z" if type in ["application/gzip", "application/x-gzip"] else "",
"-J" if type in ["application/xz", "application/x-xz"] else "",
"-j" if type in ["application/bzip", "application/bzip2", "application/x-bzip", "application/x-bzip2"] else "",
"-"], env={"LANG": 'C'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
try:
p.stdin.write(data.encode() if isinstance(data, str) else data)
except Exception as e:
print(type)
print(p.stdout.read().decode('utf-8', 'replace'))
print(p.stderr.read().decode('utf-8', 'replace'))
raise e
p.stdin.close()
p.wait()
err = p.stdout.read().decode('utf-8', 'replace')
err += p.stderr.read().decode('utf-8', 'replace')
if p.returncode == 0:
if dest is not None:
nsub = len([x for x in os.listdir(odest) if x.find(os.path.basename(ldest) + ".") == 0]) + 1
if nsub > 1:
yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th")), datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1)
else:
yield MailTest("This is your %i%s submission." % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th"))), -1)
if dest != ldest:
if os.path.lexists(ldest):
os.remove(ldest)
os.symlink(os.path.basename(dest), ldest)
if len(os.listdir(temp)) == 1:
shutil.move(os.path.join(temp, os.listdir(temp)[0]), dest)
else:
shutil.move(temp, dest)
# Update modification time to reflect submission timestamp
os.utime(dest)
yield MailTest("Archive successfully extracted.", details=err)
yield dest
else:
yield MailTest("An error occured during archive extraction:", 1, details=err)
if not os.path.exists(temp):
os.makedirs(temp)
except PermissionError:
if os.path.lexists(ldest):
os.remove(ldest)
if last_dest is not None:
os.symlink(last_dest, ldest)
yield MailTest("Your archive's content has crazy permissions. Please fix it.", 1)