This repository has been archived on 2022-04-27. You can view files and clone it, but cannot push or open issues or pull requests.
peret/archive.py

145 lines
6.1 KiB
Python

from datetime import datetime
from email.message import Message
import hashlib
import os
import shutil
import subprocess
import tempfile
from test import MailTest
def find(cnt):
    """Locate the submitted tarball inside a mail and pass it down the chain.

    ``cnt`` is a ``(data, login)`` pair.  When ``data`` is an
    ``email.message.Message`` its parts are inspected for a known archive
    MIME type; otherwise the pair is forwarded untouched.

    Yields a ``MailTest`` announcing the tarball name when one is reported,
    then the ``(payload, login)`` pair.

    Note: for a multipart message in which no part has a recognised archive
    type, the payload that ends up being yielded is the one of the last part
    visited by ``walk()``.
    """
    data, login = cnt

    # Recognised archive MIME types -> extension of the default file name
    # (used when the part does not carry a file name of its own).
    default_extensions = {
        "application/x-xz": ".tar.xz",
        "application/x-bzip": ".tar.bz2",
        "application/x-bzip2": ".tar.bz2",
        "application/x-gzip": ".tar.gz",
        "application/gzip": ".tar.gz",
        "application/zip": ".zip",
        "application/rar": ".rar",
        "application/x-tar": ".tar",
        "application/tar": ".tar",
    }

    def inspect(part):
        """Return ``(decoded payload, file name or None)`` for one part."""
        content_type = part.get_content_type()
        if content_type == "application/octet-stream":
            # Generic type: fall back on sniffing the actual content.
            content_type = _guess_mime(part.get_payload(decode=True))

        extension = default_extensions.get(content_type)
        if extension is None:
            name = None
        else:
            name = part.get_filename(login + extension)
        return part.get_payload(decode=True), name

    if isinstance(data, Message):
        message = data
        if message.is_multipart():
            for part in message.walk():
                data, fname = inspect(part)
                if fname is not None:
                    yield MailTest("Tarball found: %s." % fname, -1)
                    break
        else:
            data, fname = inspect(message)
            yield MailTest("Tarball found: %s." % fname, -1)

    yield (data, login)
def hash_archive(cnt, dest=None, imposed_hash=None):
    """Compute the SHA-1 of the submission and validate it.

    ``cnt`` is a ``(data, login)`` pair; ``data`` may be ``str`` or bytes.

    Yields, in order:
      * a ``MailTest`` reporting the digest;
      * if ``dest`` is given and ``<dest>/<login>.<sha>`` already exists,
        a ``MailTest`` saying so followed by ``False``;
      * otherwise, if ``imposed_hash`` is given and differs from the digest,
        a ``MailTest`` reporting the mismatch;
      * otherwise the ``(data, sha, login)`` tuple for the next step.
    """
    data, login = cnt

    raw = data.encode() if isinstance(data, str) else data
    digest = hashlib.sha1(raw).hexdigest()
    yield MailTest("Your tarball SHA-1 is %s." % digest, -1)

    # Same content already stored for this login?
    if dest is not None:
        stored = os.path.join(dest, login + "." + digest)
        if os.path.exists(stored):
            yield MailTest("You already have uploaded this tarball.", 1)
            yield False
            return

    # A specific digest was required and this is not the one.
    if imposed_hash is not None and imposed_hash != digest:
        yield MailTest("This is not the expected hash. Your tarball's hash must be: %s." % imposed_hash, 1)
        return

    yield (data, digest, login)
def _guess_mime(data):
    """Return the MIME type that ``file`` reports for ``data``.

    ``data`` may be ``str`` (encoded before being sent) or bytes.  The
    content is piped to ``file --brief --mime-type -``.

    Returns the stripped output of the command, or ``None`` when it exits
    with a non-zero status.
    """
    payload = data.encode() if isinstance(data, str) else data

    with subprocess.Popen(["file",
                           "--brief",
                           "--mime-type",
                           "-"],
                          env={"LANG": 'C'},
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE) as p:
        # communicate() feeds stdin and drains stdout in one go, and copes
        # with the child closing its input before everything was written
        # (the original code handled that case by hand and could still trip
        # on the pending buffer when the pipe was closed afterwards).
        out, _ = p.communicate(input=payload)

    if p.returncode != 0:
        return None
    return out.decode('utf-8', 'replace').strip()
