Data engineers, scientists, and analysts working with big data often rely on PySpark to build ETL (Extract-Transform-Load) pipelines. These pipelines typically involve a series of transformations applied to raw data to clean, enrich, and prepare it for consumption by BI tools, machine learning models, or downstream applications.
In many code bases or notebooks in Databricks, it's common to see a long chain of DataFrame operations, each performing a specific transformation. While this approach works, it can quickly become difficult to read, debug, or reuse, especially as the number of transformations grows.
Here's an example of what those transformations might look like in practice:
from pyspark.sql.functions import col, when, lower, concat, floor, lit, date_format, regexp_replace, datediff, current_date, round
# Declare variables
my_catalog = "malcoln" # Your Catalog
my_schema = "blog" # Your Schema
my_volume = "raw_data" # Your Volume Name
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Read data
df = spark.read.option("multiline", True).json(volume_path)
# Apply transformations (long chain of transformations)
df_customer = (
    df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
    .withColumn("age", floor(datediff(current_date(), col("birth_date")) / 365.25))
    .filter(col("age") >= 18)
    .withColumn("email", lower(regexp_replace(col("email"), "[^a-zA-Z0-9@.]", "")))
    .withColumn(
        "age_group",
        when(col("age") < 25, "young_adult")
        .when(col("age") < 65, "adult")
        .otherwise("senior"),
    )
    .withColumn("purchase_date", date_format(col("purchase_timestamp"), "yyyy-MM-dd"))
    .withColumn("total_with_tax", round(col("purchase_amount") * 1.15, 2))
)
Chaining methods this way works, but what happens six months down the line when requirements change and the transformation logic needs to be updated? Or when you have a new teammate who will be maintaining your code? What if you want to unit-test your transformations in a CI/CD pipeline? And what happens when there is a request to ingest data from another data source and apply the same transformations?
ETL code like the example above solves the immediate problem quickly, but it can create new ones: future enhancements and bug fixes become expensive, risky, and difficult. Maintainable ETL goes beyond code that merely works; it provides greater flexibility as requirements change, lower long-term maintenance costs, and readable code that eases knowledge transfer.
The key principles of maintainable code draw on well-known software development guidelines such as SOLID and DRY (Don't Repeat Yourself): each piece of logic should have a single responsibility, and shared logic should be written once and reused.
The transform pattern allows you to apply these principles to your ETL workloads.
The transform pattern revolves around the DataFrame.transform() method of Apache Spark™, which applies a function to a DataFrame and returns a new DataFrame.
Here is an example of how to use the transform method:
# Imports
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, floor, datediff, current_date
# Create a function that receives a DataFrame, applies the transformation, and returns a DataFrame
def add_customer_age(df: DataFrame) -> DataFrame:
    """Create a new column with the customer's age"""
    df = df.withColumn("age", floor(datediff(current_date(), col("birth_date")) / 365.25))
    return df
# Apply transformation
df_customer = (
    df
    .transform(add_customer_age)
)
With the transform pattern, the function name makes it clear which transformation is being applied.
Let’s add another transformation function:
from pyspark.sql.functions import when
# Creating our second function
def add_customer_age_group(df: DataFrame) -> DataFrame:
    """Create a new column with the age group, based on business rules"""
    df = df.withColumn(
        "age_group",
        when(col("age") < 25, "young_adult")
        .when(col("age") < 65, "adult")
        .otherwise("senior"),
    )
    return df
# Apply both transformations
df_customer = (
    df
    .transform(add_customer_age)        # apply the first transformation
    .transform(add_customer_age_group)  # apply the new transformation
)
As we can see, it's way more readable and easier to understand what's happening. It also makes development more straightforward because you can test and debug your ETL layer by layer.
This pattern enables you to keep each transformation small and focused, test it in isolation, and reuse it across pipelines and teams.
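For example, because each transformation is just a Python function that takes and returns a DataFrame, it can be unit-tested in a CI/CD pipeline. Below is a minimal pytest sketch; the test file path is hypothetical, and it assumes the functions have been moved into an importable module such as the utils.spark_transform_functions file created in the next section.
# File path: tests/test_spark_transform_functions.py (hypothetical)
import pytest
from pyspark.sql import SparkSession

# Assumes the transformation functions live in a shared module,
# as shown in the shared-module example later in this post
from utils.spark_transform_functions import add_customer_age_group


@pytest.fixture(scope="session")
def spark():
    # Local SparkSession for CI; in a Databricks notebook the global `spark` already exists
    return SparkSession.builder.master("local[1]").appName("transform-tests").getOrCreate()


def test_add_customer_age_group(spark):
    # Small in-memory DataFrame covering each business rule
    df = spark.createDataFrame([(20,), (40,), (70,)], ["age"])
    result = df.transform(add_customer_age_group).orderBy("age").collect()
    assert [row["age_group"] for row in result] == ["young_adult", "adult", "senior"]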
In the examples above, we saw how to use the transform pattern and even chain transformations together. But what if multiple engineers or users must apply the same business rules to different tables/datasets?
The transform pattern also helps with this need, as we can define common transformations as functions and distribute them via a notebook or even a Python package. First, create a folder called utils in a shared location and create a file named spark_transform_functions.py. In this file, add the age functions from earlier plus two new functions: one that cleans column names and one that adds a prefix indicating each column's data type.
This should be created inside a Git Folder in Databricks.
# File path: utils/spark_transform_functions.py
# Imports
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, floor, datediff, current_date, when
def add_customer_age(df: DataFrame) -> DataFrame:
    """Create a new column with the customer's age"""
    df = df.withColumn("age", floor(datediff(current_date(), col("birth_date")) / 365.25))
    return df

def add_customer_age_group(df: DataFrame) -> DataFrame:
    """Create a new column with the age group, based on business rules"""
    df = df.withColumn(
        "age_group",
        when(col("age") < 25, "young_adult")
        .when(col("age") < 65, "adult")
        .otherwise("senior"),
    )
    return df
def clean_column_names(df: DataFrame) -> DataFrame:
    """Transform function to clean column names by removing spaces and special characters"""
    for column in df.columns:
        new_column = column.lower().replace(" ", "_").replace("-", "_")
        df = df.withColumnRenamed(column, new_column)
    return df

def add_column_prefixes(df: DataFrame) -> DataFrame:
    """Transform function to add standardized prefixes to column names based on data type"""
    prefix_mapping = {
        'date': 'dt_',
        'timestamp': 'dt_',
        'string': 'str_',
        'int': 'num_',
        'double': 'num_',
        'float': 'num_',
        'boolean': 'bool_'
    }
    for column in df.columns:
        data_type = df.schema[column].dataType.simpleString().lower()
        prefix = next((v for k, v in prefix_mapping.items() if k in data_type), '')
        if not column.startswith(prefix):
            new_column = f"{prefix}{column}"
            df = df.withColumnRenamed(column, new_column)
    return df
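As a quick illustration of what the two column-name helpers do, you could run something like the following in a notebook. The sample column names are made up purely for illustration, and it assumes the functions are available in your session (defined inline or imported as shown next).
# Hypothetical raw column names, just to illustrate the two helpers
df_raw = spark.createDataFrame(
    [("Ada", 37, True)],
    ["First Name", "Customer-Age", "Is Active"],
)

df_named = df_raw.transform(clean_column_names).transform(add_column_prefixes)
print(df_named.columns)
# Expected output: ['str_first_name', 'num_customer_age', 'bool_is_active']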
Now, other engineers and users can reuse the existing function by importing the Python module:
# File path: main_notebook.ipynb
# Imports
from utils.spark_transform_functions import clean_column_names
# Read data
df = spark.read.option("multiline", True).json(volume_path)
# Apply the transformations
df_customer = (
    df
    .transform(clean_column_names)
)
You can also create an __init__.py file in the utils folder so the functions can be imported without referencing the filename, or turn utils into a full Python package that you can install and distribute with a tool like pip. Learn more about using Python modules with Databricks here.
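For example, a minimal utils/__init__.py (a sketch, assuming the file layout above) could simply re-export the shared functions:
# File path: utils/__init__.py
# Re-export the shared transform functions so they can be imported as:
#   from utils import clean_column_names
from .spark_transform_functions import (
    add_customer_age,
    add_customer_age_group,
    clean_column_names,
    add_column_prefixes,
)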
Another great thing about this pattern is that it can be reused across Batch and Streaming use cases.
Let's reuse our add_customer_age and add_customer_age_group functions for both batch and streaming data processing.
In this example, we will read JSON files from a Volume to process large amounts of data in a scheduled or ad hoc manner.
# Imports
from utils.spark_transform_functions import add_customer_age, add_customer_age_group
# Declare variables
my_catalog = "malcoln" # Your Catalog
my_schema = "blog" # Your Schema
my_volume = "raw_data" # Your Volume Name
my_table = "test_blog" # "<table-name>"
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
table_path = f"{my_catalog}.{my_schema}.{my_table}"
# Read JSON data
df = spark.read.option("multiline", True).json(volume_path)
# Apply transformation
processed_df = (
    df
    .transform(add_customer_age)
    .transform(add_customer_age_group)
)
# Write to Delta table
processed_df.write.mode("overwrite").saveAsTable(table_path)
In the following example, we read the same directory continuously, leveraging the Databricks Auto Loader functionality.
from utils.spark_transform_functions import add_customer_age, add_customer_age_group
# Begin streaming data
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{volume_path}/schema")
    .option("multiline", "true")
    .load(volume_path)
)
# Apply transformation
processed_df = df.transform(add_customer_age).transform(add_customer_age_group)
# Write to Delta table
(
    processed_df.writeStream.format("delta")
    .option("checkpointLocation", f"{volume_path}/checkpoint")
    .trigger(availableNow=True)
    .outputMode("append")
    .toTable(f"{table_path}_streaming")
)
The same shared functions can also be used inside a DLT (Delta Live Tables) pipeline:
import dlt
from utils.spark_transform_functions import add_customer_age, add_customer_age_group
# Declare variables
my_catalog = "malcoln" # "<catalog-name>"
my_schema = "blog" # "<schema-name>"
my_volume = "raw_data" # "<volume-name>"
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define landing - bronze table
@dlt.table
def bronze_layer():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{volume_path}/schema")
        .option("multiline", "true")
        .load(volume_path)
    )

# Define table with transformation - silver table
@dlt.table
def silver_layer():
    return (
        dlt.read("bronze_layer")
        .transform(add_customer_age)
        .transform(add_customer_age_group)
    )
As shown in the examples above, leveraging the transform pattern makes ETL modular and easier to maintain by creating standard, reusable modules and libraries.
The transform pattern is a powerful way to make ETL modular, reusable, and maintainable. In this blog post, we showed how to apply this pattern in batch and streaming use cases in Spark and even showed that it can be used in DLT pipelines.
Always remember that the key to success using this pattern is to keep your transformations modular, reusable, and tested.