import base64
from abc import ABC, abstractmethod
from enum import Enum
from typing import (
TYPE_CHECKING,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
)
import dagster._check as check
from dagster._core.assets import AssetDetails
from dagster._core.definitions.events import AssetKey
from dagster._core.event_api import EventHandlerFn, EventLogRecord, EventRecordsFilter
from dagster._core.events import DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.stats import (
RunStepKeyStatsSnapshot,
build_run_stats_from_events,
build_run_step_stats_from_events,
)
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.storage.pipeline_run import PipelineRunStatsSnapshot
from dagster._core.storage.sql import AlembicVersion
from dagster._seven import json
from dagster._utils import PrintFn
if TYPE_CHECKING:
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
class EventLogConnection(NamedTuple):
    """Result bundle returned by ``EventLogStorage.get_records_for_run``.

    Groups the fetched records together with the cursor state of the query.
    """

    # Event log records for the query; ``get_logs_for_run`` unwraps these
    # into their ``event_log_entry`` values.
    records: Sequence[EventLogRecord]
    # Cursor string for the query (presumably the position to resume from --
    # confirm against the concrete storage implementations).
    cursor: str
    # Presumably signals that more records exist past this batch -- confirm
    # against the concrete storage implementations.
    has_more: bool
class EventLogCursorType(Enum):
    """Kinds of cursor understood by :py:class:`EventLogCursor`."""

    # Cursor whose value is read back through ``EventLogCursor.offset()``.
    OFFSET = "OFFSET"
    # Cursor whose value is read back through ``EventLogCursor.storage_id()``.
    STORAGE_ID = "STORAGE_ID"
class EventLogCursor(NamedTuple):
    """Representation of an event record cursor, keeping track of the log query state.

    A cursor pairs a :py:class:`EventLogCursorType` with an integer value. It is
    serialized as base64-encoded JSON of the form ``{"type": ..., "value": ...}``
    (see :py:meth:`to_string` and :py:meth:`parse`).
    """

    cursor_type: EventLogCursorType
    value: int

    def is_offset_cursor(self) -> bool:
        """Return True when this cursor's type is ``OFFSET``."""
        kind = self.cursor_type
        return kind == EventLogCursorType.OFFSET

    def is_id_cursor(self) -> bool:
        """Return True when this cursor's type is ``STORAGE_ID``."""
        kind = self.cursor_type
        return kind == EventLogCursorType.STORAGE_ID

    def offset(self) -> int:
        """Return the cursor value as an offset, clamped so it is never negative.

        Fails the ``check.invariant`` assertion when this is not an offset cursor.
        """
        check.invariant(self.is_offset_cursor())
        position = int(self.value)
        return position if position > 0 else 0

    def storage_id(self) -> int:
        """Return the cursor value as a storage id.

        Fails the ``check.invariant`` assertion when this is not a storage id cursor.
        """
        check.invariant(self.is_id_cursor())
        return int(self.value)

    def __str__(self) -> str:
        """Return the serialized cursor; identical to :py:meth:`to_string`."""
        return self.to_string()

    def to_string(self) -> str:
        """Serialize the cursor to a base64 string wrapping a JSON payload."""
        payload = {"type": self.cursor_type.value, "value": self.value}
        encoded = base64.b64encode(json.dumps(payload).encode("utf-8"))
        return encoded.decode("utf-8")

    @staticmethod
    def parse(cursor_str: str) -> "EventLogCursor":
        """Rebuild a cursor from a string produced by :py:meth:`to_string`.

        Args:
            cursor_str (str): base64-encoded JSON with ``type`` and ``value`` keys.
        """
        decoded = base64.b64decode(cursor_str).decode("utf-8")
        raw = json.loads(decoded)
        return EventLogCursor(
            cursor_type=EventLogCursorType(raw["type"]),
            value=raw["value"],
        )

    @staticmethod
    def from_offset(offset: int) -> "EventLogCursor":
        """Build an ``OFFSET`` cursor holding ``offset``."""
        return EventLogCursor(cursor_type=EventLogCursorType.OFFSET, value=offset)

    @staticmethod
    def from_storage_id(storage_id: int) -> "EventLogCursor":
        """Build a ``STORAGE_ID`` cursor holding ``storage_id``."""
        return EventLogCursor(cursor_type=EventLogCursorType.STORAGE_ID, value=storage_id)
