This repository has been archived on 2022-04-27. You can view files and clone it, but cannot push or open issues or pull requests.
peret/archive.py

142 lines
6.1 KiB
Python
Raw Normal View History

from datetime import datetime
from email.message import Message
import hashlib
import os
import shutil
import subprocess
import tempfile
from test import MailTest
def find(cnt):
    """Locate the submitted archive inside an e-mail.

    ``cnt`` is a ``(data, login)`` pair.  When ``data`` is an
    ``email.message.Message`` its parts are inspected: a part counts as an
    archive when its content type (guessed through ``_guess_mime`` for
    ``application/octet-stream`` parts) is one of the recognised archive
    MIME types.  Any other kind of ``data`` is passed through untouched.

    Yields a ``MailTest`` notice naming the archive when one is reported,
    then always a final ``(payload, login)`` tuple.
    """
    data, login = cnt

    # Suffix appended to the login to build a fallback file name when the
    # attachment does not carry one, keyed by MIME type.
    suffixes = {
        "application/x-xz": ".tar.xz",
        "application/x-bzip": ".tar.bz2",
        "application/x-bzip2": ".tar.bz2",
        "application/x-gzip": ".tar.gz",
        "application/gzip": ".tar.gz",
        "application/zip": ".zip",
        "application/rar": ".rar",
        "application/x-tar": ".tar",
        "application/tar": ".tar",
    }

    def inspect(part):
        """Return ``(decoded payload, file name)``; the name is None for
        parts that are not a recognised archive type."""
        kind = part.get_content_type()
        if kind == "application/octet-stream":
            # Generic type: look at the content itself.
            kind = _guess_mime(part.get_payload(decode=True))
        suffix = suffixes.get(kind)
        if suffix is None:
            name = None
        else:
            name = part.get_filename(login + suffix)
        return part.get_payload(decode=True), name

    payload = data
    if isinstance(data, Message):
        if data.is_multipart():
            # Stop at the first part recognised as an archive.  When no part
            # matches, the payload of the last visited part is forwarded.
            for part in data.walk():
                payload, fname = inspect(part)
                if fname is not None:
                    yield MailTest("Tarball found: %s." % fname, -1)
                    break
        else:
            # A single-part message is reported whatever its type is.
            payload, fname = inspect(data)
            yield MailTest("Tarball found: %s." % fname, -1)

    yield (payload, login)
def hash_archive(cnt, dest=None, imposed_hash=None):
    """Compute the SHA-1 of the submitted archive and validate it.

    ``cnt`` is a ``(data, login)`` pair; ``data`` may be ``bytes`` or
    ``str`` (a ``str`` is encoded before hashing).

    Yields a ``MailTest`` announcing the digest, then one of:

    * a ``MailTest`` followed by ``False`` when ``dest`` already contains
      an entry named ``<login>.<sha>``;
    * a ``MailTest`` only, when ``imposed_hash`` is given and differs from
      the computed digest;
    * the ``(data, sha, login)`` tuple otherwise.
    """
    data, login = cnt

    raw = data.encode() if isinstance(data, str) else data
    sha = hashlib.sha1(raw).hexdigest()
    yield MailTest("Your tarball SHA-1 is %s." % sha, -1)

    # Same archive already stored for this login?
    if dest is not None and os.path.exists(os.path.join(dest, login + "." + sha)):
        yield MailTest("You have already uploaded this tarball.", 1)
        yield False
        return

    # Digest differs from the one the caller requires?
    if imposed_hash is not None and imposed_hash != sha:
        yield MailTest("This is not the expected hash. Your tarball's hash must be: %s." % imposed_hash, 1)
        return

    yield (data, sha, login)
def _guess_mime(data):
    """Return the MIME type reported by the ``file`` command for ``data``.

    ``data`` may be ``bytes`` or ``str``; a ``str`` is encoded with the
    default codec before being piped to ``file --brief --mime-type -``.

    Returns the stripped MIME type string, or ``None`` when ``file`` exits
    with a non-zero status.
    """
    payload = data.encode() if isinstance(data, str) else data

    # The child gets an environment holding only LANG=C.
    with subprocess.Popen(
        ["file", "--brief", "--mime-type", "-"],
        env={"LANG": "C"},
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    ) as proc:
        # communicate() feeds stdin and drains stdout together, avoiding the
        # pipe deadlock the subprocess documentation warns about when
        # stdin.write()/stdout.read() are combined with wait().  It also
        # copes with the child no longer reading its input (presumably what
        # file does on large payloads -- it only needs the leading bytes).
        out, _ = proc.communicate(payload)

    if proc.returncode != 0:
        return None
    return out.decode("utf-8", "replace").strip()
def guess_mime(cnt):
    """Determine the content type of a submission.

    ``cnt`` is a ``(data, sha, login)`` tuple.  The type is obtained from
    ``_guess_mime``; when that returns ``None`` the submission is assumed
    to be ``application/x-tar``.

    Yields a ``MailTest`` describing the outcome, then the
    ``(data, mime, sha, login)`` tuple.
    """
    data, sha, login = cnt

    mime = _guess_mime(data)
    if mime is None:
        mime = "application/x-tar"
        yield MailTest("Unable to guess content-type of your submission. Assuming: %s." % mime)
    else:
        # Types outside the "application/x-" family are reported with 2
        # instead of 0 as second MailTest argument.
        level = 0 if mime.startswith("application/x-") else 2
        yield MailTest("Guessed content-type of your submission: %s." % mime, level)

    yield data, mime, sha, login
