Source code for heat.common.messaging
# -*- coding: utf-8 -*-
# Copyright 2013 eNovance <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo_config import cfg
import oslo_messaging
from oslo_messaging.rpc import dispatcher
from osprofiler import profiler
from heat.common import context
TRANSPORT = None
NOTIFICATIONS_TRANSPORT = None
NOTIFIER = None
[docs]
class RequestContextSerializer(oslo_messaging.Serializer):
    def __init__(self, base):
        self._base = base
[docs]
    def serialize_entity(self, ctxt, entity):
        if not self._base:
            return entity
        return self._base.serialize_entity(ctxt, entity) 
[docs]
    def deserialize_entity(self, ctxt, entity):
        if not self._base:
            return entity
        return self._base.deserialize_entity(ctxt, entity) 
[docs]
    @staticmethod
    def serialize_context(ctxt):
        _context = ctxt.to_dict()
        prof = profiler.get()
        if prof:
            trace_info = {
                "hmac_key": prof.hmac_key,
                "base_id": prof.get_base_id(),
                "parent_id": prof.get_id()
            }
            _context.update({"trace_info": trace_info})
        return _context 
[docs]
    @staticmethod
    def deserialize_context(ctxt):
        trace_info = ctxt.pop("trace_info", None)
        if trace_info:
            profiler.init(**trace_info)
        return context.RequestContext.from_dict(ctxt) 
 
[docs]
def get_specific_transport(url, optional, exmods, is_for_notifications=False):
    try:
        if is_for_notifications:
            return oslo_messaging.get_notification_transport(
                cfg.CONF, url, allowed_remote_exmods=exmods)
        else:
            return oslo_messaging.get_rpc_transport(
                cfg.CONF, url, allowed_remote_exmods=exmods)
    except oslo_messaging.InvalidTransportURL as e:
        if not optional or e.url:
            # NOTE(sileht): oslo_messaging is configured but unloadable
            # so reraise the exception
            raise
        else:
            return None 
[docs]
def setup_transports(url, optional):
    global TRANSPORT, NOTIFICATIONS_TRANSPORT
    oslo_messaging.set_transport_defaults('heat')
    exmods = ['heat.common.exception']
    TRANSPORT = get_specific_transport(url, optional, exmods)
    NOTIFICATIONS_TRANSPORT = get_specific_transport(url, optional, exmods,
                                                     is_for_notifications=True) 
[docs]
def setup(url=None, optional=False):
    """Initialise the oslo_messaging layer."""
    global NOTIFIER
    if url and url.startswith("fake://"):
        # NOTE(sileht): oslo_messaging fake driver uses time.sleep
        # for task switch, so we need to monkey_patch it
        eventlet.monkey_patch(time=True)
    if not TRANSPORT or not NOTIFICATIONS_TRANSPORT:
        setup_transports(url, optional)
        # In the fake driver, make the dict of exchanges local to each exchange
        # manager, instead of using the shared class attribute. Doing otherwise
        # breaks the unit tests.
        if url and url.startswith("fake://"):
            TRANSPORT._driver._exchange_manager._exchanges = {}
    if not NOTIFIER and NOTIFICATIONS_TRANSPORT:
        serializer = RequestContextSerializer(
            oslo_messaging.JsonPayloadSerializer())
        NOTIFIER = oslo_messaging.Notifier(NOTIFICATIONS_TRANSPORT,
                                           serializer=serializer) 
[docs]
def cleanup():
    """Cleanup the oslo_messaging layer."""
    global TRANSPORT, NOTIFICATIONS_TRANSPORT, NOTIFIER
    if TRANSPORT:
        TRANSPORT.cleanup()
        NOTIFICATIONS_TRANSPORT.cleanup()
        TRANSPORT = NOTIFICATIONS_TRANSPORT = NOTIFIER = None 
[docs]
def get_rpc_server(target, endpoint):
    """Return a configured oslo_messaging rpc server."""
    serializer = RequestContextSerializer(
        oslo_messaging.JsonPayloadSerializer())
    access_policy = dispatcher.DefaultRPCAccessPolicy
    return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint],
                                         executor='eventlet',
                                         serializer=serializer,
                                         access_policy=access_policy) 
[docs]
def get_rpc_client(**kwargs):
    """Return a configured oslo_messaging RPCClient."""
    target = oslo_messaging.Target(**kwargs)
    serializer = RequestContextSerializer(
        oslo_messaging.JsonPayloadSerializer())
    return oslo_messaging.get_rpc_client(
        TRANSPORT, target, serializer=serializer) 
[docs]
def get_notifier(publisher_id):
    """Return a configured oslo_messaging notifier."""
    return NOTIFIER.prepare(publisher_id=publisher_id)