Data Engineering Code Voorbeelden

Echte, werkende code snippets voor SQL, Python en PySpark. Kopieer en gebruik direct in je projecten.

SQL Code Voorbeelden

Praktische SQL scripts voor data engineering, ETL en data analysis

Incremental Data Load

SQL Snowflake ETL

Use case: Incrementeel laden van data in Snowflake met MERGE statement. Optimaliseert performance en reduceert kosten.

  • Handelt updates en inserts af
  • Minimaliseert data processing
  • Audit logging ingebouwd
incremental_load.sql
-- Incremental data load using a MERGE (upsert) statement.
-- Dialect: Snowflake; the same pattern works on BigQuery and Synapse
-- (DATEADD/CURRENT_DATE syntax may need minor adjustment per engine).

-- Step 1: build a TEMPORARY staging table containing only rows from the
-- last day; batch_id and loaded_at provide a built-in audit trail.
CREATE OR REPLACE TEMPORARY TABLE staging_sales AS
SELECT
    order_id,
    customer_id,
    product_id,
    quantity,
    amount,
    order_date,
    'batch_' || CURRENT_DATE() AS batch_id,
    CURRENT_TIMESTAMP() AS loaded_at
FROM external_sales_source
WHERE order_date >= DATEADD(day, -1, CURRENT_DATE());

-- Step 2: MERGE staging into the fact table keyed on order_id:
-- existing orders are updated in place, new orders are inserted.
MERGE INTO production.sales_fact AS target
USING staging_sales AS source
    ON target.order_id = source.order_id
WHEN MATCHED THEN
    UPDATE SET
        target.quantity = source.quantity,
        target.amount = source.amount,
        target.updated_at = CURRENT_TIMESTAMP(),
        -- re-activate the row in case it was previously soft-deleted
        target.is_active = TRUE
WHEN NOT MATCHED THEN
    INSERT (
        order_id, customer_id, product_id,
        quantity, amount, order_date,
        created_at, updated_at, is_active
    ) VALUES (
        source.order_id, source.customer_id, source.product_id,
        source.quantity, source.amount, source.order_date,
        CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), TRUE
    );

-- Step 3 (optional): soft-delete records older than 13 months by flagging
-- is_active = FALSE instead of physically deleting them.
UPDATE production.sales_fact
SET is_active = FALSE, updated_at = CURRENT_TIMESTAMP()
WHERE order_date < DATEADD(month, -13, CURRENT_DATE())
  AND is_active = TRUE;

Data Quality Checks

SQL Data Quality Validation

Use case: Uitgebreide data quality checks voor een customer database. Controleert completeness, validity en consistency.

  • NULL value checks
  • Domain validation
  • Referential integrity
  • Data profiling
data_quality_checks.sql
-- Data quality checks for customer data.
-- Works on PostgreSQL, MySQL and SQL Server.

-- 1. Completeness checks: COUNT(col) skips NULLs, so comparing it against
--    COUNT(*) measures per-column completeness.
WITH completeness_checks AS (
    SELECT
        COUNT(*) AS total_rows,
        COUNT(customer_id) AS customer_id_not_null,
        COUNT(first_name) AS first_name_not_null,
        COUNT(email) AS email_not_null,
        -- crude format check: at least one '@' later followed by a '.'
        COUNT(CASE WHEN email LIKE '%@%.%' THEN 1 END) AS valid_email_format
    FROM production.customers
    WHERE is_active = TRUE
)
SELECT
    total_rows,
    -- NULLIF guards BOTH divisions against an empty (or fully filtered)
    -- table: the percentage becomes NULL instead of raising a
    -- division-by-zero error. The original guarded only the second ratio.
    ROUND(100.0 * email_not_null / NULLIF(total_rows, 0), 2) AS email_complete_pct,
    ROUND(100.0 * valid_email_format / NULLIF(email_not_null, 0), 2) AS email_valid_pct
FROM completeness_checks;

-- 2. Referential integrity: orders whose customer_id matches no customer.
SELECT COUNT(*) AS orphaned_records
FROM production.orders o
LEFT JOIN production.customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;

