"""NginxParser is a member object of the NginxConfigurator class."""
import copy
import functools
import glob
import logging
import os
import pyparsing
import re
import six
from certbot import errors
from certbot_nginx import obj
from certbot_nginx import nginxparser
from acme.magic_typing import Union, Dict, Set, Any, List, Tuple # pylint: disable=unused-import, no-name-in-module
logger = logging.getLogger(__name__)
class NginxParser(object):
    """Class handles the fine details of parsing the Nginx Configuration.

    :ivar str root: Normalized absolute path to the server root
        directory. Without trailing slash.
    :ivar dict parsed: Mapping of file paths to parsed trees

    """

    def __init__(self, root):
        """Locate the root configuration file under ``root`` and parse it.

        :param str root: Path to the Nginx server root directory

        :raises errors.NoInstallationError: if no root configuration file
            is found under ``root``

        """
        self.parsed = {}  # type: Dict[str, Union[List, nginxparser.UnspacedList]]
        self.root = os.path.abspath(root)
        self.config_root = self._find_config_root()

        # Parse nginx.conf and included files.
        # TODO: Check sites-available/ as well. For now, the configurator does
        # not enable sites from there.
        self.load()

    def load(self):
        """Loads Nginx files into a parsed tree.

        Any previously parsed trees are discarded first.

        """
        self.parsed = {}
        self._parse_recursively(self.config_root)

    def _parse_recursively(self, filepath):
        """Parses nginx config files recursively by looking at 'include'
        directives inside 'http' and 'server' blocks. Note that this only
        reads Nginx files that potentially declare a virtual host.

        :param str filepath: The path to the files to parse, as a glob

        """
        for tree in self._parse_files(self.abs_path(filepath)):
            for entry in tree:
                if _is_include_directive(entry):
                    # Parse the top-level included file
                    self._parse_recursively(entry[1])
                    continue
                # Empty entries carry no directive name; skip them the same
                # way the other directive loops in this module do.
                if not entry:
                    continue
                in_http = entry[0] == ['http']
                if not (in_http or entry[0] == ['server']):
                    continue
                # Look for includes in the top-level 'http'/'server' context
                for subentry in entry[1]:
                    if _is_include_directive(subentry):
                        self._parse_recursively(subentry[1])
                    elif in_http and subentry and subentry[0] == ['server']:
                        # Look for includes in a 'server' context within
                        # an 'http' context
                        for server_entry in subentry[1]:
                            if _is_include_directive(server_entry):
                                self._parse_recursively(server_entry[1])

    def abs_path(self, path):
        """Converts a relative path to an absolute path relative to the root.
        Does nothing for paths that are already absolute.

        :param str path: The path
        :returns: The absolute path
        :rtype: str

        """
        if os.path.isabs(path):
            return path
        return os.path.join(self.root, path)

    def _build_addr_to_ssl(self):
        """Builds a map from address to whether it listens on ssl in any server block

        :returns: Mapping of normalized address tuple to its ssl flag
        :rtype: dict

        """
        addr_to_ssl = {}  # type: Dict[Tuple[str, str], bool]
        for server_list in self._get_raw_servers().values():
            for server, _ in server_list:
                # Parse the server block to save addr info
                for addr in _parse_server_raw(server)['addrs']:
                    key = addr.normalized_tuple()
                    # Once any server block listens on this address with
                    # ssl, the address stays marked as ssl.
                    addr_to_ssl[key] = addr.ssl or addr_to_ssl.get(key, addr.ssl)
        return addr_to_ssl

    def _get_raw_servers(self):
        # pylint: disable=cell-var-from-loop
        # type: () -> Dict
        """Get a map of unparsed all server blocks

        :returns: Mapping of file path to a list of
            ``(server directives with includes expanded, path in tree)``
        :rtype: dict

        """
        servers = {}  # type: Dict[str, Union[List, nginxparser.UnspacedList]]
        for filename, tree in self.parsed.items():
            found = []
            # Find all the server blocks
            _do_for_subarray(tree,
                             lambda x: len(x) >= 2 and x[0] == ['server'],
                             lambda x, y: found.append((x[1], y)))
            # Find 'include' statements in server blocks and append their trees
            servers[filename] = [(self._get_included_directives(server), path)
                                 for server, path in found]
        return servers

    def get_vhosts(self):
        """Gets list of all 'virtual hosts' found in Nginx configuration.
        Technically this is a misnomer because Nginx does not have virtual
        hosts, it has 'server blocks'.

        :returns: List of :class:`~certbot_nginx.obj.VirtualHost`
            objects found in configuration
        :rtype: list

        """
        enabled = True  # We only look at enabled vhosts for now
        servers = self._get_raw_servers()

        vhosts = []
        for filename in servers:
            for server, path in servers[filename]:
                # Parse the server block into a VirtualHost object
                parsed_server = _parse_server_raw(server)
                vhosts.append(obj.VirtualHost(filename,
                                              parsed_server['addrs'],
                                              parsed_server['ssl'],
                                              enabled,
                                              parsed_server['names'],
                                              server,
                                              path))

        self._update_vhosts_addrs_ssl(vhosts)
        return vhosts

    def _update_vhosts_addrs_ssl(self, vhosts):
        """Update a list of raw parsed vhosts to include global address sslishness

        :param list vhosts: vhost objects whose ``addrs`` and ``ssl``
            attributes are updated in place

        """
        addr_to_ssl = self._build_addr_to_ssl()
        for vhost in vhosts:
            for addr in vhost.addrs:
                addr.ssl = addr_to_ssl[addr.normalized_tuple()]
                if addr.ssl:
                    vhost.ssl = True

    def _get_included_directives(self, block):
        """Returns array with the "include" directives expanded out by
        concatenating the contents of the included file to the block.

        :param list block:
        :rtype: list

        """
        result = copy.deepcopy(block)  # Copy the list to keep self.parsed idempotent
        for directive in block:
            if not _is_include_directive(directive):
                continue
            for incl in glob.glob(self.abs_path(directive[1])):
                # Files that were never parsed are silently skipped.
                if incl in self.parsed:
                    result.extend(self.parsed[incl])
        return result

    def _parse_files(self, filepath, override=False):
        """Parse files from a glob

        Files that cannot be opened, decoded or parsed are logged and
        skipped; they never abort the whole load.

        :param str filepath: Nginx config file path
        :param bool override: Whether to parse a file that has been parsed
        :returns: list of parsed tree structures
        :rtype: list

        """
        files = glob.glob(filepath)  # nginx on unix calls glob(3) for this
                                     # XXX Windows nginx uses FindFirstFile, and
                                     # should have a narrower call here
        trees = []
        for item in files:
            if item in self.parsed and not override:
                continue
            try:
                with open(item) as _file:
                    parsed = nginxparser.load(_file)
                self.parsed[item] = parsed
                trees.append(parsed)
            except IOError:
                logger.warning("Could not open file: %s", item)
            except UnicodeDecodeError:
                # Text-mode reads on Python 3 raise this for bytes that are
                # invalid in the default encoding.
                logger.warning("Could not read file: %s due to invalid "
                               "character. Certbot will ignore this file.", item)
            except pyparsing.ParseBaseException as err:
                # Base class, so fatal/syntax parse errors are covered too,
                # matching _parse_ssl_options.
                logger.debug("Could not parse file: %s due to %s", item, err)
        return trees

    def _find_config_root(self):
        """Return the Nginx Configuration Root file.

        :returns: Path of the first candidate file that exists under the root
        :rtype: str

        :raises errors.NoInstallationError: if no candidate file exists

        """
        location = ['nginx.conf']

        for name in location:
            candidate = os.path.join(self.root, name)
            if os.path.isfile(candidate):
                return candidate

        raise errors.NoInstallationError(
            "Could not find Nginx root configuration file (nginx.conf)")

    def filedump(self, ext='tmp', lazy=True):
        """Dumps parsed configurations into files.

        :param str ext: The file extension to use for the dumped files. If
            empty, this overrides the existing conf files.
        :param bool lazy: Only write files that have been modified

        """
        # Best-effort atomicity is enforced above us by reverter.py
        for path, tree in self.parsed.items():
            if lazy and not tree.is_dirty():
                continue
            target = path + os.path.extsep + ext if ext else path
            out = nginxparser.dumps(tree)
            logger.debug('Writing nginx conf tree to %s:\n%s', target, out)
            try:
                with open(target, 'w') as _file:
                    _file.write(out)
            except IOError:
                logger.error("Could not open file for writing: %s", target)

    def parse_server(self, server):
        """Parses a list of server directives, accounting for global address sslishness.

        :param list server: list of directives in a server block
        :rtype: dict

        """
        addr_to_ssl = self._build_addr_to_ssl()
        parsed_server = _parse_server_raw(server)
        _apply_global_addr_ssl(addr_to_ssl, parsed_server)
        return parsed_server

    def has_ssl_on_directive(self, vhost):
        """Does vhost have ssl on for all ports?

        :param :class:`~certbot_nginx.obj.VirtualHost` vhost: The vhost in question

        :returns: True if 'ssl on' directive is included
        :rtype: bool

        """
        return any(_is_ssl_on_directive(directive)
                   for directive in vhost.raw if directive)

    def add_server_directives(self, vhost, directives, insert_at_top=False):
        """Add directives to the server block identified by vhost.

        This method modifies vhost to be fully consistent with the new directives.

        .. note:: It's an error to try and add a nonrepeatable directive that already
            exists in the config block with a conflicting value.

        .. todo:: Doesn't match server blocks whose server_name directives are
            split across multiple conf files.

        :param :class:`~certbot_nginx.obj.VirtualHost` vhost: The vhost
            whose information we use to match on
        :param list directives: The directives to add
        :param bool insert_at_top: True if the directives need to be inserted at the top
            of the server block instead of the bottom

        """
        self._modify_server_directives(
            vhost, functools.partial(_add_directives, directives, insert_at_top))

    def update_or_add_server_directives(self, vhost, directives, insert_at_top=False):
        """Add or replace directives in the server block identified by vhost.

        This method modifies vhost to be fully consistent with the new directives.

        .. note:: When a directive with the same name already exists in the
            config block, the first instance will be replaced. Otherwise, the directive
            will be appended/prepended to the config block as in add_server_directives.

        .. todo:: Doesn't match server blocks whose server_name directives are
            split across multiple conf files.

        :param :class:`~certbot_nginx.obj.VirtualHost` vhost: The vhost
            whose information we use to match on
        :param list directives: The directives to add
        :param bool insert_at_top: True if the directives need to be inserted at the top
            of the server block instead of the bottom

        """
        self._modify_server_directives(
            vhost,
            functools.partial(_update_or_add_directives, directives, insert_at_top))

    def remove_server_directives(self, vhost, directive_name, match_func=None):
        """Remove all directives of type directive_name.

        :param :class:`~certbot_nginx.obj.VirtualHost` vhost: The vhost
            to remove directives from
        :param string directive_name: The directive type to remove
        :param callable match_func: Function of the directive that returns true for directives
            to be deleted.

        """
        self._modify_server_directives(
            vhost, functools.partial(_remove_directives, directive_name, match_func))

    def _update_vhost_based_on_new_directives(self, vhost, directives_list):
        """Refresh ``vhost`` so it reflects ``directives_list``.

        The directives are expanded with their includes, re-parsed, and the
        resulting addresses, ssl flag, names and raw directives are stored on
        ``vhost``.

        :param :class:`~certbot_nginx.obj.VirtualHost` vhost: The vhost to update
        :param list directives_list: The server block's current directives

        """
        new_server = self._get_included_directives(directives_list)
        parsed_server = self.parse_server(new_server)
        vhost.addrs = parsed_server['addrs']
        vhost.ssl = parsed_server['ssl']
        vhost.names = parsed_server['names']
        vhost.raw = new_server

    def _modify_server_directives(self, vhost, block_func):
        """Apply ``block_func`` to the directives of the server block of ``vhost``.

        The block is located by following ``vhost.path`` through the parsed
        tree of ``vhost.filep``; afterwards ``vhost`` is refreshed from the
        modified directives.

        :param :class:`~certbot_nginx.obj.VirtualHost` vhost: The vhost to modify
        :param callable block_func: Called with the list of directives of the
            server block, which it modifies in place

        :raises errors.MisconfigurationError: if the path does not lead to a
            server block or ``block_func`` reports a problem; the message is
            prefixed with the file name

        """
        filename = vhost.filep
        try:
            node = self.parsed[filename]
            for index in vhost.path:
                node = node[index]
            if not isinstance(node, list) or len(node) != 2:
                raise errors.MisconfigurationError("Not a server block.")
            directives = node[1]
            block_func(directives)

            self._update_vhost_based_on_new_directives(vhost, directives)
        except errors.MisconfigurationError as err:
            raise errors.MisconfigurationError("Problem in %s: %s" % (filename, str(err)))

    def duplicate_vhost(self, vhost_template, remove_singleton_listen_params=False,
                        only_directives=None):
        """Duplicate the vhost in the configuration files.

        :param :class:`~certbot_nginx.obj.VirtualHost` vhost_template: The vhost
            whose information we copy
        :param bool remove_singleton_listen_params: If we should remove parameters
            from listen directives in the block that can only be used once per address
        :param list only_directives: If it exists, only duplicate the named directives. Only
            looks at first level of depth; does not expand includes.

        :returns: A vhost object for the newly created vhost
        :rtype: :class:`~certbot_nginx.obj.VirtualHost`

        """
        # TODO: https://github.com/certbot/certbot/issues/5185
        # put it in the same file as the template, at the same level
        new_vhost = copy.deepcopy(vhost_template)

        enclosing_block = self.parsed[vhost_template.filep]
        for index in vhost_template.path[:-1]:
            enclosing_block = enclosing_block[index]
        raw_in_parsed = copy.deepcopy(enclosing_block[vhost_template.path[-1]])

        if only_directives is not None:
            new_directives = nginxparser.UnspacedList([])
            for directive in raw_in_parsed[1]:
                if directive and directive[0] in only_directives:
                    new_directives.append(directive)
            raw_in_parsed[1] = new_directives

            self._update_vhost_based_on_new_directives(new_vhost, new_directives)

        enclosing_block.append(raw_in_parsed)
        new_vhost.path[-1] = len(enclosing_block) - 1

        if remove_singleton_listen_params:
            for addr in new_vhost.addrs:
                addr.default = False
                addr.ipv6only = False

            # Exclude one-time use parameters which will cause an error if repeated.
            # https://nginx.org/en/docs/http/ngx_http_core_module.html#listen
            exclude = set(('default_server', 'default', 'setfib', 'fastopen', 'backlog',
                           'rcvbuf', 'sndbuf', 'accept_filter', 'deferred', 'bind',
                           'ipv6only', 'reuseport', 'so_keepalive'))

            for directive in enclosing_block[new_vhost.path[-1]][1]:
                if not directive or directive[0] != 'listen':
                    continue
                for param in exclude:
                    # See: github.com/certbot/certbot/pull/6223#pullrequestreview-143019225
                    # The keys are recomputed for every parameter because a
                    # deletion shifts the positions of the remaining items.
                    keys = [x.split('=')[0] for x in directive]
                    if param in keys:
                        del directive[keys.index(param)]
        return new_vhost
