import importlib
import os
import warnings
from datetime import datetime
from functools import update_wrapper
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)
from typing_extensions import Self
import dagster._check as check
from dagster._annotations import public
from dagster._config import Field, Shape, StringSource
from dagster._config.config_type import ConfigType
from dagster._config.validate import validate_config
from dagster._core.definitions.composition import MappedInputPlaceholder
from dagster._core.definitions.dependency import (
DynamicCollectDependencyDefinition,
IDependencyDefinition,
MultiDependencyDefinition,
Node,
NodeHandle,
NodeInputHandle,
NodeInvocation,
NodeOutput,
)
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.metadata import MetadataEntry
from dagster._core.definitions.node_definition import NodeDefinition
from dagster._core.definitions.partition import DynamicPartitionsDefinition
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.utils import check_valid_name
from dagster._core.errors import (
DagsterInvalidConfigError,
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvalidSubsetError,
)
from dagster._core.selector.subset_selector import (
AssetSelectionData,
OpSelectionData,
SelectionTreeBranch,
SelectionTreeLeaf,
parse_op_selection,
)
from dagster._core.storage.io_manager import IOManagerDefinition, io_manager
from dagster._core.utils import str_format_set
from dagster._utils.merger import merge_dicts
from .asset_layer import AssetLayer, build_asset_selection_job
from .config import ConfigMapping
from .dependency import DependencyDefinition, GraphNode
from .executor_definition import ExecutorDefinition, multi_or_in_process_executor
from .graph_definition import GraphDefinition, SubselectedGraphDefinition
from .hook_definition import HookDefinition
from .logger_definition import LoggerDefinition
from .metadata import RawMetadataValue
from .mode import ModeDefinition
from .partition import PartitionedConfig, PartitionsDefinition, PartitionSetDefinition
from .pipeline_definition import PipelineDefinition
from .preset import PresetDefinition
from .resource_definition import ResourceDefinition
from .run_request import RunRequest
from .utils import DEFAULT_IO_MANAGER_KEY
from .version_strategy import VersionStrategy
if TYPE_CHECKING:
from dagster._core.execution.execute_in_process_result import ExecuteInProcessResult
from dagster._core.execution.resources_init import InitResourceContext
from dagster._core.instance import DagsterInstance
from dagster._core.snap import PipelineSnapshot
class JobDefinition(PipelineDefinition):
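    """Defines a Dagster job: an executable graph of ops bound to resources,
    loggers, an executor, and optional config or partitions.

    Jobs are typically constructed with the :py:func:`@job <job>` decorator or
    :py:meth:`GraphDefinition.to_job` rather than by calling this constructor
    directly.
    """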
_cached_partition_set: Optional["PartitionSetDefinition"]
_subset_selection_data: Optional[Union[OpSelectionData, AssetSelectionData]]
input_values: Mapping[str, object]
def __init__(
self,
*,
graph_def: GraphDefinition,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
executor_def: Optional[ExecutorDefinition] = None,
logger_defs: Optional[Mapping[str, LoggerDefinition]] = None,
name: Optional[str] = None,
config: Optional[Union[ConfigMapping, Mapping[str, object], PartitionedConfig]] = None,
description: Optional[str] = None,
partitions_def: Optional[PartitionsDefinition] = None,
tags: Optional[Mapping[str, Any]] = None,
metadata: Optional[Mapping[str, RawMetadataValue]] = None,
hook_defs: Optional[AbstractSet[HookDefinition]] = None,
op_retry_policy: Optional[RetryPolicy] = None,
version_strategy: Optional[VersionStrategy] = None,
_subset_selection_data: Optional[Union[OpSelectionData, AssetSelectionData]] = None,
asset_layer: Optional[AssetLayer] = None,
input_values: Optional[Mapping[str, object]] = None,
_metadata_entries: Optional[Sequence[MetadataEntry]] = None,
_executor_def_specified: Optional[bool] = None,
_logger_defs_specified: Optional[bool] = None,
_preset_defs: Optional[Sequence[PresetDefinition]] = None,
):
from dagster._core.definitions.run_config import RunConfig, convert_config_input
from dagster._loggers import default_loggers
check.inst_param(graph_def, "graph_def", GraphDefinition)
resource_defs = check.opt_mapping_param(
resource_defs, "resource_defs", key_type=str, value_type=ResourceDefinition
)
# We need to check whether an actual executor/logger def was passed in
# before we set a default executor/logger defs. This is so we can
# determine if someone passed in the default executor vs the system set
# it directly. Once JobDefinition no longer subclasses
# PipelineDefinition, we can change the default executor to be set
# elsewhere to avoid the need for this check.
self._executor_def_specified = (
_executor_def_specified
if _executor_def_specified is not None
else executor_def is not None
)
self._logger_defs_specified = (
_logger_defs_specified
if _logger_defs_specified is not None
else logger_defs is not None
)
executor_def = check.opt_inst_param(
executor_def, "executor_def", ExecutorDefinition, default=multi_or_in_process_executor
)
check.opt_mapping_param(
logger_defs,
"logger_defs",
key_type=str,
value_type=LoggerDefinition,
)
logger_defs = logger_defs or default_loggers()
name = check_valid_name(check.opt_str_param(name, "name", default=graph_def.name))
config = check.opt_inst_param(
config, "config", (Mapping, ConfigMapping, PartitionedConfig, RunConfig)
)
config = convert_config_input(config)
description = check.opt_str_param(description, "description")
partitions_def = check.opt_inst_param(
partitions_def, "partitions_def", PartitionsDefinition
)
tags = check.opt_mapping_param(tags, "tags", key_type=str)
metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
hook_defs = check.opt_set_param(hook_defs, "hook_defs")
op_retry_policy = check.opt_inst_param(op_retry_policy, "op_retry_policy", RetryPolicy)
version_strategy = check.opt_inst_param(
version_strategy, "version_strategy", VersionStrategy
)
_subset_selection_data = check.opt_inst_param(
_subset_selection_data, "_subset_selection_data", (OpSelectionData, AssetSelectionData)
)
asset_layer = check.opt_inst_param(asset_layer, "asset_layer", AssetLayer)
input_values = check.opt_mapping_param(input_values, "input_values", key_type=str)
_metadata_entries = check.opt_sequence_param(_metadata_entries, "_metadata_entries")
_preset_defs = check.opt_sequence_param(
_preset_defs, "preset_defs", of_type=PresetDefinition
)
did_user_provide_resources = bool(resource_defs)
if resource_defs and DEFAULT_IO_MANAGER_KEY in resource_defs:
resource_defs_with_defaults = resource_defs
else:
resource_defs_with_defaults = merge_dicts(
{DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs or {}
)
presets = []
config_mapping = None
partitioned_config = None
self._explicit_config = False
if partitions_def:
partitioned_config = PartitionedConfig.from_flexible_config(config, partitions_def)
else:
if isinstance(config, ConfigMapping):
config_mapping = config
elif isinstance(config, PartitionedConfig):
partitioned_config = config
elif isinstance(config, dict):
check.invariant(
len(_preset_defs) == 0,
(
"Bad state: attempted to pass preset definitions to job alongside config"
" dictionary."
),
)
presets = [PresetDefinition(name="default", run_config=config)]
                # Using a config mapping here is a trick so that the preset is used even
                # when no config is supplied for the job.
config_mapping = _config_mapping_with_default_value(
get_run_config_schema_for_job(
graph_def,
resource_defs_with_defaults,
executor_def,
logger_defs,
asset_layer,
),
config,
name,
)
self._explicit_config = True
elif config is not None:
check.failed(
"config param must be a ConfigMapping, a PartitionedConfig, or a dictionary,"
f" but is an object of type {type(config)}"
)
# Exists for backcompat - JobDefinition is implemented as a single-mode pipeline.
mode_def = ModeDefinition(
resource_defs=resource_defs_with_defaults,
logger_defs=logger_defs,
executor_defs=[executor_def] if executor_def else None,
_config_mapping=config_mapping,
_partitioned_config=partitioned_config,
)
self._cached_partition_set: Optional["PartitionSetDefinition"] = None
self._subset_selection_data = _subset_selection_data
self.input_values = input_values
for input_name in sorted(list(self.input_values.keys())):
if not graph_def.has_input(input_name):
raise DagsterInvalidDefinitionError(
f"Error when constructing JobDefinition '{name}': Input value provided for key"
f" '{input_name}', but job has no top-level input with that name."
)
super(JobDefinition, self).__init__(
name=name,
description=description,
mode_defs=[mode_def],
preset_defs=presets or _preset_defs,
tags=tags,
metadata=metadata,
metadata_entries=_metadata_entries,
hook_defs=hook_defs,
solid_retry_policy=op_retry_policy,
graph_def=graph_def,
version_strategy=version_strategy,
asset_layer=asset_layer or _infer_asset_layer_from_source_asset_deps(graph_def),
_should_validate_resource_requirements=did_user_provide_resources,
)
@property
def target_type(self) -> str:
return "job"
@property
def is_job(self) -> bool:
return True
def describe_target(self):
return f"{self.target_type} '{self.name}'"
@public
@property
def executor_def(self) -> ExecutorDefinition:
return self.get_mode_definition().executor_defs[0]
@public
@property
def resource_defs(self) -> Mapping[str, ResourceDefinition]:
return self.get_mode_definition().resource_defs
@public
@property
def partitioned_config(self) -> Optional[PartitionedConfig]:
return self.get_mode_definition().partitioned_config
@public
@property
def config_mapping(self) -> Optional[ConfigMapping]:
return self.get_mode_definition().config_mapping
@public
@property
def loggers(self) -> Mapping[str, LoggerDefinition]:
return self.get_mode_definition().loggers
    @public
def execute_in_process(
self,
run_config: Optional[Mapping[str, Any]] = None,
instance: Optional["DagsterInstance"] = None,
partition_key: Optional[str] = None,
raise_on_error: bool = True,
op_selection: Optional[Sequence[str]] = None,
asset_selection: Optional[Sequence[AssetKey]] = None,
run_id: Optional[str] = None,
input_values: Optional[Mapping[str, object]] = None,
tags: Optional[Mapping[str, str]] = None,
resources: Optional[Mapping[str, object]] = None,
) -> "ExecuteInProcessResult":
"""Execute the Job in-process, gathering results in-memory.
The `executor_def` on the Job will be ignored, and replaced with the in-process executor.
If using the default `io_manager`, it will switch from filesystem to in-memory.
Args:
            run_config (Optional[Mapping[str, Any]]):
                The configuration for the run.
instance (Optional[DagsterInstance]):
The instance to execute against, an ephemeral one will be used if none provided.
            partition_key (Optional[str]):
The string partition key that specifies the run config to execute. Can only be used
to select run config for jobs with partitioned config.
            raise_on_error (bool): Whether or not to raise exceptions when they occur.
                Defaults to ``True``.
op_selection (Optional[Sequence[str]]): A list of op selection queries (including single op
names) to execute. For example:
                * ``['some_op']``: selects ``some_op`` itself.
                * ``['*some_op']``: selects ``some_op`` and all its ancestors (upstream dependencies).
                * ``['*some_op+++']``: selects ``some_op``, all its ancestors, and its descendants
                  (downstream dependencies) within 3 levels down.
                * ``['*some_op', 'other_op_a', 'other_op_b+']``: selects ``some_op`` and all its
                  ancestors, ``other_op_a`` itself, and ``other_op_b`` and its direct child ops.
input_values (Optional[Mapping[str, Any]]):
                A dictionary that maps the names of the job's top-level inputs to Python
                objects. Input values provided here override input values that were provided
                to the job definition directly.
resources (Optional[Mapping[str, Any]]):
The resources needed if any are required. Can provide resource instances directly,
or resource definitions.
Returns:
:py:class:`~dagster.ExecuteInProcessResult`
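
        Example:
            A minimal sketch of in-process execution; the op and job below are
            illustrative, not part of this module:

            .. code-block:: python

                from dagster import job, op

                @op(config_schema={"greeting": str})
                def greet(context):
                    context.log.info(context.op_config["greeting"])

                @job
                def greeting_job():
                    greet()

                # Runs with the in-process executor; an ephemeral instance is
                # used since none is provided.
                result = greeting_job.execute_in_process(
                    run_config={"ops": {"greet": {"config": {"greeting": "hello"}}}}
                )
                assert result.success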
"""
from dagster._core.definitions.executor_definition import execute_in_process_executor
from dagster._core.definitions.run_config import convert_config_input
from dagster._core.execution.build_resources import wrap_resources_for_execution
from dagster._core.execution.execute_in_process import core_execute_in_process
run_config = check.opt_mapping_param(convert_config_input(run_config), "run_config")
op_selection = check.opt_sequence_param(op_selection, "op_selection", str)
asset_selection = check.opt_sequence_param(asset_selection, "asset_selection", AssetKey)
resources = check.opt_mapping_param(resources, "resources", key_type=str)
resource_defs = wrap_resources_for_execution(resources)
check.invariant(
not (op_selection and asset_selection),
(
"op_selection and asset_selection cannot both be provided as args to"
" execute_in_process"
),
)
partition_key = check.opt_str_param(partition_key, "partition_key")
input_values = check.opt_mapping_param(input_values, "input_values")
# Combine provided input values at execute_in_process with input values
# provided to the definition. Input values provided at
# execute_in_process will override those provided on the definition.
input_values = merge_dicts(self.input_values, input_values)
bound_resource_defs = dict(self.resource_defs)
logger_defs = dict(self.loggers)
ephemeral_job = JobDefinition(
name=self._name,
graph_def=self._graph_def,
resource_defs={**_swap_default_io_man(bound_resource_defs, self), **resource_defs},
executor_def=execute_in_process_executor,
logger_defs=logger_defs,
hook_defs=self.hook_defs,
config=self.config_mapping or self.partitioned_config,
tags=self.tags,
op_retry_policy=self._solid_retry_policy,
version_strategy=self.version_strategy,
asset_layer=self.asset_layer,
input_values=input_values,
_executor_def_specified=self._executor_def_specified,
_logger_defs_specified=self._logger_defs_specified,
_preset_defs=self._preset_defs,
)
ephemeral_job = ephemeral_job.get_job_def_for_subset_selection(
op_selection, frozenset(asset_selection) if asset_selection else None
)
merged_tags = merge_dicts(self.tags, tags or {})
if partition_key:
if not self.partitioned_config:
check.failed(
f"Provided partition key `{partition_key}` for job `{self._name}` without a"
" partitioned config"
)
partition_set = self.get_partition_set_def()
if not partition_set:
check.failed(
f"Provided partition key `{partition_key}` for job `{self._name}` without a"
" partitioned config"
)
partition = partition_set.get_partition(partition_key, instance)
run_config = (
run_config if run_config else partition_set.run_config_for_partition(partition)
)
merged_tags.update(partition_set.tags_for_partition(partition))
return core_execute_in_process(
ephemeral_pipeline=ephemeral_job,
run_config=run_config,
instance=instance,
output_capturing_enabled=True,
raise_on_error=raise_on_error,
run_tags=merged_tags,
run_id=run_id,
asset_selection=frozenset(asset_selection),
)
@property
def op_selection_data(self) -> Optional[OpSelectionData]:
return (
self._subset_selection_data
if isinstance(self._subset_selection_data, OpSelectionData)
else None
)
@property
def asset_selection_data(self) -> Optional[AssetSelectionData]:
return (
self._subset_selection_data
if isinstance(self._subset_selection_data, AssetSelectionData)
else None
)
@property
def is_subset_pipeline(self) -> bool:
if self._subset_selection_data:
return True
return False
def get_job_def_for_subset_selection(
self,
op_selection: Optional[Sequence[str]] = None,
asset_selection: Optional[AbstractSet[AssetKey]] = None,
) -> Self:
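        """Return a copy of this job containing only the given op or asset
        selection; returns ``self`` when neither selection is provided. At most
        one of ``op_selection`` and ``asset_selection`` may be passed.
        """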
check.invariant(
not (op_selection and asset_selection),
(
"op_selection and asset_selection cannot both be provided as args to"
" execute_in_process"
),
)
if op_selection:
return self._get_job_def_for_op_selection(op_selection)
        elif asset_selection:
return self._get_job_def_for_asset_selection(asset_selection)
else:
return self
def _get_job_def_for_asset_selection(
self,
asset_selection: Optional[AbstractSet[AssetKey]] = None,
) -> Self:
asset_selection = check.opt_set_param(asset_selection, "asset_selection", AssetKey)
nonexistent_assets = [
asset
for asset in asset_selection
if asset not in self.asset_layer.asset_keys
and asset not in self.asset_layer.source_assets_by_key
]
nonexistent_asset_strings = [
asset_str
for asset_str in (asset.to_string() for asset in nonexistent_assets)
if asset_str
]
if nonexistent_assets:
raise DagsterInvalidSubsetError(
"Assets provided in asset_selection argument "
f"{', '.join(nonexistent_asset_strings)} do not exist in parent asset group or job."
)
asset_selection_data = AssetSelectionData(
asset_selection=asset_selection,
parent_job_def=self,
)
check.invariant(
self.asset_layer.assets_defs_by_key is not None,
"Asset layer must have _asset_defs argument defined",
)
new_job = build_asset_selection_job(
name=self.name,
assets=set(self.asset_layer.assets_defs_by_key.values()),
source_assets=self.asset_layer.source_assets_by_key.values(),
executor_def=self.executor_def,
resource_defs=self.resource_defs,
description=self.description,
tags=self.tags,
asset_selection=asset_selection,
asset_selection_data=asset_selection_data,
config=self.config_mapping or self.partitioned_config,
)
return new_job
def _get_job_def_for_op_selection(
self,
op_selection: Optional[Sequence[str]] = None,
) -> Self:
if not op_selection:
return self
op_selection = check.opt_sequence_param(op_selection, "op_selection", str)
resolved_op_selection_dict = parse_op_selection(self, op_selection)
try:
sub_graph = get_subselected_graph_definition(self.graph, resolved_op_selection_dict)
            # If explicit config was passed, the config mapping that implicitly resolves
            # the defaults is very unlikely to work for the subset, so drop it. The preset
            # will still present the default config in dagit.
if self._explicit_config:
config_arg = None
else:
config_arg = self.config_mapping or self.partitioned_config
return JobDefinition(
name=self.name,
description=self.description,
resource_defs=dict(self.resource_defs),
logger_defs=dict(self.loggers),
executor_def=self.executor_def,
config=config_arg,
tags=self.tags,
hook_defs=self.hook_defs,
op_retry_policy=self._solid_retry_policy,
graph_def=sub_graph,
version_strategy=self.version_strategy,
_executor_def_specified=self._executor_def_specified,
_logger_defs_specified=self._logger_defs_specified,
_subset_selection_data=OpSelectionData(
op_selection=op_selection,
resolved_op_selection=set(
resolved_op_selection_dict.keys()
), # equivalent to solids_to_execute. currently only gets top level nodes.
parent_job_def=self, # used by pipeline snapshot lineage
),
# TODO: subset this structure.
# https://github.com/dagster-io/dagster/issues/7541
asset_layer=self.asset_layer,
_preset_defs=self._preset_defs,
)
except DagsterInvalidDefinitionError as exc:
# This handles the case when you construct a subset such that an unsatisfied
# input cannot be loaded from config. Instead of throwing a DagsterInvalidDefinitionError,
# we re-raise a DagsterInvalidSubsetError.
raise DagsterInvalidSubsetError(
f"The attempted subset {str_format_set(resolved_op_selection_dict)} for graph "
f"{self.graph.name} results in an invalid graph."
) from exc
def get_partition_set_def(self) -> Optional["PartitionSetDefinition"]:
mode = self.get_mode_definition()
if not mode.partitioned_config:
return None
if not self._cached_partition_set:
tags_fn = mode.partitioned_config.tags_for_partition_fn
if not tags_fn:
tags_fn = lambda _: {}
self._cached_partition_set = PartitionSetDefinition(
job_name=self.name,
name=f"{self.name}_partition_set",
partitions_def=mode.partitioned_config.partitions_def,
run_config_fn_for_partition=mode.partitioned_config.run_config_for_partition_fn,
tags_fn_for_partition=tags_fn,
mode=mode.name,
)
return self._cached_partition_set
@public
@property
def partitions_def(self) -> Optional[PartitionsDefinition]:
mode = self.get_mode_definition()
if not mode.partitioned_config:
return None
return mode.partitioned_config.partitions_def
    @public
def run_request_for_partition(
self,
partition_key: str,
run_key: Optional[str] = None,
tags: Optional[Mapping[str, str]] = None,
asset_selection: Optional[Sequence[AssetKey]] = None,
run_config: Optional[Mapping[str, Any]] = None,
instance: Optional["DagsterInstance"] = None,
current_time: Optional[datetime] = None,
) -> RunRequest:
"""Creates a RunRequest object for a run that processes the given partition.
Args:
partition_key: The key of the partition to request a run for.
run_key (Optional[str]): A string key to identify this launched run. For sensors, ensures that
only one run is created per run key across all sensor evaluations. For schedules,
ensures that one run is created per tick, across failure recoveries. Passing in a `None`
value means that a run will always be launched per evaluation.
tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach
to the launched run.
            run_config (Optional[Mapping[str, Any]]): Configuration for the run. If the job has
                a :py:class:`PartitionedConfig`, this value will replace the config
                provided by it.
            current_time (Optional[datetime]): Used to determine which time-partitions exist.
Defaults to now.
Returns:
RunRequest: an object that requests a run to process the given partition.
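
        Example:
            An illustrative sketch of requesting a partition's run from a
            schedule; the op, job, and partition keys are assumptions, not part
            of this module:

            .. code-block:: python

                from dagster import StaticPartitionsDefinition, job, op, schedule

                @op
                def do_work():
                    ...

                @job(partitions_def=StaticPartitionsDefinition(["a", "b"]))
                def partitioned_job():
                    do_work()

                @schedule(cron_schedule="0 0 * * *", job=partitioned_job)
                def my_schedule(context):
                    # Real schedules usually derive the key from
                    # context.scheduled_execution_time.
                    return partitioned_job.run_request_for_partition(partition_key="a")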
"""
partition_set = self.get_partition_set_def()
if not partition_set:
check.failed("Called run_request_for_partition on a non-partitioned job")
if isinstance(partition_set.partitions_def, DynamicPartitionsDefinition):
if not instance:
check.failed(
"Must provide a dagster instance when calling run_request_for_partition on a "
"dynamic partition set"
)
partition = partition_set.get_partition(
partition_key, dynamic_partitions_store=instance, current_time=current_time
)
run_request_tags = (
{**tags, **partition_set.tags_for_partition(partition)}
if tags
else partition_set.tags_for_partition(partition)
)
return RunRequest(
run_key=run_key,
run_config=run_config
if run_config is not None
else partition_set.run_config_for_partition(partition),
tags=run_request_tags,
job_name=self.name,
asset_selection=asset_selection,
)
    @public
def with_hooks(self, hook_defs: AbstractSet[HookDefinition]) -> "JobDefinition":
"""Apply a set of hooks to all op instances within the job."""
hook_defs = check.set_param(hook_defs, "hook_defs", of_type=HookDefinition)
job_def = JobDefinition(
name=self.name,
graph_def=self._graph_def,
resource_defs=dict(self.resource_defs),
logger_defs=dict(self.loggers),
executor_def=self.executor_def,
config=self.partitioned_config or self.config_mapping,
tags=self.tags,
hook_defs=hook_defs | self.hook_defs,
description=self._description,
op_retry_policy=self._solid_retry_policy,
asset_layer=self.asset_layer,
_subset_selection_data=self._subset_selection_data,
_executor_def_specified=self._executor_def_specified,
_logger_defs_specified=self._logger_defs_specified,
_preset_defs=self._preset_defs,
)
update_wrapper(job_def, self, updated=())
return job_def
    @public
def with_top_level_resources(
self, resource_defs: Mapping[str, ResourceDefinition]
) -> "JobDefinition":
"""Apply a set of resources to all op instances within the job."""
resource_defs = check.dict_param(resource_defs, "resource_defs", key_type=str)
merged_resource_defs = {
**resource_defs,
**self.resource_defs,
}
# If we are using the default io_manager, we want to replace it with the one
# provided at the top level
if (
"io_manager" in resource_defs
and self.resource_defs.get("io_manager") == default_job_io_manager
):
merged_resource_defs["io_manager"] = resource_defs["io_manager"]
job_def = JobDefinition(
name=self._name,
graph_def=self._graph_def,
resource_defs=merged_resource_defs,
logger_defs=dict(self.loggers),
executor_def=self.executor_def,
config=self.partitioned_config or self.config_mapping,
description=self._description,
tags=self._tags,
hook_defs=self._hook_defs,
version_strategy=self.version_strategy,
_subset_selection_data=self._subset_selection_data,
asset_layer=self._asset_layer,
_metadata_entries=self._metadata_entries,
_executor_def_specified=self._executor_def_specified,
_logger_defs_specified=self._logger_defs_specified,
_preset_defs=self._preset_defs,
)
update_wrapper(job_def, self, updated=())
return job_def
def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
if self.op_selection_data:
return self.op_selection_data.parent_job_def.get_pipeline_snapshot()
elif self.asset_selection_data:
return self.asset_selection_data.parent_job_def.get_pipeline_snapshot()
else:
return None
def has_direct_input_value(self, input_name: str) -> bool:
return input_name in self.input_values
def get_direct_input_value(self, input_name: str) -> object:
if input_name not in self.input_values:
raise DagsterInvalidInvocationError(
f"On job '{self.name}', attempted to retrieve input value for input named"
f" '{input_name}', but no value was provided. Provided input values:"
f" {sorted(list(self.input_values.keys()))}"
)
return self.input_values[input_name]
def with_executor_def(self, executor_def: ExecutorDefinition) -> "JobDefinition":
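        """Return a copy of this job with ``executor_def`` swapped in. The copy
        is marked as though no executor was explicitly specified
        (``_executor_def_specified=False``), so it is treated as system-provided.
        """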
return JobDefinition(
graph_def=self.graph,
resource_defs=dict(self.resource_defs),
executor_def=executor_def,
logger_defs=dict(self.loggers),
config=self.config_mapping or self.partitioned_config,
name=self.name,
description=self.description,
tags=self.tags,
_metadata_entries=self.metadata,
hook_defs=self.hook_defs,
op_retry_policy=self._solid_retry_policy,
version_strategy=self.version_strategy,
_subset_selection_data=self._subset_selection_data,
asset_layer=self.asset_layer,
input_values=self.input_values,
_executor_def_specified=False,
_logger_defs_specified=self._logger_defs_specified,
_preset_defs=self._preset_defs,
)
def with_logger_defs(self, logger_defs: Mapping[str, LoggerDefinition]) -> "JobDefinition":
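        """Return a copy of this job with ``logger_defs`` swapped in. The copy
        is marked as though no loggers were explicitly specified
        (``_logger_defs_specified=False``), so they are treated as system-provided.
        """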
return JobDefinition(
graph_def=self.graph,
resource_defs=dict(self.resource_defs),
executor_def=self.executor_def,
logger_defs=logger_defs,
config=self.config_mapping or self.partitioned_config,
name=self.name,
description=self.description,
tags=self.tags,
_metadata_entries=self.metadata,
hook_defs=self.hook_defs,
op_retry_policy=self._solid_retry_policy,
version_strategy=self.version_strategy,
_subset_selection_data=self._subset_selection_data,
asset_layer=self.asset_layer,
input_values=self.input_values,
_executor_def_specified=self._executor_def_specified,
_logger_defs_specified=False,
_preset_defs=self._preset_defs,
)
def _swap_default_io_man(resources: Mapping[str, ResourceDefinition], job: PipelineDefinition):
"""Used to create the user facing experience of the default io_manager
switching to in-memory when using execute_in_process.
"""
from dagster._core.storage.mem_io_manager import mem_io_manager
if (
resources.get(DEFAULT_IO_MANAGER_KEY) in [default_job_io_manager]
and job.version_strategy is None
):
updated_resources = dict(resources)
updated_resources[DEFAULT_IO_MANAGER_KEY] = mem_io_manager
return updated_resources
return resources
def _dep_key_of(node: Node) -> NodeInvocation:
return NodeInvocation(
name=node.definition.name,
alias=node.name,
tags=node.tags,
hook_defs=node.hook_defs,
retry_policy=node.retry_policy,
)
def get_subselected_graph_definition(
graph: GraphDefinition,
resolved_op_selection_dict: SelectionTreeBranch,
parent_handle: Optional[NodeHandle] = None,
) -> SubselectedGraphDefinition:
deps: Dict[
Union[str, NodeInvocation],
Dict[str, IDependencyDefinition],
] = {}
selected_nodes: List[Tuple[str, NodeDefinition]] = []
for node in graph.nodes_in_topological_order:
node_handle = NodeHandle(node.name, parent=parent_handle)
# skip if the node isn't selected
if node.name not in resolved_op_selection_dict:
continue
# rebuild graph if any nodes inside the graph are selected
definition: Union[SubselectedGraphDefinition, NodeDefinition]
selection_node = resolved_op_selection_dict[node.name]
if isinstance(node, GraphNode) and not isinstance(selection_node, SelectionTreeLeaf):
definition = get_subselected_graph_definition(
node.definition,
selection_node,
parent_handle=node_handle,
)
# use definition if the node as a whole is selected. this includes selecting the entire graph
else:
definition = node.definition
selected_nodes.append((node.name, definition))
# build dependencies for the node. we do it for both cases because nested graphs can have
# inputs and outputs too
deps[_dep_key_of(node)] = {}
for node_input in node.inputs():
if graph.dependency_structure.has_direct_dep(node_input):
node_output = graph.dependency_structure.get_direct_dep(node_input)
if node_output.node.name in resolved_op_selection_dict:
deps[_dep_key_of(node)][node_input.input_def.name] = DependencyDefinition(
node=node_output.node.name, output=node_output.output_def.name
)
elif graph.dependency_structure.has_dynamic_fan_in_dep(node_input):
node_output = graph.dependency_structure.get_dynamic_fan_in_dep(node_input)
if node_output.node.name in resolved_op_selection_dict:
deps[_dep_key_of(node)][
node_input.input_def.name
] = DynamicCollectDependencyDefinition(
node_name=node_output.node.name,
output_name=node_output.output_def.name,
)
elif graph.dependency_structure.has_fan_in_deps(node_input):
outputs = graph.dependency_structure.get_fan_in_deps(node_input)
multi_dependencies = [
DependencyDefinition(
node=output_handle.node.name, output=output_handle.output_def.name
)
for output_handle in outputs
if (
isinstance(output_handle, NodeOutput)
and output_handle.node.name in resolved_op_selection_dict
)
]
deps[_dep_key_of(node)][node_input.input_def.name] = MultiDependencyDefinition(
cast(
List[Union[DependencyDefinition, Type[MappedInputPlaceholder]]],
multi_dependencies,
)
)
# else input is unconnected
# filter out unselected input/output mapping
new_input_mappings = list(
filter(
lambda input_mapping: input_mapping.maps_to.node_name
in [name for name, _ in selected_nodes],
graph._input_mappings, # noqa: SLF001
)
)
new_output_mappings = list(
filter(
lambda output_mapping: output_mapping.maps_from.node_name
in [name for name, _ in selected_nodes],
graph._output_mappings, # noqa: SLF001
)
)
return SubselectedGraphDefinition(
parent_graph_def=graph,
dependencies=deps,
node_defs=[definition for _, definition in selected_nodes],
input_mappings=new_input_mappings,
output_mappings=new_output_mappings,
)
def get_direct_input_values_from_job(target: PipelineDefinition) -> Mapping[str, Any]:
if target.is_job:
return cast(JobDefinition, target).input_values
else:
return {}
@io_manager(
description="Built-in filesystem IO manager that stores and retrieves values using pickling."
)
def default_job_io_manager(init_context: "InitResourceContext"):
# support overriding the default io manager via environment variables
module_name = os.getenv("DAGSTER_DEFAULT_IO_MANAGER_MODULE")
attribute_name = os.getenv("DAGSTER_DEFAULT_IO_MANAGER_ATTRIBUTE")
silence_failures = os.getenv("DAGSTER_DEFAULT_IO_MANAGER_SILENCE_FAILURES")
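    # Illustrative override (module/attribute values below are assumptions):
    #   DAGSTER_DEFAULT_IO_MANAGER_MODULE=my_package.io_managers
    #   DAGSTER_DEFAULT_IO_MANAGER_ATTRIBUTE=my_io_manager
    #   DAGSTER_DEFAULT_IO_MANAGER_SILENCE_FAILURES=1  # warn instead of raising on failure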
if module_name and attribute_name:
from dagster._core.execution.build_resources import build_resources
try:
module = importlib.import_module(module_name)
attr = getattr(module, attribute_name)
check.invariant(
isinstance(attr, IOManagerDefinition),
(
"DAGSTER_DEFAULT_IO_MANAGER_MODULE and DAGSTER_DEFAULT_IO_MANAGER_ATTRIBUTE"
" must specify an IOManagerDefinition"
),
)
with build_resources({"io_manager": attr}, instance=init_context.instance) as resources:
return resources.io_manager
except Exception as e:
if not silence_failures:
raise
else:
warnings.warn(
f"Failed to load io manager override with module: {module_name} attribute:"
f" {attribute_name}: {e}\nFalling back to default io manager."
)
# normally, default to the fs_io_manager
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager
instance = check.not_none(init_context.instance)
return PickledObjectFilesystemIOManager(base_dir=instance.storage_directory())
@io_manager(
description="Built-in filesystem IO manager that stores and retrieves values using pickling.",
config_schema={"base_dir": Field(StringSource, is_required=False)},
)
def default_job_io_manager_with_fs_io_manager_schema(init_context: "InitResourceContext"):
# support overriding the default io manager via environment variables
module_name = os.getenv("DAGSTER_DEFAULT_IO_MANAGER_MODULE")
attribute_name = os.getenv("DAGSTER_DEFAULT_IO_MANAGER_ATTRIBUTE")
silence_failures = os.getenv("DAGSTER_DEFAULT_IO_MANAGER_SILENCE_FAILURES")
if module_name and attribute_name:
from dagster._core.execution.build_resources import build_resources
try:
module = importlib.import_module(module_name)
attr = getattr(module, attribute_name)
check.invariant(
isinstance(attr, IOManagerDefinition),
(
"DAGSTER_DEFAULT_IO_MANAGER_MODULE and DAGSTER_DEFAULT_IO_MANAGER_ATTRIBUTE"
" must specify an IOManagerDefinition"
),
)
with build_resources({"io_manager": attr}, instance=init_context.instance) as resources:
return resources.io_manager
except Exception as e:
if not silence_failures:
raise
else:
warnings.warn(
f"Failed to load io manager override with module: {module_name} attribute:"
f" {attribute_name}: {e}\nFalling back to default io manager."
)
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager
# normally, default to the fs_io_manager
base_dir = init_context.resource_config.get(
"base_dir", init_context.instance.storage_directory() if init_context.instance else None
)
return PickledObjectFilesystemIOManager(base_dir=base_dir)
def _config_mapping_with_default_value(
inner_schema: ConfigType,
default_config: Mapping[str, Any],
job_name: str,
) -> ConfigMapping:
if not isinstance(inner_schema, Shape):
check.failed("Only Shape (dictionary) config_schema allowed on Job ConfigMapping")
def config_fn(x):
return x
updated_fields = {}
field_aliases = inner_schema.field_aliases
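    # For example, if inner_schema has fields {"ops", "resources"} and
    # default_config is {"ops": {...}}, the rebuilt "ops" field carries that
    # default value while "resources" passes through unchanged.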
for name, field in inner_schema.fields.items():
if name in default_config:
updated_fields[name] = Field(
config=field.config_type,
default_value=default_config[name],
description=field.description,
)
elif name in field_aliases and field_aliases[name] in default_config:
updated_fields[name] = Field(
config=field.config_type,
default_value=default_config[field_aliases[name]],
description=field.description,
)
else:
updated_fields[name] = field
config_schema = Shape(
fields=updated_fields,
description=(
"This run config schema was automatically populated with default values "
"from `default_config`."
),
field_aliases=inner_schema.field_aliases,
)
config_evr = validate_config(config_schema, default_config)
if not config_evr.success:
raise DagsterInvalidConfigError(
f"Error in config when building job '{job_name}' ",
config_evr.errors,
default_config,
)
return ConfigMapping(
config_fn=config_fn, config_schema=config_schema, receive_processed_config_values=False
)
def get_run_config_schema_for_job(
graph_def: GraphDefinition,
resource_defs: Mapping[str, ResourceDefinition],
executor_def: "ExecutorDefinition",
logger_defs: Mapping[str, LoggerDefinition],
asset_layer: Optional[AssetLayer],
) -> ConfigType:
return (
JobDefinition(
name=graph_def.name,
graph_def=graph_def,
resource_defs=resource_defs,
executor_def=executor_def,
logger_defs=logger_defs,
asset_layer=asset_layer,
)
.get_run_config_schema("default")
.run_config_schema_type
)
def _infer_asset_layer_from_source_asset_deps(job_graph_def: GraphDefinition) -> AssetLayer:
"""For non-asset jobs that have some inputs that are fed from SourceAssets, constructs an
AssetLayer that includes those SourceAssets.
"""
asset_keys_by_node_input_handle: Dict[NodeInputHandle, AssetKey] = {}
source_assets_list = []
source_asset_keys_set = set()
io_manager_keys_by_asset_key: Mapping[AssetKey, str] = {}
# each entry is a graph definition and its handle relative to the job root
stack: List[Tuple[GraphDefinition, Optional[NodeHandle]]] = [(job_graph_def, None)]
while stack:
graph_def, parent_node_handle = stack.pop()
for node_name, input_source_assets in graph_def.node_input_source_assets.items():
node_handle = NodeHandle(node_name, parent_node_handle)
for input_name, source_asset in input_source_assets.items():
if source_asset.key not in source_asset_keys_set:
source_asset_keys_set.add(source_asset.key)
source_assets_list.append(source_asset)
input_handle = NodeInputHandle(node_handle, input_name)
asset_keys_by_node_input_handle[input_handle] = source_asset.key
for resolved_input_handle in graph_def.node_dict[
node_name
].definition.resolve_input_to_destinations(input_handle):
asset_keys_by_node_input_handle[resolved_input_handle] = source_asset.key
if source_asset.io_manager_key:
io_manager_keys_by_asset_key[source_asset.key] = source_asset.io_manager_key
for node_name, node in graph_def.node_dict.items():
if isinstance(node.definition, GraphDefinition):
stack.append((node.definition, NodeHandle(node_name, parent_node_handle)))
return AssetLayer(
asset_keys_by_node_input_handle=asset_keys_by_node_input_handle,
asset_info_by_node_output_handle={},
asset_deps={},
dependency_node_handles_by_asset_key={},
assets_defs=[],
source_asset_defs=source_assets_list,
io_manager_keys_by_asset_key=io_manager_keys_by_asset_key,
node_output_handles_to_dep_asset_keys={},
partition_mappings_by_asset_dep={},
)