#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
"""Implementation of SQLAlchemy backend."""
import datetime
import functools
import itertools
import random
from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exception
from oslo_db import options
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import utils
from oslo_log import log as logging
from oslo_utils import timeutils
import sqlalchemy
from sqlalchemy import and_
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy import orm
from heat.common import crypt
from heat.common import exception
from heat.common.i18n import _
from heat.db import filters as db_filters
from heat.db import models
from heat.db import utils as db_utils
from heat.engine import environment as heat_environment
from heat.rpc import api as rpc_api
CONF = cfg.CONF
CONF.import_opt('hidden_stack_tags', 'heat.common.config')
CONF.import_opt('max_events_per_stack', 'heat.common.config')
CONF.import_group('profiler', 'heat.common.config')
CONF.import_opt('db_max_retries', 'oslo_db.options', group='database')
CONF.import_opt('db_retry_interval', 'oslo_db.options', group='database')
CONF.import_opt(
    'db_inc_retry_interval', 'oslo_db.options', group='database')
CONF.import_opt(
    'db_max_retry_interval', 'oslo_db.options', group='database')
options.set_defaults(CONF)
_facade = None
LOG = logging.getLogger(__name__)
# TODO(sbaker): fix tests so that sqlite_fk=True can be passed to configure
context_manager = enginefacade.transaction_context()
# utility methods
[docs]
def get_engine():
    return context_manager.writer.get_engine() 
[docs]
def retry_on_db_error(func):
    @functools.wraps(func)
    def try_func(context, *args, **kwargs):
        wrapped = oslo_db_api.wrap_db_retry(
            max_retries=CONF.database.db_max_retries,
            retry_on_deadlock=True,
            retry_on_disconnect=True,
            retry_interval=CONF.database.db_retry_interval,
            inc_retry_interval=CONF.database.db_inc_retry_interval,
            max_retry_interval=CONF.database.db_max_retry_interval,
        )(func)
        return wrapped(context, *args, **kwargs)
    return try_func 
def _soft_delete(context, obj):
    """Mark this object as deleted."""
    setattr(obj, 'deleted_at', timeutils.utcnow())
def _soft_delete_aware_query(context, *args, **kwargs):
    """Stack query helper that accounts for context's `show_deleted` field.
    :param show_deleted: if True, overrides context's show_deleted field.
    """
    query = context.session.query(*args)
    show_deleted = kwargs.get('show_deleted') or context.show_deleted
    if not show_deleted:
        query = query.filter_by(deleted_at=None)
    return query
# raw template
[docs]
@context_manager.reader
def raw_template_get(context, template_id):
    return _raw_template_get(context, template_id) 
def _raw_template_get(context, template_id):
    result = context.session.get(models.RawTemplate, template_id)
    if not result:
        raise exception.NotFound(_('raw template with id %s not found') %
                                 template_id)
    return result
[docs]
@context_manager.writer
def raw_template_create(context, values):
    raw_template_ref = models.RawTemplate()
    raw_template_ref.update(values)
    raw_template_ref.save(context.session)
    return raw_template_ref 
[docs]
@context_manager.writer
def raw_template_update(context, template_id, values):
    raw_template_ref = _raw_template_get(context, template_id)
    # get only the changed values
    values = dict((k, v) for k, v in values.items()
                  if getattr(raw_template_ref, k) != v)
    if values:
        for k, v in values.items():
            setattr(raw_template_ref, k, v)
    return raw_template_ref 
[docs]
@context_manager.writer
def raw_template_delete(context, template_id):
    try:
        raw_template = _raw_template_get(context, template_id)
    except exception.NotFound:
        # Ignore not found
        return
    raw_tmpl_files_id = raw_template.files_id
    context.session.delete(raw_template)
    if raw_tmpl_files_id is None:
        return
    # If no other raw_template is referencing the same raw_template_files,
    # delete that too
    if context.session.query(models.RawTemplate).filter_by(
            files_id=raw_tmpl_files_id).first() is None:
        try:
            raw_tmpl_files = _raw_template_files_get(
                context, raw_tmpl_files_id)
        except exception.NotFound:
            # Ignore not found
            return
        context.session.delete(raw_tmpl_files) 
# raw template files
[docs]
@context_manager.writer
def raw_template_files_create(context, values):
    raw_templ_files_ref = models.RawTemplateFiles()
    raw_templ_files_ref.update(values)
    raw_templ_files_ref.save(context.session)
    return raw_templ_files_ref 
[docs]
@context_manager.reader
def raw_template_files_get(context, files_id):
    return _raw_template_files_get(context, files_id) 
def _raw_template_files_get(context, files_id):
    result = context.session.get(models.RawTemplateFiles, files_id)
    if not result:
        raise exception.NotFound(
            _("raw_template_files with files_id %d not found") %
            files_id)
    return result
# resource
[docs]
@context_manager.writer
def resource_create(context, values):
    return _resource_create(context, values) 
def _resource_create(context, values):
    resource_ref = models.Resource()
    resource_ref.data = []
    resource_ref.attr_data = None
    resource_ref.rsrc_attr_data = None
    resource_ref.update(values)
    resource_ref.save(context.session)
    return resource_ref
[docs]
@retry_on_db_error
@context_manager.writer
def resource_create_replacement(context,
                                existing_res_id,
                                new_res_values,
                                atomic_key, expected_engine_id=None):
    try:
        with context_manager.writer.independent.using(context):
            new_res = _resource_create(context, new_res_values)
            update_data = {'replaced_by': new_res.id}
            rows_updated = _resource_update(
                context, existing_res_id, update_data, atomic_key,
                expected_engine_id=expected_engine_id,
            )
        if not bool(rows_updated):
            data = {}
            if 'name' in new_res_values:
                data['resource_name'] = new_res_values['name']
            raise exception.UpdateInProgress(**data)
    except db_exception.DBReferenceError as exc:
        # New template_id no longer exists
        LOG.debug('Not creating replacement resource: %s', exc)
        return None
    else:
        return new_res 
[docs]
@context_manager.reader
def resource_get_all_by_stack(context, stack_id, filters=None):
    query = context.session.query(
        models.Resource
    ).filter_by(
        stack_id=stack_id
    ).options(
        orm.joinedload(models.Resource.attr_data),
        orm.joinedload(models.Resource.data),
        orm.joinedload(models.Resource.rsrc_prop_data),
    )
    query = db_filters.exact_filter(query, models.Resource, filters)
    results = query.all()
    return dict((res.name, res) for res in results) 
[docs]
@context_manager.reader
def resource_get_all_active_by_stack(context, stack_id):
    filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'}
    subquery = context.session.query(models.Resource.id).filter_by(**filters)
    results = context.session.query(models.Resource).filter_by(
        stack_id=stack_id).filter(
        models.Resource.id.notin_(subquery.scalar_subquery())
    ).options(
        orm.joinedload(models.Resource.attr_data),
        orm.joinedload(models.Resource.data),
        orm.joinedload(models.Resource.rsrc_prop_data),
    ).all()
    return dict((res.id, res) for res in results) 
[docs]
@context_manager.reader
def resource_get_all_by_root_stack(context, stack_id, filters=None,
                                   stack_id_only=False):
    query = context.session.query(
        models.Resource
    ).filter_by(
        root_stack_id=stack_id
    )
    if stack_id_only:
        query = query.options(
            orm.load_only(models.Resource.id, models.Resource.stack_id)
        )
    else:
        query = query.options(
            orm.joinedload(models.Resource.attr_data),
            orm.joinedload(models.Resource.data),
            orm.joinedload(models.Resource.rsrc_prop_data),
        )
    query = db_filters.exact_filter(query, models.Resource, filters)
    results = query.all()
    return dict((res.id, res) for res in results) 
