DataPartner365

Jouw partner voor datagedreven groei en inzichten

Weerdata Ophalen en Verwerken met PySpark in Databricks: Een Volledig Praktijkvoorbeeld

In dit artikel bouwen we stap voor stap een complete data-pipeline die weerdata ophaalt via een REST API, verwerkt met PySpark in Databricks en opslaat in Delta Lake. Dit is een ideaal leerproject voor data engineers die willen werken met realistische API-data in een moderne cloudstack. We behandelen elke stap met echte code en uitleg.

Architectuuroverzicht

Weerdata pipeline architectuur

De pipeline volgt het klassieke medallion-patroon:

We gebruiken Databricks als uitvoeringsplatform, Azure Data Lake Storage Gen2 (ADLS) als opslaglaag en Delta Lake als tabelformaat. Alles is gebouwd in Python/PySpark.

Benodigdheden

Stap 1: Databricks Omgeving Instellen

Maak een cluster aan in Databricks met minimaal Databricks Runtime 12.x (inclusief Apache Spark 3.3+). Installeer de benodigde library voor HTTP-requests — die is standaard aanwezig in Databricks. Stel vervolgens de verbinding in met je ADLS-account via een service principal of access key. Gebruik altijd Databricks Secrets om gevoelige gegevens veilig op te slaan:

# Secrets ophalen uit Databricks Secret Scope
storage_account = dbutils.secrets.get(scope="weather-scope", key="storage-account-name")
storage_key = dbutils.secrets.get(scope="weather-scope", key="storage-account-key")
api_key = dbutils.secrets.get(scope="weather-scope", key="openweather-api-key")

# ADLS Gen2 configureren
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    storage_key
)

# Basispad instellen
base_path = f"abfss://weather@{storage_account}.dfs.core.windows.net"

Stap 2: Weerdata Ophalen via de OpenWeatherMap API

We beginnen met een eenvoudige API-call voor één stad en breiden daarna uit naar meerdere steden. De OpenWeatherMap API heeft een gratis tier waarmee je 60 calls per minuut kunt doen — meer dan genoeg voor dit project.

import requests
import json
from datetime import datetime

def fetch_weather(city: str, api_key: str) -> dict:
    """Haal weerdata op voor een stad en voeg metadata toe."""
    url = (
        f"https://api.openweathermap.org/data/2.5/weather"
        f"?q={city}&appid={api_key}&units=metric&lang=nl"
    )
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    data = response.json()

    # Voeg ingest-timestamp toe voor traceerbaarheid
    data["_ingested_at"] = datetime.utcnow().isoformat()
    data["_source"] = "openweathermap"
    return data

# Lijst van Nederlandse en Belgische steden
steden = [
    "Amsterdam", "Rotterdam", "Utrecht", "Den Haag", "Eindhoven",
    "Groningen", "Maastricht", "Breda", "Nijmegen", "Tilburg"
]

# Data ophalen voor alle steden
weerdata = []
for stad in steden:
    try:
        data = fetch_weather(stad, api_key)
        weerdata.append(json.dumps(data))
        print(f"Opgehaald: {stad} - {data['main']['temp']}°C")
    except requests.RequestException as e:
        print(f"Fout bij {stad}: {e}")

print(f"\nTotaal opgehaald: {len(weerdata)} steden")

De functie haalt per stad een volledig JSON-object op met daarin temperatuur, luchtvochtigheid, windsnelheid, bewolkingsgraad en beschrijvingen. De velden _ingested_at en _source voegen we zelf toe voor data lineage — altijd een goede gewoonte.

Stap 3: Ruwe Data Opslaan in de Bronze Laag

De bronze laag slaat de data op in de originele vorm, zonder transformaties. Dit is het vangnet: als er later iets fout gaat in de transformaties, kun je altijd teruggrijpen op de ruwe data.

from pyspark.sql.types import StringType
from pyspark.sql.functions import current_timestamp, lit

# Maak een Spark DataFrame van de JSON-strings
rdd = sc.parallelize(weerdata)
df_bronze = spark.read.json(rdd)

# Voeg partitiekolommen toe voor efficiënte opslag
df_bronze = df_bronze.withColumn("ingest_date", current_timestamp().cast("date"))

# Schrijf naar Delta Lake (bronze laag)
bronze_path = f"{base_path}/bronze/weather"

df_bronze.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("ingest_date") \
    .save(bronze_path)

print(f"Bronze laag bijgewerkt: {df_bronze.count()} rijen geschreven")
df_bronze.printSchema()

Door te partitioneren op ingest_date houd je de data georganiseerd en maken toekomstige queries op specifieke datums gebruik van partition pruning — een grote prestatieverbetering bij grote datasets.

Stap 4: Transformaties in de Silver Laag

In de silver laag schonen we de data op: we selecteren relevante velden, geven ze begrijpelijke namen, converteren datatypes en voegen berekende kolommen toe.

from pyspark.sql.functions import (
    col, from_unixtime, round as spark_round,
    when, current_timestamp, to_timestamp
)

# Lees uit bronze
df_raw = spark.read.format("delta").load(bronze_path)

