API Integration: The Heart of Modern Data Pipelines
APIs form the connection points between data sources, transformations, and destinations in modern data architectures. Choosing the right integration pattern determines the scalability, reliability, and performance of your data pipelines.
Why API Patterns Matter
According to recent research, the average data engineer spends 40% of their time on API integration and data extraction. Implementing the right patterns can halve that time and double pipeline reliability.
API Protocol Choice: REST vs GraphQL vs gRPC
| Protocol | Best For | Performance | Complexity | Data Engineering Use Case |
|---|---|---|---|---|
| REST | CRUD operations, caching, simple queries | Good (HTTP/1.1) | Low | Batch data extraction, file downloads |
| GraphQL | Flexible queries, nested data, avoids over-/under-fetching | Excellent (single endpoint) | Medium | Complex data models, real-time dashboards |
| gRPC | Microservices, real-time streaming, binary data | Excellent (HTTP/2, binary) | High | High-frequency data, IoT streams |
| WebSockets | Real-time bidirectional communication | Excellent (persistent connection) | Medium | Live dashboards, trading platforms |
Pattern 1: Batch API Polling
Paginated Data Extraction
The most common pattern for historical data extraction. Ideal for REST APIs with pagination support.
Python Implementation
```python
import time
import requests
import pandas as pd
from datetime import datetime, timedelta

class PaginatedAPIExtractor:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.headers = {'Authorization': f'Bearer {api_key}'}
        self.session = requests.Session()

    def extract_with_pagination(self, endpoint, params=None):
        """Extract all data using pagination"""
        all_data = []
        page = 1
        has_more = True
        while has_more:
            # Add pagination parameters
            paginated_params = {'page': page, 'limit': 100}
            if params:
                paginated_params.update(params)
            response = self.session.get(
                f"{self.base_url}/{endpoint}",
                headers=self.headers,
                params=paginated_params
            )
            response.raise_for_status()
            data = response.json()
            all_data.extend(data.get('results', []))
            # Check for a next page
            has_more = data.get('has_next', False)
            page += 1
            # Respect rate limits
            if 'X-RateLimit-Remaining' in response.headers:
                remaining = int(response.headers['X-RateLimit-Remaining'])
                if remaining < 10:
                    time.sleep(60)  # wait one minute
        return pd.DataFrame(all_data)

    def incremental_extraction(self, endpoint, date_field='created_at'):
        """Incremental extraction since the last run"""
        last_run = self._get_last_run_time()
        params = {f'{date_field}__gte': last_run.isoformat()}
        return self.extract_with_pagination(endpoint, params)

    def _get_last_run_time(self):
        """Read the last successful run from a simple file checkpoint
        (falls back to 24 hours ago on the first run)"""
        try:
            with open('last_run.txt') as f:
                return datetime.fromisoformat(f.read().strip())
        except FileNotFoundError:
            return datetime.now() - timedelta(days=1)
```
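A minimal usage sketch; the base URL, API key, and the "orders" endpoint are hypothetical placeholders:

```python
# Hypothetical API; adjust base_url, key, and endpoint names to your source
extractor = PaginatedAPIExtractor(
    base_url="https://api.example.com/v1",
    api_key="YOUR_API_KEY"
)

# Full historical extraction
orders_df = extractor.extract_with_pagination("orders")

# Incremental extraction since the last checkpoint
new_orders_df = extractor.incremental_extraction("orders", date_field="created_at")
print(f"{len(orders_df)} orders total, {len(new_orders_df)} new since last run")
```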
Best Practices for Batch Polling
- Exponential Backoff: implement it for rate-limiting errors
- Checkpointing: store the last successful timestamp
- Idempotency: make sure repeated calls do not create duplicate data
- Parallel Processing: use asyncio for multiple endpoints
Performance Tip
Use aiohttp for async requests when you need to poll multiple API endpoints in parallel. This can speed up extraction severalfold.
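As an illustration of this asyncio/aiohttp approach, here is a minimal sketch for polling several endpoints concurrently. The base URL and endpoint names are hypothetical, and the assumption that each endpoint returns a flat JSON list is mine:

```python
import asyncio
import aiohttp

async def fetch_endpoint(session, base_url, endpoint):
    # Fetch a single endpoint; assumes the API returns a flat JSON list
    async with session.get(f"{base_url}/{endpoint}") as response:
        response.raise_for_status()
        return await response.json()

async def fetch_all(base_url, endpoints, api_key):
    # Poll all endpoints concurrently instead of one after the other
    headers = {"Authorization": f"Bearer {api_key}"}
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [fetch_endpoint(session, base_url, ep) for ep in endpoints]
        results = await asyncio.gather(*tasks)
        return dict(zip(endpoints, results))

# Hypothetical base URL and endpoint names
data = asyncio.run(fetch_all("https://api.example.com/v1",
                             ["orders", "customers", "products"],
                             "YOUR_API_KEY"))
```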
Pattern 2: Webhook Integration (Push-based)
Webhook Receiver Implementation
```python
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib
import json
from typing import Dict, Any

app = FastAPI()

class WebhookProcessor:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.event_handlers = {}

    def verify_signature(self, payload: bytes, signature: str) -> bool:
        """Verify the webhook signature for security"""
        expected = hmac.new(
            self.secret_key.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(expected, signature)

    def register_handler(self, event_type: str):
        """Register a handler for a specific event type (decorator factory)"""
        def decorator(handler):
            self.event_handlers[event_type] = handler
            return handler
        return decorator

    async def process_event(self, event: Dict[str, Any]):
        """Process a webhook event"""
        event_type = event.get('type')
        if event_type in self.event_handlers:
            await self.event_handlers[event_type](event)
        else:
            print(f"No handler for event type: {event_type}")

processor = WebhookProcessor(secret_key="your-secret-key")

@app.post("/webhook")
async def receive_webhook(request: Request):
    # Read payload and signature
    payload = await request.body()
    signature = request.headers.get('X-Webhook-Signature')
    # Verify the signature (reject requests without one)
    if not signature or not processor.verify_signature(payload, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    # Parse and process the event
    event = json.loads(payload.decode())
    await processor.process_event(event)
    return {"status": "success"}

# Register event handlers
@processor.register_handler("order.created")
async def handle_order_created(event: Dict[str, Any]):
    """Process a new order"""
    order_data = event['data']
    # Store in a database or queue for further processing
    print(f"New order received: {order_data['id']}")

@processor.register_handler("customer.updated")
async def handle_customer_update(event: Dict[str, Any]):
    """Process a customer update"""
    customer_data = event['data']
    # Update the customer record in the data warehouse
    print(f"Customer updated: {customer_data['email']}")
```
Webhook Best Practices
- Signature Verification: always implement it for security
- Idempotency Keys: prevent duplicate processing (see the sketch after this list)
- Retry Logic: implement a dead letter queue for failed events
- Event Schema Versioning: handle backward compatibility
- Async Processing: process events asynchronously to avoid timeouts
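A minimal idempotency sketch, assuming each event carries a unique `id` field; in production the seen-ID store would live in Redis or a database table rather than in process memory:

```python
processed_ids = set()  # in production: a Redis set or a database table, not process memory

async def process_event_once(event: dict):
    """Skip events that were already handled (webhooks are typically delivered at-least-once)."""
    event_id = event.get("id")
    if event_id in processed_ids:
        print(f"Duplicate event skipped: {event_id}")
        return
    await processor.process_event(event)  # processor from the example above
    processed_ids.add(event_id)
```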
When to Use Webhooks?
✅ Ideal for: real-time data sync, event-driven architecture, instant notifications
⚠️ Not ideal for: historical data backfill, very high volumes (>1000 events/sec)
Pattern 3: GraphQL for Complex Data
GraphQL Data Extraction with Python
```python
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import pandas as pd
from typing import Dict

class GraphQLExtractor:
    def __init__(self, endpoint: str, api_key: str):
        transport = AIOHTTPTransport(
            url=endpoint,
            headers={'Authorization': f'Bearer {api_key}'}
        )
        self.client = Client(transport=transport, fetch_schema_from_transport=True)

    async def fetch_nested_data(self, query: str, variables: Dict = None):
        """Fetch data with a complex GraphQL query"""
        gql_query = gql(query)
        try:
            result = await self.client.execute_async(
                gql_query,
                variable_values=variables
            )
            return result
        except Exception as e:
            print(f"GraphQL error: {e}")
            raise

    def create_ecommerce_query(self):
        """Build a complex e-commerce query"""
        return """
        query GetOrdersWithDetails($startDate: String!, $endDate: String!) {
          orders(filter: {
            dateRange: { start: $startDate, end: $endDate }
            status: [COMPLETED, PROCESSING]
          }) {
            id
            orderNumber
            createdAt
            totalAmount
            customer {
              id
              email
              firstName
              lastName
              address {
                street
                city
                zipCode
                country
              }
            }
            items {
              product {
                id
                name
                sku
                category {
                  name
                  parentCategory {
                    name
                  }
                }
              }
              quantity
              unitPrice
              totalPrice
            }
            payments {
              method
              amount
              status
              transactionId
            }
            shipping {
              method
              cost
              trackingNumber
              estimatedDelivery
            }
          }
          analytics: orderAnalytics(
            startDate: $startDate,
            endDate: $endDate
          ) {
            totalOrders
            totalRevenue
            averageOrderValue
            topProducts {
              productId
              productName
              quantitySold
              revenue
            }
          }
        }
        """

    async def extract_ecommerce_data(self, start_date: str, end_date: str):
        """Extract a complete e-commerce dataset in a single call"""
        query = self.create_ecommerce_query()
        variables = {
            "startDate": start_date,
            "endDate": end_date
        }
        result = await self.fetch_nested_data(query, variables)
        # Transform into pandas DataFrames
        orders_df = pd.json_normalize(result['orders'])
        analytics_df = pd.DataFrame([result['analytics']])
        return {
            'orders': orders_df,
            'analytics': analytics_df
        }
```
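A short usage sketch; the GraphQL endpoint URL, API key, and date range are placeholders:

```python
import asyncio

async def main():
    # Hypothetical endpoint; the schema must match the query above
    extractor = GraphQLExtractor("https://api.example.com/graphql", api_key="YOUR_API_KEY")
    data = await extractor.extract_ecommerce_data("2024-01-01", "2024-01-31")
    print(data['orders'].head())
    print(data['analytics'])

asyncio.run(main())
```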
Pattern 4: Real-time Streaming APIs
WebSocket Streaming
```python
import asyncio
import websockets
import json
import pandas as pd
from datetime import datetime
from collections import deque

class WebSocketStreamer:
    def __init__(self, uri: str, max_buffer: int = 10000):
        self.uri = uri
        self.buffer = deque(maxlen=max_buffer)
        self.running = False

    async def connect_and_stream(self):
        """Connect and stream real-time data"""
        self.running = True
        async with websockets.connect(self.uri) as websocket:
            # Subscribe to events
            await websocket.send(json.dumps({
                "action": "subscribe",
                "channels": ["trades", "orderbook"]
            }))
            # Receive and process messages
            async for message in websocket:
                if not self.running:
                    break
                data = json.loads(message)
                await self.process_message(data)

    async def process_message(self, data: dict):
        """Process an incoming message"""
        event_type = data.get('type')
        if event_type == 'trade':
            trade_data = {
                'timestamp': datetime.now(),
                'symbol': data['symbol'],
                'price': float(data['price']),
                'quantity': float(data['quantity']),
                'side': data['side']
            }
            self.buffer.append(trade_data)
            # Batch processing every 100 trades
            if len(self.buffer) >= 100:
                await self.process_batch()
        elif event_type == 'orderbook_update':
            # Update orderbook state
            pass

    async def process_batch(self):
        """Process a batch of trades"""
        if not self.buffer:
            return
        batch = list(self.buffer)
        df = pd.DataFrame(batch)
        # Compute aggregations
        aggregations = df.groupby('symbol').agg({
            'price': ['mean', 'min', 'max'],
            'quantity': 'sum'
        }).round(4)
        # Persist to a database or publish to a message queue
        print(f"Processed batch of {len(batch)} trades")
        print(aggregations)
        # Clear the buffer after processing
        self.buffer.clear()

    def stop(self):
        """Stop the stream"""
        self.running = False
```
Streaming Best Practices
- Connection Management: implement reconnection logic (see the sketch after this list)
- Backpressure Handling: manage the buffer carefully at high volumes
- State Management: persist connection state for recovery
- Monitoring: track message rates and latency
- Batch Processing: process in batches for efficiency
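A minimal reconnection sketch around the `WebSocketStreamer` above, assuming you want to reconnect with capped exponential backoff after a dropped connection:

```python
import asyncio
import websockets

async def stream_with_reconnect(streamer: WebSocketStreamer, max_delay: int = 60):
    """Keep the stream alive; reconnect with capped exponential backoff."""
    delay = 1
    streamer.running = True
    while streamer.running:
        try:
            await streamer.connect_and_stream()
        except (websockets.ConnectionClosed, OSError) as e:
            print(f"Connection lost ({e}), reconnecting in {delay}s")
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)
        else:
            delay = 1  # clean return, e.g. stop() was called
```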
Error Handling & Resilience Patterns
Resilience Patterns for API Integrations
1. Circuit Breaker Pattern
```python
import time

class CircuitBreakerError(Exception):
    """Raised when a call is rejected because the circuit is open"""
    pass

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "HALF_OPEN"
            else:
                raise CircuitBreakerError("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            raise
```
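A brief usage sketch, wrapping a hypothetical async fetch function:

```python
breaker = CircuitBreaker(failure_threshold=5, reset_timeout=60)

async def fetch_orders():
    # Hypothetical async API call (e.g. via aiohttp)
    ...

async def safe_fetch():
    try:
        return await breaker.call(fetch_orders)
    except CircuitBreakerError:
        print("Circuit open: skipping the call and falling back to cached data")
        return None
```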
2. Retry with Exponential Backoff
```python
import asyncio
import random
import aiohttp
import requests

async def call_api_with_retry(func, max_retries=5, base_delay=1):
    """Retry with exponential backoff and jitter"""
    for attempt in range(max_retries):
        try:
            return await func()
        except (requests.exceptions.RequestException,
                aiohttp.ClientError):
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter
            delay = base_delay * (2 ** attempt)
            jitter = random.uniform(0, 0.1 * delay)
            wait_time = delay + jitter
            print(f"Retry {attempt + 1}/{max_retries} in {wait_time:.2f}s")
            await asyncio.sleep(wait_time)
```
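For example, combining the retry helper with an aiohttp call (the URL is a placeholder):

```python
import asyncio
import aiohttp

async def fetch_customers():
    # Placeholder endpoint
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/v1/customers") as response:
            response.raise_for_status()
            return await response.json()

# Up to 5 attempts, with waits of roughly 1s, 2s, 4s, 8s (plus jitter) in between
customers = asyncio.run(call_api_with_retry(fetch_customers, max_retries=5, base_delay=1))
```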
3. Dead Letter Queue
```python
import os
import json
from datetime import datetime

class DeadLetterQueue:
    def __init__(self, storage_path="dlq/"):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    async def store_failed_message(self,
                                   endpoint: str,
                                   payload: dict,
                                   error: str):
        """Store a failed message for later inspection"""
        filename = os.path.join(self.storage_path, f"{datetime.now().isoformat()}.json")
        failed_message = {
            'endpoint': endpoint,
            'payload': payload,
            'error': str(error),
            'timestamp': datetime.now().isoformat(),
            'retry_count': 0
        }
        with open(filename, 'w') as f:
            json.dump(failed_message, f, indent=2)
        return filename
```
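A matching replay sketch, under the assumption that failed calls can simply be re-POSTed to the original endpoint:

```python
import glob
import os
import json
import requests

def replay_dead_letters(dlq: DeadLetterQueue, session: requests.Session):
    """Re-send stored messages; delete the file on success, bump retry_count on failure."""
    for path in glob.glob(os.path.join(dlq.storage_path, "*.json")):
        with open(path) as f:
            message = json.load(f)
        try:
            response = session.post(message['endpoint'], json=message['payload'])
            response.raise_for_status()
            os.remove(path)  # replay succeeded
        except requests.exceptions.RequestException:
            message['retry_count'] += 1
            with open(path, 'w') as f:
                json.dump(message, f, indent=2)
```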
Monitoring & Observability
| Metric | Tooling | Alert Threshold | Action |
|---|---|---|---|
| API Latency | Prometheus, Datadog | > 2 seconds (P95) | Trigger circuit breaker |
| Error Rate | ELK Stack, Splunk | > 5% (5-minute window) | Test retry logic |
| Rate Limit Usage | Custom metrics | > 80% of limit | Implement throttling |
| Data Freshness | Great Expectations | Data older than 15 minutes | Restart pipeline |
| Memory Usage | Grafana, CloudWatch | > 80% of memory | Adjust buffer size |
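As an illustration of how such metrics could be exposed from a Python extractor, here is a minimal sketch using the prometheus_client library; the metric names and the port are arbitrary choices, not part of the original pipeline:

```python
import time
from prometheus_client import Counter, Histogram, start_http_server

# Metric names are arbitrary examples
API_LATENCY = Histogram('api_request_latency_seconds', 'API request latency', ['endpoint'])
API_ERRORS = Counter('api_request_errors_total', 'Failed API requests', ['endpoint'])

def instrumented_get(session, url, endpoint_label):
    """Wrap a GET call so latency and errors show up in Prometheus."""
    start = time.time()
    try:
        response = session.get(url)
        response.raise_for_status()
        return response
    except Exception:
        API_ERRORS.labels(endpoint=endpoint_label).inc()
        raise
    finally:
        API_LATENCY.labels(endpoint=endpoint_label).observe(time.time() - start)

start_http_server(8000)  # expose /metrics for Prometheus to scrape
```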
Conclusion and Recommendations
API Integration Patterns Summary
For Batch Processing
- REST + Pagination: historical data extraction
- Checkpointing: incremental updates
- Parallel Extraction: use asyncio for speed
For Real-time
- Webhooks: Event-driven architecture
- WebSockets: Live streaming data
- Message Queues: buffer for high volumes
For Complex Data
- GraphQL: Nested relations, flexible queries
- gRPC: High-performance microservices
- API Composition: combine multiple endpoints
Practical Tips for Dutch Data Engineers
For RDW-like APIs: use batch polling with checkpointing for large historical datasets. Implement exponential backoff for rate limiting and store incremental updates in Parquet format for efficient querying.
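For example, writing the incremental extraction from Pattern 1 to partitioned Parquet; `extractor` is the PaginatedAPIExtractor defined earlier, and the "vehicles" endpoint and output path are placeholders:

```python
import pandas as pd

# "vehicles" and the output path are hypothetical; partitioning by extraction date keeps queries cheap
df = extractor.incremental_extraction("vehicles", date_field="created_at")
df["extraction_date"] = pd.Timestamp.now().date().isoformat()
df.to_parquet(
    "datalake/vehicles/",
    engine="pyarrow",
    partition_cols=["extraction_date"],
    index=False
)
```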
For E-commerce APIs: combine webhooks for real-time order updates with GraphQL for complex product/category queries. Use circuit breakers during peak load (Black Friday).
For Financial Data: implement WebSockets for real-time market data with backpressure handling. Store raw streams in time-series databases such as TimescaleDB for compliance and analytics.