Source code for dagster._core.execution.context.output

import warnings
from typing import (
    TYPE_CHECKING,
    Any,
    ContextManager,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Union,
    cast,
)

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.asset_layer import AssetOutputInfo
from dagster._core.definitions.events import (
    AssetKey,
    AssetMaterialization,
    AssetObservation,
    Materialization,
)
from dagster._core.definitions.metadata import MetadataEntry, RawMetadataValue
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.time_window_partitions import TimeWindow
from dagster._core.errors import DagsterInvalidMetadata, DagsterInvariantViolationError
from dagster._core.execution.plan.utils import build_resources_for_manager

if TYPE_CHECKING:
    from dagster._core.definitions import PartitionsDefinition, PipelineDefinition
    from dagster._core.definitions.op_definition import OpDefinition
    from dagster._core.definitions.resource_definition import Resources
    from dagster._core.events import DagsterEvent
    from dagster._core.execution.context.system import StepExecutionContext
    from dagster._core.execution.plan.outputs import StepOutputHandle
    from dagster._core.execution.plan.plan import ExecutionPlan
    from dagster._core.log_manager import DagsterLogManager
    from dagster._core.system_config.objects import ResolvedRunConfig
    from dagster._core.types.dagster_type import DagsterType

RUN_ID_PLACEHOLDER = "__EPHEMERAL_RUN_ID"


