Module rabotnik.processor

Classes

class Processor

The Processor is the Celery app used for distributing processes in Rabotnik. It is once globally instantiated by using the Processor function. Afterward, the processor can be retrieved using Processor.get_celery_app().

Args

name (str): Name of the Celery app.

Attributes

_processor (Celery): Celery app used for distributing processes in Rabotnik.

Expand source code
class Processor:
    """
    The Processor is the Celery app used for distributing processes in Rabotnik. It is once
    globally instantiated by using the `Processor()` function. Afterward, the processor can
    be retrieved using `Processor.get_celery_app()`.

    Args:
        name (str):
            Name of the Celery app.

    Attributes:
        _processor (Celery):
            Celery app used for distributing processes in Rabotnik.
    """

    _processor = None

    @classmethod
    def __init__(cls, name):
        if cls._processor is not None:
            raise Exception("Use Processor.get_celery_app")

        cls._processor: Celery = Celery(
            name,
            result_expires=3600,
            broker=Processor._get_broker_url(),
            backend=os.environ.get("RABOTNIK_CELERY_BACKEND_RESULTS", "rpc://"),
            ignore_result=True,  # for debugging
            task_queue_max_priority=10,
            task_queue_default_prority=1,
        )
        cls._processor.conf.name = name

    @classmethod
    def get_celery_app(cls):
        """
        Get the current running Celery app.

        Returns:
            Celery app.
        """
        return cls._processor

    @staticmethod
    def _get_broker_url():
        """
        Reads the Rabotnik Celery environment variables and creates the Celery broker URL
        from the variables.

        Returns:
            Broker url.
        """

        # Read environment variables
        username = read_environment_variable("RABOTNIK_CELERY_BROKER_USER")
        password = read_environment_variable("RABOTNIK_CELERY_BROKER_PASSWORD")
        url = read_environment_variable("RABOTNIK_CELERY_BROKER_HOST")

        credentials = f"{username}:{password}"
        url = f"pyamqp://{credentials}@{url}"
        logger.debug("broker_url: %s", url)

        return url

Static methods

def get_celery_app()

Get the current running Celery app.

Returns

Celery app.