def guess_mime(cnt):
    """Detect the content type of the submission and report it.

    ``cnt`` is a ``(data, sha, login)`` tuple.  The type is obtained from
    ``_guess_mime``; when that returns ``None`` the submission is assumed to
    be ``application/x-tar``.

    Yields a ``MailTest`` describing the outcome, then the
    ``(data, mime, sha, login)`` tuple for the next step.
    """
    data, sha, login = cnt

    detected = _guess_mime(data)
    if detected is None:
        detected = "application/x-tar"
        yield MailTest("Unable to guess content-type of your submission. Assuming: %s." % detected)
    else:
        # Status 0 for "application/x-*" types, 2 for anything else.
        status = 0 if detected.startswith("application/x-") else 2
        yield MailTest("Guessed content-type of your submission: %s." % detected,
                       status)

    yield data, detected, sha, login
def _ordinal_suffix(number):
    """Return the English ordinal suffix ("st", "nd", "rd" or "th")."""
    if 11 <= number % 100 <= 13:
        return "th"
    return {1: "st", 2: "nd", 3: "rd"}.get(number % 10, "th")


def extract(cnt, dest=None):
    """Extract (or, without ``dest``, only list) the submitted tarball.

    ``cnt`` is a ``(data, mime, sha, login)`` tuple.  When ``dest`` is given
    the archive is unpacked into a temporary directory, moved to
    ``<dest>/<login>.<sha>`` and the ``<dest>/<login>`` symlink is pointed at
    it.  When ``dest`` is ``None`` the archive is only listed (``tar -t``).

    Yields ``MailTest`` items describing what happened and, on success, the
    final destination path (``None`` when ``dest`` was not given).  If that
    destination already exists, a single ``MailTest`` is yielded and nothing
    is extracted.
    """
    data, mime, sha, login = cnt

    # Defined up front so the PermissionError handler below can rely on them
    # even when no destination was requested.
    odest = ldest = last_dest = None

    if dest is not None:
        odest = dest
        os.makedirs(odest, exist_ok=True)
        ldest = os.path.join(odest, login)
        dest = os.path.join(odest, login + "." + sha)
        if os.path.exists(dest):
            yield MailTest("You already have uploaded this tarball.", 1)
            return

        # Remember where the "latest submission" link points so it can be
        # restored on failure; readlink() is only valid on a symlink.
        last_dest = os.readlink(ldest) if os.path.islink(ldest) else None

    payload = data.encode() if isinstance(data, str) else data

    try:
        with tempfile.TemporaryDirectory() as temp:
            # NOTE(review): the empty-string placeholders and the trailing
            # "-" are passed to tar exactly as before; confirm how the
            # deployed tar interprets them.
            command = [
                "tar", "--no-same-owner", "--no-same-permissions",
                ("-xvC" + temp) if dest is not None else "-t",
                # find() accepts both gzip spellings, so do the same here.
                "-z" if mime in ("application/x-gzip", "application/gzip") else "",
                "-J" if mime == "application/x-xz" else "",
                "-j" if mime in ("application/x-bzip", "application/x-bzip2") else "",
                "-",
            ]
            with subprocess.Popen(command, env={"LANG": 'C'},
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as p:
                # communicate() writes stdin while draining stdout/stderr;
                # writing everything first and reading afterwards can block
                # forever once tar's verbose output fills its pipe.
                out, errors = p.communicate(input=payload)

            err = out.decode('utf-8', 'replace')
            err += errors.decode('utf-8', 'replace')

            if p.returncode == 0:
                if dest is not None:
                    # Every previous submission lives in "<login>.<sha>".
                    prefix = os.path.basename(ldest) + "."
                    nsub = sum(1 for entry in os.listdir(odest) if entry.startswith(prefix)) + 1
                    if nsub > 1:
                        yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, _ordinal_suffix(nsub), datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1)
                    else:
                        yield MailTest("This is your %i%s submission." % (nsub, _ordinal_suffix(nsub)), -1)

                    # Point the "latest submission" link at the new directory.
                    if dest != ldest:
                        if os.path.lexists(ldest):
                            os.remove(ldest)
                        os.symlink(os.path.basename(dest), ldest)

                    # A single top-level entry becomes the destination
                    # itself; otherwise the whole temporary directory does.
                    entries = os.listdir(temp)
                    if len(entries) == 1:
                        shutil.move(os.path.join(temp, entries[0]), dest)
                    else:
                        shutil.move(temp, dest)
                    # Update modification time to reflect submission timestamp
                    os.utime(dest)

                yield MailTest("Archive successfully extracted.", details=err)
                yield dest
            else:
                yield MailTest("An error occured during archive extraction:", 1, details=err)

            # The temporary directory may have been moved away above;
            # recreate it so TemporaryDirectory can clean up normally.
            if not os.path.exists(temp):
                os.makedirs(temp)
    except PermissionError:
        # Roll the "latest submission" link back to its previous target.
        if ldest is not None:
            if os.path.lexists(ldest):
                os.remove(ldest)
            if last_dest is not None:
                os.symlink(last_dest, ldest)
        yield MailTest("Your archive's content has crazy permissions. Please fix it.", 1)