Objective
In this problem, you are analyzing tracking data within the aerospace industry. You are given two DataFrames: aerospace_df (containing information about various aerospace equipment) and company_df (containing information about the companies that own them).
Task
Join these two DataFrames so that you can create a comprehensive tracking report.
Create a new column called status_label that classifies the equipment based on the following rules:
- If the status is "active" and the country is "USA", the status_label should be "Domestic Active".
- If the status is "active" and the country is not "USA", the status_label should be "Foreign Active".
- If the status is not "active", regardless of the country, the status_label should be "Inactive".
Rename the original columns to match the Expected Output Schema exactly, resolving any naming conflicts between the two tables. Save your resulting DataFrame as result_df. Order the final output by id in ascending order.
File Path
- Aerospace Dataset: /home/interview/aerospace.csv
- Company Dataset: /home/interview/company.csv
- Starter script: /home/interview/aerospace_tracker.py
Schema
aerospace.csv
| Column Name | Data Type |
|---|---|
| id | string |
| name | string |
| type | string |
| status | string |
| company_id | string |
company.csv
| Column Name | Data Type |
|---|---|
| id | string |
| name | string |
| country | string |
Expected Output Schema
| Column Name | Data Type |
|---|---|
| id | string |
| equipment_name | string |
| equipment_type | string |
| equipment_status | string |
| company_name | string |
| country | string |
| status_label | string |
Example
Given this sample input:
aerospace_df
| id | name | type | status | company_id |
|---|---|---|---|---|
| A1 | Falcon 9 | Rocket | active | C1 |
| A2 | Starship | Rocket | active | C1 |
| A3 | Hubble | Telescope | active | C2 |
| A4 | Galileo | Satellite | inactive | C3 |
| A5 | Voyager 1 | Probe | active | C3 |
company_df
| id | name | country |
|---|---|---|
| C1 | SpaceX | USA |
| C2 | NASA | USA |
| C3 | European Space Agency | Europe |
The expected output would be:
| id | equipment_name | equipment_type | equipment_status | company_name | country | status_label |
|---|---|---|---|---|---|---|
| A1 | Falcon 9 | Rocket | active | SpaceX | USA | Domestic Active |
| A2 | Starship | Rocket | active | SpaceX | USA | Domestic Active |
| A3 | Hubble | Telescope | active | NASA | USA | Domestic Active |
| A4 | Galileo | Satellite | inactive | European Space Agency | Europe | Inactive |
| A5 | Voyager 1 | Probe | active | European Space Agency | Europe | Foreign Active |
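To check the rules against this example without starting Spark, the sketch below re-implements the join, labeling, renaming, and sort in plain Python over the sample rows. It is an illustrative test oracle, assuming an inner join is intended, and not the PySpark solution itself; classify and build_report are hypothetical helper names.

```python
# Plain-Python oracle for the sample data (no Spark required).
# classify and build_report are hypothetical helpers for illustration only.

aerospace_rows = [
    {"id": "A1", "name": "Falcon 9", "type": "Rocket", "status": "active", "company_id": "C1"},
    {"id": "A2", "name": "Starship", "type": "Rocket", "status": "active", "company_id": "C1"},
    {"id": "A3", "name": "Hubble", "type": "Telescope", "status": "active", "company_id": "C2"},
    {"id": "A4", "name": "Galileo", "type": "Satellite", "status": "inactive", "company_id": "C3"},
    {"id": "A5", "name": "Voyager 1", "type": "Probe", "status": "active", "company_id": "C3"},
]
company_rows = [
    {"id": "C1", "name": "SpaceX", "country": "USA"},
    {"id": "C2", "name": "NASA", "country": "USA"},
    {"id": "C3", "name": "European Space Agency", "country": "Europe"},
]


def classify(status, country):
    """Apply the three status_label rules in order."""
    if status == "active" and country == "USA":
        return "Domestic Active"
    if status == "active":  # active and not USA
        return "Foreign Active"
    return "Inactive"


def build_report(equipment, companies):
    # Index companies by id so the join becomes a dictionary lookup.
    by_id = {c["id"]: c for c in companies}
    report = []
    for e in equipment:
        c = by_id.get(e["company_id"])
        if c is None:
            continue  # inner join: drop equipment with no matching company
        report.append({
            "id": e["id"],
            "equipment_name": e["name"],
            "equipment_type": e["type"],
            "equipment_status": e["status"],
            "company_name": c["name"],
            "country": c["country"],
            "status_label": classify(e["status"], c["country"]),
        })
    # Ascending order by the string id column.
    return sorted(report, key=lambda row: row["id"])


for row in build_report(aerospace_rows, company_rows):
    print(row["id"], row["status_label"])
```

Running it prints A1 through A5 with the same labels as the status_label column of the expected output.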
Solution
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

aerospace_df = spark.read.csv("/home/interview/aerospace.csv", header=True, inferSchema=True)
company_df = spark.read.csv("/home/interview/company.csv", header=True, inferSchema=True)

# Step 1: Join the DataFrames using table aliases to prevent column ambiguity
joined_df = aerospace_df.alias("a").join(
    company_df.alias("c"),
    F.col("a.company_id") == F.col("c.id"),
    how="inner"
)

# Steps 2 & 3: Apply the conditional logic using F.when() and .otherwise()
result_df = joined_df.withColumn(
    "status_label",
    F.when((F.col("a.status") == "active") & (F.col("c.country") == "USA"), "Domestic Active")
    # Active USA rows were claimed by the branch above, so any remaining active row is foreign.
    # Testing status alone also keeps active rows with a null country out of "Inactive".
    .when(F.col("a.status") == "active", "Foreign Active")
    .otherwise("Inactive")
)

# Step 4: Select, alias, and sort the columns to exactly match the Output Schema
result_df = result_df.select(
    F.col("a.id").alias("id"),
    F.col("a.name").alias("equipment_name"),
    F.col("a.type").alias("equipment_type"),
    F.col("a.status").alias("equipment_status"),
    F.col("c.name").alias("company_name"),
    F.col("c.country").alias("country"),
    F.col("status_label")
).orderBy("id")

# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
```
Explanation
Step 1: Joining with Table Aliases
```python
joined_df = aerospace_df.alias("a").join(
    company_df.alias("c"),
    F.col("a.company_id") == F.col("c.id"),
    how="inner"
)
```
Both aerospace_df and company_df contain columns named id and name. A plain join keeps both copies of each, so any later unqualified reference such as F.col("name") fails with an ambiguous-column error. Appending .alias("a") and .alias("c") to the DataFrames lets us state explicitly which table a column comes from, as in F.col("a.name") or F.col("c.name").
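The collision is easy to see outside Spark. The plain-Python sketch below merges one equipment row with its company row: an unqualified merge keeps only one id and one name, while prefixing every key with its table alias keeps both. Spark does not silently overwrite the way a dict does (it keeps both columns and rejects an unqualified reference), but the underlying problem of two fields answering to one name is the same. The qualify helper is hypothetical.

```python
# One row from each table, taken from the example data.
equipment = {"id": "A1", "name": "Falcon 9", "type": "Rocket",
             "status": "active", "company_id": "C1"}
company = {"id": "C1", "name": "SpaceX", "country": "USA"}

# Naive merge: the company's id and name overwrite the equipment's.
naive = {**equipment, **company}
print(naive["id"], naive["name"])  # C1 SpaceX


def qualify(row, alias):
    """Hypothetical helper: prefix every key with a table alias ('name' -> 'a.name')."""
    return {f"{alias}.{key}": value for key, value in row.items()}


# Qualified merge: both versions of id and name survive.
qualified = {**qualify(equipment, "a"), **qualify(company, "c")}
print(qualified["a.name"], qualified["c.name"])  # Falcon 9 SpaceX
```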
Step 2: Conditional Logic with F.when()
```python
F.when((F.col("a.status") == "active") & (F.col("c.country") == "USA"), "Domestic Active")
```
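To create status_label, we need row-by-row conditional logic. PySpark's F.when(condition, value) behaves like an IF/THEN statement: chained .when() calls are checked in order, and the first condition that evaluates to true supplies the value. We combine the two conditions with the & operator, which PySpark overloads on Column objects to mean logical AND (Python's and keyword cannot be used on columns). Each condition needs its own parentheses because & binds more tightly than ==.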
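The precedence issue can be reproduced with plain Python strings standing in for the two columns (this is not Spark code). Because & binds first, the unparenthesized form parses as status == ("active" & country) == "USA", and "active" & country is not a valid operation on strings. With Column objects the failure typically surfaces differently, since Column overloads &, and the error appears when Python tries to treat a Column as a bool in the chained comparison. The fix is the same either way.

```python
status, country = "active", "USA"

# Intended: evaluate each comparison, then combine the two booleans with &.
grouped = (status == "active") & (country == "USA")
print(grouped)  # True

# Without parentheses, & is applied before ==, so Python first tries
# "active" & country, which raises TypeError for two strings.
ungrouped_failed = False
try:
    ungrouped = status == "active" & country == "USA"
except TypeError as exc:
    ungrouped_failed = True
    print("TypeError:", exc)
```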
Step 3: The Catch-All Default
```python
.otherwise("Inactive")
```
The task states: "If the status is not active, regardless of the country, the status_label should be Inactive." Rather than enumerating every non-active status, we end the F.when() chain with .otherwise("Inactive"). It acts as the ELSE branch and catches every row that matched neither "active" condition, including rows whose status is null.
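One subtlety hides in the catch-all. Spark SQL comparisons follow three-valued logic: a comparison involving null yields null rather than false, and a when() branch fires only when its condition is true. The plain-Python model below uses None for SQL NULL; sql_eq, sql_not, sql_and, and label are hypothetical helpers, not Spark APIs. It shows that a second branch written as (status == "active") & (country != "USA") does not fire for an active row with a missing country, so that row falls through to "Inactive". Because active USA rows are already claimed by the first branch, testing only status == "active" in the second branch routes such rows to "Foreign Active" instead.

```python
# Hypothetical model of SQL three-valued logic; None plays the role of NULL.

def sql_eq(a, b):
    """SQL '=': NULL if either side is NULL."""
    if a is None or b is None:
        return None
    return a == b


def sql_not(x):
    """SQL NOT: NOT NULL is NULL."""
    return None if x is None else not x


def sql_and(x, y):
    """SQL AND: FALSE wins over NULL; NULL wins over TRUE."""
    if x is False or y is False:
        return False
    if x is None or y is None:
        return None
    return True


def label(status, country, second_branch_checks_country):
    is_active = sql_eq(status, "active")
    # Like F.when(), a branch fires only when its condition is exactly True.
    if sql_and(is_active, sql_eq(country, "USA")) is True:
        return "Domestic Active"
    if second_branch_checks_country:
        foreign = sql_and(is_active, sql_not(sql_eq(country, "USA")))
    else:
        foreign = is_active
    if foreign is True:
        return "Foreign Active"
    return "Inactive"  # .otherwise(): no branch was True


print(label("active", None, second_branch_checks_country=True))   # Inactive
print(label("active", None, second_branch_checks_country=False))  # Foreign Active
print(label(None, "USA", second_branch_checks_country=False))     # Inactive
```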
Step 4: Renaming, Formatting, and Sorting
```python
result_df = result_df.select(
    F.col("a.id").alias("id"),
    F.col("a.name").alias("equipment_name"), ...
).orderBy("id")
```
Finally, we map the merged, ambiguously named columns onto the clean Output Schema. Inside .select(), we reference each qualified column (such as a.name) and chain .alias("equipment_name") to rename it, so a single select picks, renames, and orders the columns in one step. Chaining .orderBy("id") then sorts the report by id in ascending order, which makes the output order deterministic.
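Because the schema declares id as a string, orderBy("id") compares ids character by character, which matches numeric order only while the ids have the same width (A1 through A5 in the example). The plain-Python sketch below shows the difference. The A10 id and the natural_key helper are hypothetical illustrations; the task itself asks only for ascending order by id.

```python
import re

ids = ["A2", "A10", "A1"]

# String comparison, which is what sorting a string column does.
print(sorted(ids))  # ['A1', 'A10', 'A2']


def natural_key(value):
    """Hypothetical helper: split 'A10' into ('A', 10) so the number compares numerically."""
    prefix, number = re.fullmatch(r"([A-Za-z]*)(\d+)", value).groups()
    return prefix, int(number)


print(sorted(ids, key=natural_key))  # ['A1', 'A2', 'A10']
```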