from abc import ABC, abstractmethod
from typing import Any, Callable, Optional, TypeVar, Union
from typing_extensions import Self
from dagster import (
Field,
_check as check,
)
from dagster._config import EvaluateValueResult
from dagster._config.config_schema import UserConfigSchema
from .definition_config_schema import (
CoercableToConfigSchema,
ConfiguredDefinitionConfigSchema,
IDefinitionConfigSchema,
convert_user_facing_definition_config_schema,
)
class ConfigurableDefinition(ABC):
    """Abstract base for definitions that expose an optional config schema.

    Subclasses provide ``config_schema``; the helpers on this class derive the
    config field from it and apply any config mapping carried by the schema.
    """

    @property
    @abstractmethod
    def config_schema(self) -> Optional[IDefinitionConfigSchema]:
        """The config schema of this definition, or ``None`` if it has none."""
        raise NotImplementedError()

    @property
    def has_config_field(self) -> bool:
        """Whether a config schema is present and its ``as_field()`` result is truthy."""
        schema = self.config_schema
        return schema is not None and bool(schema.as_field())

    @property
    def config_field(self) -> Optional[Field]:
        """The field produced by the config schema, or ``None`` when there is no schema."""
        schema = self.config_schema
        # Compare against None explicitly rather than relying on truthiness, so that this
        # property always agrees with ``has_config_field`` about whether a schema exists.
        return None if schema is None else schema.as_field()

    # getter for typed access
    def get_config_field(self) -> Field:
        """Return the config field as a non-optional value.

        Callers are expected to consult ``has_config_field`` first; when no
        field is available this fails via ``check.failed``.
        """
        field = self.config_field
        if field is None:
            check.failed("Must check has_config_field before calling get_config_field")
        return field

    def apply_config_mapping(self, config: Any) -> EvaluateValueResult:
        """Applies user-provided config mapping functions to the given configuration and validates
        the results against the respective config schema.

        Expects incoming config to be validated and have fully-resolved values (StringSource values
        resolved, Enum types hydrated, etc.) via process_config() during ResolvedRunConfig
        construction and CompositeSolid config mapping.

        Args:
            config (Any): A validated and resolved configuration dictionary matching this object's
                config_schema

        Returns (EvaluateValueResult):
            If successful, the value is a validated and resolved configuration dictionary for the
            innermost wrapped object after applying the config mapping transformation function.
        """
        schema = self.config_schema
        if isinstance(schema, ConfiguredDefinitionConfigSchema):
            # A configured (mapped) schema knows how to resolve the config for what it wraps.
            return schema.resolve_config(config)

        # Base case: the schema is not a mapped schema, so this is the innermost definition
        # and there is nothing farther down to validate against. The config is still wrapped
        # in an EvaluateValueResult for type consistency with the mapped branch.
        return EvaluateValueResult.for_value(config)
class AnonymousConfigurableDefinition(ConfigurableDefinition):
    """Configurable definition whose ``configured`` method takes no ``name`` argument."""

    def configured(
        self,
        config_or_config_fn: Any,
        config_schema: CoercableToConfigSchema = None,
        description: Optional[str] = None,
    ) -> Self:
        """Return an object of the same type that supplies configuration to this one.

        Note that values passed through ``configured`` may end up displayed in
        Dagit, so this API is not recommended for sensitive values such as
        secrets.

        Args:
            config_or_config_fn (Union[Any, Callable[[Any], Any]]): Either (1) run configuration
                that fully satisfies this object's config schema, or (2) a function that takes
                run configuration and returns run configuration that fully satisfies this
                object's config schema. For (2), config_schema must be specified. When passing
                a function, it's easiest to use :py:func:`configured`.
            config_schema (ConfigSchema): The config schema that the input of
                config_or_config_fn must satisfy, when it is a function.
            description (Optional[str]): Description of the new definition. Defaults to the
                description of the definition being configured.

        Returns (ConfigurableDefinition): A configured version of this object.
        """
        # Normalize the user-facing schema first, then wrap this definition together with
        # the supplied config (or config function) in a configured schema.
        outer_schema = convert_user_facing_definition_config_schema(config_schema)
        configured_schema = ConfiguredDefinitionConfigSchema(
            self,
            outer_schema,
            config_or_config_fn,
        )
        return self.copy_for_configured(description, configured_schema)

    @abstractmethod
    def copy_for_configured(
        self,
        description: Optional[str],
        config_schema: IDefinitionConfigSchema,
    ) -> Self:
        """Build the configured copy; ``configured`` passes the description and new schema."""
        raise NotImplementedError()
