Basic DataFrame Operations - Part 2
Defining a Custom Schema
This section demonstrates how to manually define a schema for your DataFrame instead of letting Spark infer it automatically.
Import Required Types
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
Meaning
Code | What it Does | Why We Need This? |
---|---|---|
from pyspark.sql.types | Imports necessary types to define a custom schema manually | Without this, Spark can only infer schemas, and we lose control over data quality |
StructField() | Represents a column in the schema with its name, data type, and nullability | Ensures every column has strict rules (type, nullability) |
StringType(), IntegerType() | Define the data types for the fields (columns) | Guarantees columns are interpreted correctly (e.g., age as number, not string) |
StructType() | Combines multiple StructFields into a full schema | Used to build the final schema object for the DataFrame |
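Note that the examples below call methods on a spark session object that is never created in these notes. A minimal sketch, assuming a plain local session (the app name here is arbitrary):

# The `spark` object used throughout these notes must exist first
spark = SparkSession.builder \
    .appName('custom_schema_demo') \
    .getOrCreate()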
Combine Fields into a Schema
data_schema = [
    StructField('age', IntegerType(), True),
    StructField('name', StringType(), True)
]
Meaning
Code | What it Does | Why We Need This? |
---|---|---|
StructField('age', IntegerType(), True) | Defines a column named age with IntegerType, allowing null values | Ensures the age column is always treated as numeric data |
StructField('name', StringType(), True) | Defines a column named name with StringType, allowing null values | Ensures names are stored as text and not misinterpreted |
data_schema | A list of StructFields representing the schema structure | Collects all field definitions before building the full schema |
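The third argument to StructField() is the nullability flag. A quick sketch of what setting it to False means (strict_schema and the sample row are made up for illustration; the check fires when a DataFrame is created against the schema):

# With nullable=False, Spark rejects None for that field at DataFrame creation
strict_schema = StructType([
    StructField('age', IntegerType(), False),  # age must not be null
    StructField('name', StringType(), True)
])
# spark.createDataFrame([(None, 'Dana')], strict_schema)  # raises an error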
final_struc = StructType(fields=data_schema)
Meaning
Code | What it Does | Why We Need This? |
---|---|---|
StructType(fields=data_schema) | Combines all StructFields into a full schema object (final_struc) | Final schema object required to create the DataFrame |
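The finished schema object can also be inspected on its own, before any data exists. A small sketch:

final_struc.fieldNames()    # ['age', 'name']
final_struc.simpleString()  # 'struct<age:int,name:string>'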
Define Sample Data
data = [
    (30, "Alice"),
    (25, "Bob"),
    (35, "Charlie")
]
Meaning
Code | What it Does | Why We Need This? |
---|---|---|
data | Raw in-memory data as a list of tuples | This is the input dataset we want to convert to a Spark DataFrame |
Create DataFrame with Custom Schema
df = spark.createDataFrame(data=data, schema=final_struc)
Meaning
Code | What it Does | Why We Need This? |
---|---|---|
spark.createDataFrame(..., schema=final_struc) | Creates a DataFrame from the raw data using the defined schema | To apply schema rules instead of Spark's default type inference |
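As an aside, recent Spark versions also accept a DDL-style schema string in place of a StructType. A sketch of the equivalent call (df_ddl is just an illustrative name):

# Equivalent shorthand: the schema as a DDL string instead of a StructType
df_ddl = spark.createDataFrame(data=data, schema='age INT, name STRING')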
printSchema()
df.printSchema()
Meaning
Code | What it Does | Why We Need This? |
---|---|---|
df.printSchema() | Displays the structure of the DataFrame using the custom schema | Helps verify that the schema is applied correctly |
Result
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
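Besides printSchema(), the schema can also be read back programmatically. A sketch of two common ways:

df.schema   # the StructType object itself
df.dtypes   # [('age', 'int'), ('name', 'string')]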
show()
df.show()
Why We Need This?
To quickly preview the rows in a tabular format for validation.
Result
+---+-------+
|age|   name|
+---+-------+
| 30|  Alice|
| 25|    Bob|
| 35|Charlie|
+---+-------+
Accessing a Column (Returns Column Object)
type(df['age'])
Meaning
Returns the type of the column object (not the actual data).
Why We Need This?
Useful to confirm that Spark treats df['age'] as a column object, not raw values.
Result
<class 'pyspark.sql.column.Column'>
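Because df['age'] is a Column object, it can be combined into expressions before any data moves; nothing executes until an action such as show() runs. A sketch (the alias name is arbitrary):

from pyspark.sql.functions import col

# Build an expression from Column objects and give it a readable name
df.select((col('age') + 1).alias('age_plus_one'), df['name']).show()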
Selecting a Single Column and Displaying It
df.select('age').show()
Result
+---+
|age|
+---+
| 30|
| 25|
| 35|
+---+
Why We Need This?
To extract and view only one column when analyzing or debugging.
Selecting Multiple Columns (2 Ways)
df.select('age', 'name').show()
# or
df.select(['age', 'name']).show()
Result
+---+-------+
|age|   name|
+---+-------+
| 30|  Alice|
| 25|    Bob|
| 35|Charlie|
+---+-------+
Why We Need This?
To limit the output to only the required columns.
Getting Top N Rows
df.head(2)
Meaning
Returns the first 2 rows as a list of Row objects.
Why We Need This?
Helpful for sampling small portions of data for quick checks.
Result
[Row(age=30, name='Alice'), Row(age=25, name='Bob')]
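Two closely related calls are worth knowing. A sketch:

df.take(2)          # same as head(2): first rows as a list of Row objects
df.limit(2).show()  # keeps the result as a DataFrame instead of a list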
Adding a New Column with Expression
df.withColumn('double_age', df['age'] * 2).show()
Why We Need This?
To create derived values dynamically; withColumn returns a new DataFrame, so the original data is not modified.
Result
+---+-------+----------+
|age|   name|double_age|
+---+-------+----------+
| 30|  Alice|        60|
| 25|    Bob|        50|
| 35|Charlie|        70|
+---+-------+----------+
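The same derived column can also be written as a SQL expression string via expr(). A sketch:

from pyspark.sql.functions import expr

df.withColumn('double_age', expr('age * 2')).show()  # same output as above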
Renaming a Column
df.withColumnRenamed('age', 'new_age_renamed').show()
Why We Need This?
To make column names more descriptive or match reporting needs.
Result
+---------------+-------+
|new_age_renamed|   name|
+---------------+-------+
|             30|  Alice|
|             25|    Bob|
|             35|Charlie|
+---------------+-------+
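Since withColumnRenamed returns a new DataFrame (the original df is unchanged), renames can be chained. A sketch with arbitrary new names:

df.withColumnRenamed('age', 'person_age') \
  .withColumnRenamed('name', 'person_name') \
  .show()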
Using SQL with Temporary Views
Create a temporary SQL view from the DataFrame:
df.createOrReplaceTempView('people')
Why We Need This?
It allows us to run SQL queries on DataFrames directly.
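The view is scoped to this SparkSession and disappears when the session stops. It can also be dropped explicitly; left commented out here so the queries below keep working:

# spark.catalog.dropTempView('people')  # removes the temporary view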
SQL Query: Select All
results = spark.sql("SELECT * FROM people")
results.show()
Why We Need This?
To use familiar SQL syntax for exploring and analyzing data.
Result
+---+-------+
|age|   name|
+---+-------+
| 30|  Alice|
| 25|    Bob|
| 35|Charlie|
+---+-------+
SQL Query with WHERE Clause
results = spark.sql("SELECT age FROM people WHERE name = 'Andy'")
results.show()
Why We Need This?
To filter and query data using SQL-style conditions.
Result
+---+
|age|
+---+
+---+
In this example, no results are returned because "Andy" is not in the current dataset.
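For comparison, the same query expressed with the DataFrame API instead of SQL. A sketch:

df.filter(df['name'] == 'Andy').select('age').show()  # also empty here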
🔑 1-Minute Summary
Code | What it Does |
---|---|
type(df['age']) | Returns the type of the column object (pyspark.sql.column.Column) |
df.select('age').show() | Displays only the age column |
df.select('age', 'name').show() | Displays multiple columns (age, name) |
df.select(['age', 'name']).show() | Same as above; selects multiple columns using a list |
df.head(2) | Returns the first 2 rows as Row objects |
df.withColumn('double_age', df['age'] * 2).show() | Adds a new column double_age with values equal to age * 2 |
df.withColumnRenamed('age', 'new_age_renamed').show() | Renames the column age to new_age_renamed |
df.createOrReplaceTempView('people') | Registers the DataFrame as a temporary SQL view named people |
spark.sql("SELECT * FROM people") | Runs SQL to select all data from the people view |
spark.sql("SELECT age FROM people WHERE name = 'Andy'") | Selects the age of the person named "Andy" (returns an empty result here) |