[docs]
@context_manager.reader
def engine_get_all_locked_by_stack(context, stack_id):
    query = context.session.query(
        func.distinct(models.Resource.engine_id)
    ).filter(
        models.Resource.stack_id == stack_id,
        models.Resource.engine_id.isnot(None))
    return set(i[0] for i in query.all()) 
[docs]
@context_manager.reader
def resource_get(context, resource_id, refresh=False, refresh_data=False):
    return _resource_get(context, resource_id, refresh=refresh,
                         refresh_data=refresh_data) 
def _resource_get(context, resource_id, refresh=False, refresh_data=False):
    result = context.session.query(
        models.Resource
    ).filter_by(
        id=resource_id
    ).options(
        orm.joinedload(models.Resource.attr_data),
        orm.joinedload(models.Resource.data),
        orm.joinedload(models.Resource.rsrc_prop_data),
    ).first()
    if not result:
        raise exception.NotFound(_("resource with id %s not found") %
                                 resource_id)
    if refresh:
        context.session.refresh(result)
        if refresh_data:
            # ensure data is loaded (lazy or otherwise)
            result.data
    return result
[docs]
@context_manager.reader
def resource_get_by_name_and_stack(context, resource_name, stack_id):
    result = context.session.query(
        models.Resource
    ).filter_by(
        name=resource_name
    ).filter_by(
        stack_id=stack_id
    ).options(
        orm.joinedload(models.Resource.attr_data),
        orm.joinedload(models.Resource.data),
        orm.joinedload(models.Resource.rsrc_prop_data),
    ).first()
    return result 
[docs]
@context_manager.reader
def resource_get_all_by_physical_resource_id(context, physical_resource_id):
    return list(
        _resource_get_all_by_physical_resource_id(
            context, physical_resource_id,
        )
    ) 
def _resource_get_all_by_physical_resource_id(context, physical_resource_id):
    results = context.session.query(
        models.Resource,
    ).filter_by(
        physical_resource_id=physical_resource_id,
    ).options(
        orm.joinedload(models.Resource.attr_data),
        orm.joinedload(models.Resource.data),
        orm.joinedload(models.Resource.rsrc_prop_data),
    ).all()
    for result in results:
        if context is None or context.is_admin or context.project_id in (
            result.stack.tenant, result.stack.stack_user_project_id,
        ):
            yield result
[docs]
@context_manager.reader
def resource_get_by_physical_resource_id(context, physical_resource_id):
    results = _resource_get_all_by_physical_resource_id(
        context, physical_resource_id,
    )
    try:
        return next(results)
    except StopIteration:
        return None 
[docs]
@context_manager.reader
def resource_get_all(context):
    results = context.session.query(
        models.Resource,
    ).options(
        orm.joinedload(models.Resource.attr_data),
        orm.joinedload(models.Resource.data),
        orm.joinedload(models.Resource.rsrc_prop_data),
    ).all()
    if not results:
        raise exception.NotFound(_('no resources were found'))
    return results 
[docs]
@retry_on_db_error
@context_manager.writer
def resource_purge_deleted(context, stack_id):
    filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'}
    query = context.session.query(models.Resource)
    result = query.filter_by(**filters)
    attr_ids = [r.attr_data_id for r in result if r.attr_data_id is not None]
    result.delete()
    if attr_ids:
        context.session.query(models.ResourcePropertiesData).filter(
            models.ResourcePropertiesData.id.in_(attr_ids)).delete(
                synchronize_session=False) 
def _add_atomic_key_to_values(values, atomic_key):
    if atomic_key is None:
        values['atomic_key'] = 1
    else:
        values['atomic_key'] = atomic_key + 1
[docs]
@retry_on_db_error
@context_manager.writer
def resource_update(context, resource_id, values, atomic_key,
                    expected_engine_id=None):
    return _resource_update(
        context, resource_id, values, atomic_key,
        expected_engine_id=expected_engine_id,
    ) 
def _resource_update(
    context, resource_id, values, atomic_key, expected_engine_id=None,
):
    _add_atomic_key_to_values(values, atomic_key)
    rows_updated = context.session.query(models.Resource).filter_by(
        id=resource_id, engine_id=expected_engine_id,
        atomic_key=atomic_key).update(values)
    return bool(rows_updated)
[docs]
@context_manager.writer
def resource_update_and_save(context, resource_id, values):
    resource = context.session.get(models.Resource, resource_id)
    resource.update(values)
    resource.save(context.session)
    return _resource_get(context, resource.id) 
[docs]
@context_manager.writer
def resource_delete(context, resource_id):
    resource = context.session.get(models.Resource, resource_id)
    if resource:
        context.session.delete(resource)
        if resource.attr_data_id is not None:
            attr_prop_data = context.session.get(
                models.ResourcePropertiesData, resource.attr_data_id)
            context.session.delete(attr_prop_data) 
[docs]
@context_manager.writer
def resource_exchange_stacks(context, resource_id1, resource_id2):
    res1 = context.session.get(models.Resource, resource_id1)
    res2 = context.session.get(models.Resource, resource_id2)
    res1.stack, res2.stack = res2.stack, res1.stack 
[docs]
@context_manager.writer
def resource_attr_id_set(context, resource_id, atomic_key, attr_id):
    values = {'attr_data_id': attr_id}
    _add_atomic_key_to_values(values, atomic_key)
    rows_updated = context.session.query(models.Resource).filter(and_(
        models.Resource.id == resource_id,
        models.Resource.atomic_key == atomic_key,
        models.Resource.engine_id.is_(None),
        or_(models.Resource.attr_data_id == attr_id,
            models.Resource.attr_data_id.is_(None)))).update(
                values)
    if rows_updated > 0:
        return True
    else:
        # Someone else set the attr_id first and/or we have a stale
        # view of the resource based on atomic_key, so delete the
        # resource_properties_data (attr) DB row.
        LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s',
                  {'rid': resource_id, 'aid': attr_id})
        context.session.query(
            models.ResourcePropertiesData).filter(
                models.ResourcePropertiesData.id == attr_id).delete()
        return False 
[docs]
@context_manager.writer
def resource_attr_data_delete(context, resource_id, attr_id):
    resource = context.session.get(models.Resource, resource_id)
    attr_prop_data = context.session.get(
        models.ResourcePropertiesData, attr_id)
    if resource:
        resource.update({'attr_data_id': None})
    if attr_prop_data:
        context.session.delete(attr_prop_data) 
# resource data
[docs]
@context_manager.reader
def resource_data_get_all(context, resource_id, data=None):
    """Looks up resource_data by resource.id.
    If data is encrypted, this method will decrypt the results.
    """
    if data is None:
        data = (context.session.query(models.ResourceData)
                .filter_by(resource_id=resource_id)).all()
    if not data:
        raise exception.NotFound(_('no resource data found'))
    ret = {}
    for res in data:
        if res.redact:
            try:
                ret[res.key] = crypt.decrypt(res.decrypt_method, res.value)
                continue
            except exception.InvalidEncryptionKey:
                LOG.exception('Failed to decrypt resource data %(rkey)s '
                              'for %(rid)s, ignoring.',
                              {'rkey': res.key, 'rid': resource_id})
        ret[res.key] = res.value
    return ret 
[docs]
@context_manager.reader
def resource_data_get(context, resource_id, key):
    """Lookup value of resource's data by key.
    Decrypts resource data if necessary.
    """
    result = _resource_data_get_by_key(context, resource_id, key)
    if result.redact:
        return crypt.decrypt(result.decrypt_method, result.value)
    return result.value 
