← Terug naar Downloads DataPartner365 — datapartner365.nl
DataPartner365 · Handleiding · 2026

Data Pipelines Bouwen

Van Python scripts tot enterprise-grade orchestratie: een praktische handleiding voor het bouwen van robuuste, herbruikbare en observeerbare data pipelines.

Python Airflow Azure Data Factory Databricks Gratis
1

Wat is een data pipeline & tooling kiezen

Definitie, vergelijking tools, wanneer wat gebruiken

Een data pipeline is een geautomatiseerd proces dat data verplaatst van bron naar bestemming, inclusief validatie, transformatie en foutafhandeling. Het verschil met een losse script: een pipeline is herhaalbaar, observeerbaar en herstelbaar bij fouten.

ToolTypeSterk inWanneer kiezen
Python + SQLAlchemyScriptFlexibiliteit, eenvoudKlein team, eenvoudige pipelines
Azure Data FactoryNo-code/low-codeAzure integratie, GUINiet-technische teams, Azure-first
Apache AirflowOrchestratorComplexe afhankelijkheden, PythonGrote teams, veel pipelines
Databricks DLTDeclaratiefStreaming + batch, DeltaLakehouse architectuur
dbtTransformatieSQL transformaties, testsTransformatielaag in DWH
ℹ️ Voor de meeste organisaties: begin met Python voor eenvoudige pipelines, voeg Airflow toe zodra je meer dan 5-10 pipelines hebt die van elkaar afhangen. Azure Data Factory is een goede keuze als het team geen Python wil schrijven.
2

Eenvoudige Python pipeline

requests + pandas + SQLAlchemy, API naar database

Een Python pipeline die data ophaalt van een REST API en laadt in een database is een uitstekend startpunt en leert je de fundamenten van pipeline design.

python — requirements.txt
>requests==2.31.0
pandas==2.2.0
sqlalchemy==2.0.27
psycopg2-binary==2.9.9
python-dotenv==1.0.0
python — pipeline.py
>import os
import logging
from datetime import datetime, timezone
import requests
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

load_dotenv()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger(__name__)


def extract(api_url: str, api_key: str) -> list[dict]:
    """Haal data op van REST API met paginering."""
    records = []
    page = 1
    headers = {"Authorization": f"Bearer {api_key}"}

    while True:
        response = requests.get(
            api_url,
            headers=headers,
            params={"page": page, "per_page": 100},
            timeout=30
        )
        response.raise_for_status()   # gooit HTTPError bij 4xx/5xx

        data = response.json()
        if not data.get("items"):
            break

        records.extend(data["items"])
        log.info(f"Pagina {page}: {len(data['items'])} records opgehaald")
        page += 1

    log.info(f"Extract klaar: {len(records)} records totaal")
    return records


def transform(records: list[dict]) -> pd.DataFrame:
    """Valideer en normaliseer de ruwe API data."""
    df = pd.DataFrame(records)

    # Type casting
    df["order_date"] = pd.to_datetime(df["order_date"], utc=True)
    df["amount"]     = pd.to_numeric(df["amount"], errors="coerce")

    # Validatie
    invalid = df[df["amount"].isna() | (df["amount"] < 0)]
    if not invalid.empty:
        log.warning(f"{len(invalid)} ongeldige records weggegooid")
    df = df[df["amount"] >= 0]

    # Metadata toevoegen
    df["_loaded_at"] = datetime.now(timezone.utc)

    return df[["order_id", "customer_id", "order_date",
               "amount", "status", "_loaded_at"]]


def load(df: pd.DataFrame, engine, table: str) -> None:
    """Laad data via upsert in de database."""
    with engine.begin() as conn:
        # Tijdelijke staging tabel
        df.to_sql(f"{table}_staging", conn,
                  if_exists="replace", index=False)

        # Upsert: insert + update bij conflict
        conn.execute(text(f"""
            INSERT INTO {table} (order_id, customer_id, order_date,
                                  amount, status, _loaded_at)
            SELECT order_id, customer_id, order_date,
                   amount, status, _loaded_at
            FROM {table}_staging
            ON CONFLICT (order_id) DO UPDATE SET
                status     = EXCLUDED.status,
                _loaded_at = EXCLUDED._loaded_at
        """))

    log.info(f"Load klaar: {len(df)} rijen in '{table}'")


if __name__ == "__main__":
    engine = create_engine(os.getenv("DATABASE_URL"))
    raw    = extract(os.getenv("API_URL"), os.getenv("API_KEY"))
    clean  = transform(raw)
    load(clean, engine, "bronze.orders")
3

Azure Data Factory pipeline

Copy Activity, Linked Services, parameters, triggers

Azure Data Factory (ADF) is Microsoft's cloud ETL service met een grafische interface. Pipelines worden opgeslagen als JSON en kunnen worden beheerd via Git.

Linked Service configureren (REST API bron)

