Data Engineering Code Voorbeelden
Echte, werkende code snippets voor SQL, Python en PySpark. Kopieer en gebruik direct in je projecten.
SQL Code Voorbeelden
Praktische SQL scripts voor data engineering, ETL en data analysis
Incremental Data Load
Use case: Incrementeel laden van data in Snowflake met MERGE statement. Optimaliseert performance en reduceert kosten.
- Handelt updates en inserts af
- Minimaliseert data processing
- Audit logging ingebouwd
incremental_load.sql
-- Incremental Data Load with a MERGE statement.
-- Written for Snowflake; Synapse works as-is, BigQuery needs dialect
-- tweaks (e.g. DATEADD -> DATE_SUB) -- TODO confirm per target warehouse.
-- Step 1: Build a temporary staging table with only the most recent day's
-- orders from the external source, tagged with batch metadata for auditing.
CREATE OR REPLACE TEMPORARY TABLE staging_sales AS
SELECT
order_id,
customer_id,
product_id,
quantity,
amount,
order_date,
'batch_' || CURRENT_DATE() AS batch_id, -- human-readable batch label for audit logging
CURRENT_TIMESTAMP() AS loaded_at -- when this batch was staged
FROM external_sales_source
WHERE order_date >= DATEADD(day, -1, CURRENT_DATE());
-- Step 2: Upsert staging rows into the fact table, keyed on order_id:
-- existing orders get their quantity/amount refreshed and are re-activated,
-- new orders are inserted. NOTE(review): batch_id/loaded_at from staging are
-- not carried into the fact table -- confirm that is intentional.
MERGE INTO production.sales_fact AS target
USING staging_sales AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN
UPDATE SET
target.quantity = source.quantity,
target.amount = source.amount,
target.updated_at = CURRENT_TIMESTAMP(),
target.is_active = TRUE
WHEN NOT MATCHED THEN
INSERT (
order_id, customer_id, product_id,
quantity, amount, order_date,
created_at, updated_at, is_active
) VALUES (
source.order_id, source.customer_id, source.product_id,
source.quantity, source.amount, source.order_date,
CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), TRUE
);
-- Step 3: Soft delete (optional): deactivate orders older than 13 months
-- instead of physically deleting them, so history stays queryable.
UPDATE production.sales_fact
SET is_active = FALSE, updated_at = CURRENT_TIMESTAMP()
WHERE order_date < DATEADD(month, -13, CURRENT_DATE())
AND is_active = TRUE;
Data Quality Checks
Use case: Uitgebreide data quality checks voor een customer database. Controleert completeness, validity en consistency.
- NULL value checks
- Domain validation
- Referential integrity
- Data profiling
data_quality_checks.sql
-- Data Quality Checks for Customer Data
-- Suitable for PostgreSQL, MySQL, SQL Server
-- 1. Completeness checks. COUNT(col) counts only non-NULL values, so each
-- ratio below measures how complete (or valid) a column is for active customers.
WITH completeness_checks AS (
SELECT
COUNT(*) AS total_rows,
COUNT(customer_id) AS customer_id_not_null,
COUNT(first_name) AS first_name_not_null,
COUNT(email) AS email_not_null,
-- crude format check: requires an '@' with a '.' somewhere after it
COUNT(CASE WHEN email LIKE '%@%.%' THEN 1 END) AS valid_email_format
FROM production.customers
WHERE is_active = TRUE
)
SELECT
total_rows,
-- NULLIF guards BOTH ratios against division by zero on an empty table
-- (previously only the second ratio was guarded).
ROUND(100.0 * email_not_null / NULLIF(total_rows, 0), 2) AS email_complete_pct,
ROUND(100.0 * valid_email_format / NULLIF(email_not_null, 0), 2) AS email_valid_pct
FROM completeness_checks;
-- 2. Referential integrity: count orders whose customer_id has no matching
-- row in the customers table (orphans). Expected result: 0.
SELECT COUNT(*) AS orphaned_records
FROM production.orders o
LEFT JOIN production.customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;
-- 3. Business rule validation: count orders with a negative amount or an
-- order date in the future. Expected result: 0.
SELECT COUNT(*) AS rule_violations
FROM production.orders
WHERE order_amount < 0
OR order_date > CURRENT_DATE;
Python Code Voorbeelden
Python scripts voor data processing, ETL pipelines en data quality
ETL Pipeline Template
Use case: Complete ETL pipeline template met error handling, logging en configuratie management.
- Configuration from YAML
- Error handling en retry logic
- Logging naar console en file
- Modular design
etl_pipeline.py
#!/usr/bin/env python3
"""ETL Pipeline Template voor Data Engineering"""
import yaml
import logging
from datetime import datetime
from typing import Dict, Any
import pandas as pd
# Configure root logging once at import time: INFO level, with output
# mirrored to both a log file and the console.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_LOG_HANDLERS = [logging.FileHandler('etl_pipeline.log'), logging.StreamHandler()]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_LOG_HANDLERS)

# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
class ETLError(Exception):
    """Raised when any stage of the ETL pipeline fails."""
class ETLPipeline:
    """Config-driven extract/transform/load pipeline (CSV/parquet -> parquet).

    The YAML config supplies three sections: ``source`` (type + path),
    ``transformations`` (optional: ``drop_duplicates``, ``type_conversions``)
    and ``target`` (path). Drive the pipeline via :meth:`run`.
    """

    def __init__(self, config_path: str = "config/etl_config.yaml"):
        self.config = self.load_config(config_path)
        # rows_processed: rows written by the last successful run;
        # errors: messages collected from failed runs.
        self.stats = {'rows_processed': 0, 'errors': []}

    def load_config(self, config_path: str) -> Dict[str, Any]:
        """Read and parse the YAML config; wrap any failure in ETLError."""
        try:
            with open(config_path, 'r') as f:
                return yaml.safe_load(f)
        except Exception as e:
            # Chain the cause so tracebacks show the underlying I/O/parse error.
            raise ETLError(f"Config laden mislukt: {e}") from e

    def extract(self) -> pd.DataFrame:
        """Load the configured source into a DataFrame.

        Raises:
            ETLError: for unsupported source types.
        """
        source_type = self.config['source']['type']
        source_path = self.config['source']['path']
        logger.info(f"Extract data van {source_type}: {source_path}")
        if source_type == 'csv':
            # Read every column as str; transform() performs explicit casts.
            return pd.read_csv(source_path, dtype=str)
        elif source_type == 'parquet':
            return pd.read_parquet(source_path)
        raise ETLError(f"Onbekend source type: {source_type}")

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the config-driven transformations to *df* and return it."""
        transformations = self.config.get('transformations', {})
        if transformations.get('drop_duplicates'):
            df = df.drop_duplicates()
        # Cast columns per config; errors='coerce' turns unparseable values
        # into NaT/NaN rather than aborting the whole load.
        for col, dtype in transformations.get('type_conversions', {}).items():
            if col in df.columns:
                if dtype == 'datetime':
                    df[col] = pd.to_datetime(df[col], errors='coerce')
                elif dtype == 'numeric':
                    df[col] = pd.to_numeric(df[col], errors='coerce')
        return df

    def load(self, df: pd.DataFrame):
        """Write *df* to the configured parquet target path."""
        target_path = self.config['target']['path']
        df.to_parquet(target_path, index=False)
        logger.info(f"Geladen: {len(df)} rijen naar {target_path}")

    def run(self) -> bool:
        """Execute extract -> transform -> load; return True on success."""
        logger.info("ETL Pipeline gestart")
        try:
            raw = self.extract()
            transformed = self.transform(raw)
            self.load(transformed)
        except ETLError as e:
            logger.error(f"ETL Pipeline mislukt: {e}")
            self.stats['errors'].append(str(e))
            return False
        except Exception as e:
            # Bug fix: I/O and pandas errors (e.g. FileNotFoundError from
            # read_csv) previously escaped run() and crashed the caller even
            # though the template advertises error handling; treat them like
            # ETLError so run() reliably returns a bool.
            logger.exception(f"ETL Pipeline mislukt: {e}")
            self.stats['errors'].append(str(e))
            return False
        self.stats['rows_processed'] = len(transformed)
        logger.info(f"Pipeline voltooid: {self.stats['rows_processed']} rijen")
        return True
if __name__ == "__main__":
    # Script entry point: run the pipeline once and report the outcome.
    runner = ETLPipeline("config/etl_config.yaml")
    if runner.run():
        print("Succesvol!")
    else:
        print("Mislukt, zie logs.")
PySpark Code Voorbeelden
PySpark scripts voor big data processing en distributed computing
Performance Optimization
Use case: Geavanceerde performance tuning voor PySpark jobs. Optimaliseert partitioning, caching en query execution.
- Dynamic partition pruning
- Broadcast joins voor kleine tables
- Delta Lake optimizations
- Memory management
performance_optimization.py
"""PySpark Performance Optimization Template"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg
# Build the shared session with Adaptive Query Execution enabled and the
# broadcast-join threshold raised to 104857600 bytes (100 MB).
_SPARK_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.autoBroadcastJoinThreshold": "104857600",
}

