Your first Spark analytics pipeline
Time to actually use Spark for what it's built for. We'll take a raw movie ratings dataset, clean it, aggregate it, and produce a top-10 movies list.
What we're doing
You'll build an end-to-end pipeline: load a CSV of movie ratings, drop the rows with missing ratings, compute the average rating per movie, keep only movies with enough ratings for the average to be meaningful, rank them, and save the results as Parquet files.
Step 1: Prepare the dataset
Open the terminal and create a sample CSV to work with. Copy and paste this whole block:
mkdir -p ~/spark-data
cat > ~/spark-data/ratings.csv << 'EOF'
user_id,movie_title,rating,timestamp
1,The Matrix,9.0,2024-01-15
1,Inception,8.5,2024-01-16
2,The Matrix,8.5,2024-01-17
2,Titanic,7.0,2024-01-18
3,The Matrix,9.5,2024-01-20
3,Inception,9.0,2024-01-21
3,Interstellar,9.0,2024-01-22
4,Titanic,,2024-01-23
4,The Matrix,8.0,2024-01-24
4,Inception,8.5,2024-01-25
5,Interstellar,8.5,2024-01-26
5,The Matrix,9.0,2024-01-27
5,Titanic,7.5,2024-01-28
6,Inception,,2024-01-29
6,Interstellar,9.5,2024-01-30
7,The Matrix,,
8,Inception,9.5,2024-02-01
9,Interstellar,10.0,2024-02-02
10,The Matrix,8.5,2024-02-03
EOF
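This creates a small ratings file with 19 data rows. Three of them have an empty rating, and one of those is also missing its timestamp.
You can inspect the first lines of the file:
head ~/spark-data/ratings.csv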
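Before handing the file to Spark, it can help to know what the cleaning step should find. The following is a minimal sketch using only the Python standard library. It embeds a copy of the sample so it runs on its own, and the names SAMPLE, rows, missing_rating and missing_timestamp are illustrative, not part of the tutorial.

```python
import csv
import io

# Copy of the sample from Step 1, embedded so this sketch is self-contained.
SAMPLE = """\
user_id,movie_title,rating,timestamp
1,The Matrix,9.0,2024-01-15
1,Inception,8.5,2024-01-16
2,The Matrix,8.5,2024-01-17
2,Titanic,7.0,2024-01-18
3,The Matrix,9.5,2024-01-20
3,Inception,9.0,2024-01-21
3,Interstellar,9.0,2024-01-22
4,Titanic,,2024-01-23
4,The Matrix,8.0,2024-01-24
4,Inception,8.5,2024-01-25
5,Interstellar,8.5,2024-01-26
5,The Matrix,9.0,2024-01-27
5,Titanic,7.5,2024-01-28
6,Inception,,2024-01-29
6,Interstellar,9.5,2024-01-30
7,The Matrix,,
8,Inception,9.5,2024-02-01
9,Interstellar,10.0,2024-02-02
10,The Matrix,8.5,2024-02-03
"""

# DictReader uses the header row as keys; empty fields come back as "".
rows = list(csv.DictReader(io.StringIO(SAMPLE)))
missing_rating = [r for r in rows if r["rating"] == ""]
missing_timestamp = [r for r in rows if r["timestamp"] == ""]

print(len(rows))               # 19 data rows
print(len(missing_rating))     # 3 rows without a rating
print(len(missing_timestamp))  # 1 row without a timestamp
```

So after the null ratings are dropped, 16 rows should remain. To check the real file instead of the embedded copy, open ~/spark-data/ratings.csv and pass the file object to csv.DictReader.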
Step 2: Start an interactive Spark session
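Launch the PySpark shell, pointing it at your Spark master. The shell creates a SparkSession for you and exposes it as spark, which the following steps use:
pyspark --master spark://default:7077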
Step 3: Load the CSV into a DataFrame
# Replace /home/preparesh with your own home directory if it differs.
df = spark.read.csv(
    "/home/preparesh/spark-data/ratings.csv",
    header=True,
    inferSchema=True
)
df.show()
df.printSchema()
- spark.read.csv(...): Spark's built-in CSV reader
- header=True: the first row is column names
- inferSchema=True: figure out data types automatically instead of treating everything as a string
show() prints the DataFrame as a table. printSchema() shows the column names and their inferred data types. You should see user_id as an integer, movie_title as a string, rating as a double, and timestamp as a date or timestamp type (which one depends on your Spark version).
Step 4: Clean the data
Three rows in the sample have an empty rating field, which Spark reads as null. Drop those rows before aggregating:
clean_df = df.dropna(subset=["rating"])
clean_df.show()
- dropna(subset=["rating"]): drop any row where the rating column is null
- Rows with a null timestamp but a valid rating are kept, since timestamp doesn't affect our analysis
Step 5: Aggregate — average rating per movie
Now compute the average rating and total number of ratings per movie:
from pyspark.sql.functions import avg, count
agg_df = clean_df.groupBy("movie_title").agg(
    avg("rating").alias("average_rating"),
    count("rating").alias("num_ratings")
)
agg_df.show()
- groupBy("movie_title"): group all rows by movie title
- avg("rating"): for each group, compute the average of the rating column
- count("rating"): count how many ratings each movie has
- alias(...): rename the output columns to something readable
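To know what agg_df.show() should print, the same group-and-average logic can be reproduced in plain Python. This is only a cross-check sketch, not how Spark computes the result. The ratings list is the sample with the three null ratings already removed, and the names ratings, groups and summary are illustrative.

```python
from collections import defaultdict

# (movie_title, rating) pairs from the sample, null ratings already dropped.
ratings = [
    ("The Matrix", 9.0), ("Inception", 8.5), ("The Matrix", 8.5),
    ("Titanic", 7.0), ("The Matrix", 9.5), ("Inception", 9.0),
    ("Interstellar", 9.0), ("The Matrix", 8.0), ("Inception", 8.5),
    ("Interstellar", 8.5), ("The Matrix", 9.0), ("Titanic", 7.5),
    ("Interstellar", 9.5), ("Inception", 9.5), ("Interstellar", 10.0),
    ("The Matrix", 8.5),
]

# Equivalent of groupBy("movie_title"): collect the ratings per title.
groups = defaultdict(list)
for title, rating in ratings:
    groups[title].append(rating)

# Equivalent of avg("rating") and count("rating") for each group.
summary = {
    title: (sum(values) / len(values), len(values))
    for title, values in groups.items()
}

for title, (average, n) in summary.items():
    print(f"{title}: average_rating={average}, num_ratings={n}")
# The Matrix: average_rating=8.75, num_ratings=6
# Inception: average_rating=8.875, num_ratings=4
# Titanic: average_rating=7.25, num_ratings=2
# Interstellar: average_rating=9.25, num_ratings=4
```

Spark does not guarantee the order of rows after a groupBy, so the table may list the movies in a different order, but the values should match.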
Step 6: Filter to movies with enough ratings
Right now the results include every movie, even ones with only one or two ratings, whose averages are not very reliable. Keep only movies with at least 4 ratings. In the sample this drops Titanic, which has 2.
popular_df = agg_df.filter(agg_df.num_ratings >= 4)
popular_df.show()
Step 7: Sort and take the top 10
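Now order by average rating, highest first, and keep the top 10. The sample is tiny, so fewer than 10 movies survive the filter and all of them are returned: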
top_movies = popular_df.orderBy("average_rating", ascending=False).limit(10)
top_movies.show()
- orderBy("average_rating", ascending=False): sort highest rating first
- limit(10): keep only the first 10 rows
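The filter, sort and limit steps map onto a list comprehension, sorted() and a slice in plain Python. The sketch below is only a cross-check of the expected result. The aggregated values are the per-movie averages and counts worked out from the sample for Step 5, and MIN_RATINGS and TOP_N are illustrative names.

```python
# (movie_title, average_rating, num_ratings) for the sample after Step 5.
aggregated = [
    ("The Matrix", 8.75, 6),
    ("Inception", 8.875, 4),
    ("Interstellar", 9.25, 4),
    ("Titanic", 7.25, 2),
]

MIN_RATINGS = 4  # same threshold as the Step 6 filter
TOP_N = 10       # same cut-off as limit(10)

# Equivalent of filter(num_ratings >= 4).
popular = [row for row in aggregated if row[2] >= MIN_RATINGS]

# Equivalent of orderBy("average_rating", ascending=False).limit(10).
top = sorted(popular, key=lambda row: row[1], reverse=True)[:TOP_N]

for title, average, n in top:
    print(f"{title}: {average} ({n} ratings)")
# Interstellar: 9.25 (4 ratings)
# Inception: 8.875 (4 ratings)
# The Matrix: 8.75 (6 ratings)
```

The slice returns only three rows here because only three movies pass the filter, and limit(10) behaves the same way when fewer than 10 rows exist.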
Step 8: Save the results as Parquet
Instead of CSV, we'll save the output as Parquet, a columnar format widely used for analytics.
top_movies.write.mode("overwrite").parquet("/home/preparesh/spark-data/top_movies")
- write.mode("overwrite"): replace any previous results at this path
- parquet(...): save in Parquet format
Step 9: Verify
Load the Parquet output to confirm it saved correctly:
result = spark.read.parquet("/home/preparesh/spark-data/top_movies")
result.show()
Same rows as before, but now they're persisted. Note that top_movies is a directory of Parquet part files rather than a single file. You could hand it to a dashboard, another pipeline, or an ML model.