← Retour au cours
▶ Aperçu gratuit · Leçon offerte

Leçon 2 — Apache Spark : RDD, DataFrames, Spark SQL, PySpark et tuning

⏱ 60 min · 🎬 Lecon · 🏆 20 XP
🎬
Vidéo en production
Notre équipe pédagogique tourne actuellement cette leçon avec un·e formateur·rice expert·e. Le contenu textuel ci-dessous est complet et utilisable dès maintenant.

Leçon 2 — Apache Spark et PySpark

Architecture Spark, DataFrames, Spark SQL, partitionnement, broadcast joins, tuning et AQE.

Objectifs de la leçon

  • Comprendre l'architecture driver / executors / cluster manager de Spark
  • Maîtriser les DataFrames PySpark et Spark SQL (préférés aux RDD legacy)
  • Appliquer un partitionnement et un bucketing efficaces
  • Utiliser broadcast joins et optimiser les shuffles
  • Configurer AQE (Adaptive Query Execution) et lire les plans physiques

1. Pourquoi Spark ?

Apache Spark (UC Berkeley AMPLab 2009) traite des téraoctets sur cluster en mémoire, 100× plus rapidement qu'Hadoop MapReduce. Aujourd'hui standard de facto pour le batch et le streaming distribués. APIs Scala (natif), Python (PySpark), Java, R, SQL.

Zaharia M. et al., « Spark: Cluster Computing with Working Sets », HotCloud 2010, et « Resilient Distributed Datasets », NSDI 2012.
Source officielle : spark.apache.org/docs/latest/

2. Architecture Spark

  • Driver : programme principal qui crée le SparkContext et planifie les tâches
  • Cluster Manager : YARN, Kubernetes, Mesos, ou standalone
  • Executors : processus JVM sur les workers, exécutant les tâches et stockant les RDD/DataFrames en mémoire
  • Catalyst : optimiseur de requêtes (DataFrames/SQL)
  • Tungsten : moteur d'exécution natif optimisé CPU/mémoire

2.1 Lazy evaluation et DAG

Les transformations (map, filter, join) sont lazy — Spark construit un DAG. Une action (count, show, collect, write) déclenche l'exécution. Permet à Catalyst d'optimiser globalement le plan.

3. RDD vs DataFrame vs Dataset

APIAnnéeTypeAvantageUsage 2026
RDD2010Bas niveau, objet typé JVMContrôle fin, custom partitioningLegacy, ML interne uniquement
DataFrame2015Schéma typé, optimisé CatalystSQL-like, 10× plus rapide que RDDStandard moderne
Dataset2016Scala/Java typé compile-timeType-safetyScala uniquement (pas en PySpark)

4. DataFrames PySpark — opérations clés

Création et opérations de base

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, when, broadcast, current_date

spark = SparkSession.builder.appName("ETL").getOrCreate()

# Lecture
df = spark.read.format("parquet").load("s3://lake/events/")
# ou
df = spark.read.csv("s3://lake/data.csv", header=True, inferSchema=True)

# Sélection et filtrage
df2 = df.select("user_id", "event", col("amount").alias("price")).filter(col("amount") > 0)

# Groupby + agrégation
df_agg = (df.groupBy("country")
    .agg(count("*").alias("n"), sum("amount").alias("total"))
    .orderBy(col("total").desc()))

# Window function
from pyspark.sql.window import Window
w = Window.partitionBy("user_id").orderBy(col("ts").desc())
df_last = df.withColumn("rank", row_number().over(w)).filter("rank == 1")

# Écriture partitionnée
df_agg.write.mode("overwrite").partitionBy("country").format("delta").save("s3://lake/agg/")

5. Spark SQL

Pour les transformations complexes, Spark SQL est souvent plus lisible que l'API DataFrame :

df.createOrReplaceTempView("events")

result = spark.sql("""
    WITH purchases AS (
      SELECT user_id, SUM(amount) AS total_spent
      FROM events
      WHERE event_type = 'purchase' AND ts >= '2026-01-01'
      GROUP BY user_id
    )
    SELECT u.country, COUNT(DISTINCT p.user_id) AS buyers, AVG(p.total_spent) AS avg_spent
    FROM purchases p
    JOIN users u USING (user_id)
    GROUP BY u.country
    ORDER BY buyers DESC
""")

6. Partitionnement et shuffles

6.1 Partitions

Un DataFrame est divisé en n_partitions traitées en parallèle. Trop peu = sous-utilisation des executors. Trop = overhead. Bonne règle : 2-4 × nombre de cœurs disponibles.

