Architecture Spark, DataFrames, Spark SQL, partitionnement, broadcast joins, tuning et AQE.
Apache Spark (UC Berkeley AMPLab 2009) traite des téraoctets sur cluster en mémoire, 100× plus rapidement qu'Hadoop MapReduce. Aujourd'hui standard de facto pour le batch et le streaming distribués. APIs Scala (natif), Python (PySpark), Java, R, SQL.
Zaharia M. et al., « Spark: Cluster Computing with Working Sets », HotCloud 2010, et « Resilient Distributed Datasets », NSDI 2012.
Source officielle : spark.apache.org/docs/latest/
Les transformations (map, filter, join) sont lazy — Spark construit un DAG. Une action (count, show, collect, write) déclenche l'exécution. Permet à Catalyst d'optimiser globalement le plan.
| API | Année | Type | Avantage | Usage 2026 |
|---|---|---|---|---|
| RDD | 2010 | Bas niveau, objet typé JVM | Contrôle fin, custom partitioning | Legacy, ML interne uniquement |
| DataFrame | 2015 | Schéma typé, optimisé Catalyst | SQL-like, 10× plus rapide que RDD | Standard moderne |
| Dataset | 2016 | Scala/Java typé compile-time | Type-safety | Scala uniquement (pas en PySpark) |
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, when, broadcast, current_date
spark = SparkSession.builder.appName("ETL").getOrCreate()
# Lecture
df = spark.read.format("parquet").load("s3://lake/events/")
# ou
df = spark.read.csv("s3://lake/data.csv", header=True, inferSchema=True)
# Sélection et filtrage
df2 = df.select("user_id", "event", col("amount").alias("price")).filter(col("amount") > 0)
# Groupby + agrégation
df_agg = (df.groupBy("country")
.agg(count("*").alias("n"), sum("amount").alias("total"))
.orderBy(col("total").desc()))
# Window function
from pyspark.sql.window import Window
w = Window.partitionBy("user_id").orderBy(col("ts").desc())
df_last = df.withColumn("rank", row_number().over(w)).filter("rank == 1")
# Écriture partitionnée
df_agg.write.mode("overwrite").partitionBy("country").format("delta").save("s3://lake/agg/")
Pour les transformations complexes, Spark SQL est souvent plus lisible que l'API DataFrame :
df.createOrReplaceTempView("events")
result = spark.sql("""
WITH purchases AS (
SELECT user_id, SUM(amount) AS total_spent
FROM events
WHERE event_type = 'purchase' AND ts >= '2026-01-01'
GROUP BY user_id
)
SELECT u.country, COUNT(DISTINCT p.user_id) AS buyers, AVG(p.total_spent) AS avg_spent
FROM purchases p
JOIN users u USING (user_id)
GROUP BY u.country
ORDER BY buyers DESC
""")
Un DataFrame est divisé en n_partitions traitées en parallèle. Trop peu = sous-utilisation des executors. Trop = overhead. Bonne règle : 2-4 × nombre de cœurs disponibles.
df.rdd.getNumPartitions()
df = df.repartition(200) # shuffle vers 200 partitions
df = df.coalesce(50) # réduit sans shuffle (subset des partitions)
Lors de l'écriture, partitionBy("country", "date") crée une arborescence de dossiers. Permet du partition pruning à la lecture (skip de partitions non pertinentes).
Distribue les lignes dans N buckets via hash(colonne). Évite le shuffle lors d'un join si les deux tables sont bucketées sur la même colonne avec le même N.
| Stratégie join | Quand l'utiliser | Spark hint |
|---|---|---|
| Sort-merge join | Deux grandes tables | Défaut (sort + merge) |
| Broadcast hash join | Une table < 10 MB-100 MB | broadcast(small_df) |
| Shuffle hash join | Rare, désactivé par défaut | spark.sql.join.preferSortMergeJoin=false |
| BNLJ (Broadcast Nested Loop) | Cross join, fallback | Automatique |
from pyspark.sql.functions import broadcast
dim_countries = spark.read.parquet("s3://lake/dim_countries/") # 250 lignes
events_with_country = events.join(broadcast(dim_countries), "country_code", "left")
# La petite table est sérialisée et envoyée à chaque executor - PAS de shuffle de events
Depuis Spark 3.0 (juin 2020), AQE ré-optimise le plan en cours d'exécution avec les statistiques réelles :
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
df.explain(True) # plan logique + physique
df.explain("formatted") # plus lisible
Chercher dans le plan :
Exchange = shuffle coûteux. Minimiser leur nombre.BroadcastExchange = broadcast (OK pour petite table)SortMergeJoin vs BroadcastHashJoinReuseExchange = bonne réutilisationFileScan avec PartitionFilters = partition pruning actifstream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "events")
.load())
agg = (stream
.selectExpr("CAST(value AS STRING) as json")
.select(from_json("json", schema).alias("d")).select("d.*")
.withWatermark("event_ts", "10 minutes")
.groupBy(window("event_ts", "5 minutes"), "country")
.count())
query = (agg.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoints/events_count")
.start("s3://lake/gold/events_window/"))
spark.sql.shuffle.partitions = 2-4 × cœurs (défaut 200, souvent trop)df.cache()collect() sur gros DataFrame (OOM driver)Inscrivez-vous pour accéder aux 5 autres leçons + le quiz final.
Créer mon compteChoisis quels cookies tu acceptes — modifiable à tout moment.