-- 3. Business rule validation: negative amounts or future-dated orders.
SELECT COUNT(*) AS rule_violations
FROM production.orders
WHERE order_amount < 0
   OR order_date > CURRENT_DATE;

Python Code Voorbeelden

Python scripts voor data processing, ETL pipelines en data quality

ETL Pipeline Template

Python Databricks ETL

Use case: Complete ETL pipeline template met error handling, logging en configuratie management.

  • Configuration from YAML
  • Error handling en retry logic
  • Logging naar console en file
  • Modular design
etl_pipeline.py
#!/usr/bin/env python3
"""ETL Pipeline Template voor Data Engineering"""

import yaml
import logging
from datetime import datetime
from typing import Dict, Any
import pandas as pd

# Configure root logging once at import time: INFO level, timestamped format,
# mirrored to both a log file and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('etl_pipeline.log'), logging.StreamHandler()]
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

class ETLError(Exception):
    """Raised when any stage of the ETL pipeline fails."""

class ETLPipeline:
    """Config-driven ETL pipeline: extract -> transform -> load.

    Configuration comes from a YAML file with top-level keys ``source``
    (``type``, ``path``), ``target`` (``path``) and optional
    ``transformations`` (``drop_duplicates``, ``type_conversions``).
    """

    def __init__(self, config_path: str = "config/etl_config.yaml"):
        self.config = self.load_config(config_path)
        # 'errors' collects failure messages from run(); 'rows_processed'
        # is set after a successful load.
        self.stats: Dict[str, Any] = {'rows_processed': 0, 'errors': []}

    def load_config(self, config_path: str) -> Dict[str, Any]:
        """Load and parse the YAML configuration file.

        Raises:
            ETLError: if the file is missing or contains invalid YAML.
        """
        try:
            with open(config_path, 'r') as f:
                return yaml.safe_load(f)
        except Exception as e:
            # chain the cause so the original traceback stays visible
            raise ETLError(f"Config laden mislukt: {e}") from e

    def extract(self) -> pd.DataFrame:
        """Read the source dataset described by ``config['source']``.

        Supported source types: 'csv' and 'parquet'.

        Raises:
            ETLError: for an unsupported source type.
        """
        source_type = self.config['source']['type']
        source_path = self.config['source']['path']
        # lazy %-style args: formatting only happens if the record is emitted
        logger.info("Extract data van %s: %s", source_type, source_path)
        if source_type == 'csv':
            # dtype=str defers all type conversion to transform()
            return pd.read_csv(source_path, dtype=str)
        elif source_type == 'parquet':
            return pd.read_parquet(source_path)
        raise ETLError(f"Onbekend source type: {source_type}")

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the config-driven transformations to *df*.

        Optional dedup plus per-column 'datetime'/'numeric' conversions;
        errors='coerce' turns unparseable values into NaT/NaN instead of
        raising mid-pipeline.
        """
        transformations = self.config.get('transformations', {})
        if transformations.get('drop_duplicates'):
            df = df.drop_duplicates()
        for col, dtype in transformations.get('type_conversions', {}).items():
            if col in df.columns:
                if dtype == 'datetime':
                    df[col] = pd.to_datetime(df[col], errors='coerce')
                elif dtype == 'numeric':
                    df[col] = pd.to_numeric(df[col], errors='coerce')
        return df

    def load(self, df: pd.DataFrame) -> None:
        """Write the transformed frame to the configured parquet target."""
        target_path = self.config['target']['path']
        df.to_parquet(target_path, index=False)
        logger.info("Geladen: %s rijen naar %s", len(df), target_path)

    def run(self) -> bool:
        """Execute extract -> transform -> load.

        Returns:
            True on success, False on any failure (details are logged and
            appended to ``self.stats['errors']``).
        """
        logger.info("ETL Pipeline gestart")
        try:
            raw = self.extract()
            transformed = self.transform(raw)
            self.load(transformed)
            self.stats['rows_processed'] = len(transformed)
            logger.info("Pipeline voltooid: %s rijen", self.stats['rows_processed'])
            return True
        except Exception as e:
            # Boundary catch: the original only caught ETLError, so I/O or
            # pandas failures (FileNotFoundError, KeyError on missing config
            # keys, parse errors) escaped as raw tracebacks and
            # stats['errors'] was never populated.
            self.stats['errors'].append(str(e))
            logger.error("ETL Pipeline mislukt: %s", e)
            return False

if __name__ == "__main__":
    # Script entry point: run the pipeline with the default config location
    # and report the outcome on stdout.
    ok = ETLPipeline("config/etl_config.yaml").run()
    print("Succesvol!" if ok else "Mislukt, zie logs.")

PySpark Code Voorbeelden

PySpark scripts voor big data processing en distributed computing

Performance Optimization

PySpark Databricks Optimization

Use case: Geavanceerde performance tuning voor PySpark jobs. Optimaliseert partitioning, caching en query execution.

  • Dynamic partition pruning
  • Broadcast joins voor kleine tables
  • Delta Lake optimizations
  • Memory management
performance_optimization.py
"""PySpark Performance Optimization Template"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg

