Source code for dagster_celery_docker.executor

import os

import docker.client
from dagster import (
    DagsterInstance,
    Executor,
    Field,
    MetadataEntry,
    Permissive,
    StringSource,
    _check as check,
    executor,
    multiple_process_executor_requirements,
)
from dagster._cli.api import ExecuteStepArgs
from dagster._core.events import EngineEventData
from dagster._core.events.utils import filter_dagster_events_from_cli_logs
from dagster._core.execution.retries import RetryMode
from dagster._core.storage.pipeline_run import DagsterRun
from dagster._serdes import pack_value, serialize_value, unpack_value
from dagster._utils.merger import merge_dicts
from dagster_celery.config import DEFAULT_CONFIG, dict_wrapper
from dagster_celery.core_execution_loop import DELEGATE_MARKER, core_celery_execution_loop
from dagster_celery.defaults import broker_url, result_backend
from dagster_celery.executor import CELERY_CONFIG

CELERY_DOCKER_CONFIG_KEY = "celery-docker"


def celery_docker_config():
    additional_config = {
        "docker": Field(
            {
                "image": Field(
                    StringSource,
                    is_required=False,
                    description="The docker image to be used for step execution.",
                ),
                "registry": Field(
                    {
                        "url": Field(StringSource),
                        "username": Field(StringSource),
                        "password": Field(StringSource),
                    },
                    is_required=False,
                    description="Information for using a non local/public docker registry",
                ),
                "env_vars": Field(
                    [str],
                    is_required=False,
                    description=(
                        "The list of environment variables names to forward from the celery worker"
                        " in to the docker container"
                    ),
                ),
                "network": Field(
                    str,
                    is_required=False,
                    description=(
                        "Name of the network this container will be connected to at creation time"
                    ),
                ),
                "container_kwargs": Field(
                    Permissive(),
                    is_required=False,
                    description="Additional keyword args for the docker container",
                ),
            },
            is_required=True,
            description="The configuration for interacting with docker in the celery worker.",
        ),
    }

    cfg = merge_dicts(CELERY_CONFIG, additional_config)
    return cfg


