Data Filtering in Spark DataFrames
Loading the Data
from pyspark.sql import SparkSession
# Start Spark session
spark = SparkSession.builder.appName('ops').getOrCreate()
# Load the CSV with header and schema inference
df = spark.read.csv('/path/to/orders.csv', inferSchema=True, header=True)
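Why Do We Need This?
We need to load the data into a DataFrame before we can filter, transform, or analyze it with Spark. header=True takes the column names from the first line of the file, and inferSchema=True makes Spark scan the data to detect column types; without it, every column is read as a string.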
Preview & Schema
df.show()
Result
+--------+----------+-------------+--------+--------------+---------+----------+
|order_id| product | category |quantity|price_per_unit| status |total_cost|
+--------+----------+-------------+--------+--------------+---------+----------+
| 1001 | Soap |Personal Care| 10 | 15 |Delivered| 150 |
| 1002 | Shampoo |Personal Care| 5 | 60 |Cancelled| 300 |
| 1003 | Bread | Food | 20 | 10 |Delivered| 200 |
| 1004 |Toothpaste|Personal Care| 7 | 25 | Pending | 175 |
| 1005 | Rice | Food | 50 | 40 |Delivered| 2000 |
| 1006 |Chocolate | Food | 30 | 20 | Pending | 600 |
| 1007 | Juice | Beverages | 15 | 30 |Delivered| 450 |
| 1008 | Cereal | Food | 10 | 35 |Delivered| 350 |
| 1009 | Soda | Beverages | 25 | 25 |Cancelled| 625 |
| 1010 | Facewash |Personal Care| 3 | 120 |Delivered| 360 |
+--------+----------+-------------+--------+--------------+---------+----------+
df.printSchema()
Result
root
|-- order_id: integer (nullable = true)
|-- product: string (nullable = true)
|-- category: string (nullable = true)
|-- quantity: integer (nullable = true)
|-- price_per_unit: integer (nullable = true)
|-- status: string (nullable = true)
|-- total_cost: integer (nullable = true)
Filtering Examples
1. Orders with total_cost less than 500
df.filter("total_cost < 500").show()
Why Do We Need This?
To keep only the lower-value orders (total_cost under 500) for further analysis.
Result
+--------+----------+-------------+--------+--------------+---------+----------+
|order_id| product | category |quantity|price_per_unit| status |total_cost|
+--------+----------+-------------+--------+--------------+---------+----------+
| 1001 | Soap |Personal Care| 10 | 15 |Delivered| 150 |
| 1002 | Shampoo |Personal Care| 5 | 60 |Cancelled| 300 |
| 1003 | Bread | Food | 20 | 10 |Delivered| 200 |
| 1004 |Toothpaste|Personal Care| 7 | 25 | Pending | 175 |
| 1007 | Juice | Beverages | 15 | 30 |Delivered| 450 |
| 1008 | Cereal | Food | 10 | 35 |Delivered| 350 |
| 1010 | Facewash |Personal Care| 3 | 120 |Delivered| 360 |
+--------+----------+-------------+--------+--------------+---------+----------+
2. Select product and total_cost for orders < 500
df.filter("total_cost < 500").select('product', 'total_cost').show()
Why Do We Need This?
To keep only the product name and total cost columns of the filtered orders.
Result
+----------+----------+
| product |total_cost|
+----------+----------+
| Soap | 150 |
| Shampoo | 300 |
| Bread | 200 |
|Toothpaste| 175 |
| Juice | 450 |
| Cereal | 350 |
| Facewash | 360 |
+----------+----------+
3. Select using list syntax
df.filter("total_cost < 500").select(['product', 'total_cost']).show()
Why Do We Need This?
An alternative way to pass multiple columns, useful when the column names are built at runtime as a Python list.
Result
(Same as above)
4. Use column object notation
df.filter(df['total_cost'] < 500).select(['product', 'category', 'quantity']).show()
Why Do We Need This?
To write the condition with Column objects (df['total_cost']) instead of a SQL string, which makes it easier to build and combine conditions in Python code.
Result
+----------+-------------+--------+
| product | category |quantity|
+----------+-------------+--------+
| Soap |Personal Care| 10 |
| Shampoo |Personal Care| 5 |
| Bread | Food | 20 |
|Toothpaste|Personal Care| 7 |
| Juice | Beverages | 15 |
| Cereal | Food | 10 |
| Facewash |Personal Care| 3 |
+----------+-------------+--------+
5. Filter on multiple conditions
df.filter((df['total_cost'] < 500) & (df['quantity'] > 10)).show()
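Why Do We Need This?
To keep only rows that satisfy every condition (logical AND). Each comparison is wrapped in its own parentheses because Python's & operator binds more tightly than < and >; use | for OR and ~ for NOT.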
Result
+--------+-------+---------+--------+--------------+---------+----------+
|order_id|product|category |quantity|price_per_unit| status |total_cost|
+--------+-------+---------+--------+--------------+---------+----------+
| 1003 | Bread | Food | 20 | 10 |Delivered| 200 |
| 1007 | Juice |Beverages| 15 | 30 |Delivered| 450 |
+--------+-------+---------+--------+--------------+---------+----------+
6. Filter where status is 'Pending'
df.filter(df['status'] == 'Pending').show()
Why Do We Need This?
To check only the orders that are still pending.
Result
+--------+----------+-------------+--------+--------------+--------+----------+
|order_id| product | category |quantity|price_per_unit| status |total_cost|
+--------+----------+-------------+--------+--------------+--------+----------+
| 1004 |Toothpaste|Personal Care| 7 | 25 |Pending | 175 |
| 1006 |Chocolate | Food | 30 | 20 |Pending | 600 |
+--------+----------+-------------+--------+--------------+--------+----------+
7. Filter on a specific value (e.g., quantity == 10)
df.filter(df['quantity'] == 10).show()
Why Do We Need This?
To locate all orders where the quantity purchased equals 10.
Result
+--------+--------+-------------+--------+--------------+---------+----------+
|order_id| product| category |quantity|price_per_unit| status |total_cost|
+--------+--------+-------------+--------+--------------+---------+----------+
| 1001 | Soap |Personal Care| 10 | 15 |Delivered| 150 |
| 1008 | Cereal | Food | 10 | 35 |Delivered| 350 |
+--------+--------+-------------+--------+--------------+---------+----------+
8. Access Row Data Using .collect() and .asDict()
result = df.filter(df['order_id'] == 1003).collect()
row = result[0]
row.asDict()
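Why Do We Need This?
To bring matching rows into Python memory on the driver for further manipulation. collect() returns a list of Row objects containing every matching row, so use it only when the filtered result is small.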
Result
{'order_id': 1003, 'product': 'Bread', 'category': 'Food', 'quantity': 20,
'price_per_unit': 10, 'status': 'Delivered', 'total_cost': 200}
row.asDict()['product']
Why Do We Need This?
To extract a single field (like "product") from the row dictionary.
Result
'Bread'
1-Minute Summary — Data Filtering in PySpark
Code | What it Does |
---|---|
df.filter("total_cost < 500").show() | Filters rows where total cost is less than 500 |
df.filter("total_cost < 500").select('product', 'total_cost').show() | Shows selected columns after filtering on total cost |
df.filter("total_cost < 500").select(['product', 'total_cost']).show() | Same selection, with the column names passed as a list |
df.filter(df['total_cost'] < 500).select([...]).show() | Uses a Column object in the condition and selects several columns |
df.filter((df['total_cost'] < 500) & (df['quantity'] > 10)).show() | Combines conditions with AND (each condition in parentheses) |
df.filter(df['status'] == 'Pending').show() | Filters orders with status = 'Pending' |
df.filter(df['quantity'] == 10).show() | Filters rows where quantity is exactly 10 |
result = df.filter(df['order_id'] == 1003).collect() | Collects matching rows into a Python list of Row objects |
row = result[0]; row.asDict() | Converts the first Row object to a dictionary |
row.asDict()['product'] | Accesses an individual field from the row dictionary ('Bread') |