dlt: Data Load Tool voor Moderne Pipelines

Gepubliceerd: 8 juni 2026
Leestijd: 12 minuten
Data Engineering

Ontdek hoe dlt (data load tool) je Python-pipelines vereenvoudigt met automatische schema-inferentie en ingebouwde normalisatie.

Wat is dlt en waarom is het relevant in 2026?

Data engineering heeft de afgelopen jaren een enorme evolutie doorgemaakt. Waar teams vroeger weken besteedden aan het bouwen van robuuste ingestion-pipelines, verwacht het moderne data-ecosysteem dat je binnen uren productie-waardige pipelines opzet. dlt — voluit data load tool — is een open-source Python-bibliotheek die precies dat belooft: snelle, betrouwbare en zelfbeschrijvende data-pipelines zonder de overhead van zware orchestratie-platformen.

Gelanceerd door de makers achter dltHub en inmiddels omarmd door duizenden data engineers wereldwijd, positioneert dlt zich als de "pip install voor data-pipelines". Je schrijft gewone Python, en dlt regelt schema-inferentie, type-coercion, incrementele loads, genormaliseerde tabellen en foutafhandeling voor je. In 2026 is dlt uitgegroeid tot een volwaardige keuze naast gevestigde namen zoals Airbyte, Fivetran en Apache Spark — met als grote voordeel dat het volledig code-first en gratis is.

Definitie: dlt (data load tool)

dlt is een open-source Python-framework (MIT-licentie) voor het bouwen van data-ingestion-pipelines. Het automatiseert schema-inferentie, versioning, normalisatie van geneste JSON-structuren en incrementeel laden naar diverse bestemmingen zoals BigQuery, Snowflake, DuckDB, PostgreSQL en meer. Het vereist geen externe services of UI — alles leeft in je Python-code.

Waarom nu bijzonder relevant? Drie trends maken dlt in 2026 onmisbaar:

Code-first cultuur

Data teams verhuizen van klikken-in-een-UI naar git-gebaseerde, testbare pipelines. dlt sluit perfect aan op deze DevOps-mentaliteit.

AI & LLM-integraties

dlt heeft ingebouwde connectors voor REST APIs en LLM-output, waardoor het ideaal is voor het inladen van AI-gegenereerde data en vector embeddings.

Kostenbeheersing

Geen licentiekosten, geen vendor lock-in. Voor scale-ups en MKB is dit een aantrekkelijk alternatief voor dure SaaS-oplossingen.

Hoe werkt dlt? Architectuur en kernconcepten

Om dlt goed te begrijpen, is het belangrijk de drie bouwstenen te kennen: sources, resources en destinations. Daartussen zorgt dlt voor alles wat normaal handmatig geprogrammeerd moet worden.

1

Source — Waar komt je data vandaan?

Een source is een logische groepering van data-resources. Denk aan een REST API, database, bestandssysteem of een custom Python-generator. dlt heeft een groeiende bibliotheek van kant-en-klare verified sources (GitHub, Salesforce, Stripe, HubSpot, Notion, etc.).

2

Resource — De data-stroom

Een resource is een generator of functie die data oplevert. dlt inspecteert automatisch het schema van elk record, detecteert datatypes, en beheert versioning als het schema wijzigt. Geneste JSON wordt automatisch genormaliseerd naar child-tabellen.

3

Pipeline — De verbinding

Een pipeline verbindt source met destination. Hier configureer je de destination (bijv. BigQuery), de write-disposition (replace, append, merge) en incrementele instellingen.

4

Destination — Waar gaat je data naartoe?

dlt ondersteunt BigQuery, Snowflake, Redshift, PostgreSQL, DuckDB, MotherDuck, Databricks, filesystem (Parquet/CSV) en meer. De destination-laag regelt DDL-migraties automatisch bij schema-wijzigingen.

5

Schema-inferentie & evolutie

dlt detecteert het schema op basis van de eerste dataruns, slaat het op als JSON-bestand, en past de destination-tabellen automatisch aan als er nieuwe kolommen verschijnen. Geen handmatige DDL meer nodig.

Concept dlt-term Analogie
Data-bron @dlt.source De "tap" op de waterpijp
Data-stroom @dlt.resource De "waterleiding"
Transformatie add_map / add_filter Filter in de leiding
Bestemming destination (bijv. bigquery) Het "reservoir"
Laadstrategie write_disposition Klep (vervanging vs. toevoeging)

Praktische codevoorbeelden

Laten we direct aan de slag gaan met concrete, werkende voorbeelden. We beginnen met de installatie en bouwen stap voor stap complexere pipelines.

Installatie

# Basisinstallatie
pip install dlt

