Dates and Timestamps in Spark DataFrames

Loading Data

from pyspark.sql import SparkSession

# Start (or reuse) a SparkSession for this walkthrough
spark = SparkSession.builder.appName('dates').getOrCreate()

# Read the CSV with a header row, letting Spark infer column types
df = spark.read.csv('/path/to/demand_planning.csv', header=True, inferSchema=True)
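
If you would rather not rely on inference, you can pass an explicit schema instead; a minimal sketch, assuming the five columns shown in the preview below:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Illustrative explicit schema matching the demand_planning.csv columns
schema = StructType([
    StructField('Date', DateType(), True),
    StructField('Product', StringType(), True),
    StructField('ForecastUnits', IntegerType(), True),
    StructField('ActualUnits', IntegerType(), True),
    StructField('LeadTimeDays', IntegerType(), True),
])

df = spark.read.csv('/path/to/demand_planning.csv', header=True, schema=schema)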

Preview & Schema

df.show()

Result

+----------+--------+-------------+-----------+------------+
|      Date| Product|ForecastUnits|ActualUnits|LeadTimeDays|
+----------+--------+-------------+-----------+------------+
|2023-01-01|    Soap|          100|         95|           3|
|2023-01-02|    Soap|          120|        110|           2|
|2023-01-03| Shampoo|           80|         85|           4|
|2023-01-04| Shampoo|           90|         88|           3|
|2023-02-01|    Soap|          130|        125|           2|
|2023-02-02| Shampoo|           85|         82|           4|
|2023-03-01|    Soap|          140|        135|           3|
|2023-03-05| Shampoo|          100|        102|           3|
+----------+--------+-------------+-----------+------------+

df.printSchema()

Result

root
|-- Date: date (nullable = true)
|-- Product: string (nullable = true)
|-- ForecastUnits: integer (nullable = true)
|-- ActualUnits: integer (nullable = true)
|-- LeadTimeDays: integer (nullable = true)

(Note: Depending on the Spark version and the date format in the file, the Date column may be read as a string. In that case, cast it to DateType explicitly.)
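
A minimal sketch of that cast, assuming the 'yyyy-MM-dd' format used in the sample data:

from pyspark.sql.functions import to_date

# Convert the string column to DateType; adjust the format pattern if your file differs
df = df.withColumn('Date', to_date(df['Date'], 'yyyy-MM-dd'))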

Working with Date Components & Aggregation

from pyspark.sql.functions import dayofmonth, month, year, weekofyear

df.select(
    dayofmonth(df['Date']).alias('Day'),
    month(df['Date']).alias('Month'),
    year(df['Date']).alias('Year'),
    weekofyear(df['Date']).alias('WeekOfYear')
).show()

Result

+---+-----+----+----------+
|Day|Month|Year|WeekOfYear|
+---+-----+----+----------+
|  1|    1|2023|        52|
|  2|    1|2023|         1|
|  3|    1|2023|         1|
|  4|    1|2023|         1|
|  1|    2|2023|         5|
|  2|    2|2023|         5|
|  1|    3|2023|         9|
|  5|    3|2023|         9|
+---+-----+----+----------+

(Week numbers follow ISO-8601 week numbering, so values near year boundaries can be surprising: 2023-01-01 falls in week 52 of the previous ISO year.)

Meaning Extracts day, month, year, and week-of-year from the Date column for time-based analyses.
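
These components also make simple time-based filters easy; an illustrative sketch (not part of the original walkthrough):

from pyspark.sql.functions import month

# Keep only the January rows
df.filter(month(df['Date']) == 1).show()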

Aggregation by Year

from pyspark.sql.functions import avg, year

# Add a Year column, then average the numeric columns per year
df_with_year = df.withColumn('Year', year(df['Date']))
df_with_year.groupBy('Year').mean().select('Year', 'avg(ActualUnits)').show()

Result

+----+----------------+
|Year|avg(ActualUnits)|
+----+----------------+
|2023|          102.75|
+----+----------------+

Meaning Computes the average actual units shipped per year.
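
The same aggregation can be written more explicitly with agg, which averages only the column you care about; a minimal sketch:

from pyspark.sql.functions import avg, year

df.withColumn('Year', year(df['Date'])) \
    .groupBy('Year') \
    .agg(avg('ActualUnits').alias('AvgActualUnits')) \
    .show()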

Ratio / Derived Metric & SQL Use

# Re-reading the CSV here is optional; df was already loaded above
df = spark.read.csv('/path/to/demand_planning.csv', header=True, inferSchema=True)

df.select((df['ForecastUnits'] / df['ActualUnits']).alias('Forecast_to_Actual')).show()

Result

+------------------+
|Forecast_to_Actual|
+------------------+
|1.0526315789473684|
|1.0909090909090908|
|0.9411764705882353|
|1.0227272727272727|
|              1.04|
|1.0365853658536585|
| 1.037037037037037|
|0.9803921568627451|
+------------------+

Meaning Computes the ratio of forecast to actual units (how close the forecast was).
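
To keep this metric around for later steps, it can be stored as a column; a minimal sketch (the df_ratio name and the rounding to three decimals are illustrative choices, not part of the original walkthrough):

from pyspark.sql.functions import round as spark_round

# New DataFrame with the ratio kept as a rounded column
df_ratio = df.withColumn(
    'Forecast_to_Actual',
    spark_round(df['ForecastUnits'] / df['ActualUnits'], 3)
)
df_ratio.show()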

SQL Query with Dates

df.createOrReplaceTempView('demand')

Find the date(s) with the maximum ActualUnits:

spark.sql("""
SELECT Date, ActualUnits
FROM demand
WHERE ActualUnits = (SELECT MAX(ActualUnits) FROM demand)
""").show()

Result

+----------+-----------+
|      Date|ActualUnits|
+----------+-----------+
|2023-03-01|        135|
+----------+-----------+

Meaning Uses SQL to find the row(s) having the highest actual demand.
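
For comparison, one way to do the same lookup through the DataFrame API (no temp view required); a minimal sketch:

from pyspark.sql.functions import max as spark_max

# Grab the maximum as a Python value, then filter to the matching row(s)
max_actual = df.agg(spark_max('ActualUnits')).collect()[0][0]
df.filter(df['ActualUnits'] == max_actual).select('Date', 'ActualUnits').show()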

More Aggregations / Statistics

# Note: round/max/min here shadow the Python built-ins; avg is imported as well so the block stands alone
from pyspark.sql.functions import avg, round, max, min

df.select(
    round(avg(df['ActualUnits']), 2).alias('AvgActual'),
    max(df['ForecastUnits']).alias('MaxForecast'),
    min(df['ForecastUnits']).alias('MinForecast')
).show()

Result

+---------+-----------+-----------+
|AvgActual|MaxForecast|MinForecast|
+---------+-----------+-----------+
|   102.75|        140|         80|
+---------+-----------+-----------+

Meaning Combines multiple statistics (average actual, max forecast, min forecast) into a single row.

1‑Minute Summary — Dates & Time in PySpark (Demand Planning)

Code / Expression | What It Does
dayofmonth(df['Date']), month(...), year(...), weekofyear(...) | Extracts parts of the date for analysis
withColumn('Year', year(...)) | Adds a new column for the year derived from the date
groupBy('Year').mean().select(...) | Aggregates metrics by year
(df['ForecastUnits'] / df['ActualUnits']).alias('Forecast_to_Actual') | Creates a ratio column between forecast & actual
createOrReplaceTempView('demand') | Registers the DataFrame for SQL queries
spark.sql(...) WHERE ActualUnits = (SELECT MAX(...)) | Finds the row(s) with the maximum actual units via SQL
round(avg(...), 2), max(...), min(...) | Computes aggregated stats and formats the results