Source code for dagster._core.definitions.materialize

from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Set, Union

import dagster._check as check
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job
from dagster._utils.merger import merge_dicts

from ..errors import DagsterInvariantViolationError
from ..instance import DagsterInstance
from ..storage.io_manager import IOManagerDefinition
from ..storage.mem_io_manager import mem_io_manager
from .assets import AssetsDefinition
from .source_asset import SourceAsset

if TYPE_CHECKING:
    from ..execution.execute_in_process_result import ExecuteInProcessResult


[docs]def materialize( assets: Sequence[Union[AssetsDefinition, SourceAsset]], run_config: Any = None, instance: Optional[DagsterInstance] = None, resources: Optional[Mapping[str, object]] = None, partition_key: Optional[str] = None, raise_on_error: bool = True, tags: Optional[Mapping[str, str]] = None, ) -> "ExecuteInProcessResult": """Executes a single-threaded, in-process run which materializes provided assets. By default, will materialize assets to the local filesystem. Args: assets (Sequence[Union[AssetsDefinition, SourceAsset]]): The assets to materialize. Can also provide :py:class:`SourceAsset` objects to fill dependencies for asset defs. resources (Optional[Mapping[str, object]]): The resources needed for execution. Can provide resource instances directly, or resource definitions. Note that if provided resources conflict with resources directly on assets, an error will be thrown. run_config (Optional[Any]): The run config to use for the run that materializes the assets. partition_key: (Optional[str]) The string partition key that specifies the run config to execute. Can only be used to select run config for assets with partitioned config. tags (Optional[Mapping[str, str]]): Tags for the run. Returns: ExecuteInProcessResult: The result of the execution. """ from dagster._core.definitions.definitions_class import Definitions assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) instance = check.opt_inst_param(instance, "instance", DagsterInstance) partition_key = check.opt_str_param(partition_key, "partition_key") resources = check.opt_mapping_param(resources, "resources", key_type=str) all_executable_keys: Set[AssetKey] = set() for asset in assets: if isinstance(asset, AssetsDefinition): all_executable_keys = all_executable_keys.union(set(asset.keys)) JOB_NAME = "__ephemeral_asset_job__" defs = Definitions(jobs=[define_asset_job(name=JOB_NAME)], assets=assets, resources=resources) return check.not_none( defs.get_job_def(JOB_NAME), "This should always return a job", ).execute_in_process( run_config=run_config, instance=instance, partition_key=partition_key, raise_on_error=raise_on_error, tags=tags, )
[docs]def materialize_to_memory( assets: Sequence[Union[AssetsDefinition, SourceAsset]], run_config: Any = None, instance: Optional[DagsterInstance] = None, resources: Optional[Mapping[str, object]] = None, partition_key: Optional[str] = None, raise_on_error: bool = True, tags: Optional[Mapping[str, str]] = None, ) -> "ExecuteInProcessResult": """Executes a single-threaded, in-process run which materializes provided assets in memory. Will explicitly use :py:func:`mem_io_manager` for all required io manager keys. If any io managers are directly provided using the `resources` argument, a :py:class:`DagsterInvariantViolationError` will be thrown. Args: assets (Sequence[Union[AssetsDefinition, SourceAsset]]): The assets to materialize. Can also provide :py:class:`SourceAsset` objects to fill dependencies for asset defs. run_config (Optional[Any]): The run config to use for the run that materializes the assets. resources (Optional[Mapping[str, object]]): The resources needed for execution. Can provide resource instances directly, or resource definitions. If provided resources conflict with resources directly on assets, an error will be thrown. partition_key: (Optional[str]) The string partition key that specifies the run config to execute. Can only be used to select run config for assets with partitioned config. tags (Optional[Mapping[str, str]]): Tags for the run. Returns: ExecuteInProcessResult: The result of the execution. """ assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) # Gather all resource defs for the purpose of checking io managers. resources_dict = resources or {} all_resource_keys = set(resources_dict.keys()) for asset in assets: all_resource_keys = all_resource_keys.union(asset.resource_defs.keys()) io_manager_keys = _get_required_io_manager_keys(assets) for io_manager_key in io_manager_keys: if io_manager_key in all_resource_keys: raise DagsterInvariantViolationError( "Attempted to call `materialize_to_memory` with a resource " f"provided for io manager key '{io_manager_key}'. Do not " "provide resources for io manager keys when calling " "`materialize_to_memory`, as it will override io management " "behavior for all keys." ) resource_defs = merge_dicts({key: mem_io_manager for key in io_manager_keys}, resources_dict) return materialize( assets=assets, run_config=run_config, resources=resource_defs, instance=instance, partition_key=partition_key, raise_on_error=raise_on_error, tags=tags, )
def _get_required_io_manager_keys( assets: Sequence[Union[AssetsDefinition, SourceAsset]] ) -> Set[str]: io_manager_keys = set() for asset in assets: for requirement in asset.get_resource_requirements(): if requirement.expected_type == IOManagerDefinition: io_manager_keys.add(requirement.key) return io_manager_keys