2016-10-11 22:33:40 +00:00
from datetime import datetime
from email . message import Message
import hashlib
import os
import shutil
import subprocess
import tempfile
from test import MailTest
def find(cnt):
    """Locate the archive payload inside a submission.

    *cnt* is a ``(data, login)`` pair.  When *data* is an
    ``email.message.Message`` the message (or, for a multipart message,
    each part returned by ``walk()``) is inspected and *data* is replaced
    by the decoded payload.  Anything that is not a ``Message`` is passed
    through untouched.

    Yields a ``MailTest`` announcing the file name when an archive is
    identified, then always yields the ``(data, login)`` pair.
    """
    data, login = cnt

    # MIME type -> default file-name suffix appended to the login when the
    # part carries no file name of its own.  The keys must match what
    # get_content_type() / `file --mime-type` return, i.e. without padding.
    # NOTE(review): the suffix and message literals are reproduced verbatim
    # from the source, surrounding spaces included -- confirm they are wanted.
    suffix_by_mime = {
        "application/x-xz": " .tar.xz ",
        "application/x-bzip": " .tar.bz2 ",
        "application/x-bzip2": " .tar.bz2 ",
        "application/x-gzip": " .tar.gz ",
        "application/gzip": " .tar.gz ",
        "application/zip": " .zip ",
        "application/rar": " .rar ",
        "application/x-tar": " .tar ",
        "application/tar": " .tar ",
    }

    def found(part):
        """Return ``(decoded payload, file name or None)`` for one part."""
        # Decode once; the payload is needed both for sniffing and as result.
        payload = part.get_payload(decode=True)
        mime = part.get_content_type()
        if mime == "application/octet-stream":
            # Generic type: sniff the real one from the content.
            mime = _guess_mime(payload)
        suffix = suffix_by_mime.get(mime)
        if suffix is None:
            return payload, None
        return payload, part.get_filename(login + suffix)

    if isinstance(data, Message):
        if not data.is_multipart():
            data, fname = found(data)
            yield MailTest(" Tarball found: %s . " % fname, -1)
        else:
            # walk() was evaluated on the message before `data` is rebound,
            # so reassigning `data` inside the loop does not disturb it.
            # NOTE(review): when no part matches, `data` ends up being the
            # decoded payload of the last part visited -- confirm intended.
            for part in data.walk():
                data, fname = found(part)
                if fname is not None:
                    yield MailTest(" Tarball found: %s . " % fname, -1)
                    break

    yield (data, login)
2017-10-20 11:38:14 +00:00
def hash_archive(cnt, dest=None, imposed_hash=None):
    """Compute the SHA-1 of a submission and check it.

    *cnt* is a ``(data, login)`` pair; *data* may be ``str`` (encoded
    before hashing) or bytes.

    Always yields a ``MailTest`` reporting the digest first.  Then:

    * if *dest* is given and ``dest/<login> . <sha>`` already exists,
      yields a level-1 ``MailTest`` followed by ``False``;
    * otherwise, if *imposed_hash* is given and differs from the digest,
      yields a level-1 ``MailTest`` only;
    * otherwise yields ``(data, sha, login)``.
    """
    payload, login = cnt

    raw = payload.encode() if isinstance(payload, str) else payload
    digest = hashlib.sha1(raw).hexdigest()
    yield MailTest(" Your tarball SHA-1 is %s . " % digest, -1)

    # Same content already stored for this login?
    if dest is not None:
        stored = os.path.join(dest, login + " . " + digest)
        if os.path.exists(stored):
            yield MailTest(" You have already uploaded this tarball. ", 1)
            yield False
            return

    # A specific digest was required and this is not it.
    if imposed_hash is not None and imposed_hash != digest:
        yield MailTest(" This is not the expected hash. Your tarball ' s hash must be: %s . " % imposed_hash, 1)
        return

    yield (payload, digest, login)
def _guess_mime(data):
    """Return the MIME type the ``file`` utility reports for *data*.

    *data* may be ``str`` (encoded first) or bytes; it is fed to
    ``file --brief --mime-type -`` on standard input.

    Returns the stripped output as ``str`` when ``file`` exits with
    status 0, ``None`` otherwise.
    """
    payload = data.encode() if isinstance(data, str) else data
    with subprocess.Popen(["file", "--brief", "--mime-type", "-"],
                          env={"LANG": "C"},
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE) as p:
        # communicate() feeds stdin and drains stdout concurrently, and
        # tolerates the child closing its stdin before all of the payload
        # has been written; a bare write() followed by wait() can block or
        # raise BrokenPipeError in that situation.
        out, _ = p.communicate(payload)
        if p.returncode == 0:
            return out.decode().strip()
    return None
def guess_mime(cnt):
    """Determine the MIME type of a hashed submission.

    *cnt* is a ``(data, sha, login)`` triple.  The type is obtained from
    ``_guess_mime(data)``; when that returns ``None`` the type falls back
    to ``application/x-tar``.

    Yields one ``MailTest`` describing the outcome, then the
    ``(data, mime, sha, login)`` tuple.
    """
    data, sha, login = cnt

    mime = _guess_mime(data)
    if mime is None:
        mime = "application/x-tar"
        yield MailTest(" Unable to guess content-type of your submission. Assuming: %s . " % mime)
    else:
        # Level 0 for any "application/x-*" type, level 2 for everything else.
        # NOTE(review): un-prefixed spellings such as "application/gzip" are
        # therefore reported at level 2 -- confirm that is intended.
        level = 0 if mime.startswith("application/x-") else 2
        yield MailTest(" Guessed content-type of your submission: %s . " % mime, level)

    yield data, mime, sha, login
