Spark Window functions let you compute results such as row number, rank, and running aggregates over a group of input rows. This article explains the available window functions, their syntax and usage, and shows how to use them with both the Spark DataFrame API and Spark SQL.
They are useful whenever you need to perform aggregate-style operations over a specific frame of rows on DataFrame columns.
Spark Window Functions
Spark Window functions operate on a group of rows (a partition or frame) and return a value for every input row. Spark SQL supports three kinds of window functions:
- Ranking
- Analytic
- Aggregate
To operate on a group, the data must first be partitioned using Window.partitionBy(). For ranking functions such as row_number() and rank(), you also need to order the rows within each partition using orderBy().
Ranking Functions Of Spark Window
row_number()
row_number() assigns a sequential number, starting at 1, to each row within a window partition.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
//row_number
val windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number",row_number.over(windowSpec))
.show()
Output:
+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
| James| Sales| 3000| 1|
| James| Sales| 3000| 2|
| Robert| Sales| 4100| 3|
| Saif| Sales| 4100| 4|
| Michael| Sales| 4600| 5|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 2|
| Jen| Finance| 3900| 3|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+----------+
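A common use of row_number is keeping just one row per group, for example the lowest-paid employee in each department (windowSpec orders by salary ascending; order by descending salary instead to get the highest-paid). A minimal sketch, assuming the same df and windowSpec as above:
//keep only the first row of each partition (lowest salary per department here)
df.withColumn("row_number", row_number.over(windowSpec))
  .where(col("row_number") === 1)
  .drop("row_number")
  .show()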
rank()
rank() assigns a rank to each row within a partition. When there are ties, it leaves gaps in the ranking.
import org.apache.spark.sql.functions._
//rank
df.withColumn("rank",rank().over(windowSpec))
.show()
Output:
+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 3|
| Saif| Sales| 4100| 3|
| Michael| Sales| 4600| 5|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 2|
| Jen| Finance| 3900| 3|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+----+
As you can tell, this is equivalent to SQL’s RANK function.
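Because rank() maps directly to SQL's RANK, you can produce the same result with a raw SQL query. A minimal sketch, assuming the same df and an active SparkSession named spark; the temporary view name employees is chosen just for this example:
//same ranking expressed in raw Spark SQL
df.createOrReplaceTempView("employees")
spark.sql(
  """SELECT employee_name, department, salary,
    |       RANK() OVER (PARTITION BY department ORDER BY salary) AS `rank`
    |FROM employees""".stripMargin)
  .show()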
dense_rank()
dense_rank() returns the rank of each row within its partition without gaps. It is similar to rank(); the only difference is that rank() leaves gaps in the ranking when there are ties.
import org.apache.spark.sql.functions._
//dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec))
.show()
Output:
+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 2|
| Saif| Sales| 4100| 2|
| Michael| Sales| 4600| 3|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 2|
| Jen| Finance| 3900| 3|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+----------+
This corresponds to the SQL DENSE_RANK.
percent_rank()
This function returns the relative rank of each row within its partition as a value between 0 and 1, computed as (rank - 1) / (number of partition rows - 1).
import org.apache.spark.sql.functions._
//percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec))
.show()
Output:
+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
| James| Sales| 3000| 0.0|
| James| Sales| 3000| 0.0|
| Robert| Sales| 4100| 0.5|
| Saif| Sales| 4100| 0.5|
| Michael| Sales| 4600| 1.0|
| Maria| Finance| 3000| 0.0|
| Scott| Finance| 3300| 0.5|
| Jen| Finance| 3900| 1.0|
| Kumar| Marketing| 2000| 0.0|
| Jeff| Marketing| 3000| 1.0|
+-------------+----------+------+------------+
This is similar to the PERCENT_RANK in SQL.
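As a sanity check, you can recompute percent_rank by hand from rank() and a per-partition row count. A minimal sketch, assuming the same df and windowSpec; the column names rows_in_dept and manual_percent_rank are just for illustration:
//recompute percent_rank by hand: (rank - 1) / (rows in partition - 1)
df.withColumn("rank", rank().over(windowSpec))
  .withColumn("rows_in_dept", count(lit(1)).over(Window.partitionBy("department")))
  .withColumn("manual_percent_rank", (col("rank") - 1) / (col("rows_in_dept") - 1))
  .show()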
ntile()
ntile(n) divides the rows of each partition into n roughly equal buckets and returns the bucket number of each row. Since we pass 2 to ntile in the example below, it returns a value of either 1 or 2.
//ntile
df.withColumn("ntile",ntile(2).over(windowSpec))
.show()
Output:
+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 1|
| Saif| Sales| 4100| 2|
| Michael| Sales| 4600| 2|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 1|
| Jen| Finance| 3900| 2|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+-----+
This is equivalent to SQL's NTILE function.
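ntile is often used for bucketing, for example splitting each department into salary quartiles. A minimal sketch with the same df and windowSpec; partitions with fewer than 4 rows simply end up with fewer, smaller buckets. The column name salary_quartile is just for illustration:
//bucket each department's rows into up to 4 salary quartiles
df.withColumn("salary_quartile", ntile(4).over(windowSpec))
  .show()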
Analytic Functions Of Spark Window
cume_dist()
cume_dist() computes the cumulative distribution of values within a window partition: for each row, the fraction of partition rows whose value is less than or equal to the current row's value. For example, in the Sales partition below, two of the five rows have a salary of 3000 or less, so their cume_dist is 2/5 = 0.4. It corresponds to SQL's CUME_DIST.
//cume_dist
df.withColumn("cume_dist",cume_dist().over(windowSpec))
.show()
Output:
+-------------+----------+------+------------------+
|employee_name|department|salary| cume_dist|
+-------------+----------+------+------------------+
| James| Sales| 3000| 0.4|
| James| Sales| 3000| 0.4|
| Robert| Sales| 4100| 0.8|
| Saif| Sales| 4100| 0.8|
| Michael| Sales| 4600| 1.0|
| Maria| Finance| 3000|0.3333333333333333|
| Scott| Finance| 3300|0.6666666666666666|
| Jen| Finance| 3900| 1.0|
| Kumar| Marketing| 2000| 0.5|
| Jeff| Marketing| 3000| 1.0|
+-------------+----------+------+------------------+
lag()
This function is equivalent to SQL's LAG. It is useful for comparing the value of the current row with that of a preceding row. It returns null if there are fewer rows than the specified offset before the current row. In the example below, the offset is 2, so each row looks two rows back within its partition.
//lag
df.withColumn("lag",lag("salary",2).over(windowSpec))
.show()
Output:
+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
| James| Sales| 3000|null|
| James| Sales| 3000|null|
| Robert| Sales| 4100|3000|
| Saif| Sales| 4100|3000|
| Michael| Sales| 4600|4100|
| Maria| Finance| 3000|null|
| Scott| Finance| 3300|null|
| Jen| Finance| 3900|3000|
| Kumar| Marketing| 2000|null|
| Jeff| Marketing| 3000|null|
+-------------+----------+------+----+
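A typical use of lag is computing the change from a previous row, for example the gap between the current salary and the previous salary within the same department (ordered by salary). A minimal sketch, assuming the same df and windowSpec and using an offset of 1; the column names prev_salary and salary_diff are just for illustration:
//salary difference to the previous row in the partition (null for the first row)
df.withColumn("prev_salary", lag("salary", 1).over(windowSpec))
  .withColumn("salary_diff", col("salary") - col("prev_salary"))
  .show()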
lead()
This window function is equivalent to SQL's LEAD. It gives access to the value at a given offset after the current row within the partition. Like lag, it returns null, but in this case when there are fewer rows than the specified offset after the current row.
//lead
df.withColumn("lead",lead("salary",2).over(windowSpec))
.show()
Output:
+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
| James| Sales| 3000|4100|
| James| Sales| 3000|4100|
| Robert| Sales| 4100|4600|
| Saif| Sales| 4100|null|
| Michael| Sales| 4600|null|
| Maria| Finance| 3000|3900|
| Scott| Finance| 3300|null|
| Jen| Finance| 3900|null|
| Kumar| Marketing| 2000|null|
| Jeff| Marketing| 3000|null|
+-------------+----------+------+----+
Aggregate Functions Of Spark Window
You can use aggregate functions with a WindowSpec to calculate each department's avg, sum, min, and max. Note that an orderBy clause is not needed for a window that contains only aggregate functions; windowSpecAgg below therefore omits it, while row_number still uses the ordered windowSpec.
val windowSpecAgg = Window.partitionBy("department")
val aggDF = df.withColumn("row", row_number.over(windowSpec))
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg))
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
  .withColumn("min", min(col("salary")).over(windowSpecAgg))
  .withColumn("max", max(col("salary")).over(windowSpecAgg))
  .where(col("row") === 1)
  .select("department", "avg", "sum", "min", "max")
aggDF.show()
Output:
+----------+------+-----+----+----+
|department| avg| sum| min| max|
+----------+------+-----+----+----+
| Sales|3760.0|18800|3000|4600|
| Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
+----------+------+-----+----+----+
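For this particular result, one aggregated row per department, a plain groupBy is simpler and avoids the window plus where(col("row") === 1) trick; window aggregates are mainly worth it when you want to keep the aggregated values alongside every detail row. A sketch of the groupBy equivalent, assuming the same df:
//same per-department aggregates with a regular groupBy
df.groupBy("department")
  .agg(avg("salary").as("avg"), sum("salary").as("sum"),
    min("salary").as("min"), max("salary").as("max"))
  .show()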
Source Code Example
You can use the example below as a reference:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
object WindowFunctions extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("ITtutoria.net")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  val simpleData = Seq(
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )
  val df = simpleData.toDF("employee_name", "department", "salary")
  df.show()

  //row_number
  val windowSpec = Window.partitionBy("department").orderBy("salary")
  df.withColumn("row_number", row_number.over(windowSpec))
    .show()

  //rank
  df.withColumn("rank", rank().over(windowSpec))
    .show()

  //dense_rank
  df.withColumn("dense_rank", dense_rank().over(windowSpec))
    .show()

  //percent_rank
  df.withColumn("percent_rank", percent_rank().over(windowSpec))
    .show()

  //ntile
  df.withColumn("ntile", ntile(2).over(windowSpec))
    .show()

  //cume_dist
  df.withColumn("cume_dist", cume_dist().over(windowSpec))
    .show()

  //lag
  df.withColumn("lag", lag("salary", 2).over(windowSpec))
    .show()

  //lead
  df.withColumn("lead", lead("salary", 2).over(windowSpec))
    .show()

  //Aggregate Functions
  val windowSpecAgg = Window.partitionBy("department")
  val aggDF = df.withColumn("row", row_number.over(windowSpec))
    .withColumn("avg", avg(col("salary")).over(windowSpecAgg))
    .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
    .withColumn("min", min(col("salary")).over(windowSpecAgg))
    .withColumn("max", max(col("salary")).over(windowSpecAgg))
    .where(col("row") === 1)
    .select("department", "avg", "sum", "min", "max")
  aggDF.show()
}
Conclusion
In this tutorial, you learned the syntax of Spark SQL window functions, saw how to use ranking, analytic, and aggregate functions over a WindowSpec, and worked through Scala examples for each. Read more related articles about Spark SQL partitions here.