Source code for kombu.transport.amqplib

"""
kombu.transport.amqplib
=======================

amqplib transport.

"""
from __future__ import absolute_import

import errno
import socket

from kombu.five import items
from kombu.utils.encoding import str_to_bytes
from kombu.utils.amq_manager import get_manager

from . import base

try:
    from ssl import SSLError
except ImportError:
    class SSLError(Exception):  # noqa
        pass
from struct import unpack


class NA(object):
    pass

try:
    from amqplib import client_0_8 as amqp
    from amqplib.client_0_8 import transport
    from amqplib.client_0_8.channel import Channel as _Channel
    from amqplib.client_0_8.exceptions import AMQPConnectionException
    from amqplib.client_0_8.exceptions import AMQPChannelException
except ImportError:  # pragma: no cover

    class NAx(object):
        pass
    amqp = NA
    amqp.Connection = NA
    transport = _Channel = NA                               # noqa
    # Sphinx crashes if this is NA, must be different class
    transport.TCPTransport = transport.SSLTransport = NAx
    AMQPConnectionException = AMQPChannelException = NA     # noqa

DEFAULT_PORT = 5672
HAS_MSG_PEEK = hasattr(socket, 'MSG_PEEK')

# amqplib's handshake mistakenly identifies as protocol version 1191,
# this breaks in RabbitMQ tip, which no longer falls back to
# 0-8 for unknown ids.
transport.AMQP_PROTOCOL_HEADER = str_to_bytes('AMQP\x01\x01\x08\x00')


# - fixes warnings when socket is not connected.
class TCPTransport(transport.TCPTransport):

    def read_frame(self):
        frame_type, channel, size = unpack('>BHI', self._read(7, True))
        payload = self._read(size)
        ch = ord(self._read(1))
        if ch == 206:  # '\xce'
            return frame_type, channel, payload
        else:
            raise Exception(
                'Framing Error, received 0x%02x while expecting 0xce' % ch)

    def _read(self, n, initial=False):
        read_buffer = self._read_buffer
        while len(read_buffer) < n:
            try:
                s = self.sock.recv(n - len(read_buffer))
            except socket.error as exc:
                if not initial and exc.errno in (errno.EAGAIN, errno.EINTR):
                    continue
                raise
            if not s:
                raise IOError('Socket closed')
            read_buffer += s

        result = read_buffer[:n]
        self._read_buffer = read_buffer[n:]

        return result

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass
        finally:
            self.sock = None

transport.TCPTransport = TCPTransport


class SSLTransport(transport.SSLTransport):

    def __init__(self, host, connect_timeout, ssl):
        if isinstance(ssl, dict):
            self.sslopts = ssl
        self.sslobj = None

        transport._AbstractTransport.__init__(self, host, connect_timeout)

    def read_frame(self):
        frame_type, channel, size = unpack('>BHI', self._read(7, True))
        payload = self._read(size)
        ch = ord(self._read(1))
        if ch == 206:  # '\xce'
            return frame_type, channel, payload
        else:
            raise Exception(
                'Framing Error, received 0x%02x while expecting 0xce' % ch)

    def _read(self, n, initial=False):
        result = ''

        while len(result) < n:
            try:
                s = self.sslobj.read(n - len(result))
            except socket.error as exc:
                if not initial and exc.errno in (errno.EAGAIN, errno.EINTR):
                    continue
                raise
            if not s:
                raise IOError('Socket closed')
            result += s

        return result

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass
        finally:
            self.sock = None
transport.SSLTransport = SSLTransport


