Integration¶
Azure: Microsoft Azure¶
Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob Storage. Note that the Hook, Sensor and Operator are in the contrib section.
Azure Blob Storage¶
All classes communicate via the Windows Azure Storage Blob protocol. Make sure that an Airflow connection of type wasb exists. Authorization can be done by supplying a login (=Storage account name) and password (=KEY), or login and SAS token in the extra field (see connection wasb_default for an example).
- WasbBlobSensor: Checks if a blob is present on Azure Blob storage.
- WasbPrefixSensor: Checks if blobs matching a prefix are present on Azure Blob storage.
- FileToWasbOperator: Uploads a local file to a container as a blob.
- WasbHook: Interface with Azure Blob Storage.
WasbBlobSensor¶
WasbPrefixSensor¶
FileToWasbOperator¶
WasbHook¶
AWS: Amazon Web Services¶
Airflow has extensive support for Amazon Web Services. But note that the Hooks, Sensors and Operators are in the contrib section.
AWS EMR¶
- EmrAddStepsOperator : Adds steps to an existing EMR JobFlow.
- EmrCreateJobFlowOperator : Creates an EMR JobFlow, reading the config from the EMR connection.
- EmrTerminateJobFlowOperator : Terminates an EMR JobFlow.
- EmrHook : Interact with AWS EMR.
EmrAddStepsOperator¶
-
class
airflow.contrib.operators.emr_add_steps_operator.
EmrAddStepsOperator
(job_flow_id, aws_conn_id='s3_default', steps=None, *args, **kwargs)[source]¶ An operator that adds steps to an existing EMR job_flow.
Parameters: - job_flow_id – id of the JobFlow to add steps to
- aws_conn_id (str) – aws connection to use
- steps (list) – boto3 style steps to be added to the jobflow
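As a rough illustration of what "boto3 style steps" can look like, the sketch below builds a steps list in the shape that boto3's EMR client accepts for adding job flow steps. The helper name make_spark_step, the step name and the S3 path are hypothetical and only serve the example.

```python
def make_spark_step(name, script_uri):
    """Build one boto3-style EMR step (helper name and arguments are illustrative)."""
    return {
        'Name': name,
        # What EMR should do if this step fails.
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            # command-runner.jar lets EMR run an arbitrary command as a step.
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', script_uri],
        },
    }


# Hypothetical bucket and script, used only to show the resulting structure.
steps = [make_spark_step('calculate_pi', 's3://my-bucket/scripts/pi.py')]
```

A list like this would be passed as steps=steps to EmrAddStepsOperator, together with the job_flow_id of the running JobFlow.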
EmrCreateJobFlowOperator¶
-
class
airflow.contrib.operators.emr_create_job_flow_operator.
EmrCreateJobFlowOperator
(aws_conn_id='s3_default', emr_conn_id='emr_default', job_flow_overrides=None, *args, **kwargs)[source]¶ Creates an EMR JobFlow, reading the config from the EMR connection. A dictionary of JobFlow overrides can be passed that override the config from the connection.
Parameters: - aws_conn_id (str) – aws connection to uses
- emr_conn_id (str) – emr connection to use
- job_flow_overrides – boto3 style arguments to override emr_connection extra
EmrTerminateJobFlowOperator¶
AWS S3¶
- S3FileTransformOperator : Copies data from a source S3 location to a temporary location on the local filesystem.
- S3ToHiveTransfer : Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.
- S3Hook : Interact with AWS S3.
S3FileTransformOperator¶
-
class
airflow.operators.s3_file_transform_operator.
S3FileTransformOperator
(source_s3_key, dest_s3_key, transform_script, source_aws_conn_id='aws_default', dest_aws_conn_id='aws_default', replace=False, *args, **kwargs)[source]¶ Copies data from a source S3 location to a temporary location on the local filesystem. Runs a transformation on this file as specified by the transformation script and uploads the output to a destination S3 location.
The locations of the source and destination files in the local filesystem are provided as the first and second arguments to the transformation script. The transformation script is expected to read the data from the source, transform it and write the output to the local destination file. The operator then takes over control and uploads the local destination file to S3.
Parameters: - source_s3_key (str) – The key to be retrieved from S3
- source_aws_conn_id (str) – source s3 connection
- dest_s3_key (str) – The key to be written to S3
- dest_aws_conn_id (str) – destination s3 connection
- replace (bool) – Replace dest S3 key if it already exists
- transform_script (str) – location of the executable transformation script
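To make the contract with the transformation script concrete, here is a minimal sketch of such a script, assuming it is written in Python and marked executable. The upper-casing transformation and the function name transform are hypothetical; the only part taken from the description above is that the local source path arrives as the first argument and the local destination path as the second.

```python
import sys


def transform(source_path, dest_path):
    """Read the local source file, upper-case each line, write the local destination."""
    with open(source_path) as src, open(dest_path, 'w') as dst:
        for line in src:
            dst.write(line.upper())


if __name__ == '__main__' and len(sys.argv) == 3:
    # argv[1]: local copy of the source S3 key, argv[2]: local file to be uploaded.
    transform(sys.argv[1], sys.argv[2])
```

The script only touches local files; downloading from and uploading to S3 stays with the operator.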
S3ToHiveTransfer¶
AWS EC2 Container Service¶
- ECSOperator : Execute a task on AWS EC2 Container Service.
ECSOperator¶
-
class
airflow.contrib.operators.ecs_operator.
ECSOperator
(task_definition, cluster, overrides, aws_conn_id=None, region_name=None, **kwargs)[source]¶ Execute a task on AWS EC2 Container Service
Parameters: - task_definition (str) – the task definition name on EC2 Container Service
- cluster (str) – the cluster name on EC2 Container Service
- aws_conn_id (str) – connection id of AWS credentials / region name. If None, credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
- region_name – region name to use in AWS Hook. Override the region_name in connection (if provided)
- overrides (dict) – the same parameter that boto3 will receive: http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task
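For orientation, a sketch of an overrides dictionary in the shape boto3's run_task call expects is shown below. The container name, command and environment variable are hypothetical placeholders; the container name would have to match one defined in your task definition.

```python
overrides = {
    'containerOverrides': [
        {
            # Must match a container name in the task definition (hypothetical here).
            'name': 'my-container',
            # Replaces the command defined in the task definition for this run.
            'command': ['python', 'job.py', '--date', '2017-10-03'],
            # Extra environment variables for this run only.
            'environment': [{'name': 'STAGE', 'value': 'prod'}],
        },
    ],
}
```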
AWS RedShift¶
- RedshiftToS3Transfer : Executes an unload command to S3 as a CSV with headers.
RedshiftToS3Transfer¶
Databricks¶
Databricks has contributed an Airflow operator which enables
submitting runs to the Databricks platform. Internally the operator talks to the
api/2.0/jobs/runs/submit
endpoint.
DatabricksSubmitRunOperator¶
-
class
airflow.contrib.operators.databricks_operator.
DatabricksSubmitRunOperator
(json=None, spark_jar_task=None, notebook_task=None, new_cluster=None, existing_cluster_id=None, libraries=None, run_name=None, timeout_seconds=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, **kwargs)[source]¶ Submits a Spark job run to Databricks using the api/2.0/jobs/runs/submit API endpoint.
There are two ways to instantiate this operator.
In the first way, you can take the JSON payload that you typically use to call the api/2.0/jobs/runs/submit endpoint and pass it directly to our DatabricksSubmitRunOperator through the json parameter. For example:
    json = {
        'new_cluster': {
            'spark_version': '2.1.0-db3-scala2.11',
            'num_workers': 2
        },
        'notebook_task': {
            'notebook_path': '/Users/airflow@example.com/PrepareData',
        },
    }
    notebook_run = DatabricksSubmitRunOperator(task_id='notebook_run', json=json)
Another way to accomplish the same thing is to use the named parameters of the DatabricksSubmitRunOperator directly. Note that there is exactly one named parameter for each top level parameter in the runs/submit endpoint. In this method, your code would look like this:
    new_cluster = {
        'spark_version': '2.1.0-db3-scala2.11',
        'num_workers': 2
    }
    notebook_task = {
        'notebook_path': '/Users/airflow@example.com/PrepareData',
    }
    notebook_run = DatabricksSubmitRunOperator(
        task_id='notebook_run',
        new_cluster=new_cluster,
        notebook_task=notebook_task)
In the case where both the json parameter AND the named parameters are provided, they will be merged together. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys.
Currently the named parameters that DatabricksSubmitRunOperator supports are:
- spark_jar_task
- notebook_task
- new_cluster
- existing_cluster_id
- libraries
- run_name
- timeout_seconds
Parameters: - json (dict) – A JSON object containing API parameters which will be passed directly to the api/2.0/jobs/runs/submit endpoint. The other named parameters (i.e. spark_jar_task, notebook_task, ...) to this operator will be merged with this json dictionary if they are provided. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys. This field will be templated. See also: for more information about templating see Jinja Templating. https://docs.databricks.com/api/latest/jobs.html#runs-submit
- spark_jar_task (dict) – The main class and parameters for the JAR task. Note that the actual JAR is specified in the libraries. EITHER spark_jar_task OR notebook_task should be specified. This field will be templated.
- notebook_task (dict) – The notebook path and parameters for the notebook task. EITHER spark_jar_task OR notebook_task should be specified. This field will be templated.
- new_cluster (dict) – Specs for a new cluster on which this task will be run. EITHER new_cluster OR existing_cluster_id should be specified. This field will be templated.
- existing_cluster_id (string) – ID for existing cluster on which to run this task. EITHER new_cluster OR existing_cluster_id should be specified. This field will be templated.
- libraries (list of dicts) – Libraries which this run will use. This field will be templated.
- run_name (string) – The run name used for this task. By default this will be set to the Airflow task_id. This task_id is a required parameter of the superclass BaseOperator. This field will be templated.
- timeout_seconds (int32) – The timeout for this run. By default a value of 0 is used which means to have no timeout. This field will be templated.
- databricks_conn_id (string) – The name of the Airflow connection to use. By default and in the common case this will be databricks_default. To use token based authentication, provide the key token in the extra field for the connection.
- polling_period_seconds (int) – Controls the rate at which we poll for the result of this run. By default the operator will poll every 30 seconds.
- databricks_retry_limit (int) – Number of times to retry if the Databricks backend is unreachable. Its value must be greater than or equal to 1.
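The merge rule for DatabricksSubmitRunOperator (named parameters override the top level keys of json) can be sketched in plain Python as follows. This is only an illustration of the documented behaviour, not the operator's actual code; the function name merge_run_payload is hypothetical.

```python
import copy

# The named parameters listed in the documentation above.
NAMED_PARAMETERS = (
    'spark_jar_task', 'notebook_task', 'new_cluster', 'existing_cluster_id',
    'libraries', 'run_name', 'timeout_seconds',
)


def merge_run_payload(json=None, **named):
    """Merge named parameters into a copy of json; named parameters win on conflict."""
    payload = copy.deepcopy(json) if json else {}
    for key in NAMED_PARAMETERS:
        value = named.get(key)
        if value is not None:
            # Overrides only the top level key, as described in the text.
            payload[key] = value
    return payload


payload = merge_run_payload(
    json={
        'new_cluster': {'spark_version': '2.1.0-db3-scala2.11', 'num_workers': 2},
        'run_name': 'from_json',
    },
    run_name='from_named_parameter',
    notebook_task={'notebook_path': '/Users/airflow@example.com/PrepareData'},
)
```

Here run_name appears in both places, so the named value is the one that ends up in the payload, while new_cluster is taken from json unchanged.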
GCP: Google Cloud Platform¶
Airflow has extensive support for the Google Cloud Platform. But note that most Hooks and Operators are in the contrib section, meaning that they have a beta status and can have breaking changes between minor releases.
Logging¶
Airflow can be configured to read and write task logs in Google cloud storage. Follow the steps below to enable Google cloud storage logging.
Airflow’s logging system requires a custom .py file to be located in the PYTHONPATH, so that it’s importable from Airflow. Start by creating a directory to store the config file. $AIRFLOW_HOME/config is recommended.
Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py.
Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.
Customize the following portions of the template:
    # Add this variable to the top of the file. Note the trailing slash.
    GCS_LOG_FOLDER = 'gs://<bucket where logs should be persisted>/'

    # Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
    LOGGING_CONFIG = ...

    # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    'gcs.task': {
        'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'gcs_log_folder': GCS_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },

    # Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['gcs.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['gcs.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
Make sure a Google cloud platform connection hook has been defined in Airflow. The hook should have read and write access to the Google cloud storage bucket defined above in GCS_LOG_FOLDER.
Update $AIRFLOW_HOME/airflow.cfg to contain:
    task_log_reader = gcs.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the Google cloud platform hook>
Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
Verify that logs are showing up for newly executed tasks in the bucket you’ve defined.
Verify that the Google cloud storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:
    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
Note the top line that says it’s reading from the remote log file.
Please be aware that if you were persisting logs to Google cloud storage using the old-style airflow.cfg configuration method, the old logs will no longer be visible in the Airflow UI, though they’ll still exist in Google cloud storage. This is a backwards incompatible change. If you are unhappy with it, you can change the FILENAME_TEMPLATE to reflect the old-style log filename format.
BigQuery¶
BigQuery Operators¶
- BigQueryCheckOperator : Performs checks against a SQL query that will return a single row with different values.
- BigQueryValueCheckOperator : Performs a simple value check using SQL code.
- BigQueryIntervalCheckOperator : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
- BigQueryOperator : Executes BigQuery SQL queries in a specific BigQuery database.
- BigQueryToBigQueryOperator : Copy a BigQuery table to another BigQuery table.
- BigQueryToCloudStorageOperator : Transfers a BigQuery table to a Google Cloud Storage bucket
BigQueryCheckOperator¶
-
class
airflow.contrib.operators.bigquery_check_operator.
BigQueryCheckOperator
(sql, bigquery_conn_id='bigquery_default', *args, **kwargs)[source]¶ Performs checks against BigQuery. The BigQueryCheckOperator expects a sql query that will return a single row. Each value on that first row is evaluated using python bool casting. If any of the values return False the check is failed and errors out.
Note that Python bool casting evals the following as False:
- False
- 0
- Empty string ("")
- Empty list ([])
- Empty dictionary or set ({})
Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft a much more complex query that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are less than 3 standard deviations from the 7 day average.
This operator can be used as a data quality check in your pipeline. Depending on where you put it in your DAG, you can either place it on the critical path, so that a failed check stops dubious data from being published, or place it on the side and receive email alerts without stopping the progress of the DAG.
Parameters: - sql (string) – the sql to be executed
- bigquery_conn_id (string) – reference to the BigQuery database
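The bool casting rule for BigQueryCheckOperator can be tried out without BigQuery. The following sketch applies it to a few hand-written result rows; check_passes and the example rows are hypothetical and stand in for the single row returned by the query.

```python
def check_passes(row):
    """Return True only if every value in the single result row casts to True."""
    return all(bool(value) for value in row)


# Hypothetical result rows, one per scenario.
examples = {
    'count is zero': (0,),
    'count is positive': (42,),
    'empty string among values': (10, ''),
    'all values truthy': (10, 'ok', 1.5),
}
results = {label: check_passes(row) for label, row in examples.items()}
```

Only the rows in which every value is truthy pass, which is why SELECT COUNT(*) FROM foo fails exactly when the count is 0.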
BigQueryValueCheckOperator¶
BigQueryIntervalCheckOperator¶
-
class
airflow.contrib.operators.bigquery_check_operator.
BigQueryIntervalCheckOperator
(table, metrics_thresholds, date_filter_column='ds', days_back=-7, bigquery_conn_id='bigquery_default', *args, **kwargs)[source]¶ Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
This method constructs a query like so:
    SELECT {metrics_threshold_dict_key} FROM {table}
    WHERE {date_filter_column}=<date>
Parameters: - table (str) – the table name
- days_back (int) – number of days between ds and the ds we want to check against. Defaults to 7 days
- metrics_thresholds (dict) – a dictionary of ratios indexed by metrics. For example, ‘COUNT(*)’: 1.5 would require a difference of 50 percent or less between the current day and the day days_back before.
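One way to read the ratio threshold is sketched below: the larger of the two metric values is divided by the smaller, and the result is compared with the threshold. This is an illustration under stated assumptions, not the operator's code. In particular, the function name, the treatment of zero values and the choice of "less than or equal" at the boundary are assumptions made for this example.

```python
def within_threshold(current, reference, threshold):
    """Compare today's metric value with the value from days_back before."""
    if current == 0 or reference == 0:
        # A ratio is undefined when either side is zero; treated as a failure here.
        return False
    # Symmetric ratio: growth and shrinkage are judged the same way.
    ratio = float(max(current, reference)) / min(current, reference)
    return ratio <= threshold
```

With a threshold of 1.5, a count of 140 against a reference of 100 passes, while 200 against 100 does not.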
BigQueryOperator¶
-
class
airflow.contrib.operators.bigquery_operator.
BigQueryOperator
(bql, destination_dataset_table=False, write_disposition='WRITE_EMPTY', allow_large_results=False, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=False, use_legacy_sql=True, maximum_billing_tier=None, create_disposition='CREATE_IF_NEEDED', query_params=None, *args, **kwargs)[source]¶ Executes BigQuery SQL queries in a specific BigQuery database
Parameters: - bql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql') – the sql code to be executed
- destination_dataset_table (string) – A dotted (<project>.|<project>:)<dataset>.<table> that, if set, will store the results of the query.
- write_disposition (string) – Specifies the action that occurs if the destination table already exists. (default: ‘WRITE_EMPTY’)
- create_disposition (string) – Specifies whether the job is allowed to create new tables. (default: ‘CREATE_IF_NEEDED’)
- bigquery_conn_id (string) – reference to a specific BigQuery hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- udf_config (list) – The User Defined Function configuration for the query. See https://cloud.google.com/bigquery/user-defined-functions for details.
- use_legacy_sql (boolean) – Whether to use legacy SQL (true) or standard SQL (false).
- maximum_billing_tier (integer) – Positive integer that serves as a multiplier of the basic price. Defaults to None, in which case it uses the value set in the project.
- query_params (dict) – a dictionary containing query parameter types and values, passed to BigQuery.
BigQueryToBigQueryOperator¶
-
class
airflow.contrib.operators.bigquery_to_bigquery.
BigQueryToBigQueryOperator
(source_project_dataset_tables, destination_project_dataset_table, write_disposition='WRITE_EMPTY', create_disposition='CREATE_IF_NEEDED', bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)[source]¶ Copies data from one BigQuery table to another.
See https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy for more details about these parameters.
Parameters: - source_project_dataset_tables (list|string) – One or more dotted (project:|project.)<dataset>.<table> BigQuery tables to use as the source data. If <project> is not included, project will be the project defined in the connection json. Use a list if there are multiple source tables.
- destination_project_dataset_table (string) – The destination BigQuery table. Format is: (project:|project.)<dataset>.<table>
- write_disposition (string) – The write disposition if the table already exists.
- create_disposition (string) – The create disposition if the table doesn’t exist.
- bigquery_conn_id (string) – reference to a specific BigQuery hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
BigQueryToCloudStorageOperator¶
-
class
airflow.contrib.operators.bigquery_to_gcs.
BigQueryToCloudStorageOperator
(source_project_dataset_table, destination_cloud_storage_uris, compression='NONE', export_format='CSV', field_delimiter=', ', print_header=True, bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)[source]¶ Transfers a BigQuery table to a Google Cloud Storage bucket.
See https://cloud.google.com/bigquery/docs/reference/v2/jobs for more details about these parameters.
Parameters: - source_project_dataset_table (string) – The dotted (<project>.|<project>:)<dataset>.<table> BigQuery table to use as the source data. If <project> is not included, project will be the project defined in the connection json.
- destination_cloud_storage_uris (list) – The destination Google Cloud Storage URI (e.g. gs://some-bucket/some-file.txt). Follows convention defined here: https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple
- compression (string) – Type of compression to use.
- export_format – File format to export.
- field_delimiter (string) – The delimiter to use when extracting to a CSV.
- print_header (boolean) – Whether to print a header for a CSV file extract.
- bigquery_conn_id (string) – reference to a specific BigQuery hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
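The table reference format (<project>.|<project>:)<dataset>.<table> used by the BigQuery operators can be illustrated with a small parser. This is a simplified sketch, not the hook's own parsing code; split_table_reference and the default_project argument are hypothetical, and project ids that themselves contain dots or colons are not handled.

```python
def split_table_reference(table_ref, default_project):
    """Split a table reference into (project, dataset, table)."""
    if ':' in table_ref:
        # <project>:<dataset>.<table>
        project, rest = table_ref.split(':', 1)
    elif table_ref.count('.') == 2:
        # <project>.<dataset>.<table>
        project, rest = table_ref.split('.', 1)
    else:
        # <dataset>.<table>: fall back to the project from the connection.
        project, rest = default_project, table_ref
    dataset, table = rest.split('.')
    return project, dataset, table
```

For example, both 'my-project:my_dataset.my_table' and 'my-project.my_dataset.my_table' resolve to the same triple, while 'my_dataset.my_table' picks up the default project.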
BigQueryHook¶
-
class
airflow.contrib.hooks.bigquery_hook.
BigQueryHook
(bigquery_conn_id='bigquery_default', delegate_to=None)[source]¶ Interact with BigQuery. This hook uses the Google Cloud Platform connection.
-
get_pandas_df
(bql, parameters=None, dialect='legacy')[source]¶ Returns a Pandas DataFrame for the results produced by a BigQuery query. The DbApiHook method must be overridden because Pandas doesn’t support PEP 249 connections, except for SQLite. See:
https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447 https://github.com/pydata/pandas/issues/6900
Parameters: - bql (string) – The BigQuery SQL to execute.
- parameters (mapping or iterable) – The parameters to render the SQL query with (not used, leave to override superclass method)
- dialect (string in {'legacy', 'standard'}, default 'legacy') – Dialect of BigQuery SQL – legacy SQL or standard SQL
-
insert_rows
(table, rows, target_fields=None, commit_every=1000)[source]¶ Insertion is currently unsupported. Theoretically, you could use BigQuery’s streaming API to insert rows into a table, but this hasn’t been implemented.
-
table_exists
(project_id, dataset_id, table_id)[source]¶ Checks for the existence of a table in Google BigQuery.
Parameters: - project_id (string) – The Google cloud project in which to look for the table. The connection supplied to the hook must provide access to the specified project.
- dataset_id – The name of the dataset in which to look for the table.
- table_id (string) – The name of the table to check the existence of.
-
Cloud DataFlow¶
DataFlow Operators¶
- DataFlowJavaOperator : Launches Cloud Dataflow jobs written in Java.
- DataFlowPythonOperator : Launches Cloud Dataflow jobs written in Python.
DataFlowJavaOperator¶
-
class
airflow.contrib.operators.dataflow_operator.
DataFlowJavaOperator
(jar, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job.
It’s a good practice to define dataflow_* parameters in the default_args of the dag like the project, zone and staging location.
```
default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'stagingLocation': 'gs://my-staging-bucket/staging/'
    }
}
```
You need to pass the path to your dataflow as a file reference with the jar parameter; the jar needs to be a self executing jar. Use options to pass on options to your job.
```
t1 = DataFlowJavaOperator(
    task_id='dataflow_example',
    jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=my_dag)
```
Both jar and options are templated so you can use variables in them.
```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 8, 1),
    'email': ['alex@vanboxel.be'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
    # Picked up by the operator as defaults for every Dataflow job in this DAG.
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'us-central1-f',
        'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
    }
}

dag = DAG('test-dag', default_args=default_args)

task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=dag)
```
Cloud DataProc¶
DataProc Operators¶
- DataProcPigOperator : Start a Pig query Job on a Cloud DataProc cluster.
- DataProcHiveOperator : Start a Hive query Job on a Cloud DataProc cluster.
- DataProcSparkSqlOperator : Start a Spark SQL query Job on a Cloud DataProc cluster.
- DataProcSparkOperator : Start a Spark Job on a Cloud DataProc cluster.
- DataProcHadoopOperator : Start a Hadoop Job on a Cloud DataProc cluster.
- DataProcPySparkOperator : Start a PySpark Job on a Cloud DataProc cluster.
DataProcPigOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcPigOperator
(query=None, query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_pig_properties=None, dataproc_pig_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation will be passed to the cluster.
It’s a good practice to define dataproc_* parameters in the default_args of the dag like the cluster name and UDFs.
```
default_args = {
    'cluster_name': 'cluster-1',
    'dataproc_pig_jars': [
        'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
        'gs://example/udf/jar/gpig/1.2/gpig.jar'
    ]
}
```
You can pass a pig script as a string or a file reference. Use variables to pass on variables for the pig script to be resolved on the cluster, or use the parameters to be resolved in the script as template parameters.
```
t1 = DataProcPigOperator(
    task_id='dataproc_pig',
    query='a_pig_script.pig',
    variables={'out': 'gs://example/output/{{ds}}'},
    dag=dag)
```
DataProcHiveOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcHiveOperator
(query=None, query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_hive_properties=None, dataproc_hive_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Start a Hive query Job on a Cloud DataProc cluster.
DataProcSparkSqlOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcSparkSqlOperator
(query=None, query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Start a Spark SQL query Job on a Cloud DataProc cluster.
DataProcSparkOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcSparkOperator
(main_jar=None, main_class=None, arguments=None, archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Start a Spark Job on a Cloud DataProc cluster.
DataProcHadoopOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcHadoopOperator
(main_jar=None, main_class=None, arguments=None, archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_hadoop_properties=None, dataproc_hadoop_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Start a Hadoop Job on a Cloud DataProc cluster.
DataProcPySparkOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcPySparkOperator
(main, arguments=None, archives=None, pyfiles=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_pyspark_properties=None, dataproc_pyspark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Start a PySpark Job on a Cloud DataProc cluster.
Cloud Datastore¶
DatastoreHook¶
-
class
airflow.contrib.hooks.datastore_hook.
DatastoreHook
(datastore_conn_id='google_cloud_datastore_default', delegate_to=None)[source]¶ Interact with Google Cloud Datastore. This hook uses the Google Cloud Platform connection.
This object is not thread safe. If you want to make multiple requests simultaneously, you will need to create a hook per thread.
-
allocate_ids
(partialKeys)[source]¶ Allocate IDs for incomplete keys. see https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds
Parameters: partialKeys – a list of partial keys Returns: a list of full keys.
-
begin_transaction
()[source]¶ Get a new transaction handle see https://cloud.google.com/datastore/docs/reference/rest/v1/projects/beginTransaction
Returns: a transaction handle
-
commit
(body)[source]¶ Commit a transaction, optionally creating, deleting or modifying some entities. see https://cloud.google.com/datastore/docs/reference/rest/v1/projects/commit
Parameters: body – the body of the commit request Returns: the response body of the commit request
-
delete_operation
(name)[source]¶ Deletes the long-running operation
Parameters: name – the name of the operation resource
-
export_to_storage_bucket
(bucket, namespace=None, entity_filter=None, labels=None)[source]¶ Export entities from Cloud Datastore to Cloud Storage for backup
-
get_operation
(name)[source]¶ Gets the latest state of a long-running operation
Parameters: name – the name of the operation resource
-
import_from_storage_bucket
(bucket, file, namespace=None, entity_filter=None, labels=None)[source]¶ Import a backup from Cloud Storage to Cloud Datastore
-
lookup
(keys, read_consistency=None, transaction=None)[source]¶ Lookup some entities by key. See https://cloud.google.com/datastore/docs/reference/rest/v1/projects/lookup
Parameters: - keys – the keys to lookup
- read_consistency – the read consistency to use: default, strong or eventual. Cannot be used with a transaction.
- transaction – the transaction to use, if any.
Returns: the response body of the lookup request.
-
poll_operation_until_done
(name, polling_interval_in_seconds)[source]¶ Poll backup operation state until it’s completed
-
rollback
(transaction)[source]¶ Roll back a transaction. See https://cloud.google.com/datastore/docs/reference/rest/v1/projects/rollback
Parameters: transaction – the transaction to roll back
-
run_query
(body)[source]¶ Run a query for entities. See https://cloud.google.com/datastore/docs/reference/rest/v1/projects/runQuery
Parameters: body – the body of the query request Returns: the batch of query results.
-
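The polling behaviour described for poll_operation_until_done can be sketched as a simple loop around get_operation. This is a stand-alone illustration, not the hook's implementation: the function poll_until_done, the injected get_operation callable and the use of a 'done' flag in the response are assumptions made for this example, and the real hook may inspect a different field of the operation resource.

```python
import time


def poll_until_done(get_operation, name, polling_interval_in_seconds, sleep=time.sleep):
    """Fetch the operation repeatedly until it reports completion, then return it."""
    while True:
        operation = get_operation(name)
        if operation.get('done'):
            return operation
        # Wait before asking again, so the API is not hammered.
        sleep(polling_interval_in_seconds)


# Fake operation states standing in for successive API responses.
_states = iter([{'name': 'op-1'}, {'name': 'op-1'}, {'name': 'op-1', 'done': True}])
result = poll_until_done(lambda name: next(_states), 'op-1', 0, sleep=lambda seconds: None)
```

Passing sleep as a parameter is a design choice for this sketch only: it keeps the loop testable without real waiting.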
Cloud ML Engine¶
Cloud ML Engine Operators¶
- MLEngineBatchPredictionOperator : Start a Cloud ML Engine batch prediction job.
- MLEngineModelOperator : Manages a Cloud ML Engine model.
- MLEngineTrainingOperator : Start a Cloud ML Engine training job.
- MLEngineVersionOperator : Manages a Cloud ML Engine model version.
MLEngineBatchPredictionOperator¶
-
class
airflow.contrib.operators.mlengine_operator.
MLEngineBatchPredictionOperator
(project_id, job_id, region, data_format, input_paths, output_path, model_name=None, version_name=None, uri=None, max_worker_count=None, runtime_version=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Start a Google Cloud ML Engine prediction job.
NOTE: For model origin, users should consider exactly one from the three options below:
1. Populate ‘uri’ field only, which should be a GCS location that points to a tensorflow savedModel directory.
2. Populate ‘model_name’ field only, which refers to an existing model, and the default version of the model will be used.
3. Populate both ‘model_name’ and ‘version_name’ fields, which refers to a specific version of a specific model.
In options 2 and 3, both model and version name should contain the minimal identifier. For instance, call MLEngineBatchPredictionOperator(..., model_name='my_model', version_name='my_version', ...) if the desired model version is “projects/my_project/models/my_model/versions/my_version”.
Parameters: - project_id (string) – The Google Cloud project name where the prediction job is submitted.
- job_id (string) – A unique id for the prediction job on Google Cloud ML Engine.
- data_format (string) – The format of the input data. It will default to ‘DATA_FORMAT_UNSPECIFIED’ if it is not provided or is not one of [“TEXT”, “TF_RECORD”, “TF_RECORD_GZIP”].
- input_paths (list of string) – A list of GCS paths of input data for batch prediction. Accepting wildcard operator *, but only at the end.
- output_path (string) – The GCS path where the prediction results are written to.
- region (string) – The Google Compute Engine region to run the prediction job in.:
- model_name (string) – The Google Cloud ML Engine model to use for prediction. If version_name is not provided, the default version of this model will be used. Should not be None if version_name is provided. Should be None if uri is provided.
- version_name (string) – The Google Cloud ML Engine model version to use for prediction. Should be None if uri is provided.
- uri (string) – The GCS path of the saved model to use for prediction. Should be None if model_name is provided. It should be a GCS path pointing to a tensorflow SavedModel.
- max_worker_count (int) – The maximum number of workers to be used for parallel processing. Defaults to 10 if not specified.
- runtime_version (string) – The Google Cloud ML Engine runtime version to use for batch prediction.
- gcp_conn_id (string) – The connection ID used for connection to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- Raises:
- ValueError: if a unique model/version origin cannot be determined.
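The model-origin rule in the NOTE for this operator can be sketched in plain Python. This is only an illustration of the documented "exactly one of three options" rule, not the operator's own code: `resolve_model_origin` is a hypothetical helper, and the `projects/<project>/models/<model>` form used for option 2 is an assumption extrapolated from the version path quoted in the NOTE.

```python
def resolve_model_origin(project_id, model_name=None, version_name=None, uri=None):
    """Hypothetical helper: return (kind, value) for exactly one model origin."""
    # Option 1: only 'uri' is populated.
    if uri and not model_name and not version_name:
        return ("uri", uri)
    # Options 2 and 3 both need a model name and must not be mixed with 'uri'.
    if model_name and not uri:
        # Assumed path layout, extrapolated from the documented version path.
        model_path = "projects/{}/models/{}".format(project_id, model_name)
        if version_name:
            # Option 3: a specific version of a specific model.
            return ("version", "{}/versions/{}".format(model_path, version_name))
        # Option 2: the model's default version is used.
        return ("model", model_path)
    raise ValueError("a unique model/version origin cannot be determined")


origin = resolve_model_origin(
    "my_project", model_name="my_model", version_name="my_version")
```

Passing `version_name` without `model_name`, or `uri` together with `model_name`, ends in the `ValueError` branch, which mirrors the Raises entry for this operator.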
MLEngineModelOperator¶
-
class
airflow.contrib.operators.mlengine_operator.
MLEngineModelOperator
(project_id, model, operation='create', gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Operator for managing a Google Cloud ML Engine model.
Parameters: - project_id (string) – The Google Cloud project name to which MLEngine model belongs.
- model (dict) –
A dictionary containing the information about the model. If the operation is create, then the model parameter should contain all the information about this model such as name.
If the operation is get, the model parameter should contain the name of the model.
- operation – The operation to perform. Available operations are: ‘create’: Creates a new model as provided by the model parameter. ‘get’: Gets a particular model where the name is specified in model.
- gcp_conn_id (string) – The connection ID to use when fetching connection info.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
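As a rough illustration of how the model and operation arguments fit together, the sketch below checks a request before it would be handed to the operator. `check_model_request` is a hypothetical helper written for this example; it only encodes what the parameter descriptions state (two operations, and a model dictionary that carries at least a name).

```python
SUPPORTED_OPERATIONS = ("create", "get")


def check_model_request(model, operation="create"):
    """Hypothetical pre-flight check for MLEngineModelOperator arguments."""
    if operation not in SUPPORTED_OPERATIONS:
        raise ValueError("unknown operation: {!r}".format(operation))
    # Both 'create' and 'get' need the model name in the dictionary.
    if not model or not model.get("name"):
        raise ValueError("the model dictionary must contain a 'name'")
    return {"model": model, "operation": operation}


# Keyword arguments that could then be passed on to the operator.
create_kwargs = check_model_request({"name": "my_model"}, "create")
get_kwargs = check_model_request({"name": "my_model"}, "get")
```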
MLEngineTrainingOperator¶
-
class
airflow.contrib.operators.mlengine_operator.
MLEngineTrainingOperator
(project_id, job_id, package_uris, training_python_module, training_args, region, scale_tier=None, gcp_conn_id='google_cloud_default', delegate_to=None, mode='PRODUCTION', *args, **kwargs)[source]¶ Operator for launching a MLEngine training job.
Parameters: - project_id (string) – The Google Cloud project name within which MLEngine training job should run. This field could be templated.
- job_id (string) – A unique templated id for the submitted Google MLEngine training job.
- package_uris (list of string) – A list of package locations for MLEngine training job, which should include the main training program + any additional dependencies.
- training_python_module (string) – The Python module name to run within MLEngine training job after installing ‘package_uris’ packages.
- training_args (list of string) – A list of templated command line arguments to pass to the MLEngine training program.
- region (string) – The Google Compute Engine region to run the MLEngine training job in. This field could be templated.
- scale_tier (string) – Resource tier for MLEngine training job.
- gcp_conn_id (string) – The connection ID to use when fetching connection info.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- mode (string) – Can be one of ‘DRY_RUN’/‘CLOUD’. In ‘DRY_RUN’ mode, no real training job will be launched, but the MLEngine training job request will be printed out. In ‘CLOUD’ mode, a real MLEngine training job creation request will be issued.
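The difference between the two documented modes can be pictured with a small stand-alone sketch. It is not the operator's implementation: `build_training_request` and `submit` are hypothetical helpers, `launch_fn` stands in for whatever actually submits the job, and the `trainingInput` field names other than `scaleTier` are assumed from the Cloud ML Engine REST API.

```python
import json


def build_training_request(job_id, package_uris, training_python_module,
                           training_args, region, scale_tier=None):
    """Hypothetical helper: assemble a training job request body."""
    training_input = {
        "packageUris": list(package_uris),
        "pythonModule": training_python_module,
        "args": list(training_args),
        "region": region,
    }
    if scale_tier is not None:
        training_input["scaleTier"] = scale_tier
    return {"jobId": job_id, "trainingInput": training_input}


def submit(request, mode, launch_fn):
    """In 'DRY_RUN' mode only print the request; otherwise call launch_fn."""
    if mode == "DRY_RUN":
        print(json.dumps(request, indent=2, sort_keys=True))
        return None
    return launch_fn(request)


request = build_training_request(
    "my_job_id", ["gs://my-bucket/trainer.tar.gz"], "trainer.task",
    ["--epochs", "5"], "us-central1", scale_tier="STANDARD_1")
launched = []
dry_run_result = submit(request, "DRY_RUN", launched.append)
```

After the dry run, `launched` is still empty, which is the point of that mode: the request can be inspected without any job being created.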
MLEngineVersionOperator¶
-
class
airflow.contrib.operators.mlengine_operator.
MLEngineVersionOperator
(project_id, model_name, version_name=None, version=None, operation='create', gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]¶ Operator for managing a Google Cloud ML Engine version.
Parameters: - project_id (string) – The Google Cloud project name to which MLEngine model belongs.
- model_name (string) – The name of the Google Cloud ML Engine model that the version belongs to.
- version_name (string) – A name to use for the version being operated upon. If not None and the version argument is None or does not have a value for the name key, then this will be populated in the payload for the name key.
- version (dict) – A dictionary containing the information about the version. If the operation is create, version should contain all the information about this version such as name and deploymentUrl. If the operation is get or delete, the version parameter should contain the name of the version. If it is None, the only operation possible would be list.
- operation (string) – The operation to perform. Available operations are:
- ‘create’: Creates a new version in the model specified by model_name, in which case the version parameter should contain all the information to create that version (e.g. name, deploymentUrl).
- ‘get’: Gets full information of a particular version in the model specified by model_name. The name of the version should be specified in the version parameter.
- ‘list’: Lists all available versions of the model specified by model_name.
- ‘delete’: Deletes the version specified in the version parameter from the model specified by model_name. The name of the version should be specified in the version parameter.
- gcp_conn_id (string) – The connection ID to use when fetching connection info.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
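The interplay between version_name and version described above can be sketched as follows. `resolve_version_payload` is a hypothetical helper that only mirrors the documented rule (version_name fills in the name key when the version dictionary lacks one); it is not taken from the operator's source.

```python
def resolve_version_payload(version_name=None, version=None):
    """Hypothetical helper: merge version_name into the version payload."""
    # Work on a copy so the caller's dictionary is left untouched.
    payload = dict(version or {})
    # version_name is only used when the payload has no value for 'name'.
    if version_name is not None and not payload.get("name"):
        payload["name"] = version_name
    return payload


only_name = resolve_version_payload("v1")
filled_in = resolve_version_payload("v1", {"deploymentUrl": "gs://my-bucket/model"})
kept = resolve_version_payload("v1", {"name": "v2"})
```

In the last call the name already present in the dictionary is kept, so `version_name` acts as a fallback rather than an override.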
Cloud ML Engine Hook¶
MLEngineHook¶
-
class
airflow.contrib.hooks.gcp_mlengine_hook.
MLEngineHook
(gcp_conn_id='google_cloud_default', delegate_to=None)[source]¶ -
create_job
(project_id, job, use_existing_job_fn=None)[source]¶ Launches an MLEngine job and waits for it to reach a terminal state.
Parameters: - project_id (string) – The Google Cloud project id within which MLEngine job will be launched.
- job (dict) –
MLEngine Job object that should be provided to the MLEngine API, such as:
{'jobId': 'my_job_id', 'trainingInput': {'scaleTier': 'STANDARD_1', …}}
- use_existing_job_fn (function) – If an MLEngine job with the same job_id already exists, this function (if provided) decides whether to reuse the existing job, that is, to keep waiting for it to finish and return its job object. It should accept an MLEngine job object and return a boolean value indicating whether it is OK to reuse the existing job. If ‘use_existing_job_fn’ is not provided, the existing MLEngine job is reused by default.
Returns: The MLEngine job object if the job successfully reaches a terminal state (which might be FAILED or CANCELLED state).
Return type: dict
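A callback for use_existing_job_fn might look like the sketch below. The comparison rule (reuse the existing job only if its trainingInput matches the expected one) is an assumption chosen for illustration, and `make_reuse_check` is a hypothetical factory; the only contract taken from the description above is "accept a job object, return a boolean".

```python
def make_reuse_check(expected_training_input):
    """Hypothetical factory for a use_existing_job_fn callback."""
    def use_existing_job_fn(existing_job):
        # Reuse the job only when it was submitted with the same input.
        return existing_job.get("trainingInput") == expected_training_input
    return use_existing_job_fn


job = {"jobId": "my_job_id", "trainingInput": {"scaleTier": "STANDARD_1"}}
check = make_reuse_check(job["trainingInput"])

same_job = {"jobId": "my_job_id", "trainingInput": {"scaleTier": "STANDARD_1"}}
other_job = {"jobId": "my_job_id", "trainingInput": {"scaleTier": "BASIC"}}
reuse_same = check(same_job)
reuse_other = check(other_job)
```

Following the signature above, such a callback would be passed as `create_job(project_id, job, use_existing_job_fn=check)`.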
-
create_version
(project_id, model_name, version_spec)[source]¶ Creates the Version on Google Cloud ML Engine.
Returns the operation if the version was created successfully and raises an error otherwise.
-
delete_version
(project_id, model_name, version_name)[source]¶ Deletes the given version of a model. Blocks until finished.
-
Cloud Storage¶
Storage Operators¶
- GoogleCloudStorageDownloadOperator : Downloads a file from Google Cloud Storage.
- GoogleCloudStorageToBigQueryOperator : Loads files from Google Cloud Storage into BigQuery.
GoogleCloudStorageDownloadOperator¶
-
class
airflow.contrib.operators.gcs_download_operator.
GoogleCloudStorageDownloadOperator
(bucket, object, filename=False, store_to_xcom_key=False, google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None, *args, **kwargs)[source]¶ Downloads a file from Google Cloud Storage.
Parameters: - bucket (string) – The Google Cloud Storage bucket where the object is.
- object (string) – The name of the object to download in the Google Cloud Storage bucket.
- filename (string) – The file path on the local file system (where the operator is being executed) that the file should be downloaded to. If false, the downloaded data will not be stored on the local file system.
- store_to_xcom_key (string) – If this param is set, the operator will push the contents of the downloaded file to XCom with the key set in this parameter. If false, the downloaded data will not be pushed to XCom.
- google_cloud_storage_conn_id (string) – The connection ID to use when connecting to Google Cloud Storage.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
GoogleCloudStorageToBigQueryOperator¶
-
class
airflow.contrib.operators.gcs_to_bq.
GoogleCloudStorageToBigQueryOperator
(bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, source_format='CSV', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, write_disposition='WRITE_EMPTY', field_delimiter=', ', max_bad_records=0, quote_character=None, allow_quoted_newlines=False, allow_jagged_rows=False, max_id_key=None, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None, schema_update_options=(), src_fmt_configs={}, *args, **kwargs)[source]¶ Loads files from Google cloud storage into BigQuery.
GoogleCloudStorageHook¶
-
class
airflow.contrib.hooks.gcs_hook.
GoogleCloudStorageHook
(google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None)[source]¶ Interact with Google Cloud Storage. This hook uses the Google Cloud Platform connection.
-
copy
(source_bucket, source_object, destination_bucket=None, destination_object=None)[source]¶ Copies an object from a bucket to another, with renaming if requested.
Either destination_bucket or destination_object can be omitted, in which case the source bucket/object is used, but they cannot both be omitted.
Parameters: - source_bucket (string) – The bucket of the object to copy from.
- source_object (string) – The object to copy.
- destination_bucket (string) – The bucket the object is to be copied to. Can be omitted; then the same bucket is used.
- destination_object – The (renamed) path of the object if given. Can be omitted; then the same name is used.
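The defaulting rule for the destination arguments can be written out as a short sketch. `resolve_copy_destination` is a hypothetical helper that only encodes the rule stated for copy (a missing destination falls back to the source value, and at least one destination must be given); it does not talk to Cloud Storage.

```python
def resolve_copy_destination(source_bucket, source_object,
                             destination_bucket=None, destination_object=None):
    """Hypothetical helper: apply the documented destination defaults."""
    if destination_bucket is None and destination_object is None:
        raise ValueError(
            "destination_bucket and destination_object cannot both be omitted")
    # Whichever destination is missing falls back to the source value.
    return (destination_bucket or source_bucket,
            destination_object or source_object)


renamed = resolve_copy_destination(
    "my-bucket", "raw/data.csv", destination_object="archive/data.csv")
moved = resolve_copy_destination(
    "my-bucket", "raw/data.csv", destination_bucket="backup-bucket")
```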
-
delete
(bucket, object, generation=None)[source]¶ Deletes an object if versioning is not enabled for the bucket, or if the generation parameter is used.
Parameters: - bucket (string) – name of the bucket, where the object resides
- object (string) – name of the object to delete
- generation (string) – if present, permanently delete the object of this generation
Returns: True if succeeded
-
download
(bucket, object, filename=False)[source]¶ Get a file from Google Cloud Storage.
Parameters: - bucket (string) – The bucket to fetch from.
- object (string) – The object to fetch.
- filename (string) – If set, a local file path where the file should be written to.
-
exists
(bucket, object)[source]¶ Checks for the existence of a file in Google Cloud Storage.
Parameters: - bucket (string) – The Google Cloud Storage bucket where the object is.
- object (string) – The name of the object to check in the Google Cloud Storage bucket.
-
is_updated_after
(bucket, object, ts)[source]¶ Checks if an object in Google Cloud Storage was updated after the given timestamp.
Parameters: - bucket (string) – The Google Cloud Storage bucket where the object is.
- object (string) – The name of the object to check in the Google Cloud Storage bucket.
- ts (datetime) – The timestamp to check against.
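The comparison behind this check can be sketched without any Cloud Storage access. The function below is a hypothetical stand-in that receives the object's last-update time directly; treating a naive `ts` as UTC is an assumption made for the example, not documented behaviour.

```python
from datetime import datetime, timezone


def updated_after(updated, ts):
    """Hypothetical stand-in: compare an object's update time with ts.

    'updated' is assumed to be a timezone-aware datetime.
    """
    if ts.tzinfo is None:
        # Assumption: interpret naive timestamps as UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    return updated > ts


updated = datetime(2018, 1, 2, 12, 0, tzinfo=timezone.utc)
newer = updated_after(updated, datetime(2018, 1, 1))
older = updated_after(updated, datetime(2018, 1, 3, tzinfo=timezone.utc))
```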
-
list
(bucket, versions=None, maxResults=None, prefix=None)[source]¶ List all objects from the bucket with the given string prefix in name
Parameters: - bucket (string) – bucket name
- versions (boolean) – if true, list all versions of the objects
- maxResults (integer) – max count of items to return in a single page of responses
- prefix (string) – prefix string which filters objects whose names begin with this prefix
Returns: a stream of object names matching the filtering criteria
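How prefix and maxResults interact can be modelled on an in-memory list of names. This is only a sketch of the documented semantics (prefix filters names, maxResults bounds the size of each page of responses); `iter_pages` and `list_names` are hypothetical and do not call the Cloud Storage API.

```python
def iter_pages(names, prefix=None, max_results=None):
    """Hypothetical model: yield pages of names that start with prefix."""
    matching = [n for n in names if prefix is None or n.startswith(prefix)]
    # Without max_results everything fits in a single page.
    page_size = max_results or len(matching) or 1
    for start in range(0, len(matching), page_size):
        yield matching[start:start + page_size]


def list_names(names, prefix=None, max_results=None):
    """Flatten all pages into one list of object names."""
    return [n for page in iter_pages(names, prefix, max_results) for n in page]


names = ["data/a.csv", "data/b.csv", "logs/x.txt", "data/c.csv"]
pages = list(iter_pages(names, prefix="data/", max_results=2))
flat = list_names(names, prefix="data/", max_results=2)
```

In this model `max_results` changes how many pages are produced but not the combined result.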
-
upload
(bucket, object, filename, mime_type='application/octet-stream')[source]¶ Uploads a local file to Google Cloud Storage.
Parameters: - bucket (string) – The bucket to upload to.
- object (string) – The object name to set when uploading the local file.
- filename (string) – The local file path to the file to be uploaded.
- mime_type (string) – The MIME type to set when uploading the file.
-