Objective
You work for a private equity (PE) firm and are given two DataFrames: portfolio, which lists the companies each PE firm holds and the number of shares held, and prices, which contains the daily closing prices of those companies.
Task
Write a PySpark function to merge these two datasets and compute the total daily portfolio value for each private equity firm.
To find the daily portfolio value, you must multiply the number of shares a firm holds in a company by the closing_price of that company on a specific date, and then sum those values up for the entire firm.
Save your resulting DataFrame as result_df. Ensure the output strictly matches the Expected Output Schema, casting the final portfolio_value to an Integer. Order the output alphabetically by PE_firm, and then chronologically by date.
File Paths
- Portfolio Dataset: /home/interview/portfolio.csv
- Prices Dataset: /home/interview/prices.csv
- Starter script: /home/interview/portfolio_values.py
Schema
portfolio.csv
| Column Name | Data Type | Description |
| --- | --- | --- |
| PE_firm | String | The name of the private equity firm |
| company | String | The name of the company |
| shares | Integer | The number of shares the firm holds in the company |
prices.csv
| Column Name | Data Type | Description |
| --- | --- | --- |
| date | Date | The date |
| company | String | The name of the company |
| closing_price | Double | The closing price of the company's equity on the date |
Expected Output Schema
| Column Name | Data Type | Description |
| --- | --- | --- |
| PE_firm | String | The name of the private equity firm |
| date | Date | The date |
| portfolio_value | Integer | The daily portfolio value of the private equity firm |
Example
Given this sample input:
portfolio
| PE_firm | company | shares |
| --- | --- | --- |
| Alpha | A | 1000 |
| Alpha | B | 2000 |
| Beta | A | 1500 |
| Beta | C | 2500 |
| Gamma | B | 1200 |
| Gamma | C | 1300 |
prices
| date | company | closing_price |
| --- | --- | --- |
| 2023-01-01 | A | 50.0 |
| 2023-01-01 | B | 20.0 |
| 2023-01-01 | C | 30.0 |
| 2023-01-02 | A | 52.0 |
| 2023-01-02 | B | 21.0 |
| 2023-01-02 | C | 31.0 |
The expected output would be:
| PE_firm | date | portfolio_value |
| --- | --- | --- |
| Alpha | 2023-01-01 | 90000 |
| Alpha | 2023-01-02 | 94000 |
| Beta | 2023-01-01 | 150000 |
| Beta | 2023-01-02 | 155500 |
| Gamma | 2023-01-01 | 63000 |
| Gamma | 2023-01-02 | 65500 |
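As a sanity check on the expected output, the same calculation can be sketched in plain Python without a Spark session. This is only an illustration of the arithmetic, not the PySpark solution; the helper name daily_portfolio_values is hypothetical, and the data is copied from the sample tables.

```python
from collections import defaultdict

# Sample input copied from the example tables.
portfolio = [
    ("Alpha", "A", 1000),
    ("Alpha", "B", 2000),
    ("Beta", "A", 1500),
    ("Beta", "C", 2500),
    ("Gamma", "B", 1200),
    ("Gamma", "C", 1300),
]
prices = [
    ("2023-01-01", "A", 50.0),
    ("2023-01-01", "B", 20.0),
    ("2023-01-01", "C", 30.0),
    ("2023-01-02", "A", 52.0),
    ("2023-01-02", "B", 21.0),
    ("2023-01-02", "C", 31.0),
]


def daily_portfolio_values(portfolio, prices):
    """Return sorted (PE_firm, date, portfolio_value) tuples (hypothetical helper)."""
    totals = defaultdict(float)
    for firm, company, shares in portfolio:
        for date, priced_company, closing_price in prices:
            if company == priced_company:  # inner-join condition on company
                totals[(firm, date)] += shares * closing_price
    # Truncate each total to an int, then sort by firm and date.
    # ISO-formatted date strings sort chronologically.
    return sorted((firm, date, int(value)) for (firm, date), value in totals.items())


expected_rows = daily_portfolio_values(portfolio, prices)
for row in expected_rows:
    print(row)
```

The printed rows should line up with the expected output table, for example Alpha on 2023-01-01 is 1000 × 50.0 + 2000 × 20.0 = 90000.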
Solution
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
portfolio = spark.read.csv("/home/interview/portfolio.csv", header=True, inferSchema=True)
prices = spark.read.csv("/home/interview/prices.csv", header=True, inferSchema=True)
# Step 1: Join the DataFrames on the shared 'company' column
joined_df = portfolio.join(prices, on="company", how="inner")
# Step 2: Multiply shares by closing_price to get the value of each individual holding per day
value_df = joined_df.withColumn("holding_value", F.col("shares") * F.col("closing_price"))
# Step 3: Group by PE_firm and date, then sum the holding values to get the total portfolio value
result_df = value_df.groupBy("PE_firm", "date").agg(
    F.sum("holding_value").cast("integer").alias("portfolio_value")
)
# Step 4: Format the final schema and order by firm and date
result_df = result_df.select(
    "PE_firm", "date", "portfolio_value"
).orderBy("PE_firm", "date")
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Merging the Datasets
joined_df = portfolio.join(prices, on="company", how="inner")
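To value the holdings, each holding must be paired with the daily closing price of its company. Because company appears in both the portfolio and prices DataFrames, we join on it. The inner join yields one row per firm, company, and date, so each company's daily prices are repeated for every PE firm that holds it. Holdings with no matching price rows, and prices for companies that no firm holds, are dropped. Passing the column name as a string to on= also keeps a single company column in the result.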
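The effect of the inner join on row counts can be illustrated with a small plain-Python sketch. It mimics the join condition only and is not PySpark code; company "D" is a hypothetical holding with no price data, added to show what gets dropped.

```python
# Holdings: (PE_firm, company, shares). "D" is a hypothetical company with no prices.
portfolio = [("Alpha", "A", 1000), ("Alpha", "D", 500), ("Beta", "A", 1500)]
# Prices: (date, company, closing_price).
prices = [("2023-01-01", "A", 50.0), ("2023-01-02", "A", 52.0)]

# Inner join on company: keep only pairs whose company matches on both sides.
joined = [
    (firm, company, shares, date, closing_price)
    for firm, company, shares in portfolio
    for date, priced_company, closing_price in prices
    if company == priced_company
]

joined_companies = {row[1] for row in joined}
print(len(joined))       # 2 holdings of A x 2 dates = 4 rows
print(joined_companies)  # {'A'}: the holding in D has no price rows, so it is dropped
```

If unmatched holdings had to be kept, a left join would be the usual alternative, but the task as stated only needs holdings that have prices.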
Step 2: Calculating Individual Holding Values
value_df = joined_df.withColumn("holding_value", F.col("shares") * F.col("closing_price"))
Before we can find the total portfolio value, we must find the value of each individual holding. PySpark Column objects support arithmetic operators directly, so multiplying shares by closing_price creates a new column holding the market value of one firm's position in a single company on a specific day.
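For a concrete sense of what holding_value contains, here is the arithmetic for Alpha on 2023-01-01, written as a plain-Python sketch with numbers taken from the example (the variable names are illustrative only):

```python
# Alpha's holdings and the 2023-01-01 closing prices, taken from the example.
alpha_shares = {"A": 1000, "B": 2000}
closing_price = {"A": 50.0, "B": 20.0}

# One value per holding, mirroring shares * closing_price in the joined rows.
holding_value = {c: alpha_shares[c] * closing_price[c] for c in alpha_shares}
alpha_total = sum(holding_value.values())

print(holding_value)  # {'A': 50000.0, 'B': 40000.0}
print(alpha_total)    # 90000.0
```

Summing the per-holding values gives the 90000 shown for Alpha on that date in the expected output.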
Step 3: Multi-Column Grouping and Aggregation
result_df = value_df.groupBy("PE_firm", "date").agg(
    F.sum("holding_value").cast("integer").alias("portfolio_value")
)
To compute the total value for the entire firm, we need to sum up all of its individual holdings. Passing both "PE_firm" and "date" to .groupBy() produces one group per firm per day. We then take F.sum() of the holding_value column, apply .cast("integer") to satisfy the requested data type, and use .alias("portfolio_value") to set the column name. Because closing_price is a Double, the sum is a Double as well, and the cast drops any fractional part rather than rounding.
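Casting a double to an integer in Spark discards the fractional part, which is the same behavior as Python's int(). The following plain-Python sketch shows how truncation and rounding can differ; the share count and price are hypothetical values chosen so the product has a fractional part.

```python
shares = 7
closing_price = 1.25  # hypothetical price, chosen so the product is not a whole number
holding_value = shares * closing_price  # 8.75

truncated = int(holding_value)  # drops the fractional part, like the integer cast
rounded = round(holding_value)  # rounds to the nearest whole number

print(truncated)  # 8
print(rounded)    # 9
```

The task asks only for a cast, so truncation is what the solution does. If rounding were wanted instead (an assumption, not something the task states), one option would be to apply F.round to the sum before casting.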
Step 4: Output Formatting
result_df = result_df.select("PE_firm", "date", "portfolio_value").orderBy("PE_firm", "date")
Finally, we call .select() so the output columns appear in exactly the requested order, and apply .orderBy("PE_firm", "date") to sort the rows alphabetically by firm and then chronologically by date.