class AssetEntry(
    NamedTuple(
        "_AssetEntry",
        [
            ("asset_key", AssetKey),
            ("last_materialization_record", Optional[EventLogRecord]),
            ("last_run_id", Optional[str]),
            ("asset_details", Optional[AssetDetails]),
            ("cached_status", Optional["AssetStatusCacheValue"]),
        ],
    )
):
    """Per-asset bookkeeping: the key plus optional last materialization record,
    last run id, asset details and cached status.

    Every field is type-checked on construction; all but ``asset_key`` may be None.
    """

    def __new__(
        cls,
        asset_key: AssetKey,
        last_materialization_record: Optional[EventLogRecord] = None,
        last_run_id: Optional[str] = None,
        asset_details: Optional[AssetDetails] = None,
        cached_status: Optional["AssetStatusCacheValue"] = None,
    ):
        # The module-level import of AssetStatusCacheValue is guarded by
        # TYPE_CHECKING, so the runtime class needed for the instance check is
        # imported here (presumably to avoid a circular import -- confirm).
        from dagster._core.storage.partition_status_cache import AssetStatusCacheValue

        # Validate in field order so the first offending argument is reported.
        checked_key = check.inst_param(asset_key, "asset_key", AssetKey)
        checked_record = check.opt_inst_param(
            last_materialization_record, "last_materialization_record", EventLogRecord
        )
        checked_run_id = check.opt_str_param(last_run_id, "last_run_id")
        checked_details = check.opt_inst_param(asset_details, "asset_details", AssetDetails)
        checked_status = check.opt_inst_param(
            cached_status, "cached_status", AssetStatusCacheValue
        )

        return super(AssetEntry, cls).__new__(
            cls,
            asset_key=checked_key,
            last_materialization_record=checked_record,
            last_run_id=checked_run_id,
            asset_details=checked_details,
            cached_status=checked_status,
        )

    @property
    def last_materialization(self) -> Optional["EventLogEntry"]:
        """The event log entry of the last materialization record, or None if there is none."""
        record = self.last_materialization_record
        return record.event_log_entry if record is not None else None
class AssetRecord(NamedTuple):
    """Internal representation of an asset record, as stored in a :py:class:`~dagster._core.storage.event_log.EventLogStorage`.

    Users should not invoke this class directly.
    """

    # Integer id of the record (presumably assigned by the backing storage --
    # confirm against the concrete storage implementations).
    storage_id: int
    # The asset's key together with its materialization / run / cache details.
    asset_entry: AssetEntry