def _ordinal_suffix(n):
    """Return the English ordinal suffix for *n* (1 -> st, 2 -> nd, 11 -> th).

    The literals keep the exact spelling used by the messages they are
    interpolated into.
    """
    if 10 <= n % 100 <= 20:
        return " th "
    return {1: " st ", 2: " nd ", 3: " rd "}.get(n % 10, " th ")


def extract(cnt, dest=None):
    """Run ``tar`` on a submission, extracting it when *dest* is given.

    *cnt* is a ``(data, mime, sha, login)`` tuple.  With ``dest=None`` the
    archive is only listed (``tar -t``).  Otherwise it is unpacked into a
    temporary directory and, on success, moved to ``dest/<login> . <sha>``
    while ``dest/<login>`` is re-pointed at it through a symlink.

    Yields ``MailTest`` objects reporting progress and, on success, the
    final destination path (``None`` in list-only mode).  If a
    ``PermissionError`` is raised along the way, the previous symlink is
    restored and a level-1 ``MailTest`` is yielded instead.

    NOTE(review): the source had lost its indentation; the nesting of the
    success branch below is a reconstruction -- confirm against history.
    NOTE(review): message texts and the " . " name separator are kept
    verbatim from the source, surrounding spaces included.
    """
    data, mime, sha, login = cnt
    payload = data.encode() if isinstance(data, str) else data

    # tar decompression switch per MIME type; "application/gzip" is the
    # un-prefixed spelling that find() accepts as well.
    compression_flag = {
        "application/x-gzip": "-z",
        "application/gzip": "-z",
        "application/x-xz": "-J",
        "application/x-bzip": "-j",
        "application/x-bzip2": "-j",
    }.get(mime)

    # Bound up front so the PermissionError handler never hits unbound names
    # in list-only mode.
    ldest = None
    last_dest = None

    if dest is not None:
        odest = dest
        os.makedirs(odest, exist_ok=True)
        ldest = os.path.join(dest, login)
        dest = os.path.join(dest, login + " . " + sha)
        if os.path.exists(dest):
            yield MailTest(" You have already uploaded this tarball. ", 1)
            return
        # Remember the current link target so it can be restored on failure;
        # islink() avoids calling readlink() on something that is not a link.
        last_dest = os.readlink(ldest) if os.path.islink(ldest) else None

    try:
        with tempfile.TemporaryDirectory() as temp:
            argv = ["tar", "--no-same-owner", "--no-same-permissions",
                    ("-xvC" + temp) if dest is not None else "-t"]
            if compression_flag is not None:
                # Only real switches go on the command line: a blank
                # placeholder would be taken by tar as a member name.
                argv.append(compression_flag)
            argv += ["-f", "-"]  # read the archive from standard input

            with subprocess.Popen(argv, env={"LANG": "C"},
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as p:
                # communicate() writes stdin while draining both output
                # pipes; writing everything first and reading afterwards
                # deadlocks once tar's verbose listing fills a pipe.
                out, errout = p.communicate(payload)
                err = out.decode(errors="replace") + errout.decode(errors="replace")

                if p.returncode == 0:
                    if dest is not None:
                        prefix = os.path.basename(ldest) + " . "
                        nsub = sum(1 for name in os.listdir(odest)
                                   if name.startswith(prefix)) + 1
                        if nsub > 1 and os.path.lexists(ldest):
                            last_on = datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))
                            yield MailTest(" This is your %i %s submission. Last submission on: %s " % (nsub, _ordinal_suffix(nsub), last_on), -1)
                        else:
                            yield MailTest(" This is your %i %s submission. " % (nsub, _ordinal_suffix(nsub)), -1)

                        # Re-point the per-login link before moving the tree,
                        # so the handler below can roll it back if the move
                        # fails.
                        if dest != ldest:
                            if os.path.lexists(ldest):
                                os.remove(ldest)
                            os.symlink(os.path.basename(dest), ldest)

                        entries = os.listdir(temp)
                        if len(entries) == 1:
                            # Single top-level entry: it becomes the destination.
                            shutil.move(os.path.join(temp, entries[0]), dest)
                        else:
                            shutil.move(temp, dest)

                    yield MailTest(" Archive successfully extracted. ", details=err)
                    yield dest
                else:
                    yield MailTest(" An error occured during archive extraction: ", 1, details=err)

            # The temporary directory may have been moved away above; put an
            # empty one back so TemporaryDirectory can clean up on exit.
            if not os.path.exists(temp):
                os.makedirs(temp)
    except PermissionError:
        if ldest is not None:
            if os.path.lexists(ldest):
                os.remove(ldest)
            if last_dest is not None:
                os.symlink(last_dest, ldest)
        yield MailTest(" Your archive ' s content has crazy permissions. Please fix it. ", 1)