import os
from typing import List, Mapping, Optional, Tuple
from airflow.models.connection import Connection
from airflow.models.dagbag import DagBag
from dagster import (
    Definitions,
    JobDefinition,
    ResourceDefinition,
    ScheduleDefinition,
    _check as check,
)
from dagster_airflow.dagster_job_factory import make_dagster_job_from_airflow_dag
from dagster_airflow.dagster_schedule_factory import (
    _is_dag_is_schedule,
    make_dagster_schedule_from_airflow_dag,
)
from dagster_airflow.patch_airflow_example_dag import patch_airflow_example_dag
from dagster_airflow.resources import (
    make_ephemeral_airflow_db_resource as make_ephemeral_airflow_db_resource,
)
from dagster_airflow.resources.airflow_ephemeral_db import AirflowEphemeralDatabase
from dagster_airflow.resources.airflow_persistent_db import AirflowPersistentDatabase
from dagster_airflow.utils import (
    is_airflow_2_loaded_in_environment,
)


def make_dagster_definitions_from_airflow_dag_bag(
    dag_bag: DagBag,
    connections: Optional[List[Connection]] = None,
    resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
) -> Definitions:
    """Construct a Dagster Definitions object corresponding to the Airflow DAGs in a DagBag.

    Usage:
        Create ``make_dagster_definitions.py``:

        .. code-block:: python

            from dagster_airflow import make_dagster_definitions_from_airflow_dag_bag
            from airflow_home import my_dag_bag

            def make_definitions_from_dag_bag():
                return make_dagster_definitions_from_airflow_dag_bag(my_dag_bag)

        Use the Definitions as usual, for example:
        ``dagit -f path/to/make_dagster_definitions.py``

    Args:
        dag_bag (DagBag): Airflow DagBag model
        connections (List[Connection]): List of Airflow Connections to be created in the Airflow DB
        resource_defs (Mapping[str, ResourceDefinition]): Resource definitions to include; an
            ephemeral ``airflow_db`` resource is added if none is supplied

    Returns:
        Definitions
    """
    check.inst_param(dag_bag, "dag_bag", DagBag)
    connections = check.opt_list_param(connections, "connections", of_type=Connection)
    resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")
    if resource_defs is None or "airflow_db" not in resource_defs:
        resource_defs = dict(resource_defs) if resource_defs else {}
        resource_defs["airflow_db"] = make_ephemeral_airflow_db_resource(connections=connections)

    schedules, jobs = make_schedules_and_jobs_from_airflow_dag_bag(
        dag_bag=dag_bag,
        connections=connections,
        resource_defs=resource_defs,
    )

    return Definitions(
        schedules=schedules,
        jobs=jobs,
        resources=resource_defs,
    )
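

# Illustrative usage sketch (not part of the library's public API): build
# Definitions from a DagBag loaded off disk, seeding the ephemeral Airflow DB
# with a Connection. The dag_folder path and conn_id below are hypothetical.
def _example_definitions_from_dag_bag() -> Definitions:
    example_bag = DagBag(dag_folder="/path/to/dags", include_examples=False)
    return make_dagster_definitions_from_airflow_dag_bag(
        dag_bag=example_bag,
        connections=[
            Connection(conn_id="my_http", conn_type="http", host="https://example.com")
        ],
    )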


def make_dagster_definitions_from_airflow_dags_path(
    dag_path: str,
    safe_mode: bool = True,
    connections: Optional[List[Connection]] = None,
    resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
) -> Definitions:
    """Construct a Dagster Definitions object corresponding to the Airflow DAGs in dag_path.

    Usage:
        Create ``make_dagster_definitions.py``:

        .. code-block:: python

            from dagster_airflow import make_dagster_definitions_from_airflow_dags_path

            def make_definitions_from_dir():
                return make_dagster_definitions_from_airflow_dags_path(
                    '/path/to/dags/',
                )

        Use the Definitions as usual, for example:
        ``dagit -f path/to/make_dagster_definitions.py``

    Args:
        dag_path (str): Path to directory or file that contains Airflow DAGs
        safe_mode (bool): True to use Airflow's default heuristic to find files that contain DAGs
            (ie find files that contain both b'DAG' and b'airflow') (default: True)
        connections (List[Connection]): List of Airflow Connections to be created in the Airflow DB
        resource_defs (Mapping[str, ResourceDefinition]): Resource definitions to include; an
            ephemeral ``airflow_db`` resource is added if none is supplied

    Returns:
        Definitions
    """
    check.str_param(dag_path, "dag_path")
    check.bool_param(safe_mode, "safe_mode")
    connections = check.opt_list_param(connections, "connections", of_type=Connection)
    resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")
    if resource_defs is None or "airflow_db" not in resource_defs:
        resource_defs = dict(resource_defs) if resource_defs else {}
        resource_defs["airflow_db"] = make_ephemeral_airflow_db_resource(connections=connections)

    # Initialize the backing Airflow database before loading DAGs from disk,
    # dispatching on which airflow_db resource the caller supplied.
    if (
        resource_defs["airflow_db"].resource_fn.__qualname__.split(".")[0]
        == "AirflowEphemeralDatabase"
    ):
        AirflowEphemeralDatabase._initialize_database(connections=connections)  # noqa: SLF001
    elif (
        resource_defs["airflow_db"].resource_fn.__qualname__.split(".")[0]
        == "AirflowPersistentDatabase"
    ):
        AirflowPersistentDatabase._initialize_database(  # noqa: SLF001
            # Airflow 2 renamed the SQLAlchemy connection env var.
            uri=(
                os.getenv("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", "")
                if is_airflow_2_loaded_in_environment()
                else os.getenv("AIRFLOW__CORE__SQL_ALCHEMY_CONN", "")
            ),
            connections=connections,
        )

    dag_bag = DagBag(
        dag_folder=dag_path,
        include_examples=False,  # Exclude Airflow example dags
        safe_mode=safe_mode,
    )

    return make_dagster_definitions_from_airflow_dag_bag(
        dag_bag=dag_bag,
        connections=connections,
        resource_defs=resource_defs,
    )
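

# Illustrative usage sketch (hypothetical path): point the factory at a DAGs
# directory and let it stand up an ephemeral Airflow DB; safe_mode=True keeps
# Airflow's b'DAG'/b'airflow' file-content heuristic.
def _example_definitions_from_dags_path() -> Definitions:
    return make_dagster_definitions_from_airflow_dags_path(
        dag_path="/path/to/dags/",
        safe_mode=True,
    )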


def make_dagster_definitions_from_airflow_example_dags(
    resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
) -> Definitions:
    """Construct a Dagster Definitions object for Airflow's example DAGs.

    Usage:
        Create ``make_dagster_definitions.py``:

        .. code-block:: python

            from dagster_airflow import make_dagster_definitions_from_airflow_example_dags

            def make_airflow_example_dags():
                return make_dagster_definitions_from_airflow_example_dags()

        Use the Definitions as usual, for example:
        ``dagit -f path/to/make_dagster_definitions.py``

    Args:
        resource_defs (Mapping[str, ResourceDefinition]): Resource definitions to be used with
            the definitions

    Returns:
        Definitions
    """
    dag_bag = DagBag(
        dag_folder="some/empty/folder/with/no/dags",  # prevent defaulting to settings.DAGS_FOLDER
        include_examples=True,
    )

    # There is a bug in Airflow v1 where the python_callable for the
    # 'search_catalog' task is missing a required positional argument '_'.
    # It is fixed in Airflow v2.
    patch_airflow_example_dag(dag_bag)

    return make_dagster_definitions_from_airflow_dag_bag(
        dag_bag=dag_bag, resource_defs=resource_defs
    )


def make_schedules_and_jobs_from_airflow_dag_bag(
    dag_bag: DagBag,
    connections: Optional[List[Connection]] = None,
    resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
) -> Tuple[List[ScheduleDefinition], List[JobDefinition]]:
    """Construct Dagster Schedules and Jobs corresponding to an Airflow DagBag.

    Args:
        dag_bag (DagBag): Airflow DagBag model
        connections (List[Connection]): List of Airflow Connections to be created in the Airflow DB
        resource_defs (Mapping[str, ResourceDefinition]): Resource definitions to attach to the
            generated jobs and schedules

    Returns:
        - List[ScheduleDefinition]: The generated Dagster Schedules
        - List[JobDefinition]: The generated Dagster Jobs
    """
    check.inst_param(dag_bag, "dag_bag", DagBag)
    connections = check.opt_list_param(connections, "connections", of_type=Connection)
    resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")

    job_defs = []
    schedule_defs = []
    # Sort DAG ids to enforce a predictable iteration order.
    for dag_id in sorted(dag_bag.dag_ids):
        dag = dag_bag.dags.get(dag_id)
        if not dag:
            continue
        # DAGs whose schedule_interval can be translated to a Dagster schedule
        # become ScheduleDefinitions; the rest become plain JobDefinitions.
        if _is_dag_is_schedule(dag):
            schedule_defs.append(
                make_dagster_schedule_from_airflow_dag(
                    dag=dag, tags=None, connections=connections, resource_defs=resource_defs
                )
            )
        else:
            job_defs.append(
                make_dagster_job_from_airflow_dag(
                    dag=dag, tags=None, connections=connections, resource_defs=resource_defs
                )
            )

    return schedule_defs, job_defs
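

# Illustrative usage sketch (hypothetical dag_folder): callers who want to
# inspect or filter the generated objects can use this factory directly and
# assemble Definitions themselves, mirroring what
# make_dagster_definitions_from_airflow_dag_bag does internally.
def _example_manual_definitions() -> Definitions:
    bag = DagBag(dag_folder="/path/to/dags", include_examples=False)
    airflow_db = make_ephemeral_airflow_db_resource()
    schedules, jobs = make_schedules_and_jobs_from_airflow_dag_bag(
        dag_bag=bag, resource_defs={"airflow_db": airflow_db}
    )
    return Definitions(schedules=schedules, jobs=jobs, resources={"airflow_db": airflow_db})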