Observability and Monitoring - Deep Dive
This guide covers comprehensive observability practices for modern distributed systems, including the three pillars of observability, monitoring strategies, and practical implementation techniques.
Part 1: The Three Pillars of Observability
What is Observability?
Definition: Observability is the ability to understand the internal state of a system by examining its external outputs. It goes beyond traditional monitoring by enabling you to ask arbitrary questions about your system's behavior.
The Three Pillars:
- Metrics - Numerical data aggregated over time
- Logs - Discrete events with contextual information
- Traces - Request flows through distributed systems
1. Metrics - The Foundation
Types of Metrics:
// Counter - Always increasing values
var (
httpRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of HTTP requests",
},
[]string{"method", "endpoint", "status"},
)
)
// Gauge - Current value that can go up or down
var (
activeConnections = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "active_connections",
Help: "Number of active database connections",
},
)
)
// Histogram - Distribution of values
var (
requestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"method", "endpoint"},
)
)
// Summary - Like a histogram, but quantiles are precomputed on the client side
var (
requestSize = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_request_size_bytes",
Help: "HTTP request size in bytes",
Objectives: map[float64]float64{
0.5: 0.05, // 50th percentile with 5% error
0.9: 0.01, // 90th percentile with 1% error
0.99: 0.001, // 99th percentile with 0.1% error
},
},
[]string{"method"},
)
)
Implementation Example:
// Comprehensive metrics instrumentation
package main
import (
    "net/http"
    "strconv"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)