class EventLogStorage(ABC, MayHaveInstanceWeakref[T_DagsterInstance]):
    """Abstract base class for storing structured event logs from pipeline runs.

    Note that event log storages using SQL databases as backing stores should implement
    :py:class:`~dagster._core.storage.event_log.SqlEventLogStorage`.

    Users should not directly instantiate concrete subclasses of this class; they are instantiated
    by internal machinery when ``dagit`` and ``dagster-graphql`` load, based on the values in the
    ``dagster.yaml`` file in ``$DAGSTER_HOME``. Configuration of concrete subclasses of this class
    should be done by setting values in that file.
    """

    def get_logs_for_run(
        self,
        run_id: str,
        cursor: Optional[Union[str, int]] = None,
        of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
        limit: Optional[int] = None,
    ) -> Sequence["EventLogEntry"]:
        """Get all of the logs corresponding to a run.

        Args:
            run_id (str): The id of the run for which to fetch logs.
            cursor (Optional[Union[str, int]]): Cursor value to track paginated queries. Legacy
                support for integer offset cursors.
            of_type (Optional[Union[DagsterEventType, Set[DagsterEventType]]]): the dagster event
                type(s) to filter the logs.
            limit (Optional[int]): Max number of records to return.

        Returns:
            Sequence[EventLogEntry]: The ``event_log_entry`` of each record returned by
            :py:meth:`get_records_for_run`.
        """
        if isinstance(cursor, int):
            # Legacy integer cursors become an OFFSET cursor string holding the
            # legacy value plus one (presumably the legacy value named the last
            # seen index rather than the next one -- confirm).
            cursor = EventLogCursor.from_offset(cursor + 1).to_string()
        records = self.get_records_for_run(run_id, cursor, of_type, limit).records
        return [record.event_log_entry for record in records]

    @abstractmethod
    def get_records_for_run(
        self,
        run_id: str,
        cursor: Optional[str] = None,
        of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
        limit: Optional[int] = None,
    ) -> EventLogConnection:
        """Get all of the event log records corresponding to a run.

        Args:
            run_id (str): The id of the run for which to fetch logs.
            cursor (Optional[str]): Cursor value to track paginated queries.
            of_type (Optional[Union[DagsterEventType, Set[DagsterEventType]]]): the dagster event
                type(s) to filter the logs.
            limit (Optional[int]): Max number of records to return.
        """

    def get_stats_for_run(self, run_id: str) -> PipelineRunStatsSnapshot:
        """Get a summary of events that have occurred in a run."""
        return build_run_stats_from_events(run_id, self.get_logs_for_run(run_id))

    def get_step_stats_for_run(
        self, run_id: str, step_keys: Optional[Sequence[str]] = None
    ) -> Sequence[RunStepKeyStatsSnapshot]:
        """Get per-step stats for a pipeline run.

        Args:
            run_id (str): The id of the run.
            step_keys (Optional[Sequence[str]]): When non-empty, only dagster events whose
                step key is in this sequence contribute to the stats.
        """
        logs = self.get_logs_for_run(run_id)
        if step_keys:
            logs = [
                event
                for event in logs
                if event.is_dagster_event and event.get_dagster_event().step_key in step_keys
            ]
        return build_run_step_stats_from_events(run_id, logs)

    @abstractmethod
    def store_event(self, event: "EventLogEntry") -> None:
        """Store an event corresponding to a pipeline run.

        Args:
            event (EventLogEntry): The event to store.
        """

    @abstractmethod
    def delete_events(self, run_id: str) -> None:
        """Remove events for a given run id."""

    @abstractmethod
    def upgrade(self) -> None:
        """This method should perform any schema migrations necessary to bring an
        out-of-date instance of the storage up to date.
        """

    @abstractmethod
    def reindex_events(self, print_fn: Optional[PrintFn] = None, force: bool = False) -> None:
        """Call this method to run any data migrations across the event_log tables."""

    @abstractmethod
    def reindex_assets(self, print_fn: Optional[PrintFn] = None, force: bool = False) -> None:
        """Call this method to run any data migrations across the asset tables."""

    @abstractmethod
    def wipe(self) -> None:
        """Clear the log storage."""

    @abstractmethod
    def watch(self, run_id: str, cursor: Optional[str], callback: EventHandlerFn) -> None:
        """Call this method to start watching."""

    @abstractmethod
    def end_watch(self, run_id: str, handler: EventHandlerFn) -> None:
        """Call this method to stop watching."""

    @property
    @abstractmethod
    def is_persistent(self) -> bool:
        """bool: Whether the storage is persistent."""

    def dispose(self) -> None:
        """Explicit lifecycle management."""

    def optimize_for_dagit(self, statement_timeout: int, pool_recycle: int) -> None:
        """Allows for optimizing database connection / use in the context of a long lived dagit process.

        The base implementation does nothing.
        """

    @abstractmethod
    def get_event_records(
        self,
        event_records_filter: EventRecordsFilter,
        limit: Optional[int] = None,
        ascending: bool = False,
    ) -> Sequence[EventLogRecord]:
        """Abstract: return the event log records selected by ``event_records_filter``.

        Args:
            event_records_filter (EventRecordsFilter): Filter describing the records to fetch.
            limit (Optional[int]): Max number of records to return.
            ascending (bool): Requested ordering direction; defaults to ``False``.
        """

    def supports_event_consumer_queries(self) -> bool:
        """Whether event consumer queries are supported; ``False`` in the base class."""
        return False

    def get_logs_for_all_runs_by_log_id(
        self,
        after_cursor: int = -1,
        dagster_event_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
        limit: Optional[int] = None,
    ) -> Mapping[int, "EventLogEntry"]:
        """Get event records across all runs. Only supported for non sharded sql storage.

        Raises:
            NotImplementedError: Always, in the base implementation.
        """
        raise NotImplementedError()

    def get_maximum_record_id(self) -> Optional[int]:
        """Get the current greatest record id in the event log. Only supported for non sharded sql storage.

        Raises:
            NotImplementedError: Always, in the base implementation.
        """
        raise NotImplementedError()

    @abstractmethod
    def can_cache_asset_status_data(self) -> bool:
        """Abstract: whether this storage can cache asset status data."""

    @abstractmethod
    def wipe_asset_cached_status(self, asset_key: AssetKey) -> None:
        """Abstract: wipe the cached status for ``asset_key``."""

    @abstractmethod
    def get_asset_records(
        self, asset_keys: Optional[Sequence[AssetKey]] = None
    ) -> Sequence[AssetRecord]:
        """Abstract: return asset records, optionally restricted to ``asset_keys``."""

    @abstractmethod
    def has_asset_key(self, asset_key: AssetKey) -> bool:
        """Abstract: whether the storage has ``asset_key``."""

    @abstractmethod
    def all_asset_keys(self) -> Sequence[AssetKey]:
        """Abstract: return every asset key; :py:meth:`get_asset_keys` filters this in memory."""

    @abstractmethod
    def update_asset_cached_status_data(
        self, asset_key: AssetKey, cache_values: "AssetStatusCacheValue"
    ) -> None:
        """Abstract: update the cached status data of ``asset_key`` with ``cache_values``."""

    def get_asset_keys(
        self,
        prefix: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
        cursor: Optional[str] = None,
    ) -> Sequence[AssetKey]:
        """Return asset keys sorted by their string form, filtered and paginated in memory.

        Base implementation built on :py:meth:`all_asset_keys`.

        Args:
            prefix (Optional[Sequence[str]]): When non-empty, keep only keys whose path starts
                with these components. Any sequence type (list, tuple, ...) is accepted.
            limit (Optional[int]): Max number of keys to return. A falsy value (``None`` or
                ``0``) applies no limit.
            cursor (Optional[str]): Database-string form of an asset key. When that key is
                present in the (prefix-filtered) result, only the keys after it are returned;
                otherwise the cursor is ignored.

        Returns:
            Sequence[AssetKey]: The matching asset keys.
        """
        asset_keys = sorted(self.all_asset_keys(), key=str)
        if prefix:
            # Normalise both sides to lists: sequences of different concrete
            # types (e.g. a list slice and a tuple) never compare equal, so a
            # tuple prefix would otherwise silently match nothing.
            prefix_path = list(prefix)
            prefix_len = len(prefix_path)
            asset_keys = [
                asset_key
                for asset_key in asset_keys
                if list(asset_key.path[:prefix_len]) == prefix_path
            ]
        if cursor:
            cursor_asset = AssetKey.from_db_string(cursor)
            if cursor_asset and cursor_asset in asset_keys:
                idx = asset_keys.index(cursor_asset)
                asset_keys = asset_keys[idx + 1 :]
        if limit:
            asset_keys = asset_keys[:limit]
        return asset_keys

    @abstractmethod
    def get_latest_materialization_events(
        self, asset_keys: Sequence[AssetKey]
    ) -> Mapping[AssetKey, Optional["EventLogEntry"]]:
        """Abstract: map each asset key to its latest materialization entry, or None."""

    def supports_add_asset_event_tags(self) -> bool:
        """Whether :py:meth:`add_asset_event_tags` is supported; ``False`` in the base class."""
        return False

    def add_asset_event_tags(
        self,
        event_id: int,
        event_timestamp: float,
        asset_key: AssetKey,
        new_tags: Mapping[str, str],
    ) -> None:
        """Add ``new_tags`` to an asset event.

        Raises:
            NotImplementedError: Always, in the base implementation.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_event_tags_for_asset(
        self,
        asset_key: AssetKey,
        filter_tags: Optional[Mapping[str, str]] = None,
        filter_event_id: Optional[int] = None,
    ) -> Sequence[Mapping[str, str]]:
        """Abstract: return tag mappings for events of ``asset_key``, with optional filters."""

    @abstractmethod
    def get_asset_run_ids(self, asset_key: AssetKey) -> Sequence[str]:
        """Abstract: return run ids associated with ``asset_key``."""

    @abstractmethod
    def wipe_asset(self, asset_key: AssetKey) -> None:
        """Remove asset index history from event log for given asset_key."""

    @abstractmethod
    def get_materialization_count_by_partition(
        self, asset_keys: Sequence[AssetKey], after_cursor: Optional[int] = None
    ) -> Mapping[AssetKey, Mapping[str, int]]:
        """Abstract: map each asset key to a per-partition materialization count."""

    @abstractmethod
    def get_latest_asset_partition_materialization_attempts_without_materializations(
        self, asset_key: AssetKey
    ) -> Mapping[str, Tuple[str, int]]:
        """Abstract: map strings to ``(str, int)`` pairs for ``asset_key``.

        NOTE(review): presumably partition key -> (run id, event id) of the latest
        attempt lacking a materialization -- confirm against concrete storages.
        """

    @abstractmethod
    def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
        """Get the list of partition keys for a dynamic partitions definition."""
        raise NotImplementedError()

    @abstractmethod
    def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
        """Check if a dynamic partition exists."""
        raise NotImplementedError()

    @abstractmethod
    def add_dynamic_partitions(
        self, partitions_def_name: str, partition_keys: Sequence[str]
    ) -> None:
        """Add a partition for the specified dynamic partitions definition."""
        raise NotImplementedError()

    @abstractmethod
    def delete_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> None:
        """Delete a partition for the specified dynamic partitions definition."""
        raise NotImplementedError()

    def alembic_version(self) -> Optional[AlembicVersion]:
        """Return the alembic version of the storage; ``None`` in the base class."""
        return None

    @property
    def is_run_sharded(self) -> bool:
        """Indicates that the EventLogStorage is sharded."""
        return False