Asset sensors#

Asset sensors allow you to instigate runs when materializations occur.


Relevant APIs#

| Name | Description |
| --- | --- |
| RunRequest | The sensor evaluation function can yield one or more run requests. Each run request creates a job run. |
| SkipReason | If a sensor evaluation doesn't yield any run requests, it can instead yield a skip reason to log why the evaluation was skipped or why there were no events to be processed. |
| @asset_sensor | The decorator used to define an asset sensor. The decorated function is an evaluation function that takes in a SensorEvaluationContext and an asset materialization event. The decorator returns an AssetSensorDefinition. |
| AssetSensorDefinition | A special sensor definition class for asset sensors. You almost never want to initialize this class directly. Instead, you should use the @asset_sensor decorator, which returns an AssetSensorDefinition. |
| @freshness_policy_sensor | The decorator used to define a freshness policy sensor. The decorated function is an evaluation function that takes in a FreshnessPolicySensorContext. The decorator returns a FreshnessPolicySensorDefinition. |
| FreshnessPolicySensorDefinition | A special sensor definition class for freshness policy sensors. You almost never want to initialize this class directly. Instead, you should use the @freshness_policy_sensor decorator, which returns a FreshnessPolicySensorDefinition. |
| @multi_asset_sensor | The decorator used to define an asset sensor that can monitor multiple assets at a time. The decorated function is an evaluation function that takes in a MultiAssetSensorEvaluationContext, which has methods to fetch materialization events for the monitored assets. The decorator returns a MultiAssetSensorDefinition. |
| MultiAssetSensorDefinition | A special sensor definition class for multi-asset sensors. You almost never want to initialize this class directly. Instead, you should use the @multi_asset_sensor decorator, which returns a MultiAssetSensorDefinition. |
| MultiAssetSensorEvaluationContext | The context object passed to a multi-asset sensor evaluation function. Has methods for fetching materialization events for assets. |
| build_multi_asset_sensor_context | A function that constructs an instance of MultiAssetSensorEvaluationContext. This is intended to be used to test a multi-asset sensor. |
| build_asset_reconciliation_sensor | A function that constructs a sensor that will automatically keep a set of assets updated when their parents are materialized. |

Defining an asset sensor#

An asset sensor checks for new AssetMaterialization events for a particular asset key. This can be used to kick off a job that computes downstream assets or notifies appropriate stakeholders.

One benefit of this pattern is that it enables cross-job and even cross-code-location dependencies. Each job run instigated by an asset sensor is agnostic to the job that caused it.

Dagster provides a special asset sensor definition format for sensors that fire a single RunRequest based on a single asset materialization. Here is an example of a sensor that generates a RunRequest for every materialization for the asset key my_table:

from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    asset_sensor,
)


@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "read_materialization": {
                    "config": {
                        "asset_key": asset_event.dagster_event.asset_key.path,
                    }
                }
            }
        },
    )

Freshness policy sensors#

Freshness policy sensors are experimental.

A freshness policy sensor checks the freshness of a given selection of assets on each tick, and performs some action in response to that status.

FreshnessPolicySensorContext has a minutes_late property, specifying how many minutes late the asset is with respect to its FreshnessPolicy, as well as previous_minutes_late, the number of minutes late that asset was on the previous sensor tick. On each tick, the decorated function is run once for each asset within the asset_selection that has a FreshnessPolicy defined.

Currently, freshness policy sensors do not support returning or yielding values (such as RunRequests) from their execution function.

Here is an example of a sensor that will send a single alert once an asset is 10 minutes later than its configured policy allows, and a single alert once that asset is on time again.

from dagster import (
    AssetSelection,
    FreshnessPolicySensorContext,
    freshness_policy_sensor,
)


@freshness_policy_sensor(asset_selection=AssetSelection.all())
def my_freshness_alerting_sensor(context: FreshnessPolicySensorContext):
    if context.minutes_late is None or context.previous_minutes_late is None:
        return

    if context.minutes_late >= 10 and context.previous_minutes_late < 10:
        send_alert(
            f"Asset with key {context.asset_key} is now more than 10 minutes late."
        )
    elif context.minutes_late == 0 and context.previous_minutes_late >= 10:
        send_alert(f"Asset with key {context.asset_key} is now on time.")
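
The transition logic in the sensor above can be checked in isolation, without running a sensor. The following is a minimal sketch that mirrors the same conditions in a plain function. The name freshness_alert and its threshold parameter are hypothetical and not part of Dagster.

```python
from typing import Optional


def freshness_alert(
    asset_key: str,
    minutes_late: Optional[float],
    previous_minutes_late: Optional[float],
    threshold: float = 10,
) -> Optional[str]:
    """Return an alert message on a state transition, or None if nothing changed.

    Hypothetical helper mirroring the sensor body above; not a Dagster API.
    """
    # Without both readings there is no transition to compare.
    if minutes_late is None or previous_minutes_late is None:
        return None
    # The asset crossed the threshold since the previous tick: alert once.
    if minutes_late >= threshold and previous_minutes_late < threshold:
        return f"Asset with key {asset_key} is now more than {threshold} minutes late."
    # The asset went from late straight back to on time: alert once.
    if minutes_late == 0 and previous_minutes_late >= threshold:
        return f"Asset with key {asset_key} is now on time."
    return None
```

Because both branches compare the current tick with the previous one, an asset that stays late produces no repeated alerts. Under these conditions, an asset that drops from 12 to 5 minutes late and only later reaches 0 produces no "on time" alert, which may or may not be what you want.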

Multi-asset sensors#

Multi-asset sensors are experimental.

Triggering runs upon materializations#

Multi-asset sensors, which can trigger job executions based on multiple asset materialization event streams, are defined using the @multi_asset_sensor decorator.

In the body of the sensor, you have access to the materialization event records for each asset via the MultiAssetSensorEvaluationContext context object. Methods within the context object will search for events after the cursor.

from dagster import AssetKey, RunRequest, multi_asset_sensor


@multi_asset_sensor(
    monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=my_job,
)
def asset_a_and_b_sensor(context):
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):
        context.advance_all_cursors()
        return RunRequest()

Note the context.advance_all_cursors call near the end of the sensor. The cursor keeps track of which materialization events the sensor has already processed, so that the next time the sensor runs, only newer events are fetched. Because multi-asset sensors let you decide which conditions should result in RunRequests, the sensor must manually update the cursor whenever it returns a RunRequest.
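
To make the role of the cursor concrete, here is a toy model written in plain Python. ToyCursorContext and tick are hypothetical stand-ins, not Dagster classes, and integer event IDs are an assumption of the sketch. It only illustrates the idea that events at or before the cursor are invisible to later ticks.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToyCursorContext:
    """Simplified stand-in for a sensor context; not the Dagster class."""

    # Asset name -> materialization event IDs (assumed to be positive integers).
    events: Dict[str, List[int]]
    # Asset name -> ID of the last event marked as consumed.
    cursor: Dict[str, int] = field(default_factory=dict)

    def latest_unconsumed(self) -> Dict[str, Optional[int]]:
        """Newest event ID after the cursor for each asset, or None."""
        result: Dict[str, Optional[int]] = {}
        for asset, ids in self.events.items():
            newer = [i for i in ids if i > self.cursor.get(asset, 0)]
            result[asset] = max(newer) if newer else None
        return result

    def advance_all(self) -> None:
        """Mark every existing event as consumed."""
        for asset, ids in self.events.items():
            if ids:
                self.cursor[asset] = max(ids)


def tick(ctx: ToyCursorContext) -> bool:
    """Return True (a 'run request') only if every asset has a new event."""
    latest = ctx.latest_unconsumed()
    if all(event_id is not None for event_id in latest.values()):
        ctx.advance_all()
        return True
    return False
```

In this model, two ticks in a row over the same events request a run only once. If tick skipped the advance_all call, every later tick would see the same events again and request another run.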

You can also return a SkipReason to document why the sensor didn't launch a run.

from dagster import AssetKey, RunRequest, SkipReason, multi_asset_sensor


@multi_asset_sensor(
    monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=my_job,
)
def asset_a_and_b_sensor_with_skip_reason(context):
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):
        context.advance_all_cursors()
        return RunRequest()
    elif any(asset_events.values()):
        materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if value
        ]
        not_materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if not value
        ]
        return SkipReason(
            f"Observed materializations for {materialized_asset_key_strs}, "
            f"but not for {not_materialized_asset_key_strs}"
        )

Triggering runs upon partitioned materializations#

You can use the MultiAssetSensorEvaluationContext context object to fetch partitioned materializations for each monitored asset.

When all partitions have new materializations#

The example below monitors two assets with the same daily partitions definition. Whenever both monitored assets have an unconsumed materialization for a given partition, the sensor kicks off a run for that partition.

@multi_asset_sensor(
    monitored_assets=[
        AssetKey("upstream_daily_1"),
        AssetKey("upstream_daily_2"),
    ],
    job=downstream_daily_job,
)
def trigger_daily_asset_if_both_upstream_partitions_materialized(context):
    run_requests = []
    for (
        partition,
        materializations_by_asset,
    ) in context.latest_materialization_records_by_partition_and_asset().items():
        if set(materializations_by_asset.keys()) == set(context.asset_keys):
            run_requests.append(
                downstream_daily_job.run_request_for_partition(partition)
            )
            for asset_key, materialization in materializations_by_asset.items():
                context.advance_cursor({asset_key: materialization})
    return run_requests

Notice the context.advance_cursor call marks specific materializations as consumed. In future context calls, these materializations will not be returned.

At the end of each tick, the cursor object evaluates the set of consumed materializations. For each monitored asset, the cursor automatically stores:

  • latest_consumed_event_id, the ID of the newest materialization that was marked as consumed.
  • trailing_unconsumed_partitioned_event_ids, the newest unconsumed event ID for up to 25 partitions of each asset. All of these events must be partitioned and older than the latest_consumed_event_id event for the asset.

The events in trailing_unconsumed_partitioned_event_ids will appear in future context calls until they are marked as consumed via a call to advance_cursor. A call to advance_all_cursors will also mark all existing events as consumed.
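
The two stored fields can be illustrated with a small model for a single asset. This is a sketch under stated assumptions, not Dagster's implementation: Event and summarize_cursor are hypothetical names, each event is treated independently of other events for the same partition, and when more than 25 partitions qualify the sketch keeps the ones with the newest event IDs.

```python
from typing import Dict, List, NamedTuple, Optional, Set, Tuple


class Event(NamedTuple):
    event_id: int
    partition: Optional[str]  # None for an unpartitioned materialization


MAX_TRAILING_PARTITIONS = 25  # limit taken from the description above


def summarize_cursor(
    events: List[Event], consumed_ids: Set[int]
) -> Tuple[Optional[int], Dict[str, int]]:
    """Compute (latest_consumed_event_id, trailing_unconsumed_partitioned_event_ids)."""
    if not consumed_ids:
        return None, {}
    latest_consumed = max(consumed_ids)

    trailing: Dict[str, int] = {}
    for event in events:
        is_trailing = (
            event.partition is not None  # must be partitioned
            and event.event_id not in consumed_ids  # must be unconsumed
            and event.event_id < latest_consumed  # must be older than the cursor
        )
        if is_trailing:
            # Keep only the newest unconsumed event ID per partition.
            previous = trailing.get(event.partition, 0)
            trailing[event.partition] = max(previous, event.event_id)

    # Assumption: if too many partitions qualify, keep the newest ones.
    newest_first = sorted(trailing.items(), key=lambda item: item[1], reverse=True)
    return latest_consumed, dict(newest_first[:MAX_TRAILING_PARTITIONS])
```

For example, if only event 5 has been consumed, older unconsumed partitioned events remain visible through the trailing map, while the unpartitioned event 3 and the newer event 6 are not stored there (event 6 is still found because it is after the cursor).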

When any partitions have new materializations#

The following example monitors two upstream daily-partitioned assets, kicking off materializations of the corresponding partition in the downstream daily-partitioned asset. After a partition has been materialized for both upstream assets, the downstream asset can then be materialized and the sensor kicks off a partitioned run. Thereafter, whenever either upstream partition has a new materialization, the sensor will yield a run request for the downstream asset.

@multi_asset_sensor(
    monitored_assets=[
        AssetKey("upstream_daily_1"),
        AssetKey("upstream_daily_2"),
    ],
    job=downstream_daily_job,
)
def trigger_daily_asset_when_any_upstream_partitions_have_new_materializations(context):
    run_requests = []
    for (
        partition,
        materializations_by_asset,
    ) in context.latest_materialization_records_by_partition_and_asset().items():
        if all(
            context.all_partitions_materialized(asset_key, [partition])
            for asset_key in context.asset_keys
        ):
            run_requests.append(
                downstream_daily_job.run_request_for_partition(partition)
            )
            for asset_key, materialization in materializations_by_asset.items():
                if asset_key in context.asset_keys:
                    context.advance_cursor({asset_key: materialization})
    return run_requests
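
The difference between the two trigger conditions can be summarized with plain sets. The sketch below is a simplified model, not Dagster code: both function names are hypothetical, new_by_partition stands for the assets with an unconsumed materialization per partition, and ever_materialized stands for the assets that have any materialization at all per partition.

```python
from typing import Dict, List, Set


def partitions_when_all_new(
    new_by_partition: Dict[str, Set[str]], monitored: Set[str]
) -> List[str]:
    """Partitions where every monitored asset has a new (unconsumed) materialization."""
    return sorted(
        partition
        for partition, assets in new_by_partition.items()
        if monitored <= assets
    )


def partitions_when_any_new(
    new_by_partition: Dict[str, Set[str]],
    ever_materialized: Dict[str, Set[str]],
    monitored: Set[str],
) -> List[str]:
    """Partitions with at least one new materialization, provided every
    monitored asset has been materialized for that partition at some point."""
    return sorted(
        partition
        for partition, assets in new_by_partition.items()
        if assets and monitored <= ever_materialized.get(partition, set())
    )
```

With two monitored assets, a partition where only one asset was re-materialized is skipped by the first condition but selected by the second, as long as the other asset was materialized for that partition earlier.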

Looking for more? The examples section features another example of updating a weekly asset when upstream daily assets are materialized.


Asset reconciliation sensors#

Asset reconciliation sensors are experimental.

Dagster provides an out-of-the-box sensor that monitors a provided set of assets and automatically materializes them if they're "unreconciled". An asset is considered "unreconciled" if any of the following is true:

  • The sensor has never tried to materialize it and it has never been materialized.
  • Any of its parents have been materialized more recently than it has.
  • Any of its parents are unreconciled.
  • It is not up to date with respect to its FreshnessPolicy (see Freshness-based scheduling).

The sensor materializes unreconciled assets and avoids materializing assets before their parents are reconciled.
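
The first three conditions are recursive, which is easier to see in code. The following is a minimal sketch over a hypothetical in-memory graph, not the sensor's actual implementation. It assumes an acyclic graph, represents materializations as plain timestamps, only compares timestamps when both the asset and its parent have been materialized, and leaves out the FreshnessPolicy condition entirely.

```python
from typing import Dict, List, Set


def is_unreconciled(
    asset: str,
    parents: Dict[str, List[str]],
    last_materialized: Dict[str, float],
    attempted: Set[str],
) -> bool:
    """Toy check of the first three conditions listed above."""
    mine = last_materialized.get(asset)

    # Never materialized, and the sensor has never tried to materialize it.
    if mine is None and asset not in attempted:
        return True

    for parent in parents.get(asset, []):
        theirs = last_materialized.get(parent)
        # A parent has been materialized more recently than this asset.
        if mine is not None and theirs is not None and theirs > mine:
            return True
        # A parent is itself unreconciled.
        if is_unreconciled(parent, parents, last_materialized, attempted):
            return True

    return False
```

In a chain raw -> cleaned -> report where raw was materialized after cleaned, both cleaned and report are unreconciled: cleaned because its parent is newer, and report because its parent is unreconciled.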

You can create this sensor using the build_asset_reconciliation_sensor function. This sensor can help you keep your assets up-to-date without needing to write custom schedules or sensors for each asset.

defs = Definitions(
    assets=[asset1, asset2, asset3, asset4],
    sensors=[
        build_asset_reconciliation_sensor(
            asset_selection=AssetSelection.assets(asset1, asset3, asset4),
            name="asset_reconciliation_sensor",
        ),
    ],
)

This sensor will materialize each of the selected assets (asset1, asset3, and asset4) whenever any of its parents is materialized.

With partitioned assets#

The asset reconciliation sensor automatically materializes any "unreconciled" partitions, following the same conditions for reconciliation as un-partitioned assets. When all upstream partition dependencies are materialized for a given partition, the asset reconciliation sensor will trigger materializations for any downstream asset partitions. Upstream partition materializations thereafter will trigger materializations downstream for all dependencies.

In the following asset graph, each asset has partitions [a, b, c]. Say that asset1, asset3, and asset4 are selected in the asset reconciliation sensor, and are currently unreconciled:

[Figure: Partitioned asset graph]

Because each partition of asset1 is currently unreconciled, the sensor will immediately trigger materializations for each partition of asset1. Because asset3 depends on asset2 and no partitions have been materialized in asset2, the sensor will not trigger materializations for asset3.

If we materialize partition "a" of asset2, each upstream dependency of partition "a" of asset3 will be materialized. Because these partitions were materialized more recently than asset3, the sensor will materialize partition "a" of asset3. Since partition "a" of asset4 is downstream and has no other dependencies, the sensor will materialize this partition in the same run. On the following tick, the sensor recognizes that it already triggered a materialization of asset4 and thus does not trigger any further materializations.

With freshness-based scheduling#

This feature is currently experimental.

Setting a FreshnessPolicy on an asset specifies how up-to-date the data in that asset is expected to be. The asset reconciliation sensor created using build_asset_reconciliation_sensor will kick off runs to keep that data in line with the defined policy.

To minimize unnecessary computation, the sensor will take into account a few bits of information when determining if it needs to kick off a run for a given asset:

  • A timestamp of the upstream data that is incorporated into the current state of the asset. This is calculated by finding the most recent materialization of that asset, then recursively finding the most recent materializations of its parents that happened before that event. The minimum timestamp found this way will be used as the current data time.
  • Any in-progress runs that will update that asset in the near future. If an in-progress run will update an asset to an acceptable data time, then a new run will not be kicked off.
  • The constraints specifying what upstream data must be incorporated into the state of an asset by certain times. If waiting some amount of time to kick off a materialization of an asset will allow multiple constraints to be satisfied at once, then the asset will not be materialized on that tick.
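
The data time calculation described in the first bullet can be sketched as a short recursion. This is an illustrative model, not Dagster's implementation: the function name data_time is hypothetical, materializations are plain timestamps in an in-memory dictionary, the graph is assumed to be acyclic, and the sketch returns None when a parent has no materialization before the relevant event.

```python
from typing import Dict, List, Optional


def data_time(
    asset: str,
    parents: Dict[str, List[str]],
    materializations: Dict[str, List[float]],
    before: Optional[float] = None,
) -> Optional[float]:
    """Oldest upstream timestamp incorporated into the asset's current state."""
    # Most recent materialization of this asset (before the cutoff, if any).
    candidates = [
        timestamp
        for timestamp in materializations.get(asset, [])
        if before is None or timestamp < before
    ]
    if not candidates:
        return None
    latest = max(candidates)

    timestamps = [latest]
    for parent in parents.get(asset, []):
        # Only parent materializations that happened before this event count.
        parent_time = data_time(parent, parents, materializations, before=latest)
        if parent_time is None:
            return None  # assumption: unknown upstream data means unknown data time
        timestamps.append(parent_time)

    # The minimum timestamp found is the current data time.
    return min(timestamps)
```

For a chain raw -> cleaned -> report with raw materialized at times 10 and 40, cleaned at 20, and report at 30 and 50, the data time of report is 10: the newer raw materialization at 40 has not yet flowed through cleaned.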

The sensor will only kick off runs for assets specified in the asset_selection argument of build_asset_reconciliation_sensor. If an asset in this selection is downstream of an asset outside of the selection, the sensor will not kick off a run of the selected asset until a materialization event for the upstream asset is created.


Examples#

Monitoring daily assets to materialize a weekly asset using a multi_asset_sensor#

The functionality demonstrated by this custom sensor is identical to the functionality provided by build_asset_reconciliation_sensor. However, this example provides a starting point for those wishing to customize this behavior beyond what the pre-built sensor supports.

The following code example monitors an upstream daily-partitioned asset and kicks off a materialization of the downstream weekly-partitioned asset whenever a daily partition is materialized and every daily partition in that week has an existing materialization. Once a week is complete, each new materialization of one of its daily partitions triggers the weekly partition again.

MultiAssetSensorEvaluationContext.all_partitions_materialized is a utility method that accepts a list of partitions and checks if every provided partition has been materialized. This method ignores the cursor, so it searches through all existing materializations.

If the PartitionsDefinition of the monitored assets differs from the triggered asset, you can use the MultiAssetSensorEvaluationContext.get_downstream_partition_keys method to map a partition key from one asset to another. This method accepts a partition key from the upstream asset and uses the existing PartitionMapping object on the downstream asset to fetch the corresponding partitions in the downstream asset.

If a partition mapping is not defined, Dagster will use the default partition mapping, which is the TimeWindowPartitionMapping for time window partitions definitions and the IdentityPartitionMapping for other partitions definitions. The TimeWindowPartitionMapping will map an upstream partition to the downstream partitions that overlap with it.
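
To illustrate what "overlap" means for daily and weekly partitions, here is a standard-library sketch of the date arithmetic. It does not call Dagster. The helper names are hypothetical, and the sketch assumes that partition keys are ISO dates, that a weekly partition is keyed by its first day, and that weeks start on Sunday.

```python
from datetime import date, timedelta
from typing import List, Set


def week_start_for_day(day_key: str) -> str:
    """Key of the weekly partition that overlaps the given daily partition."""
    day = date.fromisoformat(day_key)
    # date.weekday() is 0 for Monday, so Sunday (6) maps to an offset of 0.
    days_since_sunday = (day.weekday() + 1) % 7
    return (day - timedelta(days=days_since_sunday)).isoformat()


def days_in_week(week_key: str) -> List[str]:
    """Keys of the seven daily partitions that overlap the given weekly partition."""
    start = date.fromisoformat(week_key)
    return [(start + timedelta(days=offset)).isoformat() for offset in range(7)]


def week_is_complete(week_key: str, materialized_days: Set[str]) -> bool:
    """True if every daily partition in the week has been materialized."""
    return all(day in materialized_days for day in days_in_week(week_key))
```

Under these assumptions, a daily partition maps to exactly one weekly partition, and a weekly partition maps back to seven daily partitions. This is the same round trip the sensor below performs before checking whether the whole week has been materialized.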

@multi_asset_sensor(
    monitored_assets=[AssetKey("upstream_daily_1")], job=weekly_asset_job
)
def trigger_weekly_asset_from_daily_asset(context):
    run_requests_by_partition = {}
    materializations_by_partition = context.latest_materialization_records_by_partition(
        AssetKey("upstream_daily_1")
    )

    # Get all corresponding weekly partitions for any materialized daily partitions
    for partition, materialization in materializations_by_partition.items():
        weekly_partitions = context.get_downstream_partition_keys(
            partition,
            from_asset_key=AssetKey("upstream_daily_1"),
            to_asset_key=AssetKey("downstream_weekly_asset"),
        )

        if weekly_partitions:  # Check that a downstream weekly partition exists
            # Upstream daily partition can only map to at most one downstream weekly partition
            daily_partitions_in_week = context.get_downstream_partition_keys(
                weekly_partitions[0],
                from_asset_key=AssetKey("downstream_weekly_asset"),
                to_asset_key=AssetKey("upstream_daily_1"),
            )
            if context.all_partitions_materialized(
                AssetKey("upstream_daily_1"), daily_partitions_in_week
            ):
                run_requests_by_partition[
                    weekly_partitions[0]
                ] = weekly_asset_job.run_request_for_partition(weekly_partitions[0])
                # Advance the cursor so we only check event log records past the cursor
                context.advance_cursor({AssetKey("upstream_daily_1"): materialization})
    return list(run_requests_by_partition.values())