Objective
You are working with call center data where individual call records and customer information are stored in separate tables.
Task
Join the calls table with the customers table on cust_id to enrich each call record with customer details. Then group the joined data by date to compute two metrics for each date: the number of distinct customers who made calls and the total duration of all calls. Note that a single customer may place multiple calls on the same day, but they should only be counted once toward the distinct customer count. Your output should contain three columns: date, num_customers, and total_duration. Save your result as result_df.
File Paths
- Calls: /home/interview/calls.csv
- Customers: /home/interview/customers.csv
- Starter script: /home/interview/call_center_stats.py
Schema
calls.csv
| Column | Type |
| --- | --- |
| call_id | integer |
| cust_id | integer |
| date | string |
| duration | integer |
customers.csv
| Column | Type |
| --- | --- |
| cust_id | integer |
| name | string |
| state | string |
Expected output schema
| Column | Type |
| --- | --- |
| date | string |
| num_customers | integer |
| total_duration | integer |
Example
Given this sample input:
calls
| call_id | cust_id | date | duration |
| --- | --- | --- | --- |
| 1 | 101 | 2024-03-01 | 120 |
| 2 | 102 | 2024-03-01 | 300 |
| 3 | 101 | 2024-03-01 | 200 |
| 4 | 103 | 2024-03-02 | 150 |
| 5 | 101 | 2024-03-02 | 400 |
customers
| cust_id | name | state |
| --- | --- | --- |
| 101 | Alice Wong | CA |
| 102 | Bob Smith | TX |
| 103 | Carol Lee | NY |
The output would be:
| date | num_customers | total_duration |
| --- | --- | --- |
| 2024-03-01 | 2 | 620 |
| 2024-03-02 | 2 | 550 |
On 2024-03-01, customer 101 made two calls (120 + 200) and customer 102 made one call (300). Even though there are three call records, only two distinct customers called, so num_customers is 2 and total_duration is 620. On 2024-03-02, customers 103 and 101 each made one call, giving 2 distinct customers and a total duration of 550.
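The arithmetic in this example can be cross-checked with a few lines of plain Python. This is only a sketch of the grouping logic using the sample rows above; it is not PySpark, and the variable names (customers_by_date, duration_by_date, expected_rows) are chosen here for illustration.

```python
from collections import defaultdict

# Sample calls from the example: (call_id, cust_id, date, duration)
calls = [
    (1, 101, "2024-03-01", 120),
    (2, 102, "2024-03-01", 300),
    (3, 101, "2024-03-01", 200),
    (4, 103, "2024-03-02", 150),
    (5, 101, "2024-03-02", 400),
]

customers_by_date = defaultdict(set)  # date -> set of distinct cust_ids
duration_by_date = defaultdict(int)   # date -> summed call duration

for _call_id, cust_id, date, duration in calls:
    customers_by_date[date].add(cust_id)  # a set ignores repeat callers
    duration_by_date[date] += duration    # every call adds to the total

expected_rows = [
    (date, len(customers_by_date[date]), duration_by_date[date])
    for date in sorted(customers_by_date)
]
print(expected_rows)
# [('2024-03-01', 2, 620), ('2024-03-02', 2, 550)]
```

The set absorbs customer 101's second call on 2024-03-01, while the running total still includes both of that customer's durations.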
Solution
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
calls = spark.read.csv("/home/interview/calls.csv", header=True, inferSchema=True)
customers = spark.read.csv("/home/interview/customers.csv", header=True, inferSchema=True)
joined_df = calls.join(customers, "cust_id")
result_df = joined_df.groupBy("date").agg(
    F.countDistinct("cust_id").alias("num_customers"),
    F.sum("duration").alias("total_duration")
)
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Reading the CSV Files
calls = spark.read.csv("/home/interview/calls.csv", header=True, inferSchema=True)
customers = spark.read.csv("/home/interview/customers.csv", header=True, inferSchema=True)
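Both CSVs are loaded with header=True, so the first row is treated as column names, and with inferSchema=True, so Spark makes an extra pass over the data to detect column types. Without inferSchema, every column would be read as a string. Under Spark's default (non-ANSI) type coercion, F.sum("duration") would still run by implicitly casting the strings to double, but total_duration would then come back as a double (for example 620.0) instead of the integer that the expected output schema calls for.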
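Why column types matter can be seen in a plain-Python analogy. This is not Spark code: it uses the standard csv module, which, like an untyped CSV read, returns every field as text. The in-memory sample is a hypothetical two-row extract of calls.csv.

```python
import csv
import io

# Hypothetical two-row extract of calls.csv, held in memory for the demo.
raw = (
    "call_id,cust_id,date,duration\n"
    "1,101,2024-03-01,120\n"
    "2,102,2024-03-01,300\n"
)

rows = list(csv.DictReader(io.StringIO(raw)))
durations_as_text = [row["duration"] for row in rows]

# Untyped fields are strings, so "+" concatenates instead of adding.
concatenated = durations_as_text[0] + durations_as_text[1]

# Converting each field to int first gives the numeric total.
numeric_total = sum(int(value) for value in durations_as_text)

print(repr(concatenated), numeric_total)
# '120300' 420
```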
Step 2: Joining Calls with Customers
joined_df = calls.join(customers, "cust_id")
This inner join connects each call record to its corresponding customer using cust_id as the join key. When you pass a single string like "cust_id", Spark performs an equi-join and keeps only one copy of the join column. If a call references a cust_id that does not exist in the customers table, that call is dropped (inner join behavior). The joined DataFrame now has columns from both tables: cust_id, call_id, date, duration, name, and state.
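The inner-join behavior described above can be modeled in plain Python. This is a sketch rather than Spark code, and the unmatched call (call_id 6, cust_id 999) is a hypothetical row added here to show what gets dropped.

```python
# Hypothetical data: call 6 references cust_id 999, which has no customer row.
calls = [
    {"call_id": 1, "cust_id": 101, "date": "2024-03-01", "duration": 120},
    {"call_id": 6, "cust_id": 999, "date": "2024-03-01", "duration": 50},
]
customers = {
    101: {"name": "Alice Wong", "state": "CA"},
}

# Inner join: keep a call only when its cust_id has a matching customer,
# and merge the customer's columns into the call record.
joined = [
    {**call, **customers[call["cust_id"]]}
    for call in calls
    if call["cust_id"] in customers
]
print(joined)
```

Because the unmatched call is dropped before aggregation, its 50 seconds would not contribute to total_duration for that date.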
Step 3: Grouping by Date and Aggregating
result_df = joined_df.groupBy("date").agg(
    F.countDistinct("cust_id").alias("num_customers"),
    F.sum("duration").alias("total_duration")
)
This is where the core logic lives. groupBy("date") groups all call records by their date. Within each group, a single agg call computes two aggregations:
- F.countDistinct("cust_id") counts how many unique customers made calls on that date. If customer 101 made three calls on 2024-03-01, they are counted only once.
- F.sum("duration") adds up the duration of every call on that date, including all calls from the same customer.
The .alias() calls rename the output columns to num_customers and total_duration.
Step 4: Why countDistinct, not count
The key insight in this problem is the difference between F.count("cust_id") and F.countDistinct("cust_id"). F.count("cust_id") counts every row in the group that has a non-null cust_id, so a customer who made 5 calls would contribute 5 to the count. F.countDistinct("cust_id") counts unique values only, so that same customer contributes just 1. For the num_customers metric, you want to know how many different customers called, not how many calls were made, so countDistinct is the correct choice.
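The same distinction shows up in plain Python as the difference between the length of a list and the length of a set. The sketch below uses the three cust_id values from 2024-03-01 in the example; it is an analogy for the two Spark functions, not Spark code.

```python
# cust_id values for the three calls on 2024-03-01 in the example.
cust_ids = [101, 102, 101]

call_count = len(cust_ids)               # like F.count: one per row
distinct_customers = len(set(cust_ids))  # like F.countDistinct: one per unique value

print(call_count, distinct_customers)
# 3 2
```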
Step 5: Alternative Syntax
You could also write the aggregation using the agg dictionary shorthand, for example agg({"duration": "sum"}), but it does not support countDistinct directly and leaves the output columns with default names such as sum(duration). Another approach is to use F.approx_count_distinct for very large datasets where an exact count is not required, but for this problem the dataset is small enough that exact counting with countDistinct is appropriate.