heat.common.environment_util

Source code for heat.common.environment_util

#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
import collections

from oslo_log import log as logging
from oslo_serialization import jsonutils

from heat.common import environment_format as env_fmt
from heat.common import exception
from heat.common.i18n import _

ALLOWED_PARAM_MERGE_STRATEGIES = (OVERWRITE, MERGE, DEEP_MERGE) = (
    'overwrite', 'merge', 'deep_merge')

LOG = logging.getLogger(__name__)


[docs]def get_param_merge_strategy(merge_strategies, param_key, available_strategies=None): if not available_strategies: available_strategies = {} if merge_strategies is None: return OVERWRITE env_default = merge_strategies.get('default', OVERWRITE) merge_strategy = merge_strategies.get( param_key, available_strategies.get( param_key, env_default)) if merge_strategy in ALLOWED_PARAM_MERGE_STRATEGIES: return merge_strategy return env_default
[docs]def merge_list(old, new): """merges lists and comma delimited lists.""" if not old: return new if isinstance(new, list): old.extend(new) return old else: return ','.join([old, new])
[docs]def merge_map(old, new, deep_merge=False): """Merge nested dictionaries.""" if not old: return new for k, v in new.items(): if v is not None: if not deep_merge: old[k] = v elif isinstance(v, collections.abc.Mapping): old_v = old.get(k) old[k] = merge_map(old_v, v, deep_merge) if old_v else v elif (isinstance(v, collections.abc.Sequence) and not isinstance(v, str)): old_v = old.get(k) old[k] = merge_list(old_v, v) if old_v else v elif isinstance(v, str): old[k] = ''.join([old.get(k, ''), v]) else: old[k] = v return old
[docs]def parse_param(p_val, p_schema): try: if p_schema.type == p_schema.MAP: if not isinstance(p_val, str): p_val = jsonutils.dumps(p_val) if p_val: return jsonutils.loads(p_val) elif not isinstance(p_val, collections.abc.Sequence): raise ValueError() except (ValueError, TypeError) as err: msg = _("Invalid parameter in environment %s.") % str(err) raise ValueError(msg) return p_val
[docs]def merge_parameters(old, new, param_schemata, strategies_in_file, available_strategies, env_file): def param_merge(p_key, p_value, p_schema, deep_merge=False): p_type = p_schema.type p_value = parse_param(p_value, p_schema) if p_type == p_schema.MAP: old[p_key] = merge_map(old.get(p_key, {}), p_value, deep_merge) elif p_type == p_schema.LIST: old[p_key] = merge_list(old.get(p_key), p_value) elif p_type == p_schema.STRING: old[p_key] = ''.join([old.get(p_key, ''), p_value]) elif p_type == p_schema.NUMBER: old[p_key] = old.get(p_key, 0) + p_value else: raise exception.InvalidMergeStrategyForParam(strategy=MERGE, param=p_key) for key, value in new.items(): # if key not in param_schemata ignore it if key in param_schemata and value is not None: param_merge_strategy = get_param_merge_strategy( strategies_in_file, key, available_strategies) if key not in available_strategies: available_strategies[key] = param_merge_strategy elif param_merge_strategy != available_strategies[key]: raise exception.ConflictingMergeStrategyForParam( strategy=param_merge_strategy, param=key, env_file=env_file) if not old: return new for key, value in new.items(): # if key not in param_schemata ignore it if key in param_schemata and value is not None: param_merge_strategy = available_strategies[key] if param_merge_strategy == DEEP_MERGE: LOG.debug("Deep Merging Parameter: %s", key) param_merge(key, value, param_schemata[key], deep_merge=True) elif param_merge_strategy == MERGE: LOG.debug("Merging Parameter: %s", key) param_merge(key, value, param_schemata[key]) else: LOG.debug("Overriding Parameter: %s", key) old[key] = value return old
[docs]def merge_environments(environment_files, files, params, param_schemata): """Merges environment files into the stack input parameters. If a list of environment files have been specified, this call will pull the contents of each from the files dict, parse them as environments, and merge them into the stack input params. This behavior is the same as earlier versions of the Heat client that performed this params population client-side. :param environment_files: ordered names of the environment files found in the files dict :type environment_files: list or None :param files: mapping of stack filenames to contents :type files: dict :param params: parameters describing the stack :type params: dict :param param_schemata: parameter schema dict :type param_schemata: dict """ if not environment_files: return available_strategies = {} for filename in environment_files: raw_env = files[filename] parsed_env = env_fmt.parse(raw_env) strategies_in_file = parsed_env.pop( env_fmt.PARAMETER_MERGE_STRATEGIES, {}) for section_key, section_value in parsed_env.items(): if section_value: if section_key in (env_fmt.PARAMETERS, env_fmt.PARAMETER_DEFAULTS): params[section_key] = merge_parameters( params[section_key], section_value, param_schemata, strategies_in_file, available_strategies, filename) else: params[section_key] = merge_map(params[section_key], section_value)
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.