Objective
A pharmaceutical company tracks its equipment data in two different DataFrames. The first, df1, contains metadata about the equipment (like purchase dates). The second, df2, acts as a ledger keeping track of the maintenance history and costs for each piece of equipment.
Task
Write a PySpark function that extracts the most recent maintenance record for each piece of equipment and evaluates its cost relative to all past maintenance costs for that same equipment.
- Filter df2 to only keep the latest maintenance date per equipment.
- Calculate the maintenance_cost_rank using a dense rank (where rank 1 represents the highest maintenance cost for that specific piece of equipment across its entire history).
- Join the filtered records with df1. Ensure the output only contains equipment that has at least one maintenance record.
Save your resulting DataFrame as result_df. Ensure the output matches the exact schema order requested, and order the final output by equipment_id in ascending order.
File Path
- Equipment Dataset: /home/interview/df1.csv
- Maintenance Dataset: /home/interview/df2.csv
- Starter script: /home/interview/pharmaceutical_maintenance.py
Schema
df1.csv
| Column Name | Data Type |
|---|---|
| equipment_id | string |
| equipment_name | string |
| purchase_date | date |
df2.csv
| Column Name | Data Type |
|---|---|
| equipment_id | string |
| maintenance_date | date |
| maintenance_cost | double |
Expected Output Schema
| Column Name | Data Type |
|---|---|
| equipment_id | string |
| equipment_name | string |
| purchase_date | date |
| latest_maintenance_date | date |
| maintenance_cost_rank | integer |
Example
Given this sample input:
df1
| equipment_id | equipment_name | purchase_date |
|---|---|---|
| EQ001 | Mixer | 2020-01-01 |
| EQ002 | Centrifuge | 2020-02-01 |
| EQ003 | Pipette | 2020-03-01 |
df2
| equipment_id | maintenance_date | maintenance_cost |
|---|---|---|
| EQ001 | 2021-06-01 | 500.0 |
| EQ002 | 2021-07-01 | 400.0 |
| EQ001 | 2021-07-02 | 600.0 |
The expected output would be:
| equipment_id | equipment_name | purchase_date | latest_maintenance_date | maintenance_cost_rank |
|---|---|---|---|---|
| EQ001 | Mixer | 2020-01-01 | 2021-07-02 | 1 |
| EQ002 | Centrifuge | 2020-02-01 | 2021-07-01 | 1 |
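As a sanity check on the sample, the expected rows can be reproduced by hand. The following is a plain-Python sketch of the task's logic, not PySpark; the helper name `latest_with_cost_rank` and the in-memory tuples are assumptions made purely for illustration.

```python
from datetime import date

# Sample rows copied from the example tables above.
equipment = [
    ("EQ001", "Mixer", date(2020, 1, 1)),
    ("EQ002", "Centrifuge", date(2020, 2, 1)),
    ("EQ003", "Pipette", date(2020, 3, 1)),
]
maintenance = [
    ("EQ001", date(2021, 6, 1), 500.0),
    ("EQ002", date(2021, 7, 1), 400.0),
    ("EQ001", date(2021, 7, 2), 600.0),
]


def latest_with_cost_rank(equipment, maintenance):
    """Hypothetical helper: mirror the task's logic on plain Python tuples."""
    # Group the maintenance history by equipment_id.
    history = {}
    for eq_id, m_date, cost in maintenance:
        history.setdefault(eq_id, []).append((m_date, cost))

    rows = []
    for eq_id, name, purchased in sorted(equipment):
        if eq_id not in history:
            continue  # no maintenance record, so the equipment is excluded
        # Latest record; the higher cost wins if two records share a date.
        latest_date, latest_cost = max(history[eq_id])
        # Dense rank: position among the distinct costs, highest first.
        distinct_costs = sorted({cost for _, cost in history[eq_id]}, reverse=True)
        rank = distinct_costs.index(latest_cost) + 1
        rows.append((eq_id, name, purchased, latest_date, rank))
    return rows


result = latest_with_cost_rank(equipment, maintenance)
for row in result:
    print(row)
```

EQ003 never appears in the maintenance rows, so it is absent from the result, and EQ001's latest record (600.0) is also its most expensive, which is why its rank is 1.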
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
df1 = spark.read.csv("/home/interview/df1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("/home/interview/df2.csv", header=True, inferSchema=True)
# Step 1: Define a window to rank maintenance costs for each piece of equipment (Highest cost = 1)
cost_window = Window.partitionBy("equipment_id").orderBy(F.col("maintenance_cost").desc())
# Step 2: Define a window to find the most recent maintenance date
# We use cost as a secondary sort to ensure deterministic tie-breaking if dates are identical
date_window = Window.partitionBy("equipment_id").orderBy(F.col("maintenance_date").desc(), F.col("maintenance_cost").desc())
# Step 3: Apply both window functions to the maintenance DataFrame
df2_ranked = df2.withColumn("maintenance_cost_rank", F.dense_rank().over(cost_window)) \
    .withColumn("rn", F.row_number().over(date_window))
# Step 4: Filter for only the latest maintenance record per equipment (rn == 1)
df2_latest = df2_ranked.filter(F.col("rn") == 1)
# Step 5: Inner join with the equipment metadata to drop any equipment without maintenance records
result_df = df1.join(df2_latest, on="equipment_id", how="inner")
# Step 6: Select, rename, and sort columns to perfectly match the target schema
result_df = result_df.select(
    "equipment_id",
    "equipment_name",
    "purchase_date",
    F.col("maintenance_date").alias("latest_maintenance_date"),
    "maintenance_cost_rank"
).orderBy("equipment_id")
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Defining the Windows
cost_window = Window.partitionBy("equipment_id").orderBy(F.col("maintenance_cost").desc())
date_window = Window.partitionBy("equipment_id").orderBy(F.col("maintenance_date").desc(), F.col("maintenance_cost").desc())
We need two separate windows because we compute two different things within each equipment's history: how each maintenance cost ranks, and which record is the most recent. Both windows are partitioned by equipment_id so each machine is evaluated on its own, but they are ordered differently: cost_window by cost descending, and date_window by date descending with cost as a tie-breaker.
Step 2: Calculating Dense Rank and Row Number
df2_ranked = df2.withColumn("maintenance_cost_rank", F.dense_rank().over(cost_window)) \
    .withColumn("rn", F.row_number().over(date_window))
We apply F.dense_rank() over the cost window to rank the maintenance events by cost (1 being the most expensive). A dense rank gives identical costs the same rank without skipping numbers (e.g., ranks 1, 1, 2 rather than 1, 1, 3). In the same step, we apply F.row_number() over the date window to number the events from newest to oldest (1 being the newest). Unlike a rank, a row number is unique within each partition, so exactly one row per equipment gets rn = 1.
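To make the difference between the three numbering schemes concrete, here is a small plain-Python sketch (not PySpark) that computes a row number, a standard rank, and a dense rank for one list of costs. The function name `rank_variants` and the sample costs are hypothetical.

```python
def rank_variants(costs):
    """Return (cost, row_number, rank, dense_rank) tuples, highest cost first."""
    ordered = sorted(costs, reverse=True)          # every row, ties included
    distinct = sorted(set(costs), reverse=True)    # one entry per distinct cost
    out = []
    for position, cost in enumerate(ordered, start=1):
        row_number = position                      # always unique
        rank = ordered.index(cost) + 1             # ties share a rank, gaps follow
        dense_rank = distinct.index(cost) + 1      # ties share a rank, no gaps
        out.append((cost, row_number, rank, dense_rank))
    return out


variants = rank_variants([600.0, 600.0, 500.0, 400.0])
for row in variants:
    print(row)
# (600.0, 1, 1, 1)
# (600.0, 2, 1, 1)
# (500.0, 3, 3, 2)
# (400.0, 4, 4, 3)
```

The two 600.0 rows share rank 1 under both ranking schemes, but only the dense rank continues with 2 for the next distinct cost.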
Step 3: Isolating the Latest Record
df2_latest = df2_ranked.filter(F.col("rn") == 1)
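By filtering for rn == 1, we drop all older maintenance records and keep only the single newest one per piece of equipment. The order of operations matters: the cost rank was computed over the full history in the previous step, so it carries through this filter unchanged. Ranking after the filter would see only one row per equipment and always return 1.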
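The effect of ranking before versus after the filter can be seen in a short plain-Python sketch (not PySpark). The history below belongs to one hypothetical machine, and `dense_rank_of` is an illustrative helper, not a library function.

```python
# (maintenance_date, maintenance_cost) for one hypothetical machine.
history = [
    ("2021-06-01", 900.0),
    ("2021-07-02", 600.0),
    ("2021-08-15", 750.0),
]


def dense_rank_of(cost, costs):
    """Dense rank of `cost` among `costs`, where 1 is the highest cost."""
    distinct = sorted(set(costs), reverse=True)
    return distinct.index(cost) + 1


# ISO-formatted date strings sort chronologically, so max() finds the newest row.
latest_date, latest_cost = max(history)

# Rank over the full history, then keep the latest row.
rank_then_filter = dense_rank_of(latest_cost, [cost for _, cost in history])

# Keep the latest row first, then rank what is left (a single row).
filter_then_rank = dense_rank_of(latest_cost, [latest_cost])

print(rank_then_filter, filter_then_rank)  # 2 1
```

Only the first ordering reports that the latest service (750.0) was the second most expensive in the machine's history.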
Step 4: Inner Join with Equipment Metadata
result_df = df1.join(df2_latest, on="equipment_id", how="inner")
The task states: "Ensure the output only contains equipment that has at least one maintenance record." An inner join satisfies this. If an item in df1 has never been serviced, it has no row in df2_latest, so the inner join drops it from the final report. The reverse also holds: a maintenance record whose equipment_id is missing from df1 would be dropped as well.
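As a rough illustration of why the join type matters, the plain-Python sketch below (not PySpark) emulates an inner join and a left join on the sample equipment IDs using dictionaries. The variable names are hypothetical.

```python
# equipment_id -> equipment_name, from the sample df1.
equipment = {"EQ001": "Mixer", "EQ002": "Centrifuge", "EQ003": "Pipette"}
# equipment_id -> latest maintenance date, from the sample df2 after filtering.
latest = {"EQ001": "2021-07-02", "EQ002": "2021-07-01"}

# Inner join: keep only keys present on both sides.
inner = [(k, equipment[k], latest[k]) for k in sorted(equipment) if k in latest]

# Left join: keep every equipment row, filling missing matches with None.
left = [(k, equipment[k], latest.get(k)) for k in sorted(equipment)]

print(inner)
print(left)
```

With a left join, EQ003 would appear with an empty maintenance date, which contradicts the requirement; the inner join leaves it out.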
Step 5: Formatting and Sorting the Output
result_df = result_df.select(
    "equipment_id",
    "equipment_name",
    "purchase_date",
    F.col("maintenance_date").alias("latest_maintenance_date"),
    "maintenance_cost_rank"
).orderBy("equipment_id")
Finally, we use .select() to put the columns in the order the expected output schema requires. The maintenance_date column from df2 is renamed to latest_maintenance_date with .alias(), and the helper columns rn and maintenance_cost are left out. Chaining .orderBy("equipment_id") sorts the report by equipment_id in ascending order, which is the default sort direction.