Package rabotnik
Rabotnik
Message Bus
Rabotnik's rabotnik.bus.MessageBus
implements an AMQP
publisher-subscriber model to enable communication between instances leveraging a rabbitmq
broker.
The implementation employs asyncio
for asynchronous execution to avoid blocking communication with the message broker.
Environment variables RABOTNIK_MESSAGE_BUS_USER
, RABOTNIK_MESSAGE_BUS_PASSWORD
and RABOTNIK_MESSAGE_BUS_HOST
need to be defined to establish a connection.
This example registers a method my_callback
to receive banana
messages on a rabotnik.bus.MessageBus
instance:
import asyncio
from rabotnik.bus import MessageBus
def my_callback(payload):
# This method will be called after the message `banana` was received
print(payload)
async def start_listening():
message_bus = MessageBus()
message = "banana"
await message_bus.subscribe(message=message, callback=my_callback)
await asyncio.sleep(1000)
asyncio.run(start_listening())
Another instance can now publish a message to trigger my_callback
by sending the message banana
together with a dict payload:
import asyncio
from rabotnik.bus import MessageBus
async def send_message():
message_bus = MessageBus()
message = "banana"
await message_bus.send(message=message, payload={'color': 'yellow'})
asyncio.run(send_message())
Sub-modules
rabotnik.configure
rabotnik.exceptions
rabotnik.messagebus
rabotnik.processor
rabotnik.rabotnik
rabotnik.rabotnikinstance
Classes
class MessageBus
-
Messagebus for communication between loosely coupled Rabotnik instances and other systems, which are part of such an ecosystem.
Implemented as publisher-subscriber model based on RabbitMQ (AMQP).
Expand source code
class MessageBus:
    """
    Messagebus for communication between loosely coupled Rabotnik instances and other systems,
    which are part of such an ecosystem.

    Implemented as publisher-subscriber model based on RabbitMQ (AMQP).
    """

    def __init__(self):
        # Both attributes are populated by `connect`; they stay `None` until then.
        self.channel = None
        self.connection = None

    def connect(self, max_retries: int = None, retry_interval: float = 1.0):
        """
        Establish and configure the broker connection.

        The broker location and credentials are read from the environment variables
        `RABOTNIK_MESSAGE_BUS_HOST`, `RABOTNIK_MESSAGE_BUS_USER` and
        `RABOTNIK_MESSAGE_BUS_PASSWORD`.

        Args:
            max_retries (int, default: None):
                Optional number of times trying to connect to the broker. If not given,
                Rabotnik keeps trying forever to establish a connection.
            retry_interval (float, default: 1.0):
                Time in seconds that the program should pause before trying to connect again.
        """
        host = read_environment_variable("RABOTNIK_MESSAGE_BUS_HOST")
        username = read_environment_variable("RABOTNIK_MESSAGE_BUS_USER")
        password = read_environment_variable("RABOTNIK_MESSAGE_BUS_PASSWORD")

        url_connect = f"amqp://{username}:{password}@{host}/"
        # Never write the password to the log; only the masked URL is logged.
        logger.debug("connecting to %s", self._redact_url(url_connect))

        self.connection = self._connect(url_connect, max_retries, retry_interval)
        self.channel = self.connection.channel()
        # Hand at most one unacknowledged message at a time to this consumer.
        self.channel.basic_qos(prefetch_count=1)

    @staticmethod
    def _redact_url(url: str) -> str:
        """
        Return `url` with the password of its `user:password@` part replaced by `***`.

        URLs without a scheme separator or without credentials are returned unchanged.

        Args:
            url (str): URL that may contain credentials.

        Returns:
            URL that is safe to write to a log.
        """
        scheme, scheme_sep, remainder = url.partition("://")
        if not scheme_sep:
            return url
        # Split on the LAST "@" so that passwords containing "@" are masked completely.
        credentials, at_sign, location = remainder.rpartition("@")
        if not at_sign:
            return url
        username, colon, _password = credentials.partition(":")
        if not colon:
            return url
        return f"{scheme}://{username}:***@{location}"

    @staticmethod
    def _connect(
        url: str,
        max_retries: int = None,
        retry_interval: float = 1.0,
    ):
        """
        Establish connection to a broker with possible retries over a period of time.

        Args:
            url (str):
                URL of broker to connect to.
            max_retries (int, default: None):
                Optional number of times trying to connect to the broker. If not given,
                Rabotnik keeps trying forever to establish a connection.
            retry_interval (float, default: 1.0):
                Time in seconds that the program should pause before trying to connect again.

        Returns:
            Broker connection.

        Raises:
            ConnectionError: Re-raised from the last attempt once `max_retries` is exhausted.
        """
        # The URL embeds the password, so only its masked form may reach the log.
        safe_url = MessageBus._redact_url(url)
        retries = 0
        while True:
            try:
                return BlockingConnection(URLParameters(url))
            # NOTE(review): which `ConnectionError` this is depends on the module's imports
            # (not visible here) - confirm it covers the exception pika raises on a failed
            # connection attempt.
            except ConnectionError as e:
                logger.info("Attempt %s: Could not connect to %s - %s", retries, safe_url, e)
                if max_retries is not None and retries >= max_retries:
                    raise
                time.sleep(retry_interval)
                retries += 1

    def close(self):
        """
        Close the broker connection. Does nothing if no connection was established.
        """
        if self.connection:
            self.connection.close()

    def subscribe(self, message: str, callback: callable):
        """
        Subscribe a listener to the messagebus. Declares a new fanout exchange on the
        messagebus with name `message` if not registered yet. Declares an exclusive queue for
        this instance, binds it to the exchange and registers the callback function to the
        queue.

        Args:
            message (str):
                Message that is registered.
            callback (callable):
                Callback function.

        Raises:
            AssertionError: If `message` is empty.
        """
        # Kept as an assertion so the exception type seen by existing callers is unchanged.
        assert message, "Message must be a non-zero length string"

        # Declare a queue and an exchange. Set the maximum priority to 10, because that is the
        # default for the Celery queue.
        queue = self.channel.queue_declare(
            queue=message, exclusive=True, arguments={"x-max-priority": 10}
        )
        self.channel.exchange_declare(message, exchange_type="fanout")
        queue_name = queue.method.queue

        # Bind the queue to the exchange
        self.channel.queue_bind(exchange=message, queue=queue_name)

        # Define a consumer that runs the callback function on each message in the queue. The
        # `auto_ack` is set to True, so the consumer does not wait for results of the callback.
        self.channel.basic_consume(
            queue=queue_name, on_message_callback=callback, auto_ack=True
        )

    def start_consuming(self):
        """
        After all messages are subscribed, the messagebus listeners can start consuming the
        messages in the queues.
        """
        self.channel.start_consuming()

    def send(self, message: str, body: dict, routing_key: str = "", priority: int = 0):
        """
        Send a message to the bus.

        Args:
            message (str):
                Message to send. It is used as the name of the fanout exchange.
            body (dict):
                Payload to send to listeners. Must be JSON serializable.
            routing_key (str, optional):
                Routes the message on the bus.
            priority (int):
                Priority of message. The lowest priority is 0 and the highest is 10.
        """
        # Declare the exchange
        self.channel.exchange_declare(message, exchange_type="fanout")

        # Format the body dictionary into a json object, so it can be sent to the broker
        encoded_body = json.dumps(body).encode()

        # Send the data to the broker
        self.channel.basic_publish(
            exchange=message,
            body=encoded_body,
            routing_key=routing_key,
            properties=BasicProperties(priority=priority),
        )