_builder = SparkSession.builder.appName("PerformanceOptimization")
for _key, _value in _SPARK_SETTINGS.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()
class SparkOptimizer:
    """Helpers for tuning DataFrame partitioning."""

    # Tuning knobs, previously hard-coded inline. Exposed as class attributes
    # so callers can adjust them without changing call sites (backward
    # compatible: defaults reproduce the original behavior).
    DEFAULT_PARTITIONS = 200    # used when only a partition column is given
    EST_BYTES_PER_ROW = 100     # rough per-row size estimate for auto-sizing
    TARGET_PARTITION_MB = 128   # aim for roughly this many MB per partition

    def __init__(self, spark_session):
        self.spark = spark_session

    def repartition_optimally(self, df, partition_column=None, num_partitions=None):
        """Repartition *df* by column, by explicit count, or by a size estimate.

        Precedence: partition_column (count defaults to DEFAULT_PARTITIONS),
        then num_partitions alone, then an automatic estimate targeting
        ~TARGET_PARTITION_MB per partition. Returns the repartitioned frame.
        """
        current_partitions = df.rdd.getNumPartitions()
        if partition_column:
            df = df.repartition(num_partitions or self.DEFAULT_PARTITIONS, partition_column)
        elif num_partitions:
            df = df.repartition(num_partitions)
        else:
            # NOTE: df.count() triggers a full job over the input; acceptable
            # for a one-off sizing pass, but avoid on expensive upstream plans.
            estimated_mb = df.count() * self.EST_BYTES_PER_ROW / (1024 * 1024)
            optimal = max(1, int(estimated_mb / self.TARGET_PARTITION_MB))
            df = df.repartition(optimal)
        print(f"Repartitioned from {current_partitions} to {df.rdd.getNumPartitions()} partitions")
        return df
def ecommerce_analytics_pipeline():
    """Join recent orders to customers and compute per-customer order metrics."""
    optimizer = SparkOptimizer(spark)

    # Load both Delta sources.
    orders = spark.read.format("delta").load("/mnt/data/orders")
    customers = spark.read.format("delta").load("/mnt/data/customers")

    # Restrict orders to the 2024 window, then partition both sides of the
    # upcoming join on the join key.
    recent = orders.filter(col("order_date") >= "2024-01-01")
    recent = optimizer.repartition_optimally(recent, "customer_id")
    customers = optimizer.repartition_optimally(customers, "customer_id")

    enriched = recent.join(customers, "customer_id", "inner")

    aggregations = [
        count("*").alias("total_orders"),
        sum("order_amount").alias("total_spent"),
        avg("order_amount").alias("avg_order_value"),
    ]
    metrics = enriched.groupBy("customer_id", "country").agg(*aggregations)

    print(f"Pipeline voltooid! {metrics.count()} rijen")
    return metrics
if __name__ == "__main__":
    try:
        # Run the full pipeline and show a small, untruncated sample.
        report = ecommerce_analytics_pipeline()
        report.show(10, truncate=False)
    except Exception as e:
        print(f"Fout: {e}")
    finally:
        # Always release cluster resources, even when the pipeline fails.
        spark.stop()