from functools import update_wrapper
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Dict,
Iterator,
Mapping,
Optional,
Union,
cast,
overload,
)
from typing_extensions import TypeAlias
import dagster._check as check
from dagster._annotations import public
from dagster._core.decorator_utils import format_docstring_for_description
from dagster._core.definitions.config import is_callable_valid_config_arg
from dagster._core.definitions.configurable import AnonymousConfigurableDefinition
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError
from dagster._utils.backcompat import experimental_arg_warning
from ..decorator_utils import (
get_function_params,
has_at_least_one_parameter,
is_required_param,
positional_arg_name_list,
validate_expected_params,
)
from .definition_config_schema import (
CoercableToConfigSchema,
IDefinitionConfigSchema,
convert_user_facing_definition_config_schema,
)
from .resource_invocation import resource_invocation_result
from .resource_requirement import (
RequiresResources,
ResourceDependencyRequirement,
ResourceRequirement,
)
from .scoped_resources_builder import ( # re-exported
IContainsGenerator as IContainsGenerator,
Resources as Resources,
ScopedResourcesBuilder as ScopedResourcesBuilder,
)
if TYPE_CHECKING:
from dagster._core.execution.resources_init import InitResourceContext
ResourceFunctionWithContext: TypeAlias = Callable[["InitResourceContext"], Any]
ResourceFunctionWithoutContext: TypeAlias = Callable[[], Any]
ResourceFunction: TypeAlias = Union[
ResourceFunctionWithContext,
ResourceFunctionWithoutContext,
]
[docs]class ResourceDefinition(AnonymousConfigurableDefinition, RequiresResources):
"""Core class for defining resources.
Resources are scoped ways to make external resources (like database connections) available to
during job execution and to clean up after execution resolves.
If resource_fn yields once rather than returning (in the manner of functions decorable with
:py:func:`@contextlib.contextmanager <python:contextlib.contextmanager>`) then the body of the
function after the yield will be run after execution resolves, allowing users to write their
own teardown/cleanup logic.
Depending on your executor, resources may be instantiated and cleaned up more than once in a
job execution.
Args:
resource_fn (Callable[[InitResourceContext], Any]): User-provided function to instantiate
the resource, which will be made available to executions keyed on the
``context.resources`` object.
config_schema (Optional[ConfigSchema): The schema for the config. If set, Dagster will check
that config provided for the resource matches this schema and fail if it does not. If
not set, Dagster will accept any config provided for the resource.
description (Optional[str]): A human-readable description of the resource.
required_resource_keys: (Optional[Set[str]]) Keys for the resources required by this
resource. A DagsterInvariantViolationError will be raised during initialization if
dependencies are cyclic.
version (Optional[str]): (Experimental) The version of the resource's definition fn. Two
wrapped resource functions should only have the same version if they produce the same
resource definition when provided with the same inputs.
"""
def __init__(
self,
resource_fn: ResourceFunction,
config_schema: CoercableToConfigSchema = None,
description: Optional[str] = None,
required_resource_keys: Optional[AbstractSet[str]] = None,
version: Optional[str] = None,
):
self._resource_fn = check.callable_param(resource_fn, "resource_fn")
self._config_schema = convert_user_facing_definition_config_schema(config_schema)
self._description = check.opt_str_param(description, "description")
self._required_resource_keys = check.opt_set_param(
required_resource_keys, "required_resource_keys"
)
self._version = check.opt_str_param(version, "version")
if version:
experimental_arg_warning("version", "ResourceDefinition.__init__")
@property
def resource_fn(self) -> ResourceFunction:
return self._resource_fn
@property
def config_schema(self) -> IDefinitionConfigSchema:
return self._config_schema
@public
@property
def description(self) -> Optional[str]:
return self._description
@public
@property
def version(self) -> Optional[str]:
return self._version
@public
@property
def required_resource_keys(self) -> AbstractSet[str]:
return self._required_resource_keys
[docs] @public
@staticmethod
def none_resource(description: Optional[str] = None) -> "ResourceDefinition":
"""A helper function that returns a none resource.
Args:
description ([Optional[str]]): The description of the resource. Defaults to None.
Returns:
[ResourceDefinition]: A resource that does nothing.
"""
return ResourceDefinition.hardcoded_resource(value=None, description=description)
[docs] @public
@staticmethod
def hardcoded_resource(value: Any, description: Optional[str] = None) -> "ResourceDefinition":
"""A helper function that creates a ``ResourceDefinition`` with a hardcoded object.
Args:
value (Any): The value that will be accessible via context.resources.resource_name.
description ([Optional[str]]): The description of the resource. Defaults to None.
Returns:
[ResourceDefinition]: A hardcoded resource.
"""
return ResourceDefinition(resource_fn=lambda _init_context: value, description=description)
[docs] @public
@staticmethod
def mock_resource(description: Optional[str] = None) -> "ResourceDefinition":
"""A helper function that creates a ``ResourceDefinition`` which wraps a ``mock.MagicMock``.
Args:
description ([Optional[str]]): The description of the resource. Defaults to None.
Returns:
[ResourceDefinition]: A resource that creates the magic methods automatically and helps
you mock existing resources.
"""
from unittest import mock
return ResourceDefinition(
resource_fn=lambda _init_context: mock.MagicMock(), description=description
)
@public
@staticmethod
def string_resource(description: Optional[str] = None) -> "ResourceDefinition":
return ResourceDefinition(
resource_fn=lambda init_context: init_context.resource_config,
config_schema=str,
description=description,
)
def copy_for_configured(
self,
description: Optional[str],
config_schema: CoercableToConfigSchema,
) -> "ResourceDefinition":
return ResourceDefinition(
config_schema=config_schema,
description=description or self.description,
resource_fn=self.resource_fn,
required_resource_keys=self.required_resource_keys,
version=self.version,
)
def __call__(self, *args, **kwargs):
from dagster._core.execution.context.init import UnboundInitResourceContext
if has_at_least_one_parameter(self.resource_fn):
if len(args) + len(kwargs) == 0:
raise DagsterInvalidInvocationError(
"Resource initialization function has context argument, but no context was"
" provided when invoking."
)
if len(args) + len(kwargs) > 1:
raise DagsterInvalidInvocationError(
"Initialization of resource received multiple arguments. Only a first "
"positional context parameter should be provided when invoking."
)
context_param_name = get_function_params(self.resource_fn)[0].name
if args:
check.opt_inst_param(args[0], context_param_name, UnboundInitResourceContext)
return resource_invocation_result(
self, cast(Optional[UnboundInitResourceContext], args[0])
)
else:
if context_param_name not in kwargs:
raise DagsterInvalidInvocationError(
f"Resource initialization expected argument '{context_param_name}'."
)
check.opt_inst_param(
kwargs[context_param_name], context_param_name, UnboundInitResourceContext
)
return resource_invocation_result(
self, cast(Optional[UnboundInitResourceContext], kwargs[context_param_name])
)
elif len(args) + len(kwargs) > 0:
raise DagsterInvalidInvocationError(
"Attempted to invoke resource with argument, but underlying function has no context"
" argument. Either specify a context argument on the resource function, or remove"
" the passed-in argument."
)
else:
return resource_invocation_result(self, None)
def get_resource_requirements(
self, outer_context: Optional[object] = None
) -> Iterator[ResourceRequirement]:
source_key = cast(str, outer_context)
for resource_key in sorted(list(self.required_resource_keys)):
yield ResourceDependencyRequirement(key=resource_key, source_key=source_key)
class _ResourceDecoratorCallable:
def __init__(
self,
config_schema: Optional[Mapping[str, Any]] = None,
description: Optional[str] = None,
required_resource_keys: Optional[AbstractSet[str]] = None,
version: Optional[str] = None,
):
self.config_schema = config_schema # checked by underlying definition
self.description = check.opt_str_param(description, "description")
self.version = check.opt_str_param(version, "version")
self.required_resource_keys = check.opt_set_param(
required_resource_keys, "required_resource_keys"
)
def __call__(self, resource_fn: ResourceFunction) -> ResourceDefinition:
check.callable_param(resource_fn, "resource_fn")
any_name = ["*"] if has_at_least_one_parameter(resource_fn) else []
params = get_function_params(resource_fn)
missing_positional = validate_expected_params(params, any_name)
if missing_positional:
raise DagsterInvalidDefinitionError(
f"@resource decorated function '{resource_fn.__name__}' expects a single "
"positional argument."
)
extras = params[len(any_name) :]
required_extras = list(filter(is_required_param, extras))
if required_extras:
raise DagsterInvalidDefinitionError(
f"@resource decorated function '{resource_fn.__name__}' expects only a single"
" positional required argument. Got required extra params"
f" {', '.join(positional_arg_name_list(required_extras))}"
)
resource_def = ResourceDefinition(
resource_fn=resource_fn,
config_schema=self.config_schema,
description=self.description or format_docstring_for_description(resource_fn),
version=self.version,
required_resource_keys=self.required_resource_keys,
)
# `update_wrapper` typing cannot currently handle a Union of Callables correctly
update_wrapper(resource_def, wrapped=resource_fn) # type: ignore
return resource_def
@overload
def resource(config_schema: ResourceFunction) -> ResourceDefinition:
...
@overload
def resource(
config_schema: CoercableToConfigSchema = ...,
description: Optional[str] = ...,
required_resource_keys: Optional[AbstractSet[str]] = ...,
version: Optional[str] = ...,
) -> Callable[[ResourceFunction], "ResourceDefinition"]:
...
[docs]def resource(
config_schema: Union[ResourceFunction, CoercableToConfigSchema] = None,
description: Optional[str] = None,
required_resource_keys: Optional[AbstractSet[str]] = None,
version: Optional[str] = None,
) -> Union[Callable[[ResourceFunction], "ResourceDefinition"], "ResourceDefinition"]:
"""Define a resource.
The decorated function should accept an :py:class:`InitResourceContext` and return an instance of
the resource. This function will become the ``resource_fn`` of an underlying
:py:class:`ResourceDefinition`.
If the decorated function yields once rather than returning (in the manner of functions
decorable with :py:func:`@contextlib.contextmanager <python:contextlib.contextmanager>`) then
the body of the function after the yield will be run after execution resolves, allowing users
to write their own teardown/cleanup logic.
Args:
config_schema (Optional[ConfigSchema]): The schema for the config. Configuration data available in
`init_context.resource_config`. If not set, Dagster will accept any config provided.
description(Optional[str]): A human-readable description of the resource.
version (Optional[str]): (Experimental) The version of a resource function. Two wrapped
resource functions should only have the same version if they produce the same resource
definition when provided with the same inputs.
required_resource_keys (Optional[Set[str]]): Keys for the resources required by this resource.
"""
# This case is for when decorator is used bare, without arguments.
# E.g. @resource versus @resource()
if callable(config_schema) and not is_callable_valid_config_arg(config_schema):
return _ResourceDecoratorCallable()(config_schema)
def _wrap(resource_fn: ResourceFunction) -> "ResourceDefinition":
return _ResourceDecoratorCallable(
config_schema=cast(Optional[Dict[str, Any]], config_schema),
description=description,
required_resource_keys=required_resource_keys,
version=version,
)(resource_fn)
return _wrap
[docs]def make_values_resource(**kwargs: Any) -> ResourceDefinition:
"""A helper function that creates a ``ResourceDefinition`` to take in user-defined values.
This is useful for sharing values between ops.
Args:
**kwargs: Arbitrary keyword arguments that will be passed to the config schema of the
returned resource definition. If not set, Dagster will accept any config provided for
the resource.
For example:
.. code-block:: python
@op(required_resource_keys={"globals"})
def my_op(context):
print(context.resources.globals["my_str_var"])
@job(resource_defs={"globals": make_values_resource(my_str_var=str, my_int_var=int)})
def my_job():
my_op()
Returns:
ResourceDefinition: A resource that passes in user-defined values.
"""
return ResourceDefinition(
resource_fn=lambda init_context: init_context.resource_config,
config_schema=kwargs or Any,
)