Malcoln
Databricks Employee

Data engineers, scientists, and analysts working with big data often rely on PySpark to build ETL (Extract-Transform-Load) pipelines. These pipelines typically involve a series of transformations applied to raw data to clean, enrich, and prepare it for consumption by BI tools, machine learning models, or downstream applications.

In many code bases or notebooks in Databricks, it's common to see a long chain of DataFrame operations, each performing a specific transformation. While this approach works, it can quickly become difficult to read, debug, or reuse, especially as the number of transformations grows.

Here's an example of what those transformations might look like in practice:

from pyspark.sql.functions import col, when, lower, concat, floor, lit, date_format, regexp_replace, datediff, current_date, round

# Declare variables
my_catalog = "malcoln"  # Your Catalog
my_schema = "blog"  # Your Schema
my_volume = "raw_data"  # Your Volume Name
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Read data
df = spark.read.option("multiline", True).json(volume_path)

# Apply transformations (long chain of transformations)
df_customer = (
    df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
    .withColumn("age", floor(datediff(current_date(), col("birth_date")) / 365.25))
    .filter(col("age") >= 18)
    .withColumn("email", lower(regexp_replace(col("email"), "[^a-zA-Z0-9@.]", "")))
    .withColumn(
        "age_group",
        when(col("age") < 25, "young_adult")
        .when(col("age") < 65, "adult")
        .otherwise("senior"),
    )
    .withColumn("purchase_date", date_format(col("purchase_timestamp"), "yyyy-MM-dd"))
    .withColumn("total_with_tax", round(col("purchase_amount") * 1.15, 2))
)

Chaining methods this way works, but what happens six months down the line when requirements change and the transformation logic needs to be updated? Or when you have a new teammate who will be maintaining your code? What if you want to unit-test your transformations in a CI/CD pipeline? And what happens when there is a request to ingest data from another data source and apply the same transformations?

ETL code like the example above solves problems quickly, but it can create new ones: future enhancements and bug fixes become expensive, risky, and difficult. Maintainable ETL extends beyond functional code, providing greater flexibility as requirements change, lower long-term maintenance costs, and readable, flexible code that eases knowledge transfer.

The key principles of maintainable code include elements from the software development guidelines SOLID and DRY, namely:

  • Assign only one responsibility to a function or data pipeline.
  • Make your code extensible for new features without rewriting the existing code and breaking the logic.
  • Design data pipelines with flexibility and robustness, ensuring they can seamlessly integrate new data sources and accommodate evolving transformation requirements without breaking or disrupting existing workflows.
  • Break big processing tasks into smaller pieces so that one function does not handle unnecessary tasks.
  • Design your code to depend on general rules so it can be reused with different inputs and tools.
  • Add commonly used logic into a function to reduce maintenance effort, mistakes, and inconsistencies.

The transform pattern allows you to apply these principles to your ETL workloads.

What is the transform pattern?

The transform pattern revolves around the DataFrame.transform() method of Apache Spark™, which applies a function to a DataFrame and returns a new DataFrame.

Here is an example of how to use the transform method:

# Imports
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, floor, datediff, current_date

# Create a function that receives a DataFrame, applies the transformation, and returns a DataFrame

def add_customer_age(df: DataFrame) -> DataFrame:
    """ Create a new column with the age"""
    df = df.withColumn("age", floor(datediff(current_date(), col("birth_date")) / 365.25))
    
    return df

# Apply transformation
df_customer = (df
               .transform(add_customer_age)
)

With the transform pattern, the function name makes it clear which transformation is being applied.

Let’s add another transformation function:

from pyspark.sql.functions import when

# Create our second function
def add_customer_age_group(df: DataFrame) -> DataFrame:
    """Create a new column with the age group, based on business rules."""
    df = df.withColumn(
        "age_group",
        when(col("age") < 25, "young_adult")
        .when(col("age") < 65, "adult")
        .otherwise("senior"),
    )
    return df

# Apply both transformations
df_customer = (
    df
    .transform(add_customer_age)        # apply the first transformation
    .transform(add_customer_age_group)  # apply the new transformation
)

As we can see, it's way more readable and easier to understand what's happening. It also makes development more straightforward because you can test and debug your ETL layer by layer.
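
For example, since each step is a plain function, you can stop the chain at any layer (or call a function directly on a small sample DataFrame) and inspect the intermediate result:

# Inspect an intermediate layer by applying only the first transformation
df_with_age = df.transform(add_customer_age)
df_with_age.select("birth_date", "age").show(5)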

This pattern enables you to:

  • Chain multiple transformations together
  • Reuse transformation logic across different projects
  • Make your code more readable and maintainable
  • Structure your ETL code more like modular, testable software, following practices familiar to software engineers
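
A useful detail for reuse: DataFrame.transform() also forwards extra positional and keyword arguments to your function (Apache Spark 3.3 and later), so the same rule can be parameterized per dataset. A minimal sketch, using a hypothetical filter_minimum_age function:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

# Hypothetical parameterized transformation: the minimum age is an argument
def filter_minimum_age(df: DataFrame, min_age: int = 18) -> DataFrame:
    """Keep only customers at or above min_age."""
    return df.filter(col("age") >= min_age)

# Arguments after the function name are forwarded to it (Spark 3.3+)
df_adults = df.transform(add_customer_age).transform(filter_minimum_age, min_age=21)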

How to create reusable transformations

In the examples above, we saw how to use the transform pattern and even chain transformations together. But what if multiple engineers or users must apply the same business rules to different tables/datasets?

The transform pattern also helps here: we can define common transformations as functions and distribute them via a notebook or even a Python package. First, create a folder called utils in a shared location and add a file named spark_transform_functions.py. In this file, define the shared functions below, including two helpers that clean column names and add a prefix indicating each column’s data type.

This should be created inside a Git Folder in Databricks.

# File path: utils/spark_transform_functions.py

# Imports
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, floor, datediff, current_date, when

def add_customer_age(df: DataFrame) -> DataFrame:
    """ Create a new column with the age"""
    df = df.withColumn("age", floor(datediff(current_date(), col("birth_date")) / 365.25))
    
    return df

def add_customer_age_group(df: DataFrame) -> DataFrame:
    """Create a new column with the age group, based on business rules."""
    df = df.withColumn(
        "age_group",
        when(col("age") < 25, "young_adult")
        .when(col("age") < 65, "adult")
        .otherwise("senior"),
    )
    return df

def clean_column_names(df: DataFrame) -> DataFrame:
    """Transform function to clean column names by removing spaces and special characters"""
    for column in df.columns:
        new_column = column.lower().replace(" ", "_").replace("-", "_")
        df = df.withColumnRenamed(column, new_column)
    return df

def add_column_prefixes(df: DataFrame) -> DataFrame:
    """Transform function to add standardized prefixes to column names based on data type"""
    prefix_mapping = {
        'date': 'dt_',
        'timestamp': 'dt_',
        'string': 'str_', 
        'int': 'num_',
        'double': 'num_',
        'float': 'num_',
        'boolean': 'bool_'
    }
    
    for column in df.columns:
        data_type = df.schema[column].dataType.simpleString().lower()
        prefix = next((v for k, v in prefix_mapping.items() if k in data_type), '')
        if not column.startswith(prefix):
            new_column = f"{prefix}{column}"
            df = df.withColumnRenamed(column, new_column)
    return df

Now, other engineers and users can reuse the existing function by importing the Python module: 

# File path: main_notebook.ipynb 

# Imports
from utils.spark_transform_functions import clean_column_names

# Read data
df = spark.read.option("multiline", True).json(volume_path)

# Apply the transformations 
df_customer = (
    df
    .transform(clean_column_names)
)

*You can also create an __init__.py file in the utils folder so the functions can be imported without the filename, or turn utils into a full Python package that you can install and distribute with a tool like pip. Learn more about using Python modules with Databricks in the documentation.
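
As a minimal sketch (assuming the module layout above), such an __init__.py could simply re-export the shared functions:

# File path: utils/__init__.py
# Re-export the shared transformations so callers can write
# `from utils import clean_column_names` instead of the full module path.
from .spark_transform_functions import (
    add_customer_age,
    add_customer_age_group,
    clean_column_names,
    add_column_prefixes,
)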

Use the same pattern for Batch and Streaming

Another great thing about this pattern is that it can be reused across Batch and Streaming use cases. 

Let's reuse our age_group function for both Batch and Streaming data processing.

Batch

In this example, we will read JSON files from a Volume to process large amounts of data in a scheduled or ad hoc manner.

# Imports
from utils.spark_transform_functions import add_customer_age, add_customer_age_group

# Declare variables
my_catalog = "malcoln"  # Your Catalog
my_schema = "blog"  # Your Schema
my_volume = "raw_data"  # Your Volume Name
my_table = "test_blog" # "<table-name>"

files_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
table_path = f"{my_catalog}.{my_schema}.{my_table}"

# Read JSON data
df = spark.read.option("multiline", True).json(files_path)

# Apply transformation
processed_df = (
    df
    .transform(add_customer_age)
    .transform(add_customer_age_group)
)
# Write to Delta table
processed_df.write.mode("overwrite").saveAsTable(table_path)

Streaming

In the following example, we read the same directory continuously, leveraging the Databricks Auto Loader functionality.

from utils.spark_transform_functions import add_customer_age, add_customer_age_group

# Begin streaming data
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{volume_path}/schema")
    .option("multiline", "true")
    .load(volume_path)
)

# Apply transformation
processed_df = df.transform(add_customer_age).transform(add_customer_age_group)

# Write to Delta table
(
    processed_df.writeStream.format("delta")
    .option("checkpointLocation", f"{volume_path}/checkpoint")
    .trigger(availableNow=True)
    .outputMode("append")
    .toTable(f"{table_path}_streaming")
)

Bonus: Applying the same pattern using DLT

import dlt
from utils.spark_transform_functions import add_customer_age, add_customer_age_group

# Declare variables
my_catalog = "malcoln"  # "<catalog-name>"
my_schema = "blog"  # "<schema-name>"
my_volume = "raw_data"  # "<volume-name>"
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define landing - bronze table
@dlt.table
def bronze_layer():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{volume_path}/schema")
        .option("multiline", "true")
        .load(volume_path)
    )

# Define table with transformation - silver table
@dlt.table
def silver_layer():
    return (
        dlt.read("bronze_layer")
        .transform(add_customer_age)
        .transform(add_customer_age_group)
    )

As shown in the examples above, leveraging the transform pattern makes ETL modular and easier to maintain by creating standard, reusable modules and libraries.

Best Practices and Recommendations

  • Keep it simple and performant: Don't use the transform pattern if you only compute a sum or rename one column; simple, one-line, non-reusable transformations can stay inline. On the other hand, avoid using the transform pattern for heavy transformations that depend on other rows of data, such as window functions and aggregations with group bys. Using the pattern for these, especially in streaming workloads, can lead to hidden complexity, slower jobs, incorrect results, and failed pipelines. The transform pattern is a better fit for row-independent transformations such as filtering and adding new columns.
  • Type Hints: Always use Python Type Hints and Docstrings when creating your reusable functions to document what the function does, what it returns, and how to use it.
  • Single Responsibility: We recommend creating functions using the single-responsibility principle, which states that each module should have only one role. Each function should only make one change and be chained together using the transform method instead of creating one function with multiple transformations.
  • Testing: This pattern simplifies unit testing of the transformation code, as it's as simple as testing the individual functions. To make ETL workloads more robust in the long term, we recommend a testing framework like pytest.
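
A minimal pytest sketch of what such a test could look like, assuming the shared module from earlier and a local SparkSession (the file and test names below are hypothetical):

# File path: tests/test_spark_transform_functions.py
import pytest
from pyspark.sql import SparkSession

from utils.spark_transform_functions import add_customer_age_group

@pytest.fixture(scope="session")
def spark():
    # A small local SparkSession is enough to unit-test transformation functions
    return SparkSession.builder.master("local[1]").appName("unit-tests").getOrCreate()

def test_add_customer_age_group(spark):
    df = spark.createDataFrame([(20,), (40,), (70,)], ["age"])
    result = df.transform(add_customer_age_group)
    groups = {row["age"]: row["age_group"] for row in result.collect()}
    assert groups == {20: "young_adult", 40: "adult", 70: "senior"}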

Conclusion

The transform pattern is a powerful way to make ETL modular, reusable, and maintainable. In this blog post, we showed how to apply this pattern in batch and streaming use cases in Spark and even showed that it can be used in DLT pipelines.

Always remember that the key to success using this pattern is to keep your transformations modular, reusable, and tested. 
