Dates and Timestamps in Spark DataFrames
Loading Data
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('dates').getOrCreate()
df = spark.read.csv('/path/to/demand_planning.csv', header=True, inferSchema=True)
Preview & Schema
df.show()
Result
+----------+--------+-------------+-----------+------------+
| Date| Product|ForecastUnits|ActualUnits|LeadTimeDays|
+----------+--------+-------------+-----------+------------+
|2023-01-01| Soap| 100| 95| 3|
|2023-01-02| Soap| 120| 110| 2|
|2023-01-03| Shampoo| 80| 85| 4|
|2023-01-04| Shampoo| 90| 88| 3|
|2023-02-01| Soap| 130| 125| 2|
|2023-02-02| Shampoo| 85| 82| 4|
|2023-03-01| Soap| 140| 135| 3|
|2023-03-05| Shampoo| 100| 102| 3|
+----------+--------+-------------+-----------+------------+
df.printSchema()
Result
root
|-- Date: date (nullable = true)
|-- Product: string (nullable = true)
|-- ForecastUnits: integer (nullable = true)
|-- ActualUnits: integer (nullable = true)
|-- LeadTimeDays: integer (nullable = true)
(Note: depending on the Spark version and CSV parsing options, Date may be inferred as a string; if so, cast it to DateType explicitly.)
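A minimal conversion along these lines (assuming ISO-formatted yyyy-MM-dd values) handles that case:
from pyspark.sql.functions import to_date
# Cast the string column to DateType; the format pattern assumes dates like 2023-01-01.
df = df.withColumn('Date', to_date(df['Date'], 'yyyy-MM-dd'))
df.printSchema()  # 'Date' should now appear as: date (nullable = true)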
Working with Date Components & Aggregation
from pyspark.sql.functions import dayofmonth, month, year, weekofyear
df.select(
dayofmonth(df['Date']).alias('Day'),
month(df['Date']).alias('Month'),
year(df['Date']).alias('Year'),
weekofyear(df['Date']).alias('WeekOfYear')
).show()
Result
+---+-----+----+----------+
|Day|Month|Year|WeekOfYear|
+---+-----+----+----------+
|  1|    1|2023|        52|
|  2|    1|2023|         1|
|  3|    1|2023|         1|
|  4|    1|2023|         1|
|  1|    2|2023|         5|
|  2|    2|2023|         5|
|  1|    3|2023|         9|
|  5|    3|2023|         9|
+---+-----+----+----------+
(weekofyear follows ISO-8601 week numbering, so 2023-01-01 falls in week 52 of the previous year.)
Meaning Extracts day, month, year, and week-of-year from the Date column for time-based analyses.
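These parts can also be attached as regular columns so they are available for later grouping; a small sketch (the new column names are illustrative):
from pyspark.sql.functions import dayofmonth, month, year, weekofyear, date_format
df_parts = (df
    .withColumn('Day', dayofmonth(df['Date']))
    .withColumn('Month', month(df['Date']))
    .withColumn('Year', year(df['Date']))
    .withColumn('WeekOfYear', weekofyear(df['Date']))
    .withColumn('MonthName', date_format(df['Date'], 'MMMM')))  # full month name, e.g. 'January'
df_parts.show()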
Aggregation by Year
from pyspark.sql.functions import avg
df_with_year = df.withColumn('Year', year(df['Date']))
df_with_year.groupBy('Year').mean().select('Year', 'avg(ActualUnits)').show()
Result
+----+----------------+
|Year|avg(ActualUnits)|
+----+----------------+
|2023|          102.75|
+----+----------------+
Meaning Computes the average actual units shipped per year.
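An explicit agg gives the same figure with a readable alias, and the pattern extends to finer grouping such as per month; a sketch (the column names are illustrative):
from pyspark.sql.functions import avg, month, round, year
(df.withColumn('Year', year(df['Date']))
   .withColumn('Month', month(df['Date']))
   .groupBy('Year', 'Month')
   .agg(round(avg('ActualUnits'), 2).alias('AvgActualUnits'))  # average actuals per year-month
   .orderBy('Year', 'Month')
   .show())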
Ratio / Derived Metric & SQL use
df = spark.read.csv('/path/to/demand_planning.csv', header=True, inferSchema=True)
df.select((df['ForecastUnits'] / df['ActualUnits']).alias('Forecast_to_Actual')).show()
Result
+------------------+
|Forecast_to_Actual|
+------------------+
|1.0526315789473684|
|1.0909090909090908|
|0.9411764705882353|
|1.0227272727272727|
|              1.04|
|1.0365853658536585|
| 1.037037037037037|
|0.9803921568627451|
+------------------+
Meaning Computes the ratio of forecast to actual units (how close the forecast was).
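The same idea extends to a per-row forecast-accuracy metric such as an absolute percentage error; a sketch (the column name AbsPctError is illustrative):
from pyspark.sql.functions import abs, round
df_err = df.withColumn(
    'AbsPctError',
    round(abs(df['ForecastUnits'] - df['ActualUnits']) / df['ActualUnits'] * 100, 2))
df_err.select('Date', 'Product', 'ForecastUnits', 'ActualUnits', 'AbsPctError').show()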
SQL Query with Dates
df.createOrReplaceTempView('demand')
Find the date(s) with max ActualUnits
spark.sql("""
SELECT Date, ActualUnits
FROM demand
WHERE ActualUnits = (SELECT MAX(ActualUnits) FROM demand)
""").show()
Result
+----------+-----------+
|      Date|ActualUnits|
+----------+-----------+
|2023-03-05|        102|
+----------+-----------+
Meaning Uses SQL to find the row(s) having the highest actual demand.
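The same lookup can be written with the DataFrame API instead of SQL; a sketch that, like the subquery, keeps any ties for the maximum:
from pyspark.sql.functions import max as sql_max
# Compute the maximum actual units, then filter to the row(s) that hit it.
max_actual = df.agg(sql_max('ActualUnits')).collect()[0][0]
df.filter(df['ActualUnits'] == max_actual).select('Date', 'ActualUnits').show()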
More Aggregations / Statistics
from pyspark.sql.functions import round, avg, max, min
df.select(
round(avg(df['ActualUnits']), 2).alias('AvgActual'),
max(df['ForecastUnits']).alias('MaxForecast'),
min(df['ForecastUnits']).alias('MinForecast')
).show()
Result
+---------+-----------+-----------+
|AvgActual|MaxForecast|MinForecast|
+---------+-----------+-----------+
|   102.75|        140|         80|
+---------+-----------+-----------+
Meaning Combines multiple stats: average actual, max forecast, min forecast in one row.
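The same aggregates can be broken out per product with groupBy; a sketch (the alias names are illustrative):
from pyspark.sql.functions import avg, max, min, round
df.groupBy('Product').agg(
    round(avg('ActualUnits'), 2).alias('AvgActual'),
    max('ForecastUnits').alias('MaxForecast'),
    min('ForecastUnits').alias('MinForecast')
).show()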
1‑Minute Summary — Dates & Time in PySpark (Demand Planning)
| Code / Expression | What It Does |
|---|---|
| dayofmonth(df['Date']), month(...), year(...), weekofyear(...) | Extracts parts of the date for analysis |
| withColumn('Year', year(...)) | Adds a new column for the year derived from the date |
| groupBy('Year').mean().select(...) | Aggregates metrics by year |
| (df['ForecastUnits'] / df['ActualUnits']).alias('Forecast_to_Actual') | Creates a ratio column of forecast to actual units |
| createOrReplaceTempView('demand') | Registers the DataFrame for SQL queries |
| spark.sql(...) WHERE ActualUnits = (SELECT MAX(...)) | Finds the row(s) with the maximum actual units via SQL |
| round(avg(...), 2), max(...), min(...) | Computes aggregated stats and rounds the results |