PySpark DataFrames provide a powerful set of aggregation methods that let you compute summaries across groups of data. Whether you're calculating totals, averages, or ranking records within groups, the aggregation API gives you the tools to express complex analytical queries efficiently. These operations are central to data processing tasks in Databricks, from simple counts to advanced window functions.
The core of the PySpark aggregation API revolves around groupBy() to partition data into groups and agg() to apply aggregate functions to each group. Understanding when to use built-in aggregate functions versus window functions versus pivot operations is critical for writing efficient, readable code that passes the exam.
This topic covers the core aggregation API, window functions for ranking and running calculations, pivot tables for reshaping data, and multi-dimensional aggregations with cube() and rollup(). You'll see exam questions asking you to choose the right aggregation method for a given scenario.
from pyspark.sql import functions as F
# Group by product category and calculate multiple aggregates
df.groupBy("category").agg(
    F.count("*").alias("total_orders"),
    F.sum("amount").alias("total_revenue"),
    F.avg("amount").alias("avg_order_value"),
    F.min("amount").alias("min_order"),
    F.max("amount").alias("max_order")
).show()
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Define window partitioned by customer, ordered by date
window_spec = Window.partitionBy("customer_id").orderBy("purchase_date")
df_with_window = df.withColumn(
    "running_total", F.sum("amount").over(window_spec)
).withColumn(
    "purchase_rank", F.rank().over(window_spec)
).withColumn(
    "previous_amount", F.lag("amount", 1).over(window_spec)
).withColumn(
    "next_amount", F.lead("amount", 1).over(window_spec)
)
df_with_window.show()
# Transform long format data to wide format
# Input: rows with (product, month, revenue)
# Output: columns for each month with revenue as values
pivoted_df = df.groupBy("product").pivot("month").sum("revenue")
pivoted_df.show()
# Specifying pivot values explicitly limits the output columns and avoids
# an extra pass over the data to discover the distinct values
pivoted_df = df.groupBy("product").pivot("month", ["Jan", "Feb", "Mar"]).sum("revenue")
pivoted_df.show()