type MetricsMiddleware struct {
requestsTotal *prometheus.CounterVec
requestDuration *prometheus.HistogramVec
requestsInFlight prometheus.Gauge
}
func NewMetricsMiddleware() *MetricsMiddleware {
m := &MetricsMiddleware{
requestsTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total HTTP requests processed",
},
[]string{"method", "endpoint", "status"},
),
requestDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration",
Buckets: []float64{0.001, 0.01, 0.1, 0.5, 1, 2.5, 5, 10},
},
[]string{"method", "endpoint"},
),
requestsInFlight: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "http_requests_in_flight",
Help: "Current number of HTTP requests being processed",
},
),
}
// Register metrics
prometheus.MustRegister(m.requestsTotal)
prometheus.MustRegister(m.requestDuration)
prometheus.MustRegister(m.requestsInFlight)
return m
}
func (m *MetricsMiddleware) Handler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
m.requestsInFlight.Inc()
defer m.requestsInFlight.Dec()
// Wrap response writer to capture status code
wrapped := &responseWriter{ResponseWriter: w, statusCode: 200}
next.ServeHTTP(wrapped, r)
duration := time.Since(start).Seconds()
statusCode := strconv.Itoa(wrapped.statusCode)
m.requestsTotal.WithLabelValues(r.Method, r.URL.Path, statusCode).Inc()
m.requestDuration.WithLabelValues(r.Method, r.URL.Path).Observe(duration)
})
}
type responseWriter struct {
http.ResponseWriter
statusCode int
}
func (rw *responseWriter) WriteHeader(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(code)
}
2. Structured Logging - Context and Events
Logging Best Practices:
// Structured logging with context
package main
import (
"context"
"time"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/trace"
)
// Logger with consistent structure
type AppLogger struct {
logger *logrus.Logger
}
func NewAppLogger() *AppLogger {
logger := logrus.New()
logger.SetFormatter(&logrus.JSONFormatter{
TimestampFormat: time.RFC3339Nano,
FieldMap: logrus.FieldMap{
logrus.FieldKeyTime: "timestamp",
logrus.FieldKeyLevel: "level",
logrus.FieldKeyMsg: "message",
},
})
return &AppLogger{logger: logger}
}
func (l *AppLogger) WithContext(ctx context.Context) *logrus.Entry {
entry := l.logger.WithFields(logrus.Fields{})
// Add trace information if available
span := trace.SpanFromContext(ctx)
if span.SpanContext().IsValid() {
entry = entry.WithFields(logrus.Fields{
"trace_id": span.SpanContext().TraceID().String(),
"span_id": span.SpanContext().SpanID().String(),
})
}
// Add user context if available
if userID := ctx.Value("user_id"); userID != nil {
entry = entry.WithField("user_id", userID)
}
// Add request ID if available
if requestID := ctx.Value("request_id"); requestID != nil {
entry = entry.WithField("request_id", requestID)
}
return entry
}
// Usage examples
func (l *AppLogger) LogUserAction(ctx context.Context, action string, details map[string]interface{}) {
l.WithContext(ctx).WithFields(logrus.Fields{
"action": action,
"details": details,
"component": "user_service",
}).Info("User action performed")
}
func (l *AppLogger) LogError(ctx context.Context, err error, component string) {
l.WithContext(ctx).WithFields(logrus.Fields{
"error": err.Error(),
"component": component,
}).Error("Operation failed")
}
func (l *AppLogger) LogPerformance(ctx context.Context, operation string, duration time.Duration, metadata map[string]interface{}) {
l.WithContext(ctx).WithFields(logrus.Fields{
"operation": operation,
"duration_ms": duration.Milliseconds(),
"metadata": metadata,
"component": "performance",
}).Info("Performance metric recorded")
}
Advanced Logging Patterns:
# Python structured logging with correlation IDs
import logging
import json
import time
import uuid
from contextlib import contextmanager
from contextvars import ContextVar
# Context variables for request tracking
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
user_id_var: ContextVar[str] = ContextVar('user_id', default='')
trace_id_var: ContextVar[str] = ContextVar('trace_id', default='')
class StructuredFormatter(logging.Formatter):
def format(self, record):
log_entry = {
'timestamp': time.time(),
'level': record.levelname,
'message': record.getMessage(),
'logger': record.name,
'module': record.module,
'function': record.funcName,
'line': record.lineno,
}
# Add context variables
if request_id_var.get():
log_entry['request_id'] = request_id_var.get()
if user_id_var.get():
log_entry['user_id'] = user_id_var.get()
if trace_id_var.get():
log_entry['trace_id'] = trace_id_var.get()
# Add any extra fields
if hasattr(record, 'extra_fields'):
log_entry.update(record.extra_fields)
return json.dumps(log_entry)
def setup_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logger.addHandler(handler)
return logger
# Context manager for request tracking
@contextmanager
def request_context(user_id=None):
request_id = str(uuid.uuid4())
request_id_token = request_id_var.set(request_id)
user_id_token = None
if user_id:
user_id_token = user_id_var.set(user_id)
try:
yield request_id
finally:
request_id_var.reset(request_id_token)
if user_id_token:
user_id_var.reset(user_id_token)
# Usage
logger = setup_logger(__name__)
def process_user_request(user_id: str, action: str):
with request_context(user_id=user_id):
logger.info("Processing user request", extra={
'extra_fields': {
'action': action,
'component': 'user_service'
}
})
try:
# Business logic here
result = perform_action(action)
logger.info("Request completed successfully", extra={
'extra_fields': {
'action': action,
'result_count': len(result),
'component': 'user_service'
}
})
except Exception as e:
logger.error("Request failed", extra={
'extra_fields': {
'action': action,
'error': str(e),
'error_type': type(e).__name__,
'component': 'user_service'
}
})
raise
3. Distributed Tracing - Request Flow Visibility
OpenTelemetry Implementation:
// Comprehensive distributed tracing setup
package main
import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    oteltrace "go.opentelemetry.io/otel/trace"
)
// Initialize tracing
func initTracing(serviceName string) func() {
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://localhost:14268/api/traces")))
if err != nil {
panic(fmt.Sprintf("failed to initialize jaeger exporter: %v", err))
}
tp := trace.NewTracerProvider(
trace.WithBatcher(exp),
trace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String("v1.0.0"),
attribute.String("environment", "production"),
)),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
return func() { _ = tp.Shutdown(context.Background()) }
}
// Service layer with tracing
type UserService struct {
tracer oteltrace.Tracer
database DatabaseInterface
cache CacheInterface
}
func NewUserService(db DatabaseInterface, cache CacheInterface) *UserService {
return &UserService{
tracer: otel.Tracer("user-service"),
database: db,
cache: cache,
}
}
func (s *UserService) GetUser(ctx context.Context, userID string) (*User, error) {
ctx, span := s.tracer.Start(ctx, "UserService.GetUser")
defer span.End()
// Add attributes to span
span.SetAttributes(
attribute.String("user.id", userID),
attribute.String("operation", "get_user"),
)
// Check cache first
user, err := s.getUserFromCache(ctx, userID)
if err == nil {
span.SetAttributes(attribute.Bool("cache.hit", true))
return user, nil
}
span.SetAttributes(attribute.Bool("cache.hit", false))
// Fallback to database
user, err = s.getUserFromDatabase(ctx, userID)
if err != nil {
span.RecordError(err)
span.SetAttributes(attribute.String("error.type", "database_error"))
return nil, fmt.Errorf("failed to get user: %w", err)
}
// Cache the result
go s.cacheUser(context.Background(), user)
span.SetAttributes(attribute.Bool("user.found", user != nil))
return user, nil
}
func (s *UserService) getUserFromCache(ctx context.Context, userID string) (*User, error) {
ctx, span := s.tracer.Start(ctx, "UserService.getUserFromCache")
defer span.End()
span.SetAttributes(
attribute.String("user.id", userID),
attribute.String("cache.operation", "get"),
)
return s.cache.Get(ctx, userID)
}
func (s *UserService) getUserFromDatabase(ctx context.Context, userID string) (*User, error) {
ctx, span := s.tracer.Start(ctx, "UserService.getUserFromDatabase")
defer span.End()
span.SetAttributes(
attribute.String("user.id", userID),
attribute.String("db.operation", "select"),
attribute.String("db.table", "users"),
)
start := time.Now()
user, err := s.database.GetUser(ctx, userID)
span.SetAttributes(attribute.Int64("db.duration_ms", time.Since(start).Milliseconds()))
return user, err
}
// HTTP handler with tracing
func (s *UserService) HandleGetUser(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID := r.URL.Query().Get("user_id")
ctx, span := s.tracer.Start(ctx, "HTTP GET /user")
defer span.End()
span.SetAttributes(
attribute.String("http.method", r.Method),
attribute.String("http.url", r.URL.String()),
attribute.String("http.user_agent", r.UserAgent()),
attribute.String("user.id", userID),
)
if userID == "" {
span.SetAttributes(attribute.Int("http.status_code", 400))
http.Error(w, "user_id is required", http.StatusBadRequest)
return
}
user, err := s.GetUser(ctx, userID)
if err != nil {
span.RecordError(err)
span.SetAttributes(
attribute.Int("http.status_code", 500),
attribute.String("error.message", err.Error()),
)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
span.SetAttributes(attribute.Int("http.status_code", 200))
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(user)
}
Part 2: Advanced Monitoring Strategies
Performance Optimizations for High-Volume Metrics Collection
Problem: Traditional metrics collection can become a bottleneck in high-throughput systems due to memory allocations and inefficient data structures.
Solution: Use object pools and optimized data structures:
// BEFORE - Inefficient approach
type SlowMetricsCollector struct {
metrics []map[string]interface{} // Creates many small allocations
}
func (smc *SlowMetricsCollector) AddMetric(name string, value float64) {
// Creates new map for each metric - expensive
metric := map[string]interface{}{
"name": name,
"value": value,
"timestamp": time.Now().Unix(),
}
smc.metrics = append(smc.metrics, metric)
}
// AFTER - Use object pools and typed structs
import (
"sync"
"time"
)
type MetricsCollector struct {
metricPool sync.Pool
metrics []Metric
mu sync.RWMutex
}
type Metric struct {
Name string
Value float64
Timestamp int64
}
func NewMetricsCollector() *MetricsCollector {
mc := &MetricsCollector{
metrics: make([]Metric, 0, 1000), // Pre-allocate capacity
}
mc.metricPool = sync.Pool{
New: func() interface{} {
return &Metric{}
},
}
return mc
}
func (mc *MetricsCollector) AddMetric(name string, value float64) {
metric := mc.metricPool.Get().(*Metric)
metric.Name = name
metric.Value = value
metric.Timestamp = time.Now().Unix()
mc.mu.Lock()
mc.metrics = append(mc.metrics, *metric)
mc.mu.Unlock()
// Return to pool for reuse
mc.metricPool.Put(metric)
}
func (mc *MetricsCollector) GetMetrics() []Metric {
mc.mu.RLock()
defer mc.mu.RUnlock()
// Return copy to avoid race conditions
result := make([]Metric, len(mc.metrics))
copy(result, mc.metrics)
return result
}
func (mc *MetricsCollector) Reset() {
mc.mu.Lock()
mc.metrics = mc.metrics[:0] // Keep underlying array, reset length
mc.mu.Unlock()
}
Continuous CPU Profiling Integration
Implementation for production monitoring:
// CPU profiling integration
import (
"fmt"
"log"
"os"
"runtime/pprof"
"time"
)
func enableContinuousProfiling() {
// Enable continuous CPU profiling
if os.Getenv("ENABLE_PROFILING") == "true" {
go func() {
for {
f, err := os.Create(fmt.Sprintf("cpu-profile-%d.prof", time.Now().Unix()))
if err != nil {
log.Printf("Could not create CPU profile: %v", err)
time.Sleep(30 * time.Second)
continue
}
pprof.StartCPUProfile(f)
time.Sleep(30 * time.Second)
pprof.StopCPUProfile()
f.Close()
// Upload to object storage for analysis
uploadProfile(f.Name())
}
}()
}
}
// Helper function to upload profiles to cloud storage
func uploadProfile(filename string) {
// Implementation would upload to S3, GCS, etc.
log.Printf("Uploading profile %s to cloud storage", filename)
// Clean up local file after upload
defer os.Remove(filename)
}
// Memory profiling function
func captureMemoryProfile() {
f, err := os.Create(fmt.Sprintf("mem-profile-%d.prof", time.Now().Unix()))
if err != nil {
log.Printf("Could not create memory profile: %v", err)
return
}
defer f.Close()
if err := pprof.WriteHeapProfile(f); err != nil {
log.Printf("Could not write memory profile: %v", err)
return
}
uploadProfile(f.Name())
}
Golden Signals Monitoring
The Four Golden Signals implementation:
// Golden Signals metrics implementation
type GoldenSignalsCollector struct {
// Latency
requestDuration *prometheus.HistogramVec
// Traffic
requestsPerSecond *prometheus.CounterVec
// Errors
errorRate *prometheus.CounterVec
// Saturation
resourceUtilization *prometheus.GaugeVec
}
func NewGoldenSignalsCollector() *GoldenSignalsCollector {
return &GoldenSignalsCollector{
requestDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "Request latency in seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
},
[]string{"method", "endpoint", "status_class"},
),
requestsPerSecond: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total HTTP requests",
},
[]string{"method", "endpoint"},
),
errorRate: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_errors_total",
Help: "Total HTTP errors",
},
[]string{"method", "endpoint", "status_code"},
),
resourceUtilization: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "resource_utilization_percent",
Help: "Resource utilization percentage",
},
[]string{"resource_type", "instance"},
),
}
}
Part 3: Implementation Examples
Microservices Implementation:
// Metrics Collector Service (Go)
package main
import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/go-redis/redis/v8"
)
type MetricsCollector struct {
kafka *kafka.Producer
redis *redis.Client
handlers map[string]MetricHandler
}
func (mc *MetricsCollector) CollectMetric(metric Metric) error {
// Process metric
processed := mc.handlers[metric.Type].Process(metric)
// Publish to message queue for other services
return mc.kafka.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &metric.Type,
Partition: kafka.PartitionAny,
},
Value: processed.ToJSON(),
}, nil)
}
# Data Processor Service (Python)
import asyncio
import json
from kafka import KafkaConsumer
from typing import Dict, Any
class DataProcessor:
def __init__(self):
self.consumer = KafkaConsumer(
'metrics-topic',
bootstrap_servers=['kafka:9092'],
group_id='data-processors',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
async def process_metrics(self):
for message in self.consumer:
            metric = message.value  # the consumer has already deserialized the JSON payload
# Process and aggregate
aggregated = await self.aggregate_metric(metric)
# Store in time-series database
await self.store_metric(aggregated)
# Trigger alerts if needed
await self.check_alerts(aggregated)
async def aggregate_metric(self, metric: Dict[str, Any]) -> Dict[str, Any]:
# Implementation for metric aggregation
return {
'name': metric['name'],
'value': metric['value'],
'timestamp': metric['timestamp'],
'aggregation_window': '5m',
'tags': metric.get('tags', {})
}
async def store_metric(self, metric: Dict[str, Any]):
# Store in InfluxDB or similar time-series database
pass
async def check_alerts(self, metric: Dict[str, Any]):
# Check alert thresholds and notify if needed
pass
Monolithic Implementation:
# Modular Monolith (Python)
from dashboard.modules import metrics, processing, api, notifications
from typing import Dict, Any
class AnalyticsDashboard:
def __init__(self):
self.metrics = metrics.MetricsModule()
self.processor = processing.ProcessingModule()
self.api = api.APIModule()
self.notifications = notifications.NotificationModule()
def handle_metric(self, raw_metric: Dict[str, Any]) -> Dict[str, Any]:
# All in same process - no network calls
metric = self.metrics.parse(raw_metric)
processed = self.processor.aggregate(metric)
# Check for alerts
if self.processor.check_thresholds(processed):
self.notifications.send_alert(processed)
return processed
def get_dashboard_data(self, time_range: str) -> Dict[str, Any]:
# Single process - can share data structures efficiently
raw_data = self.metrics.get_data(time_range)
processed_data = self.processor.prepare_dashboard_data(raw_data)
return {
'metrics': processed_data,
'alerts': self.notifications.get_active_alerts(),
'system_health': self.api.get_health_status()
}
# Single deployment with clear module boundaries
# Can be extracted to separate services later
When to Choose Each Approach:
| Factor | Monolith | Microservices |
|---|---|---|
| Team Size | < 10 developers | > 10 developers |
| Complexity | Simple-medium | Complex domain |
| Scale | < 1M requests/day | > 10M requests/day |
| Technology | Single stack preferred | Multiple technologies needed |
| Deployment | Weekly/monthly | Multiple times per day |
| Data Consistency | Strong consistency needed | Eventual consistency OK |
For Analytics Dashboard Specifically:
- Early Stage: Start with modular monolith
- Growth Stage: Extract high-scale components (metrics collector) first
- Mature Stage: Full microservices with proper DevOps practices
Database Sharding Implementation
Library Analogy:
📚 Traditional Database (Single Library):
All books in one building - gets crowded, hard to find books,
long queues at checkout
📚📚📚 Sharded Database (Multiple Library Branches):
Books distributed across locations:
- Fiction Library: Books A-H
- Science Library: Books I-P
- History Library: Books Q-Z
Each library (shard) operates independently but part of same system
Sharding Strategy for Analytics Dashboard:
# Database sharding implementation
import hashlib
import datetime
from typing import Dict, List, Optional
import concurrent.futures
class ShardUnavailableError(Exception):
    """Raised when a shard cannot be reached."""

class DatabaseSharding:
def __init__(self):
self.shards = {
'shard_americas': {
'host': 'db-americas.example.com',
'regions': ['us', 'ca', 'mx', 'br'],
'connection': self.create_connection('db-americas')
},
'shard_europe': {
'host': 'db-europe.example.com',
'regions': ['uk', 'de', 'fr', 'es'],
'connection': self.create_connection('db-europe')
},
'shard_asia': {
'host': 'db-asia.example.com',
'regions': ['jp', 'sg', 'au', 'in'],
'connection': self.create_connection('db-asia')
}
}
# Time-based sharding for metrics
self.time_shards = {
'metrics_current': 'Last 7 days - hot data',
'metrics_recent': 'Last 30 days - warm data',
'metrics_archive': 'Older than 30 days - cold data'
}
def get_user_shard(self, user_id: str) -> str:
"""Determine shard based on user ID hash"""
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_index = hash_value % len(self.shards)
return list(self.shards.keys())[shard_index]
def get_region_shard(self, region: str) -> str:
"""Determine shard based on geographical region"""
for shard_name, shard_info in self.shards.items():
if region.lower() in shard_info['regions']:
return shard_name
return 'shard_americas' # Default fallback
def get_time_shard(self, timestamp: datetime.datetime) -> str:
"""Determine shard based on data age"""
now = datetime.datetime.now()
age = now - timestamp
if age.days <= 7:
return 'metrics_current'
elif age.days <= 30:
return 'metrics_recent'
else:
return 'metrics_archive'
def route_query(self, query_type: str, **kwargs):
"""Route queries to appropriate shard"""
if query_type == 'user_orders':
shard = self.get_user_shard(kwargs['user_id'])
return self.execute_query(shard, query_type, **kwargs)
elif query_type == 'regional_metrics':
shard = self.get_region_shard(kwargs['region'])
return self.execute_query(shard, query_type, **kwargs)
elif query_type == 'historical_data':
# Query multiple time shards and aggregate
return self.query_time_shards(kwargs['start_date'], kwargs['end_date'])
elif query_type == 'cross_shard_analytics':
# Fan-out query to all shards
return self.fan_out_query(query_type, **kwargs)
    def query_time_shards(self, start_date, end_date):
        """Query across time-based shards"""
        results = []
        for shard_name in self.time_shards.keys():
            try:
                shard_result = self.execute_query(shard_name, 'time_range_query',
                                                  start_date=start_date, end_date=end_date)
                results.extend(shard_result)
            except ShardUnavailableError:
                # Handle shard failures gracefully
                self.log_shard_failure(shard_name)
                continue
        return self.aggregate_results(results)

    def fan_out_query(self, query_type: str, **kwargs):
        """Execute query across all shards and aggregate results"""
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.shards)) as executor:
            # Submit queries to all shards concurrently
            future_to_shard = {
                executor.submit(self.execute_query, shard_name, query_type, **kwargs): shard_name
                for shard_name in self.shards.keys()
            }
            for future in concurrent.futures.as_completed(future_to_shard):
                shard_name = future_to_shard[future]
                try:
                    result = future.result(timeout=30)  # 30 second timeout
                    results[shard_name] = result
                except Exception as e:
                    self.log_query_failure(shard_name, e)
                    results[shard_name] = None
        return self.aggregate_cross_shard_results(results)