class NamedConfigurableDefinition(ConfigurableDefinition):
    """Configurable definition whose ``configured`` method requires a ``name`` argument."""

    def configured(
        self,
        config_or_config_fn: Any,
        name: str,
        config_schema: Optional[UserConfigSchema] = None,
        description: Optional[str] = None,
    ) -> Self:
        """Return an object of the same type that supplies configuration to this one.

        Note that values passed through ``configured`` may end up displayed in
        Dagit, so this API is not recommended for sensitive values such as
        secrets.

        Args:
            config_or_config_fn (Union[Any, Callable[[Any], Any]]): Either (1) run configuration
                that fully satisfies this object's config schema, or (2) a function that takes
                run configuration and returns run configuration that fully satisfies this
                object's config schema. For (2), config_schema must be specified. When passing
                a function, it's easiest to use :py:func:`configured`.
            name (str): Name of the new definition. Required, because this definition type has
                a name uniqueness constraint.
            config_schema (ConfigSchema): The config schema that the input of
                config_or_config_fn must satisfy, when it is a function.
            description (Optional[str]): Description of the new definition. Defaults to the
                description of the definition being configured.

        Returns (ConfigurableDefinition): A configured version of this object.
        """
        # Validate the name before doing any schema work so a bad name fails first.
        checked_name = check.str_param(name, "name")

        outer_schema = convert_user_facing_definition_config_schema(config_schema)
        configured_schema = ConfiguredDefinitionConfigSchema(
            self,
            outer_schema,
            config_or_config_fn,
        )
        return self.copy_for_configured(checked_name, description, configured_schema)

    @abstractmethod
    def copy_for_configured(
        self,
        name: str,
        description: Optional[str],
        config_schema: IDefinitionConfigSchema,
    ) -> Self:
        """Build the configured copy; ``configured`` passes the name, description and schema."""
        ...
def _check_configurable_param(configurable: ConfigurableDefinition) -> None:
    """Validate that ``configurable`` is an object ``configured`` may be applied to.

    Two checks run in order: the argument must not be a ``PendingNodeInvocation``,
    and it must be an instance of ``ConfigurableDefinition``. Each failure is
    reported through ``check`` with a message explaining the supported usage.
    """
    # Imported at call time rather than at module level, as in the original code.
    from dagster._core.definitions.composition import PendingNodeInvocation

    pending_invocation_message = (
        "You have invoked `configured` on a PendingNodeInvocation (an intermediate type), which"
        " is produced by aliasing or tagging a solid definition. To configure a solid, you must"
        " call `configured` on either a SolidDefinition and CompositeSolidDefinition. To fix"
        " this error, make sure to call `configured` on the definition object *before* using"
        " the `tag` or `alias` methods. For usage examples, see"
        " https://docs.dagster.io/concepts/configuration/configured"
    )
    unsupported_type_message = (
        "Only the following types can be used with the `configured` method: ResourceDefinition,"
        " ExecutorDefinition, CompositeSolidDefinition, SolidDefinition, and LoggerDefinition."
        " For usage examples of `configured`, see"
        " https://docs.dagster.io/concepts/configuration/configured"
    )

    # The more specific PendingNodeInvocation check goes first so its targeted
    # message is the one reported for that case.
    is_pending_invocation = isinstance(configurable, PendingNodeInvocation)
    check.param_invariant(not is_pending_invocation, "configurable", pending_invocation_message)
    check.inst_param(
        configurable,
        "configurable",
        ConfigurableDefinition,
        unsupported_type_message,
    )
# Type variable bounded by the union of the two `configured` interfaces in this module;
# the bound is written as forward-reference strings.
T_Configurable = TypeVar(
    "T_Configurable", bound=Union["AnonymousConfigurableDefinition", "NamedConfigurableDefinition"]
)