[docs]@executor( name=CELERY_DOCKER_CONFIG_KEY, config_schema=celery_docker_config(), requirements=multiple_process_executor_requirements(), ) def celery_docker_executor(init_context): """Celery-based executor which launches tasks in docker containers. The Celery executor exposes config settings for the underlying Celery app under the ``config_source`` key. This config corresponds to the "new lowercase settings" introduced in Celery version 4.0 and the object constructed from config will be passed to the :py:class:`celery.Celery` constructor as its ``config_source`` argument. (See https://docs.celeryq.dev/en/stable/userguide/configuration.html for details.) The executor also exposes the ``broker``, `backend`, and ``include`` arguments to the :py:class:`celery.Celery` constructor. In the most common case, you may want to modify the ``broker`` and ``backend`` (e.g., to use Redis instead of RabbitMQ). We expect that ``config_source`` will be less frequently modified, but that when op executions are especially fast or slow, or when there are different requirements around idempotence or retry, it may make sense to execute jobs with variations on these settings. To use the `celery_docker_executor`, set it as the `executor_def` when defining a job: .. code-block:: python from dagster import job from dagster_celery_docker.executor import celery_docker_executor @job(executor_def=celery_docker_executor) def celery_enabled_job(): pass Then you can configure the executor as follows: .. code-block:: YAML execution: config: docker: image: 'my_repo.com/image_name:latest' registry: url: 'my_repo.com' username: 'my_user' password: {env: 'DOCKER_PASSWORD'} env_vars: ["DAGSTER_HOME"] # environment vars to pass from celery worker to docker container_kwargs: # keyword args to be passed to the container. example: volumes: ['/home/user1/:/mnt/vol2','/var/www:/mnt/vol1'] broker: 'pyamqp://guest@localhost//' # Optional[str]: The URL of the Celery broker backend: 'rpc://' # Optional[str]: The URL of the Celery results backend include: ['my_module'] # Optional[List[str]]: Modules every worker should import config_source: # Dict[str, Any]: Any additional parameters to pass to the #... # Celery workers. This dict will be passed as the `config_source` #... # argument of celery.Celery(). Note that the YAML you provide here must align with the configuration with which the Celery workers on which you hope to run were started. If, for example, you point the executor at a different broker than the one your workers are listening to, the workers will never be able to pick up tasks for execution. In deployments where the celery_docker_job_executor is used all appropriate celery and dagster_celery commands must be invoked with the `-A dagster_celery_docker.app` argument. """ exc_cfg = init_context.executor_config return CeleryDockerExecutor( broker=exc_cfg.get("broker"), backend=exc_cfg.get("backend"), config_source=exc_cfg.get("config_source"), include=exc_cfg.get("include"), retries=RetryMode.from_config(exc_cfg.get("retries")), docker_config=exc_cfg.get("docker"), )
class CeleryDockerExecutor(Executor): def __init__( self, retries, docker_config, broker=None, backend=None, include=None, config_source=None, ): self._retries = check.inst_param(retries, "retries", RetryMode) self.broker = check.opt_str_param(broker, "broker", default=broker_url) self.backend = check.opt_str_param(backend, "backend", default=result_backend) self.include = check.opt_list_param(include, "include", of_type=str) self.config_source = dict_wrapper( dict(DEFAULT_CONFIG, **check.opt_dict_param(config_source, "config_source")) ) self.docker_config = check.dict_param(docker_config, "docker_config") @property def retries(self): return self._retries def execute(self, plan_context, execution_plan): return core_celery_execution_loop( plan_context, execution_plan, step_execution_fn=_submit_task_docker ) def app_args(self): return { "broker": self.broker, "backend": self.backend, "include": self.include, "config_source": self.config_source, "retries": self.retries, } def _submit_task_docker(app, plan_context, step, queue, priority, known_state): execute_step_args = ExecuteStepArgs( pipeline_origin=plan_context.reconstructable_pipeline.get_python_origin(), pipeline_run_id=plan_context.dagster_run.run_id, step_keys_to_execute=[step.key], instance_ref=plan_context.instance.get_ref(), retry_mode=plan_context.executor.retries.for_inner_plan(), known_state=known_state, ) task = create_docker_task(app) task_signature = task.si( execute_step_args_packed=pack_value(execute_step_args), docker_config=plan_context.executor.docker_config, ) return task_signature.apply_async( priority=priority, queue=queue, routing_key="{queue}.execute_step_docker".format(queue=queue), ) def create_docker_task(celery_app, **task_kwargs): @celery_app.task(bind=True, name="execute_step_docker", **task_kwargs) def _execute_step_docker( self, execute_step_args_packed, docker_config, ): """Run step execution in a Docker container.""" execute_step_args = unpack_value( check.dict_param( execute_step_args_packed, "execute_step_args_packed", ) ) check.inst_param(execute_step_args, "execute_step_args", ExecuteStepArgs) check.dict_param(docker_config, "docker_config") instance = DagsterInstance.from_ref(execute_step_args.instance_ref) pipeline_run = instance.get_run_by_id(execute_step_args.pipeline_run_id) check.inst( pipeline_run, DagsterRun, "Could not load run {}".format(execute_step_args.pipeline_run_id), ) step_keys_str = ", ".join(execute_step_args.step_keys_to_execute) docker_image = ( docker_config["image"] if docker_config.get("image") else pipeline_run.pipeline_code_origin.repository_origin.container_image ) if not docker_image: raise Exception("No docker image specified by either the job or the repository") client = docker.client.from_env() if docker_config.get("registry"): client.login( registry=docker_config["registry"]["url"], username=docker_config["registry"]["username"], password=docker_config["registry"]["password"], ) # Post event for starting execution engine_event = instance.report_engine_event( "Executing steps {} in Docker container {}".format(step_keys_str, docker_image), pipeline_run, EngineEventData( [ MetadataEntry("Step keys", value=step_keys_str), MetadataEntry("Image", value=docker_image), MetadataEntry("Celery worker", value=self.request.hostname), ], marker_end=DELEGATE_MARKER, ), CeleryDockerExecutor, step_key=execute_step_args.step_keys_to_execute[0], ) serialized_events = [serialize_value(engine_event)] docker_env = {} if docker_config.get("env_vars"): docker_env = {env_name: os.getenv(env_name) for env_name in docker_config["env_vars"]} container_kwargs = check.opt_dict_param( docker_config.get("container_kwargs"), "container_kwargs", key_type=str ) # set defaults for detach and auto_remove container_kwargs["detach"] = container_kwargs.get("detach", False) container_kwargs["auto_remove"] = container_kwargs.get("auto_remove", True) # if environment variables are provided via container_kwargs, merge with env_vars if container_kwargs.get("environment") is not None: e_vars = container_kwargs.get("environment") if isinstance(e_vars, dict): docker_env.update(e_vars) else: for v in e_vars: key, val = v.split("=") docker_env[key] = val del container_kwargs["environment"] try: docker_response = client.containers.run( docker_image, command=execute_step_args.get_command_args(), # pass through this worker's environment for things like AWS creds etc. environment=docker_env, network=docker_config.get("network", None), **container_kwargs, ) res = docker_response.decode("utf-8") except docker.errors.ContainerError as err: entries = [MetadataEntry("Job image", value=docker_image)] if err.stderr is not None: entries.append(MetadataEntry("Docker stderr", value=err.stderr)) instance.report_engine_event( "Failed to run steps {} in Docker container {}".format(step_keys_str, docker_image), pipeline_run, EngineEventData(entries), CeleryDockerExecutor, step_key=execute_step_args.step_keys_to_execute[0], ) raise else: if res is None: raise Exception("No response from execute_step in CeleryDockerExecutor") events = filter_dagster_events_from_cli_logs(res.split("\n")) serialized_events += [serialize_value(event) for event in events] return serialized_events return _execute_step_docker