Wat is dlt en waarom is het relevant in 2026?
Data engineering heeft de afgelopen jaren een enorme evolutie doorgemaakt. Waar teams vroeger weken besteedden aan het bouwen van robuuste ingestion-pipelines, verwacht het moderne data-ecosysteem dat je binnen uren productie-waardige pipelines opzet. dlt — voluit data load tool — is een open-source Python-bibliotheek die precies dat belooft: snelle, betrouwbare en zelfbeschrijvende data-pipelines zonder de overhead van zware orchestratie-platformen.
Gelanceerd door de makers achter dltHub en inmiddels omarmd door duizenden data engineers wereldwijd, positioneert dlt zich als de "pip install voor data-pipelines". Je schrijft gewone Python, en dlt regelt schema-inferentie, type-coercion, incrementele loads, genormaliseerde tabellen en foutafhandeling voor je. In 2026 is dlt uitgegroeid tot een volwaardige keuze naast gevestigde namen zoals Airbyte, Fivetran en Apache Spark — met als grote voordeel dat het volledig code-first en gratis is.
Definitie: dlt (data load tool)
dlt is een open-source Python-framework (MIT-licentie) voor het bouwen van data-ingestion-pipelines. Het automatiseert schema-inferentie, versioning, normalisatie van geneste JSON-structuren en incrementeel laden naar diverse bestemmingen zoals BigQuery, Snowflake, DuckDB, PostgreSQL en meer. Het vereist geen externe services of UI — alles leeft in je Python-code.
Waarom nu bijzonder relevant? Drie trends maken dlt in 2026 onmisbaar:
Code-first cultuur
Data teams verhuizen van klikken-in-een-UI naar git-gebaseerde, testbare pipelines. dlt sluit perfect aan op deze DevOps-mentaliteit.
AI & LLM-integraties
dlt heeft ingebouwde connectors voor REST APIs en LLM-output, waardoor het ideaal is voor het inladen van AI-gegenereerde data en vector embeddings.
Kostenbeheersing
Geen licentiekosten, geen vendor lock-in. Voor scale-ups en MKB is dit een aantrekkelijk alternatief voor dure SaaS-oplossingen.
Hoe werkt dlt? Architectuur en kernconcepten
Om dlt goed te begrijpen, is het belangrijk de drie bouwstenen te kennen: sources, resources en destinations. Daartussen zorgt dlt voor alles wat normaal handmatig geprogrammeerd moet worden.
Source — Waar komt je data vandaan?
Een source is een logische groepering van data-resources. Denk aan een REST API, database, bestandssysteem of een custom Python-generator. dlt heeft een groeiende bibliotheek van kant-en-klare verified sources (GitHub, Salesforce, Stripe, HubSpot, Notion, etc.).
Resource — De data-stroom
Een resource is een generator of functie die data oplevert. dlt inspecteert automatisch het schema van elk record, detecteert datatypes, en beheert versioning als het schema wijzigt. Geneste JSON wordt automatisch genormaliseerd naar child-tabellen.
Pipeline — De verbinding
Een pipeline verbindt source met destination. Hier configureer je de destination (bijv. BigQuery), de write-disposition (replace, append, merge) en incrementele instellingen.
Destination — Waar gaat je data naartoe?
dlt ondersteunt BigQuery, Snowflake, Redshift, PostgreSQL, DuckDB, MotherDuck, Databricks, filesystem (Parquet/CSV) en meer. De destination-laag regelt DDL-migraties automatisch bij schema-wijzigingen.
Schema-inferentie & evolutie
dlt detecteert het schema op basis van de eerste dataruns, slaat het op als JSON-bestand, en past de destination-tabellen automatisch aan als er nieuwe kolommen verschijnen. Geen handmatige DDL meer nodig.
| Concept | dlt-term | Analogie |
|---|---|---|
| Data-bron | @dlt.source |
De "tap" op de waterpijp |
| Data-stroom | @dlt.resource |
De "waterleiding" |
| Transformatie | add_map / add_filter |
Filter in de leiding |
| Bestemming | destination (bijv. bigquery) |
Het "reservoir" |
| Laadstrategie | write_disposition | Klep (vervanging vs. toevoeging) |
Praktische codevoorbeelden
Laten we direct aan de slag gaan met concrete, werkende voorbeelden. We beginnen met de installatie en bouwen stap voor stap complexere pipelines.
Installatie
# Basisinstallatie
pip install dlt
# Met BigQuery support
pip install "dlt[bigquery]"
# Met DuckDB (lokaal, ideaal voor ontwikkeling)
pip install "dlt[duckdb]"
# Met meerdere destinations
pip install "dlt[bigquery,snowflake,postgres]"
Voorbeeld 1: Eenvoudige pipeline van een REST API naar DuckDB
import dlt
import requests
# Definieer een resource met de @dlt.resource decorator
@dlt.resource(write_disposition="replace")
def github_issues(repo: str = "dlt-hub/dlt"):
"""Haalt open issues op van GitHub."""
url = f"https://api.github.com/repos/{repo}/issues"
params = {"state": "open", "per_page": 100}
while url:
response = requests.get(url, params=params)
response.raise_for_status()
yield response.json() # dlt verwerkt automatisch lijsten van dicts
# Volgende pagina via Link header
url = response.links.get("next", {}).get("url")
params = {} # Params zitten al in de next-URL
# Maak de pipeline aan — dlt beheert state en schema
pipeline = dlt.pipeline(
pipeline_name="github_pipeline",
destination="duckdb",
dataset_name="github_data"
)
# Voer de pipeline uit
load_info = pipeline.run(github_issues())
print(load_info)
# Schema bekijken
print(pipeline.last_trace.last_normalize_info)
Voorbeeld 2: Incrementeel laden met cursors
import dlt
from dlt.sources.helpers import requests
from datetime import datetime, timezone
@dlt.resource(
write_disposition="append",
primary_key="id" # dlt gebruikt dit voor deduplicatie bij merge
)
def salesforce_opportunities(
# dlt beheert automatisch de cursor-waarde tussen runs
updated_at=dlt.sources.incremental(
"LastModifiedDate",
initial_value="2024-01-01T00:00:00Z"
)
):
"""
Haalt Salesforce opportunities op die zijn bijgewerkt
na de laatste run.
"""
# updated_at.last_value bevat de hoogste waarde van de vorige run
since = updated_at.last_value
# Simulatie van Salesforce SOQL query
records = [
{
"id": "006xx000001gJ1A",
"name": "Acme Corp Deal",
"amount": 50000,
"stage": "Closed Won",
"LastModifiedDate": "2025-03-15T10:30:00Z"
},
{
"id": "006xx000001gJ2B",
"name": "Beta Inc Renewal",
"amount": 12000,
"stage": "Negotiation",
"LastModifiedDate": "2025-03-16T08:00:00Z"
}
]
yield [r for r in records if r["LastModifiedDate"] > since]
pipeline = dlt.pipeline(
pipeline_name="salesforce_pipeline",
destination="bigquery",
dataset_name="crm_data"
)
# Eerste run: laadt alles vanaf initial_value
# Volgende run: laadt alleen nieuwe/gewijzigde records
load_info = pipeline.run(salesforce_opportunities())
print(f"Geladen: {load_info}")
Voorbeeld 3: Geneste JSON automatisch genormaliseerd
import dlt
# dlt normaliseert geneste structuren automatisch
@dlt.resource(write_disposition="replace")
def orders_with_items():
yield [
{
"order_id": 1001,
"customer": "Jan de Vries",
"order_date": "2025-03-01",
"status": "shipped",
# Geneste lijst → aparte child-tabel: orders__items
"items": [
{"sku": "PROD-A", "qty": 2, "price": 29.99},
{"sku": "PROD-B", "qty": 1, "price": 49.99}
],
# Geneste dict → kolommen met prefix: address__street etc.
"shipping_address": {
"street": "Keizersgracht 123",
"city": "Amsterdam",
"postal_code": "1015CJ"
}
}
]
pipeline = dlt.pipeline(
pipeline_name="orders_pipeline",
destination="duckdb",
dataset_name="shop_data"
)
pipeline.run(orders_with_items())
# Resulterende tabellen:
# - shop_data.orders_with_items (hoofd-tabel)
# - shop_data.orders_with_items__items (child-tabel, automatisch gelinkt via _dlt_parent_id)
print("Tabellen aangemaakt:")
with pipeline.sql_client() as client:
with client.execute_query("SHOW TABLES") as cursor:
for row in cursor.fetchall():
print(f" → {row[0]}")
Voorbeeld 4: Secrets en configuratie beheren
# .dlt/secrets.toml (nooit committen naar git!)
[sources.api_key]
token = "mijn_geheime_api_key"
[destination.bigquery]
project_id = "mijn-gcp-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "service-account@project.iam.gserviceaccount.com"
# In je pipeline-code:
import dlt
@dlt.resource
def secure_api_data(
api_key: str = dlt.secrets.value # Automatisch geladen uit secrets.toml
):
headers = {"Authorization": f"Bearer {api_key}"}
# ... rest van de implementatie
Pro tip: gebruik DuckDB voor lokale ontwikkeling
Begin altijd met destination="duckdb" tijdens ontwikkeling. DuckDB heeft geen externe services nodig, is razendsnel, en je kunt de data direct inspecteren met pipeline.dataset(). Schakel daarna over naar BigQuery of Snowflake voor productie zonder één regel in je pipeline-code te wijzigen.
dlt versus alternatieven
dlt is niet de enige speler in de data-ingestion-ruimte. Hieronder vergelijken we de belangrijkste alternatieven op de meest relevante dimensies voor een moderne data engineer.
| Criterium | dlt | Airbyte (OSS) | Fivetran | Singer/Meltano | Spark (custom) |
|---|---|---|---|---|---|
| Kosten | ✅ Gratis (MIT) | ✅ Gratis (ELv2) | ❌ Betaald per rij | ✅ Gratis | ✅ Gratis (infra apart) |
| Setup-complexiteit | ✅ pip install | ⚠️ Docker vereist | ✅ Managed SaaS | ⚠️ Matig | ❌ Hoog |
| Code-first | ✅ Pure Python | ⚠️ YAML + Python | ❌ UI-gebaseerd | ✅ Python/YAML | ✅ Scala/Python |
| Schema-inferentie | ✅ Automatisch | ✅ Automatisch | ✅ Automatisch | ⚠️ Handmatig | ❌ Handmatig |
| Incremental loads | ✅ Ingebouwd | ✅ Ingebouwd | ✅ Ingebouwd | ⚠️ Wisselend | ✅ Zelf bouwen |
| Geneste JSON | ✅ Auto-normalisatie | ✅ Auto-normalisatie | ✅ Auto-normalisatie | ❌ Handmatig | ⚠️ Zelf bouwen |
| Custom connectors | ✅ Gewone Python | ⚠️ Connector SDK | ❌ Beperkt/betaald | ✅ Tap-protocol | ✅ Volledig custom |
| Orchestratie nodig | ⚠️ Extern (Airflow, Prefect) | ✅ Ingebouwd | ✅ Ingebouwd | ⚠️ Extern | ⚠️ Extern |
| Schaalgrootte | ⚠️ Middelgroot | ✅ Groot | ✅ Enterprise | ⚠️ Klein-middel | ✅ Zeer groot |
De kernboodschap: dlt schittert wanneer je snel, flexibel en code-first wilt werken zonder infrastructuurlasten. Voor enterprise-schaal met tientallen kant-en-klare connectoren en een managed service is Fivetran nog steeds een sterke keuze. Airbyte biedt een interessant middenweg met zijn eigen UI én een Python SDK.
Praktijkvoorbeeld: REST API naar BigQuery
Scenario: E-commerce platform — orders inladen
Een Nederlandse scale-up heeft een custom e-commerce platform met een REST API. Ze willen dagelijks alle nieuwe orders laden naar BigQuery voor analytics. Met dlt bouwen we dit in minder dan 50 regels code, inclusief incrementeel laden en automatische schema-evolutie.
import dlt
from dlt.sources.helpers import requests
BASE_URL = "https://api.shop.nl/v2"
@dlt.source(name="ecommerce")
def ecommerce_source(api_key: str = dlt.secrets.value):
"""Bundelt alle e-commerce resources."""
return [
orders(api_key),
customers(api_key),
products(api_key)
]
@dlt.resource(
primary_key="order_id",
write_disposition="merge" # Upsert op basis van primary_key
)
def orders(
api_key: str,
created_at=dlt.sources.incremental(
"created_at",
initial_value="2024-01-01T00:00:00Z"
)
):
headers = {"X-API-Key": api_key}
page = 1
while True:
resp = requests.get(
f"{BASE_URL}/orders",
headers=headers,
params={
"created_after": created_at.last_value,
"page": page,
"limit": 250
}
)
data = resp.json()
if not data["orders"]:
break
yield data["orders"] # Bevat geneste order_lines → auto-genormaliseerd
if not data["has_more"]:
break
page += 1
@dlt.resource(write_disposition="replace")
def customers(api_key: str):
headers = {"X-API-Key": api_key}
resp = requests.get(f"{BASE_URL}/customers", headers=headers)
yield resp.json()["customers"]
@dlt.resource(write_disposition="replace")
def products(api_key: str):
headers = {"X-API-Key": api_key}
resp = requests.get(f"{BASE_URL}/products", headers=headers)
yield resp.json()["products"]
# Pipeline configuratie
if __name__ == "__main__":
pipeline = dlt.pipeline(
pipeline_name="ecommerce_bq",
destination="bigquery",
dataset_name="ecommerce_raw",
dev_mode=False # True = dataset suffix met timestamp (voor testing)
)
load_info = pipeline.run(ecommerce_source())
# Load statistieken printen
for package in load_info.load_packages:
for table_name, table in package.schema_update.items():
print(f"Tabel bijgewerkt: {table_name}")
print(f"\nStatus: {load_info.asstr(1)}")
Dit script draait je via een Airflow DAG, GitHub Actions workflow of Cloud Run job. dlt beheert de cursor-staat tussen runs in een _dlt_loads-tabel in BigQuery zelf.
Best practices voor productie
dlt maakt het makkelijk om snel te starten, maar voor productie-waardige pipelines zijn er een aantal patronen die je altijd moet toepassen.
Secrets management
Gebruik .dlt/secrets.toml lokaal, maar in productie: environment variables of een secrets manager zoals AWS Secrets Manager of Google Secret Manager. dlt ondersteunt dit out-of-the-box.
Dev vs. productie
Gebruik altijd dev_mode=True tijdens lokale tests. Dit voegt een timestamp toe aan de dataset-naam zodat je productie-data nooit overschrijft.
Monitoring & alerting
Gebruik load_info.has_failed_jobs om mislukte loads te detecteren. Koppel dit aan Slack-notificaties of een monitoring-tool zoals Grafana.
Top 5 productie best practices
- Altijd
primary_keyinstellen bij merge-disposition. Zonder primary key kun je dubbele records krijgen bij herproccessing. - Schema-contracten definiëren met
@dlt.resource(columns={...})voor kritieke pipelines. Dit voorkomt verrassingen bij API-wijzigingen. - Retry-logica inbouwen met
dlt.sources.helpers.requests(ingebouwde retry met exponential backoff) in plaats van de standaardrequests-bibliotheek. - Loggings structureren: stel
RUNTIME__LOG_LEVEL=INFOin als environment variable voor gestructureerde logs in productie. - State isoleren per omgeving: gebruik verschillende
pipeline_name-waarden voor dev, staging en productie om state-conflicten te vermijden.
Schema-contracten definiëren
import dlt
from dlt.common.schema.typing import TTableSchemaColumns
# Expliciete kolom-definities voor kritieke pipelines
ORDERS_COLUMNS: TTableSchemaColumns = {
"order_id": {"data_type": "bigint", "nullable": False},
"customer_email": {"data_type": "text", "nullable": False},
"total_amount": {"data_type": "decimal", "precision": 10, "scale": 2},
"created_at": {"data_type": "timestamp", "nullable": False},
"status": {"data_type": "text"}
}
@dlt.resource(
primary_key="order_id",
write_disposition="merge",
columns=ORDERS_COLUMNS,
# Schema-evolutie policy: raise als er onverwachte kolommen zijn
schema_contract={"columns": "freeze"}
)
def orders():
yield [...] # Je data hier
Integratie met Airflow
from airflow.decorators import dag, task
from pendulum import datetime
import dlt
@dag(
start_date=datetime(2025, 1, 1),
schedule="0 6 * * *", # Dagelijks om 06:00
catchup=False
)
def ecommerce_ingestion():
@task()
def run_dlt_pipeline():
from your_pipelines.ecommerce import ecommerce_source
pipeline = dlt.pipeline(
pipeline_name="ecommerce_bq",
destination="bigquery",
dataset_name="ecommerce_raw"
)
load_info = pipeline.run(ecommerce_source())
if load_info.has_failed_jobs:
raise Exception(f"Pipeline mislukt: {load_info}")
return str(load_info)
run_dlt_pipeline()
ecommerce_ingestion()
Conclusie: wanneer wel en niet gebruiken
dlt is een indrukwekkend gereedschap dat data engineering democratiseert — maar zoals elk gereedschap heeft het zijn optimale gebruiksscenario's en beperkingen.
| Situatie | dlt aanbevolen? | Reden |
|---|---|---|
| Custom REST API inladen | ✅ Absoluut | Snel, flexibel, automatische normalisatie |
| Incrementele loads van databases | ✅ Ja | Ingebouwde cursors, merge-support |
| Kleine tot middelgrote datasets (<10 GB/dag) | ✅ Ideaal | Lage overhead, snelle iteratie |
| 50+ standaard SaaS-connectoren nodig | ⚠️ Overweeg Airbyte | Airbyte heeft meer kant-en-klare verified sources |
| Petabyte-schaal streaming | ❌ Niet geschikt | Gebruik Spark Structured Streaming of Flink |
| Non-technisch team zonder Python-kennis | ❌ Moeilijk | Fivetran of Airbyte UI is toegankelijker |
| Prototyping & PoC's |