Objective
You are working for an outdoor supplies company that sells items such as camping equipment, hiking gear, and fishing equipment. You are given two DataFrames: df_sales, containing daily transaction logs, and df_products, containing the product catalog.
Task
Write PySpark code that joins the two DataFrames and calculates the total quantity sold for each product_category on each day.
Save your resulting DataFrame as result_df. Order the final output chronologically by date (ascending), and then alphabetically by product_category (ascending).
File Paths
- Sales dataset: /home/interview/sales.csv
- Products dataset: /home/interview/products.csv
- Starter script: /home/interview/camping_sales.py
Schema
sales.csv
| Column Name | Type |
|---|---|
| sales_id | String |
| product_id | String |
| date | Date |
| quantity_sold | Integer |
products.csv
| Column Name | Type |
|---|---|
| product_id | String |
| product_name | String |
| product_category | String |
Expected Output Schema
| Column Name | Type |
|---|---|
| date | Date |
| product_category | String |
| total_quantity | Integer |
Example
Given this sample input:
df_sales
| sales_id | product_id | date | quantity_sold |
|---|---|---|---|
| S1 | P1 | 2023-06-01 | 10 |
| S2 | P2 | 2023-06-02 | 15 |
| S3 | P3 | 2023-06-02 | 20 |
| S4 | P4 | 2023-06-01 | 12 |
| S5 | P5 | 2023-06-03 | 25 |
df_products
| product_id | product_name | product_category |
|---|---|---|
| P1 | Camping Tent | Camping |
| P2 | Hiking Shoes | Hiking |
| P3 | Fishing Rod | Fishing |
| P4 | Insulated Bottle | Hiking |
| P5 | Outdoor Grill | Camping |
The expected output would be:
| date | product_category | total_quantity |
|---|---|---|
| 2023-06-01 | Camping | 10 |
| 2023-06-01 | Hiking | 12 |
| 2023-06-02 | Fishing | 20 |
| 2023-06-02 | Hiking | 15 |
| 2023-06-03 | Camping | 25 |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
df_sales = spark.read.csv("/home/interview/sales.csv", header=True, inferSchema=True)
df_products = spark.read.csv("/home/interview/products.csv", header=True, inferSchema=True)
# Step 1: Join the sales and products DataFrames on the shared product_id
joined_df = df_sales.join(df_products, on="product_id", how="inner")
# Step 2: Group by both date and product_category to get daily category totals.
# sum() of an integer column returns a long, so cast it back to integer and
# alias it to match the expected output schema.
result_df = joined_df.groupBy("date", "product_category").agg(
    F.sum("quantity_sold").cast("integer").alias("total_quantity")
)
# Step 3: Order the output chronologically, then alphabetically by category
result_df = result_df.orderBy(F.col("date").asc(), F.col("product_category").asc())
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
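Spark writes /home/interview/output as a directory rather than a single file. A small stdlib helper can read the lone part file back for a quick comparison against the expected output. This is a sketch: `load_result_rows` is a hypothetical name, and it assumes Spark's usual `part-*.csv` file naming plus the single file that `coalesce(1)` produces.

```python
import csv
import glob
import os


def load_result_rows(output_dir):
    """Read the single CSV part file Spark wrote into output_dir.

    Assumes the job used coalesce(1), so exactly one part-*.csv exists;
    marker files such as _SUCCESS are ignored by the glob pattern.
    """
    parts = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    if len(parts) != 1:
        raise ValueError(f"expected one part file, found {len(parts)}")
    with open(parts[0], newline="") as fh:
        reader = csv.DictReader(fh)  # header=True in the write gives column names
        return [
            (row["date"], row["product_category"], int(row["total_quantity"]))
            for row in reader
        ]
```

Calling `load_result_rows("/home/interview/output")` after the job finishes returns (date, product_category, total_quantity) tuples in file order, which can be compared directly with the expected rows.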
Explanation
Step 1: Joining the DataFrames
joined_df = df_sales.join(df_products, on="product_id", how="inner")
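To calculate metrics by product category, every transaction must be paired with its product metadata. Because product_id is the key shared by df_sales and df_products, we join on it. An inner join keeps only the sales whose product_id appears in the catalog, so every resulting row has a date, a quantity_sold and a product_category; a sale with an unknown product_id is silently dropped. Passing the key as on="product_id" (rather than a condition such as df_sales.product_id == df_products.product_id) also keeps a single product_id column in the result instead of two ambiguous ones.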
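To make the inner-join behaviour concrete, here is a plain-Python sketch of what how="inner" does on product_id: matched sales gain their product fields, unmatched sales disappear. The function `inner_join` and the extra sale S6 pointing at a nonexistent P9 are hypothetical, added only for illustration.

```python
# Hypothetical rows: S6 references P9, which is absent from the catalog
sales = [
    ("S1", "P1", "2023-06-01", 10),
    ("S6", "P9", "2023-06-01", 7),
]
products = [
    ("P1", "Camping Tent", "Camping"),
]


def inner_join(sales, products):
    """Pair each sale with its catalog row on product_id, like how="inner"."""
    by_id = {pid: (name, category) for pid, name, category in products}
    joined = []
    for sales_id, product_id, day, qty in sales:
        if product_id in by_id:  # no match -> the sale is dropped
            name, category = by_id[product_id]
            # Key column first and only once, as with on="product_id" in Spark
            joined.append((product_id, sales_id, day, qty, name, category))
    return joined


print(inner_join(sales, products))
```

The dictionary lookup assumes product_id is unique in the catalog. In Spark, a product_id listed twice would make each matching sale appear twice after the join and double-count its quantity.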
Step 2: Multi-Column Grouping and Aggregation
result_df = joined_df.groupBy("date", "product_category").agg(
    F.sum("quantity_sold").cast("integer").alias("total_quantity")
)
The core requirement is the total quantity sold for each product category on each day, which we get by passing both columns to .groupBy(). Grouping by "date" and "product_category" places all "Hiking" sales from the same date into one group, while Hiking sales on different dates stay in separate groups.
Once grouped, .agg() computes F.sum() over quantity_sold. Spark returns the sum of an integer column as a LongType (bigint), so we cast the result back to integer to match the Integer type in the expected schema, and .alias("total_quantity") gives the column its required name.
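Multi-column grouping can be pictured as a dictionary keyed on the (date, product_category) pair. The sketch below uses made-up joined rows (not the sample data) with two Hiking sales on the same day, showing that they merge into one bucket while a Hiking sale on another day stays separate; `Counter` stands in for F.sum().

```python
from collections import Counter

# Hypothetical joined rows: (date, product_category, quantity_sold)
joined = [
    ("2023-06-02", "Hiking", 15),
    ("2023-06-02", "Hiking", 5),   # same date and category -> same bucket
    ("2023-06-03", "Hiking", 8),   # different date -> separate bucket
    ("2023-06-02", "Fishing", 20),
]

# The composite key plays the role of groupBy("date", "product_category")
totals = Counter()
for day, category, qty in joined:
    totals[(day, category)] += qty

print(dict(totals))
```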
Step 3: Sorting the Final Report
result_df = result_df.orderBy(F.col("date").asc(), F.col("product_category").asc())
Finally, we sort the DataFrame as the task requires. Passing two sort expressions to .orderBy() orders the rows chronologically first and then breaks ties (several categories on the same day) alphabetically by product_category.
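The tie-breaking rule behaves like Python's tuple comparison. The plain-Python sketch below takes the sample's aggregated rows in shuffled order and sorts by date first, falling back to the category only when dates are equal, which is the ordering the two .asc() expressions request.

```python
from datetime import date

# Aggregated rows in arbitrary order: (date, product_category, total_quantity)
rows = [
    (date(2023, 6, 2), "Hiking", 15),
    (date(2023, 6, 1), "Hiking", 12),
    (date(2023, 6, 3), "Camping", 25),
    (date(2023, 6, 2), "Fishing", 20),
    (date(2023, 6, 1), "Camping", 10),
]

# Primary key: date ascending; tie-breaker: category ascending,
# mirroring orderBy(F.col("date").asc(), F.col("product_category").asc())
ordered = sorted(rows, key=lambda r: (r[0], r[1]))

for day, category, total in ordered:
    print(day, category, total)
```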