[docs]
@context_manager.reader
def resource_data_get_by_key(context, resource_id, key):
    return _resource_data_get_by_key(context, resource_id, key) 
def _resource_data_get_by_key(context, resource_id, key):
    """Looks up resource_data by resource_id and key.
    Does not decrypt resource_data.
    """
    result = (context.session.query(models.ResourceData)
              .filter_by(resource_id=resource_id)
              .filter_by(key=key).first())
    if not result:
        raise exception.NotFound(_('No resource data found'))
    return result
[docs]
@context_manager.writer
def resource_data_set(context, resource_id, key, value, redact=False):
    """Save resource's key/value pair to database."""
    if redact:
        method, value = crypt.encrypt(value)
    else:
        method = ''
    try:
        current = _resource_data_get_by_key(context, resource_id, key)
    except exception.NotFound:
        current = models.ResourceData()
        current.key = key
        current.resource_id = resource_id
    current.redact = redact
    current.value = value
    current.decrypt_method = method
    current.save(session=context.session)
    return current 
[docs]
@context_manager.writer
def resource_data_delete(context, resource_id, key):
    result = _resource_data_get_by_key(context, resource_id, key)
    context.session.delete(result) 
# resource properties data
[docs]
@context_manager.writer
def resource_prop_data_create_or_update(context, values, rpd_id=None):
    return _resource_prop_data_create_or_update(context, values, rpd_id=rpd_id) 
def _resource_prop_data_create_or_update(context, values, rpd_id=None):
    obj_ref = None
    if rpd_id is not None:
        obj_ref = context.session.query(
            models.ResourcePropertiesData).filter_by(id=rpd_id).first()
    if obj_ref is None:
        obj_ref = models.ResourcePropertiesData()
    obj_ref.update(values)
    obj_ref.save(context.session)
    return obj_ref
[docs]
@context_manager.writer
def resource_prop_data_create(context, values):
    return _resource_prop_data_create_or_update(context, values) 
[docs]
@context_manager.reader
def resource_prop_data_get(context, resource_prop_data_id):
    result = context.session.get(
        models.ResourcePropertiesData, resource_prop_data_id)
    if result is None:
        raise exception.NotFound(
            _('ResourcePropertiesData with id %s not found') %
            resource_prop_data_id)
    return result 
# stack
[docs]
@context_manager.reader
def stack_get_by_name_and_owner_id(context, stack_name, owner_id):
    query = _soft_delete_aware_query(
        context, models.Stack
    ).options(
        orm.joinedload(models.Stack.raw_template),
    ).filter(
        sqlalchemy.or_(
            models.Stack.tenant == context.project_id,
            models.Stack.stack_user_project_id == context.project_id,
        )
    ).filter_by(name=stack_name).filter_by(owner_id=owner_id)
    return query.first() 
[docs]
@context_manager.reader
def stack_get_by_name(context, stack_name):
    return _stack_get_by_name(context, stack_name) 
def _stack_get_by_name(context, stack_name):
    query = _soft_delete_aware_query(
        context, models.Stack
    ).options(
        orm.joinedload(models.Stack.raw_template),
    ).filter(
        sqlalchemy.or_(
            models.Stack.tenant == context.project_id,
            models.Stack.stack_user_project_id == context.project_id),
    ).filter_by(name=stack_name)
    return query.order_by(models.Stack.created_at).first()
[docs]
@context_manager.reader
def stack_get(context, stack_id, show_deleted=False, eager_load=True):
    return _stack_get(
        context, stack_id, show_deleted=show_deleted, eager_load=eager_load
    ) 
def _stack_get(context, stack_id, show_deleted=False, eager_load=True):
    options = []
    if eager_load:
        options.append(orm.joinedload(models.Stack.raw_template))
    result = context.session.get(models.Stack, stack_id, options=options)
    deleted_ok = show_deleted or context.show_deleted
    if result is None or result.deleted_at is not None and not deleted_ok:
        return None
    # One exception to normal project scoping is users created by the
    # stacks in the stack_user_project_id (in the heat stack user domain)
    if (result is not None
        and context is not None and not context.is_admin
        and context.project_id not in (result.tenant,
                                       result.stack_user_project_id)):
        return None
    return result
[docs]
@context_manager.reader
def stack_get_status(context, stack_id):
    query = context.session.query(models.Stack)
    query = query.options(
        orm.load_only(
            models.Stack.action,
            models.Stack.status,
            models.Stack.status_reason,
            models.Stack.updated_at,
        )
    )
    result = query.filter_by(id=stack_id).first()
    if result is None:
        raise exception.NotFound(_('Stack with id %s not found') % stack_id)
    return (result.action, result.status, result.status_reason,
            result.updated_at) 
[docs]
@context_manager.reader
def stack_get_all_by_owner_id(context, owner_id):
    return _stack_get_all_by_owner_id(context, owner_id) 
def _stack_get_all_by_owner_id(context, owner_id):
    results = _soft_delete_aware_query(
        context, models.Stack,
    ).filter_by(
        owner_id=owner_id, backup=False,
    ).all()
    return results
[docs]
@context_manager.reader
def stack_get_all_by_root_owner_id(context, owner_id):
    return list(_stack_get_all_by_root_owner_id(context, owner_id)) 
def _stack_get_all_by_root_owner_id(context, owner_id):
    for stack in _stack_get_all_by_owner_id(context, owner_id):
        yield stack
        for ch_st in _stack_get_all_by_root_owner_id(context, stack.id):
            yield ch_st
def _get_sort_keys(sort_keys, mapping):
    """Returns an array containing only allowed keys
    :param sort_keys: an array of strings
    :param mapping: a mapping from keys to DB column names
    :returns: filtered list of sort keys
    """
    if isinstance(sort_keys, str):
        sort_keys = [sort_keys]
    return [mapping[key] for key in sort_keys or [] if key in mapping]
def _paginate_query(context, query, model, limit=None, sort_keys=None,
                    marker=None, sort_dir=None):
    default_sort_keys = ['created_at']
    if not sort_keys:
        sort_keys = default_sort_keys
        if not sort_dir:
            sort_dir = 'desc'
    # This assures the order of the stacks will always be the same
    # even for sort_key values that are not unique in the database
    sort_keys = sort_keys + ['id']
    model_marker = None
    if marker:
        model_marker = context.session.get(model, marker)
    try:
        query = utils.paginate_query(query, model, limit, sort_keys,
                                     model_marker, sort_dir)
    except utils.InvalidSortKey as exc:
        raise exception.Invalid(reason=str(exc))
    return query
def _query_stack_get_all(context, show_deleted=False,
                         show_nested=False, show_hidden=False, tags=None,
                         tags_any=None, not_tags=None, not_tags_any=None):
    if show_nested:
        query = _soft_delete_aware_query(
            context, models.Stack, show_deleted=show_deleted
        ).filter_by(backup=False)
    else:
        query = _soft_delete_aware_query(
            context, models.Stack, show_deleted=show_deleted
        ).filter_by(owner_id=None)
    if not context.is_admin:
        query = query.filter_by(tenant=context.project_id)
    query = query.options(orm.subqueryload(models.Stack.tags))
    if tags:
        for tag in tags:
            tag_alias = orm.aliased(models.StackTag)
            query = query.join(tag_alias, models.Stack.tags)
            query = query.filter(tag_alias.tag == tag)
    if tags_any:
        query = query.filter(
            models.Stack.tags.any(
                models.StackTag.tag.in_(tags_any)))
    if not_tags:
        subquery = _soft_delete_aware_query(
            context, models.Stack, show_deleted=show_deleted
        )
        for tag in not_tags:
            tag_alias = orm.aliased(models.StackTag)
            subquery = subquery.join(tag_alias, models.Stack.tags)
            subquery = subquery.filter(tag_alias.tag == tag)
        not_stack_ids = [s.id for s in subquery.all()]
        query = query.filter(models.Stack.id.notin_(not_stack_ids))
    if not_tags_any:
        query = query.filter(
            ~models.Stack.tags.any(
                models.StackTag.tag.in_(not_tags_any)))
    if not show_hidden and cfg.CONF.hidden_stack_tags:
        query = query.filter(
            ~models.Stack.tags.any(
                models.StackTag.tag.in_(cfg.CONF.hidden_stack_tags)))
    return query
