Source code for dagster._core.storage.root_input_manager

from abc import abstractmethod
from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Callable, Optional, Union, cast, overload

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.definitions.config import is_callable_valid_config_arg
from dagster._core.definitions.definition_config_schema import (
    CoercableToConfigSchema,
    IDefinitionConfigSchema,
    convert_user_facing_definition_config_schema,
)
from dagster._core.definitions.resource_definition import (
    ResourceDefinition,
    ResourceFunction,
    has_at_least_one_parameter,
)
from dagster._core.storage.input_manager import IInputManagerDefinition, InputLoadFn, InputManager
from dagster._utils.backcompat import deprecation_warning

if TYPE_CHECKING:
    from dagster._core.execution.context.input import InputContext


class RootInputManagerDefinition(ResourceDefinition, IInputManagerDefinition):
    """Definition of a root input manager resource.

    Root input managers load op inputs that aren't connected to upstream outputs.

    A RootInputManagerDefinition is a :py:class:`ResourceDefinition` whose resource_fn returns a
    :py:class:`RootInputManager`.

    The easiest way to create a RootInputManagerDefinition is with the
    :py:func:`@root_input_manager <root_input_manager>` decorator.
    """

    # Schema for per-input config, converted from the user-facing form once in __init__.
    _input_config_schema: IDefinitionConfigSchema

    def __init__(
        self,
        resource_fn: ResourceFunction,
        config_schema: Optional[CoercableToConfigSchema] = None,
        description: Optional[str] = None,
        input_config_schema: Optional[CoercableToConfigSchema] = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
        version: Optional[str] = None,
    ):
        """Create a root input manager definition.

        Args:
            resource_fn (ResourceFunction): Function that builds the RootInputManager.
            config_schema (Optional[ConfigSchema]): Schema for the resource-level config.
            description (Optional[str]): Human-readable description of the resource.
            input_config_schema (Optional[ConfigSchema]): Schema for the per-input config.
            required_resource_keys (Optional[Set[str]]): Keys of resources the manager needs.
            version (Optional[str]): (Experimental) the version of the definition.
        """
        self._input_config_schema = convert_user_facing_definition_config_schema(
            input_config_schema
        )
        super().__init__(
            resource_fn=resource_fn,
            config_schema=config_schema,
            description=description,
            required_resource_keys=required_resource_keys,
            version=version,
        )

    @property
    def input_config_schema(self) -> IDefinitionConfigSchema:
        """The schema that per-input config for this manager must satisfy."""
        return self._input_config_schema

    def copy_for_configured(
        self, description: Optional[str], config_schema: CoercableToConfigSchema
    ) -> "RootInputManagerDefinition":
        """Return a copy of this definition with a new resource config schema.

        Every other attribute carries over unchanged: resource_fn, required resource keys,
        input config schema and version. The description is also kept unless a new one is given.
        """
        return RootInputManagerDefinition(
            config_schema=config_schema,
            description=description or self.description,
            resource_fn=self.resource_fn,
            required_resource_keys=self.required_resource_keys,
            input_config_schema=self.input_config_schema,
            # Previously omitted, which made configured copies silently lose their version.
            version=self.version,
        )
class RootInputManager(InputManager):
    """RootInputManagers are used to load inputs to ops at the root of a job.

    The easiest way to define a RootInputManager is with the
    :py:func:`@root_input_manager <root_input_manager>` decorator.
    """

    @abstractmethod
    def load_input(self, context: "InputContext") -> object:
        """The user-defined read method that loads the data for a root input.

        Args:
            context (InputContext): The context of the input being loaded. Root inputs are not
                connected to an upstream step output. The data must therefore be located from
                the input's own context, e.g. its input config (``context.config``) or the
                resource config.

        Returns:
            Any: The data object.
        """
# Bare-decorator form (``@root_input_manager`` without parentheses). The decorated load function
# arrives positionally as ``config_schema``, and a definition is returned directly.
@overload
def root_input_manager(
    config_schema: InputLoadFn,
) -> RootInputManagerDefinition:
    ...


# Decorator-factory form (``@root_input_manager(...)``). Returns a decorator that turns a load
# function into a RootInputManagerDefinition.
@overload
def root_input_manager(
    config_schema: Optional[CoercableToConfigSchema] = ...,
    description: Optional[str] = ...,
    input_config_schema: Optional[CoercableToConfigSchema] = ...,
    required_resource_keys: Optional[AbstractSet[str]] = ...,
    version: Optional[str] = ...,
) -> Callable[[InputLoadFn], RootInputManagerDefinition]:
    ...
