oslo_db.sqlalchemy package

Subpackages

Submodules

oslo_db.sqlalchemy.enginefacade module

exception oslo_db.sqlalchemy.enginefacade.AlreadyStartedError

Bases: TypeError

Raised when a factory is asked to initialize a second time.

Subclasses TypeError for legacy support.

class oslo_db.sqlalchemy.enginefacade.LegacyEngineFacade(sql_connection, slave_connection=None, sqlite_fk=False, autocommit=False, expire_on_commit=False, _conf=None, _factory=None, **kwargs)

Bases: object

A helper class for removing global engine instances from oslo.db.

Deprecated since version 1.12.0: Please use oslo_db.sqlalchemy.enginefacade for new development.

As a library, oslo.db can’t decide where to store/when to create engine and sessionmaker instances, so this must be left for a target application.

On the other hand, in order to simplify the adoption of oslo.db changes, we’ll provide a helper class, which creates engine and sessionmaker on its instantiation and provides get_engine()/get_session() methods that are compatible with corresponding utility functions that currently exist in target projects, e.g. in Nova.

engine/sessionmaker instances will still be global (and they are meant to be global), but they will be stored in the app context, rather than in the oslo.db context.

Two important things to remember:

  1. An Engine instance is effectively a pool of DB connections, so it’s meant to be shared (and it’s thread-safe).

  2. A Session instance is not meant to be shared and represents a DB transactional context (i.e. it’s not thread-safe). sessionmaker is a factory of sessions.

Parameters:
  • sql_connection (string) – the connection string for the database to use

  • slave_connection (string) – the connection string for the ‘slave’ database to use. If not provided, the master database will be used for all operations. Note: this is meant to be used for offloading of read operations to asynchronously replicated slaves to reduce the load on the master database.

  • sqlite_fk (bool) – enable foreign keys in SQLite

  • autocommit (bool) – use autocommit mode for created Session instances

  • expire_on_commit (bool) – expire session objects on commit

Keyword arguments:

Parameters:
  • mysql_sql_mode – the SQL mode to be used for MySQL sessions. (defaults to TRADITIONAL)

  • mysql_enable_ndb – If True, transparently enables support for handling MySQL Cluster (NDB). (defaults to False)

  • connection_recycle_time – Time period for connections to be recycled upon checkout (defaults to 3600)

  • connection_debug – verbosity of SQL debugging information. -1=Off, 0=None, 100=Everything (defaults to 0)

  • max_pool_size – maximum number of SQL connections to keep open in a pool (defaults to SQLAlchemy settings)

  • max_overflow – if set, use this value for max_overflow with sqlalchemy (defaults to SQLAlchemy settings)

  • pool_timeout – if set, use this value for pool_timeout with sqlalchemy (defaults to SQLAlchemy settings)

  • sqlite_synchronous – if True, SQLite uses synchronous mode (defaults to True)

  • connection_trace – add python stack traces to SQL as comment strings (defaults to False)

  • max_retries – maximum db connection retries during startup. (setting -1 implies an infinite retry count) (defaults to 10)

  • retry_interval – interval between retries of opening a sql connection (defaults to 10)

  • thread_checkin – boolean that indicates that between each engine checkin event a sleep(0) will occur to allow other greenthreads to run (defaults to True)
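The interaction of max_retries and retry_interval can be sketched with a small standard-library loop. This is only an illustration of the documented semantics (a retry count, with -1 meaning "retry forever", and a pause between attempts); the function name, the use of ConnectionError, and whether the count includes the first attempt are assumptions of this sketch, not oslo.db's implementation.

```python
import itertools
import time


def connect_with_retries(connect, max_retries=10, retry_interval=10,
                         sleep=time.sleep):
    """Call connect() until it succeeds or the attempts run out.

    Hypothetical helper: max_retries=-1 means retry without limit.
    """
    attempts = itertools.count() if max_retries == -1 else range(max_retries)
    last_error = None
    for _ in attempts:
        try:
            return connect()
        except ConnectionError as exc:
            last_error = exc
            sleep(retry_interval)  # pause before the next attempt
    # Every attempt failed (or none were allowed): surface the last error.
    raise last_error or ConnectionError("no connection attempts were made")
```

The sleep callable is injectable so that the loop can be exercised in tests without real delays.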

classmethod from_config(conf, sqlite_fk=False, autocommit=False, expire_on_commit=False)

Initialize EngineFacade using oslo.config config instance options.

Parameters:
  • conf (oslo_config.cfg.ConfigOpts) – oslo.config config instance

  • sqlite_fk (bool) – enable foreign keys in SQLite

  • autocommit (bool) – use autocommit mode for created Session instances

  • expire_on_commit (bool) – expire session objects on commit

get_engine(use_slave=False)

Get the engine instance (note that it is shared).

Parameters:

use_slave (bool) – if possible, use ‘slave’ database for this engine. If the connection string for the slave database wasn’t provided, ‘master’ engine will be returned. (defaults to False)

get_session(use_slave=False, **kwargs)

Get a Session instance.

Parameters:

use_slave (bool) – if possible, use ‘slave’ database connection for this session. If the connection string for the slave database wasn’t provided, a session bound to the ‘master’ engine will be returned. (defaults to False)

Keyword arguments will be passed to a sessionmaker instance as is (if passed, they will override the ones used when the sessionmaker instance was created). See SQLAlchemy Session docs for details.

get_sessionmaker(use_slave=False)

Get the sessionmaker instance used to create a Session.

This can be called for those cases where the sessionmaker() is to be temporarily injected with some state such as a specific connection.

oslo_db.sqlalchemy.enginefacade.configure(**kw)

Apply configuration options to the global factory.

This method can only be called before any specific transaction-beginning methods have been called.

See also

_TransactionFactory.configure()

oslo_db.sqlalchemy.enginefacade.get_legacy_facade()

Return a LegacyEngineFacade for the global factory.

This facade uses the same engine and sessionmaker as this factory, but it does not share the same transaction context; the legacy facade keeps the old behavior of returning a new Session each time get_session() is called.

oslo_db.sqlalchemy.enginefacade.reader = <oslo_db.sqlalchemy.enginefacade._TransactionContextManager object>

The global ‘reader’ starting point.

oslo_db.sqlalchemy.enginefacade.transaction_context()

Construct a local transaction context.

oslo_db.sqlalchemy.enginefacade.transaction_context_provider(klass)

Decorate a class with session and connection attributes.

oslo_db.sqlalchemy.enginefacade.writer = <oslo_db.sqlalchemy.enginefacade._TransactionContextManager object>

The global ‘writer’ starting point.

oslo_db.sqlalchemy.engines module

Core SQLAlchemy connectivity routines.

oslo_db.sqlalchemy.engines.create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, mysql_enable_ndb=False, connection_recycle_time=3600, connection_debug=0, max_pool_size=None, max_overflow=None, pool_timeout=None, sqlite_synchronous=True, connection_trace=False, max_retries=10, retry_interval=10, thread_checkin=True, logging_name=None, json_serializer=None, json_deserializer=None, connection_parameters=None)

Return a new SQLAlchemy engine.

oslo_db.sqlalchemy.exc_filters module

Define exception redefinitions for SQLAlchemy DBAPI exceptions.

oslo_db.sqlalchemy.exc_filters.filters(dbname, exception_type, regex)

Mark a function as receiving a filtered exception.

Parameters:
  • dbname – string database name, e.g. ‘mysql’

  • exception_type – a SQLAlchemy database exception class, which extends from sqlalchemy.exc.DBAPIError.

  • regex – a string, or a tuple of strings, that will be processed as matching regular expressions.

oslo_db.sqlalchemy.exc_filters.handler(context)

Iterate through available filters and invoke those which match.

The first one which raises wins. The order in which the filters are attempted is sorted by specificity - dialect name or “*”, exception class per method resolution order (__mro__). Method resolution order is used so that filter rules indicating a more specific exception class are attempted first.
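The dispatch order described above (dialect name before "*", then exception classes in method resolution order, first handler to raise wins) can be sketched in plain Python. Everything below is a simplified illustration: the registry layout, the handle() function, the handler signature, and the IntegrityError and DuplicateEntry classes are hypothetical stand-ins, not the real oslo.db or SQLAlchemy objects.

```python
import collections
import re

# _registry[dialect_name][exception_class] -> [(handler, [compiled regex, ...])]
_registry = collections.defaultdict(lambda: collections.defaultdict(list))


def filters(dbname, exception_type, regex):
    """Register the decorated function for (dbname, exception_type, regex)."""
    patterns = (regex,) if isinstance(regex, str) else tuple(regex)

    def decorator(fn):
        _registry[dbname][exception_type].append(
            (fn, [re.compile(p) for p in patterns]))
        return fn
    return decorator


def handle(dialect_name, exc):
    """Invoke matching filters; the first one that raises ends the search."""
    for name in (dialect_name, "*"):              # specific dialect first
        per_dialect = _registry.get(name, {})
        for cls in type(exc).__mro__:             # most specific class first
            for fn, patterns in per_dialect.get(cls, ()):
                for pattern in patterns:
                    match = pattern.search(str(exc))
                    if match:
                        fn(exc, match, dialect_name)


class IntegrityError(Exception):
    """Stand-in for a driver-level integrity error."""


class DuplicateEntry(Exception):
    """Stand-in for a translated, backend-neutral exception."""


@filters("mysql", IntegrityError, r"Duplicate entry '(?P<value>[^']*)'")
def _mysql_duplicate(exc, match, dialect_name):
    raise DuplicateEntry(match.group("value"))


@filters("*", Exception, r".*")
def _catch_all(exc, match, dialect_name):
    raise RuntimeError("unhandled database error: %s" % exc)
```

With these registrations, a MySQL duplicate-key message is translated by the dialect-specific filter, while the same error on another dialect falls through to the "*" catch-all.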

oslo_db.sqlalchemy.exc_filters.register_engine(engine)

oslo_db.sqlalchemy.migration module

oslo_db.sqlalchemy.migration.db_sync(engine, abs_path, version=None, init_version=0, sanity_check=True)

Upgrade or downgrade a database.

Function runs the upgrade() or downgrade() functions in change scripts.

Parameters:
  • engine – SQLAlchemy engine instance for a given database

  • abs_path – Absolute path to migrate repository.

  • version – The database will be upgraded or downgraded until it reaches this version. If None, the database is updated to the latest available version.

  • init_version – Initial database version

  • sanity_check – Require schema sanity checking for all tables

oslo_db.sqlalchemy.migration.db_version(engine, abs_path, init_version)

Show the current version of the repository.

Parameters:
  • engine – SQLAlchemy engine instance for a given database

  • abs_path – Absolute path to migrate repository

  • init_version – Initial database version

oslo_db.sqlalchemy.migration.db_version_control(engine, abs_path, version=None)

Mark a database as under this repository’s version control.

Once a database is under version control, schema changes should only be done via change scripts in this repository.

Parameters:
  • engine – SQLAlchemy engine instance for a given database

  • abs_path – Absolute path to migrate repository

  • version – Initial database version

oslo_db.sqlalchemy.models module

SQLAlchemy models.

class oslo_db.sqlalchemy.models.ModelBase

Bases: object

Base class for models.

get(key, default=None)
items()

Make the model object behave like a dict.

iteritems()

Make the model object behave like a dict.

keys()

Make the model object behave like a dict.

save(session)

Save this object.

update(values)

Make the model object behave like a dict.
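The dict-style methods listed above amount to exposing attributes through a mapping-like interface. A minimal sketch of that idea follows, without SQLAlchemy; the DictLikeModel class, its _columns attribute, and the Foo subclass are hypothetical and only mirror the method names documented here.

```python
class DictLikeModel:
    """Hypothetical stand-in exposing attributes through a dict-style API."""

    _columns = ()  # names treated as "columns" in this sketch

    def __getitem__(self, key):
        return getattr(self, key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def get(self, key, default=None):
        return getattr(self, key, default)

    def keys(self):
        return list(self._columns)

    def items(self):
        return [(name, getattr(self, name, None)) for name in self._columns]

    def update(self, values):
        # Copy every key/value pair onto the object as attributes.
        for key, value in values.items():
            setattr(self, key, value)


class Foo(DictLikeModel):
    _columns = ("id", "name")


foo = Foo()
foo.update({"id": 1, "name": "first"})
```

After the update() call, foo["name"] and foo.get("name") both return the stored value, which is why code written against plain dicts can often accept such model objects.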

class oslo_db.sqlalchemy.models.ModelIterator(model, columns)

Bases: object

class oslo_db.sqlalchemy.models.SoftDeleteMixin

Bases: object

deleted = Column(None, SoftDeleteInteger(), table=None, default=ColumnDefault(0))
deleted_at = Column(None, DateTime(), table=None)
soft_delete(session)

Mark this object as deleted.
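The effect of a soft delete can be sketched with the standard-library sqlite3 module. The sketch assumes a convention in which the deleted column is set to the row's own id and deleted_at records the time of deletion; the table layout and helper names are hypothetical and chosen only to mirror the deleted and deleted_at columns above.

```python
import datetime
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE foo (id INTEGER PRIMARY KEY, name TEXT, "
    "deleted INTEGER DEFAULT 0, deleted_at TEXT)")
conn.executemany("INSERT INTO foo (name) VALUES (?)", [("a",), ("b",)])


def soft_delete(conn, foo_id):
    """Mark a live row as deleted instead of removing it; return rows hit."""
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    cur = conn.execute(
        "UPDATE foo SET deleted = id, deleted_at = ? "
        "WHERE id = ? AND deleted = 0", (now, foo_id))
    return cur.rowcount


def live_names(conn):
    """Readers filter on deleted = 0 to see only rows that still 'exist'."""
    return [row[0] for row in conn.execute(
        "SELECT name FROM foo WHERE deleted = 0 ORDER BY id")]
```

The row stays in the table, so it remains available for auditing, while ordinary queries that filter on deleted = 0 no longer see it.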

class oslo_db.sqlalchemy.models.TimestampMixin

Bases: object

created_at = Column(None, DateTime(), table=None, default=ColumnDefault(<function TimestampMixin.<lambda>>))
updated_at = Column(None, DateTime(), table=None, onupdate=ColumnDefault(<function TimestampMixin.<lambda>>))

oslo_db.sqlalchemy.ndb module

Core functions for MySQL Cluster (NDB) Support.

oslo_db.sqlalchemy.ndb.enable_ndb_support(engine)

Enable NDB Support.

Function to flag the MySQL engine dialect to support features specific to MySQL Cluster (NDB).

oslo_db.sqlalchemy.ndb.init_ndb_events(engine)

Initialize NDB Events.

Function starts NDB specific events.

oslo_db.sqlalchemy.ndb.ndb_status(engine_or_compiler)

Test if NDB Support is enabled.

Function to test if NDB support is enabled or not.

oslo_db.sqlalchemy.ndb.prefix_inserts(create_table, compiler, **kw)

Replace InnoDB with NDBCLUSTER automatically.

Function will intercept CreateTable() calls and automatically convert InnoDB to NDBCLUSTER. Targets compiler events.

oslo_db.sqlalchemy.orm module

SQLAlchemy ORM connectivity and query structures.

class oslo_db.sqlalchemy.orm.Query(entities, session=None)

Bases: Query

Subclass of sqlalchemy.orm.Query with a soft_delete() method.

soft_delete(synchronize_session='evaluate')
update_on_match(specimen, surrogate_key, values, **kw)

Emit an UPDATE statement matching the given specimen.

This is a method-version of oslo_db.sqlalchemy.update_match.update_on_match(); see that function for usage details.

update_returning_pk(values, surrogate_key)

Perform an UPDATE, returning the primary key of the matched row.

This is a method-version of oslo_db.sqlalchemy.update_match.update_returning_pk(); see that function for usage details.

class oslo_db.sqlalchemy.orm.Session(bind=None, autoflush=True, future=False, expire_on_commit=True, autocommit=False, twophase=False, binds=None, enable_baked_queries=True, info=None, query_cls=None)

Bases: Session

oslo.db-specific Session subclass.

oslo_db.sqlalchemy.orm.get_maker(engine, autocommit=False, expire_on_commit=False)

Return a SQLAlchemy sessionmaker using the given engine.

oslo_db.sqlalchemy.provision module

Provision test environment for specific DB backends

class oslo_db.sqlalchemy.provision.Backend(database_type, url)

Bases: object

Represent a particular database backend that may be provisionable.

The Backend object maintains a database type (e.g. database without specific driver type, such as “sqlite”, “postgresql”, etc.), a target URL, a base Engine for that URL object that can be used to provision databases and a BackendImpl which knows how to perform operations against this type of Engine.

classmethod all_viable_backends()

Return an iterator of all Backend objects that are present and provisionable.

classmethod backend_for_database_type(database_type)

Return the Backend for the given database type.

backends_by_database_type = {'mysql': <oslo_db.sqlalchemy.provision.Backend object>, 'postgresql': <oslo_db.sqlalchemy.provision.Backend object>, 'sqlite': <oslo_db.sqlalchemy.provision.Backend object>}
create_named_database(ident, conditional=False)

Create a database with the given name.

database_exists(ident)

Return True if a database of the given name exists.

drop_all_objects(engine)

Drop all database objects.

Drops all database objects remaining on the default schema of the given engine.

drop_named_database(ident, conditional=False)

Drop a database with the given name.

provisioned_database_url(ident)

Given the identifier of an anonymous database, return a URL.

For hostname-based URLs, this typically involves switching just the ‘database’ portion of the URL with the given name and creating a URL.

For SQLite URLs, the identifier may be used to create a filename or may be ignored in the case of a memory database.
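The URL rewriting described above can be sketched with urllib.parse. This is an illustration only: the provisioned_url() helper and the ".db" filename scheme for file-based SQLite are assumptions of this sketch, and the real implementation works with SQLAlchemy URL objects rather than strings.

```python
from urllib.parse import urlsplit, urlunsplit


def provisioned_url(base_url, ident):
    """Return base_url with its 'database' portion replaced by ident."""
    parts = urlsplit(base_url)
    if parts.scheme.split("+")[0] == "sqlite":
        if parts.path in ("", "/:memory:"):
            return base_url  # memory database: the identifier is ignored
        # File-based SQLite: derive a filename from the identifier.
        return "%s:///%s.db" % (parts.scheme, ident)
    # Hostname-based URL: swap only the database name after the host.
    return urlunsplit(parts._replace(path="/" + ident))
```

For example, a base URL of mysql+pymysql://u:p@localhost/openstack_citest with the identifier abc keeps its driver, credentials and host, and only the trailing database name changes.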

class oslo_db.sqlalchemy.provision.BackendImpl(drivername)

Bases: object

Provide database-specific implementations of key provisioning functions.

BackendImpl is owned by a Backend instance which delegates to it for all database-specific features.

classmethod all_impls()

Return an iterator of all possible BackendImpl objects.

These are BackendImpls that are implemented, but not necessarily provisionable.

abstract create_named_database(engine, ident, conditional=False)

Create a database with the given name.

abstract create_opportunistic_driver_url()

Produce a string url known as the ‘opportunistic’ URL.

This URL is one that corresponds to an established OpenStack convention for a pre-established database login, which, when detected as available in the local environment, is automatically used as a test platform for a specific type of driver.

default_engine_kwargs = {}
dispose(engine)
drop_additional_objects(conn)
drop_all_objects(engine)

Drop all database objects.

Drops all database objects remaining on the default schema of the given engine.

Per-db implementations will also need to drop items specific to those systems, such as sequences, custom types (e.g. pg ENUM), etc.

abstract drop_named_database(engine, ident, conditional=False)

Drop a database with the given name.

impl = <oslo_db.sqlalchemy.utils.DialectSingleFunctionDispatcher object>
provisioned_database_url(base_url, ident)

Return a provisioned database URL.

Given the URL of a particular database backend and the string name of a particular ‘database’ within that backend, return a URL which refers directly to the named database.

For hostname-based URLs, this typically involves switching just the ‘database’ portion of the URL with the given name and creating an engine.

For URLs that instead deal with DSNs, the rules may be more custom; for example, the engine may need to connect to the root URL and then emit a command to switch to the named database.

supports_drop_fk = True
class oslo_db.sqlalchemy.provision.BackendResource(database_type, ad_hoc_url=None)

Bases: TestResourceManager

clean(resource)

Override this method to hook into resource removal.

isDirty()

Return True if this manager’s cached resource is dirty.

Calling when the resource is not currently held has undefined behaviour.

make(dependency_resources)

Override this to construct resources.

Parameters:

dependency_resources – A dict mapping name -> resource instance for the resources specified as dependencies.

Returns:

The made resource.

class oslo_db.sqlalchemy.provision.DatabaseResource(database_type, _enginefacade=None, provision_new_database=True, ad_hoc_url=None)

Bases: TestResourceManager

Database resource which connects and disconnects to a URL.

For SQLite, this means the database is created implicitly, as a result of SQLite’s usual behavior. If the database is a file-based URL, it will remain after the resource has been torn down.

For all other kinds of databases, the resource only connects to and disconnects from that database.

clean(resource)

Override this method to hook into resource removal.

isDirty()

Return True if this manager’s cached resource is dirty.

Calling when the resource is not currently held has undefined behaviour.

make(dependency_resources)

Override this to construct resources.

Parameters:

dependency_resources – A dict mapping name -> resource instance for the resources specified as dependencies.

Returns:

The made resource.

class oslo_db.sqlalchemy.provision.ProvisionedDatabase(backend, enginefacade, engine, db_token)

Bases: object

Represents a database engine pointing to a DB ready to run tests.

backend: an instance of Backend

enginefacade: an instance of _TransactionFactory

engine: a SQLAlchemy Engine

db_token: if provision_new_database was used, this is the randomly generated name of the database. Note that with SQLite memory connections, this token is ignored. For a database that wasn’t actually created, this will be None.

backend
db_token
engine
enginefacade
class oslo_db.sqlalchemy.provision.Schema

Bases: object

Represents a database schema that has been or will be populated.

This is a marker object as required by testresources but otherwise serves no purpose.

database
class oslo_db.sqlalchemy.provision.SchemaResource(database_resource, generate_schema, teardown=False)

Bases: TestResourceManager

clean(resource)

Override this method to hook into resource removal.

isDirty()

Return True if this manager’s cached resource is dirty.

Calling when the resource is not currently held has undefined behaviour.

make(dependency_resources)

Override this to construct resources.

Parameters:

dependency_resources – A dict mapping name -> resource instance for the resources specified as dependencies.

Returns:

The made resource.

oslo_db.sqlalchemy.session module

Session Handling for SQLAlchemy backend.

Recommended ways to use sessions within this framework:

  • Use the enginefacade system for connectivity, session and transaction management:

    from oslo_db.sqlalchemy import enginefacade
    
    @enginefacade.reader
    def get_foo(context, foo):
        return (model_query(models.Foo, context.session).
                filter_by(foo=foo).
                first())
    
    @enginefacade.writer
    def update_foo(context, id, newfoo):
        (model_query(models.Foo, context.session).
                filter_by(id=id).
                update({'foo': newfoo}))
    
    @enginefacade.writer
    def create_foo(context, values):
        foo_ref = models.Foo()
        foo_ref.update(values)
        foo_ref.save(context.session)
        return foo_ref
    

    In the above system, transactions are committed automatically, and are shared among all dependent database methods. Ensure that methods which “write” data are enclosed within @writer blocks.

    Note

    Statements in the session scope will not be automatically retried.

  • If you create models within the session, they need to be added, but you do not need to call model.save():

    @enginefacade.writer
    def create_many_foo(context, foos):
        for foo in foos:
            foo_ref = models.Foo()
            foo_ref.update(foo)
            context.session.add(foo_ref)
    
    @enginefacade.writer
    def update_bar(context, foo_id, newbar):
        foo_ref = (model_query(models.Foo, context.session).
                    filter_by(id=foo_id).
                    first())
        (model_query(models.Bar, context.session).
                    filter_by(id=foo_ref['bar_id']).
                    update({'bar': newbar}))
    

    The two queries in update_bar can alternatively be expressed using a single query, which may be more efficient depending on scenario:

    @enginefacade.writer
    def update_bar(context, foo_id, newbar):
        subq = (model_query(models.Foo.bar_id, context.session).
                filter_by(id=foo_id).
                limit(1).
                subquery())
        (model_query(models.Bar, context.session).
                filter_by(id=subq.as_scalar()).
                update({'bar': newbar}))
    

    For reference, this emits approximately the following SQL statement:

    UPDATE bar SET bar = '${newbar}'
        WHERE id=(SELECT bar_id FROM foo WHERE id = '${foo_id}' LIMIT 1);
    

    Note

    create_duplicate_foo is a trivially simple example of catching an exception while using a savepoint. Here we create two duplicate instances with the same primary key; the resulting exception must be caught outside the savepoint block, while still inside the context managed by a single session:

    @enginefacade.writer
    def create_duplicate_foo(context):
        foo1 = models.Foo()
        foo2 = models.Foo()
        foo1.id = foo2.id = 1
        try:
            with context.session.begin_nested():
                context.session.add(foo1)
                context.session.add(foo2)
        except exception.DBDuplicateEntry as e:
            handle_error(e)
    
  • The enginefacade system eliminates the need to decide when sessions need to be passed between methods. All methods should instead share a common context object; the enginefacade system will maintain the transaction across method calls.

    @enginefacade.writer
    def myfunc(context, foo):
        # do some database things
        bar = _private_func(context, foo)
        return bar
    
    def _private_func(context, foo):
        with enginefacade.writer.using(context) as session:
            # do some other database things
            bar = SomeObject()
            session.add(bar)
        return bar
    
  • Avoid with_lockmode('UPDATE') when possible.

    FOR UPDATE is not compatible with MySQL/Galera. Instead, an “opportunistic” approach should be used, such that if an UPDATE fails, the entire transaction should be retried. The @wrap_db_retry decorator is one such system that can be used to achieve this.
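The "opportunistic" approach above, retrying the entire transaction when a write conflict occurs, can be sketched as a small decorator. This is not the @wrap_db_retry implementation; the retry_on_deadlock decorator and the DeadlockError class are hypothetical names used only to show the shape of the technique.

```python
import functools
import time


class DeadlockError(Exception):
    """Hypothetical stand-in for a deadlock or write-conflict error."""


def retry_on_deadlock(max_retries=3, retry_interval=0.1):
    """Re-run the whole decorated transaction function when it deadlocks."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except DeadlockError:
                    if attempt == max_retries:
                        raise  # out of retries: surface the error
                    time.sleep(retry_interval)
        return wrapper
    return decorator
```

The decorator has to wrap the function that owns the whole transaction, so that each retry starts from a fresh transaction rather than replaying a single failed statement.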

Enabling soft deletes:

  • To use/enable soft-deletes, SoftDeleteMixin may be used. For example:

    class NovaBase(models.SoftDeleteMixin, models.ModelBase):
        pass
    

Efficient use of soft deletes:

  • While there is a model.soft_delete() method, prefer query.soft_delete(). Some examples:

    @enginefacade.writer
    def soft_delete_bar(context):
        # synchronize_session=False will prevent the ORM from attempting
        # to search the Session for instances matching the DELETE;
        # this is typically not necessary for small operations.
        count = model_query(BarModel, context.session).\
            filter(some_condition).soft_delete(synchronize_session=False)
        if count == 0:
            raise Exception("0 entries were soft deleted")
    
    @enginefacade.writer
    def complex_soft_delete_with_synchronization_bar(context):
        # use synchronize_session='evaluate' when you'd like to attempt
        # to update the state of the Session to match that of the DELETE.
        # This is potentially helpful if the operation is complex and
        # continues to work with instances that were loaded, though
        # not usually needed.
        count = (model_query(BarModel, context.session).
                    filter(some_condition).
                    soft_delete(synchronize_session='evaluate'))
        if count == 0:
            raise Exception("0 entries were soft deleted")
    
oslo_db.sqlalchemy.session.EngineFacade

alias of LegacyEngineFacade

class oslo_db.sqlalchemy.session.Query(entities, session=None)

Bases: Query

Subclass of sqlalchemy.orm.Query with a soft_delete() method.

soft_delete(synchronize_session='evaluate')
update_on_match(specimen, surrogate_key, values, **kw)

Emit an UPDATE statement matching the given specimen.

This is a method-version of oslo_db.sqlalchemy.update_match.update_on_match(); see that function for usage details.

update_returning_pk(values, surrogate_key)

Perform an UPDATE, returning the primary key of the matched row.

This is a method-version of oslo_db.sqlalchemy.update_match.update_returning_pk(); see that function for usage details.

class oslo_db.sqlalchemy.session.Session(bind=None, autoflush=True, future=False, expire_on_commit=True, autocommit=False, twophase=False, binds=None, enable_baked_queries=True, info=None, query_cls=None)

Bases: Session

oslo.db-specific Session subclass.

oslo_db.sqlalchemy.session.create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, mysql_enable_ndb=False, connection_recycle_time=3600, connection_debug=0, max_pool_size=None, max_overflow=None, pool_timeout=None, sqlite_synchronous=True, connection_trace=False, max_retries=10, retry_interval=10, thread_checkin=True, logging_name=None, json_serializer=None, json_deserializer=None, connection_parameters=None)

Return a new SQLAlchemy engine.

oslo_db.sqlalchemy.session.get_maker(engine, autocommit=False, expire_on_commit=False)

Return a SQLAlchemy sessionmaker using the given engine.

oslo_db.sqlalchemy.test_base module

class oslo_db.sqlalchemy.test_base.DbFixture(test, skip_on_unavailable_db=True)

Bases: Fixture

Basic database fixture.

Allows tests to run on various database backends, such as SQLite, MySQL and PostgreSQL. The SQLite backend is used by default. To override the default backend URI, set the environment variable OS_TEST_DBAPI_ADMIN_CONNECTION with database admin credentials for the specific backend.

DBNAME = 'openstack_citest'
DRIVER = 'sqlite'
PASSWORD = 'openstack_citest'
USERNAME = 'openstack_citest'
setUp()

Prepare the Fixture for use.

This should not be overridden. Concrete fixtures should implement _setUp. Overriding of setUp is still supported, just not recommended.

After setUp has completed, the fixture will have one or more attributes which can be used (these depend totally on the concrete subclass).

Raises:

MultipleExceptions if _setUp fails. The last exception captured within the MultipleExceptions will be a SetupError exception.

Returns:

None.

Changed in 1.3:

The recommendation to override setUp has been reversed - before 1.3, setUp() should be overridden, now it should not be.

Changed in 1.3.1:

BaseException is now caught, and only subclasses of Exception are wrapped in MultipleExceptions.

class oslo_db.sqlalchemy.test_base.DbTestCase(*args, **kwds)

Bases: BaseTestCase

Base class for testing of DB code.

FIXTURE

alias of DbFixture

SCHEMA_SCOPE = None
SKIP_ON_UNAVAILABLE_DB = True
generate_schema(engine)

Generate schema objects to be used within a test.

The function is separate from the setUp() case as the scope of this method is controlled by the provisioning system. A test that specifies SCHEMA_SCOPE may not call this method for each test, as the schema may be maintained from a previous run.

property resources
setUp()

Hook method for setting up the test fixture before exercising it.

class oslo_db.sqlalchemy.test_base.MySQLOpportunisticFixture(test, skip_on_unavailable_db=True)

Bases: DbFixture

DRIVER = 'mysql'
class oslo_db.sqlalchemy.test_base.MySQLOpportunisticTestCase(*args, **kwds)

Bases: OpportunisticTestCase

FIXTURE

alias of MySQLOpportunisticFixture

class oslo_db.sqlalchemy.test_base.OpportunisticTestCase(*args, **kwds)

Bases: DbTestCase

Placeholder for backwards compatibility.

class oslo_db.sqlalchemy.test_base.PostgreSQLOpportunisticFixture(test, skip_on_unavailable_db=True)

Bases: DbFixture

DRIVER = 'postgresql'
class oslo_db.sqlalchemy.test_base.PostgreSQLOpportunisticTestCase(*args, **kwds)

Bases: OpportunisticTestCase

FIXTURE

alias of PostgreSQLOpportunisticFixture

oslo_db.sqlalchemy.test_base.backend_specific(*dialects)

Decorator to skip backend specific tests on inappropriate engines.

Parameters:

dialects – list of dialect names under which the test will be launched.
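A decorator of this kind can be sketched with unittest alone. The version below is a simplified illustration rather than the oslo.db implementation: it assumes the test case exposes an engine attribute with a name giving the dialect, and FakeEngine and ExampleTest are hypothetical.

```python
import functools
import unittest


def backend_specific(*dialects):
    """Skip the decorated test unless self.engine uses a listed dialect."""
    def decorator(test_method):
        @functools.wraps(test_method)
        def wrapper(self, *args, **kwargs):
            name = self.engine.name  # assumed attribute on the test case
            if name not in dialects:
                self.skipTest("not applicable to the %s backend" % name)
            return test_method(self, *args, **kwargs)
        return wrapper
    return decorator


class FakeEngine:
    """Stand-in for an engine object; only the dialect name is needed."""
    name = "sqlite"


class ExampleTest(unittest.TestCase):
    engine = FakeEngine()

    @backend_specific("mysql", "postgresql")
    def test_only_on_server_databases(self):
        self.fail("never reached on sqlite")

    @backend_specific("sqlite")
    def test_only_on_sqlite(self):
        self.assertEqual(self.engine.name, "sqlite")
```

Running ExampleTest against the fake SQLite engine executes the second test and reports the first as skipped rather than failed.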

oslo_db.sqlalchemy.test_fixtures module

class oslo_db.sqlalchemy.test_fixtures.AdHocDbFixture(url=None)

Bases: SimpleDbFixture

Fixture which creates and disposes a database engine per test.

Also allows a specific URL to be passed, meaning the fixture can be hardcoded to a specific SQLite file.

For SQLite, this fixture will create the named database upon setup and tear it down upon teardown. For other databases, the database is assumed to exist already and will remain after teardown.

class oslo_db.sqlalchemy.test_fixtures.BaseDbFixture(driver=None, ident=None)

Bases: Fixture

Base database provisioning fixture.

This serves as the base class for the other fixtures, but by itself does not implement _setUp(). It provides the basis for the flags implemented by the various capability mixins (GenerateSchema, DeletesFromSchema, etc.) as well as providing an abstraction over the provisioning objects, which are specific to testresources. Overall, consumers of this fixture just need to use the right classes and the testresources mechanics are taken care of.

DRIVER = 'sqlite'
get_enginefacade()

Return an enginefacade._TransactionContextManager.

This is typically a global variable like “context_manager” declared in the db/api.py module and is the object returned by enginefacade.transaction_context().

If left not implemented, the global enginefacade manager is used.

For the case where a project uses per-object or per-test enginefacades like Gnocchi, the get_per_test_enginefacade() method should also be implemented.

get_per_test_enginefacade()

Return an enginefacade._TransactionContextManager per test.

This facade should be the one that the test expects the code to use. Usually this is the same one returned by get_enginefacade(), which is the default. For special applications like Gnocchi, this can be overridden to provide an instance-level facade.

class oslo_db.sqlalchemy.test_fixtures.DeletesFromSchema

Bases: ResetsData

Mixin defining a fixture that can delete from all tables in place.

When DeletesFromSchema is present in a fixture, _DROP_SCHEMA_PER_TEST is now False; this means that the “teardown” flag of provision.SchemaResource will be False, which prevents SchemaResource from dropping all objects within the schema after each test.

This is a “capability” mixin that works in conjunction with classes that include BaseDbFixture as a base.

delete_from_schema(engine)

A hook which should delete all data from an existing schema.

Should not drop any objects, just remove data from tables that need to be reset between tests.

reset_schema_data(engine, facade)

Reset the data in the schema.

class oslo_db.sqlalchemy.test_fixtures.GeneratesSchema

Bases: object

Mixin defining a fixture as generating a schema using create_all().

This is a “capability” mixin that works in conjunction with classes that include BaseDbFixture as a base.

generate_schema_create_all(engine)

A hook which should generate the model schema using create_all().

This hook is called within the scope of creating the database assuming BUILD_WITH_MIGRATIONS is False.

class oslo_db.sqlalchemy.test_fixtures.GeneratesSchemaFromMigrations

Bases: GeneratesSchema

Mixin defining a fixture as generating a schema using migrations.

This is a “capability” mixin that works in conjunction with classes that include BaseDbFixture as a base.

generate_schema_migrations(engine)

A hook which should generate the model schema using migrations.

This hook is called within the scope of creating the database assuming BUILD_WITH_MIGRATIONS is True.

class oslo_db.sqlalchemy.test_fixtures.MySQLOpportunisticFixture(test, driver=None, ident=None)

Bases: OpportunisticDbFixture

DRIVER = 'mysql'
class oslo_db.sqlalchemy.test_fixtures.OpportunisticDBTestMixin

Bases: object

Test mixin that integrates the test suite with testresources.

There are three goals to this system:

  1. Allow creation of “stub” test suites that will run all the tests in a parent suite against a specific kind of database (e.g. MySQL, PostgreSQL), where the entire suite will be skipped if that target kind of database is not available to the suite.

  2. Provide a test with a process-local, anonymously named schema within a target database, so that the test can run concurrently with other tests without conflicting data.

  3. Provide compatibility with the testresources.OptimisingTestSuite, which organizes TestCase instances ahead of time into groups that all make use of the same type of database, setting up and tearing down a database schema once for the scope of any number of tests within. This technique is essential when testing against a non-SQLite database because building a schema is expensive, and is ideally accomplished using the application’s schema migrations, which are far slower than a straight create_all().

This mixin provides the .resources attribute required by testresources when using the OptimisingTestSuite. The .resources attribute then provides a collection of testresources.TestResourceManager objects, which are defined here in oslo_db.sqlalchemy.provision. These objects know how to find available database backends, build up temporary databases, and invoke schema generation and teardown instructions. The actual “build the schema objects” part of the equation, and optionally a “delete from all the tables” step, is provided by the implementing application itself.

FIXTURE

alias of OpportunisticDbFixture

SKIP_ON_UNAVAILABLE_DB = True
generate_fixtures()
property resources

Provide a collection of TestResourceManager objects.

The collection here is memoized, both at the level of the test case itself, as well as in the fixture object(s) which provide those resources.

setUp()
class oslo_db.sqlalchemy.test_fixtures.OpportunisticDbFixture(test, driver=None, ident=None)

Bases: BaseDbFixture

Fixture which uses testresources fully for optimised runs.

This fixture relies upon the use of the OpportunisticDBTestMixin to supply a test.resources attribute, and also works much more effectively when combined with the testresources.OptimisingTestSuite. The optimize_package_test_loader() function should be used at the module and package levels to optimize database provisioning across many tests.

class oslo_db.sqlalchemy.test_fixtures.PostgresqlOpportunisticFixture(test, driver=None, ident=None)

Bases: OpportunisticDbFixture

DRIVER = 'postgresql'
class oslo_db.sqlalchemy.test_fixtures.ReplaceEngineFacadeFixture(enginefacade, replace_with_enginefacade)

Bases: Fixture

A fixture that will plug the engine of one enginefacade into another.

This fixture can be used by test suites that already have their own non-oslo_db database setup / teardown schemes, to plug any URL or test-oriented enginefacade as-is into an enginefacade-oriented API.

For applications that use oslo.db’s testing fixtures, the ReplaceEngineFacadeFixture is used internally.

E.g.:

class MyDBTest(TestCase):

    def setUp(self):
        from myapplication.api import main_enginefacade

        my_test_enginefacade = enginefacade.transaction_context()
        my_test_enginefacade.configure(connection=my_test_url)

        self.useFixture(
            ReplaceEngineFacadeFixture(
                main_enginefacade, my_test_enginefacade))

Above, the main_enginefacade object is the normal application level one, and my_test_enginefacade is a local one that we’ve created to refer to some testing database. While the fixture is set up, the application level enginefacade will use the engine factory and engines of the testing enginefacade; at fixture teardown, the original ones are restored.

class oslo_db.sqlalchemy.test_fixtures.ResetsData

Bases: object

Mixin defining a fixture that resets schema data without dropping.

reset_schema_data(engine, enginefacade)

Reset the data in the schema.

setup_for_reset(engine, enginefacade)

Perform setup that may be needed before the test runs.

class oslo_db.sqlalchemy.test_fixtures.SimpleDbFixture(driver=None, ident=None)

Bases: BaseDbFixture

Fixture which provides an engine from a fixed URL.

The SimpleDbFixture is generally appropriate only for a SQLite memory database, as this database is naturally isolated from other processes and does not require management of schemas. For tests that need to run specifically against MySQL or Postgresql, the OpportunisticDbFixture is more appropriate.

The database connection information itself comes from the provisioning system, matching the desired driver (typically sqlite) to the default URL that provisioning provides for this driver. In the case of sqlite, it’s the SQLite memory URL, e.g. sqlite://; for MySQL and PostgreSQL, it’s the familiar “openstack_citest” URL on localhost.

There are a variety of create/drop schemes that can take place:

  • The default is to procure a database connection on setup, and at teardown, an instruction is issued to “drop” all objects in the schema (e.g. tables, indexes). The SQLAlchemy engine itself remains referenced at the class level for subsequent re-use.

  • When the GeneratesSchema or GeneratesSchemaFromMigrations mixins are implemented, the appropriate generate_schema method is also called when the fixture is set up; by default this is per test.

  • When the DeletesFromSchema mixin is implemented, the generate_schema method is now only called once, and the “drop all objects” system is replaced with the delete_from_schema method. This allows the same database to remain set up with all schema objects intact, so that expensive migrations need not be run on every test.

  • The fixture does not dispose the engine at the end of a test. It is assumed the same engine will be re-used many times across many tests. The AdHocDbFixture extends this one to provide engine.dispose() at the end of a test.

This fixture is intended to work without needing a reference to the test itself, and therefore cannot take advantage of the OptimisingTestSuite.

oslo_db.sqlalchemy.test_fixtures.optimize_module_test_loader()

Organize module-level tests into a testresources.OptimisingTestSuite.

This function provides a unittest-compatible load_tests hook for a given module; for per-package, use the optimize_package_test_loader() function.

When a unittest or subunit style test runner is used, the function will be called in order to return a TestSuite containing the tests to run; this function ensures that this suite is an OptimisingTestSuite, which will organize the production of test resources across groups of tests at once.

The function is invoked as:

from oslo_db.sqlalchemy import test_fixtures

load_tests = test_fixtures.optimize_module_test_loader()

The loader must be present in an individual module, and not the package level __init__.py.

The function also applies testscenarios expansion to all test collections. This is so that an existing test suite that already needs to build TestScenarios from a load_tests call can still have this take place when replaced with this function.

oslo_db.sqlalchemy.test_fixtures.optimize_package_test_loader(file_)

Organize package-level tests into a testresources.OptimisingTestSuite.

This function provides a unittest-compatible load_tests hook for a given package; for per-module, use the optimize_module_test_loader() function.

When a unittest or subunit style test runner is used, the function will be called in order to return a TestSuite containing the tests to run; this function ensures that this suite is an OptimisingTestSuite, which will organize the production of test resources across groups of tests at once.

The function is invoked as:

from oslo_db.sqlalchemy import test_fixtures

load_tests = test_fixtures.optimize_package_test_loader(__file__)

The loader must be present in the package level __init__.py.

The function also applies testscenarios expansion to all test collections. This is so that an existing test suite that already needs to build TestScenarios from a load_tests call can still have this take place when replaced with this function.

oslo_db.sqlalchemy.test_migrations module

class oslo_db.sqlalchemy.test_migrations.ModelsMigrationsSync

Bases: object

A helper class for comparison of DB migration scripts and models.

It’s intended to be inherited by test cases in target projects. They have to provide implementations for methods used internally in the test (as we have no way to implement them here).

test_models_sync() will run migration scripts for the engine provided and then compare the given metadata to the one reflected from the database. The difference between MODELS and MIGRATION scripts will be printed and the test will fail if the difference is not empty. The difference is a list of actions that should be performed in order to make the current database schema state (i.e. migration scripts) consistent with models definitions. It’s left up to developers to analyze the output and decide whether the models definitions or the migration scripts should be modified to make them consistent.

Output:

[(
    'add_table',
    description of the table from models
),
(
    'remove_table',
    description of the table from database
),
(
    'add_column',
    schema,
    table name,
    column description from models
),
(
    'remove_column',
    schema,
    table name,
    column description from database
),
(
    'add_index',
    description of the index from models
),
(
    'remove_index',
    description of the index from database
),
(
    'add_constraint',
    description of constraint from models
),
(
    'remove_constraint',
    description of constraint from database
),
(
    'modify_nullable',
    schema,
    table name,
    column name,
    {
        'existing_type': type of the column from database,
        'existing_server_default': default value from database
    },
    nullable from database,
    nullable from models
),
(
    'modify_type',
    schema,
    table name,
    column name,
    {
        'existing_nullable': database nullable,
        'existing_server_default': default value from database
    },
    database column type,
    type of the column from models
),
(
    'modify_default',
    schema,
    table name,
    column name,
    {
        'existing_nullable': database nullable,
        'existing_type': type of the column from database
    },
    connection column default value,
    default from models
)]

Method include_object() can be overridden to exclude some tables from comparison (e.g. migrate_repo).

compare_server_default(ctxt, ins_col, meta_col, insp_def, meta_def, rendered_meta_def)

Compare default values between model and db table.

Return True if the defaults are different, False if not, or None to allow the default implementation to compare these defaults.

Parameters:
  • ctxt – alembic MigrationContext instance

  • ins_col – reflected column

  • meta_col – column from model

  • insp_def – reflected column default value

  • meta_def – column default value from model

  • rendered_meta_def – rendered column default value (from model)

compare_type(ctxt, insp_col, meta_col, insp_type, meta_type)

Return True if types are different, False if not.

Return None to allow the default implementation to compare these types.

Parameters:
  • ctxt – alembic MigrationContext instance

  • insp_col – reflected column

  • meta_col – column from model

  • insp_type – reflected column type

  • meta_type – column type from model

abstract db_sync(engine)

Run migration scripts with the given engine instance.

This method must be implemented in subclasses and run migration scripts for a DB the given engine is connected to.

filter_metadata_diff(diff)

Filter changes before assert in test_models_sync().

Allow subclasses to whitelist/blacklist changes. By default, no filtering is performed, changes are returned as is.

Parameters:

diff – a list of differences (see compare_metadata() docs for details on format)

Returns:

a list of differences
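A minimal sketch of such a filter follows. It assumes each entry of the diff is a tuple whose first element is the action name, as in the Output listing above; the class name and the choice of ignored actions are hypothetical, and in a real test the method would live on a ModelsMigrationsSync subclass.

```python
class MigrationsSyncSketch:
    """Stand-in showing only the filter_metadata_diff() override."""

    # Hypothetical policy: index differences are tolerated in this project.
    IGNORED_ACTIONS = ("add_index", "remove_index")

    def filter_metadata_diff(self, diff):
        # Keep every entry whose action name is not on the ignore list.
        return [
            entry for entry in diff
            if entry[0] not in self.IGNORED_ACTIONS
        ]


diff = [
    ("add_table", "instances"),
    ("remove_index", "ix_instances_host"),
    ("add_column", None, "instances", "hostname"),
]
filtered = MigrationsSyncSketch().filter_metadata_diff(diff)
```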

abstract get_engine()

Return the engine instance to be used when running tests.

This method must be implemented in subclasses and return an engine instance to be used when running tests.

abstract get_metadata()

Return the metadata instance to be used for schema comparison.

This method must be implemented in subclasses and return the metadata instance attached to the BASE model.

include_object(object_, name, type_, reflected, compare_to)

Return True for objects that should be compared.

Parameters:
  • object_ – a SchemaItem object such as a Table or Column object

  • name – the name of the object

  • type_ – a string describing the type of object (e.g. “table”)

  • reflected – True if the given object was produced based on table reflection, False if it’s from a local MetaData object

  • compare_to – the object being compared against, if available, else None
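An override that excludes a table from comparison might look like the sketch below. The class is a plain stand-in rather than a real ModelsMigrationsSync subclass, and the excluded table name is taken from the migrate_repo example mentioned earlier; both are illustrative assumptions.

```python
class IncludeObjectSketch:
    """Stand-in showing only the include_object() override."""

    EXCLUDED_TABLES = ("migrate_repo",)

    def include_object(self, object_, name, type_, reflected, compare_to):
        # Skip bookkeeping tables; compare everything else.
        if type_ == "table" and name in self.EXCLUDED_TABLES:
            return False
        return True


sketch = IncludeObjectSketch()
skip_repo = sketch.include_object(None, "migrate_repo", "table", True, None)
keep_instances = sketch.include_object(None, "instances", "table", False, None)
```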

test_models_sync()
class oslo_db.sqlalchemy.test_migrations.WalkVersionsMixin

Bases: object

Test mixin to check upgrade and downgrade ability of migration.

This is only suitable for testing sqlalchemy-migrate migration scripts. It is an abstract mixin: the INIT_VERSION, REPOSITORY and migration_api attributes must be implemented in subclasses.

Auxiliary Methods:

The migrate_up and migrate_down instance methods of the class can be used with auxiliary methods named _pre_upgrade_<revision_id>, _check_<revision_id> and _post_downgrade_<revision_id>. These methods are intended to check applied changes for correctness of data operations, and should be implemented for every revision that you want to check with data. Implementation recommendations:

  • _pre_upgrade_<revision_id>: provide data appropriate to the next revision. The id of the revision that is going to be applied should be used.

  • _check_<revision_id>: insert, select and delete operations with the newly applied changes. The data provided by _pre_upgrade_<revision_id> will be used.

  • _post_downgrade_<revision_id>: check for the absence of (inability to use) changes provided by the reverted revision.

Execution order of auxiliary methods when revision is upgrading:

_pre_upgrade_### => upgrade => _check_###

Execution order of auxiliary methods when revision is downgrading:

downgrade => _post_downgrade_###
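The ordering above can be sketched with a simplified stand-in class that records each step. This is not the mixin's implementation: the _apply_upgrade and _apply_downgrade helpers are hypothetical placeholders for the real migration calls, and the hook signatures are reduced for brevity (no engine argument is passed here).

```python
class WalkSketch:
    """Simplified stand-in that mimics the documented hook ordering."""

    def __init__(self):
        self.calls = []

    def _apply_upgrade(self, version):
        self.calls.append("upgrade %s" % version)

    def _apply_downgrade(self, version):
        self.calls.append("downgrade %s" % version)

    def migrate_up(self, version, with_data=False):
        data = None
        if with_data:
            pre_upgrade = getattr(self, "_pre_upgrade_%s" % version, None)
            if pre_upgrade is not None:
                data = pre_upgrade()
        self._apply_upgrade(version)
        if with_data:
            check = getattr(self, "_check_%s" % version, None)
            if check is not None:
                check(data)

    def migrate_down(self, version, with_data=False):
        self._apply_downgrade(version)
        if with_data:
            post = getattr(self, "_post_downgrade_%s" % version, None)
            if post is not None:
                post()

    # Hooks for a hypothetical revision "002".
    def _pre_upgrade_002(self):
        self.calls.append("_pre_upgrade_002")
        return {"name": "fake"}

    def _check_002(self, data):
        self.calls.append("_check_002")

    def _post_downgrade_002(self):
        self.calls.append("_post_downgrade_002")


walker = WalkSketch()
walker.migrate_up("002", with_data=True)
walker.migrate_down("002", with_data=True)
```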

abstract property INIT_VERSION

Initial version of a migration repository.

Can be different from 0, if migrations were squashed.

Return type:

int

abstract property REPOSITORY

Allows basic manipulation with migration repository.

Returns:

migrate.versioning.repository.Repository subclass.

migrate_down(version, with_data=False)

Migrate down to a previous version of the db.

Parameters:
  • version (str) – id of revision to downgrade.

  • with_data (Bool) – Whether to verify the absence of changes from migration(s) being downgraded, see Auxiliary Methods.

abstract property migrate_engine

Provides engine instance.

Should be the same instance as used when migrations are applied. In most cases, the engine attribute provided by the test class in a setUp method will work.

Example of implementation:

@property
def migrate_engine(self):
    return self.engine

Returns:

sqlalchemy engine instance

migrate_up(version, with_data=False)

Migrate up to a new version of the db.

Parameters:
  • version (str) – id of revision to upgrade.

  • with_data (Bool) – Whether to verify the applied changes with data, see Auxiliary Methods.

abstract property migration_api

Provides API for upgrading, downgrading and version manipulations.

Returns:

migrate.api or overloaded analog.

walk_versions(snake_walk=False, downgrade=True)

Check if migration upgrades and downgrades successfully.

Determine the latest version script from the repo, then upgrade from 1 through to the latest, with no data in the databases. This just checks that the schema itself upgrades successfully.

walk_versions calls migrate_up and migrate_down with with_data argument to check changes with data, but these methods can be called without any extra check outside of walk_versions method.

Parameters:
  • snake_walk (bool) –

    enables checking that each individual migration can be upgraded/downgraded by itself.

    If we have ordered migrations 123abc, 456def, 789ghi and we run upgrading with the snake_walk argument set to True, the migrations will be applied in the following order:

    `123abc => 456def => 123abc =>
     456def => 789ghi => 456def => 789ghi`
    

  • downgrade (bool) – Check downgrade behavior if True.
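The snake-walk ordering described for snake_walk can be reproduced with a few lines of plain Python. The snake_walk_order helper is hypothetical and only computes the sequence of revisions visited; it does not run any migrations.

```python
def snake_walk_order(versions):
    """Return the sequence of revisions visited when snake_walk is enabled."""
    if not versions:
        return []
    visited = [versions[0]]
    for previous, current in zip(versions, versions[1:]):
        # Upgrade to `current`, step back to `previous`, then upgrade again.
        visited.extend([current, previous, current])
    return visited


order = snake_walk_order(["123abc", "456def", "789ghi"])
print(" => ".join(order))
```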

oslo_db.sqlalchemy.types module

class oslo_db.sqlalchemy.types.JsonEncodedDict(mysql_as_long=False, mysql_as_medium=False)

Bases: JsonEncodedType

Represents dict serialized as json-encoded string in db.

Note that this type does NOT track mutations. If you want to update it, you have to assign the existing value to a temporary variable, update it, then assign it back. See this page for a more robust workaround: http://docs.sqlalchemy.org/en/rel_1_0/orm/extensions/mutable.html

type

alias of dict

class oslo_db.sqlalchemy.types.JsonEncodedList(mysql_as_long=False, mysql_as_medium=False)

Bases: JsonEncodedType

Represents list serialized as json-encoded string in db.

Note that this type does NOT track mutations. If you want to update it, you have to assign the existing value to a temporary variable, update it, then assign it back. See this page for a more robust workaround: http://docs.sqlalchemy.org/en/rel_1_0/orm/extensions/mutable.html

type

alias of list

class oslo_db.sqlalchemy.types.JsonEncodedType(mysql_as_long=False, mysql_as_medium=False)

Bases: TypeDecorator

Base column type for data serialized as JSON-encoded string in db.

cache_ok = True

This type is safe to cache.

impl

alias of Text

process_bind_param(value, dialect)

Bind parameters to the process.

process_result_value(value, dialect)

Process result value.
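For orientation, the two hooks of a JSON-encoding column type typically form a serialize/deserialize pair. The sketch below is a plain-Python stand-in built on the standard json module, not the oslo.db implementation; how the real type treats None or values of the wrong Python type is not shown here and may differ.

```python
import json


class JsonTextSketch:
    """Stand-in for a type decorator that stores JSON in a text column."""

    def process_bind_param(self, value, dialect):
        # Python value -> string sent to the database.
        return json.dumps(value)

    def process_result_value(self, value, dialect):
        # String read from the database -> Python value.
        if value is None:
            return None
        return json.loads(value)


column_type = JsonTextSketch()
stored = column_type.process_bind_param({"cpu": 2, "tags": ["a", "b"]}, None)
loaded = column_type.process_result_value(stored, None)
```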

type = None
class oslo_db.sqlalchemy.types.SoftDeleteInteger(*args, **kwargs)

Bases: TypeDecorator

Coerce a bound param to be a proper integer before passing it to DBAPI.

Some backends like PostgreSQL are very strict about types and do not perform automatic type casts, e.g. when trying to INSERT a boolean value like false into an integer column. Coercing of the bound param in DB layer by the means of a custom SQLAlchemy type decorator makes sure we always pass a proper integer value to a DBAPI implementation.

This is not a general purpose boolean integer type as it specifically allows for arbitrary positive integers outside of the boolean int range (0, 1, False, True), so that it’s possible to have compound unique constraints over multiple columns including deleted (e.g. to soft-delete flavors with the same name in Nova without triggering a constraint violation): deleted is set to be equal to a PK int value on deletion, 0 denotes a non-deleted row.

cache_ok = True

This type is safe to cache.

impl

alias of Integer

process_bind_param(value, dialect)

Return the binding parameter.
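The coercion described above amounts to converting any non-null value to an integer before it reaches the driver. A minimal sketch, using a hypothetical free function in place of the type's method:

```python
def coerce_soft_delete(value):
    """Turn booleans and integers into plain ints; leave NULL alone."""
    if value is None:
        return None
    return int(value)


not_deleted = coerce_soft_delete(False)   # a boolean becomes the integer 0
deleted_by_pk = coerce_soft_delete(42)    # a PK value passes through as an int
```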

class oslo_db.sqlalchemy.types.String(length, mysql_ndb_length=None, mysql_ndb_type=None, **kw)

Bases: String

String subclass that implements oslo_db specific options.

Initial goal is to support ndb-specific flags.

mysql_ndb_type is used to override the String with another data type. mysql_ndb_length is used to adjust the length of the String.

oslo_db.sqlalchemy.update_match module

exception oslo_db.sqlalchemy.update_match.CantUpdateException

Bases: Exception

exception oslo_db.sqlalchemy.update_match.MultiRowsMatched

Bases: CantUpdateException

exception oslo_db.sqlalchemy.update_match.NoRowsMatched

Bases: CantUpdateException

oslo_db.sqlalchemy.update_match.manufacture_criteria(mapped, values)

Given a mapper/class and a namespace of values, produce a WHERE clause.

The class should be a mapped class and the entries in the dictionary correspond to mapped attribute names on the class.

A value may also be a tuple in which case that particular attribute will be compared to a tuple using IN. The scalar value or tuple can also contain None which translates to an IS NULL, that is properly joined with OR against an IN expression if appropriate.

Parameters:
  • mapped – a mapped class, or actual Mapper object.

  • values – dictionary of values.
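The matching rules can be illustrated by rendering them as SQL-like text. The real function builds SQLAlchemy expressions rather than strings; the sketch_where helper below is hypothetical, and it assumes that the per-attribute criteria are combined with AND.

```python
def sketch_where(values):
    """Render the documented matching rules as SQL-like text."""
    clauses = []
    for name in sorted(values):
        value = values[name]
        if isinstance(value, tuple):
            non_null = [v for v in value if v is not None]
            parts = []
            if non_null:
                rendered = ", ".join(repr(v) for v in non_null)
                parts.append("%s IN (%s)" % (name, rendered))
            if len(non_null) != len(value):
                # At least one None in the tuple: add IS NULL, joined with OR.
                parts.append("%s IS NULL" % name)
            if not parts:
                continue  # an empty tuple contributes nothing in this sketch
            clause = " OR ".join(parts)
            if len(parts) > 1:
                clause = "(%s)" % clause
            clauses.append(clause)
        elif value is None:
            clauses.append("%s IS NULL" % name)
        else:
            clauses.append("%s = %r" % (name, value))
    return " AND ".join(clauses)


where = sketch_where({
    "vm_state": ("active", "stopped", None),
    "uuid": "ccea54f",
    "deleted_at": None,
})
```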

oslo_db.sqlalchemy.update_match.manufacture_entity_criteria(entity, include_only=None, exclude=None)

Given a mapped instance, produce a WHERE clause.

The attributes set upon the instance will be combined to produce a SQL expression using the mapped SQL expressions as the base of comparison.

Values on the instance may be set as tuples in which case the criteria will produce an IN clause. None is also acceptable as a scalar or tuple entry, which will produce IS NULL that is properly joined with an OR against an IN expression if appropriate.

Parameters:
  • entity – a mapped entity.

  • include_only – optional sequence of keys to limit which keys are included.

  • exclude – sequence of keys to exclude

oslo_db.sqlalchemy.update_match.manufacture_persistent_object(session, specimen, values=None, primary_key=None)

Make an ORM-mapped object persistent in a Session without SQL.

The persistent object is returned.

If a matching object is already present in the given session, the specimen is merged into it and the persistent object returned. Otherwise, the specimen itself is made persistent and is returned.

The object must contain a full primary key, or provide it via the values or primary_key parameters. The object is persisted to the Session in a “clean” state with no pending changes.

Parameters:
  • session – A Session object.

  • specimen – a mapped object which is typically transient.

  • values – a dictionary of values to be applied to the specimen, in addition to the state that’s already on it. The attributes will be set such that no history is created; the object remains clean.

  • primary_key – optional tuple-based primary key. This will also be applied to the instance if present.

oslo_db.sqlalchemy.update_match.update_on_match(query, specimen, surrogate_key, values=None, attempts=3, include_only=None, process_query=None, handle_failure=None)

Emit an UPDATE statement matching the given specimen.

E.g.:

with enginefacade.writer() as session:
    specimen = MyInstance(
        uuid='ccea54f',
        interface_id='ad33fea',
        vm_state='SOME_VM_STATE',
    )

    values = {
        'vm_state': 'SOME_NEW_VM_STATE'
    }

    base_query = model_query(
        context, models.Instance,
        project_only=True, session=session)

    hostname_query = model_query(
        context, models.Instance, session=session,
        read_deleted='no'
    ).filter(func.lower(models.Instance.hostname) == 'SOMEHOSTNAME')

    surrogate_key = ('uuid', )

    def process_query(query):
        return query.where(~exists(hostname_query))

    def handle_failure(query):
        try:
            instance = base_query.one()
        except NoResultFound:
            raise exception.InstanceNotFound(instance_id=instance_uuid)

        if session.query(hostname_query.exists()).scalar():
            raise exception.InstanceExists(
                name=values['hostname'].lower())

        # try again
        return False

    persistent_instance = base_query.update_on_match(
        specimen,
        surrogate_key,
        values=values,
        process_query=process_query,
        handle_failure=handle_failure
    )

The UPDATE statement is constructed against the given specimen using those values which are present to construct a WHERE clause. If the specimen contains additional values to be ignored, the include_only parameter may be passed which indicates a sequence of attributes to use when constructing the WHERE.

The UPDATE is performed against an ORM Query, which is created from the given Session, or alternatively by passing the query parameter referring to an existing query.

Before the query is invoked, it is also passed through the callable sent as process_query, if present. This hook allows additional criteria to be added to the query after it is created but before invocation.

The function will then invoke the UPDATE statement and check for “success” one or more times, up to a maximum of that passed as attempts.

The initial check for “success” from the UPDATE statement is that the number of rows returned matches 1. If zero rows are matched, then the UPDATE statement is assumed to have “failed”, and the failure handling phase begins.

The failure handling phase involves invoking the given handle_failure function, if any. This handler can perform additional queries to attempt to figure out why the UPDATE didn’t match any rows. The handler, upon detection of the exact failure condition, should throw an exception to exit; if it doesn’t, it has the option of returning True or False, where False means the error was not handled, and True means that there was not in fact an error, and the function should return successfully.

If the failure handler is not present, or returns False after attempts number of attempts, then the function overall raises CantUpdateException. If the handler returns True, then the function returns with no error.
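The attempt and failure-handling flow described above can be summarized with a small stand-alone loop. This is a sketch of the documented behaviour only: run_update is a hypothetical callable returning the number of matched rows, handle_failure takes no arguments here, and CantUpdateSketch stands in for CantUpdateException.

```python
class CantUpdateSketch(Exception):
    """Stand-in for CantUpdateException."""


def update_with_retries(run_update, attempts=3, handle_failure=None):
    for _ in range(attempts):
        if run_update() == 1:
            return "updated"
        # Zero rows matched: let the handler raise, or tell us what to do.
        if handle_failure is not None and handle_failure():
            return "handled"  # the handler decided this was not an error
    raise CantUpdateSketch("zero rows matched after %d attempts" % attempts)


row_counts = iter([0, 0, 1])  # two misses, then a match
handler_calls = []


def flaky_update():
    return next(row_counts)


def handler():
    handler_calls.append("handle_failure")
    return False  # not handled; try again


result = update_with_retries(flaky_update, attempts=3, handle_failure=handler)
```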

The return value of the function is a persistent version of the given specimen; this may be the specimen itself, if no matching object was already present in the session; otherwise, the existing object is returned, with the state of the specimen merged into it. The returned persistent object will have the given values populated into the object.

The object is returned as “persistent”, meaning that it is associated with the given Session and has an identity key (that is, a real primary key value).

In order to produce this identity key, a strategy must be used to determine it as efficiently and safely as possible:

  1. If the given specimen already contained its primary key attributes fully populated, then these attributes were used as criteria in the UPDATE, so we have the primary key value; it is populated directly.

  2. If the target backend supports RETURNING, then the update() query is performed with a RETURNING clause so that the matching primary key is returned atomically. This currently includes Postgresql, Oracle and others (notably not MySQL or SQLite).

  3. If the target backend is MySQL, and the given model uses a single-column, AUTO_INCREMENT integer primary key value (as is the case for Nova), MySQL’s recommended approach of making use of LAST_INSERT_ID(expr) is used to atomically acquire the matching primary key value within the scope of the UPDATE statement; the value is then fetched immediately afterwards using SELECT LAST_INSERT_ID(). http://dev.mysql.com/doc/refman/5.0/en/information-functions.html#function_last-insert-id

  4. Otherwise, for composite keys on MySQL or other backends such as SQLite, the row as UPDATED must be re-fetched in order to acquire the primary key value. The surrogate_key parameter is used for this in order to re-fetch the row; this is a column name with a known, unique value where the object can be fetched.
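The four cases above form a simple decision list, sketched below. The function name, its parameters and the returned labels are all hypothetical; the sketch only mirrors the order in which the strategies are described, not how the library detects backend capabilities.

```python
def choose_pk_strategy(specimen_has_pk, supports_returning, backend,
                       single_autoincrement_int_pk):
    """Pick a primary-key retrieval strategy in the documented order."""
    if specimen_has_pk:
        return "use-specimen-pk"
    if supports_returning:
        return "returning"
    if backend == "mysql" and single_autoincrement_int_pk:
        return "last-insert-id"
    return "refetch-by-surrogate-key"


strategy_pg = choose_pk_strategy(False, True, "postgresql", True)
strategy_mysql = choose_pk_strategy(False, False, "mysql", True)
strategy_sqlite = choose_pk_strategy(False, False, "sqlite", True)
```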

oslo_db.sqlalchemy.update_match.update_returning_pk(query, values, surrogate_key)

Perform an UPDATE, returning the primary key of the matched row.

The primary key is returned using a selection of strategies:

  • if the database supports RETURNING, RETURNING is used to retrieve the primary key values inline.

  • If the database is MySQL and the entity is mapped to a single integer primary key column, MySQL’s last_insert_id() function is used inline within the UPDATE and then upon a second SELECT to get the value.

  • Otherwise, a “refetch” strategy is used, where a given “surrogate” key value (typically a UUID column on the entity) is used to run a new SELECT against that UUID. This UUID is also placed into the UPDATE query to ensure the row matches.

Parameters:
  • query – a Query object with existing criterion, against a single entity.

  • values – a dictionary of values to be updated on the row.

  • surrogate_key – a tuple of (attrname, value), referring to a UNIQUE attribute that will also match the row. This attribute is used to retrieve the row via a SELECT when no optimized strategy exists.

Returns:

the primary key, returned as a tuple. Is only returned if rows matched is one. Otherwise, CantUpdateException is raised.

oslo_db.sqlalchemy.utils module

class oslo_db.sqlalchemy.utils.DialectFunctionDispatcher

Bases: object

dispatch_for(expr)
classmethod dispatch_for_dialect(expr, multiple=False)

Provide dialect-specific functionality within distinct functions.

e.g.:

@dispatch_for_dialect("*")
def set_special_option(engine):
    pass

@set_special_option.dispatch_for("sqlite")
def set_sqlite_special_option(engine):
    return engine.execute("sqlite thing")

@set_special_option.dispatch_for("mysql+mysqldb")
def set_mysqldb_special_option(engine):
    return engine.execute("mysqldb thing")

After the above registration, the set_special_option() function is now a dispatcher that accepts a SQLAlchemy Engine, Connection, URL string, or sqlalchemy.engine.URL object:

eng = create_engine('...')
result = set_special_option(eng)

The filter system supports two modes, “multiple” and “single”. The default is “single”, and requires that one and only one function match for a given backend. In this mode, the function may also have a return value, which will be returned by the top level call.

“multiple” mode, on the other hand, does not support return values, but allows for any number of matching functions, where each function will be called:

# the initial call sets this up as a "multiple" dispatcher
@dispatch_for_dialect("*", multiple=True)
def set_options(engine):
    # set options that apply to *all* engines
    pass

@set_options.dispatch_for("postgresql")
def set_postgresql_options(engine):
    # set options that apply to all Postgresql engines
    pass

@set_options.dispatch_for("postgresql+psycopg2")
def set_postgresql_psycopg2_options(engine):
    # set options that apply only to "postgresql+psycopg2"
    pass

@set_options.dispatch_for("*+pyodbc")
def set_pyodbc_options(engine):
    # set options that apply to all pyodbc backends
    pass

Note that in both modes, any number of additional arguments can be accepted by member functions. For example, to populate a dictionary of options, it may be passed in:

@dispatch_for_dialect("*", multiple=True)
def set_engine_options(url, opts):
    pass

@set_engine_options.dispatch_for("mysql+mysqldb")
def _mysql_set_default_charset_to_utf8(url, opts):
    opts.setdefault('charset', 'utf-8')

@set_engine_options.dispatch_for("sqlite")
def _set_sqlite_in_memory_check_same_thread(url, opts):
    if url.database in (None, 'memory'):
        opts['check_same_thread'] = False

opts = {}
set_engine_options(url, opts)

The driver specifiers are of the form: <database | *>[+<driver | *>]. That is, database name or “*”, followed by an optional + sign with driver or “*”. Omitting the driver name implies all drivers for that database.
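How a specifier of this form relates to a concrete driver name can be sketched in a few lines. The specifier_matches helper is hypothetical and is not the dispatcher's own matching code; in particular it compares the driver part literally and does not resolve a backend's default driver when the driver name is omitted from the target.

```python
def specifier_matches(spec, drivername):
    """Check a '<database | *>[+<driver | *>]' specifier against a driver name."""
    spec_db, _, spec_driver = spec.partition("+")
    db, _, driver = drivername.partition("+")
    db_ok = spec_db in ("*", db)
    # An omitted driver in the specifier means "all drivers".
    driver_ok = spec_driver in ("", "*", driver)
    return db_ok and driver_ok


all_postgresql = specifier_matches("postgresql", "postgresql+psycopg2")
any_pyodbc = specifier_matches("*+pyodbc", "mssql+pyodbc")
wrong_driver = specifier_matches("mysql+mysqldb", "mysql+pymysql")
```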

dispatch_on_drivername(drivername)

Return a sub-dispatcher for the given drivername.

This provides a means of calling a different function, such as the “*” function, for a given target object that normally refers to a sub-function.

class oslo_db.sqlalchemy.utils.DialectMultiFunctionDispatcher

Bases: DialectFunctionDispatcher

class oslo_db.sqlalchemy.utils.DialectSingleFunctionDispatcher

Bases: DialectFunctionDispatcher

oslo_db.sqlalchemy.utils.add_index(engine, table_name, index_name, idx_columns)

Create an index for given columns.

Parameters:
  • engine – sqlalchemy engine

  • table_name – name of the table

  • index_name – name of the index

  • idx_columns – tuple with names of columns that will be indexed

oslo_db.sqlalchemy.utils.change_deleted_column_type_to_boolean(engine, table_name, **col_name_col_instance)
oslo_db.sqlalchemy.utils.change_deleted_column_type_to_id_type(engine, table_name, **col_name_col_instance)
oslo_db.sqlalchemy.utils.change_index_columns(engine, table_name, index_name, new_columns)

Change set of columns that are indexed by given index.

Parameters:
  • engine – sqlalchemy engine

  • table_name – name of the table

  • index_name – name of the index

  • new_columns – tuple with names of columns that will be indexed

oslo_db.sqlalchemy.utils.column_exists(engine, table_name, column)

Check if table has given column.

Parameters:
  • engine – sqlalchemy engine

  • table_name – name of the table

  • column – name of the column

oslo_db.sqlalchemy.utils.dispatch_for_dialect(expr, multiple=False)

Provide dialect-specific functionality within distinct functions.

e.g.:

@dispatch_for_dialect("*")
def set_special_option(engine):
    pass

@set_special_option.dispatch_for("sqlite")
def set_sqlite_special_option(engine):
    return engine.execute("sqlite thing")

@set_special_option.dispatch_for("mysql+mysqldb")
def set_mysqldb_special_option(engine):
    return engine.execute("mysqldb thing")

After the above registration, set_special_option() is a dispatcher that can be called with a SQLAlchemy Engine, Connection, URL string, or sqlalchemy.engine.URL object:

eng = create_engine('...')
result = set_special_option(eng)

The filter system supports two modes, “multiple” and “single”. The default is “single”, and requires that one and only one function match for a given backend. In this mode, the function may also have a return value, which will be returned by the top level call.

“multiple” mode, on the other hand, does not support return values, but allows for any number of matching functions, each of which will be called:

# the initial call sets this up as a "multiple" dispatcher
@dispatch_for_dialect("*", multiple=True)
def set_options(engine):
    # set options that apply to *all* engines
    pass

@set_options.dispatch_for("postgresql")
def set_postgresql_options(engine):
    # set options that apply to all Postgresql engines
    pass

@set_options.dispatch_for("postgresql+psycopg2")
def set_postgresql_psycopg2_options(engine):
    # set options that apply only to "postgresql+psycopg2"
    pass

@set_options.dispatch_for("*+pyodbc")
def set_pyodbc_options(engine):
    # set options that apply to all pyodbc backends
    pass

Note that in both modes, any number of additional arguments can be accepted by member functions. For example, to populate a dictionary of options, it may be passed in:

@dispatch_for_dialect("*", multiple=True)
def set_engine_options(url, opts):
    pass

@set_engine_options.dispatch_for("mysql+mysqldb")
def _mysql_set_default_charset_to_utf8(url, opts):
    opts.setdefault('charset', 'utf-8')

@set_engine_options.dispatch_for("sqlite")
def _set_sqlite_in_memory_check_same_thread(url, opts):
    if url.database in (None, 'memory'):
        opts['check_same_thread'] = False

opts = {}
set_engine_options(url, opts)

The driver specifiers are of the form: <database | *>[+<driver | *>]. That is, database name or “*”, followed by an optional + sign with driver or “*”. Omitting the driver name implies all drivers for that database.
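The specifier format above can be illustrated with a small matching function. This is a minimal sketch of the matching rule only, not the dispatcher's actual implementation; specifier_matches() and the registered list are hypothetical names, and the drivername is assumed to already include its driver part where one applies:

```python
def specifier_matches(specifier, drivername):
    """Return True if ``specifier`` selects ``drivername``.

    ``specifier`` follows <database | *>[+<driver | *>], for example
    "mysql", "mysql+mysqldb", "*" or "*+pyodbc". ``drivername`` is a
    string such as "postgresql+psycopg2".
    """
    spec_db, _, spec_driver = specifier.partition("+")
    database, _, driver = drivername.partition("+")
    # Omitting the driver in the specifier implies all drivers.
    spec_driver = spec_driver or "*"
    return spec_db in ("*", database) and spec_driver in ("*", driver)


# Hypothetical set of registered specifiers, mirroring the example above.
registered = ["*", "postgresql", "postgresql+psycopg2", "*+pyodbc"]

# In "multiple" mode every matching function would be called.
matches = [s for s in registered
           if specifier_matches(s, "postgresql+psycopg2")]
```

Here matches contains "*", "postgresql" and "postgresql+psycopg2", but not "*+pyodbc", because the driver part differs.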

oslo_db.sqlalchemy.utils.drop_index(engine, table_name, index_name)

Drop index with given name.

Parameters:
  • engine – sqlalchemy engine

  • table_name – name of the table

  • index_name – name of the index

oslo_db.sqlalchemy.utils.drop_old_duplicate_entries_from_table(engine, table_name, use_soft_delete, *uc_column_names)

Drop all old rows having the same values for the columns in uc_column_names.

This function drops (or marks as deleted, if use_soft_delete is True) old duplicate rows from the table named table_name.

Parameters:
  • engine – Sqlalchemy engine

  • table_name – Table with duplicates

  • use_soft_delete – If True, duplicate rows are marked as deleted; if False, they are removed from the table

  • uc_column_names – Unique constraint columns
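The behaviour described above can be sketched with the standard-library sqlite3 module. This is an illustration of the idea only, not the library's implementation: drop_old_duplicates() and the items table are hypothetical, "old" is assumed to mean every row except the one with the highest id in each group, soft deletion is assumed to set deleted to the row's id, and the unique columns are assumed to be non-null:

```python
import sqlite3


def drop_old_duplicates(conn, table, use_soft_delete, *uc_columns):
    """Keep only the newest row (highest id) per group of unique columns."""
    # Table and column names are interpolated directly, so they must come
    # from trusted code, never from user input.
    cols = ", ".join(uc_columns)
    groups = conn.execute(
        f"SELECT MAX(id), {cols} FROM {table} "
        f"GROUP BY {cols} HAVING COUNT(*) > 1"
    ).fetchall()
    match = " AND ".join(f"{c} = ?" for c in uc_columns)
    for max_id, *values in groups:
        if use_soft_delete:
            sql = f"UPDATE {table} SET deleted = id WHERE id < ? AND {match}"
        else:
            sql = f"DELETE FROM {table} WHERE id < ? AND {match}"
        conn.execute(sql, [max_id, *values])


def make_items():
    """Create an in-memory table holding three duplicates of name 'x'."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT NOT NULL, "
        "deleted INTEGER NOT NULL DEFAULT 0)"
    )
    conn.executemany(
        "INSERT INTO items (id, name) VALUES (?, ?)",
        [(1, "x"), (2, "x"), (3, "y"), (4, "x")],
    )
    return conn


hard = make_items()
drop_old_duplicates(hard, "items", False, "name")
remaining_ids = [r[0] for r in hard.execute("SELECT id FROM items ORDER BY id")]

soft = make_items()
drop_old_duplicates(soft, "items", True, "name")
soft_state = soft.execute("SELECT id, deleted FROM items ORDER BY id").fetchall()
```

With use_soft_delete set to False the older duplicates disappear from the table; with True they stay in place but carry a non-zero deleted value.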

oslo_db.sqlalchemy.utils.get_db_connection_info(conn_pieces)
oslo_db.sqlalchemy.utils.get_foreign_key_constraint_name(engine, table_name, column_name)

Find the name of foreign key in a table, given constrained column name.

Parameters:
  • engine – a SQLAlchemy engine (or connection)

  • table_name – name of table which contains the constraint

  • column_name – name of column that is constrained by the foreign key.

Returns:

the name of the first foreign key constraint which constrains the given column in the given table.

oslo_db.sqlalchemy.utils.get_indexes(engine, table_name)

Get a list of all indexes on a given table.

Parameters:
  • engine – sqlalchemy engine

  • table_name – name of the table

oslo_db.sqlalchemy.utils.get_non_innodb_tables(connectable, skip_tables=('migrate_version', 'alembic_version'))

Get a list of tables which don’t use InnoDB storage engine.

Parameters:
  • connectable – a SQLAlchemy Engine or a Connection instance

  • skip_tables – a list of tables which might have a different storage engine

oslo_db.sqlalchemy.utils.get_non_ndbcluster_tables(connectable, skip_tables=None)

Get a list of tables which don’t use MySQL Cluster (NDB) storage engine.

Parameters:
  • connectable – a SQLAlchemy Engine or Connection instance

  • skip_tables – a list of tables which might have a different storage engine

oslo_db.sqlalchemy.utils.get_table(engine, name)

Returns a SQLAlchemy table loaded dynamically from the database.

Needed because the models don’t work for us in migrations as models will be far out of sync with the current data.

Warning

Do not use this method when creating ForeignKeys in database migrations because sqlalchemy needs the same MetaData object to hold information about the parent table and the reference table in the ForeignKey. This method uses a unique MetaData object per table object so it won’t work with ForeignKey creation.

oslo_db.sqlalchemy.utils.get_unique_keys(model)

Get a list of sets of unique model keys.

Parameters:

model – the ORM model class

Return type:

list of sets of strings

Returns:

unique model keys or None if unable to find them

oslo_db.sqlalchemy.utils.index_exists(engine, table_name, index_name)

Check if given index exists.

Parameters:
  • engine – sqlalchemy engine

  • table_name – name of the table

  • index_name – name of the index

oslo_db.sqlalchemy.utils.index_exists_on_columns(engine, table_name, columns)

Check if an index on given columns exists.

Parameters:
  • engine – sqlalchemy engine

  • table_name – name of the table

  • columns – a list of the column names to check

oslo_db.sqlalchemy.utils.model_query(model, session, args=None, **kwargs)

Query helper for db.sqlalchemy api methods.

This accounts for deleted and project_id fields.

Parameters:
  • model (models.ModelBase) – Model to query. Must be a subclass of ModelBase.

  • session (sqlalchemy.orm.session.Session) – The session to use.

  • args (tuple) – Arguments to query. If None - model is used.

Keyword arguments:

Parameters:
  • project_id (iterable, model.__table__.columns.project_id.type, None type) – If present, allows filtering by project_id(s). Can be either a project_id value, or an iterable of project_id values, or None. If an iterable is passed, only rows whose project_id column value is on the project_id list will be returned. If None is passed, only rows which are not bound to any project, will be returned.

  • deleted (bool) – If present, allows filtering by deleted field. If True is passed, only deleted entries will be returned, if False - only existing entries.

Usage:

from oslo_db.sqlalchemy import utils


def get_instance_by_uuid(uuid):
    session = get_session()
    with session.begin():
        return (utils.model_query(models.Instance, session=session)
                     .filter(models.Instance.uuid == uuid)
                     .first())

def get_nodes_stat():
    data = (Node.id, Node.cpu, Node.ram, Node.hdd)

    session = get_session()
    with session.begin():
        return utils.model_query(Node, session=session, args=data).all()

You can also create your own helper based on utils.model_query(). For example, this can be useful if you plan to derive the project_id and deleted parameters from the project’s context:

from oslo_db.sqlalchemy import utils


def _model_query(context, model, session=None, args=None,
                 project_id=None, project_only=False,
                 read_deleted=None):

    # We assume that the helper functions ``_get_project_id()`` and
    # ``_get_deleted()`` handle the passed parameters and the context
    # object (for example, deciding whether to restrict a user to
    # querying their own entries by project_id, or whether only an admin
    # may read deleted entries). Their return values, ``project_id`` and
    # ``deleted``, must be suitable for the ``model_query()`` signature.
    kwargs = {}
    if project_id is not None:
        kwargs['project_id'] = _get_project_id(context, project_id,
                                               project_only)
    if read_deleted is not None:
        kwargs['deleted'] = _get_deleted(context, read_deleted)
    session = session or get_session()

    with session.begin():
        return utils.model_query(model, session=session,
                                 args=args, **kwargs)

def get_instance_by_uuid(context, uuid):
    return (_model_query(context, models.Instance, read_deleted='yes')
                  .filter(models.Instance.uuid == uuid)
                  .first())

def get_nodes_data(context, project_id, project_only='allow_none'):
    data = (Node.id, Node.cpu, Node.ram, Node.hdd)

    return (_model_query(context, Node, args=data, project_id=project_id,
                         project_only=project_only)
                  .all())

oslo_db.sqlalchemy.utils.paginate_query(query, model, limit, sort_keys, marker=None, sort_dir=None, sort_dirs=None)

Returns a query with sorting / pagination criteria added.

Pagination works by requiring a unique sort key, specified by sort_keys. (If sort_keys is not unique, we risk looping through values.) The last row of the previous page is used as the ‘marker’ for pagination, so the query must return the rows that follow the passed marker in sort order. With a single-valued sort key this is easy: sort_key > X. With a compound sort key (k1, k2, k3), the lexicographical ordering must be reproduced: (k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3).

We also have to cope with different sort directions and with cases where k2, k3, … are nullable.

Typically, the id of the last row is used as the client-facing pagination marker. The actual marker object must then be fetched from the database and passed in as marker.

The “offset” parameter is intentionally avoided. As offset requires a full scan through the preceding results each time, criteria-based pagination is preferred. See http://use-the-index-luke.com/no-offset for further background.
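The marker-based criteria described above can be sketched with the standard-library sqlite3 module. This illustrates the technique only and is not how paginate_query() is implemented; the nodes table, keyset_where() and fetch_page() are hypothetical, and ascending order with non-null keys is assumed:

```python
import sqlite3


def keyset_where(sort_keys, marker):
    """Build a WHERE clause selecting the rows strictly after ``marker``.

    For sort keys (k1, k2) this produces:
        (k1 > ?) OR (k1 = ? AND k2 > ?)
    """
    clauses, params = [], []
    for i, key in enumerate(sort_keys):
        # All earlier keys must equal the marker; this key must exceed it.
        parts = [f"{k} = ?" for k in sort_keys[:i]] + [f"{key} > ?"]
        clauses.append("(" + " AND ".join(parts) + ")")
        params.extend(marker[k] for k in sort_keys[:i])
        params.append(marker[key])
    return " OR ".join(clauses), params


def fetch_page(conn, sort_keys, limit, marker=None):
    """Return up to ``limit`` rows that follow ``marker`` in sort order."""
    # Column names are interpolated, so they must come from trusted code.
    sql, params = "SELECT name, id FROM nodes", []
    if marker is not None:
        where, params = keyset_where(sort_keys, marker)
        sql += " WHERE " + where
    sql += " ORDER BY " + ", ".join(sort_keys) + " LIMIT ?"
    return conn.execute(sql, params + [limit]).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nodes (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO nodes (id, name) VALUES (?, ?)",
    [(1, "b"), (2, "a"), (3, "b"), (4, "a"), (5, "c")],
)

sort_keys = ["name", "id"]  # adding "id" makes the compound key unique
page1 = fetch_page(conn, sort_keys, limit=2)
last_name, last_id = page1[-1]
page2 = fetch_page(conn, sort_keys, limit=2,
                   marker={"name": last_name, "id": last_id})
```

Because name alone is not unique, id is appended to the sort keys; without it, rows sharing the marker's name could be skipped or repeated between pages.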

Parameters:
  • query – the query object to which we should add paging/sorting

  • model – the ORM model class

  • limit – maximum number of items to return

  • sort_keys – array of attributes by which results should be sorted

  • marker – the last item of the previous page; the results following this value are returned.

  • sort_dir – direction in which results should be sorted (asc, desc). The suffix -nullsfirst or -nullslast can be added to define the ordering of null values.

  • sort_dirs – per-column array of sort_dirs, corresponding to sort_keys

Return type:

sqlalchemy.orm.query.Query

Returns:

The query with sorting/pagination added.

oslo_db.sqlalchemy.utils.sanitize_db_url(url)
oslo_db.sqlalchemy.utils.suspend_fk_constraints_for_col_alter(engine, table_name, column_name, referents=[])

Detect foreign key constraints, drop, and recreate.

This is used to guard against a column ALTER that, on some backends, cannot proceed while foreign key constraints are present.

e.g.:

from oslo_db.sqlalchemy.utils import (
    suspend_fk_constraints_for_col_alter
)

# "domain_id" is the column being altered (the column_name argument)
with suspend_fk_constraints_for_col_alter(
    migrate_engine, "user_table", "domain_id",
    referents=[
        "local_user", "nonlocal_user", "project"
    ]):
    user_table.c.domain_id.alter(nullable=False)

Parameters:
  • engine – a SQLAlchemy engine (or connection)

  • table_name – target table name. All foreign key constraints that refer to the table_name / column_name will be dropped and recreated.

  • column_name – target column name. All foreign key constraints which refer to this column, either partially or fully, will be dropped and recreated.

  • referents – sequence of string table names to search for foreign key constraints. A future version of this function may no longer require this argument, however for the moment it is required.

oslo_db.sqlalchemy.utils.to_list(x, default=None)

Module contents

Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.