[docs]
@context_manager.reader
def stack_get_all(context, limit=None, sort_keys=None, marker=None,
                  sort_dir=None, filters=None,
                  show_deleted=False, show_nested=False, show_hidden=False,
                  tags=None, tags_any=None, not_tags=None,
                  not_tags_any=None, eager_load=False):
    query = _query_stack_get_all(context,
                                 show_deleted=show_deleted,
                                 show_nested=show_nested,
                                 show_hidden=show_hidden, tags=tags,
                                 tags_any=tags_any, not_tags=not_tags,
                                 not_tags_any=not_tags_any)
    if eager_load:
        query = query.options(orm.joinedload(models.Stack.raw_template))
    return _filter_and_page_query(context, query, limit, sort_keys,
                                  marker, sort_dir, filters).all() 
def _filter_and_page_query(context, query, limit=None, sort_keys=None,
                           marker=None, sort_dir=None, filters=None):
    if filters is None:
        filters = {}
    sort_key_map = {rpc_api.STACK_NAME: models.Stack.name.key,
                    rpc_api.STACK_STATUS: models.Stack.status.key,
                    rpc_api.STACK_CREATION_TIME: models.Stack.created_at.key,
                    rpc_api.STACK_UPDATED_TIME: models.Stack.updated_at.key}
    valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map)
    query = db_filters.exact_filter(query, models.Stack, filters)
    return _paginate_query(context, query, models.Stack, limit,
                           valid_sort_keys, marker, sort_dir)
[docs]
@context_manager.reader
def stack_count_all(context, filters=None,
                    show_deleted=False, show_nested=False, show_hidden=False,
                    tags=None, tags_any=None, not_tags=None,
                    not_tags_any=None):
    query = _query_stack_get_all(context,
                                 show_deleted=show_deleted,
                                 show_nested=show_nested,
                                 show_hidden=show_hidden, tags=tags,
                                 tags_any=tags_any, not_tags=not_tags,
                                 not_tags_any=not_tags_any)
    query = db_filters.exact_filter(query, models.Stack, filters)
    return query.count() 
[docs]
@context_manager.writer
def stack_create(context, values):
    stack_ref = models.Stack()
    stack_ref.update(values)
    stack_name = stack_ref.name
    stack_ref.save(context.session)
    # Even though we just created a stack with this name, we may not find
    # it again because some unit tests create stacks with deleted_at set. Also
    # some backup stacks may not be found, for reasons that are unclear.
    earliest = _stack_get_by_name(context, stack_name)
    if earliest is not None and earliest.id != stack_ref.id:
        context.session.query(models.Stack).filter_by(
            id=stack_ref.id,
        ).delete()
        raise exception.StackExists(stack_name=stack_name)
    return stack_ref 
[docs]
@retry_on_db_error
@context_manager.writer
def stack_update(context, stack_id, values, exp_trvsl=None):
    query = (context.session.query(models.Stack)
             .filter(and_(models.Stack.id == stack_id),
                     (models.Stack.deleted_at.is_(None))))
    if not context.is_admin:
        query = query.filter(sqlalchemy.or_(
            models.Stack.tenant == context.project_id,
            models.Stack.stack_user_project_id == context.project_id))
    if exp_trvsl is not None:
        query = query.filter(models.Stack.current_traversal == exp_trvsl)
    rows_updated = query.update(values, synchronize_session=False)
    if not rows_updated:
        LOG.debug('Did not set stack state with values '
                  '%(vals)s, stack id: %(id)s with '
                  'expected traversal: %(trav)s',
                  {'id': stack_id, 'vals': str(values),
                   'trav': str(exp_trvsl)})
        if not _stack_get(context, stack_id, eager_load=False):
            raise exception.NotFound(
                _('Attempt to update a stack with id: '
                  '%(id)s %(msg)s') % {
                      'id': stack_id,
                      'msg': 'that does not exist'})
    return (rows_updated is not None and rows_updated > 0) 
[docs]
@context_manager.writer
def stack_delete(context, stack_id):
    s = _stack_get(context, stack_id, eager_load=False)
    if not s:
        raise exception.NotFound(_('Attempt to delete a stack with id: '
                                 '%(id)s %(msg)s') % {
                                     'id': stack_id,
                                     'msg': 'that does not exist'})
    attr_ids = []
    # normally the resources are deleted already by this point
    for r in s.resources:
        if r.attr_data_id is not None:
            attr_ids.append(r.attr_data_id)
        context.session.delete(r)
    if attr_ids:
        context.session.query(
            models.ResourcePropertiesData.id).filter(
                models.ResourcePropertiesData.id.in_(attr_ids)).delete(
                    synchronize_session=False)
    _soft_delete(context, s) 
[docs]
@context_manager.writer
def reset_stack_status(context, stack_id):
    return _reset_stack_status(context, stack_id) 
# NOTE(stephenfin): This method uses separate transactions to delete nested
# stacks, thus it's the only private method that is allowed to open a
# transaction (via 'context.session.begin')
def _reset_stack_status(context, stack_id, stack=None):
    if stack is None:
        stack = context.session.get(models.Stack, stack_id)
    if stack is None:
        raise exception.NotFound(_('Stack with id %s not found') % stack_id)
    query = context.session.query(models.Resource).filter_by(
        status='IN_PROGRESS', stack_id=stack_id)
    query.update({'status': 'FAILED',
                  'status_reason': 'Stack status manually reset',
                  'engine_id': None})
    query = context.session.query(models.ResourceData)
    query = query.join(models.Resource)
    query = query.filter_by(stack_id=stack_id)
    query = query.filter(
        models.ResourceData.key.in_(heat_environment.HOOK_TYPES))
    data_ids = [data.id for data in query]
    if data_ids:
        query = context.session.query(models.ResourceData)
        query = query.filter(models.ResourceData.id.in_(data_ids))
        query.delete(synchronize_session='fetch')
    # commit what we've done already
    context.session.commit()
    query = context.session.query(models.Stack).filter_by(owner_id=stack_id)
    for child in query:
        _reset_stack_status(context, child.id, child)
    if stack.status == 'IN_PROGRESS':
        stack.status = 'FAILED'
        stack.status_reason = 'Stack status manually reset'
    context.session.query(
        models.StackLock
    ).filter_by(stack_id=stack_id).delete()
def _stack_tags_delete(context, stack_id):
    result = _stack_tags_get(context, stack_id)
    if result:
        for tag in result:
            context.session.delete(tag)
def _stack_tags_get(context, stack_id):
    result = (context.session.query(models.StackTag)
              .filter_by(stack_id=stack_id)
              .all())
    return result or None
# stack lock
def _is_duplicate_error(exc):
    return isinstance(exc, db_exception.DBDuplicateEntry)
[docs]
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
                           retry_on_disconnect=True,
                           retry_interval=0.5,
                           inc_retry_interval=True,
                           exception_checker=_is_duplicate_error)
