Source code for dagster._core.storage.event_log.base

import base64
from abc import ABC, abstractmethod
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

import dagster._check as check
from dagster._core.assets import AssetDetails
from dagster._core.definitions.events import AssetKey
from dagster._core.event_api import EventHandlerFn, EventLogRecord, EventRecordsFilter
from dagster._core.events import DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.stats import (
    RunStepKeyStatsSnapshot,
    build_run_stats_from_events,
    build_run_step_stats_from_events,
)
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.storage.pipeline_run import PipelineRunStatsSnapshot
from dagster._core.storage.sql import AlembicVersion
from dagster._seven import json
from dagster._utils import PrintFn

if TYPE_CHECKING:
    from dagster._core.storage.partition_status_cache import AssetStatusCacheValue


class EventLogConnection(NamedTuple):
    records: Sequence[EventLogRecord]
    cursor: str
    has_more: bool


class EventLogCursorType(Enum):
    OFFSET = "OFFSET"
    STORAGE_ID = "STORAGE_ID"


class EventLogCursor(NamedTuple):
    """Representation of an event record cursor, keeping track of the log query state."""

    cursor_type: EventLogCursorType
    value: int

    def is_offset_cursor(self) -> bool:
        return self.cursor_type == EventLogCursorType.OFFSET

    def is_id_cursor(self) -> bool:
        return self.cursor_type == EventLogCursorType.STORAGE_ID

    def offset(self) -> int:
        check.invariant(self.cursor_type == EventLogCursorType.OFFSET)
        return max(0, int(self.value))

    def storage_id(self) -> int:
        check.invariant(self.cursor_type == EventLogCursorType.STORAGE_ID)
        return int(self.value)

    def __str__(self) -> str:
        return self.to_string()

    def to_string(self) -> str:
        raw = json.dumps({"type": self.cursor_type.value, "value": self.value})
        return base64.b64encode(bytes(raw, encoding="utf-8")).decode("utf-8")

    @staticmethod
    def parse(cursor_str: str) -> "EventLogCursor":
        raw = json.loads(base64.b64decode(cursor_str).decode("utf-8"))
        return EventLogCursor(EventLogCursorType(raw["type"]), raw["value"])

    @staticmethod
    def from_offset(offset: int) -> "EventLogCursor":
        return EventLogCursor(EventLogCursorType.OFFSET, offset)

    @staticmethod
    def from_storage_id(storage_id: int) -> "EventLogCursor":
        return EventLogCursor(EventLogCursorType.STORAGE_ID, storage_id)


class AssetEntry(
    NamedTuple(
        "_AssetEntry",
        [
            ("asset_key", AssetKey),
            ("last_materialization_record", Optional[EventLogRecord]),
            ("last_run_id", Optional[str]),
            ("asset_details", Optional[AssetDetails]),
            ("cached_status", Optional["AssetStatusCacheValue"]),
        ],
    )
):
    def __new__(
        cls,
        asset_key: AssetKey,
        last_materialization_record: Optional[EventLogRecord] = None,
        last_run_id: Optional[str] = None,
        asset_details: Optional[AssetDetails] = None,
        cached_status: Optional["AssetStatusCacheValue"] = None,
    ):
        from dagster._core.storage.partition_status_cache import AssetStatusCacheValue

        return super(AssetEntry, cls).__new__(
            cls,
            asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
            last_materialization_record=check.opt_inst_param(
                last_materialization_record, "last_materialization_record", EventLogRecord
            ),
            last_run_id=check.opt_str_param(last_run_id, "last_run_id"),
            asset_details=check.opt_inst_param(asset_details, "asset_details", AssetDetails),
            cached_status=check.opt_inst_param(
                cached_status, "cached_status", AssetStatusCacheValue
            ),
        )

    @property
    def last_materialization(self) -> Optional["EventLogEntry"]:
        if self.last_materialization_record is None:
            return None
        return self.last_materialization_record.event_log_entry