# Transformeer naar silver
df_silver = df_raw.select(
    col("id").alias("stad_id"),
    col("name").alias("stad"),
    col("sys.country").alias("land"),
    col("coord.lat").alias("breedtegraad"),
    col("coord.lon").alias("lengtegraad"),
    from_unixtime(col("dt")).cast("timestamp").alias("meting_tijdstip"),
    from_unixtime(col("sys.sunrise")).cast("timestamp").alias("zonsopgang"),
    from_unixtime(col("sys.sunset")).cast("timestamp").alias("zonsondergang"),
    spark_round(col("main.temp"), 1).alias("temperatuur_celsius"),
    spark_round(col("main.feels_like"), 1).alias("gevoelstemperatuur_celsius"),
    col("main.temp_min").alias("min_temperatuur"),
    col("main.temp_max").alias("max_temperatuur"),
    col("main.humidity").alias("luchtvochtigheid_pct"),
    col("main.pressure").alias("luchtdruk_hpa"),
    col("wind.speed").alias("windsnelheid_ms"),
    col("wind.deg").alias("windrichting_graden"),
    col("clouds.all").alias("bewolking_pct"),
    col("weather")[0]["description"].alias("weerbeschrijving"),
    col("weather")[0]["main"].alias("weercategorie"),
    col("visibility").alias("zichtbaarheid_meter"),
    col("_ingested_at").alias("ingest_tijdstip"),
    col("ingest_date")
).withColumn(
    "windkracht_beaufort",
    when(col("windsnelheid_ms") < 0.3, 0)
    .when(col("windsnelheid_ms") < 1.6, 1)
    .when(col("windsnelheid_ms") < 3.4, 2)
    .when(col("windsnelheid_ms") < 5.5, 3)
    .when(col("windsnelheid_ms") < 8.0, 4)
    .when(col("windsnelheid_ms") < 10.8, 5)
    .when(col("windsnelheid_ms") < 13.9, 6)
    .when(col("windsnelheid_ms") < 17.2, 7)
    .when(col("windsnelheid_ms") < 20.8, 8)
    .when(col("windsnelheid_ms") < 24.5, 9)
    .when(col("windsnelheid_ms") < 28.5, 10)
    .when(col("windsnelheid_ms") < 32.7, 11)
    .otherwise(12)
)

# Verwijder duplicaten op basis van stad en meetmoment
df_silver = df_silver.dropDuplicates(["stad_id", "meting_tijdstip"])

# Schrijf naar Delta Lake (silver laag) met merge-logica
silver_path = f"{base_path}/silver/weather"

df_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("ingest_date") \
    .save(silver_path)

print(f"Silver laag bijgewerkt: {df_silver.count()} rijen")
df_silver.show(5)

De Beaufort-schaalberekening is een mooi voorbeeld van het toevoegen van domeinkennis in de silver laag. We voegen ook leesbaarheid toe door kolommen te hernoemen naar beschrijvende Nederlandse namen.

Stap 5: Aggregaties in de Gold Laag

De gold laag bevat rapportage-klare aggregaties. Hier creëren we twee tabellen: een dagelijks overzicht per stad en een vergelijkende tabel over alle steden heen.

from pyspark.sql.functions import (
    avg, min, max, count, stddev, date_trunc
)

# Lees silver data
df_silver = spark.read.format("delta").load(silver_path)

# Aggregatie 1: Dagelijks overzicht per stad
df_gold_dag = df_silver.groupBy(
    col("stad"),
    col("land"),
    col("ingest_date").alias("datum")
).agg(
    avg("temperatuur_celsius").alias("gem_temperatuur"),
    min("temperatuur_celsius").alias("min_temperatuur"),
    max("temperatuur_celsius").alias("max_temperatuur"),
    avg("luchtvochtigheid_pct").alias("gem_luchtvochtigheid"),
    avg("windsnelheid_ms").alias("gem_windsnelheid"),
    max("windsnelheid_ms").alias("max_windsnelheid"),
    avg("bewolking_pct").alias("gem_bewolking"),
    count("*").alias("aantal_metingen")
).withColumn("gem_temperatuur", spark_round(col("gem_temperatuur"), 1)) \
 .withColumn("gem_windsnelheid", spark_round(col("gem_windsnelheid"), 2))

gold_dag_path = f"{base_path}/gold/weather_daily"
df_gold_dag.write \
    .format("delta") \
    .mode("overwrite") \
    .save(gold_dag_path)

# Aggregatie 2: Ranglijst warmste steden vandaag
df_ranglijst = df_silver.filter(
    col("ingest_date") == df_silver.agg(max("ingest_date")).collect()[0][0]
).groupBy("stad").agg(
    avg("temperatuur_celsius").alias("gem_temp_vandaag")
).orderBy(col("gem_temp_vandaag").desc())

print("Warmste steden vandaag:")
df_ranglijst.show()

gold_rank_path = f"{base_path}/gold/weather_ranking"
df_ranglijst.write.format("delta").mode("overwrite").save(gold_rank_path)

Stap 6: Delta Lake Tabelregistratie in Unity Catalog