# Spark session tuned for this workload:
# - adaptive.enabled: AQE re-optimizes query plans at runtime from real stats
# - coalescePartitions: automatically merges small shuffle partitions
# - autoBroadcastJoinThreshold raised to 100 MB (104857600 bytes) so
#   medium-sized dimension tables get broadcast instead of shuffled
spark = SparkSession.builder \
    .appName("PerformanceOptimization") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", "104857600") \
    .getOrCreate()

class SparkOptimizer:
    """Small helper that centralizes DataFrame repartitioning strategies."""

    def __init__(self, spark_session):
        self.spark = spark_session

    def repartition_optimally(self, df, partition_column=None, num_partitions=None):
        """Repartition *df* by column, by explicit count, or by a size heuristic.

        Precedence: a partition column wins (hash-partitioned, default 200
        partitions), then an explicit partition count, otherwise a heuristic
        based on estimated data size. Prints the before/after partition counts.
        """
        before = df.rdd.getNumPartitions()
        if partition_column:
            # co-locate rows sharing the key; default to 200 partitions
            result = df.repartition(num_partitions or 200, partition_column)
        elif num_partitions:
            result = df.repartition(num_partitions)
        else:
            # Heuristic: assume ~100 bytes per row and target ~128 MB per
            # partition. NOTE: df.count() triggers a full action here.
            estimated_mb = df.count() * 100 / (1024 * 1024)
            result = df.repartition(max(1, int(estimated_mb / 128)))
        print(f"Repartitioned from {before} to {result.rdd.getNumPartitions()} partitions")
        return result

def ecommerce_analytics_pipeline():
    """Join 2024 orders to customers and compute per-customer order metrics.

    Returns a DataFrame keyed on (customer_id, country) with order count,
    total spend and average order value.
    """
    optimizer = SparkOptimizer(spark)

    orders = spark.read.format("delta").load("/mnt/data/orders")
    customers = spark.read.format("delta").load("/mnt/data/customers")

    # Filter first, then co-partition both sides on the join key so the
    # join moves less data across the cluster.
    orders_2024 = optimizer.repartition_optimally(
        orders.filter(col("order_date") >= "2024-01-01"), "customer_id")
    customers = optimizer.repartition_optimally(customers, "customer_id")

    enriched = orders_2024.join(customers, "customer_id", "inner")

    metrics = enriched.groupBy("customer_id", "country").agg(
        count("*").alias("total_orders"),
        sum("order_amount").alias("total_spent"),
        avg("order_amount").alias("avg_order_value"),
    )
    print(f"Pipeline voltooid! {metrics.count()} rijen")
    return metrics

if __name__ == "__main__":
    # Run the pipeline, show a sample of the result, and always release
    # the Spark session — even when the pipeline raises.
    try:
        metrics_df = ecommerce_analytics_pipeline()
        metrics_df.show(10, truncate=False)
    except Exception as e:
        print(f"Fout: {e}")
    finally:
        spark.stop()