@experimental
def root_input_manager(
    config_schema: Optional[Union[InputLoadFn, CoercableToConfigSchema]] = None,
    description: Optional[str] = None,
    input_config_schema: Optional[CoercableToConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = None,
    version: Optional[str] = None,
) -> Union[RootInputManagerDefinition, Callable[[InputLoadFn], RootInputManagerDefinition]]:
    """Define a root input manager.

    Root input managers load op inputs that aren't connected to upstream outputs.

    The decorated function should accept a :py:class:`InputContext` and resource config, and
    return a loaded object that will be passed into one of the inputs of an op.

    The decorator produces a :py:class:`RootInputManagerDefinition`.

    Args:
        config_schema (Optional[ConfigSchema]): The schema for the resource-level config. If not
            set, Dagster will accept any config provided.
        description (Optional[str]): A human-readable description of the resource.
        input_config_schema (Optional[ConfigSchema]): A schema for the input-level config. Each
            input that uses this input manager can be configured separately using this config.
            If not set, Dagster will accept any config provided.
        required_resource_keys (Optional[Set[str]]): Keys for the resources required by the input
            manager.
        version (Optional[str]): (Experimental) the version of the input manager definition.

    Returns:
        A RootInputManagerDefinition when used as a bare decorator. Otherwise, a decorator that
        produces one.

    **Examples:**

    .. code-block:: python

        from dagster import root_input_manager, op, job, In

        @root_input_manager
        def csv_loader(_):
            return read_csv("some/path")

        @op(ins={"input1": In(root_manager_key="csv_loader_key")})
        def my_op(_, input1):
            do_stuff(input1)

        @job(resource_defs={"csv_loader_key": csv_loader})
        def my_job():
            my_op()

        @root_input_manager(config_schema={"base_dir": str})
        def csv_loader(context):
            return read_csv(context.resource_config["base_dir"] + "/some/path")

        @root_input_manager(input_config_schema={"path": str})
        def csv_loader(context):
            return read_csv(context.config["path"])
    """
    deprecation_warning(
        "root_input_manager",
        "1.0.0",
        additional_warn_txt="Use an InputManager instead.",
    )

    # Bare-decorator usage passes the load function positionally as ``config_schema``.
    # A callable that is itself a valid config argument is still treated as config.
    if callable(config_schema) and not is_callable_valid_config_arg(config_schema):
        return _InputManagerDecoratorCallable()(config_schema)

    def _wrap(load_fn: InputLoadFn) -> RootInputManagerDefinition:
        return _InputManagerDecoratorCallable(
            config_schema=cast(CoercableToConfigSchema, config_schema),
            description=description,
            version=version,
            input_config_schema=input_config_schema,
            required_resource_keys=required_resource_keys,
        )(load_fn)

    return _wrap
class RootInputManagerWrapper(RootInputManager):
    """A RootInputManager that delegates loading to a user-provided load function.

    Instances are built by ``_InputManagerDecoratorCallable`` to adapt a plain function into a
    RootInputManager.
    """

    def __init__(self, load_fn: InputLoadFn):
        self._load_fn = load_fn

    def load_input(self, context: "InputContext") -> object:
        """Invoke the wrapped load function.

        The context is passed only when the function declares at least one parameter, so
        zero-argument load functions are also accepted.
        """
        # type-ignore because function being used as attribute
        if has_at_least_one_parameter(self._load_fn):
            return self._load_fn(context)  # type: ignore
        return self._load_fn()  # type: ignore


class _InputManagerDecoratorCallable:
    """Callable that turns a load function into a RootInputManagerDefinition.

    Stores the decorator arguments at construction time. When called with the load function,
    it applies them to produce the definition.
    """

    def __init__(
        self,
        config_schema: Optional[CoercableToConfigSchema] = None,
        description: Optional[str] = None,
        version: Optional[str] = None,
        input_config_schema: Optional[CoercableToConfigSchema] = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
    ):
        self.config_schema = config_schema
        self.description = check.opt_str_param(description, "description")
        self.version = check.opt_str_param(version, "version")
        self.input_config_schema = input_config_schema
        self.required_resource_keys = required_resource_keys

    def __call__(self, load_fn: InputLoadFn) -> RootInputManagerDefinition:
        """Build a RootInputManagerDefinition whose manager delegates to ``load_fn``.

        Raises:
            CheckError: If ``load_fn`` is not callable (raised by ``check.callable_param``).
        """
        check.callable_param(load_fn, "load_fn")

        def _resource_fn(_):
            # Each call of the resource function builds a fresh wrapper around load_fn.
            return RootInputManagerWrapper(load_fn)

        root_input_manager_def = RootInputManagerDefinition(
            resource_fn=_resource_fn,
            config_schema=self.config_schema,
            description=self.description,
            version=self.version,
            input_config_schema=self.input_config_schema,
            required_resource_keys=self.required_resource_keys,
        )

        # Copy __name__, __doc__, etc. from load_fn so the definition looks like the function.
        # `update_wrapper` typing cannot currently handle a Union of Callables correctly
        update_wrapper(root_input_manager_def, wrapped=load_fn)  # type: ignore
        return root_input_manager_def