Objective
You are a Data Analyst at an amusement park operator. You've been given two DataFrames: rides (containing metadata about the park's attractions) and visitors (containing logs of visitor ride histories and ratings).
Task
Write a PySpark function to identify the ride with the most anomalous average visitor rating. An anomalous ride is defined as the ride whose average rating lies furthest from the global average rating across all rides, whether above or below it.
Save your resulting DataFrame as result_df. Ensure the output strictly matches the requested Output Schema, casting average_rating to a Float and is_anomalous to a Boolean. Order the final output by ride_id in ascending order.
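The solution below reads this definition as follows. $R$ is the set of rides with at least one rating, and $\bar r_i$ is ride $i$'s mean rating. The global average is treated as the unweighted mean of the per-ride averages. That is an interpretation; the task could also be read as the mean of all individual ratings.

```latex
\bar{r} = \frac{1}{|R|} \sum_{i \in R} \bar{r}_i,
\qquad
d_i = \left| \bar{r}_i - \bar{r} \right|,
\qquad
\text{is\_anomalous}_i \iff d_i = \max_{j \in R} d_j
```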
File Paths
- Rides Dataset: /home/interview/rides.csv
- Visitors Dataset: /home/interview/visitors.csv
- Starter script: /home/interview/park_outlier.py
Schema
rides.csv
| Column Name | Data Type |
| --- | --- |
| ride_id | string |
| ride_name | string |
| type | string |
| capacity | integer |
visitors.csv
| Column Name | Data Type |
| --- | --- |
| visitor_id | string |
| ride_id | string |
| timestamp | timestamp |
| rating | integer |
Expected Output Schema
| Column Name | Data Type |
| --- | --- |
| ride_id | string |
| ride_name | string |
| average_rating | float |
| is_anomalous | boolean |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
rides = spark.read.csv("/home/interview/rides.csv", header=True, inferSchema=True)
visitors = spark.read.csv("/home/interview/visitors.csv", header=True, inferSchema=True)
# Step 1: Calculate the average rating per individual ride
ride_avg = visitors.groupBy("ride_id").agg(
    F.avg("rating").cast("float").alias("average_rating")
)
# Step 2: Join with the rides catalog to get the ride names
joined_df = rides.join(ride_avg, on="ride_id", how="inner")
# Step 3: Define a global window (no partition) to calculate park-wide metrics
global_window = Window.partitionBy()
# Step 4: Calculate the global average, the absolute difference for each ride, and the max difference
metrics_df = joined_df.withColumn(
    "global_avg", F.avg("average_rating").over(global_window)
).withColumn(
    "diff", F.abs(F.col("average_rating") - F.col("global_avg"))
).withColumn(
    "max_diff", F.max("diff").over(global_window)
)
# Step 5: Flag the most anomalous ride(s) using a tiny tolerance to avoid floating-point errors
result_df = metrics_df.withColumn(
    "is_anomalous",
    (F.abs(F.col("diff") - F.col("max_diff")) < 1e-5).cast("boolean")
)
# Step 6: Select columns to match the Output Schema exactly and sort
result_df = result_df.select("ride_id", "ride_name", "average_rating", "is_anomalous") \
    .orderBy("ride_id")
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Aggregating Ride Averages
ride_avg = visitors.groupBy("ride_id").agg(
    F.avg("rating").cast("float").alias("average_rating")
)
We begin by grouping the visitors DataFrame by ride_id to compute the average rating for each attraction. Because F.avg returns a double, we explicitly cast the result to float to match the target schema.
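This plain-Python sketch (not Spark code) mirrors the aggregation on hypothetical visit rows. It also shows what the float cast does: averaging integer ratings yields a double, and rounding it to single precision can change it slightly. The to_float32 helper is an illustrative approximation of .cast("float"), not a Spark function.

```python
import struct
from collections import defaultdict

# Hypothetical (ride_id, rating) rows standing in for the visitors DataFrame
visits = [("R1", 4), ("R1", 3), ("R1", 4), ("R2", 2), ("R2", 5)]

# Group ratings by ride, mirroring visitors.groupBy("ride_id")
ratings_by_ride = defaultdict(list)
for ride_id, rating in visits:
    ratings_by_ride[ride_id].append(rating)

# Mean per ride; like F.avg, this yields a double even for integer inputs
ride_avg = {ride: sum(rs) / len(rs) for ride, rs in ratings_by_ride.items()}


def to_float32(x: float) -> float:
    """Round a double to the nearest single-precision value."""
    return struct.unpack("f", struct.pack("f", x))[0]


ride_avg_float = {ride: to_float32(avg) for ride, avg in ride_avg.items()}
```

R2's average of 3.5 survives the cast unchanged, while R1's 11/3 is rounded to the nearest float32. Because Step 1 casts early, the later global_avg and diff arithmetic runs on these rounded values. Casting only in the final select is an alternative that keeps full precision until output.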
Step 2: Joining with the Catalog
joined_df = rides.join(ride_avg, on="ride_id", how="inner")
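Next, we join the aggregated ratings to the rides DataFrame with an inner join. This attaches ride_name (and the other catalog columns) so each ride can be identified in the final report. An inner join keeps only ride_ids present on both sides. It drops rides that have no ratings, and it drops ratings whose ride_id does not appear in the catalog, so neither group contributes to the global average.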
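This plain-Python sketch uses hypothetical data to contrast the inner join used here with a left join from the catalog. R3 is a catalogued ride with no ratings, and R9 is rated but missing from the catalog.

```python
# Hypothetical catalog and per-ride averages
rides = {"R1": "Coaster", "R2": "Carousel", "R3": "Ferris Wheel"}
ride_avg = {"R1": 4.0, "R2": 3.5, "R9": 2.0}

# Inner join on ride_id: only keys present on both sides survive
inner = {rid: (rides[rid], ride_avg[rid]) for rid in rides if rid in ride_avg}

# Left join from the catalog: unrated rides are kept with a null (None) average
left = {rid: (name, ride_avg.get(rid)) for rid, name in rides.items()}
```

If the join were switched to how="left" in Spark, F.avg and F.max would ignore the null averages, so global_avg and max_diff would be unchanged. However, is_anomalous for unrated rides would come out null rather than False, because comparisons involving null yield null.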
Step 3: Defining a Global Window
global_window = Window.partitionBy()
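To decide whether a ride is anomalous, we compare its average against the global average of all rides. A standard .groupBy() would collapse the rows, so to compute a park-wide metric while keeping one row per ride, we define a window specification that spans the entire DataFrame by calling Window.partitionBy() with no arguments. Spark executes such a window by moving all rows into a single partition. That is cheap here because joined_df holds only one row per ride, but the pattern does not scale to large tables.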
Step 4: Calculating Deviation
metrics_df = joined_df.withColumn(
    "global_avg", F.avg("average_rating").over(global_window)
).withColumn(
    "diff", F.abs(F.col("average_rating") - F.col("global_avg"))
).withColumn(
    "max_diff", F.max("diff").over(global_window)
)
Using the global window, we compute three new columns:
- global_avg: the unweighted mean of the average_rating column across all rides in joined_df. Each ride counts once, regardless of how many ratings it received.
- diff: the absolute difference, computed with F.abs(), between the ride's average_rating and global_avg. This measures how far the ride sits from the norm.
- max_diff: the largest diff in the park, computed with F.max("diff").over(global_window).
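To make the three columns concrete, here is a plain-Python sketch with hypothetical numbers (not Spark code). It also contrasts the unweighted global_avg that Step 4 computes with an alternative reading of the task: the mean over every individual rating, which weights each ride by its number of ratings. The two readings can flag different rides.

```python
# Hypothetical per-ride (average_rating, number_of_ratings) pairs
rides = {"R1": (4.5, 8), "R2": (3.0, 1), "R3": (1.5, 1)}

# global_avg as in Step 4: unweighted mean of the per-ride averages
global_avg = sum(avg for avg, _ in rides.values()) / len(rides)

# diff: absolute distance of each ride's average from global_avg
diff = {rid: abs(avg - global_avg) for rid, (avg, _) in rides.items()}

# max_diff: the largest deviation in the park
max_diff = max(diff.values())

# Alternative reading (not what Step 4 computes): mean over every individual rating
total = sum(avg * n for avg, n in rides.values())
count = sum(n for _, n in rides.values())
weighted_avg = total / count
most_anomalous_weighted = max(rides, key=lambda rid: abs(rides[rid][0] - weighted_avg))
```

Under the unweighted reading, R1 and R3 tie at the maximum deviation of 1.5, so both would be flagged. The weighted reading (global mean 4.05) singles out R3.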
Step 5: Flagging the Outlier Safely
result_df = metrics_df.withColumn(
    "is_anomalous",
    (F.abs(F.col("diff") - F.col("max_diff")) < 1e-5).cast("boolean")
)
To flag the anomalous ride, we look for the row(s) whose diff equals max_diff. Because max_diff is taken directly from the diff column, the most anomalous ride always matches it exactly, so strict equality (==) would also work here. The tolerance check (a difference below 1e-5) is a defensive guard against floating-point noise such as 1.000000001 != 1.0. In practice it means any ride whose deviation is within 1e-5 of the maximum is also flagged, so ties and near-ties can produce more than one True row.
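This plain-Python sketch uses hypothetical deviations to compare strict equality with the 1e-5 tolerance. R2 and R3 differ only in the seventh decimal place.

```python
# Hypothetical per-ride deviations from the global average
diff = {"R1": 0.25, "R2": 1.2000001, "R3": 1.2000000}

# Like F.max, max() returns one of the existing values, so one row matches exactly
max_diff = max(diff.values())

# Strict equality flags only the exact maximum
exact = {rid: d == max_diff for rid, d in diff.items()}

# The tolerance used in Step 5 also flags deviations within 1e-5 of the maximum
tolerant = {rid: abs(d - max_diff) < 1e-5 for rid, d in diff.items()}
```

Both rules flag R2. Only the tolerant rule also flags the near-tie R3.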
Step 6: Formatting and Sorting the Final Report
result_df = result_df.select("ride_id", "ride_name", "average_rating", "is_anomalous") \
    .orderBy("ride_id")
Finally, we chain a .select() to drop the intermediate columns (global_avg, diff, max_diff) and keep exactly the columns listed in the Expected Output Schema. We then call .orderBy("ride_id") so the rows are sorted by ride_id in ascending order, as the task requires.
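A quick self-check can confirm the final shape before writing. This sketch is a hypothetical helper, not part of the starter script. It compares a list of (column, type) pairs, the form PySpark's DataFrame.dtypes returns, against the expected schema, and it verifies that the ride_ids are in ascending order. Python's sorted() compares strings by code point, which matches Spark's ordering for ASCII ids. That ASCII assumption is not guaranteed by the task.

```python
# Expected (column, type) pairs, in the form DataFrame.dtypes reports them
EXPECTED_DTYPES = [
    ("ride_id", "string"),
    ("ride_name", "string"),
    ("average_rating", "float"),
    ("is_anomalous", "boolean"),
]


def check_output(dtypes, ride_ids):
    """Hypothetical helper: verify column names/types and ascending ride_id order."""
    if list(dtypes) != EXPECTED_DTYPES:
        raise ValueError(f"schema mismatch: {list(dtypes)}")
    if list(ride_ids) != sorted(ride_ids):
        raise ValueError("rows are not sorted by ride_id")
    return True
```

Inside the script, this could be called before spark.stop() as check_output(result_df.dtypes, [r["ride_id"] for r in result_df.collect()]).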