From 2d5c4503efd03aa33ee138a8cd13ef9e2523bc73 Mon Sep 17 00:00:00 2001
From: Pierre-Olivier Mercier
Date: Sun, 19 Sep 2021 15:41:48 +0200
Subject: [PATCH] Refactor signature checking
---
 signature.py | 84 ++++++++++++++++++++--------------------------------
 1 file changed, 32 insertions(+), 52 deletions(-)
diff --git a/signature.py b/signature.py
index 6d62ac4..ec0419b 100644
--- a/signature.py
+++ b/signature.py
@@ -39,13 +39,37 @@ def verify_sign(data, gpg_rcode, gpg_status, gpg_output=""):
 def check(cnt, GNUPG_DIRECTORY):
- if len(cnt) == 2:
- yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
- else:
- yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY)
+ for server in ["pool.sks-keyservers.net", "keys.openpgp.org"]:
+ if len(cnt) == 2:
+ yield from check_sep(*cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server)
+ else:
+ yield from check_merged(cnt, GNUPG_DIRECTORY=GNUPG_DIRECTORY, keyserver=server)
-def check_sep(data, sign, GNUPG_DIRECTORY):
+def check_sign(cmd, bdata, fname, GNUPG_DIRECTORY, keyserver, windows_hack=False):
+ with subprocess.Popen(["gpg",
+ "--homedir=" + GNUPG_DIRECTORY,
+ "--status-fd=1",
+ "--auto-key-retrieve",
+ "--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
+ "--keyserver=" + keyserver,
+ "--quiet",
+ "--batch",
+ cmd,
+ fname,
+ "-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
+ p.stdin.write(bdata)
+ p.stdin.close()
+
+ gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
+ p.wait()
+ gpg_output = p.stderr.read()
+ gpg_rcode = p.returncode
+
+ return gpg_status, gpg_output, gpg_rcode
+
+
+def check_sep(data, sign, GNUPG_DIRECTORY, keyserver):
 gpg_output = ""
 gpg_status = []
 gpg_rcode = None
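Review bot: The new loop in `check` runs the full verification once per keyserver and yields every result from both passes, so a caller gets two result sets that can contradict each other when only one server returns the key. Nothing ends the loop after a pass that verified, which also doubles the gpg runs and the network key lookups triggered by each message. Consider buffering one pass with `results = list(...)` and returning as soon as it verified, falling through to the next server only on a key-lookup failure; how to tell that apart from `MailTest` is not visible in this hunk. As far as I know the `pool.sks-keyservers.net` pool was retired in 2021, so the first pass may only ever report a lookup error.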
@@ -54,31 +78,7 @@ def check_sep(data, sign, GNUPG_DIRECTORY):
 f.write(sign)
 f.close()
- with subprocess.Popen(["gpg",
- "--homedir=" + GNUPG_DIRECTORY,
- "--status-fd=1",
- "--auto-key-retrieve",
- "--auto-key-locate=clear,local,pka,dane,wkd,cert,keyserver",
- "--keyserver=pool.sks-keyservers.net",
- "--quiet",
- "--batch",
- "--verify",
- f.name,
- "-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
- if isinstance(data, bytes):
- bdata = data
- else:
- bdata = data.as_bytes()
- if not bdata.find(b'\r\n') >= 0:
- bdata.replace(b'\n', b'\r\n') # Windows hack
- p.stdin.write(bdata)
- p.stdin.close()
-
- gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
- p.wait()
- gpg_output = p.stderr.read()
- gpg_rcode = p.returncode
-
+ gpg_status, gpg_output, gpg_rcode = check_sign("--verify", data if isinstance(data, bytes) else data.as_bytes(), f.name, GNUPG_DIRECTORY, keyserver)
 except Exception as e:
 yield MailTest("An error occured: %s" % e, 1)
 return
@@ -88,7 +88,7 @@ def check_sep(data, sign, GNUPG_DIRECTORY):
 yield from verify_sign(data, gpg_rcode, gpg_status, gpg_output.decode('utf-8', 'replace'))
-def check_merged(bdata, GNUPG_DIRECTORY):
+def check_merged(bdata, GNUPG_DIRECTORY, keyserver):
 f = tempfile.NamedTemporaryFile()
 f.close()
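Review bot: The CRLF normalisation that the removed block attempted is missing from the new `check_sign("--verify", ...)` call in `check_sep`: `windows_hack` is not passed, and the parameter is not read anywhere in the visible body of `check_sign`. The old `bdata.replace(b'\n', b'\r\n')` discarded its result, so it never took effect either, but PGP/MIME detached signatures are computed over CRLF-canonical text and an LF-only `data.as_bytes()` may therefore fail to verify. If the conversion is wanted, apply it to the value handed over, e.g. `bdata = bdata if b'\r\n' in bdata else bdata.replace(b'\n', b'\r\n')`; otherwise drop the unused parameter. `check_sep` starts before this hunk, so how `f` is created is not visible here.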
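Review bot: In `check_merged` the `NamedTemporaryFile` is closed immediately and only `f.name` is kept; with the default `delete=True` that removes the file and leaves a bare path, which is later handed to gpg as the `--output` target. Between the close and gpg opening that path, another local process able to write to the temp directory can create something at the same name, the same weakness as `tempfile.mktemp`. A private directory avoids this: `with tempfile.TemporaryDirectory() as tmpdir:` and pass `os.path.join(tmpdir, "out")` as the output file. The rest of `check_merged` lies outside this hunk.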
@@ -96,27 +96,7 @@ def check_merged(bdata, GNUPG_DIRECTORY):
 gpg_status = []
 gpg_rcode = None
 try:
- with subprocess.Popen(["gpg",
- "--homedir=" + GNUPG_DIRECTORY,
- "--status-fd=1",
- "--auto-key-retrieve",
- "--auto-key-locate=clear,local,pka,dane,cert,keyserver",
- "--keyserver=pool.sks-keyservers.net",
- "--quiet",
- "--batch",
- "--output",
- f.name,
- "-"], env={"LANG": 'en_US.UTF-8'}, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
- #if not bdata.find('\r\n') >= 0:
- # bdata = bdata.replace('\n', '\r\n') # Windows hack
- p.stdin.write(bdata.encode() if isinstance(bdata, str) else bdata)
- p.stdin.close()
-
- gpg_status = [l for l in gpg_status_parser.parse(p.stdout)]
- p.wait()
- gpg_output = p.stderr.read()
- gpg_rcode = p.returncode
-
+ gpg_status, gpg_output, gpg_rcode = check_sign("--output", bdata, f.name, GNUPG_DIRECTORY, keyserver)
 except Exception as e:
 yield MailTest("An error occured: %s" % e, 1)
 return
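Review bot: The new `check_sign("--output", bdata, ...)` call passes `bdata` through unchanged, while the removed code wrote `bdata.encode() if isinstance(bdata, str) else bdata`; a `str` argument now raises `TypeError` at `p.stdin.write(bdata)` and surfaces only as the generic "An error occured" result instead of a verification outcome. Keep the conversion at the call site: `check_sign("--output", bdata.encode() if isinstance(bdata, str) else bdata, f.name, GNUPG_DIRECTORY, keyserver)`. This path also gains `wkd` in `--auto-key-locate`, which the old merged call did not list, so it is worth confirming that the extra lookup is intended. `check_merged` starts before this hunk.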