def _parse_ssl_options(ssl_options):
if ssl_options is not None:
try:
with open(ssl_options) as _file:
return nginxparser.load(_file)
except IOError:
logger.warning("Missing NGINX TLS options file: %s", ssl_options)
except pyparsing.ParseBaseException as err:
logger.debug("Could not parse file: %s due to %s", ssl_options, err)
return []
[docs]def _do_for_subarray(entry, condition, func, path=None):
"""Executes a function for a subarray of a nested array if it matches
the given condition.
:param list entry: The list to iterate over
:param function condition: Returns true iff func should be executed on item
:param function func: The function to call for each matching item
"""
if path is None:
path = []
if isinstance(entry, list):
if condition(entry):
func(entry, path)
else:
for index, item in enumerate(entry):
_do_for_subarray(item, condition, func, path + [index])
def get_best_match(target_name, names):
    """Finds the best match for target_name out of names using the Nginx
    name-matching rules (exact > longest wildcard starting with * >
    longest wildcard ending with * > regex).

    :param str target_name: The name to match
    :param set names: The candidate server names
    :returns: Tuple of (type of match, the name that matched)
    :rtype: tuple

    """
    buckets = {'exact': [], 'wildcard_start': [], 'wildcard_end': [], 'regex': []}

    # Each candidate lands in the first category it satisfies.
    for name in names:
        if _exact_match(target_name, name):
            kind = 'exact'
        elif _wildcard_match(target_name, name, True):
            kind = 'wildcard_start'
        elif _wildcard_match(target_name, name, False):
            kind = 'wildcard_end'
        elif _regex_match(target_name, name):
            kind = 'regex'
        else:
            continue
        buckets[kind].append(name)

    if buckets['exact']:
        # There can be more than one exact match; e.g. eff.org, .eff.org
        return ('exact', min(buckets['exact'], key=len))
    for kind in ('wildcard_start', 'wildcard_end'):
        if buckets[kind]:
            # Return the longest wildcard
            return (kind, max(buckets[kind], key=len))
    if buckets['regex']:
        # Just return the first one for now
        return ('regex', buckets['regex'][0])
    return (None, None)
