Platform Engineering Team/Data Value Stream/Airflow Coding Convention

This page describes the coding conventions for batch jobs, such as algorithms, intended to run on Platform Engineering’s Airflow instance. For Python coding conventions, see Platform Engineering Team/Data Value Stream/Data Pipeline Onboarding/#Python Code Style.

Jupyter Notebooks

While the use of Jupyter notebooks is expected during development, it is NOT recommended on Airflow instances. We recommend that users convert Jupyter notebooks into executable scripts.

To convert a Jupyter notebook into a script, use `nbconvert` on the command line:

$ jupyter nbconvert --to script notebook.ipynb

This converts the Jupyter notebook file notebook.ipynb into an executable script (notebook.py for a notebook with a Python kernel).
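The script produced by `nbconvert` keeps the notebook cells as top-level statements. One possible way to tidy it up for scheduled runs is to move the cell logic into functions behind a `main()` entry point. The sketch below only illustrates that structure: `load_rows`, `transform`, and the sample data are hypothetical placeholders, not part of any existing job.

```python
def load_rows():
    """Hypothetical placeholder for the notebook cells that load data."""
    return ["alpha", None, "beta"]


def transform(rows):
    """Hypothetical placeholder for the notebook cells that process data."""
    return [r.upper() if r is not None else None for r in rows]


def main():
    # Run the former notebook cells in order
    rows = load_rows()
    print(transform(rows))


# Only run the job when the file is executed as a script, not when imported
if __name__ == "__main__":
    main()
```

Keeping the logic in functions means the script can be imported and tested without triggering a full run.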

Spark

We recommend using Spark’s cluster mode to run jobs in the Airflow instance. At the time of writing, wmfdata does not support cluster mode, so we recommend initializing Spark directly:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Local computations and storage

The Airflow instance has limited memory and storage, and both are shared among all Airflow jobs. We therefore recommend that jobs neither run computations nor store data locally on the instance.

Computation

We recommend running computations on Spark instead of locally.

For example, instead of pulling the results down locally using pandas and running the computations locally:

# Function to transform a string to uppercase
def to_upper(s):
    if s is not None:
        return s.upper()

# Pull down data locally and convert to pandas dataframe
df = spark.sql(query).toPandas()

# Apply to_upper() to items from column_1
results = []
for row in df['column_1']:
    results.append(to_upper(row))

We recommend turning such functions into Spark User Defined Functions (UDFs) and applying them to the column(s), so that the work runs on the cluster:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Create user defined function to transform a string to uppercase
@udf(returnType=StringType())
def to_upper(s):
    if s is not None:
        return s.upper()

# Cache results of spark query without pulling it down locally
df = spark.sql(query).cache()

# Apply to_upper() to items from column_1 and keep the resulting DataFrame
results = df.withColumn(
    "results", to_upper(col("column_1"))
).select("results")
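A variation on the UDF example above, offered as a sketch rather than a required pattern: keeping the plain Python function separate from its UDF wrapper lets the logic be unit-tested without a Spark session. The helper name `add_upper_column` and its default column names are assumptions made for illustration.

```python
def to_upper(s):
    """Plain Python logic: uppercase a string, pass None through unchanged."""
    return s.upper() if s is not None else None


def add_upper_column(df, source="column_1", target="results"):
    """Return df with a new column holding the uppercased source column.

    Hypothetical helper. pyspark is imported here so that to_upper() can be
    tested on machines where Spark is not installed.
    """
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    # Wrap the plain function as a UDF; it is evaluated on the Spark executors
    to_upper_udf = udf(to_upper, StringType())
    return df.withColumn(target, to_upper_udf(col(source)))
```

For this particular transformation, Spark's built-in `pyspark.sql.functions.upper` would avoid a Python UDF altogether. A UDF is mainly useful when no built-in function covers the logic.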

Storage

We recommend saving files on HDFS. For example, instead of converting a DataFrame to pandas and saving it to a local file as shown below:

df = spark.sql(query).toPandas()

# Saves file locally to output_directory
df.to_csv(output_directory)

We recommend saving files directly to HDFS:

df = spark.sql(query).cache()

# Saves file to hdfs_location
df.write.csv("hdfs_location")
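By default, Spark's `DataFrameWriter` raises an error if the target path already exists, which can make a rerun of the same job fail. A minimal sketch of one way to handle this, assuming that replacing earlier output is acceptable for the job; the helper name `save_as_csv` is hypothetical:

```python
def save_as_csv(df, hdfs_path):
    """Write a Spark DataFrame as CSV files under hdfs_path.

    Assumption: earlier output at hdfs_path may be replaced, so the
    "overwrite" save mode is used and a rerun does not fail.
    """
    df.write.mode("overwrite").csv(hdfs_path, header=True)
```

The function expects a Spark DataFrame and writes a directory of part files rather than a single CSV file.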

Idempotency

We recommend making all Airflow jobs idempotent: rerunning a job with the same inputs should produce the same result. In practice, this generally means making sure any randomized operations are seeded and can be reproduced.
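As an illustration of seeding, here is a minimal sketch using Python's standard `random` module. The function name `sample_items` and the default seed value are hypothetical and chosen only for this example.

```python
import random


def sample_items(items, k, seed=42):
    """Return k items chosen at random, reproducibly for a given seed."""
    # A dedicated generator avoids depending on the global random state
    rng = random.Random(seed)
    return rng.sample(items, k)


# Two runs with the same inputs and seed select the same items
first = sample_items(list(range(100)), 5)
second = sample_items(list(range(100)), 5)
print(first == second)
```

The same idea applies on Spark, where `DataFrame.sample` accepts a `seed` argument.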

Additional Resources: