heat.common.policy

Source code for heat.common.policy

#
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

# Based on glance/api/policy.py
"""Policy Engine For Heat."""

from oslo_config import cfg
from oslo_log import log as logging
from oslo_policy import opts
from oslo_policy import policy
from oslo_utils import excutils

from heat.common import exception
from heat.common.i18n import _
from heat import policies


# Global oslo.config object and the logger used throughout this module.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

# Fallback rule sets; only the rule stored under 'default' is used below, as
# the default_rule of Enforcer and ResourceEnforcer respectively.
# NOTE(review): in the oslo.policy rule language '!' is expected to always
# deny and '@' to always allow -- confirm against the oslo.policy docs.
DEFAULT_RULES = policy.Rules.from_dict({'default': '!'})
DEFAULT_RESOURCE_RULES = policy.Rules.from_dict({'default': '@'})

# TODO(gmann): Remove setting the default value of config policy_file
# once oslo_policy changes its default value to 'policy.yaml'.
# https://github.com/openstack/oslo.policy/blob/a626ad12fe5a3abd49d70e3e5b95589d279ab578/oslo_policy/opts.py#L49
DEFAULT_POLICY_FILE = 'policy.yaml'
opts.set_defaults(CONF, DEFAULT_POLICY_FILE)

# Module-level Enforcer instance; stays None until get_enforcer() creates it.
ENFORCER = None


class Enforcer(object):
    """Load policy rules and enforce them for Heat requests.

    Thin wrapper around ``oslo_policy.policy.Enforcer`` that prefixes rule
    names with a scope and remembers which exception to raise on denial.
    """

    def __init__(self, scope='heat', exc=exception.Forbidden,
                 default_rule=DEFAULT_RULES['default'], policy_file=None):
        """Build the wrapped oslo.policy enforcer and register defaults.

        :param scope: Prefix joined to the action name (``scope:action``)
                      when ``enforce`` is called without an explicit scope.
        :param exc: Exception class handed to the wrapped enforcer for
                    denied requests.
        :param default_rule: Rule passed to the wrapped enforcer as its
                             ``default_rule``.
        :param policy_file: Policy file passed through to the wrapped
                            enforcer.
        """
        self.scope = scope
        self.exc = exc
        self.default_rule = default_rule
        self.enforcer = policy.Enforcer(
            CONF,
            default_rule=default_rule,
            policy_file=policy_file,
        )
        # When True, _check logs PolicyNotRegistered before re-raising it.
        self.log_not_registered = True

        # TODO(ramishra) Remove this once the deprecated rules are removed.
        self.enforcer.suppress_deprecation_warnings = True

        # Register the in-code default rules, then expose the wrapped
        # enforcer's rule collections on this object.
        self.enforcer.register_defaults(policies.list_rules())
        self.file_rules = self.enforcer.file_rules
        self.registered_rules = self.enforcer.registered_rules

    def set_rules(self, rules, overwrite=True):
        """Install ``rules`` on the wrapped enforcer.

        :param rules: Dict of rules; wrapped in a ``policy.Rules`` object
                      together with this enforcer's default rule.
        :param overwrite: Passed through to ``policy.Enforcer.set_rules``.
        """
        self.enforcer.set_rules(
            policy.Rules(rules, self.default_rule), overwrite)

    def load_rules(self, force_reload=False):
        """Have the wrapped enforcer load its rules.

        :param force_reload: Passed through to
                             ``policy.Enforcer.load_rules``.
        """
        self.enforcer.load_rules(force_reload)

    def _check(self, context, rule, target, exc,
               is_registered_policy=False, *args, **kwargs):
        """Verify that the action is valid on the target in this context.

        :param context: Heat request context; its ``to_policy_values()``
                        result is used as the credentials.
        :param rule: String representing the action to be checked.
        :param target: Dictionary representing the object of the action.
        :param exc: Exception class to raise on denial. When falsy, the
                    wrapped enforcer is called with ``do_raise=False``.
        :param is_registered_policy: When true the rule is checked with
                                     ``authorize``; otherwise with
                                     ``enforce``, to which any extra
                                     ``*args``/``**kwargs`` are forwarded.
        :raises heat.common.exception.Forbidden: When permission is denied
                (or ``exc`` if supplied).
        :raises oslo_policy.policy.PolicyNotRegistered: Re-raised from
                ``authorize`` (logged first when ``log_not_registered``).
        :returns: A non-False value if access is allowed.
        """
        do_raise = bool(exc)
        credentials = context.to_policy_values()

        if not is_registered_policy:
            return self.enforcer.enforce(rule, target, credentials,
                                         do_raise, exc=exc,
                                         *args, **kwargs)

        try:
            return self.enforcer.authorize(rule, target, credentials,
                                           do_raise=do_raise,
                                           exc=exc, action=rule)
        except policy.PolicyNotRegistered:
            if not self.log_not_registered:
                raise
            # Log with traceback, then re-raise the same exception.
            with excutils.save_and_reraise_exception():
                LOG.exception(_('Policy not registered.'))

    def enforce(self, context, action, scope=None, target=None,
                is_registered_policy=False):
        """Verify that the action is valid on the target in this context.

        :param context: Heat request context.
        :param action: String representing the action to be checked.
        :param scope: Prefix for the rule name; ``self.scope`` is used when
                      this is falsy.
        :param target: Dictionary representing the object of the action;
                       an empty dict is used when this is falsy.
        :param is_registered_policy: Forwarded to ``_check``.
        :raises heat.common.exception.Forbidden: When permission is denied
                (or ``self.exc`` if supplied).
        :returns: A non-False value if access is allowed.
        """
        effective_scope = scope or self.scope
        rule_name = '%s:%s' % (effective_scope, action)
        return self._check(context, rule_name, target or {}, self.exc,
                           action=action,
                           is_registered_policy=is_registered_policy)

    def check_is_admin(self, context):
        """Tell whether the context is admin according to policy.

        By default the rule will check whether or not roles contains
        'admin' role and is admin project. No exception class is supplied
        to the check, so the wrapped enforcer runs with ``do_raise=False``.

        :param context: Heat request context.
        :returns: A non-False value if the user is admin according to
                  policy.
        """
        is_admin = self._check(context, 'context_is_admin',
                               target={}, exc=None,
                               is_registered_policy=True)
        return is_admin