def _ordinal_suffix(n):
    """Return the English ordinal suffix ("st", "nd", "rd" or "th") for ``n``."""
    if 11 <= n % 100 <= 13:
        return "th"
    return {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")


def extract(cnt, dest=None):
    """Unpack (or, without ``dest``, merely list) the submitted archive.

    ``cnt`` is a ``(data, mime, sha, login)`` tuple; ``data`` may be
    ``bytes`` or ``str``.

    With ``dest`` set, the archive is extracted with ``tar`` into a
    temporary directory which is then moved to ``<dest>/<login>.<sha>``
    (when the archive holds a single top-level entry, that entry itself is
    moved there), and ``<dest>/<login>`` is made a symlink to it.  With
    ``dest`` left to ``None`` the archive is only listed (``tar -t``).

    Yields ``MailTest`` reports and, on success, the final destination
    (``None`` in listing mode) as last item.  Nothing but a ``MailTest`` is
    yielded when the archive was already uploaded, when ``tar`` fails, or
    when a ``PermissionError`` is raised while handling the content.
    """
    data, mime, sha, login = cnt

    # Defined up-front so the PermissionError handler can rely on them even
    # in listing mode.
    odest = ldest = last_dest = None
    if dest is not None:
        odest = dest
        os.makedirs(odest, exist_ok=True)
        ldest = os.path.join(odest, login)
        dest = os.path.join(odest, login + "." + sha)
        if os.path.exists(dest):
            yield MailTest("You have already uploaded this tarball.", 1)
            return
        # Remember the current target of the "latest submission" link so it
        # can be restored on failure.  islink() also covers a dangling link
        # and never calls readlink() on something that is not a link.
        last_dest = os.readlink(ldest) if os.path.islink(ldest) else None

    payload = data.encode() if isinstance(data, str) else data

    try:
        with tempfile.TemporaryDirectory() as temp:
            cmd = ["tar", "--no-same-owner", "--no-same-permissions"]
            cmd.append("-xvC" + temp if dest is not None else "-t")
            # Only pass a decompression flag when one applies: no empty
            # string ends up on the command line.  Both gzip spellings
            # accepted by find() are handled.
            if mime in ("application/x-gzip", "application/gzip"):
                cmd.append("-z")
            elif mime == "application/x-xz":
                cmd.append("-J")
            elif mime in ("application/x-bzip", "application/x-bzip2"):
                cmd.append("-j")
            # NOTE(review): trailing "-" kept from the original command line;
            # presumably meant to designate stdin as the archive -- confirm
            # against the tar implementation in use.
            cmd.append("-")

            with subprocess.Popen(cmd, env={"LANG": "C"},
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as p:
                # communicate() writes the archive while draining both output
                # pipes, so a verbose listing larger than the pipe buffer
                # cannot block tar (and, in turn, our write to its stdin).
                out, errout = p.communicate(payload)

            err = out.decode('utf-8', 'replace') + errout.decode('utf-8', 'replace')

            if p.returncode != 0:
                yield MailTest("An error occured during archive extraction:", 1, details=err)
                return

            if dest is not None:
                # Earlier submissions are the "<login>.<sha>" entries already
                # present; this one is not moved in place yet, hence the +1.
                prefix = os.path.basename(ldest) + "."
                nsub = sum(1 for x in os.listdir(odest) if x.startswith(prefix)) + 1
                suffix = _ordinal_suffix(nsub)
                if nsub > 1 and os.path.lexists(ldest):
                    # The link's own mtime is when it was last re-pointed.
                    last_time = datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))
                    yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, suffix, last_time), -1)
                else:
                    yield MailTest("This is your %i%s submission." % (nsub, suffix), -1)

                # Re-point the "latest submission" link to the new directory.
                if os.path.lexists(ldest):
                    os.remove(ldest)
                os.symlink(os.path.basename(dest), ldest)

                entries = os.listdir(temp)
                if len(entries) == 1:
                    # Single top-level entry: it becomes the destination.
                    shutil.move(os.path.join(temp, entries[0]), dest)
                else:
                    shutil.move(temp, dest)
                    # TemporaryDirectory still expects its directory at
                    # cleanup time; re-create it right away so this holds
                    # even if the consumer stops iterating after a yield.
                    os.makedirs(temp, exist_ok=True)

                # Update modification time to reflect submission timestamp
                os.utime(dest)

            yield MailTest("Archive successfully extracted.", details=err)
            yield dest

    except PermissionError:
        # Put the "latest submission" link back in its previous state.
        if ldest is not None:
            if os.path.lexists(ldest):
                os.remove(ldest)
            if last_dest is not None:
                os.symlink(last_dest, ldest)
        yield MailTest("Your archive's content has crazy permissions. Please fix it.", 1)