
API Integration Patterns: Best Practices voor Data Engineering

Published: 20 January 2026
Reading time: 12 minutes
API Integration, REST, GraphQL, Webhooks, Data Pipelines, ETL, Python, Best Practices
Level: Intermediate

In modern data engineering, APIs are the lifeblood of data pipelines. Discover the most important integration patterns, from batch processing to real-time streaming, with practical Python implementations.

API Integration: The Heart of Modern Data Pipelines

APIs form the connection points between data sources, transformations and destinations in modern data architectures. Choosing the right integration pattern determines the scalability, reliability and performance of your data pipelines.

Why API Patterns Matter

According to recent research, the average data engineer spends about 40% of their time on API integration and data extraction. Implementing the right patterns can halve that time and make pipelines considerably more reliable.

API Protocol Choice: REST vs GraphQL vs gRPC

| Protocol   | Best For                                                     | Performance                        | Complexity | Data Engineering Use Case                 |
|------------|--------------------------------------------------------------|------------------------------------|------------|-------------------------------------------|
| REST       | CRUD operations, caching, simple queries                     | Good (HTTP/1.1)                    | Low        | Batch data extraction, file downloads     |
| GraphQL    | Flexible queries, nested data, avoiding over-/under-fetching | Excellent (single endpoint)        | Medium     | Complex data models, real-time dashboards |
| gRPC       | Microservices, real-time streaming, binary data              | Excellent (HTTP/2, binary)         | High       | High-frequency data, IoT streams          |
| WebSockets | Real-time bidirectional communication                        | Excellent (persistent connection)  | Medium     | Live dashboards, trading platforms        |

Pattern 1: Batch API Polling

1. Paginated Data Extraction

The most common pattern for historical data extraction. Ideal for REST APIs with pagination support.

Python Implementation
import time
import requests
import pandas as pd
from datetime import datetime, timedelta

class PaginatedAPIExtractor:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.headers = {'Authorization': f'Bearer {api_key}'}
        self.session = requests.Session()
        
    def extract_with_pagination(self, endpoint, params=None):
        """Extraheer alle data met pagination"""
        all_data = []
        page = 1
        has_more = True
        
        while has_more:
            # Add pagination parameters
            paginated_params = {'page': page, 'limit': 100}
            if params:
                paginated_params.update(params)
            
            response = self.session.get(
                f"{self.base_url}/{endpoint}",
                headers=self.headers,
                params=paginated_params
            )
            response.raise_for_status()
            
            data = response.json()
            all_data.extend(data.get('results', []))
            
            # Check whether there is a next page
            has_more = data.get('has_next', False)
            page += 1
            
            # Respect rate limits
            if 'X-RateLimit-Remaining' in response.headers:
                remaining = int(response.headers['X-RateLimit-Remaining'])
                if remaining < 10:
                    time.sleep(60)  # Wait one minute
            
        return pd.DataFrame(all_data)
    
    def incremental_extraction(self, endpoint, date_field='created_at'):
        """Incremental extraction since the last successful run."""
        # _get_last_run_time() is assumed to return the last stored checkpoint timestamp
        last_run = self._get_last_run_time()
        params = {f'{date_field}__gte': last_run.isoformat()}
        
        return self.extract_with_pagination(endpoint, params)
2. Best Practices for Batch Polling
  • Exponential Backoff: Implement it for rate-limiting errors
  • Checkpointing: Store the last successful timestamp (see the sketch after this list)
  • Idempotency: Make sure repeated calls do not create duplicate data
  • Parallel Processing: Use asyncio for multiple endpoints
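
A minimal checkpointing sketch that the _get_last_run_time helper in the extractor above could delegate to. The state/last_run.json path and the 30-day fallback window are assumptions, not part of any particular library:

import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

STATE_FILE = Path("state/last_run.json")  # hypothetical checkpoint location

def load_last_run(default_days_back: int = 30) -> datetime:
    """Read the timestamp of the last successful run, falling back to a fixed window."""
    if STATE_FILE.exists():
        return datetime.fromisoformat(json.loads(STATE_FILE.read_text())["last_run"])
    return datetime.now(timezone.utc) - timedelta(days=default_days_back)

def save_last_run(ts: datetime) -> None:
    """Persist the checkpoint only after the extracted batch has been stored successfully."""
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps({"last_run": ts.isoformat()}))
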
Performance Tip

Use aiohttp for async requests when you need to poll multiple API endpoints in parallel. In practice this can cut extraction time by a factor of three or more, as sketched below.
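
A minimal sketch of parallel polling with aiohttp, assuming the same Bearer-token authentication as the extractor above; the base URL and endpoint names are placeholders:

import asyncio
import aiohttp

async def fetch_endpoint(session: aiohttp.ClientSession, base_url: str, endpoint: str) -> list:
    """Fetch a single endpoint and return its 'results' list."""
    async with session.get(f"{base_url}/{endpoint}", params={"limit": 100}) as response:
        response.raise_for_status()
        payload = await response.json()
        return payload.get("results", [])

async def fetch_all(base_url: str, api_key: str, endpoints: list) -> dict:
    """Poll several endpoints concurrently instead of one by one."""
    headers = {"Authorization": f"Bearer {api_key}"}
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [fetch_endpoint(session, base_url, ep) for ep in endpoints]
        results = await asyncio.gather(*tasks)
        return dict(zip(endpoints, results))

# Example usage (placeholder URL and key):
# data = asyncio.run(fetch_all("https://api.example.com", "my-key", ["orders", "customers"]))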

Pattern 2: Webhook Integration (Push-based)

Webhook Receiver Implementation

from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib
import json
from typing import Dict, Any

app = FastAPI()

class WebhookProcessor:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.event_handlers = {}
        
    def verify_signature(self, payload: bytes, signature: str) -> bool:
        """Verify webhook signature voor security"""
        expected = hmac.new(
            self.secret_key.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(expected, signature)
    
    def register_handler(self, event_type: str):
        """Register a handler for a specific event type (usable as a decorator)."""
        def decorator(handler):
            self.event_handlers[event_type] = handler
            return handler
        return decorator
    
    async def process_event(self, event: Dict[str, Any]):
        """Process a webhook event."""
        event_type = event.get('type')
        if event_type in self.event_handlers:
            await self.event_handlers[event_type](event)
        else:
            print(f"No handler registered for event type: {event_type}")

processor = WebhookProcessor(secret_key="your-secret-key")

@app.post("/webhook")
async def receive_webhook(request: Request):
    # Read payload and signature
    payload = await request.body()
    signature = request.headers.get('X-Webhook-Signature', '')

    # Verify the signature
    if not processor.verify_signature(payload, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse and process the event
    event = json.loads(payload.decode())
    await processor.process_event(event)
    
    return {"status": "success"}

# Register event handlers
@processor.register_handler("order.created")
async def handle_order_created(event: Dict[str, Any]):
    """Process a new order."""
    order_data = event['data']
    # Store in a database or queue for further processing
    print(f"New order received: {order_data['id']}")

@processor.register_handler("customer.updated")
async def handle_customer_update(event: Dict[str, Any]):
    """Process a customer update."""
    customer_data = event['data']
    # Update the customer record in the data warehouse
    print(f"Customer updated: {customer_data['email']}")

Webhook Best Practices

  • Signature Verification: Always implement this for security
  • Idempotency Keys: Prevent duplicate processing
  • Retry Logic: Implement a dead letter queue for failed events
  • Event Schema Versioning: Handle backward compatibility
  • Async Processing: Process events asynchronously to avoid timeouts (see the sketch after this list)
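
A minimal sketch of that last point, building on the FastAPI receiver above: acknowledge immediately and hand the event off to a background task so the sender's timeout is never hit. The /webhook-async route name is an assumption for illustration:

from fastapi import BackgroundTasks

@app.post("/webhook-async")
async def receive_webhook_async(request: Request, background_tasks: BackgroundTasks):
    payload = await request.body()
    signature = request.headers.get('X-Webhook-Signature', '')

    if not processor.verify_signature(payload, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = json.loads(payload.decode())
    # The response is returned first; the handler runs afterwards in the background
    background_tasks.add_task(processor.process_event, event)
    return {"status": "accepted"}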
When to Use Webhooks?

✅ Ideal for: real-time data sync, event-driven architectures, instant notifications

⚠️ Not ideal for: historical data backfill, very high volumes (>1,000 events/sec)

Pattern 3: GraphQL for Complex Data

GraphQL Data Extraction with Python

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import pandas as pd
from typing import List, Dict

class GraphQLExtractor:
    def __init__(self, endpoint: str, api_key: str):
        transport = AIOHTTPTransport(
            url=endpoint,
            headers={'Authorization': f'Bearer {api_key}'}
        )
        self.client = Client(transport=transport, fetch_schema_from_transport=True)
    
    async def fetch_nested_data(self, query: str, variables: Dict = None):
        """Fetch data met complexe GraphQL query"""
        gql_query = gql(query)
        
        try:
            result = await self.client.execute_async(
                gql_query,
                variable_values=variables
            )
            return result
        except Exception as e:
            print(f"GraphQL error: {e}")
            raise
    
    def create_ecommerce_query(self):
        """Genereer complexe e-commerce query"""
        return """
        query GetOrdersWithDetails($startDate: String!, $endDate: String!) {
          orders(filter: { 
            dateRange: { start: $startDate, end: $endDate }
            status: [COMPLETED, PROCESSING]
          }) {
            id
            orderNumber
            createdAt
            totalAmount
            customer {
              id
              email
              firstName
              lastName
              address {
                street
                city
                zipCode
                country
              }
            }
            items {
              product {
                id
                name
                sku
                category {
                  name
                  parentCategory {
                    name
                  }
                }
              }
              quantity
              unitPrice
              totalPrice
            }
            payments {
              method
              amount
              status
              transactionId
            }
            shipping {
              method
              cost
              trackingNumber
              estimatedDelivery
            }
          }
          analytics: orderAnalytics(
            startDate: $startDate,
            endDate: $endDate
          ) {
            totalOrders
            totalRevenue
            averageOrderValue
            topProducts {
              productId
              productName
              quantitySold
              revenue
            }
          }
        }
        """
    
    async def extract_ecommerce_data(self, start_date: str, end_date: str):
        """Extract complete e-commerce dataset in één call"""
        query = self.create_ecommerce_query()
        variables = {
            "startDate": start_date,
            "endDate": end_date
        }
        
        result = await self.fetch_nested_data(query, variables)
        
        # Transform into pandas DataFrames
        orders_df = pd.json_normalize(result['orders'])
        analytics_df = pd.DataFrame([result['analytics']])
        
        return {
            'orders': orders_df,
            'analytics': analytics_df
        }
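
A short usage sketch; the endpoint URL, API key and date range are placeholders:

import asyncio

async def main():
    extractor = GraphQLExtractor("https://api.example.com/graphql", "my-api-key")
    data = await extractor.extract_ecommerce_data("2026-01-01", "2026-01-31")
    print(data['orders'].head())
    print(data['analytics'])

asyncio.run(main())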

Pattern 4: Real-time Streaming APIs

WebSocket Streaming

import asyncio
import websockets
import json
import pandas as pd
from datetime import datetime
from collections import deque

class WebSocketStreamer:
    def __init__(self, uri: str, max_buffer: int = 10000):
        self.uri = uri
        self.buffer = deque(maxlen=max_buffer)
        self.running = False
        
    async def connect_and_stream(self):
        """Maak verbinding en stream real-time data"""
        self.running = True
        
        async with websockets.connect(self.uri) as websocket:
            # Subscribe to event channels
            await websocket.send(json.dumps({
                "action": "subscribe",
                "channels": ["trades", "orderbook"]
            }))
            
            # Receive and process messages
            async for message in websocket:
                if not self.running:
                    break
                    
                data = json.loads(message)
                await self.process_message(data)
    
    async def process_message(self, data: dict):
        """Verwerk incoming message"""
        event_type = data.get('type')
        
        if event_type == 'trade':
            trade_data = {
                'timestamp': datetime.now(),
                'symbol': data['symbol'],
                'price': float(data['price']),
                'quantity': float(data['quantity']),
                'side': data['side']
            }
            self.buffer.append(trade_data)
            
            # Batch-process every 100 trades
            if len(self.buffer) >= 100:
                await self.process_batch()
                
        elif event_type == 'orderbook_update':
            # Update orderbook state
            pass
    
    async def process_batch(self):
        """Verwerk batch van trades"""
        if not self.buffer:
            return
            
        batch = list(self.buffer)
        df = pd.DataFrame(batch)
        
        # Compute aggregations
        aggregations = df.groupby('symbol').agg({
            'price': ['mean', 'min', 'max'],
            'quantity': 'sum'
        }).round(4)
        
        # Store in a database or publish to a message queue
        print(f"Processed batch of {len(batch)} trades")
        print(aggregations)
        
        # Clear the buffer after processing
        self.buffer.clear()
    
    def stop(self):
        """Stop de stream"""
        self.running = False

Streaming Best Practices

  • Connection Management: Implement reconnection logic (sketched after this list)
  • Backpressure Handling: Manage buffers under high volumes
  • State Management: Persist connection state for recovery
  • Monitoring: Track message rates and latency
  • Batch Processing: Process in batches for efficiency
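
A minimal reconnection sketch around the WebSocketStreamer above, using exponential backoff between attempts; the 60-second cap is an assumption:

import asyncio
import websockets

async def run_with_reconnect(streamer: WebSocketStreamer, max_delay: int = 60):
    """Keep the stream alive: reconnect with exponential backoff after a dropped connection."""
    delay = 1
    while True:
        try:
            await streamer.connect_and_stream()
            if not streamer.running:
                break  # stop() was called; do not reconnect
            delay = 1  # server closed cleanly: reset the backoff and reconnect
        except (websockets.WebSocketException, OSError) as exc:
            print(f"Connection lost ({exc}), reconnecting in {delay}s")
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)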
~10 ms end-to-end latency

Error Handling & Resilience Patterns

Resilience Patterns for API Integrations

1. Circuit Breaker Pattern
import time

class CircuitBreakerError(Exception):
    """Raised when the circuit is open and calls are rejected."""
    pass

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "HALF_OPEN"
            else:
                raise CircuitBreakerError("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                self.failure_count = 0
            return result
            
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            
            raise
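
A short usage sketch wrapping an async API call; the fetch_orders coroutine is hypothetical:

breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30)

async def fetch_orders():
    ...  # hypothetical async API call that may fail

async def safe_fetch():
    """Call the API through the breaker and fall back when the circuit is open."""
    try:
        return await breaker.call(fetch_orders)
    except CircuitBreakerError:
        # The API is considered down: skip this run or fall back to cached data
        return None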
2. Retry with Exponential Backoff
import asyncio
import random

import aiohttp
import requests

async def call_api_with_retry(func, max_retries=5, base_delay=1):
    """Retry with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return await func()
        except (requests.exceptions.RequestException, 
                aiohttp.ClientError) as e:
            
            if attempt == max_retries - 1:
                raise
            
            # Exponential backoff with jitter
            delay = base_delay * (2 ** attempt)
            jitter = random.uniform(0, 0.1 * delay)
            wait_time = delay + jitter
            
            print(f"Retry {attempt + 1}/{max_retries} in {wait_time:.2f}s")
            await asyncio.sleep(wait_time)
3. Dead Letter Queue
import os
import json
from datetime import datetime

class DeadLetterQueue:
    def __init__(self, storage_path="dlq/"):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)
    
    async def store_failed_message(self, 
                                 endpoint: str, 
                                 payload: dict, 
                                 error: str):
        """Sla gefaald bericht op voor later onderzoek"""
        filename = f"{self.storage_path}/{datetime.now().isoformat()}.json"
        
        failed_message = {
            'endpoint': endpoint,
            'payload': payload,
            'error': str(error),
            'timestamp': datetime.now().isoformat(),
            'retry_count': 0
        }
        
        with open(filename, 'w') as f:
            json.dump(failed_message, f, indent=2)
        
        return filename

Monitoring & Observability

| Metric           | Tooling             | Alert Threshold     | Action                   |
|------------------|---------------------|---------------------|--------------------------|
| API Latency      | Prometheus, Datadog | > 2 seconds (P95)   | Trip the circuit breaker |
| Error Rate       | ELK Stack, Splunk   | > 5% (5-min window) | Test retry logic         |
| Rate Limit Usage | Custom metrics      | > 80% of limit      | Implement throttling     |
| Data Freshness   | Great Expectations  | Data > 15 min old   | Restart the pipeline     |
| Memory Usage     | Grafana, CloudWatch | > 80% of memory     | Tune buffer sizes        |
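
As a sketch of the first row, request latency can be exported for Prometheus with the prometheus_client library; the metric name, label and port are assumptions:

import time
import requests
from prometheus_client import Histogram, start_http_server

API_LATENCY = Histogram(
    'api_request_duration_seconds',
    'Duration of outbound API requests',
    ['endpoint']
)

def timed_get(url: str, endpoint_label: str, **kwargs) -> requests.Response:
    """Wrap a GET call and record its duration so a P95 alert can fire on it."""
    start = time.perf_counter()
    try:
        return requests.get(url, **kwargs)
    finally:
        API_LATENCY.labels(endpoint=endpoint_label).observe(time.perf_counter() - start)

# Expose metrics on :8000/metrics for Prometheus to scrape
start_http_server(8000)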

Conclusion and Recommendations

API Integration Patterns Summary

For Batch Processing
  • REST + Pagination: Historical data extraction
  • Checkpointing: Incremental updates
  • Parallel Extraction: Use asyncio for speed
For Real-time
  • Webhooks: Event-driven architecture
  • WebSockets: Live streaming data
  • Message Queues: Buffers for high volumes
For Complex Data
  • GraphQL: Nested relations, flexible queries
  • gRPC: High-performance microservices
  • API Composition: Combine multiple endpoints

Practical Tips for Dutch Data Engineers

For RDW-style APIs: Use batch polling with checkpointing for large historical datasets. Implement exponential backoff for rate limiting and store incremental updates in Parquet format for efficient querying, as sketched below.
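
A minimal sketch of that last step, writing each incremental batch to a date-partitioned Parquet file. The directory layout and the df argument are assumptions, and pandas needs pyarrow or fastparquet installed:

from datetime import datetime
from pathlib import Path
import pandas as pd

def store_incremental_batch(df: pd.DataFrame, dataset: str, base_path: str = "data/raw") -> Path:
    """Write one incremental extraction to a date-partitioned Parquet file."""
    partition = datetime.now().strftime("%Y-%m-%d")
    target_dir = Path(base_path) / dataset / f"dt={partition}"
    target_dir.mkdir(parents=True, exist_ok=True)

    target_file = target_dir / f"batch_{datetime.now().strftime('%H%M%S')}.parquet"
    df.to_parquet(target_file, index=False)
    return target_file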

For E-commerce APIs: Combine webhooks for real-time order updates with GraphQL for complex product/category queries. Use circuit breakers during peak load (e.g. Black Friday).

For Financial Data: Implement WebSockets for real-time market data with backpressure handling. Store raw streams in a time-series database such as TimescaleDB for compliance and analytics.

Abdullah Özisik - Data Engineer

👨‍💻 About the author

Abdullah Özisik - Data Engineer specialising in API integrations and data pipeline architecture. Has built dozens of API integrations for Dutch organisations, from government APIs to e-commerce platforms.