def _exact_match(target_name, name):
return target_name == name or '.' + target_name == name
def _wildcard_match(target_name, name, start):
# Degenerate case
if name == '*':
return True
parts = target_name.split('.')
match_parts = name.split('.')
# If the domain ends in a wildcard, do the match procedure in reverse
if not start:
parts.reverse()
match_parts.reverse()
# The first part must be a wildcard or blank, e.g. '.eff.org'
first = match_parts.pop(0)
if first != '*' and first != '':
return False
target_name = '.'.join(parts)
name = '.'.join(match_parts)
# Ex: www.eff.org matches *.eff.org, eff.org does not match *.eff.org
return target_name.endswith('.' + name)
def _regex_match(target_name, name):
# Must start with a tilde
if len(name) < 2 or name[0] != '~':
return False
# After tilde is a perl-compatible regex
try:
regex = re.compile(name[1:])
if re.match(regex, target_name):
return True
else:
return False
except re.error: # pragma: no cover
# perl-compatible regexes are sometimes not recognized by python
return False
[docs]def _is_include_directive(entry):
"""Checks if an nginx parsed entry is an 'include' directive.
:param list entry: the parsed entry
:returns: Whether it's an 'include' directive
:rtype: bool
"""
return (isinstance(entry, list) and
len(entry) == 2 and entry[0] == 'include' and
isinstance(entry[1], six.string_types))
[docs]def _is_ssl_on_directive(entry):
"""Checks if an nginx parsed entry is an 'ssl on' directive.
:param list entry: the parsed entry
:returns: Whether it's an 'ssl on' directive
:rtype: bool
"""
return (isinstance(entry, list) and
len(entry) == 2 and entry[0] == 'ssl' and
entry[1] == 'on')
[docs]def _add_directives(directives, insert_at_top, block):
"""Adds directives to a config block."""
for directive in directives:
_add_directive(block, directive, insert_at_top)
if block and '\n' not in block[-1]: # could be " \n " or ["\n"] !
block.append(nginxparser.UnspacedList('\n'))
[docs]def _update_or_add_directives(directives, insert_at_top, block):
"""Adds or replaces directives in a config block."""
for directive in directives:
_update_or_add_directive(block, directive, insert_at_top)
if block and '\n' not in block[-1]: # could be " \n " or ["\n"] !
block.append(nginxparser.UnspacedList('\n'))
# Name of the directive that pulls in another configuration file.
INCLUDE = 'include'
# Directive names that may be added to a block that already contains them.
REPEATABLE_DIRECTIVES = {'server_name', 'listen', INCLUDE, 'rewrite', 'add_header'}
# Text of the comment that marks a directive as managed by Certbot.
COMMENT = ' managed by Certbot'
# The same marker as a parsed comment entry.
COMMENT_BLOCK = [' ', '#', COMMENT]
[docs]def _find_location(block, directive_name, match_func=None):
"""Finds the index of the first instance of directive_name in block.
If no line exists, use None."""
return next((index for index, line in enumerate(block) \
if line and line[0] == directive_name and (match_func is None or match_func(line))), None)
def _add_directive(block, directive, insert_at_top):
    """Add a single directive to a config block.

    Whitespace and comment directives are always appended. Any other
    directive is added when the block has no directive of that name or the
    name is in ``REPEATABLE_DIRECTIVES``; an identical existing directive is
    left alone.

    :param list block: The config block, modified in place
    :param list directive: The directive to add
    :param bool insert_at_top: True to insert the directive at the top of
        the block instead of the bottom

    :raises errors.MisconfigurationError: if the block already holds a
        non-repeatable directive of the same name with a different value,
        either directly or through the file named by an include directive

    """
    if not isinstance(directive, nginxparser.UnspacedList):
        directive = nginxparser.UnspacedList(directive)
    if _is_whitespace_or_comment(directive):
        # whitespace or comment
        block.append(directive)
        return

    directive_name = directive[0]
    location = _find_location(block, directive_name)

    # Append or prepend directive. Fail if the name is not a repeatable directive name,
    # and there is already a copy of that directive with a different value
    # in the config file.

    def can_append(loc, dir_name):
        """ Can we append this directive to the block? """
        return loc is None or (isinstance(dir_name, six.string_types)
                               and dir_name in REPEATABLE_DIRECTIVES)

    err_fmt = 'tried to insert directive "{0}" but found conflicting "{1}".'

    # Give a better error message about the specific directive than Nginx's "fail to restart"
    if directive_name == INCLUDE:
        # handle flat include files
        # in theory, we might want to do this recursively, but in practice, that's really not
        # necessary because we know what file we're talking about (and if we don't recurse, we
        # just give a worse error message)
        for included_directive in _parse_ssl_options(directive[1]):
            # Check this before indexing: the other directive loops in this
            # module guard against empty entries, which have no [0].
            if _is_whitespace_or_comment(included_directive):
                continue
            included_dir_name = included_directive[0]
            included_dir_loc = _find_location(block, included_dir_name)
            if can_append(included_dir_loc, included_dir_name):
                continue
            if block[included_dir_loc] != included_directive:
                raise errors.MisconfigurationError(err_fmt.format(included_directive,
                                                                  block[included_dir_loc]))
            # The block already holds the same directive as the included file.
            _comment_out_directive(block, included_dir_loc, directive[1])

    if can_append(location, directive_name):
        if insert_at_top:
            # Add a newline so the comment doesn't comment
            # out existing directives
            block.insert(0, nginxparser.UnspacedList('\n'))
            block.insert(0, directive)
            comment_directive(block, 0)
        else:
            block.append(directive)
            comment_directive(block, len(block) - 1)
    elif block[location] != directive:
        raise errors.MisconfigurationError(err_fmt.format(directive, block[location]))