def stack_lock_create(context, stack_id, engine_id):
    with context_manager.writer.independent.using(context) as session:
        lock = session.get(models.StackLock, stack_id)
        if lock is not None:
            return lock.engine_id
        session.add(models.StackLock(stack_id=stack_id, engine_id=engine_id)) 
[docs]
def stack_lock_get_engine_id(context, stack_id):
    with context_manager.reader.independent.using(context) as session:
        lock = session.get(models.StackLock, stack_id)
        if lock is not None:
            return lock.engine_id 
[docs]
@context_manager.writer
def persist_state_and_release_lock(context, stack_id, engine_id, values):
    rows_updated = (context.session.query(models.Stack)
                    .filter(models.Stack.id == stack_id)
                    .update(values, synchronize_session=False))
    rows_affected = None
    if rows_updated is not None and rows_updated > 0:
        rows_affected = context.session.query(
            models.StackLock
        ).filter_by(stack_id=stack_id, engine_id=engine_id).delete()
    if not rows_affected:
        return True 
[docs]
def stack_lock_steal(context, stack_id, old_engine_id, new_engine_id):
    with context_manager.writer.independent.using(context) as session:
        lock = session.get(models.StackLock, stack_id)
        rows_affected = session.query(
            models.StackLock
        ).filter_by(stack_id=stack_id, engine_id=old_engine_id
                    ).update({"engine_id": new_engine_id})
    if not rows_affected:
        return lock.engine_id if lock is not None else True 
[docs]
def stack_lock_release(context, stack_id, engine_id):
    with context_manager.writer.independent.using(context) as session:
        rows_affected = session.query(
            models.StackLock
        ).filter_by(stack_id=stack_id, engine_id=engine_id).delete()
    if not rows_affected:
        return True 
[docs]
@context_manager.reader
def stack_get_root_id(context, stack_id):
    s = _stack_get(context, stack_id, eager_load=False)
    if not s:
        return None
    while s.owner_id:
        s = _stack_get(context, s.owner_id, eager_load=False)
    return s.id 
[docs]
@context_manager.reader
def stack_count_total_resources(context, stack_id):
    # count all resources which belong to the root stack
    return context.session.query(
        func.count(models.Resource.id)
    ).filter_by(root_stack_id=stack_id).scalar() 
# user credentials
[docs]
@context_manager.writer
def user_creds_create(context):
    values = context.to_dict()
    user_creds_ref = models.UserCreds()
    if values.get('trust_id'):
        method, trust_id = crypt.encrypt(values.get('trust_id'))
        user_creds_ref.trust_id = trust_id
        user_creds_ref.decrypt_method = method
        user_creds_ref.trustor_user_id = values.get('trustor_user_id')
        user_creds_ref.username = None
        user_creds_ref.password = None
        user_creds_ref.tenant = values.get('tenant')
        user_creds_ref.tenant_id = values.get('tenant_id')
        user_creds_ref.auth_url = values.get('auth_url')
        user_creds_ref.region_name = values.get('region_name')
    else:
        user_creds_ref.update(values)
        method, password = crypt.encrypt(values['password'])
        if len(str(password)) > 255:
            raise exception.Error(_("Length of OS_PASSWORD after encryption"
                                    " exceeds Heat limit (255 chars)"))
        user_creds_ref.password = password
        user_creds_ref.decrypt_method = method
    user_creds_ref.save(context.session)
    result = dict(user_creds_ref)
    if values.get('trust_id'):
        result['trust_id'] = values.get('trust_id')
    else:
        result['password'] = values.get('password')
    return result 
[docs]
@context_manager.reader
def user_creds_get(context, user_creds_id):
    db_result = context.session.get(models.UserCreds, user_creds_id)
    if db_result is None:
        return None
    # Return a dict copy of DB results, do not decrypt details into db_result
    # or it can be committed back to the DB in decrypted form
    result = dict(db_result)
    del result['decrypt_method']
    result['password'] = crypt.decrypt(
        db_result.decrypt_method, result['password'])
    result['trust_id'] = crypt.decrypt(
        db_result.decrypt_method, result['trust_id'])
    return result 
[docs]
@db_utils.retry_on_stale_data_error
@context_manager.writer
def user_creds_delete(context, user_creds_id):
    creds = context.session.get(models.UserCreds, user_creds_id)
    if not creds:
        raise exception.NotFound(
            _('Attempt to delete user creds with id '
              '%(id)s that does not exist') % {'id': user_creds_id})
    context.session.delete(creds) 
# event
[docs]
@context_manager.reader
def event_get_all_by_tenant(context, limit=None, marker=None,
                            sort_keys=None, sort_dir=None, filters=None):
    query = context.session.query(models.Event)
    query = db_filters.exact_filter(query, models.Event, filters)
    query = query.join(
        models.Event.stack
    ).filter_by(tenant=context.project_id).filter_by(deleted_at=None)
    filters = None
    return _events_filter_and_page_query(context, query, limit, marker,
                                         sort_keys, sort_dir, filters).all() 
[docs]
@context_manager.reader
def event_get_all_by_stack(context, stack_id, limit=None, marker=None,
                           sort_keys=None, sort_dir=None, filters=None):
    query = context.session.query(models.Event).filter_by(stack_id=stack_id)
    if filters and 'uuid' in filters:
        # retrieving a single event, so eager load its rsrc_prop_data detail
        query = query.options(orm.joinedload(models.Event.rsrc_prop_data))
    return _events_filter_and_page_query(context, query, limit, marker,
                                         sort_keys, sort_dir, filters).all() 
def _events_paginate_query(context, query, model, limit=None, sort_keys=None,
                           marker=None, sort_dir=None):
    default_sort_keys = ['created_at']
    if not sort_keys:
        sort_keys = default_sort_keys
        if not sort_dir:
            sort_dir = 'desc'
    # This assures the order of the stacks will always be the same
    # even for sort_key values that are not unique in the database
    sort_keys = sort_keys + ['id']
    model_marker = None
    if marker:
        # not to use context.session.get(model, marker), because
        # user can only see the ID(column 'uuid') and the ID as the marker
        model_marker = context.session.query(
            model).filter_by(uuid=marker).first()
    try:
        query = utils.paginate_query(query, model, limit, sort_keys,
                                     model_marker, sort_dir)
    except utils.InvalidSortKey as exc:
        raise exception.Invalid(reason=str(exc))
    return query
def _events_filter_and_page_query(context, query,
                                  limit=None, marker=None,
                                  sort_keys=None, sort_dir=None,
                                  filters=None):
    if filters is None:
        filters = {}
    sort_key_map = {rpc_api.EVENT_TIMESTAMP: models.Event.created_at.key,
                    rpc_api.EVENT_RES_TYPE: models.Event.resource_type.key}
    valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map)
    query = db_filters.exact_filter(query, models.Event, filters)
    return _events_paginate_query(context, query, models.Event, limit,
                                  valid_sort_keys, marker, sort_dir)
[docs]
@context_manager.reader
def event_count_all_by_stack(context, stack_id):
    return _event_count_all_by_stack(context, stack_id) 
def _event_count_all_by_stack(context, stack_id):
    query = context.session.query(func.count(models.Event.id))
    return query.filter_by(stack_id=stack_id).scalar()
def _find_rpd_references(context, stack_id):
    ev_ref_ids = set(
        e.rsrc_prop_data_id for e
        in context.session.query(models.Event).filter_by(
            stack_id=stack_id,
        ).all()
    )
    rsrc_ref_ids = set(
        r.rsrc_prop_data_id for r
        in context.session.query(models.Resource).filter_by(
            stack_id=stack_id,
        ).all()
    )
    return ev_ref_ids | rsrc_ref_ids
