Objective
As an economist, you need to compute the annual GDP growth rate from multiple economic DataFrames. The GDP growth rate is the percentage change in a country's GDP from one year to the next. It is calculated with the formula:
GDP growth rate = [(GDP this year - GDP last year) / GDP last year] * 100
You have been provided with two DataFrames containing economic data for different countries and different years.
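As a quick check of the formula, here is a minimal plain-Python sketch (no Spark required). The function name `gdp_growth_rate` and the sample figures are illustrative assumptions, not values from the provided datasets.

```python
def gdp_growth_rate(gdp_this_year: float, gdp_last_year: float) -> float:
    """Percentage change from last year's GDP to this year's, rounded to 2 places."""
    return round((gdp_this_year - gdp_last_year) / gdp_last_year * 100, 2)


# Hypothetical figures: GDP rises from 200.0 to 210.0.
growth = gdp_growth_rate(210.0, 200.0)

# Hypothetical figures: GDP falls from 100.0 to 90.0, so the rate is negative.
decline = gdp_growth_rate(90.0, 100.0)

print(growth, decline)
```

Note that the rate is negative when GDP shrinks, so "growth rate" here covers both increases and decreases.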
Task
Write a PySpark function that combines these DataFrames and returns the annual GDP growth rate for each country and each year.
Constraints:
- The output should be sorted in ascending order first by country name and then by year.
- The GDP growth rate should be rounded off to two decimal places.
- If the GDP data for the exact previous year is not available (e.g. a gap in the records), the GDP growth rate for the current year should be null.
- You can assume that the data in both the input DataFrames is clean (no missing values, GDP >= 0).
Save your resulting DataFrame as result_df. Ensure the output exactly matches the requested Output Schema.
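To make the constraints concrete, the following plain-Python sketch computes the expected output for a tiny made-up sample. It is not the Spark solution: it uses a dictionary lookup on (Country, Year - 1) instead of a window function, which makes it handy as an independent cross-check. The name `expected_growth` and the sample rows are hypothetical, and mapping a previous GDP of 0 to null is an assumption, since the task allows GDP >= 0 but does not say what to return in that case.

```python
def expected_growth(rows):
    """rows: iterable of (country, year, gdp) tuples.

    Returns (country, year, growth) tuples sorted by country, then year.
    """
    gdp_by_key = {(country, year): gdp for country, year, gdp in rows}
    out = []
    for country, year in sorted(gdp_by_key):
        prev = gdp_by_key.get((country, year - 1))
        # No record for the exact previous year (first record or a gap) -> None.
        # Assumption: a previous GDP of 0 also yields None to avoid dividing by zero.
        if prev is None or prev == 0:
            growth = None
        else:
            growth = round((gdp_by_key[(country, year)] - prev) / prev * 100, 2)
        out.append((country, year, growth))
    return out


# Hypothetical sample: USA has no 2019 record, so 2020 must come out as None.
sample = [
    ("USA", 2017, 100.0),
    ("USA", 2018, 110.0),
    ("USA", 2020, 121.0),
    ("China", 2018, 50.0),
    ("China", 2019, 55.0),
]
expected = expected_growth(sample)
for row in expected:
    print(row)
```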
File Path
- Dataset 1: /home/interview/df1.csv
- Dataset 2: /home/interview/df2.csv
- Starter script: /home/interview/gdp_growth.py
Schema
df1.csv & df2.csv
| Column Name | Data Type |
| Country | String |
| Year | Integer |
| GDP | Double |
Expected Output Schema
| Column Name | Data Type |
| Country | String |
| Year | Integer |
| GDP_growth_rate | Double |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
df1 = spark.read.csv("/home/interview/df1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("/home/interview/df2.csv", header=True, inferSchema=True)
# Step 1: Combine the two DataFrames into one master dataset
combined_df = df1.unionByName(df2)
# Step 2: Define a window partitioned by Country and ordered chronologically
window_spec = Window.partitionBy("Country").orderBy("Year")
# Step 3: Use the lag function to peek at the previous year's row for both the Year and the GDP
df_with_lag = combined_df.withColumn("prev_year", F.lag("Year").over(window_spec)) \
    .withColumn("prev_gdp", F.lag("GDP").over(window_spec))
# Step 4: Calculate the growth rate, ensuring the previous row is exactly 1 year ago
result_df = df_with_lag.withColumn(
    "GDP_growth_rate",
    F.when(
        F.col("Year") - 1 == F.col("prev_year"),
        F.round(((F.col("GDP") - F.col("prev_gdp")) / F.col("prev_gdp")) * 100, 2)
    ).otherwise(F.lit(None).cast("double"))
)
# Step 5: Format the final schema and sort the results
result_df = result_df.select("Country", "Year", "GDP_growth_rate").orderBy("Country", "Year")
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Combining the Datasets
combined_df = df1.unionByName(df2)
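Because both DataFrames share the exact same schema and represent the same type of data, we stack them vertically to create a single historical record. We use .unionByName() to append df2 to the bottom of df1; it matches columns by name rather than by position, so the result is correct even if the two files list their columns in a different order.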
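To illustrate what a name-based union does, here is a small plain-Python sketch. It is an emulation for explanation only, not Spark's implementation; the helper `union_by_name` and the sample rows are hypothetical.

```python
def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Append rows_b to rows_a, reordering each row of b to match a's column order."""
    if set(cols_a) != set(cols_b):
        raise ValueError("column names differ")
    # For each column of a, find where that column sits in b.
    idx = [cols_b.index(c) for c in cols_a]
    return cols_a, rows_a + [tuple(row[i] for i in idx) for row in rows_b]


# Hypothetical inputs whose columns are in a different order.
cols1, rows1 = ["Country", "Year", "GDP"], [("USA", 2018, 110.0)]
cols2, rows2 = ["Year", "Country", "GDP"], [(2019, "USA", 121.0), (2018, "USA", 110.0)]

cols, combined = union_by_name(cols1, rows1, cols2, rows2)
print(cols, combined)
```

The sample deliberately repeats the (USA, 2018) row: like this sketch, Spark's union operations keep duplicates. If the same country and year could appear in both files, the rows would need deduplicating before the window step; the task statement does not say whether that can happen.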
Step 2: Defining the Time-Series Window
window_spec = Window.partitionBy("Country").orderBy("Year")
To calculate year-over-year growth, we need to evaluate each country's history individually and in chronological sequence. We define a Window partitioned by Country (so the USA's GDP is never compared to China's) and ordered by Year in ascending order.
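Conceptually, partitioning and ordering amount to "group by country, then sort each group by year". The plain-Python sketch below shows that idea with `itertools.groupby`; the sample rows are made up, and Spark performs the equivalent work in a distributed way rather than with this code.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical unsorted rows of (Country, Year, GDP).
rows = [
    ("USA", 2019, 3.0),
    ("China", 2018, 1.0),
    ("USA", 2018, 2.0),
    ("China", 2019, 1.5),
]

# Sort by (Country, Year), then split into one ordered partition per country.
ordered = sorted(rows, key=itemgetter(0, 1))
partitions = {
    country: [(year, gdp) for _, year, gdp in group]
    for country, group in groupby(ordered, key=itemgetter(0))
}
print(partitions)
```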
Step 3: Retrieving Historical Data Using Lag
df_with_lag = combined_df.withColumn("prev_year", F.lag("Year").over(window_spec)) \
    .withColumn("prev_gdp", F.lag("GDP").over(window_spec))
To perform math between the current year and the previous year, both values must exist on the same row. PySpark's F.lag(column, offset) function (offset defaults to 1) returns the value from the row that sits offset positions earlier in the window, or null when no such row exists.
Crucial Step: We don't just pull the prev_gdp; we also pull the prev_year. If a country is missing data for 2018, the row preceding 2019 might actually be 2017. Pulling the year allows us to verify whether there is a gap in the timeline.
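The effect of lag on a single country's ordered rows can be sketched in plain Python as follows. The helper `lag` here is a hypothetical list-based stand-in for F.lag, and the years and GDP values are invented to show a missing 2018.

```python
def lag(values, offset=1):
    """Return a list where position i holds values[i - offset], or None if out of range."""
    return [values[i - offset] if i - offset >= 0 else None for i in range(len(values))]


# Hypothetical single-country partition, already ordered by year; 2018 is missing.
years = [2016, 2017, 2019]
gdps = [100.0, 104.0, 112.0]

prev_years = lag(years)  # [None, 2016, 2017]
prev_gdps = lag(gdps)    # [None, 100.0, 104.0]

# A row is safe to use only if its previous row is exactly one year earlier.
is_consecutive = [p is not None and y - 1 == p for y, p in zip(years, prev_years)]
print(prev_years, prev_gdps, is_consecutive)
```

For 2019 the previous row is 2017, so without the prev_year check the growth would silently be computed across a two-year span.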
Step 4: Calculating Growth with Conditional Logic
F.when(
    F.col("Year") - 1 == F.col("prev_year"),
    F.round(((F.col("GDP") - F.col("prev_gdp")) / F.col("prev_gdp")) * 100, 2)
).otherwise(F.lit(None).cast("double"))
We use F.when() to enforce our timeline gap check. If Year - 1 exactly matches our retrieved prev_year, we know it's safe to calculate the growth rate using the provided formula ((current - previous) / previous) * 100. We wrap the formula in F.round(..., 2) to get our two decimal places.
If there is a gap (or if it's the very first year on record, making prev_year null), the .otherwise() block executes, returning a null value cast as a double.
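The first-year case works because SQL-style comparisons with null evaluate to null rather than false, and F.when only selects its value when the condition is true. The plain-Python sketch below mimics that behavior; `sql_eq` and `when_otherwise` are hypothetical helpers written for illustration, and the value 10.0 is a placeholder growth rate.

```python
def sql_eq(a, b):
    """SQL-style equality: any None operand makes the result None (unknown)."""
    if a is None or b is None:
        return None
    return a == b


def when_otherwise(condition, value, default=None):
    """Mimic F.when(cond, value).otherwise(default): only a True condition selects value."""
    return value if condition is True else default


# First year on record: prev_year is None, so the condition is None, not True.
first_year = when_otherwise(sql_eq(2017 - 1, None), 10.0)
# Consecutive years: the condition is True, so the value is kept.
consecutive = when_otherwise(sql_eq(2018 - 1, 2017), 10.0)
# Gap in the records: the condition is False, so the default applies.
gap = when_otherwise(sql_eq(2020 - 1, 2018), 10.0)
print(first_year, consecutive, gap)
```

One edge case the constraints leave open is a previous GDP of exactly 0. To my understanding, Spark's default (non-ANSI) mode returns null for division by zero while ANSI mode raises an error, so it may be worth confirming which mode the target environment uses.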
Step 5: Output Formatting
Finally, we drop the intermediate prev_year and prev_gdp helper columns by selecting only the columns in the requested schema with .select(), and we use .orderBy("Country", "Year") to sort the report by country and then by year.