def create_connection(self, shard_name: str):
"""Create database connection for shard"""
# Implementation would create actual DB connection
pass
def execute_query(self, shard_name: str, query_type: str, **kwargs):
"""Execute query on specific shard"""
# Implementation would execute actual query
pass
def log_shard_failure(self, shard_name: str):
"""Log shard failure for monitoring"""
pass
def log_query_failure(self, shard_name: str, error: Exception):
"""Log query failure for monitoring"""
pass
def aggregate_results(self, results: List):
"""Aggregate results from multiple shards"""
pass
def aggregate_cross_shard_results(self, results: Dict):
"""Aggregate results from cross-shard queries"""
pass
# Shard-aware query examples
class ShardedAnalyticsQueries:
def __init__(self, sharding: DatabaseSharding):
self.sharding = sharding
def get_user_orders(self, user_id: str):
"""Get orders for specific user"""
return self.sharding.route_query('user_orders', user_id=user_id)
def get_regional_sales(self, region: str, date_range: tuple):
"""Get sales data for specific region"""
return self.sharding.route_query('regional_metrics',
region=region,
start_date=date_range[0],
end_date=date_range[1])
def get_global_metrics(self, metric_type: str):
"""Get global metrics across all shards"""
return self.sharding.route_query('cross_shard_analytics',
metric_type=metric_type)
def get_historical_trends(self, days_back: int):
"""Get historical data across time shards"""
end_date = datetime.datetime.now()
start_date = end_date - datetime.timedelta(days=days_back)
return self.sharding.query_time_shards(start_date, end_date)
Sharding Implementation with PostgreSQL:
-- Create shard-specific tables
-- Shard 1: Americas
CREATE TABLE orders_americas (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('US', 'CA', 'MX', 'BR')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 2: Europe
CREATE TABLE orders_europe (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('UK', 'DE', 'FR', 'ES')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 3: Asia
CREATE TABLE orders_asia (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('JP', 'SG', 'AU', 'IN')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create foreign data wrapper for cross-shard queries
CREATE EXTENSION postgres_fdw;
CREATE SERVER shard_europe
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'db-europe.example.com', port '5432', dbname 'analytics');
CREATE USER MAPPING FOR postgres
SERVER shard_europe
OPTIONS (user 'analytics_user', password 'password');
-- Create foreign tables
CREATE FOREIGN TABLE orders_europe_remote (
id UUID,
user_id UUID,
region VARCHAR(2),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE
)
SERVER shard_europe
OPTIONS (schema_name 'public', table_name 'orders_europe');
-- View for cross-shard queries
-- (orders_asia_remote is assumed to be defined like orders_europe_remote above)
CREATE VIEW orders_global AS
SELECT 'americas' as shard, * FROM orders_americas
UNION ALL
SELECT 'europe' as shard, * FROM orders_europe_remote
UNION ALL
SELECT 'asia' as shard, * FROM orders_asia_remote;