def _all_backup_stack_ids(context, stack_id):
    """Iterate over all the IDs of all stacks related as stack/backup pairs.
    All backup stacks of a main stack, past and present (i.e. including those
    that are soft deleted), are included. The main stack itself is also
    included if the initial ID passed in is for a backup stack. The initial ID
    passed in is never included in the output.
    """
    stack = context.session.get(models.Stack, stack_id)
    if stack is None:
        LOG.error('Stack %s not found', stack_id)
        return
    is_backup = stack.name.endswith('*')
    if is_backup:
        main = context.session.get(models.Stack, stack.owner_id)
        if main is None:
            LOG.error('Main stack for backup "%s" %s not found',
                      stack.name, stack_id)
            return
        yield main.id
        for backup_id in _all_backup_stack_ids(context, main.id):
            if backup_id != stack_id:
                yield backup_id
    else:
        q_backup = context.session.query(models.Stack).filter(sqlalchemy.or_(
            models.Stack.tenant == context.project_id,
            models.Stack.stack_user_project_id == context.project_id))
        q_backup = q_backup.filter_by(name=stack.name + '*')
        q_backup = q_backup.filter_by(owner_id=stack_id)
        for backup in q_backup.all():
            yield backup.id
def _delete_event_rows(context, stack_id, limit):
    # MySQL does not support LIMIT in subqueries,
    # sqlite does not support JOIN in DELETE.
    # So we must manually supply the IN() values.
    # pgsql SHOULD work with the pure DELETE/JOIN below but that must be
    # confirmed via integration tests.
    query = context.session.query(models.Event).filter_by(
        stack_id=stack_id,
    )
    query = query.order_by(models.Event.id).limit(limit)
    id_pairs = [(e.id, e.rsrc_prop_data_id) for e in query.all()]
    if not id_pairs:
        return 0
    (ids, rsrc_prop_ids) = zip(*id_pairs)
    max_id = ids[-1]
    # delete the events
    retval = context.session.query(models.Event).filter(
        models.Event.id <= max_id).filter(
            models.Event.stack_id == stack_id).delete()
    # delete unreferenced resource_properties_data
    def del_rpd(rpd_ids):
        if not rpd_ids:
            return
        q_rpd = context.session.query(models.ResourcePropertiesData)
        q_rpd = q_rpd.filter(models.ResourcePropertiesData.id.in_(rpd_ids))
        q_rpd.delete(synchronize_session=False)
    if rsrc_prop_ids:
        clr_prop_ids = set(rsrc_prop_ids) - _find_rpd_references(context,
                                                                 stack_id)
        clr_prop_ids.discard(None)
        try:
            del_rpd(clr_prop_ids)
        except db_exception.DBReferenceError:
            LOG.debug('Checking backup/stack pairs for RPD references')
            found = False
            for partner_stack_id in _all_backup_stack_ids(context,
                                                          stack_id):
                found = True
                clr_prop_ids -= _find_rpd_references(context,
                                                     partner_stack_id)
            if not found:
                LOG.debug('No backup/stack pairs found for %s', stack_id)
                raise
            del_rpd(clr_prop_ids)
    return retval
[docs]
@retry_on_db_error
@context_manager.writer
def event_create(context, values):
    if 'stack_id' in values and cfg.CONF.max_events_per_stack:
        # only count events and purge on average
        # 200.0/cfg.CONF.event_purge_batch_size percent of the time.
        check = (2.0 / cfg.CONF.event_purge_batch_size) > random.uniform(0, 1)
        if (
            check and _event_count_all_by_stack(
                context, values['stack_id']
            ) >= cfg.CONF.max_events_per_stack
        ):
            # prune
            try:
                _delete_event_rows(
                    context, values['stack_id'],
                    cfg.CONF.event_purge_batch_size,
                )
            except db_exception.DBError as exc:
                LOG.error('Failed to purge events: %s', str(exc))
    event_ref = models.Event()
    event_ref.update(values)
    event_ref.save(context.session)
    result = context.session.query(models.Event).filter_by(
        id=event_ref.id,
    ).options(
        orm.joinedload(models.Event.rsrc_prop_data)
    ).first()
    return result 
# software config
[docs]
@context_manager.writer
def software_config_create(context, values):
    obj_ref = models.SoftwareConfig()
    obj_ref.update(values)
    obj_ref.save(context.session)
    return obj_ref 
[docs]
@context_manager.reader
def software_config_get(context, config_id):
    return _software_config_get(context, config_id) 
def _software_config_get(context, config_id):
    result = context.session.get(models.SoftwareConfig, config_id)
    if (result is not None and context is not None and not context.is_admin and
            result.tenant != context.project_id):
        result = None
    if not result:
        raise exception.NotFound(_('Software config with id %s not found') %
                                 config_id)
    return result
[docs]
@context_manager.reader
def software_config_get_all(context, limit=None, marker=None):
    query = context.session.query(models.SoftwareConfig)
    if not context.is_admin:
        query = query.filter_by(tenant=context.project_id)
    return _paginate_query(context, query, models.SoftwareConfig,
                           limit=limit, marker=marker).all() 
[docs]
@context_manager.reader
def software_config_count_all(context):
    query = context.session.query(models.SoftwareConfig)
    if not context.is_admin:
        query = query.filter_by(tenant=context.project_id)
    return query.count() 
[docs]
@context_manager.writer
def software_config_delete(context, config_id):
    config = _software_config_get(context, config_id)
    # Query if the software config has been referenced by deployment.
    result = context.session.query(models.SoftwareDeployment).filter_by(
        config_id=config_id).first()
    if result:
        msg = (_("Software config with id %s can not be deleted as "
                 "it is referenced.") % config_id)
        raise exception.InvalidRestrictedAction(message=msg)
    context.session.delete(config) 
# software deployment
[docs]
@context_manager.writer
def software_deployment_create(context, values):
    obj_ref = models.SoftwareDeployment()
    obj_ref.update(values)
    try:
        obj_ref.save(context.session)
    except db_exception.DBReferenceError:
        # NOTE(tkajinam): config_id is the only FK in SoftwareDeployment
        err_msg = _('Config with id %s not found') % values['config_id']
        raise exception.Invalid(reason=err_msg)
    return _software_deployment_get(context, obj_ref.id) 
[docs]
@context_manager.reader
def software_deployment_get(context, deployment_id):
    return _software_deployment_get(context, deployment_id) 
def _software_deployment_get(context, deployment_id):
    # TODO(stephenfin): Why doesn't options work with session.get?
    result = context.session.query(
        models.SoftwareDeployment,
    ).filter_by(
        id=deployment_id,
    ).options(
        orm.joinedload(models.SoftwareDeployment.config),
    ).first()
    if (result is not None and context is not None and not context.is_admin and
        context.project_id not in (result.tenant,
                                   result.stack_user_project_id)):
        result = None
    if not result:
        raise exception.NotFound(_('Deployment with id %s not found') %
                                 deployment_id)
    return result
[docs]
@context_manager.reader
def software_deployment_get_all(context, server_id=None):
    sd = models.SoftwareDeployment
    query = context.session.query(sd).order_by(sd.created_at)
    if not context.is_admin:
        query = query.filter(
            sqlalchemy.or_(
                sd.tenant == context.project_id,
                sd.stack_user_project_id == context.project_id,
            )
        )
    if server_id:
        query = query.filter_by(server_id=server_id)
    query = query.join(
        models.SoftwareDeployment.config,
    ).options(
        orm.contains_eager(models.SoftwareDeployment.config)
    )
    return query.all() 