# Met BigQuery support
pip install "dlt[bigquery]"

# Met DuckDB (lokaal, ideaal voor ontwikkeling)
pip install "dlt[duckdb]"

# Met meerdere destinations
pip install "dlt[bigquery,snowflake,postgres]"

Voorbeeld 1: Eenvoudige pipeline van een REST API naar DuckDB

import dlt
import requests

# Definieer een resource met de @dlt.resource decorator
@dlt.resource(write_disposition="replace")
def github_issues(repo: str = "dlt-hub/dlt"):
    """Haalt open issues op van GitHub."""
    url = f"https://api.github.com/repos/{repo}/issues"
    params = {"state": "open", "per_page": 100}
    
    while url:
        response = requests.get(url, params=params)
        response.raise_for_status()
        yield response.json()  # dlt verwerkt automatisch lijsten van dicts
        
        # Volgende pagina via Link header
        url = response.links.get("next", {}).get("url")
        params = {}  # Params zitten al in de next-URL

# Maak de pipeline aan — dlt beheert state en schema
pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_data"
)

# Voer de pipeline uit
load_info = pipeline.run(github_issues())
print(load_info)

# Schema bekijken
print(pipeline.last_trace.last_normalize_info)

Voorbeeld 2: Incrementeel laden met cursors

import dlt
from dlt.sources.helpers import requests
from datetime import datetime, timezone

@dlt.resource(
    write_disposition="append",
    primary_key="id"  # dlt gebruikt dit voor deduplicatie bij merge
)
def salesforce_opportunities(
    # dlt beheert automatisch de cursor-waarde tussen runs
    updated_at=dlt.sources.incremental(
        "LastModifiedDate",
        initial_value="2024-01-01T00:00:00Z"
    )
):
    """
    Haalt Salesforce opportunities op die zijn bijgewerkt 
    na de laatste run.
    """
    # updated_at.last_value bevat de hoogste waarde van de vorige run
    since = updated_at.last_value
    
    # Simulatie van Salesforce SOQL query
    records = [
        {
            "id": "006xx000001gJ1A",
            "name": "Acme Corp Deal",
            "amount": 50000,
            "stage": "Closed Won",
            "LastModifiedDate": "2025-03-15T10:30:00Z"
        },
        {
            "id": "006xx000001gJ2B", 
            "name": "Beta Inc Renewal",
            "amount": 12000,
            "stage": "Negotiation",
            "LastModifiedDate": "2025-03-16T08:00:00Z"
        }
    ]
    
    yield [r for r in records if r["LastModifiedDate"] > since]

pipeline = dlt.pipeline(
    pipeline_name="salesforce_pipeline",
    destination="bigquery",
    dataset_name="crm_data"
)

# Eerste run: laadt alles vanaf initial_value
# Volgende run: laadt alleen nieuwe/gewijzigde records
load_info = pipeline.run(salesforce_opportunities())
print(f"Geladen: {load_info}")

Voorbeeld 3: Geneste JSON automatisch genormaliseerd

import dlt

# dlt normaliseert geneste structuren automatisch
@dlt.resource(write_disposition="replace")
def orders_with_items():
    yield [
        {
            "order_id": 1001,
            "customer": "Jan de Vries",
            "order_date": "2025-03-01",
            "status": "shipped",
            # Geneste lijst → aparte child-tabel: orders__items
            "items": [
                {"sku": "PROD-A", "qty": 2, "price": 29.99},
                {"sku": "PROD-B", "qty": 1, "price": 49.99}
            ],
            # Geneste dict → kolommen met prefix: address__street etc.
            "shipping_address": {
                "street": "Keizersgracht 123",
                "city": "Amsterdam",
                "postal_code": "1015CJ"
            }
        }
    ]

pipeline = dlt.pipeline(
    pipeline_name="orders_pipeline",
    destination="duckdb",
    dataset_name="shop_data"
)

pipeline.run(orders_with_items())

# Resulterende tabellen:
# - shop_data.orders_with_items          (hoofd-tabel)
# - shop_data.orders_with_items__items   (child-tabel, automatisch gelinkt via _dlt_parent_id)
print("Tabellen aangemaakt:")
with pipeline.sql_client() as client:
    with client.execute_query("SHOW TABLES") as cursor:
        for row in cursor.fetchall():
            print(f"  → {row[0]}")

Voorbeeld 4: Secrets en configuratie beheren

# .dlt/secrets.toml (nooit committen naar git!)
[sources.api_key]
token = "mijn_geheime_api_key"

[destination.bigquery]
project_id = "mijn-gcp-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "service-account@project.iam.gserviceaccount.com"

# In je pipeline-code:
import dlt

