Tips for Optimizing Apache Spark Queries — Part 1 (Data Partitioning)

||
Posted 1 year, 2 months ago
||
Views 223
||
4 min read
0 reactions

Optimizing Apache Spark queries can lead to significant improvements in performance and resource utilization. Below are some general tips and strategies for speeding up your Spark queries based on my experience,

Pre- requisite

I have used Codelab Notebook for executing the Spark code,

  • Python
  • Spark
  • Codelab
  • Pandas

Data Partitioning:

Partitioning divides your dataset into smaller parts based on a specified column or set of columns, which helps in distributing the data more efficiently across the nodes.

Let’s design an example using a e-commerce scenario. Suppose we have three DataFrames: ordersproducts, and customers. We will focus on the partitioning of the orders DataFrame for this example.

First, let’s create some sample data:

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("PartitioningExample").getOrCreate()

# Sample data for customers, products, and orders
customers_list = [Row(id=1, name="Vengat"), Row(id=2, name="Varnika"), Row(id=3, name="magi")]
products_list = [Row(id=1, name="Laptop"), Row(id=2, name="Phone"), Row(id=3, name="Tablet")]
orders_list = [Row(id=1, customerId=1, productId=2), Row(id=2, customerId=2, productId=3), Row(id=3, customerId=3, productId=1), Row(id=4, customerId=1, productId=1)]

customers = spark.createDataFrame(customers_list)
products = spark.createDataFrame(products_list)
orders = spark.createDataFrame(orders_list)

Repartitioning orders DataFrame:

Lets take we want to repartition the orders DataFrame based on customerId: We have partitioned based on number of distinct count customer values in Customer table(DF)

# Repartition based on 'customerId'
orders_repartitioned = orders.repartition(3, "customerId")
print(orders_repartitioned.rdd.getNumPartitions())  # To check the number of partitions

Writing the Partitioned Data:

If you want to save the repartitioned data to a persistent storage like HDFS or S3, you can partition it by customerId:

This will create separate directories (partitions) for each unique customerId in the destination path.

Joining with customers DataFrame:

When joining orders and customers, if both DataFrames are partitioned by customerId, Spark can perform optimizations like avoiding shuffles:

customers_repartitioned = customers.repartition("id")
joined = orders_repartitioned.join(customers_repartitioned, orders_repartitioned.customerId == customers_repartitioned.id)

Coalescing Partitions:

If you find that your DataFrame is over-partitioned (too many small partitions), you can use coalesce to reduce the number of partitions:

coalesced_orders = orders_repartitioned.coalesce(2) print(coalesced_orders.rdd.getNumPartitions())

Lets take another example to understand more about data partitioning. Our sample example using a e-commerce dataset, where we have records of user activity on an online platform. This dataset contains user interactions like views, adds to cart, and purchases.

Data Structure:

Suppose our dataset has the following columns:

  • timestamp: the time of the activity
  • userId: unique identifier for users
  • productId: unique identifier for products
  • activityType: type of activity, e.g., 'view', 'addToCart', 'purchase'
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from random import choice, randint
import datetime
spark = SparkSession.builder.appName("UserExample").getOrCreate()
# Generate random data
n_records = 100000  
users = 1000
products = 500
activities = ["view", "addToCart", "purchase"]
data = [(str(datetime.datetime.now() - datetime.timedelta(hours=randint(0, 72))),
         randint(1, users),
         randint(1, products),
         choice(activities)) for _ in range(n_records)]
columns = ["timestamp", "userId", "productId", "activityType"]
df = spark.createDataFrame(data, schema=columns)
df.show(5)

Partitioning:

  1. By activityType: Let's say you frequently query data based on the type of activity. Partitioning by activityType can optimize your queries.
df_activity_partitioned = df.repartition("activityType")

2. By timestamp: If you often perform time-based analysis, partitioning by a time column is beneficial. However, our timestamp has a high cardinality, making it suboptimal for direct partitioning. Instead, we can create a date column and partition by it.

df_with_date = df.withColumn("date", to_date(col("timestamp")))
df_date_partitioned = df_with_date.repartition("date")

Writing Data:

After repartitioning, you can write the data to storage, like HDFS or S3, in a partitioned manner:

# Writing activity partitioned 
datadf_activity_partitioned.write.partitionBy("activityType").parquet("customer_actvity")
# Writing date partitioned 
datadf_date_partitioned.write.partitionBy("date").parquet("customer_date")

Things to Note:

  • The goal is to pick partition columns that align with your most frequent query patterns.
  • Avoid high cardinality columns as direct partition keys since it leads to a huge number of small partitions.
  • Regularly monitor and potentially repartition your data as the size and query patterns evolve.

I hope you found this guide helpful. Your feedback fuels my passion for sharing knowledge, so if you appreciated this article, please give it a thumbs up and share your thoughts in the comments below.

For more insights and to stay connected, you can find me on:


0 reactions

Discussion


Looking for Freelancing Jobs
Joined on April 15, 2020

Latest Videos