Airflow: dependencies between tasks, and how one task's return value can be used by another task (XCom)
https://github.com/astronomer/airflow-guides/blob/master/guides/templating.md
https://ttxsgoto.github.io/2019/07/27/yw-airflow-operator/
https://airflow.apache.org/concepts.html#xcoms
https://marclamberti.com/blog/airflow-pythonoperator/
You can access XCom variables from within templated fields. For example, to read from XCom:
myOperator = MyOperator(
message="Operation result: {{ task_instance.xcom_pull(task_ids=['task1', 'task2'], key='result_status') }}",
...
It is also possible not to specify any task_ids, which pulls all XCom pushes with the same key from the current DagRun:
myOperator = MyOperator(
message="Warning status: {{ task_instance.xcom_pull(task_ids=None, key='warning_status') }}",
...
This call would return an array (one value per task that pushed under that key).
example:
#! /usr/bin/python
# -*- coding: utf-8 -*-
"""Hourly DAG syncing MySQL ``account_histories`` to BigQuery via GCS.

Pipeline per DagRun:
  1. A PythonOperator computes the previous hourly execution timestamp and
     pushes it to XCom (PythonOperator pushes its return value under the
     key ``return_value``).
  2. MySqlToGoogleCloudStorageOperator extracts rows whose ``inserted_at``
     falls in the (last_execution, execution] window to GCS as
     newline-delimited JSON.
  3. GoogleCloudStorageToBigQueryOperator appends those files to BigQuery.
"""
import datetime

import dateutil.parser

from airflow import models
from airflow.contrib.operators import mysql_to_gcs
from airflow.operators.python_operator import PythonOperator

from lib import gcs_to_bq

# Environment configuration pulled from Airflow Variables at parse time.
project = models.Variable.get('gcp_project')
gcs_bucket = models.Variable.get("gcs_bucket")
db = models.Variable.get("db")

default_dag_args = {
    'start_date': datetime.datetime(year=2019, month=8, day=5, hour=7),
    'email': 'xing@peatio.com',
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=3),
    'project_id': project,
}

dag = models.DAG(
    dag_id="hourly-sync-dag07",
    schedule_interval="0 * * * *",
    default_args=default_dag_args)


def set_last_execution_time(**kwargs):
    """Return the previous hourly execution time as ``YYYY-mm-dd HH:MM:SS``.

    ``kwargs['ts']`` is the execution date supplied by the Airflow context
    (``provide_context=True`` on the operator).  The returned string is
    automatically pushed to XCom under key ``return_value`` so downstream
    templated fields can pull it.
    """
    ts = kwargs['ts']
    print("EXECUTION DATE: {0}".format(ts))
    execution_time = dateutil.parser.parse(ts)
    last_execution_time = execution_time - datetime.timedelta(hours=1)
    formatted = last_execution_time.strftime("%Y-%m-%d %H:%M:%S")
    print("LAST EXECUTION DATE: {0}".format(formatted))
    return formatted


def hourly_sync_account_histories_to_bq():
    """Wire up pre_task >> mysql2gcs >> gcs2bq on the module-level DAG.

    Returns:
        tuple: the three operators, in dependency order.
    """
    mysql_table = "account_histories"
    bq_table = "account_history_details"
    # Jinja placeholders; rendered by Airflow at task runtime because the
    # operators' `sql` / `filename` fields are templated.
    ts = "{{ ts }}"

    # BUG FIX: the original passed template_dict={'ts': ts}, which is not a
    # valid PythonOperator argument (the real kwarg is ``templates_dict``).
    # It is unnecessary anyway: with provide_context=True the callable
    # already receives 'ts' in its context kwargs, so the invalid kwarg is
    # simply dropped.
    pre_task = PythonOperator(
        task_id='push_last_execution_time_to_xcom',
        python_callable=set_last_execution_time,
        dag=dag,
        provide_context=True)

    last_execution_time = "{{ task_instance.xcom_pull(task_ids='push_last_execution_time_to_xcom', key='return_value') }}"
    # NOTE: at DAG-parse time this prints the raw Jinja template, not the
    # rendered XCom value; the template is only rendered at task runtime.
    print("LAST EXECUTION DATE FROM XCOM: {0}".format(last_execution_time))

    sql = """ SELECT ah.id as account_history_id, hc.id as customer_id, hc.broker_id, ah.account_id, a.asset_uuid, TO_BASE64(ah.reason) as reason, CAST(CAST((ah.balance * CAST(1e7 AS DECIMAL)) AS DECIMAL(32,9)) AS CHAR) As balance, CAST(CAST((ah.locked_balance * CAST(1e7 AS DECIMAL)) AS DECIMAL(32,9)) AS CHAR) As locked_balance, CAST(CAST((ah.delta_balance * CAST(1e7 AS DECIMAL)) AS DECIMAL(32,9)) AS CHAR) As delta_balance, CAST(CAST((ah.delta_locked_balance * CAST(1e7 AS DECIMAL)) AS DECIMAL(32,9)) AS CHAR) As delta_locked_balance, ah.inserted_at, ah.updated_at FROM `{}`.`account_histories` as ah LEFT JOIN `{}`.`accounts` as a on a.id=ah.account_id LEFT JOIN `{}`.`hashed_customers` as hc on hc.id=a.customer_id WHERE ah.inserted_at > '{}' and ah.inserted_at <= '{}'; """.format(db, db, db, last_execution_time, ts)

    # ts_nodash gives a per-run unique prefix for the GCS cache objects.
    cacheUUID = "{{ ts_nodash }}"

    mysql2gcs = mysql_to_gcs.MySqlToGoogleCloudStorageOperator(
        task_id="mysql2gcs-{}.{}".format(db, mysql_table),
        sql=sql,
        mysql_conn_id="pxn-peatio-sql-read-replica",
        # Trailing {{}} leaves a literal {} for the operator's own file
        # counter when the export is split into multiple files.
        filename="cache_hourly_sync/{}/{}.{}-{{}}".format(cacheUUID, db, mysql_table),
        bucket=gcs_bucket,
        google_cloud_storage_conn_id="google_cloud_storage_default",
        approx_max_file_size_bytes=1024 * 1024 * 100,  # ~100 MB per file
        dag=dag)

    source_obj = "cache_hourly_sync/{}/{}.{}-*".format(cacheUUID, db, mysql_table)
    gcs2bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id="gcs2bq-{}.{}".format(db, bq_table),
        bucket=gcs_bucket,
        source_objects=[source_obj],
        schema_object="account_history_details.json",
        write_disposition='WRITE_APPEND',
        destination_project_dataset_table="{}.dw_ng.{}".format(
            project, bq_table),
        bigquery_conn_id="bigquery_default",
        source_format="NEWLINE_DELIMITED_JSON",
        google_cloud_storage_conn_id="google_cloud_storage_default",
        dag=dag)

    pre_task >> mysql2gcs >> gcs2bq
    return pre_task, mysql2gcs, gcs2bq


hourly_sync_account_histories_to_bq()