@dlt.resource
def secure_api_data(
    api_key: str = dlt.secrets.value  # Automatisch geladen uit secrets.toml
):
    headers = {"Authorization": f"Bearer {api_key}"}
    # ... rest van de implementatie

Pro tip: gebruik DuckDB voor lokale ontwikkeling

Begin altijd met destination="duckdb" tijdens ontwikkeling. DuckDB heeft geen externe services nodig, is razendsnel, en je kunt de data direct inspecteren met pipeline.dataset(). Schakel daarna over naar BigQuery of Snowflake voor productie zonder één regel in je pipeline-code te wijzigen.

dlt versus alternatieven

dlt is niet de enige speler in de data-ingestion-ruimte. Hieronder vergelijken we de belangrijkste alternatieven op de meest relevante dimensies voor een moderne data engineer.

Criterium dlt Airbyte (OSS) Fivetran Singer/Meltano Spark (custom)
Kosten ✅ Gratis (MIT) ✅ Gratis (ELv2) ❌ Betaald per rij ✅ Gratis ✅ Gratis (infra apart)
Setup-complexiteit ✅ pip install ⚠️ Docker vereist ✅ Managed SaaS ⚠️ Matig ❌ Hoog
Code-first ✅ Pure Python ⚠️ YAML + Python ❌ UI-gebaseerd ✅ Python/YAML ✅ Scala/Python
Schema-inferentie ✅ Automatisch ✅ Automatisch ✅ Automatisch ⚠️ Handmatig ❌ Handmatig
Incremental loads ✅ Ingebouwd ✅ Ingebouwd ✅ Ingebouwd ⚠️ Wisselend ✅ Zelf bouwen
Geneste JSON ✅ Auto-normalisatie ✅ Auto-normalisatie ✅ Auto-normalisatie ❌ Handmatig ⚠️ Zelf bouwen
Custom connectors ✅ Gewone Python ⚠️ Connector SDK ❌ Beperkt/betaald ✅ Tap-protocol ✅ Volledig custom
Orchestratie nodig ⚠️ Extern (Airflow, Prefect) ✅ Ingebouwd ✅ Ingebouwd ⚠️ Extern ⚠️ Extern
Schaalgrootte ⚠️ Middelgroot ✅ Groot ✅ Enterprise ⚠️ Klein-middel ✅ Zeer groot

De kernboodschap: dlt schittert wanneer je snel, flexibel en code-first wilt werken zonder infrastructuurlasten. Voor enterprise-schaal met tientallen kant-en-klare connectoren en een managed service is Fivetran nog steeds een sterke keuze. Airbyte biedt een interessant middenweg met zijn eigen UI én een Python SDK.

Praktijkvoorbeeld: REST API naar BigQuery

Scenario: E-commerce platform — orders inladen

Een Nederlandse scale-up heeft een custom e-commerce platform met een REST API. Ze willen dagelijks alle nieuwe orders laden naar BigQuery voor analytics. Met dlt bouwen we dit in minder dan 50 regels code, inclusief incrementeel laden en automatische schema-evolutie.

import dlt
from dlt.sources.helpers import requests

BASE_URL = "https://api.shop.nl/v2"

@dlt.source(name="ecommerce")
def ecommerce_source(api_key: str = dlt.secrets.value):
    """Bundelt alle e-commerce resources."""
    return [
        orders(api_key),
        customers(api_key),
        products(api_key)
    ]

@dlt.resource(
    primary_key="order_id",
    write_disposition="merge"  # Upsert op basis van primary_key
)
def orders(
    api_key: str,
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="2024-01-01T00:00:00Z"
    )
):
    headers = {"X-API-Key": api_key}
    page = 1
    
    while True:
        resp = requests.get(
            f"{BASE_URL}/orders",
            headers=headers,
            params={
                "created_after": created_at.last_value,
                "page": page,
                "limit": 250
            }
        )
        data = resp.json()
        
        if not data["orders"]:
            break
            
        yield data["orders"]  # Bevat geneste order_lines → auto-genormaliseerd
        
        if not data["has_more"]:
            break
        page += 1

@dlt.resource(write_disposition="replace")
def customers(api_key: str):
    headers = {"X-API-Key": api_key}
    resp = requests.get(f"{BASE_URL}/customers", headers=headers)
    yield resp.json()["customers"]

@dlt.resource(write_disposition="replace")  
def products(api_key: str):
    headers = {"X-API-Key": api_key}
    resp = requests.get(f"{BASE_URL}/products", headers=headers)
    yield resp.json()["products"]

