from datetime import datetime
from email.message import Message
import hashlib
import os
import shutil
import subprocess
import tempfile

from test import MailTest


# Content-type -> default file-name suffix used when an attachment carries no
# file name of its own.  Content types absent from this table are not treated
# as archives by find().
_ARCHIVE_SUFFIXES = {
    "application/x-xz": ".tar.xz",
    "application/x-bzip": ".tar.bz2",
    "application/x-bzip2": ".tar.bz2",
    "application/x-gzip": ".tar.gz",
    "application/gzip": ".tar.gz",
    "application/zip": ".zip",
    "application/rar": ".rar",
    "application/x-tar": ".tar",
    "application/tar": ".tar",
}

# Both spellings are accepted by find(); extract() must agree with it.
_GZIP_MIMES = ("application/x-gzip", "application/gzip")


def _as_bytes(data):
    """Return *data* as bytes, encoding it first when it is a str."""
    return data.encode() if isinstance(data, str) else data


def _ordinal_suffix(n):
    """Return the English ordinal suffix ("st", "nd", "rd", "th") for *n*."""
    if 10 <= n % 100 <= 20:
        return "th"
    return {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")


def find(cnt):
    """Locate the archive attachment inside a mail.

    *cnt* is a ``(data, login)`` pair.  When ``data`` is an
    ``email.message.Message`` its parts are inspected and ``data`` is replaced
    by the decoded payload of the selected part; any other kind of ``data`` is
    passed through untouched.

    Yields a ``MailTest`` announcing the file name that was found, then the
    ``(data, login)`` pair for the next stage.
    """
    data, login = cnt

    def found(part):
        # Returns (decoded payload, file name); the file name is None when
        # the part's content type is not a known archive type.
        mime = part.get_content_type()
        if mime == "application/octet-stream":
            # Generic type: sniff the real one from the payload itself.
            mime = _guess_mime(part.get_payload(decode=True))
        suffix = _ARCHIVE_SUFFIXES.get(mime)
        if suffix is None:
            fname = None
        else:
            fname = part.get_filename(login + suffix)
        return part.get_payload(decode=True), fname

    if isinstance(data, Message):
        if not data.is_multipart():
            data, fname = found(data)
            yield MailTest("Tarball found: %s." % fname, -1)
        else:
            # walk() is evaluated once, before `data` gets rebound below.
            for part in data.walk():
                data, fname = found(part)
                if fname is not None:
                    yield MailTest("Tarball found: %s." % fname, -1)
                    break

    yield (data, login)


def hash_archive(cnt, dest=None, imposed_hash=None):
    """Compute the SHA-1 of the archive and check it.

    *cnt* is a ``(data, login)`` pair.  A ``MailTest`` reporting the digest is
    always yielded first.  Then:

    * if *dest* is given and ``<dest>/<login>.<sha>`` already exists, an error
      ``MailTest`` and ``False`` are yielded;
    * else if *imposed_hash* is given and differs from the digest, an error
      ``MailTest`` is yielded;
    * otherwise ``(data, sha, login)`` is yielded for the next stage.
    """
    data, login = cnt
    sha = hashlib.sha1(_as_bytes(data)).hexdigest()
    yield MailTest("Your tarball SHA-1 is %s." % sha, -1)

    if dest is not None and os.path.exists(os.path.join(dest, login + "." + sha)):
        yield MailTest("You already have uploaded this tarball.", 1)
        yield False
    elif imposed_hash is not None and imposed_hash != sha:
        yield MailTest("This is not the expected hash. Your tarball's hash must be: %s." % imposed_hash, 1)
    else:
        yield (data, sha, login)


def _guess_mime(data):
    """Return the MIME type reported by ``file`` for *data*.

    Returns ``None`` when ``file`` exits with a non-zero status.
    """
    with subprocess.Popen(["file", "--brief", "--mime-type", "-"],
                          env={"LANG": 'C'},
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE) as p:
        # communicate() feeds stdin and drains stdout concurrently and
        # tolerates the child closing its stdin early, which a plain
        # write()/wait()/read() sequence does not.
        out, _ = p.communicate(_as_bytes(data))
    if p.returncode == 0:
        return out.decode('utf-8', 'replace').strip()
    return None


def guess_mime(cnt):
    """Guess the content type of the submitted archive.

    *cnt* is a ``(data, sha, login)`` triple.  Yields a ``MailTest`` describing
    the guessed type (falling back to ``application/x-tar`` when it cannot be
    guessed), then ``(data, mime, sha, login)`` for the next stage.
    """
    data, sha, login = cnt
    mime = _guess_mime(data)
    if mime is not None:
        yield MailTest("Guessed content-type of your submission: %s." % mime,
                       0 if mime.startswith("application/x-") else 2)
    else:
        mime = "application/x-tar"
        yield MailTest("Unable to guess content-type of your submission. Assuming: %s." % mime)
    yield data, mime, sha, login


def extract(cnt, dest=None):
    """Extract (or, without *dest*, only list) the submitted archive with tar.

    *cnt* is a ``(data, mime, sha, login)`` tuple.  When *dest* is given the
    archive is unpacked into ``<dest>/<login>.<sha>`` and the ``<dest>/<login>``
    symlink is repointed to it; if that directory already exists an error
    ``MailTest`` is yielded and nothing else happens.

    Yields ``MailTest`` status objects and, on success, the extraction path.
    NOTE(review): in list-only mode (``dest is None``) the success message and
    a ``None`` path are yielded as well -- confirm this is what callers expect.
    """
    data, mime, sha, login = cnt

    # Defined up front so the PermissionError handler below never meets an
    # unbound name when running without a destination.
    odest = ldest = last_dest = None

    if dest is not None:
        odest = dest
        os.makedirs(odest, exist_ok=True)
        ldest = os.path.join(dest, login)
        dest = os.path.join(dest, login + "." + sha)
        if os.path.exists(dest):
            yield MailTest("You already have uploaded this tarball.", 1)
            return

    # Remember where the per-login symlink pointed so it can be restored.
    if ldest is not None and os.path.exists(ldest):
        last_dest = os.readlink(ldest)

    try:
        with tempfile.TemporaryDirectory() as temp:
            # The empty-string placeholders and the trailing "-" are kept
            # exactly as the command line has always been built.
            # NOTE(review): tar's handling of these positional arguments is
            # relied upon; verify against tar before removing any of them.
            cmd = [
                "tar", "--no-same-owner", "--no-same-permissions",
                ("-xvC" + temp) if dest is not None else "-t",
                "-z" if mime in _GZIP_MIMES else "",
                "-J" if mime == "application/x-xz" else "",
                "-j" if mime in ("application/x-bzip", "application/x-bzip2") else "",
                "-",
            ]
            with subprocess.Popen(cmd, env={"LANG": 'C'},
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as p:
                # communicate() avoids the deadlock that occurs when tar fills
                # its stdout/stderr pipes while we are still writing to stdin.
                out, errout = p.communicate(_as_bytes(data))
            err = out.decode('utf-8', 'replace')
            err += errout.decode('utf-8', 'replace')

            if p.returncode == 0:
                if dest is not None:
                    prefix = os.path.basename(ldest) + "."
                    nsub = sum(1 for x in os.listdir(odest) if x.startswith(prefix)) + 1
                    suffix = _ordinal_suffix(nsub)
                    if nsub > 1 and os.path.lexists(ldest):
                        yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, suffix, datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1)
                    else:
                        yield MailTest("This is your %i%s submission." % (nsub, suffix), -1)

                    # Repoint <login> to the new <login>.<sha> directory.
                    if dest != ldest:
                        if os.path.lexists(ldest):
                            os.remove(ldest)
                        os.symlink(os.path.basename(dest), ldest)

                    # A single top-level entry becomes the destination itself;
                    # otherwise the whole temporary directory is moved.
                    entries = os.listdir(temp)
                    if len(entries) == 1:
                        shutil.move(os.path.join(temp, entries[0]), dest)
                    else:
                        shutil.move(temp, dest)

                    # Update modification time to reflect submission timestamp
                    os.utime(dest)

                yield MailTest("Archive successfully extracted.", details=err)
                yield dest
            else:
                yield MailTest("An error occured during archive extraction:", 1, details=err)

            # TemporaryDirectory cleanup expects the directory to still exist
            # after it may have been moved away above.
            if not os.path.exists(temp):
                os.makedirs(temp)
    except PermissionError:
        # Roll the per-login symlink back to its previous target.
        if ldest is not None:
            if os.path.lexists(ldest):
                os.remove(ldest)
            if last_dest is not None:
                os.symlink(last_dest, ldest)
        yield MailTest("Your archive's content has crazy permissions. Please fix it.", 1)