PySpark — Dataframe Operations CheatSheet

Posted 2 months, 1 week ago

Introduction:

In PySpark, DataFrames are the primary data structure used for working with structured data. They provide a convenient interface for performing various operations on data, similar to DataFrames in pandas or SQL tables.

This article is a cheat sheet of common DataFrame operations in PySpark. It covers adding, modifying, renaming, and dropping columns, as well as filtering, sorting, aggregation, joins, pivots, window functions, UDFs, and more, with a short code example for each operation.

1. Add a new column to an existing DataFrame

withColumn adds a new column to a DataFrame. DataFrames are immutable, so it returns a new DataFrame and leaves the original unchanged.

from pyspark.sql import SparkSession

from pyspark.sql.functions import when, col

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Add Column") \
    .getOrCreate()

data = [("Vengat", 50), ("Varnika", 15), ("Bob", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Add a new column based on an existing column
df_with_new_column = df.withColumn("Age_Group", 
                                   when(col("Age") > 35, "Senior")
                                   .otherwise("Young"))

# Show the updated DataFrame
df_with_new_column.show()

2. Modify a DataFrame column

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Modify Column Example") \
    .getOrCreate()

# Sample DataFrame
data = [("Vengat", 50), ("Varnika", 15), ("Bob", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Modify the "Age" column by multiplying each value by 2;
# reusing an existing column name in withColumn replaces that column
df_modified = df.withColumn("Age", col("Age") * 2)

# Show the modified DataFrame
df_modified.show()

3. Add a constant column

To add a constant column to a DataFrame, use withColumn() together with lit(), which wraps a literal value as a column expression:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Add Column") \
    .getOrCreate()


data = [("Vengat", 50), ("Varnika", 15), ("Bob", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Add a constant column named "City" with value "Chennai"
df_with_city = df.withColumn("City", lit("Chennai"))

# Show the updated DataFrame
df_with_city.show()

4. Concatenate columns

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lit

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Concatenate Columns") \
    .getOrCreate()

# Sample DataFrame with Indian names
data = [("Akshay", "Kumar"), ("Priyanka", "Chopra"), ("Amitabh", "Bachchan")]
df = spark.createDataFrame(data, ["First_Name", "Last_Name"])

# Concatenate "First_Name" and "Last_Name" columns into a new column "Full_Name"
df_with_full_name = df.withColumn("Full_Name", concat(df["First_Name"], lit(" "), df["Last_Name"]))

# Show the DataFrame with the concatenated column
df_with_full_name.show()

5. Drop a column

# Drop the "Full_Name" column; drop() returns a new DataFrame
new_df = df_with_full_name.drop("Full_Name")

6. Update a column name

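# Rename the column "Name" to "Full_Name".
# Assumes df has a "Name" column; if it does not, withColumnRenamed
# returns the DataFrame unchanged without raising an error.
df_renamed = df.withColumnRenamed("Name", "Full_Name")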

# Show the DataFrame with the renamed column
df_renamed.show()

7. Rename multiple columns using a loop

# Sample DataFrame
data = [("John", 30, "Male"), ("Jane", 35, "Female"), ("Bob", 40, "Male")]
df = spark.createDataFrame(data, ["Name", "Age", "Gender"])

# Define the mapping of old column names to new column names
column_mapping = {
    "Name": "Full_Name",
    "Age": "Years",
    "Gender": "Sex"
}

# Rename the columns using a loop
for old_name, new_name in column_mapping.items():
    df = df.withColumnRenamed(old_name, new_name)

# Show the DataFrame with the renamed columns
df.show()

8. Convert a DataFrame column to a Python list

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Convert Column to List Example") \
    .getOrCreate()

# Sample DataFrame
data = [("John", 30), ("Jane", 35), ("Bob", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Select the column you want to convert to a list and collect the values into a list
name_list = df.select("Name").rdd.flatMap(lambda x: x).collect()

# Show the Python list
print(name_list)

9. Filtering rows

# Keep only the rows where Age is greater than 30
df_filtered = df.filter(df["Age"] > 30)

10. Sorting Rows:

# Sort rows by Name in descending order
df_sorted = df.orderBy(df["Name"].desc())

11. Grouping and Aggregating:

# Average Age per Name; the result column is named "avg(Age)"
df_grouped = df.groupBy("Name").agg({"Age": "avg"})

12. Joining DataFrames:

# Join two DataFrames df1 and df2 on the common column "ID"
df_joined = df1.join(df2, df1["ID"] == df2["ID"], "inner")

13. Pivoting:

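import pyspark.sql.functions as F

# Sum "Value" for each "Col1" group, with one output column per distinct "Col2" value
df_pivoted = df.groupBy("Col1").pivot("Col2").agg(F.sum("Value"))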

14. Window Functions:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Rank rows within each department by ascending salary
window_spec = Window.partitionBy("Department").orderBy("Salary")
df_ranked = df.withColumn("rank", F.rank().over(window_spec))

15. UDFs (User-Defined Functions):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

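# Define UDF to convert names to uppercase; return None for null input
# so that null rows do not raise an AttributeError
upper_udf = udf(lambda x: x.upper() if x is not None else None, StringType())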

# Apply UDF to DataFrame column "Name"
df_with_upper_name = df.withColumn("UpperName", upper_udf(df["Name"]))

16. Distinct Values:

# Unique (Name, City) combinations; assumes df has both columns
df_distinct = df.select("Name", "City").distinct()

17. Drop Duplicates:

# Keep one row per unique (Column1, Column2) pair; other columns are retained
df_no_duplicates = df.dropDuplicates(["Column1", "Column2"])

18. Handling Null Values:

# Replace nulls with 0; a numeric fill value only affects numeric columns
df_filled = df.fillna(0)

19. Convert Data Types:

# Cast "OldCol" to double and store the result in a new column "NewCol"
df_converted = df.withColumn("NewCol", df["OldCol"].cast("double"))
