Module rabotnik.messagebus
Classes
class MessageBus
-
Messagebus for communication between loosely coupled Rabotnik instances and other systems, which are part of such an ecosystem.
Implemented as publisher-subscriber model based on RabbitMQ (AMQP).
Expand source code
class MessageBus: """ Messagebus for communication between loosely coupled Rabotnik instances and other systems, which are part of such an ecosystem. Implemented as publisher-subscriber model based on RabbitMQ (AMQP). """ def __init__(self): self.channel = None self.connection = None def connect(self, max_retries: int = None, retry_interval: int = 1.0): """ Establish and configure the broker connection. Args: max_retries (int, default: None): Optional number of times trying to connect to the broker. If not given, Rabotnik keeps trying forever to establish a connection. retry_interval (int, default: 1.0): Time in seconds that the program should pause before trying to connect again. """ host = read_environment_variable("RABOTNIK_MESSAGE_BUS_HOST") username = read_environment_variable("RABOTNIK_MESSAGE_BUS_USER") password = read_environment_variable("RABOTNIK_MESSAGE_BUS_PASSWORD") url_connect = f"amqp://{username}:{password}@{host}/" logger.debug("connecting to %s", url_connect) self.connection = self._connect(url_connect, max_retries, retry_interval) self.channel = self.connection.channel() self.channel.basic_qos(prefetch_count=1) @staticmethod def _connect( url: str, max_retries: int = None, retry_interval: int = 1.0, ): """ Establish connection to a broker with possible retries over a period of time. Args: url (str): URL of broker to connect to. max_retries (int, default: None): Optional number of times trying to connect to the broker. If not given, Rabotnik keeps trying forever to establish a connection. retry_interval (int, default: 1.0): Time in seconds that the program should pause before trying to connect again. Returns: Broker connection. """ retries = 0 while True: try: connection = BlockingConnection(URLParameters(url)) return connection except ConnectionError as e: logger.info(f"Attempt {retries}: Could not connect to {url} - {e}") if max_retries is None or retries < max_retries: time.sleep(retry_interval) retries += 1 else: raise def close(self): """ Close the broker connection. """ if self.connection: self.connection.close() def subscribe(self, message: str, callback: callable): """ Subscribe a listener to the messagebus. Declares a new fanout exchange on the messagebus with name `message` if not registered yet. Declares an exclusive queue for this instance, binds it to the exchange and registers the callback function to the queue. Args: message (str): Message that is registered. callback (callable): Callback function. """ assert message, "Message must be a non-zero length string" # Declare a queue and an exchange. Set the maximum priority to 10, because that is the # default for the Celery queue. queue = self.channel.queue_declare( queue=message, exclusive=True, arguments={"x-max-priority": 10} ) self.channel.exchange_declare(message, exchange_type="fanout") queue_name = queue.method.queue # Bind the queue to the exchange self.channel.queue_bind(exchange=message, queue=queue_name) # Define a consumer that runs the callback function on each message in the queue. The # `auto_ack` is set to True, so the consumer does not wait for results of the callback. self.channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True ) def start_consuming(self): """ After all messages are subscribed, the messagebus listeners can start consuming the messages in the queues. """ self.channel.start_consuming() def send(self, message: str, body: dict, routing_key: str = "", priority: int = 0): """ Send a message to the bus. Args: message (str): Message to send. body (dict): Payload to send to listeners. routing_key (str, optional): Routes the message on the bus. priority (int): Priority of message. The lowest priority is 0 and the highest is 10. """ # Declare the exchange self.channel.exchange_declare(message, exchange_type="fanout") # Format the body dictionary into a json object, so it can be sent to the broker body = json.dumps(body).encode() # Send the data to the broker self.channel.basic_publish( exchange=message, body=body, routing_key=routing_key, properties=BasicProperties(priority=priority), )
Methods
def close(self)
-
Close the broker connection.
def connect(self, max_retries: int = None, retry_interval: int = 1.0)
-
Establish and configure the broker connection.
Args
max_retries (int, default: None): Optional number of times trying to connect to the broker. If not given, Rabotnik keeps trying forever to establish a connection. retry_interval (int, default: 1.0): Time in seconds that the program should pause before trying to connect again.
def send(self, message: str, body: dict, routing_key: str = '', priority: int = 0)
-
Send a message to the bus.
Args
message (str): Message to send. body (dict): Payload to send to listeners. routing_key (str, optional): Routes the message on the bus. priority (int): Priority of message. The lowest priority is 0 and the highest is 10.
def start_consuming(self)
-
After all messages are subscribed, the messagebus listeners can start consuming the messages in the queues.
def subscribe(self, message: str, callback:
) -
Subscribe a listener to the messagebus. Declares a new fanout exchange on the messagebus with name
message
if not registered yet. Declares an exclusive queue for this instance, binds it to the exchange and registers the callback function to the queue.Args
message (str): Message that is registered. callback (callable): Callback function.