[docs]class AssetRecord(NamedTuple): """Internal representation of an asset record, as stored in a :py:class:`~dagster._core.storage.event_log.EventLogStorage`. Users should not invoke this class directly. """ storage_id: int asset_entry: AssetEntry
[docs]class EventLogStorage(ABC, MayHaveInstanceWeakref[T_DagsterInstance]): """Abstract base class for storing structured event logs from pipeline runs. Note that event log storages using SQL databases as backing stores should implement :py:class:`~dagster._core.storage.event_log.SqlEventLogStorage`. Users should not directly instantiate concrete subclasses of this class; they are instantiated by internal machinery when ``dagit`` and ``dagster-graphql`` load, based on the values in the ``dagster.yaml`` file in ``$DAGSTER_HOME``. Configuration of concrete subclasses of this class should be done by setting values in that file. """ def get_logs_for_run( self, run_id: str, cursor: Optional[Union[str, int]] = None, of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None, limit: Optional[int] = None, ) -> Sequence["EventLogEntry"]: """Get all of the logs corresponding to a run. Args: run_id (str): The id of the run for which to fetch logs. cursor (Optional[Union[str, int]]): Cursor value to track paginated queries. Legacy support for integer offset cursors. of_type (Optional[DagsterEventType]): the dagster event type to filter the logs. limit (Optional[int]): Max number of records to return. """ if isinstance(cursor, int): cursor = EventLogCursor.from_offset(cursor + 1).to_string() records = self.get_records_for_run(run_id, cursor, of_type, limit).records return [record.event_log_entry for record in records] @abstractmethod def get_records_for_run( self, run_id: str, cursor: Optional[str] = None, of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None, limit: Optional[int] = None, ) -> EventLogConnection: """Get all of the event log records corresponding to a run. Args: run_id (str): The id of the run for which to fetch logs. cursor (Optional[str]): Cursor value to track paginated queries. of_type (Optional[DagsterEventType]): the dagster event type to filter the logs. limit (Optional[int]): Max number of records to return. """ def get_stats_for_run(self, run_id: str) -> PipelineRunStatsSnapshot: """Get a summary of events that have ocurred in a run.""" return build_run_stats_from_events(run_id, self.get_logs_for_run(run_id)) def get_step_stats_for_run( self, run_id: str, step_keys: Optional[Sequence[str]] = None ) -> Sequence[RunStepKeyStatsSnapshot]: """Get per-step stats for a pipeline run.""" logs = self.get_logs_for_run(run_id) if step_keys: logs = [ event for event in logs if event.is_dagster_event and event.get_dagster_event().step_key in step_keys ] return build_run_step_stats_from_events(run_id, logs) @abstractmethod def store_event(self, event: "EventLogEntry") -> None: """Store an event corresponding to a pipeline run. Args: event (EventLogEntry): The event to store. """ @abstractmethod def delete_events(self, run_id: str) -> None: """Remove events for a given run id.""" @abstractmethod def upgrade(self) -> None: """This method should perform any schema migrations necessary to bring an out-of-date instance of the storage up to date. """ @abstractmethod def reindex_events(self, print_fn: Optional[PrintFn] = None, force: bool = False) -> None: """Call this method to run any data migrations across the event_log tables.""" @abstractmethod def reindex_assets(self, print_fn: Optional[PrintFn] = None, force: bool = False) -> None: """Call this method to run any data migrations across the asset tables.""" @abstractmethod def wipe(self) -> None: """Clear the log storage.""" @abstractmethod def watch(self, run_id: str, cursor: Optional[str], callback: EventHandlerFn) -> None: """Call this method to start watching.""" @abstractmethod def end_watch(self, run_id: str, handler: EventHandlerFn) -> None: """Call this method to stop watching.""" @property @abstractmethod def is_persistent(self) -> bool: """bool: Whether the storage is persistent.""" def dispose(self) -> None: """Explicit lifecycle management.""" def optimize_for_dagit(self, statement_timeout: int, pool_recycle: int) -> None: """Allows for optimizing database connection / use in the context of a long lived dagit process. """ @abstractmethod def get_event_records( self, event_records_filter: EventRecordsFilter, limit: Optional[int] = None, ascending: bool = False, ) -> Sequence[EventLogRecord]: pass def supports_event_consumer_queries(self) -> bool: return False def get_logs_for_all_runs_by_log_id( self, after_cursor: int = -1, dagster_event_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None, limit: Optional[int] = None, ) -> Mapping[int, "EventLogEntry"]: """Get event records across all runs. Only supported for non sharded sql storage.""" raise NotImplementedError() def get_maximum_record_id(self) -> Optional[int]: """Get the current greatest record id in the event log. Only supported for non sharded sql storage. """ raise NotImplementedError() @abstractmethod def can_cache_asset_status_data(self) -> bool: pass @abstractmethod def wipe_asset_cached_status(self, asset_key: AssetKey) -> None: pass @abstractmethod def get_asset_records( self, asset_keys: Optional[Sequence[AssetKey]] = None ) -> Sequence[AssetRecord]: pass @abstractmethod def has_asset_key(self, asset_key: AssetKey) -> bool: pass @abstractmethod def all_asset_keys(self) -> Sequence[AssetKey]: pass @abstractmethod def update_asset_cached_status_data( self, asset_key: AssetKey, cache_values: "AssetStatusCacheValue" ) -> None: pass def get_asset_keys( self, prefix: Optional[Sequence[str]] = None, limit: Optional[int] = None, cursor: Optional[str] = None, ) -> Sequence[AssetKey]: # base implementation of get_asset_keys, using the existing `all_asset_keys` and doing the # filtering in-memory asset_keys = sorted(self.all_asset_keys(), key=str) if prefix: asset_keys = [ asset_key for asset_key in asset_keys if asset_key.path[: len(prefix)] == prefix ] if cursor: cursor_asset = AssetKey.from_db_string(cursor) if cursor_asset and cursor_asset in asset_keys: idx = asset_keys.index(cursor_asset) asset_keys = asset_keys[idx + 1 :] if limit: asset_keys = asset_keys[:limit] return asset_keys @abstractmethod def get_latest_materialization_events( self, asset_keys: Sequence[AssetKey] ) -> Mapping[AssetKey, Optional["EventLogEntry"]]: pass def supports_add_asset_event_tags(self) -> bool: return False def add_asset_event_tags( self, event_id: int, event_timestamp: float, asset_key: AssetKey, new_tags: Mapping[str, str], ) -> None: raise NotImplementedError() @abstractmethod def get_event_tags_for_asset( self, asset_key: AssetKey, filter_tags: Optional[Mapping[str, str]] = None, filter_event_id: Optional[int] = None, ) -> Sequence[Mapping[str, str]]: pass @abstractmethod def get_asset_run_ids(self, asset_key: AssetKey) -> Sequence[str]: pass @abstractmethod def wipe_asset(self, asset_key: AssetKey) -> None: """Remove asset index history from event log for given asset_key.""" @abstractmethod def get_materialization_count_by_partition( self, asset_keys: Sequence[AssetKey], after_cursor: Optional[int] = None ) -> Mapping[AssetKey, Mapping[str, int]]: pass @abstractmethod def get_latest_asset_partition_materialization_attempts_without_materializations( self, asset_key: AssetKey ) -> Mapping[str, Tuple[str, int]]: pass @abstractmethod def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]: """Get the list of partition keys for a dynamic partitions definition.""" raise NotImplementedError() @abstractmethod def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool: """Check if a dynamic partition exists.""" raise NotImplementedError() @abstractmethod def add_dynamic_partitions( self, partitions_def_name: str, partition_keys: Sequence[str] ) -> None: """Add a partition for the specified dynamic partitions definition.""" raise NotImplementedError() @abstractmethod def delete_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> None: """Delete a partition for the specified dynamic partitions definition.""" raise NotImplementedError() def alembic_version(self) -> Optional[AlembicVersion]: return None @property def is_run_sharded(self) -> bool: """Indicates that the EventLogStoarge is sharded.""" return False