json — linked service definitie
>{
  "name": "ls_rest_api_orders",
  "type": "RestService",
  "properties": {
    "url": "https://api.example.com",
    "enableServerCertificateValidation": true,
    "authenticationType": "Anonymous",
    "authHeaders": {
      "Authorization": {
        "type": "AzureKeyVaultSecret",
        "store": { "referenceName": "ls_keyvault" },
        "secretName": "api-token"
      }
    }
  }
}

Pipeline JSON (Copy Activity)

json — pipeline met parameters
>{
  "name": "pl_ingest_orders",
  "parameters": {
    "start_date": { "type": "string" },
    "end_date":   { "type": "string" }
  },
  "activities": [
    {
      "name": "copy_orders_to_adls",
      "type": "Copy",
      "inputs": [{
        "referenceName": "ds_rest_orders",
        "parameters": {
          "start": "@pipeline().parameters.start_date",
          "end":   "@pipeline().parameters.end_date"
        }
      }],
      "outputs": [{ "referenceName": "ds_adls_bronze_orders" }],
      "typeProperties": {
        "source": { "type": "RestSource" },
        "sink":   {
          "type": "ParquetSink",
          "storeSettings": { "type": "AzureBlobFSWriteSettings" }
        }
      }
    }
  ]
}
Sla gevoelige verbindingsgegevens (wachtwoorden, API keys) altijd op in Azure Key Vault. Gebruik Managed Identity voor ADF om zonder opgeslagen credentials te authenticeren bij Azure resources.
4

Airflow DAG schrijven

Python Operator, Sensors, TaskFlow API, scheduling

Apache Airflow is de meest gebruikte open-source orchestrationtool. DAGs (Directed Acyclic Graphs) definiëren de taken en hun afhankelijkheden.

python — airflow dag (taskflow api)
>from __future__ import annotations
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.postgres.hooks.postgres import PostgresHook


default_args = {
    "owner":            "data-team",
    "retries":          3,
    "retry_delay":      timedelta(minutes=5),
    "email_on_failure": True,
    "email":            ["alerts@bedrijf.nl"],
}


@dag(
    dag_id="ingest_orders_daily",
    schedule="0 6 * * *",         # elke dag om 06:00 UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["bronze", "orders"],
)
def ingest_orders_daily():

    # Sensor: wacht tot API beschikbaar is
    api_ready = HttpSensor(
        task_id="wait_for_api",
        http_conn_id="orders_api",
        endpoint="/health",
        poke_interval=60,
        timeout=600,
    )

    @task()
    def extract(execution_date=None) -> list[dict]:
        import requests
        date_str = execution_date.strftime("%Y-%m-%d")
        response = requests.get(
            f"https://api.example.com/orders?date={date_str}",
            timeout=30
        )
        response.raise_for_status()
        return response.json()["items"]

    @task()
    def load_to_postgres(records: list[dict]) -> int:
        hook = PostgresHook(postgres_conn_id="postgres_dwh")
        with hook.get_conn() as conn, conn.cursor() as cur:
            cur.executemany(
                """INSERT INTO bronze.orders
                   (order_id, customer_id, amount, status, order_date)
                   VALUES (%(order_id)s, %(customer_id)s,
                           %(amount)s, %(status)s, %(order_date)s)
                   ON CONFLICT (order_id) DO NOTHING""",
                records
            )
        return len(records)

    # Definieer afhankelijkheden
    raw_data   = extract()
    row_count  = load_to_postgres(raw_data)
    api_ready >> raw_data


ingest_orders_daily()
ℹ️ Gebruik Astro CLI (Astronomer) of de officiële apache-airflow Docker Compose setup voor lokaal ontwikkelen. In productie: gebruik AWS MWAA, Google Cloud Composer of Astronomer voor managed Airflow.
5

Databricks Delta Live Tables

Declaratieve pipelines, streaming + batch, expectations

Delta Live Tables (DLT) is Databricks' declaratieve pipeline framework. Je beschrijft wat de data moet zijn, niet hoe het berekend moet worden.

python — delta live tables pipeline
>import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


# Bronze: ruwe data inlezen
@dlt.table(
    name="bronze_orders",
    comment="Ruwe orders vanuit ADLS landing zone",
    table_properties={"pipelines.autoOptimize.managed": "true"}
)
def bronze_orders():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load("abfss://landing@storage.dfs.core.windows.net/orders/")
    )


# Silver: validatie met DLT Expectations
@dlt.expect_or_drop("order_id niet null", "order_id IS NOT NULL")
@dlt.expect_or_drop("positief bedrag",   "amount > 0")
@dlt.expect("geldige status",
           "status IN ('open', 'closed', 'pending')")
@dlt.table(name="silver_orders")
def silver_orders():
    return (
        dlt.read_stream("bronze_orders")
          .select(
            F.col("order_id"),
            F.col("customer_id"),
            F.to_date("order_date").alias("order_date"),
            F.col("amount").cast("double"),
            F.lower("status").alias("status"),
            F.current_timestamp().alias("_processed_at")
          )
    )
6

Error handling & Retry logica

Exceptions, exponential backoff, dead letter queues

