Objective
You are working with a consumer goods company that tracks its product catalog, transaction-level sales, and warehouse inventory in three separate tables.
Task
Compute a summary for every product that includes total_quantity and total_revenue from the sales table, and total_stock from the inventory table summed across all warehouses. Aggregate the sales and inventory tables separately by product_id before combining them with the products table. Some products may have no sales records, and some may have no inventory records, so you need to choose a join type that preserves all products regardless. Any null values that appear in total_quantity, total_revenue, or total_stock after joining should be replaced with 0. The final output should contain product_id, name, category, total_quantity, total_revenue, and total_stock. Save your result as result_df.
File Path
- Products: /home/interview/products.csv
- Sales: /home/interview/sales.csv
- Inventory: /home/interview/inventory.csv
- Starter script: /home/interview/product_summary.py
Schema
products.csv
| Column | Type |
| product_id | integer |
| name | string |
| category | string |
sales.csv
| Column | Type |
| sale_id | integer |
| product_id | integer |
| quantity | integer |
| revenue | float |
inventory.csv
| Column | Type |
| product_id | integer |
| stock | integer |
| warehouse | string |
Expected output schema
| Column | Type |
| product_id | integer |
| name | string |
| category | string |
| total_quantity | integer |
| total_revenue | float |
| total_stock | integer |
Example
Given this sample input:
products
| product_id | name | category |
| 1 | Cola | Beverages |
| 2 | Chips | Snacks |
| 3 | Apple | Fruits |
sales
| sale_id | product_id | quantity | revenue |
| 1 | 1 | 5 | 120.0 |
| 2 | 1 | 3 | 80.0 |
| 3 | 2 | 10 | 250.0 |
inventory
| product_id | stock | warehouse |
| 1 | 50 | Warehouse_A |
| 1 | 30 | Warehouse_B |
| 3 | 100 | Warehouse_A |
The output would be:
| product_id | name | category | total_quantity | total_revenue | total_stock |
| 1 | Cola | Beverages | 8 | 200.0 | 80 |
| 2 | Chips | Snacks | 10 | 250.0 | 0 |
| 3 | Apple | Fruits | 0 | 0.0 | 100 |
Cola has two sales totaling quantity 8 and revenue 200.0, and inventory across two warehouses totaling 80. Chips has sales but no inventory, so total_stock is 0. Apple has no sales but does have inventory, so total_quantity and total_revenue are 0.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

products = spark.read.csv("/home/interview/products.csv", header=True, inferSchema=True)
sales = spark.read.csv("/home/interview/sales.csv", header=True, inferSchema=True)
inventory = spark.read.csv("/home/interview/inventory.csv", header=True, inferSchema=True)

# Collapse sales to one row per product.
sales_agg = sales.groupBy("product_id").agg(
    F.sum("quantity").alias("total_quantity"),
    F.sum("revenue").alias("total_revenue")
)

# Collapse inventory to one row per product, summed across warehouses.
stock_agg = inventory.groupBy("product_id").agg(
    F.sum("stock").alias("total_stock")
)

# Left joins keep every product, including those with no sales or no inventory.
result_df = products.join(sales_agg, on="product_id", how="left") \
    .join(stock_agg, on="product_id", how="left")

# Products without a match come back as null; replace those nulls with 0.
result_df = result_df.withColumn("total_quantity", F.coalesce("total_quantity", F.lit(0))) \
    .withColumn("total_revenue", F.coalesce("total_revenue", F.lit(0))) \
    .withColumn("total_stock", F.coalesce("total_stock", F.lit(0)))

# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Aggregating Sales by Product
sales_agg = sales.groupBy("product_id").agg(
    F.sum("quantity").alias("total_quantity"),
    F.sum("revenue").alias("total_revenue")
)
A single product can appear in many rows of the sales table since each transaction is its own row. Before joining with the products table, you aggregate to get one row per product. groupBy("product_id") groups all sale records for the same product, and F.sum() computes the totals. Using .alias() gives each aggregated column a meaningful name in the output.
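To make the grouping concrete, here is a minimal plain-Python sketch (not PySpark) of the same "one row per product" aggregation, run on the sample sales rows from the example. The names sales_rows and totals are made up for illustration; the real solution uses groupBy and F.sum as shown above.

```python
from collections import defaultdict

# Sample sales rows from the example: (sale_id, product_id, quantity, revenue)
sales_rows = [
    (1, 1, 5, 120.0),
    (2, 1, 3, 80.0),
    (3, 2, 10, 250.0),
]

# product_id -> [total_quantity, total_revenue]
totals = defaultdict(lambda: [0, 0.0])
for _sale_id, product_id, quantity, revenue in sales_rows:
    totals[product_id][0] += quantity
    totals[product_id][1] += revenue

# One entry per product that has at least one sale.
sales_agg = {pid: tuple(t) for pid, t in totals.items()}
print(sales_agg)  # {1: (8, 200.0), 2: (10, 250.0)}
```

Product 3 (Apple) does not appear in the aggregated result at all, because it has no sales rows to group.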
Step 2: Aggregating Inventory by Product
stock_agg = inventory.groupBy("product_id").agg(
    F.sum("stock").alias("total_stock")
)
Similarly, a product can have stock spread across multiple warehouses. You sum the stock values per product to get a single total_stock figure. Aggregating separately like this (rather than joining first and then aggregating) keeps the logic cleaner and avoids row duplication issues that can inflate totals.
Step 3: Left Joins to Preserve All Products
result_df = products.join(sales_agg, on="product_id", how="left") \
    .join(stock_agg, on="product_id", how="left")
The key decision here is using how="left" instead of the default inner join. A left join keeps every row from the left DataFrame (products) even when there is no matching row in the right DataFrame. If you used an inner join, products with no sales or no inventory would be dropped entirely. With a left join, those unmatched products still appear in the result, but their aggregated columns will be null.
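The difference between the two join types can be sketched in plain Python (not PySpark) with dictionaries standing in for the DataFrames. The data mirrors the sample input; the variable names inner and left are illustrative only.

```python
# product_id -> name, taken from the sample products table
products = {1: "Cola", 2: "Chips", 3: "Apple"}

# product_id -> total_quantity; Apple (3) has no sales, so it is absent
sales_agg = {1: 8, 2: 10}

# Inner join: keep only product_ids present on both sides.
inner = {
    pid: (name, sales_agg[pid])
    for pid, name in products.items()
    if pid in sales_agg
}

# Left join: keep every product; a missing match becomes None (null).
left = {pid: (name, sales_agg.get(pid)) for pid, name in products.items()}

print(inner)  # {1: ('Cola', 8), 2: ('Chips', 10)}
print(left)   # {1: ('Cola', 8), 2: ('Chips', 10), 3: ('Apple', None)}
```

Apple disappears from the inner result but survives the left join with a null total, which is the null that Step 4 then replaces with 0.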
Step 4: Replacing Nulls with coalesce
result_df = result_df.withColumn("total_quantity", F.coalesce("total_quantity", F.lit(0))) \
    .withColumn("total_revenue", F.coalesce("total_revenue", F.lit(0))) \
    .withColumn("total_stock", F.coalesce("total_stock", F.lit(0)))
F.coalesce() takes one or more columns (literal values must be wrapped in F.lit()) and returns, for each row, the first value that is not null. Here it checks each aggregated column: if it has a value, that value is kept; if it is null (because the left join found no match), it falls back to F.lit(0). This is a standard PySpark pattern for replacing nulls with a default value. An alternative is result_df.fillna(0, subset=["total_quantity", "total_revenue", "total_stock"]); fillna also accepts a dict of per-column defaults. coalesce is the more general tool, because the fallback can be another column or expression rather than only a constant.
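The "first non-null value" rule can be mimicked in plain Python. The coalesce helper below is a hypothetical stand-in written for this sketch, not the PySpark function, and the rows are the sample data as they might look after the left joins, with None playing the role of null.

```python
def coalesce(*values):
    """Return the first argument that is not None, or None if all are None."""
    for value in values:
        if value is not None:
            return value
    return None

# Rows as they might look after the left joins: None marks "no match".
joined_rows = [
    {"name": "Cola", "total_quantity": 8, "total_stock": 80},
    {"name": "Chips", "total_quantity": 10, "total_stock": None},
    {"name": "Apple", "total_quantity": None, "total_stock": 100},
]

# Keep existing values; fall back to 0 only where the value is missing.
cleaned = [
    {
        "name": row["name"],
        "total_quantity": coalesce(row["total_quantity"], 0),
        "total_stock": coalesce(row["total_stock"], 0),
    }
    for row in joined_rows
]

for row in cleaned:
    print(row)
```

Only the missing values change: Chips gets total_stock 0 and Apple gets total_quantity 0, while Cola's row passes through untouched.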
Step 5: Why Aggregate Before Joining
A common mistake is to join all three tables first and then aggregate. This can produce incorrect results because, within each product, the join pairs every sales row with every inventory row. For example, if a product has 3 sales records and 2 inventory records, joining first would produce 6 rows for that product. Each sale then appears twice, so summing quantity would double the correct value, and each inventory row appears three times, so summing stock would triple it. By aggregating each table to one row per product first, each join matches at most one row per product and the totals stay accurate.
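The inflation is easy to verify by hand. The following plain-Python sketch (not PySpark) uses made-up rows for a single product with 3 sales and 2 inventory records, joins them first, and compares the naive sums with the correct ones.

```python
# Hypothetical rows for one product: 3 sales and 2 inventory records.
sales = [
    {"product_id": 1, "quantity": 5},
    {"product_id": 1, "quantity": 3},
    {"product_id": 1, "quantity": 2},
]
inventory = [
    {"product_id": 1, "stock": 50},
    {"product_id": 1, "stock": 30},
]

# Join first: every sale pairs with every inventory row of the same product.
joined = [
    {**s, **i}
    for s in sales
    for i in inventory
    if s["product_id"] == i["product_id"]
]

# Sums taken after the join count each source row more than once.
naive_quantity = sum(row["quantity"] for row in joined)
naive_stock = sum(row["stock"] for row in joined)

# Sums taken on each table separately are the correct totals.
true_quantity = sum(s["quantity"] for s in sales)
true_stock = sum(i["stock"] for i in inventory)

print(len(joined))                     # 6
print(naive_quantity, true_quantity)   # 20 10
print(naive_stock, true_stock)         # 240 80
```

Quantity is inflated by the number of inventory rows (2) and stock by the number of sales rows (3), so the error grows with the size of the other table.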