You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
164 lines
6.7 KiB
Python
164 lines
6.7 KiB
Python
from datetime import datetime
|
|
from email.message import Message
|
|
import hashlib
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
|
|
from test import MailTest
|
|
|
|
|
|
def find(cnt):
|
|
data, login = cnt
|
|
def found(data):
|
|
mime = data.get_content_type()
|
|
if mime == "application/octet-stream":
|
|
mime = _guess_mime(data.get_payload(decode=True))
|
|
if mime == "application/x-xz":
|
|
fname = data.get_filename(login + ".tar.xz")
|
|
elif mime == "application/x-bzip" or mime == "application/x-bzip2":
|
|
fname = data.get_filename(login + ".tar.bz2")
|
|
elif mime == "application/x-gzip" or mime == "application/gzip":
|
|
fname = data.get_filename(login + ".tar.gz")
|
|
elif mime == "application/zip":
|
|
fname = data.get_filename(login + ".zip")
|
|
elif mime == "application/rar":
|
|
fname = data.get_filename(login + ".rar")
|
|
elif mime == "application/x-tar" or mime == "application/tar":
|
|
fname = data.get_filename(login + ".tar")
|
|
else:
|
|
fname = None
|
|
return data.get_payload(decode=True), fname
|
|
|
|
if isinstance(data, Message):
|
|
if not data.is_multipart():
|
|
data, fname = found(data)
|
|
yield MailTest("Tarball found: %s." % fname, -1)
|
|
else:
|
|
for part in data.walk():
|
|
data, fname = found(part)
|
|
if fname is not None:
|
|
yield MailTest("Tarball found: %s." % fname, -1)
|
|
break
|
|
yield (data, login)
|
|
|
|
|
|
def hash_archive(cnt, dest=None, imposed_hash=None):
|
|
data, login = cnt
|
|
sha = hashlib.sha1(data.encode() if isinstance(data, str) else data).hexdigest()
|
|
yield MailTest("Your tarball SHA-1 is %s." % sha, -1)
|
|
if dest is not None and os.path.exists(os.path.join(dest, login + "." + sha)):
|
|
yield MailTest("You already have uploaded this tarball.", 1)
|
|
yield False
|
|
elif imposed_hash is not None and imposed_hash != sha:
|
|
yield MailTest("This is not the expected hash. Your tarball's hash must be: %s." % imposed_hash, 1)
|
|
else:
|
|
yield (data, sha, login)
|
|
|
|
|
|
def _guess_mime(data):
|
|
fd, path = tempfile.mkstemp()
|
|
with os.fdopen(fd, 'wb') as fp:
|
|
fp.write(data.encode() if isinstance(data, str) else data)
|
|
|
|
with subprocess.Popen(["file",
|
|
"--brief",
|
|
"--mime-type",
|
|
path], env={"LANG": 'C'}, stdout=subprocess.PIPE) as p:
|
|
p.wait()
|
|
os.unlink(path)
|
|
|
|
if p.returncode == 0:
|
|
return p.stdout.read().decode('utf-8', 'replace').strip()
|
|
|
|
|
|
def guess_mime(cnt):
|
|
data, sha, login = cnt
|
|
|
|
mime = _guess_mime(data)
|
|
if mime is not None:
|
|
yield MailTest("Guessed content-type of your submission: %s." % mime,
|
|
0 if mime.find("application/") == 0 else 2)
|
|
else:
|
|
mime = "application/tar"
|
|
yield MailTest("Unable to guess content-type of your submission. Assuming: %s." % mime)
|
|
|
|
yield data, mime, sha, login
|
|
|
|
|
|
def extract(cnt, dest=None):
|
|
data, type, sha, login = cnt
|
|
|
|
if dest is not None:
|
|
odest = dest
|
|
os.makedirs(odest, exist_ok=True)
|
|
ldest = os.path.join(dest, login)
|
|
dest = os.path.join(dest, login + "." + sha)
|
|
if os.path.exists(dest):
|
|
yield MailTest("You already have uploaded this tarball.", 1)
|
|
return
|
|
last_dest = os.readlink(ldest) if os.path.exists(ldest) else None
|
|
|
|
try:
|
|
with tempfile.TemporaryDirectory() as temp:
|
|
cmdline = ["tar", "--no-same-owner", "--no-same-permissions"]
|
|
if dest is not None:
|
|
cmdline.append("-xC")
|
|
cmdline.append(temp)
|
|
else:
|
|
cmdline.append("-t")
|
|
if len(data) < 3145728:
|
|
cmdline.append("-v")
|
|
if type in ["application/gzip", "application/x-gzip"]:
|
|
cmdline.append("-z")
|
|
elif type in ["application/xz", "application/x-xz"]:
|
|
cmdline.append("-J")
|
|
elif type in ["application/bzip", "application/bzip2", "application/x-bzip", "application/x-bzip2"]:
|
|
cmdline.append("-j")
|
|
elif type in ["application/lzma", "application/x-lzma"]:
|
|
cmdline.append("--lzma")
|
|
elif type in ["application/zstd", "application/x-zstd"]:
|
|
cmdline.append("--zstd")
|
|
|
|
with subprocess.Popen(cmdline, env={"LANG": 'C'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as p:
|
|
try:
|
|
p.stdin.write(data.encode() if isinstance(data, str) else data)
|
|
except Exception as e:
|
|
print(type)
|
|
print(p.stdout.read().decode('utf-8', 'replace'))
|
|
raise e
|
|
p.stdin.close()
|
|
err = p.stdout.read().decode('utf-8', 'replace')
|
|
p.wait()
|
|
if p.returncode == 0:
|
|
if dest is not None:
|
|
nsub = len([x for x in os.listdir(odest) if x.find(os.path.basename(ldest) + ".") == 0]) + 1
|
|
if nsub > 1:
|
|
yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th")), datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1)
|
|
else:
|
|
yield MailTest("This is your %i%s submission." % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th"))), -1)
|
|
if dest != ldest:
|
|
if os.path.lexists(ldest):
|
|
os.remove(ldest)
|
|
os.symlink(os.path.basename(dest), ldest)
|
|
if len(os.listdir(temp)) == 1:
|
|
shutil.move(os.path.join(temp, os.listdir(temp)[0]), dest)
|
|
else:
|
|
shutil.move(temp, dest)
|
|
# Update modification time to reflect submission timestamp
|
|
os.utime(dest)
|
|
yield MailTest("Archive successfully extracted.", details=err)
|
|
yield dest
|
|
else:
|
|
yield MailTest("An error occured during archive extraction:", 1, details=err)
|
|
|
|
if not os.path.exists(temp):
|
|
os.makedirs(temp)
|
|
except PermissionError:
|
|
if os.path.lexists(ldest):
|
|
os.remove(ldest)
|
|
if last_dest is not None:
|
|
os.symlink(last_dest, ldest)
|
|
yield MailTest("Your archive's content has crazy permissions. Please fix it.", 1)
|