Databricks customers use Structured Streaming to drive critical business functions like equipment monitoring, fraud detection, and inventory management. Reliability is a key design factor for these workloads. Engineers design streaming jobs for consistent performance and minimal downtime. Performance monitoring allows engineers to proactively address issues before an outage or failure.
Monitoring streaming workloads, however, creates unique challenges. Evaluating a streaming query often requires several metrics at once: engineers may want to understand the processed offsets, throughput, batch duration, or state memory. These metrics must be updated continuously with minimal impact on the workload’s performance.
Spark’s StreamingQueryListener provides an extensible, fault-tolerant interface for monitoring streaming performance. In this blog, we’ll discuss the StreamingQueryListener implementation and show how to set up a StreamingQueryListener for production jobs.
Every streaming query creates metrics with valuable information about streaming progress. This includes several identifiers—the query ID, run ID, and a configurable query name. After processing an incremental batch of data, queries emit default metrics like the batch duration and processed rows per second. A sample QueryProgressEvent is shown below.
{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  ...
}
QueryProgressEvents also include details about the streaming source and sink. Kafka sources, for example, provide the latest processed and observed offsets for each partition and the number of offsets in the streaming backlog. We can use offset information to understand processing performance across each partition.
{
  ...,
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_001]]",
    "startOffset" : {
      "KAFKA_TOPIC_001" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_001" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_001" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ]
}
By tracking these metrics over time, we can proactively improve streaming performance. We may, for example, identify increasing input volume by looking at the source offsets. To increase throughput, we could tune the cluster size or source configuration. To ensure a stateful query has bounded memory consumption, we could add a watermark to limit the data held in state.
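As a minimal sketch of that last point (the topic name follows the earlier Kafka sample; the bootstrap servers and window sizes are illustrative placeholders), a watermark bounds how much state a windowed aggregation retains:

from pyspark.sql import functions as F

# Read from the Kafka topic shown in the sample progress event above.
events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "...")  # placeholder
    .option("subscribe", "KAFKA_TOPIC_001")
    .load()
)

# The watermark lets Spark discard state for windows more than
# 10 minutes behind the latest event time, keeping the stateful
# aggregation's memory consumption bounded.
windowed_counts = (
    events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(F.window("timestamp", "5 minutes"))
    .count()
)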
Streaming metrics can be processed by a StreamingQueryListener during query execution. Each streaming query follows a consistent pattern: we define a streaming DataFrame, specify a sink format and options, and call start() to begin execution. Internally, Spark registers the stream in its StreamingQueryManager and creates a StreamExecution object responsible for running the query.
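As a sketch of that pattern (using Spark's built-in rate source and console sink purely for illustration; the query name mirrors the earlier sample event):

# Define a streaming DataFrame from a source.
stream_df = (
    spark.readStream
    .format("rate")                 # built-in test source
    .option("rowsPerSecond", 10)
    .load()
)

# Configure the sink and start the query; start() hands the stream
# to the StreamingQueryManager, which creates a StreamExecution.
query = (
    stream_df.writeStream
    .format("console")
    .queryName("STREAMING_QUERY_NAME_UNIQUE")  # surfaces as "name" in progress events
    .start()
)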
Streaming query execution model

When the stream is triggered, StreamExecution processes an incremental batch of data from the source and creates a QueryProgressEvent. This event is asynchronously posted to Spark’s StreamingQueryListenerBus, where listeners are registered. Received events trigger each listener’s event handlers.
A streaming query listener event in Spark’s driver logs
Each StreamingQueryListener runs on a driver thread and serially processes events received from the StreamingQueryListenerBus. This has an important consequence: because the Spark driver is responsible for executing StreamingQueryListener methods, listeners with complex event-handling logic may overwhelm the driver’s resources. By keeping our StreamingQueryListener implementation lightweight, we can continuously monitor streaming metrics without impacting the ongoing streaming query.
Streaming listeners extend the StreamingQueryListener interface. A listener must implement methods to handle each query lifecycle event.
from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.streaming.listener import QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent

class EventHubListener(StreamingQueryListener):
    ...

    def onQueryStarted(self, event: QueryStartedEvent):
        # Handles a QueryStartedEvent when the stream starts
        ...

    def onQueryProgress(self, event: QueryProgressEvent):
        # Handles a QueryProgressEvent whenever a streaming batch finishes
        ...

    def onQueryTerminated(self, event: QueryTerminatedEvent):
        # Handles a QueryTerminatedEvent when the stream is stopped or cancelled
        ...
Production streaming query listeners should avoid complex processing logic and forward metrics to low-latency sinks like queues or logging services. This reduces resource contention and ensures minimal impact on the streaming query performance. The example below forwards streaming metrics to an Azure Event Hub using a static EventHubProducerClient.
from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.streaming.listener import QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent
from azure.eventhub import EventHubProducerClient, EventData

class EventHubListener(StreamingQueryListener):
    """Forwards Structured Streaming query metrics to Azure Event Hubs."""

    _connection_string: str
    _event_hub_name: str
    _event_hub_client: EventHubProducerClient

    def __init__(self, connection_string: str, event_hub_name: str):
        """Initializes the EventHubListener by creating the Event Hub client."""
        self._connection_string = connection_string
        self._event_hub_name = event_hub_name
        self._event_hub_client = EventHubProducerClient.from_connection_string(
            conn_str=self._connection_string,
            eventhub_name=self._event_hub_name
        )

    def sendMessage(self, message: str):
        """Structures and sends a message to Event Hub."""
        event_batch = self._event_hub_client.create_batch()
        event_batch.add(EventData(message))
        self._event_hub_client.send_batch(event_batch)

    def onQueryProgress(self, event: QueryProgressEvent):
        """Sends an Event Hub message when a micro-batch completes."""
        self.sendMessage(str(event.progress))

    ...
StreamingQueryListener subclasses can be registered on the listener bus in code using Spark’s StreamingQueryManager. Because listeners process events asynchronously, they can be added before, during, or after a stream is started.
# Instantiate the listener:
connection_string = "..."
event_hub_name = "..."
event_hub_listener = EventHubListener(connection_string, event_hub_name)
# Register the listener using the StreamingQueryManager:
spark.streams.addListener(event_hub_listener)
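The same StreamingQueryManager can detach a listener when it is no longer needed, for example at the end of an interactive session:

# Deregister the listener from the StreamingQueryListenerBus:
spark.streams.removeListener(event_hub_listener)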
By reusing a shared streaming listener implementation, operations teams can achieve consistent observability across streaming workloads with minimal development overhead. This is especially useful for teams monitoring many streaming queries, and the forwarded metrics can be incorporated into a larger monitoring process.
An architecture for centralizing performance metrics using a shared StreamingQueryListener

When we add our listener class to our cluster’s spark.sql.streaming.streamingQueryListeners, it will be registered by default whenever a notebook or job is attached to the cluster. Our custom StreamingQueryListener class will provide out-of-the-box observability without requiring engineers to implement or register listeners in code!
To register our EventHubListener as a default Spark listener, it must be a JVM class, so we first implement the subclass in Scala or Java and package it as a JAR. Most Java or Scala IDEs have tooling to streamline this process; we can also use Maven to package our EventHubListener into a JAR with all the necessary dependencies.

Packaging a JAR with Maven is driven by a pom.xml file, which describes the project, including its target runtime environment and dependencies. When packaging a JAR for use on Databricks, the Spark and Scala versions should match the cluster’s Databricks Runtime, and Spark dependencies should be marked as provided so they aren’t bundled into the JAR.
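As an illustrative pom.xml fragment (the artifact IDs and versions here are assumptions and should be matched to your cluster’s Databricks Runtime), Spark can be marked as provided while the maven-assembly-plugin builds the jar-with-dependencies artifact referenced in the init script below:

<!-- Spark is supplied by the Databricks Runtime, so don't bundle it. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.5.0</version>
  <scope>provided</scope>
</dependency>

<!-- Produces eventhublistener-1.0-SNAPSHOT-jar-with-dependencies.jar
     when running `mvn clean package`. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>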
Once you’ve created your JAR, you can upload it to a Unity Catalog volume or as a file in your Databricks workspace. The JAR can be installed on Databricks clusters as a cluster-scoped library or by using a cluster-scoped init script.
Databricks’ Spark environment is pre-loaded with JARs providing various functionality. To use our listener class, we need to load the JAR into the Spark class path using a basic shell command.
#!/bin/bash
# ------------------------------------------------------
# Copies the EventHubListener JAR into /databricks/jars.
# This ensures our listener class is loaded in the Spark
# environment when we start a Databricks cluster.
# ------------------------------------------------------
cp /Volumes/org/streaming/artifacts/java/jars/eventhublistener-1.0-SNAPSHOT-jar-with-dependencies.jar /databricks/jars
Our script can be uploaded to a Unity Catalog volume or as a workspace file. We can set this as a cluster-scoped init script in the advanced settings of our cluster configuration.

Setting up an init script using the cluster configuration UI
We can now add our listener to the default list of streaming query listeners by setting spark.sql.streaming.streamingQueryListeners to our listener’s fully qualified class name in the cluster’s Spark configuration.

Adding a default listener to the Spark configuration
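In plain text, that Spark configuration entry looks like the following (using the com.dbfe.EventHubListener class name from the compute policy example below):

spark.sql.streaming.streamingQueryListeners com.dbfe.EventHubListener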
Our listener will be added to the StreamingQueryListenerBus whenever a SparkSession is created!
Databricks compute policies control configuration for Databricks clusters created in the web portal or with Databricks Asset Bundles. Our init script and Spark configuration can be added as fixed values in the policy settings.
{
  "init_scripts.0.workspace.destination": {
    "type": "fixed",
    "value": "/Users/[email protected]/install_streaming_listener.sh"
  },
  "spark_conf.spark.sql.streaming.streamingQueryListeners": {
    "type": "fixed",
    "value": "com.dbfe.EventHubListener"
  }
}
When a Databricks cluster is created with this compute policy, the cluster will automatically use the init script and Spark configuration needed to install our StreamingQueryListener. Streaming queries will forward metrics to our monitoring Event Hub by default. These metrics can be standardized and monitored across streaming jobs for an entire organization!
Monitoring streaming jobs is critical to ensuring workloads run reliably. Spark’s StreamingQueryListener provides a low-impact, fault-tolerant solution for forwarding metrics to event queues or logging services. By standardizing streaming query listeners across job deployments, we can minimize development overhead, centralize metrics, and create a complete view of streaming performance across an organization.
Check out our Git repo for an end-to-end example forwarding StreamingQueryListener metrics to Azure Event Hubs. For more on streaming observability, check out our product blog on the StateReader API.