Methods
def close(self)
-
Close the broker connection.
def connect(self, max_retries: int = None, retry_interval: int = 1.0)
-
Establish and configure the broker connection.
Args
max_retries (int, default: None): Optional number of times trying to connect to the broker. If not given, Rabotnik keeps trying forever to establish a connection. retry_interval (int, default: 1.0): Time in seconds that the program should pause before trying to connect again.
def send(self, message: str, body: dict, routing_key: str = '', priority: int = 0)
-
Send a message to the bus.
Args
message (str): Message to send. body (dict): Payload to send to listeners. routing_key (str, optional): Routes the message on the bus. priority (int): Priority of message. The lowest priority is 0 and the highest is 10.
def start_consuming(self)
-
After all messages are subscribed, the messagebus listeners can start consuming the messages in the queues.
def subscribe(self, message: str, callback: callable)
-
Subscribe a listener to the messagebus. Declares a new fanout exchange on the messagebus with name
message
if not registered yet. Declares an exclusive queue for this instance, binds it to the exchange and registers the callback function to the queue.
Args
message (str): Message that is registered. callback (callable): Callback function.
class Processor
-
The Processor is the Celery app used for distributing processes in Rabotnik. It is once globally instantiated by using the
Processor
function. Afterward, the processor can be retrieved using Processor.get_celery_app().
Args
name (str): Name of the Celery app.
Attributes
_processor (Celery): Celery app used for distributing processes in Rabotnik.
Expand source code
class Processor:
    """
    The Processor is the Celery app used for distributing processes in Rabotnik. It is once
    globally instantiated by using the `Processor()` function. Afterward, the processor can be
    retrieved using `Processor.get_celery_app()`.

    Args:
        name (str): Name of the Celery app.

    Attributes:
        _processor (Celery): Celery app used for distributing processes in Rabotnik.
    """

    # Shared Celery app; stays `None` until the first `Processor(name)` call.
    _processor = None

    @classmethod
    def __init__(cls, name):
        """
        Create the shared Celery app and store it on the class.

        Args:
            name (str): Name of the Celery app.

        Raises:
            Exception: If the Celery app was already created.
        """
        if cls._processor is not None:
            raise Exception("Use Processor.get_celery_app")

        cls._processor = Celery(
            name,
            result_expires=3600,
            broker=Processor._get_broker_url(),
            backend=os.environ.get("RABOTNIK_CELERY_BACKEND_RESULTS", "rpc://"),
            ignore_result=True,  # for debugging
            task_queue_max_priority=10,
            # Celery's setting is named `task_default_priority`; the former misspelled
            # keyword `task_queue_default_prority` is not a Celery setting.
            task_default_priority=1,
        )
        cls._processor.conf.name = name

    @classmethod
    def get_celery_app(cls):
        """
        Get the current running Celery app.

        Returns:
            Celery app, or `None` if `Processor(name)` has not been called yet.
        """
        return cls._processor

    @staticmethod
    def _get_broker_url():
        """
        Reads the Rabotnik Celery environment variables and creates the Celery broker URL from
        the variables.

        Returns:
            Broker url.
        """
        # Read environment variables
        username = read_environment_variable("RABOTNIK_CELERY_BROKER_USER")
        password = read_environment_variable("RABOTNIK_CELERY_BROKER_PASSWORD")
        host = read_environment_variable("RABOTNIK_CELERY_BROKER_HOST")

        url = f"pyamqp://{username}:{password}@{host}"
        # The URL embeds the password, so only a masked variant is written to the log.
        logger.debug("broker_url: %s", f"pyamqp://{username}:***@{host}")
        return url
