Spark broadcast variables are an important type of shared variable. They can come in handy when you need to share copies of datasets between nodes. Let’s see where you can make use of them and optimize your applications.
Shared Variables In Spark
When you pass a function to a Spark operation (such as map or reduce), Spark executes it on remote cluster nodes and ships a separate copy of every variable used in the function to each node. When those copies are modified on the remote nodes, the changes are not propagated back to Spark’s driver program.
Fully general shared variables that are both readable and writable would be a natural approach. But keeping them synchronized across Spark tasks would put a huge burden on the resources of your systems.
To solve this problem to some extent, Spark offers two restricted types of shared variables: accumulators and broadcast variables. While they have limitations, they cover the most common use cases.
Accumulators are variables that are only ever added to through a commutative and associative operation, which is what allows Spark to update them efficiently in parallel. You can use them to implement counters (as in MapReduce) or sums. Accumulators of numeric types are natively supported in Spark, and you can add implementations for other types.
Spark allows you to create unnamed or named accumulators. For named accumulators, the web UI shows each accumulator alongside the stage that modifies it. Accumulators whose values are modified by Spark tasks also appear in the Tasks table.
Tracking modified accumulators through the UI this way can help you better understand running stages and their progress. However, while you can take full advantage of it in Scala, this functionality isn’t implemented in PySpark yet.
Spark Broadcast Variables
Rather than shipping a copy of a read-only variable with every Spark task, broadcast variables keep it cached on each node. You can use them to, for instance, efficiently give all the nodes access to a large dataset.
Spark distributes these shared variables using optimized broadcast algorithms to keep the communication cost to a minimum. Spark executes actions through a series of stages, which are separated by shuffle operations, and it automatically broadcasts the common data that tasks need within each stage. That data is cached in serialized form and deserialized before each task runs.
Here is an example of how to use broadcast variables. Call SparkContext.broadcast() to create a Broadcast object; Spark will ship it to each node in the cluster only once, and distributed functions can then read its value.
from pyspark import SparkContext

sc = SparkContext()
sites = sc.broadcast(["ITTutoria", "Stack Overflow", "Quora", "Reddit"])

data = sites.value
print("Stored data ->", data)

elem = sites.value[2]
print("Printing an element ->", elem)
Stored data -> ['ITTutoria', 'Stack Overflow', 'Quora', 'Reddit']
Printing an element -> Quora
In the snippet above, the broadcast variable wraps the list of websites. You can access its contents through the value attribute.
After creating a broadcast variable, you should use it instead of the original value in any function that runs on your cluster, so that the data isn’t shipped to the nodes more than necessary. Additionally, you shouldn’t modify the object after broadcasting it. Otherwise, nodes that receive the data later may see a different value than nodes that received it earlier.
Spark broadcast variables are a useful feature that you can use to reduce resource usage. By shipping the same object to every node on the cluster only once, you can reduce the communication cost when sharing read-only variables.