Package rabotnik

Rabotnik

Message Bus

Rabotnik's rabotnik.MessageBus implements an AMQP publisher-subscriber model that enables communication between instances through a RabbitMQ broker. The implementation uses a blocking (synchronous) connection to the message broker: connect() establishes the connection, and start_consuming() blocks while dispatching incoming messages to the subscribed callbacks. The environment variables RABOTNIK_MESSAGE_BUS_USER, RABOTNIK_MESSAGE_BUS_PASSWORD and RABOTNIK_MESSAGE_BUS_HOST must be defined to establish a connection.

This example registers a function my_callback to receive banana messages on a rabotnik.MessageBus instance:

import json

from rabotnik import MessageBus


def my_callback(channel, method, properties, body):
    # Called for every message published as "banana". The payload arrives as
    # JSON-encoded bytes (see MessageBus.send), so decode it before use.
    print(json.loads(body))


def start_listening():
    message_bus = MessageBus()
    # The broker connection must be established before subscribing.
    message_bus.connect()

    message = "banana"
    message_bus.subscribe(message=message, callback=my_callback)

    # Blocks and dispatches incoming messages to the subscribed callbacks.
    message_bus.start_consuming()


start_listening()

Another instance can now publish a message to trigger my_callback by sending the message banana together with a dict payload:

from rabotnik import MessageBus


def send_message():
    message_bus = MessageBus()
    # The broker connection must be established before sending.
    message_bus.connect()

    message = "banana"
    try:
        message_bus.send(message=message, body={'color': 'yellow'})
    finally:
        # Release the broker connection even if publishing fails.
        message_bus.close()


send_message()

Sub-modules

rabotnik.configure
rabotnik.exceptions
rabotnik.messagebus
rabotnik.processor
rabotnik.rabotnik
rabotnik.rabotnikinstance

Classes

class MessageBus

Messagebus for communication between loosely coupled Rabotnik instances and other systems, which are part of such an ecosystem.

Implemented as publisher-subscriber model based on RabbitMQ (AMQP).

