What are PySpark Window functions? PySpark window functions apply statistical operations, such as row number and rank, to a frame, set, or collection of rows and return a result for each row individually.
In this guide, you will find information on these functions, their syntax, and their usage, as well as instructions on how to use them with PySpark’s DataFrame API and PySpark SQL.
PySpark Window Functions
PySpark window functions operate on a group of rows (such as a partition or frame) and return a single value for every input row.
PySpark SQL provides three types of window functions:
- Ranking
- Analytic
- Aggregate
Before performing an operation on a group, you must partition the data with Window.partitionBy(). For ranking functions such as rank() and row_number(), you also need to order the rows within each partition using orderBy().
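Schematically, the pattern looks like this (a minimal sketch; df, group_col, and order_col are placeholder names, not the example data used below):
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
# 1. Define the window: partition by a grouping column and order the rows within each partition
windowSpec = Window.partitionBy("group_col").orderBy("order_col")
# 2. Apply a window function over that window specification
df.withColumn("rank", rank().over(windowSpec))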
Let’s create a DataFrame in PySpark to work with.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ITtutoria.net').getOrCreate()
simpleData = (("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000),\
("Saif", "Sales", 4100) \
)
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)
Output:
root
|-- employee_name: string (nullable = true)
|-- department: string (nullable = true)
|-- salary: long (nullable = true)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James |Sales |3000 |
|Michael |Sales |4600 |
|Robert |Sales |4100 |
|Maria |Finance |3000 |
|James |Sales |3000 |
|Scott |Finance |3300 |
|Jen |Finance |3900 |
|Jeff |Marketing |3000 |
|Kumar |Marketing |2000 |
|Saif |Sales |4100 |
+-------------+----------+------+
Ranking Functions Of PySpark Window
row_number()
This function assigns a sequential row number, starting at 1, to the rows of each window partition (a common use case is sketched after the output below).
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number",row_number().over(windowSpec)) \
.show(truncate=False)
Output:
+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|James |Sales |3000 |1 |
|James |Sales |3000 |2 |
|Robert |Sales |4100 |3 |
|Saif |Sales |4100 |4 |
|Michael |Sales |4600 |5 |
|Maria |Finance |3000 |1 |
|Scott |Finance |3300 |2 |
|Jen |Finance |3900 |3 |
|Kumar |Marketing |2000 |1 |
|Jeff |Marketing |3000 |2 |
+-------------+----------+------+----------+
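A common application of row_number() (an illustrative sketch, not part of the original example) is keeping a single row per group, for instance the highest-paid employee in each department:
# Hypothetical usage: keep only the highest-paid employee per department
from pyspark.sql.functions import col, row_number
topSpec = Window.partitionBy("department").orderBy(col("salary").desc())
df.withColumn("rn", row_number().over(topSpec)) \
.where(col("rn") == 1) \
.drop("rn") \
.show()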
rank()
It assigns a rank to each row within a partition. In the event of ties, this function leaves gaps in the ranking.
"""rank"""
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
.show()
Output:
+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 3|
| Saif| Sales| 4100| 3|
| Michael| Sales| 4600| 5|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 2|
| Jen| Finance| 3900| 3|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+----+
This corresponds to the SQL RANK function.
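For reference, the same ranking can also be expressed with PySpark SQL. The sketch below assumes the DataFrame is registered as a temporary view; the view name emp is chosen only for this example:
# Hypothetical PySpark SQL equivalent; "emp" is an example view name
df.createOrReplaceTempView("emp")
spark.sql("""
SELECT employee_name, department, salary,
RANK() OVER (PARTITION BY department ORDER BY salary) AS rank
FROM emp
""").show()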
dense_rank()
The dense_rank() function returns the rank of rows within a partition without any gaps. Notice that it is quite similar to rank(); the sole distinction is that, in the event of ties, rank() leaves gaps in the ranking while dense_rank() does not.
"""dens_rank"""
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
.show()
Output:
+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 2|
| Saif| Sales| 4100| 2|
| Michael| Sales| 4600| 3|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 2|
| Jen| Finance| 3900| 3|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+----------+
As you can tell, this is equivalent to SQL’s DENSE_RANK.
percent_rank()
The percent_rank() function returns the relative rank of a row within its partition as a value between 0 and 1, computed as (rank - 1) / (number of rows in the partition - 1).
""" percent_rank """
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
.show()
Output:
+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
| James| Sales| 3000| 0.0|
| James| Sales| 3000| 0.0|
| Robert| Sales| 4100| 0.5|
| Saif| Sales| 4100| 0.5|
| Michael| Sales| 4600| 1.0|
| Maria| Finance| 3000| 0.0|
| Scott| Finance| 3300| 0.5|
| Jen| Finance| 3900| 1.0|
| Kumar| Marketing| 2000| 0.0|
| Jeff| Marketing| 3000| 1.0|
+-------------+----------+------+------------+
This is equivalent to SQL’s PERCENT_RANK function.
ntile()
The ntile() function distributes the rows of a window partition into the requested number of groups and returns the group (bucket) number of each row. Since we pass 2 as a parameter to ntile() in the following example, it returns either 1 or 2.
"""ntile"""
from pyspark.sql.functions import ntile
df.withColumn("ntile",ntile(2).over(windowSpec)) \
.show()
Output:
+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 1|
| Saif| Sales| 4100| 2|
| Michael| Sales| 4600| 2|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 1|
| Jen| Finance| 3900| 2|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+-----+
This is equivalent to the SQL NTILE function.
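As a variation (a sketch, not from the original example), passing 4 instead of 2 would divide each department into quartiles:
# Hypothetical variation: split each partition into 4 buckets (quartiles)
from pyspark.sql.functions import ntile
df.withColumn("quartile", ntile(4).over(windowSpec)) \
.show()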
Analytic Functions Of PySpark Window
cume_dist()
cume_dist() computes the cumulative distribution of values within a window partition, that is, the fraction of rows whose value is less than or equal to the current row’s value. It is equivalent to SQL’s CUME_DIST function. In the Sales partition of five rows, for example, two rows have a salary of 3000 or less, so their cume_dist is 2/5 = 0.4.
""" cume_dist """
from pyspark.sql.functions import cume_dist
df.withColumn("cume_dist",cume_dist().over(windowSpec)) \
.show()
Output:
+-------------+----------+------+------------------+
|employee_name|department|salary| cume_dist|
+-------------+----------+------+------------------+
| James| Sales| 3000| 0.4|
| James| Sales| 3000| 0.4|
| Robert| Sales| 4100| 0.8|
| Saif| Sales| 4100| 0.8|
| Michael| Sales| 4600| 1.0|
| Maria| Finance| 3000|0.3333333333333333|
| Scott| Finance| 3300|0.6666666666666666|
| Jen| Finance| 3900| 1.0|
| Kumar| Marketing| 2000| 0.5|
| Jeff| Marketing| 3000| 1.0|
+-------------+----------+------+------------------+
lag()
It is equivalent to SQL’s LAG function and gives access to the row at a given offset before the current row, which makes it helpful for comparing the current row with a preceding one. If there are fewer rows than the specified offset before the current row, it returns null (a small difference-calculation sketch follows the output below).
"""lag"""
from pyspark.sql.functions import lag
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
.show()
Output:
+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
| James| Sales| 3000|null|
| James| Sales| 3000|null|
| Robert| Sales| 4100|3000|
| Saif| Sales| 4100|3000|
| Michael| Sales| 4600|4100|
| Maria| Finance| 3000|null|
| Scott| Finance| 3300|null|
| Jen| Finance| 3900|3000|
| Kumar| Marketing| 2000|null|
| Jeff| Marketing| 3000|null|
+-------------+----------+------+----+
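As a small illustration of that comparison use case (a sketch, not part of the original example), lag() can be combined with a column subtraction to compute the difference between each salary and the previous one in the department:
# Hypothetical usage: difference from the previous salary within the window
from pyspark.sql.functions import col, lag
df.withColumn("prev_salary", lag("salary", 1).over(windowSpec)) \
.withColumn("salary_diff", col("salary") - col("prev_salary")) \
.show()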
lead()
This function is equivalent to SQL’s LEAD function, which gives access to the row at a given offset after the current row. Like LAG, it returns null when there are fewer rows than the specified offset after the current row.
"""lead"""
from pyspark.sql.functions import lead
df.withColumn("lead",lead("salary",2).over(windowSpec)) \
.show()
Output:
+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
| James| Sales| 3000|4100|
| James| Sales| 3000|4100|
| Robert| Sales| 4100|4600|
| Saif| Sales| 4100|null|
| Michael| Sales| 4600|null|
| Maria| Finance| 3000|3900|
| Scott| Finance| 3300|null|
| Jen| Finance| 3900|null|
| Kumar| Marketing| 2000|null|
| Jeff| Marketing| 3000|null|
+-------------+----------+------+----+
Aggregate Functions Of PySpark Window
You can use aggregate functions with a WindowSpec to calculate the avg, sum, min, and max salary of each department. Remember that orderBy() is not needed for these functions. In the example below, row_number() over the ordered windowSpec is used only to keep a single row per department in the final result (an alternative groupBy() version is sketched after the output).
windowSpecAgg = Window.partitionBy("department")
from pyspark.sql.functions import col,avg,sum,min,max,row_number
df.withColumn("row",row_number().over(windowSpec)) \
.withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
.withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
.withColumn("min", min(col("salary")).over(windowSpecAgg)) \
.withColumn("max", max(col("salary")).over(windowSpecAgg)) \
.where(col("row")==1).select("department","avg","sum","min","max") \
.show()
Output:
+----------+------+-----+----+----+
|department| avg| sum| min| max|
+----------+------+-----+----+----+
| Sales|3760.0|18800|3000|4600|
| Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
+----------+------+-----+----+----+
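For comparison (an alternative approach, not shown in the original), the same per-department summary could also be produced with a plain groupBy() aggregation, since no per-row values are needed in the final result:
# Hypothetical equivalent using groupBy instead of window aggregates
from pyspark.sql.functions import avg, sum, min, max
df.groupBy("department") \
.agg(avg("salary").alias("avg"), sum("salary").alias("sum"),
min("salary").alias("min"), max("salary").alias("max")) \
.show()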
Source Code Example
Use the following example as a reference:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ITtutoria.net').getOrCreate()
simpleData = (("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000),\
("Saif", "Sales", 4100) \
)
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number",row_number().over(windowSpec)) \
.show(truncate=False)
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
.show()
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
.show()
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
.show()
from pyspark.sql.functions import ntile
df.withColumn("ntile",ntile(2).over(windowSpec)) \
.show()
from pyspark.sql.functions import cume_dist
df.withColumn("cume_dist",cume_dist().over(windowSpec)) \
.show()
from pyspark.sql.functions import lag
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
.show()
from pyspark.sql.functions import lead
df.withColumn("lead",lead("salary",2).over(windowSpec)) \
.show()
windowSpecAgg = Window.partitionBy("department")
from pyspark.sql.functions import col,avg,sum,min,max,row_number
df.withColumn("row",row_number().over(windowSpec)) \
.withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
.withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
.withColumn("min", min(col("salary")).over(windowSpecAgg)) \
.withColumn("max", max(col("salary")).over(windowSpecAgg)) \
.where(col("row")==1).select("department","avg","sum","min","max") \
.show()
Conclusion
In this guide, you have learned the syntax of PySpark’s SQL window functions and how to use them together with aggregate functions. Now you can start putting these functions to use.