df.rdd.getNumPartitions()
df = df.repartition(200)              # shuffle vers 200 partitions
df = df.coalesce(50)                  # réduit sans shuffle (subset des partitions)

6.2 Partitionnement disque

Lors de l'écriture, partitionBy("country", "date") crée une arborescence de dossiers. Permet du partition pruning à la lecture (skip de partitions non pertinentes).

Anti-pattern : partitionner sur des colonnes à très haute cardinalité (user_id, transaction_id) crée des milliers de petits fichiers (~ kB chacun). Limite recommandée : 50-200 partitions distinctes.

6.3 Bucketing

Distribue les lignes dans N buckets via hash(colonne). Évite le shuffle lors d'un join si les deux tables sont bucketées sur la même colonne avec le même N.

7. Joins et broadcast

Stratégie joinQuand l'utiliserSpark hint
Sort-merge joinDeux grandes tablesDéfaut (sort + merge)
Broadcast hash joinUne table < 10 MB-100 MBbroadcast(small_df)
Shuffle hash joinRare, désactivé par défautspark.sql.join.preferSortMergeJoin=false
BNLJ (Broadcast Nested Loop)Cross join, fallbackAutomatique

Broadcast join explicite

from pyspark.sql.functions import broadcast

dim_countries = spark.read.parquet("s3://lake/dim_countries/")  # 250 lignes
events_with_country = events.join(broadcast(dim_countries), "country_code", "left")
# La petite table est sérialisée et envoyée à chaque executor - PAS de shuffle de events

8. AQE — Adaptive Query Execution

Depuis Spark 3.0 (juin 2020), AQE ré-optimise le plan en cours d'exécution avec les statistiques réelles :

  • Coalesce automatique des partitions de shuffle
  • Convert sort-merge en broadcast si la taille réelle est petite
  • Skew join handling (split des partitions skewed)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
AQE est ON par défaut depuis Spark 3.2. Toujours vérifier qu'il est activé en production - gains 2-5× sur des workloads avec skew.

9. Lecture des plans d'exécution

df.explain(True)        # plan logique + physique
df.explain("formatted") # plus lisible

Chercher dans le plan :

  • Exchange = shuffle coûteux. Minimiser leur nombre.
  • BroadcastExchange = broadcast (OK pour petite table)
  • SortMergeJoin vs BroadcastHashJoin
  • ReuseExchange = bonne réutilisation
  • FileScan avec PartitionFilters = partition pruning actif

10. Spark Structured Streaming

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "events")
    .load())

agg = (stream
    .selectExpr("CAST(value AS STRING) as json")
    .select(from_json("json", schema).alias("d")).select("d.*")
    .withWatermark("event_ts", "10 minutes")
    .groupBy(window("event_ts", "5 minutes"), "country")
    .count())

query = (agg.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/events_count")
    .start("s3://lake/gold/events_window/"))

11. Tuning checklist Spark

  • Activer AQE
  • Choisir spark.sql.shuffle.partitions = 2-4 × cœurs (défaut 200, souvent trop)
  • Préférer Parquet/Delta à JSON/CSV
  • Filtre + select AVANT le join (predicate pushdown)
  • Broadcast les dimensions < 100 MB
  • Cache uniquement si réutilisation multiple : df.cache()
  • Surveiller le Spark UI (Stages, SQL tab, Storage)
  • Éviter collect() sur gros DataFrame (OOM driver)

12. Synthèse et points-clés

  • Spark distribue le compute via driver + executors avec optimiseur Catalyst
  • DataFrames PySpark = standard 2026, RDD relégué au legacy
  • Lazy evaluation : optimisation globale par Catalyst
  • Partitionner par colonne low-cardinality, bucketing pour joins répétés
  • Broadcast join pour petites dimensions (< 100 MB)
  • AQE active depuis Spark 3.2 = gains automatiques skew + coalesce
  • Spark Structured Streaming pour les flux Kafka en quasi-temps réel

Pour aller plus loin

Continuez le parcours 🚀

Inscrivez-vous pour accéder aux 5 autres leçons + le quiz final.

Créer mon compte
🍪 Nous utilisons des cookies essentiels et, avec ton accord, des cookies analytiques. En savoir plus

⚙️ Préférences cookies

Choisis quels cookies tu acceptes — modifiable à tout moment.

🔐 Essentiels (obligatoires)Authentification, session, sécurité. Toujours actifs.
📊 Analytics anonymesMesure d'audience anonymisée — aucune donnée personnelle.
📣 MarketingPublicités ITAG pertinentes sur d'autres sites.
💬 Contactez-nous sur WhatsApp