Pipelines falen. Netwerk timeouts, API rate limits, schema wijzigingen — allemaal voorspelbare problemen waarvoor je van tevoren een strategie moet hebben.

python — retry met exponential backoff
>import time
import logging
from functools import wraps

log = logging.getLogger(__name__)


def retry(max_attempts: int = 3, backoff_seconds: float = 2.0,
         exceptions: tuple = (Exception,)):
    """Decorator voor automatische retry met exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        log.error(f"Alle {max_attempts} pogingen mislukt: {e}")
                        raise
                    wait = backoff_seconds ** attempt
                    log.warning(
                        f"Poging {attempt} mislukt ({e}). "
                        f"Wachten {wait:.0f}s voor volgende poging..."
                    )
                    time.sleep(wait)
        return wrapper
    return decorator


# Gebruik:
@retry(max_attempts=3, backoff_seconds=2.0,
       exceptions=(requests.Timeout, requests.ConnectionError))
def fetch_page(url: str, page: int) -> dict:
    response = requests.get(url, params={"page": page}, timeout=30)
    response.raise_for_status()
    return response.json()
Gebruik de tenacity library voor geavanceerde retry logica met jitter (willekeurige vertraging) om thundering herd problemen te voorkomen: pip install tenacity
7

Idempotentie & Checkpointing

Veilig herstarten, watermarks, state management

Een idempotente pipeline geeft hetzelfde resultaat ongeacht hoe vaak je hem uitvoert. Dit maakt veilig herstarten na fouten mogelijk.

Idempotentie patronen

  • Upsert (MERGE): INSERT + UPDATE bij conflict — geen duplicaten mogelijk
  • Truncate + Insert: Verwijder alles, herlaad. Eenvoudig maar traag voor grote tabellen
  • Partitie-overwrite: Vervang alleen de betreffende tijdspartitie
  • Watermark: Sla de laatste verwerkte waarde op, begin altijd daar
python — checkpoint/watermark patroon
>import json
from pathlib import Path

class Checkpoint:
    """Sla pipeline voortgang op in een JSON bestand."""

    def __init__(self, path: str):
        self.path = Path(path)

    def get(self, key: str, default=None):
        if not self.path.exists():
            return default
        data = json.loads(self.path.read_text())
        return data.get(key, default)

    def set(self, key: str, value) -> None:
        data = json.loads(self.path.read_text()) if self.path.exists() else {}
        data[key] = value
        self.path.write_text(json.dumps(data, default=str))


# Gebruik in pipeline:
checkpoint = Checkpoint("pipeline_state.json")
last_loaded = checkpoint.get("last_order_id", default=0)

new_records = fetch_orders_since(last_loaded)
load_to_db(new_records)

# Sla voortgang op na succesvolle load
if new_records:
    checkpoint.set("last_order_id", new_records[-1]["order_id"])
8

Monitoring & Alerting

Logging, metrics, Slack alerts, SLA tracking

Observeerbaarheid is de sleutel tot betrouwbare pipelines. Je moet altijd weten: wat is de laatste succesvolle run, hoeveel records zijn verwerkt, en wat gaat er mis?

python — slack alert bij fout
>import requests
import os


def send_slack_alert(pipeline_name: str, error: str) -> None:
    """Stuur een Slack bericht bij pipeline fout."""
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    payload = {
        "blocks": [
            {
                "type": "header",
                "text": {"type": "plain_text",
                          "text": f"🚨 Pipeline fout: {pipeline_name}"}
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn",
                          "text": f"*Fout:* `{error}`\n*Tijd:* {datetime.now()}"}
            }
        ]
    }
    requests.post(webhook_url, json=payload, timeout=10)


# In je pipeline hoofd-loop:
try:
    run_pipeline()
except Exception as e:
    send_slack_alert("ingest_orders", str(e))
    raise
ℹ️ Gebruik OpenTelemetry voor gestandaardiseerde metrics en traces. In Airflow: gebruik de ingebouwde SLA Miss callbacks. In Azure Monitor: configureer alerts op ADF pipeline failures via Azure Monitor alerts.
9

Checklist

Verplichte controles voor productie deployment
Robuustheid
  • Retry logica met exponential backoff geïmplementeerd
  • Timeouts ingesteld op alle externe aanroepen
  • Foutmeldingen zijn informatief en bevatten context
  • Dead letter opslag voor afgewezen records
Idempotentie
  • Dubbel uitvoeren geeft hetzelfde resultaat
  • Upsert of partitie-overwrite gebruikt (geen pure INSERT)
  • Checkpointing/watermark geïmplementeerd
  • Pipeline handmatig herstarten getest na gesimuleerde fout
Observeerbaarheid
  • Structured logging actief (JSON format)
  • Verwerkt recordaantal gelogd per run
  • Alerting geconfigureerd bij fout
  • SLA monitoring actief (maximale vertraging)
Beveiliging
  • Secrets in Key Vault / environment variables (niet in code)
  • Minimale permissies voor service accounts
  • Verbindingen lopen via VNet of Private Endpoint