Refactor signature checking
This commit is contained in:
parent
e5eb0795f2
commit
2d5c4503ef
84
signature.py
84
signature.py
@ -39,13 +39,37 @@ def verify_sign(data, gpg_rcode, gpg_status, gpg_output=""):
|
|||||||
|
|
||||||
|
|
||||||
def check(cnt, GNUPG_DIRECTORY):
|
def check(cnt, GNUPG_DIRECTORY):
|
||||||
if len(cnt) == 2:
|
for server in ["pool.sks-keyservers.net", "keys.openpgp.org"]:
|
||||||
yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
|
if len(cnt) == 2:
|
||||||
else:
|
yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server)
|
||||||
yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
|
else:
|
||||||
|
yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server)
|
||||||
|
|
||||||
|
|
||||||
def check_sep(data, sign, GNUPG_DIRECTORY):
|
def check_sign(cmd, bdata, fname, GNUPG_DIRECTORY, keyserver, windows_hack=False):
|
||||||
|
with subprocess.Popen(["gpg",
|
||||||
|
"--homedir=" + GNUPG_DIRECTORY,
|
||||||
|
"--status-fd=1",
|
||||||
|
"--auto-key-retrieve",
|
||||||
|
"--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
|
||||||
|
"--keyserver=" + keyserver,
|
||||||
|
"--quiet",
|
||||||
|
"--batch",
|
||||||
|
cmd,
|
||||||
|
fname,
|
||||||
|
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
||||||
|
p.stdin.write(bdata)
|
||||||
|
p.stdin.close()
|
||||||
|
|
||||||
|
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
|
||||||
|
p.wait()
|
||||||
|
gpg_output = p.stderr.read()
|
||||||
|
gpg_rcode = p.returncode
|
||||||
|
|
||||||
|
return gpg_status, gpg_output, gpg_rcode
|
||||||
|
|
||||||
|
|
||||||
|
def check_sep(data, sign, GNUPG_DIRECTORY, keyserver):
|
||||||
gpg_output = ""
|
gpg_output = ""
|
||||||
gpg_status = []
|
gpg_status = []
|
||||||
gpg_rcode = None
|
gpg_rcode = None
|
||||||
@ -54,31 +78,7 @@ def check_sep(data, sign, GNUPG_DIRECTORY):
|
|||||||
f.write(sign)
|
f.write(sign)
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
with subprocess.Popen(["gpg",
|
gpg_status, gpg_output, gpg_rcode = check_sign("--verify", data if isinstance(data, bytes) else data.as_bytes(), f.name, GNUPG_DIRECTORY, keyserver)
|
||||||
"--homedir=" + GNUPG_DIRECTORY,
|
|
||||||
"--status-fd=1",
|
|
||||||
"--auto-key-retrieve",
|
|
||||||
"--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
|
|
||||||
"--keyserver=pool.sks-keyservers.net",
|
|
||||||
"--quiet",
|
|
||||||
"--batch",
|
|
||||||
"--verify",
|
|
||||||
f.name,
|
|
||||||
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
|
||||||
if isinstance(data, bytes):
|
|
||||||
bdata = data
|
|
||||||
else:
|
|
||||||
bdata = data.as_bytes()
|
|
||||||
if not bdata.find(b'\r\n') >= 0:
|
|
||||||
bdata.replace(b'\n', b'\r\n') # Windows hack
|
|
||||||
p.stdin.write(bdata)
|
|
||||||
p.stdin.close()
|
|
||||||
|
|
||||||
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
|
|
||||||
p.wait()
|
|
||||||
gpg_output = p.stderr.read()
|
|
||||||
gpg_rcode = p.returncode
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
yield MailTest("An error occured: %s" % e, 1)
|
yield MailTest("An error occured: %s" % e, 1)
|
||||||
return
|
return
|
||||||
@ -88,7 +88,7 @@ def check_sep(data, sign, GNUPG_DIRECTORY):
|
|||||||
yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))
|
yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))
|
||||||
|
|
||||||
|
|
||||||
def check_merged(bdata, GNUPG_DIRECTORY):
|
def check_merged(bdata, GNUPG_DIRECTORY, keyserver):
|
||||||
f = tempfile.NamedTemporaryFile()
|
f = tempfile.NamedTemporaryFile()
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
@ -96,27 +96,7 @@ def check_merged(bdata, GNUPG_DIRECTORY):
|
|||||||
gpg_status = []
|
gpg_status = []
|
||||||
gpg_rcode = None
|
gpg_rcode = None
|
||||||
try:
|
try:
|
||||||
with subprocess.Popen(["gpg",
|
gpg_status, gpg_output, gpg_rcode = check_sign("--output", bdata, f.name, GNUPG_DIRECTORY, keyserver)
|
||||||
"--homedir=" + GNUPG_DIRECTORY,
|
|
||||||
"--status-fd=1",
|
|
||||||
"--auto-key-retrieve",
|
|
||||||
"--auto-key-locate=clear,local,pka,dane,cert,keyserver",
|
|
||||||
"--keyserver=pool.sks-keyservers.net",
|
|
||||||
"--quiet",
|
|
||||||
"--batch",
|
|
||||||
"--output",
|
|
||||||
f.name,
|
|
||||||
"-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
|
|
||||||
#if not bdata.find('\r\n') >= 0:
|
|
||||||
# bdata = bdata.replace('\n', '\r\n') # Windows hack
|
|
||||||
p.stdin.write(bdata.encode() if isinstance(bdata, str) else bdata)
|
|
||||||
p.stdin.close()
|
|
||||||
|
|
||||||
gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
|
|
||||||
p.wait()
|
|
||||||
gpg_output = p.stderr.read()
|
|
||||||
gpg_rcode = p.returncode
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
yield MailTest("An error occured: %s" % e, 1)
|
yield MailTest("An error occured: %s" % e, 1)
|
||||||
return
|
return
|
||||||
|
Reference in New Issue
Block a user