[docs]
@context_manager.reader
def software_deployment_count_all(context):
    sd = models.SoftwareDeployment
    query = context.session.query(sd)
    if not context.is_admin:
        query = query.filter(
            sqlalchemy.or_(
                sd.tenant == context.project_id,
                sd.stack_user_project_id == context.project_id,
            )
        )
    return query.count() 
[docs]
@context_manager.writer
def software_deployment_update(context, deployment_id, values):
    deployment = _software_deployment_get(context, deployment_id)
    try:
        for k, v in values.items():
            setattr(deployment, k, v)
    except db_exception.DBReferenceError:
        # NOTE(tkajinam): config_id is the only FK in SoftwareDeployment
        err_msg = _('Config with id %s not found') % values['config_id']
        raise exception.Invalid(reason=err_msg)
    return deployment 
[docs]
@context_manager.writer
def software_deployment_delete(context, deployment_id):
    deployment = _software_deployment_get(context, deployment_id)
    context.session.delete(deployment) 
# snapshot
[docs]
@context_manager.writer
def snapshot_create(context, values):
    obj_ref = models.Snapshot()
    obj_ref.update(values)
    obj_ref.save(context.session)
    return obj_ref 
[docs]
@context_manager.reader
def snapshot_get(context, snapshot_id):
    return _snapshot_get(context, snapshot_id) 
def _snapshot_get(context, snapshot_id):
    result = context.session.get(models.Snapshot, snapshot_id)
    if (result is not None and context is not None and
            context.project_id != result.tenant):
        result = None
    if not result:
        raise exception.NotFound(_('Snapshot with id %s not found') %
                                 snapshot_id)
    return result
[docs]
@context_manager.reader
def snapshot_get_by_stack(context, snapshot_id, stack):
    snapshot = _snapshot_get(context, snapshot_id)
    if snapshot.stack_id != stack.id:
        raise exception.SnapshotNotFound(snapshot=snapshot_id,
                                         stack=stack.name)
    return snapshot 
[docs]
@context_manager.writer
def snapshot_update(context, snapshot_id, values):
    snapshot = _snapshot_get(context, snapshot_id)
    snapshot.update(values)
    snapshot.save(context.session)
    return snapshot 
[docs]
@context_manager.writer
def snapshot_delete(context, snapshot_id):
    snapshot = _snapshot_get(context, snapshot_id)
    context.session.delete(snapshot) 
[docs]
@context_manager.reader
def snapshot_get_all_by_stack(context, stack_id):
    return context.session.query(models.Snapshot).filter_by(
        stack_id=stack_id, tenant=context.project_id) 
[docs]
@context_manager.reader
def snapshot_count_all_by_stack(context, stack_id):
    return context.session.query(models.Snapshot).filter_by(
        stack_id=stack_id, tenant=context.project_id).count() 
# service
[docs]
@context_manager.writer
def service_create(context, values):
    service = models.Service()
    service.update(values)
    service.save(context.session)
    return service 
[docs]
@context_manager.writer
def service_update(context, service_id, values):
    service = _service_get(context, service_id)
    values.update({'updated_at': timeutils.utcnow()})
    service.update(values)
    service.save(context.session)
    return service 
[docs]
@context_manager.writer
def service_delete(context, service_id, soft_delete=True):
    service = _service_get(context, service_id)
    if soft_delete:
        _soft_delete(context, service)
    else:
        context.session.delete(service) 
[docs]
@context_manager.reader
def service_get(context, service_id):
    return _service_get(context, service_id) 
def _service_get(context, service_id):
    result = context.session.get(models.Service, service_id)
    if result is None:
        raise exception.EntityNotFound(entity='Service', name=service_id)
    return result
[docs]
@context_manager.reader
def service_get_all(context):
    return context.session.query(models.Service).filter_by(
        deleted_at=None,
    ).all() 
[docs]
@context_manager.reader
def service_get_all_by_args(context, host, binary, hostname):
    return (context.session.query(models.Service).
            filter_by(host=host).
            filter_by(binary=binary).
            filter_by(hostname=hostname).all()) 
# purge
[docs]
def purge_deleted(age, granularity='days', project_id=None, batch_size=20):
    def _validate_positive_integer(val, argname):
        try:
            val = int(val)
        except ValueError:
            raise exception.Error(_("%s should be an integer") % argname)
        if val < 0:
            raise exception.Error(_("%s should be a positive integer")
                                  % argname)
        return val
    age = _validate_positive_integer(age, 'age')
    batch_size = _validate_positive_integer(batch_size, 'batch_size')
    if granularity not in ('days', 'hours', 'minutes', 'seconds'):
        raise exception.Error(
            _("granularity should be days, hours, minutes, or seconds"))
    if granularity == 'days':
        age = age * 86400
    elif granularity == 'hours':
        age = age * 3600
    elif granularity == 'minutes':
        age = age * 60
    time_line = timeutils.utcnow() - datetime.timedelta(seconds=age)
    engine = get_engine()
    meta = sqlalchemy.MetaData()
    with engine.connect() as conn, conn.begin():
        stack = sqlalchemy.Table('stack', meta, autoload_with=conn)
        service = sqlalchemy.Table('service', meta, autoload_with=conn)
    # Purge deleted services
    srvc_del = service.delete().where(service.c.deleted_at < time_line)
    with engine.connect() as conn, conn.begin():
        conn.execute(srvc_del)
    # find the soft-deleted stacks that are past their expiry
    sel = sqlalchemy.select(
        stack.c.id,
        stack.c.raw_template_id,
        stack.c.prev_raw_template_id,
        stack.c.user_creds_id,
        stack.c.action,
        stack.c.status,
        stack.c.name)
    if project_id:
        stack_where = sel.where(and_(
            stack.c.tenant == project_id,
            stack.c.deleted_at < time_line))
    else:
        stack_where = sel.where(
            stack.c.deleted_at < time_line)
    with engine.connect() as conn, conn.begin():
        stacks = conn.execute(stack_where)
    while True:
        next_stacks_to_purge = list(itertools.islice(stacks, batch_size))
        if len(next_stacks_to_purge):
            _purge_stacks(next_stacks_to_purge, engine, meta)
        else:
            break 
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
                           retry_interval=0.5, inc_retry_interval=True)