Expand source code
class MessageBus:
    """
    Messagebus for communication between loosely coupled Rabotnik instances and other
    systems, which are part of such an ecosystem.

    Implemented as publisher-subscriber model based on RabbitMQ (AMQP).

    Attributes:
        channel:
            Broker channel created by `connect`. `None` until `connect` was called.
        connection:
            Broker connection created by `connect`. `None` until `connect` was called.
    """

    def __init__(self):
        self.channel = None
        self.connection = None

    def connect(self, max_retries: int = None, retry_interval: float = 1.0):
        """
        Establish and configure the broker connection.

        The broker location and credentials are read from the environment variables
        `RABOTNIK_MESSAGE_BUS_HOST`, `RABOTNIK_MESSAGE_BUS_USER` and
        `RABOTNIK_MESSAGE_BUS_PASSWORD`.

        Args:
            max_retries (int, default: None):
                Optional number of times trying to connect to the broker. If not
                given, Rabotnik keeps trying forever to establish a connection.
            retry_interval (float, default: 1.0):
                Time in seconds that the program should pause before trying to connect again.
        """

        host = read_environment_variable("RABOTNIK_MESSAGE_BUS_HOST")
        username = read_environment_variable("RABOTNIK_MESSAGE_BUS_USER")
        password = read_environment_variable("RABOTNIK_MESSAGE_BUS_PASSWORD")

        url_connect = f"amqp://{username}:{password}@{host}/"
        # The URL embeds the broker password, so only a redacted form may be logged.
        logger.debug("connecting to %s", self._redact_credentials(url_connect))

        self.connection = self._connect(url_connect, max_retries, retry_interval)
        self.channel = self.connection.channel()
        # Hand out at most one unacknowledged message per consumer at a time.
        self.channel.basic_qos(prefetch_count=1)

    @staticmethod
    def _redact_credentials(url: str) -> str:
        """
        Replace the `user:password` part of a URL by `***` so it can be logged safely.

        Args:
            url (str):
                URL that may contain credentials in front of an `@`.

        Returns:
            The URL with its credentials masked, or the unchanged URL if it contains no
            credentials.
        """

        scheme, separator, remainder = url.partition("://")
        if not separator:
            # No scheme present: the whole string is the credentials/location part.
            scheme, remainder = "", url

        # Split at the LAST `@`, because a password may itself contain `@`.
        _, at_sign, location = remainder.rpartition("@")
        if not at_sign:
            return url
        return f"{scheme}{separator}***@{location}"

    @staticmethod
    def _connect(
        url: str,
        max_retries: int = None,
        retry_interval: float = 1.0,
    ):
        """
        Establish connection to a broker with possible retries over a period of time.

        Args:
            url (str):
                URL of broker to connect to.
            max_retries (int, default: None):
                Optional number of times trying to connect to the broker. If not
                given, Rabotnik keeps trying forever to establish a connection.
            retry_interval (float, default: 1.0):
                Time in seconds that the program should pause before trying to connect again.

        Returns:
            Broker connection.

        Raises:
            ConnectionError, pika.exceptions.AMQPConnectionError:
                The error of the last attempt, once `max_retries` retries have failed.
        """

        # pika reports a failed connection attempt as `AMQPConnectionError`, which is not a
        # subclass of the builtin `ConnectionError`. Both have to be caught, otherwise the
        # retry loop never retries.
        from pika.exceptions import AMQPConnectionError

        # The URL contains the password, so only the redacted form is written to the log.
        safe_url = MessageBus._redact_credentials(url)

        retries = 0
        while True:
            try:
                return BlockingConnection(URLParameters(url))
            except (ConnectionError, AMQPConnectionError) as e:
                logger.info(
                    "Attempt %s: Could not connect to %s - %s", retries, safe_url, e
                )

                # Give up and re-raise the last error once the retry budget is used up.
                if max_retries is not None and retries >= max_retries:
                    raise

                time.sleep(retry_interval)
                retries += 1

    def close(self):
        """
        Close the broker connection. Does nothing if no connection was established.
        """

        if self.connection:
            self.connection.close()

    def subscribe(self, message: str, callback: callable):
        """
        Subscribe a listener to the messagebus. Declares a new fanout exchange on the messagebus
        with name `message` if not registered yet. Declares an exclusive queue for this
        instance, binds it to the exchange and registers the callback function to the queue.

        Args:
            message (str):
                Message that is registered. Must be a non-empty string.
            callback (callable):
                Callback function. It is registered as the `on_message_callback` of the
                consumer.
        """

        assert message, "Message must be a non-zero length string"

        # Declare a queue and an exchange. Set the maximum priority to 10, because that is the
        # default for the Celery queue.
        queue = self.channel.queue_declare(
            queue=message, exclusive=True, arguments={"x-max-priority": 10}
        )
        self.channel.exchange_declare(message, exchange_type="fanout")
        queue_name = queue.method.queue

        # Bind the queue to the exchange
        self.channel.queue_bind(exchange=message, queue=queue_name)

        # Define a consumer that runs the callback function on each message in the queue. The
        # `auto_ack` is set to True, so the consumer does not wait for results of the callback.
        self.channel.basic_consume(
            queue=queue_name, on_message_callback=callback, auto_ack=True
        )

    def start_consuming(self):
        """
        After all messages are subscribed, the messagebus listeners can start consuming the
        messages in the queues.
        """

        self.channel.start_consuming()

    def send(self, message: str, body: dict, routing_key: str = "", priority: int = 0):
        """
        Send a message to the bus.

        Args:
            message (str):
                Message to send. It is used as the name of the fanout exchange.
            body (dict):
                Payload to send to listeners. Must be JSON serializable.
            routing_key (str, optional):
                Routes the message on the bus.
            priority (int):
                Priority of message. The lowest priority is 0 and the highest is 10.
        """

        # Declare the exchange
        self.channel.exchange_declare(message, exchange_type="fanout")

        # Format the body dictionary into a json object, so it can be sent to the broker
        encoded_body = json.dumps(body).encode()

        # Send the data to the broker
        self.channel.basic_publish(
            exchange=message,
            body=encoded_body,
            routing_key=routing_key,
            properties=BasicProperties(priority=priority),
        )

