Configure Databricks Spark Compute selection in Azure Data Orchestration Tools with metadata
In this article, we will discuss some options for improving interoperability between Azure Orchestration tools, like Data Factory, and Databricks Spark Compute. By using some simple metadata, we will show how to dynamically configure pipelines with appropriately sized clusters for all your orchestration and transformation needs as part of a data analytics platform.
The Challenge
I see two challenges with linking your Azure Orchestration pipelines with Spark Compute targets:
Configuration for optimal performance.
Keeping a tidy environment.
Regarding the first point, you are unlikely to be well served by a "one size fits all" Spark Compute cluster for all your data processing needs. Some datasets are big while others are small, and although you can save a bit of thinking effort by using one big cluster everywhere, this comes at the cost of paying for idle CPUs while your oversized cluster transforms a simple 100-row dataset. There are some configuration options that can help with this, such as min and max node counts, which let you keep a smaller baseline cluster and scale your CPU out rather than up. With that said, it is still nicer to configure your clusters case by case, depending on the activities you need to perform.
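To make the min and max node idea concrete, here is a minimal Python sketch of two cluster specifications, one fixed-size and one autoscaling. The field names (spark_version, node_type_id, num_workers, autoscale.min_workers, autoscale.max_workers) follow the Databricks Clusters API; the helper functions are hypothetical, and the runtime and VM size are illustrative values rather than recommendations.

```python
def fixed_cluster(runtime: str, node_type: str, workers: int) -> dict:
    """A cluster that always runs the same number of workers."""
    return {
        "spark_version": runtime,
        "node_type_id": node_type,
        "num_workers": workers,
    }


def autoscaling_cluster(runtime: str, node_type: str,
                        min_workers: int, max_workers: int) -> dict:
    """A cluster that scales out between a small baseline and a ceiling."""
    if not 0 < min_workers <= max_workers:
        raise ValueError("expected 0 < min_workers <= max_workers")
    return {
        "spark_version": runtime,
        "node_type_id": node_type,
        "autoscale": {"min_workers": min_workers, "max_workers": max_workers},
    }


def worker_range(spec: dict) -> tuple[int, int]:
    """Return the (lowest, highest) worker count a spec can run with."""
    if "autoscale" in spec:
        scale = spec["autoscale"]
        return scale["min_workers"], scale["max_workers"]
    return spec["num_workers"], spec["num_workers"]


# Illustrative values only: pick the runtime and VM size your workloads need.
big_fixed = fixed_cluster("15.4.x-scala2.12", "Standard_DS3_v2", 8)
elastic = autoscaling_cluster("15.4.x-scala2.12", "Standard_DS3_v2", 1, 8)
print(worker_range(big_fixed))  # (8, 8): eight workers, even for a 100-row dataset
print(worker_range(elastic))    # (1, 8): one worker at rest, up to eight under load
```

Autoscaling narrows the gap, but the ceiling and VM size are still shared by every activity that uses the cluster, which is why per-activity configuration remains attractive.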
This leads us nicely to point 2...
If you pursue this and set up multiple Spark compute connections, things can get messy quite quickly. You may end up with a workspace full of hard-coded artifacts, each of which needs maintaining and configuring for every compute activity. How do you gain configurability without the administrative effort?
This post will show you how to reduce resource clutter in your environment for the trade-off of a bit of metadata configuration.
Environment Set-up
We'll use an example in Azure Data Factory and compare what is needed for a single pipeline to run against multiple compute targets, based on the metadata supplied to it.
As such, spin up the following Azure Resources in an Azure Development Resource Group:
Azure Data Factory (Orchestration)
Azure Databricks (Spark Compute)
Azure SQL Database (Metadata Store)
You'll also need to allow Data Factory to run Databricks notebooks and read metadata from the Azure SQL Database.
Databricks Configuration
We'll work backwards and set up the compute target and activities we wish to process first.
In Databricks, we'll create a few clusters and some notebooks. Both the compute JSONs and ipynb notebooks can be found in my GitHub repository.
This example contains various clusters each associated with notebook executions, giving us examples of small, medium and large data processing requirements.
Compute Name   | Notebook Name  | Description
---------------|----------------|-----------------------------------------------
Small Cluster  | smallactivity  | Notebook performing basic SQL SELECT statement
Medium Cluster | mediumactivity | Notebook performing basic SQL SELECT statement
Big Cluster    | bigactivity    | Notebook performing basic SQL SELECT statement
We'll also be using Job Clusters in this example, as these are autonomous and therefore suitable for production workloads (with a lower cost per execution). There is no configuration required within Databricks for this, as job clusters are created automatically when requested by an Orchestration tool such as Data Factory.
If you'd like to know more about the compute configuration options available in Databricks, check out this other blog.
Data Factory Configuration
Linked Services
For Data Factory to run anything in Databricks, we need to start with a Linked Service. This is where our first decision point occurs - Job cluster, Interactive cluster or cluster pool?
Let's look at job and interactive clusters.
Starting with the interactive clusters, we first need to select the authentication type. As we've set up access for the Data Factory Managed Identity earlier on, we shall select this option.
We want the details to match those we've just created in Databricks and the following values are required:
Let's pause here for a second. We could start copy-pasting our Workspace URL, Cluster Id and so on into the Linked Service, save, and repeat for the second cluster. This is not unmanageable yet, but it doesn't scale very well.
To improve this, create and add a parameter to each required option, giving you something that looks like this:
The Workspace resource ID value requires a few details which are going to be reusable for the interactive cluster, so I've pulled these out as separate parameters. The full expression can be found in the Linked Service JSON in the repo.
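As a rough illustration of what that expression assembles, the Python sketch below builds an Azure resource ID for a Databricks workspace from its three reusable parts. The path layout is the standard Azure resource ID format; the function and argument names are hypothetical and are not the parameter names used in the repo's Linked Service JSON.

```python
def workspace_resource_id(subscription_id: str,
                          resource_group: str,
                          workspace_name: str) -> str:
    """Compose the Azure resource ID of a Databricks workspace."""
    for label, value in (("subscription_id", subscription_id),
                         ("resource_group", resource_group),
                         ("workspace_name", workspace_name)):
        if not value or "/" in value:
            raise ValueError(f"{label} must be a non-empty path segment")
    return (
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        f"/providers/Microsoft.Databricks/workspaces/{workspace_name}"
    )


# Placeholder values in the style of the metadata used later in this post.
print(workspace_resource_id("sid-123-456-789", "demo-rg-uks01", "demo-workspace"))
```

Keeping the three parts as separate parameters means the same values can be reused wherever a subscription, resource group or workspace name is needed, rather than being buried inside one long string.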
For those who are new to parameters in Data Factory, this may look a bit intimidating but offers us much greater flexibility in reusing the Linked Service. The easiest way to get started with this is to run the "Test Connection", which will prompt you to fill in your parameter values. Here, you can pass in the same Cluster references and confirm that you get the "Connection Successful" green tick. As a learning example, do this for 2 of the clusters you've created in Databricks. If you get green ticks for both, it means that Data Factory was able to connect to both clusters using this Linked Service - congrats!
The job cluster requires a few more details in the Data Factory Linked Service. For the Interactive cluster, we already specified configurations like size and runtime when building it in Databricks, but we are going to be doing this in Data Factory instead now. As a result, there are a few more parameters that need to be specified, which align with the configurations we'd normally set in Databricks. I've provided screenshots of these side-by-side below:
We'll talk about how we use these parameters in an execution context once we've set up the metadata database and run a pipeline. It is also worth noting that the Python Version is hard-coded here due to a limitation within Data Factory, which only lets users select a valid version of 2 or 3.
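To show how metadata could feed those extra job cluster settings, here is a small Python sketch that translates one metadata row into new-cluster properties. The keys newClusterVersion, newClusterNodeType and newClusterNumOfWorker are the property names of the Data Factory Azure Databricks linked service, where the worker count is passed as a string. The function itself is hypothetical, and treating CountNodes as the worker count is an assumption.

```python
def job_cluster_properties(row: dict) -> dict:
    """Translate a compute metadata row into new-cluster property values."""
    workers = int(row["CountNodes"])  # assumption: CountNodes means worker nodes
    if workers < 1:
        raise ValueError("a job cluster needs at least one worker")
    return {
        "newClusterVersion": row["ComputeVersion"],   # Databricks runtime
        "newClusterNodeType": row["ComputeSize"],     # Azure VM size
        "newClusterNumOfWorker": str(workers),        # Data Factory expects a string
    }


# Example row using the column names and values that appear later in this post.
row = {
    "ComputeSize": "Standard_L8s_v3",
    "ComputeVersion": "15.4.x-scala2.12",
    "CountNodes": 2,
}
print(job_cluster_properties(row))
```

In Data Factory itself these values would arrive as Linked Service parameters rather than through a function, but the mapping from metadata columns to cluster settings is the same idea.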
Next up - create a Linked Service for your metadata database. For this case, we'll need to hard-code the instance and database name. It should look something like this:
Here are our finished Linked Services:
Datasets
We also require a dataset for the metadata database, so create one like the following:
Pipelines
Now that the references to the compute are ready, it's time to create a pipeline. Again, we could create a pipeline per activity (notebook), per Linked Service, but it is much better to reuse a single pipeline.
As we've lucked out with a straightforward example, the pipeline doesn't have too many components and follows this logical flow:
Perform a lookup activity against the metadata database to find out what activity it is performing.
Pass the results of the lookup to a switch activity.
Run a Databricks notebook activity using the interactive cluster or job cluster depending on the results of the lookup and switch activities.
The pipeline also includes an "Activity Id" integer parameter, which allows us to specify which activity we are running.
This is more easily visualised below, but the JSON of the pipeline is also available in the repo.
The lookup activity connects to the Metadata database dataset configured earlier and executes a stored procedure which uses our pipeline parameter. We shall talk more about the SQL configuration shortly.
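The lookup-then-switch flow can be sketched in plain Python. This is an illustration of the decision logic only, not the pipeline definition from the repo. It assumes the switch keys on the Linked Service name returned by the lookup; LS_Databricks_JobCluster_Python3 is the name used in the metadata later in this post, while LS_Databricks_InteractiveCluster and the NotebookPath key are hypothetical.

```python
JOB_LINKED_SERVICE = "LS_Databricks_JobCluster_Python3"
INTERACTIVE_LINKED_SERVICE = "LS_Databricks_InteractiveCluster"  # hypothetical name


def choose_branch(payload: dict) -> str:
    """Mimic the switch activity: pick a branch from the lookup result."""
    linked_service = payload["ComputeLinkedServiceName"]
    if linked_service == JOB_LINKED_SERVICE:
        return "job"
    if linked_service == INTERACTIVE_LINKED_SERVICE:
        return "interactive"
    raise ValueError(f"no switch case for linked service {linked_service!r}")


def notebook_run_settings(payload: dict) -> dict:
    """Collect the values the chosen notebook activity would be given."""
    branch = choose_branch(payload)
    settings = {
        "branch": branch,
        "notebook_path": payload["NotebookPath"],
        "workspace_url": payload["ComputeWorkspaceURL"],
    }
    if branch == "interactive":
        # An existing cluster is identified by its id alone.
        settings["cluster_id"] = payload["ComputeClusterId"]
    else:
        # A job cluster is described by its specification instead.
        settings["node_type"] = payload["ComputeSize"]
        settings["runtime"] = payload["ComputeVersion"]
        settings["workers"] = payload["CountNodes"]
    return settings


payload = {
    "NotebookPath": "/Shared/smallactivity",  # hypothetical path
    "ComputeWorkspaceURL": "adb-xxxxx.azuredatabricks.net",
    "ComputeLinkedServiceName": JOB_LINKED_SERVICE,
    "ComputeClusterId": "NA",
    "ComputeSize": "Standard_L8s_v3",
    "ComputeVersion": "15.4.x-scala2.12",
    "CountNodes": 2,
}
print(notebook_run_settings(payload)["branch"])  # job
```

The point of the switch is that the two branches need different inputs: an interactive cluster only needs an id, whereas a job cluster needs a full specification.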
Taking a closer look at each of the Databricks notebook activities:
Optional: Create an Orchestration Pipeline
You may wish to create another pipeline to orchestrate the Activity pipelines. I've created a very simple one which gets the Activity Ids from the Activities table and iterates through the results by executing the PL_Activities pipeline with these Ids in parallel.
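Conceptually, this orchestration is a parallel loop over Activity Ids. The Python sketch below mimics that shape with a thread pool. The run_activity_pipeline function is a hypothetical stand-in for triggering PL_Activities and only returns a message; it does not call Data Factory.

```python
from concurrent.futures import ThreadPoolExecutor


def run_activity_pipeline(activity_id: int) -> str:
    """Stand-in for one PL_Activities execution with a given Activity Id."""
    return f"PL_Activities(ActivityId={activity_id}) completed"


def orchestrate(activity_ids: list[int], max_parallel: int = 3) -> list[str]:
    """Run the activity pipeline once per id, several at a time."""
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        # map() returns results in input order even though runs overlap.
        return list(pool.map(run_activity_pipeline, activity_ids))


for message in orchestrate([1, 2, 3]):
    print(message)
```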
Azure SQL Database Configuration
For the metadata, we'll keep things as simple as possible and create two tables and one stored procedure.
The Compute Connection table contains the connection details for all the interactive Databricks compute clusters we've created, along with the configuration specifications for the Job clusters.
Note: We've got a few columns in here that contain repeated values that might be better stored in a different table in real-life. Since this is an example and I don't want to bloat the artifacts, we're happy to include this repeated data here.
The Activity table contains data about the activities we're going to execute. In our example scenario, this includes an Activity Id, Compute Connection Id and Notebook path.
We'll use a stored procedure called GetPayload for Data Factory to query the activity and compute connection details required for an execution. This takes in that Activity Id parameter we created in the Data Factory pipeline, and returns a single row as shown below:
Running this stored procedure for each Activity Id, we can see the info that the notebook activities will use to connect to the notebook and which cluster to use.
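For readers who want to experiment without an Azure SQL Database, here is a self-contained Python sketch of the same idea using an in-memory SQLite database. It is not the schema from the repo: the column names are borrowed from the statements shown later in this post where available, while NotebookPath, the sample rows, the interactive Linked Service name and the get_payload function (a stand-in for the GetPayload stored procedure) are assumptions.

```python
import sqlite3


def build_metadata_db() -> sqlite3.Connection:
    """Create simplified metadata tables with a few sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript("""
        CREATE TABLE ComputeConnections (
            ComputeConnectionId      INTEGER PRIMARY KEY,
            ConnectionDisplayName    TEXT NOT NULL,
            ComputeClusterId         TEXT NOT NULL,
            ComputeSize              TEXT,
            ComputeVersion           TEXT,
            CountNodes               INTEGER,
            ComputeLinkedServiceName TEXT NOT NULL
        );
        CREATE TABLE Activities (
            ActivityId          INTEGER PRIMARY KEY,
            ActivityDisplayName TEXT NOT NULL,
            ComputeConnectionId INTEGER NOT NULL
                REFERENCES ComputeConnections (ComputeConnectionId),
            NotebookPath        TEXT NOT NULL
        );
        -- Hypothetical sample rows: one interactive and one job cluster.
        INSERT INTO ComputeConnections VALUES
            (1, 'SmallInteractiveCluster', '0000-000000-example1',
             NULL, NULL, NULL, 'LS_Databricks_InteractiveCluster'),
            (4, 'SmallJobCluster', 'NA',
             'Standard_DS3_v2', '15.4.x-scala2.12', 1,
             'LS_Databricks_JobCluster_Python3');
        INSERT INTO Activities VALUES
            (1, 'Small Activity', 1, '/Shared/smallactivity');
    """)
    return conn


def get_payload(conn: sqlite3.Connection, activity_id: int) -> dict:
    """Return the single row an activity needs: notebook plus compute details."""
    row = conn.execute(
        """
        SELECT a.ActivityId, a.NotebookPath, c.ConnectionDisplayName,
               c.ComputeClusterId, c.ComputeSize, c.ComputeVersion,
               c.CountNodes, c.ComputeLinkedServiceName
        FROM Activities AS a
        JOIN ComputeConnections AS c
          ON c.ComputeConnectionId = a.ComputeConnectionId
        WHERE a.ActivityId = ?
        """,
        (activity_id,),
    ).fetchone()
    if row is None:
        raise LookupError(f"no activity with id {activity_id}")
    return dict(row)


conn = build_metadata_db()
print(get_payload(conn, 1)["ConnectionDisplayName"])  # SmallInteractiveCluster
```

Because the activity only holds a reference to its compute connection, pointing it at a different cluster is a one-column update on the Activities table.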
All table create scripts, insert statements and the stored procedure definition are included in the code base.
Bringing It All Together
We're now ready to run everything. Kick off an execution and fill in the Activity Id parameter accordingly.
I've included screenshots below of the monitoring for each of these.
As shown above, we've got 3 PL_Activities pipeline executions each using metadata to populate the parameters which allow us to switch the compute and notebooks being executed in each context.
Exploring the execution, we can see the Interactive cluster was used in the switch logic, as per our metadata configuration, and the 3 different clusters were used according to the Activity in question.
If we consider this our "development environment", we may now wish to mimic a production environment by running the same activities autonomously on job clusters instead. The metadata makes this very easy.
Run the following command:
-- Point 'Small Activity' at the small job cluster (ComputeConnectionId 4)
DECLARE @SmallJobClusterId INT = 4;
UPDATE [demo].[Activities]
SET [ComputeConnectionId] = @SmallJobClusterId
WHERE [ActivityDisplayName] = 'Small Activity';
Rerunning the Stored Procedure and Pipeline for Activity ID 1, we can see in the monitoring it is now using the job cluster. That was easy!
What about scaling up Activity Id 3 to use a storage-optimised cluster? We can add a Job cluster with a different spec to the ComputeConnections metadata table, and then update the ComputeConnectionId column in the Activities table to reflect this.
-- Insert New Job Cluster Configuration
INSERT INTO [demo].[ComputeConnections](
[ConnectionDisplayName],
[ComputeWorkspaceURL],
[ComputeClusterId],
[ComputeSize],
[ComputeVersion],
[CountNodes],
[ResourceName],
[ComputeLinkedServiceName],
[ResourceGroupName],
[SubscriptionId],
[Enabled]
)
VALUES
('StorageOptimisedBigJobCluster','adb-xxxxx.azuredatabricks.net','NA','Standard_L8s_v3','15.4.x-scala2.12',2,'workspace resource name', 'LS_Databricks_JobCluster_Python3','demo-rg-uks01','sid-123-456-789',1);
-- Update ActivityId 3 to use new Compute Connection
UPDATE [demo].[Activities]
SET [ComputeConnectionId] = 7
WHERE ActivityDisplayName = 'Big Activity';
With 2 quick SQL statements we've been able to scale up our compute requirements for the Data Factory pipeline to run with no additional resources deployed. We can see this change when running the pipeline from the activity details or in the Databricks execution:
This demonstrates the power and convenience of using metadata to drive your compute configuration when using tools like Data Factory and Databricks. Of course, it can be extended to Synapse and Fabric environments.
Conclusion
Hopefully we've demonstrated that with a bit of configuration effort, you gain a lot of reusability and ease of maintainability.
To me, the standout advantages you gain from using a metadata-driven solution are the following:
Less clutter in Data Factory Linked Services and Pipelines as parameterisation increases reusability.
Allows for a single update statement in metadata to change the requirement for an activity.
Configurable granularity on an activity-by-activity basis.
This approach also opens the door to performance tracking and analytics at this fine level of granularity. If you are using Unity Catalog, system tables allow you to monitor the Spark performance and costs in your environment, and then review and scale as required.
If you'd like to know more about the benefits of a metadata-driven framework then please reach out to the team - we'd be happy to answer any questions on implementation.
You may also be interested in CF.Cumulus, our metadata-driven Analytics Platform framework, which utilises metadata-driven data orchestration, ingestion and transformation.
Book some time with the team here, or self-serve by signing up on our Getting Started page.