[docs]class OutputContext: """The context object that is available to the `handle_output` method of an :py:class:`IOManager`. Users should not instantiate this object directly. To construct an `OutputContext` for testing an IO Manager's `handle_output` method, use :py:func:`dagster.build_output_context`. Attributes: step_key (Optional[str]): The step_key for the compute step that produced the output. name (Optional[str]): The name of the output that produced the output. run_id (Optional[str]): The id of the run that produced the output. metadata (Optional[Mapping[str, RawMetadataValue]]): A dict of the metadata that is assigned to the OutputDefinition that produced the output. mapping_key (Optional[str]): The key that identifies a unique mapped output. None for regular outputs. config (Optional[Any]): The configuration for the output. dagster_type (Optional[DagsterType]): The type of this output. log (Optional[DagsterLogManager]): The log manager to use for this output. version (Optional[str]): (Experimental) The version of the output. resource_config (Optional[Mapping[str, Any]]): The config associated with the resource that initializes the RootInputManager. resources (Optional[Resources]): The resources required by the output manager, specified by the `required_resource_keys` parameter. op_def (Optional[OpDefinition]): The definition of the op that produced the output. asset_info: Optional[AssetOutputInfo]: (Experimental) Asset info corresponding to the output. Example: .. code-block:: python from dagster import IOManager, OutputContext class MyIOManager(IOManager): def handle_output(self, context: OutputContext, obj): ... """ _step_key: Optional[str] _name: Optional[str] _pipeline_name: Optional[str] _run_id: Optional[str] _metadata: Optional[Mapping[str, RawMetadataValue]] _mapping_key: Optional[str] _config: object _op_def: Optional["OpDefinition"] _dagster_type: Optional["DagsterType"] _log: Optional["DagsterLogManager"] _version: Optional[str] _resource_config: Optional[Mapping[str, object]] _step_context: Optional["StepExecutionContext"] _asset_info: Optional[AssetOutputInfo] _warn_on_step_context_use: bool _resources: Optional["Resources"] _resources_cm: Optional[ContextManager["Resources"]] _resources_contain_cm: Optional[bool] _cm_scope_entered: Optional[bool] _events: List["DagsterEvent"] _user_events: List[Union[AssetMaterialization, AssetObservation, Materialization]] _metadata_entries: Optional[Sequence[MetadataEntry]] def __init__( self, step_key: Optional[str] = None, name: Optional[str] = None, pipeline_name: Optional[str] = None, run_id: Optional[str] = None, metadata: Optional[Mapping[str, RawMetadataValue]] = None, mapping_key: Optional[str] = None, config: object = None, dagster_type: Optional["DagsterType"] = None, log_manager: Optional["DagsterLogManager"] = None, version: Optional[str] = None, resource_config: Optional[Mapping[str, object]] = None, resources: Optional[Union["Resources", Mapping[str, object]]] = None, step_context: Optional["StepExecutionContext"] = None, op_def: Optional["OpDefinition"] = None, asset_info: Optional[AssetOutputInfo] = None, warn_on_step_context_use: bool = False, partition_key: Optional[str] = None, ): from dagster._core.definitions.resource_definition import IContainsGenerator, Resources from dagster._core.execution.build_resources import build_resources self._step_key = step_key self._name = name self._pipeline_name = pipeline_name self._run_id = run_id self._metadata = metadata self._mapping_key = mapping_key self._config = config self._op_def = op_def self._dagster_type = dagster_type self._log = log_manager self._version = version self._resource_config = resource_config self._step_context = step_context self._asset_info = asset_info self._warn_on_step_context_use = warn_on_step_context_use if self._step_context and self._step_context.has_partition_key: self._partition_key: Optional[str] = self._step_context.partition_key else: self._partition_key = partition_key if isinstance(resources, Resources): self._resources_cm = None self._resources = resources else: self._resources_cm = build_resources( check.opt_mapping_param(resources, "resources", key_type=str) ) self._resources = self._resources_cm.__enter__() self._resources_contain_cm = isinstance(self._resources, IContainsGenerator) self._cm_scope_entered = False self._events = [] self._user_events = [] self._metadata_entries = None def __enter__(self): if self._resources_cm: self._cm_scope_entered = True return self def __exit__(self, *exc): if self._resources_cm: self._resources_cm.__exit__(*exc) def __del__(self): if ( hasattr(self, "_resources_cm") and self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered ): self._resources_cm.__exit__(None, None, None) @public @property def step_key(self) -> str: if self._step_key is None: raise DagsterInvariantViolationError( "Attempting to access step_key, " "but it was not provided when constructing the OutputContext" ) return self._step_key @public @property def name(self) -> str: if self._name is None: raise DagsterInvariantViolationError( "Attempting to access name, " "but it was not provided when constructing the OutputContext" ) return self._name @property def pipeline_name(self) -> str: if self._pipeline_name is None: raise DagsterInvariantViolationError( "Attempting to access pipeline_name, " "but it was not provided when constructing the OutputContext" ) return self._pipeline_name @public @property def run_id(self) -> str: if self._run_id is None: raise DagsterInvariantViolationError( "Attempting to access run_id, " "but it was not provided when constructing the OutputContext" ) return self._run_id @public @property def metadata(self) -> Optional[Mapping[str, object]]: return self._metadata @public @property def mapping_key(self) -> Optional[str]: return self._mapping_key @public @property def config(self) -> Any: return self._config @public @property def op_def(self) -> "OpDefinition": from dagster._core.definitions import OpDefinition if self._op_def is None: raise DagsterInvariantViolationError( "Attempting to access op_def, " "but it was not provided when constructing the OutputContext" ) return cast(OpDefinition, self._op_def) @public @property def dagster_type(self) -> "DagsterType": if self._dagster_type is None: raise DagsterInvariantViolationError( "Attempting to access dagster_type, " "but it was not provided when constructing the OutputContext" ) return self._dagster_type @public @property def log(self) -> "DagsterLogManager": if self._log is None: raise DagsterInvariantViolationError( "Attempting to access log, " "but it was not provided when constructing the OutputContext" ) return self._log @public @property def version(self) -> Optional[str]: return self._version @public @property def resource_config(self) -> Optional[Mapping[str, object]]: return self._resource_config @public @property def resources(self) -> Any: if self._resources is None: raise DagsterInvariantViolationError( "Attempting to access resources, " "but it was not provided when constructing the OutputContext" ) if self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered: raise DagsterInvariantViolationError( "At least one provided resource is a generator, but attempting to access " "resources outside of context manager scope. You can use the following syntax to " "open a context manager: `with build_output_context(...) as context:`" ) return self._resources @property def asset_info(self) -> Optional[AssetOutputInfo]: return self._asset_info @public @property def has_asset_key(self) -> bool: return self._asset_info is not None @public @property def asset_key(self) -> AssetKey: if self._asset_info is None: raise DagsterInvariantViolationError( "Attempting to access asset_key, " "but it was not provided when constructing the OutputContext" ) return self._asset_info.key @public @property def asset_partitions_def(self) -> "PartitionsDefinition": """The PartitionsDefinition on the asset corresponding to this output.""" asset_key = self.asset_key result = self.step_context.pipeline_def.asset_layer.partitions_def_for_asset(asset_key) if result is None: raise DagsterInvariantViolationError( f"Attempting to access partitions def for asset {asset_key}, but it is not" " partitioned" ) return result @property def step_context(self) -> "StepExecutionContext": if self._warn_on_step_context_use: warnings.warn( "You are using InputContext.upstream_output.step_context" "This use on upstream_output is deprecated and will fail in the future" "Try to obtain what you need directly from InputContext" "For more details: https://github.com/dagster-io/dagster/issues/7900" ) if self._step_context is None: raise DagsterInvariantViolationError( "Attempting to access step_context, " "but it was not provided when constructing the OutputContext" ) return self._step_context @public @property def has_partition_key(self) -> bool: """Whether the current run is a partitioned run.""" if self._warn_on_step_context_use: warnings.warn( "You are using InputContext.upstream_output.has_partition_key" "This use on upstream_output is deprecated and will fail in the future" "Try to obtain what you need directly from InputContext" "For more details: https://github.com/dagster-io/dagster/issues/7900" ) return self._partition_key is not None @public @property def partition_key(self) -> str: """The partition key for the current run. Raises an error if the current run is not a partitioned run. """ if self._warn_on_step_context_use: warnings.warn( "You are using InputContext.upstream_output.partition_key" "This use on upstream_output is deprecated and will fail in the future" "Try to obtain what you need directly from InputContext" "For more details: https://github.com/dagster-io/dagster/issues/7900" ) if self._partition_key is None: check.failed( "Tried to access partition_key on a non-partitioned run.", ) return self._partition_key @public @property def has_asset_partitions(self) -> bool: if self._warn_on_step_context_use: warnings.warn( "You are using InputContext.upstream_output.has_asset_partitions" "This use on upstream_output is deprecated and will fail in the future" "Try to obtain what you need directly from InputContext" "For more details: https://github.com/dagster-io/dagster/issues/7900" ) if self._step_context is not None: return self._step_context.has_asset_partitions_for_output(self.name) else: return False @public @property def asset_partition_key(self) -> str: """The partition key for output asset. Raises an error if the output asset has no partitioning, or if the run covers a partition range for the output asset. """ if self._warn_on_step_context_use: warnings.warn( "You are using InputContext.upstream_output.asset_partition_key" "This use on upstream_output is deprecated and will fail in the future" "Try to obtain what you need directly from InputContext" "For more details: https://github.com/dagster-io/dagster/issues/7900" ) return self.step_context.asset_partition_key_for_output(self.name) @public @property def asset_partition_key_range(self) -> PartitionKeyRange: """The partition key range for output asset. Raises an error if the output asset has no partitioning. """ if self._warn_on_step_context_use: warnings.warn( "You are using InputContext.upstream_output.asset_partition_key_range" "This use on upstream_output is deprecated and will fail in the future" "Try to obtain what you need directly from InputContext" "For more details: https://github.com/dagster-io/dagster/issues/7900" ) return self.step_context.asset_partition_key_range_for_output(self.name) @public @property def asset_partition_keys(self) -> Sequence[str]: """The partition keys for the output asset. Raises an error if the output asset has no partitioning. """ if self._warn_on_step_context_use: warnings.warn( "You are using InputContext.upstream_output.asset_partition_keys" "This use on upstream_output is deprecated and will fail in the future" "Try to obtain what you need directly from InputContext" "For more details: https://github.com/dagster-io/dagster/issues/7900" ) return self.asset_partitions_def.get_partition_keys_in_range( self.step_context.asset_partition_key_range_for_output(self.name), dynamic_partitions_store=self.step_context.instance, ) @public @property def asset_partitions_time_window(self) -> TimeWindow: """The time window for the partitions of the output asset. Raises an error if either of the following are true: - The output asset has no partitioning. - The output asset is not partitioned with a TimeWindowPartitionsDefinition. """ if self._warn_on_step_context_use: warnings.warn( "You are using InputContext.upstream_output.asset_partitions_time_window" "This use on upstream_output is deprecated and will fail in the future" "Try to obtain what you need directly from InputContext" "For more details: https://github.com/dagster-io/dagster/issues/7900" ) return self.step_context.asset_partitions_time_window_for_output(self.name) def get_run_scoped_output_identifier(self) -> Sequence[str]: """Utility method to get a collection of identifiers that as a whole represent a unique step output. The unique identifier collection consists of - ``run_id``: the id of the run which generates the output. Note: This method also handles the re-execution memoization logic. If the step that generates the output is skipped in the re-execution, the ``run_id`` will be the id of its parent run. - ``step_key``: the key for a compute step. - ``name``: the name of the output. (default: 'result'). Returns: Sequence[str, ...]: A list of identifiers, i.e. run id, step key, and output name """ warnings.warn( "`OutputContext.get_run_scoped_output_identifier` is deprecated. Use " "`OutputContext.get_identifier` instead." ) # if run_id is None and this is a re-execution, it means we failed to find its source run id check.invariant( self.run_id is not None, "Unable to find the run scoped output identifier: run_id is None on OutputContext.", ) check.invariant( self.step_key is not None, "Unable to find the run scoped output identifier: step_key is None on OutputContext.", ) check.invariant( self.name is not None, "Unable to find the run scoped output identifier: name is None on OutputContext.", ) run_id = cast(str, self.run_id) step_key = cast(str, self.step_key) name = cast(str, self.name) if self.mapping_key: return [run_id, step_key, name, self.mapping_key] return [run_id, step_key, name]
[docs] @public def get_identifier(self) -> Sequence[str]: """Utility method to get a collection of identifiers that as a whole represent a unique step output. If not using memoization, the unique identifier collection consists of - ``run_id``: the id of the run which generates the output. Note: This method also handles the re-execution memoization logic. If the step that generates the output is skipped in the re-execution, the ``run_id`` will be the id of its parent run. - ``step_key``: the key for a compute step. - ``name``: the name of the output. (default: 'result'). If using memoization, the ``version`` corresponding to the step output is used in place of the ``run_id``. Returns: Sequence[str, ...]: A list of identifiers, i.e. (run_id or version), step_key, and output_name """ version = self.version step_key = self.step_key name = self.name if version is not None: check.invariant( self.mapping_key is None, ( f"Mapping key and version both provided for output '{name}' of step" f" '{step_key}'. Dynamic mapping is not supported when using versioning." ), ) identifier = ["versioned_outputs", version, step_key, name] else: run_id = self.run_id identifier = [run_id, step_key, name] if self.mapping_key: identifier.append(self.mapping_key) return identifier
def get_output_identifier(self) -> Sequence[str]: warnings.warn( "`OutputContext.get_output_identifier` is deprecated. Use " "`OutputContext.get_identifier` instead." ) return self.get_identifier() @public def get_asset_identifier(self) -> Sequence[str]: if self.asset_key is not None: if self.has_asset_partitions: return [*self.asset_key.path, self.asset_partition_key] else: return self.asset_key.path else: check.failed("Can't get asset output identifier for an output with no asset key") def get_asset_output_identifier(self) -> Sequence[str]: warnings.warn( "`OutputContext.get_asset_output_identifier` is deprecated. Use " "`OutputContext.get_asset_identifier` instead." ) return self.get_asset_identifier()
[docs] @public def log_event( self, event: Union[AssetObservation, AssetMaterialization, Materialization] ) -> None: """Log an AssetMaterialization or AssetObservation from within the body of an io manager's `handle_output` method. Events logged with this method will appear in the event log. Args: event (Union[AssetMaterialization, Materialization, AssetObservation]): The event to log. Examples: .. code-block:: python from dagster import IOManager, AssetMaterialization class MyIOManager(IOManager): def handle_output(self, context, obj): context.log_event(AssetMaterialization("foo")) """ from dagster._core.events import DagsterEvent if isinstance(event, (AssetMaterialization, Materialization)): if self._step_context: self._events.append(DagsterEvent.asset_materialization(self._step_context, event)) self._user_events.append(event) elif isinstance(event, AssetObservation): if self._step_context: self._events.append(DagsterEvent.asset_observation(self._step_context, event)) self._user_events.append(event) else: check.failed("Unexpected event {event}".format(event=event))
def consume_events(self) -> Iterator["DagsterEvent"]: """Pops and yields all user-generated events that have been recorded from this context. If consume_events has not yet been called, this will yield all logged events since the call to `handle_output`. If consume_events has been called, it will yield all events since the last time consume_events was called. Designed for internal use. Users should never need to invoke this method. """ events = self._events self._events = [] yield from events def get_logged_events( self, ) -> Sequence[Union[AssetMaterialization, Materialization, AssetObservation]]: """Retrieve the list of user-generated events that were logged via the context. User-generated events that were yielded will not appear in this list. **Examples:** .. code-block:: python from dagster import IOManager, build_output_context, AssetMaterialization class MyIOManager(IOManager): def handle_output(self, context, obj): ... def test_handle_output(): mgr = MyIOManager() context = build_output_context() mgr.handle_output(context) all_user_events = context.get_logged_events() materializations = [event for event in all_user_events if isinstance(event, AssetMaterialization)] ... """ return self._user_events
[docs] @public def add_output_metadata(self, metadata: Mapping[str, RawMetadataValue]) -> None: """Add a dictionary of metadata to the handled output. Metadata entries added will show up in the HANDLED_OUTPUT and ASSET_MATERIALIZATION events for the run. Args: metadata (Mapping[str, RawMetadataValue]): A metadata dictionary to log Examples: .. code-block:: python from dagster import IOManager class MyIOManager(IOManager): def handle_output(self, context, obj): context.add_output_metadata({"foo": "bar"}) """ from dagster._core.definitions.metadata import normalize_metadata prior_metadata_entries = self._metadata_entries or [] overlapping_labels = {entry.label for entry in prior_metadata_entries} & metadata.keys() if overlapping_labels: raise DagsterInvalidMetadata( f"Tried to add metadata for key(s) that already have metadata: {overlapping_labels}" ) self._metadata_entries = [*prior_metadata_entries, *normalize_metadata(metadata, [])]
def get_logged_metadata_entries( self, ) -> Sequence[MetadataEntry]: """Get the list of metadata entries that have been logged for use with this output.""" return self._metadata_entries or [] def consume_logged_metadata_entries( self, ) -> Sequence[MetadataEntry]: """Pops and yields all user-generated metadata entries that have been recorded from this context. If consume_logged_metadata_entries has not yet been called, this will yield all logged events since the call to `handle_output`. If consume_logged_metadata_entries has been called, it will yield all events since the last time consume_logged_metadata_entries was called. Designed for internal use. Users should never need to invoke this method. """ result = self._metadata_entries self._metadata_entries = [] return result or []
def get_output_context( execution_plan: "ExecutionPlan", pipeline_def: "PipelineDefinition", resolved_run_config: "ResolvedRunConfig", step_output_handle: "StepOutputHandle", run_id: Optional[str], log_manager: Optional["DagsterLogManager"], step_context: Optional["StepExecutionContext"], resources: Optional["Resources"], version: Optional[str], warn_on_step_context_use: bool = False, ) -> "OutputContext": """Args: run_id (str): The run ID of the run that produced the output, not necessarily the run that the context will be used in. """ step = execution_plan.get_step_by_key(step_output_handle.step_key) # get config op_config = resolved_run_config.ops[step.node_handle.to_string()] outputs_config = op_config.outputs if outputs_config: output_config = outputs_config.get_output_manager_config(step_output_handle.output_name) else: output_config = None step_output = execution_plan.get_step_output(step_output_handle) output_def = pipeline_def.get_solid(step_output.solid_handle).output_def_named(step_output.name) io_manager_key = output_def.io_manager_key resource_config = resolved_run_config.resources[io_manager_key].config node_handle = execution_plan.get_step_by_key(step.key).node_handle asset_info = pipeline_def.asset_layer.asset_info_for_output( node_handle=node_handle, output_name=step_output.name ) if step_context: check.invariant( not resources, ( "Expected either resources or step context to be set, but " "received both. If step context is provided, resources for IO manager will be " "retrieved off of that." ), ) resources = build_resources_for_manager(io_manager_key, step_context) return OutputContext( step_key=step_output_handle.step_key, name=step_output_handle.output_name, pipeline_name=pipeline_def.name, run_id=run_id, metadata=output_def.metadata, mapping_key=step_output_handle.mapping_key, config=output_config, op_def=pipeline_def.get_solid(step.node_handle).definition, # type: ignore # (should be OpDefinition not NodeDefinition) dagster_type=output_def.dagster_type, log_manager=log_manager, version=version, step_context=step_context, resource_config=resource_config, resources=resources, asset_info=asset_info, warn_on_step_context_use=warn_on_step_context_use, ) def step_output_version( pipeline_def: "PipelineDefinition", execution_plan: "ExecutionPlan", resolved_run_config: "ResolvedRunConfig", step_output_handle: "StepOutputHandle", ) -> Optional[str]: from dagster._core.execution.resolve_versions import resolve_step_output_versions step_output_versions = resolve_step_output_versions( pipeline_def, execution_plan, resolved_run_config ) return ( step_output_versions[step_output_handle] if step_output_handle in step_output_versions else None )
[docs]def build_output_context( step_key: Optional[str] = None, name: Optional[str] = None, metadata: Optional[Mapping[str, RawMetadataValue]] = None, run_id: Optional[str] = None, mapping_key: Optional[str] = None, config: Optional[Any] = None, dagster_type: Optional["DagsterType"] = None, version: Optional[str] = None, resource_config: Optional[Mapping[str, object]] = None, resources: Optional[Mapping[str, object]] = None, op_def: Optional["OpDefinition"] = None, asset_key: Optional[Union[AssetKey, str]] = None, partition_key: Optional[str] = None, ) -> "OutputContext": """Builds output context from provided parameters. ``build_output_context`` can be used as either a function, or a context manager. If resources that are also context managers are provided, then ``build_output_context`` must be used as a context manager. Args: step_key (Optional[str]): The step_key for the compute step that produced the output. name (Optional[str]): The name of the output that produced the output. metadata (Optional[Mapping[str, Any]]): A dict of the metadata that is assigned to the OutputDefinition that produced the output. mapping_key (Optional[str]): The key that identifies a unique mapped output. None for regular outputs. config (Optional[Any]): The configuration for the output. dagster_type (Optional[DagsterType]): The type of this output. version (Optional[str]): (Experimental) The version of the output. resource_config (Optional[Mapping[str, Any]]): The resource config to make available from the input context. This usually corresponds to the config provided to the resource that loads the output manager. resources (Optional[Resources]): The resources to make available from the context. For a given key, you can provide either an actual instance of an object, or a resource definition. op_def (Optional[OpDefinition]): The definition of the op that produced the output. asset_key: Optional[Union[AssetKey, Sequence[str], str]]: The asset key corresponding to the output. partition_key: Optional[str]: String value representing partition key to execute with. Examples: .. code-block:: python build_output_context() with build_output_context(resources={"foo": context_manager_resource}) as context: do_something """ from dagster._core.definitions import OpDefinition from dagster._core.execution.context_creation_pipeline import initialize_console_manager from dagster._core.types.dagster_type import DagsterType step_key = check.opt_str_param(step_key, "step_key") name = check.opt_str_param(name, "name") metadata = check.opt_mapping_param(metadata, "metadata", key_type=str) run_id = check.opt_str_param(run_id, "run_id", default=RUN_ID_PLACEHOLDER) mapping_key = check.opt_str_param(mapping_key, "mapping_key") dagster_type = check.opt_inst_param(dagster_type, "dagster_type", DagsterType) version = check.opt_str_param(version, "version") resource_config = check.opt_mapping_param(resource_config, "resource_config", key_type=str) resources = check.opt_mapping_param(resources, "resources", key_type=str) op_def = check.opt_inst_param(op_def, "op_def", OpDefinition) asset_key = AssetKey.from_coerceable(asset_key) if asset_key else None partition_key = check.opt_str_param(partition_key, "partition_key") return OutputContext( step_key=step_key, name=name, pipeline_name=None, run_id=run_id, metadata=metadata, mapping_key=mapping_key, config=config, dagster_type=dagster_type, log_manager=initialize_console_manager(None), version=version, resource_config=resource_config, resources=resources, step_context=None, op_def=op_def, asset_info=AssetOutputInfo(key=asset_key) if asset_key else None, partition_key=partition_key, )