Methods

def close(self)

Close the broker connection.

def connect(self, max_retries: int = None, retry_interval: int = 1.0)

Establish and configure the broker connection.

Args

max_retries (int, default: None): Optional number of times trying to connect to the broker. If not given, Rabotnik keeps trying forever to establish a connection. retry_interval (int, default: 1.0): Time in seconds that the program should pause before trying to connect again.

def send(self, message: str, body: dict, routing_key: str = '', priority: int = 0)

Send a message to the bus.

Args

message (str): Message to send. body (dict): Payload to send to listeners. routing_key (str, optional): Routes the message on the bus. priority (int): Priority of message. The lowest priority is 0 and the highest is 10.

def start_consuming(self)

After all messages are subscribed, the messagebus listeners can start consuming the messages in the queues.

def subscribe(self, message: str, callback: callable)

Subscribe a listener to the messagebus. Declares a new fanout exchange on the messagebus with name message if not registered yet. Declares an exclusive queue for this instance, binds it to the exchange and registers the callback function to the queue.

Args

message (str): Message that is registered. callback (callable): Callback function.

class Processor

The Processor is the Celery app used for distributing processes in Rabotnik. It is instantiated once globally by calling Processor(name). Afterward, the processor can be retrieved using Processor.get_celery_app().

Args

name (str): Name of the Celery app.

Attributes

_processor (Celery): Celery app used for distributing processes in Rabotnik.

Expand source code
class Processor:
    """
    The Processor is the Celery app used for distributing processes in Rabotnik. It is
    instantiated once globally by calling `Processor(name)`. Afterward, the processor can
    be retrieved using `Processor.get_celery_app()`.

    Args:
        name (str):
            Name of the Celery app.

    Attributes:
        _processor (Celery):
            Celery app used for distributing processes in Rabotnik. `None` until
            `Processor(name)` was called.

    Raises:
        Exception:
            If `Processor(name)` is called while a Celery app already exists.
    """

    _processor = None

    @classmethod
    def __init__(cls, name):
        # The app is stored on the class, so a second instantiation is refused.
        if cls._processor is not None:
            raise Exception("Use Processor.get_celery_app")

        cls._processor: Celery = Celery(
            name,
            result_expires=3600,
            broker=cls._get_broker_url(),
            backend=os.environ.get("RABOTNIK_CELERY_BACKEND_RESULTS", "rpc://"),
            ignore_result=True,  # for debugging
            task_queue_max_priority=10,
            # The Celery setting is named `task_default_priority`. The former misspelled
            # key was not a Celery setting, so the default priority was never applied.
            task_default_priority=1,
        )
        cls._processor.conf.name = name

    @classmethod
    def get_celery_app(cls):
        """
        Get the current running Celery app.

        Returns:
            Celery app, or `None` if the processor has not been instantiated yet.
        """
        return cls._processor

    @staticmethod
    def _get_broker_url():
        """
        Reads the Rabotnik Celery environment variables and creates the Celery broker URL
        from the variables.

        Returns:
            Broker url.
        """

        # Read environment variables
        username = read_environment_variable("RABOTNIK_CELERY_BROKER_USER")
        password = read_environment_variable("RABOTNIK_CELERY_BROKER_PASSWORD")
        host = read_environment_variable("RABOTNIK_CELERY_BROKER_HOST")

        # Log the URL with the password masked, so the credentials never reach the logs.
        logger.debug("broker_url: pyamqp://%s:***@%s", username, host)

        return f"pyamqp://{username}:{password}@{host}"

