This post walks through three simple methods for converting a PySpark RDD to a DataFrame. Each method is covered in detail with its parameters and a worked example.
What Are DataFrames And RDDs In PySpark?
DataFrames and RDDs are the two major Spark APIs for storing and processing data. A DataFrame is a higher-level API that organizes data into named columns and supports SQL-style operations, while an RDD (Resilient Distributed Dataset) is a lower-level API for processing distributed collections of arbitrary objects.
Before jumping into the conversion, let's create the Spark session and an RDD of tuples. Note that this setup and the first two methods use Spark's Scala API; a PySpark equivalent is sketched after each one:
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()
val sc = spark.sparkContext

val rdd = sc.parallelize(
  Seq(
    ("John", "Manager", 38),
    ("Mary", "Director", 45),
    ("Sally", "Engineer", 30)
  )
)
This constructs an RDD with three rows, which we will convert to a DataFrame below.
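For reference, here is a minimal PySpark sketch of the same setup (variable names are illustrative):

from pyspark.sql import SparkSession

# Create (or reuse) a local Spark session
spark = SparkSession.builder.master("local").getOrCreate()

# Parallelize the same three tuples into an RDD
rdd = spark.sparkContext.parallelize([
    ("John", "Manager", 38),
    ("Mary", "Director", 45),
    ("Sally", "Engineer", 30),
])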
How To Convert PySpark RDD To DataFrame
Method 1: Use The createDataFrame Method
The createDataFrame method belongs to the SparkSession class and is overloaded: you can pass it just the RDD, or the RDD together with a schema.
Let's first see how it works without a schema:
val dfWithDefaultSchema = spark.createDataFrame(rdd)
Inspect your new dataframe’s schema:
dfWithDefaultSchema.printSchema()
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: integer (nullable = false)
Here, the columns receive default positional names (_1, _2, _3), and the data types come from Spark's type inference, which will not always match what you want.
It is better to build the DataFrame from a predefined Row RDD so that createDataFrame can take a schema as its second parameter. First, map the tuples to Row objects:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val rowRDD: RDD[Row] = rdd.map(t => Row(t._1, t._2, t._3))
Create a schema object:
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = new StructType()
  .add(StructField("Name", StringType, false))
  .add(StructField("Job", StringType, true))
  .add(StructField("Age", IntegerType, true))
Now pass the schema as the second parameter so that every column gets an appropriate name, data type, and nullability:
import org.apache.spark.sql.DataFrame

val dfWithSchema: DataFrame = spark.createDataFrame(rowRDD, schema)
dfWithSchema.printSchema()
root
|-- Name: string (nullable = false)
|-- Job: string (nullable = true)
|-- Age: integer (nullable = true)
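The same overload exists in PySpark, where the schema is built from StructType and StructField objects. A minimal sketch, assuming the PySpark rdd from the setup above:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Mirror the Scala schema: name, type, nullable
schema = StructType([
    StructField("Name", StringType(), False),
    StructField("Job", StringType(), True),
    StructField("Age", IntegerType(), True),
])

# An RDD of tuples can be passed directly; no Row mapping is required
dfWithSchema = spark.createDataFrame(rdd, schema)
dfWithSchema.printSchema()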
Method 2: Use The toDF() Method
Another popular way to convert an RDD to a DataFrame is the toDF() method. In Scala, you first need to import the SparkSession's implicits:
import spark.implicits._
Remember that toDF() is only available for certain RDD element types, such as scala.Product subclasses (tuples and case classes), Int, String, and Long. Since our RDD contains tuples, we can call it directly and pass the column names:
val dfUsingToDFMethod = rdd.toDF("Name", "Job", "Age")
Then, inspect the new dataframe’s schema:
dfUsingToDFMethod.printSchema()
root
|-- Name: string (nullable = true)
|-- Job: string (nullable = true)
|-- Age: integer (nullable = false)
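PySpark RDDs expose a similar toDF() helper once a SparkSession exists; there are no implicits to import, and the column names are passed as a list. A minimal sketch, assuming the PySpark rdd from the setup above:

# Column names are supplied as a list in PySpark
dfUsingToDF = rdd.toDF(["Name", "Job", "Age"])
dfUsingToDF.printSchema()

Note that PySpark infers Python ints as long rather than integer, so the Age column's type will differ slightly from the Scala output above.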
Method 3: Use StructType Schema
By default, Spark infers each column's data type from the data and marks columns as nullable. Supplying a StructType schema changes this behavior: you can specify the name, data type, and nullability of every column or field. The example below builds a small department RDD and defines its schema explicitly:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Sample department data: (dept_name, dept_id)
dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]
rdd = spark.sparkContext.parallelize(dept)

deptSchema = StructType([
    StructField('dept_name', StringType(), True),
    StructField('dept_id', LongType(), True)
])

deptDF1 = spark.createDataFrame(rdd, schema = deptSchema)
deptDF1.printSchema()
deptDF1.show(truncate=False)
Output:
root
|-- dept_name: string (nullable = true)
|-- dept_id: long (nullable = true)
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance |10 |
|Marketing|20 |
|Sales |30 |
|IT |40 |
+---------+-------+
Conclusion
This article covered three ways to convert a Spark RDD to a DataFrame: the createDataFrame method (with or without an explicit schema), the toDF() method, and a PySpark StructType schema that controls column names, data types, and nullability.