Om de data beschikbaar te maken via SQL en voor Power BI, registreren we de Delta-bestanden als tabellen in de Unity Catalog of het Hive metastore:

# Maak een database/schema aan als die nog niet bestaat
spark.sql("CREATE DATABASE IF NOT EXISTS weather_db COMMENT 'Weerdata database'")

# Registreer de gold-tabel
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS weather_db.weather_daily
  USING DELTA
  LOCATION '{gold_dag_path}'
  COMMENT 'Dagelijkse weeraggregaties per stad'
""")

# Voeg tabelproperties toe voor governance
spark.sql("""
  ALTER TABLE weather_db.weather_daily
  SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true',
    'quality' = 'gold',
    'owner' = 'data-engineering-team'
  )
""")

# Controleer de tabel
spark.sql("DESCRIBE EXTENDED weather_db.weather_daily").show(50, truncate=False)

Stap 7: Automatisering met Databricks Jobs

Nu de pipeline werkt, willen we deze automatisch uitvoeren. In Databricks stel je een job in via de Jobs-interface of via de REST API:

# Voorbeeld job-configuratie (JSON voor Databricks REST API)
job_config = {
    "name": "Weerdata Pipeline - Uurlijks",
    "tasks": [
        {
            "task_key": "fetch_and_process_weather",
            "notebook_task": {
                "notebook_path": "/Shared/weather/pipeline_main",
                "base_parameters": {
                    "steden": "Amsterdam,Rotterdam,Utrecht,Den Haag,Eindhoven"
                }
            },
            "existing_cluster_id": "jouw-cluster-id",
            "timeout_seconds": 1800
        }
    ],
    "schedule": {
        "quartz_cron_expression": "0 0 * * * ?",  # Elk uur
        "timezone_id": "Europe/Amsterdam",
        "pause_status": "UNPAUSED"
    },
    "email_notifications": {
        "on_failure": ["data-team@bedrijf.nl"]
    }
}

Met deze job-configuratie draait de pipeline elk uur automatisch. Bij een fout ontvang je een e-mailnotificatie. In productie voeg je ook een retry-beleid toe en koppel je een alerting-systeem zoals PagerDuty of Slack.

Stap 8: Power BI Koppelen aan de Gold Laag

De laatste stap is het verbinden van Power BI met de gold-tabellen in Databricks. Dit doe je via de Databricks JDBC/ODBC-connector of via de ingebouwde Databricks-connector in Power BI Desktop:

  1. Open Power BI Desktop en kies "Gegevens ophalen" → "Azure" → "Azure Databricks"
  2. Voer de server-hostnaam in (te vinden in Databricks onder Cluster → JDBC/ODBC)
  3. Voer het HTTP-pad van het cluster in
  4. Kies tussen DirectQuery (altijd actueel) of Import (sneller, maar snapshot)
  5. Selecteer de tabel weather_db.weather_daily

In Power BI bouw je vervolgens een dashboard met:

Weerdata dashboard Power BI

Delta Lake Onderhoud en Optimalisatie

Na verloop van tijd groeien Delta-tabellen en ontstaan er veel kleine bestanden. Voer regelmatig onderhoud uit:

# Optimaliseer bestanden (small file problem oplossen)
spark.sql("OPTIMIZE weather_db.weather_daily ZORDER BY (stad, datum)")

# Verwijder oude versies (standaard 30 dagen history)
spark.sql("VACUUM weather_db.weather_daily RETAIN 168 HOURS")  # 7 dagen bewaren

# Bekijk tabelgeschiedenis (time travel)
spark.sql("DESCRIBE HISTORY weather_db.weather_daily").show(10, truncate=False)

# Herstel naar een eerdere versie indien nodig
spark.sql("""
  RESTORE TABLE weather_db.weather_daily
  TO VERSION AS OF 5
""")

Uitbreidingsideeën

Dit project is een uitstekende basis voor verdere uitbreidingen:

Conclusie

In dit artikel hebben we een complete data-pipeline gebouwd van API-call tot Power BI-dashboard. We hebben de OpenWeatherMap API aangesloten, data in drie lagen (bronze, silver, gold) opgeslagen in Delta Lake, de pipeline geautomatiseerd via Databricks Jobs en de data beschikbaar gemaakt voor rapportage. Dit patroon — een gelaagde medallion-architectuur met PySpark en Delta Lake — is precies hoe moderne data teams werken bij bedrijven als Coolblue, Booking.com en ING.

Het volledige project vind je op GitHub. Wil je meer weten over Delta Lake, Databricks of data engineering in het algemeen? Bekijk dan onze andere artikelen op DataPartner365.

Deel op LinkedIn
← Terug naar het blogoverzicht
Abdullah Özisik - AI Data Engineer

Over de auteur

Abdullah Özisik — Data Engineer met specialisatie in AI-integratie en MLOps. Expert in het bouwen van intelligente data pipelines die gebruik maken van machine learning en generative AI voor geautomatiseerde data processing en optimalisatie.

Wat is YAML? Complete Gids voor Configuration… Alle blogs