Static methods

def get_celery_app()

Get the current running Celery app.

Returns

Celery app.

class Rabotnik (name)

The Rabotnik represents the central hub and entry point. It initializes the essential components.

Args

name (str): Name of the Rabotnik instance. This also determines the Queue it sends tasks or listens to.

Expand source code
class Rabotnik(Task):
    """
    Central hub and entry point of Rabotnik. Creating an instance initializes the essential
    components.

    Args:
        name (str):
            Name of the Rabotnik instance. The Celery default queue is derived from it
            (`<name>-queue`) and it is the message the instance subscribes to on the
            messagebus.
    """

    def __init__(self, name):
        # Create the global Celery-based processor, then fetch the app it holds.
        Processor(name)
        celery_app = Processor.get_celery_app()
        celery_app.conf.task_default_queue = f"{name}-queue"

        self.name = name
        self.processor = celery_app

    def run(self, channel, method, properties, body):
        """
        Celery Task `run` implementation. It is registered as the messagebus callback: it
        receives a task message from the Rabotnik messagebus and forwards the task to the
        Celery workers.

        Args:
            channel (pika.adapters.blocking_connection.BlockingChannel):
                Messagebus channel the message was consumed on.
            method (pika.amqp_object.Method):
                Metadata about the broker function call.
            properties (pika.amqp_object.Properties):
                Additional properties defined by RabbitMQ, such as the priority of the message.
            body (bytes):
                JSON-encoded dictionary describing the task. The `task` key holds the name
                of the task and the `arguments` key holds its parameters.

        Raises:
            ValueError:
                If the requested task is not one of the supported tasks.
        """

        # Trace the raw delivery in debug mode.
        logger.debug(f"{channel} {method} {properties}")
        logger.debug(f"message '{body}' received")

        # Decode the JSON payload.
        request = json.loads(body)

        # Only these tasks may be forwarded to the workers.
        valid_tasks = (
            "process",
            "register_all",
            "register_rule",
            "unregister_rule",
            "register_database",
        )
        task = request["task"]
        if task not in valid_tasks:
            raise ValueError(f"Task {task} is not a valid task")

        # Forward the task, addressed by its full name, with the arguments from the message.
        task_name = f"rabotnik.rabotnik.{task}"
        dispatch = self.app.send_task
        dispatch(task_name, kwargs=request["arguments"])

    def subscribe_to_message_bus(self, messagebus):
        """
        Connect the `run` function to the Rabotnik messagebus and consume messages until
        interrupted.

        Args:
            messagebus (MessageBus):
                Messagebus instance that the Rabotnik listener should connect to.
        """

        # Open the broker connection.
        messagebus.connect()

        # Register `run` for the message named after this Rabotnik instance.
        messagebus.subscribe(self.name, self.run)

        try:
            # Start consuming messages from the messagebus.
            messagebus.start_consuming()
        except KeyboardInterrupt:
            logger.info("Stopping Rabotnik...")

            # Release the broker connection before terminating the process.
            messagebus.close()
            sys.exit(0)

Ancestors

  • celery.app.task.Task

Methods

def run(self, channel, method, properties, body)

Implement the Celery Task run function. This function receives tasks from a Rabotnik messagebus and distributes them to the Celery workers.

Args

channel (pika.adapters.blocking_connection.BlockingConnection): Messagebus channel that is used to consume with. method (pika.amqp_object.Method): Metadata about the broker function call. properties (pika.amqp_object.Properties): Additional properties defined by RabbitMQ, such as the priority of the message. body (bytes): A dictionary with the information of the task. The name of the task can be found in the task key and its parameters can be found in the arguments key.

def subscribe_to_message_bus(self, messagebus)

Connect the run function to the Rabotnik messagebus.

Args

messagebus (MessageBus): Messagebus instance that the Rabotnik listener should connect to.