[docs]class Connection(amqp.Connection): # pragma: no cover connected = True def _do_close(self, *args, **kwargs): # amqplib does not ignore socket errors when connection # is closed on the remote end. try: super(Connection, self)._do_close(*args, **kwargs) except socket.error: pass def _dispatch_basic_return(self, channel, args, msg): reply_code = args.read_short() reply_text = args.read_shortstr() exchange = args.read_shortstr() routing_key = args.read_shortstr() exc = AMQPChannelException(reply_code, reply_text, (50, 60)) if channel.events['basic_return']: for callback in channel.events['basic_return']: callback(exc, exchange, routing_key, msg) else: raise exc def __init__(self, *args, **kwargs): super(Connection, self).__init__(*args, **kwargs) self._method_override = {(60, 50): self._dispatch_basic_return}
[docs] def drain_events(self, timeout=None): """Wait for an event on a channel.""" chanmap = self.channels chanid, method_sig, args, content = self._wait_multiple( chanmap, None, timeout=timeout) channel = chanmap[chanid] if (content and channel.auto_decode and hasattr(content, 'content_encoding')): try: content.body = content.body.decode(content.content_encoding) except Exception: pass amqp_method = self._method_override.get(method_sig) or \ channel._METHOD_MAP.get(method_sig, None) if amqp_method is None: raise Exception('Unknown AMQP method (%d, %d)' % method_sig) if content is None: return amqp_method(channel, args) else: return amqp_method(channel, args, content)
[docs] def read_timeout(self, timeout=None): if timeout is None: return self.method_reader.read_method() sock = self.transport.sock prev = sock.gettimeout() if prev != timeout: sock.settimeout(timeout) try: try: return self.method_reader.read_method() except SSLError as exc: # http://bugs.python.org/issue10272 if 'timed out' in str(exc): raise socket.timeout() # Non-blocking SSL sockets can throw SSLError if 'The operation did not complete' in str(exc): raise socket.timeout() raise finally: if prev != timeout: sock.settimeout(prev)
def _wait_multiple(self, channels, allowed_methods, timeout=None): for channel_id, channel in items(channels): method_queue = channel.method_queue for queued_method in method_queue: method_sig = queued_method[0] if (allowed_methods is None or method_sig in allowed_methods or method_sig == (20, 40)): method_queue.remove(queued_method) method_sig, args, content = queued_method return channel_id, method_sig, args, content # Nothing queued, need to wait for a method from the peer read_timeout = self.read_timeout wait = self.wait while 1: channel, method_sig, args, content = read_timeout(timeout) if (channel in channels and allowed_methods is None or method_sig in allowed_methods or method_sig == (20, 40)): return channel, method_sig, args, content # Not the channel and/or method we were looking for. Queue # this method for later channels[channel].method_queue.append((method_sig, args, content)) # # If we just queued up a method for channel 0 (the Connection # itself) it's probably a close method in reaction to some # error, so deal with it right away. # if channel == 0: wait()
[docs] def channel(self, channel_id=None): try: return self.channels[channel_id] except KeyError: return Channel(self, channel_id)
[docs]class Message(base.Message): def __init__(self, channel, msg, **kwargs): props = msg.properties super(Message, self).__init__( channel, body=msg.body, delivery_tag=msg.delivery_tag, content_type=props.get('content_type'), content_encoding=props.get('content_encoding'), delivery_info=msg.delivery_info, properties=msg.properties, headers=props.get('application_headers') or {}, **kwargs)
[docs]class Channel(_Channel, base.StdChannel): Message = Message events = {'basic_return': set()} def __init__(self, *args, **kwargs): self.no_ack_consumers = set() super(Channel, self).__init__(*args, **kwargs)
[docs] def prepare_message(self, body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None): """Encapsulate data into a AMQP message.""" return amqp.Message(body, priority=priority, content_type=content_type, content_encoding=content_encoding, application_headers=headers, **properties)
[docs] def message_to_python(self, raw_message): """Convert encoded message body back to a Python value.""" return self.Message(self, raw_message)
[docs] def close(self): try: super(Channel, self).close() finally: self.connection = None
[docs] def basic_consume(self, *args, **kwargs): consumer_tag = super(Channel, self).basic_consume(*args, **kwargs) if kwargs['no_ack']: self.no_ack_consumers.add(consumer_tag) return consumer_tag
[docs] def basic_cancel(self, consumer_tag, **kwargs): self.no_ack_consumers.discard(consumer_tag) return super(Channel, self).basic_cancel(consumer_tag, **kwargs)
[docs]class Transport(base.Transport): Connection = Connection default_port = DEFAULT_PORT # it's very annoying that amqplib sometimes raises AttributeError # if the connection is lost, but nothing we can do about that here. connection_errors = ( base.Transport.connection_errors + ( AMQPConnectionException, socket.error, IOError, OSError, AttributeError) ) channel_errors = base.Transport.channel_errors + (AMQPChannelException, ) driver_name = 'amqplib' driver_type = 'amqp' supports_ev = True def __init__(self, client, **kwargs): self.client = client self.default_port = kwargs.get('default_port') or self.default_port if amqp is NA: raise ImportError('Missing amqplib library (pip install amqplib)')
[docs] def create_channel(self, connection): return connection.channel()
[docs] def drain_events(self, connection, **kwargs): return connection.drain_events(**kwargs)
[docs] def establish_connection(self): """Establish connection to the AMQP broker.""" conninfo = self.client for name, default_value in items(self.default_connection_params): if not getattr(conninfo, name, None): setattr(conninfo, name, default_value) if conninfo.hostname == 'localhost': conninfo.hostname = '127.0.0.1' conn = self.Connection(host=conninfo.host, userid=conninfo.userid, password=conninfo.password, login_method=conninfo.login_method, virtual_host=conninfo.virtual_host, insist=conninfo.insist, ssl=conninfo.ssl, connect_timeout=conninfo.connect_timeout) conn.client = self.client return conn
[docs] def close_connection(self, connection): """Close the AMQP broker connection.""" connection.client = None connection.close()
[docs] def is_alive(self, connection): if HAS_MSG_PEEK: sock = connection.transport.sock prev = sock.gettimeout() sock.settimeout(0.0001) try: sock.recv(1, socket.MSG_PEEK) except socket.timeout: pass except socket.error: return False finally: sock.settimeout(prev) return True
[docs] def verify_connection(self, connection): return connection.channels is not None and self.is_alive(connection)
[docs] def register_with_event_loop(self, connection, loop): loop.add_reader(connection.method_reader.source.sock, self.on_readable, connection, loop)
@property def default_connection_params(self): return {'userid': 'guest', 'password': 'guest', 'port': self.default_port, 'hostname': 'localhost', 'login_method': 'AMQPLAIN'}
[docs] def get_manager(self, *args, **kwargs): return get_manager(self.client, *args, **kwargs)