cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
greg-hansen
Databricks Employee
Databricks Employee

Intro

Databricks customers use Structured Streaming to drive critical business functions like equipment monitoring, fraud detection, and inventory management. Reliability is a key design factor for these workloads. Engineers design streaming jobs for consistent performance and minimal downtime. Performance monitoring allows engineers to proactively address issues before an outage or failure.

Monitoring streaming workloads, however, creates unique challenges. Streaming query performance often depends on several metrics. Engineers may want to understand the processed offsets, throughput, batch duration, or state memory when evaluating a streaming query. These metrics must be updated continuously with minimal impact on the workload’s performance.

Spark’s StreamingQueryListener provides an extensible, fault-tolerant interface for monitoring streaming performance. In this blog, we’ll discuss the StreamingQueryListener implementation and show how to set-up StreamingQueryListener for production jobs.

Streaming query metrics

Every streaming query creates metrics with valuable information about streaming progress. This includes several identifiers—the query ID, run ID, and a configurable query name. After processing an incremental batch of data, queries emit default metrics like the batch duration and processed rows per second. A sample QueryProgressEvent is shown below.

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
...
}

QueryProgressEvents also include details about the streaming source and sink. Kafka sources, for example, provide the latest processed and observed offsets for each partition and the number of offsets in the streaming backlog. We can use offset information to understand processing performance across each partition.

{
...,
"sources" : [{
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_001]]",
    "startOffset" : {
      "KAFKA_TOPIC_001" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_001" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_001" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }]
}

By tracking these metrics over time, we can proactively improve streaming performance. We may, for example, identify increasing input volume by looking at the source offsets. To increase throughput, we could tune the cluster size or source configuration. To ensure a stateful query has bounded memory consumption, we could add a watermark to limit the data held in state.

Streaming query execution

Streaming metrics can be processed by a StreamingQueryListener during query execution. Each streaming query follows a consistent pattern: we define a streaming DataFrame, specify a sink format and options, and call start() to begin execution. Internally, Spark registers the stream in its StreamingQueryManager and creates a StreamExecution object responsible for running the query.

Streaming query execution modelStreaming query execution modelWhen the stream is triggered, StreamExecution processes an incremental batch of data from the source and creates a QueryProgressEvent. This event is asynchronously posted to Spark’s StreamingQueryListenerBus where listeners are registered. Received events trigger each listener’s event handlers.

A streaming query listener event in Spark’s driver logsA streaming query listener event in Spark’s driver logs

Each StreamingQueryListener runs on a driver thread and serially processes events received from the StreamingQueryListenerBus. This has several important consequences:

  • Failure of a StreamingQueryListener will not disrupt the streaming query
  • Failure of a StreamingQueryListener will not disrupt another StreamingQueryListener
  • When a StreamingQueryListener cannot keep up with arriving listener events, those events will be dropped

Because the Spark driver is responsible for executing StreamingQueryListener methods, listeners with complex event-handling logic may overwhelm the driver’s resources.  By keeping our StreamingQueryListener implementation lightweight, we can continuously monitor streaming metrics and avoid impacting the ongoing streaming query.

Creating a StreamingQueryListener

Streaming listeners extend the StreamingQueryListener interface. A listener must implement methods to handle various query progress events.

from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.streaming.listener import QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent

class EventHubListener(StreamingQueryListener):
    ...
    def onQueryStarted(self, event: QueryStartedEvent):
        # Handles a QueryStartedEvent when the stream starts
        ...

    def onQueryProgress(self, event: QueryProgressEvent):
        # Handles a QueryProgressEvent whenever a streaming batch finishes
        ...

    def onQueryTerminated(self, event: QueryTerminatedEvent):
        # Handles a QueryTerminatedEvent when the stream is stopped or cancelled
        ...

Production streaming query listeners should avoid complex processing logic and forward metrics to low-latency sinks like queues or logging services. This reduces resource contention and ensures minimal impact on the streaming query performance. The example below forwards streaming metrics to an Azure Event Hub using a static EventHubProducerClient.

from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.streaming.listener import QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent
from azure.eventhub import EventHubProducerClient, EventData

class EventHubQueryListener(StreamingQueryListener):
  """ Forwards structured streaming query metrics to Azure Event Hubs."""
  _connection_string: str
  _event_hub_name: str
  _event_hub_client: EventHubProducerClient

  def __init__(self, connection_string: str, event_hub_name: str):
    """ Initializes the EventHubQueryListener by creating the Event Hub client."""
    self._connection_string = connection_string
    self._event_hub_name = event_hub_name
    self._event_hub_client = EventHubProducerClient.from_connection_string(
      conn_str=self._connection_string,
      eventhub_name=self._event_hub_name
    )

  def sendMessage(self, message: str):
    """ Structures and sends a message to Event Hub."""
    event_batch = self._event_hub_client.create_batch()
    event_batch.add(EventData(message))
    self._event_hub_client.send_batch(event_batch)
  
  def onQueryProgress(self, event: QueryProgressEvent):
    """ Sends an Event Hub message when a micro batch completes."""
    self.sendMessage(str(event.progress))

  ...