# Pipeline configuratie
if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="ecommerce_bq",
        destination="bigquery",
        dataset_name="ecommerce_raw",
        dev_mode=False  # True = dataset suffix met timestamp (voor testing)
    )
    
    load_info = pipeline.run(ecommerce_source())
    
    # Load statistieken printen
    for package in load_info.load_packages:
        for table_name, table in package.schema_update.items():
            print(f"Tabel bijgewerkt: {table_name}")
    
    print(f"\nStatus: {load_info.asstr(1)}")

Dit script draait je via een Airflow DAG, GitHub Actions workflow of Cloud Run job. dlt beheert de cursor-staat tussen runs in een _dlt_loads-tabel in BigQuery zelf.

Best practices voor productie

dlt maakt het makkelijk om snel te starten, maar voor productie-waardige pipelines zijn er een aantal patronen die je altijd moet toepassen.

Secrets management

Gebruik .dlt/secrets.toml lokaal, maar in productie: environment variables of een secrets manager zoals AWS Secrets Manager of Google Secret Manager. dlt ondersteunt dit out-of-the-box.

Dev vs. productie

Gebruik altijd dev_mode=True tijdens lokale tests. Dit voegt een timestamp toe aan de dataset-naam zodat je productie-data nooit overschrijft.

Monitoring & alerting

Gebruik load_info.has_failed_jobs om mislukte loads te detecteren. Koppel dit aan Slack-notificaties of een monitoring-tool zoals Grafana.

Top 5 productie best practices

  1. Altijd primary_key instellen bij merge-disposition. Zonder primary key kun je dubbele records krijgen bij herproccessing.
  2. Schema-contracten definiëren met @dlt.resource(columns={...}) voor kritieke pipelines. Dit voorkomt verrassingen bij API-wijzigingen.
  3. Retry-logica inbouwen met dlt.sources.helpers.requests (ingebouwde retry met exponential backoff) in plaats van de standaard requests-bibliotheek.
  4. Loggings structureren: stel RUNTIME__LOG_LEVEL=INFO in als environment variable voor gestructureerde logs in productie.
  5. State isoleren per omgeving: gebruik verschillende pipeline_name-waarden voor dev, staging en productie om state-conflicten te vermijden.

Schema-contracten definiëren

import dlt
from dlt.common.schema.typing import TTableSchemaColumns

# Expliciete kolom-definities voor kritieke pipelines
ORDERS_COLUMNS: TTableSchemaColumns = {
    "order_id": {"data_type": "bigint", "nullable": False},
    "customer_email": {"data_type": "text", "nullable": False},
    "total_amount": {"data_type": "decimal", "precision": 10, "scale": 2},
    "created_at": {"data_type": "timestamp", "nullable": False},
    "status": {"data_type": "text"}
}

@dlt.resource(
    primary_key="order_id",
    write_disposition="merge",
    columns=ORDERS_COLUMNS,
    # Schema-evolutie policy: raise als er onverwachte kolommen zijn
    schema_contract={"columns": "freeze"}
)
def orders():
    yield [...]  # Je data hier

Integratie met Airflow

from airflow.decorators import dag, task
from pendulum import datetime
import dlt

@dag(
    start_date=datetime(2025, 1, 1),
    schedule="0 6 * * *",  # Dagelijks om 06:00
    catchup=False
)
def ecommerce_ingestion():
    
    @task()
    def run_dlt_pipeline():
        from your_pipelines.ecommerce import ecommerce_source
        
        pipeline = dlt.pipeline(
            pipeline_name="ecommerce_bq",
            destination="bigquery",
            dataset_name="ecommerce_raw"
        )
        
        load_info = pipeline.run(ecommerce_source())
        
        if load_info.has_failed_jobs:
            raise Exception(f"Pipeline mislukt: {load_info}")
            
        return str(load_info)
    
    run_dlt_pipeline()

ecommerce_ingestion()

Conclusie: wanneer wel en niet gebruiken

dlt is een indrukwekkend gereedschap dat data engineering democratiseert — maar zoals elk gereedschap heeft het zijn optimale gebruiksscenario's en beperkingen.

Situatie dlt aanbevolen? Reden
Custom REST API inladen ✅ Absoluut Snel, flexibel, automatische normalisatie
Incrementele loads van databases ✅ Ja Ingebouwde cursors, merge-support
Kleine tot middelgrote datasets (<10 GB/dag) ✅ Ideaal Lage overhead, snelle iteratie
50+ standaard SaaS-connectoren nodig ⚠️ Overweeg Airbyte Airbyte heeft meer kant-en-klare verified sources
Petabyte-schaal streaming ❌ Niet geschikt Gebruik Spark Structured Streaming of Flink
Non-technisch team zonder Python-kennis ❌ Moeilijk Fivetran of Airbyte UI is toegankelijker
Prototyping & PoC's