Module rabotnik.messagebus

Classes

class MessageBus

Messagebus for communication between loosely coupled Rabotnik instances and other systems, which are part of such an ecosystem.

Implemented as publisher-subscriber model based on RabbitMQ (AMQP).

Expand source code
class MessageBus:
    """
    Messagebus for communication between loosely coupled Rabotnik instances and other
    systems, which are part of such an ecosystem.

    Implemented as publisher-subscriber model based on RabbitMQ (AMQP).
    """

    def __init__(self):
        # Both handles stay `None` until `connect` has been called.
        self.channel = None
        self.connection = None

    def connect(self, max_retries: int = None, retry_interval: float = 1.0):
        """
        Establish and configure the broker connection.

        The broker host, user and password are read from the environment variables
        `RABOTNIK_MESSAGE_BUS_HOST`, `RABOTNIK_MESSAGE_BUS_USER` and
        `RABOTNIK_MESSAGE_BUS_PASSWORD`. On success `self.connection` and `self.channel`
        are set and the channel is configured with a prefetch count of 1.

        Args:
            max_retries (int, default: None):
                Optional number of times trying to connect to the broker. If not
                given, Rabotnik keeps trying forever to establish a connection.
            retry_interval (float, default: 1.0):
                Time in seconds that the program should pause before trying to connect again.
        """

        host = read_environment_variable("RABOTNIK_MESSAGE_BUS_HOST")
        username = read_environment_variable("RABOTNIK_MESSAGE_BUS_USER")
        password = read_environment_variable("RABOTNIK_MESSAGE_BUS_PASSWORD")

        url_connect = f"amqp://{username}:{password}@{host}/"
        # Never write the broker password to the log; only a redacted URL is logged.
        logger.debug("connecting to %s", self._redact_url(url_connect))

        self.connection = self._connect(url_connect, max_retries, retry_interval)
        self.channel = self.connection.channel()
        self.channel.basic_qos(prefetch_count=1)

    @staticmethod
    def _redact_url(url: str) -> str:
        """
        Return `url` with the password of its user-info part replaced by `***`.

        Everything between the first colon after the user name and the last `@` is
        treated as the password, so passwords that themselves contain `@` or `/` are
        masked completely. URLs without a `user:password@` part are returned unchanged.

        Args:
            url (str):
                URL that may contain credentials.

        Returns:
            URL that is safe to write to a log.
        """

        # Local import keeps this helper independent of the module's import block.
        import re

        return re.sub(r"(://[^:/@]*):.*@", r"\1:***@", url, count=1)

    @staticmethod
    def _connect(
        url: str,
        max_retries: int = None,
        retry_interval: float = 1.0,
    ):
        """
        Establish connection to a broker with possible retries over a period of time.

        Args:
            url (str):
                URL of broker to connect to.
            max_retries (int, default: None):
                Optional number of times trying to connect to the broker. If not
                given, Rabotnik keeps trying forever to establish a connection.
            retry_interval (float, default: 1.0):
                Time in seconds that the program should pause before trying to connect again.

        Returns:
            Broker connection.

        Raises:
            ConnectionError: Re-raised from the last failed attempt once `max_retries`
                retries have been used up.
        """

        # NOTE(review): only `ConnectionError` triggers a retry - confirm that this name
        # covers the errors the broker client raises when the broker is unreachable.
        retries = 0
        while True:
            try:
                return BlockingConnection(URLParameters(url))
            except ConnectionError as e:
                # The URL carries the password, so only its redacted form is logged.
                logger.info(
                    "Attempt %d: Could not connect to %s - %s",
                    retries,
                    MessageBus._redact_url(url),
                    e,
                )

                if max_retries is not None and retries >= max_retries:
                    raise

                time.sleep(retry_interval)
                retries += 1

    def close(self):
        """
        Close the broker connection.

        Does nothing if no connection was established or if the connection is already
        closed, so the method can safely be called more than once.
        """

        if self.connection is not None and self.connection.is_open:
            self.connection.close()

    def subscribe(self, message: str, callback: callable):
        """
        Subscribe a listener to the messagebus. Declares a new fanout exchange on the messagebus
        with name `message` if not registered yet. Declares an exclusive queue for this
        instance, binds it to the exchange and registers the callback function to the queue.

        Args:
            message (str):
                Message that is registered.
            callback (callable):
                Callback function.
        """

        assert message, "Message must be a non-zero length string"

        # Declare a queue and an exchange. Set the maximum priority to 10, because that is the
        # default for the Celery queue.
        queue = self.channel.queue_declare(
            queue=message, exclusive=True, arguments={"x-max-priority": 10}
        )
        self.channel.exchange_declare(message, exchange_type="fanout")
        queue_name = queue.method.queue

        # Bind the queue to the exchange
        self.channel.queue_bind(exchange=message, queue=queue_name)

        # Define a consumer that runs the callback function on each message in the queue. The
        # `auto_ack` is set to True, so the consumer does not wait for results of the callback.
        self.channel.basic_consume(
            queue=queue_name, on_message_callback=callback, auto_ack=True
        )

    def start_consuming(self):
        """
        After all messages are subscribed, the messagebus listeners can start consuming the
        messages in the queues.
        """

        self.channel.start_consuming()

    def send(self, message: str, body: dict, routing_key: str = "", priority: int = 0):
        """
        Send a message to the bus.

        Args:
            message (str):
                Message to send.
            body (dict):
                Payload to send to listeners.
            routing_key (str, optional):
                Routes the message on the bus.
            priority (int):
                Priority of message. The lowest priority is 0 and the highest is 10.
        """

        # Declare the exchange
        self.channel.exchange_declare(message, exchange_type="fanout")

        # Format the body dictionary into a json object, so it can be sent to the broker
        payload = json.dumps(body).encode()

        # Send the data to the broker
        self.channel.basic_publish(
            exchange=message,
            body=payload,
            routing_key=routing_key,
            properties=BasicProperties(priority=priority),
        )

Methods

def close(self)

Close the broker connection.

def connect(self, max_retries: int = None, retry_interval: int = 1.0)

Establish and configure the broker connection.

Args

max_retries (int, default: None): Optional number of times trying to connect to the broker. If not given, Rabotnik keeps trying forever to establish a connection. retry_interval (int, default: 1.0): Time in seconds that the program should pause before trying to connect again.

def send(self, message: str, body: dict, routing_key: str = '', priority: int = 0)

Send a message to the bus.

Args

message (str): Message to send. body (dict): Payload to send to listeners. routing_key (str, optional): Routes the message on the bus. priority (int): Priority of message. The lowest priority is 0 and the highest is 10.

def start_consuming(self)

After all messages are subscribed, the messagebus listeners can start consuming the messages in the queues.

def subscribe(self, message: str, callback: callable)

Subscribe a listener to the messagebus. Declares a new fanout exchange on the messagebus with name `message` if not registered yet. Declares an exclusive queue for this instance, binds it to the exchange and registers the callback function to the queue.

Args

message (str): Message that is registered. callback (callable): Callback function.