StreamingQueryListener subclasses can be added in code to the listener bus using Spark’s StreamingQueryManager. Because listeners process events asynchronously, they can be added before, during, or after a stream is started.

# Instantiate the listener:
connection_string = "..."
event_hub_name = "..."
event_hub_listener = EventHubListener(connection_string, event_hub_name)

# Register the listener using the StreamingQueryManager:
spark.streams.addListener(event_hub_listener)

Standardizing StreamingQueryListeners across workloads

By reusing a shared streaming listener implementation, operations teams can achieve consistent observability across streaming workloads with minimal development overhead. This is especially useful for teams monitoring many streaming queries. Streaming metrics can be incorporated into a larger monitoring process. We could, for example:

  • Read listener metrics from multiple streams from a single Event Hub
  • Process and write the metrics to a Delta table in Unity Catalog
  • Use Databricks’ SQL alerts and AI/BI dashboards for real-time incident response and observability

An architecture for centralizing performance metrics using a shared StreamingQueryListenerAn architecture for centralizing performance metrics using a shared StreamingQueryListenerWhen we add our listener class to our cluster’s spark.sql.streaming.streamingQueryListeners, it will be registered by default whenever a notebook or job is attached to the cluster. Our custom StreamingQueryListener class will provide out-of-the-box observability without requiring engineers to implement or register listeners in code!

Creating a StreamingQueryListener JAR

To register our EventHubListener as a Spark listener, we first need to package the subclass as a JAR. Most Java or Scala IDEs have tooling to streamline this process. We can use Maven to package our EventHubListener into a JAR with all the necessary dependencies.

Packaging a JAR with Maven relies on configuration in a pom.xml file. This includes information about the project including its target runtime environment and dependencies. When packaging a JAR for use on Databricks:

  • Compile against the Spark, Java, and Scala versions used in your cluster’s Databricks runtime version to ensure compatibility
  • Set any dependencies provided by the Databricks Runtime to have a provided scope to minimize the JAR size
  • Ensure you include any necessary dependencies when creating the JAR

Once you’ve created your JAR, you can upload it to a Unity Catalog volume or as a file in your Databricks workspace. The JAR can be installed on Databricks clusters as a cluster-scoped library or by using a cluster-scoped init script.

Loading the JAR into Spark’s class path

Databricks’ Spark environment is pre-loaded with JARs providing various functionality. To use our listener class, we need to load the JAR into the Spark class path using a basic shell command.

# ------------------------------------------------------
# Copies the EventHubListener JAR into /databricks/jars.
# This ensures our listener class is loaded in the Spark
# environment when we start a Databricks cluster.
# ------------------------------------------------------

cp /Volumes/org/streaming/artifacts/java/jars/eventhublistener-1.0-SNAPSHOT-jar-with-dependencies.jar /databricks/jars

Our script can be uploaded to a Unity Catalog volume or as a workspace file. We can set this as a cluster-scoped init script in the advanced settings of our cluster configuration.Setting up an init script using the cluster configuration UISetting up an init script using the cluster configuration UI

Setting the Spark configuration

We can now add our listener to the default list of streaming query listeners. We can set the  spark.sql.streaming.streamingQueryListeners <fully qualified class name> to our cluster’s Spark configuration.Adding a default listener to the Spark configurationAdding a default listener to the Spark configuration

 Our listener will be added to the StreamingQueryListenerBus whenever a SparkSession is created!

Creating a compute policy

Databricks compute policies control configuration for Databricks clusters created in the web portal or with Databricks Asset Bundles. Our init script and Spark configuration can be added as fixed values in the policy settings.

{
  "init_scripts.0.workspace.destination": {
    "type": "fixed",
    "value": "/Users/[email protected]/install_streaming_listener.sh"
  },
  "spark_conf.spark.sql.streaming.streamingQueryListeners": {
    "type": "fixed",
    "value": "com.dbfe.EventHubListener"
  }
}

When a Databricks cluster is created with this compute policy, the cluster will automatically use the init script and Spark configuration needed to install our StreamingQueryListener. Streaming queries will forward metrics to our monitoring Event Hub by default. These metrics can be standardized and monitored across streaming jobs for an entire organization!

Conclusion

Monitoring streaming jobs is critical to ensuring workloads run reliably. Spark’s StreamingQueryListener provides a low-impact, fault-tolerant solution for forwarding metrics to event queues or logging services. By standardizing streaming query listeners across job deployments, we can minimize development overhead, centralize metrics, and create a complete view of streaming performance across an organization.

Check out our Git repo for an end-to-end example forwarding StreamingQueryListener metrics to Azure Event Hubs. For more on streaming observability, check out our product blog on the StateReader API.

OSZAR »