Objective
You work in the payroll department and are asked to process the weekly payroll data for the company. You are given two DataFrames: employees contains employee demographic information, and payroll contains their logged hours and rates.
Task
Write a function that calculates the total pay for each employee.
The pay should be calculated as follows:
- If an employee works less than or equal to 40 hours, their pay is the product of their hours worked and their hourly rate.
- If an employee works more than 40 hours, they are paid the regular hourly rate for the first 40 hours, and 1.5 times the hourly rate for any hours worked strictly above 40.
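The two pay rules can be sketched as a plain-Python function before they are translated into PySpark column expressions. This is a minimal illustration, not the required solution. The name `calculate_pay` and the two constants are hypothetical, and the sketch assumes hours and rates are non-negative numbers.

```python
OVERTIME_THRESHOLD = 40.0   # hypothetical constant: hours paid at the regular rate
OVERTIME_MULTIPLIER = 1.5   # time-and-a-half for hours strictly above the threshold


def calculate_pay(hours_worked: float, hourly_rate: float) -> float:
    """Hypothetical helper: total pay for one employee under the 40-hour overtime rule."""
    if hours_worked > OVERTIME_THRESHOLD:
        regular = OVERTIME_THRESHOLD * hourly_rate
        overtime = (hours_worked - OVERTIME_THRESHOLD) * hourly_rate * OVERTIME_MULTIPLIER
        return regular + overtime
    # 40 hours or fewer, including exactly 40: no overtime premium
    return hours_worked * hourly_rate


print(calculate_pay(45.0, 30.0))  # 1425.0
print(calculate_pay(40.0, 28.0))  # 1120.0
```

Because the comparison is strict (`>`), exactly 40 hours falls into the regular branch. This matches the "strictly above 40" wording of the task.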
Save your result as result_df. Order the final output by employee_id in ascending order.
File Path
- Employees Dataset:
/home/interview/employees.csv
- Payroll Dataset:
/home/interview/payroll.csv
- Starter script:
/home/interview/run_payroll.py
Schema
employees.csv
| Column Name | Data Type |
| --- | --- |
| employee_id | integer |
| name | string |
| age | integer |
| position | string |
payroll.csv
| Column Name | Data Type |
| --- | --- |
| employee_id | integer |
| hours_worked | float |
| hourly_rate | float |
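The following stdlib-only sketch makes the column types concrete. It reads CSV text shaped like the two schemas and converts each field to its listed type. The helper `read_typed_rows`, the `EMPLOYEE_TYPES`/`PAYROLL_TYPES` mappings, and the in-memory sample text are illustrative assumptions. The actual solution reads the files with Spark.

```python
import csv
import io
from typing import Callable, Dict, List

# Hypothetical column -> converter mappings, mirroring the two schema tables
EMPLOYEE_TYPES: Dict[str, Callable[[str], object]] = {
    "employee_id": int, "name": str, "age": int, "position": str,
}
PAYROLL_TYPES: Dict[str, Callable[[str], object]] = {
    "employee_id": int, "hours_worked": float, "hourly_rate": float,
}


def read_typed_rows(text: str, types: Dict[str, Callable[[str], object]]) -> List[dict]:
    """Parse CSV text that has a header row, converting each column with its converter."""
    reader = csv.DictReader(io.StringIO(text))
    return [{col: convert(row[col]) for col, convert in types.items()} for row in reader]


payroll_csv = "employee_id,hours_worked,hourly_rate\n1,45.0,30.0\n2,38.0,25.0\n"
print(read_typed_rows(payroll_csv, PAYROLL_TYPES))
# [{'employee_id': 1, 'hours_worked': 45.0, 'hourly_rate': 30.0}, {'employee_id': 2, 'hours_worked': 38.0, 'hourly_rate': 25.0}]
```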
Expected Output Schema
| Column Name | Data Type |
| --- | --- |
| employee_id | integer |
| name | string |
| position | string |
| pay | float |
Example
Given this sample input:
employees
| employee_id | name | age | position |
| --- | --- | --- | --- |
| 1 | Alice | 30 | Software Engineer |
| 2 | Bob | 28 | Data Analyst |
| 3 | Carol | 35 | Product Manager |
| 4 | Dave | 26 | Software Engineer |
payroll
| employee_id | hours_worked | hourly_rate |
| --- | --- | --- |
| 1 | 45.0 | 30.0 |
| 2 | 38.0 | 25.0 |
| 3 | 41.5 | 35.0 |
| 4 | 40.0 | 28.0 |
The output would be:
| employee_id | name | position | pay |
| --- | --- | --- | --- |
| 1 | Alice | Software Engineer | 1425.00 |
| 2 | Bob | Data Analyst | 950.00 |
| 3 | Carol | Product Manager | 1478.75 |
| 4 | Dave | Software Engineer | 1120.00 |
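Explanation:
- Alice worked 45 hours. Her first 40 hours are paid at $30.00 ($1,200.00). Her remaining 5 hours are paid at 1.5x the rate ($45.00/hour), which adds $225.00. Total: $1,425.00.
- Bob worked 38 hours at $25.00. This is under the 40-hour threshold, so his pay is $950.00.
- Carol worked 41.5 hours. Her first 40 hours at $35.00 come to $1,400.00, and her 1.5 overtime hours at $52.50/hour add $78.75. Total: $1,478.75.
- Dave worked exactly 40 hours. That does not exceed the threshold, so he is paid 40 x $28.00 = $1,120.00.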
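The example can be reproduced end to end without Spark. The plain-Python sketch below joins on `employee_id`, applies the overtime rule, keeps the output columns, and sorts by `employee_id`. The data is copied from the sample tables. `compute_result` is a hypothetical name. The dictionary lookup assumes at most one payroll row per employee, whereas a real inner join would emit one row per match.

```python
# Sample rows copied from the Example section
employees = [
    {"employee_id": 1, "name": "Alice", "age": 30, "position": "Software Engineer"},
    {"employee_id": 2, "name": "Bob", "age": 28, "position": "Data Analyst"},
    {"employee_id": 3, "name": "Carol", "age": 35, "position": "Product Manager"},
    {"employee_id": 4, "name": "Dave", "age": 26, "position": "Software Engineer"},
]
payroll = [
    {"employee_id": 1, "hours_worked": 45.0, "hourly_rate": 30.0},
    {"employee_id": 2, "hours_worked": 38.0, "hourly_rate": 25.0},
    {"employee_id": 3, "hours_worked": 41.5, "hourly_rate": 35.0},
    {"employee_id": 4, "hours_worked": 40.0, "hourly_rate": 28.0},
]


def compute_result(employees, payroll):
    """Hypothetical helper: inner-join on employee_id, compute pay, project, sort ascending."""
    pay_inputs = {p["employee_id"]: p for p in payroll}  # assumes one payroll row per id
    rows = []
    for emp in employees:
        p = pay_inputs.get(emp["employee_id"])
        if p is None:  # inner join: employees without a payroll row are dropped
            continue
        hours, rate = p["hours_worked"], p["hourly_rate"]
        if hours > 40:
            pay = 40 * rate + (hours - 40) * rate * 1.5
        else:
            pay = hours * rate
        rows.append({"employee_id": emp["employee_id"], "name": emp["name"],
                     "position": emp["position"], "pay": pay})
    return sorted(rows, key=lambda r: r["employee_id"])


for row in compute_result(employees, payroll):
    print(row["employee_id"], row["name"], f"{row['pay']:.2f}")
# 1 Alice 1425.00
# 2 Bob 950.00
# 3 Carol 1478.75
# 4 Dave 1120.00
```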
Solution
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Load both CSVs; inferSchema=True types decimal columns such as hours_worked as doubles
employees = spark.read.csv("/home/interview/employees.csv", header=True, inferSchema=True)
payroll = spark.read.csv("/home/interview/payroll.csv", header=True, inferSchema=True)

# Attach each employee's hours and rate to their name and position
joined_df = employees.join(payroll, on="employee_id", how="inner")

# Regular rate for the first 40 hours, 1.5x rate for every hour strictly above 40
result_df = joined_df.withColumn(
    "pay",
    F.when(
        F.col("hours_worked") > 40,
        (40 * F.col("hourly_rate")) + ((F.col("hours_worked") - 40) * F.col("hourly_rate") * 1.5)
    ).otherwise(F.col("hours_worked") * F.col("hourly_rate"))
)

# Match the expected output schema (float pay, four columns) and sort by employee_id
result_df = result_df.withColumn("pay", F.col("pay").cast("float")) \
    .select("employee_id", "name", "position", "pay") \
    .orderBy("employee_id")

# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
```
Explanation
Step 1: Joining the DataFrames
```python
joined_df = employees.join(payroll, on="employee_id", how="inner")
```
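To use data from both tables in one calculation (for example, an employee's name alongside their hours), we first bring them together. employees.join(payroll, on="employee_id", how="inner") matches rows on employee_id and keeps only the IDs present in both DataFrames. As a result, an employee with no payroll row does not appear in the output. Joining with on= set to a column name also keeps a single employee_id column in the result rather than one from each side.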
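The stdlib sketch below illustrates the difference between an inner and a left join with plain lists and dicts. It uses hypothetical data in which employee 4 has no payroll row. It models join semantics only and does not call Spark.

```python
# Hypothetical data: employee 4 has no payroll row this week
employee_ids = [1, 2, 3, 4]
payroll_by_id = {1: (45.0, 30.0), 2: (38.0, 25.0), 3: (41.5, 35.0)}

# Inner join: keep only ids present on both sides
inner = [(eid, payroll_by_id[eid]) for eid in employee_ids if eid in payroll_by_id]
# Left join: keep every employee, with None where payroll data is missing
left = [(eid, payroll_by_id.get(eid)) for eid in employee_ids]

print([eid for eid, _ in inner])  # [1, 2, 3]
print(left[-1])                   # (4, None)
```

In PySpark, how="left" would instead keep employee 4 with null hours_worked and hourly_rate. The when/otherwise expression would then yield a null pay, because a null comparison is not treated as true and arithmetic on nulls returns null.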
Step 2: Conditional Logic for Overtime
```python
result_df = joined_df.withColumn(
    "pay",
    F.when(
        F.col("hours_worked") > 40,
        (40 * F.col("hourly_rate")) + ((F.col("hours_worked") - 40) * F.col("hourly_rate") * 1.5)
    ).otherwise(F.col("hours_worked") * F.col("hourly_rate"))
)
```
The core of this problem is applying a different formula depending on how many hours were worked. In PySpark, F.when(condition, value).otherwise(value) builds a conditional column expression that is evaluated for every row, much like a SQL CASE WHEN ... ELSE ... END expression.
- F.when(F.col("hours_worked") > 40, ...) checks whether the employee qualifies for overtime. If so, it applies (40 * hourly_rate) + ((hours_worked - 40) * hourly_rate * 1.5). This pays the first 40 hours at the standard rate and pays each remaining hour at 1.5 times the rate.
- .otherwise(...) catches everyone who worked 40 hours or fewer, including exactly 40, and applies the standard hours_worked * hourly_rate calculation.
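As a sanity check on the branching, the sketch below compares the two-branch formula with an algebraically equivalent single expression, hours * rate + max(hours - 40, 0) * rate * 0.5. It tests a range of hours that includes the boundary at exactly 40. The function names are hypothetical, and the single-expression form is shown only for verification; the solution uses the when/otherwise version.

```python
import math


def pay_branching(hours: float, rate: float) -> float:
    # Mirrors F.when(hours > 40, ...).otherwise(...)
    if hours > 40:
        return 40 * rate + (hours - 40) * rate * 1.5
    return hours * rate


def pay_single_expression(hours: float, rate: float) -> float:
    # Hypothetical alternative: base pay for all hours plus a 0.5x premium above 40
    return hours * rate + max(hours - 40, 0) * rate * 0.5


for hours in [0.0, 39.5, 40.0, 40.5, 41.5, 45.0, 60.0]:
    a, b = pay_branching(hours, 35.0), pay_single_expression(hours, 35.0)
    assert math.isclose(a, b), (hours, a, b)

# At exactly 40 hours the regular branch applies, and both branches agree anyway
print(pay_branching(40.0, 35.0))  # 1400.0
```

Both forms give the same value at 40 hours, so pay does not jump at the threshold. The strict > comparison only decides which formula computes that value.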
Step 3: Casting, Selecting, and Sorting
```python
result_df = result_df.withColumn("pay", F.col("pay").cast("float")) \
    .select("employee_id", "name", "position", "pay") \
    .orderBy("employee_id")
```
The expected output schema lists pay as float. With inferSchema=True, decimal values such as 45.0 are inferred as DoubleType, and the literal 1.5 is also a double. The computed pay column therefore comes out as DoubleType, so .cast("float") explicitly converts it to the required 32-bit FloatType. .select() then keeps only the output columns, in the order employee_id, name, position, pay, which drops the intermediate columns (age, hours_worked, hourly_rate). Finally, .orderBy("employee_id") sorts the records in ascending order, which is Spark's default sort direction.
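Spark's FloatType is a 4-byte single-precision type, so cast("float") narrows each double. The stdlib sketch below approximates that narrowing by packing a Python float into IEEE 754 single precision with the struct module. It assumes both conversions round to nearest. `to_float32` is a hypothetical helper, not Spark code.

```python
import struct


def to_float32(value: float) -> float:
    """Hypothetical helper: round-trip a 64-bit float through IEEE 754 single precision."""
    return struct.unpack("f", struct.pack("f", value))[0]


# The sample pays are exactly representable in 32 bits, so narrowing leaves them unchanged
for pay in [1425.0, 950.0, 1478.75, 1120.0]:
    assert to_float32(pay) == pay

# A value with more fractional precision is rounded to the nearest float32
print(to_float32(1234.567) == 1234.567)  # False
```

For this dataset the cast is lossless. Pay values with many fractional digits could shift slightly, which matters only if the grader compares exact decimals.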