In PySpark, DataFrames are the primary data structure used for working with structured data. They provide a convenient interface for performing various operations on data, similar to DataFrames in pandas or SQL tables.
This article serves as a comprehensive cheat sheet for various DataFrame operations in PySpark. It covers essential operations such as data loading, manipulation, filtering, aggregation, joining, and more. Each section provides code examples along with explanations to help you understand and apply these operations effectively in your PySpark projects.
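All of the examples below follow the same basic setup: create (or reuse) a SparkSession and build a small DataFrame from in-memory data. Here is a minimal sketch of that setup (the app name and sample rows are just placeholders):

from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession, the entry point for DataFrame operations
spark = SparkSession.builder \
    .appName("DataFrame Cheat Sheet") \
    .getOrCreate()

# Build a small DataFrame from a list of tuples, naming the columns explicitly
data = [("Vengat", 50), ("Varnika", 15), ("Bob", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()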
Using withColumn(), we can add a new column to an existing DataFrame. Since DataFrames are immutable, this creates and returns a new DataFrame rather than modifying the original. The example below derives an Age_Group column from Age:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Add Column") \
    .getOrCreate()

data = [("Vengat", 50), ("Varnika", 15), ("Bob", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Add a new column based on an existing column
df_with_new_column = df.withColumn("Age_Group", when(col("Age") > 35, "Senior")
                                   .otherwise("Young"))

# Show the updated DataFrame
df_with_new_column.show()
withColumn() can also overwrite an existing column by reusing its name; the example below doubles each Age value:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Modify Column Example") \
    .getOrCreate()

# Sample DataFrame
data = [("Vengat", 50), ("Varnika", 15), ("Bob", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Modify the "Age" column by multiplying each value by 2
df_modified = df.withColumn("Age", col("Age") * 2)

# Show the modified DataFrame
df_modified.show()
Adding a constant column to a DataFrame in PySpark involves using the withColumn() function along with the lit() function to specify the constant value. Here's how you can do it:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Add Column") \
    .getOrCreate()

data = [("Vengat", 50), ("Varnika", 15), ("Bob", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Add a constant column named "City" with value "Chennai"
df_with_city = df.withColumn("City", lit("Chennai"))

# Show the updated DataFrame
df_with_city.show()
To combine values from multiple columns into one, use concat() together with withColumn():

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lit

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Concatenate Columns") \
    .getOrCreate()

# Sample DataFrame with Indian names
data = [("Akshay", "Kumar"), ("Priyanka", "Chopra"), ("Amitabh", "Bachchan")]
df = spark.createDataFrame(data, ["First_Name", "Last_Name"])

# Concatenate "First_Name" and "Last_Name" columns into a new column "Full_Name"
df_with_full_name = df.withColumn("Full_Name", concat(df["First_Name"], lit(" "), df["Last_Name"]))

# Show the DataFrame with the concatenated column
df_with_full_name.show()
# Drop the "Full_Name" column; drop() also returns a new DataFrame
new_df = df_with_full_name.drop("Full_Name")
# Rename the column "Name" to "Full_Name"
df_renamed = df.withColumnRenamed("Name", "Full_Name")

# Show the DataFrame with the renamed column
df_renamed.show()
# Sample DataFrame
data = [("John", 30, "Male"), ("Jane", 35, "Female"), ("Bob", 40, "Male")]
df = spark.createDataFrame(data, ["Name", "Age", "Gender"])

# Define the mapping of old column names to new column names
column_mapping = {
    "Name": "Full_Name",
    "Age": "Years",
    "Gender": "Sex"
}

# Rename the columns using a loop
for old_name, new_name in column_mapping.items():
    df = df.withColumnRenamed(old_name, new_name)

# Show the DataFrame with the renamed columns
df.show()
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Convert Column to List Example") \
    .getOrCreate()

# Sample DataFrame
data = [("John", 30), ("Jane", 35), ("Bob", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Select the column you want to convert to a list and collect the values into a list
name_list = df.select("Name").rdd.flatMap(lambda x: x).collect()

# Show the Python list
print(name_list)
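If you prefer not to drop down to the RDD API, an equivalent approach is to collect the rows and extract the field in plain Python; a small sketch using the same df:

# Alternative: collect Row objects and pull out the "Name" field in Python
name_list = [row["Name"] for row in df.select("Name").collect()]
print(name_list)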
# Keep only the rows where Age is greater than 30
df_filtered = df.filter(df["Age"] > 30)
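filter() (and its alias where()) can also combine several conditions with & and |; a small sketch, assuming the same Name and Age columns as in the earlier examples:

from pyspark.sql.functions import col

# Each condition needs its own parentheses when combined with & (and) or | (or)
df_senior_bob = df.filter((col("Age") > 30) & (col("Name") == "Bob"))
df_young_or_senior = df.filter((col("Age") < 18) | (col("Age") > 45))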
# Sort rows by Name in descending order
df_sorted = df.orderBy(df["Name"].desc())
# Group by Name and compute the average Age
df_grouped = df.groupBy("Name").agg({"Age": "avg"})
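The dictionary form of agg() is compact, but the functions API lets you name the output columns; a sketch of the same aggregation, assuming the Name/Age DataFrame from the earlier examples:

import pyspark.sql.functions as F

# Same aggregation with explicit functions and readable result column names
df_grouped = df.groupBy("Name").agg(
    F.avg("Age").alias("Avg_Age"),
    F.count("*").alias("Row_Count")
)
df_grouped.show()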
# Join two DataFrames df1 and df2 on the common column "ID"
df_joined = df1.join(df2, df1["ID"] == df2["ID"], "inner")
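When both DataFrames use the same column name for the key, passing that name (or a list of names) to on avoids a duplicated ID column in the result; a sketch assuming df1 and df2 both have an ID column:

# Joining on the column name keeps a single "ID" column in the output
df_joined = df1.join(df2, on="ID", how="inner")

# Other join types follow the same pattern, e.g. a left outer join
df_left = df1.join(df2, on="ID", how="left")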
import pyspark.sql.functions as F

# Pivot: one row per Col1 value, one column per Col2 value, cells hold sum(Value)
df_pivoted = df.groupBy("Col1").pivot("Col2").agg(F.sum("Value"))
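Because Col1, Col2 and Value are abstract here, a self-contained example may make the reshaping clearer; the sales data below is made up purely for illustration:

import pyspark.sql.functions as F

# Hypothetical sales data: (store, quarter, amount)
sales = [("Chennai", "Q1", 100), ("Chennai", "Q2", 150),
         ("Mumbai", "Q1", 200), ("Mumbai", "Q2", 250)]
df_sales = spark.createDataFrame(sales, ["Store", "Quarter", "Amount"])

# One row per Store, one column per Quarter, cells hold the summed Amount
df_pivoted = df_sales.groupBy("Store").pivot("Quarter").agg(F.sum("Amount"))
df_pivoted.show()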
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Rank rows within each Department by Salary
window_spec = Window.partitionBy("Department").orderBy("Salary")
df_ranked = df.withColumn("rank", F.rank().over(window_spec))
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define UDF to convert names to uppercase
upper_udf = udf(lambda x: x.upper(), StringType())

# Apply UDF to DataFrame column "Name"
df_with_upper_name = df.withColumn("UpperName", upper_udf(df["Name"]))
# Keep only the distinct combinations of Name and City
df_distinct = df.select("Name", "City").distinct()
# Remove rows that are duplicated on Column1 and Column2
df_no_duplicates = df.dropDuplicates(["Column1", "Column2"])
df_filled = df.fillna(0) # Fill null values with a specific value
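fillna() also accepts a dictionary, so different columns can get different replacement values; a sketch assuming the Age and City columns from the earlier examples:

# Fill nulls per column: 0 for Age, "Unknown" for City
df_filled_per_column = df.fillna({"Age": 0, "City": "Unknown"})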
# Add NewCol containing OldCol cast to double
df_converted = df.withColumn("NewCol", df["OldCol"].cast("double"))