Compare commits
10 Commits
e5eb0795f2
...
a52c756159
Author | SHA1 | Date | |
---|---|---|---|
a52c756159 | |||
8f7d6e79c5 | |||
d13d93c39c | |||
efa4cb6824 | |||
19e1111e6c | |||
b84022ef70 | |||
6036099b13 | |||
5d2d4ff451 | |||
7002341583 | |||
2d5c4503ef |
@ -123,7 +123,7 @@ def extract(cnt, dest=None):
|
|||||||
if dest is not None:
|
if dest is not None:
|
||||||
nsub = len([x for x in os.listdir(odest) if x.find(os.path.basename(ldest) + ".") == 0]) + 1
|
nsub = len([x for x in os.listdir(odest) if x.find(os.path.basename(ldest) + ".") == 0]) + 1
|
||||||
if nsub > 1:
|
if nsub > 1:
|
||||||
yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, "st" if nsub == 0 else ("nd" if nsub == 1 else ("rd" if nsub == 3 else "th")), datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1)
|
yield MailTest("This is your %i%s submission. Last submission on: %s" % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th")), datetime.fromtimestamp(int(os.lstat(ldest).st_mtime))), -1)
|
||||||
else:
|
else:
|
||||||
yield MailTest("This is your %i%s submission." % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th"))), -1)
|
yield MailTest("This is your %i%s submission." % (nsub, "st" if nsub == 1 else ("nd" if nsub == 2 else ("rd" if nsub == 3 else "th"))), -1)
|
||||||
if dest != ldest:
|
if dest != ldest:
|
||||||
|
98
check.py
98
check.py
@ -34,23 +34,72 @@ def relatesTo(data, submissions_dir):
|
|||||||
yield data
|
yield data
|
||||||
|
|
||||||
|
|
||||||
def gen_checks(submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
|
def gen_checks(gpgmail, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
|
||||||
if check_content:
|
if check_content:
|
||||||
yield (relatesTo, [submissions_dir])
|
yield (relatesTo, [submissions_dir])
|
||||||
if HARD_MAX_SUBMISSION is not None:
|
|
||||||
yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])
|
|
||||||
else:
|
else:
|
||||||
yield signcheck
|
yield signcheck
|
||||||
yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
|
|
||||||
yield (signature.check, [GNUPG_DIRECTORY])
|
if gpgmail is None or not gpgmail.valid:
|
||||||
yield (login.check, ["/home/nemunaire/workspace/check_mail/SRS2017.csv"])
|
yield (envelope.check, [GNUPG_DIRECTORY, not skip_public_key, BETA])
|
||||||
|
yield (signature.check, [GNUPG_DIRECTORY])
|
||||||
|
else:
|
||||||
|
yield (envelope.skip, [gpgmail])
|
||||||
|
|
||||||
|
yield (login.check, ["/home/nemunaire/workspace/peret/SRS2022.csv", "/home/nemunaire/workspace/peret/GISTRE2022.csv"])
|
||||||
if check_content:
|
if check_content:
|
||||||
|
if HARD_MAX_SUBMISSION is not None:
|
||||||
|
yield (late.check, [HARD_MAX_SUBMISSION, SOFT_MAX_SUBMISSION])
|
||||||
yield archive.find
|
yield archive.find
|
||||||
yield ( archive.hash_archive, [submissions_dir, check_submission_hash] )
|
yield ( archive.hash_archive, [submissions_dir, check_submission_hash] )
|
||||||
yield archive.guess_mime
|
yield archive.guess_mime
|
||||||
yield ( archive.extract, [submissions_dir] )
|
yield ( archive.extract, [submissions_dir] )
|
||||||
|
|
||||||
|
|
||||||
|
def respondissueemail(to, subject, ref, initial_to=None):
|
||||||
|
from email.message import EmailMessage
|
||||||
|
|
||||||
|
msg = EmailMessage()
|
||||||
|
msg["X-loop"] = "peret"
|
||||||
|
msg["From"] = FROM
|
||||||
|
msg["To"] = to
|
||||||
|
if ref is not None:
|
||||||
|
msg["References"] = ref
|
||||||
|
msg["In-Reply-To"] = ref
|
||||||
|
msg["Subject"] = ("Re: " if not subject.lower().find("re: ") >= 0 else "") + subject
|
||||||
|
|
||||||
|
msg.set_content("""Hi!
|
||||||
|
|
||||||
|
This is the automatic e-mail analyzer in charge of checking your work.
|
||||||
|
|
||||||
|
I am currently facing a problem to analyze your e-mail.
|
||||||
|
|
||||||
|
You'll automatically receive a new response as soon as the problem is corrected.
|
||||||
|
|
||||||
|
You can continue to submit newer submissions if needed, the last one will be kept, as usual.
|
||||||
|
|
||||||
|
Sincerely,
|
||||||
|
|
||||||
|
-- \nAutomatic e-mail checker
|
||||||
|
running for nemunaire@nemunai.re""")
|
||||||
|
|
||||||
|
import smtplib
|
||||||
|
with smtplib.SMTP("localhost") as smtp:
|
||||||
|
smtp.starttls()
|
||||||
|
if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
|
||||||
|
print("peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re")
|
||||||
|
print(msg.as_string())
|
||||||
|
if REVIEW_BEFORE_SEND:
|
||||||
|
import time
|
||||||
|
for i in range(15):
|
||||||
|
sys.stdout.write(".")
|
||||||
|
sys.stdout.flush()
|
||||||
|
time.sleep(1)
|
||||||
|
if SEND_TO_REALUSER:
|
||||||
|
smtp.send_message(msg)
|
||||||
|
smtp.send_message(msg, to_addrs=["peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re"])
|
||||||
|
|
||||||
|
|
||||||
def respondmail(to, subject, ref, checks, initial_to=None):
|
def respondmail(to, subject, ref, checks, initial_to=None):
|
||||||
from email.message import EmailMessage
|
from email.message import EmailMessage
|
||||||
|
|
||||||
@ -65,10 +114,10 @@ def respondmail(to, subject, ref, checks, initial_to=None):
|
|||||||
if maxitem[0] < lvl:
|
if maxitem[0] < lvl:
|
||||||
maxitem = item
|
maxitem = item
|
||||||
if final_decision == "ACCEPT" or final_decision == "SKIP":
|
if final_decision == "ACCEPT" or final_decision == "SKIP":
|
||||||
return respondmail(to, subject, ref, [(lvl, tests, final_decision)])
|
return respondmail(to, subject, ref, [(lvl, tests, final_decision)], initial_to)
|
||||||
# Display the most upper error
|
# Display the most upper error
|
||||||
if not ALTERNATE_RESOLUTIONS:
|
if not ALTERNATE_RESOLUTIONS:
|
||||||
return respondmail(to, subject, ref, [maxitem])
|
return respondmail(to, subject, ref, [maxitem], initial_to)
|
||||||
|
|
||||||
msg = EmailMessage()
|
msg = EmailMessage()
|
||||||
msg["X-loop"] = "peret"
|
msg["X-loop"] = "peret"
|
||||||
@ -85,7 +134,12 @@ def respondmail(to, subject, ref, checks, initial_to=None):
|
|||||||
for lvl, tests, final_decision in checks:
|
for lvl, tests, final_decision in checks:
|
||||||
if fmt != "":
|
if fmt != "":
|
||||||
fmt += '\n============ There is also another resolution: ============\n\n'
|
fmt += '\n============ There is also another resolution: ============\n\n'
|
||||||
|
lasttest = None
|
||||||
for test in tests:
|
for test in tests:
|
||||||
|
if lasttest is not None and lasttest == test.title and not ALTERNATE_RESOLUTIONS:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
lasttest = test.title
|
||||||
fmt += test.valuestr
|
fmt += test.valuestr
|
||||||
fmt += test.title
|
fmt += test.title
|
||||||
fmt += '\n'
|
fmt += '\n'
|
||||||
@ -116,6 +170,7 @@ running for nemunaire@nemunai.re""")
|
|||||||
with smtplib.SMTP("localhost") as smtp:
|
with smtplib.SMTP("localhost") as smtp:
|
||||||
smtp.starttls()
|
smtp.starttls()
|
||||||
if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
|
if not SEND_TO_REALUSER or REVIEW_BEFORE_SEND:
|
||||||
|
print("peret-report@nemunai.re" if initial_to is None else initial_to + "-report@nemunai.re")
|
||||||
print(msg.as_string())
|
print(msg.as_string())
|
||||||
if REVIEW_BEFORE_SEND:
|
if REVIEW_BEFORE_SEND:
|
||||||
import time
|
import time
|
||||||
@ -129,23 +184,32 @@ running for nemunaire@nemunai.re""")
|
|||||||
|
|
||||||
|
|
||||||
def readmail(fp):
|
def readmail(fp):
|
||||||
cnt = email.message_from_binary_file(fp, policy=email.policy.default)
|
import gnupg
|
||||||
|
import gnupg_mail
|
||||||
|
|
||||||
|
theEMail = fp.read()
|
||||||
|
try:
|
||||||
|
gpgmail = gnupg_mail.Message(theEMail.decode(), settings=gnupg_mail.Settings(log_level='debug',require_signed=True), gpg=gnupg.GPG(gnupghome=GNUPG_DIRECTORY))
|
||||||
|
except:
|
||||||
|
gpgmail = None
|
||||||
|
|
||||||
|
cnt = email.message_from_bytes(theEMail, policy=email.policy.default)
|
||||||
frm = cnt.get("From") or "someone"
|
frm = cnt.get("From") or "someone"
|
||||||
subject = cnt.get("Subject") or "your mail"
|
subject = cnt.get("Subject") or "your mail"
|
||||||
ref = cnt.get("Message-ID") or ""
|
ref = cnt.get("Message-ID") or ""
|
||||||
to = cnt.get("To").split("@", 1)[0] or cnt.get("Cc").split("@", 1)[0] or None
|
to = cnt.get("To").split("@", 1)[0] or cnt.get("Cc").split("@", 1)[0] or None
|
||||||
|
|
||||||
return cnt, frm, subject, ref, to
|
return gpgmail, cnt, frm, subject, ref, to
|
||||||
|
|
||||||
|
|
||||||
def check_mail(cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
|
def check_mail(gpgmail, cnt, submissions_dir, check_content=False, check_submission_hash=None, skip_public_key=True):
|
||||||
results = []
|
results = []
|
||||||
|
|
||||||
# sentinel
|
# sentinel
|
||||||
results.append([(None, [cnt])])
|
results.append([(None, [cnt])])
|
||||||
|
|
||||||
lvl = 0
|
lvl = 0
|
||||||
for check in gen_checks(submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key):
|
for check in gen_checks(gpgmail, submissions_dir=submissions_dir, check_content=check_content, check_submission_hash=check_submission_hash, skip_public_key=skip_public_key):
|
||||||
lvl += 1
|
lvl += 1
|
||||||
curr = []
|
curr = []
|
||||||
curc = []
|
curc = []
|
||||||
@ -238,6 +302,9 @@ if __name__ == '__main__':
|
|||||||
parser.add_argument('--skip-public-key', action="store_true",
|
parser.add_argument('--skip-public-key', action="store_true",
|
||||||
help="enable if you want to skip public key discovery through attachments")
|
help="enable if you want to skip public key discovery through attachments")
|
||||||
|
|
||||||
|
parser.add_argument('--issue-thunderbird91', action="store_true",
|
||||||
|
help="enable issue report for thunderbird91")
|
||||||
|
|
||||||
parser.add_argument('--beta', action="store_true",
|
parser.add_argument('--beta', action="store_true",
|
||||||
help="enable beta features")
|
help="enable beta features")
|
||||||
|
|
||||||
@ -261,5 +328,8 @@ if __name__ == '__main__':
|
|||||||
REVIEW_BEFORE_SEND = args.review_before_send
|
REVIEW_BEFORE_SEND = args.review_before_send
|
||||||
BETA = args.beta
|
BETA = args.beta
|
||||||
|
|
||||||
cnt, frm, subject, ref, to = readmail(sys.stdin.buffer)
|
gpgmail, cnt, frm, subject, ref, to = readmail(sys.stdin.buffer)
|
||||||
respondmail(frm, subject, ref, [c for c in check_mail(cnt, submissions_dir=args.submissions, check_content=not args.sign, check_submission_hash=args.expected_submission_hash, skip_public_key=args.skip_public_key)], to)
|
if args.issue_thunderbird91:
|
||||||
|
respondissueemail(frm, subject, ref, initial_to=to)
|
||||||
|
else:
|
||||||
|
respondmail(frm, subject, ref, [c for c in check_mail(gpgmail, cnt, submissions_dir=args.submissions, check_content=not args.sign, check_submission_hash=args.expected_submission_hash, skip_public_key=args.skip_public_key)], initial_to=to)
|
||||||
|
28
envelope.py
28
envelope.py
@ -18,7 +18,8 @@ def import_pubkey(key, GNUPG_DIRECTORY):
|
|||||||
yield MailTest("New PGP key successfully imported:", details=gpg_output)
|
yield MailTest("New PGP key successfully imported:", details=gpg_output)
|
||||||
yield False
|
yield False
|
||||||
else:
|
else:
|
||||||
yield MailTest("An error occurs during PGP key importation:", details=gpg_output)
|
yield MailTest("An error occurs during PGP key importation:", 1, details=gpg_output)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def assume_rfc3156(msg):
|
def assume_rfc3156(msg):
|
||||||
@ -43,7 +44,11 @@ def assume_rfc3156(msg):
|
|||||||
|
|
||||||
def assume_oldstyle(payload):
|
def assume_oldstyle(payload):
|
||||||
yield MailTest("Found BEGIN PGP SIGNED MESSAGE: message treated as old style PGP email.")
|
yield MailTest("Found BEGIN PGP SIGNED MESSAGE: message treated as old style PGP email.")
|
||||||
yield payload
|
try:
|
||||||
|
yield payload.encode()
|
||||||
|
except:
|
||||||
|
yield MailTest("Non-armored signed message discovered. Avoid using binary message over SMTP (see RFC2015 #2. PGP data formats).", 2)
|
||||||
|
yield payload
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -106,4 +111,23 @@ def check(msg, GNUPG_DIRECTORY, accept_public_key=True, beta=False):
|
|||||||
yield MailTest("Standalone non-armored signature file discovered. Avoid using binary signature over SMTP (see RFC2015 #2. PGP data formats).", 2)
|
yield MailTest("Standalone non-armored signature file discovered. Avoid using binary signature over SMTP (see RFC2015 #2. PGP data formats).", 2)
|
||||||
yield (lpart.get_payload(decode=True), part.get_payload(decode=True))
|
yield (lpart.get_payload(decode=True), part.get_payload(decode=True))
|
||||||
|
|
||||||
|
elif lpart is not None and part.get_filename() is not None and lpart.get_filename() is not None and lpart.get_filename()[:len(part.get_filename())] == part.get_filename():
|
||||||
|
yield MailTest("Standalone non-armored signature file discovered. Avoid using binary signature over SMTP (see RFC2015 #2. PGP data formats).", 2)
|
||||||
|
yield (part.get_payload(decode=True), lpart.get_payload(decode=True))
|
||||||
|
|
||||||
|
elif part.get_filename() is not None and (part.get_filename()[len(part.get_filename())-4:] == ".gpg" or part.get_filename()[len(part.get_filename())-4:] == ".asc") or (
|
||||||
|
payload is not None and not part.is_multipart() and part.get_payload(decode=True).find(b"-----BEGIN PGP MESSAGE-----") >= 0):
|
||||||
|
yield MailTest("Standalone PGP message discovered.")
|
||||||
|
yield part.get_payload(decode=True)
|
||||||
|
|
||||||
lpart = part
|
lpart = part
|
||||||
|
|
||||||
|
def skip(msg, gpgmail):
|
||||||
|
ct = msg.get_content_type()
|
||||||
|
|
||||||
|
data = msg.get_payload(0)
|
||||||
|
|
||||||
|
_, verify = gpgmail.gnupg
|
||||||
|
|
||||||
|
yield MailTest("Message treated as OpenPGP signed message")
|
||||||
|
yield (data, verify.username)
|
||||||
|
@ -37,13 +37,14 @@ class BadSignature:
|
|||||||
|
|
||||||
class UncheckableSignature:
|
class UncheckableSignature:
|
||||||
|
|
||||||
def __init__(self, keyid, pkalgo, hashalgo, sig_class, time, rc):
|
def __init__(self, keyid, pkalgo, hashalgo, sig_class, time, rc, longkeyid):
|
||||||
self.keyid = keyid
|
self.keyid = keyid
|
||||||
self.pkalgo = pkalgo
|
self.pkalgo = pkalgo
|
||||||
self.hashalgo = hashalgo
|
self.hashalgo = hashalgo
|
||||||
self.sig_class = sig_class
|
self.sig_class = sig_class
|
||||||
self.time = time
|
self.time = time
|
||||||
self.rc = rc
|
self.rc = rc
|
||||||
|
self.longkeyid = longkeyid
|
||||||
|
|
||||||
|
|
||||||
class ValidSignature:
|
class ValidSignature:
|
||||||
|
17
login.py
17
login.py
@ -9,16 +9,17 @@ LOGIN_FIELD = 2
|
|||||||
EMAILADDR_FIELD = 3
|
EMAILADDR_FIELD = 3
|
||||||
|
|
||||||
|
|
||||||
def check(cnt, file):
|
def check(cnt, *files):
|
||||||
data, uname = cnt
|
data, uname = cnt
|
||||||
username, address = parseaddr(uname)
|
username, address = parseaddr(uname)
|
||||||
|
|
||||||
with open(file, encoding='utf-8') as fd:
|
for file in files:
|
||||||
people = csv.reader(fd)
|
with open(file, encoding='utf-8') as fd:
|
||||||
for p in people:
|
people = csv.reader(fd)
|
||||||
if address.lower() == p[EMAILADDR_FIELD].lower() or uname.lower().find(p[LOGIN_FIELD].lower()) >= 0 or username.lower().replace(" ", "").find(p[LASTNAME_FIELD].lower().replace(" ", "")) >= 0 and username.lower().find(p[FIRSTNAME_FIELD].lower()) >= 0:
|
for p in people:
|
||||||
yield MailTest("Recognized as %s: %s %s." % (p[LOGIN_FIELD], p[FIRSTNAME_FIELD], p[LASTNAME_FIELD]))
|
if address.lower() == p[EMAILADDR_FIELD].lower() or uname.lower().find(p[LOGIN_FIELD].lower()) >= 0 or username.lower().replace(" ", "").find(p[LASTNAME_FIELD].lower().replace(" ", "")) >= 0 and username.lower().find(p[FIRSTNAME_FIELD].lower()) >= 0:
|
||||||
yield data, p[LOGIN_FIELD]
|
yield MailTest("Recognized as %s: %s %s." % (p[LOGIN_FIELD], p[FIRSTNAME_FIELD], p[LASTNAME_FIELD]))
|
||||||
return
|
yield data, p[LOGIN_FIELD]
|
||||||
|
return
|
||||||
|
|
||||||
yield MailTest("The username of your key is not explicit, I can't find you.", 1)
|
yield MailTest("The username of your key is not explicit, I can't find you.", 1)
|
||||||
|
87
signature.py
87
signature.py
@ -39,13 +39,40 @@ def verify_sign(data, gpg_rcode, gpg_status, gpg_output=""):
|
|||||||
|
|
||||||
|
|
||||||
def check(cnt, GNUPG_DIRECTORY):
|
def check(cnt, GNUPG_DIRECTORY):
|
||||||
if len(cnt) == 2:
|
for server in ["pool.sks-keyservers.net", "keys.openpgp.org"]:
|
||||||
yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
|
if len(cnt) == 2:
|
||||||
else:
|
yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server)
|
||||||
yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
|
else:
|
||||||
|
yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server)
|
||||||
|
|
||||||
|
|
||||||
def check_sep(data, sign, GNUPG_DIRECTORY):
|
def check_sign(cmd, bdata, fname, GNUPG_DIRECTORY, keyserver, windows_hack=False):
|
||||||
|
with subprocess.Popen(["gpg",
|
||||||
|
"--homedir=" + GNUPG_DIRECTORY,
|
||||||
|
"--status-fd=1",
|
||||||
|
"--auto-key-retrieve",
|
||||||
|
"--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
|
||||||
|
"--keyserver=" + keyserver,
|
||||||
|
"--quiet",
|
||||||
|
"--batch",
|
||||||
|
cmd,
|
||||||
|
fname,
|
||||||
|
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
||||||
|
p.stdin.write(bdata)
|
||||||
|
p.stdin.close()
|
||||||
|
|
||||||
|
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
|
||||||
|
p.wait()
|
||||||
|
gpg_output = p.stderr.read()
|
||||||
|
gpg_rcode = p.returncode
|
||||||
|
|
||||||
|
if gpg_rcode != 0 and not windows_hack:
|
||||||
|
return check_sign(cmd, bdata.replace(b'\n', b'\r\n'), fname, GNUPG_DIRECTORY, keyserver, True) # Windows hack
|
||||||
|
|
||||||
|
return gpg_status, gpg_output, gpg_rcode
|
||||||
|
|
||||||
|
|
||||||
|
def check_sep(data, sign, GNUPG_DIRECTORY, keyserver):
|
||||||
gpg_output = ""
|
gpg_output = ""
|
||||||
gpg_status = []
|
gpg_status = []
|
||||||
gpg_rcode = None
|
gpg_rcode = None
|
||||||
@ -54,31 +81,7 @@ def check_sep(data, sign, GNUPG_DIRECTORY):
|
|||||||
f.write(sign)
|
f.write(sign)
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
with subprocess.Popen(["gpg",
|
gpg_status, gpg_output, gpg_rcode = check_sign("--verify", data if isinstance(data, bytes) else data.as_bytes(), f.name, GNUPG_DIRECTORY, keyserver)
|
||||||
"--homedir=" + GNUPG_DIRECTORY,
|
|
||||||
"--status-fd=1",
|
|
||||||
"--auto-key-retrieve",
|
|
||||||
"--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
|
|
||||||
"--keyserver=pool.sks-keyservers.net",
|
|
||||||
"--quiet",
|
|
||||||
"--batch",
|
|
||||||
"--verify",
|
|
||||||
f.name,
|
|
||||||
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
|
||||||
if isinstance(data, bytes):
|
|
||||||
bdata = data
|
|
||||||
else:
|
|
||||||
bdata = data.as_bytes()
|
|
||||||
if not bdata.find(b'\r\n') >= 0:
|
|
||||||
bdata.replace(b'\n', b'\r\n') # Windows hack
|
|
||||||
p.stdin.write(bdata)
|
|
||||||
p.stdin.close()
|
|
||||||
|
|
||||||
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
|
|
||||||
p.wait()
|
|
||||||
gpg_output = p.stderr.read()
|
|
||||||
gpg_rcode = p.returncode
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
yield MailTest("An error occured: %s" % e, 1)
|
yield MailTest("An error occured: %s" % e, 1)
|
||||||
return
|
return
|
||||||
@ -88,7 +91,7 @@ def check_sep(data, sign, GNUPG_DIRECTORY):
|
|||||||
yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))
|
yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))
|
||||||
|
|
||||||
|
|
||||||
def check_merged(bdata, GNUPG_DIRECTORY):
|
def check_merged(bdata, GNUPG_DIRECTORY, keyserver):
|
||||||
f = tempfile.NamedTemporaryFile()
|
f = tempfile.NamedTemporaryFile()
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
@ -96,27 +99,7 @@ def check_merged(bdata, GNUPG_DIRECTORY):
|
|||||||
gpg_status = []
|
gpg_status = []
|
||||||
gpg_rcode = None
|
gpg_rcode = None
|
||||||
try:
|
try:
|
||||||
with subprocess.Popen(["gpg",
|
gpg_status, gpg_output, gpg_rcode = check_sign("--output", bdata, f.name, GNUPG_DIRECTORY, keyserver)
|
||||||
"--homedir=" + GNUPG_DIRECTORY,
|
|
||||||
"--status-fd=1",
|
|
||||||
"--auto-key-retrieve",
|
|
||||||
"--auto-key-locate=clear,local,pka,dane,cert,keyserver",
|
|
||||||
"--keyserver=pool.sks-keyservers.net",
|
|
||||||
"--quiet",
|
|
||||||
"--batch",
|
|
||||||
"--output",
|
|
||||||
f.name,
|
|
||||||
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
|
||||||
#if not bdata.find('\r\n') >= 0:
|
|
||||||
# bdata = bdata.replace('\n', '\r\n') # Windows hack
|
|
||||||
p.stdin.write(bdata.encode() if isinstance(bdata, str) else bdata)
|
|
||||||
p.stdin.close()
|
|
||||||
|
|
||||||
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
|
|
||||||
p.wait()
|
|
||||||
gpg_output = p.stderr.read()
|
|
||||||
gpg_rcode = p.returncode
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
yield MailTest("An error occured: %s" % e, 1)
|
yield MailTest("An error occured: %s" % e, 1)
|
||||||
return
|
return
|
||||||
|
Reference in New Issue
Block a user