def _purge_stacks(stack_infos, engine, meta):
    """Purge some stacks and their releated events, raw_templates, etc.
    stack_infos is a list of lists of selected stack columns:
    [[id, raw_template_id, prev_raw_template_id, user_creds_id,
      action, status, name], ...]
    """
    with engine.connect() as conn, conn.begin():
        stack = sqlalchemy.Table('stack', meta, autoload_with=conn)
        stack_lock = sqlalchemy.Table('stack_lock', meta, autoload_with=conn)
        stack_tag = sqlalchemy.Table('stack_tag', meta, autoload_with=conn)
        resource = sqlalchemy.Table('resource', meta, autoload_with=conn)
        resource_data = sqlalchemy.Table(
            'resource_data', meta, autoload_with=conn)
        resource_properties_data = sqlalchemy.Table(
            'resource_properties_data', meta, autoload_with=conn)
        event = sqlalchemy.Table('event', meta, autoload_with=conn)
        raw_template = sqlalchemy.Table(
            'raw_template', meta, autoload_with=conn)
        raw_template_files = sqlalchemy.Table(
            'raw_template_files', meta, autoload_with=conn)
        user_creds = sqlalchemy.Table('user_creds', meta, autoload_with=conn)
        syncpoint = sqlalchemy.Table('sync_point', meta, autoload_with=conn)
    stack_info_str = ','.join([str(i) for i in stack_infos])
    LOG.info("Purging stacks %s", stack_info_str)
    # TODO(cwolfe): find a way to make this re-entrant with
    # reasonably sized transactions (good luck), or add
    # a cleanup for orphaned rows.
    stack_ids = [stack_info[0] for stack_info in stack_infos]
    # delete stack locks (just in case some got stuck)
    stack_lock_del = stack_lock.delete().where(
        stack_lock.c.stack_id.in_(stack_ids))
    with engine.connect() as conn, conn.begin():
        conn.execute(stack_lock_del)
    # delete stack tags
    stack_tag_del = stack_tag.delete().where(
        stack_tag.c.stack_id.in_(stack_ids))
    with engine.connect() as conn, conn.begin():
        conn.execute(stack_tag_del)
    # delete resource_data
    res_where = sqlalchemy.select(resource.c.id).where(
        resource.c.stack_id.in_(stack_ids))
    res_data_del = resource_data.delete().where(
        resource_data.c.resource_id.in_(res_where))
    with engine.connect() as conn, conn.begin():
        conn.execute(res_data_del)
    # clean up any sync_points that may have lingered
    sync_del = syncpoint.delete().where(
        syncpoint.c.stack_id.in_(stack_ids))
    with engine.connect() as conn, conn.begin():
        conn.execute(sync_del)
    # get rsrc_prop_data_ids to delete
    rsrc_prop_data_where = sqlalchemy.select(
        resource.c.rsrc_prop_data_id,
    ).where(
        resource.c.stack_id.in_(stack_ids))
    with engine.connect() as conn, conn.begin():
        rsrc_prop_data_ids = set(
            [i[0] for i in list(conn.execute(rsrc_prop_data_where))]
        )
    rsrc_prop_data_where = sqlalchemy.select(
        resource.c.attr_data_id,
    ).where(
        resource.c.stack_id.in_(stack_ids))
    with engine.connect() as conn, conn.begin():
        rsrc_prop_data_ids.update(
            [i[0] for i in list(conn.execute(rsrc_prop_data_where))]
        )
    rsrc_prop_data_where = sqlalchemy.select(
        event.c.rsrc_prop_data_id,
    ).where(
        event.c.stack_id.in_(stack_ids))
    with engine.connect() as conn, conn.begin():
        rsrc_prop_data_ids.update(
            [i[0] for i in list(conn.execute(rsrc_prop_data_where))]
        )
    # delete events
    event_del = event.delete().where(event.c.stack_id.in_(stack_ids))
    with engine.connect() as conn, conn.begin():
        conn.execute(event_del)
    # delete resources (normally there shouldn't be any)
    res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids))
    with engine.connect() as conn, conn.begin():
        conn.execute(res_del)
    # delete resource_properties_data
    if rsrc_prop_data_ids:  # keep rpd's in events
        rsrc_prop_data_where = sqlalchemy.select(
            event.c.rsrc_prop_data_id,
        ).where(
            event.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids))
        with engine.connect() as conn, conn.begin():
            ids = list(conn.execute(rsrc_prop_data_where))
        rsrc_prop_data_ids.difference_update([i[0] for i in ids])
    if rsrc_prop_data_ids:  # keep rpd's in resources
        rsrc_prop_data_where = sqlalchemy.select(
            resource.c.rsrc_prop_data_id,
        ).where(
            resource.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids))
        with engine.connect() as conn, conn.begin():
            ids = list(conn.execute(rsrc_prop_data_where))
        rsrc_prop_data_ids.difference_update([i[0] for i in ids])
    if rsrc_prop_data_ids:  # delete if we have any
        rsrc_prop_data_del = resource_properties_data.delete().where(
            resource_properties_data.c.id.in_(rsrc_prop_data_ids))
        with engine.connect() as conn, conn.begin():
            conn.execute(rsrc_prop_data_del)
    # delete the stacks
    stack_del = stack.delete().where(stack.c.id.in_(stack_ids))
    with engine.connect() as conn, conn.begin():
        conn.execute(stack_del)
    # delete orphaned raw templates
    raw_template_ids = [i[1] for i in stack_infos if i[1] is not None]
    raw_template_ids.extend(i[2] for i in stack_infos if i[2] is not None)
    if raw_template_ids:  # keep those still referenced
        raw_tmpl_sel = sqlalchemy.select(stack.c.raw_template_id).where(
            stack.c.raw_template_id.in_(raw_template_ids))
        with engine.connect() as conn, conn.begin():
            raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)]
        raw_template_ids = set(raw_template_ids) - set(raw_tmpl)
    if raw_template_ids:  # keep those still referenced (previous tmpl)
        raw_tmpl_sel = sqlalchemy.select(
            stack.c.prev_raw_template_id,
        ).where(
            stack.c.prev_raw_template_id.in_(raw_template_ids))
        with engine.connect() as conn, conn.begin():
            raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)]
        raw_template_ids = raw_template_ids - set(raw_tmpl)
    if raw_template_ids:  # delete raw_templates if we have any
        raw_tmpl_file_sel = sqlalchemy.select(
            raw_template.c.files_id,
        ).where(
            raw_template.c.id.in_(raw_template_ids))
        with engine.connect() as conn, conn.begin():
            raw_tmpl_file_ids = [i[0] for i in conn.execute(
                raw_tmpl_file_sel)]
        raw_templ_del = raw_template.delete().where(
            raw_template.c.id.in_(raw_template_ids))
        with engine.connect() as conn, conn.begin():
            conn.execute(raw_templ_del)
        if raw_tmpl_file_ids:  # keep _files still referenced
            raw_tmpl_file_sel = sqlalchemy.select(
                raw_template.c.files_id,
            ).where(
                raw_template.c.files_id.in_(raw_tmpl_file_ids))
            with engine.connect() as conn, conn.begin():
                raw_tmpl_files = [i[0] for i in conn.execute(
                    raw_tmpl_file_sel)]
            raw_tmpl_file_ids = set(raw_tmpl_file_ids) \
                - set(raw_tmpl_files)
        if raw_tmpl_file_ids:  # delete _files if we have any
            raw_tmpl_file_del = raw_template_files.delete().where(
                raw_template_files.c.id.in_(raw_tmpl_file_ids))
            with engine.connect() as conn, conn.begin():
                conn.execute(raw_tmpl_file_del)
    # purge any user creds that are no longer referenced
    user_creds_ids = [i[3] for i in stack_infos if i[3] is not None]
    if user_creds_ids:  # keep those still referenced
        user_sel = sqlalchemy.select(stack.c.user_creds_id).where(
            stack.c.user_creds_id.in_(user_creds_ids))
        with engine.connect() as conn, conn.begin():
            users = [i[0] for i in conn.execute(user_sel)]
        user_creds_ids = set(user_creds_ids) - set(users)
    if user_creds_ids:  # delete if we have any
        usr_creds_del = user_creds.delete().where(
            user_creds.c.id.in_(user_creds_ids))
        with engine.connect() as conn, conn.begin():
            conn.execute(usr_creds_del)
# sync point
[docs]
@context_manager.writer
def sync_point_delete_all_by_stack_and_traversal(context, stack_id,
                                                 traversal_id):
    rows_deleted = context.session.query(models.SyncPoint).filter_by(
        stack_id=stack_id, traversal_id=traversal_id).delete()
    return rows_deleted 
[docs]
@retry_on_db_error
@context_manager.writer
def sync_point_create(context, values):
    values['entity_id'] = str(values['entity_id'])
    sync_point_ref = models.SyncPoint()
    sync_point_ref.update(values)
    sync_point_ref.save(context.session)
    return sync_point_ref 
[docs]
@context_manager.reader
def sync_point_get(context, entity_id, traversal_id, is_update):
    entity_id = str(entity_id)
    return context.session.get(
        models.SyncPoint, (entity_id, traversal_id, is_update),
    )