Viewing file: cert_manager.py (15.78 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
"""Tools for managing certificates.""" import datetime import logging import re import traceback from typing import List
import pytz
from certbot import crypto_util from certbot import errors from certbot import ocsp from certbot import util from certbot._internal import storage from certbot.compat import os from certbot.display import util as display_util
logger = logging.getLogger(__name__)
################### # Commands ###################
def update_live_symlinks(config): """Update the certificate file family symlinks to use archive_dir.
Use the information in the config file to make symlinks point to the correct archive directory.
.. note:: This assumes that the installation is using a Reverter object.
:param config: Configuration. :type config: :class:`certbot._internal.configuration.NamespaceConfig`
""" for renewal_file in storage.renewal_conf_files(config): storage.RenewableCert(renewal_file, config, update_symlinks=True)
def rename_lineage(config): """Rename the specified lineage to the new name.
:param config: Configuration. :type config: :class:`certbot._internal.configuration.NamespaceConfig`
""" certname = get_certnames(config, "rename")[0]
new_certname = config.new_certname if not new_certname: code, new_certname = display_util.input_text( "Enter the new name for certificate {0}".format(certname), force_interactive=True) if code != display_util.OK or not new_certname: raise errors.Error("User ended interaction.")
lineage = lineage_for_certname(config, certname) if not lineage: raise errors.ConfigurationError("No existing certificate with name " "{0} found.".format(certname)) storage.rename_renewal_config(certname, new_certname, config) display_util.notification("Successfully renamed {0} to {1}." .format(certname, new_certname), pause=False)
def certificates(config): """Display information about certs configured with Certbot
:param config: Configuration. :type config: :class:`certbot._internal.configuration.NamespaceConfig` """ parsed_certs = [] parse_failures = [] for renewal_file in storage.renewal_conf_files(config): try: renewal_candidate = storage.RenewableCert(renewal_file, config) crypto_util.verify_renewable_cert(renewal_candidate) parsed_certs.append(renewal_candidate) except Exception as e: # pylint: disable=broad-except logger.warning("Renewal configuration file %s produced an " "unexpected error: %s. Skipping.", renewal_file, e) logger.debug("Traceback was:\n%s", traceback.format_exc()) parse_failures.append(renewal_file)
# Describe all the certs _describe_certs(config, parsed_certs, parse_failures)
def delete(config): """Delete Certbot files associated with a certificate lineage.""" certnames = get_certnames(config, "delete", allow_multiple=True) msg = ["The following certificate(s) are selected for deletion:\n"] for certname in certnames: msg.append(" * " + certname) msg.append( "\nWARNING: Before continuing, ensure that the listed certificates are not being used " "by any installed server software (e.g. Apache, nginx, mail servers). Deleting a " "certificate that is still being used will cause the server software to stop working. " "See https://certbot.org/deleting-certs for information on deleting certificates safely." ) msg.append("\nAre you sure you want to delete the above certificate(s)?") if not display_util.yesno("\n".join(msg), default=True): logger.info("Deletion of certificate(s) canceled.") return for certname in certnames: storage.delete_files(config, certname) display_util.notify("Deleted all files relating to certificate {0}." .format(certname))
################### # Public Helpers ###################
def lineage_for_certname(cli_config, certname): """Find a lineage object with name certname.""" configs_dir = cli_config.renewal_configs_dir # Verify the directory is there util.make_or_verify_dir(configs_dir, mode=0o755) try: renewal_file = storage.renewal_file_for_certname(cli_config, certname) except errors.CertStorageError: return None try: return storage.RenewableCert(renewal_file, cli_config) except (errors.CertStorageError, IOError): logger.debug("Renewal conf file %s is broken.", renewal_file) logger.debug("Traceback was:\n%s", traceback.format_exc()) return None
def domains_for_certname(config, certname): """Find the domains in the cert with name certname.""" lineage = lineage_for_certname(config, certname) return lineage.names() if lineage else None
def find_duplicative_certs(config, domains): """Find existing certs that match the given domain names.
This function searches for certificates whose domains are equal to the `domains` parameter and certificates whose domains are a subset of the domains in the `domains` parameter. If multiple certificates are found whose names are a subset of `domains`, the one whose names are the largest subset of `domains` is returned.
If multiple certificates' domains are an exact match or equally sized subsets, which matching certificates are returned is undefined.
:param config: Configuration. :type config: :class:`certbot._internal.configuration.NamespaceConfig` :param domains: List of domain names :type domains: `list` of `str`
:returns: lineages representing the identically matching cert and the largest subset if they exist :rtype: `tuple` of `storage.RenewableCert` or `None`
""" def update_certs_for_domain_matches(candidate_lineage, rv): """Return cert as identical_names_cert if it matches, or subset_names_cert if it matches as subset """ # TODO: Handle these differently depending on whether they are # expired or still valid? identical_names_cert, subset_names_cert = rv candidate_names = set(candidate_lineage.names()) if candidate_names == set(domains): identical_names_cert = candidate_lineage elif candidate_names.issubset(set(domains)): # This logic finds and returns the largest subset-names cert # in the case where there are several available. if subset_names_cert is None: subset_names_cert = candidate_lineage elif len(candidate_names) > len(subset_names_cert.names()): subset_names_cert = candidate_lineage return (identical_names_cert, subset_names_cert)
return _search_lineages(config, update_certs_for_domain_matches, (None, None))
def _archive_files(candidate_lineage, filetype): """ In order to match things like: /etc/letsencrypt/archive/example.com/chain1.pem.
Anonymous functions which call this function are eventually passed (in a list) to `match_and_check_overlaps` to help specify the acceptable_matches.
:param `.storage.RenewableCert` candidate_lineage: Lineage whose archive dir is to be searched. :param str filetype: main file name prefix e.g. "fullchain" or "chain".
:returns: Files in candidate_lineage's archive dir that match the provided filetype. :rtype: list of str or None """ archive_dir = candidate_lineage.archive_dir pattern = [os.path.join(archive_dir, f) for f in os.listdir(archive_dir) if re.match("{0}[0-9]*.pem".format(filetype), f)] if pattern: return pattern return None
def _acceptable_matches(): """ Generates the list that's passed to match_and_check_overlaps. Is its own function to make unit testing easier.
:returns: list of functions :rtype: list """ return [lambda x: x.fullchain_path, lambda x: x.cert_path, lambda x: _archive_files(x, "cert"), lambda x: _archive_files(x, "fullchain")]
def cert_path_to_lineage(cli_config): """ If config.cert_path is defined, try to find an appropriate value for config.certname.
:param `configuration.NamespaceConfig` cli_config: parsed command line arguments
:returns: a lineage name :rtype: str
:raises `errors.Error`: If the specified cert path can't be matched to a lineage name. :raises `errors.OverlappingMatchFound`: If the matched lineage's archive is shared. """ acceptable_matches = _acceptable_matches() match = match_and_check_overlaps(cli_config, acceptable_matches, lambda x: cli_config.cert_path, lambda x: x.lineagename) return match[0]
def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func): """ Searches through all lineages for a match, and checks for duplicates. If a duplicate is found, an error is raised, as performing operations on lineages that have their properties incorrectly duplicated elsewhere is probably a bad idea.
:param `configuration.NamespaceConfig` cli_config: parsed command line arguments :param list acceptable_matches: a list of functions that specify acceptable matches :param function match_func: specifies what to match :param function rv_func: specifies what to return
""" def find_matches(candidate_lineage, return_value, acceptable_matches): """Returns a list of matches using _search_lineages.""" acceptable_matches = [func(candidate_lineage) for func in acceptable_matches] acceptable_matches_rv: List[str] = [] for item in acceptable_matches: if isinstance(item, list): acceptable_matches_rv += item else: acceptable_matches_rv.append(item) match = match_func(candidate_lineage) if match in acceptable_matches_rv: return_value.append(rv_func(candidate_lineage)) return return_value
matched = _search_lineages(cli_config, find_matches, [], acceptable_matches) if not matched: raise errors.Error("No match found for cert-path {0}!".format(cli_config.cert_path)) elif len(matched) > 1: raise errors.OverlappingMatchFound() return matched
def human_readable_cert_info(config, cert, skip_filter_checks=False): """ Returns a human readable description of info about a RenewableCert object""" certinfo = [] checker = ocsp.RevocationChecker()
if config.certname and cert.lineagename != config.certname and not skip_filter_checks: return None if config.domains and not set(config.domains).issubset(cert.names()): return None now = pytz.UTC.fromutc(datetime.datetime.utcnow())
reasons = [] if cert.is_test_cert: reasons.append('TEST_CERT') if cert.target_expiry <= now: reasons.append('EXPIRED') elif checker.ocsp_revoked(cert): reasons.append('REVOKED')
if reasons: status = "INVALID: " + ", ".join(reasons) else: diff = cert.target_expiry - now if diff.days == 1: status = "VALID: 1 day" elif diff.days < 1: status = "VALID: {0} hour(s)".format(diff.seconds // 3600) else: status = "VALID: {0} days".format(diff.days)
valid_string = "{0} ({1})".format(cert.target_expiry, status) serial = format(crypto_util.get_serial_from_cert(cert.cert_path), 'x') certinfo.append(" Certificate Name: {}\n" " Serial Number: {}\n" " Key Type: {}\n" " Domains: {}\n" " Expiry Date: {}\n" " Certificate Path: {}\n" " Private Key Path: {}".format( cert.lineagename, serial, cert.private_key_type, " ".join(cert.names()), valid_string, cert.fullchain, cert.privkey)) return "".join(certinfo)
def get_certnames(config, verb, allow_multiple=False, custom_prompt=None): """Get certname from flag, interactively, or error out.""" certname = config.certname if certname: certnames = [certname] else: filenames = storage.renewal_conf_files(config) choices = [storage.lineagename_for_filename(name) for name in filenames] if not choices: raise errors.Error("No existing certificates found.") if allow_multiple: if not custom_prompt: prompt = "Which certificate(s) would you like to {0}?".format(verb) else: prompt = custom_prompt code, certnames = display_util.checklist( prompt, choices, cli_flag="--cert-name", force_interactive=True) if code != display_util.OK: raise errors.Error("User ended interaction.") else: if not custom_prompt: prompt = "Which certificate would you like to {0}?".format(verb) else: prompt = custom_prompt
code, index = display_util.menu( prompt, choices, cli_flag="--cert-name", force_interactive=True)
if code != display_util.OK or index not in range(0, len(choices)): raise errors.Error("User ended interaction.") certnames = [choices[index]] return certnames
################### # Private Helpers ###################
def _report_lines(msgs): """Format a results report for a category of single-line renewal outcomes""" return " " + "\n ".join(str(msg) for msg in msgs)
def _report_human_readable(config, parsed_certs): """Format a results report for a parsed cert""" certinfo = [] for cert in parsed_certs: cert_info = human_readable_cert_info(config, cert) if cert_info is not None: certinfo.append(cert_info) return "\n".join(certinfo)
def _describe_certs(config, parsed_certs, parse_failures): """Print information about the certs we know about""" out: List[str] = []
notify = out.append
if not parsed_certs and not parse_failures: notify("No certificates found.") else: if parsed_certs: match = "matching " if config.certname or config.domains else "" notify("Found the following {0}certs:".format(match)) notify(_report_human_readable(config, parsed_certs)) if parse_failures: notify("\nThe following renewal configurations " "were invalid:") notify(_report_lines(parse_failures))
display_util.notification("\n".join(out), pause=False, wrap=False)
def _search_lineages(cli_config, func, initial_rv, *args): """Iterate func over unbroken lineages, allowing custom return conditions.
Allows flexible customization of return values, including multiple return values and complex checks.
:param `configuration.NamespaceConfig` cli_config: parsed command line arguments :param function func: function used while searching over lineages :param initial_rv: initial return value of the function (any type)
:returns: Whatever was specified by `func` if a match is found. """ configs_dir = cli_config.renewal_configs_dir # Verify the directory is there util.make_or_verify_dir(configs_dir, mode=0o755)
rv = initial_rv for renewal_file in storage.renewal_conf_files(cli_config): try: candidate_lineage = storage.RenewableCert(renewal_file, cli_config) except (errors.CertStorageError, IOError): logger.debug("Renewal conf file %s is broken. Skipping.", renewal_file) logger.debug("Traceback was:\n%s", traceback.format_exc()) continue rv = func(candidate_lineage, rv, *args) return rv
|