Static methods
def get_celery_app()
-
Get the current running Celery app.
Returns
Celery app.
class Rabotnik (name)
-
The
Rabotnik
represents the central hub and entry point. It initializes the essential components.
Args
name (str): Name of the Rabotnik instance. This also determines the Queue it sends tasks or listens to.
Expand source code
class Rabotnik(Task):
    """
    The `Rabotnik` represents the central hub and entry point. It initializes the essential
    components: the global `Processor` (Celery app) is created and its default task queue is
    set to `<name>-queue`.

    Args:
        name (str): Name of the Rabotnik instance. This also determines the Queue it sends
            tasks or listens to.
    """

    def __init__(self, name):
        # Creating the Processor builds the Celery app; it is fetched back right afterwards.
        Processor(name)
        celery_app = Processor.get_celery_app()

        self.name = name
        self.processor = celery_app
        celery_app.conf.task_default_queue = f"{name}-queue"

    def run(self, channel, method, properties, body):
        """
        Implement the Celery Task run function. It is registered as the messagebus consumer
        callback and forwards every received task to the Celery workers.

        Args:
            channel:
                Messagebus channel the message was consumed on. Only logged.
            method:
                Metadata about the broker function call. Only logged.
            properties:
                Additional properties defined by RabbitMQ, such as the priority of the
                message. Only logged.
            body (bytes):
                JSON document with the information of the task. The name of the task can be
                found in the `task` key and its parameters can be found in the `arguments`
                key.

        Raises:
            ValueError: If the `task` entry is not one of the supported task names.
        """
        # Log values in debug mode.
        logger.debug(f"{channel} {method} {properties}")
        logger.debug(f"message '{body}' received")

        # Decode the JSON payload into a dictionary.
        request = json.loads(body)
        requested_task = request["task"]

        # Only these task names may be forwarded to the Celery workers.
        known_tasks = (
            "process",
            "register_all",
            "register_rule",
            "unregister_rule",
            "register_database",
        )
        if requested_task in known_tasks:
            # Send the task to the Celery instance with the arguments from the message body.
            qualified_name = f"rabotnik.rabotnik.{requested_task}"
            self.app.send_task(qualified_name, kwargs=request["arguments"])
            return

        raise ValueError(f"Task {requested_task} is not a valid task")

    def subscribe_to_message_bus(self, messagebus):
        """
        Connect the `run` function to the Rabotnik messagebus and consume messages until
        interrupted from the keyboard, in which case the connection is closed and the process
        exits with status 0.

        Args:
            messagebus (MessageBus): Messagebus instance that the Rabotnik listener should
                connect to.
        """
        # Open the connection, then register `run` under this instance's name.
        messagebus.connect()
        messagebus.subscribe(self.name, self.run)

        try:
            # Consume the messages sent to the messagebus.
            messagebus.start_consuming()
        except KeyboardInterrupt:
            logger.info("Stopping Rabotnik...")
            # Close the connection to the messagebus before leaving.
            messagebus.close()
            sys.exit(0)
Ancestors
- celery.app.task.Task
Methods
def run(self, channel, method, properties, body)
-
Implement the Celery Task run function. This function receives tasks from a Rabotnik messagebus and distributes them to the Celery workers.
Args
channel (pika.adapters.blocking_connection.BlockingConnection): Messagebus channel that is used to consume with. method (pika.amqp_object.Method): Metadata about the broker function call. properties (pika.amqp_object.Properties): Additional properties defined by RabbitMQ, such as the priority of the message. body (bytes): A dictionary with the information of the task. The name of the task can be found in the
task
key and its parameters can be found in the arguments
key.
def subscribe_to_message_bus(self, messagebus)
-
Connect the
run
function to the Rabotnik messagebus.
Args
messagebus (MessageBus): Messagebus instance that the Rabotnik listener should connect to.