Mastering PySpark: A Comprehensive Guide to Distributed Data Processing
Learn PySpark from basics to advanced concepts. This comprehensive guide covers DataFrames, transformations, actions, optimizations, and real-world use cases for big data processing.
Apache Spark is a unified analytics engine for large-scale data processing, and PySpark is its Python API. With PySpark, data engineers and scientists can leverage the power of distributed computing using familiar Python syntax.
What is PySpark?
PySpark is the Python API for Apache Spark, allowing Python developers to harness the power of Spark's distributed computing framework.
- Speed: In-memory processing can make Spark up to 100x faster than disk-based Hadoop MapReduce for some workloads
- Ease of Use: High-level APIs in Python
- Scalability: Scales from single machines to thousands of nodes
# Install PySpark
pip install pyspark
# Or with specific version
pip install pyspark==3.5.0
Create a SparkSession to start programming:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .getOrCreate()
print(f"Spark Version: {spark.version}")
DataFrames are distributed collections of data organized into named columns.
# Create DataFrame from list
data = [("Alice", 34, "Engineering"), ("Bob", 45, "Sales"), ("Charlie", 28, "Engineering")]
df = spark.createDataFrame(data, ["name", "age", "department"])
# Read from files
df_csv = spark.read.csv("data.csv", header=True)
df_json = spark.read.json("data.json")
df_parquet = spark.read.parquet("data.parquet")
df.show()
# Common transformations
df_filtered = df.filter(df.age > 30)
df_selected = df.select("name")
df_grouped = df.groupBy("department").count()
df_sorted = df.orderBy("age", ascending=False)
# Add columns
from pyspark.sql.functions import col, when
df_category = df.withColumn(
    "category",
    when(col("age") < 30, "Junior")
    .otherwise("Senior")
)
# Common actions
df.show() # Display rows
df.count() # Count rows
df.collect()            # Return all rows to the driver (avoid on large data)
df.first() # First row
df.describe().show() # Statistics
# Write operations
df.write.parquet("output.parquet")
df.write.csv("output.csv")
# Register as temp view
df.createOrReplaceTempView("employees")
# Run SQL
result = spark.sql("""
SELECT department, AVG(age) as avg_age
FROM employees
GROUP BY department
""")
result.show()
Optimization Tips
- Cache DataFrames used multiple times
- Partition data properly
- Broadcast small DataFrames in joins
- Filter early in the pipeline
# Caching
df.cache()
# Broadcasting
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# Repartitioning (returns a new DataFrame; reassign the result)
df = df.repartition(10)
# Check query plan
df.explain()
from pyspark.sql.functions import col, to_date, current_timestamp
# Extract
transactions = spark.read.parquet("s3://data/transactions/")
customers = spark.read.json("s3://data/customers/")
# Transform
clean_txn = transactions \
    .filter(col("amount") > 0) \
    .withColumn("date", to_date(col("timestamp")))
# Join and aggregate
result = clean_txn.join(customers, "customer_id") \
    .groupBy("segment") \
    .agg({"amount": "sum"}) \
    .withColumn("processed_at", current_timestamp())
# Load
result.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3://data/analytics/")
Best Practices
- Use the DataFrame API instead of low-level RDDs
- Avoid collect() on large datasets
- Handle null values explicitly
- Use Parquet or Delta format
- Monitor jobs with Spark UI
- Implement proper error handling
PySpark combines Python's ease of use with Spark's distributed power. This guide covered the fundamentals; continue exploring Structured Streaming, MLlib, and graph processing with GraphFrames (GraphX itself has no Python API) for advanced use cases.
Next Steps
- Build a real ETL pipeline
- Explore Structured Streaming
- Learn MLlib for ML at scale
- Try Delta Lake for ACID transactions