
Airflow: dependencies between tasks, and how a task's return value can be used by other tasks (XCom)

https://github.com/astronomer/airflow-guides/blob/master/guides/templating.md

https://ttxsgoto.github.io/2019/07/27/yw-airflow-operator/

https://stackoverflow.com/questions/51581363/airflow-how-to-use-xcom-push-and-xcom-pull-in-non-pythonoperator

https://airflow.apache.org/concepts.html#xcoms

https://marclamberti.com/blog/airflow-pythonoperator/

You can access XCom variables from within templated fields. For example, to read from XCom:

myOperator = MyOperator(
    message="Operation result: {{ task_instance.xcom_pull(task_ids=['task1', 'task2'], key='result_status') }}",
    ...

It is also possible to leave task_ids unspecified (None) in order to get all XCom pushes within one DagRun that share the same key:

myOperator = MyOperator(
    message="Warning status: {{ task_instance.xcom_pull(task_ids=None, key='warning_status') }}",
    ...

This returns an array of all matching values (passing a list of task_ids, as in the first snippet, likewise returns one value per task).
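To make the push/pull mechanics concrete, here is a minimal sketch in the same Airflow 1.x style as the full example below (the DAG id and task ids are made up for illustration). A PythonOperator callable's return value is pushed to XCom automatically under key='return_value' and can be pulled from another callable via the task instance; pulling from a templated field works as in the snippets above.

import datetime

from airflow import models
from airflow.operators.python_operator import PythonOperator

dag = models.DAG(
    dag_id="xcom-demo",  # hypothetical DAG id
    schedule_interval=None,
    default_args={'start_date': datetime.datetime(2019, 8, 5)})

def push(**kwargs):
    # Returning a value pushes it to XCom under key='return_value'.
    return "hello from push_task"

def pull(**kwargs):
    # Pull the pushed value by task_id from inside another callable.
    value = kwargs['ti'].xcom_pull(task_ids='push_task', key='return_value')
    print("pulled: {0}".format(value))

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push,
    provide_context=True,
    dag=dag)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull,
    provide_context=True,
    dag=dag)

push_task >> pull_task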

 

Example: a complete hourly-sync DAG that pushes a computed timestamp to XCom and pulls it back into a templated SQL query:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import datetime
import dateutil.parser

from airflow import models
from airflow.contrib.operators import mysql_to_gcs
from lib import gcs_to_bq
from airflow.operators.python_operator import PythonOperator

# Environment configuration pulled from Airflow Variables
project = models.Variable.get('gcp_project')
gcs_bucket = models.Variable.get("gcs_bucket")
db = models.Variable.get("db")

default_dag_args = {
    'start_date': datetime.datetime(year=2019, month=8, day=5, hour=7),
    'email': 'xing@peatio.com',
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=3),
    'project_id': project
}

dag = models.DAG(
    dag_id="hourly-sync-dag07",
    schedule_interval="0 * * * *",
    default_args=default_dag_args)

def set_last_execution_time(**kwargs):
    """Compute the previous schedule tick (one hour before this run's
    execution date). The return value is pushed to XCom automatically
    under key='return_value'."""
    print("EXECUTION DATE: {0}".format(kwargs['ts']))
    ts = kwargs['ts']
    execution_time = dateutil.parser.parse(ts)
    delta = datetime.timedelta(hours=1)
    last_execution_time = execution_time - delta
    print("LAST EXECUTION DATE: {0}".format(last_execution_time.strftime("%Y-%m-%d %H:%M:%S")))
    return last_execution_time.strftime("%Y-%m-%d %H:%M:%S")

def hourly_sync_account_histories_to_bq():
    mysql_table = "account_histories"
    bq_table = "account_history_details"

    ts = "{{ ts }}"  # Jinja template; rendered only inside templated fields

    # provide_context=True passes the template context (including `ts`)
    # into the callable's kwargs; templates_dict entries are rendered
    # and exposed as kwargs['templates_dict'].
    pre_task = PythonOperator(
        task_id='push_last_execution_time_to_xcom',
        python_callable=set_last_execution_time,
        templates_dict={'ts': ts},
        dag=dag,
        provide_context=True
    )

    # This Jinja expression pulls pre_task's return value (key='return_value')
    # when it is rendered inside a templated field such as `sql` below.
    last_execution_time = "{{ task_instance.xcom_pull(task_ids='push_last_execution_time_to_xcom', key='return_value') }}"

    # Note: this print runs at DAG-parse time, so it shows the raw template
    # string, not the rendered value.
    print("LAST EXECUTION DATE FROM XCOM: {0}".format(last_execution_time))

    sql = """
    SELECT
        ah.id as account_history_id,
        hc.id as customer_id,
        hc.broker_id,
        ah.account_id,
        a.asset_uuid,
        TO_BASE64(ah.reason) as reason,
        CAST(CAST((ah.balance * CAST(1e7 AS DECIMAL)) AS DECIMAL(32,9)) AS CHAR) AS balance,
        CAST(CAST((ah.locked_balance * CAST(1e7 AS DECIMAL)) AS DECIMAL(32,9)) AS CHAR) AS locked_balance,
        CAST(CAST((ah.delta_balance * CAST(1e7 AS DECIMAL)) AS DECIMAL(32,9)) AS CHAR) AS delta_balance,
        CAST(CAST((ah.delta_locked_balance * CAST(1e7 AS DECIMAL)) AS DECIMAL(32,9)) AS CHAR) AS delta_locked_balance,
        ah.inserted_at,
        ah.updated_at
    FROM
        `{}`.`account_histories` as ah
    LEFT JOIN `{}`.`accounts` as a on a.id=ah.account_id
    LEFT JOIN `{}`.`hashed_customers` as hc on hc.id=a.customer_id
    WHERE
        ah.inserted_at > '{}' and ah.inserted_at <= '{}';
    """.format(db, db, db, last_execution_time, ts)

    cacheUUID = "{{ ts_nodash }}"  # rendered per-run inside the templated GCS paths

    # `filename` is a templated field; the escaped `{{}}` survives .format
    # as a literal `{}`, which the operator fills with the file chunk index
    # when the export is split across multiple files.
    mysql2gcs = mysql_to_gcs.MySqlToGoogleCloudStorageOperator(
        task_id="mysql2gcs-{}.{}".format(db, mysql_table),
        sql=sql,
        mysql_conn_id="pxn-peatio-sql-read-replica",
        filename="cache_hourly_sync/{}/{}.{}-{{}}".format(cacheUUID, db, mysql_table),
        bucket=gcs_bucket,
        google_cloud_storage_conn_id="google_cloud_storage_default",
        approx_max_file_size_bytes=1024 * 1024 * 100,  # split files at ~100 MB
        dag=dag)

    # Wildcard matches every chunk written by the export task above.
    source_obj = "cache_hourly_sync/{}/{}.{}-*".format(cacheUUID, db, mysql_table)

    gcs2bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id="gcs2bq-{}.{}".format(db, bq_table),
        bucket=gcs_bucket,
        source_objects=[source_obj],
        schema_object="account_history_details.json",
        write_disposition='WRITE_APPEND',
        destination_project_dataset_table="{}.dw_ng.{}".format(
            project, bq_table),
        bigquery_conn_id="bigquery_default",
        source_format="NEWLINE_DELIMITED_JSON",
        google_cloud_storage_conn_id="google_cloud_storage_default",
        dag=dag)

    pre_task >> mysql2gcs >> gcs2bq
    return pre_task, mysql2gcs, gcs2bq

hourly_sync_account_histories_to_bq()
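To summarize the flow: pre_task's return value is stored in XCom under the default key 'return_value'; the xcom_pull expression embedded in the sql string is rendered just before the export task runs, so each hourly run queries exactly the rows inserted between the previous and the current execution date, exports them to GCS, and appends them to BigQuery.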
