PySpark UDFs (user-defined functions) are among the most popular tools for PySpark programmers, yet newcomers are often unsure how to create and use them. Turn to the guidelines below for guidance.
Tips to Write a PySpark UDF
Method 1. Use withColumn()
Let’s create a simple DataFrame in the illustration below, along with a function that converts its text field into an integer.
The full code for Method 1 appears at the end of this section; first, let’s inspect its key parts. We establish a DataFrame in the usual way:
df = spark.createDataFrame(data,schema=schema)
There are two tasks to do next. First, create the colsInt function and register it. The registered function calls another function, toInt(), which does not itself need registration.
The first argument in spark.udf.register("colsInt", colsInt) is the name that will be used to refer to the function from SQL; the second argument is the function we want to register. (Registration is only needed for SQL use, as in Method 2 below; withColumn() works with the udf() wrapper alone.)
colsInt = udf(lambda z: toInt(z), IntegerType())
spark.udf.register("colsInt", colsInt)

def toInt(s):
    # Join the ASCII code of each character and parse the result as an integer
    if isinstance(s, str):
        st = [str(ord(i)) for i in s]
        return int(''.join(st))
    else:
        return None
Then call the colsInt function, as illustrated below. The first argument is the label of the new column you want to create; the second is the DataFrame column to apply the function to.
df2 = df.withColumn('semployee', colsInt('employee'))
Keep in mind that df['employee'] is a Column object, not a single employee value, so the function must be applied to every row in that column. Wrapping toInt in a udf() via the inline lambda lets Spark do exactly that:
colsInt = udf(lambda z: toInt(z), IntegerType())
Here is the entire code for Method 1:
import pyspark
from pyspark import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
from pyspark.sql.functions import udf

conf = pyspark.SparkConf()
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)

schema = StructType([
    StructField("sales", FloatType(), True),
    StructField("employee", StringType(), True),
    StructField("ID", IntegerType(), True)
])

data = [[10.2, "Freddy", 123]]

df = spark.createDataFrame(data, schema=schema)

def toInt(s):
    # Join the ASCII code of each character and parse the result as an integer
    if isinstance(s, str):
        st = [str(ord(i)) for i in s]
        return int(''.join(st))
    else:
        return None

colsInt = udf(lambda z: toInt(z), IntegerType())
spark.udf.register("colsInt", colsInt)

df2 = df.withColumn('semployee', colsInt('employee'))
And the output looks like this; you can see that the “semployee” column has been added. Since DataFrames are immutable, withColumn() returns a new DataFrame, which is how we get df2.
df2.show()
+-----+--------+---+----------+
|sales|employee| ID| semployee|
+-----+--------+---+----------+
| 10.2|  Freddy|123|1394624364|
+-----+--------+---+----------+
Method 2. Use SQL
First of all, you need to register the DataFrame as a table to run SQL statements against it. Here, df is the DataFrame, while “dftab” is the name of the temporary table:
spark.registerDataFrameAsTable(df, "dftab")
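Note that registerDataFrameAsTable() belongs to the older SQLContext API used in this example. If you are working with a SparkSession (PySpark 2.0+), the equivalent call, as a minimal sketch, is:
# Equivalent on PySpark 2.0+ when spark is a SparkSession:
df.createOrReplaceTempView("dftab")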
Next, create df3, a new DataFrame, from the existing columns of df, applying the registered function colsInt to the “employee” column:
df3 = spark.sql("select sales, employee, ID, colsInt(employee) as iemployee from dftab")
The results are like this:
df3.show()
+-----+--------+---+----------+
|sales|employee| ID| iemployee|
+-----+--------+---+----------+
| 10.2|  Freddy|123|1394624364|
+-----+--------+---+----------+
Method 3. Use UDF with An Annotation
This section will introduce our last method.
One smart and quick way to write a Spark UDF is to rely on decorators (annotations), which make UDF definitions much more concise. All you need to do is place @udf in front of the function and pass the function’s return type as its argument (such as StringType(), IntegerType(), etc.).
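As a minimal sketch of the pattern (square_it is a hypothetical example, not part of this tutorial’s code), a decorated UDF looks like this:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# The decorator wraps square_it into a UDF with a declared return type
@udf(returnType=IntegerType())
def square_it(x):
    # Runs once per row; x holds that row's column value
    return x * x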
In the DataFrame below, the data consists of students’ names and test scores (out of 100).
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import udf
spark = SparkSession.builder.appName('UDF PRACTICE').getOrCreate()
cms = ["Name","RawScore"]
data = [("Jack", "79"),
("Mira", "80"),
("Carter", "90")]
df = spark.createDataFrame(data=data,schema=cms)
df.show()
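Assuming everything ran as expected, df.show() prints the table we just built:
+------+--------+
|  Name|RawScore|
+------+--------+
|  Jack|      79|
|  Mira|      80|
|Carter|      90|
+------+--------+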
Now, let’s apply the decorator to this code:
@udf(returnType=StringType())
def Converter(name):
    # For names starting with J, C, or M, keep only the second letter, uppercased
    result = ""
    for q in name.split(" "):
        if q[0] in ('J', 'C', 'M'):
            result += q[1:2].upper()
        else:
            result += q
    return result
df.withColumn("Special Names", Converter("Name")) \
.show()
And here is the output:
+------+--------+-------------+
|  Name|RawScore|Special Names|
+------+--------+-------------+
|  Jack|      79|            A|
|  Mira|      80|            I|
|Carter|      90|            A|
+------+--------+-------------+
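If you also want to call the decorated UDF from SQL, as in Method 2, you can register it the same way. A sketch, where "students" is a hypothetical view name:
# Register the decorated UDF and expose the DataFrame as a temporary view
spark.udf.register("Converter", Converter)
df.createOrReplaceTempView("students")
spark.sql("select Name, Converter(Name) as SpecialName from students").show()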
Conclusion
Our article has shown you how to write a PySpark UDF in simple steps. With our guides and examples, you should have no trouble applying these methods to your current programs. For other similar PySpark topics (such as writing Parquet files), feel free to browse this website for more tutorials.