Module rabotnik.rabotnik
Functions
def check_databases(sender=None, conf=None, instance=None, body=None, **kwargs)
-
The
check_databases()
function is executed at the start of each task through the functionon_connect()
. All databases are fetched from the database, and it is checked whether each database that exists in the database already exists, does not exist yet or is deleted in the meantime. If needed, the database is registered or unregistered accordingly. The method arguments come from the@task_prerun.connect
decorator, however they are not used. def check_rules(sender=None, conf=None, instance=None, body=None, **kwargs)
-
The
check_rules()
function is executed at the start of each task through the functionon_connect()
. All rules are fetched from the database, and it is checked whether each rule that exists in the database already exists in the set of rules, does not exist yet or is deleted in the meantime. If needed, the rule is registered or unregistered accordingly. The method arguments come from the@task_prerun.connect
decorator, however they are not used. def on_connect(sender=None, conf=None, instance=None, body=None, **kwargs)
-
First task performed upon connection, runs both
check_databases()
andcheck_rules()
.If the time since the last check is less than 60 seconds, the function will not be executed to reduce the number of accesses to the database. The method arguments come from the
@task_prerun.connect
decorator, however they are not used. def process(key)
-
The method first runs all the
select
rules to get the input parameters of the object identified by the key, then runs all theprocess()
rules to get the output parameters and lastly runs theupsert
rules, to update or insert the output values in the database(s).Args
key (Union[str,int]): Identifier used to process all tasks. This could be an OSM ID, a Quadkey or anything else.
def register_all(subtasks: list)
def register_database(db_name=None, db_config=None)
-
Args
db_name (str, optional, default: None): Name of the database that is referenced in the
db_name
attribute of the rules. db_config (dict, optional, default: None): Dictionary containing the connection credentials for a database:host
: Host name of the server.dbname
: Name of the database.port
: Port to access the server.username
: Name of the database user.password
: Password of the database user.timeout
: Database timeout (set to 0 for no timeout).itersize
: Number of rows fetched from the server at one call. def register_rule(rule_definition, file_type='zip', db_name=None, update_database=True, not_yet_registered_rules: dict = None)
-
Register a rule based on a rule definition in one of the three RuleHandlers
select_rules
,process_rules
orupsert_rules
.Args
rule_definition (str): If the
file_type
iszip
, the rule_definition is a binary object that contains a ZIP directory with the definition of the rule. If thefile_type
isxml
, it is an XML-string with the definition of the rule. file_type (str, optional, default:zip
): The type of the rule definition file. Can be one ofzip
andxml
. db_name (str, optional, default: None): The name of the database to be registered. update_database (bool, optional, default: True): If true, the rule is also added to the rule database. def unregister_rule(rule_name, update_database=True)
-
Unregister a rule in one of the three RuleHandlers
select_rules
,process_rules
orupsert_rules
.Args
rule_name (str): Name of the rule that should be unregistered. update_database (bool, optional, default: True): If true, the rule is also removed from the rule database.
Classes
class Rabotnik (name)
-
The
Rabotnik
represents the central hub and entry point. It initializes the essential components.Args
name (str): Name of the Rabotnik instance. This also determines the Queue it sends tasks or listens to.
Expand source code
class Rabotnik(Task): """ The `Rabotnik` represents the central hub and entry point. It initializes the essential components. Args: name (str): Name of the Rabotnik instance. This also determines the Queue it sends tasks or listens to. """ def __init__(self, name): # Set processor to be used. This is based on Celery. Processor(name) self.name = name self.processor = Processor.get_celery_app() self.processor.conf.task_default_queue = f"{name}-queue" def run(self, channel, method, properties, body): """ Implement the Celery Task run function. This function receives tasks from a Rabotnik messagebus and distributes them to the Celery workers. Args: channel (pika.adapters.blocking_connection.BlockingConnection): Messagebus channel that is used to consume with. method (pika.amqp_object.Method): Metadata about the broker function call. properties (pika.amqp_object.Properties): Additional properties defined by RabbitMQ, such as the priority of the message. body (bytes): A dictionary with the information of the task. The name of the task can be found in the `task` key and its parameters can be found in the `arguments` key. """ # Log values in debug mode. logger.debug(f"{channel} {method} {properties}") logger.debug(f"message '{body}' received") # Create dictionary from body. body = json.loads(body) # Check if the task is valid and set task name. task = body["task"] if task not in [ "process", "register_all", "register_rule", "unregister_rule", "register_database", ]: raise ValueError(f"Task {task} is not a valid task") task_name = f"rabotnik.rabotnik.{task}" # Send task to the Celery instance with arguments defined in the message body. self.app.send_task(task_name, kwargs=body["arguments"]) def subscribe_to_message_bus(self, messagebus): """ Connect the `run` function to the Rabotnik messagebus. Args: messagebus (MessageBus): Messagebus instance that the Rabotnik listener should connect to. """ # Connect to the messagebus. messagebus.connect() # Subscribe the `run` function to the Rabotnik queue with the same name as the Rabotnik # instance. messagebus.subscribe(self.name, self.run) try: # Consume the messages sent to the messagebus. messagebus.start_consuming() except KeyboardInterrupt: logger.info("Stopping Rabotnik...") # Close the connection to the messagebus. messagebus.close() sys.exit(0)
Ancestors
- celery.app.task.Task
Methods
def run(self, channel, method, properties, body)
-
Implement the Celery Task run function. This function receives tasks from a Rabotnik messagebus and distributes them to the Celery workers.
Args
channel (pika.adapters.blocking_connection.BlockingConnection): Messagebus channel that is used to consume with. method (pika.amqp_object.Method): Metadata about the broker function call. properties (pika.amqp_object.Properties): Additional properties defined by RabbitMQ, such as the priority of the message. body (bytes): A dictionary with the information of the task. The name of the task can be found in the
task
key and its parameters can be found in thearguments
key. def subscribe_to_message_bus(self, messagebus)
-
Connect the
run
function to the Rabotnik messagebus.Args
messagebus (MessageBus): Messagebus instance that the Rabotnik listener should connect to.