Module rabotnik.messagebus


class MessageBus

Messagebus for communication between loosely coupled Rabotnik instances and other systems, which are part of such an ecosystem.

Implemented as publisher-subscriber model based on RabbitMQ (AMQP).

class MessageBus:
    Messagebus for communication between loosely coupled Rabotnik instances and other
    systems, which are part of such an ecosystem.

    Implemented as publisher-subscriber model based on RabbitMQ (AMQP).

    def __init__(self): = None
        self.connection = None

    def connect(self, max_retries: int = None, retry_interval: int = 1.0):
        Establish and configure the broker connection.

            max_retries (int, default: None):
                Optional number of times trying to connect to the broker. If not
                given, Rabotnik keeps trying forever to establish a connection.
            retry_interval (int, default: 1.0):
                Time in seconds that the program should pause before trying to connect again.

        host = read_environment_variable("RABOTNIK_MESSAGE_BUS_HOST")
        username = read_environment_variable("RABOTNIK_MESSAGE_BUS_USER")
        password = read_environment_variable("RABOTNIK_MESSAGE_BUS_PASSWORD")

        url_connect = f"amqp://{username}:{password}@{host}/"
        logger.debug("connecting to %s", url_connect)

        self.connection = self._connect(url_connect, max_retries, retry_interval) =

    def _connect(
        url: str,
        max_retries: int = None,
        retry_interval: int = 1.0,
        Establish connection to a broker with possible retries over a period of time.

            url (str):
                URL of broker to connect to.
            max_retries (int, default: None):
                Optional number of times trying to connect to the broker. If not
                given, Rabotnik keeps trying forever to establish a connection.
            retry_interval (int, default: 1.0):
                Time in seconds that the program should pause before trying to connect again.

            Broker connection.

        retries = 0
        while True:
                connection = BlockingConnection(URLParameters(url))
                return connection
            except ConnectionError as e:
      "Attempt {retries}: Could not connect to {url} - {e}")

                if max_retries is None or retries < max_retries:
                    retries += 1

    def close(self):
        Close the broker connection.

        if self.connection:

    def subscribe(self, message: str, callback: callable):
        Subscribe a listener to the messagebus. Declares a new fanout exchange on the messagebus
        with name `message` if not registered yet. Declares an exclusive queue for this
        instance, binds it to the exchange and registers the callback function to the queue.

            message (str):
                Message that is registered.
            callback (callable):
                Callback function.

        assert message, "Message must be a non-zero length string"

        # Declare a queue and an exchange. Set the maximum priority to 10, because that is the
        # default for the Celery queue.
        queue =
            queue=message, exclusive=True, arguments={"x-max-priority": 10}
        ), exchange_type="fanout")
        queue_name = queue.method.queue

        # Bind the queue to the exchange, queue=queue_name)

        # Define a consumer that runs the callback function on each message in the queue. The
        # `auto_ack` is set to True, so the consumer does not wait for results of the callback.
            queue=queue_name, on_message_callback=callback, auto_ack=True

    def start_consuming(self):
        After all messages are subscribed, the messagebus listeners can start consuming the
        messages in the queues.

    def send(self, message: str, body: dict, routing_key: str = "", priority: int = 0):
        Send a message to the bus.

            message (str):
                Message to send.
            body (dict):
                Payload to send to listeners.
            routing_key (str, optional):
                Routes the message on the bus.
            priority (int):
                Priority of message. The lowest priority is 0 and the highest is 10.

        # Declare the exchange, exchange_type="fanout")

        # Format the body dictionary into a json object, so it can be sent to the broker
        body = json.dumps(body).encode()

        # Send the data to the broker


