#!/usr/bin/python
"""`certcheck`: basic validations on PEM certificate and CA bundle files.

The script loads an optional signed certificate (`--cert`) and an optional
CA bundle (`--bundle`), prints a summary of what it found, runs a set of
checks (validity window, Subject Alternative Names, issuer chain) and exits
with status 0 (all good), 1 (warnings only) or 2 (at least one critical).
"""
import argparse
from datetime import datetime
from datetime import timezone
import os
import re
import sys
from typing import List

from cryptography import x509


class CertCheck():
    """Command-line checker for a certificate and/or a CA bundle.

    Constructing the object parses `sys.argv` and loads the files given on
    the command line; `main()` prints the summaries, runs the tests and
    terminates the process through `exit()`.
    """

    def __init__(self):
        """Parse the command line and load the requested PEM files."""
        self.bundle: List[x509.Certificate] = []
        self.cert: List[x509.Certificate] = []
        # Maps a bundle certificate's subject (see `to_str`) to a dict with
        # its "issuer", "from" and "to" values.
        self.bundle_issuers = {}
        # DNS names from the SAN extension of the first `--cert` certificate.
        self.altnames = []
        self.exitstatus = 0
        # Timezone-aware "now", compared against the *_utc validity fields.
        self.now_utc = datetime.now(timezone.utc)
        self.errors = {"WARNING": [], "CRITICAL": []}

        self.parser = argparse.ArgumentParser(
            description=(
                "`certcheck` performs some validations on "
                "Certificate `PEM` files."
            ),
        )
        # All three options are optional; each enables its own set of checks.
        self.parser.add_argument(
            '-c', '--cert',
            help='Signed certificate file path.',
            type=self.valid_path,
        )
        self.parser.add_argument(
            '-b', '--bundle',
            help='CA Bundle file path.',
            type=self.valid_path,
        )
        self.parser.add_argument(
            '-s', '--servername',
            help='Hostname.',
        )
        self.args = self.parser.parse_args()

        if self.args.cert:
            self.cert = self.get_certificates(self.args.cert)
            self.get_altnames()
        if self.args.bundle:
            self.bundle = self.get_certificates(self.args.bundle)
            self.get_bundle_issuers()

    def valid_path(self, path):
        """argparse `type=` callback: return `path` if it exists.

        Raises:
            argparse.ArgumentTypeError: if `path` does not exist.
        """
        if not os.path.exists(path):
            raise argparse.ArgumentTypeError(f"not a valid path: {path!r}")
        return path

    # GET METHODS
    def get_certificates_deprecated(self, certpath):
        """Load every PEM certificate in `certpath` using a regex scan.

        Kept for reference; `get_certificates` is what `__init__` calls.
        """
        start = '-----BEGIN CERTIFICATE-----'
        end = '-----END CERTIFICATE-----'
        with open(certpath, encoding="utf-8") as f:
            pems = re.findall(f'({start}.*?{end})', f.read(), re.DOTALL)
        return [
            x509.load_pem_x509_certificate(bytes(pem, encoding="utf-8"))
            for pem in pems
        ]

    def get_altnames(self):
        """Store the DNS SAN entries of the first certificate in `altnames`.

        `altnames` is set to an empty list when the certificate has no
        Subject Alternative Name extension.
        """
        cert = self.cert[0]
        try:
            ext = cert.extensions.get_extension_for_oid(
                x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
        except x509.ExtensionNotFound:
            self.altnames = []
        else:
            self.altnames = ext.value.get_values_for_type(x509.DNSName)

    def get_certificates(self, certpath):
        """Return the list of certificates found in the PEM file `certpath`.

        The file is read as raw bytes, so text around the PEM blocks that is
        not valid UTF-8 no longer aborts the run with a decode error.
        """
        with open(certpath, "rb") as f:
            return x509.load_pem_x509_certificates(f.read())

    def get_bundle_issuers(self):
        """Index the bundle by subject into `bundle_issuers`."""
        for b in self.bundle:
            self.bundle_issuers[self.to_str(b.subject)] = {
                "issuer": self.to_str(b.issuer),
                "from": b.not_valid_before_utc,
                "to": b.not_valid_after_utc,
            }

    # MISC
    def _common_name(self, obj):
        """Return the first Common Name of `obj`, or None if it has none."""
        attrs = obj.get_attributes_for_oid(oid=x509.NameOID.COMMON_NAME)
        return attrs[0].value if attrs else None

    def to_str(self, obj):
        """Return a printable identifier for the x509 name `obj`.

        This is the first Common Name when there is one; otherwise the
        RFC 4514 string of the whole name, so names without a CN no longer
        raise `IndexError`.
        """
        common_name = self._common_name(obj)
        if common_name is None:
            return obj.rfc4514_string()
        return common_name

    def add_error(self, sev, msg):
        """Record `msg` under severity `sev` ("WARNING" or "CRITICAL")."""
        self.errors[sev].append(msg)

    def exit(self):
        """Print the collected errors and terminate the process.

        Exit status is 2 if any CRITICAL was recorded, else 1 if any WARNING
        was recorded, else whatever `exitstatus` already held (0 by default).
        """
        if self.errors['WARNING']:
            self.exitstatus = 1
        if self.errors['CRITICAL']:
            self.exitstatus = 2

        for sev, messages in self.errors.items():
            if not messages:
                continue
            print()
            print('#' * 80)
            print(f"# {sev}")
            print('#' * 80)
            for msg in messages:
                print(f" - {msg}")

        if self.exitstatus == 0:
            print("All validation tests completed successfully.")
        sys.exit(self.exitstatus)

    def show_cert(self):
        """Print a summary of every certificate loaded from `--cert`."""
        print(f"### Certificate `{self.args.cert}`")
        for cert in self.cert:
            print(f" - Subject: {self.to_str(cert.subject)}")
            # NOTE: `altnames` comes from the first certificate only (see
            # `get_altnames`), so the same list is shown for each entry.
            print(" - AltNames: \n - " + "\n - ".join(self.altnames))
            print(f" - Issuer: {self.to_str(cert.issuer)}")
            print(f" - Valid from: {cert.not_valid_before_utc}")
            print(f" - Valid to: {cert.not_valid_after_utc}")

    def show_bundle(self):
        """Print a summary of every certificate loaded from `--bundle`."""
        print(f"### Bundle `{self.args.bundle}`")
        print(" - Issuers:")
        for subject, data in self.bundle_issuers.items():
            print(f" - {subject}")
            print(f" - Issuer: {data['issuer']}")
            print(f" - Valid from: {data['from']}")
            print(f" - Valid to: {data['to']}")

    # TESTS
    def test_cert_count(self):
        """Warn when the `--cert` file does not hold exactly one certificate."""
        if len(self.cert) != 1:
            self.add_error(
                "WARNING",
                f"Certificate `{self.args.cert}` contains multiple "
                f"certificates ({len(self.cert)}). "
                "Tests will check ONLY the first one.")

    def test_valid_after_utc(self, c):
        """Record a CRITICAL if certificate `c` has already expired."""
        if c.not_valid_after_utc < self.now_utc:
            self.add_error(
                "CRITICAL",
                f"Subject `{self.to_str(c.subject)}` expired "
                f"on `{c.not_valid_after_utc}`.")

    def test_valid_before_utc(self, c):
        """Record a CRITICAL if certificate `c` is not valid yet."""
        if c.not_valid_before_utc > self.now_utc:
            self.add_error(
                "CRITICAL",
                f"Subject `{self.to_str(c.subject)}` is not "
                f"valid until `{c.not_valid_before_utc}`.")

    def test_chain_broken(self, c):
        """Record a CRITICAL if the issuer of `c` is not a bundle subject."""
        if self.to_str(c.issuer) not in self.bundle_issuers:
            self.add_error(
                "CRITICAL",
                f"Chain is broken. `{self.to_str(c.issuer)}` "
                f"for Subject `{self.to_str(c.subject)}` "
                "is missing.")

    @staticmethod
    def _hostname_matches(pattern, hostname):
        """Return True if SAN entry `pattern` covers `hostname`.

        The comparison is case-insensitive. A pattern of the form `*.rest`
        matches a hostname made of exactly one non-empty label followed by
        `.rest`; any other pattern must be equal to the hostname.
        """
        pattern = pattern.lower()
        hostname = hostname.lower()
        if pattern == hostname:
            return True
        if not pattern.startswith("*."):
            return False
        first_label, _, parent = hostname.partition(".")
        return bool(first_label) and parent == pattern[2:]

    def test_servername_in_altnames(self, servername):
        """Record a CRITICAL if no SAN entry covers `servername`."""
        covered = any(
            self._hostname_matches(name, servername)
            for name in self.altnames
        )
        if not covered:
            self.add_error(
                "CRITICAL",
                f"Alternative Names `{self.altnames}` don't include "
                f"`{servername}`.")

    def test_alnames_defined(self, c):
        """Record a CRITICAL if no DNS alternative name was found.

        `c` is accepted for symmetry with the other tests but is not used;
        the check reads `self.altnames`.
        """
        if not self.altnames:
            self.add_error(
                "CRITICAL",
                "Mandatory `Alternative Names` is not defined.")

    # MAIN
    def main(self):
        """Print the summaries, run every applicable test, then exit."""
        if self.args.cert:
            self.show_cert()
            self.test_cert_count()
            cert = self.cert[0]
            self.test_valid_after_utc(cert)
            self.test_valid_before_utc(cert)
            self.test_alnames_defined(cert)
            # The subject CN must itself be covered by the SAN list; skip
            # the check when the subject carries no CN at all.
            common_name = self._common_name(cert.subject)
            if common_name is not None:
                self.test_servername_in_altnames(common_name)
            if self.args.servername:
                self.test_servername_in_altnames(self.args.servername)
            if self.args.bundle:
                self.test_chain_broken(cert)

        if self.args.bundle:
            self.show_bundle()
            # Two passes keep validity errors grouped before chain errors.
            for b in self.bundle:
                self.test_valid_after_utc(b)
                self.test_valid_before_utc(b)
            for b in self.bundle:
                self.test_chain_broken(b)

        self.exit()


if __name__ == "__main__":
    CertCheck().main()