Module rabotnik.rabotnik

Functions

def check_databases(sender=None, conf=None, instance=None, body=None, **kwargs)

The check_databases() function is executed at the start of each task through the function on_connect(). All database entries are fetched from the database, and each one is checked to determine whether it is already registered, not yet registered, or has been deleted in the meantime. If needed, the database is registered or unregistered accordingly. The function arguments come from the @task_prerun.connect decorator but are not used.

def check_rules(sender=None, conf=None, instance=None, body=None, **kwargs)

The check_rules() function is executed at the start of each task through the function on_connect(). All rules are fetched from the database, and each one is checked to determine whether it is already in the set of registered rules, not yet registered, or has been deleted in the meantime. If needed, the rule is registered or unregistered accordingly. The function arguments come from the @task_prerun.connect decorator but are not used.
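Both check_databases() and check_rules() describe the same reconciliation step: compare what is stored in the database with what is currently registered, register the missing entries and unregister the deleted ones. The sketch below shows that step with plain set differences. The function reconcile() and its register/unregister callables are hypothetical stand-ins for register_database()/register_rule() and their unregister counterparts, not the module's actual code.

```python
# Hypothetical sketch of the reconciliation step shared by check_databases()
# and check_rules(). `register` and `unregister` stand in for the module's
# register_*/unregister_* functions.
from typing import Callable, Dict, Set


def reconcile(
    stored: Dict[str, object],
    registered: Set[str],
    register: Callable[[str, object], None],
    unregister: Callable[[str], None],
) -> None:
    """Bring the set of registered names in line with the stored entries."""
    stored_names = set(stored)
    # Entries present in the database but not registered yet.
    for name in sorted(stored_names - registered):
        register(name, stored[name])
    # Entries registered earlier but deleted from the database in the meantime.
    for name in sorted(registered - stored_names):
        unregister(name)
    # Entries present in both sets are already up to date and left untouched.
```

For example, with stored entries "a" and "b" and registered names "b" and "c", only "a" is registered and only "c" is unregistered.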

def on_connect(sender=None, conf=None, instance=None, body=None, **kwargs)

First task performed upon connection; it runs both check_databases() and check_rules().

If less than 60 seconds have passed since the last check, the checks are skipped to reduce the number of database accesses. The function arguments come from the @task_prerun.connect decorator but are not used.
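The 60-second rule can be expressed as a small throttle. This is a minimal sketch, assuming the time of the last check is kept in a module-level variable; the names throttled_check, CHECK_INTERVAL_SECONDS and _last_check are hypothetical, and the optional `now` argument exists only to make the behaviour easy to test.

```python
# Minimal sketch of a 60-second throttle in the spirit of on_connect().
# Storing the last check time in a module-level variable is an assumption.
import time
from typing import Callable, Optional

CHECK_INTERVAL_SECONDS = 60
_last_check: Optional[float] = None


def throttled_check(checks: Callable[[], None], now: Optional[float] = None) -> bool:
    """Run `checks` unless the previous run was less than 60 seconds ago.

    Returns True if the checks were run, False if they were skipped.
    """
    global _last_check
    if now is None:
        now = time.monotonic()
    if _last_check is not None and now - _last_check < CHECK_INTERVAL_SECONDS:
        return False  # Too soon: skip the database accesses.
    _last_check = now
    checks()  # e.g. run check_databases() followed by check_rules()
    return True
```

time.monotonic() is used rather than wall-clock time so that system clock adjustments cannot shorten or extend the interval.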

def process(key)

This function first runs all select rules to obtain the input parameters of the object identified by the key, then runs all process rules to compute the output parameters, and finally runs the upsert rules to update or insert the output values in the database(s).

Args

key (Union[str, int]): Identifier of the object to be processed. This could be an OSM ID, a Quadkey or anything else.
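The three stages of process() can be sketched as a simple pipeline. In this sketch each rule is modelled as a plain callable, and the outputs of all rules in a stage are merged into one dictionary; both are assumptions made for illustration, since the real rule objects and RuleHandler classes are not shown here. The name process_key is hypothetical.

```python
# Hypothetical sketch of the select -> process -> upsert pipeline described for
# process(). Rules are plain callables here; merging their results into one
# dictionary per stage is an assumption.
from typing import Any, Callable, Dict, Iterable, Union

Key = Union[str, int]
Params = Dict[str, Any]


def process_key(
    key: Key,
    select_rules: Iterable[Callable[[Key], Params]],
    process_rules: Iterable[Callable[[Params], Params]],
    upsert_rules: Iterable[Callable[[Key, Params], None]],
) -> Params:
    # 1. Select rules gather the input parameters of the object `key`.
    inputs: Params = {}
    for rule in select_rules:
        inputs.update(rule(key))
    # 2. Process rules derive the output parameters from the inputs.
    outputs: Params = {}
    for rule in process_rules:
        outputs.update(rule(inputs))
    # 3. Upsert rules update or insert the outputs in the database(s).
    for rule in upsert_rules:
        rule(key, outputs)
    return outputs
```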

def register_all(subtasks: list)

def register_database(db_name=None, db_config=None)

Args

db_name (str, optional, default: None): Name of the database that is referenced in the db_name attribute of the rules.
db_config (dict, optional, default: None): Dictionary containing the connection credentials for a database:
  • host: Host name of the server.
  • dbname: Name of the database.
  • port: Port to access the server.
  • username: Name of the database user.
  • password: Password of the database user.
  • timeout: Database timeout (set to 0 for no timeout).
  • itersize: Number of rows fetched from the server in one call.
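A db_config dictionary with the keys listed above might look like the following. All values are placeholders: the database name, user and password are hypothetical, and port 5432 (the PostgreSQL default) is only an assumption about the kind of server in use.

```python
# Example db_config with the documented keys. Every value is a placeholder.
db_config = {
    "host": "localhost",     # Host name of the server.
    "dbname": "buildings",   # Name of the database (hypothetical).
    "port": 5432,            # Port to access the server (PostgreSQL default, assumed).
    "username": "rabotnik",  # Name of the database user (hypothetical).
    "password": "secret",    # Password of the database user (placeholder).
    "timeout": 0,            # Database timeout; 0 means no timeout.
    "itersize": 10000,       # Number of rows fetched from the server in one call.
}
```

The dictionary would then be passed together with the name that the rules reference, for instance register_database(db_name="buildings_db", db_config=db_config), where "buildings_db" is a hypothetical name.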

def register_rule(rule_definition, file_type='zip', db_name=None, update_database=True, not_yet_registered_rules: dict = None)

Register a rule based on a rule definition in one of the three RuleHandlers select_rules, process_rules or upsert_rules.

Args

rule_definition (str or bytes): If file_type is zip, a binary object containing a ZIP archive with the definition of the rule. If file_type is xml, an XML string with the definition of the rule.
file_type (str, optional, default: zip): The type of the rule definition. Either zip or xml.
db_name (str, optional, default: None): The name of the database to be registered.
update_database (bool, optional, default: True): If true, the rule is also added to the rule database.
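When file_type is zip, rule_definition is a binary ZIP archive. The sketch below builds such an archive in memory with the standard zipfile module. The archive layout (a single member called "rule.xml") and the XML content are hypothetical; the actual layout expected by register_rule() is not documented here.

```python
# Sketch: package a rule definition into an in-memory ZIP archive so it can be
# passed as `rule_definition` with file_type="zip". The member name "rule.xml"
# and the XML content are assumptions for illustration only.
import io
import zipfile


def zip_rule_definition(xml_text: str, member_name: str = "rule.xml") -> bytes:
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.writestr(member_name, xml_text)
    return buffer.getvalue()


xml_rule = "<rule name='example_rule'></rule>"  # hypothetical rule content
zipped = zip_rule_definition(xml_rule)
```

The result could be passed as register_rule(zipped, file_type="zip"), while the plain string would correspond to register_rule(xml_rule, file_type="xml").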

def unregister_rule(rule_name, update_database=True)

Unregister a rule in one of the three RuleHandlers select_rules, process_rules or upsert_rules.

Args

rule_name (str): Name of the rule that should be unregistered.
update_database (bool, optional, default: True): If true, the rule is also removed from the rule database.

Classes

class Rabotnik (name)

The Rabotnik class represents the central hub and entry point. It initializes the essential components, namely the Celery-based processor and its default task queue.

Args

name (str): Name of the Rabotnik instance. This also determines the queue it sends tasks to or listens on.

class Rabotnik(Task):
    """
    The `Rabotnik` represents the central hub and entry point. It initializes the essential
    components.

    Args:
        name (str):
            Name of the Rabotnik instance. This also determines the queue it sends tasks to or
            listens on.
    """

    def __init__(self, name):
        # Set processor to be used. This is based on Celery.
        Processor(name)
        self.name = name
        self.processor = Processor.get_celery_app()
        self.processor.conf.task_default_queue = f"{name}-queue"

    def run(self, channel, method, properties, body):
        """
        Implement the Celery Task run function. This function receives tasks from a Rabotnik
        messagebus and distributes them to the Celery workers.

        Args:
            channel (pika.adapters.blocking_connection.BlockingChannel):
                Messagebus channel used for consuming.
            method (pika.amqp_object.Method):
                Metadata about the broker function call.
            properties (pika.amqp_object.Properties):
                Additional properties defined by RabbitMQ, such as the priority of the message.
            body (bytes):
                JSON-encoded message. Once decoded, it is a dictionary in which the name of the
                task is found under the `task` key and its parameters under the `arguments` key.
        """

        # Log values in debug mode.
        logger.debug(f"{channel} {method} {properties}")
        logger.debug(f"message '{body}' received")

        # Create dictionary from body.
        body = json.loads(body)

        # Check if the task is valid and set task name.
        task = body["task"]
        if task not in [
            "process",
            "register_all",
            "register_rule",
            "unregister_rule",
            "register_database",
        ]:
            raise ValueError(f"Task {task} is not a valid task")
        task_name = f"rabotnik.rabotnik.{task}"

        # Send the task to the Celery app configured in __init__, with the keyword arguments
        # defined in the message body.
        self.processor.send_task(task_name, kwargs=body["arguments"])

    def subscribe_to_message_bus(self, messagebus):
        """
        Connect the `run` function to the Rabotnik messagebus.

        Args:
            messagebus (MessageBus):
                Messagebus instance that the Rabotnik listener should connect to.
        """

        # Connect to the messagebus.
        messagebus.connect()

        # Subscribe the `run` function to the Rabotnik queue with the same name as the Rabotnik
        # instance.
        messagebus.subscribe(self.name, self.run)

        try:
            # Consume the messages sent to the messagebus.
            messagebus.start_consuming()
        except KeyboardInterrupt:
            logger.info("Stopping Rabotnik...")

            # Close the connection to the messagebus.
            messagebus.close()
            sys.exit(0)

Ancestors

  • celery.app.task.Task

Methods

def run(self, channel, method, properties, body)

Implement the Celery Task run function. This function receives tasks from a Rabotnik messagebus and distributes them to the Celery workers.

Args

channel (pika.adapters.blocking_connection.BlockingChannel): Messagebus channel used for consuming.
method (pika.amqp_object.Method): Metadata about the broker function call.
properties (pika.amqp_object.Properties): Additional properties defined by RabbitMQ, such as the priority of the message.
body (bytes): JSON-encoded message. Once decoded, it is a dictionary in which the name of the task is found under the task key and its parameters under the arguments key.
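The message handling in run() can be isolated from Celery and pika to show the expected message format. The sketch below mirrors the decoding and validation steps of run() but returns the Celery task name and keyword arguments instead of calling send_task(). The function name parse_message is hypothetical, and the key value in the example message is made up.

```python
# Sketch of the message format run() expects: JSON-encoded bytes with a "task"
# name and an "arguments" dictionary of keyword arguments for that task.
import json
from typing import Any, Dict, Tuple

VALID_TASKS = {
    "process",
    "register_all",
    "register_rule",
    "unregister_rule",
    "register_database",
}


def parse_message(body: bytes) -> Tuple[str, Dict[str, Any]]:
    """Decode a message body and return (Celery task name, kwargs)."""
    message = json.loads(body)
    task = message["task"]
    if task not in VALID_TASKS:
        raise ValueError(f"Task {task} is not a valid task")
    return f"rabotnik.rabotnik.{task}", message["arguments"]


body = json.dumps({"task": "process", "arguments": {"key": 123456}}).encode()
task_name, kwargs = parse_message(body)
```

Here task_name is "rabotnik.rabotnik.process" and kwargs is {"key": 123456}, which run() would forward as send_task(task_name, kwargs=kwargs).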

def subscribe_to_message_bus(self, messagebus)

Connect the run function to the Rabotnik messagebus.

Args

messagebus (MessageBus): Messagebus instance that the Rabotnik listener should connect to.
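subscribe_to_message_bus() only relies on four methods of the messagebus: connect(), subscribe(queue, callback), start_consuming() and close(). The stub below is a hypothetical stand-in that implements just that interface, which can help when exercising the listener without a RabbitMQ broker. The pika channel, method and properties arguments are replaced by None, so this is a sketch of the call sequence, not a model of the real MessageBus class.

```python
# Hypothetical stand-in for MessageBus that exposes only the methods
# subscribe_to_message_bus() calls. No broker connection is made.
from typing import Callable, Dict, List


class StubMessageBus:
    def __init__(self, messages: List[bytes]):
        self.messages = messages
        self.subscriptions: Dict[str, Callable] = {}
        self.connected = False
        self.closed = False

    def connect(self) -> None:
        self.connected = True  # A real implementation would open the broker connection.

    def subscribe(self, queue: str, callback: Callable) -> None:
        self.subscriptions[queue] = callback

    def start_consuming(self) -> None:
        # Deliver every queued message to every subscriber, using None in place
        # of pika's channel, method and properties arguments.
        for body in self.messages:
            for callback in self.subscriptions.values():
                callback(None, None, None, body)

    def close(self) -> None:
        self.closed = True
```

Passing such a stub to subscribe_to_message_bus() would route each message to run(); since run() forwards tasks through send_task(), a reachable Celery broker is still needed for that last step.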