from airflow.models.connection import Connection
from airflow.models.dagbag import DagBag
from dagster import (
Definitions,
_check as check,
)
from dagster_airflow.dagster_pipeline_factory import (
DagsterAirflowError,
_create_airflow_connections,
_make_schedules_and_jobs_from_airflow_dag_bag,
make_dagster_pipeline_from_airflow_dag,
patch_airflow_example_dag,
)
[docs]def make_dagster_job_from_airflow_dag(
dag,
tags=None,
use_airflow_template_context=False,
unique_id=None,
mock_xcom=False,
use_ephemeral_airflow_db=False,
connections=None,
):
"""Construct a Dagster job corresponding to a given Airflow DAG.
Tasks in the resulting job will execute the ``execute()`` method on the corresponding
Airflow Operator. Dagster, any dependencies required by Airflow Operators, and the module
containing your DAG definition must be available in the Python environment within which your
Dagster solids execute.
To set Airflow's ``execution_date`` for use with Airflow Operator's ``execute()`` methods,
either:
1. (Best for ad hoc runs) Execute job directly. This will set execution_date to the
time (in UTC) of the run.
2. Add ``{'airflow_execution_date': utc_date_string}`` to the job tags. This will override
behavior from (1).
.. code-block:: python
my_dagster_job = make_dagster_job_from_airflow_dag(
dag=dag,
tags={'airflow_execution_date': utc_execution_date_str}
)
my_dagster_job.execute_in_process()
3. (Recommended) Add ``{'airflow_execution_date': utc_date_string}`` to the run tags,
such as in the Dagit UI. This will override behavior from (1) and (2)
We apply normalized_name() to the dag id and task ids when generating job name and op
names to ensure that names conform to Dagster's naming conventions.
Args:
dag (DAG): The Airflow DAG to compile into a Dagster job
tags (Dict[str, Field]): Job tags. Optionally include
`tags={'airflow_execution_date': utc_date_string}` to specify execution_date used within
execution of Airflow Operators.
use_airflow_template_context (bool): If True, will call get_template_context() on the
Airflow TaskInstance model which requires and modifies the DagRun table. The use_airflow_template_context
setting is ignored if use_ephemeral_airflow_db is True.
(default: False)
unique_id (int): If not None, this id will be postpended to generated op names. Used by
framework authors to enforce unique op names within a repo.
mock_xcom (bool): If True, dagster will mock out all calls made to xcom, features that
depend on xcom may not work as expected. (default: False)
use_ephemeral_airflow_db (bool): If True, dagster will create an ephemeral sqlite airflow
database for each run. (default: False)
connections (List[Connection]): List of Airflow Connections to be created in the Ephemeral
Airflow DB, if use_emphemeral_airflow_db is False this will be ignored.
Returns:
JobDefinition: The generated Dagster job
"""
pipeline_def = make_dagster_pipeline_from_airflow_dag(
dag=dag,
tags=tags,
use_airflow_template_context=use_airflow_template_context,
unique_id=unique_id,
mock_xcom=mock_xcom,
use_ephemeral_airflow_db=use_ephemeral_airflow_db,
connections=connections,
)
# pass in tags manually because pipeline_def.graph doesn't have it threaded
return pipeline_def.graph.to_job(
tags={**pipeline_def.tags},
resource_defs={"airflow_db": pipeline_def.mode_definitions[0].resource_defs["airflow_db"]}
if use_ephemeral_airflow_db
else {},
)
[docs]def make_dagster_definitions_from_airflow_dag_bag(
dag_bag,
use_airflow_template_context=False,
mock_xcom=False,
use_ephemeral_airflow_db=False,
connections=None,
):
"""Construct a Dagster definition corresponding to Airflow DAGs in DagBag.
Usage:
Create `make_dagster_definition.py`:
from dagster_airflow import make_dagster_definition_from_airflow_dag_bag
from airflow_home import my_dag_bag
def make_definition_from_dag_bag():
return make_dagster_definition_from_airflow_dag_bag(my_dag_bag)
Use Definitions as usual, for example:
`dagit -f path/to/make_dagster_definition.py`
Args:
dag_bag (DagBag): Airflow DagBag Model
use_airflow_template_context (bool): If True, will call get_template_context() on the
Airflow TaskInstance model which requires and modifies the DagRun table. The use_airflow_template_context
setting is ignored if use_ephemeral_airflow_db is True.
(default: False)
mock_xcom (bool): If True, dagster will mock out all calls made to xcom, features that
depend on xcom may not work as expected. (default: False)
use_ephemeral_airflow_db (bool): If True, dagster will create an ephemeral sqlite airflow
database for each run. (default: False)
connections (List[Connection]): List of Airflow Connections to be created in the Ephemeral
Airflow DB, if use_emphemeral_airflow_db is False this will be ignored.
Returns:
Definitions
"""
schedules, jobs = _make_schedules_and_jobs_from_airflow_dag_bag(
dag_bag,
use_airflow_template_context,
mock_xcom,
use_ephemeral_airflow_db,
connections,
)
return Definitions(
schedules=schedules,
jobs=jobs,
)
[docs]def make_dagster_definitions_from_airflow_dags_path(
dag_path,
safe_mode=True,
use_airflow_template_context=False,
mock_xcom=False,
use_ephemeral_airflow_db=True,
connections=None,
):
"""Construct a Dagster repository corresponding to Airflow DAGs in dag_path.
Usage:
Create ``make_dagster_definitions.py``:
.. code-block:: python
from dagster_airflow import make_dagster_definitions_from_airflow_dags_path
def make_definitions_from_dir():
return make_dagster_definitions_from_airflow_dags_path(
'/path/to/dags/',
)
Use RepositoryDefinition as usual, for example:
``dagit -f path/to/make_dagster_repo.py -n make_repo_from_dir``
Args:
dag_path (str): Path to directory or file that contains Airflow Dags
include_examples (bool): True to include Airflow's example DAGs. (default: False)
safe_mode (bool): True to use Airflow's default heuristic to find files that contain DAGs
(ie find files that contain both b'DAG' and b'airflow') (default: True)
use_airflow_template_context (bool): If True, will call get_template_context() on the
Airflow TaskInstance model which requires and modifies the DagRun table. The use_airflow_template_context
setting is ignored if use_ephemeral_airflow_db is True.
(default: False)
mock_xcom (bool): If True, dagster will mock out all calls made to xcom, features that
depend on xcom may not work as expected. (default: False)
use_ephemeral_airflow_db (bool): If True, dagster will create an ephemeral sqlite airflow
database for each run. (default: False)
connections (List[Connection]): List of Airflow Connections to be created in the Ephemeral
Airflow DB, if use_emphemeral_airflow_db is False this will be ignored.
Returns:
Definitions
"""
check.str_param(dag_path, "dag_path")
check.bool_param(safe_mode, "safe_mode")
check.bool_param(use_airflow_template_context, "use_airflow_template_context")
mock_xcom = check.opt_bool_param(mock_xcom, "mock_xcom")
use_ephemeral_airflow_db = check.opt_bool_param(
use_ephemeral_airflow_db, "use_ephemeral_airflow_db"
)
connections = check.opt_list_param(connections, "connections", of_type=Connection)
# add connections to airflow so that dag evaluation works
_create_airflow_connections(connections)
try:
dag_bag = DagBag(
dag_folder=dag_path,
include_examples=False, # Exclude Airflow example dags
safe_mode=safe_mode,
)
except Exception:
raise DagsterAirflowError("Error initializing airflow.models.dagbag object with arguments")
return make_dagster_definitions_from_airflow_dag_bag(
dag_bag=dag_bag,
use_airflow_template_context=use_airflow_template_context,
mock_xcom=mock_xcom,
use_ephemeral_airflow_db=use_ephemeral_airflow_db,
connections=connections,
)
def make_dagster_definitions_from_airflow_example_dags(use_ephemeral_airflow_db=True):
"""Construct a Dagster repository for Airflow's example DAGs.
Usage:
Create `make_dagster_definitions.py`:
from dagster_airflow import make_dagster_definitions_from_airflow_example_dags
def make_airflow_example_dags():
return make_dagster_definitions_from_airflow_example_dags()
Use Definitions as usual, for example:
`dagit -f path/to/make_dagster_definitions.py`
Args:
use_ephemeral_airflow_db (bool): If True, dagster will create an ephemeral sqlite airflow
database for each run. (default: True)
Returns:
Definitions
"""
dag_bag = DagBag(
dag_folder="some/empty/folder/with/no/dags", # prevent defaulting to settings.DAGS_FOLDER
include_examples=True,
)
# There is a bug in Airflow v1 where the python_callable for task
# 'search_catalog' is missing a required position argument '_'. It is fixed in airflow v2
patch_airflow_example_dag(dag_bag)
return make_dagster_definitions_from_airflow_dag_bag(
dag_bag=dag_bag, use_ephemeral_airflow_db=use_ephemeral_airflow_db
)