Spark SQL shuffle should not be a strange name for coders. This mechanism redistributes and re-partitions data so that it ends up grouped differently across partitions.
The resulting number of partitions depends on your data size, and you can increase or reduce it to suit your purposes. Spark shuffle is an expensive operation.
This tutorial will discuss Spark SQL shuffle partitions and how to configure them.
What Is a Spark Shuffle?
The shuffling mechanism redistributes data across different machines and executors. It is triggered by transformation operations such as groupBy(), groupByKey(), join(), and reduceByKey().
The Spark shuffle operation is often expensive due to disk I/O, network I/O, and data serialization and deserialization.
Spark doesn’t have to store the data for all keys to create an RDD. Instead, it performs the following tasks when we execute the reduceByKey() operation:
- Run the map task to group all available values for a specific key.
- Keep the map’s results in memory.
- If the data cannot be stored in memory, spill it to disk.
- Shuffle the data and store it so it can be reused for later recomputation.
- Execute garbage collection and the reduce tasks.
Spark RDD Shuffle
Several RDD operations trigger a shuffle: groupByKey(), cogroup(), repartition(), join(), and reduceByKey(), to name a few. However, countByKey() is not included in this list.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .master("local[5]")
  .appName("ittutoria.net")
  .getOrCreate()
val sc = spark.sparkContext

// The partition count here depends on the input file and your setup
val rdd: RDD[String] = sc.textFile("src/main/resources/test.txt")
println("RDD Partition Count : " + rdd.getNumPartitions)

// Split each line into words and map every word to a (word, 1) pair
val rdd2 = rdd.flatMap(f => f.split(" "))
  .map(m => (m, 1))

// reduceByKey() triggers a shuffle but keeps the parent's partition count
val rdd5 = rdd2.reduceByKey(_ + _)
println("RDD Partition Count : " + rdd5.getNumPartitions)
#Output
RDD Partition Count : 3
RDD Partition Count : 3
The number of partitions you get depends on the Spark operation and your setup; here, reduceByKey() keeps the same partition count as its parent RDD.
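If you want a different partition count after the shuffle, reduceByKey() also accepts an explicit number of partitions. Below is a minimal sketch; the value 10 is only an illustrative choice:
// Request 10 partitions for the shuffled RDD (10 is an arbitrary example value)
val rdd6 = rdd2.reduceByKey(_ + _, 10)
println("RDD Partition Count : " + rdd6.getNumPartitions) // prints 10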
Spark SQL DataFrame Shuffle
Unlike RDD shuffles, a Spark SQL DataFrame shuffle changes the number of partitions. The shuffle is triggered by all aggregate functions and by join().
import spark.implicits._
val simpleData = Seq(("James","Sales","NY",90000,34,10000),
("Michael","Sales","NY",86000,56,20000),
("Robert","Sales","CA",81000,30,23000),
("Maria","Finance","CA",90000,24,23000),
("Raman","Finance","CA",99000,40,24000),
("Scott","Finance","NY",83000,36,19000),
("Jen","Finance","NY",79000,53,15000),
("Jeff","Marketing","CA",80000,25,18000),
("Kumar","Marketing","NY",91000,50,21000)
)
val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
// groupBy() triggers a shuffle; the result gets spark.sql.shuffle.partitions partitions
val df2 = df.groupBy("state").count()
println(df2.rdd.getNumPartitions)
The printed result is 200, which is the default number of shuffle partitions for a DataFrame.
Spark Default Shuffle Partition
When a DataFrame operation shuffles data, Spark repartitions it into 200 partitions by default. This behavior comes from the spark.sql.shuffle.partitions configuration, which defaults to 200.
You can also change this value with the SparkSession’s conf method.
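For example, you can lower the default before triggering the shuffle. This is a minimal sketch; the value 100 is only an illustrative choice:
// Override the default shuffle partition count for this session
spark.conf.set("spark.sql.shuffle.partitions", "100")
val df3 = df.groupBy("state").count()
println(df3.rdd.getNumPartitions) // prints 100
The new value applies to every shuffle executed after the call, so set it before running the query whose partitioning you want to change.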
Shuffle Partition Size
Depending on your data size, memory, and number of cores, Spark shuffling can either harm or benefit your programs. If you deal with less data, the shuffle partitions should be reduced.
Otherwise, you end up with many partitioned files that each hold only a few records, and many tasks that each process very little data.
On the other hand, too much data with too few partitions leads to fewer but longer-running tasks, and sometimes out-of-memory errors.
Getting the shuffle partition size right is not easy. You need to take several runs with different values to arrive at the optimum number.
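One simple way to run those experiments is to time the same query under a few candidate values. The sketch below reuses the df DataFrame from the earlier example and an arbitrary list of candidate sizes:
// Try a few candidate shuffle partition counts and time the same aggregation
for (partitions <- Seq(10, 50, 200)) {
  spark.conf.set("spark.sql.shuffle.partitions", partitions.toString)
  val start = System.nanoTime()
  df.groupBy("state").count().collect() // force execution
  val elapsedMs = (System.nanoTime() - start) / 1e6
  println(s"shuffle.partitions=$partitions took $elapsedMs ms")
}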
Conclusion
This article has discussed Spark SQL shuffle and Spark SQL shuffle partitions. Check each heading above to pick the right partition size for your workload.