def get_policy_enforcer():
    """Return the enforcer for the oslopolicy CLI scripts.

    This function is used by oslopolicy CLI scripts to generate policy
    files from overrides on disk and defaults in code. It first invokes
    the global configuration object with an empty argument list for the
    'heat' project, then returns the shared enforcer.
    """
    CONF([], project='heat')
    enforcer = get_enforcer()
    return enforcer
def get_enforcer():
    """Return the module-level Enforcer, creating it on first use."""
    global ENFORCER
    if ENFORCER is not None:
        return ENFORCER
    ENFORCER = Enforcer()
    return ENFORCER
class ResourceEnforcer(Enforcer):
    """Enforcer variant used for per-resource-type policy checks.

    Differs from ``Enforcer`` in that it defaults to
    ``DEFAULT_RESOURCE_RULES['default']``, uses the 'resource_types' scope
    when none is given, treats unregistered policies as allowed, and also
    checks a wildcard form of each resource type.
    """

    def __init__(self, default_rule=DEFAULT_RESOURCE_RULES['default'],
                 **kwargs):
        """Initialise the base enforcer with the resource default rule.

        :param default_rule: Rule handed to ``Enforcer.__init__``.
        :param kwargs: Remaining ``Enforcer.__init__`` keyword arguments.
        """
        super(ResourceEnforcer, self).__init__(
            default_rule=default_rule, **kwargs)
        # Unregistered policies are handled in _enforce, so _check should
        # not log them.
        self.log_not_registered = False

    def _enforce(self, context, res_type, scope=None, target=None,
                 is_registered_policy=False):
        """Check a single resource type name against policy.

        :param context: Heat request context.
        :param res_type: Resource type name used as the action.
        :param scope: Rule prefix; 'resource_types' when falsy.
        :param target: Forwarded to ``Enforcer.enforce``.
        :param is_registered_policy: Forwarded to ``Enforcer.enforce``.
        :raises: ``self.exc`` when the check raises it, or when the result
                 is falsy and ``self.exc`` is set.
        :returns: The result of the check; ``True`` if the policy is not
                  registered.
        """
        effective_scope = scope or 'resource_types'
        try:
            allowed = super(ResourceEnforcer, self).enforce(
                context, res_type,
                scope=effective_scope,
                target=target,
                is_registered_policy=is_registered_policy)
        except policy.PolicyNotRegistered:
            allowed = True
        except self.exc as denied:
            # NOTE(pas-ha): caught only to log the exception
            LOG.info(str(denied))
            raise

        if not allowed and self.exc:
            raise self.exc(action=res_type)
        return allowed

    def enforce(self, context, res_type, scope=None, target=None,
                is_registered_policy=False):
        """Check a resource type and then its wildcard form.

        The wildcard form replaces the last ``::``-separated component of
        ``res_type`` with ``*``. It is only checked when the exact type
        passed.

        :param context: Heat request context.
        :param res_type: Resource type name to check.
        :param scope: Forwarded to ``_enforce``.
        :param target: Forwarded to ``_enforce``.
        :param is_registered_policy: Forwarded to ``_enforce``.
        :raises: ``self.exc`` built with the original ``res_type`` as the
                 action when the wildcard check raises ``self.exc``.
        :returns: The wildcard check result when the exact check passed,
                  otherwise the exact check result.
        """
        exact_result = self._enforce(
            context, res_type, scope, target,
            is_registered_policy=is_registered_policy)
        if not exact_result:
            return exact_result

        # check for wildcard resource types
        wildcard_type = "::".join(res_type.split("::")[:-1] + ['*'])
        try:
            return self._enforce(context, wildcard_type, scope, target,
                                 is_registered_policy=is_registered_policy)
        except self.exc:
            # Report the denial against the concrete type, not the wildcard.
            raise self.exc(action=res_type)

    def enforce_stack(self, stack, scope=None, target=None,
                      is_registered_policy=False):
        """Run ``enforce`` for the type of every resource in ``stack``.

        :param stack: Object exposing ``resources`` (a mapping whose values
                      have a ``type()`` method) and ``context``.
        :param scope: Forwarded to ``enforce``.
        :param target: Forwarded to ``enforce``.
        :param is_registered_policy: Forwarded to ``enforce``.
        """
        for resource in stack.resources.values():
            self.enforce(stack.context, resource.type(),
                         scope=scope,
                         target=target,
                         is_registered_policy=is_registered_policy)
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.