SPARK

Your first Spark analytics pipeline

Time to actually use Spark for what it's built for. We'll take a raw movie ratings dataset, clean it, aggregate it, and produce a top-10 movies list.

What we're doing

You'll build an end-to-end pipeline: load a CSV of movie ratings, clean the messy rows, compute the average rating per movie, filter to well-rated movies only, and save the results as Parquet files.

Step 1: Prepare the dataset

Open the terminal and create a sample CSV to work with. Copy and paste this whole block:

mkdir -p ~/spark-data
cat > ~/spark-data/ratings.csv << 'EOF'
user_id,movie_title,rating,timestamp
1,The Matrix,9.0,2024-01-15
1,Inception,8.5,2024-01-16
2,The Matrix,8.5,2024-01-17
2,Titanic,7.0,2024-01-18
3,The Matrix,9.5,2024-01-20
3,Inception,9.0,2024-01-21
3,Interstellar,9.0,2024-01-22
4,Titanic,,2024-01-23
4,The Matrix,8.0,2024-01-24
4,Inception,8.5,2024-01-25
5,Interstellar,8.5,2024-01-26
5,The Matrix,9.0,2024-01-27
5,Titanic,7.5,2024-01-28
6,Inception,,2024-01-29
6,Interstellar,9.5,2024-01-30
7,The Matrix,,
8,Inception,9.5,2024-02-01
9,Interstellar,10.0,2024-02-02
10,The Matrix,8.5,2024-02-03
EOF

This creates a tiny ratings file.

You can inspect the file:

cat ~/spark-data/ratings.csv | head

Step 2: Start an interactive Spark session

pyspark --master spark://default:7077

Step 3: Load the CSV into a DataFrame

df = spark.read.csv(
    "/home/preparesh/spark-data/ratings.csv",
    header=True,
    inferSchema=True
)
  • spark.read.csv(...) — Spark's built-in CSV reader
  • header=True — the first row is column names
  • inferSchema=True — figure out data types automatically instead of treating everything as a string

show() prints the DataFrame as a table. printSchema() shows the column names and their inferred data types. You should see rating as a double, timestamp as a date-ish type, and the strings as strings.


Step 4: Clean the data

Some rows have nulls and we need to drop those.

clean_df = df.dropna(subset=["rating"])
clean_df.show()
  • dropna(subset=["rating"]) — drop any row where the rating column is null
  • Rows with a null timestamp but a valid rating are kept — timestamp doesn't affect our analysis

Step 5: Aggregate — average rating per movie

Now compute the average rating and total number of ratings per movie:

from pyspark.sql.functions import avg, count

agg_df = clean_df.groupBy("movie_title").agg(
    avg("rating").alias("average_rating"),
    count("rating").alias("num_ratings")
)
agg_df.show()
  • groupBy("movie_title") — group all rows by movie title
  • avg("rating") — for each group, compute the average of the rating column
  • count("rating") — count how many ratings each movie has
  • alias(...) — rename the output columns to something readable

Step 6: Filter to well-rated movies

Right now the results include every movie, even ones with only one or two ratings. Filter to movies with at least 4 ratings.

popular_df = agg_df.filter(agg_df.num_ratings >= 4)
popular_df.show()

Step 7: Sort and take the top 10

Now order by average rating descending, take the top 10:

top_movies = popular_df.orderBy("average_rating", ascending=False).limit(10)
top_movies.show()
  • orderBy("average_rating", ascending=False) — sort highest rating first
  • limit(10) — keep only the first 10 rows

Step 8: Save the results as Parquet

For the output instead of CSV, we'll save as Parquet, which is the standard columnar format for analytics.

top_movies.write.mode("overwrite").parquet("/home/preparesh/spark-data/top_movies")
  • write.mode("overwrite") — replace any previous results at this path
  • parquet(...) — save in Parquet format

Step 9: Verify

Load the Parquet output to confirm it saved correctly:

result = spark.read.parquet("/home/preparesh/spark-data/top_movies")
result.show()

Same rows as before, but now they're persisted. You could hand this file to a dashboard, another pipeline, or an ML model.


What's next

Now go and try this out in a live environment — boot a fresh cluster and play with the manifests above.

Start Spark
Spec 2 CPU / 8 GiB ·Disk 25 GiB
Sign in to launch this environment
Required 1 VM · 2 CPU · 8 GB · 25 GiB disk
Available 1 VM · 1 CPU · 2 GB · 10 GiB disk
Sign in