def _update_directive(block, directive, location):
    """Replace the directive at ``location`` and pass it to ``comment_directive``.

    :param list block: The config block, modified in place
    :param list directive: The replacement directive
    :param int location: Index in ``block`` of the directive to replace

    """
    block[location] = directive
    comment_directive(block, location)
def _update_or_add_directive(block, directive, insert_at_top):
    """Replace the first directive of the same name, or add the directive.

    :param list block: The config block, modified in place
    :param list directive: The directive to store
    :param bool insert_at_top: Used only when the directive has to be added;
        see ``_add_directive``

    """
    if not isinstance(directive, nginxparser.UnspacedList):
        directive = nginxparser.UnspacedList(directive)
    if _is_whitespace_or_comment(directive):
        # whitespace or comment
        block.append(directive)
        return

    location = _find_location(block, directive[0])
    if location is None:
        # nothing to replace
        _add_directive(block, directive, insert_at_top)
    else:
        # we can update directive
        _update_directive(block, directive, location)
def _is_certbot_comment(directive):
return '#' in directive and COMMENT in directive
def _remove_directives(directive_name, match_func, block):
    """Removes directives of name directive_name from a config block if match_func matches.

    :param directive_name: The directive type to remove
    :param callable match_func: Predicate a directive must satisfy to be
        removed, or None to remove every directive of that name
    :param list block: The config block, modified in place

    """
    location = _find_location(block, directive_name, match_func=match_func)
    while location is not None:
        follower = location + 1
        # if the directive was made by us, remove the comment following
        if follower < len(block) and _is_certbot_comment(block[follower]):
            del block[follower]
        del block[location]
        location = _find_location(block, directive_name, match_func=match_func)
[docs]def _apply_global_addr_ssl(addr_to_ssl, parsed_server):
"""Apply global sslishness information to the parsed server block
"""
for addr in parsed_server['addrs']:
addr.ssl = addr_to_ssl[addr.normalized_tuple()]
if addr.ssl:
parsed_server['ssl'] = True
[docs]def _parse_server_raw(server):
"""Parses a list of server directives.
:param list server: list of directives in a server block
:rtype: dict
"""
addrs = set() # type: Set[obj.Addr]
ssl = False # type: bool
names = set() # type: Set[str]
apply_ssl_to_all_addrs = False
for directive in server:
if not directive:
continue
if directive[0] == 'listen':
addr = obj.Addr.fromstring(" ".join(directive[1:]))
if addr:
addrs.add(addr)
if addr.ssl:
ssl = True
elif directive[0] == 'server_name':
names.update(x.strip('"\'') for x in directive[1:])
elif _is_ssl_on_directive(directive):
ssl = True
apply_ssl_to_all_addrs = True
if apply_ssl_to_all_addrs:
for addr in addrs:
addr.ssl = True
return {
'addrs': addrs,
'ssl': ssl,
'names': names
}